瀏覽代碼

新流程

xuzhiheng 11 月之前
父節點
當前提交
4524cfbb2b
共有 3 個文件被更改,包括 33 次插入18 次删除
  1. 9 0
      field_sync/init.go
  2. 16 10
      field_sync/main.go
  3. 8 8
      field_sync/task.go

+ 9 - 0
field_sync/init.go

@@ -37,6 +37,15 @@ func InitMgo() {
 	}
 	MgoB.InitPool()
 
+	MgoBP = &mongodb.MongodbSim{
+		MongodbAddr: config.Conf.DB.MongoB.Addr,
+		DbName:      "qfw",
+		Size:        config.Conf.DB.MongoB.Size,
+		UserName:    config.Conf.DB.MongoB.User,
+		Password:    config.Conf.DB.MongoB.Password,
+	}
+	MgoBP.InitPool()
+
 	MgoE = &mongodb.MongodbSim{
 		MongodbAddr: config.Conf.DB.MongoE.Addr,
 		DbName:      config.Conf.DB.MongoE.Dbname,

+ 16 - 10
field_sync/main.go

@@ -25,11 +25,11 @@ import (
 )
 
 var (
-	MgoB *mongodb.MongodbSim
-	MgoE *mongodb.MongodbSim
-	MgoQ *mongodb.MongodbSim // 企业
-	MgoP *mongodb.MongodbSim // 凭安企业
-	Es   elastic.Es
+	MgoB, MgoBP *mongodb.MongodbSim
+	MgoE        *mongodb.MongodbSim
+	MgoQ        *mongodb.MongodbSim // 企业
+	MgoP        *mongodb.MongodbSim // 凭安企业
+	Es          elastic.Es
 
 	UdpClient  udp.UdpClient
 	UdpTaskMap = &sync.Map{}
@@ -67,7 +67,7 @@ func main() {
 	UdpClient.Listen(processUdpMsg)
 	log.Println("Udp服务监听 port:", config.Conf.Udp.LocPort)
 
-	info, _ := MgoB.Find("bidding_processing_ids", `{"dataprocess": 6}`, bson.M{"_id": 1}, nil, false, -1, -1)
+	info, _ := MgoBP.Find("bidding_processing_ids", `{"dataprocess_ai": 5}`, bson.M{"_id": 1}, nil, false, -1, -1)
 	log.Println(len(*info))
 	log.Println("size", len(*info))
 	if len(*info) > 0 {
@@ -366,11 +366,17 @@ func nsqMethod() {
 	for {
 		select {
 		case obj := <-Mcmer.Ch: //从通道读取即可
-			id := strings.Split(util.ObjToString(obj), "=")
-			if bson.IsObjectIdHex(id[1]) {
-				taskinfo(id[1])
+			objstr := util.ObjToString(obj)
+			log.Println("obj ", obj, objstr)
+			id := strings.Split(objstr, "=")
+			if len(id) > 1 {
+				if bson.IsObjectIdHex(id[1]) {
+					taskinfo(id[1])
+				} else {
+					log.Println("jy nsq id err id", objstr)
+				}
 			} else {
-				log.Println("jy nsq id err id", id[1])
+				log.Println("jy nsq id err id", objstr)
 			}
 		}
 	}

+ 8 - 8
field_sync/task.go

@@ -85,20 +85,20 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 	log.Println("bidding sync...over all", count, "extract sync ", c)
 	NextNode(mapInfo, stype)
 	NextNodePro(mapInfo, stype)
-	NextNodeTidb(mapInfo, stype)
+	// NextNodeTidb(mapInfo, stype)
 	if stype == "bidding_history" {
-		NextNodeBidData(mapInfo)  // bidding-data数据
-		NextNodeTidbQyxy(mapInfo) // tidb-企业数据
-		NextNodeHn(mapInfo)
+		NextNodeBidData(mapInfo) // bidding-data数据
+		// NextNodeTidbQyxy(mapInfo) // tidb-企业数据
+		// NextNodeHn(mapInfo)
 	}
 	if stype == "bidding" {
 		uq := bson.M{"gtid": util.ObjToString(mapInfo["gtid"]), "lteid": util.ObjToString(mapInfo["lteid"])}
-		MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 7, "updatetime": time.Now().Unix()}}, false, true)
+		MgoBP.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess_ai": 6, "updatetime": time.Now().Unix()}}, false, true)
 	}
 	//领域标签处理的数据 id段
-	if stype == "bidding_history" {
-		MgoB.Save("field_data_record", map[string]interface{}{"gtid": mapInfo["gtid"], "lteid": mapInfo["lteid"], "status": 0})
-	}
+	// if stype == "bidding_history" {
+	// 	MgoB.Save("field_data_record", map[string]interface{}{"gtid": mapInfo["gtid"], "lteid": mapInfo["lteid"], "status": 0})
+	// }
 }
 
 func biddingAllTask(data []byte, mapInfo map[string]interface{}) {