Selaa lähdekoodia

更新 预处理索引程序,完善 支持多个别名以及多个集群

wcc 1 vuosi sitten
vanhempi
commit
8ea4d858c7
5 muutettua tiedostoa jossa 231 lisäystä ja 218 poistoa
  1. 2 6
      pre_extract/config.go
  2. 11 16
      pre_extract/config.toml
  3. 194 191
      pre_extract/es.go
  4. 6 5
      pre_extract/main.go
  5. 18 0
      pre_extract/pre_test.go

+ 2 - 6
pre_extract/config.go

@@ -4,7 +4,7 @@ type GlobalConf struct {
 	MongoB MgoConf
 	Log    Log
 	Env    EnvConf
-	ES     EsConf
+	ES     map[string]EsConf
 	Email  EmailConf
 }
 
@@ -16,7 +16,7 @@ type EnvConf struct {
 	LocalPort  string
 	Spec       string
 	SwitchSpec string
-	Alias      string
+	Alias      []string
 	SpecType   string
 	Send       bool
 }
@@ -45,10 +45,6 @@ type EsConf struct {
 	URL      string
 	Username string
 	Password string
-
-	URL2      string
-	Username2 string
-	Password2 string
 }
 
 type EmailConf struct {

+ 11 - 16
pre_extract/config.toml

@@ -23,26 +23,21 @@
     nextport = "1177"           ## 调用抽取端口
     localport = ":1176"         ## 本地监听端口
     send = false                ## 是否给抽取发送数据;正式环境应为 true
-    spec = "0 00 23 * * *"      ## 每天23点执行;//已日更新的方式创建索引
+    spec = "0 00 23 * * *"      ## 每天23点执行;已日更新的方式创建索引
     ## spec = "0 00 01 * * *"   ## 每天01点执行;适合 按照月份创建索引
-    switchspec = "50 59 23 * * *"    ## 切换索引别名的定时任务
-    alias = "bidding_pre"       ## 预处理索引别名
+    alias = ["bidding_pre","bidding_pre2","bidding_pre3"]      ## 预处理索引别名,支持多个别名
     spectype = "day" ## 定时任务类型;正式环境应该是 month 。day 表示每天创建一个索引;month 表示每个月创建一个
 
-[es]
-
-    url = "http://192.168.3.149:9201" ## 测试环境
-    username = ""
-    password = ""
-#
-#    url = "http://127.0.0.1:19905"
-#    username = "jybid"
-#    password = "Top2023_JEB01i@31"
-
-#    url2 = ""
-#    username2 = ""
-#    password2 = ""
+[[es]]  ## es集群,支持多个集群创建 预处理索引
+    [es.19805]  ## 测试环境 es集群
+    url = "http://127.0.0.1:19805" ## 测试环境
+    username = "es_all"
+    password = "TopJkO2E_d1x"
 
+    [es.19905]
+    url = "http://127.0.0.1:19905" ## 测试环境
+    username = "jybid"
+    password = "Top2023_JEB01i@31"
 
 [email]
     api = "http://172.17.145.179:19281/_send/_mail"

+ 194 - 191
pre_extract/es.go

@@ -175,80 +175,64 @@ var mapping = `    "mappings": {
     }`
 
 //createIndex 创建索引
-func createIndex(client *elastic.Client, PreBiddingIndex string) error {
+func createIndex(clients map[string]*elastic.Client, PreBiddingIndex string) error {
 	createJson := fmt.Sprintf(`{%s,%s}`, setting, mapping)
-	//fmt.Println(createJson)
-	//month := int(time.Now().Month())
-	//monthStr := strconv.Itoa(month)
-	//year := time.Now().Year()
-	//yearStr := strconv.Itoa(year)
-	////预处理索引名称
-	//PreBiddingIndex := "bidding_" + yearStr + monthStr
-	url := GF.ES.URL
-	username := GF.ES.Username
-	password := GF.ES.Password
-
-	// 创建 Elasticsearch 客户端
-	//client, err := elastic.NewClient(
-	//	elastic.SetURL(url),
-	//	elastic.SetBasicAuth(username, password),
-	//	elastic.SetSniff(false),
-	//)
-	//if err != nil {
-	//	log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
-	//}
 
-	exist, err := client.IndexExists(PreBiddingIndex).Do(context.Background())
-	if exist {
-		log.Info("CreateIndex", zap.String(PreBiddingIndex, "已经存在了"))
-		return err
-	}
+	for k, client := range clients {
+		url := GF.ES[k].URL
+		username := GF.ES[k].Username
+		password := GF.ES[k].Password
+
+		exist, err := client.IndexExists(PreBiddingIndex).Do(context.Background())
+		if exist {
+			log.Info("CreateIndex", zap.String(PreBiddingIndex, "已经存在了"))
+		}
 
-	//1. 开启节点平衡
-	// 设置临时的节点平衡设置
-	balanceSettings := `{
+		//1. 开启节点平衡
+		// 设置临时的节点平衡设置
+		balanceSettings := `{
   "transient" : {
     "cluster.routing.allocation.enable" : "all"
   }
 }`
 
-	requestURL := fmt.Sprintf("%s/_cluster/settings", url)
-	req, err := http.NewRequest("PUT", requestURL, bytes.NewBuffer([]byte(balanceSettings)))
-	if err != nil {
-		log.Error("开启节点平衡", zap.Error(err))
-	}
-	req.Header.Set("Content-Type", "application/json")
+		requestURL := fmt.Sprintf("%s/_cluster/settings", url)
+		req, err := http.NewRequest("PUT", requestURL, bytes.NewBuffer([]byte(balanceSettings)))
+		if err != nil {
+			log.Error("开启节点平衡", zap.Error(err))
+		}
+		req.Header.Set("Content-Type", "application/json")
 
-	// 添加身份验证头部
-	auth := username + ":" + password
-	basicAuth := "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
-	req.Header.Set("Authorization", basicAuth)
+		// 添加身份验证头部
+		auth := username + ":" + password
+		basicAuth := "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
+		req.Header.Set("Authorization", basicAuth)
 
-	clientQ := &http.Client{}
-	resp, err := clientQ.Do(req)
-	if err != nil {
-		log.Error("开启节点平衡", zap.Error(err))
-		return err
-	}
-	defer resp.Body.Close()
+		clientQ := &http.Client{}
+		resp, err := clientQ.Do(req)
+		if err != nil {
+			log.Error("开启节点平衡", zap.Error(err))
+			return err
+		}
+		defer resp.Body.Close()
 
-	if resp.StatusCode != http.StatusOK {
-		log.Fatal("设置节点平衡失败")
-		return err
-	}
+		if resp.StatusCode != http.StatusOK {
+			log.Fatal("设置节点平衡失败")
+			return err
+		}
 
-	fmt.Println("节点平衡已开启")
-	createIndexR, err := client.CreateIndex(PreBiddingIndex).BodyString(createJson).Do(context.Background())
-	if err != nil {
-		log.Error(PreBiddingIndex, zap.Error(err))
-		return err
-	}
-	if !createIndexR.Acknowledged {
-		log.Error("CreateIndex", zap.String(PreBiddingIndex, "创建索引失败"))
-		return err
-	}
+		fmt.Println(url, "节点平衡已开启")
+		//2. 创建索引
+		createIndexR, err := client.CreateIndex(PreBiddingIndex).BodyString(createJson).Do(context.Background())
+		if err != nil {
+			log.Error(PreBiddingIndex, zap.Error(err))
+			return err
+		}
+		if !createIndexR.Acknowledged {
+			log.Error("CreateIndex", zap.String(PreBiddingIndex, "创建索引失败"))
+			return err
+		}
 
-	defer func() {
 		//3. 关闭节点平衡
 		//设置临时的节点平衡设置
 		disableSettings := `{
@@ -270,25 +254,49 @@ func createIndex(client *elastic.Client, PreBiddingIndex string) error {
 			log.Error("关闭节点平衡", zap.Error(err))
 		}
 		defer resp2.Body.Close()
-		fmt.Println("节点平衡已关闭")
-	}()
+		fmt.Println(url, "节点平衡已关闭")
+
+		//4. 新索引 添加别名
+		for _, alias := range GF.Env.Alias {
+			_, err = client.Alias().Add(PreBiddingIndex, alias).Do(context.Background())
+			if err != nil {
+				log.Error("添加别名失败:", zap.Error(err))
+			}
+		}
+		log.Info("CreateIndex", zap.String(url, "索引别名 添加完毕"))
+	}
 
 	return nil
 }
 
 //deleteIndex 删除索引
-func deleteIndex(client *elastic.Client, index string) error {
-	exist, err := client.IndexExists(index).Do(context.Background())
-	if !exist {
-		return errors.New("索引" + index + "不存在")
-	}
-	if err != nil {
-		return err
+func deleteIndex(clients map[string]*elastic.Client, index string) error {
+	if len(clients) == 0 {
+		return errors.New("没有 配置 ES集群信息")
 	}
 
-	_, err = client.DeleteIndex(index).Do(context.Background())
-	if err != nil {
-		return err
+	for k, client := range clients {
+		exist, err := client.IndexExists(index).Do(context.Background())
+		if !exist {
+			log.Info("deleteIndex", zap.String(k, index+" 索引文件不存在"))
+		}
+		if err != nil {
+			return err
+		}
+
+		num, err := client.Count(index).Do(context.Background())
+		if err != nil {
+			return err
+		}
+		if num > 0 {
+			log.Info("deleteIndex", zap.String(k, "索引"+index+"还存在有效数据"))
+		}
+
+		_, err = client.DeleteIndex(index).Do(context.Background())
+		if err != nil {
+			return err
+		}
+
 	}
 	return nil
 }
@@ -297,13 +305,15 @@ func deleteIndex(client *elastic.Client, index string) error {
 func dealIndexByDay() {
 	now := time.Now()
 	PreBiddingIndex := ""
-	hour := now.Hour()
+	//hour := now.Hour()
 	// 判断当前时间是否时最后一个小时
-	if hour == 23 {
-		//当天最后一小时
-		url := GF.ES.URL
-		username := GF.ES.Username
-		password := GF.ES.Password
+	//if hour == 23 {
+	//当天最后一小时
+	var clients = make(map[string]*elastic.Client, 0)
+	for k, v := range GF.ES {
+		url := v.URL
+		username := v.Username
+		password := v.Password
 		// 创建 Elasticsearch 客户端
 		client, err := elastic.NewClient(
 			elastic.SetURL(url),
@@ -313,37 +323,39 @@ func dealIndexByDay() {
 		if err != nil {
 			log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
 		}
+		clients[k] = client
+	}
 
-		next := now.AddDate(0, 0, 1)
-		month := int(next.Month())
-		monthStr := strconv.Itoa(month)
-		year := next.Year()
-		yearStr := strconv.Itoa(year)
-		dayStr := strconv.Itoa(next.Day())
-		//下一天的索引名称
-		PreBiddingIndex = "bidding_" + yearStr + monthStr + dayStr
+	next := now.AddDate(0, 0, 1)
+	month := int(next.Month())
+	monthStr := strconv.Itoa(month)
+	year := next.Year()
+	yearStr := strconv.Itoa(year)
+	dayStr := strconv.Itoa(next.Day())
+	//下一天的索引名称
+	PreBiddingIndex = "bidding_" + yearStr + monthStr + dayStr
 
-		err = createIndex(client, PreBiddingIndex)
-		if err != nil {
-			log.Info("dealIndexByDay", zap.Error(err))
-			SendMail("预处理索引", "预处理索引创建失败,请检查")
-		}
+	err := createIndex(clients, PreBiddingIndex)
+	if err != nil {
+		log.Info("dealIndexByDay", zap.Error(err))
+		SendMail("预处理索引", "预处理索引创建失败,请检查")
+	}
 
-		log.Info("dealIndexByDay", zap.String(PreBiddingIndex, "创建成功"))
-		//3. 删除昨天的索引
-		last := now.AddDate(0, 0, -1)
-		month2 := int(last.Month())
-		monthStr2 := strconv.Itoa(month2)
-		year2 := last.Year()
-		yearStr2 := strconv.Itoa(year2)
-		dayStr2 := strconv.Itoa(last.Day())
-		//索引名称
-		lastIndex := "bidding_" + yearStr2 + monthStr2 + dayStr2
-		err = deleteIndex(client, lastIndex)
-		if err != nil {
-			log.Info("dealIndexByDay", zap.Error(err))
-		}
+	log.Info("dealIndexByDay", zap.String(PreBiddingIndex, "创建成功"))
+	//3. 删除昨天的索引
+	last := now.AddDate(0, 0, -1)
+	month2 := int(last.Month())
+	monthStr2 := strconv.Itoa(month2)
+	year2 := last.Year()
+	yearStr2 := strconv.Itoa(year2)
+	dayStr2 := strconv.Itoa(last.Day())
+	//索引名称
+	lastIndex := "bidding_" + yearStr2 + monthStr2 + dayStr2
+	err = deleteIndex(clients, lastIndex)
+	if err != nil {
+		log.Info("dealIndexByDay", zap.Error(err))
 	}
+	//}
 }
 
 //dealIndexByMonth 处理预处理索引,根据月份;提前一天创建好 下个月的索引
@@ -356,18 +368,21 @@ func dealIndexByMonth() {
 	if now.Day() == lastDayOfMonth.Day() {
 		//当月最后一天,需要提前创建好索引
 		fmt.Println("当前时间是当前月份的最后一天")
-		url := GF.ES.URL
-		username := GF.ES.Username
-		password := GF.ES.Password
-
-		// 创建 Elasticsearch 客户端
-		client, err := elastic.NewClient(
-			elastic.SetURL(url),
-			elastic.SetBasicAuth(username, password),
-			elastic.SetSniff(false),
-		)
-		if err != nil {
-			log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
+		var clients = make(map[string]*elastic.Client, 0)
+		for k, v := range GF.ES {
+			url := v.URL
+			username := v.Username
+			password := v.Password
+			// 创建 Elasticsearch 客户端
+			client, err := elastic.NewClient(
+				elastic.SetURL(url),
+				elastic.SetBasicAuth(username, password),
+				elastic.SetSniff(false),
+			)
+			if err != nil {
+				log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
+			}
+			clients[k] = client
 		}
 
 		next := now.AddDate(0, 0, 1)
@@ -379,7 +394,7 @@ func dealIndexByMonth() {
 		PreBiddingIndex = "bidding_" + yearStr + monthStr
 
 		//2		创建下个月索引结构
-		err = createIndex(client, PreBiddingIndex)
+		err := createIndex(clients, PreBiddingIndex)
 		if err != nil {
 			log.Info("dealIndexByMonth", zap.Error(err))
 			SendMail("预处理索引", "预处理索引创建失败,请检查")
@@ -394,7 +409,7 @@ func dealIndexByMonth() {
 		yearStr2 := strconv.Itoa(year2)
 		//上个月的索引名称
 		lastIndex := "bidding_" + yearStr2 + monthStr2
-		err = deleteIndex(client, lastIndex)
+		err = deleteIndex(clients, lastIndex)
 		if err != nil {
 			log.Info("dealIndexByMonth", zap.Error(err))
 		}
@@ -402,77 +417,65 @@ func dealIndexByMonth() {
 
 }
 
-//SwitchAlias 切换别名
-func SwitchAlias() {
-	now := time.Now()
-	// 判断当前时间是否为当前月份的最后一天
-	//按日创建索引
-	if GF.Env.SpecType == "day" {
-		dealAlias()
-	} else if GF.Env.SpecType == "month" {
-		// 获取当前月份的最后一天
-		lastDayOfMonth := time.Date(now.Year(), now.Month()+1, 0, 0, 0, 0, 0, time.UTC)
-		//按月创建索引
-		if now.Day() == lastDayOfMonth.Day() {
-			dealAlias()
-		}
-	}
-}
-
-func dealAlias() {
-	now := time.Now()
-	url := GF.ES.URL
-	username := GF.ES.Username
-	password := GF.ES.Password
-
-	preBiddingIndex := "" //下一个索引名称
-	currIndex := ""       //当前索引名称
-	// 创建 Elasticsearch 客户端
-	client, err := elastic.NewClient(
-		elastic.SetURL(url),
-		elastic.SetBasicAuth(username, password),
-		elastic.SetSniff(false),
-	)
-	if err != nil {
-		log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
-	}
-	next := now.AddDate(0, 0, 1)
-	month := int(next.Month())
-	monthStr := strconv.Itoa(month)
-	year := next.Year()
-	yearStr := strconv.Itoa(year)
-	dayStr := strconv.Itoa(next.Day())
-	if GF.Env.SpecType == "month" {
-		//下一个月的索引名称
-		preBiddingIndex = "bidding_" + yearStr + monthStr
-	} else if GF.Env.SpecType == "day" {
-		//下一天的索引名称
-		preBiddingIndex = "bidding_" + yearStr + monthStr + dayStr
-	}
-
-	month2 := int(now.Month())
-	monthStr2 := strconv.Itoa(month2)
-	year2 := now.Year()
-	yearStr2 := strconv.Itoa(year2)
-	if GF.Env.SpecType == "month" {
-		//当前;的索引名称
-		currIndex = "bidding_" + yearStr2 + monthStr2
-	} else if GF.Env.SpecType == "day" {
-		//当前;的索引名称
-		currIndex = "bidding_" + yearStr2 + monthStr2 + strconv.Itoa(now.Day())
-	}
-
-	_, err = client.Alias().Add(preBiddingIndex, GF.Env.Alias).Do(context.Background())
-	if err != nil {
-		log.Error("添加别名失败:", zap.Error(err))
-		SendMail("添加别名失败", "添加别名失败,请检查")
-	}
-	_, err = client.Alias().Remove(currIndex, GF.Env.Alias).Do(context.Background())
-	if err != nil {
-		log.Error("删除别名失败:", zap.Error(err))
-		SendMail("删除别名失败", "删除别名失败,请检查")
-	}
-}
+////SwitchAlias 切换别名
+//func SwitchAlias() {
+//	now := time.Now()
+//	// 判断当前时间是否为当前月份的最后一天
+//	//按日创建索引
+//	if GF.Env.SpecType == "day" {
+//		dealAlias()
+//	} else if GF.Env.SpecType == "month" {
+//		// 获取当前月份的最后一天
+//		lastDayOfMonth := time.Date(now.Year(), now.Month()+1, 0, 0, 0, 0, 0, time.UTC)
+//		//按月创建索引
+//		if now.Day() == lastDayOfMonth.Day() {
+//			dealAlias()
+//		}
+//	}
+//}
+//
+////dealAlias 处理 索引别名
+//func dealAlias() {
+//	now := time.Now()
+//
+//	for _, v := range GF.ES {
+//		url := v.URL
+//		username := v.Username
+//		password := v.Password
+//		// 创建 Elasticsearch 客户端
+//		client, err := elastic.NewClient(
+//			elastic.SetURL(url),
+//			elastic.SetBasicAuth(username, password),
+//			elastic.SetSniff(false),
+//		)
+//		if err != nil {
+//			log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
+//		}
+//
+//		preBiddingIndex := "" //下一个索引名称
+//		next := now.AddDate(0, 0, 1)
+//		month := int(next.Month())
+//		monthStr := strconv.Itoa(month)
+//		year := next.Year()
+//		yearStr := strconv.Itoa(year)
+//		dayStr := strconv.Itoa(next.Day())
+//		if GF.Env.SpecType == "month" {
+//			//下一个月的索引名称
+//			preBiddingIndex = "bidding_" + yearStr + monthStr
+//		} else if GF.Env.SpecType == "day" {
+//			//下一天的索引名称
+//			preBiddingIndex = "bidding_" + yearStr + monthStr + dayStr
+//		}
+//
+//		for _, alias := range GF.Env.Alias {
+//			_, err = client.Alias().Add(preBiddingIndex, alias).Do(context.Background())
+//			if err != nil {
+//				log.Error("添加别名失败:", zap.Error(err))
+//			}
+//		}
+//		log.Info("dealAlias", zap.String(url, "索引别名处理完毕"))
+//	}
+//}
 
 //SendMail 发送邮件
 func SendMail(title, content string) {

+ 6 - 5
pre_extract/main.go

@@ -70,11 +70,11 @@ func main() {
 		}
 	}
 
-	//切换别名-定时任务
-	_, err := c.AddFunc(GF.Env.SwitchSpec, SwitchAlias)
-	if err != nil {
-		log.Info("main", zap.Any("AddFunc err", err))
-	}
+	////切换别名-定时任务
+	//_, err := c.AddFunc(GF.Env.SwitchSpec, SwitchAlias)
+	//if err != nil {
+	//	log.Info("main", zap.Any("AddFunc err", err))
+	//}
 
 	c.Start()
 	defer c.Stop()
@@ -84,6 +84,7 @@ func main() {
 		go SendPreData()
 	}
 
+	log.Info("main", zap.String("监听端口:", GF.Env.LocalPort))
 	select {}
 }
 

+ 18 - 0
pre_extract/pre_test.go

@@ -51,3 +51,21 @@ func TestCronExpression(t *testing.T) {
 
 	select {}
 }
+
+func TestDeleteIndex(t *testing.T) {
+
+	data := map[string]string{
+		"aa": "aaa",
+		"bb": "bbb",
+		"cc": "ccc",
+	}
+
+	for _, v := range data {
+		if v == "aaa" {
+			return
+		}
+
+		fmt.Println(v)
+	}
+
+}