|
@@ -12,13 +12,16 @@ import (
|
|
|
"encoding/pem"
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
+ _ "github.com/gogf/gf/contrib/nosql/redis/v2"
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
|
+ "github.com/gogf/gf/v2/os/gcron"
|
|
|
"github.com/gogf/gf/v2/os/gctx"
|
|
|
"github.com/gogf/gf/v2/os/glog"
|
|
|
"github.com/gogf/gf/v2/util/gconv"
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
|
"hash"
|
|
|
+ "io"
|
|
|
"net/url"
|
|
|
"strings"
|
|
|
"time"
|
|
@@ -67,19 +70,16 @@ func initPush(ctx context.Context) *pushManager {
|
|
|
|
|
|
func main() {
|
|
|
ctx := gctx.New()
|
|
|
- //c, e := gcron.Add(ctx, g.Cfg().MustGet(ctx, "runCron2", "5 * * * * *").String(), jyPushManager.RunJob)
|
|
|
- //if e != nil {
|
|
|
- // g.Log().Errorf(ctx, "initAndUpdateDeptManager cron任务异常 %v", e)
|
|
|
- //}
|
|
|
- //c.Status()
|
|
|
- if err := jyPushManager.DoPush(ctx, 2, 1, "91320214579517104U", "2024-09-10"); err != nil {
|
|
|
- g.Log().Print(ctx, err)
|
|
|
+ c, e := gcron.Add(ctx, g.Cfg().MustGet(ctx, "runCron", "# 0 2 * * *").String(), jyPushManager.RunJob)
|
|
|
+ if e != nil {
|
|
|
+ g.Log().Errorf(ctx, "initAndUpdateDeptManager cron任务异常 %v", e)
|
|
|
}
|
|
|
- //jyPushManager.RunJob(ctx)
|
|
|
+ c.Status()
|
|
|
select {}
|
|
|
}
|
|
|
|
|
|
func (m *pushManager) RunJob(ctx context.Context) {
|
|
|
+
|
|
|
sess := common.MG.DB("log").GetMgoConn()
|
|
|
defer common.MG.DB().DestoryMongoConn(sess)
|
|
|
|
|
@@ -91,6 +91,7 @@ func (m *pushManager) RunJob(ctx context.Context) {
|
|
|
bson.D{
|
|
|
{"$group", bson.D{
|
|
|
{"_id", "$credit_no"},
|
|
|
+ {"s_entId", bson.D{{"$first", "$s_entId"}}},
|
|
|
{"unique_users", bson.D{
|
|
|
{"$addToSet", bson.D{
|
|
|
{"$cond", bson.D{
|
|
@@ -111,18 +112,23 @@ func (m *pushManager) RunJob(ctx context.Context) {
|
|
|
{"$project", bson.D{
|
|
|
{"_id", 1},
|
|
|
{"count", 1},
|
|
|
+ {"s_entId", 1},
|
|
|
{"unique_user_count", bson.D{
|
|
|
{"$size", "$unique_users"},
|
|
|
}},
|
|
|
}},
|
|
|
},
|
|
|
}
|
|
|
+ var (
|
|
|
+ startTime = time.Now()
|
|
|
+ total, codeErrTotal, successTotal int
|
|
|
+ )
|
|
|
// 执行聚合查询
|
|
|
cursor, err := sess.M.C.Database("qfw").Collection("jy_zhima_logs").Aggregate(context.TODO(), pipeline)
|
|
|
if err != nil {
|
|
|
glog.Panic(ctx, err)
|
|
|
}
|
|
|
-
|
|
|
+ g.Log().Infof(ctx, "查询%s请求日志耗时%f秒", yesterday.Format(time.DateOnly), time.Now().Sub(startTime).Seconds())
|
|
|
defer cursor.Close(context.TODO())
|
|
|
|
|
|
// 遍历结果
|
|
@@ -134,32 +140,40 @@ func (m *pushManager) RunJob(ctx context.Context) {
|
|
|
}
|
|
|
var (
|
|
|
ent = gconv.String(result["_id"])
|
|
|
+ sid = gconv.String(result["s_entId"])
|
|
|
pv = gconv.Int(result["count"])
|
|
|
uv = gconv.Int(result["unique_user_count"])
|
|
|
)
|
|
|
- if ent != "" && pv > 0 {
|
|
|
- if err := m.DoPush(ctx, pv, uv, ent, start.Format(time.DateOnly)); err != nil {
|
|
|
+ if ent != "" && sid != "" && pv > 0 {
|
|
|
+ code, err := m.DoPush(ctx, pv, uv, ent, sid, start.Format(time.DateOnly))
|
|
|
+ if err != nil {
|
|
|
g.Log().Errorf(ctx, "上报数据异常\nparam ent:%v pv:%v uv:%v\nerror: %v", ent, pv, uv, err)
|
|
|
}
|
|
|
+ switch code {
|
|
|
+ case 1:
|
|
|
+ successTotal++
|
|
|
+ case -1:
|
|
|
+ codeErrTotal++
|
|
|
+ }
|
|
|
+ total++
|
|
|
}
|
|
|
}
|
|
|
+ g.Log().Infof(ctx, "上报%s数据完成 成功%d个 获取code失败%d个 共%d个耗时%f秒", yesterday.Format(time.DateOnly), total, successTotal, codeErrTotal, time.Now().Sub(startTime).Seconds())
|
|
|
}
|
|
|
|
|
|
-func (m *pushManager) DoPush(ctx context.Context, pv, uv int, ent, dateStr string) error {
|
|
|
- //g.Log().Printf(ctx, "模拟上报数据\nparam ent:%v pv:%v uv:%v", ent, pv, uv)
|
|
|
- //return nil
|
|
|
- actionContent := getCode(ent)
|
|
|
+func (m *pushManager) DoPush(ctx context.Context, pv, uv int, ent, sid, dateStr string) (int, error) {
|
|
|
+ actionContent := getCode(ent, sid)
|
|
|
if len(actionContent) == 0 {
|
|
|
- return nil
|
|
|
+ return -1, nil
|
|
|
}
|
|
|
-
|
|
|
+ g.Log().Debugf(ctx, "模拟上报数据\nparam ent:%v pv:%v uv:%v codes:%v", ent, pv, uv, actionContent)
|
|
|
bizC := bizContent{
|
|
|
FeedbackData: []feedBackData{
|
|
|
{
|
|
|
EpCertNo: ent,
|
|
|
ActionType: "PV",
|
|
|
CountType: "accurate",
|
|
|
- ActionContent: []string{"shortvideo_sales_top", "shortvideo_sales_top_num"},
|
|
|
+ ActionContent: actionContent,
|
|
|
ActionDate: dateStr,
|
|
|
ActionCount: gconv.String(pv),
|
|
|
},
|
|
@@ -167,7 +181,7 @@ func (m *pushManager) DoPush(ctx context.Context, pv, uv int, ent, dateStr strin
|
|
|
EpCertNo: ent,
|
|
|
ActionType: "UV",
|
|
|
CountType: "accurate",
|
|
|
- ActionContent: []string{"label_definition", "shortvideo_sales_top_num"},
|
|
|
+ ActionContent: actionContent,
|
|
|
ActionDate: dateStr,
|
|
|
ActionCount: gconv.String(uv),
|
|
|
},
|
|
@@ -176,7 +190,7 @@ func (m *pushManager) DoPush(ctx context.Context, pv, uv int, ent, dateStr strin
|
|
|
}
|
|
|
dataStr, err := json.Marshal(bizC)
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return 0, err
|
|
|
}
|
|
|
var data = url.Values{}
|
|
|
data.Add("app_id", g.Cfg().MustGet(ctx, "ali.appid").String())
|
|
@@ -191,22 +205,32 @@ func (m *pushManager) DoPush(ctx context.Context, pv, uv int, ent, dateStr strin
|
|
|
signContentBytes, _ := url.QueryUnescape(data.Encode())
|
|
|
signature, err := m.getSign([]byte(signContentBytes))
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return 0, err
|
|
|
}
|
|
|
-
|
|
|
data.Add("sign", signature)
|
|
|
-
|
|
|
- fmt.Println(fmt.Sprintf("%s?%s", m.ApiPath, data.Encode()))
|
|
|
-
|
|
|
+ //fmt.Println(fmt.Sprintf("%s?%s", m.ApiPath, data.Encode()))
|
|
|
res, err := g.Client().Post(ctx, fmt.Sprintf("%s?%s", m.ApiPath, data.Encode()))
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return 0, err
|
|
|
}
|
|
|
defer res.Close()
|
|
|
+ pushResp := PushResp{}
|
|
|
+ if err := gconv.Struct(res.ReadAll(), &pushResp); err != nil {
|
|
|
+ return 0, err
|
|
|
+ }
|
|
|
+ if pushResp.ZhimaCreditEpAcceptanceLabelUseResponse.Msg == "Success" {
|
|
|
+ return 1, nil
|
|
|
+ } else {
|
|
|
+ return 0, fmt.Errorf("上报失败")
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
- respByte := res.ReadAll()
|
|
|
- g.Log().Print(ctx, "---", string(respByte))
|
|
|
- return nil
|
|
|
+type PushResp struct {
|
|
|
+ ZhimaCreditEpAcceptanceLabelUseResponse struct {
|
|
|
+ Code string `json:"code"`
|
|
|
+ Msg string `json:"msg"`
|
|
|
+ } `json:"zhima_credit_ep_acceptance_label_use_response"`
|
|
|
+ Sign string `json:"sign"`
|
|
|
}
|
|
|
|
|
|
func (m *pushManager) getSign(data []byte) (signature string, err error) {
|
|
@@ -241,14 +265,44 @@ func parsePrivateKey(key string) (*rsa.PrivateKey, error) {
|
|
|
return rsaPrivateKey.(*rsa.PrivateKey), nil
|
|
|
}
|
|
|
|
|
|
-func getCode(creditNo string) (rData []string) {
|
|
|
- res, _ := common.MG.DB().FindOneByField("qyxy_std", map[string]interface{}{
|
|
|
- "credit_no": creditNo,
|
|
|
- }, `{"zhima_labels":1}`)
|
|
|
- if res != nil && len(*res) > 0 {
|
|
|
- for _, m := range gconv.Maps((*res)["zhima_labels"]) {
|
|
|
- rData = append(rData, gconv.String(m["zhima_code"]))
|
|
|
+func getCode(creditNo, entId string) (rData []string) {
|
|
|
+ ctx := context.TODO()
|
|
|
+ if entId != "" { //通过接口查询芝麻code
|
|
|
+ cList := func() []map[string]interface{} {
|
|
|
+ cacheKey := fmt.Sprintf("zhima_%s", entId)
|
|
|
+ gv, _ := g.Redis().Get(ctx, cacheKey)
|
|
|
+ if !gv.IsNil() && len(gv.Maps()) > 0 {
|
|
|
+ return gv.Maps()
|
|
|
+ }
|
|
|
+ zhimaUrl := fmt.Sprintf(g.Cfg().MustGet(ctx, "zhima.api", "https://api.jianyu360.com/data/getzhima?id=%s&appid=jianyu360").String(), entId)
|
|
|
+ res, err := g.Client().Get(ctx, zhimaUrl)
|
|
|
+ if err != nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ defer res.Close()
|
|
|
+ if data, err := io.ReadAll(res.Body); err == nil && len(data) > 0 {
|
|
|
+ if m := gconv.Map(strings.ReplaceAll(gconv.String(data), "\n", "")); len(m) > 0 {
|
|
|
+ if rMaps := gconv.Maps(m["data"]); len(rMaps) > 0 {
|
|
|
+ g.Redis().SetEX(ctx, cacheKey, rMaps, 60*60*24)
|
|
|
+ return rMaps
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }()
|
|
|
+ for _, m := range cList {
|
|
|
+ code := gconv.String(m["zhima_code"])
|
|
|
+ rData = append(rData, code)
|
|
|
}
|
|
|
+ return
|
|
|
}
|
|
|
+ //res, _ := common.MG.DB("ent").FindOneByField("qyxy_std", map[string]interface{}{
|
|
|
+ // "credit_no": creditNo,
|
|
|
+ //}, `{"zhima_labels":1}`)
|
|
|
+ //if res != nil && len(*res) > 0 {
|
|
|
+ // for _, m := range gconv.Maps((*res)["zhima_labels"]) {
|
|
|
+ // rData = append(rData, gconv.String(m["zhima_code"]))
|
|
|
+ // }
|
|
|
+ //}
|
|
|
return
|
|
|
}
|