Jianghan 2 年之前
父節點
當前提交
e88a75d2b0

二進制
data_fusion/bin/data_fusion


二進制
data_fusion/bin/data_fusion_linux


+ 1 - 2
data_fusion/common.toml

@@ -1,7 +1,7 @@
 [serve]
 thread = 10
 fields = ["agency", "agencytel", "agencyperson", "bidendtime", "budget", "bidamount", "buyer", "buyerclass", "buyerperson", "buyertel", "buyeraddr", "bidway",
-    "district", "projectcode", "projectscope", "s_winner", "winnertel", "winerperson", "publishtime", "signaturedate", "docstarttime", "package", "attachments",
+    "district", "projectcode", "projectscope", "s_winner", "winnertel", "winerperson", "publishtime", "signaturedate", "docstarttime", "attachments",
     "project_timeunit", "project_completedate", "project_duration", "project_startdate", "winnerorder", "purchasinglist", "qualifies", "contractcode", "bidopentime",
     "topscopeclass", "subscopeclass", "toptype", "subtype"]
 
@@ -26,7 +26,6 @@ bidopentime = 1
 contractcode = 3
 qualifies = 1
 attachments = 2
-package = 1
 purchasinglist = 1
 winnerorder = 1
 

+ 152 - 68
data_fusion/main.go

@@ -14,6 +14,10 @@ import (
 	"time"
 )
 
+var (
+	ArrLock []sync.Mutex
+)
+
 func init() {
 	config.Init("./common.toml")
 	InitLog()
@@ -26,6 +30,10 @@ func init() {
 	updateSp = make(chan bool, 5)
 	recordPool = make(chan []map[string]interface{}, 5000)
 	recordSp = make(chan bool, 5)
+
+	for i := 0; i < config.Conf.Serve.Thread; i++ {
+		ArrLock = append(ArrLock, sync.Mutex{})
+	}
 }
 
 func main() {
@@ -38,39 +46,47 @@ func main() {
 	ch := make(chan bool, config.Conf.Serve.Thread)
 	wg := &sync.WaitGroup{}
 
-	q := map[string]interface{}{"_id": mongodb.StringTOBsonId("639751bb063a7b816e026aa1")}
-	it := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding_fusion").Find(q).Select(nil).Iter()
+	//q := map[string]interface{}{"_id": mongodb.StringTOBsonId("635ff15d631ff1ac3d095c41")}
+	//f := map[string]interface{}{"contenthtml": 0}
+	it := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding_fusion").Find(nil).Select(nil).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
-		if count%2000 == 0 {
+		if count%20000 == 0 {
 			log.Info("main", zap.Int("current:", count))
 		}
-		if repeat := util.IntAll(tmp["repeat"]); repeat != 1 {
+		if repeat := util.IntAll(tmp["extracttype"]); repeat != -1 {
 			continue
 		}
 		ch <- true
 		wg.Add(1)
+
+		rid := util.ObjToString(tmp["repeat_id"])
+		ArrLock[util.HashCode(rid)%config.Conf.Serve.Thread].Lock()
+
 		go func(tmp map[string]interface{}) {
-			defer func() {
+			defer func(rid string) {
 				<-ch
 				wg.Done()
-			}()
+
+				ArrLock[util.HashCode(rid)%config.Conf.Serve.Thread].Unlock()
+			}(rid)
 			repeatId := util.ObjToString(tmp["repeat_id"])
 			if str := redis.GetStr("fusion_id", repeatId); str != "" {
 				mid := strings.Split(str, "-")[0]
-				tmp1, _ := MgoB.FindById("bidding_fusion", mid, nil)
+				tmp1 := findData(mid)
 				w, s := getWeight(tmp)
-				w1, s1 := getWeight(*tmp1)
-				util.Debug(w, s, w1, s1)
+				w1, _ := getWeight(tmp1)
 				var update map[string]interface{}
+				var fs []string
 				if w > w1 {
-					update = mergeTmp(tmp, *tmp1)
-					//if len(update) > 0 {
-					//	updatePool <- []map[string]interface{}{
-					//		{"_id": tmp["_id"]},
-					//		{"$set": update},
-					//	}
-					//}
+					update, fs = mergeTmp(tmp, tmp1)
+					if len(update) > 0 {
+						updatePool <- []map[string]interface{}{
+							{"_id": tmp["_id"]},
+							{"$set": update, "$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
+							//{"$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
+						}
+					}
 					record := make(map[string]interface{})
 					record["$set"] = map[string]interface{}{
 						"template_id":     mongodb.BsonIdToSId(tmp["_id"]),
@@ -92,13 +108,14 @@ func main() {
 					}
 					redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp["_id"]), str))
 				} else {
-					update = mergeTmp(*tmp1, tmp)
-					//if len(update) > 0 {
-					//	updatePool <- []map[string]interface{}{
-					//		{"_id": (*tmp1)["_id"]},
-					//		{"$set": update},
-					//	}
-					//}
+					update, fs = mergeTmp(tmp1, tmp)
+					if len(update) > 0 {
+						updatePool <- []map[string]interface{}{
+							{"_id": (tmp1)["_id"]},
+							{"$set": update, "$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
+							//{"$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
+						}
+					}
 					record := make(map[string]interface{})
 					record["$set"] = map[string]interface{}{
 						"template_weight": w1,
@@ -117,30 +134,35 @@ func main() {
 						{"_id": mongodb.StringTOBsonId(repeatId)},
 						record,
 					}
-					redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", str, mongodb.BsonIdToSId(tmp["_id"])))
+					redis.PutCKV("fusion_id", mid, fmt.Sprintf("%s-%s", str, mongodb.BsonIdToSId(tmp["_id"])))
 				}
 			} else {
-				tmp1, _ := MgoB.FindById("bidding_fusion", repeatId, nil)
+				tmp1 := findData(repeatId)
 				w, s := getWeight(tmp)
-				w1, s1 := getWeight(*tmp1)
+				w1, s1 := getWeight(tmp1)
 				var update map[string]interface{}
+				var fs []string
 				if w > w1 {
-					update = mergeTmp(tmp, *tmp1)
-					//if len(update) > 0 {
-					//	updatePool <- []map[string]interface{}{
-					//		{"_id": tmp["_id"]},
-					//		{"$set": update},
-					//	}
-					//}
+					update, fs = mergeTmp(tmp, tmp1)
+					if len(update) > 0 {
+						set := util.DeepCopy(update).(map[string]interface{})
+						set["fusion_fields"] = fs
+						updatePool <- []map[string]interface{}{
+							{"_id": tmp["_id"]},
+							{"$set": set},
+							//{"$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
+							//{"$set": map[string]interface{}{"fusion_fields": fs}},
+						}
+					}
 					record := make(map[string]interface{})
-					record["_id"] = (*tmp1)["_id"]
+					record["_id"] = tmp1["_id"]
 					record["template_id"] = mongodb.BsonIdToSId(tmp["_id"])
 					record["template_weight"] = w
 					record["ids"] = []string{mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(repeatId)}
 					var recordlist []map[string]interface{}
 					recordlist = append(recordlist, map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "weight": w})
 					update1 := util.DeepCopy(update).(map[string]interface{})
-					update1["infoid"] = mongodb.BsonIdToSId((*tmp1)["_id"])
+					update1["infoid"] = mongodb.BsonIdToSId(tmp1["_id"])
 					update1["weight"] = w1
 					if w1 == 0 {
 						update1["remark"] = s1
@@ -148,25 +170,29 @@ func main() {
 					recordlist = append(recordlist, update1)
 					record["record"] = recordlist
 					recordPool <- []map[string]interface{}{
-						{"_id": (*tmp1)["_id"]},
+						{"_id": tmp1["_id"]},
 						{"$set": record},
 					}
-					redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId((*tmp1)["_id"])))
+					redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(tmp1["_id"])))
 				} else {
-					update = mergeTmp(*tmp1, tmp)
-					//if len(update) > 0 {
-					//	updatePool <- []map[string]interface{}{
-					//		{"_id": (*tmp1)["_id"]},
-					//		{"$set": update},
-					//	}
-					//}
+					update, fs = mergeTmp(tmp1, tmp)
+					if len(update) > 0 {
+						set := util.DeepCopy(update).(map[string]interface{})
+						set["fusion_fields"] = fs
+						updatePool <- []map[string]interface{}{
+							{"_id": tmp1["_id"]},
+							{"$set": set},
+							//{"$addToSet": map[string]interface{}{"fusion_fields": map[string]interface{}{"$each": fs}}},
+							//{"$set": map[string]interface{}{"fusion_fields": fs}},
+						}
+					}
 					record := make(map[string]interface{})
-					record["_id"] = (*tmp1)["_id"]
-					record["template_id"] = mongodb.BsonIdToSId((*tmp1)["_id"])
+					record["_id"] = tmp1["_id"]
+					record["template_id"] = mongodb.BsonIdToSId(tmp1["_id"])
 					record["template_weight"] = w1
-					record["ids"] = []string{mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId((*tmp1)["_id"])}
+					record["ids"] = []string{mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(tmp1["_id"])}
 					var recordlist []map[string]interface{}
-					recordlist = append(recordlist, map[string]interface{}{"infoid": mongodb.BsonIdToSId((*tmp1)["_id"]), "weight": w1})
+					recordlist = append(recordlist, map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp1["_id"]), "weight": w1})
 					update1 := util.DeepCopy(update).(map[string]interface{})
 					update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
 					update1["weight"] = w
@@ -176,21 +202,35 @@ func main() {
 					recordlist = append(recordlist, update1)
 					record["record"] = recordlist
 					recordPool <- []map[string]interface{}{
-						{"_id": (*tmp1)["_id"]},
+						{"_id": tmp1["_id"]},
 						{"$set": record},
 					}
-					redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId((*tmp1)["_id"]), mongodb.BsonIdToSId(tmp["_id"])))
+					redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp1["_id"]), mongodb.BsonIdToSId(tmp["_id"])))
 				}
 			}
-
 		}(tmp)
 		tmp = map[string]interface{}{}
 	}
+	wg.Wait()
 
+	log.Info("fusion over...", zap.Int("count:", count))
 	c := make(chan bool, 1)
 	<-c
 }
 
+func findData(id string) map[string]interface{} {
+	tmp, _ := MgoB.FindById("bidding_fusion", id, nil)
+	if tmp != nil && len(*tmp) > 0 {
+		return *tmp
+	} else {
+		tmp, _ = MgoB.FindById("bidding", id, nil)
+		if tmp != nil && len(*tmp) > 0 {
+			return *tmp
+		}
+	}
+	return nil
+}
+
 func getWeight(tmp map[string]interface{}) (int, string) {
 	var w int
 	if util.IntAll(tmp["publishtime"]) <= 0 {
@@ -201,17 +241,30 @@ func getWeight(tmp map[string]interface{}) (int, string) {
 	}
 	for k, v := range config.Conf.Serve.Weight {
 		if tmp[k] != nil {
-			util.Debug(k)
-			if reflect.TypeOf(tmp[k]).String() == "string" {
-				if util.ObjToString(tmp[k]) != "" {
-					w += v
+			if k == "attachments" {
+				if pinfo, ok := tmp["projectinfo"].(map[string]interface{}); ok {
+					if atts, o2 := pinfo["attachments"].(map[string]interface{}); o2 {
+						if len(atts) > 0 {
+							w += v
+						}
+					}
 				}
-			} else if reflect.TypeOf(tmp[k]).String() == "float64" {
-				if util.Float64All(tmp[k]) > 0 {
+			} else {
+				if reflect.TypeOf(tmp[k]).String() == "string" {
+					if util.ObjToString(tmp[k]) != "" {
+						w += v
+					}
+				} else if reflect.TypeOf(tmp[k]).String() == "float64" {
+					if util.Float64All(tmp[k]) > 0 {
+						w += v
+					}
+				} else if reflect.TypeOf(tmp[k]).String() == "[]interface {}" {
+					if len(tmp[k].([]interface{})) > 0 {
+						w += v
+					}
+				} else {
 					w += v
 				}
-			} else {
-				w += v
 			}
 		}
 	}
@@ -220,20 +273,51 @@ func getWeight(tmp map[string]interface{}) (int, string) {
 
 // @Description tmp模版数据, tmp1补充数据
 // @Author J 2023/1/3 11:31
-func mergeTmp(tmp map[string]interface{}, tmp1 map[string]interface{}) map[string]interface{} {
+func mergeTmp(tmp map[string]interface{}, tmp1 map[string]interface{}) (map[string]interface{}, []string) {
 	update := make(map[string]interface{})
+	var fs []string
 	for _, v := range config.Conf.Serve.Fields {
-		if tmp[v] == nil && tmp1[v] != nil {
-			if reflect.TypeOf(tmp1[v]).String() == "string" && util.ObjToString(tmp1[v]) != "" {
-				update[v] = util.ObjToString(tmp1[v])
-			} else if reflect.TypeOf(tmp1[v]).String() == "[]interface {}" && len(tmp1[v].([]interface{})) > 0 {
-				update[v] = tmp1[v]
-			} else {
-				update[v] = tmp1[v]
+		if v == "attachments" {
+			if pinfo1, ok1 := tmp1["projectinfo"].(map[string]interface{}); ok1 {
+				if ats, ok2 := pinfo1[v].(map[string]interface{}); ok2 {
+					if pinfo1[v] != nil && len(ats) > 0 {
+						if pinfo, ok := tmp["projectinfo"].(map[string]interface{}); ok {
+							if pinfo[v] == nil {
+								pinfo[v] = pinfo1[v]
+								update["projectinfo"] = pinfo
+								update["attach_text"] = tmp1["attach_text"] // 补充附件文本
+								fs = append(fs, v)
+								fs = append(fs, "attach_text")
+							}
+						} else {
+							update["projectinfo"] = map[string]interface{}{v: pinfo1[v]}
+							update["attach_text"] = tmp1["attach_text"]
+							fs = append(fs, v)
+							fs = append(fs, "attach_text")
+						}
+					}
+				} else {
+					if pinfo1[v] != nil {
+						log.Error("mergeTmp err...", zap.Any("id", mongodb.BsonIdToSId(tmp1["_id"])))
+					}
+				}
+			}
+		} else {
+			if tmp[v] == nil && tmp1[v] != nil {
+				if reflect.TypeOf(tmp1[v]).String() == "string" && util.ObjToString(tmp1[v]) != "" {
+					update[v] = util.ObjToString(tmp1[v])
+					fs = append(fs, v)
+				} else if reflect.TypeOf(tmp1[v]).String() == "[]interface {}" && len(tmp1[v].([]interface{})) > 0 {
+					update[v] = tmp1[v]
+					fs = append(fs, v)
+				} else {
+					update[v] = tmp1[v]
+					fs = append(fs, v)
+				}
 			}
 		}
 	}
-	return update
+	return update, fs
 }
 
 func updateMethod() {

+ 22 - 16
data_tidb/bidding.go

@@ -4,7 +4,6 @@ import (
 	util "app.yhyue.com/data_processing/common_utils"
 	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/mongodb"
-	"app.yhyue.com/data_processing/common_utils/redis"
 	"data_tidb/config"
 	"fmt"
 	"github.com/shopspring/decimal"
@@ -74,9 +73,8 @@ func taskB() {
 	ch := make(chan bool, 10)
 	wg := &sync.WaitGroup{}
 
-	q := map[string]interface{}{"_id": mongodb.StringTOBsonId("634eac71911e1eb345b2d861")}
-	//q := map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId("6347dc8a911e1eb345aa7fcf"),
-	//	"$lte": mongodb.StringTOBsonId("634e93d6631ff1ac3de1cf18")}}
+	//q := map[string]interface{}{"_id": mongodb.StringTOBsonId("634eac71911e1eb345b2d861")}
+	q := map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId("632d42d667a6b0a2861eef92")}}
 	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding").Find(q).Sort("_id").Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
@@ -94,13 +92,13 @@ func taskB() {
 			if util.IntAll(tmp["extracttype"]) != -1 {
 				taskBase(tmp)
 				taskTags(tmp)
-				//taskExpand(tmp)
-				//taskAtts(tmp)
-				//taskInfoformat(tmp)
-				//taskIntent(tmp)
-				//taskWinner(tmp)
+				taskExpand(tmp)
+				taskAtts(tmp)
+				taskInfoformat(tmp)
+				taskIntent(tmp)
+				taskWinner(tmp)
 				//taskPackage(tmp)
-				//taskPur(tmp)
+				taskPur(tmp)
 			}
 
 		}(tmp)
@@ -159,7 +157,6 @@ func taskBase(tmp map[string]interface{}) {
 			}
 		} else if f == "buyer_id" {
 			if b := util.ObjToString(tmp["buyer"]); b != "" {
-				util.Debug(b)
 				saveM["buyer"] = b
 				//if code := redis.GetStr("qyxy_id", b); code != "" {
 				if code := getNameId(b); code != "" {
@@ -193,7 +190,6 @@ func taskBase(tmp map[string]interface{}) {
 			}
 		}
 	}
-	util.Debug(saveM)
 	saveBasePool <- saveM
 	if len(errf) > 0 {
 		saveErrPool <- map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "f": strings.Join(errf, ",")}
@@ -217,7 +213,8 @@ func taskExpand(tmp map[string]interface{}) {
 	for _, f := range ExpandField {
 		if f == "infoid" {
 			saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
-		} else if f == "project_startdate" || f == "project_completedate" || f == "signaturedate" || f == "bidendtime" || f == "bidstarttime" {
+		} else if f == "project_startdate" || f == "project_completedate" || f == "signstarttime" || f == "bidendtime" || f == "bidstarttime" || f == "docstarttime" ||
+			f == "docendtime" || f == "signaturedate" || f == "signendtime" {
 			if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
 				t := util.Int64All(tmp[f])
 				saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
@@ -418,7 +415,10 @@ func taskIntent(tmp map[string]interface{}) {
 					saveM[f] = time.Now().Format(util.Date_Full_Layout)
 				} else if f == "buyer_id" {
 					if b := util.ObjToString(tmp["buyer"]); b != "" {
-						if code := redis.GetStr("qyxy_id", b); code != "" {
+						//if code := redis.GetStr("qyxy_id", b); code != "" {
+						//	saveM[f] = code
+						//}
+						if code := getNameId(b); code != "" {
 							saveM[f] = code
 						}
 					}
@@ -451,7 +451,10 @@ func taskWinner(tmp map[string]interface{}) {
 					} else if f == "winner_id" {
 						if b := util.ObjToString(w1["entname"]); b != "" {
 							saveM["winner"] = b
-							if code := redis.GetStr("qyxy_id", b); code != "" {
+							//if code := redis.GetStr("qyxy_id", b); code != "" {
+							//	saveM[f] = code
+							//}
+							if code := getNameId(b); code != "" {
 								saveM[f] = code
 							}
 						}
@@ -480,7 +483,10 @@ func taskWinner(tmp map[string]interface{}) {
 			} else if f == "winner_id" {
 				if s != "" {
 					saveM["winner"] = s
-					if code := redis.GetStr("qyxy_id", s); code != "" {
+					//if code := redis.GetStr("qyxy_id", s); code != "" {
+					//	saveM[f] = code
+					//}
+					if code := getNameId(s); code != "" {
 						saveM[f] = code
 					}
 				}

二進制
data_tidb/data_tidb


+ 7 - 7
data_tidb/main.go

@@ -267,13 +267,13 @@ func bidding() *cobra.Command {
 
 			go SaveFunc()
 			go SaveTagFunc()
-			//go SaveExpandFunc()
-			//go SaveAttrFunc()
-			//go SaveImfFunc()
-			//go SaveIntentFunc()
-			//go SaveWinnerFunc()
-			//go SavePackageFunc()
-			//go SavePurFunc()
+			go SaveExpandFunc()
+			go SaveAttrFunc()
+			go SaveImfFunc()
+			go SaveIntentFunc()
+			go SaveWinnerFunc()
+			go SavePackageFunc()
+			go SavePurFunc()
 
 			go saveErrMethod()
 			taskB()

二進制
field_py/field_dispose_1786_linux


二進制
field_py/field_dispose_log


+ 1 - 1
field_py/go.mod

@@ -6,7 +6,7 @@ require (
 	app.yhyue.com/BP/servicerd v0.0.0-20201203055056-87643512f867
 	app.yhyue.com/data_processing/common_utils v0.0.0-20220830011833-76d58ef43f4f
 	github.com/BurntSushi/toml v1.2.0
-	go.mongodb.org/mongo-driver v1.11.0 // indirect
+	go.mongodb.org/mongo-driver v1.11.0
 	go.uber.org/zap v1.22.0
 	google.golang.org/grpc v1.49.0
 	google.golang.org/protobuf v1.27.1

+ 1 - 1
field_py/go.sum

@@ -37,6 +37,7 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
 github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
 github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
 github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
@@ -112,7 +113,6 @@ github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgk
 github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
 github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
 github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
-go.mongodb.org/mongo-driver v1.10.1 h1:NujsPveKwHaWuKUer/ceo9DzEe7HIj1SlJ6uvXZG0S4=
 go.mongodb.org/mongo-driver v1.10.1/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8=
 go.mongodb.org/mongo-driver v1.11.0 h1:FZKhBSTydeuffHj9CBjXlR8vQLee1cQyTWYPA6/tqiE=
 go.mongodb.org/mongo-driver v1.11.0/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=

+ 23 - 12
field_py/main.go

@@ -31,6 +31,8 @@ var (
 
 	UdpChan      = make(chan map[string]interface{}, 500)
 	SingleThread = make(chan bool, 1)
+
+	Skipping = false //rpc重试跳过
 )
 
 func init() {
@@ -101,19 +103,28 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		var mapInfo map[string]interface{}
 		err := json.Unmarshal(data, &mapInfo)
 		log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
-		gtid, _ := mapInfo["gtid"].(string)
-		lteid, _ := mapInfo["lteid"].(string)
-		if err != nil || gtid == "" || lteid == "" {
-			UdpClient.WriteUdp([]byte("cgyx udp error"), udp.OP_NOOP, ra) //udp失败回写
+		if err != nil {
+			UdpClient.WriteUdp([]byte("error: "+err.Error()), udp.OP_NOOP, ra)
 		} else {
-			//udp成功回写
-			if k := util.ObjToString(mapInfo["key"]); k != "" {
-				go UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
-			} else {
-				k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
-				go UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
+			stype := util.ObjToString(mapInfo["stype"])
+			switch stype {
+			case "jqcl":
+				gtid, _ := mapInfo["gtid"].(string)
+				lteid, _ := mapInfo["lteid"].(string)
+				//udp成功回写
+				if k := util.ObjToString(mapInfo["key"]); k != "" {
+					go UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
+				} else {
+					k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
+					go UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
+				}
+				UdpChan <- mapInfo
+			case "tout-true":
+				Skipping = true
+			case "tout-false":
+				Skipping = false
 			}
-			UdpChan <- mapInfo
+
 		}
 	case udp.OP_NOOP: //下个节点回应
 		ok := string(data)
@@ -150,7 +161,7 @@ func checkMapJob() {
 					node.retry++
 					if node.retry > 5 {
 						UdpTaskMap.Delete(k)
-						res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "field-py-dispose-send-fail", k.(string)))
+						res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "python字段识别-send-fail", k.(string)))
 						if err == nil {
 							defer res.Body.Close()
 							read, err := ioutil.ReadAll(res.Body)

+ 73 - 38
field_py/task.go

@@ -76,7 +76,7 @@ func getIntention(mapinfo map[string]interface{}) {
 				}
 			}
 			if result1 != nil && len(result1) > 0 {
-				if result1["purchasinglist"] != nil {
+				if result1["purchasinglist"] != nil && len(result1["purchasinglist"].([]interface{})) > 0 {
 					update["purchasinglist"] = result1["purchasinglist"]
 				}
 				if result1["procurementlist"] != nil {
@@ -102,7 +102,7 @@ func getIntention(mapinfo map[string]interface{}) {
 	NextNode(mapinfo)
 }
 
-// @Description procurementlist
+// @Description procurementlist、purchasinglist
 // @Author J 2022/8/31 15:28
 func taskA(tmp map[string]interface{}, gtid, lteid string) map[string]interface{} {
 	id := mongodb.BsonIdToSId(tmp["_id"])
@@ -112,7 +112,7 @@ func taskA(tmp map[string]interface{}, gtid, lteid string) map[string]interface{
 	if err != nil {
 		ErrorInfoCache <- map[string]interface{}{
 			"err":        "Json Marshal Error",
-			"type":       "采购意向",
+			"type":       "采购意向/标的物",
 			"id":         id,
 			"comeintime": time.Now().Unix(),
 			"ok":         false,
@@ -126,7 +126,7 @@ func taskA(tmp map[string]interface{}, gtid, lteid string) map[string]interface{
 	if err != nil { //保存处理异常信息
 		ErrorInfoCache <- map[string]interface{}{
 			"err":        err.Error(),
-			"type":       "采购意向",
+			"type":       "采购意向/标的物",
 			"id":         id,
 			"comeintime": time.Now().Unix(),
 			"ok":         false,
@@ -156,7 +156,7 @@ func taskB(tmp map[string]interface{}, gtid, lteid string) map[string]interface{
 	}
 	//处理数据
 	result, err := rpcGetFieldR(string(reqStr), id)
-	if err != nil { //保存处理异常信息
+	if err != nil {
 		ErrorInfoCache <- map[string]interface{}{
 			"err":        err.Error(),
 			"type":       "评标专家字段",
@@ -184,16 +184,30 @@ func rpcGetFieldP(reqStr, id string) (map[string]interface{}, error) {
 	ip := ""
 	port := -1
 	//重试获取ip、port
-	for i := 1; i <= 3; i++ {
-		repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 3})
-		if err != nil {
-			continue
-		} else {
-			ip = repl.Ip
-			port = int(repl.Port)
-			break
+	if Skipping {
+		for i := 1; i <= 3; i++ {
+			repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 3})
+			if err != nil {
+				continue
+			} else {
+				ip = repl.Ip
+				port = int(repl.Port)
+				break
+			}
+		}
+	} else {
+		for {
+			repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 3})
+			if err != nil {
+				continue
+			} else {
+				ip = repl.Ip
+				port = int(repl.Port)
+				break
+			}
 		}
 	}
+
 	if ip == "" || port == -1 { //重试三次,回去ip、port失败
 		atomic.AddInt64(&IpGetErrNum, 1) //异常次数+1
 		return nil, errors.New("Get Ip Error")
@@ -201,7 +215,6 @@ func rpcGetFieldP(reqStr, id string) (map[string]interface{}, error) {
 	atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
 	//处理数据
 	addr := ip + ":" + fmt.Sprint(port)
-	start := time.Now()
 	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
 	if err != nil {
 		atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
@@ -215,23 +228,37 @@ func rpcGetFieldP(reqStr, id string) (map[string]interface{}, error) {
 	}
 	ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*2)
 	defer cancel()
+	//go func(ctx context.Context) {
+	//	select {
+	//	case <-ctx.Done():
+	//		return
+	//	case <-time.After(2 * time.Minute):
+	//		// 超时处理
+	//		//log.Info("rpcGetFieldP 字段识别超过2min", zap.Any("serve", "goods_service"), zap.String("id", id), zap.Any("ip+port", addr))
+	//		ErrorInfoCache <- map[string]interface{}{
+	//			"err":        "接口处理超过2min",
+	//			"type":       "采购意向/标的物",
+	//			"id":         id,
+	//			"comeintime": time.Now().Unix(),
+	//			"ok":         false,
+	//			"gtid":       gtid,
+	//			"letid":      lteid,
+	//		}
+	//		return
+	//	}
+	//}(ctx)
 	resp, err := client.GoodsExtract(ctx, req)
 	if err != nil {
-		return nil, errors.New("Deal Data Error")
+		_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
+		return nil, err
 	}
 	result := map[string]interface{}{}
 	if json.Unmarshal([]byte(resp.Goods), &result) != nil {
+		_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
 		return nil, errors.New("Json Unmarshal Error")
 	}
 	// 服务中心释放服务
 	_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
-	if err != nil {
-		return nil, err
-	}
-	if time.Since(start).Minutes() > 2 {
-		// py接口字段识别超过5分钟
-		log.Info("rpcGetFieldP 字段识别超过2min", zap.Any("serve", "goods_service"), zap.String("id", id), zap.Any("ip+port", addr))
-	}
 	return result, nil
 }
 
@@ -249,14 +276,27 @@ func rpcGetFieldR(reqStr, id string) (map[string]interface{}, error) {
 	ip := ""
 	port := -1
 	//重试获取ip、port
-	for i := 1; i <= 3; i++ {
-		repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "extract_expert_service", Balance: 0})
-		if err != nil {
-			continue
-		} else {
-			ip = repl.Ip
-			port = int(repl.Port)
-			break
+	if Skipping {
+		for i := 1; i <= 3; i++ {
+			repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "extract_expert_service", Balance: 0})
+			if err != nil {
+				continue
+			} else {
+				ip = repl.Ip
+				port = int(repl.Port)
+				break
+			}
+		}
+	} else {
+		for {
+			repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "extract_expert_service", Balance: 0})
+			if err != nil {
+				continue
+			} else {
+				ip = repl.Ip
+				port = int(repl.Port)
+				break
+			}
 		}
 	}
 	if ip == "" || port == -1 { //重试三次,回去ip、port失败
@@ -266,7 +306,6 @@ func rpcGetFieldR(reqStr, id string) (map[string]interface{}, error) {
 	atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
 	//处理数据
 	addr := ip + ":" + fmt.Sprint(port)
-	start := time.Now()
 	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
 	if err != nil {
 		atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
@@ -282,16 +321,12 @@ func rpcGetFieldR(reqStr, id string) (map[string]interface{}, error) {
 	defer cancel()
 	resp, err := client.Extract(ctx, req)
 	if err != nil {
-		return nil, errors.New("Deal Data Error")
+		return nil, err
 	}
 	result := make(map[string]interface{})
 	if json.Unmarshal([]byte(resp.Results), &result) != nil {
 		return nil, errors.New("Json Unmarshal Error")
 	}
-	if time.Since(start).Minutes() > 2 {
-		// py接口字段识别超过5分钟
-		log.Info("rpcGetFieldR 字段识别超过2min", zap.Any("serve", "extract_expert_service"), zap.Any("id", id), zap.Any("ip+port", addr))
-	}
 	return result, nil
 }
 
@@ -363,7 +398,7 @@ func SaveErrorInfo() {
 					defer func() {
 						<-SP
 					}()
-					MgoB.SaveBulk("bidding_warn", savearr...)
+					MgoB.SaveBulk("bidding_warn_1", savearr...)
 				}(savearr)
 				savearr = make([]map[string]interface{}, 200)
 				indexh = 0
@@ -375,7 +410,7 @@ func SaveErrorInfo() {
 					defer func() {
 						<-SP
 					}()
-					MgoB.SaveBulk("bidding_warn", savearr...)
+					MgoB.SaveBulk("bidding_warn_1", savearr...)
 				}(savearr[:indexh])
 				savearr = make([]map[string]interface{}, 200)
 				indexh = 0

+ 4 - 5
field_sync/common.toml

@@ -8,7 +8,8 @@ fields = ["buyerzipcode", "winnertel", "winnerperson", "contractcode", "winnerad
     "project_duration", "project_timeunit", "project_startdate", "project_completedate", "payway", "contract_guarantee", "bid_guarantee", "qualifies",
     "funds", "review_experts", "bidmethod", "bidendtime", "bidopenaddress", "docamount", "bidway", "agencyrate", "agencyfee", "getdocmethod", "purchasing_tag",
     "package", "history_updatetime", "total_investment", "owner", "projecttype", "project_person", "project_phone", "approvedept", "construction_area",
-    "floor_area"
+    "floor_area", "bidstarttime", "docendtime", "docstarttime", "signendtime", "signstarttime", "issue_quota", "bidopen_shape", "quote_mode", "is_acquire_tender",
+    "is_payment_deposit", "is_joint_bidding", "est_purchase_time", "est_purchase_amount"
 ]
 
 [udp]
@@ -50,7 +51,6 @@ coll = "bidding"
 size = 5
 user = ""
 password = ""
-
 [db.mongoE]
 addr = "192.168.3.207:27092"
 dbname = "wjh"
@@ -59,15 +59,14 @@ coll = "extract"
 coll1 = ""
 user = ""
 password = ""
-
 [db.mongoQ]
 addr = "192.168.3.207:27092"
 dbname = "mixdata"
 size = 5
 coll = "qyxy_std"
+coll1 = "address_jy_2021"
 user = ""
 password = ""
-
 [db.mongoP]
 addr = "192.168.3.207:27092"
 dbname = "mixdata"
@@ -78,7 +77,7 @@ password = ""
 
 [db.redis]
 #company_id
-addr = "qyxy_id=192.168.3.207:8379"
+addr = "qyxy_id=192.168.3.207:1679"
 dbindex = 4
 
 [mail]

+ 3 - 3
field_sync/go.mod

@@ -3,11 +3,11 @@ module field_sync
 go 1.16
 
 require (
-	app.yhyue.com/data_processing/common_utils v0.0.0-20220830011833-76d58ef43f4f
+	app.yhyue.com/data_processing/common_utils v0.0.0-20221205033056-885644941005
 	github.com/BurntSushi/toml v1.2.0
 	github.com/aliyun/aliyun-oss-go-sdk v2.2.5+incompatible
-	go.mongodb.org/mongo-driver v1.11.0 // indirect
+	go.mongodb.org/mongo-driver v1.11.0
 	go.uber.org/zap v1.22.0
 	golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
-	gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
+	gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
 )

+ 2 - 1
field_sync/go.sum

@@ -1,5 +1,7 @@
 app.yhyue.com/data_processing/common_utils v0.0.0-20220830011833-76d58ef43f4f h1:5fUbVRwPM3oBsZgvG76Bia3I4SdwdBB6PvJ6B28Qkyc=
 app.yhyue.com/data_processing/common_utils v0.0.0-20220830011833-76d58ef43f4f/go.mod h1:9PlRUNzirlF/LL1W7fA7koCudxJe3uO5nshDWlCnGo8=
+app.yhyue.com/data_processing/common_utils v0.0.0-20221205033056-885644941005 h1:AEEi+8ao9pTVqPIh6uVvjxBby/i43fFj7DwVo+feDAE=
+app.yhyue.com/data_processing/common_utils v0.0.0-20221205033056-885644941005/go.mod h1:9PlRUNzirlF/LL1W7fA7koCudxJe3uO5nshDWlCnGo8=
 github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0=
 github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
 github.com/PuerkitoBio/goquery v1.8.0 h1:PJTF7AmFCFKk1N6V6jmKfrNH9tV5pNE6lZMkG0gta/U=
@@ -55,7 +57,6 @@ github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgk
 github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
 github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
 github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
-go.mongodb.org/mongo-driver v1.10.1 h1:NujsPveKwHaWuKUer/ceo9DzEe7HIj1SlJ6uvXZG0S4=
 go.mongodb.org/mongo-driver v1.10.1/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8=
 go.mongodb.org/mongo-driver v1.11.0 h1:FZKhBSTydeuffHj9CBjXlR8vQLee1cQyTWYPA6/tqiE=
 go.mongodb.org/mongo-driver v1.11.0/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=

+ 74 - 0
field_sync/init.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	util "app.yhyue.com/data_processing/common_utils"
 	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/mongodb"
 	"field_sync/config"
@@ -65,3 +66,76 @@ func InitMgo() {
 	}
 	MgoP.InitPool()
 }
+
+type Province struct {
+	P_Name string
+}
+type City struct {
+	P_Name string
+	C_Name string
+}
+type District struct {
+	P_Name string
+	C_Name string
+	D_Name string
+}
+
+//初始化城市
+func initCheckCity() {
+	//初始化-城市配置
+	ProvinceDict = make(map[string][]Province, 0)
+	CityDict = make(map[string][]City, 0)
+	DistrictDict = make(map[string][]District, 0)
+
+	q := map[string]interface{}{
+		"town_code": map[string]interface{}{
+			"$exists": 0,
+		},
+	}
+	sess := MgoQ.GetMgoConn()
+	defer MgoQ.DestoryMongoConn(sess)
+	it := sess.DB(MgoQ.DbName).C(config.Conf.DB.MongoQ.Coll1).Find(&q).Iter()
+	total := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		district_code := util.IntAll(tmp["district_code"])
+		city_code := util.IntAll(tmp["city_code"])
+		if district_code > 0 {
+			province := util.ObjToString(tmp["province"])
+			city := util.ObjToString(tmp["city"])
+			district := util.ObjToString(tmp["district"])
+			data := District{province, city, district}
+			if DistrictDict[district] == nil {
+				DistrictDict[district] = []District{data}
+			} else {
+				arr := DistrictDict[district]
+				arr = append(arr, data)
+				DistrictDict[district] = arr
+			}
+		} else {
+			if city_code > 0 {
+				province := util.ObjToString(tmp["province"])
+				city := util.ObjToString(tmp["city"])
+				data := City{province, city}
+				if CityDict[city] == nil {
+					CityDict[city] = []City{data}
+				} else {
+					arr := CityDict[city]
+					arr = append(arr, data)
+					CityDict[city] = arr
+				}
+			} else {
+				province := util.ObjToString(tmp["province"])
+				data := Province{province}
+				if ProvinceDict[province] == nil {
+					ProvinceDict[province] = []Province{data}
+				} else {
+					arr := ProvinceDict[province]
+					arr = append(arr, data)
+					ProvinceDict[province] = arr
+				}
+			}
+		}
+		tmp = make(map[string]interface{})
+	}
+	util.Debug(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(ProvinceDict), len(CityDict), len(DistrictDict)))
+}

+ 14 - 6
field_sync/main.go

@@ -5,7 +5,6 @@ import (
 	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/mongodb"
 	gonsq "app.yhyue.com/data_processing/common_utils/nsq"
-	"app.yhyue.com/data_processing/common_utils/redis"
 	"app.yhyue.com/data_processing/common_utils/udp"
 	"encoding/json"
 	"field_sync/config"
@@ -45,17 +44,18 @@ func init() {
 	InitFileInfo()
 	InitLog()
 	InitMgo()
-	inits()
-	redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.DbIndex)
+	//inits()
+	//redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.DbIndex)
 
+	initCheckCity()
 	log.Info("init success")
 }
 
 func main() {
-	go checkMapJob()
-	go nsqMethod()
+	//go checkMapJob()
+	//go nsqMethod()
 
-	//go UpdateBidding()
+	go UpdateBidding()
 	//go UpdateExtract()
 
 	UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
@@ -101,6 +101,14 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					biddingTask(data, mapInfo)
 				}()
+			case "bidding_all": //id段存量数据
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					biddingAllTask(data, mapInfo)
+				}()
 			default:
 				pool <- true
 				go func() {

+ 287 - 1
field_sync/task.go

@@ -20,7 +20,12 @@ import (
 )
 
 var (
-	regLetter = regexp.MustCompile("[a-z]*")
+	regLetter  = regexp.MustCompile("[a-z]*")
+	cityEndReg = regexp.MustCompile("(区|县|市)$")
+
+	ProvinceDict map[string][]Province //省份-map
+	CityDict     map[string][]City     //城市-map
+	DistrictDict map[string][]District //区县-map
 )
 
 func biddingTask(data []byte, mapInfo map[string]interface{}) {
@@ -98,6 +103,167 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 	}
 }
 
+func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
+	defer util.Catch()
+	q, _ := mapInfo["query"].(map[string]interface{})
+	if q == nil {
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
+				"$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
+			},
+		}
+	}
+	//extract库
+	extractConn := MgoE.GetMgoConn()
+	defer MgoE.DestoryMongoConn(extractConn)
+	extractResult := extractConn.DB(MgoE.DbName).C(config.Conf.DB.MongoE.Coll).Find(q).Select(map[string]interface{}{
+		"field_source": 0,
+		"kvtext":       0,
+	}).Sort("-_id").Iter()
+
+	//bidding库
+	biddingConn := MgoB.GetMgoConn()
+	defer MgoB.DestoryMongoConn(biddingConn)
+	count := 0
+
+	var compare map[string]interface{}
+	result := biddingConn.DB(MgoB.DbName).C(config.Conf.DB.MongoB.Coll).Find(q).Select(map[string]interface{}{
+		"contenthtml":  0,
+		"field_source": 0,
+	}).Sort("-_id").Iter()
+	for tmp := make(map[string]interface{}); result.Next(tmp); count++ {
+		update := map[string]interface{}{}
+		del := map[string]interface{}{} //记录extract没有值而bidding中有值的字段
+		//对比方法----------------
+		for {
+			if compare == nil {
+				compare = make(map[string]interface{})
+				if !extractResult.Next(compare) {
+					break
+				}
+			}
+			if compare != nil {
+				cid := mongodb.BsonIdToSId(compare["_id"])
+				tid := mongodb.BsonIdToSId(tmp["_id"])
+				if cid == tid {
+					//更新bidding表;bidding表modifyinfo中的字段不更新
+					modifyinfo := make(map[string]bool)
+					if tmpmodifyinfo, ok := tmp["modifyinfo"].(map[string]interface{}); ok && tmpmodifyinfo != nil {
+						for k := range tmpmodifyinfo {
+							modifyinfo[k] = true
+						}
+					}
+					for _, k := range config.Conf.Serve.FieldS {
+						v1 := compare[k] //extract
+						v2 := tmp[k]     //bidding
+						if v2 == nil && v1 != nil {
+							update[k] = v1
+						} else if v2 != nil && v1 != nil && !modifyinfo[k] {
+							update[k] = v1
+						} else if v2 != nil && v1 == nil && !modifyinfo[k] {
+							if k == "s_subscopeclass" && del["subscopeclass"] == nil {
+								continue
+							} else if k == "s_topscopeclass" && del["topscopeclass"] == nil {
+								continue
+							}
+							del[k] = 1
+							//util.Debug("抽取结果没有值,bidding有值:field--", k, "val--", v2)
+						}
+
+					}
+					//if util.IntAll(compare["repeat"]) == 1 {
+					//	update["extracttype"] = -1
+					//	update["dataprocess"] = 7
+					//} else {
+					//	update["extracttype"] = 1
+					//	update["dataprocess"] = 8
+					//}
+					break
+				} else {
+					if cid < tid {
+						compare = nil
+						continue
+					} else {
+						break
+					}
+				}
+			} else {
+				break
+			}
+		}
+
+		//------------------对比结束
+		//处理分类
+		//if compare != nil { //extract
+		//	fieldFun(compare, update)
+		//	compare = nil
+		//}
+
+		// 城市标准化
+		if update["area"] != nil || update["city"] != nil || update["district"] != nil {
+			rdata := standardCheckCity(util.ObjToString(tmp["area"]), util.ObjToString(tmp["city"]), util.ObjToString(tmp["district"]))
+			if len(rdata) > 0 {
+				for k, v := range rdata {
+					if v != "" {
+						delete(update, v)
+						del[v] = 1
+					} else {
+						delete(del, k)
+						update[k] = v
+					}
+				}
+			}
+		}
+
+		// entidlist
+		extractMap := make(map[string]interface{})
+		if update["s_winner"] != "" {
+			cid := companyFun(update)
+			if len(cid) > 0 {
+				update["entidlist"] = cid
+				extractMap["entidlist"] = cid
+			}
+		}
+
+		//if len(extractMap) > 0 {
+		//	updateExtPool <- []map[string]interface{}{
+		//		{"_id": tmp["_id"]},
+		//		{"$set": extractMap},
+		//	}
+		//}
+		// 附件有效字段
+		//if i := validFile(tmp); i != 0 {
+		//	if i == -1 {
+		//		update["isValidFile"] = false
+		//	} else {
+		//		update["isValidFile"] = true
+		//	}
+		//}
+		if len(update) > 0 {
+			if len(del) > 0 { //删除的字段
+				updateBidPool <- []map[string]interface{}{{
+					"_id": tmp["_id"],
+				},
+					{"$set": update, "$unset": del},
+				}
+			} else {
+				updateBidPool <- []map[string]interface{}{{
+					"_id": tmp["_id"],
+				},
+					{"$set": update},
+				}
+			}
+		}
+		if count%50000 == 0 {
+			log.Info("biddingTask", zap.Int("current", count))
+		}
+		tmp = make(map[string]interface{})
+	}
+
+	log.Info("biddingAll sync...over", zap.Int("all", count))
+}
+
 func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, bkey, stype string) int {
 	syncNo := 0 //抽取表数据同步数量
 	//对比两张表数据,减少查询次数
@@ -521,3 +687,123 @@ func taskinfo(id string) {
 	log.Info("nsq data over", zap.Any("es", next), zap.String("mapinfo", string(datas)))
 	_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
 }
+
+//城市标准校验
+func standardCheckCity(area string, city string, district string) map[string]string {
+	rdata := make(map[string]string)
+	if area == "香港" || area == "澳门" || area == "台湾" || (area == "全国" && (city == "" && district == "")) {
+		return rdata
+	}
+	//第一步:区校验
+	if district != "" {
+		districtArr := DistrictDict[district]
+		if districtArr == nil { //涉及了 个别别名相关的数据
+			trim_arr := aliasDataDistrict(district) //拆分后缀
+			if len(trim_arr) > 0 {
+				for _, alias_district := range trim_arr {
+					alias_districtArr := DistrictDict[alias_district]
+					for _, v := range alias_districtArr {
+						if city == v.C_Name && area == v.P_Name {
+							rdata["district"] = alias_district
+							return rdata
+						}
+					}
+				}
+			}
+			rdata["district"] = ""
+		} else {
+			isTrue := false
+			for _, v := range districtArr {
+				if city == v.C_Name && area == v.P_Name {
+					isTrue = true
+					break
+				}
+			}
+			if isTrue { //完全匹配
+				return rdata
+			} else { //未完全匹配
+				if len(districtArr) == 1 {
+					rdata["area"] = districtArr[0].P_Name
+					rdata["city"] = districtArr[0].C_Name
+					rdata["district"] = districtArr[0].D_Name
+					return rdata
+				} else {
+					rdata["district"] = ""
+				}
+			}
+		}
+	}
+
+	//第二步:区校验-失败   市-校验
+	if city != "" {
+		cityArr := CityDict[city]
+		if cityArr == nil {
+			//把市当成区,匹配三级   - 存在优化空间- city:郑州  别名
+			districtArr := DistrictDict[city]
+			for _, v := range districtArr {
+				if city == v.C_Name && area == v.P_Name {
+					rdata["area"] = districtArr[0].P_Name
+					rdata["city"] = districtArr[0].C_Name
+					rdata["district"] = districtArr[0].D_Name
+					return rdata
+				}
+			}
+			rdata["city"] = ""
+		} else {
+			isTrue := false
+			for _, v := range cityArr {
+				if area == v.P_Name {
+					isTrue = true
+					break
+				}
+			}
+			if isTrue { //完全匹配
+				return rdata
+			} else { //未完全匹配
+				if len(cityArr) == 1 {
+					rdata["area"] = cityArr[0].P_Name
+					rdata["city"] = cityArr[0].C_Name
+					rdata["district"] = ""
+					return rdata
+				} else {
+					rdata["city"] = ""
+				}
+			}
+		}
+	}
+
+	//第三步:省份校验
+	if ProvinceDict[area] == nil {
+		rdata["area"] = "全国"
+		rdata["city"] = ""
+		rdata["district"] = ""
+	}
+
+	return rdata
+}
+
+//拆分三级县
+func aliasDataDistrict(district string) []string {
+	arr := []string{}
+	if cityEndReg.MatchString(district) {
+		str := cityEndReg.FindString(district)
+		strings.TrimRight(district, str)
+		if str == "县" {
+			arr = append(arr, fmt.Sprintf("%s区", strings.TrimRight(district, str)))
+			arr = append(arr, fmt.Sprintf("%s市", strings.TrimRight(district, str)))
+		} else if str == "区" {
+			arr = append(arr, fmt.Sprintf("%s县", strings.TrimRight(district, str)))
+			arr = append(arr, fmt.Sprintf("%s市", strings.TrimRight(district, str)))
+		} else if str == "市" {
+			arr = append(arr, fmt.Sprintf("%s县", strings.TrimRight(district, str)))
+			arr = append(arr, fmt.Sprintf("%s区", strings.TrimRight(district, str)))
+		} else {
+
+		}
+	} else { //未找到 district- 区县市  例: district : 金水
+		arr = append(arr, fmt.Sprintf("%s区", district))
+		arr = append(arr, fmt.Sprintf("%s县", district))
+		arr = append(arr, fmt.Sprintf("%s市", district))
+	}
+	return arr
+}

+ 2 - 0
monitor/common.toml

@@ -1,4 +1,6 @@
 [serve]
+fileWarn = 10000
+pyWarn = 50000
 
 [db]
 [db.mongoB]

+ 2 - 2
monitor/config/conf.go

@@ -31,8 +31,8 @@ type conf struct {
 }
 
 type serve struct {
-	GrpcAddr string
-	Thread   int
+	FileWarn int
+	PyWarn   int
 }
 
 type udp struct {

+ 3 - 3
monitor/config/conf_test.go

@@ -17,8 +17,8 @@ loglevel  = "info"
 format = "text"
 
 [serve]
-grpcAddr = "192.168.3.12:10021"
-udpPort = "1782"
+fileWarn = 10000
+pyWarn = 50000
 
 [db]
 [db.mongo]
@@ -48,6 +48,6 @@ func TestInit(t *testing.T) {
 	testfile := "/tmp/crocodile.toml"
 	ioutil.WriteFile(testfile, []byte(confs), 0644)
 	Init(testfile)
-	t.Logf("%+v", Conf.Serve.GrpcAddr)
+	t.Logf("%+v", Conf.Serve.PyWarn)
 	os.Remove(testfile)
 }

+ 1 - 2
monitor/go.mod

@@ -3,8 +3,7 @@ module monitor
 go 1.16
 
 require (
-	app.yhyue.com/data_processing/common_utils v0.0.0-20220927054143-d9e97522625d
-	app.yhyue.com/data_processing/field_info_tag v1.1.1 // indirect
+	app.yhyue.com/data_processing/common_utils v0.0.0-20221205033056-885644941005
 	github.com/BurntSushi/toml v1.2.1
 	github.com/robfig/cron v1.2.0
 	github.com/spf13/cobra v1.6.1

+ 10 - 0
monitor/go.sum

@@ -1,11 +1,15 @@
 app.yhyue.com/data_processing/common_utils v0.0.0-20220927054143-d9e97522625d h1:Nh2rC3LBqh0alvam2vr4is/vbUaPkl0rbZxVETx3nmk=
 app.yhyue.com/data_processing/common_utils v0.0.0-20220927054143-d9e97522625d/go.mod h1:9PlRUNzirlF/LL1W7fA7koCudxJe3uO5nshDWlCnGo8=
+app.yhyue.com/data_processing/common_utils v0.0.0-20221205033056-885644941005 h1:AEEi+8ao9pTVqPIh6uVvjxBby/i43fFj7DwVo+feDAE=
+app.yhyue.com/data_processing/common_utils v0.0.0-20221205033056-885644941005/go.mod h1:9PlRUNzirlF/LL1W7fA7koCudxJe3uO5nshDWlCnGo8=
 app.yhyue.com/data_processing/field_info_tag v1.1.1 h1:ouJx+s77O8N69RALYyBkOPNyVeJ8WZL+2Ot/eZSEHTw=
 app.yhyue.com/data_processing/field_info_tag v1.1.1/go.mod h1:i9NKOqhWmcDl0Cl5qHBBBx46Da0Qzys7L0J+3my/Hbw=
 github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
 github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak=
 github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
+github.com/PuerkitoBio/goquery v1.8.0 h1:PJTF7AmFCFKk1N6V6jmKfrNH9tV5pNE6lZMkG0gta/U=
 github.com/PuerkitoBio/goquery v1.8.0/go.mod h1:ypIiRMtY7COPGk+I/YbZLbxsxn9g5ejnI2HSMtkjZvI=
+github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
 github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
 github.com/antlabs/strsim v0.0.3 h1:J9AHxnybJZHKBoxeup1VZNWt3ST8QD+ieDJsm/nEpRo=
 github.com/antlabs/strsim v0.0.3/go.mod h1:bIcymn+2jtt01korFun0bs8PsYZeQa82aHoYMi7cm30=
@@ -15,6 +19,7 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dchest/captcha v1.0.0 h1:vw+bm/qMFvTgcjQlYVTuQBJkarm5R0YSsDKhm1HZI2o=
 github.com/dchest/captcha v1.0.0/go.mod h1:7zoElIawLp7GUMLcj54K9kbw+jEyvz2K0FDdRRYhvWo=
 github.com/donnie4w/go-logger v0.0.0-20170827050443-4740c51383f4 h1:T9PR91sjTtrA1HmZB4G+M7OLCelch0f6rIEY7Mm1T4U=
 github.com/donnie4w/go-logger v0.0.0-20170827050443-4740c51383f4/go.mod h1:L7S4x0R7vv3xoOhGuyAJyCO2MYzWOpccM4Isn8jIUgY=
@@ -25,6 +30,7 @@ github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfC
 github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
 github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
 github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
 github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
 github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
@@ -39,6 +45,7 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
 github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
+github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
 github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -97,6 +104,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
 golang.org/x/net v0.0.0-20210916014120-12bc252f5db8/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
@@ -125,9 +133,11 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1N
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
 gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/olivere/elastic.v2 v2.0.61 h1:7cpl3MW8ysa4GYFBXklpo5mspe4NK0rpZTdyZ+QcD4U=
 gopkg.in/olivere/elastic.v2 v2.0.61/go.mod h1:CTVyl1gckiFw1aLZYxC00g3f9jnHmhoOKcWF7W3c6n4=
 gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
 gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

+ 23 - 6
monitor/main.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	util "app.yhyue.com/data_processing/common_utils"
 	"app.yhyue.com/data_processing/common_utils/log"
 	"bytes"
 	"encoding/json"
@@ -17,7 +18,8 @@ import (
 var (
 	WebUrl = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=45962efc-ca87-4996-9ffa-08bf6608ab7a"
 
-	WarningStr = "数据采集bidding_file表数据,已累计%d数据未处理"
+	WarningStr  = "数据采集bidding_file表数据,已累计%d条数据未处理"
+	WarningStr1 = "标的物等字段识别提醒,bidding表数据已积累%d条未处理"
 )
 
 func init() {
@@ -35,18 +37,20 @@ func main() {
 
 func biddingFile() *cobra.Command {
 	cmdClient := &cobra.Command{
-		Use:   "bidding_file",
+		Use:   "bidding_warn",
 		Short: "Start statistics bidding_file data",
 		Run: func(cmd *cobra.Command, args []string) {
 			InitMgo()
 
-			taskinfo()
 			crn := cron.New()
 			_ = crn.AddFunc("@hourly", func() {
-				taskinfo()
+				taskFile()
 			})
 			crn.Start()
 
+			_ = crn.AddFunc("0 */30 * * * ?", func() {
+				taskPy()
+			})
 			c := make(chan bool, 1)
 			<-c
 		},
@@ -54,15 +58,28 @@ func biddingFile() *cobra.Command {
 	return cmdClient
 }
 
-func taskinfo() {
+func taskFile() {
 	count := MgoB.Count("bidding_file", bson.M{"moveok": bson.M{"$exists": false}})
-	if count > 10000 {
+	if count > config.Conf.Serve.FileWarn {
 		SendMsg(fmt.Sprintf(WarningStr, count))
 	} else {
 		log.Info("bidding_file", zap.Int("count", count))
 	}
 }
 
+func taskPy() {
+	info, _ := MgoB.Find("bidding_processing_ids", bson.M{"dataprocess": bson.M{"$in": []int{1, 2}}}, nil, bson.M{"count": 1, "dataprocess": 1}, false, -1, -1)
+	count := 0
+	for _, m := range *info {
+		count += util.IntAll(m["count"])
+	}
+	if count > config.Conf.Serve.PyWarn {
+		SendMsg(fmt.Sprintf(WarningStr1, count))
+	} else {
+		log.Info("bidding_py", zap.Int("count", count))
+	}
+}
+
 func SendMsg(content string) {
 	client := &http.Client{}
 	data := map[string]interface{}{"msgtype": "text", "text": map[string]interface{}{