init.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. package main
  2. import (
  3. "context"
  4. "esindex/config"
  5. "flag"
  6. "fmt"
  7. "github.com/RoaringBitmap/roaring"
  8. es7 "github.com/olivere/elastic/v7"
  9. "go.uber.org/zap"
  10. "io/ioutil"
  11. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  14. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  15. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mysqldb"
  16. "os"
  17. "strings"
  18. "sync"
  19. "time"
  20. )
  21. var (
  22. ProjectField = make(map[string]string, 500) //项目字段
  23. ProjectListF = make(map[string]string, 200)
  24. BiddingField = make(map[string]string, 200) //bidding_processing_field, level=1 最外层字段,
  25. BiddingLevelField = make(map[string]map[string]string) //level=2 的第二层字段
  26. PreProcessField = make(map[string]string, 500) //预处理流程 bidding字段
  27. dbfile = flag.String("dbfile", "./db", "数据库文件")
  28. cache = roaring.NewBitmap()
  29. cacheModify = false //控制10秒 定时写入文件
  30. mutex sync.Mutex // 互斥锁,用于保护 cache 的并发写入操作
  31. )
  32. // InitLog @Description
  33. // @Author J 2022/7/26 15:30
  34. func InitLog() {
  35. now := time.Now()
  36. logcfg := config.Conf.Log
  37. err := log.InitLog(
  38. log.Path(logcfg.LogPath),
  39. log.Level(logcfg.LogLevel),
  40. log.Compress(logcfg.Compress),
  41. log.MaxSize(logcfg.MaxSize),
  42. log.MaxBackups(logcfg.MaxBackups),
  43. log.MaxAge(logcfg.MaxAge),
  44. log.Format(logcfg.Format),
  45. )
  46. if err != nil {
  47. fmt.Printf("InitLog failed: %v\n", err)
  48. os.Exit(1)
  49. }
  50. log.Info("InitLog", zap.Any("duration", time.Since(now).Seconds()))
  51. }
  52. func InitMgo() {
  53. now := time.Now()
  54. MgoB = &mongodb.MongodbSim{
  55. MongodbAddr: config.Conf.DB.MongoB.Addr,
  56. DbName: config.Conf.DB.MongoB.Dbname,
  57. Size: config.Conf.DB.MongoB.Size,
  58. UserName: config.Conf.DB.MongoB.User,
  59. Password: config.Conf.DB.MongoB.Password,
  60. Direct: config.Conf.DB.MongoB.Direct,
  61. }
  62. MgoB.InitPool()
  63. if config.Conf.DB.MongoB.Addr == "" || config.Conf.DB.MongoB.Dbname == "" {
  64. log.Error("InitMgo", zap.String("MgoB", "地址或者数据库为空"))
  65. }
  66. if config.Conf.DB.MongoB.Coll == "" {
  67. log.Error("InitMgo", zap.String("MgoB", "查询表为空"))
  68. }
  69. log.Info("InitMgo", zap.Any("MgoB duration", time.Since(now).Seconds()))
  70. //项目信息
  71. MgoP = &mongodb.MongodbSim{
  72. MongodbAddr: config.Conf.DB.MongoP.Addr,
  73. DbName: config.Conf.DB.MongoP.Dbname,
  74. Size: config.Conf.DB.MongoP.Size,
  75. UserName: config.Conf.DB.MongoP.User,
  76. Password: config.Conf.DB.MongoP.Password,
  77. Direct: config.Conf.DB.MongoP.Direct,
  78. }
  79. MgoP.InitPool()
  80. if config.Conf.DB.MongoP.Addr == "" || config.Conf.DB.MongoP.Dbname == "" {
  81. log.Error("InitMgo", zap.String("MongoP", "地址或者数据库为空"))
  82. }
  83. if config.Conf.DB.MongoP.Coll == "" {
  84. log.Error("InitMgo", zap.String("MongoP", "查询表为空"))
  85. }
  86. log.Info("InitMgo", zap.Any("MgoP duration", time.Since(now).Seconds()))
  87. //中标单位定时同步
  88. MgoQ = &mongodb.MongodbSim{
  89. MongodbAddr: config.Conf.DB.MongoQ.Addr,
  90. DbName: config.Conf.DB.MongoQ.Dbname,
  91. Size: config.Conf.DB.MongoQ.Size,
  92. UserName: config.Conf.DB.MongoQ.User,
  93. Password: config.Conf.DB.MongoQ.Password,
  94. Direct: config.Conf.DB.MongoQ.Direct,
  95. }
  96. MgoQ.InitPool()
  97. if config.Conf.DB.MongoQ.Addr == "" || config.Conf.DB.MongoQ.Dbname == "" {
  98. log.Error("InitMgo", zap.String("MongoQ", "地址或者数据库为空"))
  99. }
  100. log.Info("InitMgo", zap.Any("MgoQ duration", time.Since(now).Seconds()))
  101. //181 特殊企业,采购单位验证
  102. MgoS = &mongodb.MongodbSim{
  103. MongodbAddr: config.Conf.DB.MongoS.Addr,
  104. DbName: config.Conf.DB.MongoS.Dbname,
  105. Size: config.Conf.DB.MongoS.Size,
  106. UserName: config.Conf.DB.MongoS.User,
  107. Password: config.Conf.DB.MongoS.Password,
  108. Direct: config.Conf.DB.MongoS.Direct,
  109. }
  110. MgoS.InitPool()
  111. if config.Conf.DB.MongoS.Addr == "" || config.Conf.DB.MongoS.Dbname == "" {
  112. log.Error("InitMgo", zap.String("MongoS", "地址或者数据库为空"))
  113. }
  114. log.Info("InitMgo", zap.Any("MgoS duration", time.Since(now).Seconds()))
  115. }
  116. func InitMysql() {
  117. //采购单位
  118. now := time.Now()
  119. Mysql = &mysqldb.Mysql{
  120. Address: config.Conf.DB.MysqlB.Addr,
  121. DBName: config.Conf.DB.MysqlB.Dbname,
  122. UserName: config.Conf.DB.MysqlB.Username,
  123. PassWord: config.Conf.DB.MysqlB.Password,
  124. }
  125. Mysql.Init()
  126. if config.Conf.DB.MysqlB.Addr == "" || config.Conf.DB.MysqlB.Dbname == "" {
  127. log.Error("InitMysql", zap.String("Mysql", "地址或者数据库为空"))
  128. }
  129. log.Info("InitMysql", zap.Any("MysqlB duration", time.Since(now).Seconds()))
  130. }
  131. func InitEs() {
  132. now := time.Now()
  133. Es = &elastic.Elastic{
  134. S_esurl: config.Conf.DB.Es.Addr,
  135. I_size: config.Conf.DB.Es.Size,
  136. Username: config.Conf.DB.Es.Username,
  137. Password: config.Conf.DB.Es.Password,
  138. }
  139. Es.InitElasticSize()
  140. log.Info("InitEs", zap.String("阿里云", config.Conf.DB.Es.Addr))
  141. if config.Conf.DB.Es.Addr == "" {
  142. log.Error("InitEs", zap.String("ES", "地址或者数据库为空"))
  143. }
  144. if config.Conf.DB.Es.IndexB == "" {
  145. log.Error("InitEs", zap.String("IndexB", "indexb bidding 索引为空,请检查"))
  146. } else {
  147. log.Debug("InitEs", zap.String("IndexB", config.Conf.DB.Es.IndexB))
  148. }
  149. if config.Conf.DB.Es.IndexP == "" {
  150. log.Error("InitEs", zap.String("IndexB", "projectset 项目索引为空,请检查"))
  151. } else {
  152. log.Debug("InitEs", zap.String("IndexP", config.Conf.DB.Es.IndexP))
  153. }
  154. //if config.Conf.DB.Es.IndexTmp == "" {
  155. // log.Error("InitEs", zap.String("IndexTmp 为空", "请检查是否需要配置;该配置主要生产环境需要"))
  156. //}
  157. if config.Conf.DB.Es.IndexWinner == "" {
  158. log.Error("InitEs", zap.String("IndexWinner", "中标单位 索引为空,请检查"))
  159. } else {
  160. log.Debug("InitEs", zap.String("IndexWinner", config.Conf.DB.Es.IndexWinner))
  161. }
  162. if config.Conf.DB.Es.IndexBuyer == "" {
  163. log.Error("InitEs", zap.String("IndexBuyer", "采购单位 索引为空,请检查"))
  164. } else {
  165. log.Debug("InitEs", zap.String("IndexBuyer", config.Conf.DB.Es.IndexBuyer))
  166. }
  167. ////采集爬虫 单服务器部署的es;目前已使用华为云
  168. //Es1 = &elastic.Elastic{
  169. // S_esurl: config.Conf.DB.Es.AddrP,
  170. // I_size: config.Conf.DB.Es.Size,
  171. // Username: config.Conf.DB.Es.Username,
  172. // Password: config.Conf.DB.Es.Password,
  173. //}
  174. //Es1.InitElasticSize()
  175. //华为云 部署的es
  176. if config.Conf.DB.Es.Addr2 != "" {
  177. Es2 = &elastic.Elastic{
  178. S_esurl: config.Conf.DB.Es.Addr2,
  179. I_size: config.Conf.DB.Es.Size,
  180. Username: config.Conf.DB.Es.Username2,
  181. Password: config.Conf.DB.Es.Password2,
  182. }
  183. Es2.InitElasticSize()
  184. log.Info("InitEs", zap.String("华为云Addr2", config.Conf.DB.Es.Addr2))
  185. }
  186. // 华为云新集群,迁移原来阿里云集群数据,标讯、项目、凭安
  187. if config.Conf.DB.Es.Addr3 != "" {
  188. Es3 = &elastic.Elastic{
  189. S_esurl: config.Conf.DB.Es.Addr3,
  190. I_size: config.Conf.DB.Es.Size,
  191. Username: config.Conf.DB.Es.Username3,
  192. Password: config.Conf.DB.Es.Password3,
  193. }
  194. Es3.InitElasticSize()
  195. log.Info("InitEs", zap.String("华为云Addr3", config.Conf.DB.Es.Addr3))
  196. }
  197. log.Info("InitEs", zap.Any("duration", time.Since(now).Seconds()))
  198. }
  199. func InitField() {
  200. now := time.Now()
  201. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "project"}`, nil, nil, false, -1, -1)
  202. if len(*info) > 0 {
  203. for _, m := range *info {
  204. if util.IntAll(m["level"]) == 1 {
  205. ProjectField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  206. } else if util.IntAll(m["level"]) == 2 {
  207. ProjectListF[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  208. }
  209. }
  210. }
  211. log.Info("InitField", zap.Int("ProjectField", len(ProjectField)), zap.Int("ProjectListF", len(ProjectListF)))
  212. log.Info("InitField", zap.Any("duration", time.Since(now).Seconds()))
  213. }
  214. // InitPreProcessField 预处理阶段字段
  215. func InitPreProcessField() {
  216. now := time.Now()
  217. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "pre_process"}`, nil, nil, false, -1, -1)
  218. if len(*info) > 0 {
  219. for _, m := range *info {
  220. if util.IntAll(m["level"]) == 1 {
  221. PreProcessField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  222. }
  223. }
  224. }
  225. log.Info("InitPreProcessField", zap.Int("PreProcessField", len(ProjectField)))
  226. log.Info("InitPreProcessField", zap.Any("duration", time.Since(now).Seconds()))
  227. }
  228. // InitPreEsClient 实例化预处理 索引客户端
  229. //func InitPreEsClient() {
  230. // if len(config.Conf.Pre) > 0 {
  231. // for k, v := range config.Conf.Pre {
  232. // cli := &elastic.Elastic{
  233. // S_esurl: v.Addr,
  234. // I_size: 30,
  235. // Username: v.Username,
  236. // Password: v.Password,
  237. // }
  238. // cli.InitElasticSize()
  239. // PreEs[k] = cli
  240. // }
  241. // }
  242. //}
  243. // InitEsBiddingField 初始化 bidding 索引字段
  244. func InitEsBiddingField() {
  245. now := time.Now()
  246. info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)
  247. if len(*info) > 0 {
  248. for _, m := range *info {
  249. if util.IntAll(m["level"]) == 1 {
  250. BiddingField[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  251. } else if util.IntAll(m["level"]) == 2 {
  252. pfield := util.ObjToString(m["pfield"])
  253. pfieldMap := BiddingLevelField[pfield]
  254. if pfieldMap == nil {
  255. pfieldMap = make(map[string]string, 0)
  256. }
  257. pfieldMap[util.ObjToString(m["field"])] = util.ObjToString(m["ftype"])
  258. BiddingLevelField[pfield] = pfieldMap
  259. }
  260. }
  261. }
  262. log.Info("InitEsBiddingField", zap.Int("BiddingField es 一级字段数量", len(BiddingField)))
  263. log.Info("InitEsBiddingField", zap.Int("BiddingLevelField es 二级字段数量", len(BiddingLevelField)))
  264. log.Info("InitEsBiddingField", zap.Any("duration", time.Since(now).Seconds()))
  265. }
  266. // verifyESFields 验证es 定义字段类型和 MongoDB 数据字段
  267. func verifyESFields() {
  268. now := time.Now()
  269. log.Info("verifyESFields", zap.String("开始类型检测", ""))
  270. client, _ := es7.NewClient(
  271. es7.SetURL(config.Conf.DB.Es.Addr),
  272. es7.SetBasicAuth(config.Conf.DB.Es.Username, config.Conf.DB.Es.Password),
  273. es7.SetSniff(false),
  274. )
  275. index := config.Conf.DB.Es.IndexB //索引表 bidding
  276. // 获取 Elasticsearch 索引的 mapping 信息
  277. mapping, err := client.GetMapping().Index(index).Do(context.Background())
  278. if err != nil {
  279. log.Error("verifyESFields", zap.Any("getting Elasticsearch mapping:", err))
  280. }
  281. indexName, _ := GetIndexName(client, index)
  282. if indexName == "" || mapping == nil {
  283. log.Error("verifyESFields", zap.String("索引不存在,请检查索引", index))
  284. os.Exit(-1)
  285. return
  286. }
  287. if mapping[indexName].(map[string]interface{})["mappings"] == nil || mapping[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"] == nil {
  288. log.Error("verifyESFields", zap.String("索引不存在或状态不对,请检查索引", index))
  289. os.Exit(-1)
  290. return
  291. }
  292. properties := mapping[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{})
  293. var errField = make([]string, 0)
  294. var okField = make([]string, 0)
  295. var analyzerMap = make(map[string]string) // 分词信息
  296. var esMap = make(map[string]string) //存储es 字段类型
  297. //
  298. for field, ftype := range BiddingField {
  299. eftypeMap, _ := properties[field].(map[string]interface{})
  300. var etype string
  301. var analyzer string
  302. if fftype, ok := eftypeMap["type"]; ok {
  303. etype = fftype.(string)
  304. esMap[field] = etype
  305. }
  306. if ffanalyzer, ok := eftypeMap["analyzer"]; ok {
  307. analyzer = ffanalyzer.(string)
  308. analyzerMap[field] = analyzer
  309. }
  310. if ftype != "" {
  311. if chargeType(ftype, etype) {
  312. okField = append(okField, field)
  313. } else {
  314. errField = append(errField, field)
  315. }
  316. } else {
  317. if field == "_id" {
  318. continue
  319. } else if field == "purchasinglist" || field == "package" || field == "winnerorder" || field == "procurementlist" {
  320. if eproperties, ok := eftypeMap["properties"]; ok {
  321. if eproMap, ok := eproperties.(map[string]interface{}); ok {
  322. for k, v := range eproMap {
  323. if innerMap, ok := v.(map[string]interface{}); ok {
  324. if innerType, ok := innerMap["type"]; ok {
  325. innerLevel := BiddingLevelField[field]
  326. esMap[fmt.Sprintf("%s.%s", field, k)] = innerType.(string)
  327. if chargeType(innerLevel[k], innerType.(string)) {
  328. okField = append(okField, fmt.Sprintf("%s.%s", field, k))
  329. } else {
  330. errField = append(errField, fmt.Sprintf("%s.%s", field, k))
  331. }
  332. }
  333. }
  334. }
  335. }
  336. }
  337. }
  338. }
  339. }
  340. if len(errField) > 0 {
  341. log.Error("verifyESFields", zap.Int("错误字段数量", len(errField)))
  342. for _, field := range errField {
  343. if strings.Contains(field, ".") {
  344. fe := strings.Split(field, ".")
  345. log.Error(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingLevelField[fe[0]][fe[1]]), esMap[field]))
  346. } else {
  347. log.Error(fmt.Sprintf("%s 字段类型错误", field), zap.String(fmt.Sprintf("数据库类型为:%s,但是es字段类型是:", BiddingField[field]), esMap[field]))
  348. }
  349. }
  350. os.Exit(-1)
  351. } else {
  352. log.Info("es 字段类型检测结束,", zap.Int("所有字段都符合,检测字段数量为:", len(okField)))
  353. }
  354. log.Info("verifyESFields", zap.Any("duration", time.Since(now).Seconds()))
  355. }
  356. func GetIndexName(client *es7.Client, name string) (string, error) {
  357. // 判断 name 是否为一个别名
  358. res, err := client.Aliases().Alias(name).Do(context.Background())
  359. if err != nil {
  360. // 错误处理
  361. if err.(*es7.Error).Status != 404 && err.(*es7.Error).Details != nil {
  362. return "", err
  363. }
  364. }
  365. if res != nil {
  366. for k, v := range res.Indices {
  367. for _, vv := range v.Aliases {
  368. if vv.AliasName == name {
  369. return k, nil
  370. }
  371. }
  372. }
  373. }
  374. // 判断 name 是否为一个正式索引名称
  375. resa, err := client.IndexExists(name).Do(context.Background())
  376. if err != nil {
  377. // 错误处理
  378. return "", err
  379. }
  380. if resa {
  381. return name, nil
  382. }
  383. // 如果 name 既不是别名,也不是正式索引名称,则返回空字符串
  384. return "", nil
  385. }
  386. // InitBitmap 初始化项目名称副标题 bitmap
  387. func InitBitmap() {
  388. if config.Conf.Env.Dbfile != "" {
  389. dbfile = &config.Conf.Env.Dbfile
  390. }
  391. _, err := os.Stat(*dbfile)
  392. log.Info("InitBitmap", zap.String("dbfile", *dbfile))
  393. if !os.IsNotExist(err) {
  394. bs, err := ioutil.ReadFile(*dbfile)
  395. if err != nil {
  396. log.Info("InitBitmap", zap.Error(err))
  397. }
  398. if len(bs) > 0 {
  399. _, err := cache.FromBuffer(bs)
  400. if err != nil {
  401. log.Info("InitBitmap", zap.Any("cache.FromBuffer", err))
  402. }
  403. }
  404. }
  405. log.Info("InitBitmap", zap.Any("cache.FromBuffer", "success"))
  406. //监听,写入文件保存
  407. go func() {
  408. for {
  409. time.Sleep(10 * time.Second)
  410. if cacheModify {
  411. saveDb()
  412. cacheModify = false
  413. }
  414. }
  415. }()
  416. }