Browse Source

存量分包

fengweiqiang 5 years ago
parent
commit
5a44ee1e00
3 changed files with 54 additions and 1 deletions
  1. 2 0
      udp_winner/timedTaskAgency.go
  2. 2 0
      udp_winner/timedTaskBuyer.go
  3. 50 1
      udp_winner/timedTaskWinner.go

+ 2 - 0
udp_winner/timedTaskAgency.go

@@ -330,6 +330,7 @@ func TimedTaskAgency() {
 	//time.Sleep(time.Hour*70)
 	t2 := time.NewTimer(time.Second * 5)
 	for range t2.C {
+		timenow:=time.Now().Unix()
 		Fcconn := FClient.GetMgoConn(86400)
 		tmpLast := map[string]interface{}{}
 		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C(Config["mgo_qyk_c_a_new"]).Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
@@ -605,6 +606,7 @@ func TimedTaskAgency() {
 		}
 		FClient.DestoryMongoConn(Fcconn)
 		t2.Reset(time.Minute)
+		nextNode("agencyent",timenow)
 	}
 }
 

+ 2 - 0
udp_winner/timedTaskBuyer.go

@@ -365,6 +365,7 @@ func TimedTaskBuyer() {
 	//time.Sleep(time.Hour*70)
 	t2 := time.NewTimer(time.Second * 5)
 	for range t2.C {
+		timenow := time.Now().Unix()
 		Fcconn := FClient.GetMgoConn(86400)
 		tmpLast := map[string]interface{}{}
 		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C(Config["mgo_qyk_c_b_new"]).Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
@@ -654,6 +655,7 @@ func TimedTaskBuyer() {
 		}
 		FClient.DestoryMongoConn(Fcconn)
 		t2.Reset(time.Minute)
+		nextNode("buyerent",timenow)
 	}
 }
 

+ 50 - 1
udp_winner/timedTaskWinner.go

@@ -70,6 +70,53 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 			tmp["_id"] = mgoId
 			//创建value数组
 			tmps := make([]map[string]interface{}, 0)
+			//存量删除分包
+			if v, ok := tmp["package"].(map[string]interface{}); ok {
+				for i, vv := range v {
+					if vvv, ok2 := vv.(map[string]interface{}); ok2 {
+						if pwinner, ok := vvv["winner"].(string); ok && pwinner != "" {
+							if pwinper, ok4 := vvv["winnerperson"].(string); ok4 && pwinper != "" {
+								ptmp := make(map[string]interface{})
+								ptmp["p_id"] = mgoId + "_pkg_" + i
+								ptmp["_id"] = bson.NewObjectId()
+								ptmp["winner"] = winner
+								ptmp["winnerperson"] = pwinper
+								if pkgtel, ok6 := vvv["winnertel"].(string); ok6 {
+									ptmp["winnertel"] = pkgtel
+								}
+								if tmp["topscopeclass"]!=nil{
+									ptmp["topscopeclass"] = tmp["topscopeclass"]
+								}
+								//分包里中标单位和企业名一样
+								if pwinner == winner {
+									tmps = append(tmps, ptmp)
+								}else if conn.Exists(pwinner).Val() > 0 {
+									//分包里中标单位和企业名不一样,存量里匹配上了
+									bytes, _ := conn.Get(pwinner).Bytes()
+									cltmps := make([]map[string]interface{}, 0)
+									json.Unmarshal(bytes, &cltmps)
+									cltmps = append(cltmps, ptmp)
+									bytes2, _ := json.Marshal(cltmps)
+									//存量redis的key存在,合并  key :企业名 val :[]map
+									if err := conn.Set(pwinner, string(bytes2), 0).Err(); err != nil {
+										log.Println(err)
+									}
+								} else {
+									//分包和企业名不一样,而且存量没匹配上
+									cltmps := make([]map[string]interface{}, 0)
+									cltmps = append(cltmps, ptmp)
+									bytes2, _ := json.Marshal(cltmps)
+									//存量redis的key新增,新增  key :企业名 val :[]map
+									if err := conn.Set(pwinner, string(bytes2), 0).Err(); err != nil {
+										log.Println(err)
+									}
+								}
+							}
+						}
+					}
+				}
+				delete(tmp, "package")
+			}
 			if e_num > 0 {
 				//存量redis的key存在,累加更新
 				bytes, _ := conn.Get(winner).Bytes()
@@ -213,7 +260,7 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 		SourceClient.DestoryMongoConn(SourceClientcc)
 		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
 		//发送udp 更新es段
-		nextNode("winnerent",timenow)
+		nextNode("winnerent", timenow)
 	}
 
 }
@@ -346,6 +393,7 @@ func TimedTaskWinner() {
 	//time.Sleep(time.Hour*70)
 	t2 := time.NewTimer(time.Second * 5)
 	for range t2.C {
+		timenow:=time.Now().Unix()
 		Fcconn := FClient.GetMgoConn(86400)
 		tmpLast := map[string]interface{}{}
 		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C(Config["mgo_qyk_c_w_new"]).Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
@@ -642,6 +690,7 @@ func TimedTaskWinner() {
 		}
 		FClient.DestoryMongoConn(Fcconn)
 		t2.Reset(time.Minute)
+		nextNode("winnerent", timenow)
 	}
 }