package main import ( jyes "app.yhyue.com/moapp/jybase/es" "encoding/json" "fmt" "go.uber.org/zap" . "jy_publishing/Logger" ms "jy_publishing/megaloscope" nsq "jy_publishing/nsq" "jy_publishing/tool" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "log" "net" ) var ( Itype string MCJy, MCAtts *nsq.Consumer ) func init() { InitLogger("./log/All.log", "debug") util.ReadConfig(&tool.Sysconfig) //信息类型 if tool.Sysconfig["infoCode"] != nil { b, err := json.Marshal(tool.Sysconfig["infoCode"]) if err == nil { err = json.Unmarshal(b, &tool.InfoCodes) } if err != nil { log.Println("infoCode init err :", err) } } bidding := tool.Sysconfig["bidding"].(map[string]interface{}) tool.BidColl = bidding["dbColl"].(string) tool.MgoBid = &mongodb.MongodbSim{ MongodbAddr: bidding["addr"].(string), Size: util.IntAllDef(tool.Sysconfig["mgoPoolSize"], 5), DbName: bidding["dbName"].(string), UserName: bidding["uname"].(string), Password: bidding["upwd"].(string), } tool.MgoBid.InitPool() extract := tool.Sysconfig["extract"].(map[string]interface{}) tool.ExtColl = extract["dbColl"].(string) tool.ExtColl1 = extract["dbColl1"].(string) tool.MgoExt = &mongodb.MongodbSim{ MongodbAddr: extract["addr"].(string), Size: util.IntAllDef(tool.Sysconfig["mgoPoolSize"], 5), DbName: extract["dbName"].(string), } tool.MgoExt.InitPool() es := tool.Sysconfig["es"].(map[string]interface{}) if util.ObjToString(es["addr"]) != "" { tool.Es = &elastic.Elastic{ S_esurl: util.ObjToString(es["addr"]), I_size: util.IntAllDef(es["pool"], 10), Username: util.ObjToString(es["user"]), Password: util.ObjToString(es["password"]), } tool.Index = util.ObjToString(es["index"]) tool.IndexAll = util.ObjToString(es["index_all"]) Itype = util.ObjToString(es["itype"]) tool.Es.InitElasticSize() log.Println("初始化 elasticsearch") tool.Elastic = &jyes.EsV7{ Address: util.ObjToString(es["addr"]), UserName: util.ObjToString(es["user"]), Password: util.ObjToString(es["password"]), Size: util.IntAllDef(es["pool"], 10), } tool.Elastic.Init() } //加载敏感词文件 tool.Ms = ms.NewMegaloscope("./rules.txt") tool.InitOss() tool.InitEtcd() initUdp() } func initUdp() { updport := tool.Sysconfig["udpPort"].(string) tool.UdpClient = udp.UdpClient{Local: updport, BufSize: 1024} Logger.Info("Udp 监听 port: " + updport) tool.UdpClient.Listen(processUdpMsg) } func main() { go jyNsqMethod() go attsNsqMethod() c := make(chan bool, 1) <-c } // @Description 剑鱼消息队列 按照类型处理消息 // @Author J 2022/4/14 11:42 AM func jyNsqMethod() { cof := tool.Sysconfig["nsq_jy"].(map[string]interface{}) var err error MCJy, err = nsq.NewConsumer(&nsq.Cconfig{ IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断 Addr: util.ObjToString(cof["addr"]), ConnectType: 1, //默认连接nsqd Topic: util.ObjToString(cof["topic"]), Channel: util.ObjToString(cof["channel"]), Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数 }) if err != nil { Logger.Error(err.Error()) } for { select { case obj := <-MCJy.Ch: //从通道读取即可 Logger.Info("jy nsq: " + fmt.Sprint(obj)) tool.TaskInfo(obj) } } } // @Description 附件处理完成的数据 // @Author J 2022/4/14 1:18 PM func attsNsqMethod() { var err error cofAtts := tool.Sysconfig["nsq_attachment"].(map[string]interface{}) tool.MProducer, err = nsq.NewProducer(util.ObjToString(cofAtts["addr_p"]), util.ObjToString(cofAtts["topic"]), true) if err != nil { Logger.Error(err.Error()) } tool.FileTopicResult = util.ObjToString(cofAtts["topic-result"]) MCAtts, err = nsq.NewConsumer(&nsq.Cconfig{ IsJsonEncode: true, Addr: util.ObjToString(cofAtts["addr_c"]), ConnectType: 1, //默认连接nsqd Topic: tool.FileTopicResult, Channel: util.ObjToString(cofAtts["channel"]), Concurrent: util.IntAllDef(cofAtts["concurrent"], 1), //并发数 }) if err != nil { Logger.Error(err.Error()) } for { select { case obj := <-MCAtts.Ch: Logger.Info("file extract receive nsq: " + fmt.Sprint(obj)) tool.TaskAtts(obj.(map[string]interface{})) } } } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case udp.OP_TYPE_DATA: //测试接收 var resp map[string]interface{} err := json.Unmarshal(data, &resp) Logger.Info("udp receive...", zap.Any("data", resp)) if err != nil { //go Udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点 } else if resp != nil { key, _ := resp["key"].(string) if key == "" { key = "udpok" } go tool.UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra) tasktype, _ := resp["stype"].(string) switch tasktype { case "jyfb_data_over": go func() { tool.JyRpcDataFin(resp["infoid"].(string)) }() } } case udp.OP_NOOP: //下个节点回应 Logger.Info("接收回应:", zap.Any("data", string(data))) } }