task.go 12 KB


  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. mgoutil "mongodb"
  6. qu "qfw/util"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/donnie4w/go-logger/logger"
  11. "go.mongodb.org/mongo-driver/bson"
  12. //"go.mongodb.org/mongo-driver/bson/primitive"
  13. )
  14. const ESMODEL = `
  15. {
  16. "query": {
  17. "filtered": {
  18. "filter": {
  19. "bool": {
  20. "must": [
  21. {
  22. "term": {
  23. "buyer": "%s"
  24. }
  25. }
  26. ]
  27. }
  28. },
  29. "query": {
  30. "bool": {
  31. "should": [
  32. {
  33. "multi_match": {
  34. "query": "%s",
  35. "type": "phrase",
  36. "fields": [
  37. "purchasing",
  38. "s_projectname",
  39. "title"
  40. ]
  41. }
  42. }
  43. ]
  44. }
  45. }
  46. }
  47. },
  48. "from": 0,
  49. "size": 100,
  50. "sort": [
  51. {
  52. "publishtime": "desc"
  53. }
  54. ],
  55. "_source": [
  56. "buyerperson",
  57. "buyertel",
  58. "projectname",
  59. "_id"
  60. ]
  61. }
  62. `
  63. func SaveMgo() {
  64. log.Println("Mgo Save...")
  65. arru := make([]map[string]interface{}, 200)
  66. indexu := 0
  67. for {
  68. select {
  69. case v := <-MgoSaveCache:
  70. arru[indexu] = v
  71. indexu++
  72. if indexu == 200 {
  73. SP <- true
  74. go func(arru []map[string]interface{}) {
  75. defer func() {
  76. <-SP
  77. }()
  78. MixMgo.SaveBulk(CollSave, arru...)
  79. }(arru)
  80. arru = make([]map[string]interface{}, 200)
  81. indexu = 0
  82. }
  83. case <-time.After(1000 * time.Millisecond):
  84. if indexu > 0 {
  85. SP <- true
  86. go func(arru []map[string]interface{}) {
  87. defer func() {
  88. <-SP
  89. }()
  90. MixMgo.SaveBulk(CollSave, arru...)
  91. }(arru[:indexu])
  92. arru = make([]map[string]interface{}, 200)
  93. indexu = 0
  94. }
  95. }
  96. }
  97. }
  98. func GetProjectData(sid, eid string) {
  99. defer qu.Catch()
  100. sess := MongoTool.GetMgoConn()
  101. defer MongoTool.DestoryMongoConn(sess)
  102. query := bson.M{
  103. "_id": bson.M{
  104. "$gt": mgoutil.StringTOBsonId(sid),
  105. "$lte": mgoutil.StringTOBsonId(eid),
  106. },
  107. "toptype": "拟建",
  108. }
  109. filed := map[string]interface{}{
  110. "area": 1,
  111. "city": 1,
  112. "buyer": 1,
  113. "projectname": 1,
  114. "title": 1,
  115. "href": 1,
  116. "publishtime": 1,
  117. "main_project": 1,
  118. "nature": 1,
  119. "top_category": 1,
  120. "sub_category": 1,
  121. "stage": 1,
  122. "approvestatus": 1,
  123. "projectinfo": 1,
  124. "projectcode": 1,
  125. }
  126. logger.Debug("query:", query)
  127. count, _ := sess.DB(Dbname).C(CollPro).Find(query).Count()
  128. log.Println("共查询:", count, "条")
  129. if count == 0 {
  130. return
  131. }
  132. it := sess.DB(Dbname).C(CollPro).Select(filed).Find(query).Iter()
  133. pool := make(chan bool, 10) //控制线程数
  134. wg := &sync.WaitGroup{}
  135. sum := 0
  136. for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
  137. if sum%100 == 0 {
  138. log.Println("current:", sum)
  139. }
  140. pool <- true
  141. wg.Add(1)
  142. go func(pro map[string]interface{}) {
  143. defer func() {
  144. <-pool
  145. wg.Done()
  146. }()
  147. //stage必须存在
  148. stage := qu.ObjToString(pro["stage"])
  149. if stage == "" {
  150. return
  151. }
  152. //top_category必须存在
  153. top_category := qu.ObjToString(pro["top_category"])
  154. if top_category == "" {
  155. return
  156. }
  157. //approvestatus审批通过
  158. approvestatus := qu.ObjToString(pro["approvestatus"])
  159. if approvestatus != "审批通过" {
  160. return
  161. }
  162. //projectinfo.projecttype 报建、核准类、核准、审批类、审批、null
  163. projectinfo := pro["projectinfo"].(map[string]interface{})
  164. if projecttype, ok := projectinfo["projecttype"].(string); ok {
  165. if !ProjectTypeMap[projecttype] { //有值且不在范围内不进行项目预测
  166. return
  167. }
  168. }
  169. delete(pro, "projectinfo")
  170. //nature
  171. nature := qu.ObjToString(pro["nature"])
  172. if !NatureMap[nature] {
  173. return
  174. }
  175. //buyer存在且该buyer不仅有拟建数据
  176. buyer := qu.ObjToString(pro["buyer"])
  177. if buyer != "" {
  178. esqyery := `{"query": {"bool": {"must": [{"term": {"buyer": "` + buyer + `"}}],"must_not": [{"term": {"toptype": "拟建"}}]}},"from": 0,"size": 1}`
  179. list := Es.Get(Index, Itype, esqyery)
  180. if list == nil || len(*list) == 0 { //buyer仅有拟建数据不预测
  181. return
  182. }
  183. }
  184. //id
  185. id := mgoutil.BsonIdToSId(pro["_id"])
  186. pro["infoid"] = id
  187. pro["jyhref"] = `https://www.jianyu360.com/article/content/` + qu.CommonEncodeArticle("content", id) + `.html`
  188. delete(pro, "_id")
  189. //yucetime
  190. pro["yucetime"] = time.Now().Unix()
  191. //buyerclass
  192. ent, _ := MixMgo.FindOne(CollEnt, bson.M{"buyer_name": buyer})
  193. if len(*ent) > 0 && (*ent)["buyerclass"] != nil {
  194. arr := (*ent)["buyerclass"].([]interface{})
  195. if len(arr) == 1 {
  196. pro["buyerclass"] = arr
  197. } else {
  198. var arrTmp []string
  199. for _, v := range arr {
  200. val := qu.ObjToString(v)
  201. if val != "其它" {
  202. arrTmp = append(arrTmp, val)
  203. }
  204. }
  205. pro["buyerclass"] = arrTmp
  206. }
  207. }
  208. maps := []map[string]interface{}{}
  209. sub_category := qu.ObjToString(pro["sub_category"])
  210. arr := Forecast[stage]
  211. ForecastFlag := 0 //标记查询了几次标签表
  212. //qu.Debug("top_category---", top_category, "sub_category---", sub_category, "stage---", stage)
  213. for { //查project_biaoqian
  214. tmpArr := arr
  215. q := bson.M{} //查询条件
  216. if sub_category != "" { //sub_category存在优先用sub_category
  217. q = bson.M{
  218. //"sub_category": sub_category,
  219. "sub_category": bson.M{"$elemMatch": bson.M{"$eq": sub_category}},
  220. }
  221. } else { //top_category
  222. q = bson.M{
  223. "top_category": top_category,
  224. }
  225. }
  226. if stage == "后期施工" || stage == "竣工验收" || stage == "运行维护" {
  227. //qu.Debug("ForecastFlag---", ForecastFlag)
  228. if ForecastFlag == 0 { //第一次增加main_project判断
  229. main_project := qu.ObjToString(pro["main_project"])
  230. //qu.Debug("main_project---", main_project)
  231. if main_project != "" { //main_project存在
  232. tmpArr = append(tmpArr, "物品采购")
  233. //q["main_project"] = main_project
  234. q["main_project"] = bson.M{"$elemMatch": bson.M{"$eq": main_project}}
  235. q["stage"] = bson.M{"$in": tmpArr}
  236. } else { //main_project不存在
  237. if stage == "运行维护" { //stage是运行维护且main_project是空不预测
  238. //qu.Debug("---return---")
  239. return
  240. } else {
  241. q["stage"] = bson.M{"$in": tmpArr}
  242. ForecastFlag++
  243. }
  244. }
  245. } else if ForecastFlag == 1 {
  246. q["stage"] = bson.M{"$in": tmpArr}
  247. }
  248. ForecastFlag++
  249. } else { //规划可研、立项环评、勘察设计、建设准备、前期施工
  250. q["stage"] = bson.M{"$in": tmpArr}
  251. ForecastFlag = 2
  252. }
  253. //qu.Debug("ForecastFlag---", ForecastFlag, "q--------------", q)
  254. result, _ := MixMgo.Find(CollTag, q, nil, nil, false, -1, -1)
  255. //qu.Debug("result---", len(*result))
  256. if len(*result) == 0 && ForecastFlag <= 1 {
  257. continue
  258. } else if len(*result) >= 1 { //第一次查询有results就不再查询
  259. ForecastFlag++
  260. }
  261. for _, t := range *result {
  262. t["p_rate"] = Rate
  263. t["time"] = ""
  264. projects := GetProjects(qu.ObjToString(t["purchasing"]), buyer)
  265. if len(projects) > 0 {
  266. t["p_projects"] = projects
  267. }
  268. maps = append(maps, t)
  269. }
  270. //qu.Debug("ForecastFlag---", ForecastFlag)
  271. if ForecastFlag >= 2 {
  272. break
  273. }
  274. }
  275. if len(maps) > 0 {
  276. pro["results"] = maps
  277. }
  278. MgoSaveCache <- pro
  279. }(tmp)
  280. tmp = make(map[string]interface{})
  281. }
  282. wg.Wait()
  283. log.Println("Run Over...Count:", sum)
  284. }
  285. func GetProjects(purchasing, buyer string) (projects []map[string]interface{}) {
  286. if purchasing != "" {
  287. for _, text := range strings.Split(purchasing, ",") {
  288. latest_project := map[string]interface{}{} //存储最后一条数据信息
  289. result_project := map[string]interface{}{} //存储每个purchasing所查询的招标信息
  290. esquery := fmt.Sprintf(ESMODEL, buyer, text)
  291. list := Es.Get(Index, Itype, esquery)
  292. if list != nil && len(*list) > 0 {
  293. for i, l := range *list {
  294. p_phone := qu.ObjToString(l["buyertel"])
  295. if p_phone != "" { //记录有联系电话的最新信息
  296. result_project["p_purchasing"] = text
  297. result_project["p_phone"] = p_phone
  298. if p_person := qu.ObjToString(l["buyerperson"]); p_person != "" {
  299. result_project["p_person"] = p_person
  300. }
  301. result_project["p_id"] = qu.ObjToString(l["_id"])
  302. result_project["p_orther"] = qu.ObjToString(l["projectname"])
  303. break
  304. }
  305. if i == 0 { //记录第一条数据信息
  306. latest_project["p_purchasing"] = text
  307. // if p_phone != "" {
  308. // latest_project["p_phone"] = p_phone
  309. // }
  310. if p_person := qu.ObjToString(l["buyerperson"]); p_person != "" {
  311. latest_project["p_person"] = p_person
  312. }
  313. latest_project["p_id"] = qu.ObjToString(l["_id"])
  314. latest_project["p_orther"] = qu.ObjToString(l["projectname"])
  315. }
  316. }
  317. }
  318. if len(result_project) > 0 {
  319. projects = append(projects, result_project)
  320. } else if len(latest_project) > 0 {
  321. projects = append(projects, latest_project)
  322. }
  323. }
  324. }
  325. return
  326. }
  327. /*
  328. func GetProjectData_back(t string) {
  329. defer qu.Catch()
  330. sess := MongoTool.GetMgoConn()
  331. defer MongoTool.DestoryMongoConn(sess)
  332. uptime, err := strconv.ParseInt(t, 10, 64)
  333. if err != nil {
  334. qu.Debug("时间转换错误:", t)
  335. return
  336. }
  337. query := bson.M{
  338. "updatetime": bson.M{"$gt": uptime},
  339. "o_projectinfo.nature": bson.M{"$in": Nature},
  340. "spidercode": bson.M{"$in": SpiderCodes},
  341. "stage": bson.M{"$exists": true},
  342. "$or": []bson.M{
  343. {"category_buyer": bson.M{"$in": Category}},
  344. {"category_purpose": bson.M{"$in": Category}},
  345. },
  346. }
  347. filed := map[string]interface{}{"area": 1, "city": 1, "buyer": 1, "projectname": 1, "category": 1, "nature": 1, "category_buyer": 1, "category_purpose": 1, "stage": 1, "o_projectinfo": 1, "title": 1}
  348. count, _ := sess.DB(Dbname).C(CollPro).Find(query).Count()
  349. log.Println("共查询:", count, "条")
  350. if count == 0 {
  351. return
  352. }
  353. it := sess.DB(Dbname).C(CollPro).Select(filed).Find(query).Iter()
  354. pool := make(chan bool, 10) //控制线程数
  355. wg := &sync.WaitGroup{}
  356. //lock := &sync.Mutex{} //控制读写
  357. sum := 0
  358. for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
  359. if sum%100 == 0 {
  360. log.Println("current:", sum)
  361. }
  362. pool <- true
  363. wg.Add(1)
  364. go func(pro map[string]interface{}) {
  365. defer func() {
  366. <-pool
  367. wg.Done()
  368. }()
  369. stage := qu.ObjToString(pro["stage"])
  370. if stage == "" {
  371. //log.Println("stage is null", pro["infoid"])
  372. return
  373. }
  374. if id, ok := pro["_id"].(string); ok && id != "" {
  375. pro["infoid"] = id
  376. } else {
  377. pro["infoid"] = mgoutil.BsonIdToSId(pro["_id"])
  378. }
  379. pro["yucetime"] = time.Now().Unix()
  380. pro["nature"] = (*qu.ObjToMap(pro["o_projectinfo"]))["nature"]
  381. buyer := (*qu.ObjToMap(pro["o_projectinfo"]))["buyer"]
  382. delete(pro, "_id")
  383. delete(pro, "o_projectinfo")
  384. //qu.Debug("buyer---", buyer)
  385. ent, _ := MixMgo.FindOne(CollEnt, bson.M{"buyer_name": buyer})
  386. if len(*ent) > 0 && (*ent)["buyerclass"] != nil {
  387. arr := (*ent)["buyerclass"].(primitive.A)
  388. if len(arr) == 1 {
  389. pro["buyerclass"] = arr
  390. } else {
  391. var arrTmp []string
  392. for _, v := range arr {
  393. val := qu.ObjToString(v)
  394. if val != "其它" {
  395. arrTmp = append(arrTmp, val)
  396. }
  397. }
  398. pro["buyerclass"] = arrTmp
  399. }
  400. }
  401. //qu.Debug("buyerclass---", pro["buyerclass"])
  402. category := GetCategory(pro)
  403. //qu.Debug("category---", category)
  404. if category == "" {
  405. return
  406. }
  407. q := bson.M{
  408. "category": category,
  409. "stage": bson.M{"$in": Forecast[stage]},
  410. }
  411. //qu.Debug("q----", q)
  412. result, _ := MixMgo.Find(CollTag, q, nil, nil, false, -1, -1)
  413. //qu.Debug("result---", *result)
  414. maps := []map[string]interface{}{}
  415. for _, t := range *result {
  416. // if len(t) == 0 {
  417. // continue
  418. // }
  419. r := make(map[string]interface{})
  420. r["stage"] = t["stage"]
  421. r["purchase_classify"] = t["purchase_classify"]
  422. r["purchasing"] = t["purchasing"]
  423. r["p_rate"] = Rate
  424. r["time"] = ""
  425. //tmp["p_projects"] = "" 暂无该字段
  426. maps = append(maps, r)
  427. }
  428. if len(maps) > 0 {
  429. pro["results"] = maps
  430. }
  431. //qu.Debug("pro---", pro)
  432. MgoSaveCache <- pro
  433. }(tmp)
  434. tmp = make(map[string]interface{})
  435. }
  436. wg.Wait()
  437. log.Println("Run Over...Count:", sum)
  438. }*/