123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- package main
import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"strings"
	"sync"
	"time"

	es "github.com/olivere/elastic/v7"
	"github.com/robfig/cron"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
)
// fieldArr is the whitelist of _source fields that taskInfoA copies from each
// "bidding" hit into the record it saves. Fields that are absent or nil in the
// source document are not copied.
var fieldArr = []string{
	// identity and classification
	"_id", "title", "buyer", "buyerclass", "s_subscopeclass", "yuceendtime",
	// location and subtype
	"area", "city", "district", "subtype",
	// project details
	"projectname", "purchasing", "href", "projectcode", "publishtime",
	// buyer contact
	"buyerperson", "buyertel",
	// duration / signing
	"projectperiod", "project_duration", "project_timeunit", "signaturedate",
}
- func TimeTask() {
- c := cron.New()
- cronstr := "0 0 2 * * ?" //每天2点执行 数据
- _ = c.AddFunc(cronstr, func() {
- findEs()
- })
- c.Start()
- }
- func findEs() {
- client := Es.GetEsConn()
- defer Es.DestoryEsConn(client)
- wg := &sync.WaitGroup{}
- currenttime := time.Now().Unix()
- stime := time.Unix(currenttime, 0).AddDate(0, 0, -1).Unix()
- query := es.NewBoolQuery().
- Must(es.NewRangeQuery("comeintime").Gte(stime).Lte(currenttime)).
- Must(es.NewExistsQuery("yuceendtime"))
- escount := Es.Count("bidding", query)
- util.Debug("查询总数:", stime, currenttime, escount)
- numDocs := 0
- //游标查询,index不支持别名,只能写索引库的名称
- res, err := client.Scroll("bidding").Query(query).Scroll("5m").Size(500).Do(context.TODO()) //查询一条获取游标
- if err == nil {
- taskInfoA(res, wg, &numDocs)
- scrollId := res.ScrollId
- for {
- if scrollId == "" {
- util.Debug("ScrollId Is Error")
- break
- }
- searchResult, err := client.Scroll("1m").Index("bidding").ScrollId(scrollId).Size(500).Do(context.TODO()) //查询
- if err != nil {
- if err.Error() == "EOS" { //迭代完毕
- util.Debug("Es Search Data Over:", err)
- } else {
- util.Debug("Es Search Data Error:", err)
- }
- break
- }
- taskInfoA(searchResult, wg, &numDocs)
- scrollId = searchResult.ScrollId
- }
- wg.Wait()
- util.Debug("over---", numDocs)
- client.ClearScroll().ScrollId(scrollId).Do(context.TODO()) //清理游标
- }
- }
- func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
- ch := make(chan bool, 2)
- for _, hit := range searchResult.Hits.Hits {
- //开始处理数据
- wg.Add(1)
- ch <- true
- go func(tmpHit *es.SearchHit) {
- defer func() {
- <-ch
- wg.Done()
- }()
- tmp := make(map[string]interface{})
- if json.Unmarshal(tmpHit.Source, &tmp) == nil {
- save := make(map[string]interface{})
- for _, v := range fieldArr {
- if tmp[v] != nil {
- save[v] = tmp[v]
- }
- }
- id := util.ObjToString(tmpHit.Id)
- save["infoid"] = id
- delete(tmp, "_id")
- save["yucetime"] = time.Now().Unix()
- save["jyhref"] = `/jyapp/article/content/` + util.CommonEncodeArticle("content", id) + `.html`
- if save["buyer"] == nil || save["buyerperson"] == nil || save["buyertel"] == nil {
- esq := `{"query":{"bool":{"must":[{"term":{"ids":"` + id + `"}}]}}}`
- info := Es.Get("projectset", esq)
- if len(*info) > 0 {
- if (*info)[0]["buyer"] != nil {
- save["buyer"] = (*info)[0]["buyer"]
- }
- if (*info)[0]["buyerperson"] != nil {
- save["buyerperson"] = (*info)[0]["buyerperson"]
- }
- if (*info)[0]["buyertel"] != nil {
- save["buyertel"] = (*info)[0]["buyertel"]
- }
- }
- }
- tpmp := make(map[string]interface{})
- tpmp["p_rate"] = "60%"
- if save["purchasing"] != nil {
- tpmp["purchasing"] = save["purchasing"]
- tpmp["purchasing"] = util.ObjToString(tpmp["purchasing"]) + "," + util.ObjToString(save["projectname"])
- } else {
- tpmp["purchasing"] = save["projectname"]
- }
- var arr []map[string]interface{}
- for _, v := range strings.Split(util.ObjToString(tpmp["purchasing"]), ",") {
- p := make(map[string]interface{})
- p["p_purchasing"] = v
- p["p_id"] = id
- p["p_orther"] = save["projectname"]
- if save["buyerperson"] != nil {
- p["p_person"] = save["buyerperson"]
- }
- if save["buyertel"] != nil {
- p["p_phone"] = save["buyertel"]
- }
- arr = append(arr, p)
- }
- tpmp["p_projects"] = arr
- if tmp["bidamount"] != nil {
- save["sortprice"] = tmp["bidamount"]
- save["bidamount"] = tmp["bidamount"]
- if save["budget"] != nil {
- save["budget"] = tmp["budget"]
- }
- } else if tmp["budget"] != nil {
- save["sortprice"] = tmp["budget"]
- save["budget"] = tmp["budget"]
- }
- delete(save, "buyerperson")
- delete(save, "buyertel")
- save["results"] = append([]map[string]interface{}{}, tpmp)
- savePool <- save
- }
- }(hit)
- *countDocs += 1
- if *countDocs%200 == 0 {
- util.Debug("Current:", *countDocs)
- }
- }
- }
|