package main import ( "encoding/json" "fmt" "github.com/cron" "go.mongodb.org/mongo-driver/bson" es "gopkg.in/olivere/elastic.v1" "mongodb" "qfw/util" "regexp" "strings" "sync" "time" ) var date1 = regexp.MustCompile("20[0-2][0-9][年|\\-\\/|.][0-9]{1,2}[月|\\-|\\/|.][0-9]{1,2}[日]?") func TimeTask() { c := cron.New() cronstr := "0 0 2 * * ?" //每天3点执行 临时表数据 cronstr1 := "0 0 3 * * ?" //每天4点执行 结果表数据 _ = c.AddFunc(cronstr, func() { findEs() }) _ = c.AddFunc(cronstr1, func() { fcResult() }) c.Start() } func findEs() { util.Debug("预测结果时间-------临时表保存数据----------") client := Es.GetEsConn() defer Es.DestoryEsConn(client) ch := make(chan bool, 5) wg := &sync.WaitGroup{} //esquery := `{ //"query": { // "bool": { // "must": [ // { // "terms": { // "subtype": [ // "成交", // "合同" // ] // } // }, // { // "terms": { // "s_subscopeclass": [ // "服务采购_法律咨询", // "服务采购_会计", // "服务采购_物业", // "服务采购_审计", // "服务采购_安保", // "服务采购_仓储物流", // "服务采购_广告宣传印刷" // ] // } // } // ] // } //}, //"_source": [ // "_id", // "title", // "buyer", // "buyerclass", // "s_subscopeclass", // "yuceendtime", // "area", // "city", // "subtype", // "projectname", // "purchasing", // "href", // "projectcode", // "publishtime", // "buyerperson", // "buyertel"] //}` //esquery := `{"query":{"bool":{"must":[{"term":{"id":"6168359c06a9d911e598d573"}}]}}}` currenttime := time.Now().Unix() stime := time.Unix(currenttime, 0).AddDate(0, 0, -1).Unix() esquery := `{"query":{"bool":{"must":[{"range":{"comeintime":{"from":` + fmt.Sprint(stime) + `,"to":` + fmt.Sprint(currenttime) + `}}}],"must_not":[{"constant_score":{"filter":{"missing":{"field":"yuceendtime"}}}}]}}}` util.Debug(esquery) fieldArr := []string{"_id", "title", "buyer", "buyerclass", "s_subscopeclass", "yuceendtime", "area", "city", "subtype", "projectname", "purchasing", "href", "projectcode", "publishtime", "buyerperson", "buyertel", "projectperiod", "project_duration", "project_timeunit", "signaturedate"} escount := Es.Count("bidding", "bidding", esquery) util.Debug("查询总数:", escount) //查询条件类型转换 var q es.Query tmpQuery := es.BoolQuery{ QueryStrings: esquery, } q = tmpQuery numDocs := 0 //游标查询,index不支持别名,只能写索引库的名称 res, err := client.Scroll("bidding_v1").Query(q).Size(500).Do() //查询一条获取游标 if err == nil { scrollId := res.ScrollId for { if scrollId == "" { util.Debug("ScrollId Is Error") break } searchResult, err := client.Scroll("bidding_v1").Size(500).ScrollId(scrollId).Do() //查询 if err != nil { if err.Error() == "EOS" { //迭代完毕 util.Debug("Es Search Data Over:", err) } else { util.Debug("Es Search Data Error:", err) } break } for _, hit := range searchResult.Hits.Hits { //开始处理数据 wg.Add(1) ch <- true go func(tmpHit *es.SearchHit) { defer func() { <-ch wg.Done() }() tmp := make(map[string]interface{}) if json.Unmarshal(*tmpHit.Source, &tmp) == nil { save := make(map[string]interface{}) for _, v := range fieldArr { if tmp[v] != nil { save[v] = tmp[v] } } //istart, iend := YcTime(save) //if istart > 0 && iend > 0 { // save["yucestarttime"] = istart // save["yuceendtime"] = iend // // upStr := fmt.Sprintf("ctx._source.yuceendtime=%d", iend) // if tmp["yeceendtime"] != nil { // upStr += fmt.Sprintf(";ctx._source.yeceendtime=%d", 0) // } // updateEs := map[string]string{ // "id": util.ObjToString(tmp["_id"]), // "updateStr": upStr, // } // Es.UpdateOne("bidding", "bidding", updateEs) //} savePool <- save //update := []map[string]interface{}{{ // "_id": save["_id"], //}, // {"$set": save}, //} //updatePool <- update } }(hit) numDocs += 1 if numDocs%100 == 0 { util.Debug("Current:", numDocs) } } scrollId = searchResult.ScrollId } wg.Wait() util.Debug("over---", numDocs) client.ClearScroll().ScrollId(scrollId).Do() //清理游标 } } func YcTime(tmp map[string]interface{}) (int64, int64) { // yucestarttime、yuceendtime yucestarttime := int64(0) yuceendtime := int64(0) // 项目周期中 if util.ObjToString(tmp["projectperiod"]) != "" { dateStr := date1.FindStringSubmatch(util.ObjToString(tmp["projectperiod"])) if len(dateStr) == 2 { sdate := FormatDateStr(dateStr[0]) edate := FormatDateStr(dateStr[1]) if sdate < edate && sdate != 0 && edate != 0 { yucestarttime = sdate yuceendtime = edate } } } if yucestarttime > 0 && yuceendtime > yucestarttime { return yucestarttime, yuceendtime } // 预测开始时间 合同签订日期 if util.IntAll(tmp["signaturedate"]) <= 0 { if util.IntAll(tmp["publishtime"]) <= 0 { return 0, 0 } else { yucestarttime = util.Int64All(tmp["publishtime"]) } } else { yucestarttime = util.Int64All(tmp["signaturedate"]) } // 预测结束时间 if yucestarttime > 0 { if util.IntAll(tmp["project_duration"]) > 0 && util.ObjToString(tmp["project_timeunit"]) != "" { yuceendtime = YcEndTime(yucestarttime, util.IntAll(tmp["project_duration"]), util.ObjToString(tmp["project_timeunit"])) return yucestarttime, yuceendtime } } return 0, 0 } func fcResult() { util.Debug("预测结果时间-------结果表迁移数据----------") currenttime := time.Now().Unix() endtime := time.Unix(currenttime, 0).AddDate(0, 3, 0).Unix() q := bson.M{"yuceendtime": bson.M{"$gte": currenttime, "$lt": endtime}, "move": nil} field := bson.M{"project_duration": 0, "project_timeunit": 0, "projectperiod": 0, "s_subscopeclass": 0} util.Debug(q) sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) result := sess.DB("mixdata").C("project_forecast_yece_tmp").Find(q).Select(field).Iter() count := 0 for tmp := make(map[string]interface{}); result.Next(&tmp); count++ { if count%200 == 0 { util.Debug("count---", count) } update := []map[string]interface{}{{ "_id": tmp["_id"], }, {"$set": bson.M{"move": true}}, } updatePool <- update id := mongodb.BsonIdToSId(tmp["_id"]) tmp["infoid"] = id delete(tmp, "_id") tmp["yucetime"] = currenttime tmp["jyhref"] = `/jyapp/article/content/` + util.CommonEncodeArticle("content", id) + `.html` if tmp["buyer"] == nil || tmp["buyerperson"] == nil || tmp["buyertel"] == nil { esq := `{"query":{"bool":{"must":[{"term":{"ids":"` + id + `"}}]}}}` info := Es.Get("project", "project", esq) if len(*info) > 0 { if (*info)[0]["buyer"] != nil { tmp["buyer"] = (*info)[0]["buyer"] } if (*info)[0]["buyerperson"] != nil { tmp["buyerperson"] = (*info)[0]["buyerperson"] } if (*info)[0]["buyertel"] != nil { tmp["buyertel"] = (*info)[0]["buyertel"] } } } tpmp := make(map[string]interface{}) tpmp["p_rate"] = "60%" if tmp["purchasing"] != nil { tpmp["purchasing"] = tmp["purchasing"] tpmp["purchasing"] = util.ObjToString(tpmp["purchasing"]) + "," + util.ObjToString(tmp["projectname"]) } else { tpmp["purchasing"] = tmp["projectname"] } var arr []map[string]interface{} for _, v := range strings.Split(util.ObjToString(tpmp["purchasing"]), ",") { p := make(map[string]interface{}) p["p_purchasing"] = v p["p_id"] = id p["p_orther"] = tmp["projectname"] if tmp["buyerperson"] != nil { p["p_person"] = tmp["buyerperson"] } if tmp["buyertel"] != nil { p["p_phone"] = tmp["buyertel"] } arr = append(arr, p) } tpmp["p_projects"] = arr delete(tmp, "buyerperson") delete(tmp, "buyertel") tmp["results"] = append([]map[string]interface{}{}, tpmp) savePool1 <- tmp //update1 := []map[string]interface{}{{ // "infoid": id, //}, // {"$set": tmp}, //} //updatePool1 <- update1 } util.Debug("over ---", count) } func YcEndTime(starttime int64, num int, unit string) int64 { yuceendtime := int64(0) if unit == "日历天" || unit == "天" || unit == "日" { yuceendtime = starttime + int64(num*86400) } else if unit == "周" { yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num*7).Unix() } else if unit == "月" { yuceendtime = time.Unix(starttime, 0).AddDate(0, num, 0).Unix() } else if unit == "年" { yuceendtime = time.Unix(starttime, 0).AddDate(num, 0, 0).Unix() } else if unit == "工作日" { n := num / 7 * 2 yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num+n).Unix() } return yuceendtime } func FormatDateStr(ds string) int64 { ds = strings.Replace(ds, "年", "-", -1) ds = strings.Replace(ds, "月", "-", -1) ds = strings.Replace(ds, "日", "", -1) ds = strings.Replace(ds, "/", "-", -1) ds = strings.Replace(ds, ".", "-", -1) util.Debug(ds) location, err := time.ParseInLocation(util.Date_Short_Layout, ds, time.Local) if err != nil { util.Debug(err) return 0 } else { return location.Unix() } }