main.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package main
  2. import (
  3. qu "app.yhyue.com/moapp/jybase/common"
  4. "dataPrefer/db"
  5. "dataPrefer/service"
  6. "encoding/json"
  7. "flag"
  8. "github.com/gogf/gf/v2/encoding/gjson"
  9. "github.com/gogf/gf/v2/frame/g"
  10. "github.com/gogf/gf/v2/os/gctx"
  11. "github.com/gogf/gf/v2/os/gfile"
  12. "gopkg.in/natefinch/lumberjack.v2"
  13. "io"
  14. mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  15. "log"
  16. "net"
  17. "os"
  18. "sync"
  19. "time"
  20. )
  21. const (
  22. Collection_Ids = "bidding_processing_ids"
  23. LastIdPath = "./lastId.json"
  24. )
  25. var (
  26. udpclient mu.UdpClient
  27. lock = &sync.Mutex{}
  28. )
  29. func main() {
  30. model := flag.Int("m", 0, "1:非定时任务")
  31. flag.Parse()
  32. var logger *lumberjack.Logger
  33. ctx := gctx.New()
  34. g.Config().MustGet(ctx, "logger").Struct(&logger)
  35. writers := []io.Writer{logger}
  36. if g.Config().MustGet(ctx, "logger.console").Bool() {
  37. writers = append(writers, os.Stdout)
  38. }
  39. log.SetOutput(io.MultiWriter(writers...))
  40. udpPort := g.Config().MustGet(ctx, "udpPort").String()
  41. udpclient = mu.UdpClient{Local: udpPort, BufSize: 1024}
  42. udpclient.Listen(ProcessUdpMsg)
  43. log.Println("Udp服务监听", g.Config().MustGet(ctx, "udpPort").String())
  44. go repair()
  45. //service.IncDataById("684a9c215f834436f09c2710", "684a9c215f834436f09c2710")
  46. //service.Tj()
  47. //service.Hz()
  48. //*model = 1
  49. if *model == 0 {
  50. <-chan bool(nil)
  51. }
  52. }
  53. // udp接收
  54. func ProcessUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  55. switch act {
  56. case mu.OP_TYPE_DATA:
  57. var mapInfo map[string]interface{}
  58. err := json.Unmarshal(data, &mapInfo)
  59. if err != nil {
  60. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  61. } else if mapInfo != nil {
  62. go udpclient.WriteUdp([]byte(qu.ObjToString(mapInfo["key"])), mu.OP_NOOP, ra)
  63. log.Println("接收到udp消息", string(data))
  64. sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
  65. if qu.ObjToString(mapInfo["stype"]) == "bidding" {
  66. execute(nil, sid, eid)
  67. } else {
  68. sendNextNode(data, "", "")
  69. }
  70. }
  71. case mu.OP_NOOP: //下个节点回应
  72. str := string(data)
  73. log.Println("其他节点回应:", str)
  74. }
  75. }
  76. func execute(ids_id interface{}, sid, eid string) {
  77. lock.Lock()
  78. defer lock.Unlock()
  79. service.IncDataById(sid, eid)
  80. sendNextNode(nil, sid, eid)
  81. for {
  82. time.Sleep(10 * time.Second)
  83. if db.Mgo_Main.Count(Collection_Ids, map[string]interface{}{"lteid": eid, "dataprocess": g.Config().MustGet(gctx.New(), "esDataProcess").Int()}) > 0 {
  84. break
  85. }
  86. log.Println("sid", sid, "eid", eid, "es索引未生完,等待中。。。")
  87. }
  88. if jn, err := gjson.Load(LastIdPath); err != nil {
  89. log.Println("加载", LastIdPath, "出错", err)
  90. } else if eid > jn.Get("lastId").String() {
  91. jn.Set("lastId", eid)
  92. if err := gfile.PutContents(LastIdPath, jn.String()); err != nil {
  93. log.Println("覆盖", LastIdPath, "出错", err)
  94. }
  95. }
  96. if ids_id == nil {
  97. idsProcOver(sid, eid)
  98. } else {
  99. updateIdsById(ids_id, sid, eid)
  100. }
  101. }
  102. // 下节点发送
  103. func sendNextNode(by []byte, sid string, eid string) {
  104. if by == nil {
  105. stype := g.Config().MustGet(gctx.New(), "nextNode.stype").String()
  106. key := sid + "-" + eid + "-" + stype
  107. by, _ = json.Marshal(map[string]interface{}{
  108. "gtid": sid,
  109. "lteid": eid,
  110. "stype": stype,
  111. "key": key,
  112. })
  113. }
  114. addr := &net.UDPAddr{
  115. IP: net.ParseIP(g.Config().MustGet(gctx.New(), "nextNode.addr").String()),
  116. Port: g.Config().MustGet(gctx.New(), "nextNode.port").Int(),
  117. }
  118. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) //发送下节点
  119. }
  120. func repair() {
  121. jn, err := gjson.Load(LastIdPath)
  122. if err != nil {
  123. log.Println("程序启动,修补数据加载", LastIdPath, "出错", err)
  124. return
  125. }
  126. lastId := jn.Get("lastId").String()
  127. if lastId == "" {
  128. log.Println("程序启动,修补数据lastId异常为空!")
  129. return
  130. }
  131. startTime := jn.Get("startTime").Int64()
  132. if startTime == 0 {
  133. log.Println("程序启动后,修补数据startTime异常为0!")
  134. return
  135. }
  136. sess := db.Mgo_Main.GetMgoConn()
  137. defer db.Mgo_Main.DestoryMongoConn(sess)
  138. it := sess.DB(db.Mgo_Main.DbName).C(Collection_Ids).Find(map[string]interface{}{"gtid": map[string]interface{}{"$gte": lastId}, "dataprocess": 8, "createtime": map[string]interface{}{"$gte": startTime}}).Select(map[string]interface{}{"_id": 1, "gtid": 1, "lteid": 1}).Sort().Iter()
  139. index := 0
  140. for m := make(map[string]interface{}); it.Next(&m); {
  141. sid, _ := m["gtid"].(string)
  142. eid, _ := m["lteid"].(string)
  143. if sid == "" || eid == "" {
  144. log.Println("sid", sid, "eid", eid, "异常,过滤掉!")
  145. continue
  146. }
  147. index++
  148. execute(m["_id"], sid, eid)
  149. m = make(map[string]interface{})
  150. }
  151. log.Println("程序启动后,修补id段数据", index)
  152. }
  153. // id段处理完,更新状态
  154. func idsProcOver(sid, eid string) {
  155. query := map[string]interface{}{
  156. "gtid": map[string]interface{}{
  157. "$gte": sid,
  158. },
  159. "lteid": map[string]interface{}{
  160. "$lte": eid,
  161. },
  162. }
  163. datas, _ := db.Mgo_Main.Find(Collection_Ids, query, nil, `{"_id":1}`, false, 0, -1)
  164. if datas == nil || len(*datas) == 0 {
  165. log.Println("未查询到记录id段落~", query)
  166. }
  167. log.Println("开始更新流程段落记录", len(*datas))
  168. for _, v := range *datas {
  169. updateIdsById(v["_id"], sid, eid)
  170. }
  171. }
  172. func updateIdsById(_id interface{}, sid, eid string) {
  173. ok := db.Mgo_Main.UpdateById(Collection_Ids, _id, map[string]interface{}{
  174. "$set": map[string]interface{}{
  175. "dataprocess": 10,
  176. "updatetime": time.Now().Unix(),
  177. },
  178. })
  179. log.Println(_id, "流程段落记录更新完毕", sid, eid, ok)
  180. }