package main import ( "encoding/json" "github.com/zeromicro/go-zero/core/discov" "github.com/zeromicro/go-zero/zrpc" ms "jy_publishing/megaloscope" nsq "jy_publishing/nsq" pb "jy_publishing/proto" "net" "strings" "utils" "utils/elastic" "utils/log" "utils/mongodb" ) var ( Sysconfig map[string]interface{} MgoBid, MgoExt *mongodb.MongodbSim BidColl string ExtColl, ExtColl1 string Es *elastic.Elastic Index, IndexAll string Itype string JyRpcClient zrpc.Client ClientAddr string MCJy, MCAtts *nsq.Consumer MProducer *nsq.Producer Ms *ms.Megaloscope //敏感词 UdpClient util.UdpClient //udp对象 ) func init() { log.InitLogger("./log/All.log", "debug") util.ReadConfig(&Sysconfig) bidding := Sysconfig["bidding"].(map[string]interface{}) BidColl = bidding["dbColl"].(string) MgoBid = &mongodb.MongodbSim{ MongodbAddr: bidding["addr"].(string), Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5), DbName: bidding["dbName"].(string), UserName: bidding["uname"].(string), Password: bidding["upwd"].(string), } MgoBid.InitPool() extract := Sysconfig["extract"].(map[string]interface{}) ExtColl = extract["dbColl"].(string) ExtColl1 = extract["dbColl1"].(string) MgoExt = &mongodb.MongodbSim{ MongodbAddr: extract["addr"].(string), Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5), DbName: extract["dbName"].(string), } MgoExt.InitPool() es := Sysconfig["es"].(map[string]interface{}) Es = &elastic.Elastic{ S_esurl: util.ObjToString(es["addr"]), I_size: util.IntAllDef(es["pool"], 10), } Index = util.ObjToString(es["index"]) IndexAll = util.ObjToString(es["bidding_all"]) Itype = util.ObjToString(es["itype"]) Es.InitElasticSize() //加载敏感词文件 Ms = ms.NewMegaloscope("./rules.txt") jyRpc := Sysconfig["jy_rpc"].(map[string]interface{}) JyRpcClient = zrpc.MustNewClient(zrpc.RpcClientConf{ Etcd: discov.EtcdConf{ Hosts: strings.Split(util.ObjToString(jyRpc["addr"]), ","), Key: util.ObjToString(jyRpc["key"]), }, }) initUdp() initNsq() } func initNsq() { cof := Sysconfig["nsq_jy"].(map[string]interface{}) MCJy, _ = nsq.NewConsumer(&nsq.Cconfig{ IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断 Addr: util.ObjToString(cof["addr"]), ConnectType: 0, //默认连接nsqd Topic: util.ObjToString(cof["topic"]), Channel: util.ObjToString(cof["channel"]), Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数 }) cofAtts := Sysconfig["nsq_attachment"].(map[string]interface{}) MProducer, _ = nsq.NewProducer(util.ObjToString(cofAtts["addr"]), util.ObjToString(cofAtts["topic"]), true) MCAtts, _ = nsq.NewConsumer(&nsq.Cconfig{ IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断 Addr: util.ObjToString(cofAtts["addr"]), ConnectType: 0, //默认连接nsqd Topic: util.ObjToString(cofAtts["topic-result"]), Channel: util.ObjToString(cofAtts["channel"]), Concurrent: util.IntAllDef(cofAtts["concurrent"], 1), //并发数 }) } func initUdp() { updport := Sysconfig["udpPort"].(string) UdpClient = util.UdpClient{Local: updport, BufSize: 1024} log.Info("Udp 监听 port: " + updport) UdpClient.Listen(processUdpMsg) } func main() { go jyNsqMethod() go attsNsqMethod() attsMap := map[string]interface{}{"1": map[string]interface{}{ "fid": "1e0828202d4269d006f95e10e8f7afea4794a3d066d687f160f7dcafba02f74c.docx", "filename": "附件202110121614016176.docx", "ftype": "docx", }} other := map[string]interface{}{ "id": "112", "action": "1", "msgType": "1", "title": []string{}, "detail": []string{}, } otherJson, _ := json.Marshal(other) var attsArr []*pb.Request for _, m := range attsMap { m1 := m.(map[string]interface{}) attsArr = append(attsArr, &pb.Request{ FileUrl: util.ObjToString(m1["fid"]), FileName: util.ObjToString(m1["filename"]), FileType: util.ObjToString(m1["ftype"]), ReturnType: 1, ExtractType: 0, }) } msginfo := &pb.FileRequest{ Message: attsArr, Other: string(otherJson), Topic: "data-attachment", } //msginfo := map[string]interface{}{ // "meassage": attsArr, // "other": string(otherJson), // "topic": "data-attachment", //} _ = MProducer.Publish(msginfo) c := make(chan bool, 1) <-c } // @Description 剑鱼消息队列 按照类型处理消息 // @Author J 2022/4/14 11:42 AM func jyNsqMethod() { for { select { case obj := <-MCJy.Ch: //从通道读取即可 taskInfo(obj) } } } // @Description 附件处理完成的数据 // @Author J 2022/4/14 1:18 PM func attsNsqMethod() { for { select { case obj := <-MCAtts.Ch: util.Debug(obj) //taskAtts(obj) } } } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case util.OP_TYPE_DATA: //测试接收 var rep map[string]interface{} err := json.Unmarshal(data, &rep) log.Info("udp receive...", log.Field("data", rep)) if err != nil { //go Udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点 } else { //by, _ := json.Marshal(map[string]interface{}{ // "taskid": rep["taskid"], // "stype": rep["stype"], //}) //go Udpclient.WriteUdp(by, mu.OP_NOOP, ra) //回应上一个节点 } case util.OP_NOOP: //下个节点回应 log.Info("接收回应:", log.Field("data", string(data))) } }