package main import ( "fmt" "github.com/nats-io/nats.go" "go.mongodb.org/mongo-driver/bson" cu "jygit.jydev.jianyu360.cn/data_capture/myself_util/commonutil" iu "jygit.jydev.jianyu360.cn/data_capture/myself_util/initutil" "log" "net/http" "time" ) var ( Config map[string]interface{} Webport string Api string To string ) func init() { iu.ReadConfig(&Config) InitFileInfo() //初始化附件解析信息 InitOss() //oss InitNats() //nats Webport = cu.ObjToString(Config["webport"]) Api = cu.ObjToString(Config["api"]) To = cu.ObjToString(Config["to"]) } func main() { go http.ListenAndServe(":"+Webport, nil) SubscribeNats() ch := make(chan bool, 1) <-ch } // SubscribeNats nats订阅 func SubscribeNats() { //先消费,带压缩 Jnats.SubZip(Subscribe+"."+Step, func(msg *nats.Msg) { NatsThreads <- true go func(msg *nats.Msg) { defer func() { <-NatsThreads }() data := &MsgInfo{} err := bson.Unmarshal(msg.Data, &data) if err != nil { log.Println("解析数据失败:", err) data.Err = err.Error() //SaveData()//保存异常数据 } else { //处理数据 data.Stime = time.Now().Unix() data.CurrSetp = Step DealFile(data.Data) data.Etime = time.Now().Unix() } //消息回写 bs, _ := bson.Marshal(data) err = msg.Respond(bs) if err != nil { fmt.Println("回执失败:", data.Id, data.Data["_id"]) //SaveData()//保存异常数据 } }(msg) }) } /*func SendMail(bodyTextAll string) { res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", Api, To, "ocr_file_over", bodyTextAll)) if err == nil { defer res.Body.Close() read, err := ioutil.ReadAll(res.Body) fmt.Println("邮件发送成功:", string(read), err) } else { fmt.Println("邮件发送失败:", err) } } */