浏览代码

buyer,agency存量修改

apple 5 年之前
父节点
当前提交
c95c69b95b
共有 3 个文件被更改,包括 23 次插入8 次删除
  1. 4 4
      udp_winner/config.json
  2. 11 2
      udp_winner/timedTaskAgency.go
  3. 8 2
      udp_winner/timedTaskBuyer.go

+ 4 - 4
udp_winner/config.json

@@ -2,10 +2,10 @@
   "elasticsearch": "http://127.0.0.1:9800",
   "elasticsearch_index": "localhost_winner",
   "elasticsearch_type": "mytestwinner",
-  "elasticsearch_buyer_index": "buyer_enterprise",
-  "elasticsearch_buyer_type": "buyerent",
-  "elasticsearch_agency_index": "agency_enterprise",
-  "elasticsearch_agency_type": "agencyent",
+  "elasticsearch_buyer_index": "localhost_buyer",
+  "elasticsearch_buyer_type": "mytestbuyer",
+  "elasticsearch_agency_index": "localhost_agency",
+  "elasticsearch_agency_type": "mytestagency",
   "udpport": "127.0.0.1:12311",
   "port": "12311",
   "pool_size": "10",

+ 11 - 2
udp_winner/timedTaskAgency.go

@@ -60,8 +60,10 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 			if tmp["agency"] == nil || tmp["agency"] == "" {
 				continue
 			}
+			mgoId:=tmp["_id"].(primitive.ObjectID).Hex()
+			delete(tmp,"_id")
 			bytes, _ := json.Marshal(tmp)
-			if err := conn.Set(tmp["_id"].(primitive.ObjectID).Hex(), string(bytes), 0).Err(); err != nil {
+			if err := conn.Set(mgoId, string(bytes), 0).Err(); err != nil {
 				log.Println(err)
 			}
 		}
@@ -73,12 +75,13 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 			iterator := scan.Iterator()
 			for iterator.Next() {
 				redisId := iterator.Val()                       //redis key
-				redisvalue := conn.Get(iterator.Val()).String() //redis val
+				redisvalue := conn.Get(iterator.Val()).Val() //redis val
 				tmp := make(map[string]interface{})
 				json.Unmarshal([]byte(redisvalue),&tmp)
 				//重复增量操作
 				//redis查询是否存在
 				rdb := RedisPool.Get()
+				rdb.Do("SELECT","1")
 				if reply, err := redis.String(rdb.Do("GET", tmp["agency"])); err != nil {
 					//redis不存在,存到临时表,定时任务处理
 					FClient.DbName = Config["mgodb_extract_kf"]
@@ -222,6 +225,7 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 			}
 			//redis查询是否存在
 			rdb := RedisPool.Get()
+			rdb.Do("SELECT","1")
 			if reply, err := redis.String(rdb.Do("GET", tmp["agency"])); err != nil {
 				//redis不存在存到临时表,定时任务处理
 				FClient.DbName = Config["mgodb_extract_kf"]
@@ -385,6 +389,8 @@ func TimedTaskAgency() {
 					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
 					//再重新查找redis,存在发udp处理,不存在走新增合并
 					rdb := RedisPool.Get()
+					rdb.Do("SELECT","1")
+
 					if _, err := redis.String(rdb.Do("GET", tmp["agency"])); err == nil {
 						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
 						//redis存在发送udp进行处理
@@ -554,8 +560,11 @@ func TimedTaskAgency() {
 						FClient.DbName = Config["mgodb_extract_kf"]
 						saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
 						if saveid != nil {
+							//保存redis
 							//保存redis
 							rc := RedisPool.Get()
+							rc.Do("SELECT","1")
+
 							var _id string
 							if v, ok := saveid.(primitive.ObjectID); ok {
 								_id = v.Hex()

+ 8 - 2
udp_winner/timedTaskBuyer.go

@@ -60,8 +60,10 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 			if tmp["buyer"] == nil || tmp["buyer"] == "" {
 				continue
 			}
+			mgoId:=tmp["_id"].(primitive.ObjectID).Hex()
+			delete(tmp,"_id")
 			bytes, _ := json.Marshal(tmp)
-			if err := conn.Set(tmp["_id"].(primitive.ObjectID).Hex(), string(bytes), 0).Err(); err != nil {
+			if err := conn.Set(mgoId, string(bytes), 0).Err(); err != nil {
 				log.Println(err)
 			}
 		}
@@ -73,12 +75,13 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 			iterator := scan.Iterator()
 			for iterator.Next() {
 				redisId := iterator.Val()                       //redis key
-				redisvalue := conn.Get(iterator.Val()).String() //redis val
+				redisvalue := conn.Get(iterator.Val()).Val() //redis val
 				tmp := make(map[string]interface{})
 				json.Unmarshal([]byte(redisvalue),&tmp)
 				//重复增量操作
 				//redis查询是否存在
 				rdb := RedisPool.Get()
+				rdb.Do("SELECT","1")
 				if reply, err := redis.String(rdb.Do("GET", tmp["buyer"])); err != nil {
 					//redis不存在,存到临时表,定时任务处理
 					FClient.DbName = Config["mgodb_extract_kf"]
@@ -250,6 +253,7 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 			}
 			//redis查询是否存在
 			rdb := RedisPool.Get()
+			rdb.Do("SELECT","1")
 			if reply, err := redis.String(rdb.Do("GET", tmp["buyer"])); err != nil {
 				//redis不存在存到临时表,定时任务处理
 				FClient.DbName = Config["mgodb_extract_kf"]
@@ -447,6 +451,7 @@ func TimedTaskBuyer() {
 					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
 					//再重新查找redis,存在发udp处理,不存在走新增合并
 					rdb := RedisPool.Get()
+					rdb.Do("SELECT","1")
 					if _, err := redis.String(rdb.Do("GET", tmp["buyer"])); err == nil {
 						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
 						//redis存在发送udp进行处理
@@ -619,6 +624,7 @@ func TimedTaskBuyer() {
 						if saveid != nil {
 							//保存redis
 							rc := RedisPool.Get()
+							rc.Do("SELECT","1")
 							var _id string
 							if v, ok := saveid.(primitive.ObjectID); ok {
 								_id = v.Hex()