123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- package main
- import (
- qu "app.yhyue.com/moapp/jybase/common"
- "dataPrefer/db"
- "dataPrefer/service"
- "encoding/json"
- "flag"
- "github.com/gogf/gf/v2/encoding/gjson"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/gctx"
- "github.com/gogf/gf/v2/os/gfile"
- "gopkg.in/natefinch/lumberjack.v2"
- "io"
- mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "log"
- "net"
- "os"
- "sync"
- "time"
- )
- const (
- Collection_Ids = "bidding_processing_ids"
- LastIdPath = "./lastId.json"
- )
- var (
- udpclient mu.UdpClient
- lock = &sync.Mutex{}
- )
- func main() {
- model := flag.Int("m", 0, "1:非定时任务")
- flag.Parse()
- var logger *lumberjack.Logger
- ctx := gctx.New()
- g.Config().MustGet(ctx, "logger").Struct(&logger)
- writers := []io.Writer{logger}
- if g.Config().MustGet(ctx, "logger.console").Bool() {
- writers = append(writers, os.Stdout)
- }
- log.SetOutput(io.MultiWriter(writers...))
- udpPort := g.Config().MustGet(ctx, "udpPort").String()
- udpclient = mu.UdpClient{Local: udpPort, BufSize: 1024}
- udpclient.Listen(ProcessUdpMsg)
- log.Println("Udp服务监听", g.Config().MustGet(ctx, "udpPort").String())
- go repair()
- //service.IncDataById("684a9c215f834436f09c2710", "684a9c215f834436f09c2710")
- //service.Tj()
- //service.Hz()
- //*model = 1
- if *model == 0 {
- <-chan bool(nil)
- }
- }
- // udp接收
- func ProcessUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- go udpclient.WriteUdp([]byte(qu.ObjToString(mapInfo["key"])), mu.OP_NOOP, ra)
- log.Println("接收到udp消息", string(data))
- sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
- if qu.ObjToString(mapInfo["stype"]) == "bidding" {
- execute(nil, sid, eid)
- } else {
- sendNextNode(data, "", "")
- }
- }
- case mu.OP_NOOP: //下个节点回应
- str := string(data)
- log.Println("其他节点回应:", str)
- }
- }
- func execute(ids_id interface{}, sid, eid string) {
- lock.Lock()
- defer lock.Unlock()
- service.IncDataById(sid, eid)
- sendNextNode(nil, sid, eid)
- for {
- time.Sleep(10 * time.Second)
- if db.Mgo_Main.Count(Collection_Ids, map[string]interface{}{"lteid": eid, "dataprocess": g.Config().MustGet(gctx.New(), "esDataProcess").Int()}) > 0 {
- break
- }
- log.Println("sid", sid, "eid", eid, "es索引未生完,等待中。。。")
- }
- if jn, err := gjson.Load(LastIdPath); err != nil {
- log.Println("加载", LastIdPath, "出错", err)
- } else if eid > jn.Get("lastId").String() {
- jn.Set("lastId", eid)
- if err := gfile.PutContents(LastIdPath, jn.String()); err != nil {
- log.Println("覆盖", LastIdPath, "出错", err)
- }
- }
- if ids_id == nil {
- idsProcOver(sid, eid)
- } else {
- updateIdsById(ids_id, sid, eid)
- }
- }
- // 下节点发送
- func sendNextNode(by []byte, sid string, eid string) {
- if by == nil {
- stype := g.Config().MustGet(gctx.New(), "nextNode.stype").String()
- key := sid + "-" + eid + "-" + stype
- by, _ = json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": stype,
- "key": key,
- })
- }
- addr := &net.UDPAddr{
- IP: net.ParseIP(g.Config().MustGet(gctx.New(), "nextNode.addr").String()),
- Port: g.Config().MustGet(gctx.New(), "nextNode.port").Int(),
- }
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点
- }
- func repair() {
- jn, err := gjson.Load(LastIdPath)
- if err != nil {
- log.Println("程序启动,修补数据加载", LastIdPath, "出错", err)
- return
- }
- lastId := jn.Get("lastId").String()
- if lastId == "" {
- log.Println("程序启动,修补数据lastId异常为空!")
- return
- }
- startTime := jn.Get("startTime").Int64()
- if startTime == 0 {
- log.Println("程序启动后,修补数据startTime异常为0!")
- return
- }
- sess := db.Mgo_Main.GetMgoConn()
- defer db.Mgo_Main.DestoryMongoConn(sess)
- it := sess.DB(db.Mgo_Main.DbName).C(Collection_Ids).Find(map[string]interface{}{"gtid": map[string]interface{}{"$gte": lastId}, "dataprocess": 8, "createtime": map[string]interface{}{"$gte": startTime}}).Select(map[string]interface{}{"_id": 1, "gtid": 1, "lteid": 1}).Sort().Iter()
- index := 0
- for m := make(map[string]interface{}); it.Next(&m); {
- sid, _ := m["gtid"].(string)
- eid, _ := m["lteid"].(string)
- if sid == "" || eid == "" {
- log.Println("sid", sid, "eid", eid, "异常,过滤掉!")
- continue
- }
- index++
- execute(m["_id"], sid, eid)
- m = make(map[string]interface{})
- }
- log.Println("程序启动后,修补id段数据", index)
- }
- // id段处理完,更新状态
- func idsProcOver(sid, eid string) {
- query := map[string]interface{}{
- "gtid": map[string]interface{}{
- "$gte": sid,
- },
- "lteid": map[string]interface{}{
- "$lte": eid,
- },
- }
- datas, _ := db.Mgo_Main.Find(Collection_Ids, query, nil, `{"_id":1}`, false, 0, -1)
- if datas == nil || len(*datas) == 0 {
- log.Println("未查询到记录id段落~", query)
- }
- log.Println("开始更新流程段落记录", len(*datas))
- for _, v := range *datas {
- updateIdsById(v["_id"], sid, eid)
- }
- }
- func updateIdsById(_id interface{}, sid, eid string) {
- ok := db.Mgo_Main.UpdateById(Collection_Ids, _id, map[string]interface{}{
- "$set": map[string]interface{}{
- "dataprocess": 10,
- "updatetime": time.Now().Unix(),
- },
- })
- log.Println(_id, "流程段落记录更新完毕", sid, eid, ok)
- }
|