123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- package main
- import (
- "encoding/json"
- "fmt"
- "github.com/zeromicro/go-zero/core/discov"
- "github.com/zeromicro/go-zero/zrpc"
- "jy_publishing/Logger"
- ms "jy_publishing/megaloscope"
- nsq "jy_publishing/nsq"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "log"
- "net"
- "strings"
- )
- var (
- Sysconfig map[string]interface{}
- MgoBid, MgoExt *mongodb.MongodbSim
- BidColl string
- ExtColl, ExtColl1 string
- FileTopicResult string
- Es *elastic.Elastic
- Index, IndexAll string
- Itype string
- JyRpcClient zrpc.Client
- ClientAddr string
- MCJy, MCAtts *nsq.Consumer
- MProducer *nsq.Producer
- Ms *ms.Megaloscope //敏感词
- UdpClient udp.UdpClient //udp对象
- InfoCodes []infoCode
- )
// infoCode is one element of the "infoCode" configuration list (information
// types); init decodes that list into InfoCodes via JSON.
type infoCode struct {
	// Code is read from the JSON key "code".
	Code string `json:"code"`
	// Name is read from the JSON key "name".
	Name string `json:"name"`
}
- func init() {
- Logger.InitLogger("./log/All.log", "debug")
- util.ReadConfig(&Sysconfig)
- //信息类型
- if Sysconfig["infoCode"] != nil {
- b, err := json.Marshal(Sysconfig["infoCode"])
- if err == nil {
- err = json.Unmarshal(b, &InfoCodes)
- }
- if err != nil {
- log.Println("infoCode init err :", err)
- }
- }
- bidding := Sysconfig["bidding"].(map[string]interface{})
- BidColl = bidding["dbColl"].(string)
- MgoBid = &mongodb.MongodbSim{
- MongodbAddr: bidding["addr"].(string),
- Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5),
- DbName: bidding["dbName"].(string),
- UserName: bidding["uname"].(string),
- Password: bidding["upwd"].(string),
- }
- MgoBid.InitPool()
- extract := Sysconfig["extract"].(map[string]interface{})
- ExtColl = extract["dbColl"].(string)
- ExtColl1 = extract["dbColl1"].(string)
- MgoExt = &mongodb.MongodbSim{
- MongodbAddr: extract["addr"].(string),
- Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5),
- DbName: extract["dbName"].(string),
- }
- MgoExt.InitPool()
- es := Sysconfig["es"].(map[string]interface{})
- Es = &elastic.Elastic{
- S_esurl: util.ObjToString(es["addr"]),
- I_size: util.IntAllDef(es["pool"], 10),
- Username: util.ObjToString(es["user"]),
- Password: util.ObjToString(es["password"]),
- }
- Index = util.ObjToString(es["index"])
- IndexAll = util.ObjToString(es["index_all"])
- Itype = util.ObjToString(es["itype"])
- Es.InitElasticSize()
- //加载敏感词文件
- Ms = ms.NewMegaloscope("./rules.txt")
- InitOss()
- initEtcd()
- initUdp()
- }
- func initEtcd() {
- jyRpc := Sysconfig["jy_rpc"].(map[string]interface{})
- Logger.Debug("etcd 注册rpc服务, " + util.ObjToString(jyRpc["key"]))
- JyRpcClient = zrpc.MustNewClient(zrpc.RpcClientConf{
- Etcd: discov.EtcdConf{
- Hosts: strings.Split(util.ObjToString(jyRpc["addr"]), ","),
- Key: util.ObjToString(jyRpc["key"]),
- },
- })
- }
- func initUdp() {
- updport := Sysconfig["udpPort"].(string)
- UdpClient = udp.UdpClient{Local: updport, BufSize: 1024}
- Logger.Info("Udp 监听 port: " + updport)
- UdpClient.Listen(processUdpMsg)
- }
- func main() {
- go jyNsqMethod()
- go attsNsqMethod()
- c := make(chan bool, 1)
- <-c
- }
- // @Description 剑鱼消息队列 按照类型处理消息
- // @Author J 2022/4/14 11:42 AM
- func jyNsqMethod() {
- cof := Sysconfig["nsq_jy"].(map[string]interface{})
- var err error
- MCJy, err = nsq.NewConsumer(&nsq.Cconfig{
- IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
- Addr: util.ObjToString(cof["addr"]),
- ConnectType: 1, //默认连接nsqd
- Topic: util.ObjToString(cof["topic"]),
- Channel: util.ObjToString(cof["channel"]),
- Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数
- })
- if err != nil {
- Logger.Error(err.Error())
- }
- for {
- select {
- case obj := <-MCJy.Ch: //从通道读取即可
- Logger.Info("jy nsq: " + fmt.Sprint(obj))
- taskInfo(obj)
- }
- }
- }
- // @Description 附件处理完成的数据
- // @Author J 2022/4/14 1:18 PM
- func attsNsqMethod() {
- var err error
- cofAtts := Sysconfig["nsq_attachment"].(map[string]interface{})
- MProducer, err = nsq.NewProducer(util.ObjToString(cofAtts["addr_p"]), util.ObjToString(cofAtts["topic"]), true)
- if err != nil {
- Logger.Error(err.Error())
- }
- FileTopicResult = util.ObjToString(cofAtts["topic-result"])
- MCAtts, err = nsq.NewConsumer(&nsq.Cconfig{
- IsJsonEncode: true,
- Addr: util.ObjToString(cofAtts["addr_c"]),
- ConnectType: 1, //默认连接nsqd
- Topic: FileTopicResult,
- Channel: util.ObjToString(cofAtts["channel"]),
- Concurrent: util.IntAllDef(cofAtts["concurrent"], 1), //并发数
- })
- if err != nil {
- Logger.Error(err.Error())
- }
- for {
- select {
- case obj := <-MCAtts.Ch:
- Logger.Info("file extract receive nsq: " + fmt.Sprint(obj))
- taskAtts(obj.(map[string]interface{}))
- }
- }
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA: //测试接收
- var resp map[string]interface{}
- err := json.Unmarshal(data, &resp)
- Logger.Info("udp receive...", Logger.Field("data", resp))
- if err != nil {
- //go Udpclient.WriteUdp([]byte{}, mu.OP_NOOP, ra) //回应上一个节点
- } else if resp != nil {
- key, _ := resp["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- tasktype, _ := resp["stype"].(string)
- switch tasktype {
- case "jyfb_data_over":
- go func() {
- JyRpcDataFin(resp["infoid"].(string))
- }()
- }
- }
- case udp.OP_NOOP: //下个节点回应
- Logger.Info("接收回应:", Logger.Field("data", string(data)))
- }
- }
|