task.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. package main
  2. import (
  3. "context"
  4. "data_project_information/config"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/gogf/gf/v2/util/gconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. es "github.com/olivere/elastic/v7"
  12. "go.mongodb.org/mongo-driver/bson"
  13. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  14. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  15. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  16. )
  17. //var piciLock sync.Mutex
  18. func task() {
  19. client := Es.GetEsConn()
  20. defer Es.DestoryEsConn(client)
  21. wg := &sync.WaitGroup{}
  22. tf := []int64{1577808000, 1609430400, 1640966400, 1672502400, 1712851200}
  23. fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set", "tag_topinformation_ai", "tag_subinformation_ai", "property_form") // 查询字段
  24. for i, tm := range tf {
  25. query := es.NewBoolQuery().
  26. //Must(es.NewMatchQuery("tag_topinformation", "情报_物业"))
  27. Must(es.NewExistsQuery("property_form"))
  28. if i == 0 {
  29. query.Must(es.NewRangeQuery("comeintime").Lte(tm))
  30. } else if i == 1 {
  31. query.Must(es.NewRangeQuery("comeintime").Gte(tf[0]).Lte(tm))
  32. } else if i == 2 {
  33. query.Must(es.NewRangeQuery("comeintime").Gte(tf[1]).Lte(tm))
  34. } else if i == 3 {
  35. query.Must(es.NewRangeQuery("comeintime").Gte(tf[2]).Lte(tm))
  36. } else {
  37. query.Must(es.NewRangeQuery("comeintime").Lte(tf[3]))
  38. }
  39. util.Debug(fmt.Sprintf("第%d次查询,数据量为:%d", i+1, Es.Count("bidding", query)))
  40. countDocs := 0
  41. res, err := client.Scroll().Index("bidding").Query(query).FetchSourceContext(fsc).Scroll("5m").Size(2000).Do(context.Background()) //查询一条获取游标
  42. if err == nil {
  43. taskInfoA(res, wg, &countDocs)
  44. scrollId := res.ScrollId
  45. for {
  46. searchResult, err := client.Scroll("1m").Index("bidding").ScrollId(scrollId).Size(2000).Do(context.TODO()) //查询
  47. if err != nil {
  48. util.Debug("Es Search Data Error:", err.Error())
  49. break
  50. }
  51. taskInfoA(searchResult, wg, &countDocs)
  52. scrollId = searchResult.ScrollId
  53. }
  54. wg.Wait()
  55. util.Debug(fmt.Sprintf("第%d次处理结束,处理文档%d条", i+1, countDocs))
  56. _, _ = client.ClearScroll().ScrollId(scrollId).Do(context.Background()) //清理游标
  57. } else {
  58. util.Debug(err)
  59. }
  60. }
  61. }
  62. func taskAdd() {
  63. client := Es.GetEsConn()
  64. defer Es.DestoryEsConn(client)
  65. wg := &sync.WaitGroup{}
  66. fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set", "tag_topinformation_ai", "tag_subinformation_ai", "property_form", "pici") // 查询字段
  67. query := es.NewBoolQuery().
  68. // Should(es.NewExistsQuery("tag_topinformation")).
  69. // Should(es.NewExistsQuery("tag_topinformation_ai")).
  70. Should(es.NewExistsQuery("tag_topinformation"), es.NewExistsQuery("tag_topinformation_ai")).
  71. MinimumNumberShouldMatch(1)
  72. pici := time.Now().Unix()
  73. start := pici - 3600*2
  74. end := pici - 3600
  75. util.Debug(start, end)
  76. //if config.Conf.Serve.Pici > 0 {
  77. //query.Must(es.NewRangeQuery("pici").Gte(config.Conf.Serve.Pici).Lte(config.Conf.Serve.Pici))
  78. //}
  79. query.Must(es.NewRangeQuery("pici").Gte(start).Lte(end))
  80. //query.Must(es.NewTermQuery("_id", "673fc96ab25c3e1deb743862"))
  81. util.Debug(fmt.Sprintf("数据量为:%d", Es.Count("bidding_ai", query)))
  82. countDocs := 0
  83. res, err := client.Scroll().Index("bidding_ai").Query(query).FetchSourceContext(fsc).Scroll("5m").Size(2000).Do(context.Background()) //查询一条获取游标
  84. if err == nil {
  85. taskInfoA(res, wg, &countDocs)
  86. scrollId := res.ScrollId
  87. for {
  88. searchResult, err := client.Scroll("1m").Index("bidding_ai").ScrollId(scrollId).Size(2000).Do(context.TODO()) //查询
  89. if err != nil {
  90. util.Debug("Es Search Data Error:", err.Error())
  91. break
  92. }
  93. taskInfoA(searchResult, wg, &countDocs)
  94. scrollId = searchResult.ScrollId
  95. }
  96. wg.Wait()
  97. util.Debug(fmt.Sprintf("处理结束,处理文档%d条", countDocs))
  98. util.Debug(config.Conf.Serve.Pici)
  99. _, _ = client.ClearScroll().ScrollId(scrollId).Do(context.Background()) //清理游标
  100. } else {
  101. util.Debug(err)
  102. }
  103. }
  104. func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
  105. chd := make(chan bool, 5)
  106. for _, hit := range searchResult.Hits.Hits {
  107. //开始处理数据
  108. wg.Add(1)
  109. chd <- true
  110. go func(tmpHit *es.SearchHit) {
  111. defer func() {
  112. <-chd
  113. wg.Done()
  114. }()
  115. tmp := make(map[string]interface{})
  116. if json.Unmarshal(tmpHit.Source, &tmp) == nil {
  117. //piciLock.Lock()
  118. //if util.Int64All(tmp["pici"]) > config.Conf.Serve.Pici {
  119. // config.Conf.Serve.Pici = util.Int64All(tmp["pici"])
  120. //}
  121. //piciLock.Unlock()
  122. update := make(map[string]interface{})
  123. if tmp["tag_topinformation"] != nil {
  124. update["tag_topinformation"] = tmp["tag_topinformation"]
  125. }
  126. if tmp["tag_subinformation"] != nil {
  127. update["tag_subinformation"] = tmp["tag_subinformation"]
  128. }
  129. //if tmp["tag_topinformation_ai"] != nil {
  130. // update["tag_topinformation_ai"] = tmp["tag_topinformation_ai"]
  131. //}
  132. if tmp["tag_subinformation_ai"] != nil {
  133. update["tag_subinformation_ai"] = tmp["tag_subinformation_ai"]
  134. }
  135. if tmp["tag_set"] != nil {
  136. update["tag_set"] = tmp["tag_set"]
  137. }
  138. if tmp["property_form"] != nil {
  139. update["property_form"] = tmp["property_form"]
  140. }
  141. tag_topinformationArr := gconv.Strings(tmp["tag_topinformation"])
  142. tag_topinformation_aiArr := gconv.Strings(tmp["tag_topinformation_ai"])
  143. //数据合并去重
  144. tag_topinformation := uniqueMerge(tag_topinformation_aiArr, tag_topinformationArr)
  145. if len(tag_topinformation) > 0 {
  146. update["tag_topinformation"] = tag_topinformation
  147. }
  148. //没有标签时
  149. //if tmp["tag_topinformation"] == nil {
  150. //
  151. //updatePool <- []map[string]interface{}{
  152. // {"ids": tmpHit.Id},
  153. // {"$set": update, "$addToSet": bson.M{"tag_information_ids": tmpHit.Id},
  154. // "$unset": bson.M{"tag_topinformation": 1, "tag_subinformation": 1}},
  155. //}
  156. ////因为项目删除了tag_topinformation,tag_subinformation字段,所以索引也同步删除
  157. //pid := getPidByIds(tmpHit.Id)
  158. //if pid != "" {
  159. // update["tag_topinformation"] = []string{}
  160. // update["tag_subinformation"] = []string{}
  161. // updateEsPool <- []map[string]interface{}{
  162. // {"_id": pid},
  163. // update,
  164. // }
  165. //}
  166. //} else {
  167. if len(update) > 0 {
  168. //fmt.Println(tmpHit.Id, update)
  169. updatePool <- []map[string]interface{}{
  170. {"ids": tmpHit.Id},
  171. {"$set": update, "$addToSet": bson.M{"tag_information_ids": tmpHit.Id}},
  172. }
  173. //修改es
  174. pid := getPidByIds(tmpHit.Id)
  175. if pid != "" {
  176. update["id"] = pid
  177. updateEsPool <- []map[string]interface{}{
  178. {"_id": pid},
  179. update,
  180. }
  181. }
  182. }
  183. //}
  184. }
  185. }(hit)
  186. *countDocs += 1
  187. if *countDocs%10000 == 0 {
  188. util.Debug("Current:", *countDocs)
  189. }
  190. }
  191. }
  192. func taskT() {
  193. sess := Mgo.GetMgoConn()
  194. defer Mgo.DestoryMongoConn(sess)
  195. ch := make(chan bool, 5)
  196. wg := &sync.WaitGroup{}
  197. q := bson.M{"firsttime": bson.M{"$gte": 1711360551}}
  198. query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(q).Iter()
  199. count := 0
  200. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  201. if count%20000 == 0 {
  202. log.Info(fmt.Sprintf("current --- %d", count))
  203. }
  204. ch <- true
  205. wg.Add(1)
  206. //
  207. go func(tmp map[string]interface{}) {
  208. defer func() {
  209. <-ch
  210. wg.Done()
  211. }()
  212. if tmp["tag_topinformation"] != nil && tmp["tag_subinformation"] != nil {
  213. taskinfotA(tmp)
  214. }
  215. }(tmp)
  216. tmp = make(map[string]interface{})
  217. }
  218. wg.Wait()
  219. log.Info(fmt.Sprintf("over --- %d", count))
  220. }
  var (
  	// YeTai translates the comma-separated property-form codes found in
  	// tag_set.wuye.property_form into their display names (物业业态).
  	YeTai = map[string]string{
  		"21": "住宅",
  		"22": "政府办公楼",
  		"23": "学校",
  		"24": "医院",
  		"25": "产业园区",
  		"26": "旅游景区",
  		"27": "交通运输",
  		"28": "商务办公楼",
  		"29": "酒店",
  	}
  
  	// ZhouQi translates the contract-period code in tag_set.wuye.period into
  	// its display label (合同周期).
  	ZhouQi = map[int]string{
  		11: "1年以下",
  		12: "1年",
  		13: "2年",
  		14: "3年",
  		15: "5年",
  		16: "其他",
  	}
  )
  240. func taskinfotA(tmp map[string]interface{}) {
  241. save := make(map[string]interface{})
  242. pid := mongodb.BsonIdToSId(tmp["_id"])
  243. ids := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
  244. save["source_id"] = pid
  245. save["project_name"] = util.ObjToString(tmp["projectname"])
  246. save["area"] = tmp["area"]
  247. save["city"] = tmp["city"]
  248. save["district"] = tmp["district"]
  249. save["budget"] = tmp["budget"]
  250. save["bidamount"] = tmp["bidamount"]
  251. save["first_url"] = fmt.Sprintf("https://www.jianyu360.cn/article/content/%s.html", util.CommonEncodeArticle("content", ids[0]))
  252. save["business_label"] = getStr(util.ObjToString(tmp["buyerclass"]))
  253. //bt := util.ObjToString(tmp["bidtype"])
  254. bs := util.ObjToString(tmp["bidstatus"])
  255. if bs == "中标" || bs == "成交" || bs == "合同" {
  256. save["bidstatus"] = 1
  257. } else if bs == "废标" || bs == "流标" {
  258. save["bidstatus"] = 0
  259. } else {
  260. save["bidstatus"] = 2
  261. }
  262. if tmp["firsttime"] != nil && util.IntAll(tmp["firsttime"]) > 0 {
  263. t := util.Int64All(tmp["firsttime"])
  264. save["publish_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  265. }
  266. if util.IntAll(save["bidstatus"]) == 1 && util.IntAll(tmp["jgtime"]) > 0 {
  267. t := util.Int64All(tmp["jgtime"])
  268. save["result_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  269. }
  270. // 补充结果时间
  271. //if util.Int64All(save["result_time"]) <= 0 && util.IntAll(save["bidstatus"]) == 1 {
  272. // t := util.Int64All(tmp["lasttime"])
  273. // if t > 0 {
  274. // save["result_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  275. // }
  276. //}
  277. save["create_time"] = time.Now().Format(util.Date_Full_Layout)
  278. save["update_time"] = time.Now().Format(util.Date_Full_Layout)
  279. for _, id := range ids {
  280. if info := CkhTool.FindOne("information", bson.M{"datajson_id": id}, "starttime,endtime,datajson_expiredate", ""); info != nil && len(*info) > 0 {
  281. if (*info)["starttime"] != nil && util.IntAll((*info)["starttime"]) > 0 {
  282. t := util.Int64All((*info)["starttime"])
  283. save["contract_cycle_start_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  284. }
  285. if (*info)["datajson_expiredate"] != nil && util.IntAll((*info)["datajson_expiredate"]) > 0 {
  286. t := util.Int64All((*info)["datajson_expiredate"])
  287. save["contract_cycle_end_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  288. }
  289. if (*info)["endtime"] != nil && util.IntAll((*info)["endtime"]) > 0 && save["contract_cycle_end_time"] == nil {
  290. t := util.Int64All((*info)["endtime"])
  291. save["contract_cycle_end_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  292. }
  293. break
  294. }
  295. }
  296. // 行业标签
  297. v1 := util.ObjArrToStringArr(tmp["tag_subinformation"].([]interface{}))
  298. for _, sub := range v1 {
  299. MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": sub, "stype": 1})
  300. }
  301. save["industry_label"] = strings.Join(v1, ",")
  302. // 物业业态
  303. if tagset, ok := tmp["tag_set"].(map[string]interface{}); ok {
  304. if wuye, o1 := tagset["wuye"].(map[string]interface{}); o1 {
  305. if yt := util.ObjToString(wuye["property_form"]); yt != "" {
  306. var v2 []string
  307. for _, s := range strings.Split(yt, ",") {
  308. if s1 := YeTai[s]; s1 != "" {
  309. v2 = append(v2, s1)
  310. MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": s1, "stype": 2})
  311. }
  312. }
  313. save["activities_label"] = strings.Join(v2, ",")
  314. }
  315. if p := util.IntAll(wuye["period"]); p != 0 && ZhouQi[p] != "" {
  316. save["contract_cycle"] = ZhouQi[p]
  317. if util.IntAll(save["bidstatus"]) == 1 {
  318. if util.IntAll(tmp["jgtime"]) > 0 {
  319. if et := getDate(p, util.Int64All(tmp["jgtime"])); et > 0 {
  320. save["next_bid_time"] = util.FormatDateByInt64(&et, util.Date_Full_Layout)
  321. }
  322. } else if t := getDate(p, util.Int64All(tmp["lasttime"])); t > 0 {
  323. if t > 0 {
  324. save["next_bid_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  325. }
  326. }
  327. }
  328. }
  329. }
  330. }
  331. // 2023年之后的数据
  332. if util.IntAll(save["bidstatus"]) == 1 {
  333. if save["next_bid_time"] == nil && util.Int64All(tmp["jgtime"]) > 1672502400 {
  334. st, _ := time.Parse(time.DateTime, util.ObjToString(save["contract_cycle_start_time"]))
  335. et, _ := time.Parse(time.DateTime, util.ObjToString(save["contract_cycle_end_time"]))
  336. if t := et.Unix() - st.Unix(); t < 90*24*60*60 {
  337. if tmp["jgtime"] != nil && util.IntAll(tmp["jgtime"]) > 0 {
  338. if et1 := getDate(12, util.Int64All(tmp["jgtime"])); et1 > 0 {
  339. save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
  340. }
  341. } else {
  342. if t1 := util.Int64All(tmp["lasttime"]); t1 > 0 {
  343. if et1 := getDate(12, util.Int64All(tmp["lasttime"])); et1 > 0 {
  344. save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
  345. }
  346. }
  347. }
  348. } else {
  349. if t1 := util.Int64All(tmp["jgtime"]); t1 > 0 {
  350. et1 := t1 + t
  351. save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
  352. } else {
  353. if t2 := util.Int64All(tmp["lasttime"]); t2 > 0 {
  354. et1 := t2 + t
  355. save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
  356. }
  357. }
  358. }
  359. }
  360. // 2023年之后的数据,无合约周期默认 1年
  361. if save["next_bid_time"] == nil && util.Int64All(save["jgtime"]) > 1672502400 {
  362. if et := getDate(12, util.Int64All(tmp["jgtime"])); et > 0 {
  363. save["next_bid_time"] = util.FormatDateByInt64(&et, util.Date_Full_Layout)
  364. }
  365. //else {
  366. // if t := util.Int64All(tmp["lasttime"]); t > 0 {
  367. // if et := getDate(12, util.Int64All(tmp["lasttime"])); et > 0 {
  368. // save["next_bid_time"] = util.FormatDateByInt64(&et, util.Date_Full_Layout)
  369. // }
  370. // }
  371. //}
  372. }
  373. }
  374. // 中标单位
  375. for _, w := range strings.Split(util.ObjToString(tmp["s_winner"]), ",") {
  376. if w != "" {
  377. MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": w, "stype": 3})
  378. }
  379. }
  380. save["winner"] = util.ObjToString(tmp["s_winner"])
  381. savePool <- save
  382. }
  // getStr maps a buyer class (buyerclass) to a coarse business label:
  // 政府机构, 医疗单位, 教育单位, 金融企业 or 商业公司. It returns "" for an empty
  // or unknown class.
  //
  // Each category is a "|"-separated list of class names. b matches when it is
  // a substring of one of those names, so "审计" and "税务" both hit "审计税务".
  // Categories are checked in the order listed and the first hit wins.
  // Matching per entry (instead of against the raw pattern text) keeps
  // delimiters such as "|" or cross-entry fragments from being classified.
  func getStr(b string) string {
  	if b == "" {
  		return ""
  	}
  	categories := []struct {
  		label   string
  		classes string
  	}{
  		{"政府机构", "交通|运输物流|工信|农业|住建|城管|市政|出版广电|检察院|科技|民政|生态环境|市场监管|水利|应急管理|自然资源|财政|档案|党委办|组织|发改|宣传|政府办|政务中心|人大|政协|法院|公安|国资委|海关|机关事务|纪委|军队|人社|商务|审计税务|司法|体育|统计|统战|文旅|民宗|银保监|证监|气象|社会团体|公共资源交易"},
  		{"医疗单位", "卫健委|医疗"},
  		{"教育单位", "教育|学校"},
  		{"金融企业", "人行|金融业"},
  		{"商业公司", "信息技术|电信行业|农林牧渔|建筑业|传媒|制造业|住宿餐饮|采矿业|能源化工|批发零售"},
  	}
  	for _, c := range categories {
  		for _, class := range strings.Split(c.classes, "|") {
  			if strings.Contains(class, b) {
  				return c.label
  			}
  		}
  	}
  	return ""
  }
  // getDate shifts stime forward by the contract length encoded in period
  // code p: 12, 13, 14 and 15 mean 1, 2, 3 and 5 years of 365 days. Codes
  // without a fixed length (11 "under one year", 16 "other") and unknown
  // codes yield 0.
  func getDate(p int, stime int64) int64 {
  	const secondsPerYear = 365 * 24 * 60 * 60
  	var years int64
  	switch p {
  	case 12:
  		years = 1
  	case 13:
  		years = 2
  	case 14:
  		years = 3
  	case 15:
  		years = 5
  	default:
  		return 0
  	}
  	return stime + years*secondsPerYear
  }
  418. // 根据ids获取项目id
  419. func getPidByIds(ids string) string {
  420. data, ok := Mgo.FindOne(config.Conf.DB.Mongo.Coll, map[string]interface{}{"ids": ids})
  421. if ok && data != nil && len(*data) > 0 {
  422. return mongodb.BsonIdToSId((*data)["_id"])
  423. }
  424. return ""
  425. }
  // uniqueMerge concatenates arr1 and arr2 and drops repeated values. The
  // first occurrence of each value is kept, in input order. The result is
  // never nil (empty inputs give an empty, non-nil slice).
  func uniqueMerge(arr1, arr2 []string) []string {
  	seen := make(map[string]struct{}, len(arr1)+len(arr2))
  	merged := make([]string, 0, len(arr1)+len(arr2))
  	for _, src := range [][]string{arr1, arr2} {
  		for _, s := range src {
  			if _, dup := seen[s]; dup {
  				continue
  			}
  			seen[s] = struct{}{}
  			merged = append(merged, s)
  		}
  	}
  	return merged
  }