package main import ( "bytes" "context" "encoding/base64" "errors" "fmt" "github.com/olivere/elastic/v7" "go.uber.org/zap" "net/http" "strconv" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "time" ) var setting = fmt.Sprintf(` "settings": { "index": { "analysis": { "analyzer": { "my_ngram_title": { "filter": [ "lowercase" ], "tokenizer": "my_ngram_title" }, "douhao": { "type": "pattern", "pattern": "," }, "my_ngram": { "filter": [ "lowercase" ], "tokenizer": "my_ngram" } }, "tokenizer": { "my_ngram_title": { "token_chars": [ "letter", "digit", "punctuation", "symbol" ], "min_gram": "1", "type": "nGram", "max_gram": "1" }, "my_ngram": { "token_chars": [ "letter", "digit", "punctuation", "symbol" ], "min_gram": "2", "type": "nGram", "max_gram": "2" } } }, "number_of_shards": "%s", "number_of_replicas": "0", "max_result_window": "20000" } }`, "2") var mapping = ` "mappings": { "dynamic": false, "properties": { "dataweight": { "type": "long" }, "projectcode": { "type": "keyword" }, "object_type": { "type": "keyword" }, "bidopentime": { "type": "long" }, "bidamount": { "type": "double" }, "winner": { "type": "keyword" }, "buyer": { "type": "keyword", "fields": { "mbuyer": { "analyzer": "my_ngram_title", "type": "text" } } }, "budget": { "type": "double" }, "projectname": { "type": "keyword", "fields": { "pname": { "analyzer": "my_ngram_title", "type": "text" } } }, "area": { "type": "keyword" }, "city": { "type": "keyword" }, "district": { "type": "keyword" }, "s_winner": { "analyzer": "douhao", "type": "text", "fields": { "mwinner": { "analyzer": "my_ngram_title", "type": "text" } } }, "pici": { "type": "long" }, "id": { "type": "keyword" }, "title": { "analyzer": "my_ngram_title", "type": "text", "fields": { "mtitle": { "type": "keyword" } } }, "detail": { "analyzer": "my_ngram", "type": "text" }, "site": { "type": "keyword" }, "comeintime": { "type": "long" }, "href": { "type": "keyword" }, "infoformat": { "type": "long" }, "publishtime": { "type": "long" }, "toptype": { "type": "keyword" }, "subtype": { "type": "keyword" }, "createtime": { "type": "long" } } }` //createIndex 创建索引 func createIndex(client *elastic.Client, PreBiddingIndex string) error { createJson := fmt.Sprintf(`{%s,%s}`, setting, mapping) //fmt.Println(createJson) //month := int(time.Now().Month()) //monthStr := strconv.Itoa(month) //year := time.Now().Year() //yearStr := strconv.Itoa(year) ////预处理索引名称 //PreBiddingIndex := "bidding_" + yearStr + monthStr url := GF.ES.URL username := GF.ES.Username password := GF.ES.Password // 创建 Elasticsearch 客户端 //client, err := elastic.NewClient( // elastic.SetURL(url), // elastic.SetBasicAuth(username, password), // elastic.SetSniff(false), //) //if err != nil { // log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err)) //} exist, err := client.IndexExists(PreBiddingIndex).Do(context.Background()) if exist { log.Info("CreateIndex", zap.String(PreBiddingIndex, "已经存在了")) return err } //1. 开启节点平衡 // 设置临时的节点平衡设置 balanceSettings := `{ "transient" : { "cluster.routing.allocation.enable" : "all" } }` requestURL := fmt.Sprintf("%s/_cluster/settings", url) req, err := http.NewRequest("PUT", requestURL, bytes.NewBuffer([]byte(balanceSettings))) if err != nil { log.Error("开启节点平衡", zap.Error(err)) } req.Header.Set("Content-Type", "application/json") // 添加身份验证头部 auth := username + ":" + password basicAuth := "Basic " + base64.StdEncoding.EncodeToString([]byte(auth)) req.Header.Set("Authorization", basicAuth) clientQ := &http.Client{} resp, err := clientQ.Do(req) if err != nil { log.Error("开启节点平衡", zap.Error(err)) return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { log.Fatal("设置节点平衡失败") return err } fmt.Println("节点平衡已开启") createIndexR, err := client.CreateIndex(PreBiddingIndex).BodyString(createJson).Do(context.Background()) if err != nil { log.Error(PreBiddingIndex, zap.Error(err)) return err } if !createIndexR.Acknowledged { log.Error("CreateIndex", zap.String(PreBiddingIndex, "创建索引失败")) return err } defer func() { //3. 关闭节点平衡 //设置临时的节点平衡设置 disableSettings := `{ "transient" : { "cluster.routing.allocation.enable" : "none" } }` req2, err2 := http.NewRequest("PUT", requestURL, bytes.NewBuffer([]byte(disableSettings))) if err2 != nil { log.Error("开启节点平衡", zap.Error(err)) } req2.Header.Set("Content-Type", "application/json") // 添加身份验证头部 req2.Header.Set("Authorization", basicAuth) //clientQ := &http.Client{} resp2, err2 := clientQ.Do(req) if err2 != nil { log.Error("关闭节点平衡", zap.Error(err)) } defer resp2.Body.Close() fmt.Println("节点平衡已关闭") }() return nil } //deleteIndex 删除索引 func deleteIndex(client *elastic.Client, index string) error { exist, err := client.IndexExists(index).Do(context.Background()) if !exist { return errors.New("索引" + index + "不存在") } if err != nil { return err } _, err = client.DeleteIndex(index).Do(context.Background()) if err != nil { return err } return nil } //dealIndexByDay 处理预处理索引,根据天; func dealIndexByDay() { now := time.Now() PreBiddingIndex := "" hour := now.Hour() // 判断当前时间是否时最后一个小时 if hour == 23 { //当天最后一小时 url := GF.ES.URL username := GF.ES.Username password := GF.ES.Password // 创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(url), elastic.SetBasicAuth(username, password), elastic.SetSniff(false), ) if err != nil { log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err)) } next := now.AddDate(0, 0, 1) month := int(next.Month()) monthStr := strconv.Itoa(month) year := next.Year() yearStr := strconv.Itoa(year) dayStr := strconv.Itoa(next.Day()) //下一天的索引名称 PreBiddingIndex = "bidding_" + yearStr + monthStr + dayStr err = createIndex(client, PreBiddingIndex) if err != nil { log.Info("dealIndexByDay", zap.Error(err)) SendMail("预处理索引", "预处理索引创建失败,请检查") } log.Info("dealIndexByDay", zap.String(PreBiddingIndex, "创建成功")) //3. 删除昨天的索引 last := now.AddDate(0, 0, -1) month2 := int(last.Month()) monthStr2 := strconv.Itoa(month2) year2 := last.Year() yearStr2 := strconv.Itoa(year2) dayStr2 := strconv.Itoa(last.Day()) //索引名称 lastIndex := "bidding_" + yearStr2 + monthStr2 + dayStr2 err = deleteIndex(client, lastIndex) if err != nil { log.Info("dealIndexByDay", zap.Error(err)) } } } //dealIndexByMonth 处理预处理索引,根据月份;提前一天创建好 下个月的索引 func dealIndexByMonth() { now := time.Now() PreBiddingIndex := "" // 获取当前月份的最后一天 lastDayOfMonth := time.Date(now.Year(), now.Month()+1, 0, 0, 0, 0, 0, time.UTC) // 判断当前时间是否为当前月份的最后一天 if now.Day() == lastDayOfMonth.Day() { //当月最后一天,需要提前创建好索引 fmt.Println("当前时间是当前月份的最后一天") url := GF.ES.URL username := GF.ES.Username password := GF.ES.Password // 创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(url), elastic.SetBasicAuth(username, password), elastic.SetSniff(false), ) if err != nil { log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err)) } next := now.AddDate(0, 0, 1) month := int(next.Month()) monthStr := strconv.Itoa(month) year := next.Year() yearStr := strconv.Itoa(year) //下一个月的索引名称 PreBiddingIndex = "bidding_" + yearStr + monthStr //2 创建下个月索引结构 err = createIndex(client, PreBiddingIndex) if err != nil { log.Info("dealIndexByMonth", zap.Error(err)) SendMail("预处理索引", "预处理索引创建失败,请检查") } log.Info("dealIndexByMonth", zap.String(PreBiddingIndex, "创建成功")) //3. 删除上个月的索引 last := now.AddDate(0, -1, 1) month2 := int(last.Month()) monthStr2 := strconv.Itoa(month2) year2 := last.Year() yearStr2 := strconv.Itoa(year2) //上个月的索引名称 lastIndex := "bidding_" + yearStr2 + monthStr2 err = deleteIndex(client, lastIndex) if err != nil { log.Info("dealIndexByMonth", zap.Error(err)) } } } //SendMail 发送邮件 func SendMail(title, content string) { url := fmt.Sprintf("%s?to=%s&title=%s&body=%s", GF.Email.Api, GF.Email.To, title, content) fmt.Println("url=>", url) res, err := http.Get(url) if err != nil { log.Info("SendMail", zap.Any("err", err)) } else { log.Info("SendMail", zap.Any("res", res)) } }