main.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package main
  2. import (
  3. jyes "app.yhyue.com/moapp/jybase/es"
  4. "encoding/json"
  5. "fmt"
  6. "go.uber.org/zap"
  7. . "jy_publishing/Logger"
  8. ms "jy_publishing/megaloscope"
  9. nsq "jy_publishing/nsq"
  10. "jy_publishing/tool"
  11. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  14. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  15. "log"
  16. "net"
  17. )
  18. var (
  19. Itype string
  20. MCJy, MCAtts *nsq.Consumer
  21. )
  22. func init() {
  23. InitLogger("./log/All.log", "debug")
  24. util.ReadConfig(&tool.Sysconfig)
  25. //信息类型
  26. if tool.Sysconfig["infoCode"] != nil {
  27. b, err := json.Marshal(tool.Sysconfig["infoCode"])
  28. if err == nil {
  29. err = json.Unmarshal(b, &tool.InfoCodes)
  30. }
  31. if err != nil {
  32. log.Println("infoCode init err :", err)
  33. }
  34. }
  35. bidding := tool.Sysconfig["bidding"].(map[string]interface{})
  36. tool.BidColl = bidding["dbColl"].(string)
  37. tool.MgoBid = &mongodb.MongodbSim{
  38. MongodbAddr: bidding["addr"].(string),
  39. Size: util.IntAllDef(tool.Sysconfig["mgoPoolSize"], 5),
  40. DbName: bidding["dbName"].(string),
  41. UserName: bidding["uname"].(string),
  42. Password: bidding["upwd"].(string),
  43. }
  44. tool.MgoBid.InitPool()
  45. extract := tool.Sysconfig["extract"].(map[string]interface{})
  46. tool.ExtColl = extract["dbColl"].(string)
  47. tool.ExtColl1 = extract["dbColl1"].(string)
  48. tool.MgoExt = &mongodb.MongodbSim{
  49. MongodbAddr: extract["addr"].(string),
  50. Size: util.IntAllDef(tool.Sysconfig["mgoPoolSize"], 5),
  51. DbName: extract["dbName"].(string),
  52. }
  53. tool.MgoExt.InitPool()
  54. es := tool.Sysconfig["es"].(map[string]interface{})
  55. if util.ObjToString(es["addr"]) != "" {
  56. tool.Es = &elastic.Elastic{
  57. S_esurl: util.ObjToString(es["addr"]),
  58. I_size: util.IntAllDef(es["pool"], 10),
  59. Username: util.ObjToString(es["user"]),
  60. Password: util.ObjToString(es["password"]),
  61. }
  62. tool.Index = util.ObjToString(es["index"])
  63. tool.IndexAll = util.ObjToString(es["index_all"])
  64. Itype = util.ObjToString(es["itype"])
  65. tool.Es.InitElasticSize()
  66. log.Println("初始化 elasticsearch")
  67. tool.Elastic = &jyes.EsV7{
  68. Address: util.ObjToString(es["addr"]),
  69. UserName: util.ObjToString(es["user"]),
  70. Password: util.ObjToString(es["password"]),
  71. Size: util.IntAllDef(es["pool"], 10),
  72. }
  73. tool.Elastic.Init()
  74. }
  75. //加载敏感词文件
  76. tool.Ms = ms.NewMegaloscope("./rules.txt")
  77. tool.InitOss()
  78. tool.InitEtcd()
  79. initUdp()
  80. }
  81. func initUdp() {
  82. updport := tool.Sysconfig["udpPort"].(string)
  83. tool.UdpClient = udp.UdpClient{Local: updport, BufSize: 1024}
  84. Logger.Info("Udp 监听 port: " + updport)
  85. tool.UdpClient.Listen(processUdpMsg)
  86. }
  87. func main() {
  88. go jyNsqMethod()
  89. go attsNsqMethod()
  90. c := make(chan bool, 1)
  91. <-c
  92. }
  93. // @Description 剑鱼消息队列 按照类型处理消息
  94. // @Author J 2022/4/14 11:42 AM
  95. func jyNsqMethod() {
  96. cof := tool.Sysconfig["nsq_jy"].(map[string]interface{})
  97. var err error
  98. MCJy, err = nsq.NewConsumer(&nsq.Cconfig{
  99. IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
  100. Addr: util.ObjToString(cof["addr"]),
  101. ConnectType: 1, //默认连接nsqd
  102. Topic: util.ObjToString(cof["topic"]),
  103. Channel: util.ObjToString(cof["channel"]),
  104. Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数
  105. })
  106. if err != nil {
  107. Logger.Error(err.Error())
  108. }
  109. for {
  110. select {
  111. case obj := <-MCJy.Ch: //从通道读取即可
  112. Logger.Info("jy nsq: " + fmt.Sprint(obj))
  113. tool.TaskInfo(obj)
  114. }
  115. }
  116. }
  117. // @Description 附件处理完成的数据
  118. // @Author J 2022/4/14 1:18 PM
  119. func attsNsqMethod() {
  120. var err error
  121. cofAtts := tool.Sysconfig["nsq_attachment"].(map[string]interface{})
  122. tool.MProducer, err = nsq.NewProducer(util.ObjToString(cofAtts["addr_p"]), util.ObjToString(cofAtts["topic"]), true)
  123. if err != nil {
  124. Logger.Error(err.Error())
  125. }
  126. tool.FileTopicResult = util.ObjToString(cofAtts["topic-result"])
  127. MCAtts, err = nsq.NewConsumer(&nsq.Cconfig{
  128. IsJsonEncode: true,
  129. Addr: util.ObjToString(cofAtts["addr_c"]),
  130. ConnectType: 1, //默认连接nsqd
  131. Topic: tool.FileTopicResult,
  132. Channel: util.ObjToString(cofAtts["channel"]),
  133. Concurrent: util.IntAllDef(cofAtts["concurrent"], 1), //并发数
  134. })
  135. if err != nil {
  136. Logger.Error(err.Error())
  137. }
  138. for {
  139. select {
  140. case obj := <-MCAtts.Ch:
  141. Logger.Info("file extract receive nsq: " + fmt.Sprint(obj))
  142. tool.TaskAtts(obj.(map[string]interface{}))
  143. }
  144. }
  145. }
  146. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  147. switch act {
  148. case udp.OP_TYPE_DATA: //测试接收
  149. var resp map[string]interface{}
  150. err := json.Unmarshal(data, &resp)
  151. Logger.Info("udp receive...", zap.Any("data", resp))
  152. if err != nil {
  153. //go Udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点
  154. } else if resp != nil {
  155. key, _ := resp["key"].(string)
  156. if key == "" {
  157. key = "udpok"
  158. }
  159. go tool.UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  160. tasktype, _ := resp["stype"].(string)
  161. switch tasktype {
  162. case "jyfb_data_over":
  163. go func() {
  164. tool.JyRpcDataFin(resp["infoid"].(string))
  165. }()
  166. }
  167. }
  168. case udp.OP_NOOP: //下个节点回应
  169. Logger.Info("接收回应:", zap.Any("data", string(data)))
  170. }
  171. }