|
@@ -1,142 +0,0 @@
|
|
|
-package spiderutil
|
|
|
-
|
|
|
-import (
|
|
|
- "context"
|
|
|
- "github.com/gogf/gf/v2/encoding/gcompress"
|
|
|
- "github.com/gogf/gf/v2/frame/g"
|
|
|
- "github.com/nats-io/nats.go"
|
|
|
- "time"
|
|
|
-)
|
|
|
-
|
|
|
-/*
|
|
|
-没有消费者,消息会丢掉
|
|
|
-1、使用reply机制,应用来控制
|
|
|
-2、必须先有订阅,且订阅者不能断
|
|
|
-*/
|
|
|
-
|
|
|
-type Jnats struct {
|
|
|
- Addr string //nats服务地址
|
|
|
- Nc *nats.Conn
|
|
|
-}
|
|
|
-
|
|
|
-func NewJnats(addr string) *Jnats {
|
|
|
- js := &Jnats{
|
|
|
- Addr: addr,
|
|
|
- }
|
|
|
- js.ReConnect()
|
|
|
- return js
|
|
|
-}
|
|
|
-
|
|
|
-// 连接、设置、重试
|
|
|
-func (j *Jnats) ReConnect() bool {
|
|
|
- var err error
|
|
|
- opts := []nats.Option{
|
|
|
- //nats.Name(c.Name), 指定clent名字
|
|
|
- // nats.SetCustomDialer(n),
|
|
|
- nats.MaxReconnects(86400),
|
|
|
- nats.ReconnectWait(time.Second), //默认两秒
|
|
|
- nats.ReconnectBufSize(83886080), //在链接繁时的消息缓冲,默认8M
|
|
|
- nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
|
|
|
- g.Log().Error(context.Background(), "NATS error: %v", err)
|
|
|
- }),
|
|
|
- nats.DisconnectHandler(func(c *nats.Conn) {
|
|
|
- g.Log().Info(context.Background(), "Disconnected from NATS")
|
|
|
- }),
|
|
|
- nats.ClosedHandler(func(c *nats.Conn) {
|
|
|
- g.Log().Info(context.Background(), "NATS connection is closed")
|
|
|
- }),
|
|
|
- }
|
|
|
- if j.Nc == nil || j.Nc.IsClosed() {
|
|
|
- j.Nc, err = nats.Connect(j.Addr, opts...)
|
|
|
- if err != nil {
|
|
|
- g.Log().Error(context.Background(), "NATS connect error: %v", err)
|
|
|
- time.Sleep(time.Second)
|
|
|
- return j.ReConnect()
|
|
|
- } else {
|
|
|
- return true
|
|
|
- }
|
|
|
- } else if j.Nc.IsConnected() { //连接状态
|
|
|
- return true
|
|
|
- } else { //异常状态
|
|
|
- j.Nc.Flush()
|
|
|
- j.Nc.Drain()
|
|
|
- j.Nc.Close()
|
|
|
- time.Sleep(time.Second)
|
|
|
- return j.ReConnect()
|
|
|
- }
|
|
|
- return false
|
|
|
-}
|
|
|
-
|
|
|
-// 生产消息
|
|
|
-func (j *Jnats) Pub(sub string, msg any) error {
|
|
|
- return j.Nc.Publish(sub, g.NewVar(msg).Bytes())
|
|
|
-}
|
|
|
-
|
|
|
-// 生产压缩消息
|
|
|
-func (j *Jnats) PubZip(sub string, msg any) error {
|
|
|
- res, err := gcompress.Zlib(g.NewVar(msg).Bytes())
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- } else {
|
|
|
- return j.Nc.Publish(sub, res)
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-// 直接消费消息
|
|
|
-func (j *Jnats) Sub(sub string, handle func(msg *nats.Msg)) {
|
|
|
- _, err := j.Nc.Subscribe(sub, handle)
|
|
|
- if err != nil {
|
|
|
- g.Log().Error(context.Background(), err)
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-// 直接消费压缩消息
|
|
|
-func (j *Jnats) SubZip(sub string, handle func(msg *nats.Msg)) {
|
|
|
- _, err := j.Nc.Subscribe(sub, func(msg *nats.Msg) {
|
|
|
- v := msg.Data
|
|
|
- if len(v) > 0 {
|
|
|
- res, err := gcompress.UnZlib(v)
|
|
|
- if err != nil {
|
|
|
- g.Log().Error(context.Background(), "NATS gcompress error: %v", err, msg)
|
|
|
- } else {
|
|
|
- msg.Data = res
|
|
|
- }
|
|
|
- }
|
|
|
- handle(msg)
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- g.Log().Error(context.Background(), err)
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-// 队列负载分组消费压缩消息
|
|
|
-func (j *Jnats) QueueSubZip(sub, queue string, handle func(msg *nats.Msg)) {
|
|
|
- _, err := j.Nc.QueueSubscribe(sub, queue, func(msg *nats.Msg) {
|
|
|
- v := msg.Data
|
|
|
- if len(v) > 0 {
|
|
|
- res, err := gcompress.UnZlib(v)
|
|
|
- if err != nil {
|
|
|
- g.Log().Error(context.Background(), "NATS gcompress error: %v", err, msg)
|
|
|
- } else {
|
|
|
- msg.Data = res
|
|
|
- }
|
|
|
- }
|
|
|
- handle(msg)
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- g.Log().Error(context.Background(), err)
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-//queue 队列消息暂时不用
|
|
|
-
|
|
|
-// 回复机制
|
|
|
-// 生产压缩消息并请求回执
|
|
|
-func (j *Jnats) PubReqZip(sub string, msg any, timeout time.Duration) (*nats.Msg, error) {
|
|
|
- res, err := gcompress.Zlib(g.NewVar(msg).Bytes())
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- } else {
|
|
|
- return j.Nc.Request(sub, res, timeout)
|
|
|
- }
|
|
|
-}
|