|
@@ -0,0 +1,194 @@
|
|
|
+package jnatsjs
|
|
|
+
|
|
|
+//
|
|
|
+//import (
|
|
|
+// "context"
|
|
|
+// "github.com/gogf/gf/v2/encoding/gcompress"
|
|
|
+// "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
|
|
|
+// "time"
|
|
|
+//
|
|
|
+// "github.com/gogf/gf/v2/frame/g"
|
|
|
+// "github.com/nats-io/nats.go"
|
|
|
+//)
|
|
|
+//
|
|
|
+//type Jnatsjs struct {
|
|
|
+// Jnats *jnats.Jnats
|
|
|
+// Jctx nats.JetStreamContext
|
|
|
+//}
|
|
|
+//
|
|
|
+//func NewJnatsjs(addr string) *Jnatsjs {
|
|
|
+// jn := jnats.NewJnats(addr)
|
|
|
+// jctx, err := jn.Nc.JetStream()
|
|
|
+// if err != nil {
|
|
|
+// g.Log().Error(context.Background(), err)
|
|
|
+// }
|
|
|
+// jsn := &Jnatsjs{Jnats: jn, Jctx: jctx}
|
|
|
+// jsn.NewDefaultStream()
|
|
|
+// //jsn.AddDefaultConsumer()
|
|
|
+// return jsn
|
|
|
+//}
|
|
|
+//
|
|
|
+//func (j *Jnatsjs) ReConnect() {
|
|
|
+// if j.Jnats.ReConnect() {
|
|
|
+// jctx, err := j.Jnats.Nc.JetStream()
|
|
|
+// if err != nil {
|
|
|
+// g.Log().Error(context.Background(), err)
|
|
|
+// } else {
|
|
|
+// j.Jctx = jctx
|
|
|
+// j.NewDefaultStream()
|
|
|
+// }
|
|
|
+// }
|
|
|
+//}
|
|
|
+//
|
|
|
+//// 添加流,如果存在则不处理
|
|
|
+//func (j *Jnatsjs) NewStream(cfg *nats.StreamConfig) (*nats.StreamInfo, error) {
|
|
|
+// st, err := j.Jctx.StreamInfo(cfg.Name)
|
|
|
+// if st != nil {
|
|
|
+// return st, err
|
|
|
+// }
|
|
|
+// return j.Jctx.AddStream(cfg)
|
|
|
+//}
|
|
|
+//
|
|
|
+//// 添加默认的流 jydef ,管理以jydef开头的主题
|
|
|
+//func (j *Jnatsjs) NewDefaultStream() (*nats.StreamInfo, error) {
|
|
|
+// cfg := &nats.StreamConfig{
|
|
|
+// Name: "jydef",
|
|
|
+// Subjects: []string{"jydef.>"},
|
|
|
+// Storage: nats.MemoryStorage, // 儲存的方式 (預設 FileStorage)
|
|
|
+// Retention: nats.LimitsPolicy, // 保留的策略
|
|
|
+// Discard: nats.DiscardOld,
|
|
|
+// MaxAge: 72 * time.Hour, //保存三天
|
|
|
+// MaxBytes: 2147483648, //占用2G
|
|
|
+// MaxMsgs: 2000000, //二佰万消息
|
|
|
+// MaxConsumers: 256, //消费者
|
|
|
+// }
|
|
|
+// st, err := j.Jctx.StreamInfo(cfg.Name)
|
|
|
+// if st != nil {
|
|
|
+// return st, err
|
|
|
+// }
|
|
|
+// return j.Jctx.AddStream(cfg)
|
|
|
+//}
|
|
|
+//
|
|
|
+//// 添加默认的消费者
|
|
|
+//func (j *Jnatsjs) AddDefaultConsumer() (*nats.ConsumerInfo, error) {
|
|
|
+// c, err := j.Jctx.ConsumerInfo("jydef", "jydef")
|
|
|
+// if c != nil { //存在
|
|
|
+// return c, err
|
|
|
+// } else {
|
|
|
+// if err != nil { //nats: consumer not found
|
|
|
+// g.Log().Error(context.Background(), err)
|
|
|
+// }
|
|
|
+//
|
|
|
+// cfg := &nats.ConsumerConfig{
|
|
|
+// Name: "jydef",
|
|
|
+// Durable: "jydef",
|
|
|
+// AckPolicy: nats.AckExplicitPolicy,
|
|
|
+// AckWait: 3600 * time.Second,
|
|
|
+//
|
|
|
+// MaxWaiting: 2048,
|
|
|
+// MaxAckPending: 32,
|
|
|
+// }
|
|
|
+// c, err = j.Jctx.AddConsumer("jydef", cfg)
|
|
|
+//
|
|
|
+// }
|
|
|
+// return c, err
|
|
|
+//}
|
|
|
+//
|
|
|
+//// 发布消息
|
|
|
+//func (j *Jnatsjs) Pub(sub string, msg any) (nats.PubAckFuture, error) {
|
|
|
+//START:
|
|
|
+// na, err := j.Jctx.PublishAsync(sub, g.NewVar(msg).Bytes())
|
|
|
+// if err != nil { //nats: connection closed
|
|
|
+// g.Log().Error(context.Background(), err)
|
|
|
+// time.Sleep(220 * time.Millisecond)
|
|
|
+// j.ReConnect()
|
|
|
+// goto START
|
|
|
+// }
|
|
|
+// return na, err
|
|
|
+//
|
|
|
+//}
|
|
|
+//
|
|
|
+//// 发布压缩消息
|
|
|
+//func (j *Jnatsjs) PubZip(sub string, msg any) (nats.PubAckFuture, error) {
|
|
|
+//START:
|
|
|
+// res, err := gcompress.Zlib(g.NewVar(msg).Bytes())
|
|
|
+// if err != nil {
|
|
|
+// return nil, err
|
|
|
+// } else {
|
|
|
+// na, err := j.Jctx.PublishAsync(sub, res)
|
|
|
+// if err != nil { //nats: connection closed
|
|
|
+// g.Log().Error(context.Background(), err)
|
|
|
+// time.Sleep(220 * time.Millisecond)
|
|
|
+// j.ReConnect()
|
|
|
+// goto START
|
|
|
+// }
|
|
|
+// return na, err
|
|
|
+// }
|
|
|
+//}
|
|
|
+//
|
|
|
+//// 直接消费消息,需要手动ack
|
|
|
+//func (j *Jnatsjs) Sub(sub, dur string, handle func(msg *nats.Msg)) {
|
|
|
+// ctx := context.Background()
|
|
|
+//START:
|
|
|
+// sub1, err1 := j.Jctx.PullSubscribe(sub, dur) //, nats.PullMaxWaiting(128)),配置为2048
|
|
|
+// if err1 != nil {
|
|
|
+// g.Log().Error(ctx, err1)
|
|
|
+// }
|
|
|
+// for {
|
|
|
+// ctx1, _ := context.WithTimeout(context.Background(), 1*time.Second)
|
|
|
+// msgs, err := sub1.Fetch(64, nats.Context(ctx1))
|
|
|
+// if err == nil {
|
|
|
+// for _, v := range msgs {
|
|
|
+// handle(v)
|
|
|
+// }
|
|
|
+// } else {
|
|
|
+// if err != context.DeadlineExceeded { //服务出问题,invalid subscription
|
|
|
+// g.Log().Error(ctx, err)
|
|
|
+// time.Sleep(200 * time.Millisecond)
|
|
|
+// j.ReConnect()
|
|
|
+// goto START
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// // START:
|
|
|
+// // _, err := j.Jctx.Subscribe(sub, handle, nats.ManualAck(), nats.Durable(dur))
|
|
|
+// // if err != nil {
|
|
|
+// // g.Log().Error(context.Background(), err)
|
|
|
+// // time.Sleep(200 * time.Millisecond)
|
|
|
+// // j.ReConnect()
|
|
|
+// // goto START
|
|
|
+// // }
|
|
|
+//
|
|
|
+//}
|
|
|
+//
|
|
|
+//// 消费压缩消息,需要手动ack
|
|
|
+//func (j *Jnatsjs) SubZip(sub, dur string, handle func(msg *nats.Msg)) {
|
|
|
+// ctx := context.Background()
|
|
|
+//START:
|
|
|
+// sub1, err1 := j.Jctx.PullSubscribe(sub, dur) //, nats.PullMaxWaiting(128)),配置为2048
|
|
|
+// if err1 != nil {
|
|
|
+// g.Log().Error(ctx, err1)
|
|
|
+// }
|
|
|
+// for {
|
|
|
+// ctx1, _ := context.WithTimeout(context.Background(), 1*time.Second)
|
|
|
+// msgs, err := sub1.Fetch(64, nats.Context(ctx1))
|
|
|
+// if err == nil {
|
|
|
+// for _, v := range msgs {
|
|
|
+// res, err := gcompress.UnZlib(v.Data)
|
|
|
+// if err != nil {
|
|
|
+// g.Log().Error(context.Background(), "NATS gcompress error: %v", err, v)
|
|
|
+// } else {
|
|
|
+// v.Data = res
|
|
|
+// }
|
|
|
+// handle(v)
|
|
|
+// }
|
|
|
+// } else {
|
|
|
+// if err != context.DeadlineExceeded { //服务出问题,invalid subscription
|
|
|
+// g.Log().Error(ctx, err)
|
|
|
+// time.Sleep(200 * time.Millisecond)
|
|
|
+// j.ReConnect()
|
|
|
+// goto START
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+//}
|