main.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/zeromicro/go-zero/core/discov"
  6. "github.com/zeromicro/go-zero/zrpc"
  7. "jy_publishing/Logger"
  8. ms "jy_publishing/megaloscope"
  9. nsq "jy_publishing/nsq"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  14. "log"
  15. "net"
  16. "strings"
  17. )
  18. var (
  19. Sysconfig map[string]interface{}
  20. MgoBid, MgoExt *mongodb.MongodbSim
  21. BidColl string
  22. ExtColl, ExtColl1 string
  23. FileTopicResult string
  24. Es *elastic.Elastic
  25. Index, IndexAll string
  26. Itype string
  27. JyRpcClient zrpc.Client
  28. ClientAddr string
  29. MCJy, MCAtts *nsq.Consumer
  30. MProducer *nsq.Producer
  31. Ms *ms.Megaloscope //敏感词
  32. UdpClient udp.UdpClient //udp对象
  33. InfoCodes []infoCode
  34. )
  // infoCode describes one publishing information type as listed under the
  // "infoCode" configuration entry.
  type infoCode struct {
  	// Code is the published-information code.
  	Code string `json:"code"`
  	// Name is the published-information type name.
  	Name string `json:"name"`
  	// IsPublic tells whether the information is public: 0 = public (default), 1 = not public.
  	IsPublic int `json:"isPublic"`
  	// ExtractType: -1 means procurement information.
  	ExtractType int `json:"extractType"`
  }
  41. func init() {
  42. Logger.InitLogger("./log/All.log", "debug")
  43. util.ReadConfig(&Sysconfig)
  44. //信息类型
  45. if Sysconfig["infoCode"] != nil {
  46. b, err := json.Marshal(Sysconfig["infoCode"])
  47. if err == nil {
  48. err = json.Unmarshal(b, &InfoCodes)
  49. }
  50. if err != nil {
  51. log.Println("infoCode init err :", err)
  52. }
  53. }
  54. bidding := Sysconfig["bidding"].(map[string]interface{})
  55. BidColl = bidding["dbColl"].(string)
  56. MgoBid = &mongodb.MongodbSim{
  57. MongodbAddr: bidding["addr"].(string),
  58. Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5),
  59. DbName: bidding["dbName"].(string),
  60. UserName: bidding["uname"].(string),
  61. Password: bidding["upwd"].(string),
  62. }
  63. MgoBid.InitPool()
  64. extract := Sysconfig["extract"].(map[string]interface{})
  65. ExtColl = extract["dbColl"].(string)
  66. ExtColl1 = extract["dbColl1"].(string)
  67. MgoExt = &mongodb.MongodbSim{
  68. MongodbAddr: extract["addr"].(string),
  69. Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5),
  70. DbName: extract["dbName"].(string),
  71. }
  72. MgoExt.InitPool()
  73. es := Sysconfig["es"].(map[string]interface{})
  74. Es = &elastic.Elastic{
  75. S_esurl: util.ObjToString(es["addr"]),
  76. I_size: util.IntAllDef(es["pool"], 10),
  77. Username: util.ObjToString(es["user"]),
  78. Password: util.ObjToString(es["password"]),
  79. }
  80. Index = util.ObjToString(es["index"])
  81. IndexAll = util.ObjToString(es["index_all"])
  82. Itype = util.ObjToString(es["itype"])
  83. Es.InitElasticSize()
  84. //加载敏感词文件
  85. Ms = ms.NewMegaloscope("./rules.txt")
  86. InitOss()
  87. initEtcd()
  88. initUdp()
  89. }
  90. func initEtcd() {
  91. jyRpc := Sysconfig["jy_rpc"].(map[string]interface{})
  92. Logger.Debug("etcd 注册rpc服务, " + util.ObjToString(jyRpc["key"]))
  93. JyRpcClient = zrpc.MustNewClient(zrpc.RpcClientConf{
  94. Etcd: discov.EtcdConf{
  95. Hosts: strings.Split(util.ObjToString(jyRpc["addr"]), ","),
  96. Key: util.ObjToString(jyRpc["key"]),
  97. },
  98. })
  99. }
  100. func initUdp() {
  101. updport := Sysconfig["udpPort"].(string)
  102. UdpClient = udp.UdpClient{Local: updport, BufSize: 1024}
  103. Logger.Info("Udp 监听 port: " + updport)
  104. UdpClient.Listen(processUdpMsg)
  105. }
  106. func main() {
  107. go jyNsqMethod()
  108. go attsNsqMethod()
  109. c := make(chan bool, 1)
  110. <-c
  111. }
  112. // @Description 剑鱼消息队列 按照类型处理消息
  113. // @Author J 2022/4/14 11:42 AM
  114. func jyNsqMethod() {
  115. cof := Sysconfig["nsq_jy"].(map[string]interface{})
  116. var err error
  117. MCJy, err = nsq.NewConsumer(&nsq.Cconfig{
  118. IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
  119. Addr: util.ObjToString(cof["addr"]),
  120. ConnectType: 1, //默认连接nsqd
  121. Topic: util.ObjToString(cof["topic"]),
  122. Channel: util.ObjToString(cof["channel"]),
  123. Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数
  124. })
  125. if err != nil {
  126. Logger.Error(err.Error())
  127. }
  128. for {
  129. select {
  130. case obj := <-MCJy.Ch: //从通道读取即可
  131. Logger.Info("jy nsq: " + fmt.Sprint(obj))
  132. taskInfo(obj)
  133. }
  134. }
  135. }
  136. // @Description 附件处理完成的数据
  137. // @Author J 2022/4/14 1:18 PM
  138. func attsNsqMethod() {
  139. var err error
  140. cofAtts := Sysconfig["nsq_attachment"].(map[string]interface{})
  141. MProducer, err = nsq.NewProducer(util.ObjToString(cofAtts["addr_p"]), util.ObjToString(cofAtts["topic"]), true)
  142. if err != nil {
  143. Logger.Error(err.Error())
  144. }
  145. FileTopicResult = util.ObjToString(cofAtts["topic-result"])
  146. MCAtts, err = nsq.NewConsumer(&nsq.Cconfig{
  147. IsJsonEncode: true,
  148. Addr: util.ObjToString(cofAtts["addr_c"]),
  149. ConnectType: 1, //默认连接nsqd
  150. Topic: FileTopicResult,
  151. Channel: util.ObjToString(cofAtts["channel"]),
  152. Concurrent: util.IntAllDef(cofAtts["concurrent"], 1), //并发数
  153. })
  154. if err != nil {
  155. Logger.Error(err.Error())
  156. }
  157. for {
  158. select {
  159. case obj := <-MCAtts.Ch:
  160. Logger.Info("file extract receive nsq: " + fmt.Sprint(obj))
  161. taskAtts(obj.(map[string]interface{}))
  162. }
  163. }
  164. }
  165. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  166. switch act {
  167. case udp.OP_TYPE_DATA: //测试接收
  168. var resp map[string]interface{}
  169. err := json.Unmarshal(data, &resp)
  170. Logger.Info("udp receive...", Logger.Field("data", resp))
  171. if err != nil {
  172. //go Udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点
  173. } else if resp != nil {
  174. key, _ := resp["key"].(string)
  175. if key == "" {
  176. key = "udpok"
  177. }
  178. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  179. tasktype, _ := resp["stype"].(string)
  180. switch tasktype {
  181. case "jyfb_data_over":
  182. go func() {
  183. JyRpcDataFin(resp["infoid"].(string))
  184. }()
  185. }
  186. }
  187. case udp.OP_NOOP: //下个节点回应
  188. Logger.Info("接收回应:", Logger.Field("data", string(data)))
  189. }
  190. }