Эх сурвалжийг харах

Merge branch 'dev3.4' of http://39.105.157.10:10080/qmx/jy-data-extract into dev3.4

apple 5 жил өмнө
parent
commit
c2e98b0775

+ 6 - 4
udp_winner/timedTaskAgency.go

@@ -29,12 +29,12 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 		log.Println(gtid, lteid, "不是Objectid,转换_id错误", gtid, lteid)
 		return
 	}
-	timenow := time.Now().Unix()
+	//timenow := time.Now().Unix()
 	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
 	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
 	// (area地区-province省份 city城市-city城市 district区县-district区县)
 	// agencyaddr-company_address企业地址
-	SourceClientcc := SourceClient.GetMgoConn(86400)
+	SourceClientcc := SourceClient.GetMgoConn(8640000)
 	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
 		"_id": bson.M{
 			"$gte": GId,
@@ -206,7 +206,7 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 		SourceClient.DestoryMongoConn(SourceClientcc)
 		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
 		//发送udp 更新es段
-		nextNode("agencyent",timenow)
+		//nextNode("agencyent",timenow)
 	}
 
 }
@@ -330,13 +330,14 @@ func TimedTaskAgency() {
 	//time.Sleep(time.Hour*70)
 	t2 := time.NewTimer(time.Second * 5)
 	for range t2.C {
+		//timenow:=time.Now().Unix()
 		Fcconn := FClient.GetMgoConn(86400)
 		tmpLast := map[string]interface{}{}
 		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C(Config["mgo_qyk_c_a_new"]).Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
 			if !iter.Next(&tmpLast) {
 				//临时表无数据
 				log.Println("临时表无数据:")
-				t2.Reset(time.Minute * 1)
+				t2.Reset(time.Minute * 5)
 				FClient.DestoryMongoConn(Fcconn)
 				continue
 			} else {
@@ -605,6 +606,7 @@ func TimedTaskAgency() {
 		}
 		FClient.DestoryMongoConn(Fcconn)
 		t2.Reset(time.Minute)
+		//nextNode("agencyent",timenow)
 	}
 }
 

+ 6 - 4
udp_winner/timedTaskBuyer.go

@@ -29,12 +29,12 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 		log.Println(gtid, lteid, "不是Objectid,转换_id错误", gtid, lteid)
 		return
 	}
-	timenow:=time.Now().Unix()
+	//timenow:=time.Now().Unix()
 	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
 	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
 	// (area地区-province省份 city城市-city城市 district区县-district区县)
 	// buyeraddr-company_address企业地址
-	SourceClientcc := SourceClient.GetMgoConn(86400)
+	SourceClientcc := SourceClient.GetMgoConn(8640000)
 	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
 		"_id": bson.M{
 			"$gte": GId,
@@ -224,7 +224,7 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 		SourceClient.DestoryMongoConn(SourceClientcc)
 		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
 		//发送udp 更新es段
-		nextNode("buyerent",timenow)
+		//nextNode("buyerent",timenow)
 	}
 
 }
@@ -365,13 +365,14 @@ func TimedTaskBuyer() {
 	//time.Sleep(time.Hour*70)
 	t2 := time.NewTimer(time.Second * 5)
 	for range t2.C {
+		//timenow := time.Now().Unix()
 		Fcconn := FClient.GetMgoConn(86400)
 		tmpLast := map[string]interface{}{}
 		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C(Config["mgo_qyk_c_b_new"]).Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
 			if !iter.Next(&tmpLast) {
 				//临时表无数据
 				log.Println("临时表无数据:")
-				t2.Reset(time.Minute * 1)
+				t2.Reset(time.Minute * 5)
 				FClient.DestoryMongoConn(Fcconn)
 				continue
 			} else {
@@ -654,6 +655,7 @@ func TimedTaskBuyer() {
 		}
 		FClient.DestoryMongoConn(Fcconn)
 		t2.Reset(time.Minute)
+		//nextNode("buyerent",timenow)
 	}
 }
 

+ 53 - 4
udp_winner/timedTaskWinner.go

@@ -29,12 +29,12 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 		log.Println(gtid, lteid, "不是Objectid,转换_id错误", gtid, lteid)
 		return
 	}
-	timenow := time.Now().Unix()
+	//timenow := time.Now().Unix()
 	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
 	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
 	// (area地区-province省份 city城市-city城市 district区县-district区县)
 	// winneraddr-company_address企业地址
-	SourceClientcc := SourceClient.GetMgoConn(86400)
+	SourceClientcc := SourceClient.GetMgoConn(8640000)
 	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
 		"_id": bson.M{
 			"$gte": GId,
@@ -70,6 +70,53 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 			tmp["_id"] = mgoId
 			//创建value数组
 			tmps := make([]map[string]interface{}, 0)
+			//存量删除分包
+			if v, ok := tmp["package"].(map[string]interface{}); ok {
+				for i, vv := range v {
+					if vvv, ok2 := vv.(map[string]interface{}); ok2 {
+						if pwinner, ok := vvv["winner"].(string); ok && pwinner != "" {
+							if pwinper, ok4 := vvv["winnerperson"].(string); ok4 && pwinper != "" {
+								ptmp := make(map[string]interface{})
+								ptmp["p_id"] = mgoId + "_pkg_" + i
+								ptmp["_id"] = bson.NewObjectId()
+								ptmp["winner"] = winner
+								ptmp["winnerperson"] = pwinper
+								if pkgtel, ok6 := vvv["winnertel"].(string); ok6 {
+									ptmp["winnertel"] = pkgtel
+								}
+								if tmp["topscopeclass"]!=nil{
+									ptmp["topscopeclass"] = tmp["topscopeclass"]
+								}
+								//分包里中标单位和企业名一样
+								if pwinner == winner {
+									tmps = append(tmps, ptmp)
+								}else if conn.Exists(pwinner).Val() > 0 {
+									//分包里中标单位和企业名不一样,存量里匹配上了
+									bytes, _ := conn.Get(pwinner).Bytes()
+									cltmps := make([]map[string]interface{}, 0)
+									json.Unmarshal(bytes, &cltmps)
+									cltmps = append(cltmps, ptmp)
+									bytes2, _ := json.Marshal(cltmps)
+									//存量redis的key存在,合并  key :企业名 val :[]map
+									if err := conn.Set(pwinner, string(bytes2), 0).Err(); err != nil {
+										log.Println(err)
+									}
+								} else {
+									//分包和企业名不一样,而且存量没匹配上
+									cltmps := make([]map[string]interface{}, 0)
+									cltmps = append(cltmps, ptmp)
+									bytes2, _ := json.Marshal(cltmps)
+									//存量redis的key新增,新增  key :企业名 val :[]map
+									if err := conn.Set(pwinner, string(bytes2), 0).Err(); err != nil {
+										log.Println(err)
+									}
+								}
+							}
+						}
+					}
+				}
+				delete(tmp, "package")
+			}
 			if e_num > 0 {
 				//存量redis的key存在,累加更新
 				bytes, _ := conn.Get(winner).Bytes()
@@ -213,7 +260,7 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 		SourceClient.DestoryMongoConn(SourceClientcc)
 		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
 		//发送udp 更新es段
-		nextNode("winnerent",timenow)
+		//nextNode("winnerent", timenow)
 	}
 
 }
@@ -346,13 +393,14 @@ func TimedTaskWinner() {
 	//time.Sleep(time.Hour*70)
 	t2 := time.NewTimer(time.Second * 5)
 	for range t2.C {
+		//timenow:=time.Now().Unix()
 		Fcconn := FClient.GetMgoConn(86400)
 		tmpLast := map[string]interface{}{}
 		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C(Config["mgo_qyk_c_w_new"]).Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
 			if !iter.Next(&tmpLast) {
 				//临时表无数据
 				log.Println("临时表无数据:")
-				t2.Reset(time.Minute * 1)
+				t2.Reset(time.Minute * 5)
 				FClient.DestoryMongoConn(Fcconn)
 				continue
 			} else {
@@ -642,6 +690,7 @@ func TimedTaskWinner() {
 		}
 		FClient.DestoryMongoConn(Fcconn)
 		t2.Reset(time.Minute)
+		//nextNode("winnerent", timenow)
 	}
 }