package main import ( "encoding/json" "fmt" "github.com/zeromicro/go-zero/core/discov" "github.com/zeromicro/go-zero/zrpc" "jy_publishing/Logger" ms "jy_publishing/megaloscope" nsq "jy_publishing/nsq" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "log" "net" "strings" ) var ( Sysconfig map[string]interface{} MgoBid, MgoExt *mongodb.MongodbSim BidColl string ExtColl, ExtColl1 string FileTopicResult string Es *elastic.Elastic Index, IndexAll string Itype string JyRpcClient zrpc.Client ClientAddr string MCJy, MCAtts *nsq.Consumer MProducer *nsq.Producer Ms *ms.Megaloscope //敏感词 UdpClient udp.UdpClient //udp对象 InfoCodes []infoCode ) type infoCode struct { Code string `json:"code"` Name string `json:"name"` } func init() { Logger.InitLogger("./log/All.log", "debug") util.ReadConfig(&Sysconfig) //信息类型 if Sysconfig["infoCode"] != nil { b, err := json.Marshal(Sysconfig["infoCode"]) if err == nil { err = json.Unmarshal(b, &InfoCodes) } if err != nil { log.Println("infoCode init err :", err) } } bidding := Sysconfig["bidding"].(map[string]interface{}) BidColl = bidding["dbColl"].(string) MgoBid = &mongodb.MongodbSim{ MongodbAddr: bidding["addr"].(string), Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5), DbName: bidding["dbName"].(string), UserName: bidding["uname"].(string), Password: bidding["upwd"].(string), } MgoBid.InitPool() extract := Sysconfig["extract"].(map[string]interface{}) ExtColl = extract["dbColl"].(string) ExtColl1 = extract["dbColl1"].(string) MgoExt = &mongodb.MongodbSim{ MongodbAddr: extract["addr"].(string), Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5), DbName: extract["dbName"].(string), } MgoExt.InitPool() es := Sysconfig["es"].(map[string]interface{}) Es = &elastic.Elastic{ S_esurl: util.ObjToString(es["addr"]), I_size: util.IntAllDef(es["pool"], 10), Username: util.ObjToString(es["user"]), Password: util.ObjToString(es["password"]), } Index = util.ObjToString(es["index"]) IndexAll = util.ObjToString(es["index_all"]) Itype = util.ObjToString(es["itype"]) Es.InitElasticSize() //加载敏感词文件 Ms = ms.NewMegaloscope("./rules.txt") InitOss() initEtcd() initUdp() } func initEtcd() { jyRpc := Sysconfig["jy_rpc"].(map[string]interface{}) Logger.Debug("etcd 注册rpc服务, " + util.ObjToString(jyRpc["key"])) JyRpcClient = zrpc.MustNewClient(zrpc.RpcClientConf{ Etcd: discov.EtcdConf{ Hosts: strings.Split(util.ObjToString(jyRpc["addr"]), ","), Key: util.ObjToString(jyRpc["key"]), }, }) } func initUdp() { updport := Sysconfig["udpPort"].(string) UdpClient = udp.UdpClient{Local: updport, BufSize: 1024} Logger.Info("Udp 监听 port: " + updport) UdpClient.Listen(processUdpMsg) } func main() { go jyNsqMethod() go attsNsqMethod() c := make(chan bool, 1) <-c } // @Description 剑鱼消息队列 按照类型处理消息 // @Author J 2022/4/14 11:42 AM func jyNsqMethod() { cof := Sysconfig["nsq_jy"].(map[string]interface{}) var err error MCJy, err = nsq.NewConsumer(&nsq.Cconfig{ IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断 Addr: util.ObjToString(cof["addr"]), ConnectType: 1, //默认连接nsqd Topic: util.ObjToString(cof["topic"]), Channel: util.ObjToString(cof["channel"]), Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数 }) if err != nil { Logger.Error(err.Error()) } for { select { case obj := <-MCJy.Ch: //从通道读取即可 Logger.Info("jy nsq: " + fmt.Sprint(obj)) taskInfo(obj) } } } // @Description 附件处理完成的数据 // @Author J 2022/4/14 1:18 PM func attsNsqMethod() { var err error cofAtts := Sysconfig["nsq_attachment"].(map[string]interface{}) MProducer, err = nsq.NewProducer(util.ObjToString(cofAtts["addr_p"]), util.ObjToString(cofAtts["topic"]), true) if err != nil { Logger.Error(err.Error()) } FileTopicResult = util.ObjToString(cofAtts["topic-result"]) MCAtts, err = nsq.NewConsumer(&nsq.Cconfig{ IsJsonEncode: true, Addr: util.ObjToString(cofAtts["addr_c"]), ConnectType: 1, //默认连接nsqd Topic: FileTopicResult, Channel: util.ObjToString(cofAtts["channel"]), Concurrent: util.IntAllDef(cofAtts["concurrent"], 1), //并发数 }) if err != nil { Logger.Error(err.Error()) } for { select { case obj := <-MCAtts.Ch: Logger.Info("file extract receive nsq: " + fmt.Sprint(obj)) taskAtts(obj.(map[string]interface{})) } } } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case udp.OP_TYPE_DATA: //测试接收 var resp map[string]interface{} err := json.Unmarshal(data, &resp) Logger.Info("udp receive...", Logger.Field("data", resp)) if err != nil { //go Udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点 } else if resp != nil { key, _ := resp["key"].(string) if key == "" { key = "udpok" } go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra) tasktype, _ := resp["stype"].(string) switch tasktype { case "jyfb_data_over": go func() { JyRpcDataFin(resp["infoid"].(string)) }() } } case udp.OP_NOOP: //下个节点回应 Logger.Info("接收回应:", Logger.Field("data", string(data))) } }