init.go 11 KB


  1. package main
  2. import (
  3. "context"
  4. "esindex/config"
  5. "fmt"
  6. es7 "github.com/olivere/elastic/v7"
  7. "go.uber.org/zap"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mysqldb"
  13. "os"
  14. "strings"
  15. "time"
  16. )
  // Field-type lookup tables filled from the bidding_processing_field collection
  // by InitField and InitEsBiddingField.
  var (
  	// ProjectField maps a level-1 project field name to its declared type.
  	ProjectField = make(map[string]string, 500)
  	// ProjectListF maps a level-2 project field name to its declared type.
  	ProjectListF = make(map[string]string, 200)
  	// BiddingField maps a level-1 (outermost) bidding field name to its
  	// declared type.
  	BiddingField = make(map[string]string, 200)
  	// BiddingLevelField maps a parent field name to the declared types of its
  	// level-2 (nested) fields, keyed by the nested field name.
  	BiddingLevelField = map[string]map[string]string{}
  )
  23. // InitLog @Description
  24. // @Author J 2022/7/26 15:30
  25. func InitLog() {
  26. now := time.Now()
  27. logcfg := config.Conf.Log
  28. err := log.InitLog(
  29. log.Path(logcfg.LogPath),
  30. log.Level(logcfg.LogLevel),
  31. log.Compress(logcfg.Compress),
  32. log.MaxSize(logcfg.MaxSize),
  33. log.MaxBackups(logcfg.MaxBackups),
  34. log.MaxAge(logcfg.MaxAge),
  35. log.Format(logcfg.Format),
  36. )
  37. if err != nil {
  38. fmt.Printf("InitLog failed: %v\n", err)
  39. os.Exit(1)
  40. }
  41. log.Info("InitLog", zap.Any("duration", time.Since(now).Seconds()))
  42. }
  43. func InitMgo() {
  44. now := time.Now()
  45. MgoB = &mongodb.MongodbSim{
  46. MongodbAddr: config.Conf.DB.MongoB.Addr,
  47. DbName: config.Conf.DB.MongoB.Dbname,
  48. Size: config.Conf.DB.MongoB.Size,
  49. UserName: config.Conf.DB.MongoB.User,
  50. Password: config.Conf.DB.MongoB.Password,
  51. }
  52. MgoB.InitPool()
  53. if config.Conf.DB.MongoB.Addr == "" || config.Conf.DB.MongoB.Dbname == "" {
  54. log.Error("InitMgo", zap.String("MgoB", "地址或者数据库为空"))
  55. }
  56. if config.Conf.DB.MongoB.Coll == "" {
  57. log.Error("InitMgo", zap.String("MgoB", "查询表为空"))
  58. }
  59. log.Info("InitMgo", zap.Any("MgoB duration", time.Since(now).Seconds()))
  60. //项目信息
  61. MgoP = &mongodb.MongodbSim{
  62. MongodbAddr: config.Conf.DB.MongoP.Addr,
  63. DbName: config.Conf.DB.MongoP.Dbname,
  64. Size: config.Conf.DB.MongoP.Size,
  65. UserName: config.Conf.DB.MongoP.User,
  66. Password: config.Conf.DB.MongoP.Password,
  67. }
  68. MgoP.InitPool()
  69. if config.Conf.DB.MongoP.Addr == "" || config.Conf.DB.MongoP.Dbname == "" {
  70. log.Error("InitMgo", zap.String("MongoP", "地址或者数据库为空"))
  71. }
  72. if config.Conf.DB.MongoP.Coll == "" {
  73. log.Error("InitMgo", zap.String("MongoP", "查询表为空"))
  74. }
  75. log.Info("InitMgo", zap.Any("MgoP duration", time.Since(now).Seconds()))
  76. //中标单位定时同步
  77. MgoQ = &mongodb.MongodbSim{
  78. MongodbAddr: config.Conf.DB.MongoQ.Addr,
  79. DbName: config.Conf.DB.MongoQ.Dbname,
  80. Size: config.Conf.DB.MongoQ.Size,
  81. UserName: config.Conf.DB.MongoQ.User,
  82. Password: config.Conf.DB.MongoQ.Password,
  83. }
  84. MgoQ.InitPool()
  85. if config.Conf.DB.MongoQ.Addr == "" || config.Conf.DB.MongoQ.Dbname == "" {
  86. log.Error("InitMgo", zap.String("MongoQ", "地址或者数据库为空"))
  87. }
  88. log.Info("InitMgo", zap.Any("MgoQ duration", time.Since(now).Seconds()))
  89. //采购单位
  90. Mysql = &mysqldb.Mysql{
  91. Address: config.Conf.DB.MysqlB.Addr,
  92. DBName: config.Conf.DB.MysqlB.Dbname,
  93. UserName: config.Conf.DB.MysqlB.Username,
  94. PassWord: config.Conf.DB.MysqlB.Password,
  95. }
  96. Mysql.Init()
  97. if config.Conf.DB.MysqlB.Addr == "" || config.Conf.DB.MysqlB.Dbname == "" {
  98. log.Error("InitMgo", zap.String("Mysql", "地址或者数据库为空"))
  99. }
  100. log.Info("InitMgo", zap.Any("MysqlB duration", time.Since(now).Seconds()))
  101. //181 特殊企业,采购单位验证
  102. MgoS = &mongodb.MongodbSim{
  103. MongodbAddr: config.Conf.DB.MongoS.Addr,
  104. DbName: config.Conf.DB.MongoS.Dbname,
  105. Size: config.Conf.DB.MongoS.Size,
  106. UserName: config.Conf.DB.MongoS.User,
  107. Password: config.Conf.DB.MongoS.Password,
  108. }
  109. MgoS.InitPool()
  110. if config.Conf.DB.MongoS.Addr == "" || config.Conf.DB.MongoS.Dbname == "" {
  111. log.Error("InitMgo", zap.String("MongoS", "地址或者数据库为空"))
  112. }
  113. log.Info("InitMgo", zap.Any("MgoS duration", time.Since(now).Seconds()))
  114. }
  115. func InitEs() {
  116. now := time.Now()
  117. Es = &elastic.Elastic{
  118. S_esurl: config.Conf.DB.Es.Addr,
  119. I_size: config.Conf.DB.Es.Size,
  120. Username: config.Conf.DB.Es.Username,
  121. Password: config.Conf.DB.Es.Password,
  122. }
  123. Es.InitElasticSize()
  124. if config.Conf.DB.Es.Addr == "" {
  125. log.Error("InitEs", zap.String("ES", "地址或者数据库为空"))
  126. }
  127. if config.Conf.DB.Es.IndexB == "" {
  128. log.Error("InitEs", zap.String("IndexB", "indexb bidding 索引为空,请检查"))
  129. } else {
  130. log.Debug("InitEs", zap.String("IndexB", config.Conf.DB.Es.IndexB))
  131. }
  132. if config.Conf.DB.Es.IndexP == "" {
  133. log.Error("InitEs", zap.String("IndexB", "projectset 项目索引为空,请检查"))
  134. } else {
  135. log.Debug("InitEs", zap.String("IndexP", config.Conf.DB.Es.IndexP))
  136. }
  137. if config.Conf.DB.Es.IndexTmp == "" {
  138. log.Error("InitEs", zap.String("IndexTmp 为空", "请检查是否需要配置;该配置主要生产环境需要"))
  139. }
  140. if config.Conf.DB.Es.IndexWinner == "" {
  141. log.Error("InitEs", zap.String("IndexWinner", "中标单位 索引为空,请检查"))
  142. }
  143. if config.Conf.DB.Es.IndexBuyer == "" {
  144. log.Error("InitEs", zap.String("IndexBuyer", "采购单位 索引为空,请检查"))
  145. }
  146. Es1 = &elastic.Elastic{
  147. S_esurl: config.Conf.DB.Es.AddrP,
  148. I_size: config.Conf.DB.Es.Size,
  149. Username: config.Conf.DB.Es.Username,
  150. Password: config.Conf.DB.Es.Password,
  151. }
  152. Es1.InitElasticSize()
  153. log.Info("InitEs", zap.Any("duration", time.Since(now).Seconds()))
  154. }
  155. func InitField() {
  156. now := time.Now()
  157. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "project"}`, nil, nil, false, -1, -1)
  158. if len(*info) > 0 {
  159. for _, m := range *info {
  160. if util.IntAll(m["level"]) == 1 {
  161. ProjectField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  162. } else if util.IntAll(m["level"]) == 2 {
  163. ProjectListF[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  164. }
  165. }
  166. }
  167. log.Info("InitField", zap.Int("ProjectField", len(ProjectField)), zap.Int("ProjectListF", len(ProjectListF)))
  168. log.Info("InitField", zap.Any("duration", time.Since(now).Seconds()))
  169. }
  170. //InitEsBiddingField 初始化 bidding 索引字段
  171. func InitEsBiddingField() {
  172. now := time.Now()
  173. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
  174. if len(*info) > 0 {
  175. for _, m := range *info {
  176. if util.IntAll(m["level"]) == 1 {
  177. BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  178. } else if util.IntAll(m["level"]) == 2 {
  179. pfield := util.ObjToString(m["pfield"])
  180. pfieldMap := BiddingLevelField[pfield]
  181. if pfieldMap == nil {
  182. pfieldMap = make(map[string]string, 0)
  183. }
  184. pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  185. BiddingLevelField[pfield] = pfieldMap
  186. }
  187. }
  188. }
  189. log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
  190. log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
  191. log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
  192. }
  193. //verifyESFields 验证es 定义字段类型和 MongoDB 数据字段
  194. func verifyESFields() {
  195. now := time.Now()
  196. log.Info("verifyESFields", zap.String("开始类型检测", ""))
  197. client, _ := es7.NewClient(
  198. es7.SetURL(config.Conf.DB.Es.Addr),
  199. es7.SetBasicAuth(config.Conf.DB.Es.Username, config.Conf.DB.Es.Password),
  200. es7.SetSniff(false),
  201. )
  202. index := config.Conf.DB.Es.IndexB //索引表 bidding
  203. // 获取 Elasticsearch 索引的 mapping 信息
  204. mapping, err := client.GetMapping().Index(index).Do(context.Background())
  205. if err != nil {
  206. log.Error("verifyESFields", zap.Any("getting Elasticsearch mapping:", err))
  207. }
  208. indexName, _ := GetIndexName(client, index)
  209. if indexName == "" || mapping == nil {
  210. log.Error("verifyESFields", zap.String("索引不存在,请检查索引", index))
  211. os.Exit(-1)
  212. return
  213. }
  214. if mapping[indexName].(map[string]interface{})["mappings"] == nil || mapping[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"] == nil {
  215. log.Error("verifyESFields", zap.String("索引不存在或状态不对,请检查索引", index))
  216. os.Exit(-1)
  217. return
  218. }
  219. properties := mapping[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{})
  220. var errField = make([]string, 0)
  221. var okField = make([]string, 0)
  222. var analyzerMap = make(map[string]string) // 分词信息
  223. var esMap = make(map[string]string) //存储es 字段类型
  224. //
  225. for field, ftype := range BiddingField {
  226. eftypeMap, _ := properties[field].(map[string]interface{})
  227. var etype string
  228. var analyzer string
  229. if fftype, ok := eftypeMap["type"]; ok {
  230. etype = fftype.(string)
  231. esMap[field] = etype
  232. }
  233. if ffanalyzer, ok := eftypeMap["analyzer"]; ok {
  234. analyzer = ffanalyzer.(string)
  235. analyzerMap[field] = analyzer
  236. }
  237. if ftype != "" {
  238. if chargeType(ftype, etype) {
  239. okField = append(okField, field)
  240. } else {
  241. errField = append(errField, field)
  242. }
  243. } else {
  244. if field == "_id" {
  245. continue
  246. } else if field == "purchasinglist" || field == "package" || field == "winnerorder" || field == "procurementlist" {
  247. if eproperties, ok := eftypeMap["properties"]; ok {
  248. if eproMap, ok := eproperties.(map[string]interface{}); ok {
  249. for k, v := range eproMap {
  250. if innerMap, ok := v.(map[string]interface{}); ok {
  251. if innerType, ok := innerMap["type"]; ok {
  252. innerLevel := BiddingLevelField[field]
  253. esMap[fmt.Sprintf("%s.%s", field, k)] = innerType.(string)
  254. if chargeType(innerLevel[k], innerType.(string)) {
  255. okField = append(okField, fmt.Sprintf("%s.%s", field, k))
  256. } else {
  257. errField = append(errField, fmt.Sprintf("%s.%s", field, k))
  258. }
  259. }
  260. }
  261. }
  262. }
  263. }
  264. }
  265. }
  266. }
  267. if len(errField) > 0 {
  268. log.Error("verifyESFields", zap.Int("错误字段数量", len(errField)))
  269. for _, field := range errField {
  270. if strings.Contains(field, ".") {
  271. fe := strings.Split(field, ".")
  272. log.Error(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingLevelField[fe[0]][fe[1]]), esMap[field]))
  273. } else {
  274. log.Error(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingField[field]), esMap[field]))
  275. }
  276. }
  277. os.Exit(-1)
  278. } else {
  279. log.Info("es 字段类型检测结束,", zap.Int("所有字段都符合,检测字段数量为:", len(okField)))
  280. }
  281. log.Info("verifyESFields", zap.Any("duration", time.Since(now).Seconds()))
  282. }
  283. func GetIndexName(client *es7.Client, name string) (string, error) {
  284. // 判断 name 是否为一个别名
  285. res, err := client.Aliases().Alias(name).Do(context.Background())
  286. if err != nil {
  287. // 错误处理
  288. if err.(*es7.Error).Status != 404 && err.(*es7.Error).Details != nil {
  289. return "", err
  290. }
  291. }
  292. if res != nil {
  293. for k, v := range res.Indices {
  294. for _, vv := range v.Aliases {
  295. if vv.AliasName == name {
  296. return k, nil
  297. }
  298. }
  299. }
  300. }
  301. // 判断 name 是否为一个正式索引名称
  302. resa, err := client.IndexExists(name).Do(context.Background())
  303. if err != nil {
  304. // 错误处理
  305. return "", err
  306. }
  307. if resa {
  308. return name, nil
  309. }
  310. // 如果 name 既不是别名,也不是正式索引名称,则返回空字符串
  311. return "", nil
  312. }