Browse Source

定时删除 多余监控数据

wcc 1 year ago
parent
commit
9c9688f8cd
3 changed files with 115 additions and 4 deletions
  1. 4 3
      bidding_listen/config.go
  2. 1 0
      bidding_listen/config.toml
  3. 110 1
      bidding_listen/main.go

+ 4 - 3
bidding_listen/config.go

@@ -20,9 +20,10 @@ type MgoConf struct {
 
 //CronConf 定时任务
 type CronConf struct {
-	Spec  string
-	Start int
-	End   int
+	Spec   string
+	Start  int
+	End    int
+	Delete int
 }
 
 type EspConf struct {

+ 1 - 0
bidding_listen/config.toml

@@ -28,6 +28,7 @@
     spec = "0 00 20 * * *"   ## 每天20点执行
     start = -1               ## 表示开始时间昨天凌晨
     end = 0                  ## 表示截止时间到今天凌晨
+    delete = -30              ## 删除30天之前的数据
 
 
 [mysql]

+ 110 - 1
bidding_listen/main.go

@@ -5,6 +5,7 @@ import (
 	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/mongodb"
 	"app.yhyue.com/data_processing/common_utils/mysqldb"
+	"context"
 	"fmt"
 	"github.com/robfig/cron/v3"
 	"go.uber.org/zap"
@@ -41,9 +42,116 @@ func specData() {
 	dealBidding()
 	time.Sleep(time.Minute * 10)
 	dealProject()
+	go deleteData()
 	log.Info("main", zap.String("结束", "over"))
 }
 
+//deleteData 删除数据
+func deleteData() {
+	now := time.Now()
+	deleteTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.Delete, 0, 0, 0, 0, now.Location())
+	deleteStr := deleteTime.Format("2006-01-02 15:04:05")
+	rowsPerPage := 10000
+
+	finalId := 0
+	lastSql := fmt.Sprintf(`
+	 SELECT
+             t.id
+
+         FROM
+             ods_datamonitoring_bidding AS t
+
+         WHERE   t.comeintime < %s
+
+         ORDER BY t.id DESC  LIMIT 1
+`, deleteStr)
+
+	lastInfo := Mysql.SelectBySql(lastSql)
+	if len(*lastInfo) > 0 {
+		finalId = utils.IntAll((*lastInfo)[0]["id"])
+	}
+
+	log.Info("dealData", zap.Int("finalId", finalId))
+	lastid, total := 0, 0
+
+	for {
+		query := fmt.Sprintf(`
+SELECT
+              t.id
+          FROM
+              ods_datamonitoring_bidding AS t
+
+         WHERE t.id > %d  && t.comeintime < %s
+         ORDER BY t.id ASC
+
+         LIMIT %d;
+     `, lastid, deleteStr, rowsPerPage)
+
+		ctx := context.Background()
+		rows, err := Mysql.DB.QueryContext(ctx, query)
+		if err != nil {
+			log.Info("dealData", zap.Any("QueryContext err", err))
+		}
+
+		if finalId == lastid {
+			log.Info("dealData over", zap.Any("total", total), zap.Any("lastid", lastid))
+			break
+		}
+
+		columns, err := rows.Columns()
+		if err != nil {
+			log.Info("dealData", zap.Any("rows.Columns", err))
+		}
+
+		for rows.Next() {
+			scanArgs := make([]interface{}, len(columns))
+			values := make([]interface{}, len(columns))
+			ret := make(map[string]interface{})
+
+			for k := range values {
+				scanArgs[k] = &values[k]
+			}
+
+			err = rows.Scan(scanArgs...)
+			if err != nil {
+				log.Info("dealData", zap.Any("rows.Scan", err))
+				break
+			}
+
+			for i, col := range values {
+				if v, ok := col.([]uint8); ok {
+					ret[columns[i]] = string(v)
+				} else {
+					ret[columns[i]] = col
+				}
+			}
+
+			total++
+			if total%1000 == 0 {
+				log.Info("dealData", zap.Int("current total", total))
+			}
+			lastid = utils.IntAll(ret["id"])
+
+			where := map[string]interface{}{
+				"id": ret["id"],
+			}
+			Mysql.Delete("ods_datamonitoring_bidding", where)
+
+			ret = make(map[string]interface{})
+		}
+
+		rows.Close()
+
+		if err := rows.Err(); err != nil {
+			log.Info("buyer", zap.Any("err", err))
+		}
+
+	}
+
+	log.Info("dealData", zap.Int("结束,total:", total))
+
+}
+
 func dealBidding() {
 	sess := Mgo.GetMgoConn()
 	defer Mgo.DestoryMongoConn(sess)
@@ -133,7 +241,8 @@ func saveBidding(tmp map[string]interface{}) {
 	}
 
 	if utils.IntAll(tmp["extracttype"]) == -1 || utils.IntAll(tmp["dataprocess"]) == 7 {
-		insert["is_repeat"] = 1
+		//insert["is_repeat"] = 1
+		return
 	} else {
 		insert["is_repeat"] = 0
 	}