main.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/elastic"
  5. "app.yhyue.com/data_processing/common_utils/log"
  6. "app.yhyue.com/data_processing/common_utils/mongodb"
  7. "app.yhyue.com/data_processing/common_utils/udp"
  8. "encoding/json"
  9. "field-dispose/config"
  10. "fmt"
  11. "go.uber.org/zap"
  12. "io/ioutil"
  13. "net"
  14. "net/http"
  15. "sync"
  16. "time"
  17. )
  18. var (
  19. MgoB *mongodb.MongodbSim
  20. Es *elastic.Elastic
  21. UdpClient udp.UdpClient
  22. UdpTaskMap = &sync.Map{}
  23. updatePool chan []map[string]interface{}
  24. updateSp chan bool
  25. updateEsPool chan []map[string]interface{}
  26. updateEsSp chan bool
  27. )
  28. func init() {
  29. config.Init("./common.toml")
  30. InitLog()
  31. InitMgo()
  32. updatePool = make(chan []map[string]interface{}, 5000)
  33. updateSp = make(chan bool, 5)
  34. updateEsPool = make(chan []map[string]interface{}, 5000)
  35. updateEsSp = make(chan bool, 2)
  36. UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  37. UdpClient.Listen(processUdpMsg)
  38. log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  39. }
  // UdpNode is one outstanding UDP message awaiting acknowledgement. NextNode
  // stores it in UdpTaskMap and checkMapJob inspects it for retries.
  //
  // The field order must not change: NextNode builds the value with a
  // positional composite literal.
  type UdpNode struct {
  	data      []byte       // serialized payload that was (and may again be) sent
  	addr      *net.UDPAddr // destination of the message
  	timestamp int64        // Unix seconds at which the node was created
  	retry     int          // number of retry rounds already counted by checkMapJob
  }
  46. func main() {
  47. go SaveErrorInfo() //保存异常信息
  48. //go CheckErrorNum()
  49. //go updateEsMethod()
  50. go checkMapJob()
  51. go updateMethod()
  52. ch := make(chan bool, 1)
  53. <-ch
  54. }
  55. func InitMgo() {
  56. MgoB = &mongodb.MongodbSim{
  57. MongodbAddr: config.Conf.DB.Mongo.Addr,
  58. DbName: config.Conf.DB.Mongo.Dbname,
  59. Size: config.Conf.DB.Mongo.Size,
  60. UserName: config.Conf.DB.Mongo.User,
  61. Password: config.Conf.DB.Mongo.Password,
  62. }
  63. MgoB.InitPool()
  64. Es = &elastic.Elastic{
  65. S_esurl: config.Conf.DB.Es.Addr,
  66. I_size: config.Conf.DB.Es.Size,
  67. }
  68. Es.InitElasticSize()
  69. }
  70. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  71. defer util.Catch()
  72. switch act {
  73. case udp.OP_TYPE_DATA: //上个节点的数据
  74. var mapInfo map[string]interface{}
  75. err := json.Unmarshal(data, &mapInfo)
  76. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  77. gtid, _ := mapInfo["gtid"].(string)
  78. lteid, _ := mapInfo["lteid"].(string)
  79. if err != nil || gtid == "" || lteid == "" {
  80. UdpClient.WriteUdp([]byte("cgyx udp error"), udp.OP_NOOP, ra) //udp失败回写
  81. } else {
  82. //udp成功回写
  83. if k := util.ObjToString(mapInfo["key"]); k != "" {
  84. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  85. } else {
  86. k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
  87. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  88. }
  89. log.Info("start dispose ...")
  90. getIntention(gtid, lteid, mapInfo)
  91. }
  92. case udp.OP_NOOP: //下个节点回应
  93. ok := string(data)
  94. if ok != "" {
  95. log.Info("udp re", zap.String("data:", ok))
  96. UdpTaskMap.Delete(ok)
  97. }
  98. }
  99. }
  100. func NextNode(mapInfo map[string]interface{}) {
  101. var next = &net.UDPAddr{
  102. IP: net.ParseIP(config.Conf.Udp.Next.Addr),
  103. Port: util.IntAll(config.Conf.Udp.Next.Port),
  104. }
  105. mapInfo["stype"] = config.Conf.Udp.Next.Stype
  106. key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), config.Conf.Udp.Next.Stype)
  107. mapInfo["key"] = key
  108. log.Info("udp next node", zap.Any("mapinfo:", mapInfo))
  109. datas, _ := json.Marshal(mapInfo)
  110. node := &UdpNode{datas, next, time.Now().Unix(), 0}
  111. UdpTaskMap.Store(key, node)
  112. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  113. }
  114. func checkMapJob() {
  115. if config.Conf.Mail.Send {
  116. log.Info("checkMapJob", zap.String("to:", config.Conf.Mail.To))
  117. for {
  118. UdpTaskMap.Range(func(k, v interface{}) bool {
  119. now := time.Now().Unix()
  120. node, _ := v.(*UdpNode)
  121. if now-node.timestamp > 120 {
  122. node.retry++
  123. if node.retry > 5 {
  124. UdpTaskMap.Delete(k)
  125. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "field-py-dispose-send-fail", k.(string)))
  126. if err == nil {
  127. defer res.Body.Close()
  128. read, err := ioutil.ReadAll(res.Body)
  129. log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
  130. }
  131. } else {
  132. log.Info("udp重发", zap.Any("k:", k))
  133. UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
  134. }
  135. } else if now-node.timestamp > 10 {
  136. log.Info("udp任务超时中..", zap.Any("k:", k))
  137. }
  138. return true
  139. })
  140. time.Sleep(60 * time.Second)
  141. }
  142. }
  143. }
  144. func updateMethod() {
  145. log.Info("updateMethod 保存...")
  146. arru := make([][]map[string]interface{}, 500)
  147. indexu := 0
  148. for {
  149. select {
  150. case v := <-updatePool:
  151. arru[indexu] = v
  152. indexu++
  153. if indexu == 500 {
  154. updateSp <- true
  155. go func(arru [][]map[string]interface{}) {
  156. defer func() {
  157. <-updateSp
  158. }()
  159. MgoB.UpdateBulk("bidding", arru...)
  160. }(arru)
  161. arru = make([][]map[string]interface{}, 500)
  162. indexu = 0
  163. }
  164. case <-time.After(1000 * time.Millisecond):
  165. if indexu > 0 {
  166. updateSp <- true
  167. go func(arru [][]map[string]interface{}) {
  168. defer func() {
  169. <-updateSp
  170. }()
  171. MgoB.UpdateBulk("bidding", arru...)
  172. }(arru[:indexu])
  173. arru = make([][]map[string]interface{}, 500)
  174. indexu = 0
  175. }
  176. }
  177. }
  178. }
  179. func updateEsMethod() {
  180. arru := make([][]map[string]interface{}, 200)
  181. indexu := 0
  182. for {
  183. select {
  184. case v := <-updateEsPool:
  185. arru[indexu] = v
  186. indexu++
  187. if indexu == 200 {
  188. updateEsSp <- true
  189. go func(arru [][]map[string]interface{}) {
  190. defer func() {
  191. <-updateEsSp
  192. }()
  193. Es.UpdateBulk(config.Conf.DB.Es.IndexS, config.Conf.DB.Es.TypeS, arru...)
  194. }(arru)
  195. arru = make([][]map[string]interface{}, 200)
  196. indexu = 0
  197. }
  198. case <-time.After(1000 * time.Millisecond):
  199. if indexu > 0 {
  200. updateEsSp <- true
  201. go func(arru [][]map[string]interface{}) {
  202. defer func() {
  203. <-updateEsSp
  204. }()
  205. Es.UpdateBulk(config.Conf.DB.Es.IndexS, config.Conf.DB.Es.TypeS, arru...)
  206. }(arru[:indexu])
  207. arru = make([][]map[string]interface{}, 200)
  208. indexu = 0
  209. }
  210. }
  211. }
  212. }