123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- // util
- package util
- import (
- "bytes"
- "compress/gzip"
- "encoding/json"
- "errors"
- "fmt"
- "io/ioutil"
- "log"
- "net/http"
- "time"
- "github.com/nats-io/nats.go"
- "go.mongodb.org/mongo-driver/bson"
- "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
- )
// MsgInfo is the message envelope passed between processing steps. In this
// file SendRequest encodes it with BSON for the request and decodes the reply
// into a fresh MsgInfo.
//
// NOTE: the "Setp" spelling in CurrSetp/NextSetp is part of the exported API
// (and presumably of the encoded key names), so it must not be "corrected"
// here without updating every producer and consumer.
type MsgInfo struct {
	// Id is the unique id of the message.
	Id string
	// CurrSetp is the current step (SendRequest overwrites it with the step
	// being requested); NextSetp is presumably the step to run afterwards.
	CurrSetp, NextSetp string
	// Data is the message payload.
	Data map[string]interface{}
	// Extend carries per-step options.
	Extend struct {
		// File describes attachments.
		File struct {
			// IsFile tells whether the message has an attachment: 1 yes, -1 no.
			IsFile int
		}
		// Extract holds options for the extraction step (currently none).
		Extract struct {
		}
		// MgoSave holds options for saving/updating through mgo.
		MgoSave struct {
			// SType selects the operation: "u" update, "s" save.
			SType string
			// Col is the target table (collection).
			Col string
		}
		// EsSave holds options for saving/updating in es.
		EsSave struct {
			// SType selects the operation: "u" update, "s" save.
			SType string
			// Index is the target index.
			Index string
		}
	}
	// Err is the error message.
	Err string
	// Stime and Etime are timestamps; SendRequest fills them with Unix
	// seconds taken before the request and after the reply respectively.
	Stime, Etime int64
}
- func SendRequest(jn *jnats.Jnats, subject, step string, requestData *MsgInfo, timeout time.Duration) (*MsgInfo, error) {
- requestData.CurrSetp = step
- stime := time.Now().Unix()
- var rep *nats.Msg
- var errs error
- // 发送请求并等待响应
- bs, err := bson.Marshal(requestData)
- if err != nil {
- return nil, err
- }
- if step == "extract" {
- // 压缩
- var buf bytes.Buffer
- gz := gzip.NewWriter(&buf)
- _, errs := gz.Write(bs)
- if errs != nil {
- fmt.Println("压缩失败:", errs)
- return nil, errs
- }
- errs = gz.Close()
- if errs != nil {
- fmt.Println("关闭压缩器失败:", errs)
- return nil, errs
- }
- // 获取压缩后的字节数据
- compressedData := buf.Bytes()
- rep, errs = jn.Nc.Request(subject+"."+step, compressedData, timeout)
- if err != nil {
- return nil, errs
- }
- } else {
- rep, errs = jn.PubReqZip(subject+"."+step, bs, timeout)
- if err != nil {
- return nil, errs
- }
- }
- // 返回响应数据
- msgInfo := &MsgInfo{}
- msgInfo.Etime = time.Now().Unix()
- msgInfo.Stime = stime
- if rep != nil {
- err = bson.Unmarshal(rep.Data, msgInfo)
- if err != nil {
- return nil, err
- }
- } else {
- return nil, errors.New("返回消息为空")
- }
- return msgInfo, nil
- }
// Send posts msg as a text alert to the WeChat Work webhook and logs the
// outcome. It is best-effort: failures are logged, never returned.
//
// The request is bounded by a timeout so a stalled endpoint cannot block the
// caller indefinitely.
//
// NOTE(security): the webhook key is hard-coded in the URL below; it should be
// moved to configuration and rotated if this source is ever shared.
func Send(msg string) {
	const (
		webhookURL  = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=337d29b8-a5fc-401c-8fc7-ecc42e864360"
		sendTimeout = 10 * time.Second
	)

	m := map[string]interface{}{
		"msgtype": "text",
		"text": map[string]string{
			"content": msg,
		},
	}
	b, err := json.Marshal(m)
	if err != nil {
		log.Println("发送出错", err)
		return
	}

	// A nil Transport falls back to http.DefaultTransport, so connections are
	// still pooled across calls even though the Client value is created here.
	client := &http.Client{Timeout: sendTimeout}
	res, err := client.Post(webhookURL, "application/json", bytes.NewReader(b))
	if err != nil {
		log.Println("发送出错", err)
		return
	}
	defer res.Body.Close()

	resByte, err := ioutil.ReadAll(res.Body)
	if err != nil {
		log.Println("发送出错", err)
		return
	}
	if res.StatusCode != http.StatusOK {
		log.Println("发送出错", res.Status, string(resByte))
		return
	}
	log.Println("发送结果", string(resByte))
}
|