main.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/zeromicro/go-zero/core/discov"
  6. "github.com/zeromicro/go-zero/zrpc"
  7. ms "jy_publishing/megaloscope"
  8. nsq "jy_publishing/nsq"
  9. "net"
  10. "strings"
  11. "utils"
  12. "utils/elastic"
  13. . "utils/log"
  14. "utils/mongodb"
  15. )
  16. var (
  17. Sysconfig map[string]interface{}
  18. MgoBid, MgoExt *mongodb.MongodbSim
  19. BidColl string
  20. ExtColl, ExtColl1 string
  21. FileTopicResult string
  22. Es *elastic.Elastic
  23. Index, IndexAll string
  24. Itype string
  25. JyRpcClient zrpc.Client
  26. ClientAddr string
  27. MCJy, MCAtts *nsq.Consumer
  28. MProducer *nsq.Producer
  29. Ms *ms.Megaloscope //敏感词
  30. UdpClient util.UdpClient //udp对象
  31. )
  32. func init() {
  33. InitLogger("./log/All.log", "debug")
  34. util.ReadConfig(&Sysconfig)
  35. bidding := Sysconfig["bidding"].(map[string]interface{})
  36. BidColl = bidding["dbColl"].(string)
  37. MgoBid = &mongodb.MongodbSim{
  38. MongodbAddr: bidding["addr"].(string),
  39. Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5),
  40. DbName: bidding["dbName"].(string),
  41. UserName: bidding["uname"].(string),
  42. Password: bidding["upwd"].(string),
  43. }
  44. MgoBid.InitPool()
  45. extract := Sysconfig["extract"].(map[string]interface{})
  46. ExtColl = extract["dbColl"].(string)
  47. ExtColl1 = extract["dbColl1"].(string)
  48. MgoExt = &mongodb.MongodbSim{
  49. MongodbAddr: extract["addr"].(string),
  50. Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5),
  51. DbName: extract["dbName"].(string),
  52. }
  53. MgoExt.InitPool()
  54. es := Sysconfig["es"].(map[string]interface{})
  55. Es = &elastic.Elastic{
  56. S_esurl: util.ObjToString(es["addr"]),
  57. I_size: util.IntAllDef(es["pool"], 10),
  58. }
  59. Index = util.ObjToString(es["index"])
  60. IndexAll = util.ObjToString(es["bidding_all"])
  61. Itype = util.ObjToString(es["itype"])
  62. Es.InitElasticSize()
  63. //加载敏感词文件
  64. Ms = ms.NewMegaloscope("./rules.txt")
  65. initEtcd()
  66. initUdp()
  67. }
  68. func initEtcd() {
  69. jyRpc := Sysconfig["jy_rpc"].(map[string]interface{})
  70. Logger.Debug("etcd 注册rpc服务, " + util.ObjToString(jyRpc["key"]))
  71. JyRpcClient = zrpc.MustNewClient(zrpc.RpcClientConf{
  72. Etcd: discov.EtcdConf{
  73. Hosts: strings.Split(util.ObjToString(jyRpc["addr"]), ","),
  74. Key: util.ObjToString(jyRpc["key"]),
  75. },
  76. })
  77. }
  78. func initUdp() {
  79. updport := Sysconfig["udpPort"].(string)
  80. UdpClient = util.UdpClient{Local: updport, BufSize: 1024}
  81. Logger.Info("Udp 监听 port: " + updport)
  82. UdpClient.Listen(processUdpMsg)
  83. }
  84. func main() {
  85. go jyNsqMethod()
  86. go attsNsqMethod()
  87. c := make(chan bool, 1)
  88. <-c
  89. }
  90. // @Description 剑鱼消息队列 按照类型处理消息
  91. // @Author J 2022/4/14 11:42 AM
  92. func jyNsqMethod() {
  93. cof := Sysconfig["nsq_jy"].(map[string]interface{})
  94. var err error
  95. MCJy, err = nsq.NewConsumer(&nsq.Cconfig{
  96. IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
  97. Addr: util.ObjToString(cof["addr"]),
  98. ConnectType: 0, //默认连接nsqd
  99. Topic: util.ObjToString(cof["topic"]),
  100. Channel: util.ObjToString(cof["channel"]),
  101. Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数
  102. })
  103. if err != nil {
  104. Logger.Error(err.Error())
  105. }
  106. for {
  107. select {
  108. case obj := <-MCJy.Ch: //从通道读取即可
  109. Logger.Debug("jy nsq: " + fmt.Sprint(obj))
  110. taskInfo(obj)
  111. }
  112. }
  113. }
  114. // @Description 附件处理完成的数据
  115. // @Author J 2022/4/14 1:18 PM
  116. func attsNsqMethod() {
  117. var err error
  118. cofAtts := Sysconfig["nsq_attachment"].(map[string]interface{})
  119. MProducer, err = nsq.NewProducer(util.ObjToString(cofAtts["addr"]), util.ObjToString(cofAtts["topic"]), true)
  120. if err != nil {
  121. Logger.Error(err.Error())
  122. }
  123. FileTopicResult = util.ObjToString(cofAtts["topic-result"])
  124. MCAtts, err = nsq.NewConsumer(&nsq.Cconfig{
  125. IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
  126. Addr: util.ObjToString(cofAtts["addr"]),
  127. ConnectType: 0, //默认连接nsqd
  128. Topic: FileTopicResult,
  129. Channel: util.ObjToString(cofAtts["channel"]),
  130. Concurrent: util.IntAllDef(cofAtts["concurrent"], 1), //并发数
  131. })
  132. if err != nil {
  133. Logger.Error(err.Error())
  134. }
  135. for {
  136. select {
  137. case obj := <-MCAtts.Ch:
  138. Logger.Debug("file extract receive nsq: " + fmt.Sprint(obj))
  139. taskAtts(obj.(map[string]interface{}))
  140. }
  141. }
  142. }
  143. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  144. switch act {
  145. case util.OP_TYPE_DATA: //测试接收
  146. var resp map[string]interface{}
  147. err := json.Unmarshal(data, &resp)
  148. Logger.Info("udp receive...", Field("data", resp))
  149. if err != nil {
  150. //go Udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点
  151. } else if resp != nil {
  152. key, _ := resp["key"].(string)
  153. if key == "" {
  154. key = "udpok"
  155. }
  156. go UdpClient.WriteUdp([]byte(key), util.OP_NOOP, ra)
  157. tasktype, _ := resp["stype"].(string)
  158. switch tasktype {
  159. case "jyfb_data_over":
  160. go func() {
  161. JyRpcDataFin(resp["infoid"].(string))
  162. }()
  163. }
  164. }
  165. case util.OP_NOOP: //下个节点回应
  166. Logger.Info("接收回应:", Field("data", string(data)))
  167. }
  168. }