Parcourir la source

修改联系人逻辑
定时任务udp发送参数
配置文件

fengweiqiang il y a 5 ans
Parent
commit
a7ee4c43b5
3 fichiers modifiés avec 62 ajouts et 20 suppressions
  1. 5 1
      udp_winner/config.json
  2. 11 3
      udp_winner/main.go
  3. 46 16
      udp_winner/timedTaskWinner.go

+ 5 - 1
udp_winner/config.json

@@ -20,5 +20,9 @@
   "mgo_qyk_c": "enterprise_qyxy",
   "mgo_qyk_buyer": "buyer_qyxy",
   "mgo_qyk_agency": "gency_qyxy",
-  "redis": "127.0.0.1:6379"
+  "redis": "127.0.0.1:6379",
+  "redis_winner_db": "1",
+  "redis_buyer_db": "2",
+  "redis_agency_db": "3",
+  "chan_pool_num": "10"
 }

+ 11 - 3
udp_winner/main.go

@@ -21,7 +21,7 @@ import (
 
 var (
 	Config                                = make(map[string]string)
-	Fields,BuyerFields,AgencyFields                                []string
+	Fields, BuyerFields, AgencyFields     []string
 	SourceClient, FClient                 *MongodbSim
 	RedisPool                             redis.Pool
 	HisRedisPool                          *hisRedis.Client
@@ -33,6 +33,7 @@ var (
 	Reg_tel                               = regexp.MustCompile(`^[0-9\-\s]*$`)
 	EsConn                                *es.Client
 	Updport                               int
+	CPool                                 chan bool
 )
 
 /**
@@ -43,6 +44,12 @@ func init() {
 	log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
 	util.ReadConfig(&Config)
 	log.Println(Config)
+	var err error
+	cpnum, err := strconv.Atoi(Config["chan_pool_num"])
+	if err != nil{
+		log.Fatalln(err)
+	}
+	CPool = make(chan bool,cpnum)
 	Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
 		"capital", "establish_date", "legal_person", "company_type",
 		"district", "city", "province", "area_code", "credit_no",
@@ -56,7 +63,6 @@ func init() {
 	AgencyFields = []string{"_id", "contact", "type", "ranks",
 		"address", "district", "city", "province", "area_code", "credit_no", "agency_name",
 		"history_name", "wechat_accounts", "website", "report_websites"}
-	var err error
 	pool_size, _ := strconv.Atoi(Config["pool_size"])
 
 	//mongo init
@@ -135,7 +141,7 @@ func main() {
 	log.Println("Udp服务监听", updport)
 	log.Println("发送端口port:", Updport)
 	go TimedTaskWinner() //定时任务
-	go TimedTaskBuyer() //定时任务
+	go TimedTaskBuyer()  //定时任务
 	go TimedTaskAgency() //定时任务
 	c := make(chan int, 1)
 	<-c
@@ -162,6 +168,8 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			//data_type:winner data_type:buyer data_type:agency
 			//data_info:save//存量   data_info:add //增量
 			if key, ok := (*tmp)["data_type"].(string); ok {
+				//阻塞
+				CPool <- true
 				if key == "winner" {
 					go TaskWinner(tmp)
 				} else if key == "buyer" {

+ 46 - 16
udp_winner/timedTaskWinner.go

@@ -11,6 +11,7 @@ import (
 	"net"
 	"qfw/util"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
 )
@@ -18,6 +19,8 @@ import (
 //之前main方法,只更新
 func TaskWinner(mapinfo *map[string]interface{}) {
 	defer util.Catch()
+	//释放
+	defer func() { <-CPool }()
 	gtid, lteid := util.ObjToString((*mapinfo)["gtid"]), util.ObjToString((*mapinfo)["lteid"])
 	if gtid == "" || lteid == "" {
 		log.Println(gtid, lteid, "参数错误")
@@ -53,15 +56,16 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 		conn := HisRedisPool.Conn()
 		defer conn.Close()
 		//选择redis db
-		conn.Select(1)
+		redis_winner_db, _ := strconv.Atoi(Config["redis_winner_db"])
+		conn.Select(redis_winner_db)
 		//遍历bidding表保存到redis
 		// key:_id  value:json结构体
 		for cursor.Next(&tmp) {
 			if tmp["winner"] == nil || tmp["winner"] == "" {
 				continue
 			}
-			mgoId:=tmp["_id"].(primitive.ObjectID).Hex()
-			delete(tmp,"_id")
+			mgoId := tmp["_id"].(primitive.ObjectID).Hex()
+			delete(tmp, "_id")
 			bytes, _ := json.Marshal(tmp)
 			if err := conn.Set(mgoId, string(bytes), 0).Err(); err != nil {
 				log.Println(err)
@@ -74,14 +78,14 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 		} else {
 			iterator := scan.Iterator()
 			for iterator.Next() {
-				redisId := iterator.Val()                       //redis key
+				redisId := iterator.Val()                    //redis key
 				redisvalue := conn.Get(iterator.Val()).Val() //redis val
 				tmp := make(map[string]interface{})
-				json.Unmarshal([]byte(redisvalue),&tmp)
+				json.Unmarshal([]byte(redisvalue), &tmp)
 				//重复增量操作
 				//redis查询是否存在
 				rdb := RedisPool.Get()
-				rdb.Do("SELECT","1")
+				rdb.Do("SELECT", Config["redis_winner_db"])
 				if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
 					//redis不存在,存到临时表,定时任务处理
 					FClient.DbName = Config["mgodb_extract_kf"]
@@ -90,7 +94,7 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 					}
 					//log.Println("get redis id err:定时任务处理", err, tmp)
 					if err := rdb.Close(); err != nil {
-						log.Println("存量",err)
+						log.Println("存量", err)
 					}
 					//删除存量redis
 					conn.Del(redisId)
@@ -103,7 +107,7 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 					FClient.DbName = Config["mgodb_extract_kf"]
 					oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
 					if oldTmp == nil {
-						log.Println("存量 redis id 不存在",reply,tmp["winner"])
+						log.Println("存量 redis id 不存在", reply, tmp["winner"])
 						continue
 					}
 					tmpTopscopeclass := []string{}
@@ -184,7 +188,7 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 						contactMaps = append(contactMaps, tmpContact)
 					} else {
 						//对比前四项,相等丢弃
-						if v, ok := oldTmp["contact"].(primitive.A); ok {
+						/*if v, ok := oldTmp["contact"].(primitive.A); ok {
 							var isNotUpdate bool
 							for _, vv := range v {
 								if vvv, ok := vv.(map[string]interface{}); ok {
@@ -206,7 +210,19 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 								vvv["updatetime"] = time.Now().Unix()
 								contactMaps = append(contactMaps, vvv)
 							}
+						}*/
+						//直接添加联系人,不再判断
+						if v, ok := oldTmp["contact"].(primitive.A); ok {
+							contactMaps = append(contactMaps, v...)
 						}
+						vvv := make(map[string]interface{})
+						vvv["infoid"] = redisId
+						vvv["contact_person"] = tmp["winnerperson"]
+						vvv["contact_type"] = "项目联系人"
+						vvv["phone"] = winnertel
+						vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+						vvv["updatetime"] = time.Now().Unix()
+						contactMaps = append(contactMaps, vvv)
 					}
 					oldTmp["contact"] = contactMaps
 					//mongo更新
@@ -239,7 +255,7 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 			}
 			//redis查询是否存在
 			rdb := RedisPool.Get()
-			rdb.Do("SELECT","1")
+			rdb.Do("SELECT", Config["redis_winner_db"])
 			if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
 				//redis不存在存到临时表,定时任务处理
 				FClient.DbName = Config["mgodb_extract_kf"]
@@ -341,7 +357,7 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 					contactMaps = append(contactMaps, tmpContact)
 				} else {
 					//对比前四项,相等丢弃
-					if v, ok := oldTmp["contact"].(primitive.A); ok {
+					/*if v, ok := oldTmp["contact"].(primitive.A); ok {
 						var isNotUpdate bool
 						for _, vv := range v {
 							if vvv, ok := vv.(map[string]interface{}); ok {
@@ -363,7 +379,19 @@ func TaskWinner(mapinfo *map[string]interface{}) {
 							vvv["updatetime"] = time.Now().Unix()
 							contactMaps = append(contactMaps, vvv)
 						}
+					}*/
+					//直接添加联系人,不再判断
+					if v, ok := oldTmp["contact"].(primitive.A); ok {
+						contactMaps = append(contactMaps, v...)
 					}
+					vvv := make(map[string]interface{})
+					vvv["infoid"] = overid
+					vvv["contact_person"] = tmp["winnerperson"]
+					vvv["contact_type"] = "项目联系人"
+					vvv["phone"] = winnertel
+					vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+					vvv["updatetime"] = time.Now().Unix()
+					contactMaps = append(contactMaps, vvv)
 				}
 				oldTmp["contact"] = contactMaps
 				//mongo更新
@@ -420,14 +448,16 @@ func TimedTaskWinner() {
 					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
 					//再重新查找redis,存在发udp处理,不存在走新增合并
 					rdb := RedisPool.Get()
-					rdb.Do("SELECT","1")
+					rdb.Do("SELECT", Config["redis_winner_db"])
 					if _, err := redis.String(rdb.Do("GET", tmp["winner"])); err == nil {
 						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
 						//redis存在发送udp进行处理
 						by, _ := json.Marshal(map[string]interface{}{
-							"gtid":  tmpId,
-							"lteid": tmpId,
-							"stype": "",
+							"gtid":      tmpId,
+							"lteid":     tmpId,
+							"stype":     "",
+							"data_type": "winner",
+							"data_info": "add",
 						})
 						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
 							IP:   net.ParseIP("127.0.0.1"),
@@ -601,7 +631,7 @@ func TimedTaskWinner() {
 						if saveid != nil {
 							//保存redis
 							rc := RedisPool.Get()
-							rc.Do("SELECT","1")
+							rc.Do("SELECT", Config["redis_winner_db"])
 							var _id string
 							if v, ok := saveid.(primitive.ObjectID); ok {
 								_id = v.Hex()