123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- package main
- import (
- "fmt"
- "github.com/nats-io/nats.go"
- "go.mongodb.org/mongo-driver/bson"
- cu "jygit.jydev.jianyu360.cn/data_capture/myself_util/commonutil"
- iu "jygit.jydev.jianyu360.cn/data_capture/myself_util/initutil"
- su "jygit.jydev.jianyu360.cn/data_capture/myself_util/spiderutil"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "time"
- )
- func init() {
- //config
- iu.ReadConfig("config.json", &Config)
- //mgo
- MgoB = &mongodb.MongodbSim{
- MongodbAddr: cu.ObjToString(Config["mongodb"]),
- DbName: cu.ObjToString(Config["dbname"]),
- Size: 1,
- UserName: cu.ObjToString(Config["username"]),
- Password: cu.ObjToString(Config["password"]),
- }
- MgoB.InitPool()
- //mail
- mail := Config["mail"].(map[string]interface{})
- Tomail = cu.ObjToString(mail["to"])
- Api = cu.ObjToString(mail["api"])
- //udp
- //初始化oss
- oss := Config["oss"].(map[string]interface{})
- su.OssInit(
- cu.ObjToString(oss["ossEndpoint"]),
- cu.ObjToString(oss["ossAccessKeyId"]),
- cu.ObjToString(oss["ossAccessKeySecret"]),
- cu.ObjToString(oss["ossBucketName"]),
- )
- //初始化grpc
- OcrServerAddr = cu.ObjToString(Config["ocrserveraddr"])
- InitFileTextGrpcClient()
- //nats
- InitNats()
- }
- func main() {
- SubscribeNats()
- ch := make(chan bool, 1)
- <-ch
- }
- // SubscribeNats 订阅
- func SubscribeNats() {
- //先消费,带压缩
- Jnats.SubZip(Subscribe, func(msg *nats.Msg) {
- data := &MsgInfo{}
- err := bson.Unmarshal(msg.Data, &data)
- if err != nil {
- log.Println("解析数据失败:", err)
- data.Err = err.Error()
- //SaveData()//保存异常数据
- } else {
- //处理数据
- data.Stime = time.Now().Unix()
- data.CurrSetp = Subscribe
- isEnd, saveMgo := GetDataAndDownload(data.Data)
- data.Etime = time.Now().Unix()
- data.IsEnd = isEnd
- if saveMgo { //只下载附件未识别成功的数据,跳过中间流程,直接更新bidding
- data.NextSetp = Subscribe_Save
- data.Extend.MgoSave.SType = "U"
- data.Extend.MgoSave.Col = "bidding"
- }
- if isEnd != 1 {
- SaveDataLog(data.Data) //保存记录
- }
- }
- //消息回写
- bs, _ := bson.Marshal(data)
- err = msg.Respond(bs)
- if err != nil {
- fmt.Println("回执失败:", data.Id)
- //SaveData()//保存异常数据
- }
- })
- }
- // SaveDataLog 保存处理成功记录
- func SaveDataLog(data map[string]interface{}) {
- fmt.Println("保存记录:", data["_id"])
- MgoB.SaveByOriID(cu.ObjToString(Config["coll"]), data)
- }
|