Browse Source

迭代优化

xuzhiheng 8 months ago
parent
commit
c5e8b4d9ff
1 changed files with 203 additions and 202 deletions
  1. 203 202
      field_sync/task.go

+ 203 - 202
field_sync/task.go

@@ -67,17 +67,21 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 	log.Println("bidding表 同步总数:", count)
 	c := 0
 	// if count < 500000 {
-	var res []map[string]interface{}
+	// var res []map[string]interface{}
 	result := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(map[string]interface{}{
 		"contenthtml": 0,
 	}).Iter()
 	for tmp := make(map[string]interface{}); result.Next(tmp); {
-		res = append(res, tmp)
+		// res = append(res, tmp)
+		ec := doIndex(tmp, bkey, stype)
+		if ec > 0 {
+			c++
+		}
 		tmp = make(map[string]interface{})
 	}
 	MgoB.DestoryMongoConn(biddingConn)
 	// log.Println("查询结果 bidding", count, "抽取:", extCount)
-	c = doIndex(res, bkey, stype)
+	// c = doIndex(res, bkey, stype)
 	// } else {
 	// 	log.Println("查询结果 数据量太大,放弃", count)
 	// 	MgoB.DestoryMongoConn(biddingConn)
@@ -255,236 +259,233 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 	log.Println("biddingAll sync...over all", count)
 }
 
-func doIndex(infos []map[string]interface{}, bkey, stype string) int {
+func doIndex(tmp map[string]interface{}, bkey, stype string) int {
 	syncNo := 0 //抽取表数据同步数量
 	//对比两张表数据,减少查询次数
 	var compare map[string]interface{}
-	var bidUpdate [][]map[string]interface{}
-	var extUpdate [][]map[string]interface{}
+	var bidUpdate = map[string]interface{}{}
+	var extUpdate = map[string]interface{}{}
 	//SaveEsLock := &sync.Mutex{}
 	ygsiteArr := []map[string]interface{}{}
 	sitedata, _ := MgoE.Find("bidding_yg_site", map[string]interface{}{}, nil, nil, false, -1, -1)
 	if sitedata != nil && len(*sitedata) > 0 {
 		ygsiteArr = *sitedata
 	}
-	log.Println("start ...")
-	for n, tmp := range infos {
-		tid := mongodb.BsonIdToSId(tmp["_id"])
-		update := map[string]interface{}{} //要更新的mongo数据
-		del := map[string]interface{}{}
-		edata, _ := MgoE.FindById(config.Conf.DB.MongoE.Coll, tid, nil)
-		//对比方法----------------
-		if edata != nil && len(*edata) > 0 {
-			compare = *edata
-			if stype == "bidding" {
-				// 增量id段 正常数据
-				if dg := util.IntAll(compare["dataging"]); dg == 1 { //extract中dataging=1跳过
-					tmp = make(map[string]interface{})
-					compare = nil
-					continue
-				}
-				// delete(eMap, tid)
-			}
-			if stype == "bidding_history" {
-				//增量id段 历史数据
-				if compare["history_updatetime"] == nil { //extract中history_updatetime不存在跳过
-					tmp = make(map[string]interface{})
-					compare = nil
-					continue
-				}
-				// delete(eMap, tid)
-			}
-			syncNo++
-			log.Println("抽取区域 省", compare["area"], " 市 ", compare["city"], " 区 ", compare["district"], " id ", tid)
-			modifyinfo := make(map[string]bool)
-			if tmp["modifyinfo"] != nil {
-				if tmpmodifyinfo, ok := tmp["modifyinfo"].(map[string]interface{}); ok {
-					for k := range tmpmodifyinfo {
-						modifyinfo[k] = true
-					}
-				}
-			}
-			for _, k := range config.Conf.Serve.FieldS {
-				v1 := compare[k] //extract
-				v2 := tmp[k]     //bidding
-				if v2 == nil && v1 != nil {
-					update[k] = v1
-				} else if v2 != nil && v1 != nil && !modifyinfo[k] {
-					update[k] = v1
-				} else if v2 != nil && v1 == nil && !modifyinfo[k] {
-					if k == "s_subscopeclass" && del["subscopeclass"] == nil {
-						continue
-					} else if k == "s_topscopeclass" && del["topscopeclass"] == nil {
-						continue
-					} else if k == "city" || k == "district" {
-						update[k] = ""
-					} else {
-						del[k] = 1
-					}
-				}
-			}
-			// 附件重采,数据同步时不更新判重标识
-			if util.IntAll(compare["repeat"]) == 1 {
-				update["extracttype"] = -1
-				update["dataprocess"] = 7
-				if compare["repeat_id"] != nil {
-					update["repeat_id"] = compare["repeat_id"]
-				}
-			} else {
-				update["extracttype"] = 1
-				update["dataprocess"] = 8
+	// log.Println("start ...")
+	// for n, tmp := range infos {
+	tid := mongodb.BsonIdToSId(tmp["_id"])
+	update := map[string]interface{}{} //要更新的mongo数据
+	del := map[string]interface{}{}
+	edata, _ := MgoE.FindById(config.Conf.DB.MongoE.Coll, tid, nil)
+	//对比方法----------------
+	if edata != nil && len(*edata) > 0 {
+		compare = *edata
+		if stype == "bidding" {
+			// 增量id段 正常数据
+			if dg := util.IntAll(compare["dataging"]); dg == 1 { //extract中dataging=1跳过
+				tmp = make(map[string]interface{})
+				compare = nil
+				return 0
 			}
-		} else {
-			compare = nil
-			if util.IntAll(tmp["dataging"]) == 1 { //修改未抽取的bidding数据的dataging
-				update["dataging"] = 0
+			// delete(eMap, tid)
+		}
+		if stype == "bidding_history" {
+			//增量id段 历史数据
+			if compare["history_updatetime"] == nil { //extract中history_updatetime不存在跳过
+				tmp = make(map[string]interface{})
+				compare = nil
+				return 0
 			}
-			update["dataprocess"] = 8
+			// delete(eMap, tid)
 		}
-		//下面可以多线程跑的--->
-		//处理分类
-		if compare != nil { //extract
-			fieldFun(compare, update)
-			// publishtime 20230523
-			if util.IntAll(tmp["publishtime"]) == -1 {
-				if pb := methodPb(compare); pb > 0 {
-					update["publishtime"] = pb
+		syncNo++
+		log.Println("抽取区域 省", compare["area"], " 市 ", compare["city"], " 区 ", compare["district"], " id ", tid)
+		modifyinfo := make(map[string]bool)
+		if tmp["modifyinfo"] != nil {
+			if tmpmodifyinfo, ok := tmp["modifyinfo"].(map[string]interface{}); ok {
+				for k := range tmpmodifyinfo {
+					modifyinfo[k] = true
 				}
 			}
-			compare = nil
-		}
-		//------------------对比结束
-
-		//处理key descript
-		if bkey == "" {
-			DealInfo(&tmp, &update)
 		}
-		// entidlist
-		extractMap := make(map[string]interface{})
-		if update["s_winner"] != "" {
-			cid := companyFun(update)
-			if len(cid) > 0 {
-				tmp["entidlist"] = cid
-				update["entidlist"] = cid
-				extractMap["entidlist"] = cid
+		for _, k := range config.Conf.Serve.FieldS {
+			v1 := compare[k] //extract
+			v2 := tmp[k]     //bidding
+			if v2 == nil && v1 != nil {
+				update[k] = v1
+			} else if v2 != nil && v1 != nil && !modifyinfo[k] {
+				update[k] = v1
+			} else if v2 != nil && v1 == nil && !modifyinfo[k] {
+				if k == "s_subscopeclass" && del["subscopeclass"] == nil {
+					continue
+				} else if k == "s_topscopeclass" && del["topscopeclass"] == nil {
+					continue
+				} else if k == "city" || k == "district" {
+					update[k] = ""
+				} else {
+					del[k] = 1
+				}
 			}
 		}
-		//阳光采购
-		spidercode := util.ObjToString(tmp["spidercode"])
-		site := util.ObjToString(tmp["site"])
-		infoAttribute := util.ObjToString(tmp["infoattribute"])
-		if infoAttribute == "zc_cgxx" {
-			update["public_type"] = "用户发布"
+		// 附件重采,数据同步时不更新判重标识
+		if util.IntAll(compare["repeat"]) == 1 {
 			update["extracttype"] = -1
 			update["dataprocess"] = 7
-			MgoE.Save("bidding_yg", map[string]interface{}{"id": tid, "source": "user"})
-			log.Println("阳光采购用户发布", tid)
-		} else {
-			if util.IntAll(compare["repeat"]) != 1 {
-				for _, v := range ygsiteArr {
-					spidercodes := util.ObjToString(v["spidercode"])
-					sites := util.ObjToString(v["site"])
-					datatype := util.ObjToString(v["datatype"])
-					if datatype == "1" && spidercodes == spidercode {
-						update["infoattribute"] = "zc_cgxx"
-						update["public_type"] = "平台发布"
-						MgoE.Save("bidding_yg", map[string]interface{}{"id": tid, "source": "spidercode"})
-						log.Println("阳光采购平台发布爬虫", tid)
-					} else if datatype == "2" && site == sites {
-						update["infoattribute"] = "zc_cgxx"
-						update["public_type"] = "平台发布"
-						MgoE.Save("bidding_yg", map[string]interface{}{"id": tid, "source": "site"})
-						log.Println("阳光采购平台发布站点", tid)
-					} else if datatype == "3" && spidercodes == spidercode {
-						update["infoattribute"] = "zc_cgxx"
-						update["public_type"] = "平台发布"
-						update["is_yg_new"] = 1
-						update["extracttype"] = -1
-						update["dataprocess"] = 7
-						MgoE.Save("bidding_yg", map[string]interface{}{"id": tid, "source": "is_yg_new"})
-						log.Println("阳光采购平台新采集爬虫", tid)
-					}
-				}
+			if compare["repeat_id"] != nil {
+				update["repeat_id"] = compare["repeat_id"]
 			}
+		} else {
+			update["extracttype"] = 1
+			update["dataprocess"] = 8
 		}
-		// 6.10 剑鱼发布信息分类处理, 写在这里是为了修改抽取表
-		typeFunc(tmp, update, extractMap)
-		if len(extractMap) > 0 {
-			if extractMap["toptype"] != nil && extractMap["subtype"] == nil {
-				extUpdate = append(extUpdate, []map[string]interface{}{
-					{"_id": tmp["_id"]},
-					{"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}},
-				})
-			} else {
-				extUpdate = append(extUpdate, []map[string]interface{}{
-					{"_id": tmp["_id"]},
-					{"$set": extractMap},
-				})
-			}
-			if len(extUpdate) >= MgoBulkSize {
-				tmps := extUpdate
-				MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...)
-				extUpdate = [][]map[string]interface{}{}
-			}
+	} else {
+		compare = nil
+		if util.IntAll(tmp["dataging"]) == 1 { //修改未抽取的bidding数据的dataging
+			update["dataging"] = 0
 		}
-		// 附件有效字段
-		if i := validFile(tmp); i != 0 {
-			if i == -1 {
-				tmp["isValidFile"] = false
-				update["isValidFile"] = false
-			} else {
-				tmp["isValidFile"] = true
-				update["isValidFile"] = true
+		update["dataprocess"] = 8
+	}
+	//下面可以多线程跑的--->
+	//处理分类
+	if compare != nil { //extract
+		fieldFun(compare, update)
+		// publishtime 20230523
+		if util.IntAll(tmp["publishtime"]) == -1 {
+			if pb := methodPb(compare); pb > 0 {
+				update["publishtime"] = pb
 			}
 		}
-		// 2024-02-21 徐志恒 情报标签字段
-		toptype := util.ObjToString(tmp["toptype"])
-		subtype := util.ObjToString(tmp["subtype"])
-		buyerclass := util.ObjToString(update["buyerclass"])
-		if buyerclass != "" {
-			update["buyer_type"] = getStr(buyerclass)
-		}
-		s_topscopeclass := util.ObjToString(update["s_topscopeclass"])
-		if (tmp["tag_topinformation"] != nil && (subtype == "合同" || subtype == "中标" || subtype == "成交" || subtype == "采购意向" || toptype == "招标")) || (tmp["tag_topinformation"] == nil && toptype == "拟建" && strings.Contains(s_topscopeclass, "建筑工程")) {
-			update["tag_set"] = getTagSet(tmp, compare)
+		compare = nil
+	}
+	//------------------对比结束
+
+	//处理key descript
+	if bkey == "" {
+		DealInfo(&tmp, &update)
+	}
+	// entidlist
+	extractMap := make(map[string]interface{})
+	if update["s_winner"] != "" {
+		cid := companyFun(update)
+		if len(cid) > 0 {
+			tmp["entidlist"] = cid
+			update["entidlist"] = cid
+			extractMap["entidlist"] = cid
 		}
-		if len(update) > 0 {
-			log.Println("保存bidding区域 省", update["area"], " 市 ", update["city"], " 区 ", update["district"], " buyerclass ", update["buyerclass"], update["buyer_type"], " id ", tid)
-			if len(del) > 0 {
-				bidUpdate = append(bidUpdate, []map[string]interface{}{{
-					"_id": tmp["_id"],
-				},
-					{"$set": update, "$unset": del},
-				})
-			} else {
-				bidUpdate = append(bidUpdate, []map[string]interface{}{{
-					"_id": tmp["_id"],
-				},
-					{"$set": update},
-				})
-			}
-			if len(bidUpdate) >= MgoBulkSize {
-				tmps := bidUpdate
-				MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...)
-				bidUpdate = [][]map[string]interface{}{}
+	}
+	//阳光采购
+	spidercode := util.ObjToString(tmp["spidercode"])
+	site := util.ObjToString(tmp["site"])
+	infoAttribute := util.ObjToString(tmp["infoattribute"])
+	if infoAttribute == "zc_cgxx" {
+		update["public_type"] = "用户发布"
+		update["extracttype"] = -1
+		update["dataprocess"] = 7
+		MgoE.Save("bidding_yg", map[string]interface{}{"id": tid, "source": "user"})
+		log.Println("阳光采购用户发布", tid)
+	} else {
+		if util.IntAll(compare["repeat"]) != 1 {
+			for _, v := range ygsiteArr {
+				spidercodes := util.ObjToString(v["spidercode"])
+				sites := util.ObjToString(v["site"])
+				datatype := util.ObjToString(v["datatype"])
+				if datatype == "1" && spidercodes == spidercode {
+					update["infoattribute"] = "zc_cgxx"
+					update["public_type"] = "平台发布"
+					MgoE.Save("bidding_yg", map[string]interface{}{"id": tid, "source": "spidercode"})
+					log.Println("阳光采购平台发布爬虫", tid)
+				} else if datatype == "2" && site == sites {
+					update["infoattribute"] = "zc_cgxx"
+					update["public_type"] = "平台发布"
+					MgoE.Save("bidding_yg", map[string]interface{}{"id": tid, "source": "site"})
+					log.Println("阳光采购平台发布站点", tid)
+				} else if datatype == "3" && spidercodes == spidercode {
+					update["infoattribute"] = "zc_cgxx"
+					update["public_type"] = "平台发布"
+					update["is_yg_new"] = 1
+					update["extracttype"] = -1
+					update["dataprocess"] = 7
+					MgoE.Save("bidding_yg", map[string]interface{}{"id": tid, "source": "is_yg_new"})
+					log.Println("阳光采购平台新采集爬虫", tid)
+				}
 			}
 		}
-		if n%500 == 0 {
-			log.Println("biddingTask current ", n)
+	}
+	// 6.10 剑鱼发布信息分类处理, 写在这里是为了修改抽取表
+	typeFunc(tmp, update, extractMap)
+	if len(extractMap) > 0 {
+		if extractMap["toptype"] != nil && extractMap["subtype"] == nil {
+			// extUpdate = append(extUpdate, []map[string]interface{}{
+			// 	{"_id": tmp["_id"]},
+			// 	{"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}},
+			// })
+			extUpdate = map[string]interface{}{"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}}
+		} else {
+			extUpdate = map[string]interface{}{"$set": extractMap}
 		}
-		tmp = make(map[string]interface{})
+		// if len(extUpdate) >= MgoBulkSize {
+		// 	tmps := extUpdate
+		// 	MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...)
+		// 	extUpdate = [][]map[string]interface{}{}
+		// }
+		MgoE.UpdateById(config.Conf.DB.MongoE.Coll, tid, extUpdate)
 	}
-	//SaveEsLock.Lock()
-	if len(bidUpdate) > 0 {
-		tmps := bidUpdate
-		MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...)
-		bidUpdate = [][]map[string]interface{}{}
+	// 附件有效字段
+	if i := validFile(tmp); i != 0 {
+		if i == -1 {
+			tmp["isValidFile"] = false
+			update["isValidFile"] = false
+		} else {
+			tmp["isValidFile"] = true
+			update["isValidFile"] = true
+		}
+	}
+	// 2024-02-21 徐志恒 情报标签字段
+	toptype := util.ObjToString(tmp["toptype"])
+	subtype := util.ObjToString(tmp["subtype"])
+	buyerclass := util.ObjToString(update["buyerclass"])
+	if buyerclass != "" {
+		update["buyer_type"] = getStr(buyerclass)
 	}
-	if len(extUpdate) > 0 {
-		tmps := extUpdate
-		MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...)
-		extUpdate = [][]map[string]interface{}{}
+	s_topscopeclass := util.ObjToString(update["s_topscopeclass"])
+	if (tmp["tag_topinformation"] != nil && (subtype == "合同" || subtype == "中标" || subtype == "成交" || subtype == "采购意向" || toptype == "招标")) || (tmp["tag_topinformation"] == nil && toptype == "拟建" && strings.Contains(s_topscopeclass, "建筑工程")) {
+		update["tag_set"] = getTagSet(tmp, compare)
 	}
+	if len(update) > 0 {
+		log.Println("保存bidding区域 省", update["area"], " 市 ", update["city"], " 区 ", update["district"], " buyerclass ", update["buyerclass"], update["buyer_type"], " id ", tid)
+		if len(del) > 0 {
+			bidUpdate = map[string]interface{}{"$set": update, "$unset": del}
+			// bidUpdate = append(bidUpdate, []map[string]interface{}{{
+			// 	"_id": tmp["_id"],
+			// },
+			// 	{"$set": update, "$unset": del},
+			// })
+		} else {
+			bidUpdate = map[string]interface{}{"$set": update}
+		}
+		// if len(bidUpdate) >= MgoBulkSize {
+		// 	tmps := bidUpdate
+		// 	MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...)
+		// 	bidUpdate = [][]map[string]interface{}{}
+		// }
+		MgoB.UpdateById(config.Conf.DB.MongoB.Coll, tid, bidUpdate)
+	}
+	// if n%500 == 0 {
+	// 	log.Println("biddingTask current ", n)
+	// }
+	// tmp = make(map[string]interface{})
+	// }
+	//SaveEsLock.Lock()
+	// if len(bidUpdate) > 0 {
+	// 	tmps := bidUpdate
+	// 	MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...)
+	// 	bidUpdate = [][]map[string]interface{}{}
+	// }
+	// if len(extUpdate) > 0 {
+	// 	tmps := extUpdate
+	// 	MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...)
+	// 	extUpdate = [][]map[string]interface{}{}
+	// }
 	//SaveEsLock.Unlock()
 	return syncNo
 }