1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- package main
- import (
- "fmt"
- "github.com/nats-io/nats.go"
- "go.mongodb.org/mongo-driver/bson"
- cu "jygit.jydev.jianyu360.cn/data_capture/myself_util/commonutil"
- iu "jygit.jydev.jianyu360.cn/data_capture/myself_util/initutil"
- "log"
- "net/http"
- "time"
- )
- var (
- Config map[string]interface{}
- Webport string
- Api string
- To string
- )
- func init() {
- iu.ReadConfig(&Config)
- InitFileInfo() //初始化附件解析信息
- InitOss() //oss
- InitNats() //nats
- Webport = cu.ObjToString(Config["webport"])
- Api = cu.ObjToString(Config["api"])
- To = cu.ObjToString(Config["to"])
- }
- func main() {
- go http.ListenAndServe(":"+Webport, nil)
- SubscribeNats()
- ch := make(chan bool, 1)
- <-ch
- }
- // SubscribeNats nats订阅
- func SubscribeNats() {
- //先消费,带压缩
- Jnats.SubZip(Subscribe+"."+Step, func(msg *nats.Msg) {
- NatsThreads <- true
- go func(msg *nats.Msg) {
- defer func() {
- <-NatsThreads
- }()
- data := &MsgInfo{}
- err := bson.Unmarshal(msg.Data, &data)
- if err != nil {
- log.Println("解析数据失败:", err)
- data.Err = err.Error()
- //SaveData()//保存异常数据
- } else {
- //处理数据
- data.Stime = time.Now().Unix()
- data.CurrSetp = Step
- DealFile(data.Data)
- data.Etime = time.Now().Unix()
- }
- //消息回写
- bs, _ := bson.Marshal(data)
- err = msg.Respond(bs)
- if err != nil {
- fmt.Println("回执失败:", data.Id, data.Data["_id"])
- //SaveData()//保存异常数据
- }
- }(msg)
- })
- }
- /*func SendMail(bodyTextAll string) {
- res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", Api, To, "ocr_file_over", bodyTextAll))
- if err == nil {
- defer res.Body.Close()
- read, err := ioutil.ReadAll(res.Body)
- fmt.Println("邮件发送成功:", string(read), err)
- } else {
- fmt.Println("邮件发送失败:", err)
- }
- }
- */
|