Kaynağa Gözat

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

apple 5 yıl önce
ebeveyn
işleme
274ee32291

+ 3 - 304
udpcreateindex/src/biddingall.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"fmt"
 	"log"
 	qutil "qfw/util"
 	elastic "qfw/util/elastic"
@@ -230,310 +231,7 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 								newmap := map[string]interface{}{}
 								for _, v1 := range projectinfoFields {
 									if mp[v1] != nil {
-										newmap[v1] = mp[v1]
-									}
-								}
-								newTmp[v] = newmap
-							}
-						} else if v == "purchasinglist" { //标的物处理
-							purchasinglist_new := []map[string]interface{}{}
-							if pcl, _ := tmp[v].([]interface{}); len(pcl) > 0 {
-								for _, ls := range pcl {
-									lsm_new := make(map[string]interface{})
-									lsm := ls.(map[string]interface{})
-									for _, pf := range purchasinglistFields {
-										if lsm[pf] != nil {
-											lsm_new[pf] = lsm[pf]
-										}
-									}
-									if lsm_new != nil && len(lsm_new) > 0 {
-										purchasinglist_new = append(purchasinglist_new, lsm_new)
-									}
-								}
-							}
-							if len(purchasinglist_new) > 0 {
-								newTmp[v] = purchasinglist_new
-							}
-
-						} else {
-							if v == "detail" {
-								detail, _ := tmp[v].(string)
-								newTmp[v] = FilterDetail(detail)
-							} else {
-								newTmp[v] = tmp[v]
-							}
-						}
-					} /*else if v == "budget" || v == "bidamount" {
-						newTmp[v] = nil
-					}*/
-				}
-				arrEs = append(arrEs, newTmp)
-			}
-			if len(update) > 0 {
-				queryId := map[string]interface{}{"_id": tmp["_id"]}
-				set := map[string]interface{}{"$set": update}
-				if len(del) > 0 { //删除的数据
-					set["$unset"] = del
-				}
-				arr = append(arr, []map[string]interface{}{queryId, set})
-			}
-			if len(arr) >= BulkSize {
-				mgo.UpdateBulkAll(db, c, arr...)
-				arr = [][]map[string]interface{}{}
-			}
-			if len(arrEs) >= BulkSize {
-				tmps := arrEs
-				elastic.BulkSave(index, itype, &tmps, true)
-				if len(multiIndex) == 2 {
-					elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
-				}
-				arrEs = []map[string]interface{}{}
-			}
-			UpdatesLock.Unlock()
-		}(tmp, update, compare, del, bnil)
-		if n%1000 == 0 {
-			log.Println("current:", n, _id)
-		}
-		tmp = make(map[string]interface{})
-	}
-	for i := 0; i < thread; i++ {
-		mpool <- true
-	}
-	UpdatesLock.Lock()
-	//log.Println(db, c, index, itype, arr, arrEs)
-	if len(arr) > 0 {
-		mgo.UpdateBulkAll(db, c, arr...)
-	}
-	if len(arrEs) > 0 {
-		tmps := arrEs
-		elastic.BulkSave(index, itype, &tmps, true)
-		if len(multiIndex) == 2 {
-			elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
-		}
-	}
-	UpdatesLock.Unlock()
-	log.Println(mapInfo, "create bidding index...over", n)
-}
-
-func biddingAllTask1(data []byte, mapInfo map[string]interface{}) {
-	defer qutil.Catch()
-	thread := 40
-	var mpool = make(chan bool, thread)
-	q, _ := mapInfo["query"].(map[string]interface{})
-	if q == nil {
-		q = map[string]interface{}{
-			"_id": bson.M{
-				"$gt":  qutil.StringTOBsonId(mapInfo["gtid"].(string)),
-				"$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
-			},
-		}
-	}
-	//bidding库
-	session := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(session)
-	//extract库
-	extractsession := extractmgo.GetMgoConn()
-	defer extractmgo.DestoryMongoConn(extractsession)
-	//连接信息
-	c, _ := mapInfo["coll"].(string)
-	if c == "" {
-		c, _ = bidding["collect"].(string)
-	}
-
-	extractc, _ := bidding["extractcollect"].(string)
-	db, _ := bidding["db"].(string)
-	extractdb, _ := bidding["extractdb"].(string)
-	index, _ := bidding["index"].(string)
-	itype, _ := bidding["type"].(string)
-	count, _ := session.DB(db).C(c).Find(&q).Count()
-	fields := strings.Split(bidding["fields"].(string), ",")
-	//线程池
-	UpdatesLock := sync.Mutex{}
-
-	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
-	//查询招标数据
-	query := session.DB(db).C(c).Find(q).Select(bson.M{
-		"projectinfo.attachment": 0,
-		"contenthtml":            0,
-	}).Sort("_id").Iter()
-	//查询抽取结果
-	extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Sort("_id").Iter()
-
-	n := 0
-
-	//更新数组
-	arr := [][]map[string]interface{}{}
-	arrEs := []map[string]interface{}{}
-	//对比两张表数据,减少查询次数
-	var compare bson.M
-	bnil := false
-	for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
-		// if qutil.IntAll(tmp["dataging"]) == 1 { //dataging=1不生索引
-		// 	tmp = make(map[string]interface{})
-		// 	continue
-		// }
-		update := map[string]interface{}{}
-		del := map[string]interface{}{} //记录extract没有值而bidding中有值的字段
-		//对比方法----------------
-		for {
-			if compare == nil {
-				compare = make(bson.M)
-				if !extractquery.Next(compare) {
-					break
-				}
-			}
-			if compare != nil {
-				//对比
-				cid := qutil.BsonIdToSId(compare["_id"])
-				tid := qutil.BsonIdToSId(tmp["_id"])
-				if cid == tid {
-					bnil = false
-					//更新bidding表,生成索引
-					for _, k := range fields { //fields更新到mongo的字段
-						v1 := compare[k] //extract
-						v2 := tmp[k]     //bidding
-						if v2 == nil && v1 != nil {
-							update[k] = v1
-						} else if v2 != nil && v1 != nil {
-							update[k] = v1
-						} else if v2 != nil && v1 == nil { //
-							if k == "s_subscopeclass" && del["subscopeclass"] == nil {
-								continue
-							}
-							del[k] = 1
-							//qutil.Debug("抽取结果没有值,bidding有值:field--", k, "val--", v2)
-						}
-					}
-					if qutil.IntAll(compare["repeat"]) == 1 {
-						update["extracttype"] = -1
-					} else {
-						update["extracttype"] = 1
-					}
-					break
-				} else {
-					if cid < tid {
-						bnil = false
-						compare = nil
-						continue
-					} else {
-						bnil = true
-						break
-					}
-				}
-			} else {
-				bnil = false
-				break
-			}
-		}
-		//下面可以多线程跑的--->
-		//处理分类
-		mpool <- true
-		_id := tmp["_id"]
-		go func(tmp, update, compare, del map[string]interface{}, bnil bool) {
-			defer func() {
-				<-mpool
-			}()
-			if !bnil && compare != nil {
-				subscopeclass, _ := compare["subscopeclass"].([]interface{})
-				if subscopeclass != nil {
-					m1 := map[string]bool{}
-					newclass := []string{}
-					for _, sc := range subscopeclass {
-						sclass, _ := sc.(string)
-						if !m1[sclass] {
-							m1[sclass] = true
-							newclass = append(newclass, sclass)
-						}
-					}
-					update["s_subscopeclass"] = strings.Join(newclass, ",")
-					update["subscopeclass"] = newclass
-				}
-				//处理中标企业
-				//				winner, _ := compare["winner"].(string)
-				//				m1 := map[string]bool{}
-				//				if winner != "" {
-				//					m1[winner] = true
-				//				}
-				//				package1 := compare["package"]
-				//				if package1 != nil {
-				//					packageM, _ := package1.(map[string]interface{})
-				//					for _, p := range packageM {
-				//						pm, _ := p.(map[string]interface{})
-				//						pw, _ := pm["winner"].(string)
-				//						if pw != "" {
-				//							m1[pw] = true
-				//						}
-				//					}
-				//				}
-				compare = nil
-				//				if len(m1) > 0 {
-				//					//str := ","
-				//					winnerarr := []string{}
-				//					for k, _ := range m1 {
-				//						//str += k + ","
-				//						winnerarr = append(winnerarr, k)
-				//					}
-				//					update["s_winner"] = strings.Join(winnerarr, ",")
-				//				}
-			}
-			//------------------对比结束
-			//同时保存到elastic
-			for tk, tv := range update {
-				tmp[tk] = tv
-			}
-			if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
-				if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
-					delete(tmp, "supervisorrate")
-				}
-			}
-			//对projectscope字段的索引处理
-			ps, _ := tmp["projectscope"].(string)
-			//			if ps == "" {
-			//				tmp["projectscope"] = ""
-			//			}
-			if len(ps) > ESLEN {
-				tmp["projectscope"] = string(([]rune(ps))[:4000])
-			}
-			//对标的物为空处理
-			if filetext := getFileText(tmp); len(filetext) > 10 { //attach_text
-				if site, _ := tmp["site"].(string); site == "中国招标投标公共服务平台" { //site:中国招标投标公共服务平台 detail替换成filetext 并加入标记filedetail=1
-					tmp["detail"] = filetext    //更新es中detail
-					update["detail"] = filetext //更新mongo中detail
-					update["filedetail"] = 1    //mongo中打标记
-				}
-				tmp["filetext"] = filetext
-			}
-			if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" {
-				delete(tmp, "purchasing")
-			}
-			if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok && len(purchasinglist) == 0 {
-				delete(tmp, "purchasinglist")
-			}
-			//预算和中标金额
-			//			if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-			//				tmp["budget"] = nil
-			//			} else if sbd, ok := tmp["budget"].(string); ok {
-			//				tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-			//			}
-			//			if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-			//				tmp["bidamount"] = nil
-			//			} else if sbd, ok := tmp["bidamount"].(string); ok {
-			//				tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-			//			}
-
-			//go IS.Add("bidding")
-			UpdatesLock.Lock()
-			if qutil.IntAll(update["extracttype"]) != -1 {
-				newTmp := map[string]interface{}{}
-				for _, v := range biddingIndexFields { //
-					if tmp[v] != nil && del[v] == nil { //
-						if "projectinfo" == v {
-							mp, _ := tmp[v].(map[string]interface{})
-							if mp != nil {
-								newmap := map[string]interface{}{}
-								for _, v1 := range projectinfoFields {
-									if mp[v1] != nil {
-										newmap[v1] = mp[v1]
+										newmap[v1] = fmt.Sprint(mp[v1])
 									}
 								}
 								if len(newmap) > 0 {
@@ -559,6 +257,7 @@ func biddingAllTask1(data []byte, mapInfo map[string]interface{}) {
 							if len(purchasinglist_new) > 0 {
 								newTmp[v] = purchasinglist_new
 							}
+
 						} else {
 							if v == "detail" {
 								detail, _ := tmp[v].(string)

+ 44 - 17
udpcreateindex/src/biddingdata.go

@@ -1,7 +1,7 @@
 package main
 
 import (
-	//	"fmt"
+	"fmt"
 	"log"
 	qutil "qfw/util"
 	elastic "qfw/util/elastic"
@@ -122,12 +122,11 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 					bnil = false
 					//更新bidding表,生成索引
 					for _, k := range fields {
-						v1 := compare[k]
-						v2 := tmp[k]
+						v1 := compare[k] //extract
+						v2 := tmp[k]     //bidding
 						if v2 == nil && v1 != nil {
 							update[k] = v1
 						} else if v2 != nil && v1 != nil {
-							//update[k+"_b"] = v2
 							update[k] = v1
 						} else if v2 != nil && v1 == nil {
 							update[k] = v2
@@ -165,14 +164,12 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 			if !bnil && compare != nil {
 				subscopeclass, _ := compare["subscopeclass"].([]interface{})
 				if subscopeclass != nil {
-					//str := ","
 					m1 := map[string]bool{}
 					newclass := []string{}
 					for _, sc := range subscopeclass {
 						sclass, _ := sc.(string)
 						if !m1[sclass] {
 							m1[sclass] = true
-							//str += sclass + ","
 							newclass = append(newclass, sclass)
 						}
 					}
@@ -213,14 +210,26 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 			for tk, tv := range update {
 				tmp[tk] = tv
 			}
+			if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
+				if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
+					delete(tmp, "supervisorrate")
+				}
+			}
 			//对projectscope字段的索引处理
 			ps, _ := tmp["projectscope"].(string)
-			if ps == "" {
-				tmp["projectscope"] = "" //= tmp["detail"]
-			}
 			if len(ps) > ESLEN {
 				tmp["projectscope"] = string(([]rune(ps))[:4000])
 			}
+			//对标的物为空处理
+			if filetext := getFileText(tmp); len(filetext) > 0 { //attach_text
+				tmp["filetext"] = filetext
+			}
+			if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" {
+				delete(tmp, "purchasing")
+			}
+			if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok && len(purchasinglist) == 0 {
+				delete(tmp, "purchasinglist")
+			}
 			//			if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
 			//				tmp["budget"] = nil
 			//			} else if sbd, ok := tmp["budget"].(string); ok {
@@ -234,7 +243,7 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 
 			if qutil.IntAll(update["extracttype"]) != -1 {
 				newTmp := map[string]interface{}{}
-				for _, v := range indexfield {
+				for _, v := range biddingIndexFields { // indexfield
 					if tmp[v] != nil {
 						if "projectinfo" == v {
 							mp, _ := tmp[v].(map[string]interface{})
@@ -242,10 +251,12 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 								newmap := map[string]interface{}{}
 								for _, v1 := range projectinfoFields {
 									if mp[v1] != nil {
-										newmap[v1] = mp[v1]
+										newmap[v1] = fmt.Sprint(mp[v1])
 									}
 								}
-								newTmp[v] = newmap
+								if len(newmap) > 0 {
+									newTmp[v] = newmap
+								}
 								// attachments := mp["attachments"]
 								// con := ""
 								// if attachments != nil {
@@ -265,6 +276,25 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 								// 	newTmp["attachments"] = con
 								// }
 							}
+						} else if v == "purchasinglist" { //标的物处理
+							purchasinglist_new := []map[string]interface{}{}
+							if pcl, _ := tmp[v].([]interface{}); len(pcl) > 0 {
+								for _, ls := range pcl {
+									lsm_new := make(map[string]interface{})
+									lsm := ls.(map[string]interface{})
+									for _, pf := range purchasinglistFields {
+										if lsm[pf] != nil {
+											lsm_new[pf] = lsm[pf]
+										}
+									}
+									if lsm_new != nil && len(lsm_new) > 0 {
+										purchasinglist_new = append(purchasinglist_new, lsm_new)
+									}
+								}
+							}
+							if len(purchasinglist_new) > 0 {
+								newTmp[v] = purchasinglist_new
+							}
 						} else {
 							if v == "detail" {
 								detail, _ := tmp[v].(string)
@@ -273,8 +303,6 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 								newTmp[v] = tmp[v]
 							}
 						}
-					} else if v == "budget" || v == "bidamount" {
-						newTmp[v] = nil
 					}
 				}
 				UpdatesLock.Lock()
@@ -284,7 +312,7 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 			UpdatesLock.Lock()
 			if len(arrEs) >= BulkSize-1 {
 				tmps := arrEs
-				elastic.BulkSave(index, itype, &tmps, false)
+				elastic.BulkSave(index, itype, &tmps, true)
 				arrEs = []map[string]interface{}{}
 			}
 			UpdatesLock.Unlock()
@@ -300,8 +328,7 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 	UpdatesLock.Lock()
 	if len(arrEs) > 0 {
 		tmps := arrEs
-		log.Println(tmps[0])
-		elastic.BulkSave(index, itype, &tmps, false)
+		elastic.BulkSave(index, itype, &tmps, true)
 	}
 	UpdatesLock.Unlock()
 	log.Println(mapInfo, "create bidding index...over", n)

+ 297 - 12
udpcreateindex/src/biddingindex.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"encoding/json"
+	"fmt"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -82,6 +83,258 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 	log.Println(mapInfo, "create bidding index...over", "all:", count, "n1:", n1, "n2:", n2)
 }
 
+// func doIndex1(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) {
+// 	n1, n2 := 0, 0
+// 	//线程池
+// 	UpdatesLock := sync.Mutex{}
+// 	fields := strings.Split(bidding["fields"].(string), ",")
+// 	//更新数组
+// 	arr := [][]map[string]interface{}{}
+// 	arrEs := []map[string]interface{}{}
+// 	//对比两张表数据,减少查询次数
+// 	var compare bson.M
+// 	log.Println("开始迭代..")
+// 	for n, tmp := range infos {
+// 		n1++
+// 		// if qutil.IntAll(tmp["dataging"]) == 1 { //dataging=1不生索引
+// 		// 	tmp = make(map[string]interface{})
+// 		// 	continue
+// 		// }
+// 		update := map[string]interface{}{} //要更新的mongo数据
+// 		//对比方法----------------
+// 		tid := qutil.BsonIdToSId(tmp["_id"])
+// 		if eMap[tid] != nil {
+// 			compare = eMap[tid]
+// 			if qutil.IntAll(compare["dataging"]) == 1 { //extract中dataging=1不生索引
+// 				tmp = make(map[string]interface{})
+// 				compare = nil
+// 				continue
+// 			}
+// 			delete(eMap, tid)
+// 			//更新bidding表,生成索引
+// 			for _, k := range fields {
+// 				v1 := compare[k] //extract
+// 				v2 := tmp[k]     //bidding
+// 				if v2 == nil && v1 != nil {
+// 					update[k] = v1
+// 				} else if v2 != nil && v1 != nil {
+// 					//update[k+"_b"] = v2
+// 					update[k] = v1
+// 				} else if v2 != nil && v1 == nil {
+// 					//update[k+"_b"] = v2
+// 				}
+// 			}
+// 			if qutil.IntAll(compare["repeat"]) == 1 {
+// 				update["extracttype"] = -1
+// 			} else {
+// 				update["extracttype"] = 1
+// 			}
+// 		} else {
+// 			compare = nil
+// 		}
+// 		//下面可以多线程跑的--->
+// 		//处理分类
+// 		if compare != nil { //extract
+// 			subscopeclass, _ := compare["subscopeclass"].([]interface{})
+// 			if subscopeclass != nil {
+// 				//str := ","
+// 				m1 := map[string]bool{}
+// 				newclass := []string{}
+// 				for _, sc := range subscopeclass {
+// 					sclass, _ := sc.(string)
+// 					if !m1[sclass] {
+// 						m1[sclass] = true
+// 						//str += sclass + ","
+// 						newclass = append(newclass, sclass)
+// 					}
+// 				}
+// 				update["s_subscopeclass"] = strings.Join(newclass, ",")
+// 				update["subscopeclass"] = newclass
+// 			}
+// 			//处理中标企业
+// 			//			winner, _ := compare["winner"].(string)
+// 			//			m1 := map[string]bool{}
+// 			//			if winner != "" {
+// 			//				m1[winner] = true
+// 			//			}
+// 			//			package1 := compare["package"]
+// 			//			if package1 != nil {
+// 			//				packageM, _ := package1.(map[string]interface{})
+// 			//				for _, p := range packageM {
+// 			//					pm, _ := p.(map[string]interface{})
+// 			//					pw, _ := pm["winner"].(string)
+// 			//					if pw != "" {
+// 			//						m1[pw] = true
+// 			//					}
+// 			//				}
+// 			//			}
+// 			compare = nil
+// 			//			if len(m1) > 0 {
+// 			//				//str := ","
+// 			//				winnerarr := []string{}
+// 			//				for k, _ := range m1 {
+// 			//					//str += k + ","
+// 			//					winnerarr = append(winnerarr, k)
+// 			//				}
+// 			//				update["s_winner"] = strings.Join(winnerarr, ",")
+// 			//			}
+// 		}
+// 		//------------------对比结束
+
+// 		//处理key descript
+// 		if bkey == "" {
+// 			DealInfo(&tmp, &update)
+// 		}
+// 		//同时保存到elastic
+// 		for tk, tv := range update {
+// 			tmp[tk] = tv
+// 		}
+// 		if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
+// 			if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
+// 				delete(tmp, "supervisorrate")
+// 			}
+// 		}
+// 		//对projectscope字段的索引处理
+// 		ps, _ := tmp["projectscope"].(string)
+// 		//		if ps == "" {
+// 		//			tmp["projectscope"] = "" //= tmp["detail"]
+// 		//		}
+// 		if len(ps) > ESLEN {
+// 			tmp["projectscope"] = string(([]rune(ps))[:4000])
+// 		}
+// 		//		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
+// 		//			tmp["budget"] = nil
+// 		//		} else if sbd, ok := tmp["budget"].(string); ok {
+// 		//			tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+// 		//		}
+// 		//		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
+// 		//			tmp["bidamount"] = nil
+// 		//		} else if sbd, ok := tmp["bidamount"].(string); ok {
+// 		//			tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+// 		//		}
+// 		UpdatesLock.Lock()
+// 		//		for k1, _ := range tmp {
+// 		//			if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
+// 		//				delete(tmp, k1)
+// 		//			}
+// 		//		}
+// 		go IS.Add("bidding")
+// 		if qutil.IntAll(update["extracttype"]) != -1 {
+// 			newTmp := map[string]interface{}{}     //最终生索引的数据
+// 			for _, v := range biddingIndexFields { //索引字段
+// 				if tmp[v] != nil {
+// 					if "projectinfo" == v {
+// 						mp, _ := tmp[v].(map[string]interface{})
+// 						if mp != nil {
+// 							newmap := map[string]interface{}{}
+// 							for _, v1 := range projectinfoFields {
+// 								if mp[v1] != nil {
+// 									newmap[v1] = fmt.Sprint(mp[v1])
+// 								}
+// 							}
+// 							if len(newmap) > 0 {
+// 								newTmp[v] = newmap
+// 							}
+// 							// attachments := mp["attachments"]
+// 							// con := ""
+// 							// if attachments != nil {
+// 							// 	am, _ := attachments.(map[string]interface{})
+// 							// 	if am != nil {
+// 							// 		for _, v1 := range am {
+// 							// 			vm, _ := v1.(map[string]interface{})
+// 							// 			if vm != nil {
+// 							// 				c, _ := vm["content"].(string)
+// 							// 				con += c
+// 							// 			}
+// 							// 		}
+// 							// 	}
+// 							// }
+// 							// con = FilterDetailSpace(con)
+// 							// if con != "" {
+// 							// 	newTmp["attachments"] = con
+// 							// }
+// 						}
+// 					} else if v == "purchasinglist" { //标的物处理
+// 						purchasinglist_new := []map[string]interface{}{}
+// 						if pcl, _ := tmp[v].([]interface{}); len(pcl) > 0 {
+// 							for _, ls := range pcl {
+// 								lsm_new := make(map[string]interface{})
+// 								lsm := ls.(map[string]interface{})
+// 								for _, pf := range purchasinglistFields {
+// 									if lsm[pf] != nil {
+// 										lsm_new[pf] = lsm[pf]
+// 									}
+// 								}
+// 								if lsm_new != nil && len(lsm_new) > 0 {
+// 									purchasinglist_new = append(purchasinglist_new, lsm_new)
+// 								}
+// 							}
+// 						}
+// 						if len(purchasinglist_new) > 0 {
+// 							newTmp[v] = purchasinglist_new
+// 						}
+
+// 					} else {
+// 						if v == "detail" {
+// 							detail, _ := tmp[v].(string)
+// 							newTmp[v] = FilterDetail(detail)
+// 						} else {
+// 							newTmp[v] = tmp[v]
+// 						}
+// 					}
+// 				}
+// 			}
+// 			arrEs = append(arrEs, newTmp)
+// 		}
+// 		if len(update) > 0 {
+// 			arr = append(arr, []map[string]interface{}{
+// 				map[string]interface{}{
+// 					"_id": tmp["_id"],
+// 				},
+// 				map[string]interface{}{
+// 					"$set": update,
+// 				},
+// 			})
+// 		}
+// 		if len(arr) >= BulkSize-1 {
+// 			mgo.UpdateBulkAll(db, c, arr...)
+// 			arr = [][]map[string]interface{}{}
+// 		}
+// 		if len(arrEs) >= BulkSize-1 {
+// 			tmps := arrEs
+// 			elastic.BulkSave(index, itype, &tmps, true)
+// 			if other_index != "" && other_itype != "" {
+// 				bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+// 			}
+// 			if len(multiIndex) == 2 {
+// 				elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
+// 			}
+// 			arrEs = []map[string]interface{}{}
+// 		}
+// 		UpdatesLock.Unlock()
+// 		if n%100 == 0 {
+// 			log.Println("current:", n)
+// 		}
+// 		tmp = make(map[string]interface{})
+// 	}
+// 	UpdatesLock.Lock()
+// 	if len(arr) > 0 {
+// 		mgo.UpdateBulkAll(db, c, arr...)
+// 	}
+// 	if len(arrEs) > 0 {
+// 		tmps := arrEs
+// 		elastic.BulkSave(index, itype, &tmps, true)
+// 		if other_index != "" && other_itype != "" {
+// 			bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+// 		}
+// 		if len(multiIndex) == 2 {
+// 			elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
+// 		}
+// 	}
+// 	UpdatesLock.Unlock()
+// 	return n1, n2
+// }
+
 func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) {
 	n1, n2 := 0, 0
 	//线程池
@@ -201,16 +454,21 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 		if len(ps) > ESLEN {
 			tmp["projectscope"] = string(([]rune(ps))[:4000])
 		}
-		//		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-		//			tmp["budget"] = nil
-		//		} else if sbd, ok := tmp["budget"].(string); ok {
-		//			tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-		//		}
-		//		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-		//			tmp["bidamount"] = nil
-		//		} else if sbd, ok := tmp["bidamount"].(string); ok {
-		//			tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-		//		}
+		//对标的物为空处理
+		if filetext := getFileText(tmp); len(filetext) > 10 { //attach_text
+			// if site, _ := tmp["site"].(string); site == "中国招标投标公共服务平台" { //site:中国招标投标公共服务平台 detail替换成filetext 并加入标记filedetail=1
+			// 	tmp["detail"] = filetext    //更新es中detail
+			// 	update["detail"] = filetext //更新mongo中detail
+			// 	update["filedetail"] = 1    //mongo中打标记
+			// }
+			tmp["filetext"] = filetext
+		}
+		if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" {
+			delete(tmp, "purchasing")
+		}
+		if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok && len(purchasinglist) == 0 {
+			delete(tmp, "purchasinglist")
+		}
 		UpdatesLock.Lock()
 		//		for k1, _ := range tmp {
 		//			if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
@@ -228,10 +486,12 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 							newmap := map[string]interface{}{}
 							for _, v1 := range projectinfoFields {
 								if mp[v1] != nil {
-									newmap[v1] = mp[v1]
+									newmap[v1] = fmt.Sprint(mp[v1])
 								}
 							}
-							newTmp[v] = newmap
+							if len(newmap) > 0 {
+								newTmp[v] = newmap
+							}
 							// attachments := mp["attachments"]
 							// con := ""
 							// if attachments != nil {
@@ -251,6 +511,25 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 							// 	newTmp["attachments"] = con
 							// }
 						}
+					} else if v == "purchasinglist" { //标的物处理
+						purchasinglist_new := []map[string]interface{}{}
+						if pcl, _ := tmp[v].([]interface{}); len(pcl) > 0 {
+							for _, ls := range pcl {
+								lsm_new := make(map[string]interface{})
+								lsm := ls.(map[string]interface{})
+								for _, pf := range purchasinglistFields {
+									if lsm[pf] != nil {
+										lsm_new[pf] = lsm[pf]
+									}
+								}
+								if lsm_new != nil && len(lsm_new) > 0 {
+									purchasinglist_new = append(purchasinglist_new, lsm_new)
+								}
+							}
+						}
+						if len(purchasinglist_new) > 0 {
+							newTmp[v] = purchasinglist_new
+						}
 					} else {
 						if v == "detail" {
 							detail, _ := tmp[v].(string)
@@ -280,6 +559,9 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 		if len(arrEs) >= BulkSize-1 {
 			tmps := arrEs
 			elastic.BulkSave(index, itype, &tmps, true)
+			if other_index != "" && other_itype != "" {
+				bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+			}
 			if len(multiIndex) == 2 {
 				elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
 			}
@@ -298,6 +580,9 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 	if len(arrEs) > 0 {
 		tmps := arrEs
 		elastic.BulkSave(index, itype, &tmps, true)
+		if other_index != "" && other_itype != "" {
+			bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+		}
 		if len(multiIndex) == 2 {
 			elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
 		}

+ 2 - 1
udpcreateindex/src/bidingpurchasing.go

@@ -7,7 +7,8 @@ import (
 	"sync"
 	"unicode/utf8"
 
-	u "./util"
+	u "util"
+
 	"gopkg.in/mgo.v2/bson"
 )
 

+ 7 - 1
udpcreateindex/src/config.json

@@ -39,7 +39,7 @@
         "buyerzipcode","winnertel","winnerperson","contractcode","winneraddr","agencyaddr","buyeraddr","signaturedate","projectperiod","projectaddr","agencytel","agencyperson","buyerperson","agency","projectscope","projectcode","bidopentime","supervisorrate","buyertel","bidamount","winner","buyer","budget","projectname","bidstatus","buyerclass","topscopeclass","s_subscopeclass","area","city","district","s_winner","_id","title","detail","site","comeintime","href","infoformat","publishtime","s_sha","spidercode","subtype","toptype","projectinfo","purchasing","purchasinglist","filetext"
         ],
         "fields": "buyerzipcode,winnertel,winnerperson,contractcode,winneraddr,agencyaddr,buyeraddr,signaturedate,projectperiod,projectaddr,agencytel,agencyperson,buyerperson,agency,projectscope,projectcode,bidopentime,supervisorrate,buyertel,bidamount,winner,buyer,budget,projectname,buyerclass,topscopeclass,area,city,district,s_winner,toptype,subtype,subscopeclass,s_subscopeclass,dataging",
-        "projectinfo": "approvecode,approvecontent,approvestatus,approvetime,industry",
+        "projectinfo": "approvecode,approvecontent,approvestatus,approvetime,approvedept,approvenumber,projecttype,approvecity",
         "purchasinglist":"itemname,model,unitname,number",
         "multiIndex": ""
     },
@@ -94,5 +94,11 @@
     "elastic": {
         "addr": "http://192.168.3.128:9800",
         "pool": 12
+    },
+    "elastic_other": {
+        "addr": "http://127.0.0.1:9800",
+        "pool": 12,
+        "index": "bidding_v2",
+        "type": "bidding"
     }
 }

+ 19 - 3
udpcreateindex/src/main.go

@@ -10,8 +10,7 @@ import (
 	"qfw/util/mongodb"
 	"strings"
 	"time"
-
-	u "./util"
+	u "util"
 )
 
 var (
@@ -29,6 +28,10 @@ var (
 	multiIndex           []string
 	purchasinglistFields []string
 	BulkSize             = 400
+	//bidding_other连接信息
+	bidding_other_es *elastic.Elastic
+	other_index      string
+	other_itype      string
 
 	winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
 )
@@ -91,9 +94,22 @@ func init() {
 	}
 	mgostandard.InitPool()
 	log.Println(standard["addr"].(string))
-
+	//初始化es
+	//bidding
 	econf := Sysconfig["elastic"].(map[string]interface{})
 	elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
+	//bidding_other
+	if Sysconfig["elastic_other"] != nil {
+		econf_other := Sysconfig["elastic_other"].(map[string]interface{})
+		other_index = econf_other["index"].(string)
+		other_itype = econf_other["type"].(string)
+		bidding_other_es = &elastic.Elastic{
+			S_esurl: econf_other["addr"].(string),
+			I_size:  util.IntAllDef(econf_other["pool"], 5),
+		}
+		bidding_other_es.InitElasticSize()
+	}
+	//
 	if bidding["indexfields"] != nil {
 		biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
 	}

+ 1 - 1
udpcreateindex/src/task.go

@@ -12,7 +12,7 @@ import (
 func task_index() {
 	c := cron.New()
 	c.AddFunc("20 30 5 * * *", func() { task_projects() })
-	c.AddFunc("0 30 * * * *", func() { task_biddingfile() }) //每30分钟执行一次
+	//c.AddFunc("0 30 * * * *", func() { task_biddingfile() }) //每30分钟执行一次
 	//c.AddFunc("0 22 14 * * *", func() { task_qyxyindex() })
 	c.Start()
 }