fengweiqiang 5 年之前
父节点
当前提交
3ef7a589c9
共有 2 个文件被更改,包括 37 次插入36 次删除
  1. 7 14
      udp_winner/config.json
  2. 30 22
      udp_winner/main.go

+ 7 - 14
udp_winner/config.json

@@ -1,30 +1,23 @@
 {
-  "elasticsearch": "http://172.17.145.170:9800",
-  "elasticsearch_index": "winner_enterprise",
-  "elasticsearch_type": "winnerent",
-  "elasticsearch_buyer_index": "buyer_enterprise",
-  "elasticsearch_buyer_type": "buyerent",
-  "elasticsearch_agency_index": "agency_enterprise",
-  "elasticsearch_agency_type": "agencyent",
   "udpport": "127.0.0.1:12678",
   "port": "12678",
   "pool_size": "10",
-  "mgoinit": "172.17.145.163:27082",
-  "mgodb_bidding": "extract_v3",
-  "mgodb_mgoinit_c": "buyer",
+  "mgoinit": "127.0.0.1:27017",
+  "mgodb_bidding": "qfw",
+  "mgodb_mgoinit_c": "bidding",
   "mgodb_enterprise": "enterprise",
   "mgodb_enterprise_c": "qyxy",
   "mgourl2": "192.168.3.207:27092",
-  "mgourl": "172.17.145.163:27082",
-  "mgodb_extract_kf": "extract_v3",
+  "mgourl": "127.0.0.1:27017",
+  "mgodb_extract_kf": "extract_kf",
   "mgo_qyk_c": "winner_enterprise",
   "mgo_qyk_buyer": "buyer_enterprise",
   "mgo_qyk_agency": "agency_enterprise",
   "mgo_qyk_reg": "rc_rule",
-  "redis": "172.17.148.44:1479",
+  "redis": "127.0.0.1:6379",
   "redis_winner_db": "1",
   "redis_buyer_db": "2",
   "redis_agency_db": "3",
   "chan_pool_num": "10",
-  "his_redis": "172.17.148.44:2579"
+  "his_redis": "127.0.0.1:6380"
 }

+ 30 - 22
udp_winner/main.go

@@ -6,11 +6,9 @@ import (
 	"github.com/garyburd/redigo/redis"
 	hisRedis "github.com/go-redis/redis"
 	"gopkg.in/mgo.v2/bson"
-	es "gopkg.in/olivere/elastic.v1"
 	"log"
 	mu "mfw/util"
 	"net"
-	"qfw/common/src/qfw/util/elastic"
 	"qfw/util"
 	"qfw/util/mongodb"
 	"regexp"
@@ -27,13 +25,11 @@ var (
 	HisRedisPool                          *hisRedis.Client
 	Addrs                                 = make(map[string]interface{}, 0) //省市县
 	udpclient                             mu.UdpClient                      //udp对象
-	ElasticClientIndex, ElasticClientType string
+	Reg_person                            = regexp.MustCompile("[\u4e00-\u9fa5]+")
 	Reg_xing                              = regexp.MustCompile(`\*{1,}`)
-	Reg_person                            = regexp.MustCompile("[\u4E00-\u9FA5\\s]+")
 	Reg_tel                               = regexp.MustCompile(`^[0-9\-\s]*$`)
-	EsConn                                *es.Client
 	Updport                               int
-	CPool                                 chan bool
+	CPoolWinner, CPoolBuery, CPoolAgency  chan bool
 	//his_redis db
 	redis_winner_db, redis_buyer_db, redis_agency_db int
 	//异常表正则匹配处理
@@ -53,7 +49,9 @@ func init() {
 	if err != nil {
 		log.Fatalln(err)
 	}
-	CPool = make(chan bool, cpnum)
+	CPoolWinner = make(chan bool, cpnum)
+	CPoolBuery = make(chan bool, cpnum)
+	CPoolAgency = make(chan bool, cpnum)
 	Fields = []string{"_id", "contact", "partners", "business_scope", "company_address",
 		"capital", "establish_date", "legal_person", "company_type",
 		"district", "city", "province", "area_code", "credit_no",
@@ -68,11 +66,6 @@ func init() {
 		"address", "district", "city", "province", "area_code", "credit_no", "agency_name",
 		"history_name", "wechat_accounts", "website", "report_websites"}
 
-	//es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
-	//es init
-	elastic.InitElasticSize(Config["elasticsearch"], 50)
-	EsConn = elastic.GetEsConn()
-	defer elastic.DestoryEsConn(EsConn)
 	initRdis()
 	initMongo()
 	initReg()
@@ -87,8 +80,8 @@ func main() {
 	log.Println("Udp服务监听", updport)
 	log.Println("发送端口port:", Updport)
 	go TimedTaskWinner() //定时任务
-	go TimedTaskBuyer()  //定时任务
-	go TimedTaskAgency() //定时任务
+	//go TimedTaskBuyer()  //定时任务
+	//go TimedTaskAgency() //定时任务
 	c := make(chan int, 1)
 	<-c
 
@@ -113,13 +106,22 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			}
 			//data_info:save//存量   data_info:add //增量
 			//阻塞
-			CPool <- true
+			CPoolWinner <- true
 			go func(mapinfo *map[string]interface{}) {
-				defer func() { <-CPool }()
-				go TaskWinner(mapinfo)
-				go TaskBuyer(mapinfo)
-				go TaskAgency(mapinfo)
+				defer func() { <-CPoolWinner }()
+				TaskWinner(mapinfo)
 			}(tmp)
+			CPoolBuery <- true
+			go func(mapinfo *map[string]interface{}) {
+				defer func() { <-CPoolBuery }()
+				TaskBuyer(mapinfo)
+			}(tmp)
+			CPoolAgency <- true
+			go func(mapinfo *map[string]interface{}) {
+				defer func() { <-CPoolAgency }()
+				TaskAgency(mapinfo)
+			}(tmp)
+
 		}
 	case mu.OP_NOOP: //下个节点回应
 		log.Println("发送成功", string(data))
@@ -170,14 +172,12 @@ func initMongo() {
 	SourceClient.MongodbAddr = Config["mgoinit"]
 	SourceClient.Size = pool_size
 	SourceClient.DbName = Config["mgodb_bidding"]
-	//mongodbSim.DbName = "qfw"
 	SourceClient.InitPool()
 
 	FClient = new(mongodb.MongodbSim)
 	FClient.MongodbAddr = Config["mgourl"]
 	FClient.Size = pool_size
 	FClient.DbName = Config["mgodb_extract_kf"]
-	//mongodbSim.DbName = "qfw"
 	FClient.InitPool()
 	FClientmgoConn := FClient.GetMgoConn()
 	defer FClient.DestoryMongoConn(FClientmgoConn)
@@ -211,7 +211,15 @@ func initReg() {
 		if !ok || !ok2 || !ok3 || s_field == "" || s_rule == "" || s_type == "" {
 			continue
 		}
-		regtmp := regexp.MustCompile(s_rule)
+		var pattern string
+		if strings.Contains(s_rule, "\\u") {
+			s_rule = strings.Replace(s_rule, "\\", "\\\\", -1)
+			s_rule = strings.Replace(s_rule, "\\\\u", "\\u", -1)
+			pattern, _ = strconv.Unquote(`"` + s_rule + `"`)
+		} else {
+			pattern = s_rule
+		}
+		regtmp := regexp.MustCompile(pattern)
 		if s_field == "winner" {
 			if s_type == "ok" {
 				WinnerRegOk = append(WinnerRegOk, *regtmp)