mxs преди 1 година
родител
ревизия
aac67b28d9
променени са 2 файла, в които са добавени 148 реда и са изтрити 0 реда
  1. 142 0
      src/spiderutil/nats.go
  2. 6 0
      src/spiderutil/sysconfig.go

+ 142 - 0
src/spiderutil/nats.go

@@ -0,0 +1,142 @@
+package spiderutil
+
+import (
+	"context"
+	"github.com/gogf/gf/v2/encoding/gcompress"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/nats-io/nats.go"
+	"time"
+)
+
+/*
+没有消费者,消息会丢掉
+1、使用reply机制,应用来控制
+2、必须先有订阅,且订阅者不能断
+*/
+
+type Jnats struct {
+	Addr string //nats服务地址
+	Nc   *nats.Conn
+}
+
+func NewJnats(addr string) *Jnats {
+	js := &Jnats{
+		Addr: addr,
+	}
+	js.ReConnect()
+	return js
+}
+
+// 连接、设置、重试
+func (j *Jnats) ReConnect() bool {
+	var err error
+	opts := []nats.Option{
+		//nats.Name(c.Name), 指定clent名字
+		// nats.SetCustomDialer(n),
+		nats.MaxReconnects(86400),
+		nats.ReconnectWait(time.Second), //默认两秒
+		nats.ReconnectBufSize(83886080), //在链接繁时的消息缓冲,默认8M
+		nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
+			g.Log().Error(context.Background(), "NATS error: %v", err)
+		}),
+		nats.DisconnectHandler(func(c *nats.Conn) {
+			g.Log().Info(context.Background(), "Disconnected from NATS")
+		}),
+		nats.ClosedHandler(func(c *nats.Conn) {
+			g.Log().Info(context.Background(), "NATS connection is closed")
+		}),
+	}
+	if j.Nc == nil || j.Nc.IsClosed() {
+		j.Nc, err = nats.Connect(j.Addr, opts...)
+		if err != nil {
+			g.Log().Error(context.Background(), "NATS connect error: %v", err)
+			time.Sleep(time.Second)
+			return j.ReConnect()
+		} else {
+			return true
+		}
+	} else if j.Nc.IsConnected() { //连接状态
+		return true
+	} else { //异常状态
+		j.Nc.Flush()
+		j.Nc.Drain()
+		j.Nc.Close()
+		time.Sleep(time.Second)
+		return j.ReConnect()
+	}
+	return false
+}
+
+// 生产消息
+func (j *Jnats) Pub(sub string, msg any) error {
+	return j.Nc.Publish(sub, g.NewVar(msg).Bytes())
+}
+
+// 生产压缩消息
+func (j *Jnats) PubZip(sub string, msg any) error {
+	res, err := gcompress.Zlib(g.NewVar(msg).Bytes())
+	if err != nil {
+		return err
+	} else {
+		return j.Nc.Publish(sub, res)
+	}
+}
+
+// 直接消费消息
+func (j *Jnats) Sub(sub string, handle func(msg *nats.Msg)) {
+	_, err := j.Nc.Subscribe(sub, handle)
+	if err != nil {
+		g.Log().Error(context.Background(), err)
+	}
+}
+
+// 直接消费压缩消息
+func (j *Jnats) SubZip(sub string, handle func(msg *nats.Msg)) {
+	_, err := j.Nc.Subscribe(sub, func(msg *nats.Msg) {
+		v := msg.Data
+		if len(v) > 0 {
+			res, err := gcompress.UnZlib(v)
+			if err != nil {
+				g.Log().Error(context.Background(), "NATS gcompress error: %v", err, msg)
+			} else {
+				msg.Data = res
+			}
+		}
+		handle(msg)
+	})
+	if err != nil {
+		g.Log().Error(context.Background(), err)
+	}
+}
+
+// 队列负载分组消费压缩消息
+func (j *Jnats) QueueSubZip(sub, queue string, handle func(msg *nats.Msg)) {
+	_, err := j.Nc.QueueSubscribe(sub, queue, func(msg *nats.Msg) {
+		v := msg.Data
+		if len(v) > 0 {
+			res, err := gcompress.UnZlib(v)
+			if err != nil {
+				g.Log().Error(context.Background(), "NATS gcompress error: %v", err, msg)
+			} else {
+				msg.Data = res
+			}
+		}
+		handle(msg)
+	})
+	if err != nil {
+		g.Log().Error(context.Background(), err)
+	}
+}
+
+//queue 队列消息暂时不用
+
+// 回复机制
+// 生产压缩消息并请求回执
+func (j *Jnats) PubReqZip(sub string, msg any, timeout time.Duration) (*nats.Msg, error) {
+	res, err := gcompress.Zlib(g.NewVar(msg).Bytes())
+	if err != nil {
+		return nil, err
+	} else {
+		return j.Nc.Request(sub, res, timeout)
+	}
+}

+ 6 - 0
src/spiderutil/sysconfig.go

@@ -112,6 +112,12 @@ type config struct {
 	DataDB dbInfo `json:"datadb"`
 	//spider数据库
 	LuaSpiderDB dbInfo `json:"luaspiderdb"`
+	//nats
+	SpiderNats Nats `json:"nats"`
+}
+type Nats struct {
+	NatsUrl   string `json:"natsurl"`
+	Subscribe string `json:"subscribe"`
 }
 
 type flow struct {