123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- package main
- import (
- "encoding/json"
- "field-dispose/config"
- "fmt"
- "go.uber.org/zap"
- "io/ioutil"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "net"
- "net/http"
- "sync"
- "time"
- )
- var (
- MgoB *mongodb.MongodbSim
- Es *elastic.Elastic
- UdpClient udp.UdpClient
- UdpTaskMap = &sync.Map{}
- updatePool chan []map[string]interface{}
- updateSp chan bool
- updateEsPool chan []map[string]interface{}
- updateEsSp chan bool
- UdpChan = make(chan map[string]interface{}, 500)
- SingleThread = make(chan bool, 1)
- Skipping = false //rpc重试跳过
- )
- func init() {
- config.Init("./common.toml")
- InitLog()
- InitMgo()
- updatePool = make(chan []map[string]interface{}, 5000)
- updateSp = make(chan bool, 5)
- updateEsPool = make(chan []map[string]interface{}, 5000)
- updateEsSp = make(chan bool, 2)
- UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
- UdpClient.Listen(processUdpMsg)
- log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
- }
// UdpNode is one message forwarded to the next node and kept in UdpTaskMap
// until it is acknowledged or abandoned.
//
// The field order must not change: NextNode builds the value with an unkeyed
// composite literal.
type UdpNode struct {
	data      []byte       // encoded payload, sent again unchanged on retry
	addr      *net.UDPAddr // destination of the message
	timestamp int64        // creation time in Unix seconds
	retry     int          // number of timeout rounds counted by checkMapJob
}
- func main() {
- go SaveErrorInfo() //保存异常信息
- //go CheckErrorNum()
- //go updateEsMethod()
- go checkMapJob()
- go updateMethod()
- for {
- mapinfo, ok := <-UdpChan
- if !ok {
- continue
- }
- SingleThread <- true
- go func(m map[string]interface{}) {
- defer func() {
- <-SingleThread
- }()
- log.Info("start dispose ...", zap.Any("key", mapinfo["key"]))
- getIntention(m)
- }(mapinfo)
- }
- }
- func InitMgo() {
- MgoB = &mongodb.MongodbSim{
- MongodbAddr: config.Conf.DB.Mongo.Addr,
- DbName: config.Conf.DB.Mongo.Dbname,
- Size: config.Conf.DB.Mongo.Size,
- UserName: config.Conf.DB.Mongo.User,
- Password: config.Conf.DB.Mongo.Password,
- }
- MgoB.InitPool()
- Es = &elastic.Elastic{
- S_esurl: config.Conf.DB.Es.Addr,
- I_size: config.Conf.DB.Es.Size,
- }
- Es.InitElasticSize()
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- defer util.Catch()
- switch act {
- case udp.OP_TYPE_DATA: //上个节点的数据
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
- if err != nil {
- UdpClient.WriteUdp([]byte("error: "+err.Error()), udp.OP_NOOP, ra)
- } else {
- stype := util.ObjToString(mapInfo["stype"])
- switch stype {
- case "jqcl":
- gtid, _ := mapInfo["gtid"].(string)
- lteid, _ := mapInfo["lteid"].(string)
- //udp成功回写
- if k := util.ObjToString(mapInfo["key"]); k != "" {
- go UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
- } else {
- k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
- go UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
- }
- UdpChan <- mapInfo
- case "tout-true":
- Skipping = true
- go UdpClient.WriteUdp([]byte(fmt.Sprintf("Skipping:%s", "true")), udp.OP_NOOP, ra)
- case "tout-false":
- Skipping = false
- go UdpClient.WriteUdp([]byte(fmt.Sprintf("Skipping:%s", "false")), udp.OP_NOOP, ra)
- case "monitor":
- log.Info("monitor", zap.Any("mapInfo:", mapInfo))
- go UdpClient.WriteUdp([]byte(util.ObjToString(mapInfo["key"])), udp.OP_NOOP, ra)
- }
- }
- case udp.OP_NOOP: //下个节点回应
- ok := string(data)
- if ok != "" {
- log.Info("udp re", zap.String("data:", ok))
- UdpTaskMap.Delete(ok)
- }
- }
- }
- func NextNode(mapInfo map[string]interface{}) {
- var next = &net.UDPAddr{
- IP: net.ParseIP(config.Conf.Udp.Next.Addr),
- Port: util.IntAll(config.Conf.Udp.Next.Port),
- }
- mapInfo["stype"] = config.Conf.Udp.Next.Stype
- key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), config.Conf.Udp.Next.Stype)
- mapInfo["key"] = key
- log.Info("udp next node", zap.Any("mapinfo:", mapInfo))
- datas, _ := json.Marshal(mapInfo)
- node := &UdpNode{datas, next, time.Now().Unix(), 0}
- UdpTaskMap.Store(key, node)
- _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
- }
- func checkMapJob() {
- if config.Conf.Mail.Send {
- log.Info("checkMapJob", zap.String("to:", config.Conf.Mail.To))
- for {
- UdpTaskMap.Range(func(k, v interface{}) bool {
- now := time.Now().Unix()
- node, _ := v.(*UdpNode)
- if now-node.timestamp > 120 {
- node.retry++
- if node.retry > 5 {
- UdpTaskMap.Delete(k)
- res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "python字段识别-send-fail", k.(string)))
- if err == nil {
- defer res.Body.Close()
- read, err := ioutil.ReadAll(res.Body)
- log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
- }
- } else {
- log.Info("udp重发", zap.Any("k:", k))
- UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
- }
- } else if now-node.timestamp > 10 {
- log.Info("udp任务超时中..", zap.Any("k:", k))
- }
- return true
- })
- time.Sleep(60 * time.Second)
- }
- }
- }
- func updateMethod() {
- log.Info("updateMethod 保存...")
- arru := make([][]map[string]interface{}, 500)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == 500 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MgoB.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 500)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MgoB.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 500)
- indexu = 0
- }
- }
- }
- }
- func updateEsMethod() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updateEsPool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk(config.Conf.DB.Es.IndexS, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk(config.Conf.DB.Es.IndexS, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|