package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
	"go.mongodb.org/mongo-driver/bson"
	cu "jygit.jydev.jianyu360.cn/data_capture/myself_util/commonutil"
	iu "jygit.jydev.jianyu360.cn/data_capture/myself_util/initutil"
	su "jygit.jydev.jianyu360.cn/data_capture/myself_util/spiderutil"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// init reads config.json into Config and then sets up, in order: the MongoDB
// pool (MgoB), the mail settings (Tomail, Api), the OSS client, the gRPC
// client created by InitFileTextGrpcClient, and NATS via InitNats.
//
// The "mail" and "oss" sections must be JSON objects; the process aborts with
// an explicit message when either one is missing or has another type.
func init() {
	// Configuration.
	iu.ReadConfig("config.json", &Config)

	// MongoDB.
	MgoB = &mongodb.MongodbSim{
		MongodbAddr: cu.ObjToString(Config["mongodb"]),
		DbName:      cu.ObjToString(Config["dbname"]),
		Size:        1,
		UserName:    cu.ObjToString(Config["username"]),
		Password:    cu.ObjToString(Config["password"]),
	}
	MgoB.InitPool()

	// Mail settings. The assertion is checked so that a bad config produces a
	// readable message instead of an interface-conversion panic.
	mail, ok := Config["mail"].(map[string]interface{})
	if !ok {
		log.Fatalln("配置错误: mail 缺失或格式不正确")
	}
	Tomail = cu.ObjToString(mail["to"])
	Api = cu.ObjToString(mail["api"])

	// UDP: nothing is initialised here.

	// Initialise OSS.
	oss, ok := Config["oss"].(map[string]interface{})
	if !ok {
		log.Fatalln("配置错误: oss 缺失或格式不正确")
	}
	su.OssInit(
		cu.ObjToString(oss["ossEndpoint"]),
		cu.ObjToString(oss["ossAccessKeyId"]),
		cu.ObjToString(oss["ossAccessKeySecret"]),
		cu.ObjToString(oss["ossBucketName"]),
	)

	// Initialise gRPC.
	OcrServerAddr = cu.ObjToString(Config["ocrserveraddr"])
	InitFileTextGrpcClient()

	// NATS.
	InitNats()
}

// main registers the NATS subscription and then blocks forever so the
// process keeps running while the subscription callback handles messages.
func main() {
	SubscribeNats()
	select {}
}

// SubscribeNats subscribes to the Subscribe subject through Jnats.SubZip.
//
// For every message the callback decodes the BSON payload into a MsgInfo. On
// a decode failure it records the error text in MsgInfo.Err. Otherwise it
// runs GetDataAndDownload on the message data, fills in the start/end
// timestamps, the current step and the IsEnd flag, optionally routes the
// message to Subscribe_Save, and calls SaveDataLog when isEnd is not 1.
// In both cases the MsgInfo is BSON-encoded and sent back with msg.Respond.
func SubscribeNats() {
	// Consume first, with compression.
	Jnats.SubZip(Subscribe, func(msg *nats.Msg) {
		data := &MsgInfo{}
		if err := bson.Unmarshal(msg.Data, data); err != nil {
			log.Println("解析数据失败:", err)
			data.Err = err.Error()
			// SaveData() // persist the failed message
		} else {
			// Process the data.
			data.Stime = time.Now().Unix()
			data.CurrSetp = Subscribe
			isEnd, saveMgo := GetDataAndDownload(data.Data)
			data.Etime = time.Now().Unix()
			data.IsEnd = isEnd
			if saveMgo {
				// Data whose attachments were only downloaded but not
				// successfully recognised: skip the intermediate steps and
				// update bidding directly.
				data.NextSetp = Subscribe_Save
				data.Extend.MgoSave.SType = "U"
				data.Extend.MgoSave.Col = "bidding"
			}
			if isEnd != 1 {
				SaveDataLog(data.Data) // save the record
			}
		}

		// Write the result back to the requester. An encoding failure is
		// logged rather than silently dropped; the reply is still attempted
		// (with whatever bytes were produced) exactly as before.
		bs, err := bson.Marshal(data)
		if err != nil {
			log.Println("序列化回执失败:", data.Id, err)
		}
		if err = msg.Respond(bs); err != nil {
			fmt.Println("回执失败:", data.Id, err)
			// SaveData() // persist the failed message
		}
	})
}

// SaveDataLog prints the record's "_id" and stores the record through
// MgoB.SaveByOriID in the collection named by Config["coll"].
func SaveDataLog(data map[string]interface{}) {
	fmt.Println("保存记录:", data["_id"])
	MgoB.SaveByOriID(cu.ObjToString(Config["coll"]), data)
}