浏览代码

修改接口参数

fengweiqiang 5 年之前
父节点
当前提交
5a436b88e2
共有 4 个文件被更改,包括 217 次插入217 次删除
  1. 95 91
      udp_winner/main.go
  2. 0 2
      udp_winner/timedTaskAgency.go
  3. 0 2
      udp_winner/timedTaskBuyer.go
  4. 122 122
      udp_winner/timedTaskWinner.go

+ 95 - 91
udp_winner/main.go

@@ -67,43 +67,67 @@ func init() {
 	AgencyFields = []string{"_id", "contact", "type", "ranks",
 		"address", "district", "city", "province", "area_code", "credit_no", "agency_name",
 		"history_name", "wechat_accounts", "website", "report_websites"}
-	pool_size, _ := strconv.Atoi(Config["pool_size"])
-
-	//mongo init
-	SourceClient = new(mongodb.MongodbSim)
-	SourceClient.MongodbAddr = Config["mgoinit"]
-	SourceClient.Size = pool_size
-	SourceClient.DbName = Config["mgodb_bidding"]
-	//mongodbSim.DbName = "qfw"
-	SourceClient.InitPool()
 
-	FClient = new(mongodb.MongodbSim)
-	FClient.MongodbAddr = Config["mgourl"]
-	FClient.Size = pool_size
-	FClient.DbName = Config["mgodb_extract_kf"]
-	//mongodbSim.DbName = "qfw"
-	FClient.InitPool()
-	FClientmgoConn := FClient.GetMgoConn()
-	defer FClient.DestoryMongoConn(FClientmgoConn)
-	//加载省市县代码
-	cursor2 := FClientmgoConn.DB(Config["mgodb_extract_kf"]).C("address").Find(bson.M{}).Select(bson.M{"province": 1, "code": 1, "city": 1, "district": 1}).Iter()
-	//defer FClient.Connect(cc)
-	if cursor2 == nil {
-		log.Fatalln(cursor2)
-	}
-	tmp := make(map[string]interface{})
-	for cursor2.Next(&tmp) {
-		code := tmp["code"]
-		if code != nil && strings.TrimSpace(code.(string)) != "" {
-			Addrs[fmt.Sprint(code)] = tmp
-		}
-	}
-	log.Println(len(Addrs))
 	//es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
 	//es init
 	elastic.InitElasticSize(Config["elasticsearch"], 50)
 	EsConn = elastic.GetEsConn()
 	defer elastic.DestoryEsConn(EsConn)
+	initRdis()
+	initMongo()
+	initReg()
+}
+
+func main() {
+	//udp
+	updport := Config["udpport"]
+	Updport, _ = strconv.Atoi(Config["port"])
+	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
+	udpclient.Listen(processUdpMsg)
+	log.Println("Udp服务监听", updport)
+	log.Println("发送端口port:", Updport)
+	go TimedTaskWinner() //定时任务
+	go TimedTaskBuyer()  //定时任务
+	go TimedTaskAgency() //定时任务
+	c := make(chan int, 1)
+	<-c
+
+}
+
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	log.Println(act, string(data), ra)
+	switch act {
+	case mu.OP_TYPE_DATA: //上个节点的数据
+		//从表中开始处理生成企业数据
+		tmp := new(map[string]interface{})
+		err := json.Unmarshal(data, &tmp)
+		if err != nil {
+			log.Println("err:", err)
+			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
+			return
+		} else if tmp != nil {
+			if key, ok := (*tmp)["key"].(string); ok {
+				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+			} else {
+				udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
+			}
+			//data_info:save//存量   data_info:add //增量
+			//阻塞
+			CPool <- true
+			go func(mapinfo *map[string]interface{}) {
+				defer func() { <-CPool }()
+				go TaskWinner(mapinfo)
+				go TaskBuyer(mapinfo)
+				go TaskAgency(mapinfo)
+			}(tmp)
+		}
+	case mu.OP_NOOP: //下个节点回应
+		log.Println("发送成功", string(data))
+	}
+}
+
+func initRdis() {
+	var err error
 	//redis
 	RedisPool = redis.Pool{
 		MaxIdle:     50,
@@ -137,11 +161,43 @@ func init() {
 	redis_winner_db, _ = strconv.Atoi(Config["redis_winner_db"])
 	redis_buyer_db, _ = strconv.Atoi(Config["redis_buyer_db"])
 	redis_agency_db, _ = strconv.Atoi(Config["redis_agency_db"])
-	iniReg()
+}
+
+func initMongo() {
+	//mongo init
+	pool_size, _ := strconv.Atoi(Config["pool_size"])
+	SourceClient = new(mongodb.MongodbSim)
+	SourceClient.MongodbAddr = Config["mgoinit"]
+	SourceClient.Size = pool_size
+	SourceClient.DbName = Config["mgodb_bidding"]
+	//mongodbSim.DbName = "qfw"
+	SourceClient.InitPool()
 
+	FClient = new(mongodb.MongodbSim)
+	FClient.MongodbAddr = Config["mgourl"]
+	FClient.Size = pool_size
+	FClient.DbName = Config["mgodb_extract_kf"]
+	//mongodbSim.DbName = "qfw"
+	FClient.InitPool()
+	FClientmgoConn := FClient.GetMgoConn()
+	defer FClient.DestoryMongoConn(FClientmgoConn)
+	//加载省市县代码
+	cursor2 := FClientmgoConn.DB(Config["mgodb_extract_kf"]).C("address").Find(bson.M{}).Select(bson.M{"province": 1, "code": 1, "city": 1, "district": 1}).Iter()
+	//defer FClient.Connect(cc)
+	if cursor2 == nil {
+		log.Fatalln(cursor2)
+	}
+	tmp := make(map[string]interface{})
+	for cursor2.Next(&tmp) {
+		code := tmp["code"]
+		if code != nil && strings.TrimSpace(code.(string)) != "" {
+			Addrs[fmt.Sprint(code)] = tmp
+		}
+	}
+	log.Println(len(Addrs))
 }
 
-func iniReg() {
+func initReg() {
 	FClientmgoConnReg := FClient.GetMgoConn()
 	defer FClient.DestoryMongoConn(FClientmgoConnReg)
 	findReg, b := FClient.Find(Config["mgo_qyk_reg"], bson.M{"delete": false, "isuse": true}, bson.M{"_id": 1}, nil, false, -1, 0)
@@ -157,76 +213,24 @@ func iniReg() {
 		}
 		regtmp := regexp.MustCompile(s_rule)
 		if s_field == "winner" {
-			if s_type=="ok"{
+			if s_type == "ok" {
 				WinnerRegOk = append(WinnerRegOk, *regtmp)
-			}else if s_type=="err"{
+			} else if s_type == "err" {
 				WinnerRegErr = append(WinnerRegErr, *regtmp)
 			}
 		} else if s_field == "buyer" {
-			if s_type=="ok"{
+			if s_type == "ok" {
 				BuerRegOk = append(BuerRegOk, *regtmp)
-			}else if s_type=="err"{
+			} else if s_type == "err" {
 				BuyerRegErr = append(BuyerRegErr, *regtmp)
 			}
 		} else if s_field == "agency" {
-			if s_type=="ok"{
+			if s_type == "ok" {
 				AgencyRegOk = append(AgencyRegOk, *regtmp)
-			}else if s_type=="err"{
+			} else if s_type == "err" {
 				AgencyRegErr = append(AgencyRegErr, *regtmp)
 			}
 		}
 	}
 	log.Println(len(WinnerRegOk), len(WinnerRegErr), len(BuerRegOk), len(BuyerRegErr), len(AgencyRegOk), len(AgencyRegErr))
 }
-
-func main() {
-	//udp
-	updport := Config["udpport"]
-	Updport, _ = strconv.Atoi(Config["port"])
-	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
-	udpclient.Listen(processUdpMsg)
-	log.Println("Udp服务监听", updport)
-	log.Println("发送端口port:", Updport)
-	go TimedTaskWinner() //定时任务
-	go TimedTaskBuyer()  //定时任务
-	go TimedTaskAgency() //定时任务
-	c := make(chan int, 1)
-	<-c
-
-}
-
-func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
-	log.Println(act, string(data), ra)
-	switch act {
-	case mu.OP_TYPE_DATA: //上个节点的数据
-		//从表中开始处理生成企业数据
-		tmp := new(map[string]interface{})
-		err := json.Unmarshal(data, &tmp)
-		if err != nil {
-			log.Println("err:", err)
-			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
-			return
-		} else if tmp != nil {
-			if key, ok := (*tmp)["key"].(string); ok {
-				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
-			} else {
-				udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
-			}
-			//data_type:winner data_type:buyer data_type:agency
-			//data_info:save//存量   data_info:add //增量
-			if key, ok := (*tmp)["data_type"].(string); ok {
-				//阻塞
-				CPool <- true
-				if key == "winner" {
-					go TaskWinner(tmp)
-				} else if key == "buyer" {
-					go TaskBuyer(tmp)
-				} else if key == "agency" {
-					go TaskAgency(tmp)
-				}
-			}
-		}
-	case mu.OP_NOOP: //下个节点回应
-		log.Println("发送成功", string(data))
-	}
-}

+ 0 - 2
udp_winner/timedTaskAgency.go

@@ -18,8 +18,6 @@ import (
 //之前main方法,只更新
 func TaskAgency(mapinfo *map[string]interface{}) {
 	defer util.Catch()
-	//释放
-	defer func() { <-CPool }()
 	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
 	if gtid == "" || lteid == "" {
 		log.Println(gtid, lteid, "参数错误")

+ 0 - 2
udp_winner/timedTaskBuyer.go

@@ -18,8 +18,6 @@ import (
 //之前main方法,只更新
 func TaskBuyer(mapinfo *map[string]interface{}) {
 	defer util.Catch()
-	//释放
-	defer func() { <-CPool }()
 	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
 	if gtid == "" || lteid == "" {
 		log.Println(gtid, lteid, "参数错误")

+ 122 - 122
udp_winner/timedTaskWinner.go

@@ -4,10 +4,9 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/garyburd/redigo/redis"
+	"gopkg.in/mgo.v2"
 	"gopkg.in/mgo.v2/bson"
 	"log"
-	mu "mfw/util"
-	"net"
 	"qfw/util"
 	"sort"
 	"strings"
@@ -18,8 +17,6 @@ import (
 //之前main方法,只更新
 func TaskWinner(mapinfo *map[string]interface{}) {
 	defer util.Catch()
-	//释放
-	defer func() { <-CPool }()
 	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
 	if gtid == "" || lteid == "" {
 		log.Println(gtid, lteid, "参数错误")
@@ -43,7 +40,7 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 			"$gte": GId,
 			"$lte": LtId,
 		},
-	}).Select(bson.M{"winner": 1, "winnertel": 1, "winnerperson": 1, "topscopeclass": 1, "winneraddr": 1}).Iter()
+	}).Select(bson.M{"winner": 1, "winnertel": 1, "winnerperson": 1, "topscopeclass": 1, "package": 1}).Iter()
 
 	if cursor.Err() != nil {
 		SourceClient.DestoryMongoConn(SourceClientcc)
@@ -215,133 +212,140 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 		log.Println("存量历史合并执行完成 ok", gtid, lteid)
 
 	} else {
-		//增量处理
-		overid := gtid
-		tmp := map[string]interface{}{}
-		for cursor.Next(&tmp) {
-			overid = tmp["_id"].(bson.ObjectId).Hex()
-			//log.Println(tmp["_id"])
-			winner, ok := tmp["winner"].(string)
-			if !ok || utf8.RuneCountInString(winner) < 4 {
-				continue
+		//增量
+		overid := addfunc(gtid, cursor)
+		SourceClient.DestoryMongoConn(SourceClientcc)
+		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
+	}
+
+}
+//增量
+func addfunc(gtid string, cursor *mgo.Iter) string {
+	//增量处理
+	overid := gtid
+	tmp := map[string]interface{}{}
+	for cursor.Next(&tmp) {
+		overid = tmp["_id"].(bson.ObjectId).Hex()
+		//log.Println(tmp["_id"])
+		winner, ok := tmp["winner"].(string)
+		if !ok || utf8.RuneCountInString(winner) < 4 {
+			continue
+		}
+		//redis查询是否存在
+		rdb := RedisPool.Get()
+		rdb.Do("SELECT", redis_winner_db)
+		if reply, err := redis.String(rdb.Do("GET", winner)); err != nil {
+			//redis不存在存到临时表,定时任务处理
+			FClient.DbName = Config["mgodb_extract_kf"]
+			if err := FClient.SaveForOld("winner_new", tmp); err != nil {
+				log.Println("FClient.Save err", err, tmp)
 			}
-			//redis查询是否存在
-			rdb := RedisPool.Get()
-			rdb.Do("SELECT", redis_winner_db)
-			if reply, err := redis.String(rdb.Do("GET", winner)); err != nil {
-				//redis不存在存到临时表,定时任务处理
-				FClient.DbName = Config["mgodb_extract_kf"]
-				if err := FClient.SaveForOld("winner_new", tmp); err != nil {
-					log.Println("FClient.Save err", err, tmp)
-				}
-				//log.Println("get redis id err:定时任务处理", err, tmp)
-				if err := rdb.Close(); err != nil {
-					log.Println(err)
-				}
+			//log.Println("get redis id err:定时任务处理", err, tmp)
+			if err := rdb.Close(); err != nil {
+				log.Println(err)
+			}
+			continue
+		} else {
+			if err := rdb.Close(); err != nil {
+				log.Println(err)
+			}
+			//拿到合并后的qyk
+			FClient.DbName = Config["mgodb_extract_kf"]
+			oldTmp, b := FClient.FindById(Config["mgo_qyk_c"], reply, bson.M{})
+			if !b || (*oldTmp) == nil || reply == "" || (*oldTmp)["_id"] == nil {
+				log.Println("redis id 不存在")
 				continue
-			} else {
-				if err := rdb.Close(); err != nil {
-					log.Println(err)
-				}
-				//拿到合并后的qyk
-				FClient.DbName = Config["mgodb_extract_kf"]
-				oldTmp, b := FClient.FindById(Config["mgo_qyk_c"], reply, bson.M{})
-				if !b || (*oldTmp) == nil|| reply==""||(*oldTmp)["_id"]==nil{
-					log.Println("redis id 不存在")
-					continue
-				}
-				//比较合并
-				//行业类型
-				tmpTopscopeclass := []string{}
-				tmpConTopscopeclass := []string{}
-				tmpTopscopeclassMap := make(map[string]bool)
+			}
+			//比较合并 行业类型
+			tmpTopscopeclass := []string{}
+			tmpConTopscopeclass := []string{}
+			tmpTopscopeclassMap := make(map[string]bool)
 
-				if (*oldTmp)["industry"] != nil {
-					if v, ok := (*oldTmp)["industry"].([]interface{}); ok {
-						for _, vv := range v {
-							if vvv, ok := vv.(string); ok {
-								tmpTopscopeclassMap[vvv] = true
-							}
-						}
-					}
-				}
-				if v, ok := tmp["topscopeclass"].([]interface{}); ok {
+			if (*oldTmp)["industry"] != nil {
+				if v, ok := (*oldTmp)["industry"].([]interface{}); ok {
 					for _, vv := range v {
-						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
-							tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1])
+						if vvv, ok := vv.(string); ok {
+							tmpTopscopeclassMap[vvv] = true
 						}
 					}
 				}
-				for k := range tmpTopscopeclassMap {
-					tmpTopscopeclass = append(tmpTopscopeclass, k)
-				}
-				sort.Strings(tmpTopscopeclass)
-				(*oldTmp)["industry"] = tmpTopscopeclass
-
-				esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
-				//更新行业类型
-				if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
-					(*oldTmp)["updatatime"] = time.Now().Unix()
-					//mongo更新
-					FClient.DbName = Config["mgodb_extract_kf"]
-					if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
-						log.Println("mongo更新err", esId)
-					}
-
-					//es更新
-					delete((*oldTmp), "_id")
-					if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-						log.Println("update es err:", err)
-					}
-					continue
-				}
-				//联系方式合并
-				var tmpperson, winnertel string
-				if tmppersona, ok := tmp["winnerperson"].(string); ok {
-					tmpperson = tmppersona
-				}
-				if winnerteltmp, ok := tmp["winnertel"].(string); ok {
-					winnertel = winnerteltmp
-				}
-				if Reg_xing.MatchString(winnertel) || !Reg_tel.MatchString(winnertel) {
-					winnertel = ""
-				} else {
-					winnertel = winnertel
-				}
-				contactMaps := make([]interface{}, 0)
-				if (*oldTmp)["contact"] != nil {
-					//直接添加联系人,不再判断
-					if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
-						contactMaps = append(contactMaps, v...)
+			}
+			if v, ok := tmp["topscopeclass"].([]interface{}); ok {
+				for _, vv := range v {
+					if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+						tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
+						tmpConTopscopeclass = append(tmpConTopscopeclass, vvv[:len(vvv)-1])
 					}
 				}
-				vvv := make(map[string]interface{})
-				vvv["infoid"] = overid
-				vvv["contact_person"] = tmpperson
-				vvv["contact_type"] = "项目联系人"
-				vvv["phone"] = winnertel
-				vvv["topscopeclass"] = strings.Join(tmpConTopscopeclass, ";")
-				vvv["updatetime"] = time.Now().Unix()
-				contactMaps = append(contactMaps, vvv)
-				(*oldTmp)["contact"] = contactMaps
-				//mongo更新
+			}
+			for k := range tmpTopscopeclassMap {
+				tmpTopscopeclass = append(tmpTopscopeclass, k)
+			}
+			sort.Strings(tmpTopscopeclass)
+			(*oldTmp)["industry"] = tmpTopscopeclass
+
+			esId := (*oldTmp)["_id"].(bson.ObjectId).Hex()
+			//更新行业类型
+			if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
 				(*oldTmp)["updatatime"] = time.Now().Unix()
+				//mongo更新
 				FClient.DbName = Config["mgodb_extract_kf"]
 				if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
-					log.Println("mongo更新 err", esId, oldTmp)
+					log.Println("mongo更新err", esId)
 				}
+
 				//es更新
 				delete((*oldTmp), "_id")
 				if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-					log.Println("EsConn err :", err)
+					log.Println("update es err:", err)
+				}
+				continue
+			}
+			//联系方式合并
+			var tmpperson, winnertel string
+			if tmppersona, ok := tmp["winnerperson"].(string); ok {
+				tmpperson = tmppersona
+			}
+			if winnerteltmp, ok := tmp["winnertel"].(string); ok {
+				winnertel = winnerteltmp
+			}
+			if Reg_xing.MatchString(winnertel) || !Reg_tel.MatchString(winnertel) {
+				winnertel = ""
+			} else {
+				winnertel = winnertel
+			}
+			contactMaps := make([]interface{}, 0)
+			if (*oldTmp)["contact"] != nil {
+				//直接添加联系人,不再判断
+				if v, ok := (*oldTmp)["contact"].([]interface{}); ok {
+					contactMaps = append(contactMaps, v...)
 				}
 			}
+			vvv := make(map[string]interface{})
+			vvv["infoid"] = overid
+			vvv["contact_person"] = tmpperson
+			vvv["contact_type"] = "项目联系人"
+			vvv["phone"] = winnertel
+			vvv["topscopeclass"] = strings.Join(tmpConTopscopeclass, ";")
+			vvv["updatetime"] = time.Now().Unix()
+			contactMaps = append(contactMaps, vvv)
+			//分包处理
+			PackageDealWith(&contactMaps,tmp)
+			(*oldTmp)["contact"] = contactMaps
+			//mongo更新
+			(*oldTmp)["updatatime"] = time.Now().Unix()
+			FClient.DbName = Config["mgodb_extract_kf"]
+			if !FClient.UpdateById(Config["mgo_qyk_c"], esId, bson.M{"$set": oldTmp}) {
+				log.Println("mongo更新 err", esId, oldTmp)
+			}
+			//es更新
+			delete((*oldTmp), "_id")
+			if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+				log.Println("EsConn err :", err)
+			}
 		}
-		SourceClient.DestoryMongoConn(SourceClientcc)
-		log.Println("增量合并执行完成 ok", gtid, lteid, overid)
 	}
-
+	return overid
 }
 
 //定时任务  新增
@@ -387,19 +391,10 @@ func TimedTaskWinner() {
 					rdb.Do("SELECT", redis_winner_db)
 					if _, err := redis.String(rdb.Do("GET", errwinner)); err == nil {
 						//redis存在发送udp进行处理
-						by, _ := json.Marshal(map[string]interface{}{
+						TaskWinner(&map[string]interface{}{
 							"gtid":      tmpId,
 							"lteid":     tmpId,
-							"stype":     "",
-							"data_type": "winner",
-							"data_info": "add",
 						})
-						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-							IP:   net.ParseIP("127.0.0.1"),
-							Port: Updport,
-						}); e != nil {
-							log.Println(e)
-						}
 						//存在的话删除tmp mongo表
 						FClient.DbName = Config["mgodb_extract_kf"]
 						if DeletedCount := FClient.Del("winner_new", bson.M{"_id": bson.ObjectIdHex(tmpId)}); !DeletedCount {
@@ -658,3 +653,8 @@ func TimedTaskWinner() {
 		t2.Reset(time.Minute)
 	}
 }
+
+//分包处理
+func PackageDealWith(contactMaps *[]interface{},tmp map[string]interface{})  {
+	
+}