package main

import (
	"encoding/json"
	"flag"
	"io"
	"log"
	"net"
	"os"
	"sync"
	"time"

	qu "app.yhyue.com/moapp/jybase/common"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/gctx"
	"gopkg.in/natefinch/lumberjack.v2"
	mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"

	"dataPrefer/db"
	"dataPrefer/service"
)

var (
	// udpclient is the shared UDP endpoint used both to receive messages and
	// to send acknowledgements / forward messages to the next node.
	udpclient mu.UdpClient

	// lock serializes handling of "bidding" messages so that only one id
	// range is processed (and waited on) at a time.
	lock = &sync.Mutex{}
)

// main configures logging from the "logger" config section, starts the UDP
// listener on the configured "udpPort", and runs one IncDataById pass.
func main() {
	model := flag.Int("m", 0, "1:非定时任务")
	flag.Parse()

	ctx := gctx.New()
	cfg := g.Config()

	// Build the log sinks. A missing or invalid "logger" section leaves
	// logger nil; a nil *lumberjack.Logger must not be handed to
	// io.MultiWriter because the first write would dereference it.
	var logger *lumberjack.Logger
	if err := cfg.MustGet(ctx, "logger").Struct(&logger); err != nil {
		log.Println("load logger config:", err)
	}
	var writers []io.Writer
	if logger != nil {
		writers = append(writers, logger)
	}
	if cfg.MustGet(ctx, "logger.console").Bool() {
		writers = append(writers, os.Stdout)
	}
	if len(writers) > 0 {
		log.SetOutput(io.MultiWriter(writers...))
	}

	udpPort := cfg.MustGet(ctx, "udpPort").String()
	udpclient = mu.UdpClient{Local: udpPort, BufSize: 1024}
	udpclient.Listen(ProcessUdpMsg)
	log.Println("Udp服务监听", udpPort)

	service.IncDataById("684a9c215f834436f09c2710", "684a9c215f834436f09c2710")
	//service.Tj()
	//service.Hz()

	// NOTE(review): the parsed -m flag is unconditionally overwritten here, so
	// the blocking branch below is never taken and the process exits once the
	// call above returns. This looks like leftover debug code together with
	// the hard-coded ids above — confirm the intended behavior before removing.
	*model = 1
	if *model == 0 {
		// Block forever so the UDP listener keeps serving.
		select {}
	}
}

// ProcessUdpMsg is the UDP receive callback registered with udpclient.Listen.
//
// For mu.OP_TYPE_DATA the payload is decoded as a JSON object. A decode
// failure is reported back to the sender as "err:<message>". Otherwise an
// acknowledgement key "<gtid>-<lteid>-<stype>" is sent back asynchronously,
// and the message is either processed locally (stype == "bidding") or
// forwarded unchanged to the next node.
//
// For mu.OP_NOOP the payload is a reply from another node and is only logged.
func ProcessUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	switch act {
	case mu.OP_TYPE_DATA:
		var mapInfo map[string]interface{}
		if err := json.Unmarshal(data, &mapInfo); err != nil {
			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
			return
		}
		if mapInfo == nil {
			// A literal JSON null decodes without error; nothing to do.
			return
		}
		log.Println("接收到udp消息", string(data))

		stype := qu.ObjToString(mapInfo["stype"])
		sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
		key := sid + "-" + eid + "-" + stype
		// Acknowledge receipt without delaying the processing below.
		go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)

		if stype == "bidding" {
			processBidding(data, sid, eid)
			return
		}
		sendNextNode(data, "", "")
	case mu.OP_NOOP:
		// Reply from a downstream node.
		log.Println("其他节点回应:", string(data))
	}
}

// processBidding handles one "bidding" id range while holding the global
// lock: it runs service.IncDataById, notifies the next node, and then polls
// every 10 seconds until "bidding_processing_ids" contains a record with the
// given lteid and dataprocess == 9.
//
// The lock is released via defer so that a panic in one of the called
// packages cannot leave it held forever.
//
// NOTE(review): the polling loop has no timeout; if the record never appears
// the lock is held indefinitely and later bidding messages queue up behind it.
func processBidding(data []byte, sid, eid string) {
	lock.Lock()
	defer lock.Unlock()

	service.IncDataById(sid, eid)
	sendNextNode(nil, sid, eid)

	for {
		time.Sleep(10 * time.Second)
		done := db.Mgo_Main.Count("bidding_processing_ids", map[string]interface{}{
			"lteid":       eid,
			"dataprocess": 9,
		}) > 0
		if done {
			return
		}
		log.Println(string(data), "es索引未生完,等待中。。。")
	}
}

// sendNextNode sends a message to the next node configured under
// "nextNode.addr" / "nextNode.port".
//
// When by is nil a new JSON message is built from sid, eid and the configured
// "nextNode.stype"; otherwise by is forwarded as-is and sid/eid are ignored.
func sendNextNode(by []byte, sid string, eid string) {
	ctx := gctx.New()
	cfg := g.Config()

	if by == nil {
		stype := cfg.MustGet(ctx, "nextNode.stype").String()
		payload, err := json.Marshal(map[string]interface{}{
			"gtid":  sid,
			"lteid": eid,
			"stype": stype,
			"key":   sid + "-" + eid + "-" + stype,
		})
		if err != nil {
			// Do not send an empty datagram if encoding ever fails.
			log.Println("marshal next-node message:", err)
			return
		}
		by = payload
	}

	addr := &net.UDPAddr{
		IP:   net.ParseIP(cfg.MustGet(ctx, "nextNode.addr").String()),
		Port: cfg.MustGet(ctx, "nextNode.port").Int(),
	}
	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) // forward to the next node
}