package main

import (
	"bytes"
	"context"
	"encoding/base64"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"strconv"
	"time"

	"github.com/olivere/elastic/v7"
	"go.uber.org/zap"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
)

// httpRequestTimeout bounds every raw HTTP call made from this file so a hung
// endpoint cannot block index rotation forever.
const httpRequestTimeout = 30 * time.Second

// mapping is the "mappings" fragment of the index-creation body. It is a JSON
// fragment (no surrounding braces) that CreateIndex joins with the "settings"
// fragment.
var mapping = `
	"mappings": {
		"dynamic": false,
		"properties": {
			"dataweight": { "type": "long" },
			"projectcode": { "type": "keyword" },
			"object_type": { "type": "keyword" },
			"bidopentime": { "type": "long" },
			"bidamount": { "type": "double" },
			"winner": { "type": "keyword" },
			"buyer": {
				"type": "keyword",
				"fields": {
					"mbuyer": { "analyzer": "my_ngram_title", "type": "text" }
				}
			},
			"budget": { "type": "double" },
			"projectname": {
				"type": "keyword",
				"fields": {
					"pname": { "analyzer": "my_ngram_title", "type": "text" }
				}
			},
			"area": { "type": "keyword" },
			"city": { "type": "keyword" },
			"district": { "type": "keyword" },
			"s_winner": {
				"analyzer": "douhao",
				"type": "text",
				"fields": {
					"mwinner": { "analyzer": "my_ngram_title", "type": "text" }
				}
			},
			"pici": { "type": "long" },
			"id": { "type": "keyword" },
			"title": {
				"analyzer": "my_ngram_title",
				"type": "text",
				"fields": {
					"mtitle": { "type": "keyword" }
				}
			},
			"detail": { "analyzer": "my_ngram", "type": "text" },
			"site": { "type": "keyword" },
			"comeintime": { "type": "long" },
			"href": { "type": "keyword" },
			"infoformat": { "type": "long" },
			"publishtime": { "type": "long" },
			"toptype": { "type": "keyword" },
			"subtype": { "type": "keyword" },
			"createtime": { "type": "long" }
		}
	}`

// CreateIndex creates the index PreBiddingIndex on every cluster in clients
// and attaches the aliases listed in GF.Env.Alias.
//
// clients is keyed by the same key as GF.ES; the URL, credentials and shard
// count for each cluster are read from GF.ES[key]. A cluster that already has
// the index is skipped. The first failure is logged and returned, and the
// remaining clusters are not processed.
func CreateIndex(clients map[string]*elastic.Client, PreBiddingIndex string) error {
	if len(clients) == 0 {
		return errors.New("没有 配置 ES集群信息")
	}

	httpClient := &http.Client{Timeout: httpRequestTimeout}
	for k, client := range clients {
		if client == nil {
			err := fmt.Errorf("ES集群 %s 客户端为空", k)
			log.Error("CreateIndex", zap.Error(err))
			return err
		}
		if err := createIndexOnCluster(httpClient, k, client, PreBiddingIndex); err != nil {
			log.Error("CreateIndex", zap.String("cluster", k), zap.Error(err))
			return err
		}
	}

	return nil
}

// createIndexOnCluster runs the create sequence on one cluster: enable shard
// allocation, create the index, disable allocation again, then add aliases.
// k is the key of the cluster's configuration in GF.ES.
func createIndexOnCluster(httpClient *http.Client, k string, client *elastic.Client, index string) error {
	cfg := GF.ES[k]
	ctx := context.Background()

	exist, err := client.IndexExists(index).Do(ctx)
	if err != nil {
		return fmt.Errorf("检查索引 %s 是否存在: %w", index, err)
	}
	if exist {
		// Creating an existing index would fail; treat it as already done.
		log.Info("CreateIndex", zap.String(index, "已经存在了"))
		return nil
	}

	setting := fmt.Sprintf(`
	"settings": {
		"index": {
			"analysis": {
				"analyzer": {
					"my_ngram_title": {
						"filter": [ "lowercase" ],
						"tokenizer": "my_ngram_title"
					},
					"douhao": { "type": "pattern", "pattern": "," },
					"my_ngram": {
						"filter": [ "lowercase" ],
						"tokenizer": "my_ngram"
					}
				},
				"tokenizer": {
					"my_ngram_title": {
						"token_chars": [ "letter", "digit", "punctuation", "symbol" ],
						"min_gram": "1",
						"type": "nGram",
						"max_gram": "1"
					},
					"my_ngram": {
						"token_chars": [ "letter", "digit", "punctuation", "symbol" ],
						"min_gram": "2",
						"type": "nGram",
						"max_gram": "2"
					}
				}
			},
			"number_of_shards": "%s",
			"number_of_replicas": "0",
			"max_result_window": "20000"
		}
	}`, cfg.Shares)
	createJson := fmt.Sprintf(`{%s,%s}`, setting, mapping)

	// 1. Enable shard allocation (transient cluster setting).
	const enableSettings = `{ "transient" : { "cluster.routing.allocation.enable" : "all" } }`
	if err := putClusterSettings(httpClient, cfg.URL, cfg.Username, cfg.Password, enableSettings); err != nil {
		return fmt.Errorf("设置节点平衡失败: %w", err)
	}
	fmt.Println(cfg.URL, "节点平衡已开启")

	// 2. Create the index.
	created, err := client.CreateIndex(index).BodyString(createJson).Do(ctx)
	if err != nil {
		return fmt.Errorf("创建索引 %s: %w", index, err)
	}
	if !created.Acknowledged {
		return fmt.Errorf("创建索引失败: %s 未被确认", index)
	}

	// 3. Disable shard allocation again. This stays best-effort: a failure is
	// logged, but the index exists, so aliases are still added.
	const disableSettings = `{ "transient" : { "cluster.routing.allocation.enable" : "none" } }`
	if err := putClusterSettings(httpClient, cfg.URL, cfg.Username, cfg.Password, disableSettings); err != nil {
		log.Error("关闭节点平衡", zap.Error(err))
	} else {
		fmt.Println(cfg.URL, "节点平衡已关闭")
	}

	// 4. Point every configured alias at the new index; failures are logged only.
	for _, alias := range GF.Env.Alias {
		if _, err := client.Alias().Add(index, alias).Do(ctx); err != nil {
			log.Error("添加别名失败:", zap.Error(err))
		}
	}
	log.Info("CreateIndex", zap.String(cfg.URL, "索引别名 添加完毕"))

	return nil
}

// putClusterSettings PUTs body to <baseURL>/_cluster/settings using HTTP basic
// auth and returns an error unless the response status is 200.
func putClusterSettings(httpClient *http.Client, baseURL, username, password, body string) error {
	requestURL := fmt.Sprintf("%s/_cluster/settings", baseURL)
	req, err := http.NewRequest(http.MethodPut, requestURL, bytes.NewBuffer([]byte(body)))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	auth := username + ":" + password
	req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(auth)))

	resp, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("PUT %s: unexpected status %s", requestURL, resp.Status)
	}
	return nil
}

// deleteIndex deletes index from every cluster in clients.
//
// Clusters that do not have the index are skipped. The document count is
// logged when the index still holds data, but the index is deleted either
// way. The first error aborts the loop and is returned.
func deleteIndex(clients map[string]*elastic.Client, index string) error {
	if len(clients) == 0 {
		return errors.New("没有 配置 ES集群信息")
	}

	ctx := context.Background()
	for k, client := range clients {
		if client == nil {
			return fmt.Errorf("ES集群 %s 客户端为空", k)
		}

		exist, err := client.IndexExists(index).Do(ctx)
		if err != nil {
			return err
		}
		if !exist {
			// Nothing to delete here; counting or deleting would only fail.
			log.Info("deleteIndex", zap.String(k, index+" 索引文件不存在"))
			continue
		}

		num, err := client.Count(index).Do(ctx)
		if err != nil {
			return err
		}
		if num > 0 {
			log.Info("deleteIndex", zap.String(k, "索引"+index+"还存在有效数据"))
		}

		if _, err = client.DeleteIndex(index).Do(ctx); err != nil {
			return err
		}
	}

	return nil
}

// newESClients builds one Elasticsearch client per entry in GF.ES, keyed like
// GF.ES. A cluster whose client cannot be created is logged and left out of
// the result, so callers never receive a nil client.
func newESClients() map[string]*elastic.Client {
	clients := make(map[string]*elastic.Client, len(GF.ES))
	for k, v := range GF.ES {
		client, err := elastic.NewClient(
			elastic.SetURL(v.URL),
			elastic.SetBasicAuth(v.Username, v.Password),
			elastic.SetSniff(false),
		)
		if err != nil {
			log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
			continue
		}
		clients[k] = client
	}
	return clients
}

// preBiddingIndexName returns "bidding_" followed by the decimal form of each
// part, concatenated without separators or zero padding.
//
// NOTE(review): unpadded parts make names ambiguous (2024,1,11 and 2024,11,1
// both give "bidding_2024111"). The format is kept because other components
// may depend on these exact names — confirm before changing it.
func preBiddingIndexName(parts ...int) string {
	name := "bidding_"
	for _, p := range parts {
		name += strconv.Itoa(p)
	}
	return name
}

// rotatePreIndex performs one rotation step shared by the hour/day/month
// variants: create nextIndex, delete lastIndex, then delete bidding_extract
// documents with is_pre = 1 and comeintime <= cutoff. caller is used as the
// log message so entries can be attributed to the calling variant.
func rotatePreIndex(caller string, clients map[string]*elastic.Client, nextIndex, lastIndex string, cutoff time.Time) {
	// Create the upcoming index; a failure is mailed out but does not stop
	// the cleanup below.
	if err := CreateIndex(clients, nextIndex); err != nil {
		log.Info(caller, zap.Error(err))
		SendMail("预处理索引", "预处理索引创建失败,请检查")
	} else {
		log.Info(caller, zap.String(nextIndex, "创建成功"))
	}

	// Drop the index of the previous period.
	if err := deleteIndex(clients, lastIndex); err != nil {
		log.Info(caller, zap.Error(err))
	}

	// Remove expired pre-processing rows from bidding_extract.
	where := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$lte": cutoff.Unix(),
		},
		"is_pre": 1,
	}
	MgoB.Delete("bidding_extract", where)
}

// dealIndexByHour rotates the pre-processing index hourly: it creates the
// index for the next hour and deletes the index and bidding_extract data of
// the previous hour.
func dealIndexByHour() {
	now := time.Now()
	next := now.Add(time.Hour)
	last := now.Add(-time.Hour)

	rotatePreIndex(
		"dealIndexByHour",
		newESClients(),
		preBiddingIndexName(next.Year(), int(next.Month()), next.Day(), next.Hour()),
		preBiddingIndexName(last.Year(), int(last.Month()), last.Day(), last.Hour()),
		last,
	)
}

// dealIndexByDay rotates the pre-processing index daily: it creates the index
// for tomorrow and deletes the index and bidding_extract data of yesterday.
func dealIndexByDay() {
	now := time.Now()
	next := now.AddDate(0, 0, 1)
	last := now.AddDate(0, 0, -1)

	rotatePreIndex(
		"dealIndexByDay",
		newESClients(),
		preBiddingIndexName(next.Year(), int(next.Month()), next.Day()),
		preBiddingIndexName(last.Year(), int(last.Month()), last.Day()),
		last,
	)
}

// dealIndexByMonth rotates the pre-processing index monthly. It only acts on
// the last day of the current month: it creates next month's index one day
// ahead and deletes the index and bidding_extract data of the previous month.
func dealIndexByMonth() {
	now := time.Now()

	// Day 0 of the following month normalizes to the last day of this month;
	// only the day number is used, so the zone is irrelevant.
	lastDayOfMonth := time.Date(now.Year(), now.Month()+1, 0, 0, 0, 0, 0, time.UTC)
	if now.Day() != lastDayOfMonth.Day() {
		return
	}
	fmt.Println("当前时间是当前月份的最后一天")

	// On the last day of the month, tomorrow is the first day of next month.
	next := now.AddDate(0, 0, 1)

	// Last day of the previous month at the current clock time. Subtracting a
	// month from day 31 is avoided because it normalizes back into the
	// current month (Mar 31 - 1 month + 1 day -> "Feb 32" -> Mar 4), which
	// would target the current month's index and data.
	last := time.Date(now.Year(), now.Month(), 0,
		now.Hour(), now.Minute(), now.Second(), now.Nanosecond(), now.Location())

	rotatePreIndex(
		"dealIndexByMonth",
		newESClients(),
		preBiddingIndexName(next.Year(), int(next.Month())),
		preBiddingIndexName(last.Year(), int(last.Month())),
		last,
	)
}

// SendMail sends a notification by issuing a GET to GF.Email.Api with the
// recipient (GF.Email.To), title and content as query parameters. Errors are
// logged and not returned.
func SendMail(title, content string) {
	// Escape every parameter: titles and bodies contain non-ASCII text and
	// punctuation that must not be placed in a query string raw.
	mailURL := fmt.Sprintf("%s?to=%s&title=%s&body=%s",
		GF.Email.Api,
		url.QueryEscape(GF.Email.To),
		url.QueryEscape(title),
		url.QueryEscape(content),
	)
	fmt.Println("url=>", mailURL)

	httpClient := &http.Client{Timeout: httpRequestTimeout}
	res, err := httpClient.Get(mailURL)
	if err != nil {
		log.Info("SendMail", zap.Any("err", err))
		return
	}
	defer res.Body.Close()

	log.Info("SendMail", zap.String("res", res.Status))
}