main.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/zeromicro/go-zero/core/discov"
  6. "github.com/zeromicro/go-zero/zrpc"
  7. ms "jy_publishing/megaloscope"
  8. nsq "jy_publishing/nsq"
  9. "net"
  10. "strings"
  11. "utils"
  12. "utils/elastic"
  13. "utils/log"
  14. "utils/mongodb"
  15. )
  16. var (
  17. Sysconfig map[string]interface{}
  18. MgoBid, MgoExt *mongodb.MongodbSim
  19. BidColl string
  20. ExtColl, ExtColl1 string
  21. Es *elastic.Elastic
  22. Index, IndexAll string
  23. Itype string
  24. JyRpcClient zrpc.Client
  25. ClientAddr string
  26. MCJy, MCAtts *nsq.Consumer
  27. MProducer *nsq.Producer
  28. Ms *ms.Megaloscope //敏感词
  29. UdpClient util.UdpClient //udp对象
  30. )
  31. func init() {
  32. log.InitLogger("./log/All.log", "debug")
  33. util.ReadConfig(&Sysconfig)
  34. bidding := Sysconfig["bidding"].(map[string]interface{})
  35. BidColl = bidding["dbColl"].(string)
  36. MgoBid = &mongodb.MongodbSim{
  37. MongodbAddr: bidding["addr"].(string),
  38. Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5),
  39. DbName: bidding["dbName"].(string),
  40. UserName: bidding["uname"].(string),
  41. Password: bidding["upwd"].(string),
  42. }
  43. MgoBid.InitPool()
  44. extract := Sysconfig["extract"].(map[string]interface{})
  45. ExtColl = extract["dbColl"].(string)
  46. ExtColl1 = extract["dbColl1"].(string)
  47. MgoExt = &mongodb.MongodbSim{
  48. MongodbAddr: extract["addr"].(string),
  49. Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5),
  50. DbName: extract["dbName"].(string),
  51. }
  52. MgoExt.InitPool()
  53. es := Sysconfig["es"].(map[string]interface{})
  54. Es = &elastic.Elastic{
  55. S_esurl: util.ObjToString(es["addr"]),
  56. I_size: util.IntAllDef(es["pool"], 10),
  57. }
  58. Index = util.ObjToString(es["index"])
  59. IndexAll = util.ObjToString(es["bidding_all"])
  60. Itype = util.ObjToString(es["itype"])
  61. Es.InitElasticSize()
  62. //加载敏感词文件
  63. Ms = ms.NewMegaloscope("./rules.txt")
  64. jyRpc := Sysconfig["jy_rpc"].(map[string]interface{})
  65. JyRpcClient = zrpc.MustNewClient(zrpc.RpcClientConf{
  66. Etcd: discov.EtcdConf{
  67. Hosts: strings.Split(util.ObjToString(jyRpc["addr"]), ","),
  68. Key: util.ObjToString(jyRpc["key"]),
  69. },
  70. })
  71. //initUdp()
  72. //initNsq()
  73. }
  74. func initNsq() {
  75. cof := Sysconfig["nsq_jy"].(map[string]interface{})
  76. MCJy, _ = nsq.NewConsumer(&nsq.Cconfig{
  77. IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
  78. Addr: util.ObjToString(cof["addr"]),
  79. ConnectType: 0, //默认连接nsqd
  80. Topic: util.ObjToString(cof["topic"]),
  81. Channel: util.ObjToString(cof["channel"]),
  82. Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数
  83. })
  84. cofAtts := Sysconfig["nsq_attachment"].(map[string]interface{})
  85. MProducer, _ = nsq.NewProducer(util.ObjToString(cofAtts["addr"]), util.ObjToString(cofAtts["topic"]), true)
  86. MCAtts, _ = nsq.NewConsumer(&nsq.Cconfig{
  87. IsJsonEncode: false, //与生产者配置对应,设为true会取第1个字节进行类型判断
  88. Addr: util.ObjToString(cofAtts["addr"]),
  89. ConnectType: 0, //默认连接nsqd
  90. Topic: util.ObjToString(cofAtts["topic-result"]),
  91. Channel: util.ObjToString(cofAtts["channel"]),
  92. Concurrent: util.IntAllDef(cofAtts["concurrent"], 1), //并发数
  93. })
  94. }
  95. func initUdp() {
  96. updport := Sysconfig["udpPort"].(string)
  97. UdpClient = util.UdpClient{Local: updport, BufSize: 1024}
  98. log.Info("Udp 监听 port: " + updport)
  99. UdpClient.Listen(processUdpMsg)
  100. }
  101. func main() {
  102. //go jyNsqMethod()
  103. go attsNsqMethod()
  104. attsMap := map[string]interface{}{"1": map[string]interface{}{
  105. "fid": "1e0828202d4269d006f95e10e8f7afea4794a3d066d687f160f7dcafba02f74c.docx",
  106. "filename": "附件202110121614016176.docx",
  107. "ftype": "docx",
  108. }}
  109. other := map[string]interface{}{
  110. "id": "113",
  111. "action": "1",
  112. "msgType": "1",
  113. "title": []string{"汉"},
  114. "detail": []string{},
  115. }
  116. //otherJson, _ := json.Marshal(other)
  117. //var attsArr []*pb.Request
  118. var attsArr []map[string]interface{}
  119. for _, m := range attsMap {
  120. m1 := m.(map[string]interface{})
  121. //attsArr = append(attsArr, &pb.Request{
  122. // FileUrl: util.ObjToString(m1["fid"]),
  123. // FileName: util.ObjToString(m1["filename"]),
  124. // FileType: util.ObjToString(m1["ftype"]),
  125. // ReturnType: 1,
  126. // ExtractType: 1,
  127. //})
  128. attsArr = append(attsArr, map[string]interface{}{
  129. "fileUrl": util.ObjToString(m1["fid"]),
  130. "fileName": util.ObjToString(m1["filename"]),
  131. "fileType": util.ObjToString(m1["ftype"]),
  132. "returnType": 1,
  133. "extractType": 0,
  134. })
  135. }
  136. msginfo := map[string]interface{}{
  137. "message": attsArr,
  138. "other": other,
  139. "topic": "attachment-txt",
  140. }
  141. util.Debug(msginfo)
  142. _ = MProducer.Publish(msginfo)
  143. c := make(chan bool, 1)
  144. <-c
  145. }
  146. // @Description 剑鱼消息队列 按照类型处理消息
  147. // @Author J 2022/4/14 11:42 AM
  148. func jyNsqMethod() {
  149. for {
  150. select {
  151. case obj := <-MCJy.Ch: //从通道读取即可
  152. fmt.Println("---", obj)
  153. taskInfo(obj)
  154. }
  155. }
  156. }
  157. // @Description 附件处理完成的数据
  158. // @Author J 2022/4/14 1:18 PM
  159. func attsNsqMethod() {
  160. for {
  161. select {
  162. case obj := <-MCAtts.Ch:
  163. var resp *AttsResponse
  164. _ = json.Unmarshal(obj.([]byte), &resp)
  165. fmt.Println("---", resp)
  166. taskAtts(resp)
  167. }
  168. }
  169. }
  170. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  171. switch act {
  172. case util.OP_TYPE_DATA: //测试接收
  173. var resp map[string]interface{}
  174. err := json.Unmarshal(data, &resp)
  175. log.Info("udp receive...", log.Field("data", resp))
  176. if err != nil {
  177. //go Udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点
  178. } else if resp != nil {
  179. key, _ := resp["key"].(string)
  180. if key == "" {
  181. key = "udpok"
  182. }
  183. go UdpClient.WriteUdp([]byte(key), util.OP_NOOP, ra)
  184. tasktype, _ := resp["stype"].(string)
  185. switch tasktype {
  186. case "jyfb_data_over":
  187. go func() {
  188. DelMethod(resp["infoid"].(string))
  189. }()
  190. }
  191. }
  192. case util.OP_NOOP: //下个节点回应
  193. log.Info("接收回应:", log.Field("data", string(data)))
  194. }
  195. }