Эх сурвалжийг харах

Merge branch 'dev3.4.1' of http://192.168.3.207:8080/qmx/jy-data-extract into dev3.4.1

zhengkun 4 жил өмнө
parent
commit
e26790660d

+ 30 - 71
udpcreateindex/src/biddingall.go

@@ -27,7 +27,7 @@ var RegSpace = regexp.MustCompile("[\\s\u3000\u2003\u00a0]+")
 
 func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 	defer qutil.Catch()
-	thread := 40
+	thread := 10
 	var mpool = make(chan bool, thread)
 	q, _ := mapInfo["query"].(map[string]interface{})
 	if q == nil {
@@ -232,26 +232,26 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 			for tk, tv := range update {
 				tmp[tk] = tv
 			}
-			//if tmp["s_winner"] != "" {
-			//	sWinnerarr := strings.Split(qutil.ObjToString(tmp["s_winner"]), ",")
-			//	var cid []string
-			//	for _, w := range sWinnerarr {
-			//		if w != "" {
-			//			ent, _ := mgostandard.FindOne("qyxy_historyname", map[string]interface{}{"company_name": w})
-			//			if len(*ent) > 0 {
-			//				cid = append(cid, qutil.ObjToString((*ent)["company_id"]))
-			//			}
-			//		}
-			//	}
-			//	if len(cid) > 0 {
-			//		tmp["entidlist"] = cid
-			//		update["entidlist"] = cid
-			//		tmp_up := []map[string]interface{}{}
-			//		tmp_up = append(tmp_up, map[string]interface{}{"_id": tmp["_id"]})
-			//		tmp_up = append(tmp_up, map[string]interface{}{"$set": map[string]interface{}{"entidlist": cid}})
-			//		UpdataMgoCache <- tmp_up
-			//	}
-			//}
+			if tmp["s_winner"] != "" {
+				sWinnerarr := strings.Split(qutil.ObjToString(tmp["s_winner"]), ",")
+				var cid []string
+				for _, w := range sWinnerarr {
+					if w != "" {
+						ent, _ := mgostandard.FindOne("qyxy_historyname", map[string]interface{}{"company_name": w})
+						if len(*ent) > 0 {
+							cid = append(cid, qutil.ObjToString((*ent)["company_id"]))
+						}
+					}
+				}
+				if len(cid) > 0 {
+					tmp["entidlist"] = cid
+					update["entidlist"] = cid
+					tmp_up := []map[string]interface{}{}
+					tmp_up = append(tmp_up, map[string]interface{}{"_id": tmp["_id"]})
+					tmp_up = append(tmp_up, map[string]interface{}{"$set": map[string]interface{}{"entidlist": cid}})
+					UpdataMgoCache <- tmp_up
+				}
+			}
 			//对projectscope字段的索引处理
 			ps, _ := tmp["projectscope"].(string)
 			if len(ps) > ESLEN {
@@ -364,6 +364,12 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 							if len(qs) > 0 {
 								newTmp[field] = strings.Join(qs, ",")
 							}
+						} else if field == "review_experts" {
+							// 评审专家
+							if arr, ok :=tmp["review_experts"].([]interface{}); ok && len(arr) > 0 {
+								arr1 := qutil.ObjArrToStringArr(arr)
+								newTmp[field] = strings.Join(arr1, ",")
+							}
 						} else if field == "detail" { //过滤
 							detail, _ := tmp[field].(string)
 							if len([]rune(detail)) > detailLength {
@@ -383,60 +389,13 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 							if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype {
 								continue
 							} else {
-								newTmp[field] = fieldval
+								if fieldval != "" {
+									newTmp[field] = fieldval
+								}
 							}
 						}
 					}
 				}
-				// for _, v := range biddingIndexFields { //
-				// 	if tmp[v] != nil && del[v] == nil { //
-				// 		if "projectinfo" == v {
-				// 			mp, _ := tmp[v].(map[string]interface{})
-				// 			if mp != nil {
-				// 				newmap := map[string]interface{}{}
-				// 				for _, v1 := range projectinfoFields {
-				// 					if mp[v1] != nil {
-				// 						newmap[v1] = fmt.Sprint(mp[v1])
-				// 					}
-				// 				}
-				// 				if len(newmap) > 0 {
-				// 					newTmp[v] = newmap
-				// 				}
-				// 			}
-				// 		} else if v == "purchasinglist" { //标的物处理
-				// 			purchasinglist_new := []map[string]interface{}{}
-				// 			if pcl, _ := tmp[v].([]interface{}); len(pcl) > 0 {
-				// 				for _, ls := range pcl {
-				// 					lsm_new := make(map[string]interface{})
-				// 					lsm := ls.(map[string]interface{})
-				// 					for _, pf := range purchasinglistFields {
-				// 						if lsm[pf] != nil {
-				// 							lsm_new[pf] = lsm[pf]
-				// 						}
-				// 					}
-				// 					if lsm_new != nil && len(lsm_new) > 0 {
-				// 						purchasinglist_new = append(purchasinglist_new, lsm_new)
-				// 					}
-				// 				}
-				// 			}
-				// 			if len(purchasinglist_new) > 0 {
-				// 				newTmp[v] = purchasinglist_new
-				// 			}
-				// 		} else {
-				// 			if v == "detail" {
-				// 				detail, _ := tmp[v].(string)
-				// 				if len([]rune(detail)) > detailLength {
-				// 					detail = detail[:detailLength]
-				// 				}
-				// 				newTmp[v] = FilterDetail(detail)
-				// 			} else {
-				// 				newTmp[v] = tmp[v]
-				// 			}
-				// 		}
-				// 	} /*else if v == "budget" || v == "bidamount" {
-				// 		newTmp[v] = nil
-				// 	}*/
-				// }
 				arrEs = append(arrEs, newTmp)
 			}
 			if len(update) > 0 {

+ 14 - 3
udpcreateindex/src/biddingindex.go

@@ -45,8 +45,11 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 	defer extractmgo.DestoryMongoConn(extractsession)
 	extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Sort("_id").Iter()
 	eMap := map[string]map[string]interface{}{}
-	extCount := 0
+	extCount, repeatCount := 0, 0
 	for tmp := make(map[string]interface{}); extractquery.Next(tmp); extCount++ {
+		if qutil.IntAll(tmp["repeat"]) == 1 {
+			repeatCount++
+		}
 		tid := mongodb.BsonIdToSId(tmp["_id"])
 		eMap[tid] = tmp
 		tmp = make(map[string]interface{})
@@ -55,6 +58,7 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 	//bidding库
 	session := mgo.GetMgoConn()
 	count, _ := session.DB(db).C(c).Find(&q).Count()
+	log.Println("抽取表 重复数据量:", repeatCount)
 	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
 	n1, n2 := 0, 0
 	if count < 200000 {
@@ -405,13 +409,21 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 						if tmp[field] != nil && qutil.Int64All(tmp[field]) > 0 {
 							newTmp[field] = qutil.Int64All(tmp[field])
 						}
+					} else if field == "review_experts" {
+						// 评审专家
+						if arr, ok :=tmp["review_experts"].([]interface{}); ok && len(arr) > 0 {
+							arr1 := qutil.ObjArrToStringArr(arr)
+							newTmp[field] = strings.Join(arr1, ",")
+						}
 					} else if field == "entidlist" {
 						newTmp[field] = tmp[field]
 					} else { //其它字段判断数据类型,不正确舍弃
 						if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype {
 							continue
 						} else {
-							newTmp[field] = fieldval
+							if fieldval != "" {
+								newTmp[field] = fieldval
+							}
 						}
 					}
 				}
@@ -545,7 +557,6 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 	}
 	if len(arrEs) > 0 {
 		tmps := arrEs
-		log.Println("---", tmps[0])
 		elastic.BulkSave(index, itype, &tmps, true)
 		if other_index != "" && other_itype != "" {
 			bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)

+ 9 - 6
udpcreateindex/src/config.json

@@ -6,12 +6,12 @@
   "mongodb": {
     "addr": "192.168.3.207:27092",
     "pool": 10,
-    "db": "qfw_data"
+    "db": "wjh"
   },
   "savedb": {
     "addr": "192.168.3.207:27092",
     "size": 10,
-    "db": "qfw_data"
+    "db": "wjh"
   },
   "jkmail": {
     "to": "zhangjinkun@topnet.net.cn",
@@ -36,7 +36,7 @@
     "type": "bidding"
   },
   "bidding": {
-    "db": "qfw_data",
+    "db": "wjh",
     "collect": "bidding",
     "index": "bidding",
     "type": "bidding",
@@ -47,7 +47,8 @@
       "buyerperson", "agency", "projectscope", "projectcode", "bidopentime", "supervisorrate", "buyertel", "bidamount", "winner", "buyer", "budget", "projectname", "bidstatus", "buyerclass",
       "topscopeclass", "s_topscopeclass", "s_subscopeclass", "area", "city", "district", "s_winner", "_id", "title", "detail", "site", "comeintime", "href", "infoformat", "publishtime",
       "s_sha", "spidercode", "subtype", "toptype", "projectinfo", "purchasing", "purchasinglist", "filetext", "channel", "winnerorder", "project_scale", "project_duration", "project_timeunit",
-      "project_startdate", "project_completedate", "payway", "contract_guarantee", "bid_guarantee", "qualifies", "entidlist"
+      "project_startdate", "project_completedate", "payway", "contract_guarantee", "bid_guarantee", "qualifies", "entidlist", "funds", "review_experts", "bidmethod", "bidendtime", "bidopenaddress",
+      "docamount", "agencyrate", "agencyfee", "getdocmethod", "china_bidding", "bidway"
     ],
     "indexfieldsmap": {
       "buyerzipcode": "string", "winnertel": "string", "winnerperson": "string", "contractcode": "string", "winneraddr": "string", "agencyaddr": "string", "buyeraddr": "string", "signaturedate": "int64",
@@ -56,9 +57,11 @@
       "buyerclass": "string", "topscopeclass": "", "s_topscopeclass": "string", "s_subscopeclass": "string", "area": "string", "city": "string", "district": "string", "s_winner": "string", "_id": "", "title": "string",
       "detail": "string", "site": "string", "comeintime": "int64", "href": "string", "infoformat": "int", "publishtime": "int64", "s_sha": "string", "spidercode": "string", "subtype": "string", "toptype": "string",
       "projectinfo": "", "purchasing": "string", "purchasinglist": "", "filetext": "string", "channel": "string", "winnerorder": "", "project_scale": "string", "project_duration": "float64", "project_timeunit": "string",
-      "project_startdate": "int64", "project_completedate": "int64", "payway": "string", "contract_guarantee": "bool", "bid_guarantee": "bool", "qualifies": "", "entidlist": "[]string"
+      "project_startdate": "int64", "project_completedate": "int64", "payway": "string", "contract_guarantee": "bool", "bid_guarantee": "bool", "qualifies": "", "entidlist": "[]string", "funds": "string",
+      "review_experts": "", "bidmethod": "string", "bidendtime": "int64", "bidopenaddress": "string", "docamount": "float64", "agencyrate": "float64", "agencyfee": "float64", "bidway": "string",
+      "getdocmethod": "string", "china_bidding": "string"
     },
-    "fields": "buyerzipcode,winnertel,winnerperson,contractcode,winneraddr,agencyaddr,buyeraddr,signaturedate,projectperiod,projectaddr,agencytel,agencyperson,buyerperson,agency,projectscope,projectcode,bidopentime,supervisorrate,buyertel,bidamount,winner,buyer,budget,projectname,buyerclass,topscopeclass,s_topscopeclass,area,city,district,s_winner,toptype,subtype,subscopeclass,s_subscopeclass,dataging,winnerorder,project_scale,project_duration,project_timeunit,project_startdate,project_completedate, payway,contract_guarantee,bid_guarantee,qualifies",
+    "fields": "buyerzipcode,winnertel,winnerperson,contractcode,winneraddr,agencyaddr,buyeraddr,signaturedate,projectperiod,projectaddr,agencytel,agencyperson,buyerperson,agency,projectscope,projectcode,bidopentime,supervisorrate,buyertel,bidamount,winner,buyer,budget,projectname,buyerclass,topscopeclass,s_topscopeclass,area,city,district,s_winner,toptype,subtype,subscopeclass,s_subscopeclass,dataging,winnerorder,project_scale,project_duration,project_timeunit,project_startdate,project_completedate, payway,contract_guarantee,bid_guarantee,qualifies,funds,review_experts,bidmethod,bidendtime,bidopenaddress,docamount,bidway,agencyrate,agencyfee,getdocmethod",
     "projectinfo": "approvecode,approvecontent,approvestatus,approvetime,approvedept,approvenumber,projecttype,approvecity",
     "projectinfomap": {
       "approvecode": "string", "approvecontent": "string", "approvestatus": "string", "approvetime": "string", "approvedept": "string", "approvenumber": "string", "projecttype": "string", "approvecity": "string"

BIN
udpcreateindex/src/go_run_creatIndex


+ 6 - 7
udpcreateindex/src/main.go

@@ -46,7 +46,7 @@ var SP = make(chan bool, 5)
 
 func init() {
 	util.ReadConfig(&Sysconfig)
-	//inits()
+	inits()
 	//go checkMapJob()
 	detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000)
 	fileLength = util.IntAllDef(Sysconfig["filelength"], 50000)
@@ -65,9 +65,9 @@ func init() {
 		MongodbAddr: mconf["addr"].(string),
 		Size:        util.IntAllDef(mconf["pool"], 5),
 		DbName:      mconf["db"].(string),
-		//UserName:	 Sysconfig["uname"].(string),
-		//Password:    Sysconfig["upwd"].(string),
-		//ReplSet: 	 "bidding",
+		UserName:	 Sysconfig["uname"].(string),
+		Password:    Sysconfig["upwd"].(string),
+		ReplSet: 	 "bidding",
 	}
 	mgo.InitPool()
 	project2db = &mongodb.MongodbSim{
@@ -104,8 +104,8 @@ func init() {
 		MongodbAddr: standard["addr"].(string),
 		Size:        util.IntAllDef(standard["pool"], 5),
 		DbName:      standard["db"].(string),
-		//UserName:    Sysconfig["uname"].(string),
-		//Password:    Sysconfig["upwd"].(string),
+		UserName:    Sysconfig["uname"].(string),
+		Password:    Sysconfig["upwd"].(string),
 	}
 	mgostandard.InitPool()
 
@@ -192,7 +192,6 @@ func main() {
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
-	// time.Sleep(99999 * time.Hour)
 	ch := make(chan bool, 1)
 	<-ch
 }

+ 1 - 1
udpcreateindex/src/util/ossclient.go

@@ -11,7 +11,7 @@ import (
 )
 
 var (
-	ossEndpoint        = "oss-cn-beijing.aliyuncs.com" //正式环境用:oss-cn-beijing-internal.aliyuncs.com 测试:oss-cn-beijing.aliyuncs.com
+	ossEndpoint        = "oss-cn-beijing-internal.aliyuncs.com" //正式环境用:oss-cn-beijing-internal.aliyuncs.com 测试:oss-cn-beijing.aliyuncs.com
 	ossAccessKeyId     = "LTAI4G5x9aoZx8dDamQ7vfZi"
 	ossAccessKeySecret = "Bk98FsbPYXcJe72n1bG3Ssf73acuNh"
 	ossBucketName      = "topjy"