init.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. package main
  2. import (
  3. "context"
  4. "esindex/config"
  5. "fmt"
  6. es7 "github.com/olivere/elastic/v7"
  7. "go.uber.org/zap"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mysqldb"
  13. "os"
  14. "strings"
  15. "time"
  16. )
  17. var (
  18. ProjectField = make(map[string]string, 500) //项目字段
  19. ProjectListF = make(map[string]string, 200)
  20. BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
  21. BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
  22. PreProcessField = make(map[string]string, 500) //预处理流程 bidding字段
  23. )
  24. // InitLog @Description
  25. // @Author J 2022/7/26 15:30
  26. func InitLog() {
  27. now := time.Now()
  28. logcfg := config.Conf.Log
  29. err := log.InitLog(
  30. log.Path(logcfg.LogPath),
  31. log.Level(logcfg.LogLevel),
  32. log.Compress(logcfg.Compress),
  33. log.MaxSize(logcfg.MaxSize),
  34. log.MaxBackups(logcfg.MaxBackups),
  35. log.MaxAge(logcfg.MaxAge),
  36. log.Format(logcfg.Format),
  37. )
  38. if err != nil {
  39. fmt.Printf("InitLog failed: %v\n", err)
  40. os.Exit(1)
  41. }
  42. log.Info("InitLog", zap.Any("duration", time.Since(now).Seconds()))
  43. }
  44. func InitMgo() {
  45. now := time.Now()
  46. MgoB = &mongodb.MongodbSim{
  47. MongodbAddr: config.Conf.DB.MongoB.Addr,
  48. DbName: config.Conf.DB.MongoB.Dbname,
  49. Size: config.Conf.DB.MongoB.Size,
  50. UserName: config.Conf.DB.MongoB.User,
  51. Password: config.Conf.DB.MongoB.Password,
  52. Direct: config.Conf.DB.MongoB.Direct,
  53. }
  54. MgoB.InitPool()
  55. if config.Conf.DB.MongoB.Addr == "" || config.Conf.DB.MongoB.Dbname == "" {
  56. log.Error("InitMgo", zap.String("MgoB", "地址或者数据库为空"))
  57. }
  58. if config.Conf.DB.MongoB.Coll == "" {
  59. log.Error("InitMgo", zap.String("MgoB", "查询表为空"))
  60. }
  61. log.Info("InitMgo", zap.Any("MgoB duration", time.Since(now).Seconds()))
  62. //项目信息
  63. MgoP = &mongodb.MongodbSim{
  64. MongodbAddr: config.Conf.DB.MongoP.Addr,
  65. DbName: config.Conf.DB.MongoP.Dbname,
  66. Size: config.Conf.DB.MongoP.Size,
  67. UserName: config.Conf.DB.MongoP.User,
  68. Password: config.Conf.DB.MongoP.Password,
  69. Direct: config.Conf.DB.MongoP.Direct,
  70. }
  71. MgoP.InitPool()
  72. if config.Conf.DB.MongoP.Addr == "" || config.Conf.DB.MongoP.Dbname == "" {
  73. log.Error("InitMgo", zap.String("MongoP", "地址或者数据库为空"))
  74. }
  75. if config.Conf.DB.MongoP.Coll == "" {
  76. log.Error("InitMgo", zap.String("MongoP", "查询表为空"))
  77. }
  78. log.Info("InitMgo", zap.Any("MgoP duration", time.Since(now).Seconds()))
  79. //中标单位定时同步
  80. MgoQ = &mongodb.MongodbSim{
  81. MongodbAddr: config.Conf.DB.MongoQ.Addr,
  82. DbName: config.Conf.DB.MongoQ.Dbname,
  83. Size: config.Conf.DB.MongoQ.Size,
  84. UserName: config.Conf.DB.MongoQ.User,
  85. Password: config.Conf.DB.MongoQ.Password,
  86. Direct: config.Conf.DB.MongoQ.Direct,
  87. }
  88. MgoQ.InitPool()
  89. if config.Conf.DB.MongoQ.Addr == "" || config.Conf.DB.MongoQ.Dbname == "" {
  90. log.Error("InitMgo", zap.String("MongoQ", "地址或者数据库为空"))
  91. }
  92. log.Info("InitMgo", zap.Any("MgoQ duration", time.Since(now).Seconds()))
  93. //181 特殊企业,采购单位验证
  94. MgoS = &mongodb.MongodbSim{
  95. MongodbAddr: config.Conf.DB.MongoS.Addr,
  96. DbName: config.Conf.DB.MongoS.Dbname,
  97. Size: config.Conf.DB.MongoS.Size,
  98. UserName: config.Conf.DB.MongoS.User,
  99. Password: config.Conf.DB.MongoS.Password,
  100. Direct: config.Conf.DB.MongoS.Direct,
  101. }
  102. MgoS.InitPool()
  103. if config.Conf.DB.MongoS.Addr == "" || config.Conf.DB.MongoS.Dbname == "" {
  104. log.Error("InitMgo", zap.String("MongoS", "地址或者数据库为空"))
  105. }
  106. log.Info("InitMgo", zap.Any("MgoS duration", time.Since(now).Seconds()))
  107. }
  108. func InitMysql() {
  109. //采购单位
  110. now := time.Now()
  111. Mysql = &mysqldb.Mysql{
  112. Address: config.Conf.DB.MysqlB.Addr,
  113. DBName: config.Conf.DB.MysqlB.Dbname,
  114. UserName: config.Conf.DB.MysqlB.Username,
  115. PassWord: config.Conf.DB.MysqlB.Password,
  116. }
  117. Mysql.Init()
  118. if config.Conf.DB.MysqlB.Addr == "" || config.Conf.DB.MysqlB.Dbname == "" {
  119. log.Error("InitMysql", zap.String("Mysql", "地址或者数据库为空"))
  120. }
  121. log.Info("InitMysql", zap.Any("MysqlB duration", time.Since(now).Seconds()))
  122. }
  123. func InitEs() {
  124. now := time.Now()
  125. Es = &elastic.Elastic{
  126. S_esurl: config.Conf.DB.Es.Addr,
  127. I_size: config.Conf.DB.Es.Size,
  128. Username: config.Conf.DB.Es.Username,
  129. Password: config.Conf.DB.Es.Password,
  130. }
  131. Es.InitElasticSize()
  132. log.Info("InitEs", zap.String("阿里云", config.Conf.DB.Es.Addr))
  133. if config.Conf.DB.Es.Addr == "" {
  134. log.Error("InitEs", zap.String("ES", "地址或者数据库为空"))
  135. }
  136. if config.Conf.DB.Es.IndexB == "" {
  137. log.Error("InitEs", zap.String("IndexB", "indexb bidding 索引为空,请检查"))
  138. } else {
  139. log.Debug("InitEs", zap.String("IndexB", config.Conf.DB.Es.IndexB))
  140. }
  141. if config.Conf.DB.Es.IndexP == "" {
  142. log.Error("InitEs", zap.String("IndexB", "projectset 项目索引为空,请检查"))
  143. } else {
  144. log.Debug("InitEs", zap.String("IndexP", config.Conf.DB.Es.IndexP))
  145. }
  146. //if config.Conf.DB.Es.IndexTmp == "" {
  147. // log.Error("InitEs", zap.String("IndexTmp 为空", "请检查是否需要配置;该配置主要生产环境需要"))
  148. //}
  149. if config.Conf.DB.Es.IndexWinner == "" {
  150. log.Error("InitEs", zap.String("IndexWinner", "中标单位 索引为空,请检查"))
  151. } else {
  152. log.Debug("InitEs", zap.String("IndexWinner", config.Conf.DB.Es.IndexWinner))
  153. }
  154. if config.Conf.DB.Es.IndexBuyer == "" {
  155. log.Error("InitEs", zap.String("IndexBuyer", "采购单位 索引为空,请检查"))
  156. } else {
  157. log.Debug("InitEs", zap.String("IndexBuyer", config.Conf.DB.Es.IndexBuyer))
  158. }
  159. ////采集爬虫 单服务器部署的es;目前已使用华为云
  160. //Es1 = &elastic.Elastic{
  161. // S_esurl: config.Conf.DB.Es.AddrP,
  162. // I_size: config.Conf.DB.Es.Size,
  163. // Username: config.Conf.DB.Es.Username,
  164. // Password: config.Conf.DB.Es.Password,
  165. //}
  166. //Es1.InitElasticSize()
  167. //华为云 部署的es
  168. if config.Conf.DB.Es.Addr2 != "" {
  169. Es2 = &elastic.Elastic{
  170. S_esurl: config.Conf.DB.Es.Addr2,
  171. I_size: config.Conf.DB.Es.Size,
  172. Username: config.Conf.DB.Es.Username2,
  173. Password: config.Conf.DB.Es.Password2,
  174. }
  175. Es2.InitElasticSize()
  176. log.Info("InitEs", zap.String("华为云Addr2", config.Conf.DB.Es.Addr2))
  177. }
  178. log.Info("InitEs", zap.Any("duration", time.Since(now).Seconds()))
  179. }
  180. func InitField() {
  181. now := time.Now()
  182. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "project"}`, nil, nil, false, -1, -1)
  183. if len(*info) > 0 {
  184. for _, m := range *info {
  185. if util.IntAll(m["level"]) == 1 {
  186. ProjectField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  187. } else if util.IntAll(m["level"]) == 2 {
  188. ProjectListF[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  189. }
  190. }
  191. }
  192. log.Info("InitField", zap.Int("ProjectField", len(ProjectField)), zap.Int("ProjectListF", len(ProjectListF)))
  193. log.Info("InitField", zap.Any("duration", time.Since(now).Seconds()))
  194. }
  195. // InitPreProcessField 预处理阶段字段
  196. func InitPreProcessField() {
  197. now := time.Now()
  198. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "pre_process"}`, nil, nil, false, -1, -1)
  199. if len(*info) > 0 {
  200. for _, m := range *info {
  201. if util.IntAll(m["level"]) == 1 {
  202. PreProcessField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  203. }
  204. }
  205. }
  206. log.Info("InitPreProcessField", zap.Int("PreProcessField", len(ProjectField)))
  207. log.Info("InitPreProcessField", zap.Any("duration", time.Since(now).Seconds()))
  208. }
  209. // InitPreEsClient 实例化预处理 索引客户端
  210. func InitPreEsClient() {
  211. if len(config.Conf.Pre) > 0 {
  212. for k, v := range config.Conf.Pre {
  213. cli := &elastic.Elastic{
  214. S_esurl: v.Addr,
  215. I_size: 30,
  216. Username: v.Username,
  217. Password: v.Password,
  218. }
  219. cli.InitElasticSize()
  220. PreEs[k] = cli
  221. }
  222. }
  223. }
  224. // InitEsBiddingField 初始化 bidding 索引字段
  225. func InitEsBiddingField() {
  226. now := time.Now()
  227. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
  228. if len(*info) > 0 {
  229. for _, m := range *info {
  230. if util.IntAll(m["level"]) == 1 {
  231. BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  232. } else if util.IntAll(m["level"]) == 2 {
  233. pfield := util.ObjToString(m["pfield"])
  234. pfieldMap := BiddingLevelField[pfield]
  235. if pfieldMap == nil {
  236. pfieldMap = make(map[string]string, 0)
  237. }
  238. pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  239. BiddingLevelField[pfield] = pfieldMap
  240. }
  241. }
  242. }
  243. log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
  244. log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
  245. log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
  246. }
  247. // verifyESFields 验证es 定义字段类型和 MongoDB 数据字段
  248. func verifyESFields() {
  249. now := time.Now()
  250. log.Info("verifyESFields", zap.String("开始类型检测", ""))
  251. client, _ := es7.NewClient(
  252. es7.SetURL(config.Conf.DB.Es.Addr),
  253. es7.SetBasicAuth(config.Conf.DB.Es.Username, config.Conf.DB.Es.Password),
  254. es7.SetSniff(false),
  255. )
  256. index := config.Conf.DB.Es.IndexB //索引表 bidding
  257. // 获取 Elasticsearch 索引的 mapping 信息
  258. mapping, err := client.GetMapping().Index(index).Do(context.Background())
  259. if err != nil {
  260. log.Error("verifyESFields", zap.Any("getting Elasticsearch mapping:", err))
  261. }
  262. indexName, _ := GetIndexName(client, index)
  263. if indexName == "" || mapping == nil {
  264. log.Error("verifyESFields", zap.String("索引不存在,请检查索引", index))
  265. os.Exit(-1)
  266. return
  267. }
  268. if mapping[indexName].(map[string]interface{})["mappings"] == nil || mapping[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"] == nil {
  269. log.Error("verifyESFields", zap.String("索引不存在或状态不对,请检查索引", index))
  270. os.Exit(-1)
  271. return
  272. }
  273. properties := mapping[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{})
  274. var errField = make([]string, 0)
  275. var okField = make([]string, 0)
  276. var analyzerMap = make(map[string]string) // 分词信息
  277. var esMap = make(map[string]string) //存储es 字段类型
  278. //
  279. for field, ftype := range BiddingField {
  280. eftypeMap, _ := properties[field].(map[string]interface{})
  281. var etype string
  282. var analyzer string
  283. if fftype, ok := eftypeMap["type"]; ok {
  284. etype = fftype.(string)
  285. esMap[field] = etype
  286. }
  287. if ffanalyzer, ok := eftypeMap["analyzer"]; ok {
  288. analyzer = ffanalyzer.(string)
  289. analyzerMap[field] = analyzer
  290. }
  291. if ftype != "" {
  292. if chargeType(ftype, etype) {
  293. okField = append(okField, field)
  294. } else {
  295. errField = append(errField, field)
  296. }
  297. } else {
  298. if field == "_id" {
  299. continue
  300. } else if field == "purchasinglist" || field == "package" || field == "winnerorder" || field == "procurementlist" {
  301. if eproperties, ok := eftypeMap["properties"]; ok {
  302. if eproMap, ok := eproperties.(map[string]interface{}); ok {
  303. for k, v := range eproMap {
  304. if innerMap, ok := v.(map[string]interface{}); ok {
  305. if innerType, ok := innerMap["type"]; ok {
  306. innerLevel := BiddingLevelField[field]
  307. esMap[fmt.Sprintf("%s.%s", field, k)] = innerType.(string)
  308. if chargeType(innerLevel[k], innerType.(string)) {
  309. okField = append(okField, fmt.Sprintf("%s.%s", field, k))
  310. } else {
  311. errField = append(errField, fmt.Sprintf("%s.%s", field, k))
  312. }
  313. }
  314. }
  315. }
  316. }
  317. }
  318. }
  319. }
  320. }
  321. if len(errField) > 0 {
  322. log.Error("verifyESFields", zap.Int("错误字段数量", len(errField)))
  323. for _, field := range errField {
  324. if strings.Contains(field, ".") {
  325. fe := strings.Split(field, ".")
  326. log.Error(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingLevelField[fe[0]][fe[1]]), esMap[field]))
  327. } else {
  328. log.Error(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingField[field]), esMap[field]))
  329. }
  330. }
  331. os.Exit(-1)
  332. } else {
  333. log.Info("es 字段类型检测结束,", zap.Int("所有字段都符合,检测字段数量为:", len(okField)))
  334. }
  335. log.Info("verifyESFields", zap.Any("duration", time.Since(now).Seconds()))
  336. }
  337. func GetIndexName(client *es7.Client, name string) (string, error) {
  338. // 判断 name 是否为一个别名
  339. res, err := client.Aliases().Alias(name).Do(context.Background())
  340. if err != nil {
  341. // 错误处理
  342. if err.(*es7.Error).Status != 404 && err.(*es7.Error).Details != nil {
  343. return "", err
  344. }
  345. }
  346. if res != nil {
  347. for k, v := range res.Indices {
  348. for _, vv := range v.Aliases {
  349. if vv.AliasName == name {
  350. return k, nil
  351. }
  352. }
  353. }
  354. }
  355. // 判断 name 是否为一个正式索引名称
  356. resa, err := client.IndexExists(name).Do(context.Background())
  357. if err != nil {
  358. // 错误处理
  359. return "", err
  360. }
  361. if resa {
  362. return name, nil
  363. }
  364. // 如果 name 既不是别名,也不是正式索引名称,则返回空字符串
  365. return "", nil
  366. }