init.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/elastic"
  5. "app.yhyue.com/data_processing/common_utils/log"
  6. "app.yhyue.com/data_processing/common_utils/mongodb"
  7. "app.yhyue.com/data_processing/common_utils/mysqldb"
  8. "context"
  9. "esindex/config"
  10. "fmt"
  11. es7 "github.com/olivere/elastic/v7"
  12. "go.uber.org/zap"
  13. "os"
  14. "strings"
  15. "time"
  16. )
  17. var (
  18. ProjectField = make(map[string]string, 500) //项目字段
  19. ProjectListF = make(map[string]string, 200)
  20. BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
  21. BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
  22. )
  23. // InitLog @Description
  24. // @Author J 2022/7/26 15:30
  25. func InitLog() {
  26. now := time.Now()
  27. logcfg := config.Conf.Log
  28. err := log.InitLog(
  29. log.Path(logcfg.LogPath),
  30. log.Level(logcfg.LogLevel),
  31. log.Compress(logcfg.Compress),
  32. log.MaxSize(logcfg.MaxSize),
  33. log.MaxBackups(logcfg.MaxBackups),
  34. log.MaxAge(logcfg.MaxAge),
  35. log.Format(logcfg.Format),
  36. )
  37. if err != nil {
  38. fmt.Printf("InitLog failed: %v\n", err)
  39. os.Exit(1)
  40. }
  41. log.Info("InitLog", zap.Any("duration", time.Since(now).Seconds()))
  42. }
  43. func InitMgo() {
  44. now := time.Now()
  45. MgoB = &mongodb.MongodbSim{
  46. MongodbAddr: config.Conf.DB.MongoB.Addr,
  47. DbName: config.Conf.DB.MongoB.Dbname,
  48. Size: config.Conf.DB.MongoB.Size,
  49. UserName: config.Conf.DB.MongoB.User,
  50. Password: config.Conf.DB.MongoB.Password,
  51. }
  52. MgoB.InitPool()
  53. log.Info("InitMgo", zap.Any("MgoB duration", time.Since(now).Seconds()))
  54. MgoP = &mongodb.MongodbSim{
  55. MongodbAddr: config.Conf.DB.MongoP.Addr,
  56. DbName: config.Conf.DB.MongoP.Dbname,
  57. Size: config.Conf.DB.MongoP.Size,
  58. UserName: config.Conf.DB.MongoP.User,
  59. Password: config.Conf.DB.MongoP.Password,
  60. }
  61. MgoP.InitPool()
  62. log.Info("InitMgo", zap.Any("MgoP duration", time.Since(now).Seconds()))
  63. MgoQ = &mongodb.MongodbSim{
  64. MongodbAddr: config.Conf.DB.MongoQ.Addr,
  65. DbName: config.Conf.DB.MongoQ.Dbname,
  66. Size: config.Conf.DB.MongoQ.Size,
  67. UserName: config.Conf.DB.MongoQ.User,
  68. Password: config.Conf.DB.MongoQ.Password,
  69. }
  70. MgoQ.InitPool()
  71. log.Info("InitMgo", zap.Any("MgoQ duration", time.Since(now).Seconds()))
  72. Mysql = &mysqldb.Mysql{
  73. Address: config.Conf.DB.MysqlB.Addr,
  74. DBName: config.Conf.DB.MysqlB.Dbname,
  75. UserName: config.Conf.DB.MysqlB.Username,
  76. PassWord: config.Conf.DB.MysqlB.Password,
  77. }
  78. Mysql.Init()
  79. log.Info("InitMgo", zap.Any("MysqlB duration", time.Since(now).Seconds()))
  80. MgoS = &mongodb.MongodbSim{
  81. MongodbAddr: config.Conf.DB.MongoS.Addr,
  82. DbName: config.Conf.DB.MongoS.Dbname,
  83. Size: config.Conf.DB.MongoS.Size,
  84. UserName: config.Conf.DB.MongoS.User,
  85. Password: config.Conf.DB.MongoS.Password,
  86. }
  87. MgoS.InitPool()
  88. log.Info("InitMgo", zap.Any("MgoS duration", time.Since(now).Seconds()))
  89. }
  90. func InitEs() {
  91. now := time.Now()
  92. Es = &elastic.Elastic{
  93. S_esurl: config.Conf.DB.Es.Addr,
  94. I_size: config.Conf.DB.Es.Size,
  95. Username: config.Conf.DB.Es.Username,
  96. Password: config.Conf.DB.Es.Password,
  97. }
  98. Es.InitElasticSize()
  99. Es1 = &elastic.Elastic{
  100. S_esurl: config.Conf.DB.Es.AddrP,
  101. I_size: config.Conf.DB.Es.Size,
  102. Username: config.Conf.DB.Es.Username,
  103. Password: config.Conf.DB.Es.Password,
  104. }
  105. Es1.InitElasticSize()
  106. log.Info("InitEs", zap.Any("duration", time.Since(now).Seconds()))
  107. }
  108. func InitField() {
  109. now := time.Now()
  110. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "project"}`, nil, nil, false, -1, -1)
  111. if len(*info) > 0 {
  112. for _, m := range *info {
  113. if util.IntAll(m["level"]) == 1 {
  114. ProjectField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  115. } else if util.IntAll(m["level"]) == 2 {
  116. ProjectListF[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  117. }
  118. }
  119. }
  120. log.Info("InitField", zap.Int("ProjectField", len(ProjectField)), zap.Int("ProjectListF", len(ProjectListF)))
  121. log.Info("InitField", zap.Any("duration", time.Since(now).Seconds()))
  122. }
  123. //InitEsBiddingField 初始化 bidding 索引字段
  124. func InitEsBiddingField() {
  125. now := time.Now()
  126. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
  127. if len(*info) > 0 {
  128. for _, m := range *info {
  129. if util.IntAll(m["level"]) == 1 {
  130. BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  131. } else if util.IntAll(m["level"]) == 2 {
  132. pfield := util.ObjToString(m["pfield"])
  133. pfieldMap := BiddingLevelField[pfield]
  134. if pfieldMap == nil {
  135. pfieldMap = make(map[string]string, 0)
  136. }
  137. pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  138. BiddingLevelField[pfield] = pfieldMap
  139. }
  140. }
  141. }
  142. log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
  143. log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
  144. log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
  145. }
  146. //verifyESFields 验证es 定义字段类型和 MongoDB 数据字段
  147. func verifyESFields() {
  148. now := time.Now()
  149. log.Info("verifyESFields", zap.String("开始类型检测", ""))
  150. client, _ := es7.NewClient(
  151. es7.SetURL(config.Conf.DB.Es.Addr),
  152. es7.SetBasicAuth(config.Conf.DB.Es.Username, config.Conf.DB.Es.Password),
  153. es7.SetSniff(false),
  154. )
  155. index := config.Conf.DB.Es.IndexB //索引表 bidding
  156. // 获取 Elasticsearch 索引的 mapping 信息
  157. mapping, err := client.GetMapping().Index(index).Do(context.Background())
  158. if err != nil {
  159. log.Info("verifyESFields", zap.Any("getting Elasticsearch mapping:", err))
  160. }
  161. indexName, _ := GetIndexName(client, index)
  162. if indexName == "" {
  163. log.Info("verifyESFields", zap.String("索引不存在,请检查索引", index))
  164. return
  165. }
  166. properties := mapping[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{})
  167. var errField = make([]string, 0)
  168. var okField = make([]string, 0)
  169. var analyzerMap = make(map[string]string) // 分词信息
  170. var esMap = make(map[string]string) //存储es 字段类型
  171. //
  172. for field, ftype := range BiddingField {
  173. eftypeMap, _ := properties[field].(map[string]interface{})
  174. var etype string
  175. var analyzer string
  176. if fftype, ok := eftypeMap["type"]; ok {
  177. etype = fftype.(string)
  178. esMap[field] = etype
  179. }
  180. if ffanalyzer, ok := eftypeMap["analyzer"]; ok {
  181. analyzer = ffanalyzer.(string)
  182. analyzerMap[field] = analyzer
  183. }
  184. if ftype != "" {
  185. if chargeType(ftype, etype) {
  186. okField = append(okField, field)
  187. } else {
  188. errField = append(errField, field)
  189. }
  190. } else {
  191. if field == "_id" {
  192. continue
  193. } else if field == "purchasinglist" || field == "package" || field == "winnerorder" || field == "procurementlist" {
  194. if eproperties, ok := eftypeMap["properties"]; ok {
  195. if eproMap, ok := eproperties.(map[string]interface{}); ok {
  196. for k, v := range eproMap {
  197. if innerMap, ok := v.(map[string]interface{}); ok {
  198. if innerType, ok := innerMap["type"]; ok {
  199. innerLevel := BiddingLevelField[field]
  200. esMap[fmt.Sprintf("%s.%s", field, k)] = innerType.(string)
  201. if chargeType(innerLevel[k], innerType.(string)) {
  202. okField = append(okField, fmt.Sprintf("%s.%s", field, k))
  203. } else {
  204. errField = append(errField, fmt.Sprintf("%s.%s", field, k))
  205. }
  206. }
  207. }
  208. }
  209. }
  210. }
  211. }
  212. }
  213. }
  214. if len(errField) > 0 {
  215. log.Info("verifyESFields", zap.Int("错误字段数量", len(errField)))
  216. for _, field := range errField {
  217. if strings.Contains(field, ".") {
  218. fe := strings.Split(field, ".")
  219. log.Info(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingLevelField[fe[0]][fe[1]]), esMap[field]))
  220. } else {
  221. log.Info(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingField[field]), esMap[field]))
  222. }
  223. }
  224. } else {
  225. log.Info("es 字段类型检测结束,", zap.Int("所有字段都符合,检测字段数量为:", len(okField)))
  226. }
  227. log.Info("verifyESFields", zap.Any("duration", time.Since(now).Seconds()))
  228. }
  229. func GetIndexName(client *es7.Client, name string) (string, error) {
  230. // 判断 name 是否为一个别名
  231. res, err := client.Aliases().Alias(name).Do(context.Background())
  232. if err != nil {
  233. // 错误处理
  234. if err.(*es7.Error).Status != 404 && err.(*es7.Error).Details != nil {
  235. return "", err
  236. }
  237. }
  238. if res != nil {
  239. for k, v := range res.Indices {
  240. for _, vv := range v.Aliases {
  241. if vv.AliasName == name {
  242. return k, nil
  243. }
  244. }
  245. }
  246. }
  247. // 判断 name 是否为一个正式索引名称
  248. resa, err := client.IndexExists(name).Do(context.Background())
  249. if err != nil {
  250. // 错误处理
  251. return "", err
  252. }
  253. if resa {
  254. return name, nil
  255. }
  256. // 如果 name 既不是别名,也不是正式索引名称,则返回空字符串
  257. return "", nil
  258. }