Explorar o código

三库初始化

fengweiqiang %!s(int64=5) %!d(string=hai) anos
pai
achega
76aef8e1e0
Modificáronse 2 ficheiros con 14 adicións e 502 borrados
  1. 14 234
      udp_winner/main.go
  2. 0 268
      udp_winner/timedTask.go

+ 14 - 234
udp_winner/main.go

@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"github.com/garyburd/redigo/redis"
 	"go.mongodb.org/mongo-driver/bson"
-	"go.mongodb.org/mongo-driver/bson/primitive"
 	es "gopkg.in/olivere/elastic.v1"
 	"log"
 	mu "mfw/util"
@@ -15,7 +14,6 @@ import (
 	"regexp"
 	"strconv"
 
-	"sort"
 	"strings"
 	"time"
 )
@@ -60,7 +58,7 @@ func init() {
 	FClient = new(MongodbSim)
 	FClient.MongodbAddr = Config["mgourl"]
 	FClient.Size = pool_size
-	FClient.DbName =Config["mgodb_extract_kf"]
+	FClient.DbName = Config["mgodb_extract_kf"]
 	//mongodbSim.DbName = "qfw"
 	FClient.InitPool()
 	FClientmgoConn := FClient.GetMgoConn()
@@ -114,7 +112,7 @@ func main() {
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
 	log.Println("发送端口port:", Updport)
-	go TimedTask() //定时任务
+	go TimedTaskWinner() //定时任务
 	c := make(chan int, 1)
 	<-c
 
@@ -132,242 +130,24 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
 			return
 		} else if tmp != nil {
-			if key,ok := (*tmp)["key"].(string);ok{
+			if key, ok := (*tmp)["key"].(string); ok {
 				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
-			}else {
+			} else {
 				udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
 			}
-			go task(tmp)
-		}
-	case mu.OP_NOOP: //下个节点回应
-		log.Println("发送成功", string(data))
-	}
-}
-func task(mapinfo *map[string]interface{}) {
-	defer util.Catch()
-	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
-	if gtid == "" || lteid == "" {
-		log.Println(gtid, lteid, "参数错误")
-		return
-	}
-	GId, err := primitive.ObjectIDFromHex(gtid)
-	LtId, err2 := primitive.ObjectIDFromHex(lteid)
-	if err != nil || err2 != nil {
-		log.Println(gtid, lteid, "转换_id错误")
-		return
-	}
-	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
-	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
-	// (area地区-province省份 city城市-city城市 district区县-district区县)
-	// winneraddr-company_address企业地址
-	SourceClientcc := SourceClient.GetMgoConn()
-	defer SourceClient.DestoryMongoConn(SourceClientcc)
-	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
-		"_id": bson.M{
-			"$gte": GId,
-			"$lte": LtId,
-		},
-	}).Select(bson.M{"winner": 1, "winnertel": 1, "winnerperson": 1,
-		"topscopeclass": 1, "winneraddr": 1}).Iter()
-	if cursor == nil {
-		log.Println(cursor)
-		return
-	}
-	overid := gtid
-	tmp := map[string]interface{}{}
-	for cursor.Next(&tmp) {
-		overid = tmp["_id"].(primitive.ObjectID).Hex()
-		log.Println(tmp["_id"])
-		if tmp["winner"] == nil || tmp["winner"] == "" {
-			continue
-		}
-		//redis查询是否存在
-		rdb := RedisPool.Get()
-		if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
-			//redis不存在存到临时表,定时任务处理
-			FClient.DbName = Config["mgodb_extract_kf"]
-			if tmpid := FClient.Save("winner_new", tmp) ;tmpid==nil{
-				log.Println("FClient.Save err",tmpid)
-			}
-			//log.Println("get redis id err:定时任务处理", err, tmp)
-			if err := rdb.Close(); err != nil {
-				log.Println(err)
-			}
-			continue
-		} else {
-			//log.Println("redis get :", reply)
-			//redis存在
-			//log.Println(reply)
-			//reply = "5e0316b998a9abaf6535df3d"
-			//id, err := primitive.ObjectIDFromHex(reply)
-			//if err != nil {
-			//	log.Println("get redis id  Hex err:", err, tmp)
-			//	if err := rdb.Close(); err != nil {
-			//		log.Println(err)
-			//	}
-			//	continue
-			//}
-			if err := rdb.Close(); err != nil {
-				log.Println(err)
-			}
-			//拿到合并后的qyk
-			FClient.DbName = Config["mgodb_extract_kf"]
-			oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
-			if oldTmp == nil{
-				log.Println("redis id 不存在")
-				continue
-			}
-			//err = FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-			//	FindOne(context.TODO(), bson.M{"_id": id}).Decode(&oldTmp)
-			//if err != nil {
-			//	log.Println("qyk id err:", err, id)
-			//	continue
-			//}
-			//比较合并
-			//行业类型
-			tmpTopscopeclass := []string{}
-			tmpTopscopeclassMap := make(map[string]bool)
+			//data_type:winner data_type:buyer data_type:agency
+			//data_info:save//存量   data_info:add //增量
+			if key, ok := (*tmp)["data_type"].(string); ok {
+				if key == "winner" {
+					go TaskWinner(tmp)
+				} else if key == "buyer" {
+
+				} else if key == "agency" {
 
-			if oldTmp["industry"] == nil {
-				//log.Println(reflect.ValueOf(tmp["topscopeclass"]))
-				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
-						}
-					}
-					for k := range tmpTopscopeclassMap {
-						tmpTopscopeclass = append(tmpTopscopeclass, k)
-					}
-				}
-			} else {
-				if v, ok := oldTmp["industry"].(primitive.A); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok {
-							tmpTopscopeclassMap[vvv] = true
-						}
-					}
-				}
-				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
-					for _, vv := range v {
-						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
-						}
-					}
-					for k := range tmpTopscopeclassMap {
-						tmpTopscopeclass = append(tmpTopscopeclass, k)
-					}
-				}
-			}
-			sort.Strings(tmpTopscopeclass)
-			oldTmp["industry"] = tmpTopscopeclass
-			esId := oldTmp["_id"].(primitive.ObjectID).Hex()
-			//更新行业类型
-			if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
-				oldTmp["updatatime"] = time.Now().Unix()
-				//mongo更新
-				FClient.DbName =Config["mgodb_extract_kf"]
-				if !FClient.UpdateById(Config["mgo_qyk_c"],esId,bson.M{"$set": oldTmp}){
-					log.Println("mongo更新err",esId)
-				}
-				//if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-				//	UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
-				//	log.Println("mongo更新err:", err)
-				//}
-				//es更新
-				delete(oldTmp, "_id")
-				//esConn := elastic.GetEsConn()
-				//defer elastic.DestoryEsConn(esConn)
-				if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-					log.Println("update es err:", err)
-				}
-				//log.Println( err2,err3)
-				continue
-			}
-			//联系方式合并
-			var tmpperson, winnertel string
-			tmpperson = tmp["winnerperson"].(string)
-			if tmp["winnertel"] == nil || tmp["winnertel"] == "" {
-				winnertel = ""
-			} else {
-				if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])) {
-					winnertel = ""
-				} else {
-					winnertel = util.ObjToString(tmp["winnertel"])
-				}
-			}
-			contactMaps := make([]interface{}, 0)
-			if oldTmp["contact"] == nil {
-				tmpContact := make(map[string]interface{})
-				tmpContact["contact_person"] = tmpperson
-				tmpContact["contact_type"] = "项目联系人"
-				tmpContact["phone"] = winnertel
-				tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-				tmpContact["updatetime"] = time.Now().Unix()
-				contactMaps = append(contactMaps, tmpContact)
-			} else {
-				//对比前四项,相等丢弃
-				if v, ok := oldTmp["contact"].(primitive.A); ok {
-					var isNotUpdate bool
-					for _, vv := range v {
-						if vvv, ok := vv.(map[string]interface{}); ok {
-							if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
-								vvv["phone"] == winnertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
-								isNotUpdate = true
-								vvv["updatetime"] = time.Now().Unix()
-							}
-							contactMaps = append(contactMaps, vvv)
-						}
-					}
-					if !isNotUpdate {
-						vvv := make(map[string]interface{})
-						vvv["contact_person"] = tmp["winnerperson"]
-						vvv["contact_type"] = "项目联系人"
-						vvv["phone"] = winnertel
-						vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-						vvv["updatetime"] = time.Now().Unix()
-						contactMaps = append(contactMaps, vvv)
-					}
 				}
 			}
-			oldTmp["contact"] = contactMaps
-			//mongo更新
-			oldTmp["updatatime"] = time.Now().Unix()
-			FClient.DbName=Config["mgodb_extract_kf"]
-			if !FClient.UpdateById(Config["mgo_qyk_c"],esId,bson.M{"$set": oldTmp}){
-				log.Println("mongo更新 err",esId,oldTmp)
-			}
-			//if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-			//	UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
-			//	log.Println("mongo更新 err :", err)
-			//}
-			//es更新
-			delete(oldTmp, "_id")
-			//esConn := elastic.GetEsConn()
-			//defer elastic.DestoryEsConn(esConn)
-			if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-				log.Println("EsConn err :", err)
-			}
-			//log.Println( err2,err3)
 		}
+	case mu.OP_NOOP: //下个节点回应
+		log.Println("发送成功", string(data))
 	}
-	//defer cursor.Close(context.Background())
-	//log.Println("合并执行完成", gtid, lteid, overid)
-	//if overid != lteid {
-	//	by, _ := json.Marshal(map[string]interface{}{
-	//		"gtid":  overid,
-	//		"lteid": lteid,
-	//		"stype": "",
-	//	})
-	//	if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-	//		IP:   net.ParseIP("127.0.0.1"),
-	//		Port: Updport,
-	//	}); e != nil {
-	//		log.Println(e)
-	//	}
-	//	log.Println("重新发送udp:", string(by))
-	//	return
-	//}
-	log.Println("合并执行完成 ok", gtid, lteid, overid)
-
 }

+ 0 - 268
udp_winner/timedTask.go

@@ -1,268 +0,0 @@
-package main
-
-import (
-	"encoding/json"
-	"fmt"
-	"github.com/garyburd/redigo/redis"
-	"go.mongodb.org/mongo-driver/bson/primitive"
-	"gopkg.in/mgo.v2/bson"
-	"log"
-	mu "mfw/util"
-	"net"
-	"sort"
-	"strings"
-	"time"
-)
-
-//定时任务
-//1.存异常表
-//2.合并原始库新增
-func TimedTask() {
-	//time.Sleep(time.Hour*70)
-	t2 := time.NewTimer(time.Second * 5)
-	for range t2.C {
-		Fcconn := FClient.GetMgoConn()
-		defer FClient.DestoryMongoConn(Fcconn)
-		tmpLast := map[string]interface{}{}
-		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
-			if !iter.Next(&tmpLast) {
-				//临时表无数据
-				log.Println("临时表无数据:")
-				t2.Reset(time.Minute * 5)
-				continue
-			} else {
-				log.Println("临时表有数据:", tmpLast)
-				fconn := FClient.GetMgoConn()
-				defer FClient.DestoryMongoConn(fconn)
-				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{
-					"_id": bson.M{
-						"$lte": tmpLast["_id"],
-					},
-				}).Sort("_id").Iter()
-				if cursor == nil {
-					log.Println("查询失败")
-					t2.Reset(time.Second * 5)
-					continue
-				}
-				//遍历临时表数据,匹配不到原始库存入异常表
-				tmp := make(map[string]interface{})
-				for cursor.Next(&tmp) {
-					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
-					//再重新查找redis,存在发udp处理,不存在走新增合并
-					rdb := RedisPool.Get()
-					if _, err := redis.String(rdb.Do("GET", tmp["winner"])); err == nil {
-						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
-						//redis存在发送udp进行处理
-						by, _ := json.Marshal(map[string]interface{}{
-							"gtid":  tmpId,
-							"lteid": tmpId,
-							"stype": "",
-						})
-						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-							IP:   net.ParseIP("127.0.0.1"),
-							Port: Updport,
-						}); e != nil {
-							log.Println(e)
-						}
-						//存在的话删除tmp mongo表
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if DeletedCount := FClient.DeleteById("winner_new", tmpId); DeletedCount == 0 {
-							log.Println("删除临时表err:", DeletedCount)
-						}
-						if err := rdb.Close(); err != nil {
-							log.Println(err)
-						}
-						continue
-					} else {
-						if err = rdb.Close(); err != nil {
-							log.Println(err)
-						}
-					}
-					//查询redis不存在新增
-					FClient.DbName = Config["mgodb_enterprise"]
-					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["winner"]})
-					if resulttmp != nil {
-						//log.Println(r)
-						//匹配不到原始库,存入异常表删除临时表
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if saveid := FClient.Save("winner_err", tmp); saveid == nil {
-							log.Println("存入异常表错误", tmp)
-						}
-						FClient.DbName = Config["mgodb_extract_kf"]
-						if deleteNum := FClient.DeleteById("winner_new", tmpId); deleteNum == 0 {
-							log.Println("删除临时表错误", deleteNum)
-						}
-						continue
-					} else {
-						//log.Println(123)
-						//匹配到原始库,新增 resulttmp
-						if resulttmp["credit_no"] != nil {
-							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
-								len(strings.TrimSpace(credit_no)) > 8 {
-								dataNo := strings.TrimSpace(credit_no)[2:8]
-								if Addrs[dataNo] != nil {
-									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
-										if resulttmp["province"] == nil || resulttmp["province"] == "" {
-											resulttmp["province"] = v["province"]
-										}
-										resulttmp["city"] = v["city"]
-										resulttmp["district"] = v["district"]
-
-									}
-								}
-							}
-						}
-						contacts := make([]map[string]interface{}, 0)
-						contact := make(map[string]interface{}, 0)
-						if resulttmp["legal_person"] != nil {
-							contact["contact_person"] = resulttmp["legal_person"] //联系人
-						} else {
-							contact["contact_person"] = "" //联系人
-						}
-						contact["contact_type"] = "法定代表人" //法定代表人
-						//log.Println(1)
-						if resulttmp["annual_reports"] != nil {
-							bytes, err := json.Marshal(resulttmp["annual_reports"])
-							if err != nil {
-								log.Println("annual_reports err:", err)
-							}
-							phonetmp := make([]map[string]interface{}, 0)
-							err = json.Unmarshal(bytes, &phonetmp)
-							if err != nil {
-								log.Println("Unmarshal err:", err)
-							}
-							for _, vv := range phonetmp {
-								if vv["company_phone"] != nil {
-									if vv["company_phone"] == "" {
-										continue
-									} else {
-										contact["phone"] = vv["company_phone"] //联系电话
-										break
-									}
-								} else {
-									contact["phone"] = "" //联系电话
-								}
-
-							}
-						}
-						//log.Println(k, contact["phone"], resulttmp["_id"])
-						//time.Sleep(10 * time.Second)
-						if contact["phone"] == nil {
-							contact["phone"] = "" //联系电话
-						}
-						contact["topscopeclass"] = "企业公示"         //项目类型
-						contact["updatetime"] = time.Now().Unix() //更新时间
-						contacts = append(contacts, contact)
-						resulttmp["contact"] = contacts
-
-						savetmp := make(map[string]interface{}, 0)
-						for _, sk := range Fields {
-							if sk == "establish_date" {
-								if resulttmp[sk] != nil {
-									savetmp[sk] = resulttmp[sk].(primitive.DateTime).Time().UTC().Unix()
-									continue
-								}
-							} else if sk == "capital" {
-								//log.Println(sk, resulttmp[sk])
-								savetmp[sk] = ObjToMoney([]interface{}{resulttmp[sk], ""})[0]
-								continue
-							} else if sk == "partners" {
-								//log.Println(sk, resulttmp[sk], )
-								//fmt.Println(reflect.TypeOf(resulttmp[sk]))
-								if resulttmp[sk] != nil {
-									if ppms, ok := resulttmp[sk].(primitive.A); ok {
-										for i, _ := range ppms {
-											if ppms[i].(map[string]interface{})["stock_type"] != nil {
-												ppms[i].(map[string]interface{})["stock_type"] = "企业公示"
-											}
-											delete(ppms[i].(map[string]interface{}), "identify_type")
-										}
-										savetmp[sk] = ppms
-
-									}
-								} else {
-									savetmp[sk] = []interface{}{}
-								}
-								continue
-							} else if sk == "_id" {
-								savetmp["tmp"+sk] = resulttmp[sk]
-								continue
-							} else if sk == "area_code" {
-								//行政区划代码
-								savetmp[sk] = fmt.Sprint(resulttmp[sk])
-								continue
-							} else if sk == "report_websites" {
-								//网址
-								if resulttmp["report_websites"] == nil {
-									savetmp["website"] = ""
-								} else {
-									report_websitesArr := []string{}
-									if ppms, ok := resulttmp[sk].(primitive.A); ok {
-										for _, v := range ppms {
-											if vvv, ok := v.(map[string]interface{}); ok {
-												if rv, ok := vvv["website_url"].(string); ok {
-													report_websitesArr = append(report_websitesArr, rv)
-												}
-											}
-										}
-									}
-									sort.Strings(report_websitesArr)
-									savetmp["website"] = strings.Join(report_websitesArr, ";")
-								}
-								continue
-							} else if sk == "wechat_accounts" {
-								savetmp[sk] = []interface{}{}
-								continue
-							}
-							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
-								savetmp[sk] = ""
-							} else {
-								savetmp[sk] = resulttmp[sk]
-							}
-						}
-						//tmps = append(tmps, savetmp)
-						savetmp["updatatime"] = time.Now().Unix()
-						//保存mongo
-						FClient.DbName = Config["mgodb_extract_kf"]
-						saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
-						if saveid != nil {
-							//保存redis
-							rc := RedisPool.Get()
-							var _id string
-							if v, ok := saveid.(primitive.ObjectID); ok {
-								_id = v.Hex()
-							}
-							if _, err := rc.Do("SET", savetmp["company_name"], _id); err != nil {
-								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["company_name"], err)
-								if err := rc.Close(); err != nil {
-									log.Println(err)
-								}
-							} else {
-								//保存es
-								delete(savetmp, "_id")
-								if err := rc.Close(); err != nil {
-									log.Println(err)
-								}
-
-								//esConn := elastic.GetEsConn()
-								//defer elastic.DestoryEsConn(esConn)
-								if _, err := EsConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
-									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
-								} else {
-									//删除临时表
-									FClient.DbName = Config["mgodb_extract_kf"]
-									if deleteNum := FClient.DeleteById("winner_new", tmpId); deleteNum == 0 {
-										log.Println("删除临时表失败", deleteNum)
-									}
-								}
-							}
-						} else {
-							log.Println("save mongo err:", saveid, tmp["_id"])
-						}
-					}
-				}
-			}
-		}
-		t2.Reset(time.Minute)
-	}
-}