main.go 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/nats-io/nats.go"
  5. "go.mongodb.org/mongo-driver/bson"
  6. cu "jygit.jydev.jianyu360.cn/data_capture/myself_util/commonutil"
  7. iu "jygit.jydev.jianyu360.cn/data_capture/myself_util/initutil"
  8. su "jygit.jydev.jianyu360.cn/data_capture/myself_util/spiderutil"
  9. "log"
  10. "net/http"
  11. "time"
  12. )
  13. var (
  14. Config map[string]interface{}
  15. Webport string
  16. Api string
  17. To string
  18. )
  19. func init() {
  20. iu.ReadConfig(&Config)
  21. InitFileInfo() //初始化附件解析信息
  22. InitOss() //oss
  23. InitNats() //nats
  24. Webport = cu.ObjToString(Config["webport"])
  25. Api = cu.ObjToString(Config["api"])
  26. To = cu.ObjToString(Config["to"])
  27. }
  28. func main() {
  29. go http.ListenAndServe(":"+Webport, nil)
  30. SubscribeNats()
  31. ch := make(chan bool, 1)
  32. <-ch
  33. }
  34. // SubscribeNats nats订阅
  35. func SubscribeNats() {
  36. //先消费,带压缩
  37. Jnats.SubZip(Subscribe, func(msg *nats.Msg) {
  38. data := &MsgInfo{}
  39. err := bson.Unmarshal(msg.Data, &data)
  40. if err != nil {
  41. log.Println("解析数据失败:", err)
  42. data.Err = err.Error()
  43. //SaveData()//保存异常数据
  44. } else {
  45. //处理数据
  46. data.Stime = time.Now().Unix()
  47. data.CurrSetp = Subscribe
  48. DealFile(data.Data)
  49. data.Etime = time.Now().Unix()
  50. }
  51. //消息回写
  52. bs, _ := bson.Marshal(data)
  53. err = msg.Respond(bs)
  54. if err != nil {
  55. fmt.Println("回执失败:", data.Id)
  56. //SaveData()//保存异常数据
  57. }
  58. })
  59. }
  60. func DealFile(tmp map[string]interface{}) {
  61. site := cu.ObjToString(tmp["site"]) //解析附件站点
  62. if limitRatio := OssSite[site]; limitRatio > 0 { //配置站点解析附件,根据准确率情况替换正文
  63. replace, filetext := AnalysisFile(true, limitRatio, tmp)
  64. if replace { //替换正文
  65. tmp["detail"] = filetext
  66. }
  67. } else { //其它网站附件信息,detail无效,只有一个附件且不是ocr识别的,替换正文
  68. //判断detail是否有效
  69. detail := cu.ObjToString(tmp["detail"])
  70. detail = su.FilterDetail(detail) //只保留文本内容
  71. if len([]rune(detail)) <= 5 || (len([]rune(detail)) <= 50 && SpecialTextReg.MatchString(detail)) {
  72. replace, filetext := AnalysisFile(false, 0, tmp)
  73. if replace { //替换正文
  74. tmp["detail"] = filetext
  75. }
  76. }
  77. }
  78. }
  79. /*func SendMail(bodyTextAll string) {
  80. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", Api, To, "ocr_file_over", bodyTextAll))
  81. if err == nil {
  82. defer res.Body.Close()
  83. read, err := ioutil.ReadAll(res.Body)
  84. fmt.Println("邮件发送成功:", string(read), err)
  85. } else {
  86. fmt.Println("邮件发送失败:", err)
  87. }
  88. }
  89. */