Преглед на файлове

更新代码,批量更新es

wcc преди 9 месеца
родител
ревизия
596f56a56f
променени са 2 файла, в които са добавени 140 реда и са изтрити 92 реда
  1. 139 91
      bidding_tags/main.go
  2. 1 1
      bidding_tags/tools.go

+ 139 - 91
bidding_tags/main.go

@@ -24,6 +24,12 @@ var (
 	RuleC = make([]string, 0)
 	RuleD = make([]string, 0)
 	RuleE = make([]string, 0)
+	//更新es
+	updateEsPool = make(chan []map[string]interface{}, 5000)
+	updateEsSp   = make(chan bool, 3) //保存协程
+	key          = "4d5206b1b297c1e7b77f9578edcb2cf7.TNU2i8G1oUNdR02i"
+	model        = "glm-4-flash"
+	re           = regexp.MustCompile(`title:(.*?),projectname:(.*?),id:(.*?),class:(.*?)(?:\s*$|\n)`)
 )
 
 type RequestData struct {
@@ -100,12 +106,12 @@ func Init() {
 
 func main() {
 	Init()
+
+	go updateEsMethod()
 	//dealAll() //规则处理存量数据
 	//dealInc()
 	//dealAllAi() //大模型
 
-	//select {}
-
 	local, _ := time.LoadLocation("Asia/Shanghai")
 	c := cron.New(cron.WithLocation(local), cron.WithSeconds())
 	_, err := c.AddFunc(GF.Env.Spec, dealInc)
@@ -115,7 +121,7 @@ func main() {
 
 	c.Start()
 	defer c.Stop()
-
+	//
 	select {}
 }
 
@@ -154,20 +160,22 @@ func dealAll() {
 	dealTopInformation(where, timeTypeAll) //处理情报标签一级
 }
 
+// dealAllAi runs the large-model (AI) tagging pass over the existing backlog data: it builds a comeintime upper-bound filter and hands it to dealTopInformationAi4.
 func dealAllAi() {
-	//where := map[string]interface{}{
-	//	//"comeintime": map[string]interface{}{
-	//	//	//"$gte": 1726070400,
-	//	//	"$lt": 1727078591,
-	//	//},
-	//}
+	where := map[string]interface{}{ // query filter passed through to Find() inside dealTopInformationAi4
+		"comeintime": map[string]interface{}{
+			//"$gte": 1726070400,
+			"$lt": 1728316800, // only records whose comeintime is below this value (presumably a Unix timestamp in seconds — confirm)
+		},
+	}
 
 	log.Println("开始处理数据")
 	//dealTopInformationAi(where) // first-level intelligence tags
 	//dealTopInformationAi2(where) // first-level intelligence tags; called in groups of 50
 	//dealTopInformationAi3(where) // one call per record
-	dealTopInformationAi4(nil) //5一组,开启携程
+	dealTopInformationAi4(where) // groups of 5, processed in goroutines
 	//dealTopInformationAi5(nil) // one Zhipu call per record, processed in goroutines
+	log.Println("存量数据处理完毕") // NOTE(review): confirm dealTopInformationAi4 waits for its goroutines before returning, otherwise this logs early
 }
 
 // dealTopInformation  处理情报标签一级;剑鱼码 处理方式
@@ -269,68 +277,12 @@ func dealTopInformationAi4(where interface{}) {
 	defer Mgo.DestoryMongoConn(sess)
 	count := 0
 
-	key := "4d5206b1b297c1e7b77f9578edcb2cf7.TNU2i8G1oUNdR02i"
-	model := "glm-4-flash"
-	//key := "6c86cea8659ff1d33b161ea7213ea97c.m4OcENaRan8NeLSZ" //我自己的key
-
-	re := regexp.MustCompile(`title:(.*?),projectname:(.*?),id:(.*?),class:(.*?)(?:\s*$|\n)`)
 	var lines = make([]string, 0)
 	it := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(where).Select(nil).Sort("-_id").Iter()
-	resultsChan := make(chan map[string]string, 5000)
-	sem := make(chan struct{}, 100) // 控制并发数量
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		count2 := 0
-		for item := range resultsChan {
-			count2++
-			// 处理返回结果,例如更新数据库
-			tags := make([]string, 0)
-			class := util.ObjToString(item["class"])
-			if class == "其他分类" || class == "" || item["id"] == "" {
-				continue
-			}
-			for _, v2 := range strings.Split(class, ",") {
-				if v2 == "车辆领域" {
-					tags = append(tags, "情报_车辆租赁")
-				} else if v2 == "安防领域" {
-					tags = append(tags, "情报_安防")
-				} else if v2 == "印务领域" {
-					tags = append(tags, "情报_印务商机")
-				} else if v2 == "环境领域" {
-					tags = append(tags, "情报_环境采购")
-				} else if v2 == "家具领域" {
-					tags = append(tags, "情报_家具招投标")
-				}
-			}
-			if len(tags) > 0 {
-				id := util.ObjToString(item["id"])
-				updateMgo := map[string]interface{}{
-					"tag_topinformation_zp": tags,
-					//"topinformation_time":   time.Now().Unix(),
-				}
-				Mgo.UpdateById("bidding", id, map[string]interface{}{"$set": updateMgo})
-				updateEs := map[string]interface{}{
-					"tag_topinformation_zp": tags,
-				}
-
-				if count2%100 == 0 {
-					log.Println("update es", id, tags)
-				}
-				if GF.Esa.URL != "" {
-					_ = Esa.UpdateDocument("bidding", id, updateEs)
-				}
-
-				if GF.Esb.URL != "" {
-					_ = Esb.UpdateDocument("bidding", id, updateEs)
-				}
-			}
-		}
-	}()
+	sem := make(chan struct{}, 50) // 控制并发数量
 
 	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
-		if count%1000 == 0 {
+		if count%100 == 0 {
 			log.Println("current:", count, tmp["title"], tmp["_id"])
 		}
 		title := util.ObjToString(tmp["title"])
@@ -344,22 +296,50 @@ func dealTopInformationAi4(where interface{}) {
 				report := strings.Join(lines, "\n")
 				resu, err := ZpAI4(key, model, report)
 				lines = make([]string, 0)
-
 				if err != nil {
-					log.Println("智普请求失败,", err, report)
+					log.Println("智普请求失败,", err)
 					return
 				}
 				splitLines := strings.Split(resu, `;`)
 				for _, line := range splitLines {
 					matches := re.FindStringSubmatch(line)
 					if len(matches) == 5 {
-						result := map[string]string{
-							"title":       matches[1],
-							"projectname": matches[2],
-							"id":          matches[3],
-							"class":       matches[4],
+						class := matches[4]
+						id := matches[3]
+						tags := make([]string, 0)
+						if class == "其他分类" || class == "" || id == "" {
+							continue
+						}
+						for _, v2 := range strings.Split(class, ",") {
+							if v2 == "车辆领域" {
+								tags = append(tags, "情报_车辆租赁")
+							} else if v2 == "安防领域" {
+								tags = append(tags, "情报_安防")
+							} else if v2 == "印务领域" {
+								tags = append(tags, "情报_印务商机")
+							} else if v2 == "环境领域" {
+								tags = append(tags, "情报_环境采购")
+							} else if v2 == "家具领域" {
+								tags = append(tags, "情报_家具招投标")
+							}
+						}
+						if len(tags) > 0 {
+							updateMgo := map[string]interface{}{
+								"tag_topinformation_zp": tags,
+							}
+							Mgo.UpdateById("bidding", id, map[string]interface{}{"$set": updateMgo})
+							updateEs := map[string]interface{}{
+								"tag_topinformation_zp": tags,
+							}
+							if count%1000 == 0 {
+								log.Println("update es", id, tags)
+							}
+							// 更新es
+							updateEsPool <- []map[string]interface{}{
+								{"_id": id},
+								updateEs,
+							}
 						}
-						resultsChan <- result
 					}
 				}
 			}(lines)
@@ -371,29 +351,55 @@ func dealTopInformationAi4(where interface{}) {
 	if len(lines) > 0 {
 		report := strings.Join(lines, "\n")
 		resu, err := ZpAI4(key, model, report)
+		lines = make([]string, 0)
 		if err != nil {
-			log.Println("智普请求失败,尝试再次请求", err)
+			log.Println("智普请求失败,", err, report)
+			return
 		}
-		resu, err = ZpAI4(key, model, report)
-		if err == nil && resu != "" {
-			splitLines := strings.Split(resu, "\n")
-			for _, line := range splitLines {
-				matches := re.FindStringSubmatch(line)
-				if len(matches) == 5 {
-					result := map[string]string{
-						"title":       matches[1],
-						"projectname": matches[2],
-						"id":          matches[3],
-						"class":       matches[4],
+		splitLines := strings.Split(resu, `;`)
+		for _, line := range splitLines {
+			matches := re.FindStringSubmatch(line)
+			if len(matches) == 5 {
+				class := matches[4]
+				id := matches[3]
+				tags := make([]string, 0)
+				if class == "其他分类" || class == "" || id == "" {
+					continue
+				}
+				for _, v2 := range strings.Split(class, ",") {
+					if v2 == "车辆领域" {
+						tags = append(tags, "情报_车辆租赁")
+					} else if v2 == "安防领域" {
+						tags = append(tags, "情报_安防")
+					} else if v2 == "印务领域" {
+						tags = append(tags, "情报_印务商机")
+					} else if v2 == "环境领域" {
+						tags = append(tags, "情报_环境采购")
+					} else if v2 == "家具领域" {
+						tags = append(tags, "情报_家具招投标")
+					}
+				}
+				if len(tags) > 0 {
+					updateMgo := map[string]interface{}{
+						"tag_topinformation_zp": tags,
+					}
+					Mgo.UpdateById("bidding", id, map[string]interface{}{"$set": updateMgo})
+					updateEs := map[string]interface{}{
+						"tag_topinformation_zp": tags,
+					}
+					if count%1000 == 0 {
+						log.Println("update es", id, tags)
+					}
+					// 更新es
+					updateEsPool <- []map[string]interface{}{
+						{"_id": id},
+						updateEs,
 					}
-					resultsChan <- result
 				}
 			}
 		}
 	}
 
-	close(resultsChan) // 关闭 channel
-	wg.Wait()
 	log.Println("大模型处理数据处理完毕")
 }
 
@@ -526,3 +532,45 @@ func dealAi(datas []RequestData) {
 	//	}
 	//}
 }
+
+// updateEsMethod 更新es
+func updateEsMethod() {
+	arru := make([][]map[string]interface{}, 200) //200条一组更新es
+	indexu := 0
+	for {
+		select {
+		case v := <-updateEsPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == 200 {
+				updateEsSp <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-updateEsSp
+					}()
+					Esa.UpdateBulk("bidding", arru...)
+					if Esb.S_esurl != "" {
+						Esb.UpdateBulk("bidding", arru...)
+					}
+				}(arru)
+				arru = make([][]map[string]interface{}, 200)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				updateEsSp <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-updateEsSp
+					}()
+					Esa.UpdateBulk("bidding", arru...)
+					if Esb.S_esurl != "" {
+						Esb.UpdateBulk("bidding", arru...)
+					}
+				}(arru[:indexu])
+				arru = make([][]map[string]interface{}, 200)
+				indexu = 0
+			}
+		}
+	}
+}

+ 1 - 1
bidding_tags/tools.go

@@ -295,7 +295,7 @@ title:文本,projectname:文本,id:13123123,class:文本;
 		TopP:        &originalValue2,
 	}
 
-	postResponse, err := zhipu.BeCommonModel(expireAtTime, mssage, apiKey, time.Minute*8)
+	postResponse, err := zhipu.BeCommonModel(expireAtTime, mssage, apiKey, time.Minute*10)
 	if err != nil {
 		log.Println("请求智普失败", err)
 		return