task.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package main
import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"strings"
	"sync"
	"time"

	es "github.com/olivere/elastic/v7"
	"github.com/robfig/cron"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
)
  12. var fieldArr = []string{"_id", "title", "buyer", "buyerclass", "s_subscopeclass", "yuceendtime", "area", "city", "district", "subtype",
  13. "projectname", "purchasing", "href", "projectcode", "publishtime", "buyerperson", "buyertel", "projectperiod",
  14. "project_duration", "project_timeunit", "signaturedate"}
  15. func TimeTask() {
  16. c := cron.New()
  17. cronstr := "0 0 2 * * ?" //每天2点执行 数据
  18. _ = c.AddFunc(cronstr, func() {
  19. findEs()
  20. })
  21. c.Start()
  22. }
  23. func findEs() {
  24. client := Es.GetEsConn()
  25. defer Es.DestoryEsConn(client)
  26. wg := &sync.WaitGroup{}
  27. currenttime := time.Now().Unix()
  28. stime := time.Unix(currenttime, 0).AddDate(0, 0, -1).Unix()
  29. query := es.NewBoolQuery().
  30. Must(es.NewRangeQuery("comeintime").Gte(stime).Lte(currenttime)).
  31. Must(es.NewExistsQuery("yuceendtime"))
  32. escount := Es.Count("bidding", query)
  33. util.Debug("查询总数:", stime, currenttime, escount)
  34. numDocs := 0
  35. //游标查询,index不支持别名,只能写索引库的名称
  36. res, err := client.Scroll("bidding").Query(query).Scroll("5m").Size(500).Do(context.TODO()) //查询一条获取游标
  37. if err == nil {
  38. taskInfoA(res, wg, &numDocs)
  39. scrollId := res.ScrollId
  40. for {
  41. if scrollId == "" {
  42. util.Debug("ScrollId Is Error")
  43. break
  44. }
  45. searchResult, err := client.Scroll("1m").Index("bidding").ScrollId(scrollId).Size(500).Do(context.TODO()) //查询
  46. if err != nil {
  47. if err.Error() == "EOS" { //迭代完毕
  48. util.Debug("Es Search Data Over:", err)
  49. } else {
  50. util.Debug("Es Search Data Error:", err)
  51. }
  52. break
  53. }
  54. taskInfoA(searchResult, wg, &numDocs)
  55. scrollId = searchResult.ScrollId
  56. }
  57. wg.Wait()
  58. util.Debug("over---", numDocs)
  59. client.ClearScroll().ScrollId(scrollId).Do(context.TODO()) //清理游标
  60. }
  61. }
  62. func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
  63. ch := make(chan bool, 2)
  64. for _, hit := range searchResult.Hits.Hits {
  65. //开始处理数据
  66. wg.Add(1)
  67. ch <- true
  68. go func(tmpHit *es.SearchHit) {
  69. defer func() {
  70. <-ch
  71. wg.Done()
  72. }()
  73. tmp := make(map[string]interface{})
  74. if json.Unmarshal(tmpHit.Source, &tmp) == nil {
  75. save := make(map[string]interface{})
  76. for _, v := range fieldArr {
  77. if tmp[v] != nil {
  78. save[v] = tmp[v]
  79. }
  80. }
  81. id := util.ObjToString(tmpHit.Id)
  82. save["infoid"] = id
  83. delete(tmp, "_id")
  84. save["yucetime"] = time.Now().Unix()
  85. save["jyhref"] = `/jyapp/article/content/` + util.CommonEncodeArticle("content", id) + `.html`
  86. if save["buyer"] == nil || save["buyerperson"] == nil || save["buyertel"] == nil {
  87. esq := `{"query":{"bool":{"must":[{"term":{"ids":"` + id + `"}}]}}}`
  88. info := Es.Get("projectset", esq)
  89. if len(*info) > 0 {
  90. if (*info)[0]["buyer"] != nil {
  91. save["buyer"] = (*info)[0]["buyer"]
  92. }
  93. if (*info)[0]["buyerperson"] != nil {
  94. save["buyerperson"] = (*info)[0]["buyerperson"]
  95. }
  96. if (*info)[0]["buyertel"] != nil {
  97. save["buyertel"] = (*info)[0]["buyertel"]
  98. }
  99. }
  100. }
  101. tpmp := make(map[string]interface{})
  102. tpmp["p_rate"] = "60%"
  103. if save["purchasing"] != nil {
  104. tpmp["purchasing"] = save["purchasing"]
  105. tpmp["purchasing"] = util.ObjToString(tpmp["purchasing"]) + "," + util.ObjToString(save["projectname"])
  106. } else {
  107. tpmp["purchasing"] = save["projectname"]
  108. }
  109. var arr []map[string]interface{}
  110. for _, v := range strings.Split(util.ObjToString(tpmp["purchasing"]), ",") {
  111. p := make(map[string]interface{})
  112. p["p_purchasing"] = v
  113. p["p_id"] = id
  114. p["p_orther"] = save["projectname"]
  115. if save["buyerperson"] != nil {
  116. p["p_person"] = save["buyerperson"]
  117. }
  118. if save["buyertel"] != nil {
  119. p["p_phone"] = save["buyertel"]
  120. }
  121. arr = append(arr, p)
  122. }
  123. tpmp["p_projects"] = arr
  124. if tmp["bidamount"] != nil {
  125. save["sortprice"] = tmp["bidamount"]
  126. save["bidamount"] = tmp["bidamount"]
  127. if save["budget"] != nil {
  128. save["budget"] = tmp["budget"]
  129. }
  130. } else if tmp["budget"] != nil {
  131. save["sortprice"] = tmp["budget"]
  132. save["budget"] = tmp["budget"]
  133. }
  134. delete(save, "buyerperson")
  135. delete(save, "buyertel")
  136. save["results"] = append([]map[string]interface{}{}, tpmp)
  137. savePool <- save
  138. }
  139. }(hit)
  140. *countDocs += 1
  141. if *countDocs%200 == 0 {
  142. util.Debug("Current:", *countDocs)
  143. }
  144. }
  145. }