123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 |
- package main
- import (
- "encoding/json"
- "fmt"
- "github.com/cron"
- "go.mongodb.org/mongo-driver/bson"
- es "gopkg.in/olivere/elastic.v1"
- "mongodb"
- "qfw/util"
- "regexp"
- "strings"
- "sync"
- "time"
- )
- var date1 = regexp.MustCompile("20[0-2][0-9][年|\\-\\/|.][0-9]{1,2}[月|\\-|\\/|.][0-9]{1,2}[日]?")
- func TimeTask() {
- c := cron.New()
- cronstr := "0 0 2 * * ?" //每天3点执行 临时表数据
- cronstr1 := "0 0 3 * * ?" //每天4点执行 结果表数据
- _ = c.AddFunc(cronstr, func() {
- findEs()
- })
- _ = c.AddFunc(cronstr1, func() {
- fcResult()
- })
- c.Start()
- }
- func findEs() {
- util.Debug("预测结果时间-------临时表保存数据----------")
- client := Es.GetEsConn()
- defer Es.DestoryEsConn(client)
- ch := make(chan bool, 5)
- wg := &sync.WaitGroup{}
- //esquery := `{
- //"query": {
- // "bool": {
- // "must": [
- // {
- // "terms": {
- // "subtype": [
- // "成交",
- // "合同"
- // ]
- // }
- // },
- // {
- // "terms": {
- // "s_subscopeclass": [
- // "服务采购_法律咨询",
- // "服务采购_会计",
- // "服务采购_物业",
- // "服务采购_审计",
- // "服务采购_安保",
- // "服务采购_仓储物流",
- // "服务采购_广告宣传印刷"
- // ]
- // }
- // }
- // ]
- // }
- //},
- //"_source": [
- // "_id",
- // "title",
- // "buyer",
- // "buyerclass",
- // "s_subscopeclass",
- // "yuceendtime",
- // "area",
- // "city",
- // "subtype",
- // "projectname",
- // "purchasing",
- // "href",
- // "projectcode",
- // "publishtime",
- // "buyerperson",
- // "buyertel"]
- //}`
- //esquery := `{"query":{"bool":{"must":[{"term":{"id":"6168359c06a9d911e598d573"}}]}}}`
- currenttime := time.Now().Unix()
- stime := time.Unix(currenttime, 0).AddDate(0, 0, -1).Unix()
- esquery := `{"query":{"bool":{"must":[{"range":{"comeintime":{"from":` + fmt.Sprint(stime) + `,"to":` + fmt.Sprint(currenttime) + `}}}],"must_not":[{"constant_score":{"filter":{"missing":{"field":"yuceendtime"}}}}]}}}`
- util.Debug(esquery)
- fieldArr := []string{"_id", "title", "buyer", "buyerclass", "s_subscopeclass", "yuceendtime", "area", "city", "subtype",
- "projectname", "purchasing", "href", "projectcode", "publishtime", "buyerperson", "buyertel", "projectperiod",
- "project_duration", "project_timeunit", "signaturedate"}
- escount := Es.Count("bidding", "bidding", esquery)
- util.Debug("查询总数:", escount)
- //查询条件类型转换
- var q es.Query
- tmpQuery := es.BoolQuery{
- QueryStrings: esquery,
- }
- q = tmpQuery
- numDocs := 0
- //游标查询,index不支持别名,只能写索引库的名称
- res, err := client.Scroll("bidding_v1").Query(q).Size(500).Do() //查询一条获取游标
- if err == nil {
- scrollId := res.ScrollId
- for {
- if scrollId == "" {
- util.Debug("ScrollId Is Error")
- break
- }
- searchResult, err := client.Scroll("bidding_v1").Size(500).ScrollId(scrollId).Do() //查询
- if err != nil {
- if err.Error() == "EOS" { //迭代完毕
- util.Debug("Es Search Data Over:", err)
- } else {
- util.Debug("Es Search Data Error:", err)
- }
- break
- }
- for _, hit := range searchResult.Hits.Hits {
- //开始处理数据
- wg.Add(1)
- ch <- true
- go func(tmpHit *es.SearchHit) {
- defer func() {
- <-ch
- wg.Done()
- }()
- tmp := make(map[string]interface{})
- if json.Unmarshal(*tmpHit.Source, &tmp) == nil {
- save := make(map[string]interface{})
- for _, v := range fieldArr {
- if tmp[v] != nil {
- save[v] = tmp[v]
- }
- }
- //istart, iend := YcTime(save)
- //if istart > 0 && iend > 0 {
- // save["yucestarttime"] = istart
- // save["yuceendtime"] = iend
- //
- // upStr := fmt.Sprintf("ctx._source.yuceendtime=%d", iend)
- // if tmp["yeceendtime"] != nil {
- // upStr += fmt.Sprintf(";ctx._source.yeceendtime=%d", 0)
- // }
- // updateEs := map[string]string{
- // "id": util.ObjToString(tmp["_id"]),
- // "updateStr": upStr,
- // }
- // Es.UpdateOne("bidding", "bidding", updateEs)
- //}
- savePool <- save
- //update := []map[string]interface{}{{
- // "_id": save["_id"],
- //},
- // {"$set": save},
- //}
- //updatePool <- update
- }
- }(hit)
- numDocs += 1
- if numDocs%100 == 0 {
- util.Debug("Current:", numDocs)
- }
- }
- scrollId = searchResult.ScrollId
- }
- wg.Wait()
- util.Debug("over---", numDocs)
- client.ClearScroll().ScrollId(scrollId).Do() //清理游标
- }
- }
- func YcTime(tmp map[string]interface{}) (int64, int64) {
- // yucestarttime、yuceendtime
- yucestarttime := int64(0)
- yuceendtime := int64(0)
- // 项目周期中
- if util.ObjToString(tmp["projectperiod"]) != "" {
- dateStr := date1.FindStringSubmatch(util.ObjToString(tmp["projectperiod"]))
- if len(dateStr) == 2 {
- sdate := FormatDateStr(dateStr[0])
- edate := FormatDateStr(dateStr[1])
- if sdate < edate && sdate != 0 && edate != 0 {
- yucestarttime = sdate
- yuceendtime = edate
- }
- }
- }
- if yucestarttime > 0 && yuceendtime > yucestarttime {
- return yucestarttime, yuceendtime
- }
- // 预测开始时间 合同签订日期
- if util.IntAll(tmp["signaturedate"]) <= 0 {
- if util.IntAll(tmp["publishtime"]) <= 0 {
- return 0, 0
- } else {
- yucestarttime = util.Int64All(tmp["publishtime"])
- }
- } else {
- yucestarttime = util.Int64All(tmp["signaturedate"])
- }
- // 预测结束时间
- if yucestarttime > 0 {
- if util.IntAll(tmp["project_duration"]) > 0 && util.ObjToString(tmp["project_timeunit"]) != "" {
- yuceendtime = YcEndTime(yucestarttime, util.IntAll(tmp["project_duration"]), util.ObjToString(tmp["project_timeunit"]))
- return yucestarttime, yuceendtime
- }
- }
- return 0, 0
- }
- func fcResult() {
- util.Debug("预测结果时间-------结果表迁移数据----------")
- currenttime := time.Now().Unix()
- endtime := time.Unix(currenttime, 0).AddDate(0, 3, 0).Unix()
- q := bson.M{"yuceendtime": bson.M{"$gte": currenttime, "$lt": endtime}, "move": nil}
- field := bson.M{"project_duration": 0, "project_timeunit": 0, "projectperiod": 0, "s_subscopeclass": 0}
- util.Debug(q)
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- result := sess.DB("mixdata").C("project_forecast_yece_tmp").Find(q).Select(field).Iter()
- count := 0
- for tmp := make(map[string]interface{}); result.Next(&tmp); count++ {
- if count%200 == 0 {
- util.Debug("count---", count)
- }
- update := []map[string]interface{}{{
- "_id": tmp["_id"],
- },
- {"$set": bson.M{"move": true}},
- }
- updatePool <- update
- id := mongodb.BsonIdToSId(tmp["_id"])
- tmp["infoid"] = id
- delete(tmp, "_id")
- tmp["yucetime"] = currenttime
- tmp["jyhref"] = `/jyapp/article/content/` + util.CommonEncodeArticle("content", id) + `.html`
- if tmp["buyer"] == nil || tmp["buyerperson"] == nil || tmp["buyertel"] == nil {
- esq := `{"query":{"bool":{"must":[{"term":{"ids":"` + id + `"}}]}}}`
- info := Es.Get("project", "project", esq)
- if len(*info) > 0 {
- if (*info)[0]["buyer"] != nil {
- tmp["buyer"] = (*info)[0]["buyer"]
- }
- if (*info)[0]["buyerperson"] != nil {
- tmp["buyerperson"] = (*info)[0]["buyerperson"]
- }
- if (*info)[0]["buyertel"] != nil {
- tmp["buyertel"] = (*info)[0]["buyertel"]
- }
- }
- }
- tpmp := make(map[string]interface{})
- tpmp["p_rate"] = "60%"
- if tmp["purchasing"] != nil {
- tpmp["purchasing"] = tmp["purchasing"]
- tpmp["purchasing"] = util.ObjToString(tpmp["purchasing"]) + "," + util.ObjToString(tmp["projectname"])
- } else {
- tpmp["purchasing"] = tmp["projectname"]
- }
- var arr []map[string]interface{}
- for _, v := range strings.Split(util.ObjToString(tpmp["purchasing"]), ",") {
- p := make(map[string]interface{})
- p["p_purchasing"] = v
- p["p_id"] = id
- p["p_orther"] = tmp["projectname"]
- if tmp["buyerperson"] != nil {
- p["p_person"] = tmp["buyerperson"]
- }
- if tmp["buyertel"] != nil {
- p["p_phone"] = tmp["buyertel"]
- }
- arr = append(arr, p)
- }
- tpmp["p_projects"] = arr
- delete(tmp, "buyerperson")
- delete(tmp, "buyertel")
- tmp["results"] = append([]map[string]interface{}{}, tpmp)
- savePool1 <- tmp
- //update1 := []map[string]interface{}{{
- // "infoid": id,
- //},
- // {"$set": tmp},
- //}
- //updatePool1 <- update1
- }
- util.Debug("over ---", count)
- }
- func YcEndTime(starttime int64, num int, unit string) int64 {
- yuceendtime := int64(0)
- if unit == "日历天" || unit == "天" || unit == "日" {
- yuceendtime = starttime + int64(num*86400)
- } else if unit == "周" {
- yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num*7).Unix()
- } else if unit == "月" {
- yuceendtime = time.Unix(starttime, 0).AddDate(0, num, 0).Unix()
- } else if unit == "年" {
- yuceendtime = time.Unix(starttime, 0).AddDate(num, 0, 0).Unix()
- } else if unit == "工作日" {
- n := num / 7 * 2
- yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num+n).Unix()
- }
- return yuceendtime
- }
- func FormatDateStr(ds string) int64 {
- ds = strings.Replace(ds, "年", "-", -1)
- ds = strings.Replace(ds, "月", "-", -1)
- ds = strings.Replace(ds, "日", "", -1)
- ds = strings.Replace(ds, "/", "-", -1)
- ds = strings.Replace(ds, ".", "-", -1)
- util.Debug(ds)
- location, err := time.ParseInLocation(util.Date_Short_Layout, ds, time.Local)
- if err != nil {
- util.Debug(err)
- return 0
- } else {
- return location.Unix()
- }
- }
|