123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- package main
- import (
- qu "app.yhyue.com/moapp/jybase/common"
- "dataPrefer/db"
- "dataPrefer/service"
- "encoding/json"
- "flag"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/gctx"
- "gopkg.in/natefinch/lumberjack.v2"
- "io"
- mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "log"
- "net"
- "os"
- "sync"
- "time"
- )
- var udpclient mu.UdpClient
- var lock = &sync.Mutex{}
- func main() {
- model := flag.Int("m", 0, "1:非定时任务")
- flag.Parse()
- var logger *lumberjack.Logger
- ctx := gctx.New()
- g.Config().MustGet(ctx, "logger").Struct(&logger)
- writers := []io.Writer{logger}
- if g.Config().MustGet(ctx, "logger.console").Bool() {
- writers = append(writers, os.Stdout)
- }
- log.SetOutput(io.MultiWriter(writers...))
- udpPort := g.Config().MustGet(ctx, "udpPort").String()
- udpclient = mu.UdpClient{Local: udpPort, BufSize: 1024}
- udpclient.Listen(ProcessUdpMsg)
- log.Println("Udp服务监听", g.Config().MustGet(ctx, "udpPort").String())
- service.IncDataById("684a9c215f834436f09c2710", "684a9c215f834436f09c2710")
- //service.Tj()
- //service.Hz()
- *model = 1
- if *model == 0 {
- <-chan bool(nil)
- }
- }
- // udp接收
- func ProcessUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- log.Println("接收到udp消息", string(data))
- stype := qu.ObjToString(mapInfo["stype"])
- sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
- key := sid + "-" + eid + "-" + stype
- go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- if stype == "bidding" {
- lock.Lock()
- service.IncDataById(sid, eid)
- sendNextNode(nil, sid, eid)
- for {
- time.Sleep(10 * time.Second)
- if db.Mgo_Main.Count("bidding_processing_ids", map[string]interface{}{"lteid": eid, "dataprocess": 9}) > 0 {
- break
- }
- log.Println(string(data), "es索引未生完,等待中。。。")
- }
- lock.Unlock()
- } else {
- sendNextNode(data, "", "")
- }
- }
- case mu.OP_NOOP: //下个节点回应
- str := string(data)
- log.Println("其他节点回应:", str)
- }
- }
- // 下节点发送
- func sendNextNode(by []byte, sid string, eid string) {
- if by == nil {
- stype := g.Config().MustGet(gctx.New(), "nextNode.stype").String()
- key := sid + "-" + eid + "-" + stype
- by, _ = json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": stype,
- "key": key,
- })
- }
- addr := &net.UDPAddr{
- IP: net.ParseIP(g.Config().MustGet(gctx.New(), "nextNode.addr").String()),
- Port: g.Config().MustGet(gctx.New(), "nextNode.port").Int(),
- }
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点
- }
|