es.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/base64"
  6. "errors"
  7. "fmt"
  8. "github.com/olivere/elastic/v7"
  9. "go.uber.org/zap"
  10. "net/http"
  11. "strconv"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  13. "time"
  14. )
// mapping is the "mappings" fragment of the index-creation request body.
// It is not a complete JSON document on its own: CreateIndex joins it with a
// "settings" fragment inside one pair of braces before sending it.
// "dynamic" is set to false. The analyzers referenced below (my_ngram_title,
// my_ngram, douhao) are declared in the settings fragment built by CreateIndex.
var mapping = ` "mappings": {
"dynamic": false,
"properties": {
"dataweight": {
"type": "long"
},
"projectcode": {
"type": "keyword"
},
"object_type": {
"type": "keyword"
},
"fromtable": {
"type": "keyword"
},
"bidopentime": {
"type": "long"
},
"bidamount": {
"type": "double"
},
"winner": {
"type": "keyword"
},
"buyer": {
"type": "keyword",
"fields": {
"mbuyer": {
"analyzer": "my_ngram_title",
"type": "text"
}
}
},
"budget": {
"type": "double"
},
"projectname": {
"type": "keyword",
"fields": {
"pname": {
"analyzer": "my_ngram_title",
"type": "text"
}
}
},
"area": {
"type": "keyword"
},
"city": {
"type": "keyword"
},
"district": {
"type": "keyword"
},
"s_winner": {
"analyzer": "douhao",
"type": "text",
"fields": {
"mwinner": {
"analyzer": "my_ngram_title",
"type": "text"
}
}
},
"pici": {
"type": "long"
},
"id": {
"type": "keyword"
},
"title": {
"analyzer": "my_ngram_title",
"type": "text",
"fields": {
"mtitle": {
"type": "keyword"
}
}
},
"detail": {
"analyzer": "my_ngram",
"type": "text"
},
"site": {
"type": "keyword"
},
"comeintime": {
"type": "long"
},
"href": {
"type": "keyword"
},
"infoformat": {
"type": "long"
},
"publishtime": {
"type": "long"
},
"toptype": {
"type": "keyword"
},
"subtype": {
"type": "keyword"
},
"createtime": {
"type": "long"
}
}
}`
  124. //CreateIndex 创建索引
  125. func CreateIndex(clients map[string]*elastic.Client, PreBiddingIndex string) error {
  126. //createJson := fmt.Sprintf(`{%s,%s}`, setting, mapping)
  127. for k, client := range clients {
  128. url := GF.ES[k].URL
  129. username := GF.ES[k].Username
  130. password := GF.ES[k].Password
  131. exist, err := client.IndexExists(PreBiddingIndex).Do(context.Background())
  132. if exist {
  133. log.Info("CreateIndex", zap.String(PreBiddingIndex, "已经存在了"))
  134. continue
  135. }
  136. setting := fmt.Sprintf(` "settings": {
  137. "index": {
  138. "analysis": {
  139. "analyzer": {
  140. "my_ngram_title": {
  141. "filter": [
  142. "lowercase"
  143. ],
  144. "tokenizer": "my_ngram_title"
  145. },
  146. "douhao": {
  147. "type": "pattern",
  148. "pattern": ","
  149. },
  150. "my_ngram": {
  151. "filter": [
  152. "lowercase"
  153. ],
  154. "tokenizer": "my_ngram"
  155. }
  156. },
  157. "tokenizer": {
  158. "my_ngram_title": {
  159. "token_chars": [
  160. "letter",
  161. "digit",
  162. "punctuation",
  163. "symbol"
  164. ],
  165. "min_gram": "1",
  166. "type": "nGram",
  167. "max_gram": "1"
  168. },
  169. "my_ngram": {
  170. "token_chars": [
  171. "letter",
  172. "digit",
  173. "punctuation",
  174. "symbol"
  175. ],
  176. "min_gram": "2",
  177. "type": "nGram",
  178. "max_gram": "2"
  179. }
  180. }
  181. },
  182. "number_of_shards": "%s",
  183. "number_of_replicas": "0",
  184. "max_result_window": "20000"
  185. }
  186. }`, GF.ES[k].Shares)
  187. createJson := fmt.Sprintf(`{%s,%s}`, setting, mapping)
  188. //1. 开启节点平衡
  189. // 设置临时的节点平衡设置
  190. balanceSettings := `{
  191. "transient" : {
  192. "cluster.routing.allocation.enable" : "all"
  193. }
  194. }`
  195. requestURL := fmt.Sprintf("%s/_cluster/settings", url)
  196. req, err := http.NewRequest("PUT", requestURL, bytes.NewBuffer([]byte(balanceSettings)))
  197. if err != nil {
  198. log.Error("开启节点平衡", zap.Error(err))
  199. }
  200. req.Header.Set("Content-Type", "application/json")
  201. // 添加身份验证头部
  202. auth := username + ":" + password
  203. basicAuth := "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
  204. req.Header.Set("Authorization", basicAuth)
  205. clientQ := &http.Client{}
  206. resp, err := clientQ.Do(req)
  207. if err != nil {
  208. log.Error("开启节点平衡", zap.Error(err))
  209. return err
  210. }
  211. defer resp.Body.Close()
  212. if resp.StatusCode != http.StatusOK {
  213. log.Fatal("设置节点平衡失败")
  214. return err
  215. }
  216. fmt.Println(url, "节点平衡已开启")
  217. //2. 创建索引
  218. createIndexR, err := client.CreateIndex(PreBiddingIndex).BodyString(createJson).Do(context.Background())
  219. if err != nil {
  220. log.Error(PreBiddingIndex, zap.Error(err))
  221. return err
  222. }
  223. if !createIndexR.Acknowledged {
  224. log.Error("CreateIndex", zap.String(PreBiddingIndex, "创建索引失败"))
  225. return err
  226. }
  227. //3. 关闭节点平衡
  228. //设置临时的节点平衡设置
  229. disableSettings := `{
  230. "transient" : {
  231. "cluster.routing.allocation.enable" : "none"
  232. }
  233. }`
  234. req2, err2 := http.NewRequest("PUT", requestURL, bytes.NewBuffer([]byte(disableSettings)))
  235. if err2 != nil {
  236. log.Error("开启节点平衡", zap.Error(err))
  237. }
  238. req2.Header.Set("Content-Type", "application/json")
  239. // 添加身份验证头部
  240. req2.Header.Set("Authorization", basicAuth)
  241. //clientQ := &http.Client{}
  242. resp2, err2 := clientQ.Do(req)
  243. if err2 != nil {
  244. log.Error("关闭节点平衡", zap.Error(err))
  245. }
  246. defer resp2.Body.Close()
  247. fmt.Println(url, "节点平衡已关闭")
  248. //4. 新索引 添加别名
  249. for _, alias := range GF.Env.Alias {
  250. _, err = client.Alias().Add(PreBiddingIndex, alias).Do(context.Background())
  251. if err != nil {
  252. log.Error("添加别名失败:", zap.Error(err))
  253. }
  254. }
  255. log.Info("CreateIndex", zap.String(url, "索引别名 添加完毕"))
  256. }
  257. return nil
  258. }
  259. //deleteIndex 删除索引
  260. func deleteIndex(clients map[string]*elastic.Client, index string) error {
  261. if len(clients) == 0 {
  262. return errors.New("没有 配置 ES集群信息")
  263. }
  264. for k, client := range clients {
  265. exist, err := client.IndexExists(index).Do(context.Background())
  266. if !exist {
  267. log.Info("deleteIndex", zap.String(k, index+" 索引文件不存在"))
  268. continue
  269. }
  270. if err != nil {
  271. return err
  272. }
  273. num, err := client.Count(index).Do(context.Background())
  274. if err != nil {
  275. return err
  276. }
  277. if num > 0 {
  278. log.Info("deleteIndex", zap.String(k, "索引"+index+"还存在有效数据"))
  279. }
  280. _, err = client.DeleteIndex(index).Do(context.Background())
  281. if err != nil {
  282. return err
  283. }
  284. }
  285. return nil
  286. }
  287. //dealIndexByHour 处理预处理索引,根据小时;
  288. func dealIndexByHour() {
  289. now := time.Now()
  290. PreBiddingIndex := ""
  291. var clients = make(map[string]*elastic.Client, 0)
  292. for k, v := range GF.ES {
  293. url := v.URL
  294. username := v.Username
  295. password := v.Password
  296. // 创建 Elasticsearch 客户端
  297. client, err := elastic.NewClient(
  298. elastic.SetURL(url),
  299. elastic.SetBasicAuth(username, password),
  300. elastic.SetSniff(false),
  301. )
  302. if err != nil {
  303. log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
  304. }
  305. clients[k] = client
  306. }
  307. next := now.Add(time.Hour)
  308. month := int(next.Month())
  309. monthStr := strconv.Itoa(month)
  310. year := next.Year()
  311. yearStr := strconv.Itoa(year)
  312. dayStr := strconv.Itoa(next.Day())
  313. hour := next.Hour()
  314. hourStr := strconv.Itoa(hour)
  315. //下一天的索引名称
  316. PreBiddingIndex = "bidding_" + yearStr + monthStr + dayStr + hourStr
  317. err := CreateIndex(clients, PreBiddingIndex)
  318. if err != nil {
  319. log.Info("dealIndexByHour", zap.Error(err))
  320. SendMail("预处理索引", "预处理索引创建失败,请检查")
  321. }
  322. log.Info("dealIndexByHour", zap.String(PreBiddingIndex, "创建成功"))
  323. //3. 删除昨天的索引
  324. last := now.Add(-time.Hour)
  325. month2 := int(last.Month())
  326. monthStr2 := strconv.Itoa(month2)
  327. year2 := last.Year()
  328. yearStr2 := strconv.Itoa(year2)
  329. dayStr2 := strconv.Itoa(last.Day())
  330. hour2 := last.Hour()
  331. hourStr2 := strconv.Itoa(hour2)
  332. //索引名称
  333. lastIndex := "bidding_" + yearStr2 + monthStr2 + dayStr2 + hourStr2
  334. err = deleteIndex(clients, lastIndex)
  335. if err != nil {
  336. log.Info("dealIndexByHour", zap.Error(err))
  337. }
  338. //4. 删除bidding_extract 过期数据
  339. where := map[string]interface{}{
  340. "comeintime": map[string]interface{}{
  341. "$lte": last.Unix(),
  342. },
  343. "is_pre": 1,
  344. }
  345. MgoB.Delete("bidding_extract", where)
  346. }
  347. //dealIndexByDay 处理预处理索引,根据天;
  348. func dealIndexByDay() {
  349. now := time.Now()
  350. PreBiddingIndex := ""
  351. //hour := now.Hour()
  352. // 判断当前时间是否时最后一个小时
  353. //if hour == 23 {
  354. //当天最后一小时
  355. var clients = make(map[string]*elastic.Client, 0)
  356. for k, v := range GF.ES {
  357. url := v.URL
  358. username := v.Username
  359. password := v.Password
  360. // 创建 Elasticsearch 客户端
  361. client, err := elastic.NewClient(
  362. elastic.SetURL(url),
  363. elastic.SetBasicAuth(username, password),
  364. elastic.SetSniff(false),
  365. )
  366. if err != nil {
  367. log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
  368. }
  369. clients[k] = client
  370. }
  371. next := now.AddDate(0, 0, 1)
  372. month := int(next.Month())
  373. monthStr := strconv.Itoa(month)
  374. year := next.Year()
  375. yearStr := strconv.Itoa(year)
  376. dayStr := strconv.Itoa(next.Day())
  377. //下一天的索引名称
  378. PreBiddingIndex = "bidding_" + yearStr + monthStr + dayStr
  379. err := CreateIndex(clients, PreBiddingIndex)
  380. if err != nil {
  381. log.Info("dealIndexByDay", zap.Error(err))
  382. SendMail("预处理索引", "预处理索引创建失败,请检查")
  383. }
  384. log.Info("dealIndexByDay", zap.String(PreBiddingIndex, "创建成功"))
  385. //3. 删除昨天的索引
  386. last := now.AddDate(0, 0, -1)
  387. month2 := int(last.Month())
  388. monthStr2 := strconv.Itoa(month2)
  389. year2 := last.Year()
  390. yearStr2 := strconv.Itoa(year2)
  391. dayStr2 := strconv.Itoa(last.Day())
  392. //索引名称
  393. lastIndex := "bidding_" + yearStr2 + monthStr2 + dayStr2
  394. err = deleteIndex(clients, lastIndex)
  395. if err != nil {
  396. log.Info("dealIndexByDay", zap.Error(err))
  397. }
  398. //4. 删除bidding_extract 过期数据
  399. where := map[string]interface{}{
  400. "comeintime": map[string]interface{}{
  401. "$lte": last.Unix(),
  402. },
  403. "is_pre": 1,
  404. }
  405. MgoB.Delete("bidding_extract", where)
  406. }
  407. //dealIndexByMonth 处理预处理索引,根据月份;提前一天创建好 下个月的索引
  408. func dealIndexByMonth() {
  409. now := time.Now()
  410. PreBiddingIndex := ""
  411. // 获取当前月份的最后一天
  412. lastDayOfMonth := time.Date(now.Year(), now.Month()+1, 0, 0, 0, 0, 0, time.UTC)
  413. // 判断当前时间是否为当前月份的最后一天
  414. if now.Day() == lastDayOfMonth.Day() {
  415. //当月最后一天,需要提前创建好索引
  416. fmt.Println("当前时间是当前月份的最后一天")
  417. var clients = make(map[string]*elastic.Client, 0)
  418. for k, v := range GF.ES {
  419. url := v.URL
  420. username := v.Username
  421. password := v.Password
  422. // 创建 Elasticsearch 客户端
  423. client, err := elastic.NewClient(
  424. elastic.SetURL(url),
  425. elastic.SetBasicAuth(username, password),
  426. elastic.SetSniff(false),
  427. )
  428. if err != nil {
  429. log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
  430. }
  431. clients[k] = client
  432. }
  433. next := now.AddDate(0, 0, 1)
  434. month := int(next.Month())
  435. monthStr := strconv.Itoa(month)
  436. year := next.Year()
  437. yearStr := strconv.Itoa(year)
  438. //下一个月的索引名称
  439. PreBiddingIndex = "bidding_" + yearStr + monthStr
  440. //2 创建下个月索引结构
  441. err := CreateIndex(clients, PreBiddingIndex)
  442. if err != nil {
  443. log.Info("dealIndexByMonth", zap.Error(err))
  444. SendMail("预处理索引", "预处理索引创建失败,请检查")
  445. }
  446. log.Info("dealIndexByMonth", zap.String(PreBiddingIndex, "创建成功"))
  447. //3. 删除上个月的索引
  448. last := now.AddDate(0, -1, 1)
  449. month2 := int(last.Month())
  450. monthStr2 := strconv.Itoa(month2)
  451. year2 := last.Year()
  452. yearStr2 := strconv.Itoa(year2)
  453. //上个月的索引名称
  454. lastIndex := "bidding_" + yearStr2 + monthStr2
  455. err = deleteIndex(clients, lastIndex)
  456. if err != nil {
  457. log.Info("dealIndexByMonth", zap.Error(err))
  458. }
  459. //4. 删除bidding_extract 过期数据
  460. where := map[string]interface{}{
  461. "comeintime": map[string]interface{}{
  462. "$lte": last.Unix(),
  463. },
  464. "is_pre": 1,
  465. }
  466. MgoB.Delete("bidding_extract", where)
  467. }
  468. }
  469. //SendMail 发送邮件
  470. func SendMail(title, content string) {
  471. url := fmt.Sprintf("%s?to=%s&title=%s&body=%s", GF.Email.Api, GF.Email.To, title, content)
  472. fmt.Println("url=>", url)
  473. res, err := http.Get(url)
  474. if err != nil {
  475. log.Info("SendMail", zap.Any("err", err))
  476. } else {
  477. log.Info("SendMail", zap.Any("res", res))
  478. }
  479. }