소스 검색

更新 bidding_detail

wcc 1 개월 전
부모
커밋
a566cc17f2
4개의 변경된 파일, 186개의 추가 및 26개의 삭제
  1. 94 5
      createEsIndex/bidding_es.go
  2. 3 2
      createEsIndex/common.toml
  3. 18 17
      createEsIndex/config/conf.go
  4. 71 2
      createEsIndex/main.go

+ 94 - 5
createEsIndex/bidding_es.go

@@ -31,8 +31,96 @@ var (
 	HtmlReg     = regexp.MustCompile("<[^>]+>")
 )
 
+// biddingDetailTask feeds the bidding_detail index, which carries only the
+// "detail" and "contenthtml" fields of each bidding document.
+//
+// mapInfo may contain a ready-made "query" map. When it does not, the string
+// values under "gtid" and "lteid" are turned into an _id range query
+// ($gt gtid, $lte lteid). Each matching document is converted by a bounded set
+// of worker goroutines and pushed onto saveDetailEsPool; the function returns
+// once every worker has finished.
+func biddingDetailTask(mapInfo map[string]interface{}) {
+	defer util.Catch()
+
+	q, _ := mapInfo["query"].(map[string]interface{})
+	if q == nil {
+		// Use checked assertions so a malformed message is reported and
+		// dropped instead of panicking.
+		gtid, okGt := mapInfo["gtid"].(string)
+		lteid, okLte := mapInfo["lteid"].(string)
+		if !okGt || !okLte {
+			log.Info("biddingDetailTask", zap.Any("invalid gtid/lteid, task skipped:", mapInfo))
+			return
+		}
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  mongodb.StringTOBsonId(gtid),
+				"$lte": mongodb.StringTOBsonId(lteid),
+			},
+		}
+	} else {
+		// A caller-supplied query has its gte/lte id values converted
+		// separately (presumably string ids to ObjectIDs; see convertToMongoID).
+		q = convertToMongoID(q)
+	}
+
+	ch := make(chan bool, 10) // limits the number of concurrent workers to 10
+	wg := &sync.WaitGroup{}
+
+	// bidding database
+	// NOTE(review): biddingConn is never released in this function; confirm
+	// whether MgoB expects the connection to be handed back after use.
+	biddingConn := MgoB.GetMgoConn()
+	// Count and iterate against the same database name so the logged total
+	// describes exactly the set that is processed below.
+	count, err := biddingConn.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(&q).Count()
+	if err != nil {
+		log.Info("biddingDetailTask", zap.Error(err))
+	}
+	log.Info("biddingDetailTask", zap.Int64("同步总数:", count))
+	it := biddingConn.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(&q).Select(nil).Iter()
+	c1, index := 0, 0
+	var indexLock sync.Mutex
+	for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
+		if c1%1000 == 0 {
+			log.Info("biddingDetailTask", zap.Int("current:", c1))
+			log.Info("biddingDetailTask", zap.Any("current:_id =>", tmp["_id"]))
+		}
+		ch <- true
+		wg.Add(1)
+		// Hand the worker its own map so the iterator's map is never shared.
+		docCopy := make(map[string]interface{}, len(tmp))
+		for k, v := range tmp {
+			docCopy[k] = v
+		}
+
+		go func(doc map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			// Same deferred guard the enclosing function uses: a failure on
+			// one document must not take down the whole process.
+			defer util.Catch()
+
+			indexLock.Lock()
+			index++
+			indexLock.Unlock()
+
+			detail, _ := doc["detail"].(string)
+			detail = filterSpace.ReplaceAllString(detail, "")
+			title := util.ObjToString(doc["title"])
+
+			// Only an explicit boolean false for cleartag keeps the detail text
+			// as is. A missing, true or non-bool value runs it through
+			// FilterDetail.
+			var detailNew string
+			if clearTag, isBool := doc["cleartag"].(bool); isBool && !clearTag {
+				detailNew = title + " " + detail
+			} else {
+				text, _ := FilterDetail(detail)
+				detailNew = title + " " + text
+			}
+
+			sid := mongodb.BsonIdToSId(doc["_id"])
+			saveDetailEsPool <- map[string]interface{}{
+				"detail":      detailNew,
+				"id":          sid,
+				"_id":         sid,
+				"contenthtml": doc["contenthtml"],
+			}
+		}(docCopy)
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	log.Info("biddingDetailTask over", zap.Int("count", c1), zap.Int("index", index))
+
+}
+
 func biddingTask(mapInfo map[string]interface{}) {
 	defer util.Catch()
+	// 同时处理详情索引
+	go biddingDetailTask(mapInfo)
 
 	stype := util.ObjToString(mapInfo["stype"])
 	q, _ := mapInfo["query"].(map[string]interface{})
@@ -137,6 +225,7 @@ func biddingTask(mapInfo map[string]interface{}) {
 				//之前存在pici,就不在添加
 				if pici, ok := tmp["pici"]; ok {
 					newTmp["pici"] = pici
+					//log.Info("dddddddd", zap.Any("bidding_id", tmp["_id"]), zap.Any("pici", pici))
 				} else {
 					newTmp["pici"] = time.Now().Unix()
 					update["pici"] = time.Now().Unix()
@@ -640,11 +729,11 @@ func GetEsField(tmp map[string]interface{}, stype string) (map[string]interface{
 		}
 	}
 	YuceEndtime(newTmp) // 预测结果时间
-	if stype == "bidding" || stype == "bidding_history" || stype == "index-by-id" {
-		newTmp["createtime"] = time.Now().Unix() // es库数据创建时间,只有增量数据有
-		newTmp["pici"] = time.Now().Unix()       //createtime跟pici一样,为了剑鱼功能需要,并行存在一段时间,之后可以删掉createtime
-		update["pici"] = time.Now().Unix()
-	}
+	//if stype == "bidding" || stype == "bidding_history" || stype == "index-by-id" {
+	//	newTmp["createtime"] = time.Now().Unix() // es库数据创建时间,只有增量数据有
+	//	newTmp["pici"] = time.Now().Unix()       //createtime跟pici一样,为了剑鱼功能需要,并行存在一段时间,之后可以删掉createtime
+	//	update["pici"] = time.Now().Unix()
+	//}
 
 	if len(saveErr) > 0 {
 		saveErr["infoid"] = mongodb.BsonIdToSId(tmp["_id"])

+ 3 - 2
createEsIndex/common.toml

@@ -77,8 +77,8 @@
     filesize = 500000        ## 单位字节,附件总字节长度限制;超过就不再读取
     detailbucket = "jy-datadetail" ## 获取详情时,oss的配置
 [db.es]
-    addr = "http://127.0.0.1:19908"      ## 正常bidding 链接
-#    addr = "http://192.168.3.149:9201"      ## 测试环境 bidding 链接
+#    addr = "http://127.0.0.1:19908"      ## 正常bidding 链接
+    addr = "http://172.20.45.129:9206"      ## 测试环境 bidding 链接
 #    addrp = "http://192.168.3.149:9201"    ##  采集使用的单机版地址
     username = "jybid"
     password = "Top2023_JEB01i@31"
@@ -87,6 +87,7 @@
 #    indextmp = "bidding_temporary"       ## 临时索引,其他程序需要;目前已不需要
     indexp = "projectset"
     indexpd = "projectdetail" ## 项目支持detail的新索引
+    biddingdetail = "bidding_detail" ## 标讯详情
     detailcount = 50000 ## 项目索引中,detail汉字长度限制
     indexwinner = "winner_v1"
     indexbuyer = "buyer_v3"

+ 18 - 17
createEsIndex/config/conf.go

@@ -127,23 +127,24 @@ type env struct {
 }
 
 type es struct {
-	Addr         string
-	AddrP        string
-	Size         int
-	Username     string
-	Password     string
-	IndexB       string
-	TypeB        string
-	IndexP       string
-	IndexPD      string
-	DetailCount  int
-	TypeP        string
-	IndexWinner  string
-	TypeWinner   string
-	IndexBuyer   string
-	TypeBuyer    string
-	DetailFilter []string
-	IndexTmp     string
+	Addr          string
+	AddrP         string
+	Size          int
+	Username      string
+	Password      string
+	IndexB        string
+	TypeB         string
+	IndexP        string
+	IndexPD       string
+	BiddingDetail string
+	DetailCount   int
+	TypeP         string
+	IndexWinner   string
+	TypeWinner    string
+	IndexBuyer    string
+	TypeBuyer     string
+	DetailFilter  []string
+	IndexTmp      string
 	//FieldEs              map[string]interface{}
 	//FieldPurchasingist  map[string]interface{}
 	//FieldProcurementList map[string]interface{}

+ 71 - 2
createEsIndex/main.go

@@ -44,6 +44,8 @@ var (
 	updateBiddingSp         = make(chan bool, 5)
 	saveEsPool              = make(chan map[string]interface{}, 5000) //保存binding数据到es
 	saveEsSp                = make(chan bool, 5)
+	saveDetailEsSp          = make(chan bool, 5)
+	saveDetailEsPool        = make(chan map[string]interface{}, 5000) //保存binding detail,contenthtml 二个字段数据到es
 	saveProjectEsPool       = make(chan map[string]interface{}, 5000) //保存project数据到es
 	saveProjectSp           = make(chan bool, 5)
 	saveProjectDetailEsPool = make(chan map[string]interface{}, 5000) //保存project detail 数据到es
@@ -106,8 +108,10 @@ func main() {
 		go task_index()  //定时同步更新winner_enterprise、buyer_enterprise ES索引;这个功能很少变动,几乎不需要维护
 	}
 
-	go UpdateBidding() //更新bidding表数据
-	go SaveEsMethod()  //保存es bidding数据
+	//go SaveDetailEsMethod() //保存 bidding_detail 索引
+	go BatchSaveBiddingDetailEsMethod() //保存 bidding_detail 索引
+	go UpdateBidding()                  //更新bidding表数据
+	go SaveEsMethod()                   //保存es bidding数据
 	//go SaveBiddingEsMethod()  //保存es bidding数据
 	go SaveAllEsMethod()      // 保存爬虫采集临时数据
 	go SaveProjectEs()        //保存项目索引数据
@@ -182,6 +186,14 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					biddingTask(mapInfo)
 				}()
+			case "bidding_detail": //bidding_detail 标讯详情索引;历史数据需要读取oss
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					biddingDetailTask(mapInfo)
+				}()
 			case "biddingall": //补充存量数据
 				pool <- true
 				go func() {
@@ -467,6 +479,63 @@ func SaveEsMethod() {
 	}
 }
 
+// SaveDetailEsMethod drains saveDetailEsPool and stores each document, one at
+// a time, in the bidding detail index (the detail and contenthtml fields).
+// When the index name is not configured it falls back to "bidding_detail" and
+// logs that the default was applied. The loop never returns.
+func SaveDetailEsMethod() {
+	if len(config.Conf.DB.Es.BiddingDetail) == 0 {
+		log.Info("SaveDetailEsMethod", zap.String("config.Conf.DB.Es.BiddingDetail", "配置文件中,biddingdetail 没有配置,默认初始化为 bidding_detail"))
+		config.Conf.DB.Es.BiddingDetail = "bidding_detail"
+	}
+
+	for {
+		doc := <-saveDetailEsPool
+		Es.Save(config.Conf.DB.Es.BiddingDetail, doc)
+	}
+}
+
+// BatchSaveBiddingDetailEsMethod drains saveDetailEsPool and bulk-writes the
+// documents to the bidding detail index.
+//
+// A batch is flushed when it reaches EsBulkSize documents, or when no new
+// document has arrived for one second and the batch is non-empty. At most
+// cap(saveDetailEsSp) bulk writes run at the same time. The loop never returns.
+func BatchSaveBiddingDetailEsMethod() {
+	// Fall back to the default index name when none is configured, so bulk
+	// writes are never sent to an empty index name.
+	indexName := config.Conf.DB.Es.BiddingDetail
+	if indexName == "" {
+		indexName = "bidding_detail"
+		log.Info("BatchSaveBiddingDetailEsMethod", zap.String("index", "biddingdetail is not configured, using default bidding_detail"))
+	}
+
+	batch := make([]map[string]interface{}, 0, EsBulkSize)
+
+	// flush hands the current batch to a background bulk write and starts a
+	// fresh one; it blocks while all saveDetailEsSp slots are taken.
+	flush := func() {
+		pending := batch
+		batch = make([]map[string]interface{}, 0, EsBulkSize)
+		saveDetailEsSp <- true
+		go func() {
+			defer func() {
+				<-saveDetailEsSp
+			}()
+			if config.Conf.DB.Es.Addr != "" {
+				Es.BulkSave(indexName, pending)
+			}
+		}()
+	}
+
+	for {
+		select {
+		case v := <-saveDetailEsPool:
+			batch = append(batch, v)
+			if len(batch) >= EsBulkSize {
+				flush()
+			}
+		case <-time.After(1000 * time.Millisecond):
+			// Idle for a second: push out whatever has accumulated.
+			if len(batch) > 0 {
+				flush()
+			}
+		}
+	}
+}
+
 // SaveBiddingEsMethod 批量保存bidding数据
 func SaveBiddingEsMethod() {
 	arru := make([]map[string]interface{}, EsBulkSize)