Pārlūkot izejas kodu

定时任务完善

apple 5 gadi atpakaļ
vecāks
revīzija
b21f6d2884
3 mainītis faili ar 54 papildinājumiem un 54 dzēšanām
  1. 4 0
      udp_winner/config.json
  2. 38 46
      udp_winner/timedTaskAgency.go
  3. 12 8
      udp_winner/timedTaskBuyer.go

+ 4 - 0
udp_winner/config.json

@@ -2,6 +2,10 @@
   "elasticsearch": "http://127.0.0.1:9800",
   "elasticsearch_index": "winner_enterprise",
   "elasticsearch_type": "winnerent",
+  "elasticsearch_buyer_index": "buyer_enterprise",
+  "elasticsearch_buyer_type": "buyerent",
+  "elasticsearch_agency_index": "agency_enterprise",
+  "elasticsearch_agency_type": "agencyent",
   "udpport": "127.0.0.1:12311",
   "port": "12311",
   "pool_size": "10",

+ 38 - 46
udp_winner/timedTaskAgency.go

@@ -39,8 +39,8 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 			"$gte": GId,
 			"$lte": LtId,
 		},
-	}).Select(bson.M{"winner": 1, "winnertel": 1, "winnerperson": 1,
-		"topscopeclass": 1, "winneraddr": 1}).Iter()
+	}).Select(bson.M{"agency": 1, "agencytel": 1, "agencyperson": 1,
+		"topscopeclass": 1, "agencyaddr": 1}).Iter()
 	if cursor == nil {
 		log.Println(cursor)
 		return
@@ -50,15 +50,15 @@ func TaskAgency(mapinfo *map[string]interface{}) {
 	for cursor.Next(&tmp) {
 		overid = tmp["_id"].(primitive.ObjectID).Hex()
 		log.Println(tmp["_id"])
-		if tmp["winner"] == nil || tmp["winner"] == "" {
+		if tmp["agency"] == nil || tmp["agency"] == "" {
 			continue
 		}
 		//redis查询是否存在
 		rdb := RedisPool.Get()
-		if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
+		if reply, err := redis.String(rdb.Do("GET", tmp["agency"])); err != nil {
 			//redis不存在存到临时表,定时任务处理
 			FClient.DbName = Config["mgodb_extract_kf"]
-			if tmpid := FClient.Save("winner_new", tmp) ;tmpid==nil{
+			if tmpid := FClient.Save("agency_new", tmp) ;tmpid==nil{
 				log.Println("FClient.Save err",tmpid)
 			}
 			//log.Println("get redis id err:定时任务处理", err, tmp)
@@ -256,7 +256,7 @@ func TimedTaskAgency() {
 		Fcconn := FClient.GetMgoConn()
 		defer FClient.DestoryMongoConn(Fcconn)
 		tmpLast := map[string]interface{}{}
-		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
+		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("agency_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
 			if !iter.Next(&tmpLast) {
 				//临时表无数据
 				log.Println("临时表无数据:")
@@ -266,7 +266,7 @@ func TimedTaskAgency() {
 				log.Println("临时表有数据:", tmpLast)
 				fconn := FClient.GetMgoConn()
 				defer FClient.DestoryMongoConn(fconn)
-				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{
+				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("agency_new").Find(bson.M{
 					"_id": bson.M{
 						"$lte": tmpLast["_id"],
 					},
@@ -282,7 +282,7 @@ func TimedTaskAgency() {
 					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
 					//再重新查找redis,存在发udp处理,不存在走新增合并
 					rdb := RedisPool.Get()
-					if _, err := redis.String(rdb.Do("GET", tmp["winner"])); err == nil {
+					if _, err := redis.String(rdb.Do("GET", tmp["agency"])); err == nil {
 						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
 						//redis存在发送udp进行处理
 						by, _ := json.Marshal(map[string]interface{}{
@@ -298,7 +298,7 @@ func TimedTaskAgency() {
 						}
 						//存在的话删除tmp mongo表
 						FClient.DbName = Config["mgodb_extract_kf"]
-						if DeletedCount := FClient.DeleteById("winner_new", tmpId); DeletedCount == 0 {
+						if DeletedCount := FClient.DeleteById("agency_new", tmpId); DeletedCount == 0 {
 							log.Println("删除临时表err:", DeletedCount)
 						}
 						if err := rdb.Close(); err != nil {
@@ -312,16 +312,16 @@ func TimedTaskAgency() {
 					}
 					//查询redis不存在新增
 					FClient.DbName = Config["mgodb_enterprise"]
-					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["winner"]})
+					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["agency"]})
 					if resulttmp["_id"] == nil {
 						//log.Println(r)
 						//匹配不到原始库,存入异常表删除临时表
 						FClient.DbName = Config["mgodb_extract_kf"]
-						if saveid := FClient.Save("winner_err", tmp); saveid == nil {
+						if saveid := FClient.Save("agency_err", tmp); saveid == nil {
 							log.Println("存入异常表错误", tmp)
 						}
 						FClient.DbName = Config["mgodb_extract_kf"]
-						if deleteNum := FClient.DeleteById("winner_new", tmpId); deleteNum == 0 {
+						if deleteNum := FClient.DeleteById("agency_new", tmpId); deleteNum == 0 {
 							log.Println("删除临时表错误", deleteNum)
 						}
 						continue
@@ -388,35 +388,8 @@ func TimedTaskAgency() {
 						resulttmp["contact"] = contacts
 
 						savetmp := make(map[string]interface{}, 0)
-						for _, sk := range Fields {
-							if sk == "establish_date" {
-								if resulttmp[sk] != nil {
-									savetmp[sk] = resulttmp[sk].(primitive.DateTime).Time().UTC().Unix()
-									continue
-								}
-							} else if sk == "capital" {
-								//log.Println(sk, resulttmp[sk])
-								savetmp[sk] = ObjToMoney([]interface{}{resulttmp[sk], ""})[0]
-								continue
-							} else if sk == "partners" {
-								//log.Println(sk, resulttmp[sk], )
-								//fmt.Println(reflect.TypeOf(resulttmp[sk]))
-								if resulttmp[sk] != nil {
-									if ppms, ok := resulttmp[sk].(primitive.A); ok {
-										for i, _ := range ppms {
-											if ppms[i].(map[string]interface{})["stock_type"] != nil {
-												ppms[i].(map[string]interface{})["stock_type"] = "企业公示"
-											}
-											delete(ppms[i].(map[string]interface{}), "identify_type")
-										}
-										savetmp[sk] = ppms
-
-									}
-								} else {
-									savetmp[sk] = []interface{}{}
-								}
-								continue
-							} else if sk == "_id" {
+						for _, sk := range AgencyFields {
+							if sk == "_id" {
 								savetmp["tmp"+sk] = resulttmp[sk]
 								continue
 							} else if sk == "area_code" {
@@ -445,8 +418,27 @@ func TimedTaskAgency() {
 							} else if sk == "wechat_accounts" {
 								savetmp[sk] = []interface{}{}
 								continue
+							}else if sk=="agency_name" {
+								if resulttmp["company_name"] == nil {
+									savetmp[sk] = ""
+								}else {
+									savetmp[sk] = resulttmp[sk]
+								}
+								continue
+							}else if sk=="address"{
+								if resulttmp["company_address"] == nil {
+									savetmp[sk] = ""
+								}else {
+									savetmp[sk] = resulttmp[sk]
+								}
+								continue
 							}
-							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
+
+
+
+							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" &&
+								sk != "agency_name" && sk != "address" &&
+								sk != "contact" && sk != "report_websites" {
 								savetmp[sk] = ""
 							} else {
 								savetmp[sk] = resulttmp[sk]
@@ -464,8 +456,8 @@ func TimedTaskAgency() {
 							if v, ok := saveid.(primitive.ObjectID); ok {
 								_id = v.Hex()
 							}
-							if _, err := rc.Do("SET", savetmp["company_name"], _id); err != nil {
-								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["company_name"], err)
+							if _, err := rc.Do("SET", savetmp["agency_name"], _id); err != nil {
+								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["agency_name"], err)
 								if err := rc.Close(); err != nil {
 									log.Println(err)
 								}
@@ -478,12 +470,12 @@ func TimedTaskAgency() {
 
 								//esConn := elastic.GetEsConn()
 								//defer elastic.DestoryEsConn(esConn)
-								if _, err := EsConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+								if _, err := EsConn.Index().Index(Config["elasticsearch_agency_index"]).Type(Config["elasticsearch_agency_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
 									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
 								} else {
 									//删除临时表
 									FClient.DbName = Config["mgodb_extract_kf"]
-									if deleteNum := FClient.DeleteById("winner_new", tmpId); deleteNum == 0 {
+									if deleteNum := FClient.DeleteById("agency_new", tmpId); deleteNum == 0 {
 										log.Println("删除临时表失败", deleteNum)
 									}
 								}

+ 12 - 8
udp_winner/timedTaskBuyer.go

@@ -39,26 +39,27 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 			"$gte": GId,
 			"$lte": LtId,
 		},
-	}).Select(bson.M{"winner": 1, "winnertel": 1, "winnerperson": 1,
-		"topscopeclass": 1, "winneraddr": 1}).Iter()
+	}).Select(bson.M{"buyer": 1, "buyertel": 1, "buyerperson": 1,
+		"topscopeclass": 1, "buyeraddr": 1}).Iter()
 	if cursor == nil {
 		log.Println(cursor)
 		return
 	}
 	overid := gtid
 	tmp := map[string]interface{}{}
+	//binding tmp
 	for cursor.Next(&tmp) {
 		overid = tmp["_id"].(primitive.ObjectID).Hex()
 		log.Println(tmp["_id"])
-		if tmp["winner"] == nil || tmp["winner"] == "" {
+		if tmp["buyer"] == nil || tmp["buyer"] == "" {
 			continue
 		}
 		//redis查询是否存在
 		rdb := RedisPool.Get()
-		if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
+		if reply, err := redis.String(rdb.Do("GET", tmp["buyer"])); err != nil {
 			//redis不存在存到临时表,定时任务处理
 			FClient.DbName = Config["mgodb_extract_kf"]
-			if tmpid := FClient.Save("winner_new", tmp) ;tmpid==nil{
+			if tmpid := FClient.Save("buyer_new", tmp) ;tmpid==nil{
 				log.Println("FClient.Save err",tmpid)
 			}
 			//log.Println("get redis id err:定时任务处理", err, tmp)
@@ -84,6 +85,7 @@ func TaskBuyer(mapinfo *map[string]interface{}) {
 			}
 			//拿到合并后的qyk
 			FClient.DbName = Config["mgodb_extract_kf"]
+			//存新表的数据
 			oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
 			if oldTmp == nil{
 				log.Println("redis id 不存在")
@@ -282,7 +284,7 @@ func TimedTaskBuyer() {
 					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
 					//再重新查找redis,存在发udp处理,不存在走新增合并
 					rdb := RedisPool.Get()
-					if _, err := redis.String(rdb.Do("GET", tmp["winner"])); err == nil {
+					if _, err := redis.String(rdb.Do("GET", tmp["buyer"])); err == nil {
 						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
 						//redis存在发送udp进行处理
 						by, _ := json.Marshal(map[string]interface{}{
@@ -313,7 +315,7 @@ func TimedTaskBuyer() {
 					//查询redis不存在新增
 					FClient.DbName = Config["mgodb_enterprise"]
 					//qyxy 企业库 两亿条
-					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["winner"]})
+					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["buyer"]})
 					if resulttmp["_id"] == nil {
 						//log.Println(r)
 						//匹配不到原始库,存入异常表删除临时表
@@ -425,12 +427,14 @@ func TimedTaskBuyer() {
 								}else {
 									savetmp[sk] = resulttmp[sk]
 								}
+								continue
 							}else if sk=="address"{
 								if resulttmp["company_address"] == nil {
 									savetmp[sk] = ""
 								}else {
 									savetmp[sk] = resulttmp[sk]
 								}
+								continue
 							}
 
 
@@ -469,7 +473,7 @@ func TimedTaskBuyer() {
 
 								//esConn := elastic.GetEsConn()
 								//defer elastic.DestoryEsConn(esConn)
-								if _, err := EsConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+								if _, err := EsConn.Index().Index(Config["elasticsearch_buyer_index"]).Type(Config["elasticsearch_buyer_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
 									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
 								} else {
 									//删除临时表