Jianghan vor 8 Monaten
Ursprung
Commit
34239c31d2
7 geänderte Dateien mit 69 neuen und 22 gelöschten Zeilen
  1. 2 2
      clueEs/go.mod
  2. 3 12
      clueEs/go.sum
  3. 45 1
      clueEs/main.go
  4. 1 1
      clueEs/task.go
  5. 1 0
      doFreeClueSign/config.yaml
  6. 11 4
      doFreeClueSign/job/job.go
  7. 6 2
      doFreeClueSign/main.go

+ 2 - 2
clueEs/go.mod

@@ -3,13 +3,14 @@ module clueEs
 go 1.20
 
 require (
+	app.yhyue.com/moapp/jybase v0.0.0-20240805110713-0c17face82c4
+	github.com/robfig/cron/v3 v3.0.1
 	github.com/segmentio/kafka-go v0.4.47
 	github.com/spf13/cobra v1.8.1
 	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20240202055658-e2ef72e18b40
 )
 
 require (
-	app.yhyue.com/moapp/jybase v0.0.0-20240805110713-0c17face82c4 // indirect
 	github.com/PuerkitoBio/goquery v1.8.0 // indirect
 	github.com/andybalholm/cascadia v1.3.1 // indirect
 	github.com/dchest/captcha v1.0.0 // indirect
@@ -29,7 +30,6 @@ require (
 	github.com/xdg-go/stringprep v1.0.4 // indirect
 	github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
 	go.mongodb.org/mongo-driver v1.10.1 // indirect
-	go.uber.org/multierr v1.10.0 // indirect
 	golang.org/x/crypto v0.14.0 // indirect
 	golang.org/x/net v0.17.0 // indirect
 	golang.org/x/sync v0.1.0 // indirect

+ 3 - 12
clueEs/go.sum

@@ -1,10 +1,4 @@
 app.yhyue.com/moapp/esv1 v0.0.0-20220414031211-3da4123e648d/go.mod h1:91/lSD/hS+ckMVP3WdidRzDhC60lLMdyce9QHy0cSMA=
-app.yhyue.com/moapp/jybase v0.0.0-20240626030750-115a3c0929fb h1:LstR4tQbICqo2MO0A6za4rci4Y/lw+Nf898GlImARZM=
-app.yhyue.com/moapp/jybase v0.0.0-20240626030750-115a3c0929fb/go.mod h1:XHNATN6tsJKHdCB0DbUtFdPPHXexTUFyB3RlO+lUUoM=
-app.yhyue.com/moapp/jybase v0.0.0-20240805074155-db07f61335b3 h1:Cu80hl1f3ZOOx4jFROfOzjmnsVTciRltfnY1lMgqVN4=
-app.yhyue.com/moapp/jybase v0.0.0-20240805074155-db07f61335b3/go.mod h1:XHNATN6tsJKHdCB0DbUtFdPPHXexTUFyB3RlO+lUUoM=
-app.yhyue.com/moapp/jybase v0.0.0-20240805105653-d9352550f0b6 h1:nkQCwvoH6TGpt+jKQoafgLR+u5Lkw9OSro4XCqJ2qTc=
-app.yhyue.com/moapp/jybase v0.0.0-20240805105653-d9352550f0b6/go.mod h1:XHNATN6tsJKHdCB0DbUtFdPPHXexTUFyB3RlO+lUUoM=
 app.yhyue.com/moapp/jybase v0.0.0-20240805110713-0c17face82c4 h1:YPgzn9rGR0+eHGuJAIDXOjmGySAWSErBrdjndX3uVog=
 app.yhyue.com/moapp/jybase v0.0.0-20240805110713-0c17face82c4/go.mod h1:XHNATN6tsJKHdCB0DbUtFdPPHXexTUFyB3RlO+lUUoM=
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
@@ -190,6 +184,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
 github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
 github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
 github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
+github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
+github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
@@ -214,8 +210,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
-github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
 github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
 github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
 github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
@@ -250,16 +246,11 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
 go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
-go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
 go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
 go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
-go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
-go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
 go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
 go.uber.org/zap v1.22.0/go.mod h1:H4siCOZOrAolnUPJEkfaSjDqyP+BDS0DdDWzwcgt3+U=
-go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
-go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=

+ 45 - 1
clueEs/main.go

@@ -5,6 +5,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/robfig/cron/v3"
 	"github.com/segmentio/kafka-go"
 	"github.com/spf13/cobra"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
@@ -27,6 +28,7 @@ func init() {
 		PassWord: "Tibi#20211222",
 	}
 	Tidb.Init()
+
 }
 
 func main() {
@@ -38,16 +40,28 @@ func main() {
 	if err := rootCmd.Execute(); err != nil {
 		fmt.Println("rootCmd.Execute failed", err.Error())
 	}
+
+	select {}
 }
 
 func addData() *cobra.Command {
+	var pici int64
 	cmdClient := &cobra.Command{
 		Use:   "add",
 		Short: "Start processing add data",
 		Run: func(cmd *cobra.Command, args []string) {
-			taskAdd()
+			//taskAdd()
+
+			taskAdd1(pici)
+			crn := cron.New(cron.WithSeconds())
+			cronstr := "0 */5 * * * *"
+			crn.AddFunc(cronstr, func() {
+				taskAdd1(pici)
+			})
+			crn.Start()
 		},
 	}
+	cmdClient.Flags().Int64VarP(&pici, "pici", "c", 0, "pici time")
 	return cmdClient
 }
 
@@ -81,6 +95,36 @@ func taskAdd() {
 	}
 }
 
+func taskAdd1(pici int64) {
+	sql := `SELECT id, uid, userid, position_id, seatNumber, is_assign, comeintime, createtime, updatetime, cluename FROM dwd_f_crm_clue_info WHERE updatetime >= ? ORDER BY id ASC`
+	sql1 := `SELECT count(1) FROM dwd_f_crm_clue_info WHERE updatetime >= ?`
+	log.Println("轮次开始,查询到数据量: ", Tidb.CountBySql(sql1, util.FormatDateByInt64(&pici, util.Date_Full_Layout)))
+	info := Tidb.SelectBySql(sql, util.FormatDateByInt64(&pici, util.Date_Full_Layout))
+	if info != nil && len(*info) > 0 {
+		for _, data := range *info {
+			save := make(map[string]interface{})
+			for _, v := range []string{"id", "uid", "userid", "position_id", "seatNumber", "is_assign", "comeintime", "createtime", "updatetime", "cluename"} {
+				if v == "id" {
+					save[v] = fmt.Sprint(data[v])
+				} else if v == "updatetime" {
+					t1, _ := data[v].(time.Time)
+					save[v] = t1.Unix()
+					if t1.Unix() > pici {
+						pici = t1.Unix()
+					}
+				} else if v == "comeintime" || v == "createtime" {
+					t1, _ := data[v].(time.Time)
+					save[v] = t1.Unix()
+				} else {
+					save[v] = data[v]
+				}
+			}
+			elastic.UpdateNew(esIndex, save)
+		}
+	}
+	log.Println(fmt.Sprintf("轮次结束,last time: %d", pici))
+}
+
 func formatMsg(msg []byte) {
 	msgInfo := make(map[string]interface{})
 	err := json.Unmarshal(msg, &msgInfo)

+ 1 - 1
clueEs/task.go

@@ -14,7 +14,7 @@ var (
 	saveEs []map[string]interface{}
 
 	SaveEsLock = &sync.Mutex{}
-	esIndex    = ""
+	esIndex    = "clue_info"
 )
 
 // @Author jianghan

+ 1 - 0
doFreeClueSign/config.yaml

@@ -1,4 +1,5 @@
 lastId: 258768
+historyData: true
 mongodb:
   default: #qfw
     address: "192.168.3.149:27180"

+ 11 - 4
doFreeClueSign/job/job.go

@@ -2,6 +2,7 @@ package job
 
 import (
 	"context"
+	"doFreeClueSign/db"
 	"doFreeClueSign/public"
 	"github.com/gogf/gf/v2/encoding/gjson"
 	"github.com/gogf/gf/v2/frame/g"
@@ -126,6 +127,7 @@ func loadOrder() {
 			}
 		}
 	}
+	g.Log().Infof(ctx, "loadOrder end: %d", LastId)
 }
 
 func LoadOrderOther() {
@@ -172,17 +174,22 @@ func LoadOrderHis() {
 			}
 			userid := gconv.String(m["user_id"])
 			now := time.Now().Format(time.DateTime)
-			g.DB("bi_service").Exec(ctx, `INSERT INTO user_statistics (userId, createTime, event) SELECT ?, ?, 1 WHERE NOT EXISTS ( SELECT 1 FROM user_statistics WHERE userId = ? AND event = 1)`, userid, now, userid)
+			user, _ := db.MG.DB().FindById("user", userid, `{"i_vip_status":1, "i_member_status": 1}`)
+			if user != nil && len(*user) > 0 {
+				if gconv.Int((*user)["i_vip_status"]) <= 0 && gconv.Int((*user)["i_member_status"]) <= 0 {
+					g.DB("bi_service").Exec(ctx, `INSERT INTO user_statistics (userId, createTime, event) SELECT ?, ?, 1 WHERE NOT EXISTS ( SELECT 1 FROM user_statistics WHERE userId = ? AND event = 1)`, userid, now, userid)
+				}
+			}
 		}
 	}
-	g.Log().Infof(ctx, "LoadOrderHis end")
+	g.Log().Info(ctx, "LoadOrderHis end")
 }
 
 // @Author jianghan
 // @Description 11月9日注册成功
 // @Date 2024/11/20
 func LoadOrderHisMore() {
-	g.Log().Infof(ctx, "LoadOrderHisMore start")
+	g.Log().Info(ctx, "LoadOrderHisMore start")
 	now := time.Now().Format(time.DateTime)
 	sql := `SELECT id, filter, order_code, product_type, user_id, user_phone, vip_starttime, vip_endtime FROM dataexport_order WHERE order_status = 1 AND vip_endtime > '2024-11-16 00:00:00' AND vip_endtime < ? AND filter LIKE '%2024年新用户注册赠送7天超级订阅%' ORDER BY id DESC`
 	sql1 := `SELECT id, vip_endtime FROM jianyu.dataexport_order WHERE (product_type = 'VIP订阅' OR product_type = '大会员') AND id > ? AND user_id = ? AND order_status = 1`
@@ -197,5 +204,5 @@ func LoadOrderHisMore() {
 			}
 		}
 	}
-	g.Log().Infof(ctx, "LoadOrderHisMore end")
+	g.Log().Info(ctx, "LoadOrderHisMore end")
 }

+ 6 - 2
doFreeClueSign/main.go

@@ -1,13 +1,17 @@
 package main
 
 import (
+	"context"
 	"doFreeClueSign/job"
 	_ "github.com/gogf/gf/contrib/drivers/mysql/v2"
+	"github.com/gogf/gf/v2/frame/g"
 )
 
 func main() {
-	go job.LoadOrderHis()
-	go job.LoadOrderHisMore()
+	if g.Cfg().MustGet(context.TODO(), "historyData").Bool() {
+		go job.LoadOrderHis()
+		go job.LoadOrderHisMore()
+	}
 
 	job.InitJobManager()