package main

import (
	"bytes"
	"context"
	"encoding/base64"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"strconv"
	"time"

	"github.com/olivere/elastic/v7"
	"go.uber.org/zap"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
)

// mapping is the "mappings" member of the index-creation request body.
// createIndex splices it after the per-cluster "settings" member.
var mapping = `
"mappings": {
  "dynamic": false,
  "properties": {
    "dataweight": { "type": "long" },
    "projectcode": { "type": "keyword" },
    "object_type": { "type": "keyword" },
    "bidopentime": { "type": "long" },
    "bidamount": { "type": "double" },
    "winner": { "type": "keyword" },
    "buyer": {
      "type": "keyword",
      "fields": {
        "mbuyer": { "analyzer": "my_ngram_title", "type": "text" }
      }
    },
    "budget": { "type": "double" },
    "projectname": {
      "type": "keyword",
      "fields": {
        "pname": { "analyzer": "my_ngram_title", "type": "text" }
      }
    },
    "area": { "type": "keyword" },
    "city": { "type": "keyword" },
    "district": { "type": "keyword" },
    "s_winner": {
      "analyzer": "douhao",
      "type": "text",
      "fields": {
        "mwinner": { "analyzer": "my_ngram_title", "type": "text" }
      }
    },
    "pici": { "type": "long" },
    "id": { "type": "keyword" },
    "title": {
      "analyzer": "my_ngram_title",
      "type": "text",
      "fields": {
        "mtitle": { "type": "keyword" }
      }
    },
    "detail": { "analyzer": "my_ngram", "type": "text" },
    "site": { "type": "keyword" },
    "comeintime": { "type": "long" },
    "href": { "type": "keyword" },
    "infoformat": { "type": "long" },
    "publishtime": { "type": "long" },
    "toptype": { "type": "keyword" },
    "subtype": { "type": "keyword" },
    "createtime": { "type": "long" }
  }
}`

// createIndex makes sure PreBiddingIndex exists on every cluster in clients
// and carries every alias listed in GF.Env.Alias.
//
// For each cluster whose index is missing it (1) sets
// cluster.routing.allocation.enable to "all", (2) creates the index with the
// shard count from GF.ES[key].Shares, and (3) sets allocation back to "none"
// whether or not the creation succeeded. Aliases are then added; alias
// failures are logged and do not abort the run.
//
// It returns the first error encountered; clusters not yet visited are left
// untouched. An empty clients map is reported as an error.
func createIndex(clients map[string]*elastic.Client, PreBiddingIndex string) error {
	if len(clients) == 0 {
		return errors.New("没有 配置 ES集群信息")
	}
	// One client with a timeout for all cluster-settings calls, so a hung
	// node cannot block the scheduler forever.
	httpClient := &http.Client{Timeout: 30 * time.Second}
	for k, client := range clients {
		if err := createIndexOnCluster(httpClient, k, client, PreBiddingIndex); err != nil {
			return err
		}
	}
	return nil
}

// createIndexOnCluster performs the createIndex steps for the single cluster
// identified by key k (a key of GF.ES).
func createIndexOnCluster(httpClient *http.Client, k string, client *elastic.Client, index string) error {
	const settingsTemplate = `
"settings": {
  "index": {
    "analysis": {
      "analyzer": {
        "my_ngram_title": {
          "filter": [ "lowercase" ],
          "tokenizer": "my_ngram_title"
        },
        "douhao": { "type": "pattern", "pattern": "," },
        "my_ngram": {
          "filter": [ "lowercase" ],
          "tokenizer": "my_ngram"
        }
      },
      "tokenizer": {
        "my_ngram_title": {
          "token_chars": [ "letter", "digit", "punctuation", "symbol" ],
          "min_gram": "1",
          "type": "nGram",
          "max_gram": "1"
        },
        "my_ngram": {
          "token_chars": [ "letter", "digit", "punctuation", "symbol" ],
          "min_gram": "2",
          "type": "nGram",
          "max_gram": "2"
        }
      }
    },
    "number_of_shards": "%s",
    "number_of_replicas": "0",
    "max_result_window": "20000"
  }
}`

	if client == nil {
		return fmt.Errorf("no Elasticsearch client for cluster %q", k)
	}
	ctx := context.Background()
	cfg := GF.ES[k]

	exist, err := client.IndexExists(index).Do(ctx)
	if err != nil {
		log.Error("CreateIndex", zap.Error(err))
		return fmt.Errorf("checking index %q on cluster %q: %w", index, k, err)
	}

	if exist {
		// Nothing to create; fall through so the aliases are still ensured.
		log.Info("CreateIndex", zap.String(index, "已经存在了"))
	} else {
		// 1. Enable shard allocation so the new index's shards can be placed.
		if err := setClusterAllocation(httpClient, cfg.URL, cfg.Username, cfg.Password, "all"); err != nil {
			// Previously a non-200 answer called log.Fatal and returned a nil
			// error; returning the error lets the caller log and send mail.
			log.Error("设置节点平衡失败", zap.Error(err))
			return err
		}
		fmt.Println(cfg.URL, "节点平衡已开启")

		// 2. Create the index.
		body := fmt.Sprintf(`{%s,%s}`, fmt.Sprintf(settingsTemplate, cfg.Shares), mapping)
		res, createErr := client.CreateIndex(index).BodyString(body).Do(ctx)
		if createErr == nil && !res.Acknowledged {
			createErr = fmt.Errorf("create index %q on cluster %q was not acknowledged", index, k)
		}
		if createErr != nil {
			log.Error(index, zap.Error(createErr))
		}

		// 3. Disable allocation again, even when step 2 failed, so the
		// cluster is not left in the temporary "all" state. Best effort.
		if err := setClusterAllocation(httpClient, cfg.URL, cfg.Username, cfg.Password, "none"); err != nil {
			log.Error("关闭节点平衡", zap.Error(err))
		} else {
			fmt.Println(cfg.URL, "节点平衡已关闭")
		}

		if createErr != nil {
			return createErr
		}
	}

	// 4. Point every configured alias at the index.
	for _, alias := range GF.Env.Alias {
		if _, err := client.Alias().Add(index, alias).Do(ctx); err != nil {
			log.Error("添加别名失败:", zap.Error(err))
		}
	}
	log.Info("CreateIndex", zap.String(cfg.URL, "索引别名 添加完毕"))
	return nil
}

// setClusterAllocation PUTs a transient cluster.routing.allocation.enable
// value (mode, e.g. "all" or "none") to baseURL/_cluster/settings using HTTP
// basic authentication. Any status other than 200 is returned as an error.
// The response body is closed before returning.
func setClusterAllocation(httpClient *http.Client, baseURL, username, password, mode string) error {
	payload := fmt.Sprintf(`{ "transient" : { "cluster.routing.allocation.enable" : %q } }`, mode)
	req, err := http.NewRequest(http.MethodPut, baseURL+"/_cluster/settings", bytes.NewBufferString(payload))
	if err != nil {
		return fmt.Errorf("building cluster settings request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	// Basic authentication header.
	credentials := base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
	req.Header.Set("Authorization", "Basic "+credentials)

	resp, err := httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("setting cluster allocation to %q: %w", mode, err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("setting cluster allocation to %q: unexpected status %s", mode, resp.Status)
	}
	return nil
}

// deleteIndex removes index from every cluster in clients. A cluster on which
// the index does not exist is logged and skipped. If the index still holds
// documents this is logged, and the index is deleted regardless. It returns
// the first error encountered, or an error when clients is empty.
func deleteIndex(clients map[string]*elastic.Client, index string) error {
	if len(clients) == 0 {
		return errors.New("没有 配置 ES集群信息")
	}
	ctx := context.Background()
	for k, client := range clients {
		if client == nil {
			return fmt.Errorf("no Elasticsearch client for cluster %q", k)
		}
		exist, err := client.IndexExists(index).Do(ctx)
		if err != nil {
			return err
		}
		if !exist {
			log.Info("deleteIndex", zap.String(k, index+" 索引文件不存在"))
			continue
		}

		num, err := client.Count(index).Do(ctx)
		if err != nil {
			return err
		}
		if num > 0 {
			log.Info("deleteIndex", zap.String(k, "索引"+index+"还存在有效数据"))
		}

		if _, err = client.DeleteIndex(index).Do(ctx); err != nil {
			return err
		}
	}
	return nil
}

// newESClients builds one Elasticsearch client per entry of GF.ES, keyed like
// GF.ES. Clusters whose client cannot be created are logged and left out of
// the map (never stored as nil); the first such failure is also returned so
// the caller can report it.
func newESClients() (map[string]*elastic.Client, error) {
	clients := make(map[string]*elastic.Client, len(GF.ES))
	var firstErr error
	for k, v := range GF.ES {
		client, err := elastic.NewClient(
			elastic.SetURL(v.URL),
			elastic.SetBasicAuth(v.Username, v.Password),
			elastic.SetSniff(false),
		)
		if err != nil {
			log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
			if firstErr == nil {
				firstErr = fmt.Errorf("cluster %q: %w", k, err)
			}
			continue
		}
		clients[k] = client
	}
	return clients, firstErr
}

// dayIndexName returns "bidding_<year><month><day>" for t.
//
// NOTE(review): month and day are not zero-padded, so 2023-01-11 and
// 2023-11-01 both yield "bidding_2023111". The format is kept unchanged
// because other components may rely on these names — confirm before changing.
func dayIndexName(t time.Time) string {
	return "bidding_" + strconv.Itoa(t.Year()) + strconv.Itoa(int(t.Month())) + strconv.Itoa(t.Day())
}

// monthIndexName returns "bidding_<year><month>" for t (month not zero-padded).
func monthIndexName(t time.Time) string {
	return "bidding_" + strconv.Itoa(t.Year()) + strconv.Itoa(int(t.Month()))
}

// rotateBiddingIndex creates newIndex on all configured clusters, then deletes
// oldIndex from them. A mail is sent when a client could not be built or the
// creation failed; the deletion is attempted either way and its failure is
// only logged. caller is used as the log message tag.
func rotateBiddingIndex(caller, newIndex, oldIndex string) {
	clients, clientErr := newESClients()

	createErr := createIndex(clients, newIndex)
	if createErr != nil {
		log.Error(caller, zap.Error(createErr))
	}
	if clientErr != nil || createErr != nil {
		SendMail("预处理索引", "预处理索引创建失败,请检查")
	}
	if createErr == nil {
		// Only claim success when the creation really succeeded.
		log.Info(caller, zap.String(newIndex, "创建成功"))
	}

	if err := deleteIndex(clients, oldIndex); err != nil {
		log.Error(caller, zap.Error(err))
	}
}

// dealIndexByDay handles the daily pre-processing index: it creates
// tomorrow's index and deletes yesterday's.
func dealIndexByDay() {
	now := time.Now()
	// Name of the next day's index.
	PreBiddingIndex := dayIndexName(now.AddDate(0, 0, 1))
	// Name of the previous day's index.
	lastIndex := dayIndexName(now.AddDate(0, 0, -1))
	rotateBiddingIndex("dealIndexByDay", PreBiddingIndex, lastIndex)
}

// dealIndexByMonth handles the monthly pre-processing index. It does nothing
// unless today is the last day of the month; on that day it creates next
// month's index one day ahead and deletes the previous month's index.
func dealIndexByMonth() {
	now := time.Now()
	next := now.AddDate(0, 0, 1)
	// Today is the last day of the month exactly when tomorrow falls in a
	// different month.
	if next.Month() == now.Month() {
		return
	}
	fmt.Println("当前时间是当前月份的最后一天")

	// Step back from the first of this month. now.AddDate(0, -1, 1) on the
	// 31st normalises back into the current month (e.g. Mar 31 -> "Feb 32" ->
	// Mar 4), which made the old code delete the index still in use.
	firstOfMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
	previous := firstOfMonth.AddDate(0, -1, 0)

	rotateBiddingIndex("dealIndexByMonth", monthIndexName(next), monthIndexName(previous))
}

// The two functions below are disabled and retained for reference only;
// createIndex adds the same aliases when it processes an index.
//
////SwitchAlias switches the aliases
//func SwitchAlias() {
//	now := time.Now()
//	// check whether today is the last day of the current month
//	// indices created per day
//	if GF.Env.SpecType == "day" {
//		dealAlias()
//	} else if GF.Env.SpecType == "month" {
//		// last day of the current month
//		lastDayOfMonth := time.Date(now.Year(), now.Month()+1, 0, 0, 0, 0, 0, time.UTC)
//		// indices created per month
//		if now.Day() == lastDayOfMonth.Day() {
//			dealAlias()
//		}
//	}
//}
//
////dealAlias handles the index aliases
//func dealAlias() {
//	now := time.Now()
//
//	for _, v := range GF.ES {
//		url := v.URL
//		username := v.Username
//		password := v.Password
//		// create the Elasticsearch client
//		client, err := elastic.NewClient(
//			elastic.SetURL(url),
//			elastic.SetBasicAuth(username, password),
//			elastic.SetSniff(false),
//		)
//		if err != nil {
//			log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
//		}
//
//		preBiddingIndex := "" // name of the next index
//		next := now.AddDate(0, 0, 1)
//		month := int(next.Month())
//		monthStr := strconv.Itoa(month)
//		year := next.Year()
//		yearStr := strconv.Itoa(year)
//		dayStr := strconv.Itoa(next.Day())
//		if GF.Env.SpecType == "month" {
//			// name of next month's index
//			preBiddingIndex = "bidding_" + yearStr + monthStr
//		} else if GF.Env.SpecType == "day" {
//			// name of the next day's index
//			preBiddingIndex = "bidding_" + yearStr + monthStr + dayStr
//		}
//
//		for _, alias := range GF.Env.Alias {
//			_, err = client.Alias().Add(preBiddingIndex, alias).Do(context.Background())
//			if err != nil {
//				log.Error("添加别名失败:", zap.Error(err))
//			}
//		}
//		log.Info("dealAlias", zap.String(url, "索引别名处理完毕"))
//	}
//}

// SendMail sends a notification by issuing a GET request to GF.Email.Api with
// the recipient GF.Email.To and the given title and content as query
// parameters. title and content are query-escaped so that spaces, '&', '#'
// and non-ASCII text cannot corrupt the URL. Failures are logged, not
// returned.
func SendMail(title, content string) {
	mailURL := fmt.Sprintf("%s?to=%s&title=%s&body=%s",
		GF.Email.Api, GF.Email.To, url.QueryEscape(title), url.QueryEscape(content))
	fmt.Println("url=>", mailURL)

	httpClient := &http.Client{Timeout: 30 * time.Second}
	res, err := httpClient.Get(mailURL)
	if err != nil {
		log.Info("SendMail", zap.Any("err", err))
		return
	}
	// Close the body so the underlying connection is released.
	defer res.Body.Close()
	log.Info("SendMail", zap.String("status", res.Status))
}