main.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/zeromicro/go-zero/core/discov"
  6. "github.com/zeromicro/go-zero/zrpc"
  7. "jy_publishing/Logger"
  8. ms "jy_publishing/megaloscope"
  9. nsq "jy_publishing/nsq"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  14. "net"
  15. "strings"
  16. )
  17. var (
  18. Sysconfig map[string]interface{}
  19. MgoBid, MgoExt *mongodb.MongodbSim
  20. BidColl string
  21. ExtColl, ExtColl1 string
  22. FileTopicResult string
  23. Es *elastic.Elastic
  24. Index, IndexAll string
  25. Itype string
  26. JyRpcClient zrpc.Client
  27. ClientAddr string
  28. MCJy, MCAtts *nsq.Consumer
  29. MProducer *nsq.Producer
  30. Ms *ms.Megaloscope //敏感词
  31. UdpClient udp.UdpClient //udp对象
  32. )
  33. func init() {
  34. Logger.InitLogger("./log/All.log", "debug")
  35. util.ReadConfig(&Sysconfig)
  36. bidding := Sysconfig["bidding"].(map[string]interface{})
  37. BidColl = bidding["dbColl"].(string)
  38. MgoBid = &mongodb.MongodbSim{
  39. MongodbAddr: bidding["addr"].(string),
  40. Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5),
  41. DbName: bidding["dbName"].(string),
  42. UserName: bidding["uname"].(string),
  43. Password: bidding["upwd"].(string),
  44. }
  45. MgoBid.InitPool()
  46. extract := Sysconfig["extract"].(map[string]interface{})
  47. ExtColl = extract["dbColl"].(string)
  48. ExtColl1 = extract["dbColl1"].(string)
  49. MgoExt = &mongodb.MongodbSim{
  50. MongodbAddr: extract["addr"].(string),
  51. Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5),
  52. DbName: extract["dbName"].(string),
  53. }
  54. MgoExt.InitPool()
  55. es := Sysconfig["es"].(map[string]interface{})
  56. Es = &elastic.Elastic{
  57. S_esurl: util.ObjToString(es["addr"]),
  58. I_size: util.IntAllDef(es["pool"], 10),
  59. }
  60. Index = util.ObjToString(es["index"])
  61. IndexAll = util.ObjToString(es["index_all"])
  62. Itype = util.ObjToString(es["itype"])
  63. Es.InitElasticSize()
  64. //加载敏感词文件
  65. Ms = ms.NewMegaloscope("./rules.txt")
  66. InitOss()
  67. initEtcd()
  68. initUdp()
  69. }
  70. func initEtcd() {
  71. jyRpc := Sysconfig["jy_rpc"].(map[string]interface{})
  72. Logger.Debug("etcd 注册rpc服务, " + util.ObjToString(jyRpc["key"]))
  73. JyRpcClient = zrpc.MustNewClient(zrpc.RpcClientConf{
  74. Etcd: discov.EtcdConf{
  75. Hosts: strings.Split(util.ObjToString(jyRpc["addr"]), ","),
  76. Key: util.ObjToString(jyRpc["key"]),
  77. },
  78. })
  79. }
  80. func initUdp() {
  81. updport := Sysconfig["udpPort"].(string)
  82. UdpClient = udp.UdpClient{Local: updport, BufSize: 1024}
  83. Logger.Info("Udp 监听 port: " + updport)
  84. UdpClient.Listen(processUdpMsg)
  85. }
  86. func main() {
  87. go jyNsqMethod()
  88. go attsNsqMethod()
  89. c := make(chan bool, 1)
  90. <-c
  91. }
  92. // @Description 剑鱼消息队列 按照类型处理消息
  93. // @Author J 2022/4/14 11:42 AM
  94. func jyNsqMethod() {
  95. cof := Sysconfig["nsq_jy"].(map[string]interface{})
  96. var err error
  97. MCJy, err = nsq.NewConsumer(&nsq.Cconfig{
  98. IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
  99. Addr: util.ObjToString(cof["addr"]),
  100. ConnectType: 1, //默认连接nsqd
  101. Topic: util.ObjToString(cof["topic"]),
  102. Channel: util.ObjToString(cof["channel"]),
  103. Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数
  104. })
  105. if err != nil {
  106. Logger.Error(err.Error())
  107. }
  108. for {
  109. select {
  110. case obj := <-MCJy.Ch: //从通道读取即可
  111. Logger.Info("jy nsq: " + fmt.Sprint(obj))
  112. taskInfo(obj)
  113. }
  114. }
  115. }
  116. // @Description 附件处理完成的数据
  117. // @Author J 2022/4/14 1:18 PM
  118. func attsNsqMethod() {
  119. var err error
  120. cofAtts := Sysconfig["nsq_attachment"].(map[string]interface{})
  121. MProducer, err = nsq.NewProducer(util.ObjToString(cofAtts["addr_p"]), util.ObjToString(cofAtts["topic"]), true)
  122. if err != nil {
  123. Logger.Error(err.Error())
  124. }
  125. FileTopicResult = util.ObjToString(cofAtts["topic-result"])
  126. MCAtts, err = nsq.NewConsumer(&nsq.Cconfig{
  127. IsJsonEncode: true,
  128. Addr: util.ObjToString(cofAtts["addr_c"]),
  129. ConnectType: 1, //默认连接nsqd
  130. Topic: FileTopicResult,
  131. Channel: util.ObjToString(cofAtts["channel"]),
  132. Concurrent: util.IntAllDef(cofAtts["concurrent"], 1), //并发数
  133. })
  134. if err != nil {
  135. Logger.Error(err.Error())
  136. }
  137. for {
  138. select {
  139. case obj := <-MCAtts.Ch:
  140. Logger.Info("file extract receive nsq: " + fmt.Sprint(obj))
  141. taskAtts(obj.(map[string]interface{}))
  142. }
  143. }
  144. }
  145. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  146. switch act {
  147. case udp.OP_TYPE_DATA: //测试接收
  148. var resp map[string]interface{}
  149. err := json.Unmarshal(data, &resp)
  150. Logger.Info("udp receive...", Logger.Field("data", resp))
  151. if err != nil {
  152. //go Udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点
  153. } else if resp != nil {
  154. key, _ := resp["key"].(string)
  155. if key == "" {
  156. key = "udpok"
  157. }
  158. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  159. tasktype, _ := resp["stype"].(string)
  160. switch tasktype {
  161. case "jyfb_data_over":
  162. go func() {
  163. JyRpcDataFin(resp["infoid"].(string))
  164. }()
  165. }
  166. }
  167. case udp.OP_NOOP: //下个节点回应
  168. Logger.Info("接收回应:", Logger.Field("data", string(data)))
  169. }
  170. }