123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- package main
- import (
- "context"
- "esindex/config"
- "flag"
- "fmt"
- "github.com/RoaringBitmap/roaring"
- es7 "github.com/olivere/elastic/v7"
- "go.uber.org/zap"
- "io/ioutil"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mysqldb"
- "os"
- "strings"
- "sync"
- "time"
- )
- var (
- ProjectField = make(map[string]string, 500) //项目字段
- ProjectListF = make(map[string]string, 200)
- BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
- BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
- PreProcessField = make(map[string]string, 500) //预处理流程 bidding字段
- dbfile = flag.String("dbfile", "./db", "数据库文件")
- cache = roaring.NewBitmap()
- cacheModify = false //控制10秒 定时写入文件
- mutex sync.Mutex // 互斥锁,用于保护 cache 的并发写入操作
- )
- // InitLog @Description
- // @Author J 2022/7/26 15:30
- func InitLog() {
- now := time.Now()
- logcfg := config.Conf.Log
- err := log.InitLog(
- log.Path(logcfg.LogPath),
- log.Level(logcfg.LogLevel),
- log.Compress(logcfg.Compress),
- log.MaxSize(logcfg.MaxSize),
- log.MaxBackups(logcfg.MaxBackups),
- log.MaxAge(logcfg.MaxAge),
- log.Format(logcfg.Format),
- )
- if err != nil {
- fmt.Printf("InitLog failed: %v\n", err)
- os.Exit(1)
- }
- log.Info("InitLog", zap.Any("duration", time.Since(now).Seconds()))
- }
- func InitMgo() {
- now := time.Now()
- MgoB = &mongodb.MongodbSim{
- MongodbAddr: config.Conf.DB.MongoB.Addr,
- DbName: config.Conf.DB.MongoB.Dbname,
- Size: config.Conf.DB.MongoB.Size,
- UserName: config.Conf.DB.MongoB.User,
- Password: config.Conf.DB.MongoB.Password,
- Direct: config.Conf.DB.MongoB.Direct,
- }
- MgoB.InitPool()
- if config.Conf.DB.MongoB.Addr == "" || config.Conf.DB.MongoB.Dbname == "" {
- log.Error("InitMgo", zap.String("MgoB", "地址或者数据库为空"))
- }
- if config.Conf.DB.MongoB.Coll == "" {
- log.Error("InitMgo", zap.String("MgoB", "查询表为空"))
- }
- log.Info("InitMgo", zap.Any("MgoB duration", time.Since(now).Seconds()))
- //项目信息
- MgoP = &mongodb.MongodbSim{
- MongodbAddr: config.Conf.DB.MongoP.Addr,
- DbName: config.Conf.DB.MongoP.Dbname,
- Size: config.Conf.DB.MongoP.Size,
- UserName: config.Conf.DB.MongoP.User,
- Password: config.Conf.DB.MongoP.Password,
- Direct: config.Conf.DB.MongoP.Direct,
- }
- MgoP.InitPool()
- if config.Conf.DB.MongoP.Addr == "" || config.Conf.DB.MongoP.Dbname == "" {
- log.Error("InitMgo", zap.String("MongoP", "地址或者数据库为空"))
- }
- if config.Conf.DB.MongoP.Coll == "" {
- log.Error("InitMgo", zap.String("MongoP", "查询表为空"))
- }
- log.Info("InitMgo", zap.Any("MgoP duration", time.Since(now).Seconds()))
- //中标单位定时同步
- MgoQ = &mongodb.MongodbSim{
- MongodbAddr: config.Conf.DB.MongoQ.Addr,
- DbName: config.Conf.DB.MongoQ.Dbname,
- Size: config.Conf.DB.MongoQ.Size,
- UserName: config.Conf.DB.MongoQ.User,
- Password: config.Conf.DB.MongoQ.Password,
- Direct: config.Conf.DB.MongoQ.Direct,
- }
- MgoQ.InitPool()
- if config.Conf.DB.MongoQ.Addr == "" || config.Conf.DB.MongoQ.Dbname == "" {
- log.Error("InitMgo", zap.String("MongoQ", "地址或者数据库为空"))
- }
- log.Info("InitMgo", zap.Any("MgoQ duration", time.Since(now).Seconds()))
- //181 特殊企业,采购单位验证
- MgoS = &mongodb.MongodbSim{
- MongodbAddr: config.Conf.DB.MongoS.Addr,
- DbName: config.Conf.DB.MongoS.Dbname,
- Size: config.Conf.DB.MongoS.Size,
- UserName: config.Conf.DB.MongoS.User,
- Password: config.Conf.DB.MongoS.Password,
- Direct: config.Conf.DB.MongoS.Direct,
- }
- MgoS.InitPool()
- if config.Conf.DB.MongoS.Addr == "" || config.Conf.DB.MongoS.Dbname == "" {
- log.Error("InitMgo", zap.String("MongoS", "地址或者数据库为空"))
- }
- log.Info("InitMgo", zap.Any("MgoS duration", time.Since(now).Seconds()))
- }
- func InitMysql() {
- //采购单位
- now := time.Now()
- Mysql = &mysqldb.Mysql{
- Address: config.Conf.DB.MysqlB.Addr,
- DBName: config.Conf.DB.MysqlB.Dbname,
- UserName: config.Conf.DB.MysqlB.Username,
- PassWord: config.Conf.DB.MysqlB.Password,
- }
- Mysql.Init()
- if config.Conf.DB.MysqlB.Addr == "" || config.Conf.DB.MysqlB.Dbname == "" {
- log.Error("InitMysql", zap.String("Mysql", "地址或者数据库为空"))
- }
- log.Info("InitMysql", zap.Any("MysqlB duration", time.Since(now).Seconds()))
- }
- func InitEs() {
- now := time.Now()
- Es = &elastic.Elastic{
- S_esurl: config.Conf.DB.Es.Addr,
- I_size: config.Conf.DB.Es.Size,
- Username: config.Conf.DB.Es.Username,
- Password: config.Conf.DB.Es.Password,
- }
- Es.InitElasticSize()
- log.Info("InitEs", zap.String("阿里云", config.Conf.DB.Es.Addr))
- if config.Conf.DB.Es.Addr == "" {
- log.Error("InitEs", zap.String("ES", "地址或者数据库为空"))
- }
- if config.Conf.DB.Es.IndexB == "" {
- log.Error("InitEs", zap.String("IndexB", "indexb bidding 索引为空,请检查"))
- } else {
- log.Debug("InitEs", zap.String("IndexB", config.Conf.DB.Es.IndexB))
- }
- if config.Conf.DB.Es.IndexP == "" {
- log.Error("InitEs", zap.String("IndexB", "projectset 项目索引为空,请检查"))
- } else {
- log.Debug("InitEs", zap.String("IndexP", config.Conf.DB.Es.IndexP))
- }
- //if config.Conf.DB.Es.IndexTmp == "" {
- // log.Error("InitEs", zap.String("IndexTmp 为空", "请检查是否需要配置;该配置主要生产环境需要"))
- //}
- if config.Conf.DB.Es.IndexWinner == "" {
- log.Error("InitEs", zap.String("IndexWinner", "中标单位 索引为空,请检查"))
- } else {
- log.Debug("InitEs", zap.String("IndexWinner", config.Conf.DB.Es.IndexWinner))
- }
- if config.Conf.DB.Es.IndexBuyer == "" {
- log.Error("InitEs", zap.String("IndexBuyer", "采购单位 索引为空,请检查"))
- } else {
- log.Debug("InitEs", zap.String("IndexBuyer", config.Conf.DB.Es.IndexBuyer))
- }
- ////采集爬虫 单服务器部署的es;目前已使用华为云
- //Es1 = &elastic.Elastic{
- // S_esurl: config.Conf.DB.Es.AddrP,
- // I_size: config.Conf.DB.Es.Size,
- // Username: config.Conf.DB.Es.Username,
- // Password: config.Conf.DB.Es.Password,
- //}
- //Es1.InitElasticSize()
- //华为云 部署的es
- if config.Conf.DB.Es.Addr2 != "" {
- Es2 = &elastic.Elastic{
- S_esurl: config.Conf.DB.Es.Addr2,
- I_size: config.Conf.DB.Es.Size,
- Username: config.Conf.DB.Es.Username2,
- Password: config.Conf.DB.Es.Password2,
- }
- Es2.InitElasticSize()
- log.Info("InitEs", zap.String("华为云Addr2", config.Conf.DB.Es.Addr2))
- }
- // 华为云新集群,迁移原来阿里云集群数据,标讯、项目、凭安
- if config.Conf.DB.Es.Addr3 != "" {
- Es3 = &elastic.Elastic{
- S_esurl: config.Conf.DB.Es.Addr3,
- I_size: config.Conf.DB.Es.Size,
- Username: config.Conf.DB.Es.Username3,
- Password: config.Conf.DB.Es.Password3,
- }
- Es3.InitElasticSize()
- log.Info("InitEs", zap.String("华为云Addr3", config.Conf.DB.Es.Addr3))
- }
- log.Info("InitEs", zap.Any("duration", time.Since(now).Seconds()))
- }
- func InitField() {
- now := time.Now()
- info, _ := MgoB.Find("bidding_processing_field", `{"stype": "project"}`, nil, nil, false, -1, -1)
- if len(*info) > 0 {
- for _, m := range *info {
- if util.IntAll(m["level"]) == 1 {
- ProjectField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
- } else if util.IntAll(m["level"]) == 2 {
- ProjectListF[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
- }
- }
- }
- log.Info("InitField", zap.Int("ProjectField", len(ProjectField)), zap.Int("ProjectListF", len(ProjectListF)))
- log.Info("InitField", zap.Any("duration", time.Since(now).Seconds()))
- }
- // InitPreProcessField 预处理阶段字段
- func InitPreProcessField() {
- now := time.Now()
- info, _ := MgoB.Find("bidding_processing_field", `{"stype": "pre_process"}`, nil, nil, false, -1, -1)
- if len(*info) > 0 {
- for _, m := range *info {
- if util.IntAll(m["level"]) == 1 {
- PreProcessField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
- }
- }
- }
- log.Info("InitPreProcessField", zap.Int("PreProcessField", len(ProjectField)))
- log.Info("InitPreProcessField", zap.Any("duration", time.Since(now).Seconds()))
- }
- // InitPreEsClient 实例化预处理 索引客户端
- //func InitPreEsClient() {
- // if len(config.Conf.Pre) > 0 {
- // for k, v := range config.Conf.Pre {
- // cli := &elastic.Elastic{
- // S_esurl: v.Addr,
- // I_size: 30,
- // Username: v.Username,
- // Password: v.Password,
- // }
- // cli.InitElasticSize()
- // PreEs[k] = cli
- // }
- // }
- //}
- // InitEsBiddingField 初始化 bidding 索引字段
- func InitEsBiddingField() {
- now := time.Now()
- info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
- if len(*info) > 0 {
- for _, m := range *info {
- if util.IntAll(m["level"]) == 1 {
- BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
- } else if util.IntAll(m["level"]) == 2 {
- pfield := util.ObjToString(m["pfield"])
- pfieldMap := BiddingLevelField[pfield]
- if pfieldMap == nil {
- pfieldMap = make(map[string]string, 0)
- }
- pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
- BiddingLevelField[pfield] = pfieldMap
- }
- }
- }
- log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
- log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
- log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
- }
- // verifyESFields 验证es 定义字段类型和 MongoDB 数据字段
- func verifyESFields() {
- now := time.Now()
- log.Info("verifyESFields", zap.String("开始类型检测", ""))
- client, _ := es7.NewClient(
- es7.SetURL(config.Conf.DB.Es.Addr),
- es7.SetBasicAuth(config.Conf.DB.Es.Username, config.Conf.DB.Es.Password),
- es7.SetSniff(false),
- )
- index := config.Conf.DB.Es.IndexB //索引表 bidding
- // 获取 Elasticsearch 索引的 mapping 信息
- mapping, err := client.GetMapping().Index(index).Do(context.Background())
- if err != nil {
- log.Error("verifyESFields", zap.Any("getting Elasticsearch mapping:", err))
- }
- indexName, _ := GetIndexName(client, index)
- if indexName == "" || mapping == nil {
- log.Error("verifyESFields", zap.String("索引不存在,请检查索引", index))
- os.Exit(-1)
- return
- }
- if mapping[indexName].(map[string]interface{})["mappings"] == nil || mapping[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"] == nil {
- log.Error("verifyESFields", zap.String("索引不存在或状态不对,请检查索引", index))
- os.Exit(-1)
- return
- }
- properties := mapping[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{})
- var errField = make([]string, 0)
- var okField = make([]string, 0)
- var analyzerMap = make(map[string]string) // 分词信息
- var esMap = make(map[string]string) //存储es 字段类型
- //
- for field, ftype := range BiddingField {
- eftypeMap, _ := properties[field].(map[string]interface{})
- var etype string
- var analyzer string
- if fftype, ok := eftypeMap["type"]; ok {
- etype = fftype.(string)
- esMap[field] = etype
- }
- if ffanalyzer, ok := eftypeMap["analyzer"]; ok {
- analyzer = ffanalyzer.(string)
- analyzerMap[field] = analyzer
- }
- if ftype != "" {
- if chargeType(ftype, etype) {
- okField = append(okField, field)
- } else {
- errField = append(errField, field)
- }
- } else {
- if field == "_id" {
- continue
- } else if field == "purchasinglist" || field == "package" || field == "winnerorder" || field == "procurementlist" {
- if eproperties, ok := eftypeMap["properties"]; ok {
- if eproMap, ok := eproperties.(map[string]interface{}); ok {
- for k, v := range eproMap {
- if innerMap, ok := v.(map[string]interface{}); ok {
- if innerType, ok := innerMap["type"]; ok {
- innerLevel := BiddingLevelField[field]
- esMap[fmt.Sprintf("%s.%s", field, k)] = innerType.(string)
- if chargeType(innerLevel[k], innerType.(string)) {
- okField = append(okField, fmt.Sprintf("%s.%s", field, k))
- } else {
- errField = append(errField, fmt.Sprintf("%s.%s", field, k))
- }
- }
- }
- }
- }
- }
- }
- }
- }
- if len(errField) > 0 {
- log.Error("verifyESFields", zap.Int("错误字段数量", len(errField)))
- for _, field := range errField {
- if strings.Contains(field, ".") {
- fe := strings.Split(field, ".")
- log.Error(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingLevelField[fe[0]][fe[1]]), esMap[field]))
- } else {
- log.Error(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingField[field]), esMap[field]))
- }
- }
- os.Exit(-1)
- } else {
- log.Info("es 字段类型检测结束,", zap.Int("所有字段都符合,检测字段数量为:", len(okField)))
- }
- log.Info("verifyESFields", zap.Any("duration", time.Since(now).Seconds()))
- }
- func GetIndexName(client *es7.Client, name string) (string, error) {
- // 判断 name 是否为一个别名
- res, err := client.Aliases().Alias(name).Do(context.Background())
- if err != nil {
- // 错误处理
- if err.(*es7.Error).Status != 404 && err.(*es7.Error).Details != nil {
- return "", err
- }
- }
- if res != nil {
- for k, v := range res.Indices {
- for _, vv := range v.Aliases {
- if vv.AliasName == name {
- return k, nil
- }
- }
- }
- }
- // 判断 name 是否为一个正式索引名称
- resa, err := client.IndexExists(name).Do(context.Background())
- if err != nil {
- // 错误处理
- return "", err
- }
- if resa {
- return name, nil
- }
- // 如果 name 既不是别名,也不是正式索引名称,则返回空字符串
- return "", nil
- }
- // InitBitmap 初始化项目名称副标题 bitmap
- func InitBitmap() {
- if config.Conf.Env.Dbfile != "" {
- dbfile = &config.Conf.Env.Dbfile
- }
- _, err := os.Stat(*dbfile)
- log.Info("InitBitmap", zap.String("dbfile", *dbfile))
- if !os.IsNotExist(err) {
- bs, err := ioutil.ReadFile(*dbfile)
- if err != nil {
- log.Info("InitBitmap", zap.Error(err))
- }
- if len(bs) > 0 {
- _, err := cache.FromBuffer(bs)
- if err != nil {
- log.Info("InitBitmap", zap.Any("cache.FromBuffer", err))
- }
- }
- }
- log.Info("InitBitmap", zap.Any("cache.FromBuffer", "success"))
- //监听,写入文件保存
- go func() {
- for {
- time.Sleep(10 * time.Second)
- if cacheModify {
- saveDb()
- cacheModify = false
- }
- }
- }()
- }
|