Jianghan 2 năm trước cách đây
mục cha
commit
a0ff04099e

+ 5 - 0
README.md

@@ -13,6 +13,11 @@
 + isValidFile 附件有效字段  
 + entidlist 中标单位id字段  
 
+
+## processing_ids 数据处理流程-id段保存
++ 定时5分钟,保存id段
++ 保存id段(dataprocess=0,updatetime)—>招标分类(dataprocess=1,updatetime)—>标的物识别(dataprocess=2,updatetime)—>抽取(dataprocess=3,updatetime)—>字段清理(dataprocess=3,updatetime)—>行业分类(dataprocess=4,updatetime)—>判重(dataprocess=5,updatetime)—>bidding表字段同步(dataprocess=6,updatetime)
+
 ## data_tidb 数据处理流程-数据同步到tidb库(bidding、proejctset)  
 + bidding数据  
   + 基本信息

+ 77 - 18
data_tidb/bidding.go

@@ -21,6 +21,52 @@ var (
 	regLetter = regexp.MustCompile("[a-z]*")
 )
 
+func doBiddingTask(gtid, lteid string, mapInfo map[string]interface{}) {
+	sess := MongoB.GetMgoConn()
+	defer MongoB.DestoryMongoConn(sess)
+
+	ch := make(chan bool, 10)
+	wg := &sync.WaitGroup{}
+
+	stype := util.ObjToString(mapInfo["stype"])
+	q := map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId(gtid),
+		"$lte": mongodb.StringTOBsonId(lteid)}}
+	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding").Find(q).Sort("_id").Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%1000 == 0 {
+			log.Info(fmt.Sprintf("current --- %d", count))
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			if util.IntAll(tmp["dataprocess"]) != 8 {
+				return
+			}
+			if stype == "bidding_history" && tmp["history_updatetime"] == nil {
+				return
+			}
+			taskBase(tmp)
+			taskTags(tmp)
+			//taskExpand(tmp)
+			//taskAtts(tmp)
+			//taskInfoformat(tmp)
+			//taskIntent(tmp)
+			//taskWinner(tmp)
+			//taskPackage(tmp)
+			//taskPur(tmp)
+
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Info(fmt.Sprintf("over --- %d", count))
+}
+
 func taskB() {
 	sess := MongoB.GetMgoConn()
 	defer MongoB.DestoryMongoConn(sess)
@@ -28,9 +74,10 @@ func taskB() {
 	ch := make(chan bool, 10)
 	wg := &sync.WaitGroup{}
 
-	//q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5f4c9672c014544073734928")}
-	//q := map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId("5ad8c18fa5cb26b9b72ce098")}}
-	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding_back").Find(nil).Sort("_id").Iter()
+	q := map[string]interface{}{"_id": mongodb.StringTOBsonId("634eac71911e1eb345b2d861")}
+	//q := map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId("6347dc8a911e1eb345aa7fcf"),
+	//	"$lte": mongodb.StringTOBsonId("634e93d6631ff1ac3de1cf18")}}
+	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding").Find(q).Sort("_id").Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
 		if count%20000 == 0 {
@@ -46,8 +93,8 @@ func taskB() {
 
 			if util.IntAll(tmp["extracttype"]) != -1 {
 				taskBase(tmp)
+				taskTags(tmp)
 				//taskExpand(tmp)
-				//taskTags(tmp)
 				//taskAtts(tmp)
 				//taskInfoformat(tmp)
 				//taskIntent(tmp)
@@ -112,15 +159,18 @@ func taskBase(tmp map[string]interface{}) {
 			}
 		} else if f == "buyer_id" {
 			if b := util.ObjToString(tmp["buyer"]); b != "" {
+				util.Debug(b)
 				saveM["buyer"] = b
-				if code := redis.GetStr("qyxy_id", b); code != "" {
+				//if code := redis.GetStr("qyxy_id", b); code != "" {
+				if code := getNameId(b); code != "" {
 					saveM[f] = code
 				}
 			}
 		} else if f == "agency_id" {
 			if b := util.ObjToString(tmp["agency"]); b != "" {
 				saveM["agency"] = b
-				if code := redis.GetStr("qyxy_id", b); code != "" {
+				//if code := redis.GetStr("qyxy_id", b); code != "" {
+				if code := getNameId(b); code != "" {
 					saveM[f] = code
 				}
 			} else {
@@ -143,21 +193,32 @@ func taskBase(tmp map[string]interface{}) {
 			}
 		}
 	}
+	util.Debug(saveM)
 	saveBasePool <- saveM
 	if len(errf) > 0 {
 		saveErrPool <- map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "f": strings.Join(errf, ",")}
 	}
 }
 
+func getNameId(name string) string {
+	info := MysqlTool.FindOne("dws_f_ent_baseinfo", map[string]interface{}{"name": name}, "name_id", "")
+	if info != nil && (*info)["name_Id"] != nil {
+		return util.ObjToString((*info)["name_Id"])
+	} else {
+		return ""
+	}
+}
+
 // @Description 扩展信息
 // @Author J 2022/9/22 11:13
 func taskExpand(tmp map[string]interface{}) {
 	saveM := make(map[string]interface{})
+	var errf []string // 异常字段
 	for _, f := range ExpandField {
 		if f == "infoid" {
 			saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
 		} else if f == "project_startdate" || f == "project_completedate" || f == "signaturedate" || f == "bidendtime" || f == "bidstarttime" {
-			if tmp[f] != nil {
+			if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
 				t := util.Int64All(tmp[f])
 				saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
 			}
@@ -205,17 +266,12 @@ func taskExpand(tmp map[string]interface{}) {
 			}
 		} else {
 			if tmp[f] != nil {
-				if reflect.TypeOf(tmp[f]).String() == "string" {
-					if f == "projectperiod" || f == "payway" || f == "bidopenaddress" {
-						if len(util.ObjToString(tmp[f])) <= 500 {
-							saveM[f] = tmp[f]
-						}
-					} else if f == "funds" || f == "getdocmethod" || f == "project_scale" {
-						if len(util.ObjToString(tmp[f])) <= 5000 {
-							saveM[f] = tmp[f]
-						}
-					} else {
-						saveM[f] = tmp[f]
+				if ExpandVMap[f] != nil {
+					var b bool
+					saveM[f], b = verifyF(f, tmp[f], ExpandVMap[f])
+					// 保存异常字段数据
+					if b {
+						errf = append(errf, f)
 					}
 				} else {
 					saveM[f] = tmp[f]
@@ -224,6 +280,9 @@ func taskExpand(tmp map[string]interface{}) {
 		}
 	}
 	saveExpandPool <- saveM
+	if len(errf) > 0 {
+		saveErrPool <- map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "f": strings.Join(errf, ",")}
+	}
 }
 
 // @Description 标签记录

+ 3 - 0
data_tidb/common.toml

@@ -1,5 +1,8 @@
 
+[udp]
+locport = ":1681"
 [db]
+
 [db.mysql]
 addr = "192.168.3.14:4000"
 dbname = "global_common_data"

+ 3 - 4
data_tidb/config/conf.go

@@ -23,15 +23,14 @@ func Init(conf string) {
 }
 
 type conf struct {
+	Udp  udp
 	DB   db
 	Mail mail
 	Log  log
 }
 
-type udpNext struct {
-	Addr  string
-	Port  int
-	Stype string
+type udp struct {
+	LocPort string
 }
 
 type mail struct {

BIN
data_tidb/data_tidb_linux


+ 108 - 0
data_tidb/field-criteria.json

@@ -54,6 +54,114 @@
       }
     }
   },
+  "dws_f_bid_expand_baseinfo": {
+    "field_array": ["infoid", "projectperiod", "project_duration", "project_timeunit", "project_startdate", "project_completedate", "signstarttime",
+      "signendtime", "docstarttime", "docendtime", "bidendtime", "bidstarttime", "signaturedate", "bidway", "docamount", "agencyrate", "agencyfee",
+      "currency", "funds", "payway", "bidamounttype", "review_experts", "bidmethod", "bid_bond", "bid_guarantee", "contract_bond", "contract_guarantee",
+      "bidopenaddress", "contractcode", "supervisorrate", "getdocmethod", "projectaddr", "project_scale", "purchasing_tag", "updatetime", "createtime"],
+    "field_criteria": {
+      "projectperiod": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "project_timeunit": {
+        "stype": "string",
+        "length": 50,
+        "intercept": true
+      },
+      "funds": {
+        "stype": "string",
+        "length": 100,
+        "intercept": true
+      },
+      "currency": {
+        "stype": "string",
+        "length": 20,
+        "intercept": true
+      },
+      "payway": {
+        "stype": "string",
+        "length": 255,
+        "intercept": true
+      },
+      "bidmethod": {
+        "stype": "string",
+        "length": 255,
+        "intercept": true
+      },
+      "bid_bond": {
+        "stype": "string",
+        "length": 1000,
+        "intercept": true
+      },
+      "contract_bond": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "bidopenaddress": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "contractcode": {
+        "stype": "string",
+        "length": 100,
+        "intercept": false
+      },
+      "getdocmethod": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "projectaddr": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "project_scale": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "purchasing_tag": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "biddiscount": {
+        "stype": "float",
+        "min": 0,
+        "max": 999999,
+        "decimal": 4
+      },
+      "docamount": {
+        "stype": "float",
+        "min": 0,
+        "max": 99999999,
+        "decimal": 2
+      },
+      "agencyrate": {
+        "stype": "float",
+        "min": 0,
+        "max": 999999,
+        "decimal": 4
+      },
+      "agencyfee": {
+        "stype": "float",
+        "min": 0,
+        "max": 99999999,
+        "decimal": 2
+      },
+      "supervisorrate": {
+        "stype": "float",
+        "min": 0,
+        "max": 999999,
+        "decimal": 4
+      }
+    }
+  },
   "dws_f_project_baseinfo": {
     "field_array": ["projectid", "projectcode", "projectname", "area_code", "city_code", "district_code", "budget", "bidamount", "bidstatus", "bidtype",
       "bidopentime", "createtime", "firsttime", "zbtime", "jgtime", "lasttime", "buyer_id", "agency_id", "updatetime"],

+ 15 - 8
data_tidb/init.go

@@ -59,14 +59,14 @@ var (
 
 	BaseField   []string
 	BaseVMap    map[string]interface{}
+	ExpandField []string
+	ExpandVMap  map[string]interface{}
+
 	ProField    []string
 	ProVMap     map[string]interface{}
 	ProBusField []string
 	ProBusVMap  map[string]interface{}
-	ExpandField = []string{"infoid", "projectperiod", "project_duration", "project_timeunit", "project_startdate", "project_completedate", "signstarttime",
-		"signendtime", "docstarttime", "docendtime", "bidendtime", "bidstarttime", "signaturedate", "bidway", "docamount", "agencyrate", "agencyfee",
-		"currency", "funds", "payway", "bidamounttype", "review_experts", "bidmethod", "bid_bond", "bid_guarantee", "contract_bond", "contract_guarantee",
-		"bidopenaddress", "contractcode", "supervisorrate", "getdocmethod", "projectaddr", "project_scale", "purchasing_tag", "updatetime", "createtime"}
+
 	TagsField    = []string{"infoid", "labelcode", "labelvalues", "labelweight", "createtime"}
 	AttrField    = []string{"infoid", "filename", "fid", "ftype", "org_url", "size", "file_type", "createtime"}
 	FileTypeArr  = []string{"pdf", "doc", "docx", "xlsx", "xls", "jpg", "zip", "rar", "txt", "gif", "png", "bmp", "swf", "html"}
@@ -181,24 +181,31 @@ func InitField() {
 			BaseVMap = m["field_criteria"].(map[string]interface{})
 		} else {
 			log.Error("InitField", zap.String("field_array", "dws_f_bid_baseinfo"))
-			os.Exit(-1)
+			panic("dws_f_bid_baseinfo")
+		}
+		if m, o := FCriteria["dws_f_bid_expand_baseinfo"].(map[string]interface{}); o {
+			ExpandField = util.ObjArrToStringArr(m["field_array"].([]interface{}))
+			ExpandVMap = m["field_criteria"].(map[string]interface{})
+		} else {
+			log.Error("InitField", zap.String("field_array", "dws_f_bid_expand_baseinfo"))
+			panic("dws_f_bid_expand_baseinfo")
 		}
 		if m, o := FCriteria["dws_f_project_baseinfo"].(map[string]interface{}); o {
 			ProField = util.ObjArrToStringArr(m["field_array"].([]interface{}))
 			ProVMap = m["field_criteria"].(map[string]interface{})
 		} else {
 			log.Error("InitField", zap.String("field_array", "dws_f_project_baseinfo"))
-			os.Exit(-1)
+			panic("dws_f_project_baseinfo")
 		}
 		if m, o := FCriteria["dws_f_project_business"].(map[string]interface{}); o {
 			ProBusField = util.ObjArrToStringArr(m["field_array"].([]interface{}))
 			ProBusVMap = m["field_criteria"].(map[string]interface{})
 		} else {
 			log.Error("InitField", zap.String("field_array", "dws_f_project_business"))
-			os.Exit(-1)
+			panic("dws_f_project_business")
 		}
 	} else {
 		log.Error("InitField, 未找到field-criteria.json文件")
-		os.Exit(-1)
+		panic("InitField, 未找到field-criteria.json文件")
 	}
 }

+ 44 - 6
data_tidb/main.go

@@ -5,15 +5,22 @@ import (
 	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/mongodb"
 	"app.yhyue.com/data_processing/common_utils/redis"
+	"app.yhyue.com/data_processing/common_utils/udp"
 	"data_tidb/config"
+	"encoding/json"
 	"fmt"
 	"github.com/spf13/cobra"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
+	"net"
 	"sync"
 	"time"
 )
 
+var (
+	UdpClient udp.UdpClient
+)
+
 func init() {
 	config.Init("./common.toml")
 	InitLog()
@@ -21,13 +28,17 @@ func init() {
 	InitMysql()
 	InitField()
 
-	redis.InitRedis1("qyxy_id=127.0.0.1:4379", 1)
+	//redis.InitRedis1("qyxy_id=127.0.0.1:4379", 1)
 	//redis.InitRedis1("qyxy_id=192.168.3.166:4379", 1)
 	log.Info("init success")
 }
 
 func main() {
 
+	//go SaveFunc()
+	//go SaveTagFunc()
+	//go saveErrMethod()
+
 	rootCmd := &cobra.Command{Use: "my cmd"}
 	rootCmd.AddCommand(bidding())
 	rootCmd.AddCommand(project())
@@ -35,14 +46,40 @@ func main() {
 		fmt.Println("rootCmd.Execute failed", err.Error())
 	}
 
-	//taskMysql()
-	//taskMgo()
+	//UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
+	//UdpClient.Listen(processUdpMsg)
+	//log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
 
 	c := make(chan bool, 1)
 	<-c
 
 }
 
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	defer util.Catch()
+	switch act {
+	case udp.OP_TYPE_DATA: //上个节点的数据
+		var mapInfo map[string]interface{}
+		err := json.Unmarshal(data, &mapInfo)
+		log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
+		gtid, _ := mapInfo["gtid"].(string)
+		lteid, _ := mapInfo["lteid"].(string)
+		if err != nil || gtid == "" || lteid == "" {
+			UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra) //udp失败回写
+		} else {
+			//udp成功回写
+			if k := util.ObjToString(mapInfo["key"]); k != "" {
+				UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
+			} else {
+				k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
+				UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
+			}
+			log.Info("start sync ...")
+			doBiddingTask(gtid, lteid, mapInfo)
+		}
+	}
+}
+
 func taskMysql() {
 	pool := make(chan bool, 5) //控制线程数
 	wg := &sync.WaitGroup{}
@@ -227,8 +264,9 @@ func bidding() *cobra.Command {
 		Use:   "bidding",
 		Short: "Start processing bidding data",
 		Run: func(cmd *cobra.Command, args []string) {
+
 			go SaveFunc()
-			//go SaveTagFunc()
+			go SaveTagFunc()
 			//go SaveExpandFunc()
 			//go SaveAttrFunc()
 			//go SaveImfFunc()
@@ -277,7 +315,7 @@ func SaveFunc() {
 					defer func() {
 						<-saveBaseSp
 					}()
-					MysqlTool.InsertBulk("dws_f_bid_baseinfo_new", BaseField, arru...)
+					MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...)
 				}(arru)
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0
@@ -289,7 +327,7 @@ func SaveFunc() {
 					defer func() {
 						<-saveBaseSp
 					}()
-					MysqlTool.InsertBulk("dws_f_bid_baseinfo_new", BaseField, arru...)
+					MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0

+ 1 - 0
data_tidb/util.go

@@ -47,6 +47,7 @@ func verifyF(f string, v interface{}, tmp1 interface{}) (interface{}, bool) {
 				return nil, true
 			}
 			return v2, false
+
 		default:
 			break
 		}

BIN
field_py/field_dispose_linux


+ 6 - 3
field_py/task.go

@@ -200,7 +200,6 @@ func rpcGetFieldP(reqStr string) (map[string]interface{}, error) {
 	//处理数据
 	addr := ip + ":" + fmt.Sprint(port)
 	start := time.Now()
-	log.Info("rpc", zap.String("addr", addr))
 	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
 	if err != nil {
 		atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
@@ -212,7 +211,9 @@ func rpcGetFieldP(reqStr string) (map[string]interface{}, error) {
 	req := &proto.GoodsRequest{
 		Contents: reqStr,
 	}
-	resp, err := client.GoodsExtract(context.Background(), req)
+	ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*1)
+	defer cancel()
+	resp, err := client.GoodsExtract(ctx, req)
 	if err != nil {
 		return nil, errors.New("Deal Data Error")
 	}
@@ -270,7 +271,9 @@ func rpcGetFieldR(reqStr string) (map[string]interface{}, error) {
 	req := &proto.ContentRequest{
 		Contents: reqStr,
 	}
-	resp, err := client.Extract(context.Background(), req)
+	ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*1)
+	defer cancel()
+	resp, err := client.Extract(ctx, req)
 	if err != nil {
 		return nil, errors.New("Deal Data Error")
 	}

+ 9 - 0
field_sync/common.toml

@@ -24,6 +24,15 @@ stype = ""
 addr = "127.0.0.1"
 port = 1680
 stype = "subject"
+[udp.tidb1]
+addr = "127.0.0.1"
+port = 1681
+stype = ""
+[udp.tidb2]
+addr = "127.0.0.1"
+port = 1970
+stype = ""
+
 
 [nsq]
 addr = "192.168.3.166:4150"

+ 2 - 0
field_sync/config/conf.go

@@ -41,6 +41,8 @@ type udp struct {
 	Next    udpNext
 	Project udpNext
 	Tidb    udpNext
+	Tidb1   udpNext
+	Tidb2   udpNext
 }
 
 type nsq struct {

+ 37 - 5
field_sync/main.go

@@ -135,7 +135,7 @@ func NextNode(mapInfo map[string]interface{}, stype string) {
 	mapInfo["stype"] = stype
 	key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), stype)
 	mapInfo["key"] = key
-	log.Info("udp next node", zap.Any("mapinfo:", mapInfo))
+	log.Info("udp es node", zap.Any("mapinfo:", mapInfo))
 	datas, _ := json.Marshal(mapInfo)
 	node := &UdpNode{datas, next, time.Now().Unix(), 0}
 	UdpTaskMap.Store(key, node)
@@ -168,20 +168,52 @@ func NextNodeBidData(mapInfo map[string]interface{}) {
 	}
 	mapInfo["stype"] = "biddingdata"
 	mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"]))
+	log.Info("udp es node", zap.Any("mapinfo:", mapInfo))
 	datas, _ := json.Marshal(mapInfo)
 	_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
 }
 
-func NextNodeTidb(mapInfo map[string]interface{}) {
+func NextNodeTidbQyxy(mapInfo map[string]interface{}) {
 	next := &net.UDPAddr{
 		IP:   net.ParseIP(config.Conf.Udp.Tidb.Addr),
 		Port: util.IntAll(config.Conf.Udp.Tidb.Port),
 	}
-	mapInfo["stype"] = "subject"
-	key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"]))
+	mapInfo["stype"] = config.Conf.Udp.Tidb.Stype
+	mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"]))
+	log.Info("udp tidb-qyxy node", zap.Any("mapinfo:", mapInfo))
 	datas, _ := json.Marshal(mapInfo)
 	node := &UdpNode{datas, next, time.Now().Unix(), 0}
-	UdpTaskMap.Store(key, node)
+	UdpTaskMap.Store(mapInfo["key"], node)
+	_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
+}
+
+func NextNodeTidb(mapInfo map[string]interface{}, stype string) {
+	next := &net.UDPAddr{
+		IP:   net.ParseIP(config.Conf.Udp.Tidb1.Addr),
+		Port: util.IntAll(config.Conf.Udp.Tidb1.Port),
+	}
+	mapInfo["stype"] = stype
+	mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"]))
+	log.Info("udp tidb-bidding node", zap.Any("mapinfo:", mapInfo))
+	datas, _ := json.Marshal(mapInfo)
+	node := &UdpNode{datas, next, time.Now().Unix(), 0}
+	UdpTaskMap.Store(mapInfo["key"], node)
+	_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
+}
+
+// NextNodeHn @Description 郑坤 海南数据处理
+// @Author J 2022/10/28 09:26
+func NextNodeHn(mapInfo map[string]interface{}) {
+	next := &net.UDPAddr{
+		IP:   net.ParseIP(config.Conf.Udp.Tidb2.Addr),
+		Port: util.IntAll(config.Conf.Udp.Tidb2.Port),
+	}
+	mapInfo["stype"] = "hainan"
+	mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"]))
+	log.Info("NextNodeTidb", zap.Any("mapinfo:", mapInfo))
+	datas, _ := json.Marshal(mapInfo)
+	node := &UdpNode{datas, next, time.Now().Unix(), 0}
+	UdpTaskMap.Store(mapInfo["key"], node)
 	_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
 }
 

+ 4 - 2
field_sync/task.go

@@ -83,9 +83,11 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 	log.Info("bidding sync...over", zap.Int64("all", count), zap.Int("extract sync", c))
 	NextNode(mapInfo, stype)
 	NextNodePro(mapInfo, stype)
+	NextNodeTidb(mapInfo, stype)
 	if stype == "bidding_history" {
-		NextNodeBidData(mapInfo) // bidding-data数据
-		NextNodeTidb(mapInfo)    // tidb-企业数据
+		NextNodeBidData(mapInfo)  // bidding-data数据
+		NextNodeTidbQyxy(mapInfo) // tidb-企业数据
+		NextNodeHn(mapInfo)
 	}
 }
 

+ 8 - 0
processing_ids/go.mod

@@ -0,0 +1,8 @@
+module processing_ids
+
+go 1.16
+
+require (
+	app.yhyue.com/data_processing/common_utils v0.0.0-20220927054143-d9e97522625d
+	github.com/robfig/cron v1.2.0
+)

+ 90 - 0
processing_ids/go.sum

@@ -0,0 +1,90 @@
+app.yhyue.com/data_processing/common_utils v0.0.0-20220927054143-d9e97522625d h1:Nh2rC3LBqh0alvam2vr4is/vbUaPkl0rbZxVETx3nmk=
+app.yhyue.com/data_processing/common_utils v0.0.0-20220927054143-d9e97522625d/go.mod h1:9PlRUNzirlF/LL1W7fA7koCudxJe3uO5nshDWlCnGo8=
+github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
+github.com/PuerkitoBio/goquery v1.8.0 h1:PJTF7AmFCFKk1N6V6jmKfrNH9tV5pNE6lZMkG0gta/U=
+github.com/PuerkitoBio/goquery v1.8.0/go.mod h1:ypIiRMtY7COPGk+I/YbZLbxsxn9g5ejnI2HSMtkjZvI=
+github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
+github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
+github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dchest/captcha v1.0.0 h1:vw+bm/qMFvTgcjQlYVTuQBJkarm5R0YSsDKhm1HZI2o=
+github.com/dchest/captcha v1.0.0/go.mod h1:7zoElIawLp7GUMLcj54K9kbw+jEyvz2K0FDdRRYhvWo=
+github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
+github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
+github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
+github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
+github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
+github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
+github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
+github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+go.mongodb.org/mongo-driver v1.10.1/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8=
+go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
+go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
+go.uber.org/zap v1.22.0/go.mod h1:H4siCOZOrAolnUPJEkfaSjDqyP+BDS0DdDWzwcgt3+U=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20210916014120-12bc252f5db8/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
+golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/olivere/elastic.v2 v2.0.61/go.mod h1:CTVyl1gckiFw1aLZYxC00g3f9jnHmhoOKcWF7W3c6n4=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 82 - 0
processing_ids/main.go

@@ -0,0 +1,82 @@
+package main
+
+import (
+	"app.yhyue.com/data_processing/common_utils/log"
+	"app.yhyue.com/data_processing/common_utils/mongodb"
+	"fmt"
+	"github.com/robfig/cron"
+	"go.uber.org/zap"
+	"io/ioutil"
+	"net/http"
+	"time"
+)
+
+var (
+	MgoBid          *mongodb.MongodbSim
+	lastId, startId string
+
+	mail_to  = "zhangjinkun@topnet.net.cn,wangjianghan@topnet.net.cn,maxiaoshan@topnet.net.cn,zhengkun@topnet.net.cn"
+	mail_api = "http://172.17.145.179:19281/_send/_mail"
+)
+
+func init() {
+	MgoBid = &mongodb.MongodbSim{
+		MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083",
+		DbName:      "qfw",
+		Size:        5,
+		UserName:    "SJZY_RWESBid_Other",
+		Password:    "SJZY@O17t8herB3B",
+	}
+	MgoBid.InitPool()
+
+	startId = ""
+}
+
+func main() {
+	TimeTask()
+}
+
+func TimeTask() {
+	c := cron.New()
+	cronstr := "0 */5 * * * ?" // 每5min执行一次
+	_ = c.AddFunc(cronstr, func() {
+		taskinfo()
+	})
+	c.Start()
+}
+
+func taskinfo() {
+	info, _ := MgoBid.Find("bidding", nil, `{"_id": -1}`, `{"_id": 1}`, true, -1, -1)
+	if info != nil {
+		lastId = mongodb.BsonIdToSId((*info)[0]["_id"])
+	} else {
+		sendMail("bidding表id查询失败")
+		return
+	}
+	q := map[string]interface{}{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}
+	count := MgoBid.Count("bidding", q)
+	ids := fmt.Sprintf("%s-%s", startId, lastId)
+	if count <= 0 {
+		sendMail(fmt.Sprintf("bidding表id段数据查询失败,%s", ids))
+		return
+	}
+	save := make(map[string]interface{})
+	save["gtid"] = startId
+	save["lteid"] = lastId
+	save["count"] = count
+	now := time.Now().Unix()
+	save["dataprocess"] = 0
+	save["createtime"] = now
+	save["updatetime"] = now
+	MgoBid.Save("bidding_processing_ids", save)
+
+}
+
+func sendMail(content string) {
+	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", mail_api, mail_api, "processing_ids-send-fail", content))
+	if err == nil {
+		defer res.Body.Close()
+		read, err := ioutil.ReadAll(res.Body)
+		log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
+	}
+}