123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- package main
- import (
- "encoding/json"
- "fieldproject_inc_data/config"
- "fmt"
- "github.com/robfig/cron"
- "github.com/spf13/cobra"
- "go.mongodb.org/mongo-driver/bson"
- "go.uber.org/zap"
- "io/ioutil"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "net"
- "net/http"
- "sync"
- "time"
- )
- func main() {
- rootCmd := &cobra.Command{Use: "my cmd"}
- rootCmd.AddCommand(timeTask())
- rootCmd.AddCommand(fieldTask())
- if err := rootCmd.Execute(); err != nil {
- fmt.Println("rootCmd.Execute failed", err.Error())
- }
- }
- // @Description 定时任务 id段
- // @Author J 2022/8/11 16:49
- func timeTask() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "time",
- Short: "Start scheduled task",
- Run: func(cmd *cobra.Command, args []string) {
- InitMgo()
- go checkMapJob()
- UdpClient := udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
- UdpClient.Listen(func(b byte, data []byte, add *net.UDPAddr) {
- switch b {
- case udp.OP_NOOP:
- ok := string(data)
- if ok != "" {
- log.Info("udp re", zap.String("data:", ok))
- UdpTaskMap.Delete(ok)
- }
- }
- })
- c := cron.New()
- _ = c.AddFunc("0 */10 * * * ?", func() {
- log.Info("start process")
- info, _ := MongoTool.Find("field_data_record", nil, `{"_id": -1}`, nil, true, -1, -1)
- if info != nil && len(*info) > 0 {
- if util.IntAll((*info)[0]["status"]) == 0 {
- mapInfo := make(map[string]interface{})
- var next = &net.UDPAddr{
- IP: net.ParseIP(config.Conf.Udp.Next.Addr),
- Port: util.IntAll(config.Conf.Udp.Next.Port),
- }
- mapInfo["stype"] = config.Conf.Udp.Next.Stype
- mapInfo["gtid"] = util.ObjToString((*info)[0]["gtid"])
- mapInfo["lteid"] = util.ObjToString((*info)[0]["lteid"])
- key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), config.Conf.Udp.Next.Stype)
- mapInfo["key"] = key
- log.Info("udp next node", zap.Any("mapinfo:", mapInfo))
- datas, _ := json.Marshal(mapInfo)
- node := &UdpNode{datas, next, time.Now().Unix(), 0}
- UdpTaskMap.Store(key, node)
- _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
- } else {
- log.Info("timeTask not find ids")
- }
- }
- })
- c.Start()
- ch := make(chan bool, 1)
- <-ch
- },
- }
- return cmdClient
- }
- // @Description 后续处理
- // @Author J 2022/9/13 10:50
- func fieldTask() *cobra.Command {
- cmdClient := &cobra.Command{
- Use: "field",
- Short: "Start processing inc field data",
- Run: func(cmd *cobra.Command, args []string) {
- InitMgo()
- InitEs()
- go checkMapJob()
- task()
- },
- }
- return cmdClient
- }
- func task() {
- go updateEsMethod()
- UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
- UdpClient.Listen(processUdpMsg)
- log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
- ch := make(chan bool, 1)
- <-ch
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- defer util.Catch()
- switch act {
- case udp.OP_TYPE_DATA: //上个节点的数据
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
- gtid, _ := mapInfo["gtid"].(string)
- lteid, _ := mapInfo["lteid"].(string)
- if err != nil || gtid == "" || lteid == "" {
- UdpClient.WriteUdp([]byte("udp error"), udp.OP_NOOP, ra) //udp失败回写
- } else {
- //udp成功回写
- if k := util.ObjToString(mapInfo["key"]); k != "" {
- UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
- } else {
- k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
- UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
- }
- log.Info("start dispose ...")
- disposeFunc(gtid, lteid)
- }
- case udp.OP_NOOP: //回应
- ok := string(data)
- if ok != "" {
- log.Info("udp re", zap.String("data:", ok))
- UdpTaskMap.Delete(ok)
- }
- }
- }
- func disposeFunc(gtid, lteid string) {
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- ch := make(chan bool, 2)
- wg := &sync.WaitGroup{}
- q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(gtid), "$lte": mongodb.StringTOBsonId(lteid)}}
- field := map[string]interface{}{"bid_field": 1}
- query := sess.DB("qfw").C("bidding").Find(q).Select(field).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%500 == 0 {
- util.Debug("current ---", count, tmp["_id"])
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if f := util.ObjToString(tmp["bid_field"]); f != "" {
- id := mongodb.BsonIdToSId(tmp["_id"])
- update := make(map[string]interface{})
- update["bid_field"] = f
- updateEsPool <- []map[string]interface{}{{
- "_id": id,
- },
- update,
- }
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- up := bson.M{"$set": bson.M{"status": 1}}
- MongoTool.Update("field_data_record", map[string]interface{}{"gtid": gtid}, up, false, false)
- util.Debug("over ---", count)
- }
// UdpNode is one outbound UDP message that is kept in UdpTaskMap until an
// acknowledgement removes it. Field order matters: timeTask builds it with a
// positional literal.
type UdpNode struct {
	// data is the payload handed to WriteUdp, both initially and on resend.
	data []byte
	// addr is the destination the payload is written to.
	addr *net.UDPAddr
	// timestamp is the creation time in Unix seconds.
	timestamp int64
	// retry counts how many times checkMapJob found the node overdue.
	retry int
}
- func checkMapJob() {
- if config.Conf.Mail.Send {
- log.Info("checkMapJob", zap.String("to:", config.Conf.Mail.To))
- for {
- UdpTaskMap.Range(func(k, v interface{}) bool {
- now := time.Now().Unix()
- node, _ := v.(*UdpNode)
- if now-node.timestamp > 120 {
- node.retry++
- if node.retry > 5 {
- UdpTaskMap.Delete(k)
- res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "project-send-fail", k.(string)))
- if err == nil {
- defer res.Body.Close()
- read, err := ioutil.ReadAll(res.Body)
- log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
- }
- } else {
- log.Info("udp重发", zap.Any("k:", k))
- UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
- }
- } else if now-node.timestamp > 10 {
- log.Info("udp任务超时中..", zap.Any("k:", k))
- }
- return true
- })
- time.Sleep(60 * time.Second)
- }
- }
- }
- func updateEsMethod() {
- arru := make([][]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-updateEsPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|