package main import ( "context" "esindex/config" "fmt" es7 "github.com/olivere/elastic/v7" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mysqldb" "os" "strings" "time" ) var ( ProjectField = make(map[string]string, 500) //项目字段 ProjectListF = make(map[string]string, 200) BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段, BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段 PreProcessField = make(map[string]string, 500) //预处理流程 bidding字段 ) // InitLog @Description // @Author J 2022/7/26 15:30 func InitLog() { now := time.Now() logcfg := config.Conf.Log err := log.InitLog( log.Path(logcfg.LogPath), log.Level(logcfg.LogLevel), log.Compress(logcfg.Compress), log.MaxSize(logcfg.MaxSize), log.MaxBackups(logcfg.MaxBackups), log.MaxAge(logcfg.MaxAge), log.Format(logcfg.Format), ) if err != nil { fmt.Printf("InitLog failed: %v\n", err) os.Exit(1) } log.Info("InitLog", zap.Any("duration", time.Since(now).Seconds())) } func InitMgo() { now := time.Now() MgoB = &mongodb.MongodbSim{ MongodbAddr: config.Conf.DB.MongoB.Addr, DbName: config.Conf.DB.MongoB.Dbname, Size: config.Conf.DB.MongoB.Size, UserName: config.Conf.DB.MongoB.User, Password: config.Conf.DB.MongoB.Password, Direct: config.Conf.DB.MongoB.Direct, } MgoB.InitPool() if config.Conf.DB.MongoB.Addr == "" || config.Conf.DB.MongoB.Dbname == "" { log.Error("InitMgo", zap.String("MgoB", "地址或者数据库为空")) } if config.Conf.DB.MongoB.Coll == "" { log.Error("InitMgo", zap.String("MgoB", "查询表为空")) } log.Info("InitMgo", zap.Any("MgoB duration", time.Since(now).Seconds())) //项目信息 MgoP = &mongodb.MongodbSim{ MongodbAddr: config.Conf.DB.MongoP.Addr, DbName: config.Conf.DB.MongoP.Dbname, Size: config.Conf.DB.MongoP.Size, UserName: config.Conf.DB.MongoP.User, Password: config.Conf.DB.MongoP.Password, } MgoP.InitPool() if config.Conf.DB.MongoP.Addr == "" || config.Conf.DB.MongoP.Dbname == "" { log.Error("InitMgo", zap.String("MongoP", "地址或者数据库为空")) } if config.Conf.DB.MongoP.Coll == "" { log.Error("InitMgo", zap.String("MongoP", "查询表为空")) } log.Info("InitMgo", zap.Any("MgoP duration", time.Since(now).Seconds())) //中标单位定时同步 MgoQ = &mongodb.MongodbSim{ MongodbAddr: config.Conf.DB.MongoQ.Addr, DbName: config.Conf.DB.MongoQ.Dbname, Size: config.Conf.DB.MongoQ.Size, UserName: config.Conf.DB.MongoQ.User, Password: config.Conf.DB.MongoQ.Password, } MgoQ.InitPool() if config.Conf.DB.MongoQ.Addr == "" || config.Conf.DB.MongoQ.Dbname == "" { log.Error("InitMgo", zap.String("MongoQ", "地址或者数据库为空")) } log.Info("InitMgo", zap.Any("MgoQ duration", time.Since(now).Seconds())) //181 特殊企业,采购单位验证 MgoS = &mongodb.MongodbSim{ MongodbAddr: config.Conf.DB.MongoS.Addr, DbName: config.Conf.DB.MongoS.Dbname, Size: config.Conf.DB.MongoS.Size, UserName: config.Conf.DB.MongoS.User, Password: config.Conf.DB.MongoS.Password, } MgoS.InitPool() if config.Conf.DB.MongoS.Addr == "" || config.Conf.DB.MongoS.Dbname == "" { log.Error("InitMgo", zap.String("MongoS", "地址或者数据库为空")) } log.Info("InitMgo", zap.Any("MgoS duration", time.Since(now).Seconds())) } func InitMysql() { //采购单位 now := time.Now() Mysql = &mysqldb.Mysql{ Address: config.Conf.DB.MysqlB.Addr, DBName: config.Conf.DB.MysqlB.Dbname, UserName: config.Conf.DB.MysqlB.Username, PassWord: config.Conf.DB.MysqlB.Password, } Mysql.Init() if config.Conf.DB.MysqlB.Addr == "" || config.Conf.DB.MysqlB.Dbname == "" { log.Error("InitMysql", zap.String("Mysql", "地址或者数据库为空")) } log.Info("InitMysql", zap.Any("MysqlB duration", time.Since(now).Seconds())) } func InitEs() { now := time.Now() Es = &elastic.Elastic{ S_esurl: config.Conf.DB.Es.Addr, I_size: config.Conf.DB.Es.Size, Username: config.Conf.DB.Es.Username, Password: config.Conf.DB.Es.Password, } Es.InitElasticSize() if config.Conf.DB.Es.Addr == "" { log.Error("InitEs", zap.String("ES", "地址或者数据库为空")) } if config.Conf.DB.Es.IndexB == "" { log.Error("InitEs", zap.String("IndexB", "indexb bidding 索引为空,请检查")) } else { log.Debug("InitEs", zap.String("IndexB", config.Conf.DB.Es.IndexB)) } if config.Conf.DB.Es.IndexP == "" { log.Error("InitEs", zap.String("IndexB", "projectset 项目索引为空,请检查")) } else { log.Debug("InitEs", zap.String("IndexP", config.Conf.DB.Es.IndexP)) } if config.Conf.DB.Es.IndexTmp == "" { log.Error("InitEs", zap.String("IndexTmp 为空", "请检查是否需要配置;该配置主要生产环境需要")) } if config.Conf.DB.Es.IndexWinner == "" { log.Error("InitEs", zap.String("IndexWinner", "中标单位 索引为空,请检查")) } else { log.Debug("InitEs", zap.String("IndexWinner", config.Conf.DB.Es.IndexWinner)) } if config.Conf.DB.Es.IndexBuyer == "" { log.Error("InitEs", zap.String("IndexBuyer", "采购单位 索引为空,请检查")) } else { log.Debug("InitEs", zap.String("IndexBuyer", config.Conf.DB.Es.IndexBuyer)) } //采集爬虫 单服务器部署的es Es1 = &elastic.Elastic{ S_esurl: config.Conf.DB.Es.AddrP, I_size: config.Conf.DB.Es.Size, Username: config.Conf.DB.Es.Username, Password: config.Conf.DB.Es.Password, } Es1.InitElasticSize() //华为云 部署的es if config.Conf.DB.Es.Addr2 != "" { Es2 = &elastic.Elastic{ S_esurl: config.Conf.DB.Es.Addr2, I_size: config.Conf.DB.Es.Size, Username: config.Conf.DB.Es.Username2, Password: config.Conf.DB.Es.Password2, } Es2.InitElasticSize() log.Info("InitEs", zap.String("华为云Addr2", config.Conf.DB.Es.Addr2)) } log.Info("InitEs", zap.Any("duration", time.Since(now).Seconds())) } func InitField() { now := time.Now() info, _ := MgoB.Find("bidding_processing_field", `{"stype": "project"}`, nil, nil, false, -1, -1) if len(*info) > 0 { for _, m := range *info { if util.IntAll(m["level"]) == 1 { ProjectField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"]) } else if util.IntAll(m["level"]) == 2 { ProjectListF[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"]) } } } log.Info("InitField", zap.Int("ProjectField", len(ProjectField)), zap.Int("ProjectListF", len(ProjectListF))) log.Info("InitField", zap.Any("duration", time.Since(now).Seconds())) } //InitPreProcessField 预处理阶段字段 func InitPreProcessField() { now := time.Now() info, _ := MgoB.Find("bidding_processing_field", `{"stype": "pre_process"}`, nil, nil, false, -1, -1) if len(*info) > 0 { for _, m := range *info { if util.IntAll(m["level"]) == 1 { PreProcessField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"]) } } } log.Info("InitPreProcessField", zap.Int("PreProcessField", len(ProjectField))) log.Info("InitPreProcessField", zap.Any("duration", time.Since(now).Seconds())) } //InitPreEsClient 实例化预处理 索引客户端 func InitPreEsClient() { if len(config.Conf.Pre) > 0 { for k, v := range config.Conf.Pre { cli := &elastic.Elastic{ S_esurl: v.Addr, I_size: 30, Username: v.Username, Password: v.Password, } cli.InitElasticSize() PreEs[k] = cli } } } //InitEsBiddingField 初始化 bidding 索引字段 func InitEsBiddingField() { now := time.Now() info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1) if len(*info) > 0 { for _, m := range *info { if util.IntAll(m["level"]) == 1 { BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"]) } else if util.IntAll(m["level"]) == 2 { pfield := util.ObjToString(m["pfield"]) pfieldMap := BiddingLevelField[pfield] if pfieldMap == nil { pfieldMap = make(map[string]string, 0) } pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"]) BiddingLevelField[pfield] = pfieldMap } } } log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField))) log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField))) log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds())) } //verifyESFields 验证es 定义字段类型和 MongoDB 数据字段 func verifyESFields() { now := time.Now() log.Info("verifyESFields", zap.String("开始类型检测", "")) client, _ := es7.NewClient( es7.SetURL(config.Conf.DB.Es.Addr), es7.SetBasicAuth(config.Conf.DB.Es.Username, config.Conf.DB.Es.Password), es7.SetSniff(false), ) index := config.Conf.DB.Es.IndexB //索引表 bidding // 获取 Elasticsearch 索引的 mapping 信息 mapping, err := client.GetMapping().Index(index).Do(context.Background()) if err != nil { log.Error("verifyESFields", zap.Any("getting Elasticsearch mapping:", err)) } indexName, _ := GetIndexName(client, index) if indexName == "" || mapping == nil { log.Error("verifyESFields", zap.String("索引不存在,请检查索引", index)) os.Exit(-1) return } if mapping[indexName].(map[string]interface{})["mappings"] == nil || mapping[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"] == nil { log.Error("verifyESFields", zap.String("索引不存在或状态不对,请检查索引", index)) os.Exit(-1) return } properties := mapping[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{}) var errField = make([]string, 0) var okField = make([]string, 0) var analyzerMap = make(map[string]string) // 分词信息 var esMap = make(map[string]string) //存储es 字段类型 // for field, ftype := range BiddingField { eftypeMap, _ := properties[field].(map[string]interface{}) var etype string var analyzer string if fftype, ok := eftypeMap["type"]; ok { etype = fftype.(string) esMap[field] = etype } if ffanalyzer, ok := eftypeMap["analyzer"]; ok { analyzer = ffanalyzer.(string) analyzerMap[field] = analyzer } if ftype != "" { if chargeType(ftype, etype) { okField = append(okField, field) } else { errField = append(errField, field) } } else { if field == "_id" { continue } else if field == "purchasinglist" || field == "package" || field == "winnerorder" || field == "procurementlist" { if eproperties, ok := eftypeMap["properties"]; ok { if eproMap, ok := eproperties.(map[string]interface{}); ok { for k, v := range eproMap { if innerMap, ok := v.(map[string]interface{}); ok { if innerType, ok := innerMap["type"]; ok { innerLevel := BiddingLevelField[field] esMap[fmt.Sprintf("%s.%s", field, k)] = innerType.(string) if chargeType(innerLevel[k], innerType.(string)) { okField = append(okField, fmt.Sprintf("%s.%s", field, k)) } else { errField = append(errField, fmt.Sprintf("%s.%s", field, k)) } } } } } } } } } if len(errField) > 0 { log.Error("verifyESFields", zap.Int("错误字段数量", len(errField))) for _, field := range errField { if strings.Contains(field, ".") { fe := strings.Split(field, ".") log.Error(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingLevelField[fe[0]][fe[1]]), esMap[field])) } else { log.Error(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingField[field]), esMap[field])) } } os.Exit(-1) } else { log.Info("es 字段类型检测结束,", zap.Int("所有字段都符合,检测字段数量为:", len(okField))) } log.Info("verifyESFields", zap.Any("duration", time.Since(now).Seconds())) } func GetIndexName(client *es7.Client, name string) (string, error) { // 判断 name 是否为一个别名 res, err := client.Aliases().Alias(name).Do(context.Background()) if err != nil { // 错误处理 if err.(*es7.Error).Status != 404 && err.(*es7.Error).Details != nil { return "", err } } if res != nil { for k, v := range res.Indices { for _, vv := range v.Aliases { if vv.AliasName == name { return k, nil } } } } // 判断 name 是否为一个正式索引名称 resa, err := client.IndexExists(name).Do(context.Background()) if err != nil { // 错误处理 return "", err } if resa { return name, nil } // 如果 name 既不是别名,也不是正式索引名称,则返回空字符串 return "", nil }