فهرست منبع

wip:提交微信关注标签

wkyuer 5 ماه پیش
والد
کامیت
1fed522c0e
4فایلهای تغییر یافته به همراه483 افزوده شده و 0 حذف شده
  1. 2 0
      go.mod
  2. 27 0
      wxSign/config.yaml
  3. 136 0
      wxSign/main.go
  4. 318 0
      wxSign/wxSignGroup/group.go

+ 2 - 0
go.mod

@@ -4,6 +4,7 @@ go 1.20
 
 require (
 	app.yhyue.com/moapp/jybase v0.0.0-20240805110713-0c17face82c4
+	github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
 	github.com/gogf/gf/contrib/drivers/clickhouse/v2 v2.7.4
 	github.com/gogf/gf/contrib/drivers/mysql/v2 v2.7.3
 	github.com/gogf/gf/contrib/nosql/redis/v2 v2.7.3
@@ -63,5 +64,6 @@ require (
 	golang.org/x/sync v0.6.0 // indirect
 	golang.org/x/sys v0.19.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
+	golang.org/x/time v0.9.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 )

+ 27 - 0
wxSign/config.yaml

@@ -0,0 +1,27 @@
+mongodb:
+  default:
+    address: "192.168.3.149:27180"
+    size: 5
+    dbName: qfw
+    replSet: ""
+    userName: ""
+    password: ""
+#  default:
+#    address: "127.0.0.1:27080"
+#    size: 10
+#    dbName: "qfw"
+
+logger:
+  level: "all" #all info warn
+  path: "logs" # 日志文件路径。默认为空,表示关闭,仅输出到终端
+  file: "{Y-m-d}.log" # 日志文件格式。默认为"{Y-m-d}.log"
+
+# 每天凌晨2点推送前一天访问数据
+runCron: "# 04 14 * * *"
+
+redis:
+  default: # 配置seo的redis
+    address: 192.168.3.149:1713
+
+wxAppId: "wxd6f3e855ef4258cd"
+subDaySign: [1,3,7,9]

+ 136 - 0
wxSign/main.go

@@ -0,0 +1,136 @@
+package main
+
+import (
+	"context"
+	_ "github.com/gogf/gf/contrib/nosql/redis/v2"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/os/gcron"
+	"github.com/gogf/gf/v2/os/gctx"
+	"go.mongodb.org/mongo-driver/bson"
+	"time"
+	"workTasks/common"
+	"workTasks/wxSign/wxSignGroup"
+)
+
+type (
+	UserSubAction struct {
+		S_m_openid string `bson:"s_m_openid"`
+		LastData   int64  `bson:"latest_l_date"`
+	}
+)
+
+func main() {
+	ctx := gctx.New()
+	_, err := gcron.Add(gctx.New(), g.Cfg().MustGet(ctx, "runCron", "# 0 2 * * *").String(), func(ctx context.Context) {
+		//ctx := context.Background()
+		nwsm, err := wxSignGroup.NewWxSignManager()
+		if err != nil {
+			panic(err)
+		}
+		nwsm.LoadGroupUser(ctx)
+		nwsm.ClearGroupUser(ctx)
+
+		//同步最新标签
+		for i, arr := range logNewSign() {
+			if err := nwsm.AddNewUsers(i, arr...); err != nil {
+				panic(err)
+			}
+		}
+		nwsm.LoadGroupUser(ctx)
+	}, "userWxSign")
+	if err != nil {
+		panic(err)
+	}
+	select {}
+}
+
+func logNewSign() map[int][]string {
+	var (
+		dayMaps      = map[int]bool{}
+		signList     = map[int][]string{}
+		now          = time.Now()
+		signData     = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Unix()
+		maxDay   int = 0
+	)
+	for _, dayNum := range g.Cfg().MustGet(context.Background(), "subDaySign").Ints() {
+		if dayNum > maxDay {
+			maxDay = dayNum
+		}
+		dayMaps[dayNum] = true
+	}
+	g.Dump(dayMaps)
+
+	subActionList, err := GetUserSubList(-maxDay)
+	if err != nil {
+		panic(err)
+	}
+	for _, action := range subActionList {
+		sign := int((signData-action.LastData)/(60*60*24)) + 1
+		if dayMaps[sign] {
+			signList[sign] = append(signList[sign], action.S_m_openid)
+		}
+	}
+	//加载分组完成,开始更新
+	g.Dump(signList)
+	return signList
+}
+
+func GetUserSubList(day int) ([]*UserSubAction, error) {
+	//获取30天时间戳
+	now := time.Now()
+	endTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
+	startTime := endTime.AddDate(0, 0, day) // 30 天前
+
+	g.Log().Infof(context.Background(), "开始加载 %d-%d关注数据", startTime.Unix(), endTime.Unix())
+
+	client := common.MG.DB().C
+	collection := client.Database("qfw").Collection("jy_subscribe")
+	// 创建聚合管道
+	pipeline := []bson.M{
+		{
+			// 第一个阶段:筛选符合条件的文档
+			"$match": bson.M{"l_date": bson.M{"$gt": startTime.Unix(), "$lt": endTime.Unix()}, "s_event": "subscribe"},
+		},
+		{
+			"$group": bson.M{
+				"_id":           "$s_m_openid",             // 按 s_m_openid 分组
+				"latest_l_date": bson.M{"$max": "$l_date"}, // 计算每个 s_m_openid 出现的次数
+			},
+		},
+		{
+			"$project": bson.M{
+				"_id":           0,
+				"s_m_openid":    "$_id",           // 显示 s_m_openid
+				"latest_l_date": "$latest_l_date", // 显示出现次数
+			},
+		},
+		{
+			"$sort": bson.M{
+				"latest_l_date": -1, // 按 count 降序排序
+			},
+		},
+	}
+	// 执行聚合查询
+	cursor, err := collection.Aggregate(context.Background(), pipeline)
+	if err != nil {
+		g.Log().Fatalf(context.Background(), "Failed to execute aggregation: %v", err)
+		return nil, err
+	}
+	defer cursor.Close(context.Background())
+	var list []*UserSubAction
+	// 遍历结果并打印
+	for cursor.Next(context.Background()) {
+		var result UserSubAction
+		if err := cursor.Decode(&result); err != nil {
+			g.Log().Fatalf(context.Background(), "Failed to decode result: %v", err)
+		}
+		list = append(list, &result)
+	}
+
+	// 检查遍历过程中是否出错
+	if err := cursor.Err(); err != nil {
+		g.Log().Fatalf(context.Background(), "Cursor error: %v", err)
+		return nil, err
+	}
+	return list, nil
+}

+ 318 - 0
wxSign/wxSignGroup/group.go

@@ -0,0 +1,318 @@
+package wxSignGroup
+
+import (
+	"context"
+	"fmt"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/util/gconv"
+	"regexp"
+	"time"
+)
+
+type (
+	SignGroup struct {
+		Id    int64  `json:"id"`
+		Name  string `json:"name"`
+		Count int64  `json:"count"`
+	}
+	AccessToken struct {
+		AccessToken string `json:"access_token"`
+		ExpiresIn   int64  `json:"expires_in"`
+		LastTime    int64  `json:"last_time"`
+	}
+	GroupFull struct {
+		Sign     *SignGroup
+		Old, New []string
+	}
+)
+
+type WxSignGroup struct {
+	data map[int]*GroupFull
+}
+
+func NewWxSignManager() (*WxSignGroup, error) {
+	//加载所有分组
+	var (
+		wsg  = &WxSignGroup{}
+		data = map[int]*GroupFull{}
+		ctx  = context.Background()
+
+		re = regexp.MustCompile(`^关注第(\d+)天$`)
+	)
+	//加载分组信息
+	list, err := wsg.allGroup(ctx)
+	if err != nil {
+		return nil, err
+	}
+	for _, day := range g.Cfg().MustGet(ctx, "subDaySign").Ints() {
+		var isExists = false
+		for _, group := range list {
+			match := re.FindStringSubmatch(group.Name)
+			if len(match) == 0 {
+				continue
+			}
+			s := gconv.Int(match[1])
+			if s == day {
+				data[day] = &GroupFull{
+					Sign: group,
+					Old:  []string{},
+					New:  []string{},
+				}
+				isExists = true
+				break
+			}
+		}
+		if !isExists {
+			newGroupName := fmt.Sprintf("关注第%d天", day)
+			newGroup, err := wsg.greatGroup(ctx, newGroupName)
+			if err != nil {
+				g.Log().Errorf(ctx, "创建%s异常 %v", newGroupName, err)
+				continue
+			}
+			data[day] = &GroupFull{
+				Sign: newGroup,
+				Old:  []string{},
+				New:  []string{},
+			}
+		}
+	}
+	wsg.data = data
+	return wsg, nil
+}
+
+func (wsg *WxSignGroup) getToken() string {
+	for {
+		ctx := context.Background()
+		gv, err := g.Redis().Get(ctx, fmt.Sprintf("WxToken_%s", g.Cfg().MustGet(ctx, "wxAppId").String()))
+		if err != nil {
+			g.Log().Error(ctx, "获取accessToken异常", err)
+			continue
+		}
+		var accessToken AccessToken
+		if err := gv.Struct(&accessToken); err != nil {
+			g.Log().Error(ctx, "获取反序列化异常", err)
+			continue
+		}
+		if time.Now().Unix() > accessToken.ExpiresIn {
+			g.Log().Error(ctx, "token已过期", err)
+			continue
+		}
+		return accessToken.AccessToken
+	}
+}
+
+func (wsg *WxSignGroup) AddNewUsers(dayNum int, openids ...string) error {
+	group, ok := wsg.data[dayNum]
+	if !ok {
+		return fmt.Errorf("未知标签")
+	}
+	group.New = append(group.New, openids...)
+	return nil
+}
+
+// LoadGroupUser 加载标签下的用户
+func (wsg *WxSignGroup) LoadGroupUser(ctx context.Context) {
+	//a, b, c := wsg.getGroupUsers(ctx, 0, "")
+	//fmt.Println(a, b, c)
+	//return
+
+	//加载标签对应用户
+	for _, group := range wsg.data {
+		var nextOpenid string
+		group.Old = []string{}
+		for {
+			userIds, nextId, err := wsg.getGroupUsers(ctx, group.Sign.Id, nextOpenid)
+			if err != nil {
+				g.Log().Errorf(ctx, "加载%s用户异常", group.Sign.Name)
+				break
+			}
+			group.Old = append(group.Old, userIds...)
+			if nextId == "" {
+				break
+			}
+			nextOpenid = nextId
+		}
+		g.Log().Infof(ctx, "加载 %s 标签下 %d用户,实际加载%d个", group.Sign.Name, group.Sign.Count, len(group.Old))
+	}
+	g.Dump(wsg.data)
+}
+
+// ClearGroupUser 清除标签下的用户
+func (wsg *WxSignGroup) ClearGroupUser(ctx context.Context) {
+	for _, group := range wsg.data {
+		for _, openids := range chunkArray(group.Old, 50) {
+			if err := wsg.groupDelUser(ctx, group.Sign.Id, openids...); err != nil {
+				g.Log().Errorf(ctx, "清除 %s标签下的用户异常 %v", group.Sign.Name, err)
+			}
+		}
+	}
+}
+
+// UpdateNewGroupUser 更新最新的标签用户
+func (wsg *WxSignGroup) UpdateNewGroupUser(ctx context.Context) {
+	for _, group := range wsg.data {
+		for _, openids := range chunkArray(group.New, 50) {
+			if err := wsg.groupAddUser(ctx, group.Sign.Id, openids...); err != nil {
+				g.Log().Errorf(ctx, "%s标签添加用户异常 %v", group.Sign.Name, err)
+			}
+		}
+	}
+}
+
+func (wsg *WxSignGroup) DelGroup(ctx context.Context, ids ...int) {
+	for _, groupId := range ids {
+		if err := wsg.delGroup(ctx, gconv.Int64(groupId)); err != nil {
+			g.Log().Errorf(ctx, "删除群组异常 %v", err)
+		}
+	}
+}
+
+// greatGroup 创建分组
+func (wsg *WxSignGroup) greatGroup(ctx context.Context, name string) (*SignGroup, error) {
+	res, err := g.Client().Header(map[string]string{"Content-Type": "application/json"}).
+		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/create?access_token=%s", wsg.getToken()), g.Map{
+			"tag": g.Map{"name": name},
+		})
+	if err != nil {
+		return nil, err
+	}
+	defer res.Close()
+
+	var rData SignGroup
+
+	cMap := gconv.Map(res.ReadAll())
+	if errMsg := gconv.String(cMap["errmsg"]); errMsg != "" {
+		return nil, fmt.Errorf(errMsg)
+	}
+	if err := gconv.Struct(cMap["tag"], &rData); err != nil {
+		return nil, err
+	}
+	return &rData, nil
+}
+
+// delGroup 删除分组
+func (wsg *WxSignGroup) delGroup(ctx context.Context, groupId int64) error {
+	res, err := g.Client().Header(map[string]string{"Content-Type": "application/json"}).
+		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/delete?access_token=%s", wsg.getToken()), g.Map{
+			"tag": g.Map{"id": groupId},
+		})
+	if err != nil {
+		return err
+	}
+	defer res.Close()
+
+	rMap := gconv.Map(res.ReadAll())
+	if gconv.String(rMap["errmsg"]) != "ok" {
+		return fmt.Errorf("删除分组异常:%s", gconv.String(rMap["errmsg"]))
+	}
+	return nil
+}
+
+// groupAddUser 分组添加用户
+func (wsg *WxSignGroup) groupAddUser(ctx context.Context, groupId int64, openids ...string) error {
+	if len(openids) == 0 {
+		return nil
+	}
+	if len(openids) > 50 {
+		return fmt.Errorf("超出最大限制")
+	}
+	res, err := g.Client().Header(map[string]string{"Content-Type": "application/json"}).
+		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/members/batchtagging?access_token=%s", wsg.getToken()), g.Map{
+			"tagid":       groupId,
+			"openid_list": openids,
+		})
+	if err != nil {
+		return err
+	}
+	defer res.Close()
+
+	rMap := gconv.Map(res.ReadAll())
+	if gconv.String(rMap["errmsg"]) != "ok" {
+		return fmt.Errorf("添加用户异常:%s", gconv.String(rMap["errmsg"]))
+	}
+	return nil
+}
+
+// groupDelUser 分组删除用户
+func (wsg *WxSignGroup) groupDelUser(ctx context.Context, groupId int64, openids ...string) error {
+	if len(openids) == 0 {
+		return nil
+	}
+	if len(openids) > 50 {
+		return fmt.Errorf("超出最大限制")
+	}
+	res, err := g.Client().Header(map[string]string{"Content-Type": "application/json"}).
+		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/members/batchuntagging?access_token=%s", wsg.getToken()), g.Map{
+			"tagid":       groupId,
+			"openid_list": openids,
+		})
+	if err != nil {
+		return err
+	}
+	defer res.Close()
+
+	rMap := gconv.Map(res.ReadAll())
+	if gconv.String(rMap["errmsg"]) != "ok" {
+		return fmt.Errorf("标签删除用户异常:%s", gconv.String(rMap["errmsg"]))
+	}
+	return nil
+}
+
+// allGroup 获取所有分组
+func (wsg *WxSignGroup) allGroup(ctx context.Context) ([]*SignGroup, error) {
+	res, err := g.Client().Get(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/tags/get?access_token=%s", wsg.getToken()))
+	if err != nil {
+		return nil, err
+	}
+	defer res.Close()
+	var rData []*SignGroup
+	m := gconv.Map(res.ReadAll())
+	if err := gconv.Struct(m["tags"], &rData); err != nil {
+		return nil, err
+	}
+	return rData, nil
+}
+
+// getGroupUsers 获取所有分组
+func (wsg *WxSignGroup) getGroupUsers(ctx context.Context, groupId int64, next_openid string) ([]string, string, error) {
+	res, err := g.Client().Header(map[string]string{"Content-Type": "application/json"}).
+		Post(ctx, fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/user/tag/get?access_token=%s", wsg.getToken()), g.Map{
+			"tagid":       groupId,
+			"next_openid": next_openid,
+		})
+	if err != nil {
+		return nil, "", err
+	}
+	defer res.Close()
+
+	var rData = struct {
+		Count int `json:"count"`
+		Data  struct {
+			Openid []string `json:"openid"`
+		} `json:"data"`
+		NextOpenid string `json:"next_openid"`
+	}{}
+
+	if err := gconv.Struct(res.ReadAll(), &rData); err != nil {
+		return nil, "", err
+	}
+	if rData.Data.Openid != nil && len(rData.Data.Openid) > 0 {
+		return rData.Data.Openid, rData.NextOpenid, nil
+	}
+	return nil, "", nil
+}
+
+func chunkArray(arr []string, size int) [][]string {
+	var result [][]string
+	// 遍历一维数组,每次取 size 个元素
+	for i := 0; i < len(arr); i += size {
+		// 计算切片的结束位置,确保不超出数组的长度
+		end := i + size
+		if end > len(arr) {
+			end = len(arr)
+		}
+		// 将切片添加到结果数组中
+		result = append(result, arr[i:end])
+	}
+	return result
+}