main.go 6.6 KB


  1. package main
  import (
  	"encoding/json"
  	"fmt"
  	"io/ioutil"
  	"net"
  	"net/http"
  	"net/url"
  	"sync"
  	"time"

  	"go.uber.org/zap"
  	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"

  	"field-dispose/config"
  )
  18. var (
  19. MgoB *mongodb.MongodbSim
  20. Es *elastic.Elastic
  21. UdpClient udp.UdpClient
  22. UdpTaskMap = &sync.Map{}
  23. updatePool chan []map[string]interface{}
  24. updateSp chan bool
  25. updateEsPool chan []map[string]interface{}
  26. updateEsSp chan bool
  27. UdpChan = make(chan map[string]interface{}, 500)
  28. SingleThread = make(chan bool, 1)
  29. Skipping = false //rpc重试跳过
  30. )
  31. func init() {
  32. config.Init("./common.toml")
  33. InitLog()
  34. InitMgo()
  35. updatePool = make(chan []map[string]interface{}, 5000)
  36. updateSp = make(chan bool, 5)
  37. updateEsPool = make(chan []map[string]interface{}, 5000)
  38. updateEsSp = make(chan bool, 2)
  39. UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  40. UdpClient.Listen(processUdpMsg)
  41. log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  42. }
  // UdpNode is one unacknowledged outbound UDP message kept in UdpTaskMap.
  //
  // The field order is significant: NextNode builds values with a positional
  // composite literal.
  type UdpNode struct {
  	// data is the JSON payload that was (and may again be) sent.
  	data []byte
  	// addr is the destination the payload is sent to.
  	addr *net.UDPAddr
  	// timestamp is the Unix time in seconds when the node was created.
  	timestamp int64
  	// retry counts the resend attempts made by checkMapJob.
  	retry int
  }
  49. func main() {
  50. go SaveErrorInfo() //保存异常信息
  51. //go CheckErrorNum()
  52. //go updateEsMethod()
  53. go checkMapJob()
  54. go updateMethod()
  55. for {
  56. mapinfo, ok := <-UdpChan
  57. if !ok {
  58. continue
  59. }
  60. SingleThread <- true
  61. go func(m map[string]interface{}) {
  62. defer func() {
  63. <-SingleThread
  64. }()
  65. log.Info("start dispose ...", zap.Any("key", mapinfo["key"]))
  66. getIntention(m)
  67. }(mapinfo)
  68. }
  69. }
  70. func InitMgo() {
  71. MgoB = &mongodb.MongodbSim{
  72. MongodbAddr: config.Conf.DB.Mongo.Addr,
  73. DbName: config.Conf.DB.Mongo.Dbname,
  74. Size: config.Conf.DB.Mongo.Size,
  75. UserName: config.Conf.DB.Mongo.User,
  76. Password: config.Conf.DB.Mongo.Password,
  77. }
  78. MgoB.InitPool()
  79. Es = &elastic.Elastic{
  80. S_esurl: config.Conf.DB.Es.Addr,
  81. I_size: config.Conf.DB.Es.Size,
  82. }
  83. Es.InitElasticSize()
  84. }
  85. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  86. defer util.Catch()
  87. switch act {
  88. case udp.OP_TYPE_DATA: //上个节点的数据
  89. var mapInfo map[string]interface{}
  90. err := json.Unmarshal(data, &mapInfo)
  91. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  92. if err != nil {
  93. UdpClient.WriteUdp([]byte("error: "+err.Error()), udp.OP_NOOP, ra)
  94. } else {
  95. stype := util.ObjToString(mapInfo["stype"])
  96. switch stype {
  97. case "jqcl":
  98. gtid, _ := mapInfo["gtid"].(string)
  99. lteid, _ := mapInfo["lteid"].(string)
  100. //udp成功回写
  101. if k := util.ObjToString(mapInfo["key"]); k != "" {
  102. go UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  103. } else {
  104. k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
  105. go UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  106. }
  107. UdpChan <- mapInfo
  108. case "tout-true":
  109. Skipping = true
  110. go UdpClient.WriteUdp([]byte(fmt.Sprintf("Skipping:%s", "true")), udp.OP_NOOP, ra)
  111. case "tout-false":
  112. Skipping = false
  113. go UdpClient.WriteUdp([]byte(fmt.Sprintf("Skipping:%s", "false")), udp.OP_NOOP, ra)
  114. case "monitor":
  115. log.Info("monitor", zap.Any("mapInfo:", mapInfo))
  116. go UdpClient.WriteUdp([]byte(util.ObjToString(mapInfo["key"])), udp.OP_NOOP, ra)
  117. }
  118. }
  119. case udp.OP_NOOP: //下个节点回应
  120. ok := string(data)
  121. if ok != "" {
  122. log.Info("udp re", zap.String("data:", ok))
  123. UdpTaskMap.Delete(ok)
  124. }
  125. }
  126. }
  127. func NextNode(mapInfo map[string]interface{}) {
  128. var next = &net.UDPAddr{
  129. IP: net.ParseIP(config.Conf.Udp.Next.Addr),
  130. Port: util.IntAll(config.Conf.Udp.Next.Port),
  131. }
  132. mapInfo["stype"] = config.Conf.Udp.Next.Stype
  133. key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), config.Conf.Udp.Next.Stype)
  134. mapInfo["key"] = key
  135. log.Info("udp next node", zap.Any("mapinfo:", mapInfo))
  136. datas, _ := json.Marshal(mapInfo)
  137. node := &UdpNode{datas, next, time.Now().Unix(), 0}
  138. UdpTaskMap.Store(key, node)
  139. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  140. }
  141. func checkMapJob() {
  142. if config.Conf.Mail.Send {
  143. log.Info("checkMapJob", zap.String("to:", config.Conf.Mail.To))
  144. for {
  145. UdpTaskMap.Range(func(k, v interface{}) bool {
  146. now := time.Now().Unix()
  147. node, _ := v.(*UdpNode)
  148. if now-node.timestamp > 120 {
  149. node.retry++
  150. if node.retry > 5 {
  151. UdpTaskMap.Delete(k)
  152. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "python字段识别-send-fail", k.(string)))
  153. if err == nil {
  154. defer res.Body.Close()
  155. read, err := ioutil.ReadAll(res.Body)
  156. log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
  157. }
  158. } else {
  159. log.Info("udp重发", zap.Any("k:", k))
  160. UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
  161. }
  162. } else if now-node.timestamp > 10 {
  163. log.Info("udp任务超时中..", zap.Any("k:", k))
  164. }
  165. return true
  166. })
  167. time.Sleep(60 * time.Second)
  168. }
  169. }
  170. }
  171. func updateMethod() {
  172. log.Info("updateMethod 保存...")
  173. arru := make([][]map[string]interface{}, 500)
  174. indexu := 0
  175. for {
  176. select {
  177. case v := <-updatePool:
  178. arru[indexu] = v
  179. indexu++
  180. if indexu == 500 {
  181. updateSp <- true
  182. go func(arru [][]map[string]interface{}) {
  183. defer func() {
  184. <-updateSp
  185. }()
  186. MgoB.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
  187. }(arru)
  188. arru = make([][]map[string]interface{}, 500)
  189. indexu = 0
  190. }
  191. case <-time.After(1000 * time.Millisecond):
  192. if indexu > 0 {
  193. updateSp <- true
  194. go func(arru [][]map[string]interface{}) {
  195. defer func() {
  196. <-updateSp
  197. }()
  198. MgoB.UpdateBulk(config.Conf.DB.Mongo.Coll, arru...)
  199. }(arru[:indexu])
  200. arru = make([][]map[string]interface{}, 500)
  201. indexu = 0
  202. }
  203. }
  204. }
  205. }
  206. func updateEsMethod() {
  207. arru := make([][]map[string]interface{}, 200)
  208. indexu := 0
  209. for {
  210. select {
  211. case v := <-updateEsPool:
  212. arru[indexu] = v
  213. indexu++
  214. if indexu == 200 {
  215. updateEsSp <- true
  216. go func(arru [][]map[string]interface{}) {
  217. defer func() {
  218. <-updateEsSp
  219. }()
  220. Es.UpdateBulk(config.Conf.DB.Es.IndexS, arru...)
  221. }(arru)
  222. arru = make([][]map[string]interface{}, 200)
  223. indexu = 0
  224. }
  225. case <-time.After(1000 * time.Millisecond):
  226. if indexu > 0 {
  227. updateEsSp <- true
  228. go func(arru [][]map[string]interface{}) {
  229. defer func() {
  230. <-updateEsSp
  231. }()
  232. Es.UpdateBulk(config.Conf.DB.Es.IndexS, arru...)
  233. }(arru[:indexu])
  234. arru = make([][]map[string]interface{}, 200)
  235. indexu = 0
  236. }
  237. }
  238. }
  239. }