|
@@ -0,0 +1,237 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "crypto"
|
|
|
+ "crypto/rand"
|
|
|
+ "crypto/rsa"
|
|
|
+ "crypto/sha256"
|
|
|
+ "crypto/x509"
|
|
|
+ "encoding/base64"
|
|
|
+ "encoding/json"
|
|
|
+ "encoding/pem"
|
|
|
+ "errors"
|
|
|
+ "fmt"
|
|
|
+ "github.com/gogf/gf/v2/frame/g"
|
|
|
+ "github.com/gogf/gf/v2/os/gctx"
|
|
|
+ "github.com/gogf/gf/v2/os/glog"
|
|
|
+ "github.com/gogf/gf/v2/util/gconv"
|
|
|
+ "go.mongodb.org/mongo-driver/bson"
|
|
|
+ "go.mongodb.org/mongo-driver/mongo"
|
|
|
+ "hash"
|
|
|
+ "net/url"
|
|
|
+ "strings"
|
|
|
+ "time"
|
|
|
+ "workTasks/common"
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ jyPushManager *pushManager = initPush(gctx.New())
|
|
|
+)
|
|
|
+
|
|
|
+type (
|
|
|
+ pushManager struct {
|
|
|
+ Appid string
|
|
|
+ SceneCode string
|
|
|
+ ApiPath string
|
|
|
+ PrivateKey *rsa.PrivateKey
|
|
|
+ }
|
|
|
+
|
|
|
+ bizContent struct {
|
|
|
+ FeedbackData []feedBackData `json:"feedback_data"`
|
|
|
+ SceneCode string `json:"scene_code"` //JIANYU_LABEL
|
|
|
+ }
|
|
|
+
|
|
|
+ feedBackData struct {
|
|
|
+ EpCertNo string `json:"ep_cert_no"`
|
|
|
+ ActionType string `json:"action_type"`
|
|
|
+ CountType string `json:"count_type"`
|
|
|
+ ActionContent []string `json:"action_content"`
|
|
|
+ ActionDate string `json:"action_date"`
|
|
|
+ ActionCount string `json:"action_count"`
|
|
|
+ }
|
|
|
+)
|
|
|
+
|
|
|
+func initPush(ctx context.Context) *pushManager {
|
|
|
+ privateKey, err := parsePrivateKey(g.Cfg().MustGet(ctx, "ali.privateKey").String())
|
|
|
+ if err != nil {
|
|
|
+ g.Log().Panic(ctx, err)
|
|
|
+ }
|
|
|
+ return &pushManager{
|
|
|
+ Appid: g.Cfg().MustGet(ctx, "ali.appid").String(),
|
|
|
+ ApiPath: g.Cfg().MustGet(ctx, "ali.apiPath").String(),
|
|
|
+ SceneCode: g.Cfg().MustGet(ctx, "ali.sceneCode").String(),
|
|
|
+ PrivateKey: privateKey,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func main() {
|
|
|
+ ctx := gctx.New()
|
|
|
+ //c, e := gcron.Add(ctx, g.Cfg().MustGet(ctx, "runCron2", "5 * * * * *").String(), jyPushManager.RunJob)
|
|
|
+ //if e != nil {
|
|
|
+ // g.Log().Errorf(ctx, "initAndUpdateDeptManager cron任务异常 %v", e)
|
|
|
+ //}
|
|
|
+ //c.Status()
|
|
|
+ if err := jyPushManager.DoPush(ctx, 2, 1, "91320214579517104U", "2024-09-10"); err != nil {
|
|
|
+ g.Log().Print(ctx, err)
|
|
|
+ }
|
|
|
+ //jyPushManager.RunJob(ctx)
|
|
|
+ select {}
|
|
|
+}
|
|
|
+
|
|
|
+func (m *pushManager) RunJob(ctx context.Context) {
|
|
|
+ sess := common.MG.DB("log").GetMgoConn()
|
|
|
+ defer common.MG.DB().DestoryMongoConn(sess)
|
|
|
+
|
|
|
+ yesterday := time.Now().AddDate(0, 0, -1)
|
|
|
+ start := time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, yesterday.Location())
|
|
|
+
|
|
|
+ pipeline := mongo.Pipeline{
|
|
|
+ bson.D{{"$match", g.Map{"l_date": g.Map{"$gte": start.Unix(), "$lt": start.Unix() + 60*60*24}}}},
|
|
|
+ bson.D{
|
|
|
+ {"$group", bson.D{
|
|
|
+ {"_id", "$credit_no"},
|
|
|
+ {"unique_users", bson.D{
|
|
|
+ {"$addToSet", bson.D{
|
|
|
+ {"$cond", bson.D{
|
|
|
+ {"if", bson.D{
|
|
|
+ {"$eq", []interface{}{"$s_userId", ""}},
|
|
|
+ }},
|
|
|
+ {"then", "empty"},
|
|
|
+ {"else", "$s_userId"},
|
|
|
+ }},
|
|
|
+ }},
|
|
|
+ }},
|
|
|
+ {"count", bson.D{
|
|
|
+ {"$sum", 1},
|
|
|
+ }},
|
|
|
+ }},
|
|
|
+ },
|
|
|
+ bson.D{
|
|
|
+ {"$project", bson.D{
|
|
|
+ {"_id", 1},
|
|
|
+ {"count", 1},
|
|
|
+ {"unique_user_count", bson.D{
|
|
|
+ {"$size", "$unique_users"},
|
|
|
+ }},
|
|
|
+ }},
|
|
|
+ },
|
|
|
+ }
|
|
|
+ // 执行聚合查询
|
|
|
+ cursor, err := sess.M.C.Database("qfw").Collection("jy_zhima_logs").Aggregate(context.TODO(), pipeline)
|
|
|
+ if err != nil {
|
|
|
+ glog.Panic(ctx, err)
|
|
|
+ }
|
|
|
+
|
|
|
+ defer cursor.Close(context.TODO())
|
|
|
+
|
|
|
+ // 遍历结果
|
|
|
+ for cursor.Next(context.TODO()) {
|
|
|
+ var result bson.M
|
|
|
+ if err := cursor.Decode(&result); err != nil {
|
|
|
+ g.Log().Errorf(ctx, "获取行数据异常 %v", err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ var (
|
|
|
+ ent = gconv.String(result["_id"])
|
|
|
+ pv = gconv.Int(result["count"])
|
|
|
+ uv = gconv.Int(result["unique_user_count"])
|
|
|
+ )
|
|
|
+ if ent != "" && pv > 0 {
|
|
|
+ if err := m.DoPush(ctx, pv, uv, ent, start.Format(time.DateOnly)); err != nil {
|
|
|
+ g.Log().Errorf(ctx, "上报数据异常\nparam ent:%v pv:%v uv:%v\nerror: %v", ent, pv, uv, err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (m *pushManager) DoPush(ctx context.Context, pv, uv int, ent, dateStr string) error {
|
|
|
+ //g.Log().Printf(ctx, "模拟上报数据\nparam ent:%v pv:%v uv:%v", ent, pv, uv)
|
|
|
+ //return nil
|
|
|
+ bizC := bizContent{
|
|
|
+ FeedbackData: []feedBackData{
|
|
|
+ {
|
|
|
+ EpCertNo: ent,
|
|
|
+ ActionType: "PV",
|
|
|
+ CountType: "accurate",
|
|
|
+ ActionContent: []string{"shortvideo_sales_top", "shortvideo_sales_top_num"},
|
|
|
+ ActionDate: dateStr,
|
|
|
+ ActionCount: gconv.String(pv),
|
|
|
+ },
|
|
|
+ {
|
|
|
+ EpCertNo: ent,
|
|
|
+ ActionType: "UV",
|
|
|
+ CountType: "accurate",
|
|
|
+ ActionContent: []string{"label_definition", "shortvideo_sales_top_num"},
|
|
|
+ ActionDate: dateStr,
|
|
|
+ ActionCount: gconv.String(uv),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ SceneCode: m.SceneCode,
|
|
|
+ }
|
|
|
+ dataStr, err := json.Marshal(bizC)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ var data = url.Values{}
|
|
|
+ data.Add("app_id", g.Cfg().MustGet(ctx, "ali.appid").String())
|
|
|
+ data.Add("method", "zhima.credit.ep.acceptance.label.use")
|
|
|
+ data.Add("format", "json")
|
|
|
+ data.Add("charset", "UTF-8")
|
|
|
+ data.Add("sign_type", "RSA2")
|
|
|
+ data.Add("timestamp", time.Now().Format("2006-01-02 15:04:05"))
|
|
|
+ data.Add("version", "1.0")
|
|
|
+ data.Add("biz_content", string(dataStr))
|
|
|
+
|
|
|
+ signContentBytes, _ := url.QueryUnescape(data.Encode())
|
|
|
+ signature, err := m.getSign([]byte(signContentBytes))
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ data.Add("sign", signature)
|
|
|
+
|
|
|
+ fmt.Println(fmt.Sprintf("%s?%s", m.ApiPath, data.Encode()))
|
|
|
+
|
|
|
+ res, err := g.Client().Post(ctx, fmt.Sprintf("%s?%s", m.ApiPath, data.Encode()))
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ defer res.Close()
|
|
|
+
|
|
|
+ respByte := res.ReadAll()
|
|
|
+ g.Log().Print(ctx, "---", string(respByte))
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (m *pushManager) getSign(data []byte) (signature string, err error) {
|
|
|
+ var h hash.Hash
|
|
|
+ var hType crypto.Hash
|
|
|
+ h = sha256.New()
|
|
|
+ hType = crypto.SHA256
|
|
|
+
|
|
|
+ h.Write(data)
|
|
|
+ d := h.Sum(nil)
|
|
|
+ bs, err := rsa.SignPKCS1v15(rand.Reader, m.PrivateKey, hType, d)
|
|
|
+ if err != nil {
|
|
|
+ return "", err
|
|
|
+ }
|
|
|
+ signature = base64.StdEncoding.EncodeToString(bs)
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// 加载秘钥
|
|
|
+func parsePrivateKey(key string) (*rsa.PrivateKey, error) {
|
|
|
+ block, _ := pem.Decode([]byte(key))
|
|
|
+ if block == nil {
|
|
|
+ return nil, errors.New("私钥格式不正确")
|
|
|
+ }
|
|
|
+ if strings.ToUpper(block.Type) != "RSA PRIVATE KEY" {
|
|
|
+ return nil, errors.New("私钥类型不正确" + block.Type)
|
|
|
+ }
|
|
|
+ rsaPrivateKey, err := x509.ParsePKCS8PrivateKey(block.Bytes)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ return rsaPrivateKey.(*rsa.PrivateKey), nil
|
|
|
+}
|