main.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package main
  2. import (
  3. "encoding/json"
  4. "fieldproject_inc_data/config"
  5. "fmt"
  6. "github.com/robfig/cron"
  7. "github.com/spf13/cobra"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "go.uber.org/zap"
  10. "io/ioutil"
  11. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  14. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  15. "net"
  16. "net/http"
  17. "sync"
  18. "time"
  19. )
  20. func main() {
  21. rootCmd := &cobra.Command{Use: "my cmd"}
  22. rootCmd.AddCommand(timeTask())
  23. rootCmd.AddCommand(fieldTask())
  24. if err := rootCmd.Execute(); err != nil {
  25. fmt.Println("rootCmd.Execute failed", err.Error())
  26. }
  27. }
  28. // @Description 定时任务 id段
  29. // @Author J 2022/8/11 16:49
  30. func timeTask() *cobra.Command {
  31. cmdClient := &cobra.Command{
  32. Use: "time",
  33. Short: "Start scheduled task",
  34. Run: func(cmd *cobra.Command, args []string) {
  35. InitMgo()
  36. go checkMapJob()
  37. UdpClient := udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  38. UdpClient.Listen(func(b byte, data []byte, add *net.UDPAddr) {
  39. switch b {
  40. case udp.OP_NOOP:
  41. ok := string(data)
  42. if ok != "" {
  43. log.Info("udp re", zap.String("data:", ok))
  44. UdpTaskMap.Delete(ok)
  45. }
  46. }
  47. })
  48. c := cron.New()
  49. _ = c.AddFunc("0 */10 * * * ?", func() {
  50. log.Info("start process")
  51. info, _ := MongoTool.Find("field_data_record", nil, `{"_id": -1}`, nil, true, -1, -1)
  52. if info != nil && len(*info) > 0 {
  53. if util.IntAll((*info)[0]["status"]) == 0 {
  54. mapInfo := make(map[string]interface{})
  55. var next = &net.UDPAddr{
  56. IP: net.ParseIP(config.Conf.Udp.Next.Addr),
  57. Port: util.IntAll(config.Conf.Udp.Next.Port),
  58. }
  59. mapInfo["stype"] = config.Conf.Udp.Next.Stype
  60. mapInfo["gtid"] = util.ObjToString((*info)[0]["gtid"])
  61. mapInfo["lteid"] = util.ObjToString((*info)[0]["lteid"])
  62. key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), config.Conf.Udp.Next.Stype)
  63. mapInfo["key"] = key
  64. log.Info("udp next node", zap.Any("mapinfo:", mapInfo))
  65. datas, _ := json.Marshal(mapInfo)
  66. node := &UdpNode{datas, next, time.Now().Unix(), 0}
  67. UdpTaskMap.Store(key, node)
  68. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  69. } else {
  70. log.Info("timeTask not find ids")
  71. }
  72. }
  73. })
  74. c.Start()
  75. ch := make(chan bool, 1)
  76. <-ch
  77. },
  78. }
  79. return cmdClient
  80. }
  81. // @Description 后续处理
  82. // @Author J 2022/9/13 10:50
  83. func fieldTask() *cobra.Command {
  84. cmdClient := &cobra.Command{
  85. Use: "field",
  86. Short: "Start processing inc field data",
  87. Run: func(cmd *cobra.Command, args []string) {
  88. InitMgo()
  89. InitEs()
  90. go checkMapJob()
  91. task()
  92. },
  93. }
  94. return cmdClient
  95. }
  96. func task() {
  97. go updateEsMethod()
  98. UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  99. UdpClient.Listen(processUdpMsg)
  100. log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  101. ch := make(chan bool, 1)
  102. <-ch
  103. }
  104. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  105. defer util.Catch()
  106. switch act {
  107. case udp.OP_TYPE_DATA: //上个节点的数据
  108. var mapInfo map[string]interface{}
  109. err := json.Unmarshal(data, &mapInfo)
  110. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  111. gtid, _ := mapInfo["gtid"].(string)
  112. lteid, _ := mapInfo["lteid"].(string)
  113. if err != nil || gtid == "" || lteid == "" {
  114. UdpClient.WriteUdp([]byte("udp error"), udp.OP_NOOP, ra) //udp失败回写
  115. } else {
  116. //udp成功回写
  117. if k := util.ObjToString(mapInfo["key"]); k != "" {
  118. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  119. } else {
  120. k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
  121. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  122. }
  123. log.Info("start dispose ...")
  124. disposeFunc(gtid, lteid)
  125. }
  126. case udp.OP_NOOP: //回应
  127. ok := string(data)
  128. if ok != "" {
  129. log.Info("udp re", zap.String("data:", ok))
  130. UdpTaskMap.Delete(ok)
  131. }
  132. }
  133. }
  134. func disposeFunc(gtid, lteid string) {
  135. sess := MongoTool.GetMgoConn()
  136. defer MongoTool.DestoryMongoConn(sess)
  137. ch := make(chan bool, 2)
  138. wg := &sync.WaitGroup{}
  139. q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(gtid), "$lte": mongodb.StringTOBsonId(lteid)}}
  140. field := map[string]interface{}{"bid_field": 1}
  141. query := sess.DB("qfw").C("bidding").Find(q).Select(field).Iter()
  142. count, num := 0, 0
  143. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  144. if count%2000 == 0 {
  145. util.Debug("current ---", count, tmp["_id"])
  146. }
  147. ch <- true
  148. wg.Add(1)
  149. go func(tmp map[string]interface{}) {
  150. defer func() {
  151. <-ch
  152. wg.Done()
  153. }()
  154. if f := util.ObjToString(tmp["bid_field"]); f != "" {
  155. NumLock.Lock()
  156. num++
  157. NumLock.Unlock()
  158. id := mongodb.BsonIdToSId(tmp["_id"])
  159. util.Debug("id ---", id)
  160. update := make(map[string]interface{})
  161. update["bid_field"] = f
  162. updateEsPool <- []map[string]interface{}{{
  163. "_id": id,
  164. },
  165. update,
  166. }
  167. }
  168. }(tmp)
  169. tmp = make(map[string]interface{})
  170. }
  171. wg.Wait()
  172. up := bson.M{"$set": bson.M{"status": 1}}
  173. MongoTool.Update("field_data_record", map[string]interface{}{"gtid": gtid}, up, false, false)
  174. util.Debug("over ---", count, "actual num ---", num)
  175. }
  176. type UdpNode struct {
  177. data []byte
  178. addr *net.UDPAddr
  179. timestamp int64
  180. retry int
  181. }
  182. func checkMapJob() {
  183. if config.Conf.Mail.Send {
  184. log.Info("checkMapJob", zap.String("to:", config.Conf.Mail.To))
  185. for {
  186. UdpTaskMap.Range(func(k, v interface{}) bool {
  187. now := time.Now().Unix()
  188. node, _ := v.(*UdpNode)
  189. if now-node.timestamp > 120 {
  190. node.retry++
  191. if node.retry > 5 {
  192. UdpTaskMap.Delete(k)
  193. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "project-send-fail", k.(string)))
  194. if err == nil {
  195. defer res.Body.Close()
  196. read, err := ioutil.ReadAll(res.Body)
  197. log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
  198. }
  199. } else {
  200. log.Info("udp重发", zap.Any("k:", k))
  201. UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
  202. }
  203. } else if now-node.timestamp > 10 {
  204. log.Info("udp任务超时中..", zap.Any("k:", k))
  205. }
  206. return true
  207. })
  208. time.Sleep(60 * time.Second)
  209. }
  210. }
  211. }
  212. func updateEsMethod() {
  213. arru := make([][]map[string]interface{}, saveSize)
  214. indexu := 0
  215. for {
  216. select {
  217. case v := <-updateEsPool:
  218. arru[indexu] = v
  219. indexu++
  220. if indexu == saveSize {
  221. updateEsSp <- true
  222. go func(arru [][]map[string]interface{}) {
  223. defer func() {
  224. <-updateEsSp
  225. }()
  226. Es.UpdateBulk("bidding", arru...)
  227. Es1.UpdateBulk("bidding", arru...)
  228. }(arru)
  229. arru = make([][]map[string]interface{}, saveSize)
  230. indexu = 0
  231. }
  232. case <-time.After(1000 * time.Millisecond):
  233. if indexu > 0 {
  234. updateEsSp <- true
  235. go func(arru [][]map[string]interface{}) {
  236. defer func() {
  237. <-updateEsSp
  238. }()
  239. Es.UpdateBulk("bidding", arru...)
  240. Es1.UpdateBulk("bidding", arru...)
  241. }(arru[:indexu])
  242. arru = make([][]map[string]interface{}, saveSize)
  243. indexu = 0
  244. }
  245. }
  246. }
  247. }