main.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/nats-io/nats.go"
  5. "go.mongodb.org/mongo-driver/bson"
  6. cu "jygit.jydev.jianyu360.cn/data_capture/myself_util/commonutil"
  7. iu "jygit.jydev.jianyu360.cn/data_capture/myself_util/initutil"
  8. su "jygit.jydev.jianyu360.cn/data_capture/myself_util/spiderutil"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. "time"
  12. )
  13. func init() {
  14. //config
  15. iu.ReadConfig("config.json", &Config)
  16. //mgo
  17. MgoB = &mongodb.MongodbSim{
  18. MongodbAddr: cu.ObjToString(Config["mongodb"]),
  19. DbName: cu.ObjToString(Config["dbname"]),
  20. Size: 1,
  21. UserName: cu.ObjToString(Config["username"]),
  22. Password: cu.ObjToString(Config["password"]),
  23. }
  24. MgoB.InitPool()
  25. //mail
  26. mail := Config["mail"].(map[string]interface{})
  27. Tomail = cu.ObjToString(mail["to"])
  28. Api = cu.ObjToString(mail["api"])
  29. //udp
  30. //初始化oss
  31. oss := Config["oss"].(map[string]interface{})
  32. su.OssInit(
  33. cu.ObjToString(oss["ossEndpoint"]),
  34. cu.ObjToString(oss["ossAccessKeyId"]),
  35. cu.ObjToString(oss["ossAccessKeySecret"]),
  36. cu.ObjToString(oss["ossBucketName"]),
  37. )
  38. //初始化grpc
  39. OcrServerAddr = cu.ObjToString(Config["ocrserveraddr"])
  40. InitFileTextGrpcClient()
  41. //nats
  42. InitNats()
  43. }
  44. func main() {
  45. SubscribeNats()
  46. ch := make(chan bool, 1)
  47. <-ch
  48. }
  49. // SubscribeNats 订阅
  50. func SubscribeNats() {
  51. //先消费,带压缩
  52. Jnats.SubZip(Subscribe, func(msg *nats.Msg) {
  53. data := &MsgInfo{}
  54. err := bson.Unmarshal(msg.Data, &data)
  55. if err != nil {
  56. log.Println("解析数据失败:", err)
  57. data.Err = err.Error()
  58. //SaveData()//保存异常数据
  59. } else {
  60. //处理数据
  61. data.Stime = time.Now().Unix()
  62. data.CurrSetp = Subscribe
  63. isEnd, saveMgo := GetDataAndDownload(data.Data)
  64. data.Etime = time.Now().Unix()
  65. data.IsEnd = isEnd
  66. if saveMgo { //只下载附件未识别成功的数据,跳过中间流程,直接更新bidding
  67. data.NextSetp = Subscribe_Save
  68. data.Extend.MgoSave.SType = "U"
  69. data.Extend.MgoSave.Col = "bidding"
  70. }
  71. if isEnd != 1 {
  72. SaveDataLog(data.Data) //保存记录
  73. }
  74. }
  75. //消息回写
  76. bs, _ := bson.Marshal(data)
  77. err = msg.Respond(bs)
  78. if err != nil {
  79. fmt.Println("回执失败:", data.Id)
  80. //SaveData()//保存异常数据
  81. }
  82. })
  83. }
  84. // SaveDataLog 保存处理成功记录
  85. func SaveDataLog(data map[string]interface{}) {
  86. fmt.Println("保存记录:", data["_id"])
  87. MgoB.SaveByOriID(cu.ObjToString(Config["coll"]), data)
  88. }