Pārlūkot izejas kodu

支持 按 小时处理预处理数据

wcc 1 gadu atpakaļ
vecāks
revīzija
69de9cf883
4 mainītis faili ar 102 papildinājumiem un 12 dzēšanām
  1. 2 2
      pre_extract/config.toml
  2. 70 4
      pre_extract/es.go
  3. 5 6
      pre_extract/main.go
  4. 25 0
      pre_extract/pre_test.go

+ 2 - 2
pre_extract/config.toml

@@ -23,10 +23,10 @@
     nextport = "1177"           ## 调用抽取端口
     localport = ":1176"         ## 本地监听端口
     send = false                ## 是否给抽取发送数据;正式环境应为 true
-    spec = "0 31 16 * * *"      ## 每天23点执行;已日更新的方式创建索引
+    spec = "59 59 * * * *"      ## 每天23点执行;已日更新的方式创建索引
     ## spec = "0 00 01 * * *"   ## 每天01点执行;适合 按照月份创建索引
     alias = ["bidding_pre","bidding_pre2","bidding_pre3"]      ## 预处理索引别名,支持多个别名
-    spectype = "day" ## 定时任务类型;正式环境应该是 month 。day 表示每天创建一个索引;month 表示每个月创建一个
+    spectype = "hour" ## 定时任务类型;正式环境应该是 month 。day 表示每天创建一个索引;month 表示每个月创建一个;hour 每个小时创建一次
 
 [[es]]  ## es集群,支持多个集群创建 预处理索引
     [es.19805]  ## 测试环境 es集群

+ 70 - 4
pre_extract/es.go

@@ -122,8 +122,8 @@ var mapping = `    "mappings": {
         }
     }`
 
-//createIndex 创建索引
-func createIndex(clients map[string]*elastic.Client, PreBiddingIndex string) error {
+//CreateIndex 创建索引
+func CreateIndex(clients map[string]*elastic.Client, PreBiddingIndex string) error {
 	//createJson := fmt.Sprintf(`{%s,%s}`, setting, mapping)
 
 	for k, client := range clients {
@@ -302,6 +302,72 @@ func deleteIndex(clients map[string]*elastic.Client, index string) error {
 	return nil
 }
 
+//dealIndexByHour 处理预处理索引,根据小时;
+func dealIndexByHour() {
+	now := time.Now()
+	PreBiddingIndex := ""
+
+	var clients = make(map[string]*elastic.Client, 0)
+	for k, v := range GF.ES {
+		url := v.URL
+		username := v.Username
+		password := v.Password
+		// 创建 Elasticsearch 客户端
+		client, err := elastic.NewClient(
+			elastic.SetURL(url),
+			elastic.SetBasicAuth(username, password),
+			elastic.SetSniff(false),
+		)
+		if err != nil {
+			log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
+		}
+		clients[k] = client
+	}
+
+	next := now.Add(time.Hour)
+	month := int(next.Month())
+	monthStr := strconv.Itoa(month)
+	year := next.Year()
+	yearStr := strconv.Itoa(year)
+	dayStr := strconv.Itoa(next.Day())
+	hour := next.Hour()
+	hourStr := strconv.Itoa(hour)
+	//下一天的索引名称
+	PreBiddingIndex = "bidding_" + yearStr + monthStr + dayStr + hourStr
+
+	err := CreateIndex(clients, PreBiddingIndex)
+	if err != nil {
+		log.Info("dealIndexByHour", zap.Error(err))
+		SendMail("预处理索引", "预处理索引创建失败,请检查")
+	}
+
+	log.Info("dealIndexByHour", zap.String(PreBiddingIndex, "创建成功"))
+	//3. 删除昨天的索引
+	last := now.Add(-time.Hour)
+	month2 := int(last.Month())
+	monthStr2 := strconv.Itoa(month2)
+	year2 := last.Year()
+	yearStr2 := strconv.Itoa(year2)
+	dayStr2 := strconv.Itoa(last.Day())
+	hour2 := last.Hour()
+	hourStr2 := strconv.Itoa(hour2)
+	//索引名称
+	lastIndex := "bidding_" + yearStr2 + monthStr2 + dayStr2 + hourStr2
+	err = deleteIndex(clients, lastIndex)
+	if err != nil {
+		log.Info("dealIndexByHour", zap.Error(err))
+	}
+
+	//4. 删除bidding_extract  过期数据
+	where := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$lte": last.Unix(),
+		},
+		"is_pre": 1,
+	}
+	MgoB.Delete("bidding_extract", where)
+}
+
 //dealIndexByDay 处理预处理索引,根据天;
 func dealIndexByDay() {
 	now := time.Now()
@@ -336,7 +402,7 @@ func dealIndexByDay() {
 	//下一天的索引名称
 	PreBiddingIndex = "bidding_" + yearStr + monthStr + dayStr
 
-	err := createIndex(clients, PreBiddingIndex)
+	err := CreateIndex(clients, PreBiddingIndex)
 	if err != nil {
 		log.Info("dealIndexByDay", zap.Error(err))
 		SendMail("预处理索引", "预处理索引创建失败,请检查")
@@ -403,7 +469,7 @@ func dealIndexByMonth() {
 		PreBiddingIndex = "bidding_" + yearStr + monthStr
 
 		//2		创建下个月索引结构
-		err := createIndex(clients, PreBiddingIndex)
+		err := CreateIndex(clients, PreBiddingIndex)
 		if err != nil {
 			log.Info("dealIndexByMonth", zap.Error(err))
 			SendMail("预处理索引", "预处理索引创建失败,请检查")

+ 5 - 6
pre_extract/main.go

@@ -68,14 +68,13 @@ func main() {
 		if err != nil {
 			log.Info("main", zap.Any("AddFunc err", err))
 		}
+	} else if GF.Env.SpecType == "hour" {
+		_, err := c.AddFunc(GF.Env.Spec, dealIndexByHour)
+		if err != nil {
+			log.Info("main", zap.Any("AddFunc err", err))
+		}
 	}
 
-	////切换别名-定时任务
-	//_, err := c.AddFunc(GF.Env.SwitchSpec, SwitchAlias)
-	//if err != nil {
-	//	log.Info("main", zap.Any("AddFunc err", err))
-	//}
-
 	c.Start()
 	defer c.Stop()
 

+ 25 - 0
pre_extract/pre_test.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"fmt"
+	"github.com/olivere/elastic/v7"
 	"strconv"
 	"testing"
 	"time"
@@ -39,3 +40,27 @@ func TestDeleteIndex(t *testing.T) {
 	}
 
 }
+
+//TestCreateIndex 创建索引
+func TestCreateIndex(t *testing.T) {
+	var clients = make(map[string]*elastic.Client, 0)
+	for k, v := range GF.ES {
+		url := v.URL
+		username := v.Username
+		password := v.Password
+		// 创建 Elasticsearch 客户端
+		client, err := elastic.NewClient(
+			elastic.SetURL(url),
+			elastic.SetBasicAuth(username, password),
+			elastic.SetSniff(false),
+		)
+		if err != nil {
+			fmt.Println("创建 Elasticsearch 客户端失败", err)
+		}
+		clients[k] = client
+	}
+
+	err := CreateIndex(clients, "bidding_2023112213")
+	fmt.Println(err)
+
+}