Prechádzať zdrojové kódy

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

fengweiqiang pred 5 rokmi
rodič
commit
e845c1980e

+ 66 - 72
udpcreateindex/src/biddingall.go

@@ -1,7 +1,6 @@
 package main
 
 import (
-	"fmt"
 	"log"
 	qutil "qfw/util"
 	elastic "qfw/util/elastic"
@@ -67,7 +66,11 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 	var compare bson.M
 	bnil := false
 	for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
+		if deldata := qutil.IntAll(tmp["del"]); deldata == 1 { //临时:重复数据不生索引
+			continue
+		}
 		update := map[string]interface{}{}
+		del := map[string]interface{}{} //记录extract没有值而bidding中有值的字段
 		//对比方法----------------
 		for {
 			if compare == nil {
@@ -84,20 +87,22 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 					bnil = false
 					//更新bidding表,生成索引
 					for _, k := range fields { //fields更新到mongo的字段
-						v1 := compare[k]
-						v2 := tmp[k]
+						v1 := compare[k] //extract
+						v2 := tmp[k]     //bidding
 						if v2 == nil && v1 != nil {
 							update[k] = v1
 						} else if v2 != nil && v1 != nil {
-							//update[k+"_b"] = v2
 							update[k] = v1
-						} else if v2 != nil && v1 == nil {
-							//update[k+"_b"] = v2
+						} else if v2 != nil && v1 == nil { //
+							if k == "s_subscopeclass" && del["subscopeclass"] == nil {
+								continue
+							}
+							del[k] = 1
+							//qutil.Debug("抽取结果没有值,bidding有值:field--", k, "val--", v2)
 						}
 					}
 					if qutil.IntAll(compare["repeat"]) == 1 {
 						update["extracttype"] = -1
-						//} else if qutil.IntAll(tmp["extracttype"]) == -1 {
 					} else {
 						update["extracttype"] = 1
 					}
@@ -120,21 +125,19 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 		//下面可以多线程跑的--->
 		//处理分类
 		mpool <- true
-		go func(tmp, update, compare map[string]interface{}, bnil bool) {
+		go func(tmp, update, compare, del map[string]interface{}, bnil bool) {
 			defer func() {
 				<-mpool
 			}()
 			if !bnil && compare != nil {
 				subscopeclass, _ := compare["subscopeclass"].([]interface{})
 				if subscopeclass != nil {
-					//str := ","
 					m1 := map[string]bool{}
 					newclass := []string{}
 					for _, sc := range subscopeclass {
 						sclass, _ := sc.(string)
 						if !m1[sclass] {
 							m1[sclass] = true
-							//str += sclass + ","
 							newclass = append(newclass, sclass)
 						}
 					}
@@ -142,72 +145,65 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 					update["subscopeclass"] = newclass
 				}
 				//处理中标企业
-				winner, _ := compare["winner"].(string)
-				m1 := map[string]bool{}
-				if winner != "" {
-					m1[winner] = true
-				}
-				package1 := compare["package"]
-				if package1 != nil {
-					packageM, _ := package1.(map[string]interface{})
-					for _, p := range packageM {
-						pm, _ := p.(map[string]interface{})
-						pw, _ := pm["winner"].(string)
-						if pw != "" {
-							m1[pw] = true
-						}
-					}
-				}
+				//				winner, _ := compare["winner"].(string)
+				//				m1 := map[string]bool{}
+				//				if winner != "" {
+				//					m1[winner] = true
+				//				}
+				//				package1 := compare["package"]
+				//				if package1 != nil {
+				//					packageM, _ := package1.(map[string]interface{})
+				//					for _, p := range packageM {
+				//						pm, _ := p.(map[string]interface{})
+				//						pw, _ := pm["winner"].(string)
+				//						if pw != "" {
+				//							m1[pw] = true
+				//						}
+				//					}
+				//				}
 				compare = nil
-				if len(m1) > 0 {
-					//str := ","
-					winnerarr := []string{}
-					for k, _ := range m1 {
-						//str += k + ","
-						winnerarr = append(winnerarr, k)
-					}
-					update["s_winner"] = strings.Join(winnerarr, ",")
-				}
+				//				if len(m1) > 0 {
+				//					//str := ","
+				//					winnerarr := []string{}
+				//					for k, _ := range m1 {
+				//						//str += k + ","
+				//						winnerarr = append(winnerarr, k)
+				//					}
+				//					update["s_winner"] = strings.Join(winnerarr, ",")
+				//				}
 			}
 			//------------------对比结束
-
-			//处理key descript
-			//		if bkey == "" {
-			//			DealInfo(&tmp, &update)
-			//		}
 			//同时保存到elastic
 			for tk, tv := range update {
 				tmp[tk] = tv
 			}
 			//对projectscope字段的索引处理
 			ps, _ := tmp["projectscope"].(string)
-			if ps == "" {
-				tmp["projectscope"] = "" //= tmp["detail"]
-			}
+			//			if ps == "" {
+			//				tmp["projectscope"] = ""
+			//			}
 			if len(ps) > ESLEN {
 				tmp["projectscope"] = string(([]rune(ps))[:4000])
 			}
-			if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-				tmp["budget"] = nil
-			} else if sbd, ok := tmp["budget"].(string); ok {
-				tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-			}
-			if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-				tmp["bidamount"] = nil
-			} else if sbd, ok := tmp["bidamount"].(string); ok {
-				tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-			}
-			//		for k1, _ := range tmp {
-			//			if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
-			//				delete(tmp, k1)
+
+			//预算和中标金额
+			//			if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
+			//				tmp["budget"] = nil
+			//			} else if sbd, ok := tmp["budget"].(string); ok {
+			//				tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
 			//			}
-			//		}
+			//			if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
+			//				tmp["bidamount"] = nil
+			//			} else if sbd, ok := tmp["bidamount"].(string); ok {
+			//				tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+			//			}
+
 			//go IS.Add("bidding")
 			UpdatesLock.Lock()
 			if qutil.IntAll(update["extracttype"]) != -1 {
 				newTmp := map[string]interface{}{}
 				for _, v := range biddingIndexFields { //
-					if tmp[v] != nil {
+					if tmp[v] != nil && del[v] == nil { //
 						if "projectinfo" == v {
 							mp, _ := tmp[v].(map[string]interface{})
 							if mp != nil {
@@ -232,8 +228,8 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 										}
 									}
 								}
+								con = FilterDetailSpace(con)
 								if con != "" {
-									con = FilterDetailSpace(con)
 									newTmp["attachments"] = con
 								}
 							}
@@ -245,21 +241,19 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 								newTmp[v] = tmp[v]
 							}
 						}
-					} else if v == "budget" || v == "bidamount" {
+					} /*else if v == "budget" || v == "bidamount" {
 						newTmp[v] = nil
-					}
+					}*/
 				}
 				arrEs = append(arrEs, newTmp)
 			}
 			if len(update) > 0 {
-				arr = append(arr, []map[string]interface{}{
-					map[string]interface{}{
-						"_id": tmp["_id"],
-					},
-					map[string]interface{}{
-						"$set": update,
-					},
-				})
+				queryId := map[string]interface{}{"_id": tmp["_id"]}
+				set := map[string]interface{}{"$set": update}
+				if len(del) > 0 { //删除的数据
+					set["$unset"] = del
+				}
+				arr = append(arr, []map[string]interface{}{queryId, set})
 			}
 			if len(arr) >= BulkSize {
 				mgo.UpdateBulkAll(db, c, arr...)
@@ -274,9 +268,9 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 				arrEs = []map[string]interface{}{}
 			}
 			UpdatesLock.Unlock()
-		}(tmp, update, compare, bnil)
-		if n%100 == 0 {
-			log.Println("current:", n)
+		}(tmp, update, compare, del, bnil)
+		if n%1000 == 0 {
+			log.Println("current:", n, tmp["_id"])
 		}
 		tmp = make(map[string]interface{})
 	}

+ 37 - 36
udpcreateindex/src/biddingdata.go

@@ -1,7 +1,7 @@
 package main
 
 import (
-	"fmt"
+	//	"fmt"
 	"log"
 	qutil "qfw/util"
 	elastic "qfw/util/elastic"
@@ -48,6 +48,7 @@ var indexfield = []string{
 	"buyerclass",
 	"district",
 	"topscopeclass",
+	"attachments",
 }
 
 //招标数据表和抽取表一一对应开始更新
@@ -179,32 +180,32 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 					update["subscopeclass"] = newclass
 				}
 				//处理中标企业
-				winner, _ := compare["winner"].(string)
-				m1 := map[string]bool{}
-				if winner != "" {
-					m1[winner] = true
-				}
-				package1 := compare["package"]
-				if package1 != nil {
-					packageM, _ := package1.(map[string]interface{})
-					for _, p := range packageM {
-						pm, _ := p.(map[string]interface{})
-						pw, _ := pm["winner"].(string)
-						if pw != "" {
-							m1[pw] = true
-						}
-					}
-				}
+				//				winner, _ := compare["winner"].(string)
+				//				m1 := map[string]bool{}
+				//				if winner != "" {
+				//					m1[winner] = true
+				//				}
+				//				package1 := compare["package"]
+				//				if package1 != nil {
+				//					packageM, _ := package1.(map[string]interface{})
+				//					for _, p := range packageM {
+				//						pm, _ := p.(map[string]interface{})
+				//						pw, _ := pm["winner"].(string)
+				//						if pw != "" {
+				//							m1[pw] = true
+				//						}
+				//					}
+				//				}
 				compare = nil
-				if len(m1) > 0 {
-					//str := ","
-					winnerarr := []string{}
-					for k, _ := range m1 {
-						//str += k + ","
-						winnerarr = append(winnerarr, k)
-					}
-					update["s_winner"] = strings.Join(winnerarr, ",")
-				}
+				//				if len(m1) > 0 {
+				//					//str := ","
+				//					winnerarr := []string{}
+				//					for k, _ := range m1 {
+				//						//str += k + ","
+				//						winnerarr = append(winnerarr, k)
+				//					}
+				//					update["s_winner"] = strings.Join(winnerarr, ",")
+				//				}
 			}
 			//------------------对比结束
 
@@ -220,16 +221,16 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 			if len(ps) > ESLEN {
 				tmp["projectscope"] = string(([]rune(ps))[:4000])
 			}
-			if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-				tmp["budget"] = nil
-			} else if sbd, ok := tmp["budget"].(string); ok {
-				tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-			}
-			if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-				tmp["bidamount"] = nil
-			} else if sbd, ok := tmp["bidamount"].(string); ok {
-				tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-			}
+			//			if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
+			//				tmp["budget"] = nil
+			//			} else if sbd, ok := tmp["budget"].(string); ok {
+			//				tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+			//			}
+			//			if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
+			//				tmp["bidamount"] = nil
+			//			} else if sbd, ok := tmp["bidamount"].(string); ok {
+			//				tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+			//			}
 
 			if qutil.IntAll(update["extracttype"]) != -1 {
 				newTmp := map[string]interface{}{}

+ 41 - 42
udpcreateindex/src/biddingindex.go

@@ -2,7 +2,6 @@ package main
 
 import (
 	"encoding/json"
-	"fmt"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -143,32 +142,32 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 				update["subscopeclass"] = newclass
 			}
 			//处理中标企业
-			winner, _ := compare["winner"].(string)
-			m1 := map[string]bool{}
-			if winner != "" {
-				m1[winner] = true
-			}
-			package1 := compare["package"]
-			if package1 != nil {
-				packageM, _ := package1.(map[string]interface{})
-				for _, p := range packageM {
-					pm, _ := p.(map[string]interface{})
-					pw, _ := pm["winner"].(string)
-					if pw != "" {
-						m1[pw] = true
-					}
-				}
-			}
+			//			winner, _ := compare["winner"].(string)
+			//			m1 := map[string]bool{}
+			//			if winner != "" {
+			//				m1[winner] = true
+			//			}
+			//			package1 := compare["package"]
+			//			if package1 != nil {
+			//				packageM, _ := package1.(map[string]interface{})
+			//				for _, p := range packageM {
+			//					pm, _ := p.(map[string]interface{})
+			//					pw, _ := pm["winner"].(string)
+			//					if pw != "" {
+			//						m1[pw] = true
+			//					}
+			//				}
+			//			}
 			compare = nil
-			if len(m1) > 0 {
-				//str := ","
-				winnerarr := []string{}
-				for k, _ := range m1 {
-					//str += k + ","
-					winnerarr = append(winnerarr, k)
-				}
-				update["s_winner"] = strings.Join(winnerarr, ",")
-			}
+			//			if len(m1) > 0 {
+			//				//str := ","
+			//				winnerarr := []string{}
+			//				for k, _ := range m1 {
+			//					//str += k + ","
+			//					winnerarr = append(winnerarr, k)
+			//				}
+			//				update["s_winner"] = strings.Join(winnerarr, ",")
+			//			}
 		}
 		//------------------对比结束
 
@@ -182,22 +181,22 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 		}
 		//对projectscope字段的索引处理
 		ps, _ := tmp["projectscope"].(string)
-		if ps == "" {
-			tmp["projectscope"] = "" //= tmp["detail"]
-		}
+		//		if ps == "" {
+		//			tmp["projectscope"] = "" //= tmp["detail"]
+		//		}
 		if len(ps) > ESLEN {
 			tmp["projectscope"] = string(([]rune(ps))[:4000])
 		}
-		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-			tmp["budget"] = nil
-		} else if sbd, ok := tmp["budget"].(string); ok {
-			tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-		}
-		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-			tmp["bidamount"] = nil
-		} else if sbd, ok := tmp["bidamount"].(string); ok {
-			tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-		}
+		//		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
+		//			tmp["budget"] = nil
+		//		} else if sbd, ok := tmp["budget"].(string); ok {
+		//			tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+		//		}
+		//		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
+		//			tmp["bidamount"] = nil
+		//		} else if sbd, ok := tmp["bidamount"].(string); ok {
+		//			tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+		//		}
 		UpdatesLock.Lock()
 		//		for k1, _ := range tmp {
 		//			if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
@@ -236,8 +235,8 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 									}
 								}
 							}
+							con = FilterDetailSpace(con)
 							if con != "" {
-								con = FilterDetailSpace(con)
 								newTmp["attachments"] = con
 							}
 						}
@@ -249,9 +248,9 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 							newTmp[v] = tmp[v]
 						}
 					}
-				} else if v == "budget" || v == "bidamount" {
+				} /*else if v == "budget" || v == "bidamount" {
 					newTmp[v] = nil
-				}
+				}*/
 			}
 			arrEs = append(arrEs, newTmp)
 		}

+ 33 - 16
udpcreateindex/src/biddingindexback.go

@@ -1,7 +1,6 @@
 package main
 
 import (
-	"fmt"
 	"log"
 	qutil "qfw/util"
 	elastic "qfw/util/elastic"
@@ -86,22 +85,22 @@ func biddingBackTask(data []byte, mapInfo map[string]interface{}) {
 		}
 
 		ps, _ := tmp["projectscope"].(string)
-		if ps == "" {
-			tmp["projectscope"] = "" //= tmp["detail"]
-		}
+		//		if ps == "" {
+		//			tmp["projectscope"] = "" //= tmp["detail"]
+		//		}
 		if len(ps) > ESLEN {
 			tmp["projectscope"] = string(([]rune(ps))[:4000])
 		}
-		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-			tmp["budget"] = nil
-		} else if sbd, ok := tmp["budget"].(string); ok {
-			tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-		}
-		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-			tmp["bidamount"] = nil
-		} else if sbd, ok := tmp["bidamount"].(string); ok {
-			tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-		}
+		//		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
+		//			tmp["budget"] = nil
+		//		} else if sbd, ok := tmp["budget"].(string); ok {
+		//			tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+		//		}
+		//		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
+		//			tmp["bidamount"] = nil
+		//		} else if sbd, ok := tmp["bidamount"].(string); ok {
+		//			tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+		//		}
 		UpdatesLock.Lock()
 		newTmp := map[string]interface{}{}
 		for _, v := range biddingIndexFields {
@@ -116,6 +115,24 @@ func biddingBackTask(data []byte, mapInfo map[string]interface{}) {
 							}
 						}
 						newTmp[v] = newmap
+						attachments := mp["attachments"]
+						con := ""
+						if attachments != nil {
+							am, _ := attachments.(map[string]interface{})
+							if am != nil {
+								for _, v1 := range am {
+									vm, _ := v1.(map[string]interface{})
+									if vm != nil {
+										c, _ := vm["content"].(string)
+										con += c
+									}
+								}
+							}
+						}
+						con = FilterDetailSpace(con)
+						if con != "" {
+							newTmp["attachments"] = con
+						}
 					}
 				} else {
 					if v == "detail" {
@@ -125,9 +142,9 @@ func biddingBackTask(data []byte, mapInfo map[string]interface{}) {
 						newTmp[v] = tmp[v]
 					}
 				}
-			} else if v == "budget" || v == "bidamount" {
+			} /* else if v == "budget" || v == "bidamount" {
 				newTmp[v] = nil
-			}
+			}*/
 		}
 		arrEs = append(arrEs, newTmp)
 		if len(arrEs) >= BulkSizeBack {

+ 15 - 16
udpcreateindex/src/config.json

@@ -1,13 +1,13 @@
 {
     "udpport": ":1483",
-    "msg_server": "192.168.3.207:27092",
+    "msg_server": "10.171.112.160:7070",
 	"savedb": {
-        "addr": "192.168.3.207:27092",
-        "size": 6,
-        "db": "extract_v3"
+        "addr": "172.17.4.187:27083",
+        "size": 10,
+        "db": "qfw"
     },
     "jkmail": {
-        "to":"renzheng@topnet.net.cn",
+        "to":"zhangjinkun@topnet.net.cn",
 		"api":"http://10.171.112.160:19281/_send/_mail"
     },
     "winner": {
@@ -30,31 +30,30 @@
     },
     "bidding": {
         "db": "qfw",
-        "collect": "bidding",
-        "index": "bidding",
+        "collect": "bidding_back",
+        "index": "bidding_v1",
         "type": "bidding",
-        "extractdb": "extract_v3",
-        "extractcollect": "result_v3",
-        "indexfields": [
-            "_id","district","topscopeclass","s_winner","winner","buyerclass","title","detail","area","site","bidopendate","bidopentime","buyer","city","comeintime","href","infoformat","projectcode","projectname","publishtime","s_sha","spidercode","subtype","toptype","agency","budget","bidamount","s_subscopeclass","projectscope","bidstatus","projectinfo","buyertel","buyerperson","buyeraddr","buyerzipcode","winnertel","winnerperson","projectid"
+        "extractdb": "qfw",
+        "extractcollect": "result_20200116",
+        "indexfields":[ "buyerzipcode","winnertel","winnerperson","contractcode","winneraddr","agencyaddr","buyeraddr","signaturedate","projectperiod","projectaddr","agencytel","agencyperson","buyerperson","agency","projectscope","projectcode","bidopentime","supervisorrate","buyertel","bidamount","winner","buyer","budget","projectname","bidstatus","buyerclass","topscopeclass","s_subscopeclass","area","city","district","s_winner","_id","title","detail","site","comeintime","href","infoformat","publishtime","s_sha","spidercode","subtype","toptype","projectinfo"
         ],
-        "fields": "buyerclass,projectname,projectcode,bidamount,budget,agency,amount,winner,buyer,bidopendate,bidopentime,bidstatus,projectscope,buyertel,buyerperson,buyeraddr,buyerzipcode,city,area,district,topscopeclass,winnertel,winnerperson",
+        "fields": "buyerzipcode,winnertel,winnerperson,contractcode,winneraddr,agencyaddr,buyeraddr,signaturedate,projectperiod,projectaddr,agencytel,agencyperson,buyerperson,agency,projectscope,projectcode,bidopentime,supervisorrate,buyertel,bidamount,winner,buyer,budget,projectname,buyerclass,topscopeclass,area,city,district,s_winner",
         "projectinfo": "approvecode,approvecontent,approvestatus,approvetime,industry",
         "multiIndex": ""
     },
     "project": {
-        "db": "qfw",
-        "collect": "projectset",
+        "db": "extract_kf",
+        "collect": "huawei_project",
         "index": "projectset_v1",
         "type": "projectset"
     },
     "mongodb": {
-        "addr": "192.168.3.207:27092",
+        "addr": "10.172.242.243:27080,10.30.94.175:27081,10.81.232.246:27082",
         "pool": 10,
         "db": "qfw"
     },
     "elastic": {
-        "addr": "http://39.96.199.144:9800",
+        "addr": "http://172.17.145.170:9800",
         "pool": 12
     }
 }

+ 10 - 8
udpcreateindex/src/projectindex.go

@@ -1,7 +1,7 @@
 package main
 
 import (
-	"fmt"
+	//"fmt"
 	"log"
 	"qfw/util"
 	elastic "qfw/util/elastic"
@@ -31,19 +31,21 @@ func projectTask(data []byte, mapInfo map[string]interface{}) {
 
 	log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
 	query := session.DB(db).C(c).Find(q).Iter()
-
 	arr := make([]map[string]interface{}, savesizei)
 	var n int
 	i := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
 		delete(tmp, "package")
 		delete(tmp, "winnerorder")
-		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-			tmp["budget"] = nil
-		}
-		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-			tmp["bidamount"] = nil
-		}
+		delete(tmp, "infofield")
+		delete(tmp, "budgettag")
+		delete(tmp, "bidamounttag")
+		//		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
+		//			tmp["budget"] = nil
+		//		}
+		//		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
+		//			tmp["bidamount"] = nil
+		//		}
 		//go IS.Add("project")
 		arr[i] = tmp
 		n++

+ 4 - 3
udpfilterdup/src/config.json

@@ -3,9 +3,9 @@
     "dupdays": 5,
     "mongodb": {
         "addr": "192.168.3.207:27092",
-        "pool": 5,
+        "pool": 10,
         "db": "extract_kf",
-        "extract": "demo_data3.2",
+        "extract": "zk",
         "site": {
             "dbname": "extract_kf",
             "coll": "site"
@@ -17,7 +17,8 @@
     },
     "nextNode": [],
     "isMerger": false,
-    "threads": 4,
+    "threads": 1,
+    "isSort":true,
     "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
     "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批)",
     "specialtitle_2": "项目([0-9a-zA-Z一二三四五六七八九十零123456789])",

+ 26 - 7
udpfilterdup/src/datamap.go

@@ -78,7 +78,10 @@ func NewDatamap(days int, lastid string) *datamap {
 		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 {
 			continuSum++
 		} else {
-			pt := tmp["publishtime"]
+			pt := tmp["comeintime"]
+			if Is_Sort {
+				pt = tmp["publishtime"]
+			}
 			pt_time := qutil.Int64All(pt)
 			if pt_time <= 0 {
 				continue
@@ -124,7 +127,10 @@ func NewHistorymap(startid string, lastid string, startTime int64, lastTime int6
 		true)).Sort("-_id").Iter()
 	m, n := 0, 0
 	for tmp_start := make(map[string]interface{}); it_start.Next(&tmp_start); {
-		pt_s := tmp_start["publishtime"]
+		pt_s := tmp_start["comeintime"]
+		if Is_Sort {
+			pt_s = tmp_start["publishtime"]
+		}
 		pt_time := qutil.Int64All(pt_s)
 		if pt_time <= 0 {
 			continue
@@ -155,7 +161,10 @@ func NewHistorymap(startid string, lastid string, startTime int64, lastTime int6
 		true)).Sort("_id").Iter()
 
 	for tmp_last := make(map[string]interface{}); it_last.Next(&tmp_last); {
-		pt_l := tmp_last["publishtime"]
+		pt_l := tmp_last["comeintime"]
+		if Is_Sort {
+			pt_l = tmp_last["publishtime"]
+		}
 		pt_time := qutil.Int64All(pt_l)
 		if pt_time <= 0 {
 			continue
@@ -342,8 +351,10 @@ L:
 
 	//往预存数据 d 添加
 	if !b {
-		//ct := info.publishtime
 		ct := info.comeintime
+		if Is_Sort {
+			ct = info.publishtime
+		}
 		dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
 		k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
 		d.lock.Lock()
@@ -497,8 +508,10 @@ L:
 	}
 	//往预存数据 d 添加
 	if !b {
-		//ct := info.publishtime
 		ct := info.comeintime
+		if Is_Sort {
+			ct = info.publishtime
+		}
 		dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
 		k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
 		data := h.data[k]
@@ -519,7 +532,10 @@ L:
 
 //替换原始数据池
 func (d *datamap) replaceSourceData(replaceData *Info, replaceId string) {
-	ct := replaceData.publishtime
+	ct := replaceData.comeintime
+	if Is_Sort {
+		ct = replaceData.publishtime
+	}
 	dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
 	k := fmt.Sprintf("%s_%s_%s", dkey, replaceData.subtype, replaceData.area)
 	d.lock.Lock()
@@ -544,7 +560,10 @@ func (d *datamap) replaceSourceData(replaceData *Info, replaceId string) {
 }
 
 func (h *historymap) replaceSourceData(replaceData *Info, replaceId string) {
-	ct := replaceData.publishtime
+	ct := replaceData.comeintime
+	if Is_Sort {
+		ct = replaceData.publishtime
+	}
 	dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
 	k := fmt.Sprintf("%s_%s_%s", dkey, replaceData.subtype, replaceData.area)
 	h.lock.Lock()

+ 0 - 205
udpfilterdup/src/datamap.go.bak

@@ -1,205 +0,0 @@
-package main
-
-import (
-	"fmt"
-	"log"
-	"math"
-	qutil "qfw/util"
-	"strings"
-	"sync"
-	"time"
-)
-
-type Info struct {
-	id                 string
-	title              string
-	area               string
-	city               string
-	subtype            string
-	buyer              string
-	agency             string //代理机构
-	winner             string //中标单位
-	projectname        string
-	projectcode        string
-	publishtime        int64
-	ContainSpecialWord bool
-}
-
-var datelimit = float64(432000)
-
-type datamap struct {
-	lock   sync.Mutex //锁
-	days   int        //保留几天数据
-	data   map[string][]*Info
-	keymap []string
-}
-
-func NewDatamap(days int) *datamap {
-	datelimit = qutil.Float64All(days * 86400)
-	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}}
-	dm.keymap = dm.GetLatelyFiveDay()
-	//初始化加载数据
-	sess := mgo.GetMgoConn()
-	defer mgo.DestoryMongoConn(sess)
-	it := sess.DB(mgo.DbName).C(extract).Find(nil).Sort("-_id").Iter()
-	now1 := time.Now().Unix()
-	n, continuSum := 0, 0
-	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
-		//
-		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.ObjToString(tmp["subtype"]) == "变更" {
-			continuSum++
-		} else {
-			cm := tmp["comeintime"]
-			comeintime := qutil.Int64All(cm)
-			if qutil.Float64All(now1-comeintime) < datelimit {
-				info := NewInfo(tmp)
-				k := fmt.Sprintf("%s_%s_%s", qutil.FormatDateWithObj(&cm, qutil.Date_yyyyMMdd), info.subtype, info.area)
-				data := dm.data[k]
-				if data == nil {
-					data = []*Info{}
-					//log.Println(k)
-				}
-				data = append(data, info)
-				dm.data[k] = data
-			} else {
-				break
-			}
-		}
-		if n%5000 == 0 {
-			log.Println("current n:", n, continuSum)
-		}
-		tmp = make(map[string]interface{})
-	}
-	log.Println("load data:", n)
-	//启动定时任务
-	now := time.Now()
-	t2 := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, time.Local)
-	go time.AfterFunc(time.Duration(int64(t2.Unix()-now.Unix()))*time.Second, func() {
-		//go time.AfterFunc(time.Duration(10)*time.Second, func() {
-		dm.update()
-	})
-	return dm
-}
-func NewInfo(tmp map[string]interface{}) *Info {
-	subtype := qutil.ObjToString(tmp["subtype"])
-	area := qutil.ObjToString(tmp["area"])
-	if area == "A" {
-		area = "全国"
-	}
-	info := &Info{}
-	info.id = qutil.BsonIdToSId(tmp["_id"])
-	info.title = qutil.ObjToString(tmp["title"])
-	info.area = area
-	info.subtype = subtype
-	info.buyer = qutil.ObjToString(tmp["buyer"])
-	info.projectname = qutil.ObjToString(tmp["projectname"])
-	info.ContainSpecialWord = FilterRegexp.MatchString(info.projectname) || FilterRegexp.MatchString(info.title)
-	info.projectcode = qutil.ObjToString(tmp["projectcode"])
-	info.city = qutil.ObjToString(tmp["city"])
-	info.agency = qutil.ObjToString(tmp["agency"])
-	//info.winner = qutil.ObjToString(tmp["winner"])
-	info.publishtime = qutil.Int64All(tmp["publishtime"])
-	return info
-}
-
-func (d *datamap) check(info *Info) (b bool, id string) {
-	d.lock.Lock()
-	defer d.lock.Unlock()
-	keys := []string{}
-	for _, k := range d.keymap {
-		keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
-		if info.area != "全国" { //这个后续可以不要
-			keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
-		}
-	}
-L:
-	for _, k := range keys {
-		data := d.data[k]
-		if len(data) > 1 { //对比
-			for _, v := range data {
-				if math.Abs(qutil.Float64All(v.publishtime-info.publishtime)) > datelimit {
-					continue
-				}
-				if v.agency != "" && info.agency != "" && v.agency != info.agency {
-					continue
-				}
-				n := 0
-				if v.buyer != "" && v.buyer == info.buyer {
-					n++
-				}
-				if v.projectname != "" && v.projectname == info.projectname {
-					n++
-				}
-				if !info.ContainSpecialWord && n > 1 {
-					b = true
-					id = v.id
-					break L
-				} else if v.projectcode != "" && v.projectcode == info.projectcode {
-					n++
-				}
-				if !info.ContainSpecialWord && n > 1 || n > 2 {
-					b = true
-					id = v.id
-					break L
-				}
-				//标题长度大于10且相等即为重复
-				//				if len([]rune(info.title)) > 10 && v.title == info.title {
-				//					b = true
-				//					id = v.id
-				//					break L
-				//				}
-				//标题长度大于10且包含关系+buyer/projectname/projectcode/city(全国/A的只判断包含关系即可)相等即为重复
-				if len([]rune(info.title)) > 10 && len([]rune(v.title)) > 10 && (strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title)) {
-					if info.area == "全国" || n > 0 || info.city == v.city {
-						b = true
-						id = v.id
-						break L
-					}
-				}
-			}
-		}
-	}
-	if !b {
-		k := fmt.Sprintf("%s_%s_%s", time.Now().Format(qutil.Date_yyyyMMdd), info.subtype, info.area)
-		data := d.data[k]
-		if data == nil {
-			data = []*Info{info}
-		} else {
-			data = append(data, info)
-		}
-		d.data[k] = data
-	}
-	return
-}
-
-func (d *datamap) update() {
-	//每天0点清除历史数据
-	d.lock.Lock()
-	now, now1 := time.Now(), time.Now()
-	t2 := time.Date(now1.Year(), now1.Month(), now1.Day()+1, 0, 0, 0, 0, time.Local)
-	date := now.AddDate(0, 0, -d.days).Format(qutil.Date_yyyyMMdd)
-	all, all1 := 0, 0
-	for k, v := range d.data {
-		all += len(v)
-		if strings.HasPrefix(k, date) {
-			delete(d.data, k)
-		}
-	}
-	for _, v := range d.data {
-		all1 += len(v)
-	}
-	log.Println("更新前后数据:", all, all1)
-	d.keymap = d.GetLatelyFiveDay()
-	d.lock.Unlock()
-	time.AfterFunc(time.Duration(int64(t2.Unix()-now1.Unix()))*time.Second, d.update)
-}
-
-func (d *datamap) GetLatelyFiveDay() []string {
-	array := make([]string, d.days)
-	now := time.Now()
-	for i := 0; i < d.days; i++ {
-		array[i] = now.Format(qutil.Date_yyyyMMdd)
-		now = now.AddDate(0, 0, -1)
-	}
-	return array
-}

+ 28 - 16
udpfilterdup/src/main.go

@@ -29,15 +29,14 @@ var (
 	DM        *datamap                 //
 	HM        *historymap              //判重数据
 	lastid    = ""
-	/*
-		5da3f31aa5cb26b9b798d3aa
-	*/
+
 	//正则筛选相关
 	FilterRegTitle   = regexp.MustCompile("^_$")
 	FilterRegTitle_1 = regexp.MustCompile("^_$")
 	FilterRegTitle_2 = regexp.MustCompile("^_$")
 
 	isMerger         bool                              //是否合并
+	Is_Sort          bool                              //是否排序
 	threadNum        int                               //线程数量
 	SiteMap          map[string]map[string]interface{} //站点map
 	idtype, sid, eid string                            //测试人员判重使用
@@ -60,7 +59,6 @@ func init() {
 	}
 	mgo.InitPool()
 	extract = mconf["extract"].(string)
-
 	dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
 	//加载数据
 	DM = NewDatamap(dupdays, lastid)
@@ -68,6 +66,7 @@ func init() {
 	FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
 	FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
 	isMerger = Sysconfig["isMerger"].(bool)
+	Is_Sort = Sysconfig["isSort"].(bool)
 	threadNum = util.IntAllDef(Sysconfig["threads"], 1)
 
 	//站点配置
@@ -107,9 +106,9 @@ func mainT() {
 		ObjectId("5df5071ce9d1f601e495fa54")
 		ObjectId("5e09c05f0cf41612e0626abc")
 	*/
-	sid = "5df5071ce9d1f601e495fa54"
-	eid = "5e09c05f0cf41612e0626abc"
-
+	log.Println("测试开始")
+	sid = "5da3f31aa5cb26b9b798d3aa"
+	eid = "5da418c4a5cb26b9b7e3e9a6"
 	mapinfo := map[string]interface{}{}
 	if sid == "" || eid == "" {
 		log.Println("sid,eid参数不能为空")
@@ -118,7 +117,7 @@ func mainT() {
 	mapinfo["gtid"] = sid
 	mapinfo["lteid"] = eid
 	mapinfo["stop"] = "true"
-	task([]byte{}, mapinfo)
+	historyTask([]byte{}, mapinfo)
 	time.Sleep(10 * time.Second)
 }
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
@@ -183,12 +182,16 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	log.Println(mgo.DbName, extract, q)
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
-	//it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+
+	//是否排序
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
+	if Is_Sort {
+		it = sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	}
+	//it = sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
 	updateExtract := [][]map[string]interface{}{}
 	log.Println("线程数:", threadNum)
 	pool := make(chan bool, threadNum)
-
 	wg := &sync.WaitGroup{}
 	//mapLock := &sync.Mutex{}
 	n, repeateN := 0, 0
@@ -397,11 +400,15 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 	minTime, maxTime := int64(0), int64(0)
 	for tmp := make(map[string]interface{}); it.Next(&tmp); {
 		//取出最大最小时间
-		if minTime == 0 || maxTime == 0 && util.Int64All(tmp["publishtime"]) != 0 {
-			minTime = util.Int64All(tmp["publishtime"])
-			maxTime = util.Int64All(tmp["publishtime"])
+		info_time:=tmp["comeintime"]
+		if Is_Sort {
+			info_time = tmp["publishtime"]
+		}
+		if minTime == 0 || maxTime == 0 && util.Int64All(info_time) != 0 {
+			minTime = util.Int64All(info_time)
+			maxTime = util.Int64All(info_time)
 		} else {
-			t := util.Int64All(tmp["publishtime"])
+			t := util.Int64All(info_time)
 			if t < minTime && t != 0 {
 				minTime = t
 			}
@@ -412,7 +419,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 	}
 	//时间不正确时
 	if minTime == 0 && maxTime == 0 {
-		log.Println("段数据区间 publishtime不符合")
+		log.Println("段数据区间 不符合")
 		return
 	}
 	fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
@@ -442,7 +449,12 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 		}
 	}
 	log.Println(mgo.DbName, extract, q_history)
-	it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
+
+	//是否排序
+	it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Iter()
+	if Is_Sort {
+		it_history = sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
+	}
 	updateExtract := [][]map[string]interface{}{}
 	log.Println("线程数:", threadNum)
 	pool := make(chan bool, threadNum)

+ 1 - 1
udpfilterdup/src/mgo.go

@@ -144,7 +144,7 @@ func (m *MongodbSim) InitPool() {
 	opts := options.Client()
 	opts.SetConnectTimeout(3 * time.Second)
 	opts.ApplyURI("mongodb://" + m.MongodbAddr)
-	opts.SetMaxPoolSize(uint64(m.Size))
+	opts.SetMaxPoolSize(uint16(m.Size))
 	m.pool = make(chan bool, m.Size)
 	opts.SetMaxConnIdleTime(2 * time.Hour)
 	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)

+ 1 - 1
udps/main.go

@@ -30,7 +30,7 @@ ObjectId("5da418c4a5cb26b9b7e3e9a6")
 	flag.StringVar(&sid, "sid", "", "开始id")
 	flag.StringVar(&eid, "eid", "", "结束id")
 	flag.StringVar(&startDate, "start", "", "开始日期2006-01-02")
-	flag.StringVar(&endDate, "end", "", "结束日期2006-01-02")
+	flag.StringVar(&endDate, "end", "2019-11-10", "结束日期2006-01-02")
 	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
 	flag.StringVar(&port, "port", "1488", "dup端口")
 	flag.StringVar(&stype, "stype", "", "stype")