fengweiqiang 5 년 전
부모
커밋
418083ff5b
1개의 변경된 파일501개의 추가작업 그리고 0개의 파일을 삭제
  1. 501 0
      udp_winner/timedTaskWinner.go

+ 501 - 0
udp_winner/timedTaskWinner.go

@@ -0,0 +1,501 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/garyburd/redigo/redis"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"gopkg.in/mgo.v2/bson"
+	"log"
+	mu "mfw/util"
+	"net"
+	"qfw/util"
+	"sort"
+	"strings"
+	"time"
+)
+
+
+
+func TaskWinner(mapinfo *map[string]interface{}) {
+	defer util.Catch()
+	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
+	if gtid == "" || lteid == "" {
+		log.Println(gtid, lteid, "参数错误")
+		return
+	}
+	GId, err := primitive.ObjectIDFromHex(gtid)
+	LtId, err2 := primitive.ObjectIDFromHex(lteid)
+	if err != nil || err2 != nil {
+		log.Println(gtid, lteid, "转换_id错误")
+		return
+	}
+	//udp的id区间查询bidding  中标人 中标联系人 中标联系电话
+	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
+	// (area地区-province省份 city城市-city城市 district区县-district区县)
+	// winneraddr-company_address企业地址
+	SourceClientcc := SourceClient.GetMgoConn()
+	defer SourceClient.DestoryMongoConn(SourceClientcc)
+	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
+		"_id": bson.M{
+			"$gte": GId,
+			"$lte": LtId,
+		},
+	}).Select(bson.M{"winner": 1, "winnertel": 1, "winnerperson": 1,
+		"topscopeclass": 1, "winneraddr": 1}).Iter()
+	if cursor == nil {
+		log.Println(cursor)
+		return
+	}
+	overid := gtid
+	tmp := map[string]interface{}{}
+	for cursor.Next(&tmp) {
+		overid = tmp["_id"].(primitive.ObjectID).Hex()
+		log.Println(tmp["_id"])
+		if tmp["winner"] == nil || tmp["winner"] == "" {
+			continue
+		}
+		//redis查询是否存在
+		rdb := RedisPool.Get()
+		if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
+			//redis不存在存到临时表,定时任务处理
+			FClient.DbName = Config["mgodb_extract_kf"]
+			if tmpid := FClient.Save("winner_new", tmp) ;tmpid==nil{
+				log.Println("FClient.Save err",tmpid)
+			}
+			//log.Println("get redis id err:定时任务处理", err, tmp)
+			if err := rdb.Close(); err != nil {
+				log.Println(err)
+			}
+			continue
+		} else {
+			//log.Println("redis get :", reply)
+			//redis存在
+			//log.Println(reply)
+			//reply = "5e0316b998a9abaf6535df3d"
+			//id, err := primitive.ObjectIDFromHex(reply)
+			//if err != nil {
+			//	log.Println("get redis id  Hex err:", err, tmp)
+			//	if err := rdb.Close(); err != nil {
+			//		log.Println(err)
+			//	}
+			//	continue
+			//}
+			if err := rdb.Close(); err != nil {
+				log.Println(err)
+			}
+			//拿到合并后的qyk
+			FClient.DbName = Config["mgodb_extract_kf"]
+			oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+			if oldTmp == nil{
+				log.Println("redis id 不存在")
+				continue
+			}
+			//err = FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
+			//	FindOne(context.TODO(), bson.M{"_id": id}).Decode(&oldTmp)
+			//if err != nil {
+			//	log.Println("qyk id err:", err, id)
+			//	continue
+			//}
+			//比较合并
+			//行业类型
+			tmpTopscopeclass := []string{}
+			tmpTopscopeclassMap := make(map[string]bool)
+
+			if oldTmp["industry"] == nil {
+				//log.Println(reflect.ValueOf(tmp["topscopeclass"]))
+				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+					for _, vv := range v {
+						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+						}
+					}
+					for k := range tmpTopscopeclassMap {
+						tmpTopscopeclass = append(tmpTopscopeclass, k)
+					}
+				}
+			} else {
+				if v, ok := oldTmp["industry"].(primitive.A); ok {
+					for _, vv := range v {
+						if vvv, ok := vv.(string); ok {
+							tmpTopscopeclassMap[vvv] = true
+						}
+					}
+				}
+				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+					for _, vv := range v {
+						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+						}
+					}
+					for k := range tmpTopscopeclassMap {
+						tmpTopscopeclass = append(tmpTopscopeclass, k)
+					}
+				}
+			}
+			sort.Strings(tmpTopscopeclass)
+			oldTmp["industry"] = tmpTopscopeclass
+			esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+			//更新行业类型
+			if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
+				oldTmp["updatatime"] = time.Now().Unix()
+				//mongo更新
+				FClient.DbName =Config["mgodb_extract_kf"]
+				if !FClient.UpdateById(Config["mgo_qyk_c"],esId,bson.M{"$set": oldTmp}){
+					log.Println("mongo更新err",esId)
+				}
+				//if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
+				//	UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
+				//	log.Println("mongo更新err:", err)
+				//}
+				//es更新
+				delete(oldTmp, "_id")
+				//esConn := elastic.GetEsConn()
+				//defer elastic.DestoryEsConn(esConn)
+				if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+					log.Println("update es err:", err)
+				}
+				//log.Println( err2,err3)
+				continue
+			}
+			//联系方式合并
+			var tmpperson, winnertel string
+			tmpperson = tmp["winnerperson"].(string)
+			if tmp["winnertel"] == nil || tmp["winnertel"] == "" {
+				winnertel = ""
+			} else {
+				if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])) {
+					winnertel = ""
+				} else {
+					winnertel = util.ObjToString(tmp["winnertel"])
+				}
+			}
+			contactMaps := make([]interface{}, 0)
+			if oldTmp["contact"] == nil {
+				tmpContact := make(map[string]interface{})
+				tmpContact["contact_person"] = tmpperson
+				tmpContact["contact_type"] = "项目联系人"
+				tmpContact["phone"] = winnertel
+				tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+				tmpContact["updatetime"] = time.Now().Unix()
+				contactMaps = append(contactMaps, tmpContact)
+			} else {
+				//对比前四项,相等丢弃
+				if v, ok := oldTmp["contact"].(primitive.A); ok {
+					var isNotUpdate bool
+					for _, vv := range v {
+						if vvv, ok := vv.(map[string]interface{}); ok {
+							if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+								vvv["phone"] == winnertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+								isNotUpdate = true
+								vvv["updatetime"] = time.Now().Unix()
+							}
+							contactMaps = append(contactMaps, vvv)
+						}
+					}
+					if !isNotUpdate {
+						vvv := make(map[string]interface{})
+						vvv["contact_person"] = tmp["winnerperson"]
+						vvv["contact_type"] = "项目联系人"
+						vvv["phone"] = winnertel
+						vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+						vvv["updatetime"] = time.Now().Unix()
+						contactMaps = append(contactMaps, vvv)
+					}
+				}
+			}
+			oldTmp["contact"] = contactMaps
+			//mongo更新
+			oldTmp["updatatime"] = time.Now().Unix()
+			FClient.DbName=Config["mgodb_extract_kf"]
+			if !FClient.UpdateById(Config["mgo_qyk_c"],esId,bson.M{"$set": oldTmp}){
+				log.Println("mongo更新 err",esId,oldTmp)
+			}
+			//if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
+			//	UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
+			//	log.Println("mongo更新 err :", err)
+			//}
+			//es更新
+			delete(oldTmp, "_id")
+			//esConn := elastic.GetEsConn()
+			//defer elastic.DestoryEsConn(esConn)
+			if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+				log.Println("EsConn err :", err)
+			}
+			//log.Println( err2,err3)
+		}
+	}
+	//defer cursor.Close(context.Background())
+	//log.Println("合并执行完成", gtid, lteid, overid)
+	//if overid != lteid {
+	//	by, _ := json.Marshal(map[string]interface{}{
+	//		"gtid":  overid,
+	//		"lteid": lteid,
+	//		"stype": "",
+	//	})
+	//	if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+	//		IP:   net.ParseIP("127.0.0.1"),
+	//		Port: Updport,
+	//	}); e != nil {
+	//		log.Println(e)
+	//	}
+	//	log.Println("重新发送udp:", string(by))
+	//	return
+	//}
+	log.Println("合并执行完成 ok", gtid, lteid, overid)
+
+}
+
+
+//定时任务
+//1.存异常表
+//2.合并原始库新增
+func TimedTaskWinner() {
+	//time.Sleep(time.Hour*70)
+	t2 := time.NewTimer(time.Second * 5)
+	for range t2.C {
+		Fcconn := FClient.GetMgoConn()
+		defer FClient.DestoryMongoConn(Fcconn)
+		tmpLast := map[string]interface{}{}
+		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
+			if !iter.Next(&tmpLast) {
+				//临时表无数据
+				log.Println("临时表无数据:")
+				t2.Reset(time.Minute * 5)
+				continue
+			} else {
+				log.Println("临时表有数据:", tmpLast)
+				fconn := FClient.GetMgoConn()
+				defer FClient.DestoryMongoConn(fconn)
+				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{
+					"_id": bson.M{
+						"$lte": tmpLast["_id"],
+					},
+				}).Sort("_id").Iter()
+				if cursor == nil {
+					log.Println("查询失败")
+					t2.Reset(time.Second * 5)
+					continue
+				}
+				//遍历临时表数据,匹配不到原始库存入异常表
+				tmp := make(map[string]interface{})
+				for cursor.Next(&tmp) {
+					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
+					//再重新查找redis,存在发udp处理,不存在走新增合并
+					rdb := RedisPool.Get()
+					if _, err := redis.String(rdb.Do("GET", tmp["winner"])); err == nil {
+						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
+						//redis存在发送udp进行处理
+						by, _ := json.Marshal(map[string]interface{}{
+							"gtid":  tmpId,
+							"lteid": tmpId,
+							"stype": "",
+						})
+						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+							IP:   net.ParseIP("127.0.0.1"),
+							Port: Updport,
+						}); e != nil {
+							log.Println(e)
+						}
+						//存在的话删除tmp mongo表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if DeletedCount := FClient.DeleteById("winner_new", tmpId); DeletedCount == 0 {
+							log.Println("删除临时表err:", DeletedCount)
+						}
+						if err := rdb.Close(); err != nil {
+							log.Println(err)
+						}
+						continue
+					} else {
+						if err = rdb.Close(); err != nil {
+							log.Println(err)
+						}
+					}
+					//查询redis不存在新增
+					FClient.DbName = Config["mgodb_enterprise"]
+					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["winner"]})
+					if resulttmp["_id"] == nil {
+						//log.Println(r)
+						//匹配不到原始库,存入异常表删除临时表
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if saveid := FClient.Save("winner_err", tmp); saveid == nil {
+							log.Println("存入异常表错误", tmp)
+						}
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if deleteNum := FClient.DeleteById("winner_new", tmpId); deleteNum == 0 {
+							log.Println("删除临时表错误", deleteNum)
+						}
+						continue
+					} else {
+						//log.Println(123)
+						//匹配到原始库,新增 resulttmp
+						if resulttmp["credit_no"] != nil {
+							if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
+								len(strings.TrimSpace(credit_no)) > 8 {
+								dataNo := strings.TrimSpace(credit_no)[2:8]
+								if Addrs[dataNo] != nil {
+									if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
+										if resulttmp["province"] == nil || resulttmp["province"] == "" {
+											resulttmp["province"] = v["province"]
+										}
+										resulttmp["city"] = v["city"]
+										resulttmp["district"] = v["district"]
+
+									}
+								}
+							}
+						}
+						contacts := make([]map[string]interface{}, 0)
+						contact := make(map[string]interface{}, 0)
+						if resulttmp["legal_person"] != nil {
+							contact["contact_person"] = resulttmp["legal_person"] //联系人
+						} else {
+							contact["contact_person"] = "" //联系人
+						}
+						contact["contact_type"] = "法定代表人" //法定代表人
+						//log.Println(1)
+						if resulttmp["annual_reports"] != nil {
+							bytes, err := json.Marshal(resulttmp["annual_reports"])
+							if err != nil {
+								log.Println("annual_reports err:", err)
+							}
+							phonetmp := make([]map[string]interface{}, 0)
+							err = json.Unmarshal(bytes, &phonetmp)
+							if err != nil {
+								log.Println("Unmarshal err:", err)
+							}
+							for _, vv := range phonetmp {
+								if vv["company_phone"] != nil {
+									if vv["company_phone"] == "" {
+										continue
+									} else {
+										contact["phone"] = vv["company_phone"] //联系电话
+										break
+									}
+								} else {
+									contact["phone"] = "" //联系电话
+								}
+
+							}
+						}
+						//log.Println(k, contact["phone"], resulttmp["_id"])
+						//time.Sleep(10 * time.Second)
+						if contact["phone"] == nil {
+							contact["phone"] = "" //联系电话
+						}
+						contact["topscopeclass"] = "企业公示"         //项目类型
+						contact["updatetime"] = time.Now().Unix() //更新时间
+						contacts = append(contacts, contact)
+						resulttmp["contact"] = contacts
+
+						savetmp := make(map[string]interface{}, 0)
+						for _, sk := range Fields {
+							if sk == "establish_date" {
+								if resulttmp[sk] != nil {
+									savetmp[sk] = resulttmp[sk].(primitive.DateTime).Time().UTC().Unix()
+									continue
+								}
+							} else if sk == "capital" {
+								//log.Println(sk, resulttmp[sk])
+								savetmp[sk] = ObjToMoney([]interface{}{resulttmp[sk], ""})[0]
+								continue
+							} else if sk == "partners" {
+								//log.Println(sk, resulttmp[sk], )
+								//fmt.Println(reflect.TypeOf(resulttmp[sk]))
+								if resulttmp[sk] != nil {
+									if ppms, ok := resulttmp[sk].(primitive.A); ok {
+										for i, _ := range ppms {
+											if ppms[i].(map[string]interface{})["stock_type"] != nil {
+												ppms[i].(map[string]interface{})["stock_type"] = "企业公示"
+											}
+											delete(ppms[i].(map[string]interface{}), "identify_type")
+										}
+										savetmp[sk] = ppms
+
+									}
+								} else {
+									savetmp[sk] = []interface{}{}
+								}
+								continue
+							} else if sk == "_id" {
+								savetmp["tmp"+sk] = resulttmp[sk]
+								continue
+							} else if sk == "area_code" {
+								//行政区划代码
+								savetmp[sk] = fmt.Sprint(resulttmp[sk])
+								continue
+							} else if sk == "report_websites" {
+								//网址
+								if resulttmp["report_websites"] == nil {
+									savetmp["website"] = ""
+								} else {
+									report_websitesArr := []string{}
+									if ppms, ok := resulttmp[sk].(primitive.A); ok {
+										for _, v := range ppms {
+											if vvv, ok := v.(map[string]interface{}); ok {
+												if rv, ok := vvv["website_url"].(string); ok {
+													report_websitesArr = append(report_websitesArr, rv)
+												}
+											}
+										}
+									}
+									sort.Strings(report_websitesArr)
+									savetmp["website"] = strings.Join(report_websitesArr, ";")
+								}
+								continue
+							} else if sk == "wechat_accounts" {
+								savetmp[sk] = []interface{}{}
+								continue
+							}
+							if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
+								savetmp[sk] = ""
+							} else {
+								savetmp[sk] = resulttmp[sk]
+							}
+						}
+						//tmps = append(tmps, savetmp)
+						savetmp["updatatime"] = time.Now().Unix()
+						//保存mongo
+						FClient.DbName = Config["mgodb_extract_kf"]
+						saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
+						if saveid != nil {
+							//保存redis
+							rc := RedisPool.Get()
+							var _id string
+							if v, ok := saveid.(primitive.ObjectID); ok {
+								_id = v.Hex()
+							}
+							if _, err := rc.Do("SET", savetmp["company_name"], _id); err != nil {
+								log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["company_name"], err)
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+							} else {
+								//保存es
+								delete(savetmp, "_id")
+								if err := rc.Close(); err != nil {
+									log.Println(err)
+								}
+
+								//esConn := elastic.GetEsConn()
+								//defer elastic.DestoryEsConn(esConn)
+								if _, err := EsConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
+									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
+								} else {
+									//删除临时表
+									FClient.DbName = Config["mgodb_extract_kf"]
+									if deleteNum := FClient.DeleteById("winner_new", tmpId); deleteNum == 0 {
+										log.Println("删除临时表失败", deleteNum)
+									}
+								}
+							}
+						} else {
+							log.Println("save mongo err:", saveid, tmp["_id"])
+						}
+					}
+				}
+			}
+		}
+		t2.Reset(time.Minute)
+	}
+}