wangchuanjin 3 年之前
父節點
當前提交
be255f451b
共有 7 個文件被更改,包括 143 次插入191 次删除
  1. 1 0
      go.mod
  2. 18 30
      handler/handler.go
  3. 87 0
      handler/points.go
  4. 33 0
      handler/push.go
  5. 3 23
      main.go
  6. 0 137
      points/points.go
  7. 1 1
      test/main_test.go

+ 1 - 0
go.mod

@@ -7,4 +7,5 @@ require (
 	app.yhyue.com/moapp/jybase v0.0.0-20210322021809-141cc2c37946
 	github.com/nsqio/go-nsq v1.0.8
 	github.com/tal-tech/go-zero v1.1.5
+	google.golang.org/grpc v1.29.1
 )

+ 18 - 30
push/push.go → handler/handler.go

@@ -1,4 +1,4 @@
-package push
+package handler
 
 import (
 	"encoding/json"
@@ -9,19 +9,28 @@ import (
 	"app.yhyue.com/moapp/jybase/go-logger/logger"
 	. "app.yhyue.com/moapp/message/db"
 	"app.yhyue.com/moapp/message/model"
-	"app.yhyue.com/moapp/message/util"
 	"github.com/nsqio/go-nsq"
 )
 
-type Push struct {
-}
+var (
+	funcMap = map[string]func(msg *model.Message){
+		"jyapp_wx_register":      VarPoints.Jy_user_new,
+		"jyapp_phone_register":   VarPoints.Jy_user_new,
+		"jypc_phone_register":    VarPoints.Jy_user_new,
+		"jywx_subscribe_new":     VarPoints.Jy_user_new,
+		"jywx_subscribe_invite":  VarPoints.Jywx_subscribe_invite,
+		"jywx_subscribe_invited": VarPoints.Jywx_subscribe_invited,
+		"jydocs_doc_open":        VarPoints.Jydocs_doc_open,
+		"jyweb_article_open":     VarPoints.Jyweb_article_open,
+		"jywx_activity_message":  VarPush.Jywx_activity_message,
+	}
+)
 
-var funcMap = map[string]func(msg *model.Message){
-	"jywx_activity_message": jywx_activity_message,
+type Handler struct {
 }
 
-//处理文库积分
-func (p *Push) HandleMessage(m *nsq.Message) error {
+//
+func (h *Handler) HandleMessage(m *nsq.Message) error {
 	defer common.Catch()
 	if len(m.Body) == 0 {
 		// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
@@ -46,6 +55,7 @@ func (p *Push) HandleMessage(m *nsq.Message) error {
 		f(msg)
 	} else {
 		logger.Info("无效的code值", msg)
+		return nil
 	}
 	Mgo_Log.Save("nsq_logs", map[string]interface{}{
 		"createtime": time.Now().Unix(),
@@ -55,25 +65,3 @@ func (p *Push) HandleMessage(m *nsq.Message) error {
 	// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
 	return nil
 }
-
-func jywx_activity_message(msg *model.Message) {
-	if len(msg.E_body) == 0 {
-		return
-	}
-	util.StartPush(msg.E_body)
-	/*
-			phone := qu.ObjToString(mess["phone"])
-		product_type := qu.ObjToString(mess["product_type"])
-		order_code := qu.ObjToString(mess["order_code"])
-		buy_time := qu.ObjToString(mess["buy_time"])
-		end_time := qu.ObjToString(mess["end_time"])
-	*/
-	// util.StartPush("", map[string]interface{}{
-	// 	"phone":        "18624906090",
-	// 	"product_type": "超级订阅一个月",
-	// 	"order_code":   "1234567890",
-	// 	"buy_time":     "2021-10-19 22:22:22",
-	// 	"end_time":     "2021-11-19 23:59:59",
-	// })
-	// return
-}

+ 87 - 0
handler/points.go

@@ -0,0 +1,87 @@
+package handler
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"app.yhyue.com/moapp/jyPoints/rpc/integral"
+	"app.yhyue.com/moapp/jyPoints/rpc/integralclient"
+	"app.yhyue.com/moapp/jybase/date"
+	"app.yhyue.com/moapp/jybase/go-logger/logger"
+	"app.yhyue.com/moapp/jybase/redis"
+	. "app.yhyue.com/moapp/message/client"
+	. "app.yhyue.com/moapp/message/config"
+	"app.yhyue.com/moapp/message/model"
+)
+
+const (
+	Redis_Main = "main"
+)
+
+var VarPoints = &Points{}
+
+type Points struct {
+}
+
+//产生新用户
+func (p *Points) Jy_user_new(msg *model.Message) {
+	p.integralHarvest(msg, Config.DocPoints.Jywx_subscribe_new, 1002)
+}
+
+//邀请人
+func (p *Points) Jywx_subscribe_invite(msg *model.Message) {
+	p.integralHarvest(msg, Config.DocPoints.Jywx_subscribe_invite, 1003)
+}
+
+//被邀请人
+func (p *Points) Jywx_subscribe_invited(msg *model.Message) {
+	p.integralHarvest(msg, Config.DocPoints.Jywx_subscribe_invited, 1002)
+}
+
+//打开文库文章页
+func (p *Points) Jydocs_doc_open(msg *model.Message) {
+	p.Jy_open(msg, Config.DocPoints.Open.Jydocs_doc_open)
+}
+
+//打开招投标信息文章页
+func (p *Points) Jyweb_article_open(msg *model.Message) {
+	p.Jy_open(msg, Config.DocPoints.Open.Jyweb_article_open)
+}
+
+//
+func (p *Points) Jy_open(msg *model.Message, point int64) {
+	key := fmt.Sprintf("jypoints_share_article_open_%s", msg.E_userId)
+	if redis.Incr(Redis_Main, key) <= Config.DocPoints.Open.Max {
+		now := time.Now()
+		redis.SetExpire(Redis_Main, key, int(time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, time.Local).Unix()-now.Unix()))
+		p.integralHarvest(msg, point, 1005)
+	} else {
+		logger.Info(fmt.Sprintf("%+v", msg), "超过一天最大次数", Config.DocPoints.Open.Max, "限制,不再增加积分")
+	}
+}
+
+//增加积分
+func (p *Points) integralHarvest(msg *model.Message, point, pointType int64) bool {
+	t := time.Unix(msg.E_time, 0).AddDate(1, 0, 0)
+	req := &integral.Req{
+		UserId:        msg.E_userId,
+		AppId:         Config.DocPoints.AppId,
+		PointType:     pointType,
+		Point:         point,
+		EndDate:       date.FormatDate(&t, date.Date_Full_Layout),
+		OperationType: false,
+	}
+	resp, err := integralclient.NewIntegral(ZrpcClient).IntegralHarvest(context.Background(), req)
+	if err != nil {
+		logger.Info(fmt.Sprintf("%+v", msg), "IntegralHarvest Resp error", err)
+		return false
+	}
+	if resp.Code == 1 {
+		logger.Info(fmt.Sprintf("%+v", msg), "已成功增加", point, "积分")
+		return true
+	} else {
+		logger.Info(fmt.Sprintf("%+v", msg), "增加", point, "积分失败", "Code", resp.Code, "Message", resp.Message)
+		return false
+	}
+}

+ 33 - 0
handler/push.go

@@ -0,0 +1,33 @@
+package handler
+
+import (
+	"app.yhyue.com/moapp/message/model"
+	"app.yhyue.com/moapp/message/util"
+)
+
+var VarPush = &Push{}
+
+type Push struct {
+}
+
+func (p *Push) Jywx_activity_message(msg *model.Message) {
+	if len(msg.E_body) == 0 {
+		return
+	}
+	util.StartPush(msg.E_body)
+	/*
+			phone := qu.ObjToString(mess["phone"])
+		product_type := qu.ObjToString(mess["product_type"])
+		order_code := qu.ObjToString(mess["order_code"])
+		buy_time := qu.ObjToString(mess["buy_time"])
+		end_time := qu.ObjToString(mess["end_time"])
+	*/
+	// util.StartPush("", map[string]interface{}{
+	// 	"phone":        "18624906090",
+	// 	"product_type": "超级订阅一个月",
+	// 	"order_code":   "1234567890",
+	// 	"buy_time":     "2021-10-19 22:22:22",
+	// 	"end_time":     "2021-11-19 23:59:59",
+	// })
+	// return
+}

+ 3 - 23
main.go

@@ -5,8 +5,7 @@ import (
 
 	"app.yhyue.com/moapp/jybase/go-logger/logger"
 	. "app.yhyue.com/moapp/message/config"
-	"app.yhyue.com/moapp/message/points"
-	"app.yhyue.com/moapp/message/push"
+	"app.yhyue.com/moapp/message/handler"
 	"github.com/nsqio/go-nsq"
 )
 
@@ -19,38 +18,19 @@ func init() {
 func main() {
 	// Instantiate a consumer that will subscribe to the provided channel.
 	config := nsq.NewConfig()
-	consumer, err := nsq.NewConsumer("jy_event", "points", config)
+	consumer, err := nsq.NewConsumer("jy_event", "event", config)
 	if err != nil {
 		log.Fatal(err)
 	}
-
 	// Set the Handler for messages received by this Consumer. Can be called multiple times.
 	// See also AddConcurrentHandlers.
-	consumer.AddHandler(&points.Points{})
-
+	consumer.AddHandler(&handler.Handler{})
 	// Use nsqlookupd to discover nsqd instances.
 	// See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
 	err = consumer.ConnectToNSQLookupd(Config.Nsq.Address)
 	if err != nil {
 		log.Fatal(err)
 	}
-
-	//push
-	config_push := nsq.NewConfig()
-	consumer_push, err := nsq.NewConsumer("jy_event", "push", config_push)
-	if err != nil {
-		log.Fatal(err)
-	}
-
-	consumer_push.AddHandler(&push.Push{})
-
-	// Use nsqlookupd to discover nsqd instances.
-	// See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
-	err = consumer_push.ConnectToNSQLookupd(Config.Nsq.Address)
-	if err != nil {
-		log.Fatal(err)
-	}
-
 	// Gracefully stop the consumer.
 	//consumer.Stop()
 	select {}

+ 0 - 137
points/points.go

@@ -1,137 +0,0 @@
-package points
-
-import (
-	"context"
-	"encoding/json"
-	"fmt"
-	"time"
-
-	"app.yhyue.com/moapp/jyPoints/rpc/integral"
-	"app.yhyue.com/moapp/jyPoints/rpc/integralclient"
-	"app.yhyue.com/moapp/jybase/common"
-	"app.yhyue.com/moapp/jybase/date"
-	"app.yhyue.com/moapp/jybase/go-logger/logger"
-	"app.yhyue.com/moapp/jybase/redis"
-	. "app.yhyue.com/moapp/message/client"
-	. "app.yhyue.com/moapp/message/config"
-	. "app.yhyue.com/moapp/message/db"
-	"app.yhyue.com/moapp/message/model"
-	"github.com/nsqio/go-nsq"
-)
-
-const (
-	Redis_Main = "main"
-)
-
-type Points struct {
-}
-
-var funcMap = map[string]func(msg *model.Message){
-	"jyapp_wx_register":      jy_user_new,
-	"jyapp_phone_register":   jy_user_new,
-	"jypc_phone_register":    jy_user_new,
-	"jywx_subscribe_new":     jy_user_new,
-	"jywx_subscribe_invite":  jywx_subscribe_invite,
-	"jywx_subscribe_invited": jywx_subscribe_invited,
-	"jydocs_doc_open":        jydocs_doc_open,
-	"jyweb_article_open":     jyweb_article_open,
-}
-
-//处理文库积分
-func (p *Points) HandleMessage(m *nsq.Message) error {
-	defer common.Catch()
-	if len(m.Body) == 0 {
-		// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
-		return nil
-	}
-	var msg *model.Message
-	err := json.Unmarshal(m.Body, &msg)
-	if err != nil {
-		logger.Error(err)
-		return err
-	}
-	logger.Info("接收到消息", fmt.Sprintf("%+v", msg))
-	if msg.E_code == "" {
-		logger.Info("缺少参数e_code", fmt.Sprintf("%+v", msg))
-		return nil
-	} else if msg.E_time == 0 {
-		logger.Info("缺少参数e_time", fmt.Sprintf("%+v", msg))
-		return nil
-	}
-	f, f_ok := funcMap[msg.E_code]
-	if f_ok {
-		f(msg)
-	} else {
-		logger.Info("无效的code值", msg)
-		return nil
-	}
-	Mgo_Log.Save("nsq_logs", map[string]interface{}{
-		"createtime": time.Now().Unix(),
-		"body":       msg,
-		"type":       "consumer", //producer or consumer
-	})
-	// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
-	return nil
-}
-
-//产生新用户
-func jy_user_new(msg *model.Message) {
-	integralHarvest(msg, Config.DocPoints.Jywx_subscribe_new, 1002)
-}
-
-//邀请人
-func jywx_subscribe_invite(msg *model.Message) {
-	integralHarvest(msg, Config.DocPoints.Jywx_subscribe_invite, 1003)
-}
-
-//被邀请人
-func jywx_subscribe_invited(msg *model.Message) {
-	integralHarvest(msg, Config.DocPoints.Jywx_subscribe_invited, 1002)
-}
-
-//打开文库文章页
-func jydocs_doc_open(msg *model.Message) {
-	jy_open(msg, Config.DocPoints.Open.Jydocs_doc_open)
-}
-
-//打开招投标信息文章页
-func jyweb_article_open(msg *model.Message) {
-	jy_open(msg, Config.DocPoints.Open.Jyweb_article_open)
-}
-
-//
-func jy_open(msg *model.Message, point int64) {
-	key := fmt.Sprintf("jypoints_share_article_open_%s", msg.E_userId)
-	if redis.Incr(Redis_Main, key) <= Config.DocPoints.Open.Max {
-		now := time.Now()
-		redis.SetExpire(Redis_Main, key, int(time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, time.Local).Unix()-now.Unix()))
-		integralHarvest(msg, point, 1005)
-	} else {
-		logger.Info(fmt.Sprintf("%+v", msg), "超过一天最大次数", Config.DocPoints.Open.Max, "限制,不再增加积分")
-	}
-}
-
-//增加积分
-func integralHarvest(msg *model.Message, point, pointType int64) bool {
-	t := time.Unix(msg.E_time, 0).AddDate(1, 0, 0)
-	req := &integral.Req{
-		UserId:        msg.E_userId,
-		AppId:         Config.DocPoints.AppId,
-		PointType:     pointType,
-		Point:         point,
-		EndDate:       date.FormatDate(&t, date.Date_Full_Layout),
-		OperationType: false,
-	}
-	resp, err := integralclient.NewIntegral(ZrpcClient).IntegralHarvest(context.Background(), req)
-	if err != nil {
-		logger.Info(fmt.Sprintf("%+v", msg), "IntegralHarvest Resp error", err)
-		return false
-	}
-	if resp.Code == 1 {
-		logger.Info(fmt.Sprintf("%+v", msg), "已成功增加", point, "积分")
-		return true
-	} else {
-		logger.Info(fmt.Sprintf("%+v", msg), "增加", point, "积分失败", "Code", resp.Code, "Message", resp.Message)
-		return false
-	}
-}

+ 1 - 1
test/main_test.go

@@ -13,7 +13,7 @@ import (
 
 func Test_Producer(t *testing.T) {
 	Mgo_Log := &mongodb.MongodbSim{
-		MongodbAddr: "192.168.3.128:27090",
+		MongodbAddr: "192.168.3.206:27090",
 		Size:        5,
 		DbName:      "qfw",
 		UserName:    "admin",