Prechádzať zdrojové kódy

buyer agency 增量存量 修改

apple 5 rokov pred
rodič
commit
c0c718dd06

+ 2 - 0
udp_winner/config.json

@@ -18,5 +18,7 @@
   "mgourl": "127.0.0.1:27017",
   "mgodb_extract_kf": "extract_kf",
   "mgo_qyk_c": "enterprise_qyxy",
+  "mgo_qyk_buyer": "buyer_qyxy",
+  "mgo_qyk_agency": "gency_qyxy",
   "redis": "127.0.0.1:6379"
 }

+ 6 - 4
udp_winner/main.go

@@ -51,11 +51,11 @@ func init() {
 
 	BuyerFields = []string{"_id", "contact", "type", "ranks", "buyerclass",
 		"address", "district", "city", "province", "area_code", "credit_no", "buyer_name",
-		"history_name", "topscopeclass", "wechat_accounts", "website", "report_websites"}
+		"history_name", "wechat_accounts", "website", "report_websites"}
 
 	AgencyFields = []string{"_id", "contact", "type", "ranks",
 		"address", "district", "city", "province", "area_code", "credit_no", "agency_name",
-		"history_name", "topscopeclass", "wechat_accounts", "website", "report_websites"}
+		"history_name", "wechat_accounts", "website", "report_websites"}
 	var err error
 	pool_size, _ := strconv.Atoi(Config["pool_size"])
 
@@ -135,6 +135,8 @@ func main() {
 	log.Println("Udp服务监听", updport)
 	log.Println("发送端口port:", Updport)
 	go TimedTaskWinner() //定时任务
+	go TimedTaskBuyer() //定时任务
+	go TimedTaskAgency() //定时任务
 	c := make(chan int, 1)
 	<-c
 
@@ -163,9 +165,9 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 				if key == "winner" {
 					go TaskWinner(tmp)
 				} else if key == "buyer" {
-
+					go TaskBuyer(tmp)
 				} else if key == "agency" {
-
+					go TaskAgency(tmp)
 				}
 			}
 		}

+ 15 - 15
udp_winner/timedTaskAgency.go

@@ -81,7 +81,7 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 				//重复增量操作
 				//redis查询是否存在
 				rdb := RedisPool.Get()
-				rdb.Do("SELECT","1")
+				rdb.Do("SELECT","3")
 				if reply, err := redis.String(rdb.Do("GET", tmp["agency"])); err != nil {
 					//redis不存在,存到临时表,定时任务处理
 					FClient.DbName = Config["mgodb_extract_kf"]
@@ -101,7 +101,7 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 					}
 					//拿到合并后的qyk
 					FClient.DbName = Config["mgodb_extract_kf"]
-					oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+					oldTmp := FClient.FindById(Config["mgo_qyk_agency"], reply)
 					if oldTmp == nil {
 						log.Println("存量 redis id 不存在",reply,tmp["agency"])
 						continue
@@ -133,7 +133,7 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 						oldTmp["updatatime"] = time.Now().Unix()
 						//mongo更新
 						FClient.DbName = Config["mgodb_extract_kf"]
-						if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
 							log.Println("mongo更新err", esId)
 						}
 
@@ -198,7 +198,7 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 					//mongo更新
 					oldTmp["updatatime"] = time.Now().Unix()
 					FClient.DbName = Config["mgodb_extract_kf"]
-					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+					if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
 						log.Println("存量  mongo更新 err", esId, oldTmp)
 					}
 					//es更新
@@ -219,13 +219,13 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 		tmp := map[string]interface{}{}
 		for cursor.Next(&tmp) {
 			overid = tmp["_id"].(primitive.ObjectID).Hex()
-			log.Println(tmp["_id"])
+			//log.Println(tmp["_id"])
 			if tmp["agency"] == nil || tmp["agency"] == "" {
 				continue
 			}
 			//redis查询是否存在
 			rdb := RedisPool.Get()
-			rdb.Do("SELECT","1")
+			rdb.Do("SELECT","3")
 			if reply, err := redis.String(rdb.Do("GET", tmp["agency"])); err != nil {
 				//redis不存在存到临时表,定时任务处理
 				FClient.DbName = Config["mgodb_extract_kf"]
@@ -243,7 +243,7 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 				}
 				//拿到合并后的qyk
 				FClient.DbName = Config["mgodb_extract_kf"]
-				oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+				oldTmp := FClient.FindById(Config["mgo_qyk_agency"], reply)
 				if oldTmp == nil {
 					log.Println("redis id 不存在")
 					continue
@@ -273,7 +273,7 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 					oldTmp["updatatime"] = time.Now().Unix()
 					//mongo更新
 					FClient.DbName = Config["mgodb_extract_kf"]
-					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+					if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
 						log.Println("mongo更新err", esId)
 					}
 
@@ -336,7 +336,7 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 				//mongo更新
 				oldTmp["updatatime"] = time.Now().Unix()
 				FClient.DbName = Config["mgodb_extract_kf"]
-				if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+				if !FClient.UpdateById(Config["mgo_qyk_agency"], esId, bson.M{"$set": oldTmp}) {
 					log.Println("mongo更新 err", esId, oldTmp)
 				}
 				//es更新
@@ -367,7 +367,7 @@ func TimedTaskAgency() {
 			if !iter.Next(&tmpLast) {
 				//临时表无数据
 				log.Println("临时表无数据:")
-				t2.Reset(time.Minute * 5)
+				t2.Reset(time.Second * 10)
 				continue
 			} else {
 				log.Println("临时表有数据:", tmpLast)
@@ -389,7 +389,7 @@ func TimedTaskAgency() {
 					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
 					//再重新查找redis,存在发udp处理,不存在走新增合并
 					rdb := RedisPool.Get()
-					rdb.Do("SELECT","1")
+					rdb.Do("SELECT","3")
 
 					if _, err := redis.String(rdb.Do("GET", tmp["agency"])); err == nil {
 						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
@@ -532,14 +532,14 @@ func TimedTaskAgency() {
 								if resulttmp["company_name"] == nil {
 									savetmp[sk] = ""
 								}else {
-									savetmp[sk] = resulttmp[sk]
+									savetmp[sk] = resulttmp["company_name"]
 								}
 								continue
 							}else if sk=="address"{
 								if resulttmp["company_address"] == nil {
 									savetmp[sk] = ""
 								}else {
-									savetmp[sk] = resulttmp[sk]
+									savetmp[sk] = resulttmp["company_address"]
 								}
 								continue
 							}
@@ -558,12 +558,12 @@ func TimedTaskAgency() {
 						savetmp["updatatime"] = time.Now().Unix()
 						//保存mongo
 						FClient.DbName = Config["mgodb_extract_kf"]
-						saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
+						saveid := FClient.Save(Config["mgo_qyk_agency"], savetmp)
 						if saveid != nil {
 							//保存redis
 							//保存redis
 							rc := RedisPool.Get()
-							rc.Do("SELECT","1")
+							rc.Do("SELECT","3")
 
 							var _id string
 							if v, ok := saveid.(primitive.ObjectID); ok {

+ 56 - 39
udp_winner/timedTaskBuyer.go

@@ -41,7 +41,7 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 			"$lte": LtId,
 		},
 	}).Select(bson.M{"buyer": 1, "buyertel": 1, "buyerperson": 1,
-		"topscopeclass": 1, "buyeraddr": 1}).Iter()
+		"topscopeclass": 1, "buyeraddr": 1,"buyerclass":1}).Iter()
 	if cursor == nil {
 		log.Println(cursor)
 		return
@@ -81,7 +81,7 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 				//重复增量操作
 				//redis查询是否存在
 				rdb := RedisPool.Get()
-				rdb.Do("SELECT","1")
+				rdb.Do("SELECT","2")
 				if reply, err := redis.String(rdb.Do("GET", tmp["buyer"])); err != nil {
 					//redis不存在,存到临时表,定时任务处理
 					FClient.DbName = Config["mgodb_extract_kf"]
@@ -101,7 +101,7 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 					}
 					//拿到合并后的qyk
 					FClient.DbName = Config["mgodb_extract_kf"]
-					oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+					oldTmp := FClient.FindById(Config["mgo_qyk_buyer"], reply)
 					if oldTmp == nil {
 						log.Println("存量 redis id 不存在",reply,tmp["buyer"])
 						continue
@@ -122,31 +122,31 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 					sort.Strings(tmpTopscopeclass)
 
 					//更新buyerclass
-
-
 					esId := oldTmp["_id"].(primitive.ObjectID).Hex()
 					//更新行业类型
 					if tmp["buyerperson"] == nil || tmp["buyerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["buyerperson"])) {
-
-
 						//更新buyerclass合并
 						if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
 							//无值,不更新
 						}else {
-							//有值
 							var buyerclass_new,buyerclass_old string
 							buyerclass_new = tmp["buyerclass"].(string)
 							buyerclass_old = oldTmp["buyerclass"].(string)
-							if strings.Contains(buyerclass_old, buyerclass_new) {
-								oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+							if buyerclass_old=="" {
+								oldTmp["buyerclass"] = buyerclass_new
+							}else {
+								if buyerclass_new!=buyerclass_old {
+									if !strings.Contains(buyerclass_old, buyerclass_new) {
+										oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+									}
+								}
 							}
-
 						}
 
 						oldTmp["updatatime"] = time.Now().Unix()
 						//mongo更新
 						FClient.DbName = Config["mgodb_extract_kf"]
-						if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+						if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
 							log.Println("mongo更新err", esId)
 						}
 
@@ -213,20 +213,24 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 					if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
 						//无值,不更新
 					}else {
-						//有值
 						var buyerclass_new,buyerclass_old string
 						buyerclass_new = tmp["buyerclass"].(string)
 						buyerclass_old = oldTmp["buyerclass"].(string)
-						if strings.Contains(buyerclass_old, buyerclass_new) {
-							oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+						if buyerclass_old=="" {
+							oldTmp["buyerclass"] = buyerclass_new
+						}else {
+							if buyerclass_new!=buyerclass_old {
+								if !strings.Contains(buyerclass_old, buyerclass_new) {
+									oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+								}
+							}
 						}
-
 					}
 
 					//mongo更新
 					oldTmp["updatatime"] = time.Now().Unix()
 					FClient.DbName = Config["mgodb_extract_kf"]
-					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+					if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
 						log.Println("存量  mongo更新 err", esId, oldTmp)
 					}
 					//es更新
@@ -247,15 +251,19 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 		tmp := map[string]interface{}{}
 		for cursor.Next(&tmp) {
 			overid = tmp["_id"].(primitive.ObjectID).Hex()
-			log.Println(tmp["_id"])
+			//log.Println(tmp["_id"])
 			if tmp["buyer"] == nil || tmp["buyer"] == "" {
 				continue
 			}
 			//redis查询是否存在
 			rdb := RedisPool.Get()
-			rdb.Do("SELECT","1")
+			rdb.Do("SELECT","2")
 			if reply, err := redis.String(rdb.Do("GET", tmp["buyer"])); err != nil {
 				//redis不存在存到临时表,定时任务处理
+				if tmp["buyer"]=="无" {
+					log.Println("redis不存在:",tmp["buyer"])
+				}
+
 				FClient.DbName = Config["mgodb_extract_kf"]
 				if tmpid := FClient.Save("buyer_new", tmp); tmpid == nil {
 					log.Println("FClient.Save err", tmpid)
@@ -271,7 +279,7 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 				}
 				//拿到合并后的qyk
 				FClient.DbName = Config["mgodb_extract_kf"]
-				oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+				oldTmp := FClient.FindById(Config["mgo_qyk_buyer"], reply)
 				if oldTmp == nil {
 					log.Println("redis id 不存在")
 					continue
@@ -294,11 +302,11 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 				}
 				sort.Strings(tmpTopscopeclass)
 
-
 				esId := oldTmp["_id"].(primitive.ObjectID).Hex()
 
-
-
+				if tmp["buyer"]=="无" {
+					log.Println("无无无无无无无")
+				}
 
 				//更新行业类型 buyerclass合并
 				if tmp["buyerperson"] == nil || tmp["buyerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["buyerperson"])) {
@@ -307,20 +315,24 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 					if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
 						//无值,不更新
 					}else {
-						//有值
 						var buyerclass_new,buyerclass_old string
 						buyerclass_new = tmp["buyerclass"].(string)
 						buyerclass_old = oldTmp["buyerclass"].(string)
-						if strings.Contains(buyerclass_old, buyerclass_new) {
-							oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+						if buyerclass_old=="" {
+							oldTmp["buyerclass"] = buyerclass_new
+						}else {
+							if buyerclass_new!=buyerclass_old {
+								if !strings.Contains(buyerclass_old, buyerclass_new) {
+									oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+								}
+							}
 						}
-
 					}
 
 					oldTmp["updatatime"] = time.Now().Unix()
 					//mongo更新
 					FClient.DbName = Config["mgodb_extract_kf"]
-					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+					if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
 						log.Println("mongo更新err", esId)
 					}
 
@@ -385,20 +397,24 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 				if tmp["buyerclass"] == nil || tmp["buyerclass"] == "" {
 					//无值,不更新
 				}else {
-					//有值
 					var buyerclass_new,buyerclass_old string
 					buyerclass_new = tmp["buyerclass"].(string)
 					buyerclass_old = oldTmp["buyerclass"].(string)
-					if strings.Contains(buyerclass_old, buyerclass_new) {
-						oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+					if buyerclass_old=="" {
+						oldTmp["buyerclass"] = buyerclass_new
+					}else {
+						if buyerclass_new!=buyerclass_old {
+							if !strings.Contains(buyerclass_old, buyerclass_new) {
+								oldTmp["buyerclass"] = buyerclass_old + ","+buyerclass_new //采购单位类型
+							}
+						}
 					}
-
 				}
 
 				//mongo更新
 				oldTmp["updatatime"] = time.Now().Unix()
 				FClient.DbName = Config["mgodb_extract_kf"]
-				if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+				if !FClient.UpdateById(Config["mgo_qyk_buyer"], esId, bson.M{"$set": oldTmp}) {
 					log.Println("mongo更新 err", esId, oldTmp)
 				}
 				//es更新
@@ -429,7 +445,8 @@ func TimedTaskBuyer() {
 			if !iter.Next(&tmpLast) {
 				//临时表无数据
 				log.Println("临时表无数据:")
-				t2.Reset(time.Minute * 5)
+				//t2.Reset(time.Second * 10) //增量
+				t2.Reset(time.Minute * 5) //存量
 				continue
 			} else {
 				log.Println("临时表有数据:", tmpLast)
@@ -451,7 +468,7 @@ func TimedTaskBuyer() {
 					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
 					//再重新查找redis,存在发udp处理,不存在走新增合并
 					rdb := RedisPool.Get()
-					rdb.Do("SELECT","1")
+					rdb.Do("SELECT","2")
 					if _, err := redis.String(rdb.Do("GET", tmp["buyer"])); err == nil {
 						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
 						//redis存在发送udp进行处理
@@ -594,14 +611,14 @@ func TimedTaskBuyer() {
 								if resulttmp["company_name"] == nil {
 									savetmp[sk] = ""
 								}else {
-									savetmp[sk] = resulttmp[sk]
+									savetmp[sk] = resulttmp["company_name"]
 								}
 								continue
 							}else if sk=="address"{
 								if resulttmp["company_address"] == nil {
 									savetmp[sk] = ""
 								}else {
-									savetmp[sk] = resulttmp[sk]
+									savetmp[sk] = resulttmp["company_address"]
 								}
 								continue
 							}
@@ -620,11 +637,11 @@ func TimedTaskBuyer() {
 						savetmp["updatatime"] = time.Now().Unix()
 						//保存mongo
 						FClient.DbName = Config["mgodb_extract_kf"]
-						saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
+						saveid := FClient.Save(Config["mgo_qyk_buyer"], savetmp)
 						if saveid != nil {
 							//保存redis
 							rc := RedisPool.Get()
-							rc.Do("SELECT","1")
+							rc.Do("SELECT","2")
 							var _id string
 							if v, ok := saveid.(primitive.ObjectID); ok {
 								_id = v.Hex()

+ 1 - 0
udp_winner/timedTaskWinner.go

@@ -299,6 +299,7 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 				}
 				sort.Strings(tmpTopscopeclass)
 				oldTmp["industry"] = tmpTopscopeclass
+
 				esId := oldTmp["_id"].(primitive.ObjectID).Hex()
 				//更新行业类型
 				if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {