task.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. package main
  2. import (
  3. "context"
  4. "data_project_information/config"
  5. "encoding/json"
  6. "fmt"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/gogf/gf/v2/util/gconv"
  11. es "github.com/olivere/elastic/v7"
  12. "go.mongodb.org/mongo-driver/bson"
  13. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  14. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  15. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  16. )
  17. //var piciLock sync.Mutex
  18. func task() {
  19. client := Es.GetEsConn()
  20. defer Es.DestoryEsConn(client)
  21. wg := &sync.WaitGroup{}
  22. tf := []int64{1577808000, 1609430400, 1640966400, 1672502400, 1712851200}
  23. fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set", "tag_topinformation_ai", "tag_subinformation_ai", "property_form") // 查询字段
  24. for i, tm := range tf {
  25. query := es.NewBoolQuery().
  26. //Must(es.NewMatchQuery("tag_topinformation", "情报_物业"))
  27. Must(es.NewExistsQuery("property_form"))
  28. if i == 0 {
  29. query.Must(es.NewRangeQuery("comeintime").Lte(tm))
  30. } else if i == 1 {
  31. query.Must(es.NewRangeQuery("comeintime").Gte(tf[0]).Lte(tm))
  32. } else if i == 2 {
  33. query.Must(es.NewRangeQuery("comeintime").Gte(tf[1]).Lte(tm))
  34. } else if i == 3 {
  35. query.Must(es.NewRangeQuery("comeintime").Gte(tf[2]).Lte(tm))
  36. } else {
  37. query.Must(es.NewRangeQuery("comeintime").Lte(tf[3]))
  38. }
  39. util.Debug(fmt.Sprintf("第%d次查询,数据量为:%d", i+1, Es.Count("bidding", query)))
  40. countDocs := 0
  41. res, err := client.Scroll().Index("bidding").Query(query).FetchSourceContext(fsc).Scroll("5m").Size(2000).Do(context.Background()) //查询一条获取游标
  42. if err == nil {
  43. taskInfoA(res, wg, &countDocs)
  44. scrollId := res.ScrollId
  45. for {
  46. searchResult, err := client.Scroll("1m").Index("bidding").ScrollId(scrollId).Size(2000).Do(context.TODO()) //查询
  47. if err != nil {
  48. util.Debug("Es Search Data Error:", err.Error())
  49. break
  50. }
  51. taskInfoA(searchResult, wg, &countDocs)
  52. scrollId = searchResult.ScrollId
  53. }
  54. wg.Wait()
  55. util.Debug(fmt.Sprintf("第%d次处理结束,处理文档%d条", i+1, countDocs))
  56. _, _ = client.ClearScroll().ScrollId(scrollId).Do(context.Background()) //清理游标
  57. } else {
  58. util.Debug(err)
  59. }
  60. }
  61. }
  62. func taskOld() {
  63. query := es.NewBoolQuery().
  64. Should(es.NewExistsQuery("tag_topinformation"), es.NewExistsQuery("tag_topinformation_ai")).
  65. MinimumNumberShouldMatch(1)
  66. query.Must(es.NewRangeQuery("pici").Gte(1735801800).Lte(1741244975))
  67. util.Debug(fmt.Sprintf("数据量为:%d", Es.Count("bidding", query)))
  68. client := Es.GetEsConn()
  69. defer Es.DestoryEsConn(client)
  70. wg := &sync.WaitGroup{}
  71. fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set", "tag_topinformation_ai", "tag_subinformation_ai", "property_form", "pici") // 查询字段
  72. countDocs := 0
  73. res, err := client.Scroll().Index("bidding").Query(query).FetchSourceContext(fsc).Scroll("5m").Size(2000).Do(context.Background()) //查询一条获取游标
  74. if err == nil {
  75. taskInfoA(res, wg, &countDocs)
  76. scrollId := res.ScrollId
  77. for {
  78. searchResult, err := client.Scroll("1m").Index("bidding").ScrollId(scrollId).Size(2000).Do(context.TODO()) //查询
  79. if err != nil {
  80. util.Debug("Es Search Data Error:", err.Error())
  81. break
  82. }
  83. taskInfoA(searchResult, wg, &countDocs)
  84. scrollId = searchResult.ScrollId
  85. }
  86. wg.Wait()
  87. util.Debug(fmt.Sprintf("处理结束,处理文档%d条", countDocs))
  88. _, _ = client.ClearScroll().ScrollId(scrollId).Do(context.Background()) //清理游标
  89. } else {
  90. util.Debug(err)
  91. }
  92. }
  93. func taskAdd() {
  94. client := Es.GetEsConn()
  95. defer Es.DestoryEsConn(client)
  96. wg := &sync.WaitGroup{}
  97. fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set", "tag_topinformation_ai", "tag_subinformation_ai", "property_form", "pici") // 查询字段
  98. query := es.NewBoolQuery().
  99. // Should(es.NewExistsQuery("tag_topinformation")).
  100. // Should(es.NewExistsQuery("tag_topinformation_ai")).
  101. Should(es.NewExistsQuery("tag_topinformation"), es.NewExistsQuery("tag_topinformation_ai")).
  102. MinimumNumberShouldMatch(1)
  103. pici := time.Now().Unix()
  104. start := pici - 3600*2
  105. end := pici - 3600
  106. util.Debug(start, end)
  107. //if config.Conf.Serve.Pici > 0 {
  108. //query.Must(es.NewRangeQuery("pici").Gte(config.Conf.Serve.Pici).Lte(config.Conf.Serve.Pici))
  109. //}
  110. query.Must(es.NewRangeQuery("pici").Gte(start).Lte(end))
  111. //query.Must(es.NewTermQuery("_id", "673fc96ab25c3e1deb743862"))
  112. util.Debug(fmt.Sprintf("数据量为:%d", Es.Count("bidding", query)))
  113. countDocs := 0
  114. res, err := client.Scroll().Index("bidding").Query(query).FetchSourceContext(fsc).Scroll("5m").Size(2000).Do(context.Background()) //查询一条获取游标
  115. if err == nil {
  116. taskInfoA(res, wg, &countDocs)
  117. scrollId := res.ScrollId
  118. for {
  119. searchResult, err := client.Scroll("1m").Index("bidding").ScrollId(scrollId).Size(2000).Do(context.TODO()) //查询
  120. if err != nil {
  121. util.Debug("Es Search Data Error:", err.Error())
  122. break
  123. }
  124. taskInfoA(searchResult, wg, &countDocs)
  125. scrollId = searchResult.ScrollId
  126. }
  127. wg.Wait()
  128. util.Debug(fmt.Sprintf("处理结束,处理文档%d条", countDocs))
  129. util.Debug(config.Conf.Serve.Pici)
  130. _, _ = client.ClearScroll().ScrollId(scrollId).Do(context.Background()) //清理游标
  131. } else {
  132. util.Debug(err)
  133. }
  134. }
  135. func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
  136. chd := make(chan bool, 5)
  137. for _, hit := range searchResult.Hits.Hits {
  138. //开始处理数据
  139. wg.Add(1)
  140. chd <- true
  141. go func(tmpHit *es.SearchHit) {
  142. defer func() {
  143. <-chd
  144. wg.Done()
  145. }()
  146. tmp := make(map[string]interface{})
  147. if json.Unmarshal(tmpHit.Source, &tmp) == nil {
  148. //piciLock.Lock()
  149. //if util.Int64All(tmp["pici"]) > config.Conf.Serve.Pici {
  150. // config.Conf.Serve.Pici = util.Int64All(tmp["pici"])
  151. //}
  152. //piciLock.Unlock()
  153. update := make(map[string]interface{})
  154. if tmp["tag_topinformation"] != nil {
  155. update["tag_topinformation"] = tmp["tag_topinformation"]
  156. }
  157. if tmp["tag_subinformation"] != nil {
  158. update["tag_subinformation"] = tmp["tag_subinformation"]
  159. }
  160. //if tmp["tag_topinformation_ai"] != nil {
  161. // update["tag_topinformation_ai"] = tmp["tag_topinformation_ai"]
  162. //}
  163. if tmp["tag_subinformation_ai"] != nil {
  164. update["tag_subinformation_ai"] = tmp["tag_subinformation_ai"]
  165. }
  166. if tmp["tag_set"] != nil {
  167. update["tag_set"] = tmp["tag_set"]
  168. }
  169. if tmp["property_form"] != nil {
  170. update["property_form"] = tmp["property_form"]
  171. }
  172. tag_topinformationArr := gconv.Strings(tmp["tag_topinformation"])
  173. tag_topinformation_aiArr := gconv.Strings(tmp["tag_topinformation_ai"])
  174. //数据合并去重
  175. tag_topinformation := uniqueMerge(tag_topinformation_aiArr, tag_topinformationArr)
  176. if len(tag_topinformation) > 0 {
  177. update["tag_topinformation"] = tag_topinformation
  178. }
  179. //没有标签时
  180. //if tmp["tag_topinformation"] == nil {
  181. //
  182. //updatePool <- []map[string]interface{}{
  183. // {"ids": tmpHit.Id},
  184. // {"$set": update, "$addToSet": bson.M{"tag_information_ids": tmpHit.Id},
  185. // "$unset": bson.M{"tag_topinformation": 1, "tag_subinformation": 1}},
  186. //}
  187. ////因为项目删除了tag_topinformation,tag_subinformation字段,所以索引也同步删除
  188. //pid := getPidByIds(tmpHit.Id)
  189. //if pid != "" {
  190. // update["tag_topinformation"] = []string{}
  191. // update["tag_subinformation"] = []string{}
  192. // updateEsPool <- []map[string]interface{}{
  193. // {"_id": pid},
  194. // update,
  195. // }
  196. //}
  197. //} else {
  198. if len(update) > 0 {
  199. //fmt.Println(tmpHit.Id, update)
  200. updatePool <- []map[string]interface{}{
  201. {"ids": tmpHit.Id},
  202. {"$set": update, "$addToSet": bson.M{"tag_information_ids": tmpHit.Id}},
  203. }
  204. //修改es
  205. pid := getPidByIds(tmpHit.Id)
  206. if pid != "" {
  207. update["id"] = pid
  208. updateEsPool <- []map[string]interface{}{
  209. {"_id": pid},
  210. update,
  211. }
  212. }
  213. }
  214. //}
  215. }
  216. }(hit)
  217. *countDocs += 1
  218. if *countDocs%10000 == 0 {
  219. util.Debug("Current:", *countDocs)
  220. }
  221. }
  222. }
  223. func taskT() {
  224. sess := Mgo.GetMgoConn()
  225. defer Mgo.DestoryMongoConn(sess)
  226. ch := make(chan bool, 5)
  227. wg := &sync.WaitGroup{}
  228. q := bson.M{"firsttime": bson.M{"$gte": 1711360551}}
  229. query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(q).Iter()
  230. count := 0
  231. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  232. if count%20000 == 0 {
  233. log.Info(fmt.Sprintf("current --- %d", count))
  234. }
  235. ch <- true
  236. wg.Add(1)
  237. //
  238. go func(tmp map[string]interface{}) {
  239. defer func() {
  240. <-ch
  241. wg.Done()
  242. }()
  243. if tmp["tag_topinformation"] != nil && tmp["tag_subinformation"] != nil {
  244. taskinfotA(tmp)
  245. }
  246. }(tmp)
  247. tmp = make(map[string]interface{})
  248. }
  249. wg.Wait()
  250. log.Info(fmt.Sprintf("over --- %d", count))
  251. }
  var (
	// YeTai maps property-form codes (the comma-separated values in
	// tag_set.wuye.property_form) to display labels.
	YeTai = map[string]string{
		"21": "住宅",
		"22": "政府办公楼",
		"23": "学校",
		"24": "医院",
		"25": "产业园区",
		"26": "旅游景区",
		"27": "交通运输",
		"28": "商务办公楼",
		"29": "酒店",
	}

	// ZhouQi maps contract-period codes (tag_set.wuye.period) to display
	// labels. getDate converts codes 12-15 into year offsets; 11 and 16 have
	// no fixed length there.
	ZhouQi = map[int]string{
		11: "1年以下",
		12: "1年",
		13: "2年",
		14: "3年",
		15: "5年",
		16: "其他",
	}
)
  271. func taskinfotA(tmp map[string]interface{}) {
  272. save := make(map[string]interface{})
  273. pid := mongodb.BsonIdToSId(tmp["_id"])
  274. ids := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
  275. save["source_id"] = pid
  276. save["project_name"] = util.ObjToString(tmp["projectname"])
  277. save["area"] = tmp["area"]
  278. save["city"] = tmp["city"]
  279. save["district"] = tmp["district"]
  280. save["budget"] = tmp["budget"]
  281. save["bidamount"] = tmp["bidamount"]
  282. save["first_url"] = fmt.Sprintf("https://www.jianyu360.cn/article/content/%s.html", util.CommonEncodeArticle("content", ids[0]))
  283. save["business_label"] = getStr(util.ObjToString(tmp["buyerclass"]))
  284. //bt := util.ObjToString(tmp["bidtype"])
  285. bs := util.ObjToString(tmp["bidstatus"])
  286. if bs == "中标" || bs == "成交" || bs == "合同" {
  287. save["bidstatus"] = 1
  288. } else if bs == "废标" || bs == "流标" {
  289. save["bidstatus"] = 0
  290. } else {
  291. save["bidstatus"] = 2
  292. }
  293. if tmp["firsttime"] != nil && util.IntAll(tmp["firsttime"]) > 0 {
  294. t := util.Int64All(tmp["firsttime"])
  295. save["publish_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  296. }
  297. if util.IntAll(save["bidstatus"]) == 1 && util.IntAll(tmp["jgtime"]) > 0 {
  298. t := util.Int64All(tmp["jgtime"])
  299. save["result_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  300. }
  301. // 补充结果时间
  302. //if util.Int64All(save["result_time"]) <= 0 && util.IntAll(save["bidstatus"]) == 1 {
  303. // t := util.Int64All(tmp["lasttime"])
  304. // if t > 0 {
  305. // save["result_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  306. // }
  307. //}
  308. save["create_time"] = time.Now().Format(util.Date_Full_Layout)
  309. save["update_time"] = time.Now().Format(util.Date_Full_Layout)
  310. for _, id := range ids {
  311. if info := CkhTool.FindOne("information", bson.M{"datajson_id": id}, "starttime,endtime,datajson_expiredate", ""); info != nil && len(*info) > 0 {
  312. if (*info)["starttime"] != nil && util.IntAll((*info)["starttime"]) > 0 {
  313. t := util.Int64All((*info)["starttime"])
  314. save["contract_cycle_start_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  315. }
  316. if (*info)["datajson_expiredate"] != nil && util.IntAll((*info)["datajson_expiredate"]) > 0 {
  317. t := util.Int64All((*info)["datajson_expiredate"])
  318. save["contract_cycle_end_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  319. }
  320. if (*info)["endtime"] != nil && util.IntAll((*info)["endtime"]) > 0 && save["contract_cycle_end_time"] == nil {
  321. t := util.Int64All((*info)["endtime"])
  322. save["contract_cycle_end_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  323. }
  324. break
  325. }
  326. }
  327. // 行业标签
  328. v1 := util.ObjArrToStringArr(tmp["tag_subinformation"].([]interface{}))
  329. for _, sub := range v1 {
  330. MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": sub, "stype": 1})
  331. }
  332. save["industry_label"] = strings.Join(v1, ",")
  333. // 物业业态
  334. if tagset, ok := tmp["tag_set"].(map[string]interface{}); ok {
  335. if wuye, o1 := tagset["wuye"].(map[string]interface{}); o1 {
  336. if yt := util.ObjToString(wuye["property_form"]); yt != "" {
  337. var v2 []string
  338. for _, s := range strings.Split(yt, ",") {
  339. if s1 := YeTai[s]; s1 != "" {
  340. v2 = append(v2, s1)
  341. MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": s1, "stype": 2})
  342. }
  343. }
  344. save["activities_label"] = strings.Join(v2, ",")
  345. }
  346. if p := util.IntAll(wuye["period"]); p != 0 && ZhouQi[p] != "" {
  347. save["contract_cycle"] = ZhouQi[p]
  348. if util.IntAll(save["bidstatus"]) == 1 {
  349. if util.IntAll(tmp["jgtime"]) > 0 {
  350. if et := getDate(p, util.Int64All(tmp["jgtime"])); et > 0 {
  351. save["next_bid_time"] = util.FormatDateByInt64(&et, util.Date_Full_Layout)
  352. }
  353. } else if t := getDate(p, util.Int64All(tmp["lasttime"])); t > 0 {
  354. if t > 0 {
  355. save["next_bid_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  356. }
  357. }
  358. }
  359. }
  360. }
  361. }
  362. // 2023年之后的数据
  363. if util.IntAll(save["bidstatus"]) == 1 {
  364. if save["next_bid_time"] == nil && util.Int64All(tmp["jgtime"]) > 1672502400 {
  365. st, _ := time.Parse(time.DateTime, util.ObjToString(save["contract_cycle_start_time"]))
  366. et, _ := time.Parse(time.DateTime, util.ObjToString(save["contract_cycle_end_time"]))
  367. if t := et.Unix() - st.Unix(); t < 90*24*60*60 {
  368. if tmp["jgtime"] != nil && util.IntAll(tmp["jgtime"]) > 0 {
  369. if et1 := getDate(12, util.Int64All(tmp["jgtime"])); et1 > 0 {
  370. save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
  371. }
  372. } else {
  373. if t1 := util.Int64All(tmp["lasttime"]); t1 > 0 {
  374. if et1 := getDate(12, util.Int64All(tmp["lasttime"])); et1 > 0 {
  375. save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
  376. }
  377. }
  378. }
  379. } else {
  380. if t1 := util.Int64All(tmp["jgtime"]); t1 > 0 {
  381. et1 := t1 + t
  382. save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
  383. } else {
  384. if t2 := util.Int64All(tmp["lasttime"]); t2 > 0 {
  385. et1 := t2 + t
  386. save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
  387. }
  388. }
  389. }
  390. }
  391. // 2023年之后的数据,无合约周期默认 1年
  392. if save["next_bid_time"] == nil && util.Int64All(save["jgtime"]) > 1672502400 {
  393. if et := getDate(12, util.Int64All(tmp["jgtime"])); et > 0 {
  394. save["next_bid_time"] = util.FormatDateByInt64(&et, util.Date_Full_Layout)
  395. }
  396. //else {
  397. // if t := util.Int64All(tmp["lasttime"]); t > 0 {
  398. // if et := getDate(12, util.Int64All(tmp["lasttime"])); et > 0 {
  399. // save["next_bid_time"] = util.FormatDateByInt64(&et, util.Date_Full_Layout)
  400. // }
  401. // }
  402. //}
  403. }
  404. }
  405. // 中标单位
  406. for _, w := range strings.Split(util.ObjToString(tmp["s_winner"]), ",") {
  407. if w != "" {
  408. MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": w, "stype": 3})
  409. }
  410. }
  411. save["winner"] = util.ObjToString(tmp["s_winner"])
  412. savePool <- save
  413. }
  // getStr maps a buyer-class name b to a coarse business label: 政府机构,
// 医疗单位, 教育单位, 金融企业 or 商业公司. It returns "" for an empty b or
// when no category matches. Categories are checked in order, and the first
// match wins.
//
// NOTE(review): matching is strings.Contains(categoryList, b), so b only has to
// be a substring of a category list. For example, "政" or "|" also map to
// 政府机构. This is kept as-is to preserve behaviour; confirm whether an exact
// match was intended.
func getStr(b string) string {
	if b == "" {
		return ""
	}
	categories := []struct {
		members string
		label   string
	}{
		{"(交通|运输物流|工信|农业|住建|城管|市政|出版广电|检察院|科技|民政|生态环境|市场监管|水利|应急管理|自然资源|财政|档案|党委办|组织|发改|宣传|政府办|政务中心|人大|政协|法院|公安|国资委|海关|机关事务|纪委|军队|人社|商务|审计税务|司法|体育|统计|统战|文旅|民宗|银保监|证监|气象|社会团体|公共资源交易)", "政府机构"},
		{"(卫健委|医疗)", "医疗单位"},
		{"(教育|学校)", "教育单位"},
		{"(人行|金融业)", "金融企业"},
		{"(信息技术|电信行业|农林牧渔|建筑业|传媒|制造业|住宿餐饮|采矿业|能源化工|批发零售)", "商业公司"},
	}
	for _, c := range categories {
		if strings.Contains(c.members, b) {
			return c.label
		}
	}
	return ""
}
  // getDate shifts stime (unix seconds) by the contract period coded by p
// (see ZhouQi), using fixed 365-day years: 12 -> 1, 13 -> 2, 14 -> 3 and
// 15 -> 5 years. Every other code, including 11 and 16, returns 0, which
// callers treat as "no date".
func getDate(p int, stime int64) int64 {
	const secondsPerYear = int64(365 * 24 * 60 * 60)
	var years int64
	switch p {
	case 12:
		years = 1
	case 13:
		years = 2
	case 14:
		years = 3
	case 15:
		years = 5
	default:
		return 0
	}
	return stime + years*secondsPerYear
}
  449. // 根据ids获取项目id
  450. func getPidByIds(ids string) string {
  451. data, ok := Mgo.FindOne(config.Conf.DB.Mongo.Coll, map[string]interface{}{"ids": ids})
  452. if ok && data != nil && len(*data) > 0 {
  453. return mongodb.BsonIdToSId((*data)["_id"])
  454. }
  455. return ""
  456. }
  // uniqueMerge concatenates arr1 and arr2 and drops repeated values. Each value
// stays at the position of its first occurrence, with all of arr1 ordered
// before arr2. The result is never nil; it is an empty slice when both inputs
// are empty.
func uniqueMerge(arr1, arr2 []string) []string {
	seen := make(map[string]struct{}, len(arr1)+len(arr2))
	merged := make([]string, 0, len(arr1)+len(arr2))
	for _, src := range [][]string{arr1, arr2} {
		for _, s := range src {
			if _, dup := seen[s]; dup {
				continue
			}
			seen[s] = struct{}{}
			merged = append(merged, s)
		}
	}
	return merged
}