xuzhiheng 2 жил өмнө
parent
commit
da19f12af4

+ 31 - 0
entity/entity.go

@@ -3,10 +3,13 @@ package entity
 import (
 	"log"
 
+	"encoding/json"
+
 	util "app.yhyue.com/moapp/jybase/common"
 	elastic "app.yhyue.com/moapp/jybase/esv1"
 	"app.yhyue.com/moapp/jybase/mongodb"
 	"app.yhyue.com/moapp/jybase/mysql"
+	"github.com/nsqio/go-nsq"
 	"github.com/zeromicro/go-zero/core/logx"
 )
 
@@ -21,6 +24,9 @@ var (
 	AreaCode   = map[string]string{}
 )
 
+type Handler struct {
+}
+
 func InitMysql(n, x, y, z, s *mysql.Mysql) {
 	JyMysql = &mysql.Mysql{
 		Address:      n.Address,
@@ -99,3 +105,28 @@ func InitArea() {
 	}
 	log.Println("AreaCodeLen ", len(AreaCode))
 }
+
+func (h *Handler) HandleMessage(m *nsq.Message) error {
+	defer util.Catch()
+	if len(m.Body) == 0 {
+		// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
+		return nil
+	}
+	var msg *map[string]interface{}
+	err := json.Unmarshal(m.Body, &msg)
+	if err != nil {
+		log.Println(err)
+		return err
+	}
+	entPostionId := util.Int64All((*msg)["entPostionId"])
+	userPositionId := util.Int64All((*msg)["userPositionId"])
+	entId := util.Int64All((*msg)["entId"])
+	log.Println(userPositionId, entPostionId, entId)
+	ok := JyBiMysql.Update("dwd_f_report_data_baseinfo", map[string]interface{}{"position_id": userPositionId}, map[string]interface{}{"position_id": entPostionId, "ent_id": entId, "position_type": 1})
+	if ok {
+		log.Println("个人销售数据同步企业数据成功!!!")
+	} else {
+		log.Println("个人销售数据同步企业数据失败-----")
+	}
+	return nil
+}

+ 1 - 0
go.mod

@@ -6,6 +6,7 @@ require (
 	app.yhyue.com/moapp/jybase v0.0.0-20221010080805-39dc6a853eff
 	bp.jydev.jianyu360.cn/BaseService/gateway v1.3.4
 	github.com/golang/protobuf v1.5.2
+	github.com/nsqio/go-nsq v1.1.0
 	github.com/zeromicro/go-zero v1.4.2
 	google.golang.org/grpc v1.51.0
 	google.golang.org/protobuf v1.28.1

+ 1 - 0
go.sum

@@ -431,6 +431,7 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
 github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
 github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms=
 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
+github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
 github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
 github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
 github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=

+ 13 - 0
rpc/biservice.go

@@ -12,6 +12,7 @@ import (
 	"bp.jydev.jianyu360.cn/BaseService/biService/rpc/internal/server"
 	"bp.jydev.jianyu360.cn/BaseService/biService/rpc/internal/svc"
 	"bp.jydev.jianyu360.cn/BaseService/biService/rpc/pb"
+	"github.com/nsqio/go-nsq"
 	"github.com/zeromicro/go-zero/core/conf"
 	"github.com/zeromicro/go-zero/zrpc"
 	"google.golang.org/grpc"
@@ -30,6 +31,18 @@ func main() {
 	entity.InitMongo(c.Mongo.Qfw.MongodbAddr, c.Mongo.Qfw.DbName, c.Mongo.Qfw.Size)
 	entity.InitEs(c.Es.Address, c.Es.DbSize)
 	entity.InitArea()
+	//nsq
+	config := nsq.NewConfig()
+	consumer, err := nsq.NewConsumer(c.TopicName, "position_sync", config)
+	if err != nil {
+		fmt.Println(err)
+	}
+	consumer.AddHandler(&entity.Handler{})
+	err = consumer.ConnectToNSQLookupd(c.NsqUrl)
+	if err != nil {
+		fmt.Println(err)
+	}
+	//
 	s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
 		pb.RegisterBiServiceServer(grpcServer, srv)
 	})

+ 2 - 0
rpc/etc/biservice.yaml

@@ -57,3 +57,5 @@ Logx:
   Level: info #info|error|severe
   KeepDays: 100
 AddCountLimit: 500
+TopicName: jy_position_sync
+NsqUrl: 192.168.3.240:4161

+ 2 - 0
rpc/internal/config/config.go

@@ -32,4 +32,6 @@ type Config struct {
 	}
 	Mode          string
 	AddCountLimit int
+	TopicName     string
+	NsqUrl        string
 }