package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"net/url"
	"sync"
	"time"

	"fieldproject_inc_data/config"
	"github.com/robfig/cron"
	"github.com/spf13/cobra"
	"go.mongodb.org/mongo-driver/bson"
	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
)

// main registers the "time" and "field" sub-commands and runs the one
// selected on the command line.
func main() {
	rootCmd := &cobra.Command{Use: "my cmd"}
	rootCmd.AddCommand(timeTask())
	rootCmd.AddCommand(fieldTask())
	if err := rootCmd.Execute(); err != nil {
		fmt.Println("rootCmd.Execute failed", err.Error())
	}
}

// timeTask builds the "time" sub-command: a scheduled task that periodically
// sends the newest pending id range to the next node over UDP.
//
// @Description scheduled task, id range
// @Author J 2022/8/11 16:49
func timeTask() *cobra.Command {
	return &cobra.Command{
		Use:   "time",
		Short: "Start scheduled task",
		Run: func(cmd *cobra.Command, args []string) {
			InitMgo()

			// Assign the package-level client instead of declaring a local
			// one: checkMapJob resends unacknowledged packets through the
			// package-level UdpClient, so it must be the listening instance.
			UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
			UdpClient.Listen(func(b byte, data []byte, add *net.UDPAddr) {
				switch b {
				case udp.OP_NOOP:
					// Acknowledgement: the payload is the task key.
					ok := string(data)
					if ok != "" {
						log.Info("udp re", zap.String("data:", ok))
						UdpTaskMap.Delete(ok)
					}
				}
			})

			// Start the retry job only once the client it uses is set up.
			go checkMapJob()

			c := cron.New()
			if err := c.AddFunc("0 */10 * * * ?", dispatchLatestRecord); err != nil {
				log.Info("cron AddFunc failed", zap.Error(err))
				return
			}
			c.Start()

			// Block forever; the work happens in the cron and UDP goroutines.
			select {}
		},
	}
}

// dispatchLatestRecord reads the newest document of "field_data_record" and,
// when its status is 0, sends its gtid/lteid range to the configured next
// node. The packet is stored in UdpTaskMap under its key until acknowledged.
func dispatchLatestRecord() {
	log.Info("start process")
	info, _ := MongoTool.Find("field_data_record", nil, `{"_id": -1}`, nil, true, -1, -1)
	if info == nil || len(*info) == 0 {
		return
	}
	record := (*info)[0]
	if util.IntAll(record["status"]) != 0 {
		log.Info("timeTask not find ids")
		return
	}

	next := &net.UDPAddr{
		IP:   net.ParseIP(config.Conf.Udp.Next.Addr),
		Port: util.IntAll(config.Conf.Udp.Next.Port),
	}
	gtid := util.ObjToString(record["gtid"])
	lteid := util.ObjToString(record["lteid"])
	key := fmt.Sprintf("%s-%s-%s", gtid, lteid, config.Conf.Udp.Next.Stype)
	mapInfo := map[string]interface{}{
		"stype": config.Conf.Udp.Next.Stype,
		"gtid":  gtid,
		"lteid": lteid,
		"key":   key,
	}
	log.Info("udp next node", zap.Any("mapinfo:", mapInfo))

	datas, err := json.Marshal(mapInfo)
	if err != nil {
		log.Info("udp next node marshal failed", zap.Error(err))
		return
	}
	node := &UdpNode{
		data:      datas,
		addr:      next,
		timestamp: time.Now().Unix(),
		retry:     0,
	}
	UdpTaskMap.Store(key, node)
	// Best effort: an unacknowledged packet stays in UdpTaskMap and is
	// resent by checkMapJob.
	_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
}

// fieldTask builds the "field" sub-command, which processes incremental
// field data announced by the previous node.
//
// @Description follow-up processing
// @Author J 2022/9/13 10:50
func fieldTask() *cobra.Command {
	return &cobra.Command{
		Use:   "field",
		Short: "Start processing inc field data",
		Run: func(cmd *cobra.Command, args []string) {
			InitMgo()
			InitEs()
			go checkMapJob()
			task()
		},
	}
}

// task starts the Elasticsearch batch writer and the UDP listener, then
// blocks forever.
func task() {
	go updateEsMethod()
	UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
	UdpClient.Listen(processUdpMsg)
	log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
	select {}
}

// processUdpMsg handles one UDP packet.
//
// OP_TYPE_DATA carries a JSON object with "gtid" and "lteid" (and optionally
// "key"/"stype"); the range is acknowledged to the sender and then processed
// by disposeFunc. OP_NOOP is an acknowledgement whose payload is the task key
// to remove from UdpTaskMap.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	defer util.Catch()
	switch act {
	case udp.OP_TYPE_DATA: // data from the previous node
		var mapInfo map[string]interface{}
		err := json.Unmarshal(data, &mapInfo)
		log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
		gtid, _ := mapInfo["gtid"].(string)
		lteid, _ := mapInfo["lteid"].(string)
		if err != nil || gtid == "" || lteid == "" {
			// Reply with a failure marker.
			UdpClient.WriteUdp([]byte("udp error"), udp.OP_NOOP, ra)
			return
		}

		// Reply with the task key so the sender can clear its pending entry;
		// rebuild the key when the sender did not include one.
		k := util.ObjToString(mapInfo["key"])
		if k == "" {
			k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
		}
		UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)

		log.Info("start dispose ...")
		disposeFunc(gtid, lteid)
	case udp.OP_NOOP: // acknowledgement
		ok := string(data)
		if ok != "" {
			log.Info("udp re", zap.String("data:", ok))
			UdpTaskMap.Delete(ok)
		}
	}
}

// disposeFunc iterates the "bidding" collection for _id in (gtid, lteid],
// queues an Elasticsearch update for every document with a non-empty
// "bid_field", and finally sets status to 1 on the "field_data_record"
// document matching gtid.
func disposeFunc(gtid, lteid string) {
	sess := MongoTool.GetMgoConn()
	defer MongoTool.DestoryMongoConn(sess)

	ch := make(chan bool, 2) // limits the number of in-flight workers to 2
	wg := &sync.WaitGroup{}

	q := bson.M{"_id": bson.M{
		"$gt":  mongodb.StringTOBsonId(gtid),
		"$lte": mongodb.StringTOBsonId(lteid),
	}}
	field := map[string]interface{}{"bid_field": 1}
	query := sess.DB("qfw").C("bidding").Find(q).Select(field).Iter()

	count, num := 0, 0
	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
		if count%2000 == 0 {
			util.Debug("current ---", count, tmp["_id"])
		}
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			f := util.ObjToString(tmp["bid_field"])
			if f == "" {
				return
			}
			NumLock.Lock()
			num++
			NumLock.Unlock()
			id := mongodb.BsonIdToSId(tmp["_id"])
			util.Debug("id ---", id)
			update := map[string]interface{}{"bid_field": f}
			updateEsPool <- []map[string]interface{}{
				{"_id": id},
				update,
			}
		}(tmp)
		// Each worker owns its map; hand the iterator a fresh one.
		tmp = make(map[string]interface{})
	}
	wg.Wait()

	up := bson.M{"$set": bson.M{"status": 1}}
	MongoTool.Update("field_data_record", map[string]interface{}{"gtid": gtid}, up, false, false)
	util.Debug("over ---", count, "actual num ---", num)
}

// UdpNode is a sent UDP packet that is still waiting for an acknowledgement.
type UdpNode struct {
	data      []byte       // payload that was sent
	addr      *net.UDPAddr // destination of the packet
	timestamp int64        // Unix seconds at which the packet was first sent
	retry     int          // number of resend rounds so far
}

// checkMapJob scans UdpTaskMap every 60 seconds when config.Conf.Mail.Send is
// set. Entries older than 120 seconds are resent; after more than 5 rounds the
// entry is dropped and a failure mail is requested. Entries older than 10
// seconds are only logged.
func checkMapJob() {
	if !config.Conf.Mail.Send {
		return
	}
	log.Info("checkMapJob", zap.String("to:", config.Conf.Mail.To))
	for {
		UdpTaskMap.Range(func(k, v interface{}) bool {
			node, _ := v.(*UdpNode)
			if node == nil {
				// Not a pending packet; nothing to resend.
				return true
			}
			age := time.Now().Unix() - node.timestamp
			switch {
			case age > 120:
				node.retry++
				if node.retry > 5 {
					UdpTaskMap.Delete(k)
					sendFailMail(fmt.Sprint(k))
				} else {
					log.Info("udp重发", zap.Any("k:", k))
					UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
				}
			case age > 10:
				log.Info("udp任务超时中..", zap.Any("k:", k))
			}
			return true
		})
		time.Sleep(60 * time.Second)
	}
}

// sendFailMail calls the configured mail API to report that the packet
// identified by key could not be delivered. Query values are URL-escaped.
func sendFailMail(key string) {
	api := fmt.Sprintf("%s?to=%s&title=%s&body=%s",
		config.Conf.Mail.Api,
		url.QueryEscape(config.Conf.Mail.To),
		"project-send-fail",
		url.QueryEscape(key),
	)
	res, err := http.Get(api)
	if err != nil {
		log.Info("send mail failed", zap.Error(err))
		return
	}
	defer res.Body.Close()
	read, err := ioutil.ReadAll(res.Body)
	log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
}

// updateEsMethod collects updates from updateEsPool and writes them to both
// Elasticsearch clients in bulk: either when saveSize updates are buffered or
// when no update has arrived for one second and the buffer is non-empty.
func updateEsMethod() {
	arru := make([][]map[string]interface{}, saveSize)
	indexu := 0
	for {
		select {
		case v := <-updateEsPool:
			arru[indexu] = v
			indexu++
			if indexu == saveSize {
				flushEsBatch(arru)
				arru = make([][]map[string]interface{}, saveSize)
				indexu = 0
			}
		case <-time.After(1000 * time.Millisecond):
			if indexu > 0 {
				flushEsBatch(arru[:indexu])
				arru = make([][]map[string]interface{}, saveSize)
				indexu = 0
			}
		}
	}
}

// flushEsBatch writes batch to both Elasticsearch clients in a new goroutine.
// It first takes a slot from updateEsSp, so the caller blocks while that
// channel is full; the slot is released when the bulk writes return.
func flushEsBatch(batch [][]map[string]interface{}) {
	updateEsSp <- true
	go func() {
		defer func() {
			<-updateEsSp
		}()
		Es.UpdateBulk("bidding", batch...)
		Es1.UpdateBulk("bidding", batch...)
	}()
}