123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- package main
- import (
- "encoding/json"
- "github.com/zeromicro/go-zero/core/discov"
- "github.com/zeromicro/go-zero/zrpc"
- ms "jy_publishing/megaloscope"
- nsq "jy_publishing/nsq"
- pb "jy_publishing/proto"
- "net"
- "strings"
- "utils"
- "utils/elastic"
- "utils/log"
- "utils/mongodb"
- )
- var (
- Sysconfig map[string]interface{}
- MgoBid, MgoExt *mongodb.MongodbSim
- BidColl string
- ExtColl, ExtColl1 string
- Es *elastic.Elastic
- Index, IndexAll string
- Itype string
- JyRpcClient zrpc.Client
- ClientAddr string
- MCJy, MCAtts *nsq.Consumer
- MProducer *nsq.Producer
- Ms *ms.Megaloscope //敏感词
- UdpClient util.UdpClient //udp对象
- )
- func init() {
- log.InitLogger("./log/All.log", "debug")
- util.ReadConfig(&Sysconfig)
- bidding := Sysconfig["bidding"].(map[string]interface{})
- BidColl = bidding["dbColl"].(string)
- MgoBid = &mongodb.MongodbSim{
- MongodbAddr: bidding["addr"].(string),
- Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5),
- DbName: bidding["dbName"].(string),
- UserName: bidding["uname"].(string),
- Password: bidding["upwd"].(string),
- }
- MgoBid.InitPool()
- extract := Sysconfig["extract"].(map[string]interface{})
- ExtColl = extract["dbColl"].(string)
- ExtColl1 = extract["dbColl1"].(string)
- MgoExt = &mongodb.MongodbSim{
- MongodbAddr: extract["addr"].(string),
- Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5),
- DbName: extract["dbName"].(string),
- }
- MgoExt.InitPool()
- es := Sysconfig["es"].(map[string]interface{})
- Es = &elastic.Elastic{
- S_esurl: util.ObjToString(es["addr"]),
- I_size: util.IntAllDef(es["pool"], 10),
- }
- Index = util.ObjToString(es["index"])
- IndexAll = util.ObjToString(es["bidding_all"])
- Itype = util.ObjToString(es["itype"])
- Es.InitElasticSize()
- //加载敏感词文件
- Ms = ms.NewMegaloscope("./rules.txt")
- jyRpc := Sysconfig["jy_rpc"].(map[string]interface{})
- JyRpcClient = zrpc.MustNewClient(zrpc.RpcClientConf{
- Etcd: discov.EtcdConf{
- Hosts: strings.Split(util.ObjToString(jyRpc["addr"]), ","),
- Key: util.ObjToString(jyRpc["key"]),
- },
- })
- initUdp()
- initNsq()
- }
- func initNsq() {
- cof := Sysconfig["nsq_jy"].(map[string]interface{})
- MCJy, _ = nsq.NewConsumer(&nsq.Cconfig{
- IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
- Addr: util.ObjToString(cof["addr"]),
- ConnectType: 0, //默认连接nsqd
- Topic: util.ObjToString(cof["topic"]),
- Channel: util.ObjToString(cof["channel"]),
- Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数
- })
- cofAtts := Sysconfig["nsq_attachment"].(map[string]interface{})
- MProducer, _ = nsq.NewProducer(util.ObjToString(cofAtts["addr"]), util.ObjToString(cofAtts["topic"]), true)
- MCAtts, _ = nsq.NewConsumer(&nsq.Cconfig{
- IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
- Addr: util.ObjToString(cofAtts["addr"]),
- ConnectType: 0, //默认连接nsqd
- Topic: util.ObjToString(cofAtts["topic-result"]),
- Channel: util.ObjToString(cofAtts["channel"]),
- Concurrent: util.IntAllDef(cofAtts["concurrent"], 1), //并发数
- })
- }
- func initUdp() {
- updport := Sysconfig["udpPort"].(string)
- UdpClient = util.UdpClient{Local: updport, BufSize: 1024}
- log.Info("Udp 监听 port: " + updport)
- UdpClient.Listen(processUdpMsg)
- }
- func main() {
- go jyNsqMethod()
- go attsNsqMethod()
- attsMap := map[string]interface{}{"1": map[string]interface{}{
- "fid": "1e0828202d4269d006f95e10e8f7afea4794a3d066d687f160f7dcafba02f74c.docx",
- "filename": "附件202110121614016176.docx",
- "ftype": "docx",
- }}
- other := map[string]interface{}{
- "id": "112",
- "action": "1",
- "msgType": "1",
- "title": []string{},
- "detail": []string{},
- }
- otherJson, _ := json.Marshal(other)
- var attsArr []*pb.Request
- for _, m := range attsMap {
- m1 := m.(map[string]interface{})
- attsArr = append(attsArr, &pb.Request{
- FileUrl: util.ObjToString(m1["fid"]),
- FileName: util.ObjToString(m1["filename"]),
- FileType: util.ObjToString(m1["ftype"]),
- ReturnType: 1,
- ExtractType: 0,
- })
- }
- msginfo := &pb.FileRequest{
- Message: attsArr,
- Other: string(otherJson),
- Topic: "data-attachment",
- }
- //msginfo := map[string]interface{}{
- // "meassage": attsArr,
- // "other": string(otherJson),
- // "topic": "data-attachment",
- //}
- _ = MProducer.Publish(msginfo)
- c := make(chan bool, 1)
- <-c
- }
- // @Description 剑鱼消息队列 按照类型处理消息
- // @Author J 2022/4/14 11:42 AM
- func jyNsqMethod() {
- for {
- select {
- case obj := <-MCJy.Ch: //从通道读取即可
- taskInfo(obj)
- }
- }
- }
- // @Description 附件处理完成的数据
- // @Author J 2022/4/14 1:18 PM
- func attsNsqMethod() {
- for {
- select {
- case obj := <-MCAtts.Ch:
- util.Debug(obj)
- //taskAtts(obj)
- }
- }
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case util.OP_TYPE_DATA: //测试接收
- var rep map[string]interface{}
- err := json.Unmarshal(data, &rep)
- log.Info("udp receive...", log.Field("data", rep))
- if err != nil {
- //go Udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点
- } else {
- //by, _ := json.Marshal(map[string]interface{}{
- // "taskid": rep["taskid"],
- // "stype": rep["stype"],
- //})
- //go Udpclient.WriteUdp(by, mu.OP_NOOP, ra) //回应上一个节点
- }
- case util.OP_NOOP: //下个节点回应
- log.Info("接收回应:", log.Field("data", string(data)))
- }
- }
|