main.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/elastic"
  5. "app.yhyue.com/data_processing/common_utils/log"
  6. "app.yhyue.com/data_processing/common_utils/mongodb"
  7. "app.yhyue.com/data_processing/common_utils/udp"
  8. "encoding/json"
  9. "field-dispose/config"
  10. "fmt"
  11. "go.uber.org/zap"
  12. "io/ioutil"
  13. "net"
  14. "net/http"
  15. "sync"
  16. "time"
  17. )
  18. var (
  19. MgoB *mongodb.MongodbSim
  20. Es *elastic.Elastic
  21. UdpClient udp.UdpClient
  22. UdpTaskMap = &sync.Map{}
  23. updatePool chan []map[string]interface{}
  24. updateSp chan bool
  25. updateEsPool chan []map[string]interface{}
  26. updateEsSp chan bool
  27. UdpChan = make(chan map[string]interface{}, 500)
  28. SingleThread = make(chan bool, 1)
  29. )
  30. func init() {
  31. config.Init("./common.toml")
  32. InitLog()
  33. InitMgo()
  34. updatePool = make(chan []map[string]interface{}, 5000)
  35. updateSp = make(chan bool, 5)
  36. updateEsPool = make(chan []map[string]interface{}, 5000)
  37. updateEsSp = make(chan bool, 2)
  38. UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  39. UdpClient.Listen(processUdpMsg)
  40. log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  41. }
  42. type UdpNode struct {
  43. data []byte
  44. addr *net.UDPAddr
  45. timestamp int64
  46. retry int
  47. }
  48. func main() {
  49. go SaveErrorInfo() //保存异常信息
  50. //go CheckErrorNum()
  51. //go updateEsMethod()
  52. go checkMapJob()
  53. go updateMethod()
  54. for {
  55. mapinfo, ok := <-UdpChan
  56. if !ok {
  57. continue
  58. }
  59. SingleThread <- true
  60. go func(m map[string]interface{}) {
  61. defer func() {
  62. <-SingleThread
  63. }()
  64. log.Info("start dispose ...", zap.Any("key", mapinfo["key"]))
  65. getIntention(m)
  66. }(mapinfo)
  67. }
  68. }
  69. func InitMgo() {
  70. MgoB = &mongodb.MongodbSim{
  71. MongodbAddr: config.Conf.DB.Mongo.Addr,
  72. DbName: config.Conf.DB.Mongo.Dbname,
  73. Size: config.Conf.DB.Mongo.Size,
  74. UserName: config.Conf.DB.Mongo.User,
  75. Password: config.Conf.DB.Mongo.Password,
  76. }
  77. MgoB.InitPool()
  78. Es = &elastic.Elastic{
  79. S_esurl: config.Conf.DB.Es.Addr,
  80. I_size: config.Conf.DB.Es.Size,
  81. }
  82. Es.InitElasticSize()
  83. }
  84. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  85. defer util.Catch()
  86. switch act {
  87. case udp.OP_TYPE_DATA: //上个节点的数据
  88. var mapInfo map[string]interface{}
  89. err := json.Unmarshal(data, &mapInfo)
  90. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  91. gtid, _ := mapInfo["gtid"].(string)
  92. lteid, _ := mapInfo["lteid"].(string)
  93. if err != nil || gtid == "" || lteid == "" {
  94. UdpClient.WriteUdp([]byte("cgyx udp error"), udp.OP_NOOP, ra) //udp失败回写
  95. } else {
  96. //udp成功回写
  97. if k := util.ObjToString(mapInfo["key"]); k != "" {
  98. go UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  99. } else {
  100. k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
  101. go UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  102. }
  103. UdpChan <- mapInfo
  104. }
  105. case udp.OP_NOOP: //下个节点回应
  106. ok := string(data)
  107. if ok != "" {
  108. log.Info("udp re", zap.String("data:", ok))
  109. UdpTaskMap.Delete(ok)
  110. }
  111. }
  112. }
  113. func NextNode(mapInfo map[string]interface{}) {
  114. var next = &net.UDPAddr{
  115. IP: net.ParseIP(config.Conf.Udp.Next.Addr),
  116. Port: util.IntAll(config.Conf.Udp.Next.Port),
  117. }
  118. mapInfo["stype"] = config.Conf.Udp.Next.Stype
  119. key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), config.Conf.Udp.Next.Stype)
  120. mapInfo["key"] = key
  121. log.Info("udp next node", zap.Any("mapinfo:", mapInfo))
  122. datas, _ := json.Marshal(mapInfo)
  123. node := &UdpNode{datas, next, time.Now().Unix(), 0}
  124. UdpTaskMap.Store(key, node)
  125. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  126. }
  127. func checkMapJob() {
  128. if config.Conf.Mail.Send {
  129. log.Info("checkMapJob", zap.String("to:", config.Conf.Mail.To))
  130. for {
  131. UdpTaskMap.Range(func(k, v interface{}) bool {
  132. now := time.Now().Unix()
  133. node, _ := v.(*UdpNode)
  134. if now-node.timestamp > 120 {
  135. node.retry++
  136. if node.retry > 5 {
  137. UdpTaskMap.Delete(k)
  138. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "field-py-dispose-send-fail", k.(string)))
  139. if err == nil {
  140. defer res.Body.Close()
  141. read, err := ioutil.ReadAll(res.Body)
  142. log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
  143. }
  144. } else {
  145. log.Info("udp重发", zap.Any("k:", k))
  146. UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
  147. }
  148. } else if now-node.timestamp > 10 {
  149. log.Info("udp任务超时中..", zap.Any("k:", k))
  150. }
  151. return true
  152. })
  153. time.Sleep(60 * time.Second)
  154. }
  155. }
  156. }
  157. func updateMethod() {
  158. log.Info("updateMethod 保存...")
  159. arru := make([][]map[string]interface{}, 500)
  160. indexu := 0
  161. for {
  162. select {
  163. case v := <-updatePool:
  164. arru[indexu] = v
  165. indexu++
  166. if indexu == 500 {
  167. updateSp <- true
  168. go func(arru [][]map[string]interface{}) {
  169. defer func() {
  170. <-updateSp
  171. }()
  172. MgoB.UpdateBulk("bidding", arru...)
  173. }(arru)
  174. arru = make([][]map[string]interface{}, 500)
  175. indexu = 0
  176. }
  177. case <-time.After(1000 * time.Millisecond):
  178. if indexu > 0 {
  179. updateSp <- true
  180. go func(arru [][]map[string]interface{}) {
  181. defer func() {
  182. <-updateSp
  183. }()
  184. MgoB.UpdateBulk("bidding", arru...)
  185. }(arru[:indexu])
  186. arru = make([][]map[string]interface{}, 500)
  187. indexu = 0
  188. }
  189. }
  190. }
  191. }
  192. func updateEsMethod() {
  193. arru := make([][]map[string]interface{}, 200)
  194. indexu := 0
  195. for {
  196. select {
  197. case v := <-updateEsPool:
  198. arru[indexu] = v
  199. indexu++
  200. if indexu == 200 {
  201. updateEsSp <- true
  202. go func(arru [][]map[string]interface{}) {
  203. defer func() {
  204. <-updateEsSp
  205. }()
  206. Es.UpdateBulk(config.Conf.DB.Es.IndexS, config.Conf.DB.Es.TypeS, arru...)
  207. }(arru)
  208. arru = make([][]map[string]interface{}, 200)
  209. indexu = 0
  210. }
  211. case <-time.After(1000 * time.Millisecond):
  212. if indexu > 0 {
  213. updateEsSp <- true
  214. go func(arru [][]map[string]interface{}) {
  215. defer func() {
  216. <-updateEsSp
  217. }()
  218. Es.UpdateBulk(config.Conf.DB.Es.IndexS, config.Conf.DB.Es.TypeS, arru...)
  219. }(arru[:indexu])
  220. arru = make([][]map[string]interface{}, 200)
  221. indexu = 0
  222. }
  223. }
  224. }
  225. }