maxiaoshan 5 years ago
parent
commit
6df23763f3

+ 3 - 304
udpcreateindex/src/biddingall.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"fmt"
 	"log"
 	qutil "qfw/util"
 	elastic "qfw/util/elastic"
@@ -230,310 +231,7 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 								newmap := map[string]interface{}{}
 								for _, v1 := range projectinfoFields {
 									if mp[v1] != nil {
-										newmap[v1] = mp[v1]
-									}
-								}
-								newTmp[v] = newmap
-							}
-						} else if v == "purchasinglist" { //标的物处理
-							purchasinglist_new := []map[string]interface{}{}
-							if pcl, _ := tmp[v].([]interface{}); len(pcl) > 0 {
-								for _, ls := range pcl {
-									lsm_new := make(map[string]interface{})
-									lsm := ls.(map[string]interface{})
-									for _, pf := range purchasinglistFields {
-										if lsm[pf] != nil {
-											lsm_new[pf] = lsm[pf]
-										}
-									}
-									if lsm_new != nil && len(lsm_new) > 0 {
-										purchasinglist_new = append(purchasinglist_new, lsm_new)
-									}
-								}
-							}
-							if len(purchasinglist_new) > 0 {
-								newTmp[v] = purchasinglist_new
-							}
-
-						} else {
-							if v == "detail" {
-								detail, _ := tmp[v].(string)
-								newTmp[v] = FilterDetail(detail)
-							} else {
-								newTmp[v] = tmp[v]
-							}
-						}
-					} /*else if v == "budget" || v == "bidamount" {
-						newTmp[v] = nil
-					}*/
-				}
-				arrEs = append(arrEs, newTmp)
-			}
-			if len(update) > 0 {
-				queryId := map[string]interface{}{"_id": tmp["_id"]}
-				set := map[string]interface{}{"$set": update}
-				if len(del) > 0 { //删除的数据
-					set["$unset"] = del
-				}
-				arr = append(arr, []map[string]interface{}{queryId, set})
-			}
-			if len(arr) >= BulkSize {
-				mgo.UpdateBulkAll(db, c, arr...)
-				arr = [][]map[string]interface{}{}
-			}
-			if len(arrEs) >= BulkSize {
-				tmps := arrEs
-				elastic.BulkSave(index, itype, &tmps, true)
-				if len(multiIndex) == 2 {
-					elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
-				}
-				arrEs = []map[string]interface{}{}
-			}
-			UpdatesLock.Unlock()
-		}(tmp, update, compare, del, bnil)
-		if n%1000 == 0 {
-			log.Println("current:", n, _id)
-		}
-		tmp = make(map[string]interface{})
-	}
-	for i := 0; i < thread; i++ {
-		mpool <- true
-	}
-	UpdatesLock.Lock()
-	//log.Println(db, c, index, itype, arr, arrEs)
-	if len(arr) > 0 {
-		mgo.UpdateBulkAll(db, c, arr...)
-	}
-	if len(arrEs) > 0 {
-		tmps := arrEs
-		elastic.BulkSave(index, itype, &tmps, true)
-		if len(multiIndex) == 2 {
-			elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
-		}
-	}
-	UpdatesLock.Unlock()
-	log.Println(mapInfo, "create bidding index...over", n)
-}
-
-func biddingAllTask1(data []byte, mapInfo map[string]interface{}) {
-	defer qutil.Catch()
-	thread := 40
-	var mpool = make(chan bool, thread)
-	q, _ := mapInfo["query"].(map[string]interface{})
-	if q == nil {
-		q = map[string]interface{}{
-			"_id": bson.M{
-				"$gt":  qutil.StringTOBsonId(mapInfo["gtid"].(string)),
-				"$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
-			},
-		}
-	}
-	//bidding库
-	session := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(session)
-	//extract库
-	extractsession := extractmgo.GetMgoConn()
-	defer extractmgo.DestoryMongoConn(extractsession)
-	//连接信息
-	c, _ := mapInfo["coll"].(string)
-	if c == "" {
-		c, _ = bidding["collect"].(string)
-	}
-
-	extractc, _ := bidding["extractcollect"].(string)
-	db, _ := bidding["db"].(string)
-	extractdb, _ := bidding["extractdb"].(string)
-	index, _ := bidding["index"].(string)
-	itype, _ := bidding["type"].(string)
-	count, _ := session.DB(db).C(c).Find(&q).Count()
-	fields := strings.Split(bidding["fields"].(string), ",")
-	//线程池
-	UpdatesLock := sync.Mutex{}
-
-	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
-	//查询招标数据
-	query := session.DB(db).C(c).Find(q).Select(bson.M{
-		"projectinfo.attachment": 0,
-		"contenthtml":            0,
-	}).Sort("_id").Iter()
-	//查询抽取结果
-	extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Sort("_id").Iter()
-
-	n := 0
-
-	//更新数组
-	arr := [][]map[string]interface{}{}
-	arrEs := []map[string]interface{}{}
-	//对比两张表数据,减少查询次数
-	var compare bson.M
-	bnil := false
-	for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
-		// if qutil.IntAll(tmp["dataging"]) == 1 { //dataging=1不生索引
-		// 	tmp = make(map[string]interface{})
-		// 	continue
-		// }
-		update := map[string]interface{}{}
-		del := map[string]interface{}{} //记录extract没有值而bidding中有值的字段
-		//对比方法----------------
-		for {
-			if compare == nil {
-				compare = make(bson.M)
-				if !extractquery.Next(compare) {
-					break
-				}
-			}
-			if compare != nil {
-				//对比
-				cid := qutil.BsonIdToSId(compare["_id"])
-				tid := qutil.BsonIdToSId(tmp["_id"])
-				if cid == tid {
-					bnil = false
-					//更新bidding表,生成索引
-					for _, k := range fields { //fields更新到mongo的字段
-						v1 := compare[k] //extract
-						v2 := tmp[k]     //bidding
-						if v2 == nil && v1 != nil {
-							update[k] = v1
-						} else if v2 != nil && v1 != nil {
-							update[k] = v1
-						} else if v2 != nil && v1 == nil { //
-							if k == "s_subscopeclass" && del["subscopeclass"] == nil {
-								continue
-							}
-							del[k] = 1
-							//qutil.Debug("抽取结果没有值,bidding有值:field--", k, "val--", v2)
-						}
-					}
-					if qutil.IntAll(compare["repeat"]) == 1 {
-						update["extracttype"] = -1
-					} else {
-						update["extracttype"] = 1
-					}
-					break
-				} else {
-					if cid < tid {
-						bnil = false
-						compare = nil
-						continue
-					} else {
-						bnil = true
-						break
-					}
-				}
-			} else {
-				bnil = false
-				break
-			}
-		}
-		//下面可以多线程跑的--->
-		//处理分类
-		mpool <- true
-		_id := tmp["_id"]
-		go func(tmp, update, compare, del map[string]interface{}, bnil bool) {
-			defer func() {
-				<-mpool
-			}()
-			if !bnil && compare != nil {
-				subscopeclass, _ := compare["subscopeclass"].([]interface{})
-				if subscopeclass != nil {
-					m1 := map[string]bool{}
-					newclass := []string{}
-					for _, sc := range subscopeclass {
-						sclass, _ := sc.(string)
-						if !m1[sclass] {
-							m1[sclass] = true
-							newclass = append(newclass, sclass)
-						}
-					}
-					update["s_subscopeclass"] = strings.Join(newclass, ",")
-					update["subscopeclass"] = newclass
-				}
-				//处理中标企业
-				//				winner, _ := compare["winner"].(string)
-				//				m1 := map[string]bool{}
-				//				if winner != "" {
-				//					m1[winner] = true
-				//				}
-				//				package1 := compare["package"]
-				//				if package1 != nil {
-				//					packageM, _ := package1.(map[string]interface{})
-				//					for _, p := range packageM {
-				//						pm, _ := p.(map[string]interface{})
-				//						pw, _ := pm["winner"].(string)
-				//						if pw != "" {
-				//							m1[pw] = true
-				//						}
-				//					}
-				//				}
-				compare = nil
-				//				if len(m1) > 0 {
-				//					//str := ","
-				//					winnerarr := []string{}
-				//					for k, _ := range m1 {
-				//						//str += k + ","
-				//						winnerarr = append(winnerarr, k)
-				//					}
-				//					update["s_winner"] = strings.Join(winnerarr, ",")
-				//				}
-			}
-			//------------------对比结束
-			//同时保存到elastic
-			for tk, tv := range update {
-				tmp[tk] = tv
-			}
-			if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
-				if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
-					delete(tmp, "supervisorrate")
-				}
-			}
-			//对projectscope字段的索引处理
-			ps, _ := tmp["projectscope"].(string)
-			//			if ps == "" {
-			//				tmp["projectscope"] = ""
-			//			}
-			if len(ps) > ESLEN {
-				tmp["projectscope"] = string(([]rune(ps))[:4000])
-			}
-			//对标的物为空处理
-			if filetext := getFileText(tmp); len(filetext) > 10 { //attach_text
-				if site, _ := tmp["site"].(string); site == "中国招标投标公共服务平台" { //site:中国招标投标公共服务平台 detail替换成filetext 并加入标记filedetail=1
-					tmp["detail"] = filetext    //更新es中detail
-					update["detail"] = filetext //更新mongo中detail
-					update["filedetail"] = 1    //mongo中打标记
-				}
-				tmp["filetext"] = filetext
-			}
-			if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" {
-				delete(tmp, "purchasing")
-			}
-			if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok && len(purchasinglist) == 0 {
-				delete(tmp, "purchasinglist")
-			}
-			//预算和中标金额
-			//			if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-			//				tmp["budget"] = nil
-			//			} else if sbd, ok := tmp["budget"].(string); ok {
-			//				tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-			//			}
-			//			if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-			//				tmp["bidamount"] = nil
-			//			} else if sbd, ok := tmp["bidamount"].(string); ok {
-			//				tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-			//			}
-
-			//go IS.Add("bidding")
-			UpdatesLock.Lock()
-			if qutil.IntAll(update["extracttype"]) != -1 {
-				newTmp := map[string]interface{}{}
-				for _, v := range biddingIndexFields { //
-					if tmp[v] != nil && del[v] == nil { //
-						if "projectinfo" == v {
-							mp, _ := tmp[v].(map[string]interface{})
-							if mp != nil {
-								newmap := map[string]interface{}{}
-								for _, v1 := range projectinfoFields {
-									if mp[v1] != nil {
-										newmap[v1] = mp[v1]
+										newmap[v1] = fmt.Sprint(mp[v1])
 									}
 								}
 								if len(newmap) > 0 {
@@ -559,6 +257,7 @@ func biddingAllTask1(data []byte, mapInfo map[string]interface{}) {
 							if len(purchasinglist_new) > 0 {
 								newTmp[v] = purchasinglist_new
 							}
+
 						} else {
 							if v == "detail" {
 								detail, _ := tmp[v].(string)

+ 277 - 235
udpcreateindex/src/biddingindex.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"encoding/json"
+	"fmt"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -82,237 +83,259 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 	log.Println(mapInfo, "create bidding index...over", "all:", count, "n1:", n1, "n2:", n2)
 }
 
-func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) {
-	n1, n2 := 0, 0
-	//线程池
-	UpdatesLock := sync.Mutex{}
-	fields := strings.Split(bidding["fields"].(string), ",")
-	//更新数组
-	arr := [][]map[string]interface{}{}
-	arrEs := []map[string]interface{}{}
-	//对比两张表数据,减少查询次数
-	var compare bson.M
-	log.Println("开始迭代..")
-	for n, tmp := range infos {
-		n1++
-		// if qutil.IntAll(tmp["dataging"]) == 1 { //dataging=1不生索引
-		// 	tmp = make(map[string]interface{})
-		// 	continue
-		// }
-		update := map[string]interface{}{} //要更新的mongo数据
-		//对比方法----------------
-		tid := qutil.BsonIdToSId(tmp["_id"])
-		if eMap[tid] != nil {
-			compare = eMap[tid]
-			if qutil.IntAll(compare["dataging"]) == 1 { //extract中dataging=1不生索引
-				tmp = make(map[string]interface{})
-				compare = nil
-				continue
-			}
-			delete(eMap, tid)
-			//更新bidding表,生成索引
-			for _, k := range fields {
-				v1 := compare[k] //extract
-				v2 := tmp[k]     //bidding
-				if v2 == nil && v1 != nil {
-					update[k] = v1
-				} else if v2 != nil && v1 != nil {
-					//update[k+"_b"] = v2
-					update[k] = v1
-				} else if v2 != nil && v1 == nil {
-					//update[k+"_b"] = v2
-				}
-			}
-			if qutil.IntAll(compare["repeat"]) == 1 {
-				update["extracttype"] = -1
-			} else {
-				update["extracttype"] = 1
-			}
-		} else {
-			compare = nil
-		}
-		//下面可以多线程跑的--->
-		//处理分类
-		if compare != nil { //extract
-			subscopeclass, _ := compare["subscopeclass"].([]interface{})
-			if subscopeclass != nil {
-				//str := ","
-				m1 := map[string]bool{}
-				newclass := []string{}
-				for _, sc := range subscopeclass {
-					sclass, _ := sc.(string)
-					if !m1[sclass] {
-						m1[sclass] = true
-						//str += sclass + ","
-						newclass = append(newclass, sclass)
-					}
-				}
-				update["s_subscopeclass"] = strings.Join(newclass, ",")
-				update["subscopeclass"] = newclass
-			}
-			//处理中标企业
-			//			winner, _ := compare["winner"].(string)
-			//			m1 := map[string]bool{}
-			//			if winner != "" {
-			//				m1[winner] = true
-			//			}
-			//			package1 := compare["package"]
-			//			if package1 != nil {
-			//				packageM, _ := package1.(map[string]interface{})
-			//				for _, p := range packageM {
-			//					pm, _ := p.(map[string]interface{})
-			//					pw, _ := pm["winner"].(string)
-			//					if pw != "" {
-			//						m1[pw] = true
-			//					}
-			//				}
-			//			}
-			compare = nil
-			//			if len(m1) > 0 {
-			//				//str := ","
-			//				winnerarr := []string{}
-			//				for k, _ := range m1 {
-			//					//str += k + ","
-			//					winnerarr = append(winnerarr, k)
-			//				}
-			//				update["s_winner"] = strings.Join(winnerarr, ",")
-			//			}
-		}
-		//------------------对比结束
+// func doIndex1(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) {
+// 	n1, n2 := 0, 0
+// 	//线程池
+// 	UpdatesLock := sync.Mutex{}
+// 	fields := strings.Split(bidding["fields"].(string), ",")
+// 	//更新数组
+// 	arr := [][]map[string]interface{}{}
+// 	arrEs := []map[string]interface{}{}
+// 	//对比两张表数据,减少查询次数
+// 	var compare bson.M
+// 	log.Println("开始迭代..")
+// 	for n, tmp := range infos {
+// 		n1++
+// 		// if qutil.IntAll(tmp["dataging"]) == 1 { //dataging=1不生索引
+// 		// 	tmp = make(map[string]interface{})
+// 		// 	continue
+// 		// }
+// 		update := map[string]interface{}{} //要更新的mongo数据
+// 		//对比方法----------------
+// 		tid := qutil.BsonIdToSId(tmp["_id"])
+// 		if eMap[tid] != nil {
+// 			compare = eMap[tid]
+// 			if qutil.IntAll(compare["dataging"]) == 1 { //extract中dataging=1不生索引
+// 				tmp = make(map[string]interface{})
+// 				compare = nil
+// 				continue
+// 			}
+// 			delete(eMap, tid)
+// 			//更新bidding表,生成索引
+// 			for _, k := range fields {
+// 				v1 := compare[k] //extract
+// 				v2 := tmp[k]     //bidding
+// 				if v2 == nil && v1 != nil {
+// 					update[k] = v1
+// 				} else if v2 != nil && v1 != nil {
+// 					//update[k+"_b"] = v2
+// 					update[k] = v1
+// 				} else if v2 != nil && v1 == nil {
+// 					//update[k+"_b"] = v2
+// 				}
+// 			}
+// 			if qutil.IntAll(compare["repeat"]) == 1 {
+// 				update["extracttype"] = -1
+// 			} else {
+// 				update["extracttype"] = 1
+// 			}
+// 		} else {
+// 			compare = nil
+// 		}
+// 		//下面可以多线程跑的--->
+// 		//处理分类
+// 		if compare != nil { //extract
+// 			subscopeclass, _ := compare["subscopeclass"].([]interface{})
+// 			if subscopeclass != nil {
+// 				//str := ","
+// 				m1 := map[string]bool{}
+// 				newclass := []string{}
+// 				for _, sc := range subscopeclass {
+// 					sclass, _ := sc.(string)
+// 					if !m1[sclass] {
+// 						m1[sclass] = true
+// 						//str += sclass + ","
+// 						newclass = append(newclass, sclass)
+// 					}
+// 				}
+// 				update["s_subscopeclass"] = strings.Join(newclass, ",")
+// 				update["subscopeclass"] = newclass
+// 			}
+// 			//处理中标企业
+// 			//			winner, _ := compare["winner"].(string)
+// 			//			m1 := map[string]bool{}
+// 			//			if winner != "" {
+// 			//				m1[winner] = true
+// 			//			}
+// 			//			package1 := compare["package"]
+// 			//			if package1 != nil {
+// 			//				packageM, _ := package1.(map[string]interface{})
+// 			//				for _, p := range packageM {
+// 			//					pm, _ := p.(map[string]interface{})
+// 			//					pw, _ := pm["winner"].(string)
+// 			//					if pw != "" {
+// 			//						m1[pw] = true
+// 			//					}
+// 			//				}
+// 			//			}
+// 			compare = nil
+// 			//			if len(m1) > 0 {
+// 			//				//str := ","
+// 			//				winnerarr := []string{}
+// 			//				for k, _ := range m1 {
+// 			//					//str += k + ","
+// 			//					winnerarr = append(winnerarr, k)
+// 			//				}
+// 			//				update["s_winner"] = strings.Join(winnerarr, ",")
+// 			//			}
+// 		}
+// 		//------------------对比结束
 
-		//处理key descript
-		if bkey == "" {
-			DealInfo(&tmp, &update)
-		}
-		//同时保存到elastic
-		for tk, tv := range update {
-			tmp[tk] = tv
-		}
-		if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
-			if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
-				delete(tmp, "supervisorrate")
-			}
-		}
-		//对projectscope字段的索引处理
-		ps, _ := tmp["projectscope"].(string)
-		//		if ps == "" {
-		//			tmp["projectscope"] = "" //= tmp["detail"]
-		//		}
-		if len(ps) > ESLEN {
-			tmp["projectscope"] = string(([]rune(ps))[:4000])
-		}
-		//		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-		//			tmp["budget"] = nil
-		//		} else if sbd, ok := tmp["budget"].(string); ok {
-		//			tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-		//		}
-		//		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-		//			tmp["bidamount"] = nil
-		//		} else if sbd, ok := tmp["bidamount"].(string); ok {
-		//			tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-		//		}
-		UpdatesLock.Lock()
-		//		for k1, _ := range tmp {
-		//			if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
-		//				delete(tmp, k1)
-		//			}
-		//		}
-		go IS.Add("bidding")
-		if qutil.IntAll(update["extracttype"]) != -1 {
-			newTmp := map[string]interface{}{}     //最终生索引的数据
-			for _, v := range biddingIndexFields { //索引字段
-				if tmp[v] != nil {
-					if "projectinfo" == v {
-						mp, _ := tmp[v].(map[string]interface{})
-						if mp != nil {
-							newmap := map[string]interface{}{}
-							for _, v1 := range projectinfoFields {
-								if mp[v1] != nil {
-									newmap[v1] = mp[v1]
-								}
-							}
-							newTmp[v] = newmap
-							// attachments := mp["attachments"]
-							// con := ""
-							// if attachments != nil {
-							// 	am, _ := attachments.(map[string]interface{})
-							// 	if am != nil {
-							// 		for _, v1 := range am {
-							// 			vm, _ := v1.(map[string]interface{})
-							// 			if vm != nil {
-							// 				c, _ := vm["content"].(string)
-							// 				con += c
-							// 			}
-							// 		}
-							// 	}
-							// }
-							// con = FilterDetailSpace(con)
-							// if con != "" {
-							// 	newTmp["attachments"] = con
-							// }
-						}
-					} else {
-						if v == "detail" {
-							detail, _ := tmp[v].(string)
-							newTmp[v] = FilterDetail(detail)
-						} else {
-							newTmp[v] = tmp[v]
-						}
-					}
-				}
-			}
-			arrEs = append(arrEs, newTmp)
-		}
-		if len(update) > 0 {
-			arr = append(arr, []map[string]interface{}{
-				map[string]interface{}{
-					"_id": tmp["_id"],
-				},
-				map[string]interface{}{
-					"$set": update,
-				},
-			})
-		}
-		if len(arr) >= BulkSize-1 {
-			mgo.UpdateBulkAll(db, c, arr...)
-			arr = [][]map[string]interface{}{}
-		}
-		if len(arrEs) >= BulkSize-1 {
-			tmps := arrEs
-			elastic.BulkSave(index, itype, &tmps, true)
-			if other_index != "" && other_itype != "" {
-				bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
-			}
-			if len(multiIndex) == 2 {
-				elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
-			}
-			arrEs = []map[string]interface{}{}
-		}
-		UpdatesLock.Unlock()
-		if n%100 == 0 {
-			log.Println("current:", n)
-		}
-		tmp = make(map[string]interface{})
-	}
-	UpdatesLock.Lock()
-	if len(arr) > 0 {
-		mgo.UpdateBulkAll(db, c, arr...)
-	}
-	if len(arrEs) > 0 {
-		tmps := arrEs
-		elastic.BulkSave(index, itype, &tmps, true)
-		if other_index != "" && other_itype != "" {
-			bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
-		}
-		if len(multiIndex) == 2 {
-			elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
-		}
-	}
-	UpdatesLock.Unlock()
-	return n1, n2
-}
+// 		//处理key descript
+// 		if bkey == "" {
+// 			DealInfo(&tmp, &update)
+// 		}
+// 		//同时保存到elastic
+// 		for tk, tv := range update {
+// 			tmp[tk] = tv
+// 		}
+// 		if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
+// 			if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
+// 				delete(tmp, "supervisorrate")
+// 			}
+// 		}
+// 		//对projectscope字段的索引处理
+// 		ps, _ := tmp["projectscope"].(string)
+// 		//		if ps == "" {
+// 		//			tmp["projectscope"] = "" //= tmp["detail"]
+// 		//		}
+// 		if len(ps) > ESLEN {
+// 			tmp["projectscope"] = string(([]rune(ps))[:4000])
+// 		}
+// 		//		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
+// 		//			tmp["budget"] = nil
+// 		//		} else if sbd, ok := tmp["budget"].(string); ok {
+// 		//			tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+// 		//		}
+// 		//		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
+// 		//			tmp["bidamount"] = nil
+// 		//		} else if sbd, ok := tmp["bidamount"].(string); ok {
+// 		//			tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+// 		//		}
+// 		UpdatesLock.Lock()
+// 		//		for k1, _ := range tmp {
+// 		//			if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
+// 		//				delete(tmp, k1)
+// 		//			}
+// 		//		}
+// 		go IS.Add("bidding")
+// 		if qutil.IntAll(update["extracttype"]) != -1 {
+// 			newTmp := map[string]interface{}{}     //最终生索引的数据
+// 			for _, v := range biddingIndexFields { //索引字段
+// 				if tmp[v] != nil {
+// 					if "projectinfo" == v {
+// 						mp, _ := tmp[v].(map[string]interface{})
+// 						if mp != nil {
+// 							newmap := map[string]interface{}{}
+// 							for _, v1 := range projectinfoFields {
+// 								if mp[v1] != nil {
+// 									newmap[v1] = fmt.Sprint(mp[v1])
+// 								}
+// 							}
+// 							if len(newmap) > 0 {
+// 								newTmp[v] = newmap
+// 							}
+// 							// attachments := mp["attachments"]
+// 							// con := ""
+// 							// if attachments != nil {
+// 							// 	am, _ := attachments.(map[string]interface{})
+// 							// 	if am != nil {
+// 							// 		for _, v1 := range am {
+// 							// 			vm, _ := v1.(map[string]interface{})
+// 							// 			if vm != nil {
+// 							// 				c, _ := vm["content"].(string)
+// 							// 				con += c
+// 							// 			}
+// 							// 		}
+// 							// 	}
+// 							// }
+// 							// con = FilterDetailSpace(con)
+// 							// if con != "" {
+// 							// 	newTmp["attachments"] = con
+// 							// }
+// 						}
+// 					} else if v == "purchasinglist" { //标的物处理
+// 						purchasinglist_new := []map[string]interface{}{}
+// 						if pcl, _ := tmp[v].([]interface{}); len(pcl) > 0 {
+// 							for _, ls := range pcl {
+// 								lsm_new := make(map[string]interface{})
+// 								lsm := ls.(map[string]interface{})
+// 								for _, pf := range purchasinglistFields {
+// 									if lsm[pf] != nil {
+// 										lsm_new[pf] = lsm[pf]
+// 									}
+// 								}
+// 								if lsm_new != nil && len(lsm_new) > 0 {
+// 									purchasinglist_new = append(purchasinglist_new, lsm_new)
+// 								}
+// 							}
+// 						}
+// 						if len(purchasinglist_new) > 0 {
+// 							newTmp[v] = purchasinglist_new
+// 						}
+
+// 					} else {
+// 						if v == "detail" {
+// 							detail, _ := tmp[v].(string)
+// 							newTmp[v] = FilterDetail(detail)
+// 						} else {
+// 							newTmp[v] = tmp[v]
+// 						}
+// 					}
+// 				}
+// 			}
+// 			arrEs = append(arrEs, newTmp)
+// 		}
+// 		if len(update) > 0 {
+// 			arr = append(arr, []map[string]interface{}{
+// 				map[string]interface{}{
+// 					"_id": tmp["_id"],
+// 				},
+// 				map[string]interface{}{
+// 					"$set": update,
+// 				},
+// 			})
+// 		}
+// 		if len(arr) >= BulkSize-1 {
+// 			mgo.UpdateBulkAll(db, c, arr...)
+// 			arr = [][]map[string]interface{}{}
+// 		}
+// 		if len(arrEs) >= BulkSize-1 {
+// 			tmps := arrEs
+// 			elastic.BulkSave(index, itype, &tmps, true)
+// 			if other_index != "" && other_itype != "" {
+// 				bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+// 			}
+// 			if len(multiIndex) == 2 {
+// 				elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
+// 			}
+// 			arrEs = []map[string]interface{}{}
+// 		}
+// 		UpdatesLock.Unlock()
+// 		if n%100 == 0 {
+// 			log.Println("current:", n)
+// 		}
+// 		tmp = make(map[string]interface{})
+// 	}
+// 	UpdatesLock.Lock()
+// 	if len(arr) > 0 {
+// 		mgo.UpdateBulkAll(db, c, arr...)
+// 	}
+// 	if len(arrEs) > 0 {
+// 		tmps := arrEs
+// 		elastic.BulkSave(index, itype, &tmps, true)
+// 		if other_index != "" && other_itype != "" {
+// 			bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
+// 		}
+// 		if len(multiIndex) == 2 {
+// 			elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
+// 		}
+// 	}
+// 	UpdatesLock.Unlock()
+// 	return n1, n2
+// }
 
-func doIndex1(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) {
+func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) {
 	n1, n2 := 0, 0
 	//线程池
 	UpdatesLock := sync.Mutex{}
@@ -433,11 +456,11 @@ func doIndex1(infos []map[string]interface{}, eMap map[string]map[string]interfa
 		}
 		//对标的物为空处理
 		if filetext := getFileText(tmp); len(filetext) > 10 { //attach_text
-			if site, _ := tmp["site"].(string); site == "中国招标投标公共服务平台" { //site:中国招标投标公共服务平台 detail替换成filetext 并加入标记filedetail=1
-				tmp["detail"] = filetext    //更新es中detail
-				update["detail"] = filetext //更新mongo中detail
-				update["filedetail"] = 1    //mongo中打标记
-			}
+			// if site, _ := tmp["site"].(string); site == "中国招标投标公共服务平台" { //site:中国招标投标公共服务平台 detail替换成filetext 并加入标记filedetail=1
+			// 	tmp["detail"] = filetext    //更新es中detail
+			// 	update["detail"] = filetext //更新mongo中detail
+			// 	update["filedetail"] = 1    //mongo中打标记
+			// }
 			tmp["filetext"] = filetext
 		}
 		if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" {
@@ -463,7 +486,7 @@ func doIndex1(infos []map[string]interface{}, eMap map[string]map[string]interfa
 							newmap := map[string]interface{}{}
 							for _, v1 := range projectinfoFields {
 								if mp[v1] != nil {
-									newmap[v1] = mp[v1]
+									newmap[v1] = fmt.Sprint(mp[v1])
 								}
 							}
 							if len(newmap) > 0 {
@@ -488,6 +511,25 @@ func doIndex1(infos []map[string]interface{}, eMap map[string]map[string]interfa
 							// 	newTmp["attachments"] = con
 							// }
 						}
+					} else if v == "purchasinglist" { //标的物处理
+						purchasinglist_new := []map[string]interface{}{}
+						if pcl, _ := tmp[v].([]interface{}); len(pcl) > 0 {
+							for _, ls := range pcl {
+								lsm_new := make(map[string]interface{})
+								lsm := ls.(map[string]interface{})
+								for _, pf := range purchasinglistFields {
+									if lsm[pf] != nil {
+										lsm_new[pf] = lsm[pf]
+									}
+								}
+								if lsm_new != nil && len(lsm_new) > 0 {
+									purchasinglist_new = append(purchasinglist_new, lsm_new)
+								}
+							}
+						}
+						if len(purchasinglist_new) > 0 {
+							newTmp[v] = purchasinglist_new
+						}
 					} else {
 						if v == "detail" {
 							detail, _ := tmp[v].(string)

+ 1 - 1
udpcreateindex/src/config.json

@@ -39,7 +39,7 @@
         "buyerzipcode","winnertel","winnerperson","contractcode","winneraddr","agencyaddr","buyeraddr","signaturedate","projectperiod","projectaddr","agencytel","agencyperson","buyerperson","agency","projectscope","projectcode","bidopentime","supervisorrate","buyertel","bidamount","winner","buyer","budget","projectname","bidstatus","buyerclass","topscopeclass","s_subscopeclass","area","city","district","s_winner","_id","title","detail","site","comeintime","href","infoformat","publishtime","s_sha","spidercode","subtype","toptype","projectinfo","purchasing","purchasinglist","filetext"
         ],
         "fields": "buyerzipcode,winnertel,winnerperson,contractcode,winneraddr,agencyaddr,buyeraddr,signaturedate,projectperiod,projectaddr,agencytel,agencyperson,buyerperson,agency,projectscope,projectcode,bidopentime,supervisorrate,buyertel,bidamount,winner,buyer,budget,projectname,buyerclass,topscopeclass,area,city,district,s_winner,toptype,subtype,subscopeclass,s_subscopeclass,dataging",
-        "projectinfo": "approvecode,approvecontent,approvestatus,approvetime,industry",
+        "projectinfo": "approvecode,approvecontent,approvestatus,approvetime,approvedept,approvenumber,projecttype,approvecity",
         "purchasinglist":"itemname,model,unitname,number",
         "multiIndex": ""
     },

+ 1 - 1
udpcreateindex/src/task.go

@@ -12,7 +12,7 @@ import (
 func task_index() {
 	c := cron.New()
 	c.AddFunc("20 30 5 * * *", func() { task_projects() })
-	c.AddFunc("0 30 * * * *", func() { task_biddingfile() }) //每30分钟执行一次
+	//c.AddFunc("0 30 * * * *", func() { task_biddingfile() }) //每30分钟执行一次
 	//c.AddFunc("0 22 14 * * *", func() { task_qyxyindex() })
 	c.Start()
 }