wkyuer 4 месяцев назад
Родитель
Сommit
5f0dd493c2
5 измененных файлов с 128 добавлено и 76 удалено
  1. 2 2
      common/mg.go
  2. 7 2
      userSign/config.yaml
  3. 15 2
      wxSign/config.yaml
  4. 35 18
      wxSign/main.go
  5. 69 52
      wxSign/wxSignGroup/group.go

+ 2 - 2
common/mg.go

@@ -1,11 +1,11 @@
 package common
 
 import (
-	//"app.yhyue.com/moapp/jybase/mongodb"
+	"app.yhyue.com/moapp/jybase/mongodb"
 	"context"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/util/gconv"
-	"workTasks/common/mongodb"
+	//"workTasks/common/mongodb"
 )
 
 type (

+ 7 - 2
userSign/config.yaml

@@ -1,6 +1,7 @@
 database:
   default:
-    link: "clickhouse:jytop:pwdTopJy123@tcp(192.168.3.207:19000)/pub_tags?dial_timeout=2000ms&max_execution_time=60"
+    #link: "clickhouse:jytop:pwdTopJy123@tcp(192.168.3.207:19000)/pub_tags?dial_timeout=2000ms&max_execution_time=60"
+    link: "clickhouse:jianyu_appl:Cli3#fkh4ouSe@tcp(127.0.0.1:9420)/pub_tags?dial_timeout=2000ms&max_execution_time=60"
     #debug: true
   jianyu:
     link: "mysql:root:=PDT49#80Z!RVv52_z@tcp(192.168.3.14:4000)/jianyu"
@@ -16,9 +17,13 @@ mongodb:
     replSet: ""
     userName: ""
     password: ""
+#  default:
+#    address: "127.0.0.1:27080"
+#    size: 10
+#    dbName: "qfw"
 
 runCron:
-  "# 40 14 * * *"
+  "# 07 9 * * *"
 
 testUserPhone:
   - 15225181827

+ 15 - 2
wxSign/config.yaml

@@ -23,5 +23,18 @@ redis:
   default: # 配置seo的redis
     address: 192.168.3.149:1713
 
-wxAppId: "wxd6f3e855ef4258cd"
-subDaySign: [1,3,7,9]
+
+# 获取微信token rpc
+wxTokenRpc: "172.17.162.29:1166"
+
+groupConfig:
+  default: #默认剑鱼
+    wxAppId: "wxd6f3e855ef4258cd"
+    subDaySign: [ 1,3,7,9 ]
+  wyztb:
+    wxAppid: "wx701ee9f9bd87b208"
+    subDaySign: [ 1,3,7,9 ]
+#    "appid": "wx701ee9f9bd87b208",
+#    "appsecret": "2007a26591ca3cb93a417adffc608927",
+#    "desc": "微信公众号-物业招投标"
+

+ 35 - 18
wxSign/main.go

@@ -6,6 +6,7 @@ import (
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gcron"
 	"github.com/gogf/gf/v2/os/gctx"
+	"github.com/gogf/gf/v2/util/gconv"
 	"go.mongodb.org/mongo-driver/bson"
 	"time"
 	"workTasks/common"
@@ -22,22 +23,32 @@ type (
 func main() {
 	ctx := gctx.New()
 	_, err := gcron.Add(gctx.New(), g.Cfg().MustGet(ctx, "runCron", "# 0 2 * * *").String(), func(ctx context.Context) {
-		//ctx := context.Background()
-		nwsm, err := wxSignGroup.NewWxSignManager()
-		if err != nil {
-			panic(err)
-		}
-		nwsm.LoadGroupUser(ctx)
-		nwsm.ClearGroupUser(ctx)
-
-		//同步最新标签
-		for i, arr := range logNewSign() {
-			if err := nwsm.AddNewUsers(i, arr...); err != nil {
+		for code, mConfig := range g.Cfg().MustGet(ctx, "groupConfig").Map() {
+			var (
+				mConfig = gconv.Map(mConfig)
+				appid   = gconv.String(mConfig["wxAppId"])
+				days    = gconv.Ints(mConfig["subDaySign"])
+			)
+			if appid == "" {
+				g.Log().Errorf(ctx, "获取%s 微信appId异常", code)
+				continue
+			}
+			nwsm, err := wxSignGroup.NewWxSignManager(appid)
+			if err != nil {
 				panic(err)
 			}
+			nwsm.LoadGroupUser(ctx, appid)
+			nwsm.ClearGroupUser(ctx, appid)
+
+			//同步最新标签
+			for i, arr := range logNewSign(code, days) {
+				if err := nwsm.AddNewUsers(i, arr...); err != nil {
+					panic(err)
+				}
+			}
+			nwsm.UpdateNewGroupUser(ctx, appid)
+			nwsm.LoadGroupUser(ctx, appid)
 		}
-		nwsm.UpdateNewGroupUser(ctx)
-		nwsm.LoadGroupUser(ctx)
 	}, "userWxSign")
 	if err != nil {
 		panic(err)
@@ -45,7 +56,7 @@ func main() {
 	select {}
 }
 
-func logNewSign() map[int][]string {
+func logNewSign(code string, days []int) map[int][]string {
 	var (
 		dayMaps      = map[int]bool{}
 		signList     = map[int][]string{}
@@ -53,7 +64,7 @@ func logNewSign() map[int][]string {
 		signData     = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Unix()
 		maxDay   int = 0
 	)
-	for _, dayNum := range g.Cfg().MustGet(context.Background(), "subDaySign").Ints() {
+	for _, dayNum := range days {
 		if dayNum > maxDay {
 			maxDay = dayNum
 		}
@@ -61,7 +72,7 @@ func logNewSign() map[int][]string {
 	}
 	g.Dump(dayMaps)
 
-	subActionList, err := GetUserSubList(-maxDay)
+	subActionList, err := GetUserSubList(code, -maxDay)
 	if err != nil {
 		panic(err)
 	}
@@ -76,7 +87,7 @@ func logNewSign() map[int][]string {
 	return signList
 }
 
-func GetUserSubList(day int) ([]*UserSubAction, error) {
+func GetUserSubList(code string, day int) ([]*UserSubAction, error) {
 	//获取30天时间戳
 	now := time.Now()
 	endTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
@@ -87,10 +98,16 @@ func GetUserSubList(day int) ([]*UserSubAction, error) {
 	client := common.MG.DB().C
 	collection := client.Database("qfw").Collection("jy_subscribe")
 	// 创建聚合管道
+	var codeLimit interface{}
+	if code != "default" {
+		codeLimit = code
+	} else {
+		codeLimit = bson.M{"$exists": 0}
+	}
 	pipeline := []bson.M{
 		{
 			// 第一个阶段:筛选符合条件的文档
-			"$match": bson.M{"l_date": bson.M{"$gt": startTime.Unix(), "$lt": endTime.Unix()}, "s_event": "subscribe", "s_code": bson.M{"$exists": 0}},
+			"$match": bson.M{"l_date": bson.M{"$gt": startTime.Unix(), "$lt": endTime.Unix()}, "s_event": "subscribe", "s_code": codeLimit},
 		},
 		{
 			"$group": bson.M{

+ 69 - 52
wxSign/wxSignGroup/group.go

@@ -5,8 +5,8 @@ import (
 	"fmt"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/util/gconv"
+	"net/rpc"
 	"regexp"
-	"time"
 )
 
 type (
@@ -30,7 +30,7 @@ type WxSignGroup struct {
 	data map[int]*GroupFull
 }
 
-func NewWxSignManager() (*WxSignGroup, error) {
+func NewWxSignManager(appId string) (*WxSignGroup, error) {
 	//加载所有分组
 	var (
 		wsg  = &WxSignGroup{}
@@ -40,7 +40,7 @@ func NewWxSignManager() (*WxSignGroup, error) {
 		re = regexp.MustCompile(`^关注第(\d+)天$`)
 	)
 	//加载分组信息
-	list, err := wsg.allGroup(ctx)
+	list, err := wsg.allGroup(ctx, appId)
 	if err != nil {
 		return nil, err
 	}
@@ -64,7 +64,7 @@ func NewWxSignManager() (*WxSignGroup, error) {
 		}
 		if !isExists {
 			newGroupName := fmt.Sprintf("关注第%d天", day)
-			newGroup, err := wsg.greatGroup(ctx, newGroupName)
+			newGroup, err := wsg.greatGroup(ctx, appId, newGroupName)
 			if err != nil {
 				g.Log().Errorf(ctx, "创建%s异常 %v", newGroupName, err)
 				continue
@@ -80,25 +80,46 @@ func NewWxSignManager() (*WxSignGroup, error) {
 	return wsg, nil
 }
 
-func (wsg *WxSignGroup) getToken() string {
-	for {
-		ctx := context.Background()
-		gv, err := g.Redis().Get(ctx, fmt.Sprintf("WxToken_%s", g.Cfg().MustGet(ctx, "wxAppId").String()))
-		if err != nil {
-			g.Log().Error(ctx, "获取accessToken异常", err)
-			continue
-		}
-		var accessToken AccessToken
-		if err := gv.Struct(&accessToken); err != nil {
-			g.Log().Error(ctx, "获取反序列化异常", err)
-			continue
-		}
-		if time.Now().Unix() > accessToken.ExpiresIn {
-			g.Log().Error(ctx, "token已过期", err)
-			continue
-		}
-		return accessToken.AccessToken
+//func (wsg *WxSignGroup) getToken(appId string) string {
+//	for {
+//		ctx := context.Background()
+//		gv, err := g.Redis().Get(ctx, fmt.Sprintf("WxToken_%s", g.Cfg().MustGet(ctx, "wxAppId").String()))
+//		if err != nil {
+//			g.Log().Error(ctx, "获取accessToken异常", err)
+//			continue
+//		}
+//		var accessToken AccessToken
+//		if err := gv.Struct(&accessToken); err != nil {
+//			g.Log().Error(ctx, "获取反序列化异常", err)
+//			continue
+//		}
+//		if time.Now().Unix() > accessToken.ExpiresIn {
+//			g.Log().Error(ctx, "token已过期", err)
+//			continue
+//		}
+//		return accessToken.AccessToken
+//	}
+//}
+
+func (wsg *WxSignGroup) GetWxAccessToken(ctx context.Context, code string) string {
+	var repl string
+	client, err := rpc.DialHTTP("tcp", g.Config().MustGet(ctx, "wxTokenRpc").String())
+	if err != nil {
+		g.Log().Errorf(ctx, code, err)
+		return repl
 	}
+	defer client.Close()
+	err = client.Call("WxTokenRpc.GetAccessToken", code, &repl)
+	if err != nil {
+		g.Log().Errorf(ctx, code, err)
+		return repl
+	}
+	if repl == "" {
+		g.Log().Errorf(ctx, "未获取到accessToken")
+	} else {
+		g.Log().Errorf(ctx, "获取到accessToken %s", repl)
+	}
+	return repl
 }
 
 func (wsg *WxSignGroup) AddNewUsers(dayNum int, openids ...string) error {
@@ -111,17 +132,13 @@ func (wsg *WxSignGroup) AddNewUsers(dayNum int, openids ...string) error {
 }
 
 // LoadGroupUser 加载标签下的用户
-func (wsg *WxSignGroup) LoadGroupUser(ctx context.Context) {
-	//a, b, c := wsg.getGroupUsers(ctx, 0, "")
-	//fmt.Println(a, b, c)
-	//return
-
+func (wsg *WxSignGroup) LoadGroupUser(ctx context.Context, appId string) {
 	//加载标签对应用户
 	for _, group := range wsg.data {
 		var nextOpenid string
 		group.Old = []string{}
 		for {
-			userIds, nextId, err := wsg.getGroupUsers(ctx, group.Sign.Id, nextOpenid)
+			userIds, nextId, err := wsg.getGroupUsers(ctx, appId, group.Sign.Id, nextOpenid)
 			if err != nil {
 				g.Log().Errorf(ctx, "加载%s用户异常", group.Sign.Name)
 				break
@@ -138,39 +155,39 @@ func (wsg *WxSignGroup) LoadGroupUser(ctx context.Context) {
 }
 
 // ClearGroupUser 清除标签下的用户
-func (wsg *WxSignGroup) ClearGroupUser(ctx context.Context) {
+func (wsg *WxSignGroup) ClearGroupUser(ctx context.Context, appId string) {
 	for _, group := range wsg.data {
 		for _, openids := range chunkArray(group.Old, 50) {
-			if err := wsg.groupDelUser(ctx, group.Sign.Id, openids...); err != nil {
-				g.Log().Errorf(ctx, "清除 %s标签下的用户异常 %v", group.Sign.Name, err)
+			if err := wsg.groupDelUser(ctx, appId, group.Sign.Id, openids...); err != nil {
+				g.Log().Errorf(ctx, "%s 清除 %s标签下的用户异常 %v", appId, group.Sign.Name, err)
 			}
 		}
 	}
 }
 
 // UpdateNewGroupUser 更新最新的标签用户
-func (wsg *WxSignGroup) UpdateNewGroupUser(ctx context.Context) {
+func (wsg *WxSignGroup) UpdateNewGroupUser(ctx context.Context, appId string) {
 	for _, group := range wsg.data {
 		for _, openids := range chunkArray(group.New, 50) {
-			if err := wsg.groupAddUser(ctx, group.Sign.Id, openids...); err != nil {
-				g.Log().Errorf(ctx, "%s标签添加用户异常 %v", group.Sign.Name, err)
+			if err := wsg.groupAddUser(ctx, appId, group.Sign.Id, openids...); err != nil {
+				g.Log().Errorf(ctx, "%s %s标签添加用户异常 %v", appId, group.Sign.Name, err)
 			}
 		}
 	}
 }
 
-func (wsg *WxSignGroup) DelGroup(ctx context.Context, ids ...int) {
+func (wsg *WxSignGroup) DelGroup(ctx context.Context, appId string, ids ...int) {
 	for _, groupId := range ids {
-		if err := wsg.delGroup(ctx, gconv.Int64(groupId)); err != nil {
-			g.Log().Errorf(ctx, "删除群组异常 %v", err)
+		if err := wsg.delGroup(ctx, appId, gconv.Int64(groupId)); err != nil {
+			g.Log().Errorf(ctx, "%s 删除群组异常 %v", appId, err)
 		}
 	}
 }
 
 // greatGroup 创建分组
-func (wsg *WxSignGroup) greatGroup(ctx context.Context, name string) (*SignGroup, error) {
+func (wsg *WxSignGroup) greatGroup(ctx context.Context, appId, name string) (*SignGroup, error) {
 	res, err := g.Client().Header(map[string]string{"Content-Type": "application/json"}).
-		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/create?access_token=%s", wsg.getToken()), g.Map{
+		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/create?access_token=%s", wsg.GetWxAccessToken(ctx, appId)), g.Map{
 			"tag": g.Map{"name": name},
 		})
 	if err != nil {
@@ -191,9 +208,9 @@ func (wsg *WxSignGroup) greatGroup(ctx context.Context, name string) (*SignGroup
 }
 
 // delGroup 删除分组
-func (wsg *WxSignGroup) delGroup(ctx context.Context, groupId int64) error {
+func (wsg *WxSignGroup) delGroup(ctx context.Context, appId string, groupId int64) error {
 	res, err := g.Client().Header(map[string]string{"Content-Type": "application/json"}).
-		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/delete?access_token=%s", wsg.getToken()), g.Map{
+		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/delete?access_token=%s", wsg.GetWxAccessToken(ctx, appId)), g.Map{
 			"tag": g.Map{"id": groupId},
 		})
 	if err != nil {
@@ -203,13 +220,13 @@ func (wsg *WxSignGroup) delGroup(ctx context.Context, groupId int64) error {
 
 	rMap := gconv.Map(res.ReadAll())
 	if gconv.String(rMap["errmsg"]) != "ok" {
-		return fmt.Errorf("删除分组异常:%s", gconv.String(rMap["errmsg"]))
+		return fmt.Errorf("%s 删除分组异常:%s", appId, gconv.String(rMap["errmsg"]))
 	}
 	return nil
 }
 
 // groupAddUser 分组添加用户
-func (wsg *WxSignGroup) groupAddUser(ctx context.Context, groupId int64, openids ...string) error {
+func (wsg *WxSignGroup) groupAddUser(ctx context.Context, appId string, groupId int64, openids ...string) error {
 	if len(openids) == 0 {
 		return nil
 	}
@@ -217,7 +234,7 @@ func (wsg *WxSignGroup) groupAddUser(ctx context.Context, groupId int64, openids
 		return fmt.Errorf("超出最大限制")
 	}
 	res, err := g.Client().Header(map[string]string{"Content-Type": "application/json"}).
-		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/members/batchtagging?access_token=%s", wsg.getToken()), g.Map{
+		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/members/batchtagging?access_token=%s", wsg.GetWxAccessToken(ctx, appId)), g.Map{
 			"tagid":       groupId,
 			"openid_list": openids,
 		})
@@ -228,13 +245,13 @@ func (wsg *WxSignGroup) groupAddUser(ctx context.Context, groupId int64, openids
 
 	rMap := gconv.Map(res.ReadAll())
 	if gconv.String(rMap["errmsg"]) != "ok" {
-		return fmt.Errorf("添加用户异常:%s", gconv.String(rMap["errmsg"]))
+		return fmt.Errorf("%d 添加用户异常:%s", appId, gconv.String(rMap["errmsg"]))
 	}
 	return nil
 }
 
 // groupDelUser 分组删除用户
-func (wsg *WxSignGroup) groupDelUser(ctx context.Context, groupId int64, openids ...string) error {
+func (wsg *WxSignGroup) groupDelUser(ctx context.Context, appId string, groupId int64, openids ...string) error {
 	if len(openids) == 0 {
 		return nil
 	}
@@ -242,7 +259,7 @@ func (wsg *WxSignGroup) groupDelUser(ctx context.Context, groupId int64, openids
 		return fmt.Errorf("超出最大限制")
 	}
 	res, err := g.Client().Header(map[string]string{"Content-Type": "application/json"}).
-		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/members/batchuntagging?access_token=%s", wsg.getToken()), g.Map{
+		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/members/batchuntagging?access_token=%s", wsg.GetWxAccessToken(ctx, appId)), g.Map{
 			"tagid":       groupId,
 			"openid_list": openids,
 		})
@@ -253,14 +270,14 @@ func (wsg *WxSignGroup) groupDelUser(ctx context.Context, groupId int64, openids
 
 	rMap := gconv.Map(res.ReadAll())
 	if gconv.String(rMap["errmsg"]) != "ok" {
-		return fmt.Errorf("标签删除用户异常:%s", gconv.String(rMap["errmsg"]))
+		return fmt.Errorf("%s 标签删除用户异常:%s", appId, gconv.String(rMap["errmsg"]))
 	}
 	return nil
 }
 
 // allGroup 获取所有分组
-func (wsg *WxSignGroup) allGroup(ctx context.Context) ([]*SignGroup, error) {
-	res, err := g.Client().Get(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/get?access_token=%s", wsg.getToken()))
+func (wsg *WxSignGroup) allGroup(ctx context.Context, appId string) ([]*SignGroup, error) {
+	res, err := g.Client().Get(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/get?access_token=%s", wsg.GetWxAccessToken(ctx, appId)))
 	if err != nil {
 		return nil, err
 	}
@@ -274,9 +291,9 @@ func (wsg *WxSignGroup) allGroup(ctx context.Context) ([]*SignGroup, error) {
 }
 
 // getGroupUsers 获取所有分组
-func (wsg *WxSignGroup) getGroupUsers(ctx context.Context, groupId int64, next_openid string) ([]string, string, error) {
+func (wsg *WxSignGroup) getGroupUsers(ctx context.Context, appId string, groupId int64, next_openid string) ([]string, string, error) {
 	res, err := g.Client().Header(map[string]string{"Content-Type": "application/json"}).
-		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/user/tag/get?access_token=%s", wsg.getToken()), g.Map{
+		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/user/tag/get?access_token=%s", wsg.GetWxAccessToken(ctx, appId)), g.Map{
 			"tagid":       groupId,
 			"next_openid": next_openid,
 		})