Forráskód Böngészése

Merge branch 'dev3.4' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4

maxiaoshan 5 éve
szülő
commit
e557b2979b

+ 2 - 1
udp_city/src/config.json

@@ -4,8 +4,9 @@
   "dbsize": 3,
   "dbname": "enterprise",
   "mgodb_bidding": "192.168.3.207:27092",
-  "mgodb_bidding_xs": "10.172.242.243:27080",
+  "mgodb_bidding_xs": "172.17.4.187:27083",
   "dbname_bidding": "qfw",
+  "findDb": "result_20200714",
   "udpport": "1485",
   "nextNode": [
     {

+ 53 - 52
udp_city/src/main.go

@@ -8,6 +8,7 @@ import (
 	mgo "mongodbutil"
 	"net"
 	qu "qfw/util"
+	"strings"
 )
 
 var Udpclient mu.UdpClient //udp对象
@@ -16,15 +17,19 @@ var Config map[string]interface{}
 var PageSize = 5000 //查询分页
 var biddingFields = `{"buyer":1,"modifyinfo":1}`
 var qyxyFields = `{"company_code":1,"province":1,"city":1,"district":1}`
+var findDb string
+var cc chan bool = make(chan bool, 5)
 
 func init() {
 	qu.ReadConfig(&Config)
 	if len(Config) == 0 {
 		log.Fatal("读取配置文件失败", Config)
 	}
+	findDb = qu.ObjToString(Config["findDb"])
 	initCap := qu.IntAll(Config["dbsize"])
 	addr := qu.ObjToString(Config["mgodb"])
 	dbname := qu.ObjToString(Config["dbname"])
+	cc = make(chan bool, 3)
 	mgo.Mgo = mgo.MgoFactory(initCap, initCap*3, 120, addr, dbname)
 	mgo.Mgo_Bidding = mgo.MgoFactory(initCap, initCap*3, 120, qu.ObjToString(Config["mgodb_bidding"]), qu.ObjToString(Config["dbname_bidding"]))
 	nextNodes = qu.ObjArrToMapArr(Config["nextNode"].([]interface{}))
@@ -50,7 +55,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 				go Udpclient.WriteUdp(data, mu.OP_NOOP, ra)
 				log.Println("udp通知抽取id段", sid, " ", eid)
 
-				getCity(sid, eid)
+				getCity(sid, eid, qu.ObjToString(rep["stype"]))
 				log.Println("udp通知抽取完成,eid", eid)
 				for _, m := range nextNodes {
 					by, _ := json.Marshal(map[string]interface{}{
@@ -73,73 +78,67 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	}
 }
 
-func getCity(sid, eid string) {
+func getCity(sid, eid, rep string) {
 	index := 0
+	var unum int64
 	query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
-	count := mgo.Mgo_Bidding.Count("bidding", query)
-	count_bak := mgo.Mgo_Bidding.Count("bidding_bak", query)
-	count += count_bak
+	count := mgo.Mgo_Bidding.Count(findDb, query)
 	log.Println("查询条件为:", query, "查询条数:", count)
 	pageNum := (count + PageSize - 1) / PageSize
 	limit := PageSize
 	if count < PageSize {
 		limit = count
 	}
-	table := "bidding_bak"
+	table := findDb
 	for i := 0; i < pageNum; i++ {
-		query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid),"$lte":bson.ObjectIdHex(eid)}}
-		if mgo.Mgo_Bidding.Count(table, query) < 1 {
-			table = "bidding"
-		} else if table == "bidding_bak" && mgo.Mgo_Bidding.Count("bidding", query) > 0 {
-			log.Printf("page=%d,query=%v,db=%v\n", i+1, query, "bidding")
-			list2, _ := mgo.Mgo_Bidding.Find("bidding", query, nil, biddingFields, false, 0, limit)
-			processingCity(sid, eid, list2, index, "bidding", i)
-		}
+		query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
 		log.Printf("page=%d,query=%v,db=%v\n", i+1, query, table)
 		list, _ := mgo.Mgo_Bidding.Find(table, query, nil, biddingFields, false, 0, limit)
-		processingCity(sid, eid, list, index, table, i)
-	}
-}
+		for _, v := range *list {
+			if qu.ObjToString(v["district"]) != "" && qu.ObjToString(v["city"]) != "" && qu.ObjToString(v["area"]) != "" {
+				index++
+				continue
+			}
+			if qu.ObjToString(v["buyer"]) == "" {
+				index++
+				continue
+			}
 
-func processingCity(sid string, eid string, list *[]map[string]interface{}, index int, table string, i int) {
-	for _, v := range *list {
-		if qu.ObjToString(v["district"]) != "" && qu.ObjToString(v["city"]) != "" && qu.ObjToString(v["area"]) != "" {
-			index++
-			continue
-		}
-		if qu.ObjToString(v["buyer"]) == "" {
+			_id := qu.BsonIdToSId(v["_id"])
+			cc <- true
+			go func(v map[string]interface{}) {
+				rdata := cityMarshal(v)
+				if len(rdata) > 0 {
+					umap := make(map[string]interface{})
+					if v["modifyinfo"] == nil {
+						umap["modifyinfo"] = make(map[string]interface{})
+					} else {
+						umap["modifyinfo"] = v["modifyinfo"]
+					}
+					for rk, rv := range rdata {
+						umap[rk] = rv
+						umap["modifyinfo"].(map[string]interface{})[rk] = "企业信息"
+					}
+					unum++
+					log.Println(unum, ",id:", _id)
+					mgo.Mgo_Bidding.UpdateById(table, v["_id"], map[string]interface{}{
+						"$set": umap,
+					})
+				}
+				<-cc
+			}(v)
 			index++
-			continue
-		}
-
-		_id := qu.BsonIdToSId(v["_id"])
-		rdata := cityMarshal(v)
-		if len(rdata) > 0 {
-			umap := make(map[string]interface{})
-			if v["modifyinfo"] == nil {
-				umap["modifyinfo"] = make(map[string]interface{})
-			} else {
-				umap["modifyinfo"] = v["modifyinfo"]
+			if index%1000 == 0 {
+				log.Println("index:", index, ",页码:", i+1, ",_id:", _id)
 			}
-			for rk, rv := range rdata {
-				umap[rk] = rv
-				umap["modifyinfo"].(map[string]interface{})[rk] = "企业信息"
+			sid = _id
+			if sid >= eid {
+				break
 			}
-			mgo.Mgo_Bidding.UpdateById(table, v["_id"], map[string]interface{}{
-				"$set": umap,
-			})
-
-		}
-		index++
-		if index%1000 == 0 {
-			log.Println("index:", index, ",页码:", i+1, ",_id:", _id)
-		}
-		sid = _id
-		if sid >= eid {
-			break
 		}
 	}
 }
+
 func cityMarshal(data map[string]interface{}) map[string]string {
 	rdata := make(map[string]string)
 	buyer := qu.ObjToString(data["buyer"])
@@ -154,16 +153,18 @@ func cityMarshal(data map[string]interface{}) map[string]string {
 			if province := qu.ObjToString((*province_city_district)["province"]); province != "" {
 				rdata["area"] = province
 			}
-			if city := qu.ObjToString((*province_city_district)["city"]); city != "" {
+			if city := qu.ObjToString((*province_city_district)["city"]); city != "" && !strings.Contains(city, rdata["area"]) {
 				rdata["city"] = city
 			}
-			if district := qu.ObjToString((*province_city_district)["district"]); district != "" {
+			if district := qu.ObjToString((*province_city_district)["district"]); district != "" && !strings.Contains(district, rdata["area"]) {
 				rdata["district"] = district
 			}
 			return rdata
 		}
 	}
 	if province := qu.ObjToString((*tmp)["province"]); province != "" {
+		province = strings.TrimRight(province, "省")
+		province = strings.TrimRight(province, "市")
 		rdata["area"] = province
 	}
 	if city := qu.ObjToString((*tmp)["city"]); city != "" {

+ 41 - 1
udpfilterdup/src/README.md

@@ -35,4 +35,44 @@
     "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
     "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
     "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
-}
+}
+
+
+
+
+
+
+
+
+{
+    "udpport": ":17859",
+    "dupdays": 5,
+    "mongodb": {
+        "addr": "192.168.3.207:27092",
+        "pool": 5,
+        "db": "extract_kf",
+        "extract": "a_testbidding",
+        "extract_back": "a_testbidding",
+        "site": {
+            "dbname": "extract_kf",
+            "coll": "site"
+        }
+    },
+    "jkmail": {
+        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
+        "api": "http://10.171.112.160:19281/_send/_mail"
+    },
+    "nextNode": [
+    ],
+    "threads": 1,
+    "isMerger": true,
+    "lowHeavy":true,
+    "timingTask":false,
+    "timingSpanDay": 3,
+    "timingPubScope": 720,
+    "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
+    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
+    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
+    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
+    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
+}

+ 6 - 12
udpfilterdup/src/config.json

@@ -1,14 +1,14 @@
 {
-    "udpport": ":1785",
+    "udpport": ":17859",
     "dupdays": 5,
     "mongodb": {
-        "addr": "172.17.4.187:27083",
+        "addr": "192.168.3.207:27092",
         "pool": 5,
-        "db": "qfw",
-        "extract": "result_file_20200410",
-        "extract_back": "result_file_20200409",
+        "db": "extract_kf",
+        "extract": "a_testbidding",
+        "extract_back": "a_testbidding",
         "site": {
-            "dbname": "qfw",
+            "dbname": "extract_kf",
             "coll": "site"
         }
     },
@@ -17,12 +17,6 @@
         "api": "http://10.171.112.160:19281/_send/_mail"
     },
     "nextNode": [
-        {
-            "addr": "127.0.0.1",
-            "port": 1783,
-            "stype": "bidding",
-            "memo": "创建招标数据索引new"
-        }
     ],
     "threads": 1,
     "isMerger": true,

+ 16 - 19
udpfilterdup/src/main.go

@@ -106,7 +106,6 @@ func init() {
 
 
 func main() {
-
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
@@ -299,22 +298,22 @@ func task(data []byte, mapInfo map[string]interface{}) {
 						//mongo更新 - 具体字段 - merge
 						mgo.UpdateById(extract,source.id,update_map)
 						//发udp  更新索引
-						for _, to := range nextNode {
-							key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
-							by, _ := json.Marshal(map[string]interface{}{
-								"gtid":  source.id,
-								"lteid": source.id,
-								"stype": "biddingall",
-								"key":   key,
-							})
-							addr := &net.UDPAddr{
-								IP:   net.ParseIP(to["addr"].(string)),
-								Port: util.IntAll(to["port"]),
-							}
-							node := &udpNode{by, addr, time.Now().Unix(), 0}
-							udptaskmap.Store(key, node)
-							udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
-						}
+						//for _, to := range nextNode {
+						//	key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
+						//	by, _ := json.Marshal(map[string]interface{}{
+						//		"gtid":  source.id,
+						//		"lteid": source.id,
+						//		"stype": "biddingall",
+						//		"key":   key,
+						//	})
+						//	addr := &net.UDPAddr{
+						//		IP:   net.ParseIP(to["addr"].(string)),
+						//		Port: util.IntAll(to["port"]),
+						//	}
+						//	node := &udpNode{by, addr, time.Now().Unix(), 0}
+						//	udptaskmap.Store(key, node)
+						//	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+						//}
 					}
 				}
 			}
@@ -422,8 +421,6 @@ func timedTaskOnce() {
 			continue
 		}
 
-
-
 		//取-符合-发布时间X年内的数据
 		if util.IntAll(tmp["dataging"]) == 1 {
 			pubtime := util.Int64All(tmp["publishtime"])