Bläddra i källkod

wip:芝麻推送并发优化

wangkaiyue 10 månader sedan
förälder
incheckning
3886ef3b33
3 ändrade filer med 218 tillägg och 178 borttagningar
  1. 4 4
      zhimaPush/config.yaml
  2. 42 174
      zhimaPush/main.go
  3. 172 0
      zhimaPush/push.go

+ 4 - 4
zhimaPush/config.yaml

@@ -1,11 +1,11 @@
 mongodb:
 mongodb:
   log:
   log:
-    address: "192.168.3.149:27190"
+    address: "127.0.0.1:27090"
     size: 5
     size: 5
     dbName: "qfw"
     dbName: "qfw"
     replSet: ""
     replSet: ""
-    userName: "admin"
-    password: "123456"
+    userName: "jianyu"
+    password: "jylog2020_123"
   ent:
   ent:
     address: "127.0.0.1:27099"
     address: "127.0.0.1:27099"
     size: 5
     size: 5
@@ -23,7 +23,7 @@ redis:
     address: 192.168.3.149:1712
     address: 192.168.3.149:1712
 
 
 # 每天凌晨2点推送前一天访问数据
 # 每天凌晨2点推送前一天访问数据
-runCron: "# 0 2 * * *"
+runCron: "# 04 14 * * *"
 
 
 ali:
 ali:
   appid: "2021003111633206"
   appid: "2021003111633206"

+ 42 - 174
zhimaPush/main.go

@@ -2,16 +2,7 @@ package main
 
 
 import (
 import (
 	"context"
 	"context"
-	"crypto"
-	"crypto/rand"
 	"crypto/rsa"
 	"crypto/rsa"
-	"crypto/sha256"
-	"crypto/x509"
-	"encoding/base64"
-	"encoding/json"
-	"encoding/pem"
-	"errors"
-	"fmt"
 	_ "github.com/gogf/gf/contrib/nosql/redis/v2"
 	_ "github.com/gogf/gf/contrib/nosql/redis/v2"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gcron"
 	"github.com/gogf/gf/v2/os/gcron"
@@ -20,10 +11,6 @@ import (
 	"github.com/gogf/gf/v2/util/gconv"
 	"github.com/gogf/gf/v2/util/gconv"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
 	"go.mongodb.org/mongo-driver/mongo"
-	"hash"
-	"io"
-	"net/url"
-	"strings"
 	"time"
 	"time"
 	"workTasks/common"
 	"workTasks/common"
 )
 )
@@ -38,6 +25,8 @@ type (
 		SceneCode  string
 		SceneCode  string
 		ApiPath    string
 		ApiPath    string
 		PrivateKey *rsa.PrivateKey
 		PrivateKey *rsa.PrivateKey
+		Cache      chan *feedBackData
+		queryPool  chan bool
 	}
 	}
 
 
 	bizContent struct {
 	bizContent struct {
@@ -64,6 +53,8 @@ func initPush(ctx context.Context) *pushManager {
 		Appid:      g.Cfg().MustGet(ctx, "ali.appid").String(),
 		Appid:      g.Cfg().MustGet(ctx, "ali.appid").String(),
 		ApiPath:    g.Cfg().MustGet(ctx, "ali.apiPath").String(),
 		ApiPath:    g.Cfg().MustGet(ctx, "ali.apiPath").String(),
 		SceneCode:  g.Cfg().MustGet(ctx, "ali.sceneCode").String(),
 		SceneCode:  g.Cfg().MustGet(ctx, "ali.sceneCode").String(),
+		Cache:      make(chan *feedBackData, 500),
+		queryPool:  make(chan bool, 5),
 		PrivateKey: privateKey,
 		PrivateKey: privateKey,
 	}
 	}
 }
 }
@@ -75,7 +66,15 @@ func main() {
 		g.Log().Errorf(ctx, "initAndUpdateDeptManager cron任务异常 %v", e)
 		g.Log().Errorf(ctx, "initAndUpdateDeptManager cron任务异常 %v", e)
 	}
 	}
 	c.Status()
 	c.Status()
-	select {}
+	for {
+		ctx := context.TODO()
+		err := jyPushManager.doPush(ctx)
+		if err != nil {
+			g.Log().Errorf(ctx, "jyPushManager.doPush 异常 %v", err)
+		} else {
+			g.Log().Debugf(ctx, "上报成功")
+		}
+	}
 }
 }
 
 
 func (m *pushManager) RunJob(ctx context.Context) {
 func (m *pushManager) RunJob(ctx context.Context) {
@@ -119,8 +118,7 @@ func (m *pushManager) RunJob(ctx context.Context) {
 		},
 		},
 	}
 	}
 	var (
 	var (
-		startTime                         = time.Now()
-		total, codeErrTotal, successTotal int
+		startTime = time.Now()
 	)
 	)
 	// 执行聚合查询
 	// 执行聚合查询
 	cursor, err := sess.M.C.Database("qfw").Collection("jy_zhima_logs").Aggregate(context.TODO(), pipeline)
 	cursor, err := sess.M.C.Database("qfw").Collection("jy_zhima_logs").Aggregate(context.TODO(), pipeline)
@@ -144,170 +142,40 @@ func (m *pushManager) RunJob(ctx context.Context) {
 			uv  = gconv.Int(result["unique_user_count"])
 			uv  = gconv.Int(result["unique_user_count"])
 		)
 		)
 		if ent != "" && sid != "" && pv > 0 {
 		if ent != "" && sid != "" && pv > 0 {
-			code, err := m.DoPush(ctx, pv, uv, ent, sid, start.Format(time.DateOnly))
-			if err != nil {
-				g.Log().Errorf(ctx, "上报数据异常\nparam ent:%v pv:%v uv:%v\nerror: %v", ent, pv, uv, err)
-			}
-			switch code {
-			case 1:
-				successTotal++
-			case -1:
-				codeErrTotal++
-			}
-			total++
+			m.queryPool <- true
+			go func() {
+				defer func() { <-m.queryPool }()
+				_, err := m.ReadyPush(ctx, pv, uv, ent, sid, start.Format(time.DateOnly))
+				if err != nil {
+					g.Log().Errorf(ctx, "上报数据异常\nparam ent:%v pv:%v uv:%v\nerror: %v", ent, pv, uv, err)
+				}
+			}()
 		}
 		}
 	}
 	}
-	g.Log().Infof(ctx, "上报%s数据完成共%d个 成功%d个 获取code失败%d个 耗时%f秒", yesterday.Format(time.DateOnly), total, successTotal, codeErrTotal, time.Now().Sub(startTime).Seconds())
+	g.Log().Infof(ctx, "上报%s数据完成耗时%f秒", yesterday.Format(time.DateOnly), time.Now().Sub(startTime).Seconds())
 }
 }
 
 
-func (m *pushManager) DoPush(ctx context.Context, pv, uv int, ent, sid, dateStr string) (int, error) {
+func (m *pushManager) ReadyPush(ctx context.Context, pv, uv int, ent, sid, dateStr string) (int, error) {
 	actionContent := getCode(ent, sid)
 	actionContent := getCode(ent, sid)
 	if len(actionContent) == 0 {
 	if len(actionContent) == 0 {
 		return -1, nil
 		return -1, nil
 	}
 	}
-	g.Log().Debugf(ctx, "模拟上报数据\nparam ent:%v pv:%v uv:%v codes:%v", ent, pv, uv, actionContent)
-	bizC := bizContent{
-		FeedbackData: []feedBackData{
-			{
-				EpCertNo:      ent,
-				ActionType:    "PV",
-				CountType:     "accurate",
-				ActionContent: actionContent,
-				ActionDate:    dateStr,
-				ActionCount:   gconv.String(pv),
-			},
-			{
-				EpCertNo:      ent,
-				ActionType:    "UV",
-				CountType:     "accurate",
-				ActionContent: actionContent,
-				ActionDate:    dateStr,
-				ActionCount:   gconv.String(uv),
-			},
-		},
-		SceneCode: m.SceneCode,
-	}
-	dataStr, err := json.Marshal(bizC)
-	if err != nil {
-		return 0, err
-	}
-	var data = url.Values{}
-	data.Add("app_id", g.Cfg().MustGet(ctx, "ali.appid").String())
-	data.Add("method", "zhima.credit.ep.acceptance.label.use")
-	data.Add("format", "json")
-	data.Add("charset", "UTF-8")
-	data.Add("sign_type", "RSA2")
-	data.Add("timestamp", time.Now().Format("2006-01-02 15:04:05"))
-	data.Add("version", "1.0")
-	data.Add("biz_content", string(dataStr))
-
-	signContentBytes, _ := url.QueryUnescape(data.Encode())
-	signature, err := m.getSign([]byte(signContentBytes))
-	if err != nil {
-		return 0, err
-	}
-	data.Add("sign", signature)
-	//fmt.Println(fmt.Sprintf("%s?%s", m.ApiPath, data.Encode()))
-	res, err := g.Client().Post(ctx, fmt.Sprintf("%s?%s", m.ApiPath, data.Encode()))
-	if err != nil {
-		return 0, err
-	}
-	defer res.Close()
-	pushResp := PushResp{}
-	if err := gconv.Struct(res.ReadAll(), &pushResp); err != nil {
-		return 0, err
-	}
-	g.Log().Debug(ctx, pushResp)
-	if pushResp.ZhimaCreditEpAcceptanceLabelUseResponse.Msg == "Success" {
-		return 1, nil
-	} else {
-		return 0, fmt.Errorf("上报失败")
-	}
-}
-
-type PushResp struct {
-	ZhimaCreditEpAcceptanceLabelUseResponse struct {
-		Code string `json:"code"`
-		Msg  string `json:"msg"`
-	} `json:"zhima_credit_ep_acceptance_label_use_response"`
-	Sign string `json:"sign"`
-}
-
-func (m *pushManager) getSign(data []byte) (signature string, err error) {
-	var h hash.Hash
-	var hType crypto.Hash
-	h = sha256.New()
-	hType = crypto.SHA256
-
-	h.Write(data)
-	d := h.Sum(nil)
-	bs, err := rsa.SignPKCS1v15(rand.Reader, m.PrivateKey, hType, d)
-	if err != nil {
-		return "", err
-	}
-	signature = base64.StdEncoding.EncodeToString(bs)
-	return
-}
-
-// 加载秘钥
-func parsePrivateKey(key string) (*rsa.PrivateKey, error) {
-	block, _ := pem.Decode([]byte(key))
-	if block == nil {
-		return nil, errors.New("私钥格式不正确")
-	}
-	if strings.ToUpper(block.Type) != "RSA PRIVATE KEY" {
-		return nil, errors.New("私钥类型不正确" + block.Type)
-	}
-	rsaPrivateKey, err := x509.ParsePKCS8PrivateKey(block.Bytes)
-	if err != nil {
-		return nil, err
-	}
-	return rsaPrivateKey.(*rsa.PrivateKey), nil
-}
-
-func getCode(creditNo, entId string) (rData []string) {
-	ctx := context.TODO()
-	//通过接口查询芝麻code
-	cList := func() []map[string]interface{} {
-		if entId != "" {
-			gv, _ := g.Redis().Get(ctx, fmt.Sprintf("zhima_%s", entId))
-			if !gv.IsNil() && len(gv.Maps()) > 0 {
-				return gv.Maps()
-			}
-		}
-		//从数据库查询
-		if creditNo != "" {
-			res, _ := common.MG.DB("ent").FindOneByField("qyxy_std", map[string]interface{}{
-				"credit_no": creditNo,
-			}, `{"zhima_labels":1}`)
-			if res != nil && len(*res) > 0 {
-				if arr := gconv.Maps((*res)["zhima_labels"]); len(arr) > 0 {
-					return arr
-				}
-			}
-		}
-		//从接口中获取
-		if entId != "" {
-			zhimaUrl := fmt.Sprintf(g.Cfg().MustGet(ctx, "zhima.api", "https://api.jianyu360.com/data/getzhima?id=%s&appid=jianyu360").String(), entId)
-			res, err := g.Client().Get(ctx, zhimaUrl)
-			if err != nil {
-				return nil
-			}
-			defer res.Close()
-			if data, err := io.ReadAll(res.Body); err == nil && len(data) > 0 {
-				if m := gconv.Map(strings.ReplaceAll(gconv.String(data), "\n", "")); len(m) > 0 {
-					if rMaps := gconv.Maps(m["data"]); len(rMaps) > 0 {
-						g.Redis().SetEX(ctx, fmt.Sprintf("zhima_%s", entId), rMaps, 60*60*24)
-						return rMaps
-					}
-				}
-			}
-		}
-		return nil
-	}()
-	for _, m := range cList {
-		code := gconv.String(m["zhima_code"])
-		rData = append(rData, code)
-	}
-	return
+	//g.Log().Debugf(ctx, "模拟上报数据\nparam ent:%v pv:%v uv:%v codes:%v", ent, pv, uv, actionContent)
+	m.Cache <- &feedBackData{
+		EpCertNo:      ent,
+		ActionType:    "PV",
+		CountType:     "accurate",
+		ActionContent: actionContent,
+		ActionDate:    dateStr,
+		ActionCount:   gconv.String(pv),
+	}
+	m.Cache <- &feedBackData{
+		EpCertNo:      ent,
+		ActionType:    "UV",
+		CountType:     "accurate",
+		ActionContent: actionContent,
+		ActionDate:    dateStr,
+		ActionCount:   gconv.String(uv),
+	}
+	return 1, nil
 }
 }

+ 172 - 0
zhimaPush/push.go

@@ -0,0 +1,172 @@
+package main
+
+import (
+	"context"
+	"crypto"
+	"crypto/rand"
+	"crypto/rsa"
+	"crypto/sha256"
+	"crypto/x509"
+	"encoding/base64"
+	"encoding/json"
+	"encoding/pem"
+	"errors"
+	"fmt"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/util/gconv"
+	"hash"
+	"io"
+	"net/url"
+	"strings"
+	"time"
+	"workTasks/common"
+)
+
+func (m *pushManager) doPush(ctx context.Context) error {
+	var (
+		arr []feedBackData
+	)
+	for {
+		select {
+		case data := <-m.Cache:
+			arr = append(arr, *data)
+		}
+		if len(arr) >= 10 {
+			break
+		}
+	}
+	if len(arr) == 0 {
+		return nil
+	}
+
+	bizC := bizContent{
+		FeedbackData: arr,
+		SceneCode:    m.SceneCode,
+	}
+	dataStr, err := json.Marshal(bizC)
+	if err != nil {
+		return fmt.Errorf("序列化数据异常 %v", err)
+	}
+	var data = url.Values{}
+	data.Add("app_id", g.Cfg().MustGet(ctx, "ali.appid").String())
+	data.Add("method", "zhima.credit.ep.acceptance.label.use")
+	data.Add("format", "json")
+	data.Add("charset", "UTF-8")
+	data.Add("sign_type", "RSA2")
+	data.Add("timestamp", time.Now().Format("2006-01-02 15:04:05"))
+	data.Add("version", "1.0")
+	data.Add("biz_content", string(dataStr))
+
+	signContentBytes, _ := url.QueryUnescape(data.Encode())
+	signature, err := m.getSign([]byte(signContentBytes))
+	if err != nil {
+		return fmt.Errorf("获取签名异常 %v", err)
+	}
+	data.Add("sign", signature)
+	g.Log().Debugf(ctx, "%s?%s", m.ApiPath, data.Encode())
+	res, err := g.Client().Post(ctx, fmt.Sprintf("%s?%s", m.ApiPath, data.Encode()))
+	if err != nil {
+		return fmt.Errorf("发送请求异常 %v", err)
+	}
+	defer res.Close()
+	pushResp := PushResp{}
+	bb := res.ReadAll()
+	g.Log().Debugf(ctx, string(bb))
+	if err := gconv.Struct(bb, &pushResp); err != nil {
+		return fmt.Errorf("获取上报内容失败 %v", err)
+	}
+	g.Log().Debug(ctx, pushResp)
+	if pushResp.ZhimaCreditEpAcceptanceLabelUseResponse.Msg == "Success" {
+		g.Log().Debugf(ctx, "上报成功%d个", len(arr))
+		return nil
+	} else {
+		return fmt.Errorf("上报失败")
+	}
+}
+
+type PushResp struct {
+	ZhimaCreditEpAcceptanceLabelUseResponse struct {
+		Code string `json:"code"`
+		Msg  string `json:"msg"`
+	} `json:"zhima_credit_ep_acceptance_label_use_response"`
+	Sign string `json:"sign"`
+}
+
+func (m *pushManager) getSign(data []byte) (signature string, err error) {
+	var h hash.Hash
+	var hType crypto.Hash
+	h = sha256.New()
+	hType = crypto.SHA256
+
+	h.Write(data)
+	d := h.Sum(nil)
+	bs, err := rsa.SignPKCS1v15(rand.Reader, m.PrivateKey, hType, d)
+	if err != nil {
+		return "", err
+	}
+	signature = base64.StdEncoding.EncodeToString(bs)
+	return
+}
+
+// 加载秘钥
+func parsePrivateKey(key string) (*rsa.PrivateKey, error) {
+	block, _ := pem.Decode([]byte(key))
+	if block == nil {
+		return nil, errors.New("私钥格式不正确")
+	}
+	if strings.ToUpper(block.Type) != "RSA PRIVATE KEY" {
+		return nil, errors.New("私钥类型不正确" + block.Type)
+	}
+	rsaPrivateKey, err := x509.ParsePKCS8PrivateKey(block.Bytes)
+	if err != nil {
+		return nil, err
+	}
+	return rsaPrivateKey.(*rsa.PrivateKey), nil
+}
+
+func getCode(creditNo, entId string) (rData []string) {
+	ctx := context.TODO()
+	//通过接口查询芝麻code
+	cList := func() []map[string]interface{} {
+		if entId != "" {
+			gv, _ := g.Redis().Get(ctx, fmt.Sprintf("zhima_%s", entId))
+			if !gv.IsNil() && len(gv.Maps()) > 0 {
+				return gv.Maps()
+			}
+		}
+		//从数据库查询
+		if creditNo != "" {
+			res, _ := common.MG.DB("ent").FindOneByField("qyxy_std", map[string]interface{}{
+				"credit_no": creditNo,
+			}, `{"zhima_labels":1}`)
+			if res != nil && len(*res) > 0 {
+				if arr := gconv.Maps((*res)["zhima_labels"]); len(arr) > 0 {
+					return arr
+				}
+			}
+		}
+		//从接口中获取
+		if entId != "" {
+			zhimaUrl := fmt.Sprintf(g.Cfg().MustGet(ctx, "zhima.api", "https://api.jianyu360.com/data/getzhima?id=%s&appid=jianyu360").String(), entId)
+			res, err := g.Client().Get(ctx, zhimaUrl)
+			if err != nil {
+				return nil
+			}
+			defer res.Close()
+			if data, err := io.ReadAll(res.Body); err == nil && len(data) > 0 {
+				if m := gconv.Map(strings.ReplaceAll(gconv.String(data), "\n", "")); len(m) > 0 {
+					if rMaps := gconv.Maps(m["data"]); len(rMaps) > 0 {
+						g.Redis().SetEX(ctx, fmt.Sprintf("zhima_%s", entId), rMaps, 60*60*24)
+						return rMaps
+					}
+				}
+			}
+		}
+		return nil
+	}()
+	for _, m := range cList {
+		code := gconv.String(m["zhima_code"])
+		rData = append(rData, code)
+	}
+	return
+}