fengweiqiang 5 年之前
父节点
当前提交
19d35b1360
共有 2 个文件被更改,包括 12 次插入7 次删除
  1. 8 6
      udp_winner/main.go
  2. 4 1
      udp_winner/mgo.go

+ 8 - 6
udp_winner/main.go

@@ -132,7 +132,11 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
 			return
 		} else if tmp != nil {
-			udpclient.WriteUdp([]byte("ok,run"), mu.OP_NOOP, ra)
+			if key,ok := (*tmp)["key"].(string);ok{
+				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+			}else {
+				udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
+			}
 			go task(tmp)
 		}
 	case mu.OP_NOOP: //下个节点回应
@@ -172,12 +176,11 @@ func task(mapinfo *map[string]interface{}) {
 	overid := gtid
 	tmp := map[string]interface{}{}
 	for cursor.Next(&tmp) {
+		overid = tmp["_id"].(primitive.ObjectID).Hex()
+		log.Println(tmp["_id"])
 		if tmp["winner"] == nil || tmp["winner"] == "" {
 			continue
 		}
-		overid = tmp["_id"].(primitive.ObjectID).Hex()
-		log.Println(tmp["_id"])
-
 		//redis查询是否存在
 		rdb := RedisPool.Get()
 		if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
@@ -208,9 +211,8 @@ func task(mapinfo *map[string]interface{}) {
 				log.Println(err)
 			}
 			//拿到合并后的qyk
-			oldTmp := make(map[string]interface{})
 			FClient.DbName = Config["mgodb_extract_kf"]
-			oldTmp = FClient.FindById(Config["mgo_qyk_c"], reply)
+			oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
 			if oldTmp == nil{
 				log.Println("redis id 不存在")
 				continue

+ 4 - 1
udp_winner/mgo.go

@@ -269,7 +269,10 @@ func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
 	coll := m.C.Database(m.DbName).Collection(c)
 	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
 	v := map[string]interface{}{}
-	r.Decode(&v)
+	if e := r.Decode(&v);e != nil{
+		log.Println(e)
+		return nil
+	}
 	return v
 }