Sfoglia il codice sorgente

优化全量人脉

xuzhiheng 9 mesi fa
parent
commit
cdec07cf8c
2 ha cambiato i file con 36 aggiunte e 82 eliminazioni
  1. 7 6
      data_project_wy_all/main.go
  2. 29 76
      data_project_wy_all/task.go

+ 7 - 6
data_project_wy_all/main.go

@@ -10,7 +10,7 @@ import (
 
 	"github.com/gogf/gf/v2/util/gconv"
 	"github.com/olivere/elastic/v7"
-	// "github.com/robfig/cron"
+	"github.com/robfig/cron"
 )
 
 func init() {
@@ -22,11 +22,12 @@ func init() {
 }
 
 func main() {
-	// c := cron.New()
-	//增量
-	// c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
-	// c.Start()
-	IncTransactionDataFromBidAndPro()
+	c := cron.New()
+	c.AddFunc("0 0 4 ? * *", IncTransactionDataFromBid) //增量bidding和项目数据
+	c.Start()
+	d := cron.New()
+	d.AddFunc("0 0 5 ? * *", IncTransactionDataFromPro) //增量bidding和项目数据
+	d.Start()
 	//历史
 	//HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
 	//HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息

+ 29 - 76
data_project_wy_all/task.go

@@ -55,22 +55,19 @@ func IncTransactionDataFromBidAndPro() {
 
 // IncTransactionDataFromBid 增量bidding
 func IncTransactionDataFromBid() {
-	// endTime := GetTime(-1) //前一天凌晨
-	// fmt.Println("开始执行增量采购意向、拟建信息", BidStartTime, endTime)
-	// if BidStartTime >= endTime {
-	// 	fmt.Println("增量bidding采购意向、拟建查询异常:", BidStartTime, endTime)
-	// 	return
-	// }
+	stime := time.Now().AddDate(0, 0, -1)
+	BidStartTime := time.Date(stime.Year(), stime.Month(), stime.Day(), 0, 0, 0, 0, stime.Location()).Unix()
+	fmt.Println("开始执行增量采购意向、拟建信息")
 	query := map[string]interface{}{
 		"pici": map[string]interface{}{
-			// "$gte": BidStartTime,
-			"$lt": 1729440000,
+			"$gte": BidStartTime,
+			"$lt":  BidStartTime + 86400,
 		},
 	}
 	fmt.Println("增量bidding采购意向query:", query)
 	sess := MgoB.GetMgoConn()
 	defer MgoB.DestoryMongoConn(sess)
-	ch := make(chan bool, 10)
+	ch := make(chan bool, 1)
 	wg := &sync.WaitGroup{}
 	// lock := &sync.Mutex{}
 	fields := map[string]interface{}{
@@ -139,16 +136,6 @@ func IncTransactionDataFromBid() {
 				fmt.Println("数据保存es失败,项目project_id", result["project_id"])
 			}
 			SaveDataToClickHouse(result)
-			// lock.Lock()
-			// if len(result) > 0 {
-			// 	arr = append(arr, result)
-			// 	count++
-			// }
-			// if len(arr) > 50 {
-			// 	MgoPro.SaveBulk("projectset_wy", arr...)
-			// 	arr = []map[string]interface{}{}
-			// }
-			// lock.Unlock()
 		}(tmp)
 		if n%1000 == 0 {
 			fmt.Println("current:", n)
@@ -156,12 +143,7 @@ func IncTransactionDataFromBid() {
 		tmp = map[string]interface{}{}
 	}
 	wg.Wait()
-	// if len(arr) > 0 {
-	// 	MgoPro.SaveBulk("projectset_wy", arr...)
-	// 	arr = []map[string]interface{}{}
-	// }
-	// fmt.Println("执行增量采购意向、拟建信息完毕", BidStartTime, endTime, count)
-	// BidStartTime = endTime //替换
+	fmt.Println("增量采购意向、拟建信息结束")
 }
 
 // DealTransactionForBid bidding采购意向、拟建数据处理
@@ -250,22 +232,19 @@ func DealTransactionForBid(tmp map[string]interface{}, business_type string, pro
 
 // IncTransactionDataFromProject 增量project
 func IncTransactionDataFromPro() {
-	// endTime := GetTime(-1) //前一天凌晨
-	// fmt.Println("开始执行增量项目信息", ProStartTime, endTime)
-	// if ProStartTime >= endTime {
-	// 	fmt.Println("增量项目信息查询异常:", ProStartTime, endTime)
-	// 	return
-	// }
+	stime := time.Now().AddDate(0, 0, -1)
+	BidStartTime := time.Date(stime.Year(), stime.Month(), stime.Day(), 0, 0, 0, 0, stime.Location()).Unix()
+	fmt.Println("开始执行增量项目信息")
 	query := map[string]interface{}{
 		"pici": map[string]interface{}{
-			// "$gte": ProStartTime,
-			"$lt": 1729440000,
+			"$gte": BidStartTime,
+			"$lt":  BidStartTime + 86400,
 		},
 	}
 	fmt.Println("增量项目查询query:", query)
 	sess := MgoPro.GetMgoConn()
 	defer MgoPro.DestoryMongoConn(sess)
-	ch := make(chan bool, 10)
+	ch := make(chan bool, 1)
 	wg := &sync.WaitGroup{}
 	// lock := &sync.Mutex{}
 	fields := map[string]interface{}{
@@ -295,10 +274,8 @@ func IncTransactionDataFromPro() {
 		"tag_topinformation":    1,
 		"tag_topinformation_ai": 1,
 	}
-	// arr := [][]map[string]interface{}{}
-	it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Sort("-_id").Iter()
+	it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter()
 	n := 0
-	// count := 0
 	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
 		ch <- true
 		wg.Add(1)
@@ -307,9 +284,6 @@ func IncTransactionDataFromPro() {
 				<-ch
 				wg.Done()
 			}()
-			// if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
-			// 	return
-			// }
 			bidstatus := gconv.String(tmp["bidstatus"])
 			if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" || bidstatus == "招标" {
 				result := DealTransactionForPro(tmp)
@@ -317,37 +291,21 @@ func IncTransactionDataFromPro() {
 				if !SaveDataToEs(result) { //保存、更新es
 					fmt.Println("数据保存es失败,项目project_id", result["project_id"])
 				}
-				err := SaveDataToClickHouse(result)
-				if err != nil {
-					fmt.Println("clickhouse保存失败", project_id, result)
+				count := FindClickHouseByProjectId(project_id) //查询
+				if count > 0 {                                 //更新
+					delete(result, "create_time") //不更新创建时间
+					delete(result, "project_id")  //不更新项目id(主键)
+					err := UpdateDataToClickHouse(result, map[string]interface{}{"project_id": project_id})
+					if err != nil {
+						fmt.Println("clickhouse更新失败", project_id, result)
+					}
+				} else { //插入
+					err := SaveDataToClickHouse(result)
+					if err != nil {
+						fmt.Println("clickhouse保存失败", project_id, result)
+					}
 				}
 			}
-
-			// count := FindClickHouseByProjectId(project_id) //查询
-			// if count > 0 {                                 //更新
-			// 	delete(result, "create_time") //不更新创建时间
-			// 	delete(result, "project_id")  //不更新项目id(主键)
-			// 	err = UpdateDataToClickHouse(result, map[string]interface{}{"project_id": project_id})
-			// 	if err != nil {
-			// 		fmt.Println("clickhouse更新失败", project_id, data)
-			// 	}
-			// } else { //插入
-
-			// }
-			// lock.Lock()
-			// if len(result) > 0 {
-			// 	count++
-			// 	update := []map[string]interface{}{
-			// 		{"project_id": mongodb.BsonIdToSId(tmp["_id"])},
-			// 		{"$set": result},
-			// 	}
-			// 	arr = append(arr, update)
-			// }
-			// if len(arr) > 50 {
-			// 	MgoPro.UpSertBulk("projectset_wy_back", arr...)
-			// 	arr = [][]map[string]interface{}{}
-			// }
-			// lock.Unlock()
 		}(tmp)
 		if n%1000 == 0 {
 			fmt.Println("current:", n)
@@ -355,12 +313,7 @@ func IncTransactionDataFromPro() {
 		tmp = map[string]interface{}{}
 	}
 	wg.Wait()
-	// if len(arr) > 0 {
-	// 	MgoPro.UpSertBulk("projectset_wy_back", arr...)
-	// 	arr = [][]map[string]interface{}{}
-	// }
-	// fmt.Println("执行增量项目信息完毕", ProStartTime, endTime, count)
-	// ProStartTime = endTime //替换
+	fmt.Println("增量项目信息结束")
 }
 
 // DealTransactionForPro project数据处理
@@ -640,7 +593,7 @@ func UpdateDataToClickHouse(data, querys map[string]interface{}) error {
 		qs = append(qs, fmt.Sprintf("%s=?", k))
 		values = append(values, v)
 	}
-	query := fmt.Sprintf("ALTER TABLE %s UPDATE %s WHERE %s", Config.ClickHouse.DataBase+".transaction_info", strings.Join(sets, ","), strings.Join(qs, ","))
+	query := fmt.Sprintf("ALTER TABLE %s UPDATE %s WHERE %s", Config.ClickHouse.DataBase+".transaction_info_all", strings.Join(sets, ","), strings.Join(qs, ","))
 	//query := `ALTER TABLE information.transaction_info UPDATE update_time = ? WHERE project_id = '5c9ee78ca5cb26b9b7fd0b57'`
 	return CkhTool.Exec(context.Background(), query, values...)
 }