init.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/elastic"
  5. "app.yhyue.com/data_processing/common_utils/log"
  6. "app.yhyue.com/data_processing/common_utils/mongodb"
  7. "context"
  8. "esindex/config"
  9. "fmt"
  10. es7 "github.com/olivere/elastic/v7"
  11. "go.uber.org/zap"
  12. "os"
  13. "strings"
  14. )
  15. var (
  16. ProjectField = make(map[string]string, 500) //项目字段
  17. ProjectListF = make(map[string]string, 200)
  18. BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
  19. BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
  20. )
  21. // InitLog @Description
  22. // @Author J 2022/7/26 15:30
  23. func InitLog() {
  24. logcfg := config.Conf.Log
  25. err := log.InitLog(
  26. log.Path(logcfg.LogPath),
  27. log.Level(logcfg.LogLevel),
  28. log.Compress(logcfg.Compress),
  29. log.MaxSize(logcfg.MaxSize),
  30. log.MaxBackups(logcfg.MaxBackups),
  31. log.MaxAge(logcfg.MaxAge),
  32. log.Format(logcfg.Format),
  33. )
  34. if err != nil {
  35. fmt.Printf("InitLog failed: %v\n", err)
  36. os.Exit(1)
  37. }
  38. }
  39. func InitMgo() {
  40. MgoB = &mongodb.MongodbSim{
  41. MongodbAddr: config.Conf.DB.MongoB.Addr,
  42. DbName: config.Conf.DB.MongoB.Dbname,
  43. Size: config.Conf.DB.MongoB.Size,
  44. UserName: config.Conf.DB.MongoB.User,
  45. Password: config.Conf.DB.MongoB.Password,
  46. }
  47. MgoB.InitPool()
  48. MgoP = &mongodb.MongodbSim{
  49. MongodbAddr: config.Conf.DB.MongoP.Addr,
  50. DbName: config.Conf.DB.MongoP.Dbname,
  51. Size: config.Conf.DB.MongoP.Size,
  52. UserName: config.Conf.DB.MongoP.User,
  53. Password: config.Conf.DB.MongoP.Password,
  54. }
  55. MgoP.InitPool()
  56. MgoQ = &mongodb.MongodbSim{
  57. MongodbAddr: config.Conf.DB.MongoQ.Addr,
  58. DbName: config.Conf.DB.MongoQ.Dbname,
  59. Size: config.Conf.DB.MongoQ.Size,
  60. UserName: config.Conf.DB.MongoQ.User,
  61. Password: config.Conf.DB.MongoQ.Password,
  62. }
  63. MgoQ.InitPool()
  64. }
  65. func InitEs() {
  66. Es = &elastic.Elastic{
  67. S_esurl: config.Conf.DB.Es.Addr,
  68. I_size: config.Conf.DB.Es.Size,
  69. Username: config.Conf.DB.Es.Username,
  70. Password: config.Conf.DB.Es.Password,
  71. }
  72. Es.InitElasticSize()
  73. Es1 = &elastic.Elastic{
  74. S_esurl: config.Conf.DB.Es.AddrP,
  75. I_size: config.Conf.DB.Es.Size,
  76. Username: config.Conf.DB.Es.Username,
  77. Password: config.Conf.DB.Es.Password,
  78. }
  79. Es1.InitElasticSize()
  80. }
  81. func InitField() {
  82. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "project"}`, nil, nil, false, -1, -1)
  83. if len(*info) > 0 {
  84. for _, m := range *info {
  85. if util.IntAll(m["level"]) == 1 {
  86. ProjectField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  87. } else if util.IntAll(m["level"]) == 2 {
  88. ProjectListF[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  89. }
  90. }
  91. }
  92. log.Info("InitField", zap.Int("ProjectField", len(ProjectField)), zap.Int("ProjectListF", len(ProjectListF)))
  93. }
  94. //InitEsBiddingField 初始化 bidding 索引字段
  95. func InitEsBiddingField() {
  96. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
  97. if len(*info) > 0 {
  98. for _, m := range *info {
  99. if util.IntAll(m["level"]) == 1 {
  100. BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  101. } else if util.IntAll(m["level"]) == 2 {
  102. pfield := util.ObjToString(m["pfield"])
  103. pfieldMap := BiddingLevelField[pfield]
  104. if pfieldMap == nil {
  105. pfieldMap = make(map[string]string, 0)
  106. }
  107. pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  108. BiddingLevelField[pfield] = pfieldMap
  109. }
  110. }
  111. }
  112. log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
  113. log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
  114. }
  115. //verifyESFields 验证es 定义字段类型和 MongoDB 数据字段
  116. func verifyESFields() {
  117. log.Info("verifyESFields", zap.String("开始类型检测", ""))
  118. client, _ := es7.NewClient(
  119. es7.SetURL(config.Conf.DB.Es.Addr),
  120. es7.SetBasicAuth(config.Conf.DB.Es.Username, config.Conf.DB.Es.Password),
  121. es7.SetSniff(false),
  122. )
  123. index := config.Conf.DB.Es.IndexB //索引表 bidding
  124. // 获取 Elasticsearch 索引的 mapping 信息
  125. mapping, err := client.GetMapping().Index(index).Do(context.Background())
  126. if err != nil {
  127. log.Info("verifyESFields", zap.Any("getting Elasticsearch mapping:", err))
  128. }
  129. indexName, _ := GetIndexName(client, index)
  130. if indexName == "" {
  131. log.Info("verifyESFields", zap.String("索引不存在,请检查索引", index))
  132. return
  133. }
  134. properties := mapping[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{})
  135. var errField = make([]string, 0)
  136. var okField = make([]string, 0)
  137. var analyzerMap = make(map[string]string) // 分词信息
  138. var esMap = make(map[string]string) //存储es 字段类型
  139. //
  140. for field, ftype := range BiddingField {
  141. eftypeMap, _ := properties[field].(map[string]interface{})
  142. var etype string
  143. var analyzer string
  144. if fftype, ok := eftypeMap["type"]; ok {
  145. etype = fftype.(string)
  146. esMap[field] = etype
  147. }
  148. if ffanalyzer, ok := eftypeMap["analyzer"]; ok {
  149. analyzer = ffanalyzer.(string)
  150. analyzerMap[field] = analyzer
  151. }
  152. if ftype != "" {
  153. if chargeType(ftype, etype) {
  154. okField = append(okField, field)
  155. } else {
  156. errField = append(errField, field)
  157. }
  158. } else {
  159. if field == "_id" {
  160. continue
  161. } else if field == "purchasinglist" || field == "package" || field == "winnerorder" || field == "procurementlist" {
  162. if eproperties, ok := eftypeMap["properties"]; ok {
  163. if eproMap, ok := eproperties.(map[string]interface{}); ok {
  164. for k, v := range eproMap {
  165. if innerMap, ok := v.(map[string]interface{}); ok {
  166. if innerType, ok := innerMap["type"]; ok {
  167. innerLevel := BiddingLevelField[field]
  168. esMap[fmt.Sprintf("%s.%s", field, k)] = innerType.(string)
  169. if chargeType(innerLevel[k], innerType.(string)) {
  170. okField = append(okField, fmt.Sprintf("%s.%s", field, k))
  171. } else {
  172. errField = append(errField, fmt.Sprintf("%s.%s", field, k))
  173. }
  174. }
  175. }
  176. }
  177. }
  178. }
  179. }
  180. }
  181. }
  182. if len(errField) > 0 {
  183. log.Info("verifyESFields", zap.Int("错误字段数量", len(errField)))
  184. for _, field := range errField {
  185. if strings.Contains(field, ".") {
  186. fe := strings.Split(field, ".")
  187. log.Info(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingLevelField[fe[0]][fe[1]]), esMap[field]))
  188. } else {
  189. log.Info(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingField[field]), esMap[field]))
  190. }
  191. }
  192. } else {
  193. log.Info("es 字段类型检测结束,", zap.Int("所有字段都符合,检测字段数量为:", len(okField)))
  194. }
  195. }
  196. func GetIndexName(client *es7.Client, name string) (string, error) {
  197. // 判断 name 是否为一个别名
  198. res, err := client.Aliases().Alias(name).Do(context.Background())
  199. if err != nil {
  200. // 错误处理
  201. return "", err
  202. }
  203. for k, v := range res.Indices {
  204. for _, vv := range v.Aliases {
  205. if vv.AliasName == name {
  206. return k, nil
  207. }
  208. }
  209. }
  210. // 判断 name 是否为一个正式索引名称
  211. resa, err := client.IndexExists(name).Do(context.Background())
  212. if err != nil {
  213. // 错误处理
  214. return "", err
  215. }
  216. if resa {
  217. return name, nil
  218. }
  219. // 如果 name 既不是别名,也不是正式索引名称,则返回空字符串
  220. return "", nil
  221. }