task.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. package main
  2. import (
  3. "context"
  4. "data_project_information/config"
  5. "encoding/json"
  6. "fmt"
  7. es "github.com/olivere/elastic/v7"
  8. "go.mongodb.org/mongo-driver/bson"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  // task scrolls the Elasticsearch "bidding" index for documents whose
// tag_topinformation matches "情报_物业" and hands every hit to taskInfoA,
// which queues a {"ids": id} / {"$set": tags} pair on updatePool.
//
// The scan is split into comeintime windows bounded by the Unix timestamps in tf:
//
//	window 0:  comeintime <= tf[0]
//	window i:  tf[i-1] <= comeintime <= tf[i]   (i >= 1)
//
// Both bounds are inclusive, so a document exactly on a boundary falls into two
// windows. Before this fix the last window used `comeintime <= tf[3]`. That
// re-scanned everything before tf[3] and never visited documents between tf[3]
// and tf[4].
func task() {
	client := Es.GetEsConn()
	defer Es.DestoryEsConn(client)
	wg := &sync.WaitGroup{}
	// Window upper bounds, 00:00 UTC+8 of: 2020-01-01, 2021-01-01, 2022-01-01,
	// 2023-01-01, 2024-04-12.
	tf := []int64{1577808000, 1609430400, 1640966400, 1672502400, 1712851200}
	// taskInfoA only reads these three source fields.
	fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set")
	for i, tm := range tf {
		timeRange := es.NewRangeQuery("comeintime").Lte(tm)
		if i > 0 {
			timeRange = timeRange.Gte(tf[i-1])
		}
		query := es.NewBoolQuery().
			Must(es.NewMatchQuery("tag_topinformation", "情报_物业")).
			Must(timeRange)
		util.Debug(fmt.Sprintf("第%d次查询,数据量为:%d", i+1, Es.Count("bidding", query)))

		countDocs := 0
		// The first call opens the scroll cursor and returns the first page.
		res, err := client.Scroll().Index("bidding").Query(query).FetchSourceContext(fsc).Scroll("5m").Size(2000).Do(context.Background())
		if err != nil {
			util.Debug(err)
			continue
		}
		taskInfoA(res, wg, &countDocs)
		scrollID := res.ScrollId
		for {
			page, err := client.Scroll("1m").Index("bidding").ScrollId(scrollID).Size(2000).Do(context.TODO())
			if err != nil {
				// olivere/elastic reports the end of a scroll as io.EOF, so this
				// branch is also the normal termination path.
				util.Debug("Es Search Data Error:", err.Error())
				break
			}
			taskInfoA(page, wg, &countDocs)
			scrollID = page.ScrollId
		}
		// Wait for this window's update goroutines before reporting and moving on.
		wg.Wait()
		util.Debug(fmt.Sprintf("第%d次处理结束,处理文档%d条", i+1, countDocs))
		// Best-effort cleanup: the cursor also expires on its own after its keep-alive.
		_, _ = client.ClearScroll().ScrollId(scrollID).Do(context.Background())
	}
}
  // taskInfoA queues one tag update per hit in searchResult.
//
// Each hit's _source is decoded, and its tag_topinformation,
// tag_subinformation and (when present) tag_set values are sent to updatePool
// as a two-element slice: the selector {"ids": <hit id>} followed by
// {"$set": {...}}. Decoding runs in goroutines, at most 5 in flight per call.
// Every goroutine is registered on wg so the caller can wait for all of them.
//
// countDocs is incremented once per hit, on the calling goroutine only, and
// progress is logged every 10000 hits. A nil result, or one without hits,
// is a no-op. A hit whose _source cannot be decoded is logged and skipped
// (it used to be dropped silently).
func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
	if searchResult == nil || searchResult.Hits == nil {
		return
	}
	sem := make(chan bool, 5)
	for _, hit := range searchResult.Hits.Hits {
		wg.Add(1)
		sem <- true // blocks while 5 decodes from this call are in flight
		go func(tmpHit *es.SearchHit) {
			defer func() {
				<-sem
				wg.Done()
			}()
			tmp := make(map[string]interface{})
			if err := json.Unmarshal(tmpHit.Source, &tmp); err != nil {
				util.Debug("decode es hit failed:", tmpHit.Id, err.Error())
				return
			}
			update := map[string]interface{}{
				"tag_topinformation": tmp["tag_topinformation"],
				"tag_subinformation": tmp["tag_subinformation"],
			}
			if tmp["tag_set"] != nil {
				update["tag_set"] = tmp["tag_set"]
			}
			updatePool <- []map[string]interface{}{
				{"ids": tmpHit.Id},
				{"$set": update},
			}
		}(hit)
		*countDocs += 1
		if *countDocs%10000 == 0 {
			util.Debug("Current:", *countDocs)
		}
	}
}
  94. func taskT() {
  95. sess := Mgo.GetMgoConn()
  96. defer Mgo.DestoryMongoConn(sess)
  97. ch := make(chan bool, 5)
  98. wg := &sync.WaitGroup{}
  99. q := bson.M{"firsttime": bson.M{"$gte": 1577808000}}
  100. query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(q).Iter()
  101. count := 0
  102. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  103. if count%20000 == 0 {
  104. log.Info(fmt.Sprintf("current --- %d", count))
  105. }
  106. ch <- true
  107. wg.Add(1)
  108. go func(tmp map[string]interface{}) {
  109. defer func() {
  110. <-ch
  111. wg.Done()
  112. }()
  113. if tmp["tag_topinformation"] != nil && tmp["tag_subinformation"] != nil {
  114. taskinfotA(tmp)
  115. }
  116. }(tmp)
  117. tmp = make(map[string]interface{})
  118. }
  119. wg.Wait()
  120. log.Info(fmt.Sprintf("over --- %d", count))
  121. }
  // Lookup tables used when flattening tag_set.wuye of a project document.
var (
	// YeTai maps a property_form code (the comma-separated values of
	// tag_set.wuye.property_form) to its property business-type label.
	YeTai = map[string]string{
		"21": "住宅",    // residential
		"22": "政府办公楼", // government office building
		"23": "学校",    // school
		"24": "医院",    // hospital
		"25": "产业园区",  // industrial park
		"26": "旅游景区",  // tourist attraction
		"27": "交通运输",  // transportation
		"28": "商务办公楼", // commercial office building
		"29": "酒店",    // hotel
	}

	// ZhouQi maps a tag_set.wuye.period code to its contract-cycle label.
	ZhouQi = map[int]string{
		11: "1年以下", // under 1 year
		12: "1年",   // 1 year
		13: "2年",   // 2 years
		14: "3年",   // 3 years
		15: "5年",   // 5 years
		16: "其他",   // other
	}
)
  // taskinfotA flattens one Mongo project document into a property-project
// record, inserts its tags into property_project_tag via MysqlTool, and
// finally sends the record to savePool.
//
// Values read from tmp: _id, ids, projectname, area, city, district, budget,
// bidamount, buyerclass, bidstatus, firsttime, jgtime, lasttime,
// tag_subinformation, tag_set.wuye.property_form, tag_set.wuye.period and
// s_winner. The first id builds first_url. The ids are tried in order for a
// contract-cycle record in "information".
//
// Tag rows are written with stype 1 (tag_subinformation industry labels),
// 2 (YeTai property business types) and 3 (winners, split on ",").
//
// A document without a non-empty "ids" array is logged and skipped. The
// previous version panicked on it, and taskT calls this from a goroutine
// without recover, so one bad document aborted the whole run.
func taskinfotA(tmp map[string]interface{}) {
	pid := mongodb.BsonIdToSId(tmp["_id"])
	rawIDs, _ := tmp["ids"].([]interface{})
	var ids []string
	if len(rawIDs) > 0 {
		ids = util.ObjArrToStringArr(rawIDs)
	}
	if len(ids) == 0 {
		log.Info(fmt.Sprintf("skip project %s: no ids", pid))
		return
	}

	formatTs := func(ts int64) string {
		return util.FormatDateByInt64(&ts, util.Date_Full_Layout)
	}
	now := time.Now().Format(util.Date_Full_Layout)
	bidStatus := propertyBidStatus(util.ObjToString(tmp["bidstatus"]))
	jgtime := util.Int64All(tmp["jgtime"])

	save := map[string]interface{}{
		"source_id":      pid,
		"project_name":   util.ObjToString(tmp["projectname"]),
		"area":           tmp["area"],
		"city":           tmp["city"],
		"district":       tmp["district"],
		"budget":         tmp["budget"],
		"bidamount":      tmp["bidamount"],
		"first_url":      fmt.Sprintf("https://www.jianyu360.cn/article/content/%s.html", util.CommonEncodeArticle("content", ids[0])),
		"business_label": getStr(util.ObjToString(tmp["buyerclass"])),
		"bidstatus":      bidStatus,
		"create_time":    now,
		"update_time":    now,
	}
	if ft := util.Int64All(tmp["firsttime"]); ft > 0 {
		save["publish_time"] = formatTs(ft)
	}
	if bidStatus == 1 && jgtime > 0 {
		save["result_time"] = formatTs(jgtime)
	}

	cycleStart, cycleEnd := propertyContractCycle(ids)
	if cycleStart > 0 {
		save["contract_cycle_start_time"] = formatTs(cycleStart)
	}
	if cycleEnd > 0 {
		save["contract_cycle_end_time"] = formatTs(cycleEnd)
	}

	// Industry labels (stype 1).
	subs, _ := tmp["tag_subinformation"].([]interface{})
	industry := util.ObjArrToStringArr(subs)
	for _, sub := range industry {
		MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": sub, "stype": 1})
	}
	save["industry_label"] = strings.Join(industry, ",")

	// Property business types (stype 2) and contract period from tag_set.wuye.
	period := 0
	if tagset, ok := tmp["tag_set"].(map[string]interface{}); ok {
		if wuye, ok := tagset["wuye"].(map[string]interface{}); ok {
			if forms := util.ObjToString(wuye["property_form"]); forms != "" {
				var labels []string
				for _, code := range strings.Split(forms, ",") {
					if label := YeTai[code]; label != "" {
						labels = append(labels, label)
						MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": label, "stype": 2})
					}
				}
				save["activities_label"] = strings.Join(labels, ",")
			}
			if p := util.IntAll(wuye["period"]); ZhouQi[p] != "" {
				period = p
				save["contract_cycle"] = ZhouQi[p]
			}
		}
	}

	if bidStatus == 1 {
		if next := propertyNextBidTime(period, jgtime, util.Int64All(tmp["lasttime"]), cycleStart, cycleEnd); next > 0 {
			save["next_bid_time"] = formatTs(next)
		}
	}

	// Winners (stype 3).
	winner := util.ObjToString(tmp["s_winner"])
	for _, w := range strings.Split(winner, ",") {
		if w != "" {
			MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": w, "stype": 3})
		}
	}
	save["winner"] = winner
	savePool <- save
}

// propertyBidStatus maps a bidstatus string to a status code:
//   - 1 for "中标"/"成交"/"合同" (awarded / concluded / contracted)
//   - 0 for "废标"/"流标" (annulled / failed)
//   - 2 for anything else
func propertyBidStatus(bs string) int {
	switch bs {
	case "中标", "成交", "合同":
		return 1
	case "废标", "流标":
		return 0
	}
	return 2
}

// propertyContractCycle returns the contract start and end (Unix seconds; 0
// when unknown). It reads them from the first id in ids that has a record in
// "information". The end prefers datajson_expiredate and falls back to endtime.
func propertyContractCycle(ids []string) (start, end int64) {
	for _, id := range ids {
		info := CkhTool.FindOne("information", bson.M{"datajson_id": id}, "starttime,endtime,datajson_expiredate", "")
		if info == nil || len(*info) == 0 {
			continue
		}
		start = util.Int64All((*info)["starttime"])
		end = util.Int64All((*info)["datajson_expiredate"])
		if end <= 0 {
			end = util.Int64All((*info)["endtime"])
		}
		if start < 0 {
			start = 0
		}
		if end < 0 {
			end = 0
		}
		return start, end
	}
	return 0, 0
}

// propertyNextBidTime estimates the next tender time (Unix seconds; 0 when it
// cannot be estimated) for an awarded project.
//
// With a period code that getDate knows, it is that many years after jgtime.
// When jgtime is missing it is measured from lasttime, and lasttime is only
// used when it is positive; a zero lasttime used to yield dates in 1971.
//
// Otherwise, for results after 2023-01-01 00:00 UTC+8 (jgtime > 1672502400),
// it is jgtime plus the contract length when both ends are known and the
// length is at least 90 days. In every other case it is jgtime plus one year.
// The old string-parsing code turned a missing start time into year 1 and
// pushed the estimate thousands of years ahead.
func propertyNextBidTime(period int, jgtime, lasttime, cycleStart, cycleEnd int64) int64 {
	var next int64
	switch {
	case jgtime > 0:
		next = getDate(period, jgtime)
	case lasttime > 0:
		next = getDate(period, lasttime)
	}
	if next > 0 || jgtime <= 1672502400 {
		return next
	}
	const minCycle = 90 * 24 * 60 * 60
	if cycleStart > 0 && cycleEnd > 0 && cycleEnd-cycleStart >= minCycle {
		return jgtime + (cycleEnd - cycleStart)
	}
	return getDate(12, jgtime)
}
  // buyerClassCategory maps an exact buyerclass value to the coarse business
// label returned by getStr. It is built once at package initialisation and
// only read afterwards.
var buyerClassCategory = func() map[string]string {
	groups := []struct {
		category string
		labels   string // "|"-separated buyerclass values
	}{
		{"政府机构", "交通|运输物流|工信|农业|住建|城管|市政|出版广电|检察院|科技|民政|生态环境|市场监管|水利|应急管理|自然资源|财政|档案|党委办|组织|发改|宣传|政府办|政务中心|人大|政协|法院|公安|国资委|海关|机关事务|纪委|军队|人社|商务|审计|税务|司法|体育|统计|统战|文旅|民宗|银保监|证监|气象|社会团体|公共资源交易"},
		{"医疗单位", "卫健委|医疗"},
		{"教育单位", "教育|学校"},
		{"金融企业", "人行|金融业"},
		{"商业公司", "信息技术|电信行业|农林牧渔|建筑业|传媒|制造业|住宿餐饮|采矿业|能源化工|批发零售"},
	}
	m := make(map[string]string)
	for _, g := range groups {
		for _, label := range strings.Split(g.labels, "|") {
			// Earlier groups win, matching the priority of the former if/else chain.
			if _, taken := m[label]; !taken {
				m[label] = g.category
			}
		}
	}
	return m
}()

// getStr maps a buyerclass value (e.g. "卫健委") to a coarse business label:
//   - 政府机构 (government agency)
//   - 医疗单位 (medical institution)
//   - 教育单位 (educational institution)
//   - 金融企业 (financial enterprise)
//   - 商业公司 (commercial company)
//
// It returns "" for empty or unknown input.
//
// Matching is on the exact value. The previous version tested whether b was a
// substring of a "(a|b|...)" pattern string, so fragments such as "(" or "交"
// matched. A stray newline in "自然\n资源" meant "自然资源" never matched. The
// garbled entries "审计税务" and "人行l金融业" are split into their intended
// alternatives.
func getStr(b string) string {
	return buyerClassCategory[b]
}
  // getDate adds the contract period identified by code p to the Unix
// timestamp stime, using 365-day years. Codes:
//   - 12: 1 year
//   - 13: 2 years
//   - 14: 3 years
//   - 15: 5 years
//
// Any other code, including 11 ("under 1 year") and 16 ("other"), returns 0.
func getDate(p int, stime int64) int64 {
	const secondsPerYear int64 = 365 * 24 * 60 * 60
	var years int64
	switch p {
	case 12:
		years = 1
	case 13:
		years = 2
	case 14:
		years = 3
	case 15:
		years = 5
	default:
		return 0
	}
	return stime + years*secondsPerYear
}