Przeglądaj źródła

新增生成es字段类型配置

maxiaoshan 4 lat temu
rodzic
commit
f70ae2dee3

+ 79 - 19
udpcreateindex/src/biddingall.go

@@ -5,6 +5,8 @@ import (
 	"log"
 	qutil "qfw/util"
 	elastic "qfw/util/elastic"
+
+	//"reflect"
 	"strings"
 	"sync"
 
@@ -270,6 +272,83 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 			UpdatesLock.Lock()
 			if qutil.IntAll(update["extracttype"]) != -1 {
 				newTmp := map[string]interface{}{}
+				// for field, ftype := range biddingIndexFieldsMap { //
+				// 	if tmp[field] != nil && del[field] == nil { //
+				// 		//qutil.Debug(field, tmp[field], reflect.TypeOf(tmp[field]).String(), ftype)
+				// 		if field == "projectinfo" {
+				// 			mp, _ := tmp[field].(map[string]interface{})
+				// 			if mp != nil {
+				// 				newmap := map[string]interface{}{}
+				// 				for k, ktype := range projectinfoFieldsMap {
+				// 					mpv := mp[k]
+				// 					if mpv != nil && reflect.TypeOf(mpv).String() == ktype {
+				// 						newmap[k] = mp[k]
+				// 					}
+				// 				}
+				// 				if len(newmap) > 0 {
+				// 					newTmp[field] = newmap
+				// 				}
+				// 			}
+				// 		} else if field == "purchasinglist" { //标的物处理
+				// 			purchasinglist_new := []map[string]interface{}{}
+				// 			if pcl, _ := tmp[field].([]interface{}); len(pcl) > 0 {
+				// 				for _, ls := range pcl {
+				// 					lsm_new := make(map[string]interface{})
+				// 					lsm := ls.(map[string]interface{})
+				// 					for pf, pftype := range purchasinglistFieldsMap {
+				// 						lsmv := lsm[pf]
+				// 						if lsmv != nil && reflect.TypeOf(lsmv).String() == pftype {
+				// 							lsm_new[pf] = lsm[pf]
+				// 						}
+				// 					}
+				// 					if lsm_new != nil && len(lsm_new) > 0 {
+				// 						purchasinglist_new = append(purchasinglist_new, lsm_new)
+				// 					}
+				// 				}
+				// 			}
+				// 			if len(purchasinglist_new) > 0 {
+				// 				newTmp[field] = purchasinglist_new
+				// 			}
+				// 		} else if field == "winnerorder" { //中标候选
+				// 			winnerorder_new := []map[string]interface{}{}
+				// 			if winnerorder, _ := tmp[field].([]interface{}); len(winnerorder) > 0 {
+				// 				for _, win := range winnerorder {
+				// 					winMap_new := make(map[string]interface{})
+				// 					winMap := win.(map[string]interface{})
+				// 					for wf, wftype := range winnerorderlistFieldsMap {
+				// 						wfv := winMap[wf]
+				// 						if wfv != nil && reflect.TypeOf(wfv).String() == wftype {
+				// 							if wf == "sort" && qutil.Int64All(wfv) > 100 {
+				// 								continue
+				// 							}
+				// 							winMap_new[wf] = winMap[wf]
+				// 						}
+				// 					}
+				// 					if winMap_new != nil && len(winMap_new) > 0 {
+				// 						winnerorder_new = append(winnerorder_new, winMap_new)
+				// 					}
+				// 				}
+				// 			}
+				// 			if len(winnerorder_new) > 0 {
+				// 				newTmp[field] = winnerorder_new
+				// 			}
+				// 		} else if field == "detail" { //过滤
+				// 			detail, _ := tmp[field].(string)
+				// 			if len([]rune(detail)) > detailLength {
+				// 				detail = detail[:detailLength]
+				// 			}
+				// 			newTmp[field] = FilterDetail(detail)
+				// 		} else if field == "_id" || field == "topscopeclass" { //不做处理
+				// 			newTmp[field] = tmp[field]
+				// 		} else { //其它字段判断数据类型,不正确舍弃
+				// 			if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype {
+				// 				continue
+				// 			} else {
+				// 				newTmp[field] = fieldval
+				// 			}
+				// 		}
+				// 	}
+				// }
 				for _, v := range biddingIndexFields { //
 					if tmp[v] != nil && del[v] == nil { //
 						if "projectinfo" == v {
@@ -304,25 +383,6 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 							if len(purchasinglist_new) > 0 {
 								newTmp[v] = purchasinglist_new
 							}
-							/*} else if v == "winnerorder" { //中标候选
-							winnerorder_new := []map[string]interface{}{}
-							if winnerorder, _ := tmp[v].([]interface{}); len(winnerorder) > 0 {
-								for _, win := range winnerorder {
-									winMap_new := make(map[string]interface{})
-									winMap := win.(map[string]interface{})
-									for _, wf := range winnerorderlistFields {
-										if winMap[wf] != nil {
-											winMap_new[wf] = winMap[wf]
-										}
-									}
-									if winMap_new != nil && len(winMap_new) > 0 {
-										winnerorder_new = append(winnerorder_new, winMap_new)
-									}
-								}
-							}
-							if len(winnerorder_new) > 0 {
-								newTmp[v] = winnerorder_new
-							}*/
 						} else {
 							if v == "detail" {
 								detail, _ := tmp[v].(string)

+ 4 - 1
udpcreateindex/src/biddingindex.go

@@ -360,7 +360,10 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 								winMap_new := make(map[string]interface{})
 								winMap := win.(map[string]interface{})
 								for _, wf := range winnerorderlistFields {
-									if winMap[wf] != nil {
+									if wfv := winMap[wf]; wfv != nil {
+										if wf == "sort" && qutil.Int64All(wfv) > 100 {
+											continue
+										}
 										winMap_new[wf] = winMap[wf]
 									}
 								}

+ 10 - 3
udpcreateindex/src/buyerindex.go

@@ -19,7 +19,7 @@ func buyerTask(data []byte, mapInfo map[string]interface{}) {
 			},
 		}
 	}
-	session := extractmgo.GetMgoConn(1800)
+	session := extractmgo.GetMgoConn()
 	defer extractmgo.DestoryMongoConn(session)
 	c, _ := buyer["collect"].(string)
 	db, _ := buyer["db"].(string)
@@ -29,13 +29,20 @@ func buyerTask(data []byte, mapInfo map[string]interface{}) {
 	savepool := make(chan bool, 10)
 
 	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
-	query := session.DB(db).C(c).Find(q).Select(bson.M{"pici": 0}).Iter()
+	query := session.DB(db).C(c).Find(q).Select(map[string]interface{}{
+		"buyer_name": 1,
+		"province":   1,
+		"city":       1,
+		"district":   1,
+	}).Iter()
 
 	arr := make([]map[string]interface{}, savesizei)
 	var n int
 	i := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
-		go IS.Add("buyer")
+		//go IS.Add("buyer")
+		tmp["name"] = tmp["buyer_name"]
+		delete(tmp, "buyer_name")
 		arr[i] = tmp
 		n++
 		if i == savesizei-1 {

+ 15 - 11
udpcreateindex/src/config.json

@@ -1,7 +1,7 @@
 {
     "udpport": ":1483",
     "msg_server": "10.171.112.160:7070",
-     "mongodb": {
+    "mongodb": {
         "addr": "192.168.3.207:27092",
         "pool": 10,
         "db": "mxs"
@@ -9,7 +9,7 @@
 	"savedb": {
         "addr": "192.168.3.207:27092",
         "size": 10,
-        "db": "mxs"
+        "db": "qfw_data"
     },
     "jkmail": {
         "to":"zhangjinkun@topnet.net.cn",
@@ -22,9 +22,9 @@
         "type": "winner"
     },
     "buyer": {
-        "db": "qfw",
-        "collect": "buyer",
-        "index": "buyer",
+        "db": "mxs",
+        "collect": "buyer_enterprise",
+        "index": "buyer_v5",
         "type": "buyer"
     },
     "biddingback": {
@@ -35,26 +35,30 @@
     },
     "bidding": {
         "db": "mxs",
-        "collect": "bidding",
-        "index": "bidding_v1",
+        "collect": "test",
+        "index": "bidding_v5",
         "type": "bidding",
         "extractdb": "mxs",
         "extractcollect": "extract",
         "indexfields":[ 
         "buyerzipcode","winnertel","winnerperson","contractcode","winneraddr","agencyaddr","buyeraddr","signaturedate","projectperiod","projectaddr","agencytel","agencyperson","buyerperson","agency","projectscope","projectcode","bidopentime","supervisorrate","buyertel","bidamount","winner","buyer","budget","projectname","bidstatus","buyerclass","topscopeclass","s_topscopeclass","s_subscopeclass","area","city","district","s_winner","_id","title","detail","site","comeintime","href","infoformat","publishtime","s_sha","spidercode","subtype","toptype","projectinfo","purchasing","purchasinglist","filetext","channel","winnerorder"
         ],
+        "indexfieldsmap":{"buyerzipcode":"string","winnertel":"string","winnerperson":"string","contractcode":"string","winneraddr":"string","agencyaddr":"string","buyeraddr":"string","signaturedate":"int64","projectperiod":"string","projectaddr":"string","agencytel":"string","agencyperson":"string","buyerperson":"string","agency":"string","projectscope":"string","projectcode":"string","bidopentime":"int64","supervisorrate":"float64","buyertel":"string","bidamount":"float64","winner":"string","buyer":"string","budget":"float64","projectname":"string","bidstatus":"string","buyerclass":"string","topscopeclass":"","s_topscopeclass":"string","s_subscopeclass":"string","area":"string","city":"string","district":"string","s_winner":"string","_id":"","title":"string","detail":"string","site":"string","comeintime":"int64","href":"string","infoformat":"int","publishtime":"int64","s_sha":"string","spidercode":"string","subtype":"string","toptype":"string","projectinfo":"","purchasing":"string","purchasinglist":"","filetext":"string","channel":"string","winnerorder":""},
         "fields": "buyerzipcode,winnertel,winnerperson,contractcode,winneraddr,agencyaddr,buyeraddr,signaturedate,projectperiod,projectaddr,agencytel,agencyperson,buyerperson,agency,projectscope,projectcode,bidopentime,supervisorrate,buyertel,bidamount,winner,buyer,budget,projectname,buyerclass,topscopeclass,s_topscopeclass,area,city,district,s_winner,toptype,subtype,subscopeclass,s_subscopeclass,dataging,winnerorder",
-        "projectinfo": "approvecode,approvecontent,approvestatus,approvetime,approvedept,approvenumber,projecttype,approvecity",
+        "projectinfo":"approvecode,approvecontent,approvestatus,approvetime,approvedept,approvenumber,projecttype,approvecity", 
+     	"projectinfomap":{"approvecode":"string","approvecontent":"string","approvestatus":"string","approvetime":"string","approvedept":"string","approvenumber":"string","projecttype":"string","approvecity":"string"},
         "purchasinglist": "itemname,model,unitname,number",
+        "purchasinglistmap":{"itemname":"string","model":"string","unitname":"string","number":"float64"},
         "winnerorder": "sort,sortstr,entname",
+        "winnerordermap": {"sort":"int","sortstr":"string","entname":"string"},
         "multiIndex": ""
     },
     "filelength": 50000,
     "detaillength": 50000,
     "project": {
         "db": "mxs",
-        "collect": "test",
-        "index": "projectset_v1",
+        "collect": "projectset",
+        "index": "projectset_v5",
         "type": "projectset"
     },
     "project2": {
@@ -95,7 +99,7 @@
 		}
     },
     "elastic": {
-        "addr": "http://192.168.3.128:9800",
+        "addr": "http://192.168.3.11:9800",
         "pool": 12
     }
 }

+ 52 - 23
udpcreateindex/src/main.go

@@ -14,23 +14,27 @@ import (
 )
 
 var (
-	Sysconfig             map[string]interface{} //配置文件
-	mgo                   *mongodb.MongodbSim    //mongodb操作对象
-	extractmgo            *mongodb.MongodbSim    //mongodb操作对象
-	project2db            *mongodb.MongodbSim    //mongodb操作对象
-	mgostandard           *mongodb.MongodbSim    //mongodb操作对象
-	qyxydb                *mongodb.MongodbSim    //mongodb操作对象
-	udpclient             mu.UdpClient           //udp对象
-	updport               string
-	savesizei             = 500
-	biddingIndexFields    = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
-	projectinfoFields     []string
-	multiIndex            []string
-	purchasinglistFields  []string
-	winnerorderlistFields []string
-	BulkSize              = 400
-	detailLength          = 50000
-	fileLength            = 50000
+	Sysconfig                map[string]interface{} //配置文件
+	mgo                      *mongodb.MongodbSim    //mongodb操作对象
+	extractmgo               *mongodb.MongodbSim    //mongodb操作对象
+	project2db               *mongodb.MongodbSim    //mongodb操作对象
+	mgostandard              *mongodb.MongodbSim    //mongodb操作对象
+	qyxydb                   *mongodb.MongodbSim    //mongodb操作对象
+	udpclient                mu.UdpClient           //udp对象
+	updport                  string
+	savesizei                = 500
+	biddingIndexFields       = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
+	biddingIndexFieldsMap    = map[string]string{}
+	projectinfoFields        []string
+	projectinfoFieldsMap     = map[string]string{}
+	multiIndex               []string
+	purchasinglistFields     []string
+	winnerorderlistFields    []string
+	purchasinglistFieldsMap  = map[string]string{}
+	winnerorderlistFieldsMap = map[string]string{}
+	BulkSize                 = 400
+	detailLength             = 50000
+	fileLength               = 50000
 	//bidding_other连接信息
 	bidding_other_es *elastic.Elastic
 	other_index      string
@@ -131,18 +135,43 @@ func init() {
 			purchasinglistFields = strings.Split(pcl, ",")
 		}
 	}
-	// if bidding["winnerorder"] != nil {
-	// 	winnerorder := util.ObjToString(bidding["winnerorder"])
-	// 	if winnerorder != "" {
-	// 		winnerorderlistFields = strings.Split(winnerorder, ",")
-	// 	}
-	// }
+	if bidding["winnerorder"] != nil {
+		winnerorder := util.ObjToString(bidding["winnerorder"])
+		if winnerorder != "" {
+			winnerorderlistFields = strings.Split(winnerorder, ",")
+		}
+	}
 	if bidding["multiIndex"] != nil {
 		mi := util.ObjToString(bidding["multiIndex"])
 		if mi != "" {
 			multiIndex = strings.Split(mi, ",")
 		}
 	}
+	//
+	if bidding["indexfieldsmap"] != nil {
+		for k, v := range bidding["indexfieldsmap"].(map[string]interface{}) {
+			biddingIndexFieldsMap[k] = util.ObjToString(v)
+		}
+		util.Debug(biddingIndexFieldsMap)
+	}
+	if bidding["projectinfomap"] != nil {
+		for k, v := range bidding["projectinfomap"].(map[string]interface{}) {
+			projectinfoFieldsMap[k] = util.ObjToString(v)
+		}
+		util.Debug(projectinfoFieldsMap)
+	}
+	if bidding["purchasinglistmap"] != nil {
+		for k, v := range bidding["purchasinglistmap"].(map[string]interface{}) {
+			purchasinglistFieldsMap[k] = util.ObjToString(v)
+		}
+		util.Debug(purchasinglistFieldsMap)
+	}
+	if bidding["winnerordermap"] != nil {
+		for k, v := range bidding["winnerordermap"].(map[string]interface{}) {
+			winnerorderlistFieldsMap[k] = util.ObjToString(v)
+		}
+		util.Debug(winnerorderlistFieldsMap)
+	}
 	log.Println(projectinfoFields)
 	log.Println(purchasinglistFields)
 	//初始化oss