Browse Source

数据格式化优化

mxs 1 year ago
parent
commit
169d3e10e2
4 changed files with 18 additions and 17 deletions
  1. 3 3
      client_test.go
  2. 1 1
      go.mod
  3. 9 10
      main.go
  4. 5 3
      nats.go

+ 3 - 3
client_test.go

@@ -1,8 +1,8 @@
 package main
 
 import (
-	"encoding/json"
 	"fmt"
+	"go.mongodb.org/mongo-driver/bson"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
 	"testing"
 	"time"
@@ -21,14 +21,14 @@ func Test_NatsClient(t *testing.T) {
 		CurrSetp: "test", //当前步骤
 		Data:     *tmp,   //数据内容
 	}
-	msgByte, _ := json.Marshal(msg)
+	msgByte, _ := bson.Marshal(msg)
 	resp, err := Jnats.PubReqZip(Subscribe, msgByte, 10*time.Second)
 	if err != nil {
 		fmt.Println("发布回执异常:", err)
 		return
 	}
 	respMsg := &MsgInfo{}
-	if json.Unmarshal(resp.Data, &respMsg) == nil {
+	if bson.Unmarshal(resp.Data, &respMsg) == nil {
 		fmt.Println(respMsg.CurrSetp)
 		fmt.Println(respMsg.Data["detail"])
 	} else {

+ 1 - 1
go.mod

@@ -5,6 +5,7 @@ go 1.18
 require (
 	github.com/aliyun/aliyun-oss-go-sdk v3.0.1+incompatible
 	github.com/nats-io/nats.go v1.31.0
+	go.mongodb.org/mongo-driver v1.13.1
 	jygit.jydev.jianyu360.cn/BP/jynats v0.0.0-20231206094405-2ff9da3175bc
 	jygit.jydev.jianyu360.cn/data_capture/myself_util v0.0.0-20231213053029-7188bebb69d3
 	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230915054514-628d4fe7544c
@@ -38,7 +39,6 @@ require (
 	github.com/xdg-go/scram v1.1.2 // indirect
 	github.com/xdg-go/stringprep v1.0.4 // indirect
 	github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
-	go.mongodb.org/mongo-driver v1.13.1 // indirect
 	go.opentelemetry.io/otel v1.14.0 // indirect
 	go.opentelemetry.io/otel/sdk v1.14.0 // indirect
 	go.opentelemetry.io/otel/trace v1.14.0 // indirect

+ 9 - 10
main.go

@@ -1,9 +1,9 @@
 package main
 
 import (
-	"encoding/json"
 	"fmt"
 	"github.com/nats-io/nats.go"
+	"go.mongodb.org/mongo-driver/bson"
 	cu "jygit.jydev.jianyu360.cn/data_capture/myself_util/commonutil"
 	iu "jygit.jydev.jianyu360.cn/data_capture/myself_util/initutil"
 	su "jygit.jydev.jianyu360.cn/data_capture/myself_util/spiderutil"
@@ -13,11 +13,10 @@ import (
 )
 
 var (
-	Config    map[string]interface{}
-	Webport   string
-	Subscribe string
-	Api       string
-	To        string
+	Config  map[string]interface{}
+	Webport string
+	Api     string
+	To      string
 )
 
 func init() {
@@ -26,7 +25,6 @@ func init() {
 	InitOss()      //oss
 	InitNats()     //nats
 	Webport = cu.ObjToString(Config["webport"])
-	Subscribe = cu.ObjToString(Config["subscribe"])
 	Api = cu.ObjToString(Config["api"])
 	To = cu.ObjToString(Config["to"])
 }
@@ -37,14 +35,15 @@ func main() {
 	<-ch
 }
 
+// SubscribeNats nats订阅
 func SubscribeNats() {
 	//先消费,带压缩
 	Jnats.SubZip(Subscribe, func(msg *nats.Msg) {
 		data := &MsgInfo{}
-		err := json.Unmarshal(msg.Data, &data)
+		err := bson.Unmarshal(msg.Data, &data)
 		if err != nil {
 			log.Println("解析数据失败:", err)
-			data.Err = err
+			data.Err = err.Error()
 			//SaveData()//保存异常数据
 		} else {
 			//处理数据
@@ -54,7 +53,7 @@ func SubscribeNats() {
 			data.Etime = time.Now().Unix()
 		}
 		//消息回写
-		bs, _ := json.Marshal(data)
+		bs, _ := bson.Marshal(data)
 		err = msg.Respond(bs)
 		if err != nil {
 			fmt.Println("回执失败:", data.Id)

+ 5 - 3
nats.go

@@ -6,8 +6,9 @@ import (
 )
 
 var (
-	NatsUrl string
-	Jnats   *jnats.Jnats
+	NatsUrl   string
+	Jnats     *jnats.Jnats
+	Subscribe string
 )
 
 type MsgInfo struct {
@@ -33,7 +34,7 @@ type MsgInfo struct {
 			Index string //索引
 		}
 	}
-	Err   error //错误信息 有错误会告警并终止流程
+	Err   string //错误信息 有错误会告警并终止流程
 	Stime int64
 	Etime int64
 }
@@ -42,4 +43,5 @@ type MsgInfo struct {
 func InitNats() {
 	NatsUrl = cu.ObjToString(Config["natsurl"])
 	Jnats = jnats.NewJnats(NatsUrl)
+	Subscribe = cu.ObjToString(Config["subscribe"])
 }