Bladeren bron

功能修改

mxs 1 jaar geleden
bovenliggende
commit
ada6cfa7a3
3 gewijzigde bestanden met toevoegingen van 82 en 53 verwijderingen
  1. 11 8
      data_project_wy/history.go
  2. 68 41
      data_project_wy/main.go
  3. 3 4
      data_project_wy/task.go

+ 11 - 8
data_project_wy/history.go

@@ -278,18 +278,21 @@ func HisTransactionDataAddInformation() {
 		//},
 
 		//历史projectset_wy
-		"project_id": map[string]interface{}{
-			"$lte": "662143800000000000000000",
-		},
+		//"project_id": map[string]interface{}{
+		//	//"$gt":  "662143800000000000000000",
+		//	"$gt": "667c3b5166cf0db42ae965e6",
+		//},
 
+		"project_id": "6637ae0866cf0db42aeeb5d4",
 		//历史projectset_wy_back
 		//"update_time": map[string]interface{}{
-		//	"$lt": 1714959573,
+		//	"$gte": 1714959573,
+		//	"$lte": 1719795791,
 		//},
 	}
-	count := MgoPro.Count("projectset_wy", query)
+	count := MgoPro.Count("projectset_wy_back", query)
 	fmt.Println("count:", count)
-	it := sess.DB(MgoPro.DbName).C("projectset_wy").Find(&query).Iter()
+	it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
 	n := 0
 	arr := [][]map[string]interface{}{}
 	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
@@ -368,7 +371,7 @@ func HisTransactionDataAddInformation() {
 			lock.Lock()
 			arr = append(arr, update)
 			if len(arr) > 100 {
-				MgoPro.UpdateBulk("projectset_wy", arr...)
+				MgoPro.UpdateBulk("projectset_wy_back", arr...)
 				arr = [][]map[string]interface{}{}
 			}
 			lock.Unlock()
@@ -380,7 +383,7 @@ func HisTransactionDataAddInformation() {
 	}
 	wg.Wait()
 	if len(arr) > 0 {
-		MgoPro.UpdateBulk("projectset_wy", arr...)
+		MgoPro.UpdateBulk("projectset_wy_back", arr...)
 		arr = [][]map[string]interface{}{}
 	}
 	fmt.Println("迁移结束...")

+ 68 - 41
data_project_wy/main.go

@@ -39,9 +39,9 @@ func main() {
 func tmp() {
 	sess := MgoPro.GetMgoConn()
 	defer MgoPro.DestoryMongoConn(sess)
-	ch := make(chan bool, 1)
+	ch := make(chan bool, 5)
 	wg := &sync.WaitGroup{}
-	//lock := &sync.Mutex{}
+	lock := &sync.Mutex{}
 	query := map[string]interface{}{
 		//"project_bidstatus": 4,
 		//"_id": map[string]interface{}{
@@ -51,14 +51,18 @@ func tmp() {
 		//	"$lt": 1714959573,
 		//},
 		//"_id": mongodb.StringTOBsonId("6630eae76f6c86a3962f3a07"),
-		"repeat": true,
+		//"repeat": true,
+		"update_time": map[string]interface{}{
+			"$gte": 1714959573,
+			"$lte": 1719795791,
+		},
 	}
 	repeat := map[string]bool{}
 	count := MgoPro.Count("projectset_wy_back", query)
 	fmt.Println("count:", count)
 	it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
 	n := 0
-	//arr := []map[string]interface{}{}
+	arr := [][]map[string]interface{}{}
 	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
 		ch <- true
 		wg.Add(1)
@@ -67,13 +71,13 @@ func tmp() {
 				<-ch
 				wg.Done()
 			}()
-			//update := []map[string]interface{}{}
-			//project_id := gconv.String(tmp["project_id"])
+			update := []map[string]interface{}{}
+			project_id := gconv.String(tmp["project_id"])
 			//lock.Lock()
 			//if !repeat[project_id] {
-			//	Es.DelById(Config.Es.Index, project_id)
-			//	CkhTool.Exec(context.Background(), "ALTER TABLE information.transaction_info_copy DELETE WHERE project_id = ?", project_id)
-			//	repeat[project_id] = true
+			//Es.DelById(Config.Es.Index, project_id)
+			//CkhTool.Exec(context.Background(), "ALTER TABLE information.transaction_info_copy DELETE WHERE project_id = ?", project_id)
+			//repeat[project_id] = true
 			//}
 			//lock.Unlock()
 			//err, result := Es.GetById(Config.Es.Index, project_id)
@@ -87,20 +91,36 @@ func tmp() {
 			//		fmt.Println("11", project_id)
 			//	}
 			//}
-			if MgoPro.Count("projectset_wy_back", map[string]interface{}{"project_id": tmp["project_id"]}) > 1 {
-				fmt.Println("project_id")
-				//update = append(update, map[string]interface{}{"_id": tmp["_id"]})
-				//update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": true}})
+
+			tt := map[string]bool{}
+			err, result := Es.GetById(Config.Es.Index, gconv.String(tmp["_id"]))
+			if err != nil || len(result) == 0 {
+				tt["es"] = true
 			}
-			//if len(update) > 0 {
-			//	lock.Lock()
-			//	arr = append(arr, update)
-			//	if len(arr) > 500 {
-			//		MgoPro.UpdateBulk("projectset_wy_back", arr...)
-			//		arr = [][]map[string]interface{}{}
-			//	}
-			//	lock.Unlock()
+			if FindClickHouseByProjectId(project_id) == 0 {
+				tt["click"] = true
+			}
+			if len(tt) > 0 {
+				update = append(update, map[string]interface{}{"_id": tmp["_id"]})
+				update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": tt}})
+			}
+			//if MgoPro.Count("projectset_wy_back", map[string]interface{}{"project_id": tmp["project_id"]}) > 1 {
+			//	fmt.Println("project_id")
+			//	update = append(update, map[string]interface{}{"_id": tmp["_id"]})
+			//	update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": "project"}})
+			//} else if MgoB.Count("projectset_wy", map[string]interface{}{"project_id": tmp["project_id"]}) > 0 {
+			//	update = append(update, map[string]interface{}{"_id": tmp["_id"]})
+			//	update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": "bidding"}})
 			//}
+			if len(update) > 0 {
+				lock.Lock()
+				arr = append(arr, update)
+				if len(arr) > 500 {
+					MgoPro.UpdateBulk("projectset_wy_back", arr...)
+					arr = [][]map[string]interface{}{}
+				}
+				lock.Unlock()
+			}
 		}(tmp)
 		if n%1000 == 0 {
 			fmt.Println("current:", n)
@@ -108,15 +128,16 @@ func tmp() {
 		tmp = map[string]interface{}{}
 	}
 	wg.Wait()
-	//if len(arr) > 0 {
-	//	MgoPro.SaveBulk("projectset_wy_tmp2", arr...)
-	//	arr = []map[string]interface{}{}
-	//}
+	if len(arr) > 0 {
+		MgoPro.UpdateBulk("projectset_wy_back", arr...)
+		arr = [][]map[string]interface{}{}
+	}
 	fmt.Println("迁移结束...", len(repeat))
 }
 
 func getBiddingData() {
 	url := "http://172.17.4.184:19908"
+	//url := "http://127.0.0.1:19908"
 	//url := "http://192.168.3.149:9200"
 	username := "jybid"
 	password := "Top2023_JEB01i@31"
@@ -132,8 +153,8 @@ func getBiddingData() {
 	}
 
 	//rangeQuery := elastic.NewRangeQuery("project_bidstatus").Lt(3)
-	//query := elastic.NewBoolQuery().
-	//	Must(elastic.NewTermQuery("id", "652d2f432ccb08936fe2ed60")) //
+	query := elastic.NewBoolQuery().
+		Must(elastic.NewTermQuery("id", "64d7146cb44bf08751e3c133")) //
 	//Must(rangeQuery)
 	//Must(elastic.NewTermQuery("subtype", "招标"))
 
@@ -149,7 +170,7 @@ func getBiddingData() {
 	scrollID := ""
 	scroll := "10m"
 	searchSource := elastic.NewSearchSource().
-		//Query(query).
+		Query(query).
 		Size(500)
 	//Sort("_doc", true) //升序排序
 	//Sort("_doc", false) //降序排序
@@ -181,17 +202,19 @@ func getBiddingData() {
 				log.Printf("解析文档失败:%s", err)
 				continue
 			}
+			set := map[string]interface{}{}
 			id := gconv.String(doc["id"])
 			//情报
+			information_id := gconv.String(doc["information_id"])
 			info_ids := gconv.Strings(doc["info_ids"])
 			info := FindInfomationData(info_ids...)
-			doc["information_id"] = info.Id
-			doc["starttime"] = info.Starttime
-			doc["endtime"] = info.Endtime
-			set := map[string]interface{}{
-				"information_id": info.Id,
-				"starttime":      info.Starttime,
-				"endtime":        info.Endtime,
+			if information_id != info.Id {
+				doc["information_id"] = info.Id
+				doc["starttime"] = info.Starttime
+				doc["endtime"] = info.Endtime
+				set["information_id"] = info.Id
+				set["starttime"] = info.Starttime
+				set["endtime"] = info.Endtime
 			}
 
 			//法人
@@ -231,12 +254,16 @@ func getBiddingData() {
 			} else if business_type == "采购意向" {
 				coll = "projectset_wy"
 			}
-			//更新es
-			client.Update().Index(index).Id(id).Doc(doc).Do(context.Background())
-			//更新clickhouse
-			UpdateDataToClickHouse(set, map[string]interface{}{"project_id": id})
-			//更新mgo
-			MgoPro.Update(coll, map[string]interface{}{"project_id": id}, map[string]interface{}{"$set": set}, false, false)
+			fmt.Println(set)
+			if len(set) > 0 {
+				//更新es
+				client.Update().Index(index).Id(id).Doc(doc).Do(context.Background())
+				//更新clickhouse
+				err := UpdateDataToClickHouse(set, map[string]interface{}{"project_id": id})
+				fmt.Println("11", err)
+				//更新mgo
+				MgoPro.Update(coll, map[string]interface{}{"project_id": id}, map[string]interface{}{"$set": set}, false, false)
+			}
 		}
 		total = total + len(res.Hits.Hits)
 		scrollID = res.ScrollId

+ 3 - 4
data_project_wy/task.go

@@ -46,9 +46,8 @@ type Transaction struct {
 }
 
 func IncTransactionDataFromBidAndPro() {
-	IncTransactionDataFromBid() //bidding
-	IncTransactionDataFromPro() //project
-	return
+	IncTransactionDataFromBid()       //bidding
+	IncTransactionDataFromPro()       //project
 	IncTransactionDataMgoToCkhAndEs() //mongodb迁移至clickhouse
 }
 
@@ -376,7 +375,7 @@ func DealTransactionForPro(data map[string]interface{}) map[string]interface{} {
 	//查询情报信息
 	ids := gconv.Strings(data["ids"])
 	info := FindInfomationData(ids...) //情报信息查询
-
+	//查询法人信息
 	winners := []string{}
 	if winner != "" {
 		winners = strings.Split(winner, ",")