package main

import (
	"context"
	"encoding/json"
	"io"
	"strings"
	"sync"
	"time"

	es "github.com/olivere/elastic/v7"
	"github.com/robfig/cron"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
)

// fieldArr lists the keys that are copied unchanged from a decoded "bidding"
// hit into the record that is pushed to savePool, whenever the key is present.
var fieldArr = []string{"_id", "title", "buyer", "buyerclass", "s_subscopeclass", "yuceendtime", "area", "city", "district", "subtype", "projectname", "purchasing", "href", "projectcode", "publishtime", "buyerperson", "buyertel", "projectperiod", "project_duration", "project_timeunit", "signaturedate"}

// TimeTask registers findEs with a cron scheduler and starts the scheduler.
// The spec runs the job every day at 02:00.
func TimeTask() {
	c := cron.New()
	cronstr := "0 0 2 * * ?" // every day at 02:00
	if err := c.AddFunc(cronstr, findEs); err != nil {
		// Without a registered job there is nothing for the scheduler to run.
		util.Debug("Cron AddFunc Error:", cronstr, err)
		return
	}
	c.Start()
}

// findEs scrolls over every "bidding" document whose comeintime falls within
// the last day and that has a yuceendtime field, hands each page of hits to
// taskInfoA, waits for all workers to finish and then releases the scroll.
func findEs() {
	client := Es.GetEsConn()
	defer Es.DestoryEsConn(client)

	ctx := context.TODO()
	wg := &sync.WaitGroup{}

	currenttime := time.Now().Unix()
	stime := time.Unix(currenttime, 0).AddDate(0, 0, -1).Unix()
	query := es.NewBoolQuery().
		Must(es.NewRangeQuery("comeintime").Gte(stime).Lte(currenttime)).
		Must(es.NewExistsQuery("yuceendtime"))
	escount := Es.Count("bidding", query)
	util.Debug("查询总数:", stime, currenttime, escount)

	// clearScroll releases a server-side scroll context; an empty id is skipped
	// because there is nothing to release.
	clearScroll := func(id string) {
		if id == "" {
			return
		}
		if _, err := client.ClearScroll().ScrollId(id).Do(ctx); err != nil {
			util.Debug("Es Clear Scroll Error:", err)
		}
	}
	// logScrollStop distinguishes the normal end of a scroll (io.EOF) from a
	// real failure.
	logScrollStop := func(err error) {
		if err == io.EOF {
			util.Debug("Es Search Data Over:", err)
		} else {
			util.Debug("Es Search Data Error:", err)
		}
	}

	numDocs := 0
	// Scroll query: aliases are not supported here, the concrete index name
	// must be used. The first request opens the scroll and returns page one.
	res, err := client.Scroll("bidding").Query(query).Scroll("5m").Size(500).Do(ctx)
	if err != nil {
		logScrollStop(err)
		if res != nil {
			clearScroll(res.ScrollId)
		}
		return
	}
	taskInfoA(res, wg, &numDocs)

	scrollID := res.ScrollId
	for {
		if scrollID == "" {
			util.Debug("ScrollId Is Error")
			break
		}
		// Follow-up pages are addressed by scroll id; "5m" is the keep-alive
		// that was effectively in use before (the library default).
		searchResult, err := client.Scroll("bidding").ScrollId(scrollID).Scroll("5m").Size(500).Do(ctx)
		if err != nil {
			logScrollStop(err)
			break
		}
		taskInfoA(searchResult, wg, &numDocs)
		scrollID = searchResult.ScrollId
	}

	wg.Wait()
	util.Debug("over---", numDocs)
	clearScroll(scrollID) // release the scroll context
}

// taskInfoA converts every hit of one result page into a save record and
// sends it to savePool. At most two hits of the page are processed at the
// same time; wg is incremented once per hit and released when that hit's
// goroutine ends. countDocs is advanced once per hit by the calling goroutine
// and a progress line is logged every 200 hits.
func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
	if searchResult == nil || searchResult.Hits == nil {
		return
	}
	ch := make(chan bool, 2) // bounds the number of in-flight goroutines
	for _, hit := range searchResult.Hits.Hits {
		wg.Add(1)
		ch <- true
		go func(tmpHit *es.SearchHit) {
			defer func() {
				<-ch
				wg.Done()
			}()
			if save := buildYuceDoc(tmpHit); save != nil {
				savePool <- save
			}
		}(hit)
		*countDocs++
		if *countDocs%200 == 0 {
			util.Debug("Current:", *countDocs)
		}
	}
}

// buildYuceDoc decodes one hit and assembles the record to be saved. It
// returns nil when the hit source cannot be decoded.
func buildYuceDoc(hit *es.SearchHit) map[string]interface{} {
	tmp := make(map[string]interface{})
	if err := json.Unmarshal(hit.Source, &tmp); err != nil {
		util.Debug("Es Hit Unmarshal Error:", hit.Id, err)
		return nil
	}

	save := make(map[string]interface{})
	for _, key := range fieldArr {
		if tmp[key] != nil {
			save[key] = tmp[key]
		}
	}

	id := util.ObjToString(hit.Id)
	save["infoid"] = id
	// NOTE(review): this removes "_id" from the decoded source only; a copied
	// save["_id"] is left in place. Confirm whether save was the intended target.
	delete(tmp, "_id")
	save["yucetime"] = time.Now().Unix()
	save["jyhref"] = `/jyapp/article/content/` + util.CommonEncodeArticle("content", id) + `.html`

	if save["buyer"] == nil || save["buyerperson"] == nil || save["buyertel"] == nil {
		fillBuyerFromProjectset(save, id)
	}

	// The result entry must be built before the contact fields are removed,
	// because it copies them into every project item.
	result := buildPurchasingResult(save, id)
	copyPriceFields(save, tmp)

	delete(save, "buyerperson")
	delete(save, "buyertel")
	save["results"] = []map[string]interface{}{result}
	return save
}

// fillBuyerFromProjectset overwrites buyer, buyerperson and buyertel in save
// with the non-nil values of the first "projectset" document matching id.
func fillBuyerFromProjectset(save map[string]interface{}, id string) {
	esq := `{"query":{"bool":{"must":[{"term":{"ids":"` + id + `"}}]}}}`
	info := Es.Get("projectset", esq)
	if info == nil || len(*info) == 0 {
		return
	}
	first := (*info)[0]
	for _, key := range []string{"buyer", "buyerperson", "buyertel"} {
		if first[key] != nil {
			save[key] = first[key]
		}
	}
}

// buildPurchasingResult builds the single "results" entry: a fixed p_rate, the
// combined purchasing text and one project item per comma-separated part.
func buildPurchasingResult(save map[string]interface{}, id string) map[string]interface{} {
	result := make(map[string]interface{})
	result["p_rate"] = "60%"
	if save["purchasing"] != nil {
		result["purchasing"] = util.ObjToString(save["purchasing"]) + "," + util.ObjToString(save["projectname"])
	} else {
		result["purchasing"] = save["projectname"]
	}

	var projects []map[string]interface{}
	for _, part := range strings.Split(util.ObjToString(result["purchasing"]), ",") {
		item := make(map[string]interface{})
		item["p_purchasing"] = part
		item["p_id"] = id
		item["p_orther"] = save["projectname"]
		if save["buyerperson"] != nil {
			item["p_person"] = save["buyerperson"]
		}
		if save["buyertel"] != nil {
			item["p_phone"] = save["buyertel"]
		}
		projects = append(projects, item)
	}
	result["p_projects"] = projects
	return result
}

// copyPriceFields copies bidamount and budget from the decoded source into
// save and sets sortprice to bidamount when present, otherwise to budget.
func copyPriceFields(save, tmp map[string]interface{}) {
	budget := tmp["budget"]
	if bid := tmp["bidamount"]; bid != nil {
		save["sortprice"] = bid
		save["bidamount"] = bid
	} else if budget != nil {
		save["sortprice"] = budget
	}
	// budget is never part of fieldArr, so it has to be taken from the source
	// in both cases.
	if budget != nil {
		save["budget"] = budget
	}
}