es.go 14 KB


  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/base64"
  6. "errors"
  7. "fmt"
  8. "github.com/olivere/elastic/v7"
  9. "go.uber.org/zap"
  10. "net/http"
  11. "strconv"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  13. "time"
  14. )
  // mapping is the "mappings" fragment of the index-creation request body.
  // It is not a complete JSON document on its own: CreateIndex joins it with a
  // "settings" fragment inside a pair of braces before sending it.
  // Dynamic mapping is disabled ("dynamic": false), and the analyzers referenced
  // here (my_ngram_title, my_ngram, douhao) are the ones declared in the settings
  // fragment that CreateIndex builds.
  15. var mapping = ` "mappings": {
  16. "dynamic": false,
  17. "properties": {
  18. "dataweight": {
  19. "type": "long"
  20. },
  21. "projectcode": {
  22. "type": "keyword"
  23. },
  24. "object_type": {
  25. "type": "keyword"
  26. },
  27. "bidopentime": {
  28. "type": "long"
  29. },
  30. "bidamount": {
  31. "type": "double"
  32. },
  33. "winner": {
  34. "type": "keyword"
  35. },
  36. "buyer": {
  37. "type": "keyword",
  38. "fields": {
  39. "mbuyer": {
  40. "analyzer": "my_ngram_title",
  41. "type": "text"
  42. }
  43. }
  44. },
  45. "budget": {
  46. "type": "double"
  47. },
  48. "projectname": {
  49. "type": "keyword",
  50. "fields": {
  51. "pname": {
  52. "analyzer": "my_ngram_title",
  53. "type": "text"
  54. }
  55. }
  56. },
  57. "area": {
  58. "type": "keyword"
  59. },
  60. "city": {
  61. "type": "keyword"
  62. },
  63. "district": {
  64. "type": "keyword"
  65. },
  66. "s_winner": {
  67. "analyzer": "douhao",
  68. "type": "text",
  69. "fields": {
  70. "mwinner": {
  71. "analyzer": "my_ngram_title",
  72. "type": "text"
  73. }
  74. }
  75. },
  76. "pici": {
  77. "type": "long"
  78. },
  79. "id": {
  80. "type": "keyword"
  81. },
  82. "title": {
  83. "analyzer": "my_ngram_title",
  84. "type": "text",
  85. "fields": {
  86. "mtitle": {
  87. "type": "keyword"
  88. }
  89. }
  90. },
  91. "detail": {
  92. "analyzer": "my_ngram",
  93. "type": "text"
  94. },
  95. "site": {
  96. "type": "keyword"
  97. },
  98. "comeintime": {
  99. "type": "long"
  100. },
  101. "href": {
  102. "type": "keyword"
  103. },
  104. "infoformat": {
  105. "type": "long"
  106. },
  107. "publishtime": {
  108. "type": "long"
  109. },
  110. "toptype": {
  111. "type": "keyword"
  112. },
  113. "subtype": {
  114. "type": "keyword"
  115. },
  116. "createtime": {
  117. "type": "long"
  118. }
  119. }
  120. }`
  121. //CreateIndex 创建索引
  122. func CreateIndex(clients map[string]*elastic.Client, PreBiddingIndex string) error {
  123. //createJson := fmt.Sprintf(`{%s,%s}`, setting, mapping)
  124. for k, client := range clients {
  125. url := GF.ES[k].URL
  126. username := GF.ES[k].Username
  127. password := GF.ES[k].Password
  128. exist, err := client.IndexExists(PreBiddingIndex).Do(context.Background())
  129. if exist {
  130. log.Info("CreateIndex", zap.String(PreBiddingIndex, "已经存在了"))
  131. continue
  132. }
  133. setting := fmt.Sprintf(` "settings": {
  134. "index": {
  135. "analysis": {
  136. "analyzer": {
  137. "my_ngram_title": {
  138. "filter": [
  139. "lowercase"
  140. ],
  141. "tokenizer": "my_ngram_title"
  142. },
  143. "douhao": {
  144. "type": "pattern",
  145. "pattern": ","
  146. },
  147. "my_ngram": {
  148. "filter": [
  149. "lowercase"
  150. ],
  151. "tokenizer": "my_ngram"
  152. }
  153. },
  154. "tokenizer": {
  155. "my_ngram_title": {
  156. "token_chars": [
  157. "letter",
  158. "digit",
  159. "punctuation",
  160. "symbol"
  161. ],
  162. "min_gram": "1",
  163. "type": "nGram",
  164. "max_gram": "1"
  165. },
  166. "my_ngram": {
  167. "token_chars": [
  168. "letter",
  169. "digit",
  170. "punctuation",
  171. "symbol"
  172. ],
  173. "min_gram": "2",
  174. "type": "nGram",
  175. "max_gram": "2"
  176. }
  177. }
  178. },
  179. "number_of_shards": "%s",
  180. "number_of_replicas": "0",
  181. "max_result_window": "20000"
  182. }
  183. }`, GF.ES[k].Shares)
  184. createJson := fmt.Sprintf(`{%s,%s}`, setting, mapping)
  185. //1. 开启节点平衡
  186. // 设置临时的节点平衡设置
  187. balanceSettings := `{
  188. "transient" : {
  189. "cluster.routing.allocation.enable" : "all"
  190. }
  191. }`
  192. requestURL := fmt.Sprintf("%s/_cluster/settings", url)
  193. req, err := http.NewRequest("PUT", requestURL, bytes.NewBuffer([]byte(balanceSettings)))
  194. if err != nil {
  195. log.Error("开启节点平衡", zap.Error(err))
  196. }
  197. req.Header.Set("Content-Type", "application/json")
  198. // 添加身份验证头部
  199. auth := username + ":" + password
  200. basicAuth := "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
  201. req.Header.Set("Authorization", basicAuth)
  202. clientQ := &http.Client{}
  203. resp, err := clientQ.Do(req)
  204. if err != nil {
  205. log.Error("开启节点平衡", zap.Error(err))
  206. return err
  207. }
  208. defer resp.Body.Close()
  209. if resp.StatusCode != http.StatusOK {
  210. log.Fatal("设置节点平衡失败")
  211. return err
  212. }
  213. fmt.Println(url, "节点平衡已开启")
  214. //2. 创建索引
  215. createIndexR, err := client.CreateIndex(PreBiddingIndex).BodyString(createJson).Do(context.Background())
  216. if err != nil {
  217. log.Error(PreBiddingIndex, zap.Error(err))
  218. return err
  219. }
  220. if !createIndexR.Acknowledged {
  221. log.Error("CreateIndex", zap.String(PreBiddingIndex, "创建索引失败"))
  222. return err
  223. }
  224. //3. 关闭节点平衡
  225. //设置临时的节点平衡设置
  226. disableSettings := `{
  227. "transient" : {
  228. "cluster.routing.allocation.enable" : "none"
  229. }
  230. }`
  231. req2, err2 := http.NewRequest("PUT", requestURL, bytes.NewBuffer([]byte(disableSettings)))
  232. if err2 != nil {
  233. log.Error("开启节点平衡", zap.Error(err))
  234. }
  235. req2.Header.Set("Content-Type", "application/json")
  236. // 添加身份验证头部
  237. req2.Header.Set("Authorization", basicAuth)
  238. //clientQ := &http.Client{}
  239. resp2, err2 := clientQ.Do(req)
  240. if err2 != nil {
  241. log.Error("关闭节点平衡", zap.Error(err))
  242. }
  243. defer resp2.Body.Close()
  244. fmt.Println(url, "节点平衡已关闭")
  245. //4. 新索引 添加别名
  246. for _, alias := range GF.Env.Alias {
  247. _, err = client.Alias().Add(PreBiddingIndex, alias).Do(context.Background())
  248. if err != nil {
  249. log.Error("添加别名失败:", zap.Error(err))
  250. }
  251. }
  252. log.Info("CreateIndex", zap.String(url, "索引别名 添加完毕"))
  253. }
  254. return nil
  255. }
  256. //deleteIndex 删除索引
  257. func deleteIndex(clients map[string]*elastic.Client, index string) error {
  258. if len(clients) == 0 {
  259. return errors.New("没有 配置 ES集群信息")
  260. }
  261. for k, client := range clients {
  262. exist, err := client.IndexExists(index).Do(context.Background())
  263. if !exist {
  264. log.Info("deleteIndex", zap.String(k, index+" 索引文件不存在"))
  265. continue
  266. }
  267. if err != nil {
  268. return err
  269. }
  270. num, err := client.Count(index).Do(context.Background())
  271. if err != nil {
  272. return err
  273. }
  274. if num > 0 {
  275. log.Info("deleteIndex", zap.String(k, "索引"+index+"还存在有效数据"))
  276. }
  277. _, err = client.DeleteIndex(index).Do(context.Background())
  278. if err != nil {
  279. return err
  280. }
  281. }
  282. return nil
  283. }
  284. //dealIndexByHour 处理预处理索引,根据小时;
  285. func dealIndexByHour() {
  286. now := time.Now()
  287. PreBiddingIndex := ""
  288. var clients = make(map[string]*elastic.Client, 0)
  289. for k, v := range GF.ES {
  290. url := v.URL
  291. username := v.Username
  292. password := v.Password
  293. // 创建 Elasticsearch 客户端
  294. client, err := elastic.NewClient(
  295. elastic.SetURL(url),
  296. elastic.SetBasicAuth(username, password),
  297. elastic.SetSniff(false),
  298. )
  299. if err != nil {
  300. log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
  301. }
  302. clients[k] = client
  303. }
  304. next := now.Add(time.Hour)
  305. month := int(next.Month())
  306. monthStr := strconv.Itoa(month)
  307. year := next.Year()
  308. yearStr := strconv.Itoa(year)
  309. dayStr := strconv.Itoa(next.Day())
  310. hour := next.Hour()
  311. hourStr := strconv.Itoa(hour)
  312. //下一天的索引名称
  313. PreBiddingIndex = "bidding_" + yearStr + monthStr + dayStr + hourStr
  314. err := CreateIndex(clients, PreBiddingIndex)
  315. if err != nil {
  316. log.Info("dealIndexByHour", zap.Error(err))
  317. SendMail("预处理索引", "预处理索引创建失败,请检查")
  318. }
  319. log.Info("dealIndexByHour", zap.String(PreBiddingIndex, "创建成功"))
  320. //3. 删除昨天的索引
  321. last := now.Add(-time.Hour)
  322. month2 := int(last.Month())
  323. monthStr2 := strconv.Itoa(month2)
  324. year2 := last.Year()
  325. yearStr2 := strconv.Itoa(year2)
  326. dayStr2 := strconv.Itoa(last.Day())
  327. hour2 := last.Hour()
  328. hourStr2 := strconv.Itoa(hour2)
  329. //索引名称
  330. lastIndex := "bidding_" + yearStr2 + monthStr2 + dayStr2 + hourStr2
  331. err = deleteIndex(clients, lastIndex)
  332. if err != nil {
  333. log.Info("dealIndexByHour", zap.Error(err))
  334. }
  335. //4. 删除bidding_extract 过期数据
  336. where := map[string]interface{}{
  337. "comeintime": map[string]interface{}{
  338. "$lte": last.Unix(),
  339. },
  340. "is_pre": 1,
  341. }
  342. MgoB.Delete("bidding_extract", where)
  343. }
  344. //dealIndexByDay 处理预处理索引,根据天;
  345. func dealIndexByDay() {
  346. now := time.Now()
  347. PreBiddingIndex := ""
  348. //hour := now.Hour()
  349. // 判断当前时间是否时最后一个小时
  350. //if hour == 23 {
  351. //当天最后一小时
  352. var clients = make(map[string]*elastic.Client, 0)
  353. for k, v := range GF.ES {
  354. url := v.URL
  355. username := v.Username
  356. password := v.Password
  357. // 创建 Elasticsearch 客户端
  358. client, err := elastic.NewClient(
  359. elastic.SetURL(url),
  360. elastic.SetBasicAuth(username, password),
  361. elastic.SetSniff(false),
  362. )
  363. if err != nil {
  364. log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
  365. }
  366. clients[k] = client
  367. }
  368. next := now.AddDate(0, 0, 1)
  369. month := int(next.Month())
  370. monthStr := strconv.Itoa(month)
  371. year := next.Year()
  372. yearStr := strconv.Itoa(year)
  373. dayStr := strconv.Itoa(next.Day())
  374. //下一天的索引名称
  375. PreBiddingIndex = "bidding_" + yearStr + monthStr + dayStr
  376. err := CreateIndex(clients, PreBiddingIndex)
  377. if err != nil {
  378. log.Info("dealIndexByDay", zap.Error(err))
  379. SendMail("预处理索引", "预处理索引创建失败,请检查")
  380. }
  381. log.Info("dealIndexByDay", zap.String(PreBiddingIndex, "创建成功"))
  382. //3. 删除昨天的索引
  383. last := now.AddDate(0, 0, -1)
  384. month2 := int(last.Month())
  385. monthStr2 := strconv.Itoa(month2)
  386. year2 := last.Year()
  387. yearStr2 := strconv.Itoa(year2)
  388. dayStr2 := strconv.Itoa(last.Day())
  389. //索引名称
  390. lastIndex := "bidding_" + yearStr2 + monthStr2 + dayStr2
  391. err = deleteIndex(clients, lastIndex)
  392. if err != nil {
  393. log.Info("dealIndexByDay", zap.Error(err))
  394. }
  395. //4. 删除bidding_extract 过期数据
  396. where := map[string]interface{}{
  397. "comeintime": map[string]interface{}{
  398. "$lte": last.Unix(),
  399. },
  400. "is_pre": 1,
  401. }
  402. MgoB.Delete("bidding_extract", where)
  403. }
  404. //dealIndexByMonth 处理预处理索引,根据月份;提前一天创建好 下个月的索引
  405. func dealIndexByMonth() {
  406. now := time.Now()
  407. PreBiddingIndex := ""
  408. // 获取当前月份的最后一天
  409. lastDayOfMonth := time.Date(now.Year(), now.Month()+1, 0, 0, 0, 0, 0, time.UTC)
  410. // 判断当前时间是否为当前月份的最后一天
  411. if now.Day() == lastDayOfMonth.Day() {
  412. //当月最后一天,需要提前创建好索引
  413. fmt.Println("当前时间是当前月份的最后一天")
  414. var clients = make(map[string]*elastic.Client, 0)
  415. for k, v := range GF.ES {
  416. url := v.URL
  417. username := v.Username
  418. password := v.Password
  419. // 创建 Elasticsearch 客户端
  420. client, err := elastic.NewClient(
  421. elastic.SetURL(url),
  422. elastic.SetBasicAuth(username, password),
  423. elastic.SetSniff(false),
  424. )
  425. if err != nil {
  426. log.Error("创建 Elasticsearch 客户端失败:", zap.Error(err))
  427. }
  428. clients[k] = client
  429. }
  430. next := now.AddDate(0, 0, 1)
  431. month := int(next.Month())
  432. monthStr := strconv.Itoa(month)
  433. year := next.Year()
  434. yearStr := strconv.Itoa(year)
  435. //下一个月的索引名称
  436. PreBiddingIndex = "bidding_" + yearStr + monthStr
  437. //2 创建下个月索引结构
  438. err := CreateIndex(clients, PreBiddingIndex)
  439. if err != nil {
  440. log.Info("dealIndexByMonth", zap.Error(err))
  441. SendMail("预处理索引", "预处理索引创建失败,请检查")
  442. }
  443. log.Info("dealIndexByMonth", zap.String(PreBiddingIndex, "创建成功"))
  444. //3. 删除上个月的索引
  445. last := now.AddDate(0, -1, 1)
  446. month2 := int(last.Month())
  447. monthStr2 := strconv.Itoa(month2)
  448. year2 := last.Year()
  449. yearStr2 := strconv.Itoa(year2)
  450. //上个月的索引名称
  451. lastIndex := "bidding_" + yearStr2 + monthStr2
  452. err = deleteIndex(clients, lastIndex)
  453. if err != nil {
  454. log.Info("dealIndexByMonth", zap.Error(err))
  455. }
  456. //4. 删除bidding_extract 过期数据
  457. where := map[string]interface{}{
  458. "comeintime": map[string]interface{}{
  459. "$lte": last.Unix(),
  460. },
  461. "is_pre": 1,
  462. }
  463. MgoB.Delete("bidding_extract", where)
  464. }
  465. }
  466. //SendMail 发送邮件
  467. func SendMail(title, content string) {
  468. url := fmt.Sprintf("%s?to=%s&title=%s&body=%s", GF.Email.Api, GF.Email.To, title, content)
  469. fmt.Println("url=>", url)
  470. res, err := http.Get(url)
  471. if err != nil {
  472. log.Info("SendMail", zap.Any("err", err))
  473. } else {
  474. log.Info("SendMail", zap.Any("res", res))
  475. }
  476. }