util.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. // util
  2. package util
  3. import (
  4. "bytes"
  5. "compress/gzip"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io/ioutil"
  10. "log"
  11. "net/http"
  12. "time"
  13. "github.com/nats-io/nats.go"
  14. "go.mongodb.org/mongo-driver/bson"
  15. "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
  16. )
  17. type MsgInfo struct {
  18. Id string //消息唯一id
  19. CurrSetp string //当前步骤
  20. NextSetp string
  21. Data map[string]interface{} //数据内容
  22. Extend struct {
  23. File struct {
  24. IsFile int //是否有附件 1是 -1否
  25. }
  26. Extract struct { //抽取
  27. }
  28. MgoSave struct { //mgo保存更新
  29. SType string //更新u 保存s
  30. Col string //表
  31. }
  32. EsSave struct { //es保存更新
  33. SType string //更新u 保存s
  34. Index string //索引
  35. }
  36. }
  37. Err string //错误信息
  38. Stime int64
  39. Etime int64
  40. }
  41. func SendRequest(jn *jnats.Jnats, subject, step string, requestData *MsgInfo, timeout time.Duration) (*MsgInfo, error) {
  42. requestData.CurrSetp = step
  43. stime := time.Now().Unix()
  44. var rep *nats.Msg
  45. var errs error
  46. // 发送请求并等待响应
  47. bs, err := bson.Marshal(requestData)
  48. if err != nil {
  49. return nil, err
  50. }
  51. if step == "extract" {
  52. // 压缩
  53. var buf bytes.Buffer
  54. gz := gzip.NewWriter(&buf)
  55. _, errs := gz.Write(bs)
  56. if errs != nil {
  57. fmt.Println("压缩失败:", errs)
  58. return nil, errs
  59. }
  60. errs = gz.Close()
  61. if errs != nil {
  62. fmt.Println("关闭压缩器失败:", errs)
  63. return nil, errs
  64. }
  65. // 获取压缩后的字节数据
  66. compressedData := buf.Bytes()
  67. rep, errs = jn.Nc.Request(subject+"."+step, compressedData, timeout)
  68. if err != nil {
  69. return nil, errs
  70. }
  71. } else {
  72. rep, errs = jn.PubReqZip(subject+"."+step, bs, timeout)
  73. if err != nil {
  74. return nil, errs
  75. }
  76. }
  77. // 返回响应数据
  78. msgInfo := &MsgInfo{}
  79. msgInfo.Etime = time.Now().Unix()
  80. msgInfo.Stime = stime
  81. if rep != nil {
  82. err = bson.Unmarshal(rep.Data, msgInfo)
  83. if err != nil {
  84. return nil, err
  85. }
  86. } else {
  87. return nil, errors.New("返回消息为空")
  88. }
  89. return msgInfo, nil
  90. }
  91. // 发送告警
  92. func Send(msg string) {
  93. m := map[string]interface{}{
  94. "msgtype": "text",
  95. "text": map[string]string{
  96. "content": msg,
  97. },
  98. }
  99. b, _ := json.Marshal(m)
  100. res, err := http.Post("https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=337d29b8-a5fc-401c-8fc7-ecc42e864360", "application/json", bytes.NewReader(b))
  101. if err != nil {
  102. log.Println("发送出错", err)
  103. } else {
  104. defer res.Body.Close()
  105. resByte, _ := ioutil.ReadAll(res.Body)
  106. log.Println("发送结果", string(resByte))
  107. }
  108. }