|
@@ -3,112 +3,79 @@ package main
|
|
|
import (
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
- "github.com/zeromicro/go-zero/core/discov"
|
|
|
- "github.com/zeromicro/go-zero/zrpc"
|
|
|
"jy_publishing/Logger"
|
|
|
ms "jy_publishing/megaloscope"
|
|
|
nsq "jy_publishing/nsq"
|
|
|
+ "jy_publishing/tool"
|
|
|
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
|
|
|
"log"
|
|
|
"net"
|
|
|
- "strings"
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
- Sysconfig map[string]interface{}
|
|
|
- MgoBid, MgoExt *mongodb.MongodbSim
|
|
|
- BidColl string
|
|
|
- ExtColl, ExtColl1 string
|
|
|
- FileTopicResult string
|
|
|
- Es *elastic.Elastic
|
|
|
- Index, IndexAll string
|
|
|
- Itype string
|
|
|
- JyRpcClient zrpc.Client
|
|
|
- ClientAddr string
|
|
|
- MCJy, MCAtts *nsq.Consumer
|
|
|
- MProducer *nsq.Producer
|
|
|
- Ms *ms.Megaloscope //敏感词
|
|
|
- UdpClient udp.UdpClient //udp对象
|
|
|
- InfoCodes []infoCode
|
|
|
+ Itype string
|
|
|
+ MCJy, MCAtts *nsq.Consumer
|
|
|
)
|
|
|
|
|
|
-type infoCode struct {
|
|
|
- Code string `json:"code"` //发布信息代码
|
|
|
- Name string `json:"name"` //发布信息类型名称
|
|
|
- IsPublic int `json:"isPublic"` //是否公开;默认公开:0;不公开:1
|
|
|
- ExtractType int `json:"extractType"` //-1:采购信息
|
|
|
-}
|
|
|
-
|
|
|
func init() {
|
|
|
Logger.InitLogger("./log/All.log", "debug")
|
|
|
- util.ReadConfig(&Sysconfig)
|
|
|
+ util.ReadConfig(&tool.Sysconfig)
|
|
|
//信息类型
|
|
|
- if Sysconfig["infoCode"] != nil {
|
|
|
- b, err := json.Marshal(Sysconfig["infoCode"])
|
|
|
+ if tool.Sysconfig["infoCode"] != nil {
|
|
|
+ b, err := json.Marshal(tool.Sysconfig["infoCode"])
|
|
|
if err == nil {
|
|
|
- err = json.Unmarshal(b, &InfoCodes)
|
|
|
+ err = json.Unmarshal(b, &tool.InfoCodes)
|
|
|
}
|
|
|
if err != nil {
|
|
|
log.Println("infoCode init err :", err)
|
|
|
}
|
|
|
}
|
|
|
- bidding := Sysconfig["bidding"].(map[string]interface{})
|
|
|
- BidColl = bidding["dbColl"].(string)
|
|
|
- MgoBid = &mongodb.MongodbSim{
|
|
|
+ bidding := tool.Sysconfig["bidding"].(map[string]interface{})
|
|
|
+ tool.BidColl = bidding["dbColl"].(string)
|
|
|
+ tool.MgoBid = &mongodb.MongodbSim{
|
|
|
MongodbAddr: bidding["addr"].(string),
|
|
|
- Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5),
|
|
|
+ Size: util.IntAllDef(tool.Sysconfig["mgoPoolSize"], 5),
|
|
|
DbName: bidding["dbName"].(string),
|
|
|
UserName: bidding["uname"].(string),
|
|
|
Password: bidding["upwd"].(string),
|
|
|
}
|
|
|
- MgoBid.InitPool()
|
|
|
- extract := Sysconfig["extract"].(map[string]interface{})
|
|
|
- ExtColl = extract["dbColl"].(string)
|
|
|
- ExtColl1 = extract["dbColl1"].(string)
|
|
|
- MgoExt = &mongodb.MongodbSim{
|
|
|
+ tool.MgoBid.InitPool()
|
|
|
+ extract := tool.Sysconfig["extract"].(map[string]interface{})
|
|
|
+ tool.ExtColl = extract["dbColl"].(string)
|
|
|
+ tool.ExtColl1 = extract["dbColl1"].(string)
|
|
|
+ tool.MgoExt = &mongodb.MongodbSim{
|
|
|
MongodbAddr: extract["addr"].(string),
|
|
|
- Size: util.IntAllDef(Sysconfig["mgoPoolSize"], 5),
|
|
|
+ Size: util.IntAllDef(tool.Sysconfig["mgoPoolSize"], 5),
|
|
|
DbName: extract["dbName"].(string),
|
|
|
}
|
|
|
- MgoExt.InitPool()
|
|
|
- es := Sysconfig["es"].(map[string]interface{})
|
|
|
- Es = &elastic.Elastic{
|
|
|
+ tool.MgoExt.InitPool()
|
|
|
+ es := tool.Sysconfig["es"].(map[string]interface{})
|
|
|
+ tool.Es = &elastic.Elastic{
|
|
|
S_esurl: util.ObjToString(es["addr"]),
|
|
|
I_size: util.IntAllDef(es["pool"], 10),
|
|
|
Username: util.ObjToString(es["user"]),
|
|
|
Password: util.ObjToString(es["password"]),
|
|
|
}
|
|
|
- Index = util.ObjToString(es["index"])
|
|
|
- IndexAll = util.ObjToString(es["index_all"])
|
|
|
+ tool.Index = util.ObjToString(es["index"])
|
|
|
+ tool.IndexAll = util.ObjToString(es["index_all"])
|
|
|
Itype = util.ObjToString(es["itype"])
|
|
|
- Es.InitElasticSize()
|
|
|
+ tool.Es.InitElasticSize()
|
|
|
|
|
|
//加载敏感词文件
|
|
|
- Ms = ms.NewMegaloscope("./rules.txt")
|
|
|
- InitOss()
|
|
|
- initEtcd()
|
|
|
+ tool.Ms = ms.NewMegaloscope("./rules.txt")
|
|
|
+ tool.InitOss()
|
|
|
+ tool.InitEtcd()
|
|
|
initUdp()
|
|
|
}
|
|
|
|
|
|
-func initEtcd() {
|
|
|
- jyRpc := Sysconfig["jy_rpc"].(map[string]interface{})
|
|
|
- Logger.Debug("etcd 注册rpc服务, " + util.ObjToString(jyRpc["key"]))
|
|
|
- JyRpcClient = zrpc.MustNewClient(zrpc.RpcClientConf{
|
|
|
- Etcd: discov.EtcdConf{
|
|
|
- Hosts: strings.Split(util.ObjToString(jyRpc["addr"]), ","),
|
|
|
- Key: util.ObjToString(jyRpc["key"]),
|
|
|
- },
|
|
|
- })
|
|
|
-}
|
|
|
-
|
|
|
func initUdp() {
|
|
|
- updport := Sysconfig["udpPort"].(string)
|
|
|
- UdpClient = udp.UdpClient{Local: updport, BufSize: 1024}
|
|
|
+ updport := tool.Sysconfig["udpPort"].(string)
|
|
|
+ tool.UdpClient = udp.UdpClient{Local: updport, BufSize: 1024}
|
|
|
Logger.Info("Udp 监听 port: " + updport)
|
|
|
- UdpClient.Listen(processUdpMsg)
|
|
|
+ tool.UdpClient.Listen(processUdpMsg)
|
|
|
}
|
|
|
|
|
|
func main() {
|
|
@@ -122,7 +89,7 @@ func main() {
|
|
|
// @Description 剑鱼消息队列 按照类型处理消息
|
|
|
// @Author J 2022/4/14 11:42 AM
|
|
|
func jyNsqMethod() {
|
|
|
- cof := Sysconfig["nsq_jy"].(map[string]interface{})
|
|
|
+ cof := tool.Sysconfig["nsq_jy"].(map[string]interface{})
|
|
|
var err error
|
|
|
MCJy, err = nsq.NewConsumer(&nsq.Cconfig{
|
|
|
IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
|
|
@@ -139,7 +106,7 @@ func jyNsqMethod() {
|
|
|
select {
|
|
|
case obj := <-MCJy.Ch: //从通道读取即可
|
|
|
Logger.Info("jy nsq: " + fmt.Sprint(obj))
|
|
|
- taskInfo(obj)
|
|
|
+ tool.TaskInfo(obj)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -148,17 +115,17 @@ func jyNsqMethod() {
|
|
|
// @Author J 2022/4/14 1:18 PM
|
|
|
func attsNsqMethod() {
|
|
|
var err error
|
|
|
- cofAtts := Sysconfig["nsq_attachment"].(map[string]interface{})
|
|
|
- MProducer, err = nsq.NewProducer(util.ObjToString(cofAtts["addr_p"]), util.ObjToString(cofAtts["topic"]), true)
|
|
|
+ cofAtts := tool.Sysconfig["nsq_attachment"].(map[string]interface{})
|
|
|
+ tool.MProducer, err = nsq.NewProducer(util.ObjToString(cofAtts["addr_p"]), util.ObjToString(cofAtts["topic"]), true)
|
|
|
if err != nil {
|
|
|
Logger.Error(err.Error())
|
|
|
}
|
|
|
- FileTopicResult = util.ObjToString(cofAtts["topic-result"])
|
|
|
+ tool.FileTopicResult = util.ObjToString(cofAtts["topic-result"])
|
|
|
MCAtts, err = nsq.NewConsumer(&nsq.Cconfig{
|
|
|
IsJsonEncode: true,
|
|
|
Addr: util.ObjToString(cofAtts["addr_c"]),
|
|
|
ConnectType: 1, //默认连接nsqd
|
|
|
- Topic: FileTopicResult,
|
|
|
+ Topic: tool.FileTopicResult,
|
|
|
Channel: util.ObjToString(cofAtts["channel"]),
|
|
|
Concurrent: util.IntAllDef(cofAtts["concurrent"], 1), //并发数
|
|
|
})
|
|
@@ -170,7 +137,7 @@ func attsNsqMethod() {
|
|
|
select {
|
|
|
case obj := <-MCAtts.Ch:
|
|
|
Logger.Info("file extract receive nsq: " + fmt.Sprint(obj))
|
|
|
- taskAtts(obj.(map[string]interface{}))
|
|
|
+ tool.TaskAtts(obj.(map[string]interface{}))
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -188,12 +155,12 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
if key == "" {
|
|
|
key = "udpok"
|
|
|
}
|
|
|
- go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
|
|
|
+ go tool.UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
|
|
|
tasktype, _ := resp["stype"].(string)
|
|
|
switch tasktype {
|
|
|
case "jyfb_data_over":
|
|
|
go func() {
|
|
|
- JyRpcDataFin(resp["infoid"].(string))
|
|
|
+ tool.JyRpcDataFin(resp["infoid"].(string))
|
|
|
}()
|
|
|
}
|
|
|
}
|