123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- package main
- import (
- "encoding/json"
- "fmt"
- "go.uber.org/zap"
- . "jy_publishing/Logger"
- ms "jy_publishing/megaloscope"
- nsq "jy_publishing/nsq"
- "jy_publishing/tool"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "log"
- "net"
- )
- var (
- Itype string
- MCJy, MCAtts *nsq.Consumer
- )
- func init() {
- InitLogger("./log/All.log", "debug")
- util.ReadConfig(&tool.Sysconfig)
- //信息类型
- if tool.Sysconfig["infoCode"] != nil {
- b, err := json.Marshal(tool.Sysconfig["infoCode"])
- if err == nil {
- err = json.Unmarshal(b, &tool.InfoCodes)
- }
- if err != nil {
- log.Println("infoCode init err :", err)
- }
- }
- bidding := tool.Sysconfig["bidding"].(map[string]interface{})
- tool.BidColl = bidding["dbColl"].(string)
- tool.MgoBid = &mongodb.MongodbSim{
- MongodbAddr: bidding["addr"].(string),
- Size: util.IntAllDef(tool.Sysconfig["mgoPoolSize"], 5),
- DbName: bidding["dbName"].(string),
- UserName: bidding["uname"].(string),
- Password: bidding["upwd"].(string),
- }
- tool.MgoBid.InitPool()
- extract := tool.Sysconfig["extract"].(map[string]interface{})
- tool.ExtColl = extract["dbColl"].(string)
- tool.ExtColl1 = extract["dbColl1"].(string)
- tool.MgoExt = &mongodb.MongodbSim{
- MongodbAddr: extract["addr"].(string),
- Size: util.IntAllDef(tool.Sysconfig["mgoPoolSize"], 5),
- DbName: extract["dbName"].(string),
- }
- tool.MgoExt.InitPool()
- es := tool.Sysconfig["es"].(map[string]interface{})
- if util.ObjToString(es["addr"]) != "" {
- tool.Es = &elastic.Elastic{
- S_esurl: util.ObjToString(es["addr"]),
- I_size: util.IntAllDef(es["pool"], 10),
- Username: util.ObjToString(es["user"]),
- Password: util.ObjToString(es["password"]),
- }
- tool.Index = util.ObjToString(es["index"])
- tool.IndexAll = util.ObjToString(es["index_all"])
- Itype = util.ObjToString(es["itype"])
- tool.Es.InitElasticSize()
- log.Println("初始化 elasticsearch")
- }
- //加载敏感词文件
- tool.Ms = ms.NewMegaloscope("./rules.txt")
- tool.InitOss()
- tool.InitEtcd()
- initUdp()
- }
- func initUdp() {
- updport := tool.Sysconfig["udpPort"].(string)
- tool.UdpClient = udp.UdpClient{Local: updport, BufSize: 1024}
- Logger.Info("Udp 监听 port: " + updport)
- tool.UdpClient.Listen(processUdpMsg)
- }
- func main() {
- go jyNsqMethod()
- go attsNsqMethod()
- c := make(chan bool, 1)
- <-c
- }
- // @Description 剑鱼消息队列 按照类型处理消息
- // @Author J 2022/4/14 11:42 AM
- func jyNsqMethod() {
- cof := tool.Sysconfig["nsq_jy"].(map[string]interface{})
- var err error
- MCJy, err = nsq.NewConsumer(&nsq.Cconfig{
- IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
- Addr: util.ObjToString(cof["addr"]),
- ConnectType: 1, //默认连接nsqd
- Topic: util.ObjToString(cof["topic"]),
- Channel: util.ObjToString(cof["channel"]),
- Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数
- })
- if err != nil {
- Logger.Error(err.Error())
- }
- for {
- select {
- case obj := <-MCJy.Ch: //从通道读取即可
- Logger.Info("jy nsq: " + fmt.Sprint(obj))
- tool.TaskInfo(obj)
- }
- }
- }
- // @Description 附件处理完成的数据
- // @Author J 2022/4/14 1:18 PM
- func attsNsqMethod() {
- var err error
- cofAtts := tool.Sysconfig["nsq_attachment"].(map[string]interface{})
- tool.MProducer, err = nsq.NewProducer(util.ObjToString(cofAtts["addr_p"]), util.ObjToString(cofAtts["topic"]), true)
- if err != nil {
- Logger.Error(err.Error())
- }
- tool.FileTopicResult = util.ObjToString(cofAtts["topic-result"])
- MCAtts, err = nsq.NewConsumer(&nsq.Cconfig{
- IsJsonEncode: true,
- Addr: util.ObjToString(cofAtts["addr_c"]),
- ConnectType: 1, //默认连接nsqd
- Topic: tool.FileTopicResult,
- Channel: util.ObjToString(cofAtts["channel"]),
- Concurrent: util.IntAllDef(cofAtts["concurrent"], 1), //并发数
- })
- if err != nil {
- Logger.Error(err.Error())
- }
- for {
- select {
- case obj := <-MCAtts.Ch:
- Logger.Info("file extract receive nsq: " + fmt.Sprint(obj))
- tool.TaskAtts(obj.(map[string]interface{}))
- }
- }
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA: //测试接收
- var resp map[string]interface{}
- err := json.Unmarshal(data, &resp)
- Logger.Info("udp receive...", zap.Any("data", resp))
- if err != nil {
- //go Udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点
- } else if resp != nil {
- key, _ := resp["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go tool.UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- tasktype, _ := resp["stype"].(string)
- switch tasktype {
- case "jyfb_data_over":
- go func() {
- tool.JyRpcDataFin(resp["infoid"].(string))
- }()
- }
- }
- case udp.OP_NOOP: //下个节点回应
- Logger.Info("接收回应:", zap.Any("data", string(data)))
- }
- }
|