package main import ( "encoding/json" "field-dispose/config" "fmt" "go.uber.org/zap" "io" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "net" "net/http" "sync" "time" ) var ( MgoB *mongodb.MongodbSim Es *elastic.Elastic UdpClient udp.UdpClient UdpTaskMap = &sync.Map{} updatePool chan []map[string]interface{} updateSp chan bool updateEsPool chan []map[string]interface{} updateEsSp chan bool UdpChan = make(chan map[string]interface{}, 500) SingleThread = make(chan bool, 1) Skipping = false //rpc重试跳过 ) func init() { config.Init("./common.toml") InitLog() InitMgo() updatePool = make(chan []map[string]interface{}, 5000) updateSp = make(chan bool, 5) updateEsPool = make(chan []map[string]interface{}, 5000) updateEsSp = make(chan bool, 2) UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024} UdpClient.Listen(processUdpMsg) log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort)) } type UdpNode struct { data []byte addr *net.UDPAddr timestamp int64 retry int } func main() { go SaveErrorInfo() //保存异常信息 //go CheckErrorNum() //go updateEsMethod() go checkMapJob() go updateMethod() for { mapinfo, ok := <-UdpChan if !ok { continue } SingleThread <- true go func(m map[string]interface{}) { defer func() { <-SingleThread }() log.Info("start dispose ...", zap.Any("key", mapinfo["key"])) getIntention(m) }(mapinfo) } } func InitMgo() { MgoB = &mongodb.MongodbSim{ MongodbAddr: config.Conf.DB.Mongo.Addr, DbName: config.Conf.DB.Mongo.Dbname, Size: config.Conf.DB.Mongo.Size, UserName: config.Conf.DB.Mongo.User, Password: config.Conf.DB.Mongo.Password, } MgoB.InitPool() Es = &elastic.Elastic{ S_esurl: config.Conf.DB.Es.Addr, I_size: config.Conf.DB.Es.Size, } Es.InitElasticSize() } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { defer util.Catch() switch act { case udp.OP_TYPE_DATA: //上个节点的数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo)) if err != nil { UdpClient.WriteUdp([]byte("error: "+err.Error()), udp.OP_NOOP, ra) } else { stype := util.ObjToString(mapInfo["stype"]) switch stype { case "jqcl": gtid, _ := mapInfo["gtid"].(string) lteid, _ := mapInfo["lteid"].(string) //udp成功回写 if k := util.ObjToString(mapInfo["key"]); k != "" { go UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra) } else { k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"])) go UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra) } UdpChan <- mapInfo case "tout-true": Skipping = true go UdpClient.WriteUdp([]byte(fmt.Sprintf("Skipping:%s", "true")), udp.OP_NOOP, ra) case "tout-false": Skipping = false go UdpClient.WriteUdp([]byte(fmt.Sprintf("Skipping:%s", "false")), udp.OP_NOOP, ra) case "monitor": log.Info("monitor", zap.Any("mapInfo:", mapInfo)) go UdpClient.WriteUdp([]byte(util.ObjToString(mapInfo["key"])), udp.OP_NOOP, ra) } } case udp.OP_NOOP: //下个节点回应 ok := string(data) if ok != "" { log.Info("udp re", zap.String("data:", ok)) UdpTaskMap.Delete(ok) } } } func NextNode(mapInfo map[string]interface{}) { var next = &net.UDPAddr{ IP: net.ParseIP(config.Conf.Udp.Next.Addr), Port: util.IntAll(config.Conf.Udp.Next.Port), } mapInfo["stype"] = config.Conf.Udp.Next.Stype key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), config.Conf.Udp.Next.Stype) mapInfo["key"] = key log.Info("udp next node", zap.Any("mapinfo:", mapInfo)) datas, _ := json.Marshal(mapInfo) node := &UdpNode{datas, next, time.Now().Unix(), 0} UdpTaskMap.Store(key, node) _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next) } func checkMapJob() { if config.Conf.Mail.Send { log.Info("checkMapJob", zap.String("to:", config.Conf.Mail.To)) for { UdpTaskMap.Range(func(k, v interface{}) bool { now := time.Now().Unix() node, _ := v.(*UdpNode) if now-node.timestamp > 120 { node.retry++ if node.retry > 5 { UdpTaskMap.Delete(k) res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "python字段识别-send-fail", k.(string))) if err == nil { defer res.Body.Close() read, err := io.ReadAll(res.Body) log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err)) } } else { log.Info("udp重发", zap.Any("k:", k)) UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr) } } else if now-node.timestamp > 10 { log.Info("udp任务超时中..", zap.Any("k:", k)) } return true }) time.Sleep(60 * time.Second) } } } func updateMethod() { log.Info("updateMethod 保存...") arru := make([][]map[string]interface{}, 500) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == 500 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoB.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...) }(arru) arru = make([][]map[string]interface{}, 500) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoB.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 500) indexu = 0 } } } } func updateEsMethod() { arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updateEsPool: arru[indexu] = v indexu++ if indexu == 200 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk(config.Conf.DB.Es.IndexS, arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk(config.Conf.DB.Es.IndexS, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } }