Selaa lähdekoodia

extract中dataging为1的不生索引

maxiaoshan 5 vuotta sitten
vanhempi
commit
e9e79be166

+ 9 - 4
udpcreateindex/src/biddingindex.go

@@ -95,15 +95,20 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 	log.Println("开始迭代..")
 	for n, tmp := range infos {
 		n1++
-		if qutil.IntAll(tmp["dataging"]) == 1 { //dataging=1不生索引
-			tmp = make(map[string]interface{})
-			continue
-		}
+		// if qutil.IntAll(tmp["dataging"]) == 1 { //dataging=1不生索引
+		// 	tmp = make(map[string]interface{})
+		// 	continue
+		// }
 		update := map[string]interface{}{} //要更新的mongo数据
 		//对比方法----------------
 		tid := qutil.BsonIdToSId(tmp["_id"])
 		if eMap[tid] != nil {
 			compare = eMap[tid]
+			if qutil.IntAll(compare["dataging"]) == 1 { //extract中dataging=1不生索引
+				tmp = make(map[string]interface{})
+				compare = nil
+				continue
+			}
 			delete(eMap, tid)
 			//更新bidding表,生成索引
 			for _, k := range fields {

+ 151 - 0
udpcreateindex/src/bidingpurchasing.go

@@ -204,3 +204,154 @@ func getFileText(tmp map[string]interface{}) (filetext string) {
 	}
 	return
 }
+
+// site_attach_text walks the bidding collection with the Mongo filter q and,
+// for every document that passes the checks below, replaces its detail with
+// the text assembled by getFileText, queues a Mongo update for that document
+// and queues a trimmed copy (configured index fields only) for Elasticsearch.
+// Both queues are flushed in batches and once more after the cursor is
+// exhausted.
+//
+// A document is skipped when extracttype is -1, dataging is 1, attach_text is
+// missing, site differs from the hard-coded site name in the filter below, or
+// getFileText returns an empty string.
+func site_attach_text(q map[string]interface{}) {
+	defer util.Catch()
+	// Lock taken around the batching/flush section of each iteration.
+	SaveUpdageLock := sync.Mutex{}
+	// Connection parameters read from the bidding configuration.
+	c, _ := bidding["collect"].(string)   // bidding collection
+	db, _ := bidding["db"].(string)       // database
+	index, _ := bidding["index"].(string) // index alias
+	itype, _ := bidding["type"].(string)
+
+	session := mgo.GetMgoConn(86400)
+	defer mgo.DestoryMongoConn(session)
+	count, err := session.DB(db).C(c).Find(&q).Count()
+	if err != nil {
+		// The count is only used for the log line below, so keep going.
+		log.Println("site_attach_text count error:", err)
+	}
+	log.Println("site_attach_text:	", db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
+
+	// The two excluded fields are not read anywhere in this function.
+	query := session.DB(db).C(c).Find(q).Select(bson.M{
+		"projectinfo.attachment": 0,
+		"contenthtml":            0,
+	}).Iter()
+	// Start empty (length 0) with room for one batch. Creating the slice with
+	// length savesizei would prepend that many nil documents to the first
+	// Elasticsearch batch and trigger a flush on the very first append.
+	arrEs := make([]map[string]interface{}, 0, savesizei)
+	arrMgo := [][]map[string]interface{}{}
+	var n int        // documents read from the cursor
+	var indexnum int // documents actually queued for indexing
+	// The post statement hands query.Next a fresh map on every iteration,
+	// including iterations that end with continue, so no keys leak between
+	// documents.
+	for tmp := make(map[string]interface{}); query.Next(tmp); tmp = make(map[string]interface{}) {
+		n++
+		site, _ := tmp["site"].(string)
+		if util.IntAll(tmp["extracttype"]) == -1 || util.IntAll(tmp["dataging"]) == 1 || tmp["attach_text"] == nil || site != "中国招标投标公共服务平台" {
+			continue
+		}
+
+		filetext := getFileText(tmp) // attachment text assembled by getFileText
+		if len(filetext) == 0 {
+			continue
+		}
+		tmp["detail"] = filetext // the attachment text replaces detail
+		// Selector / update pair handed to mgo.UpdateBulkAll.
+		saveArr := []map[string]interface{}{
+			{"_id": tmp["_id"]},
+			{
+				"$set": map[string]interface{}{
+					"filedetail": 1,
+					"detail":     filetext,
+				},
+			},
+		}
+		// newTmp is the document that ends up in the index.
+		newTmp := map[string]interface{}{"filetext": filetext}
+
+		indexnum++
+
+		// purchasing: copied only when it is a non-empty string.
+		if purchasing, ok := tmp["purchasing"].(string); ok && len(purchasing) > 0 {
+			newTmp["purchasing"] = purchasing
+		}
+		// purchasinglist: keep only the configured sub-fields of each entry.
+		if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok {
+			purchasinglist_new := []map[string]interface{}{}
+			for _, ls := range purchasinglist {
+				lsm, ok := ls.(map[string]interface{})
+				if !ok {
+					// Skip malformed entries instead of panicking and
+					// aborting the whole run.
+					continue
+				}
+				lsm_new := make(map[string]interface{})
+				for _, pf := range purchasinglistFields {
+					if lsm[pf] != nil {
+						lsm_new[pf] = lsm[pf]
+					}
+				}
+				if len(lsm_new) > 0 {
+					purchasinglist_new = append(purchasinglist_new, lsm_new)
+				}
+			}
+			if len(purchasinglist_new) > 0 {
+				newTmp["purchasinglist"] = purchasinglist_new
+			}
+		}
+
+		// Temporary workaround: a string-typed supervisorrate is not indexed.
+		if _, isString := tmp["supervisorrate"].(string); isString {
+			delete(tmp, "supervisorrate")
+		}
+		// Cap projectscope at 4000 runes. The outer guard compares bytes, so
+		// the rune count is checked separately: slicing a shorter rune slice
+		// to 4000 would panic or pad the value with NUL runes.
+		// NOTE(review): a value longer than ESLEN bytes but at most 4000
+		// runes is now left unchanged — confirm that is acceptable.
+		ps, _ := tmp["projectscope"].(string)
+		if len(ps) > ESLEN {
+			if scope := []rune(ps); len(scope) > 4000 {
+				tmp["projectscope"] = string(scope[:4000])
+			}
+		}
+		SaveUpdageLock.Lock()
+		for _, v := range biddingIndexFields { // configured index fields
+			if tmp[v] == nil {
+				continue
+			}
+			switch v {
+			case "projectinfo":
+				// Only the configured projectinfo sub-fields are indexed.
+				mp, _ := tmp[v].(map[string]interface{})
+				if mp != nil {
+					newmap := map[string]interface{}{}
+					for _, v1 := range projectinfoFields {
+						if mp[v1] != nil {
+							newmap[v1] = mp[v1]
+						}
+					}
+					newTmp[v] = newmap
+				}
+			case "detail":
+				detail, _ := tmp[v].(string)
+				newTmp[v] = FilterDetail(detail)
+			default:
+				newTmp[v] = tmp[v]
+			}
+		}
+		arrEs = append(arrEs, newTmp)    // queued for indexing
+		arrMgo = append(arrMgo, saveArr) // queued for the Mongo update
+		// Flush the Mongo updates once a batch has accumulated.
+		if len(arrMgo) >= savesizei-1 {
+			mgo.UpdateBulkAll(db, c, arrMgo...)
+			arrMgo = [][]map[string]interface{}{}
+		}
+		// Flush the index documents once a batch has accumulated.
+		if len(arrEs) >= savesizei-1 {
+			tmps := arrEs
+			elastic.BulkSave(index, itype, &tmps, true)
+			arrEs = []map[string]interface{}{}
+		}
+		SaveUpdageLock.Unlock()
+		// Progress log.
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+	}
+	// Release the cursor and surface any error that ended the iteration early.
+	if err := query.Close(); err != nil {
+		log.Println("site_attach_text iter error:", err)
+	}
+
+	// Flush whatever is left in both queues.
+	SaveUpdageLock.Lock()
+	if len(arrMgo) > 0 {
+		mgo.UpdateBulkAll(db, c, arrMgo...)
+	}
+	if len(arrEs) > 0 {
+		tmps := arrEs
+		elastic.BulkSave(index, itype, &tmps, true)
+	}
+	SaveUpdageLock.Unlock()
+	log.Println("create filetext index...over", n, indexnum)
+}

+ 3 - 3
udpcreateindex/src/config.json

@@ -36,9 +36,9 @@
         "extractdb": "mxs",
         "extractcollect": "extract",
         "indexfields":[ 
-        "buyerzipcode","winnertel","winnerperson","contractcode","winneraddr","agencyaddr","buyeraddr","signaturedate","projectperiod","projectaddr","agencytel","agencyperson","buyerperson","agency","projectscope","projectcode","bidopentime","supervisorrate","buyertel","bidamount","winner","buyer","budget","projectname","bidstatus","buyerclass","topscopeclass","s_subscopeclass","area","city","district","s_winner","_id","title","detail","site","comeintime","href","infoformat","publishtime","s_sha","spidercode","subtype","toptype","projectinfo","filetext","purchasing","purchasinglist"
+        "buyerzipcode","winnertel","winnerperson","contractcode","winneraddr","agencyaddr","buyeraddr","signaturedate","projectperiod","projectaddr","agencytel","agencyperson","buyerperson","agency","projectscope","projectcode","bidopentime","supervisorrate","buyertel","bidamount","winner","buyer","budget","projectname","bidstatus","buyerclass","topscopeclass","s_subscopeclass","area","city","district","s_winner","_id","title","detail","site","comeintime","href","infoformat","publishtime","s_sha","spidercode","subtype","toptype","projectinfo"
         ],
-        "fields": "buyerzipcode,winnertel,winnerperson,contractcode,winneraddr,agencyaddr,buyeraddr,signaturedate,projectperiod,projectaddr,agencytel,agencyperson,buyerperson,agency,projectscope,projectcode,bidopentime,supervisorrate,buyertel,bidamount,winner,buyer,budget,projectname,buyerclass,topscopeclass,area,city,district,s_winner,toptype,subtype,subscopeclass,s_subscopeclass",
+        "fields": "buyerzipcode,winnertel,winnerperson,contractcode,winneraddr,agencyaddr,buyeraddr,signaturedate,projectperiod,projectaddr,agencytel,agencyperson,buyerperson,agency,projectscope,projectcode,bidopentime,supervisorrate,buyertel,bidamount,winner,buyer,budget,projectname,buyerclass,topscopeclass,area,city,district,s_winner,toptype,subtype,subscopeclass,s_subscopeclass,dataging",
         "projectinfo": "approvecode,approvecontent,approvestatus,approvetime,industry",
         "purchasinglist":"itemname,model,unitname,number",
         "multiIndex": ""
@@ -92,7 +92,7 @@
         "db": "mxs"
     },
     "elastic": {
-        "addr": "http://127.0.0.1:9800",
+        "addr": "http://192.168.3.128:9800",
         "pool": 12
     }
 }

+ 11 - 11
udpcreateindex/src/task.go

@@ -12,7 +12,7 @@ import (
 func task_index() {
 	c := cron.New()
 	c.AddFunc("20 30 5 * * *", func() { task_projects() })
-	c.AddFunc("0 30 * * * *", func() { task_biddingfile() }) // runs every two hours
+	c.AddFunc("0 30 * * * *", func() { task_biddingfile() }) // noted as "runs every 30 minutes"; NOTE(review): if this spec is seconds-first (6 fields) it fires once per hour at minute 30 — confirm
 	//c.AddFunc("0 22 14 * * *", func() { task_qyxyindex() })
 	c.Start()
 }
@@ -55,13 +55,13 @@ func task_qyxyindex() {
 	qyxyTask(q)
 }
 
-// func crontab() {
-// 	defer qutil.Catch()
-// 	q := map[string]interface{}{
-// 		"_id": map[string]interface{}{
-// 			"$gte": qutil.StringTOBsonId("5e344f0b50b5ea296ed0cfbd"), //2020-02-01
-// 			"$lte": qutil.StringTOBsonId("5e9f1880f2c1a7850ba43979"), //2020-04-22
-// 		},
-// 	}
-// 	site_attach_text(q)
-// }
+// crontab runs site_attach_text once over a fixed, inclusive _id window.
+func crontab() {
+	defer qutil.Catch()
+	// The previous bounds were 5e344f0b50b5ea296ed0cfbd (annotated 2020-02-01)
+	// and 5e9f1880f2c1a7850ba43979 (annotated 2020-04-22).
+	// NOTE(review): the leading timestamp bytes of the new lower bound appear
+	// to decode to mid-April 2020, so the 2020-02-01 annotation no longer
+	// describes it — confirm.
+	lowerID := qutil.StringTOBsonId("5e990e1e50b5ea296ef47129")
+	upperID := qutil.StringTOBsonId("5ea00952511b12033763cebd")
+	idWindow := map[string]interface{}{
+		"$gte": lowerID,
+		"$lte": upperID,
+	}
+	site_attach_text(map[string]interface{}{"_id": idWindow})
+}