Jianghan 2 éve
szülő
commit
7837e8f35f
3 módosított fájl, 25 hozzáadás és 13 törlés
  1. 5 1
      README.md
  2. 1 1
      main.go
  3. 19 11
      task.go

+ 5 - 1
README.md

@@ -4,4 +4,8 @@
 + rpc serve-name goods_service
 
 ### 评标专家 review_experts
-+ rpc serve-name extract_expert_service
++ rpc serve-name extract_expert_service
+
+
+后续修改
+id段内,只需要通过微服务中心获取到5个ip+端口(非重复)可以一直重复处理数据

+ 1 - 1
main.go

@@ -55,7 +55,7 @@ type UdpNode struct {
 func main() {
 	go SaveErrorInfo() //保存异常信息
 	//go CheckErrorNum()
-	go updateEsMethod()
+	//go updateEsMethod()
 	go checkMapJob()
 	go updateMethod()
 

+ 19 - 11
task.go

@@ -63,32 +63,38 @@ func getIntention(gtid, lteid string, mapinfo map[string]interface{}) {
 			id := mongodb.BsonIdToSId(tmp["_id"])
 			update := make(map[string]interface{})
 			result2 := taskB(tmp, gtid, lteid)
-			//result1 := taskA(tmp, gtid, lteid)
+			result1 := taskA(tmp, gtid, lteid)
 			if r, ok := result2["result"].(map[string]interface{}); ok {
 				if r[id] != nil && len(r[id].([]interface{})) > 0 {
 					update["review_experts"] = strings.Join(util.ObjArrToStringArr(r[id].([]interface{})), ",")
 				}
 			}
-			//if result1 != nil && len(result1) > 0 {
-			//	update = result1
-			//}
+			util.Debug(result1)
+			if result1 != nil && len(result1) > 0 {
+				if result1["purchasinglist"] != nil {
+					update["purchasinglist"] = result1["purchasinglist"]
+				}
+				if result1["procurementlist"] != nil {
+					update["procurementlist"] = result1["procurementlist"]
+				}
+			}
 			if len(update) > 0 {
 				updatePool <- []map[string]interface{}{
-					{"_id": tmp["_id"]},
+					{"_id": mongodb.StringTOBsonId(id)},
 					{"$set": update},
 				}
-				updateEsPool <- []map[string]interface{}{{
-					"_id": id,
-				},
-					update,
-				}
+				//updateEsPool <- []map[string]interface{}{{
+				//	"_id": id,
+				//},
+				//	update,
+				//}
 			}
 		}(tmp)
 		tmp = map[string]interface{}{}
 	}
 	wg.Wait()
 	log.Info("dispose over...", zap.Int("count:", count), zap.String("gtid:", gtid), zap.String("lteid:", lteid))
-	//NextNode(mapinfo)
+	NextNode(mapinfo)
 }
 
 // @Description procurementlist
@@ -194,6 +200,7 @@ func rpcGetFieldP(reqStr string) (map[string]interface{}, error) {
 	atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
 	//处理数据
 	addr := ip + ":" + fmt.Sprint(port)
+	log.Info("rpc", zap.String("addr", addr), zap.String("str", reqStr))
 	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
 	if err != nil {
 		atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
@@ -247,6 +254,7 @@ func rpcGetFieldR(reqStr string) (map[string]interface{}, error) {
 	atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
 	//处理数据
 	addr := ip + ":" + fmt.Sprint(port)
+	log.Info("rpc", zap.String("addr", addr), zap.String("str", reqStr))
 	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
 	if err != nil {
 		atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1