task.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/cron"
  6. "go.mongodb.org/mongo-driver/bson"
  7. es "gopkg.in/olivere/elastic.v1"
  8. "mongodb"
  9. "qfw/util"
  10. "regexp"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. var date1 = regexp.MustCompile("20[0-2][0-9][年|\\-\\/|.][0-9]{1,2}[月|\\-|\\/|.][0-9]{1,2}[日]?")
  16. func TimeTask() {
  17. c := cron.New()
  18. cronstr := "0 0 2 * * ?" //每天3点执行 临时表数据
  19. cronstr1 := "0 0 3 * * ?" //每天4点执行 结果表数据
  20. _ = c.AddFunc(cronstr, func() {
  21. findEs()
  22. })
  23. _ = c.AddFunc(cronstr1, func() {
  24. fcResult()
  25. })
  26. c.Start()
  27. }
  28. func findEs() {
  29. util.Debug("预测结果时间-------临时表保存数据----------")
  30. client := Es.GetEsConn()
  31. defer Es.DestoryEsConn(client)
  32. ch := make(chan bool, 5)
  33. wg := &sync.WaitGroup{}
  34. //esquery := `{
  35. //"query": {
  36. // "bool": {
  37. // "must": [
  38. // {
  39. // "terms": {
  40. // "subtype": [
  41. // "成交",
  42. // "合同"
  43. // ]
  44. // }
  45. // },
  46. // {
  47. // "terms": {
  48. // "s_subscopeclass": [
  49. // "服务采购_法律咨询",
  50. // "服务采购_会计",
  51. // "服务采购_物业",
  52. // "服务采购_审计",
  53. // "服务采购_安保",
  54. // "服务采购_仓储物流",
  55. // "服务采购_广告宣传印刷"
  56. // ]
  57. // }
  58. // }
  59. // ]
  60. // }
  61. //},
  62. //"_source": [
  63. // "_id",
  64. // "title",
  65. // "buyer",
  66. // "buyerclass",
  67. // "s_subscopeclass",
  68. // "yuceendtime",
  69. // "area",
  70. // "city",
  71. // "subtype",
  72. // "projectname",
  73. // "purchasing",
  74. // "href",
  75. // "projectcode",
  76. // "publishtime",
  77. // "buyerperson",
  78. // "buyertel"]
  79. //}`
  80. //esquery := `{"query":{"bool":{"must":[{"term":{"id":"6168359c06a9d911e598d573"}}]}}}`
  81. currenttime := time.Now().Unix()
  82. stime := time.Unix(currenttime, 0).AddDate(0, 0, -1).Unix()
  83. esquery := `{"query":{"bool":{"must":[{"range":{"comeintime":{"from":` + fmt.Sprint(stime) + `,"to":` + fmt.Sprint(currenttime) + `}}}],"must_not":[{"constant_score":{"filter":{"missing":{"field":"yuceendtime"}}}}]}}}`
  84. util.Debug(esquery)
  85. fieldArr := []string{"_id", "title", "buyer", "buyerclass", "s_subscopeclass", "yuceendtime", "area", "city", "subtype",
  86. "projectname", "purchasing", "href", "projectcode", "publishtime", "buyerperson", "buyertel", "projectperiod",
  87. "project_duration", "project_timeunit", "signaturedate"}
  88. escount := Es.Count("bidding", "bidding", esquery)
  89. util.Debug("查询总数:", escount)
  90. //查询条件类型转换
  91. var q es.Query
  92. tmpQuery := es.BoolQuery{
  93. QueryStrings: esquery,
  94. }
  95. q = tmpQuery
  96. numDocs := 0
  97. //游标查询,index不支持别名,只能写索引库的名称
  98. res, err := client.Scroll("bidding_v1").Query(q).Size(500).Do() //查询一条获取游标
  99. if err == nil {
  100. scrollId := res.ScrollId
  101. for {
  102. if scrollId == "" {
  103. util.Debug("ScrollId Is Error")
  104. break
  105. }
  106. searchResult, err := client.Scroll("bidding_v1").Size(500).ScrollId(scrollId).Do() //查询
  107. if err != nil {
  108. if err.Error() == "EOS" { //迭代完毕
  109. util.Debug("Es Search Data Over:", err)
  110. } else {
  111. util.Debug("Es Search Data Error:", err)
  112. }
  113. break
  114. }
  115. for _, hit := range searchResult.Hits.Hits {
  116. //开始处理数据
  117. wg.Add(1)
  118. ch <- true
  119. go func(tmpHit *es.SearchHit) {
  120. defer func() {
  121. <-ch
  122. wg.Done()
  123. }()
  124. tmp := make(map[string]interface{})
  125. if json.Unmarshal(*tmpHit.Source, &tmp) == nil {
  126. save := make(map[string]interface{})
  127. for _, v := range fieldArr {
  128. if tmp[v] != nil {
  129. save[v] = tmp[v]
  130. }
  131. }
  132. //istart, iend := YcTime(save)
  133. //if istart > 0 && iend > 0 {
  134. // save["yucestarttime"] = istart
  135. // save["yuceendtime"] = iend
  136. //
  137. // upStr := fmt.Sprintf("ctx._source.yuceendtime=%d", iend)
  138. // if tmp["yeceendtime"] != nil {
  139. // upStr += fmt.Sprintf(";ctx._source.yeceendtime=%d", 0)
  140. // }
  141. // updateEs := map[string]string{
  142. // "id": util.ObjToString(tmp["_id"]),
  143. // "updateStr": upStr,
  144. // }
  145. // Es.UpdateOne("bidding", "bidding", updateEs)
  146. //}
  147. savePool <- save
  148. //update := []map[string]interface{}{{
  149. // "_id": save["_id"],
  150. //},
  151. // {"$set": save},
  152. //}
  153. //updatePool <- update
  154. }
  155. }(hit)
  156. numDocs += 1
  157. if numDocs%100 == 0 {
  158. util.Debug("Current:", numDocs)
  159. }
  160. }
  161. scrollId = searchResult.ScrollId
  162. }
  163. wg.Wait()
  164. util.Debug("over---", numDocs)
  165. client.ClearScroll().ScrollId(scrollId).Do() //清理游标
  166. }
  167. }
  168. func YcTime(tmp map[string]interface{}) (int64, int64) {
  169. // yucestarttime、yuceendtime
  170. yucestarttime := int64(0)
  171. yuceendtime := int64(0)
  172. // 项目周期中
  173. if util.ObjToString(tmp["projectperiod"]) != "" {
  174. dateStr := date1.FindStringSubmatch(util.ObjToString(tmp["projectperiod"]))
  175. if len(dateStr) == 2 {
  176. sdate := FormatDateStr(dateStr[0])
  177. edate := FormatDateStr(dateStr[1])
  178. if sdate < edate && sdate != 0 && edate != 0 {
  179. yucestarttime = sdate
  180. yuceendtime = edate
  181. }
  182. }
  183. }
  184. if yucestarttime > 0 && yuceendtime > yucestarttime {
  185. return yucestarttime, yuceendtime
  186. }
  187. // 预测开始时间 合同签订日期
  188. if util.IntAll(tmp["signaturedate"]) <= 0 {
  189. if util.IntAll(tmp["publishtime"]) <= 0 {
  190. return 0, 0
  191. } else {
  192. yucestarttime = util.Int64All(tmp["publishtime"])
  193. }
  194. } else {
  195. yucestarttime = util.Int64All(tmp["signaturedate"])
  196. }
  197. // 预测结束时间
  198. if yucestarttime > 0 {
  199. if util.IntAll(tmp["project_duration"]) > 0 && util.ObjToString(tmp["project_timeunit"]) != "" {
  200. yuceendtime = YcEndTime(yucestarttime, util.IntAll(tmp["project_duration"]), util.ObjToString(tmp["project_timeunit"]))
  201. return yucestarttime, yuceendtime
  202. }
  203. }
  204. return 0, 0
  205. }
  206. func fcResult() {
  207. util.Debug("预测结果时间-------结果表迁移数据----------")
  208. currenttime := time.Now().Unix()
  209. endtime := time.Unix(currenttime, 0).AddDate(0, 3, 0).Unix()
  210. q := bson.M{"yuceendtime": bson.M{"$gte": currenttime, "$lt": endtime}, "move": nil}
  211. field := bson.M{"project_duration": 0, "project_timeunit": 0, "projectperiod": 0, "s_subscopeclass": 0}
  212. util.Debug(q)
  213. sess := MongoTool.GetMgoConn()
  214. defer MongoTool.DestoryMongoConn(sess)
  215. result := sess.DB("mixdata").C("project_forecast_yece_tmp").Find(q).Select(field).Iter()
  216. count := 0
  217. for tmp := make(map[string]interface{}); result.Next(&tmp); count++ {
  218. if count%200 == 0 {
  219. util.Debug("count---", count)
  220. }
  221. update := []map[string]interface{}{{
  222. "_id": tmp["_id"],
  223. },
  224. {"$set": bson.M{"move": true}},
  225. }
  226. updatePool <- update
  227. id := mongodb.BsonIdToSId(tmp["_id"])
  228. tmp["infoid"] = id
  229. delete(tmp, "_id")
  230. tmp["yucetime"] = currenttime
  231. tmp["jyhref"] = `/jyapp/article/content/` + util.CommonEncodeArticle("content", id) + `.html`
  232. if tmp["buyer"] == nil || tmp["buyerperson"] == nil || tmp["buyertel"] == nil {
  233. esq := `{"query":{"bool":{"must":[{"term":{"ids":"` + id + `"}}]}}}`
  234. info := Es.Get("project", "project", esq)
  235. if len(*info) > 0 {
  236. if (*info)[0]["buyer"] != nil {
  237. tmp["buyer"] = (*info)[0]["buyer"]
  238. }
  239. if (*info)[0]["buyerperson"] != nil {
  240. tmp["buyerperson"] = (*info)[0]["buyerperson"]
  241. }
  242. if (*info)[0]["buyertel"] != nil {
  243. tmp["buyertel"] = (*info)[0]["buyertel"]
  244. }
  245. }
  246. }
  247. tpmp := make(map[string]interface{})
  248. tpmp["p_rate"] = "60%"
  249. if tmp["purchasing"] != nil {
  250. tpmp["purchasing"] = tmp["purchasing"]
  251. tpmp["purchasing"] = util.ObjToString(tpmp["purchasing"]) + "," + util.ObjToString(tmp["projectname"])
  252. } else {
  253. tpmp["purchasing"] = tmp["projectname"]
  254. }
  255. var arr []map[string]interface{}
  256. for _, v := range strings.Split(util.ObjToString(tpmp["purchasing"]), ",") {
  257. p := make(map[string]interface{})
  258. p["p_purchasing"] = v
  259. p["p_id"] = id
  260. p["p_orther"] = tmp["projectname"]
  261. if tmp["buyerperson"] != nil {
  262. p["p_person"] = tmp["buyerperson"]
  263. }
  264. if tmp["buyertel"] != nil {
  265. p["p_phone"] = tmp["buyertel"]
  266. }
  267. arr = append(arr, p)
  268. }
  269. tpmp["p_projects"] = arr
  270. delete(tmp, "buyerperson")
  271. delete(tmp, "buyertel")
  272. tmp["results"] = append([]map[string]interface{}{}, tpmp)
  273. savePool1 <- tmp
  274. //update1 := []map[string]interface{}{{
  275. // "infoid": id,
  276. //},
  277. // {"$set": tmp},
  278. //}
  279. //updatePool1 <- update1
  280. }
  281. util.Debug("over ---", count)
  282. }
  283. func YcEndTime(starttime int64, num int, unit string) int64 {
  284. yuceendtime := int64(0)
  285. if unit == "日历天" || unit == "天" || unit == "日" {
  286. yuceendtime = starttime + int64(num*86400)
  287. } else if unit == "周" {
  288. yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num*7).Unix()
  289. } else if unit == "月" {
  290. yuceendtime = time.Unix(starttime, 0).AddDate(0, num, 0).Unix()
  291. } else if unit == "年" {
  292. yuceendtime = time.Unix(starttime, 0).AddDate(num, 0, 0).Unix()
  293. } else if unit == "工作日" {
  294. n := num / 7 * 2
  295. yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num+n).Unix()
  296. }
  297. return yuceendtime
  298. }
  299. func FormatDateStr(ds string) int64 {
  300. ds = strings.Replace(ds, "年", "-", -1)
  301. ds = strings.Replace(ds, "月", "-", -1)
  302. ds = strings.Replace(ds, "日", "", -1)
  303. ds = strings.Replace(ds, "/", "-", -1)
  304. ds = strings.Replace(ds, ".", "-", -1)
  305. util.Debug(ds)
  306. location, err := time.ParseInLocation(util.Date_Short_Layout, ds, time.Local)
  307. if err != nil {
  308. util.Debug(err)
  309. return 0
  310. } else {
  311. return location.Unix()
  312. }
  313. }