main.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/udp"
  7. "encoding/json"
  8. "fieldproject_inc_data/config"
  9. "fmt"
  10. "github.com/robfig/cron"
  11. "github.com/spf13/cobra"
  12. "go.mongodb.org/mongo-driver/bson"
  13. "go.uber.org/zap"
  14. "io/ioutil"
  15. "net"
  16. "net/http"
  17. "sync"
  18. "time"
  19. )
  20. func main() {
  21. rootCmd := &cobra.Command{Use: "my cmd"}
  22. rootCmd.AddCommand(timeTask())
  23. rootCmd.AddCommand(fieldTask())
  24. if err := rootCmd.Execute(); err != nil {
  25. fmt.Println("rootCmd.Execute failed", err.Error())
  26. }
  27. }
  28. // @Description 定时任务 id段
  29. // @Author J 2022/8/11 16:49
  30. func timeTask() *cobra.Command {
  31. cmdClient := &cobra.Command{
  32. Use: "time",
  33. Short: "Start scheduled task",
  34. Run: func(cmd *cobra.Command, args []string) {
  35. InitMgo()
  36. go checkMapJob()
  37. UdpClient := udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  38. UdpClient.Listen(func(b byte, data []byte, add *net.UDPAddr) {
  39. switch b {
  40. case udp.OP_NOOP:
  41. ok := string(data)
  42. if ok != "" {
  43. log.Info("udp re", zap.String("data:", ok))
  44. UdpTaskMap.Delete(ok)
  45. }
  46. }
  47. })
  48. c := cron.New()
  49. _ = c.AddFunc("0 */10 * * * ?", func() {
  50. log.Info("start process")
  51. info, _ := MongoTool.Find("field_data_record", nil, `{"_id": -1}`, nil, true, -1, -1)
  52. if info != nil && len(*info) > 0 {
  53. if util.IntAll((*info)[0]["status"]) == 0 {
  54. mapInfo := make(map[string]interface{})
  55. var next = &net.UDPAddr{
  56. IP: net.ParseIP(config.Conf.Udp.Next.Addr),
  57. Port: util.IntAll(config.Conf.Udp.Next.Port),
  58. }
  59. mapInfo["stype"] = config.Conf.Udp.Next.Stype
  60. mapInfo["gtid"] = util.ObjToString((*info)[0]["gtid"])
  61. mapInfo["lteid"] = util.ObjToString((*info)[0]["lteid"])
  62. key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), config.Conf.Udp.Next.Stype)
  63. mapInfo["key"] = key
  64. log.Info("udp next node", zap.Any("mapinfo:", mapInfo))
  65. datas, _ := json.Marshal(mapInfo)
  66. node := &UdpNode{datas, next, time.Now().Unix(), 0}
  67. UdpTaskMap.Store(key, node)
  68. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  69. } else {
  70. log.Info("timeTask not find ids")
  71. }
  72. }
  73. })
  74. c.Start()
  75. ch := make(chan bool, 1)
  76. <-ch
  77. },
  78. }
  79. return cmdClient
  80. }
  81. // @Description 后续处理
  82. // @Author J 2022/9/13 10:50
  83. func fieldTask() *cobra.Command {
  84. cmdClient := &cobra.Command{
  85. Use: "field",
  86. Short: "Start processing inc field data",
  87. Run: func(cmd *cobra.Command, args []string) {
  88. InitMgo()
  89. InitEs()
  90. task()
  91. },
  92. }
  93. return cmdClient
  94. }
  95. func task() {
  96. go updateEsMethod()
  97. UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  98. UdpClient.Listen(processUdpMsg)
  99. log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  100. ch := make(chan bool, 1)
  101. <-ch
  102. }
  103. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  104. defer util.Catch()
  105. switch act {
  106. case udp.OP_TYPE_DATA: //上个节点的数据
  107. var mapInfo map[string]interface{}
  108. err := json.Unmarshal(data, &mapInfo)
  109. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  110. gtid, _ := mapInfo["gtid"].(string)
  111. lteid, _ := mapInfo["lteid"].(string)
  112. if err != nil || gtid == "" || lteid == "" {
  113. UdpClient.WriteUdp([]byte("udp error"), udp.OP_NOOP, ra) //udp失败回写
  114. } else {
  115. //udp成功回写
  116. if k := util.ObjToString(mapInfo["key"]); k != "" {
  117. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  118. } else {
  119. k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
  120. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  121. }
  122. log.Info("start dispose ...")
  123. disposeFunc(gtid, lteid)
  124. }
  125. case udp.OP_NOOP: //回应
  126. ok := string(data)
  127. if ok != "" {
  128. log.Info("udp re", zap.String("data:", ok))
  129. UdpTaskMap.Delete(ok)
  130. }
  131. }
  132. }
  133. func disposeFunc(gtid, lteid string) {
  134. sess := MongoTool.GetMgoConn()
  135. defer MongoTool.DestoryMongoConn(sess)
  136. ch := make(chan bool, 2)
  137. wg := &sync.WaitGroup{}
  138. q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(gtid), "$lte": mongodb.StringTOBsonId(lteid)}}
  139. field := map[string]interface{}{"bid_field": 1}
  140. query := sess.DB("qfw").C("bidding").Find(q).Select(field).Iter()
  141. count := 0
  142. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  143. if count%500 == 0 {
  144. util.Debug("current ---", count, tmp["_id"])
  145. }
  146. ch <- true
  147. wg.Add(1)
  148. go func(tmp map[string]interface{}) {
  149. defer func() {
  150. <-ch
  151. wg.Done()
  152. }()
  153. if f := util.ObjToString(tmp["bid_field"]); f != "" {
  154. id := mongodb.BsonIdToSId(tmp["_id"])
  155. update := make(map[string]interface{})
  156. update["bid_field"] = f
  157. updateEsPool <- []map[string]interface{}{{
  158. "_id": id,
  159. },
  160. update,
  161. }
  162. }
  163. }(tmp)
  164. tmp = make(map[string]interface{})
  165. }
  166. wg.Wait()
  167. up := bson.M{"$set": bson.M{"status": 1}}
  168. MongoTool.Update("field_data_record", map[string]interface{}{"gtid": gtid}, up, false, false)
  169. util.Debug("over ---", count)
  170. }
  171. type UdpNode struct {
  172. data []byte
  173. addr *net.UDPAddr
  174. timestamp int64
  175. retry int
  176. }
  177. func checkMapJob() {
  178. if config.Conf.Mail.Send {
  179. log.Info("checkMapJob", zap.String("to:", config.Conf.Mail.To))
  180. for {
  181. UdpTaskMap.Range(func(k, v interface{}) bool {
  182. now := time.Now().Unix()
  183. node, _ := v.(*UdpNode)
  184. if now-node.timestamp > 120 {
  185. node.retry++
  186. if node.retry > 5 {
  187. UdpTaskMap.Delete(k)
  188. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "project-send-fail", k.(string)))
  189. if err == nil {
  190. defer res.Body.Close()
  191. read, err := ioutil.ReadAll(res.Body)
  192. log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
  193. }
  194. } else {
  195. log.Info("udp重发", zap.Any("k:", k))
  196. UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
  197. }
  198. } else if now-node.timestamp > 10 {
  199. log.Info("udp任务超时中..", zap.Any("k:", k))
  200. }
  201. return true
  202. })
  203. time.Sleep(60 * time.Second)
  204. }
  205. }
  206. }
  207. func updateEsMethod() {
  208. arru := make([][]map[string]interface{}, saveSize)
  209. indexu := 0
  210. for {
  211. select {
  212. case v := <-updateEsPool:
  213. arru[indexu] = v
  214. indexu++
  215. if indexu == saveSize {
  216. updateEsSp <- true
  217. go func(arru [][]map[string]interface{}) {
  218. defer func() {
  219. <-updateEsSp
  220. }()
  221. Es.UpdateBulk("bidding", "bidding", arru...)
  222. }(arru)
  223. arru = make([][]map[string]interface{}, saveSize)
  224. indexu = 0
  225. }
  226. case <-time.After(1000 * time.Millisecond):
  227. if indexu > 0 {
  228. updateEsSp <- true
  229. go func(arru [][]map[string]interface{}) {
  230. defer func() {
  231. <-updateEsSp
  232. }()
  233. Es.UpdateBulk("bidding", "bidding", arru...)
  234. }(arru[:indexu])
  235. arru = make([][]map[string]interface{}, saveSize)
  236. indexu = 0
  237. }
  238. }
  239. }
  240. }