main.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/elastic"
  5. "app.yhyue.com/data_processing/common_utils/log"
  6. "app.yhyue.com/data_processing/common_utils/mongodb"
  7. "app.yhyue.com/data_processing/common_utils/udp"
  8. "encoding/json"
  9. "field-dispose/config"
  10. "fmt"
  11. "go.uber.org/zap"
  12. "io/ioutil"
  13. "net"
  14. "net/http"
  15. "sync"
  16. "time"
  17. )
  18. var (
  19. MgoB *mongodb.MongodbSim
  20. Es *elastic.Elastic
  21. UdpClient udp.UdpClient
  22. UdpTaskMap = &sync.Map{}
  23. updatePool chan []map[string]interface{}
  24. updateSp chan bool
  25. updateEsPool chan []map[string]interface{}
  26. updateEsSp chan bool
  27. UdpChan = make(chan map[string]interface{}, 500)
  28. SingleThread = make(chan bool, 1)
  29. Skipping = false //rpc重试跳过
  30. )
  31. func init() {
  32. config.Init("./common.toml")
  33. InitLog()
  34. InitMgo()
  35. updatePool = make(chan []map[string]interface{}, 5000)
  36. updateSp = make(chan bool, 5)
  37. updateEsPool = make(chan []map[string]interface{}, 5000)
  38. updateEsSp = make(chan bool, 2)
  39. UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  40. UdpClient.Listen(processUdpMsg)
  41. log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  42. }
  43. type UdpNode struct {
  44. data []byte
  45. addr *net.UDPAddr
  46. timestamp int64
  47. retry int
  48. }
  49. func main() {
  50. go SaveErrorInfo() //保存异常信息
  51. //go CheckErrorNum()
  52. //go updateEsMethod()
  53. go checkMapJob()
  54. go updateMethod()
  55. for {
  56. mapinfo, ok := <-UdpChan
  57. if !ok {
  58. continue
  59. }
  60. SingleThread <- true
  61. go func(m map[string]interface{}) {
  62. defer func() {
  63. <-SingleThread
  64. }()
  65. log.Info("start dispose ...", zap.Any("key", mapinfo["key"]))
  66. getIntention(m)
  67. }(mapinfo)
  68. }
  69. }
  70. func InitMgo() {
  71. MgoB = &mongodb.MongodbSim{
  72. MongodbAddr: config.Conf.DB.Mongo.Addr,
  73. DbName: config.Conf.DB.Mongo.Dbname,
  74. Size: config.Conf.DB.Mongo.Size,
  75. UserName: config.Conf.DB.Mongo.User,
  76. Password: config.Conf.DB.Mongo.Password,
  77. }
  78. MgoB.InitPool()
  79. Es = &elastic.Elastic{
  80. S_esurl: config.Conf.DB.Es.Addr,
  81. I_size: config.Conf.DB.Es.Size,
  82. }
  83. Es.InitElasticSize()
  84. }
  85. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  86. defer util.Catch()
  87. switch act {
  88. case udp.OP_TYPE_DATA: //上个节点的数据
  89. var mapInfo map[string]interface{}
  90. err := json.Unmarshal(data, &mapInfo)
  91. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  92. if err != nil {
  93. UdpClient.WriteUdp([]byte("error: "+err.Error()), udp.OP_NOOP, ra)
  94. } else {
  95. stype := util.ObjToString(mapInfo["stype"])
  96. switch stype {
  97. case "jqcl":
  98. gtid, _ := mapInfo["gtid"].(string)
  99. lteid, _ := mapInfo["lteid"].(string)
  100. //udp成功回写
  101. if k := util.ObjToString(mapInfo["key"]); k != "" {
  102. go UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  103. } else {
  104. k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
  105. go UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  106. }
  107. UdpChan <- mapInfo
  108. case "tout-true":
  109. Skipping = true
  110. case "tout-false":
  111. Skipping = false
  112. }
  113. }
  114. case udp.OP_NOOP: //下个节点回应
  115. ok := string(data)
  116. if ok != "" {
  117. log.Info("udp re", zap.String("data:", ok))
  118. UdpTaskMap.Delete(ok)
  119. }
  120. }
  121. }
  122. func NextNode(mapInfo map[string]interface{}) {
  123. var next = &net.UDPAddr{
  124. IP: net.ParseIP(config.Conf.Udp.Next.Addr),
  125. Port: util.IntAll(config.Conf.Udp.Next.Port),
  126. }
  127. mapInfo["stype"] = config.Conf.Udp.Next.Stype
  128. key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), config.Conf.Udp.Next.Stype)
  129. mapInfo["key"] = key
  130. log.Info("udp next node", zap.Any("mapinfo:", mapInfo))
  131. datas, _ := json.Marshal(mapInfo)
  132. node := &UdpNode{datas, next, time.Now().Unix(), 0}
  133. UdpTaskMap.Store(key, node)
  134. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  135. }
  136. func checkMapJob() {
  137. if config.Conf.Mail.Send {
  138. log.Info("checkMapJob", zap.String("to:", config.Conf.Mail.To))
  139. for {
  140. UdpTaskMap.Range(func(k, v interface{}) bool {
  141. now := time.Now().Unix()
  142. node, _ := v.(*UdpNode)
  143. if now-node.timestamp > 120 {
  144. node.retry++
  145. if node.retry > 5 {
  146. UdpTaskMap.Delete(k)
  147. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "python字段识别-send-fail", k.(string)))
  148. if err == nil {
  149. defer res.Body.Close()
  150. read, err := ioutil.ReadAll(res.Body)
  151. log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
  152. }
  153. } else {
  154. log.Info("udp重发", zap.Any("k:", k))
  155. UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
  156. }
  157. } else if now-node.timestamp > 10 {
  158. log.Info("udp任务超时中..", zap.Any("k:", k))
  159. }
  160. return true
  161. })
  162. time.Sleep(60 * time.Second)
  163. }
  164. }
  165. }
  166. func updateMethod() {
  167. log.Info("updateMethod 保存...")
  168. arru := make([][]map[string]interface{}, 500)
  169. indexu := 0
  170. for {
  171. select {
  172. case v := <-updatePool:
  173. arru[indexu] = v
  174. indexu++
  175. if indexu == 500 {
  176. updateSp <- true
  177. go func(arru [][]map[string]interface{}) {
  178. defer func() {
  179. <-updateSp
  180. }()
  181. MgoB.UpdateBulk("bidding", arru...)
  182. }(arru)
  183. arru = make([][]map[string]interface{}, 500)
  184. indexu = 0
  185. }
  186. case <-time.After(1000 * time.Millisecond):
  187. if indexu > 0 {
  188. updateSp <- true
  189. go func(arru [][]map[string]interface{}) {
  190. defer func() {
  191. <-updateSp
  192. }()
  193. MgoB.UpdateBulk("bidding", arru...)
  194. }(arru[:indexu])
  195. arru = make([][]map[string]interface{}, 500)
  196. indexu = 0
  197. }
  198. }
  199. }
  200. }
  201. func updateEsMethod() {
  202. arru := make([][]map[string]interface{}, 200)
  203. indexu := 0
  204. for {
  205. select {
  206. case v := <-updateEsPool:
  207. arru[indexu] = v
  208. indexu++
  209. if indexu == 200 {
  210. updateEsSp <- true
  211. go func(arru [][]map[string]interface{}) {
  212. defer func() {
  213. <-updateEsSp
  214. }()
  215. Es.UpdateBulk(config.Conf.DB.Es.IndexS, config.Conf.DB.Es.TypeS, arru...)
  216. }(arru)
  217. arru = make([][]map[string]interface{}, 200)
  218. indexu = 0
  219. }
  220. case <-time.After(1000 * time.Millisecond):
  221. if indexu > 0 {
  222. updateEsSp <- true
  223. go func(arru [][]map[string]interface{}) {
  224. defer func() {
  225. <-updateEsSp
  226. }()
  227. Es.UpdateBulk(config.Conf.DB.Es.IndexS, config.Conf.DB.Es.TypeS, arru...)
  228. }(arru[:indexu])
  229. arru = make([][]map[string]interface{}, 200)
  230. indexu = 0
  231. }
  232. }
  233. }
  234. }