task.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. package main
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/cron"
	"go.mongodb.org/mongo-driver/bson"
	es "gopkg.in/olivere/elastic.v7"
	"mongodb"
	"qfw/util"
)
  16. var date1 = regexp.MustCompile("20[0-2][0-9][年|\\-\\/|.][0-9]{1,2}[月|\\-|\\/|.][0-9]{1,2}[日]?")
  17. func TimeTask() {
  18. c := cron.New()
  19. cronstr := "0 0 2 * * ?" //每天3点执行 临时表数据
  20. cronstr1 := "0 0 3 * * ?" //每天4点执行 结果表数据
  21. _ = c.AddFunc(cronstr, func() {
  22. findEs()
  23. })
  24. _ = c.AddFunc(cronstr1, func() {
  25. fcResult()
  26. })
  27. c.Start()
  28. }
  29. func findEs() {
  30. util.Debug("预测结果时间-------临时表保存数据----------")
  31. client := Es.GetEsConn()
  32. defer Es.DestoryEsConn(client)
  33. ch := make(chan bool, 5)
  34. wg := &sync.WaitGroup{}
  35. //esquery := `{
  36. //"query": {
  37. // "bool": {
  38. // "must": [
  39. // {
  40. // "terms": {
  41. // "subtype": [
  42. // "成交",
  43. // "合同"
  44. // ]
  45. // }
  46. // },
  47. // {
  48. // "terms": {
  49. // "s_subscopeclass": [
  50. // "服务采购_法律咨询",
  51. // "服务采购_会计",
  52. // "服务采购_物业",
  53. // "服务采购_审计",
  54. // "服务采购_安保",
  55. // "服务采购_仓储物流",
  56. // "服务采购_广告宣传印刷"
  57. // ]
  58. // }
  59. // }
  60. // ]
  61. // }
  62. //},
  63. //"_source": [
  64. // "_id",
  65. // "title",
  66. // "buyer",
  67. // "buyerclass",
  68. // "s_subscopeclass",
  69. // "yuceendtime",
  70. // "area",
  71. // "city",
  72. // "subtype",
  73. // "projectname",
  74. // "purchasing",
  75. // "href",
  76. // "projectcode",
  77. // "publishtime",
  78. // "buyerperson",
  79. // "buyertel"]
  80. //}`
  81. //esquery := `{"query":{"bool":{"must":[{"term":{"id":"6168359c06a9d911e598d573"}}]}}}`
  82. currenttime := time.Now().Unix()
  83. stime := time.Unix(currenttime, 0).AddDate(0, 0, -1).Unix()
  84. esquery := `{"query":{"bool":{"must":[{"range":{"comeintime":{"from":` + fmt.Sprint(stime) + `,"to":` + fmt.Sprint(currenttime) + `}}}],"must_not":[{"constant_score":{"filter":{"missing":{"field":"yuceendtime"}}}}]}}}`
  85. util.Debug(esquery)
  86. fieldArr := []string{"_id", "title", "buyer", "buyerclass", "s_subscopeclass", "yuceendtime", "area", "city", "subtype",
  87. "projectname", "purchasing", "href", "projectcode", "publishtime", "buyerperson", "buyertel", "projectperiod",
  88. "project_duration", "project_timeunit", "signaturedate"}
  89. escount := Es.Count("bidding", "bidding", esquery)
  90. util.Debug("查询总数:", escount)
  91. //查询条件类型转换
  92. //var q es.Query
  93. //tmpQuery := es.BoolQuery{
  94. // QueryStrings: esquery,
  95. //}
  96. //q = tmpQuery
  97. numDocs := 0
  98. query := es.NewBoolQuery().
  99. Must(es.NewRangeQuery("comeintime").Gte(stime).Lte(currenttime)).
  100. MustNot(es.NewExistsQuery("yuceendtime"))
  101. //游标查询,index不支持别名,只能写索引库的名称
  102. res, err := client.Scroll("bidding").Query(query).Scroll("5m").Size(500).Do(context.TODO()) //查询一条获取游标
  103. if err == nil {
  104. scrollId := res.ScrollId
  105. for {
  106. if scrollId == "" {
  107. util.Debug("ScrollId Is Error")
  108. break
  109. }
  110. searchResult, err := client.Scroll("1m").Size(500).ScrollId(scrollId).Do(context.TODO()) //查询
  111. if err != nil {
  112. if err.Error() == "EOS" { //迭代完毕
  113. util.Debug("Es Search Data Over:", err)
  114. } else {
  115. util.Debug("Es Search Data Error:", err)
  116. }
  117. break
  118. }
  119. for _, hit := range searchResult.Hits.Hits {
  120. //开始处理数据
  121. wg.Add(1)
  122. ch <- true
  123. go func(tmpHit *es.SearchHit) {
  124. defer func() {
  125. <-ch
  126. wg.Done()
  127. }()
  128. tmp := make(map[string]interface{})
  129. if json.Unmarshal(tmpHit.Source, &tmp) == nil {
  130. save := make(map[string]interface{})
  131. for _, v := range fieldArr {
  132. if tmp[v] != nil {
  133. save[v] = tmp[v]
  134. }
  135. }
  136. //istart, iend := YcTime(save)
  137. //if istart > 0 && iend > 0 {
  138. // save["yucestarttime"] = istart
  139. // save["yuceendtime"] = iend
  140. //
  141. // upStr := fmt.Sprintf("ctx._source.yuceendtime=%d", iend)
  142. // if tmp["yeceendtime"] != nil {
  143. // upStr += fmt.Sprintf(";ctx._source.yeceendtime=%d", 0)
  144. // }
  145. // updateEs := map[string]string{
  146. // "id": util.ObjToString(tmp["_id"]),
  147. // "updateStr": upStr,
  148. // }
  149. // Es.UpdateOne("bidding", "bidding", updateEs)
  150. //}
  151. savePool <- save
  152. //update := []map[string]interface{}{{
  153. // "_id": save["_id"],
  154. //},
  155. // {"$set": save},
  156. //}
  157. //updatePool <- update
  158. }
  159. }(hit)
  160. numDocs += 1
  161. if numDocs%5000 == 0 {
  162. util.Debug("Current:", numDocs)
  163. }
  164. }
  165. scrollId = searchResult.ScrollId
  166. }
  167. wg.Wait()
  168. util.Debug("over---", numDocs)
  169. client.ClearScroll().ScrollId(scrollId).Do(context.TODO()) //清理游标
  170. }
  171. }
  172. func YcTime(tmp map[string]interface{}) (int64, int64) {
  173. // yucestarttime、yuceendtime
  174. yucestarttime := int64(0)
  175. yuceendtime := int64(0)
  176. // 项目周期中
  177. if util.ObjToString(tmp["projectperiod"]) != "" {
  178. dateStr := date1.FindStringSubmatch(util.ObjToString(tmp["projectperiod"]))
  179. if len(dateStr) == 2 {
  180. sdate := FormatDateStr(dateStr[0])
  181. edate := FormatDateStr(dateStr[1])
  182. if sdate < edate && sdate != 0 && edate != 0 {
  183. yucestarttime = sdate
  184. yuceendtime = edate
  185. }
  186. }
  187. }
  188. if yucestarttime > 0 && yuceendtime > yucestarttime {
  189. return yucestarttime, yuceendtime
  190. }
  191. // 预测开始时间 合同签订日期
  192. if util.IntAll(tmp["signaturedate"]) <= 0 {
  193. if util.IntAll(tmp["publishtime"]) <= 0 {
  194. return 0, 0
  195. } else {
  196. yucestarttime = util.Int64All(tmp["publishtime"])
  197. }
  198. } else {
  199. yucestarttime = util.Int64All(tmp["signaturedate"])
  200. }
  201. // 预测结束时间
  202. if yucestarttime > 0 {
  203. if util.IntAll(tmp["project_duration"]) > 0 && util.ObjToString(tmp["project_timeunit"]) != "" {
  204. yuceendtime = YcEndTime(yucestarttime, util.IntAll(tmp["project_duration"]), util.ObjToString(tmp["project_timeunit"]))
  205. return yucestarttime, yuceendtime
  206. }
  207. }
  208. return 0, 0
  209. }
  210. func fcResult() {
  211. util.Debug("预测结果时间-------结果表迁移数据----------")
  212. //currenttime := time.Now().Unix()
  213. //endtime := time.Unix(currenttime, 0).AddDate(0, 3, 0).Unix()
  214. q := bson.M{"yuceendtime": bson.M{"$exists": 1}, "move": nil}
  215. field := bson.M{"project_duration": 0, "project_timeunit": 0, "projectperiod": 0, "s_subscopeclass": 0}
  216. util.Debug(q)
  217. sess := MongoTool.GetMgoConn()
  218. defer MongoTool.DestoryMongoConn(sess)
  219. result := sess.DB("mixdata").C("project_forecast_yece_tmp").Find(q).Select(field).Iter()
  220. count := 0
  221. for tmp := make(map[string]interface{}); result.Next(&tmp); count++ {
  222. if count%200 == 0 {
  223. util.Debug("count---", count)
  224. }
  225. update := []map[string]interface{}{{
  226. "_id": tmp["_id"],
  227. },
  228. {"$set": bson.M{"move": true}},
  229. }
  230. updatePool <- update
  231. id := mongodb.BsonIdToSId(tmp["_id"])
  232. tmp["infoid"] = id
  233. tmp["yucetime"] = util.Int64All(tmp["publishtime"])
  234. tmp["jyhref"] = `/jyapp/article/content/` + util.CommonEncodeArticle("content", id) + `.html`
  235. if tmp["buyer"] == nil || tmp["buyerperson"] == nil || tmp["buyertel"] == nil {
  236. esq := `{"query":{"bool":{"must":[{"term":{"ids":"` + id + `"}}]}}}`
  237. info := Es.Get("projectset", esq)
  238. if len(*info) > 0 {
  239. if (*info)[0]["buyer"] != nil {
  240. tmp["buyer"] = (*info)[0]["buyer"]
  241. }
  242. if (*info)[0]["buyerperson"] != nil {
  243. tmp["buyerperson"] = (*info)[0]["buyerperson"]
  244. }
  245. if (*info)[0]["buyertel"] != nil {
  246. tmp["buyertel"] = (*info)[0]["buyertel"]
  247. }
  248. }
  249. }
  250. tpmp := make(map[string]interface{})
  251. tpmp["p_rate"] = "60%"
  252. if tmp["purchasing"] != nil {
  253. tpmp["purchasing"] = tmp["purchasing"]
  254. tpmp["purchasing"] = util.ObjToString(tpmp["purchasing"]) + "," + util.ObjToString(tmp["projectname"])
  255. } else {
  256. tpmp["purchasing"] = tmp["projectname"]
  257. }
  258. var arr []map[string]interface{}
  259. for _, v := range strings.Split(util.ObjToString(tpmp["purchasing"]), ",") {
  260. p := make(map[string]interface{})
  261. p["p_purchasing"] = v
  262. p["p_id"] = id
  263. p["p_orther"] = tmp["projectname"]
  264. if tmp["buyerperson"] != nil {
  265. p["p_person"] = tmp["buyerperson"]
  266. }
  267. if tmp["buyertel"] != nil {
  268. p["p_phone"] = tmp["buyertel"]
  269. }
  270. arr = append(arr, p)
  271. }
  272. tpmp["p_projects"] = arr
  273. delete(tmp, "buyerperson")
  274. delete(tmp, "buyertel")
  275. tmp["results"] = append([]map[string]interface{}{}, tpmp)
  276. savePool1 <- tmp
  277. //update1 := []map[string]interface{}{{
  278. // "infoid": id,
  279. //},
  280. // {"$set": tmp},
  281. //}
  282. //updatePool1 <- update1
  283. }
  284. util.Debug("over ---", count)
  285. }
  286. func YcEndTime(starttime int64, num int, unit string) int64 {
  287. yuceendtime := int64(0)
  288. if unit == "日历天" || unit == "天" || unit == "日" {
  289. yuceendtime = starttime + int64(num*86400)
  290. } else if unit == "周" {
  291. yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num*7).Unix()
  292. } else if unit == "月" {
  293. yuceendtime = time.Unix(starttime, 0).AddDate(0, num, 0).Unix()
  294. } else if unit == "年" {
  295. yuceendtime = time.Unix(starttime, 0).AddDate(num, 0, 0).Unix()
  296. } else if unit == "工作日" {
  297. n := num / 7 * 2
  298. yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num+n).Unix()
  299. }
  300. return yuceendtime
  301. }
  302. func FormatDateStr(ds string) int64 {
  303. ds = strings.Replace(ds, "年", "-", -1)
  304. ds = strings.Replace(ds, "月", "-", -1)
  305. ds = strings.Replace(ds, "日", "", -1)
  306. ds = strings.Replace(ds, "/", "-", -1)
  307. ds = strings.Replace(ds, ".", "-", -1)
  308. util.Debug(ds)
  309. location, err := time.ParseInLocation(util.Date_Short_Layout, ds, time.Local)
  310. if err != nil {
  311. util.Debug(err)
  312. return 0
  313. } else {
  314. return location.Unix()
  315. }
  316. }