瀏覽代碼

同步双库生索引

maxiaoshan 5 年之前
父節點
當前提交
1d88ac4e73

+ 244 - 1
udpcreateindex/src/biddingindex.go

@@ -70,7 +70,7 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 			}
 			udpclient.WriteUdp(data, mu.OP_TYPE_DATA, toadd)
 		} else {
-			n1, n2 = doIndex(res, eMap, index, itype, db, c, bkey)
+			n1, n2 = doIndex1(res, eMap, index, itype, db, c, bkey)
 			if (n1 + n2) != count {
 				log.Println("任务错误,结果不一致")
 			}
@@ -280,6 +280,9 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 		if len(arrEs) >= BulkSize-1 {
 			tmps := arrEs
 			elastic.BulkSave(index, itype, &tmps, true)
+			if other_index != "" && other_itype != "" {
+				bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+			}
 			if len(multiIndex) == 2 {
 				elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
 			}
@@ -298,6 +301,246 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 	if len(arrEs) > 0 {
 		tmps := arrEs
 		elastic.BulkSave(index, itype, &tmps, true)
+		if other_index != "" && other_itype != "" {
+			bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+		}
+		if len(multiIndex) == 2 {
+			elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
+		}
+	}
+	UpdatesLock.Unlock()
+	return n1, n2
+}
+
+func doIndex1(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) {
+	n1, n2 := 0, 0
+	//线程池
+	UpdatesLock := sync.Mutex{}
+	fields := strings.Split(bidding["fields"].(string), ",")
+	//更新数组
+	arr := [][]map[string]interface{}{}
+	arrEs := []map[string]interface{}{}
+	//对比两张表数据,减少查询次数
+	var compare bson.M
+	log.Println("开始迭代..")
+	for n, tmp := range infos {
+		n1++
+		// if qutil.IntAll(tmp["dataging"]) == 1 { //dataging=1不生索引
+		// 	tmp = make(map[string]interface{})
+		// 	continue
+		// }
+		update := map[string]interface{}{} //要更新的mongo数据
+		//对比方法----------------
+		tid := qutil.BsonIdToSId(tmp["_id"])
+		if eMap[tid] != nil {
+			compare = eMap[tid]
+			if qutil.IntAll(compare["dataging"]) == 1 { //extract中dataging=1不生索引
+				tmp = make(map[string]interface{})
+				compare = nil
+				continue
+			}
+			delete(eMap, tid)
+			//更新bidding表,生成索引
+			for _, k := range fields {
+				v1 := compare[k] //extract
+				v2 := tmp[k]     //bidding
+				if v2 == nil && v1 != nil {
+					update[k] = v1
+				} else if v2 != nil && v1 != nil {
+					//update[k+"_b"] = v2
+					update[k] = v1
+				} else if v2 != nil && v1 == nil {
+					//update[k+"_b"] = v2
+				}
+			}
+			if qutil.IntAll(compare["repeat"]) == 1 {
+				update["extracttype"] = -1
+			} else {
+				update["extracttype"] = 1
+			}
+		} else {
+			compare = nil
+		}
+		//下面可以多线程跑的--->
+		//处理分类
+		if compare != nil { //extract
+			subscopeclass, _ := compare["subscopeclass"].([]interface{})
+			if subscopeclass != nil {
+				//str := ","
+				m1 := map[string]bool{}
+				newclass := []string{}
+				for _, sc := range subscopeclass {
+					sclass, _ := sc.(string)
+					if !m1[sclass] {
+						m1[sclass] = true
+						//str += sclass + ","
+						newclass = append(newclass, sclass)
+					}
+				}
+				update["s_subscopeclass"] = strings.Join(newclass, ",")
+				update["subscopeclass"] = newclass
+			}
+			//处理中标企业
+			//			winner, _ := compare["winner"].(string)
+			//			m1 := map[string]bool{}
+			//			if winner != "" {
+			//				m1[winner] = true
+			//			}
+			//			package1 := compare["package"]
+			//			if package1 != nil {
+			//				packageM, _ := package1.(map[string]interface{})
+			//				for _, p := range packageM {
+			//					pm, _ := p.(map[string]interface{})
+			//					pw, _ := pm["winner"].(string)
+			//					if pw != "" {
+			//						m1[pw] = true
+			//					}
+			//				}
+			//			}
+			compare = nil
+			//			if len(m1) > 0 {
+			//				//str := ","
+			//				winnerarr := []string{}
+			//				for k, _ := range m1 {
+			//					//str += k + ","
+			//					winnerarr = append(winnerarr, k)
+			//				}
+			//				update["s_winner"] = strings.Join(winnerarr, ",")
+			//			}
+		}
+		//------------------对比结束
+
+		//处理key descript
+		if bkey == "" {
+			DealInfo(&tmp, &update)
+		}
+		//同时保存到elastic
+		for tk, tv := range update {
+			tmp[tk] = tv
+		}
+		if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
+			if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
+				delete(tmp, "supervisorrate")
+			}
+		}
+		//对projectscope字段的索引处理
+		ps, _ := tmp["projectscope"].(string)
+		//		if ps == "" {
+		//			tmp["projectscope"] = "" //= tmp["detail"]
+		//		}
+		if len(ps) > ESLEN {
+			tmp["projectscope"] = string(([]rune(ps))[:4000])
+		}
+		//对标的物为空处理
+		if filetext := getFileText(tmp); len(filetext) > 10 { //attach_text
+			if site, _ := tmp["site"].(string); site == "中国招标投标公共服务平台" { //site:中国招标投标公共服务平台 detail替换成filetext 并加入标记filedetail=1
+				tmp["detail"] = filetext    //更新es中detail
+				update["detail"] = filetext //更新mongo中detail
+				update["filedetail"] = 1    //mongo中打标记
+			}
+			tmp["filetext"] = filetext
+		}
+		if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" {
+			delete(tmp, "purchasing")
+		}
+		if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok && len(purchasinglist) == 0 {
+			delete(tmp, "purchasinglist")
+		}
+		UpdatesLock.Lock()
+		//		for k1, _ := range tmp {
+		//			if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
+		//				delete(tmp, k1)
+		//			}
+		//		}
+		go IS.Add("bidding")
+		if qutil.IntAll(update["extracttype"]) != -1 {
+			newTmp := map[string]interface{}{}     //最终生索引的数据
+			for _, v := range biddingIndexFields { //索引字段
+				if tmp[v] != nil {
+					if "projectinfo" == v {
+						mp, _ := tmp[v].(map[string]interface{})
+						if mp != nil {
+							newmap := map[string]interface{}{}
+							for _, v1 := range projectinfoFields {
+								if mp[v1] != nil {
+									newmap[v1] = mp[v1]
+								}
+							}
+							if len(newmap) > 0 {
+								newTmp[v] = newmap
+							}
+							// attachments := mp["attachments"]
+							// con := ""
+							// if attachments != nil {
+							// 	am, _ := attachments.(map[string]interface{})
+							// 	if am != nil {
+							// 		for _, v1 := range am {
+							// 			vm, _ := v1.(map[string]interface{})
+							// 			if vm != nil {
+							// 				c, _ := vm["content"].(string)
+							// 				con += c
+							// 			}
+							// 		}
+							// 	}
+							// }
+							// con = FilterDetailSpace(con)
+							// if con != "" {
+							// 	newTmp["attachments"] = con
+							// }
+						}
+					} else {
+						if v == "detail" {
+							detail, _ := tmp[v].(string)
+							newTmp[v] = FilterDetail(detail)
+						} else {
+							newTmp[v] = tmp[v]
+						}
+					}
+				}
+			}
+			arrEs = append(arrEs, newTmp)
+		}
+		if len(update) > 0 {
+			arr = append(arr, []map[string]interface{}{
+				map[string]interface{}{
+					"_id": tmp["_id"],
+				},
+				map[string]interface{}{
+					"$set": update,
+				},
+			})
+		}
+		if len(arr) >= BulkSize-1 {
+			mgo.UpdateBulkAll(db, c, arr...)
+			arr = [][]map[string]interface{}{}
+		}
+		if len(arrEs) >= BulkSize-1 {
+			tmps := arrEs
+			elastic.BulkSave(index, itype, &tmps, true)
+			if other_index != "" && other_itype != "" {
+				bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+			}
+			if len(multiIndex) == 2 {
+				elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
+			}
+			arrEs = []map[string]interface{}{}
+		}
+		UpdatesLock.Unlock()
+		if n%100 == 0 {
+			log.Println("current:", n)
+		}
+		tmp = make(map[string]interface{})
+	}
+	UpdatesLock.Lock()
+	if len(arr) > 0 {
+		mgo.UpdateBulkAll(db, c, arr...)
+	}
+	if len(arrEs) > 0 {
+		tmps := arrEs
+		elastic.BulkSave(index, itype, &tmps, true)
+		if other_index != "" && other_itype != "" {
+			bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+		}
 		if len(multiIndex) == 2 {
 			elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
 		}

+ 2 - 1
udpcreateindex/src/bidingpurchasing.go

@@ -7,7 +7,8 @@ import (
 	"sync"
 	"unicode/utf8"
 
-	u "./util"
+	u "util"
+
 	"gopkg.in/mgo.v2/bson"
 )
 

+ 6 - 0
udpcreateindex/src/config.json

@@ -94,5 +94,11 @@
     "elastic": {
         "addr": "http://192.168.3.128:9800",
         "pool": 12
+    },
+    "elastic_other": {
+        "addr": "http://127.0.0.1:9800",
+        "pool": 12,
+        "index": "bidding_other_v1",
+        "type": "bidding"
     }
 }

+ 19 - 3
udpcreateindex/src/main.go

@@ -10,8 +10,7 @@ import (
 	"qfw/util/mongodb"
 	"strings"
 	"time"
-
-	u "./util"
+	u "util"
 )
 
 var (
@@ -29,6 +28,10 @@ var (
 	multiIndex           []string
 	purchasinglistFields []string
 	BulkSize             = 400
+	//bidding_other连接信息
+	bidding_other_es *elastic.Elastic
+	other_index      string
+	other_itype      string
 
 	winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
 )
@@ -91,9 +94,22 @@ func init() {
 	}
 	mgostandard.InitPool()
 	log.Println(standard["addr"].(string))
-
+	//初始化es
+	//bidding
 	econf := Sysconfig["elastic"].(map[string]interface{})
 	elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
+	//bidding_other
+	if Sysconfig["elastic_other"] != nil {
+		econf_other := Sysconfig["elastic_other"].(map[string]interface{})
+		other_index = econf_other["index"].(string)
+		other_itype = econf_other["type"].(string)
+		bidding_other_es = &elastic.Elastic{
+			S_esurl: econf_other["addr"].(string),
+			I_size:  util.IntAllDef(econf_other["pool"], 5),
+		}
+		bidding_other_es.InitElasticSize()
+	}
+	//
 	if bidding["indexfields"] != nil {
 		biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
 	}