zhangjinkun преди 5 години
родител
ревизия
65665a3eb0
променени са 3 файла, в които са добавени 114 реда и са изтрити 19 реда
  1. 10 1
      udp_winner/config.json
  2. 45 18
      udp_winner/main.go
  3. 59 0
      udp_winner/udptaskmap.go

+ 10 - 1
udp_winner/config.json

@@ -19,5 +19,14 @@
   "redis_buyer_db": "2",
   "redis_agency_db": "3",
   "chan_pool_num": "10",
-  "his_redis": "127.0.0.1:6380"
+  "his_redis": "127.0.0.1:6380",
+  "jkmail": {
+        "to": "zhangjinkun@topnet.net.cn",
+        "api": "http://10.171.112.160:19281/_send/_mail"
+    },
+  "nextNode": {
+        "addr": "127.0.0.1",
+        "port": 1483,
+        "memo": "标准库"
+    }
 }

+ 45 - 18
udp_winner/main.go

@@ -3,33 +3,35 @@ package main
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/garyburd/redigo/redis"
-	hisRedis "github.com/go-redis/redis"
-	"gopkg.in/mgo.v2/bson"
 	"log"
 	mu "mfw/util"
 	"net"
 	"qfw/util"
-	"qfw/common/src/qfw/util/mongodb"
+	"qfw/util/mongodb"
 	"regexp"
 	"strconv"
 	"strings"
 	"time"
+
+	"github.com/garyburd/redigo/redis"
+	hisRedis "github.com/go-redis/redis"
+	"gopkg.in/mgo.v2/bson"
 )
 
 var (
-	Config                                = make(map[string]string)
-	Fields, BuyerFields, AgencyFields     []string
-	SourceClient, FClient                 *mongodb.MongodbSim
-	RedisPool                             redis.Pool
-	HisRedisPool                          *hisRedis.Client
-	Addrs                                 = make(map[string]interface{}, 0) //省市县
-	udpclient                             mu.UdpClient                      //udp对象
-	Reg_person                            = regexp.MustCompile("[\u4e00-\u9fa5]+")
-	Reg_xing                              = regexp.MustCompile(`\*{1,}`)
-	Reg_tel                               = regexp.MustCompile(`^[0-9\-\s]*$`)
-	Updport                               int
-	CPoolWinner, CPoolBuery, CPoolAgency  chan bool
+	Config                               = make(map[string]string)
+	Sysconfig                            map[string]interface{}
+	Fields, BuyerFields, AgencyFields    []string
+	SourceClient, FClient                *mongodb.MongodbSim
+	RedisPool                            redis.Pool
+	HisRedisPool                         *hisRedis.Client
+	Addrs                                = make(map[string]interface{}, 0) //省市县
+	udpclient                            mu.UdpClient                      //udp对象
+	Reg_person                           = regexp.MustCompile("[\u4e00-\u9fa5]+")
+	Reg_xing                             = regexp.MustCompile(`\*{1,}`)
+	Reg_tel                              = regexp.MustCompile(`^[0-9\-\s]*$`)
+	Updport                              int
+	CPoolWinner, CPoolBuery, CPoolAgency chan bool
 	//his_redis db
 	redis_winner_db, redis_buyer_db, redis_agency_db int
 	//异常表正则匹配处理
@@ -43,7 +45,8 @@ var (
 func init() {
 	log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
 	util.ReadConfig(&Config)
-	log.Println(Config)
+	util.ReadConfig(&Sysconfig)
+	log.Println(Sysconfig)
 	var err error
 	cpnum, err := strconv.Atoi(Config["chan_pool_num"])
 	if err != nil {
@@ -83,6 +86,7 @@ func main() {
 	go TimedTaskWinner() //定时任务
 	go TimedTaskBuyer()  //定时任务
 	go TimedTaskAgency() //定时任务
+	go checkMapJob()
 	c := make(chan int, 1)
 	<-c
 
@@ -205,7 +209,7 @@ func initReg() {
 	if !b {
 		log.Fatalln("查询正则失败")
 	}
-	for _, v := range (*findReg) {
+	for _, v := range *findReg {
 		s_field, ok := v["s_field"].(string) //字段
 		s_rule, ok2 := v["s_rule"].(string)  //正则
 		s_type, ok3 := v["s_type"].(string)  //ok or err
@@ -243,3 +247,26 @@ func initReg() {
 	}
 	log.Println(len(WinnerRegOk), len(WinnerRegErr), len(BuyerRegOk), len(BuyerRegErr), len(AgencyRegOk), len(AgencyRegErr))
 }
+
+//通知下个节点nextNode
+func nextNode(stype string, updatatime int64) {
+	to, _ := Sysconfig["nextNode"].(map[string]interface{})
+	log.Println(stype, to)
+	key := stype + "-" + fmt.Sprint(updatatime)
+	by, _ := json.Marshal(map[string]interface{}{
+		"query": map[string]interface{}{
+			"updatatime": map[string]interface{}{
+				"$gte": updatatime,
+			},
+		},
+		"stype": stype,
+		"key":   key,
+	})
+	addr := &net.UDPAddr{
+		IP:   net.ParseIP(to["addr"].(string)),
+		Port: util.IntAll(to["port"]),
+	}
+	node := &udpNode{by, addr, time.Now().Unix(), 0}
+	udptaskmap.Store(key, node)
+	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+}

+ 59 - 0
udp_winner/udptaskmap.go

@@ -0,0 +1,59 @@
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"log"
+	mu "mfw/util"
+	"net"
+	"net/http"
+	"sync"
+	"time"
+)
+
+var udptaskmap = &sync.Map{}
+var tomail string
+var api string
+
+type udpNode struct {
+	data      []byte
+	addr      *net.UDPAddr
+	timestamp int64
+	retry     int
+}
+
+func checkMapJob() {
+	//阿里云内网无法发送邮件
+	jkmail, _ := Sysconfig["jkmail"].(map[string]interface{})
+	if jkmail != nil {
+		tomail, _ = jkmail["to"].(string)
+		api, _ = jkmail["api"].(string)
+	}
+	log.Println("start checkMapJob", tomail, Sysconfig["jkmail"])
+	for {
+		udptaskmap.Range(func(k, v interface{}) bool {
+			now := time.Now().Unix()
+			node, _ := v.(*udpNode)
+			if now-node.timestamp > 120 {
+				node.retry++
+				if node.retry > 5 {
+					log.Println("udp重试失败", k)
+					udptaskmap.Delete(k)
+					res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "extract-send-fail", k.(string)))
+					if err == nil {
+						defer res.Body.Close()
+						read, err := ioutil.ReadAll(res.Body)
+						log.Println("邮件发发送:", string(read), err)
+					}
+				} else {
+					log.Println("udp重发", k)
+					udpclient.WriteUdp(node.data, mu.OP_TYPE_DATA, node.addr)
+				}
+			} else if now-node.timestamp > 10 {
+				log.Println("udp任务超时中..", k)
+			}
+			return true
+		})
+		time.Sleep(60 * time.Second)
+	}
+}