task.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. package main
  import (
  	"context"
  	"encoding/json"
  	"errors"
  	"fmt"
  	"io"
  	"strings"
  	"sync"
  	"time"

  	"data_project_information/config"
  	es "github.com/olivere/elastic/v7"
  	"go.mongodb.org/mongo-driver/bson"
  	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  )
  var (
  	// piciLock serializes the compare-and-raise of config.Conf.Serve.Pici that
  	// taskInfoA's worker goroutines perform concurrently.
  	piciLock sync.Mutex
  )
  17. func task() {
  18. client := Es.GetEsConn()
  19. defer Es.DestoryEsConn(client)
  20. wg := &sync.WaitGroup{}
  21. tf := []int64{1577808000, 1609430400, 1640966400, 1672502400, 1712851200}
  22. fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set", "tag_topinformation_ai", "tag_subinformation_ai", "property_form") // 查询字段
  23. for i, tm := range tf {
  24. query := es.NewBoolQuery().
  25. //Must(es.NewMatchQuery("tag_topinformation", "情报_物业"))
  26. Must(es.NewExistsQuery("property_form"))
  27. if i == 0 {
  28. query.Must(es.NewRangeQuery("comeintime").Lte(tm))
  29. } else if i == 1 {
  30. query.Must(es.NewRangeQuery("comeintime").Gte(tf[0]).Lte(tm))
  31. } else if i == 2 {
  32. query.Must(es.NewRangeQuery("comeintime").Gte(tf[1]).Lte(tm))
  33. } else if i == 3 {
  34. query.Must(es.NewRangeQuery("comeintime").Gte(tf[2]).Lte(tm))
  35. } else {
  36. query.Must(es.NewRangeQuery("comeintime").Lte(tf[3]))
  37. }
  38. util.Debug(fmt.Sprintf("第%d次查询,数据量为:%d", i+1, Es.Count("bidding", query)))
  39. countDocs := 0
  40. res, err := client.Scroll().Index("bidding").Query(query).FetchSourceContext(fsc).Scroll("5m").Size(2000).Do(context.Background()) //查询一条获取游标
  41. if err == nil {
  42. taskInfoA(res, wg, &countDocs)
  43. scrollId := res.ScrollId
  44. for {
  45. searchResult, err := client.Scroll("1m").Index("bidding").ScrollId(scrollId).Size(2000).Do(context.TODO()) //查询
  46. if err != nil {
  47. util.Debug("Es Search Data Error:", err.Error())
  48. break
  49. }
  50. taskInfoA(searchResult, wg, &countDocs)
  51. scrollId = searchResult.ScrollId
  52. }
  53. wg.Wait()
  54. util.Debug(fmt.Sprintf("第%d次处理结束,处理文档%d条", i+1, countDocs))
  55. _, _ = client.ClearScroll().ScrollId(scrollId).Do(context.Background()) //清理游标
  56. } else {
  57. util.Debug(err)
  58. }
  59. }
  60. }
  61. func taskAdd() {
  62. client := Es.GetEsConn()
  63. defer Es.DestoryEsConn(client)
  64. wg := &sync.WaitGroup{}
  65. fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set", "tag_topinformation_ai", "tag_subinformation_ai", "property_form", "pici") // 查询字段
  66. query := es.NewBoolQuery().
  67. Should(es.NewTermQuery("tag_topinformation", "情报_物业")).
  68. Should(es.NewTermQuery("tag_topinformation_ai", "情报_物业")).MinimumShouldMatch("1")
  69. //Must(es.NewMatchQuery("tag_topinformation", "情报_物业"))
  70. //Must(es.NewExistsQuery("property_form"))
  71. if config.Conf.Serve.Pici > 0 {
  72. query.Must(es.NewRangeQuery("pici").Gte(config.Conf.Serve.Pici))
  73. }
  74. util.Debug(fmt.Sprintf("数据量为:%d", Es.Count("bidding", query)))
  75. countDocs := 0
  76. res, err := client.Scroll().Index("bidding").Query(query).FetchSourceContext(fsc).Scroll("5m").Size(2000).Do(context.Background()) //查询一条获取游标
  77. if err == nil {
  78. taskInfoA(res, wg, &countDocs)
  79. scrollId := res.ScrollId
  80. for {
  81. searchResult, err := client.Scroll("1m").Index("bidding").ScrollId(scrollId).Size(2000).Do(context.TODO()) //查询
  82. if err != nil {
  83. util.Debug("Es Search Data Error:", err.Error())
  84. break
  85. }
  86. taskInfoA(searchResult, wg, &countDocs)
  87. scrollId = searchResult.ScrollId
  88. }
  89. wg.Wait()
  90. util.Debug(fmt.Sprintf("处理结束,处理文档%d条", countDocs))
  91. util.Debug(config.Conf.Serve.Pici)
  92. _, _ = client.ClearScroll().ScrollId(scrollId).Do(context.Background()) //清理游标
  93. } else {
  94. util.Debug(err)
  95. }
  96. }
  97. func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
  98. chd := make(chan bool, 5)
  99. for _, hit := range searchResult.Hits.Hits {
  100. //开始处理数据
  101. wg.Add(1)
  102. chd <- true
  103. go func(tmpHit *es.SearchHit) {
  104. defer func() {
  105. <-chd
  106. wg.Done()
  107. }()
  108. tmp := make(map[string]interface{})
  109. if json.Unmarshal(tmpHit.Source, &tmp) == nil {
  110. piciLock.Lock()
  111. if util.Int64All(tmp["pici"]) > config.Conf.Serve.Pici {
  112. config.Conf.Serve.Pici = util.Int64All(tmp["pici"])
  113. }
  114. piciLock.Unlock()
  115. update := make(map[string]interface{})
  116. if tmp["tag_topinformation"] != nil {
  117. update["tag_topinformation"] = tmp["tag_topinformation"]
  118. }
  119. if tmp["tag_subinformation"] != nil {
  120. update["tag_subinformation"] = tmp["tag_subinformation"]
  121. }
  122. if tmp["tag_topinformation_ai"] != nil {
  123. update["tag_topinformation"] = tmp["tag_topinformation_ai"]
  124. }
  125. if tmp["tag_subinformation_ai"] != nil {
  126. update["tag_subinformation_ai"] = tmp["tag_subinformation_ai"]
  127. }
  128. if tmp["tag_set"] != nil {
  129. update["tag_set"] = tmp["tag_set"]
  130. }
  131. if tmp["property_form"] != nil {
  132. update["property_form"] = tmp["property_form"]
  133. }
  134. //updatePool <- []map[string]interface{}{
  135. // {"ids": tmpHit.Id},
  136. // {"$set": update},
  137. //}
  138. //if tmp["tag_topinformation"] == nil {
  139. // updatePool <- []map[string]interface{}{
  140. // {"ids": tmpHit.Id},
  141. // {"$set": update, "$addToSet": bson.M{"tag_information_ids": tmpHit.Id},
  142. // "$unset": bson.M{"tag_topinformation": 1, "tag_subinformation": 1}},
  143. // }
  144. //} else {
  145. // updatePool <- []map[string]interface{}{
  146. // {"ids": tmpHit.Id},
  147. // {"$set": update, "$addToSet": bson.M{"tag_information_ids": tmpHit.Id}},
  148. // }
  149. //}
  150. }
  151. }(hit)
  152. *countDocs += 1
  153. if *countDocs%10000 == 0 {
  154. util.Debug("Current:", *countDocs)
  155. }
  156. }
  157. }
  158. func taskT() {
  159. sess := Mgo.GetMgoConn()
  160. defer Mgo.DestoryMongoConn(sess)
  161. ch := make(chan bool, 5)
  162. wg := &sync.WaitGroup{}
  163. q := bson.M{"firsttime": bson.M{"$gte": 1711360551}}
  164. query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(q).Iter()
  165. count := 0
  166. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  167. if count%20000 == 0 {
  168. log.Info(fmt.Sprintf("current --- %d", count))
  169. }
  170. ch <- true
  171. wg.Add(1)
  172. go func(tmp map[string]interface{}) {
  173. defer func() {
  174. <-ch
  175. wg.Done()
  176. }()
  177. if tmp["tag_topinformation"] != nil && tmp["tag_subinformation"] != nil {
  178. taskinfotA(tmp)
  179. }
  180. }(tmp)
  181. tmp = make(map[string]interface{})
  182. }
  183. wg.Wait()
  184. log.Info(fmt.Sprintf("over --- %d", count))
  185. }
  // Code tables for the tag_set.wuye fields read by taskinfotA.
  var (
  	// YeTai maps a property-form code to its label. tag_set.wuye.property_form
  	// holds a comma-separated list of these codes.
  	YeTai = map[string]string{
  		"21": "住宅",    // residential
  		"22": "政府办公楼", // government office building
  		"23": "学校",    // school
  		"24": "医院",    // hospital
  		"25": "产业园区",  // industrial park
  		"26": "旅游景区",  // tourist attraction
  		"27": "交通运输",  // transportation
  		"28": "商务办公楼", // commercial office building
  		"29": "酒店",    // hotel
  	}
  	// ZhouQi maps a tag_set.wuye.period code to its contract-cycle label;
  	// getDate converts codes 12-15 into durations.
  	ZhouQi = map[int]string{
  		11: "1年以下", // under 1 year
  		12: "1年",
  		13: "2年",
  		14: "3年",
  		15: "5年",
  		16: "其他", // other
  	}
  )
  205. func taskinfotA(tmp map[string]interface{}) {
  206. save := make(map[string]interface{})
  207. pid := mongodb.BsonIdToSId(tmp["_id"])
  208. ids := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
  209. save["source_id"] = pid
  210. save["project_name"] = util.ObjToString(tmp["projectname"])
  211. save["area"] = tmp["area"]
  212. save["city"] = tmp["city"]
  213. save["district"] = tmp["district"]
  214. save["budget"] = tmp["budget"]
  215. save["bidamount"] = tmp["bidamount"]
  216. save["first_url"] = fmt.Sprintf("https://www.jianyu360.cn/article/content/%s.html", util.CommonEncodeArticle("content", ids[0]))
  217. save["business_label"] = getStr(util.ObjToString(tmp["buyerclass"]))
  218. //bt := util.ObjToString(tmp["bidtype"])
  219. bs := util.ObjToString(tmp["bidstatus"])
  220. if bs == "中标" || bs == "成交" || bs == "合同" {
  221. save["bidstatus"] = 1
  222. } else if bs == "废标" || bs == "流标" {
  223. save["bidstatus"] = 0
  224. } else {
  225. save["bidstatus"] = 2
  226. }
  227. if tmp["firsttime"] != nil && util.IntAll(tmp["firsttime"]) > 0 {
  228. t := util.Int64All(tmp["firsttime"])
  229. save["publish_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  230. }
  231. if util.IntAll(save["bidstatus"]) == 1 && util.IntAll(tmp["jgtime"]) > 0 {
  232. t := util.Int64All(tmp["jgtime"])
  233. save["result_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  234. }
  235. // 补充结果时间
  236. //if util.Int64All(save["result_time"]) <= 0 && util.IntAll(save["bidstatus"]) == 1 {
  237. // t := util.Int64All(tmp["lasttime"])
  238. // if t > 0 {
  239. // save["result_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  240. // }
  241. //}
  242. save["create_time"] = time.Now().Format(util.Date_Full_Layout)
  243. save["update_time"] = time.Now().Format(util.Date_Full_Layout)
  244. for _, id := range ids {
  245. if info := CkhTool.FindOne("information", bson.M{"datajson_id": id}, "starttime,endtime,datajson_expiredate", ""); info != nil && len(*info) > 0 {
  246. if (*info)["starttime"] != nil && util.IntAll((*info)["starttime"]) > 0 {
  247. t := util.Int64All((*info)["starttime"])
  248. save["contract_cycle_start_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  249. }
  250. if (*info)["datajson_expiredate"] != nil && util.IntAll((*info)["datajson_expiredate"]) > 0 {
  251. t := util.Int64All((*info)["datajson_expiredate"])
  252. save["contract_cycle_end_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  253. }
  254. if (*info)["endtime"] != nil && util.IntAll((*info)["endtime"]) > 0 && save["contract_cycle_end_time"] == nil {
  255. t := util.Int64All((*info)["endtime"])
  256. save["contract_cycle_end_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  257. }
  258. break
  259. }
  260. }
  261. // 行业标签
  262. v1 := util.ObjArrToStringArr(tmp["tag_subinformation"].([]interface{}))
  263. for _, sub := range v1 {
  264. MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": sub, "stype": 1})
  265. }
  266. save["industry_label"] = strings.Join(v1, ",")
  267. // 物业业态
  268. if tagset, ok := tmp["tag_set"].(map[string]interface{}); ok {
  269. if wuye, o1 := tagset["wuye"].(map[string]interface{}); o1 {
  270. if yt := util.ObjToString(wuye["property_form"]); yt != "" {
  271. var v2 []string
  272. for _, s := range strings.Split(yt, ",") {
  273. if s1 := YeTai[s]; s1 != "" {
  274. v2 = append(v2, s1)
  275. MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": s1, "stype": 2})
  276. }
  277. }
  278. save["activities_label"] = strings.Join(v2, ",")
  279. }
  280. if p := util.IntAll(wuye["period"]); p != 0 && ZhouQi[p] != "" {
  281. save["contract_cycle"] = ZhouQi[p]
  282. if util.IntAll(save["bidstatus"]) == 1 {
  283. if util.IntAll(tmp["jgtime"]) > 0 {
  284. if et := getDate(p, util.Int64All(tmp["jgtime"])); et > 0 {
  285. save["next_bid_time"] = util.FormatDateByInt64(&et, util.Date_Full_Layout)
  286. }
  287. } else if t := getDate(p, util.Int64All(tmp["lasttime"])); t > 0 {
  288. if t > 0 {
  289. save["next_bid_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  290. }
  291. }
  292. }
  293. }
  294. }
  295. }
  296. // 2023年之后的数据
  297. if util.IntAll(save["bidstatus"]) == 1 {
  298. if save["next_bid_time"] == nil && util.Int64All(tmp["jgtime"]) > 1672502400 {
  299. st, _ := time.Parse(time.DateTime, util.ObjToString(save["contract_cycle_start_time"]))
  300. et, _ := time.Parse(time.DateTime, util.ObjToString(save["contract_cycle_end_time"]))
  301. if t := et.Unix() - st.Unix(); t < 90*24*60*60 {
  302. if tmp["jgtime"] != nil && util.IntAll(tmp["jgtime"]) > 0 {
  303. if et1 := getDate(12, util.Int64All(tmp["jgtime"])); et1 > 0 {
  304. save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
  305. }
  306. } else {
  307. if t1 := util.Int64All(tmp["lasttime"]); t1 > 0 {
  308. if et1 := getDate(12, util.Int64All(tmp["lasttime"])); et1 > 0 {
  309. save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
  310. }
  311. }
  312. }
  313. } else {
  314. if t1 := util.Int64All(tmp["jgtime"]); t1 > 0 {
  315. et1 := t1 + t
  316. save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
  317. } else {
  318. if t2 := util.Int64All(tmp["lasttime"]); t2 > 0 {
  319. et1 := t2 + t
  320. save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
  321. }
  322. }
  323. }
  324. }
  325. // 2023年之后的数据,无合约周期默认 1年
  326. if save["next_bid_time"] == nil && util.Int64All(save["jgtime"]) > 1672502400 {
  327. if et := getDate(12, util.Int64All(tmp["jgtime"])); et > 0 {
  328. save["next_bid_time"] = util.FormatDateByInt64(&et, util.Date_Full_Layout)
  329. }
  330. //else {
  331. // if t := util.Int64All(tmp["lasttime"]); t > 0 {
  332. // if et := getDate(12, util.Int64All(tmp["lasttime"])); et > 0 {
  333. // save["next_bid_time"] = util.FormatDateByInt64(&et, util.Date_Full_Layout)
  334. // }
  335. // }
  336. //}
  337. }
  338. }
  339. // 中标单位
  340. for _, w := range strings.Split(util.ObjToString(tmp["s_winner"]), ",") {
  341. if w != "" {
  342. MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": w, "stype": 3})
  343. }
  344. }
  345. save["winner"] = util.ObjToString(tmp["s_winner"])
  346. savePool <- save
  347. }
  // getStr classifies a buyerclass label into a business label: 政府机构,
  // 医疗单位, 教育单位, 金融企业 or 商业公司. It returns "" for an empty or
  // unrecognised label. Groups are checked in order and the first match wins.
  // NOTE(review): matching is strings.Contains(group, b), i.e. b only needs to be
  // a substring of the group text. Partial labels such as "审计" therefore match,
  // and so would stray fragments like "人" or "|". Confirm this is intended.
  func getStr(b string) string {
  	if len(b) == 0 {
  		return ""
  	}
  	groups := []struct {
  		members string
  		label   string
  	}{
  		{"(交通|运输物流|工信|农业|住建|城管|市政|出版广电|检察院|科技|民政|生态环境|市场监管|水利|应急管理|自然资源|财政|档案|党委办|组织|发改|宣传|政府办|政务中心|人大|政协|法院|公安|国资委|海关|机关事务|纪委|军队|人社|商务|审计税务|司法|体育|统计|统战|文旅|民宗|银保监|证监|气象|社会团体|公共资源交易)", "政府机构"},
  		{"(卫健委|医疗)", "医疗单位"},
  		{"(教育|学校)", "教育单位"},
  		{"(人行|金融业)", "金融企业"},
  		{"(信息技术|电信行业|农林牧渔|建筑业|传媒|制造业|住宿餐饮|采矿业|能源化工|批发零售)", "商业公司"},
  	}
  	for _, g := range groups {
  		if strings.Contains(g.members, b) {
  			return g.label
  		}
  	}
  	return ""
  }
  // getDate returns stime plus the contract-cycle duration for period code p,
  // counting a year as 365 days. The codes follow ZhouQi: 12 = 1 year,
  // 13 = 2 years, 14 = 3 years, 15 = 5 years. Any other code, including 11
  // (under 1 year) and 16 (other), returns 0. stime is not validated.
  func getDate(p int, stime int64) int64 {
  	const year = int64(365 * 24 * 60 * 60)
  	var years int64
  	switch p {
  	case 12:
  		years = 1
  	case 13:
  		years = 2
  	case 14:
  		years = 3
  	case 15:
  		years = 5
  	default:
  		return 0
  	}
  	return stime + years*year
  }