history.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/v2/util/gconv"
  5. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  6. "strings"
  7. "sync"
  8. )
  9. // HisTransactionDataFromBid 历史bidding(指定截止comeintime,采购意向)
  10. func HisTransactionDataFromBid() {
  11. sess := MgoB.GetMgoConn()
  12. defer MgoB.DestoryMongoConn(sess)
  13. ch := make(chan bool, 10)
  14. wg := &sync.WaitGroup{}
  15. lock := &sync.Mutex{}
  16. query := map[string]interface{}{
  17. "toptype": "采购意向",
  18. }
  19. fields := map[string]interface{}{
  20. "projectname": 1,
  21. "budget": 1,
  22. "bidamount": 1,
  23. "buyer": 1,
  24. "s_winner": 1,
  25. "agency": 1,
  26. "property_form": 1,
  27. "multipackage": 1,
  28. "area": 1,
  29. "city": 1,
  30. "district": 1,
  31. //
  32. "publishtime": 1,
  33. "comeintime": 1,
  34. "extracttype": 1,
  35. "tag_subinformation": 1,
  36. "tag_subinformation_ai": 1,
  37. "tag_topinformation": 1,
  38. "tag_topinformation_ai": 1,
  39. }
  40. arr := []map[string]interface{}{}
  41. it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
  42. n := 0
  43. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  44. ch <- true
  45. wg.Add(1)
  46. go func(tmp map[string]interface{}) {
  47. defer func() {
  48. <-ch
  49. wg.Done()
  50. }()
  51. if gconv.Int64(tmp["comeintime"]) >= 1713196800 { //截止时间1713196800
  52. return
  53. }
  54. if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
  55. return
  56. }
  57. if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
  58. return
  59. }
  60. result := DealTransactionForBid(tmp, "采购意向", 3)
  61. lock.Lock()
  62. if len(result) > 0 {
  63. arr = append(arr, result)
  64. }
  65. if len(arr) > 50 {
  66. MgoPro.SaveBulk("projectset_wy", arr...)
  67. arr = []map[string]interface{}{}
  68. }
  69. lock.Unlock()
  70. }(tmp)
  71. if n%10000 == 0 {
  72. fmt.Println("current:", n)
  73. }
  74. tmp = map[string]interface{}{}
  75. }
  76. wg.Wait()
  77. if len(arr) > 0 {
  78. MgoPro.SaveBulk("projectset_wy", arr...)
  79. arr = []map[string]interface{}{}
  80. }
  81. fmt.Println("结束")
  82. }
  83. // HisTransactionDataFromBid2 历史bidding(指定截止comeintime,新增物业项目)
  84. func HisTransactionDataFromBid2() {
  85. sess := MgoB.GetMgoConn()
  86. defer MgoB.DestoryMongoConn(sess)
  87. ch := make(chan bool, 20)
  88. wg := &sync.WaitGroup{}
  89. lock := &sync.Mutex{}
  90. query := map[string]interface{}{
  91. "comeintime": map[string]interface{}{
  92. "$gte": 1713715200,
  93. "$lt": 1713801600,
  94. },
  95. "toptype": "拟建",
  96. }
  97. fields := map[string]interface{}{
  98. "projectname": 1,
  99. "budget": 1,
  100. "bidamount": 1,
  101. "buyer": 1,
  102. "s_winner": 1,
  103. "agency": 1,
  104. "property_form": 1,
  105. "multipackage": 1,
  106. "area": 1,
  107. "city": 1,
  108. "district": 1,
  109. //
  110. "owner": 1,
  111. "s_topscopeclass": 1,
  112. "publishtime": 1,
  113. "toptype": 1,
  114. "comeintime": 1,
  115. "extracttype": 1,
  116. "tag_subinformation": 1,
  117. "tag_subinformation_ai": 1,
  118. "tag_topinformation": 1,
  119. "tag_topinformation_ai": 1,
  120. }
  121. arr := []map[string]interface{}{}
  122. it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
  123. n := 0
  124. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  125. ch <- true
  126. wg.Add(1)
  127. go func(tmp map[string]interface{}) {
  128. defer func() {
  129. <-ch
  130. wg.Done()
  131. }()
  132. //comeintime := gconv.Int64(tmp["comeintime"])
  133. //if comeintime < 1609430400 || comeintime >= 1713715200 {
  134. // return
  135. //}
  136. if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
  137. return
  138. }
  139. if s_topscopeclass := gconv.String(tmp["s_topscopeclass"]); !strings.Contains(s_topscopeclass, "建筑工程") { //排除非建筑工程
  140. return
  141. }
  142. if tag_topinformation := gconv.String(tmp["tag_topinformation"]); strings.Contains(tag_topinformation, "物业") { //排除物业
  143. return
  144. } else if tag_topinformation_ai := gconv.String(tmp["tag_topinformation_ai"]); strings.Contains(tag_topinformation_ai, "物业") {
  145. return
  146. }
  147. //if tmp["tag_topinformation"] != nil || tmp["tag_topinformation_ai"] != nil { //不包含物业
  148. // return
  149. //}
  150. project_bidstatus := 4 //拟建
  151. business_type := "新增物业项目"
  152. result := DealTransactionForBid(tmp, business_type, project_bidstatus)
  153. lock.Lock()
  154. if len(result) > 0 {
  155. arr = append(arr, result)
  156. }
  157. if len(arr) > 50 {
  158. MgoPro.SaveBulk("projectset_wy_nj", arr...)
  159. arr = []map[string]interface{}{}
  160. }
  161. lock.Unlock()
  162. }(tmp)
  163. if n%10000 == 0 {
  164. fmt.Println("current:", n)
  165. }
  166. tmp = map[string]interface{}{}
  167. }
  168. wg.Wait()
  169. if len(arr) > 0 {
  170. MgoPro.SaveBulk("projectset_wy_nj", arr...)
  171. arr = []map[string]interface{}{}
  172. }
  173. fmt.Println("结束")
  174. }
  175. // HisTransactionDataFromProject 历史project(指定截止pici:1713196800)
  176. func HisTransactionDataFromProject() {
  177. sess := MgoPro.GetMgoConn()
  178. defer MgoPro.DestoryMongoConn(sess)
  179. ch := make(chan bool, 20)
  180. wg := &sync.WaitGroup{}
  181. lock := &sync.Mutex{}
  182. query := map[string]interface{}{
  183. "pici": map[string]interface{}{
  184. "$lt": 1713196800,
  185. //"$gt": 1711900800,
  186. },
  187. }
  188. fields := map[string]interface{}{
  189. "projectname": 1,
  190. "budget": 1,
  191. "bidamount": 1,
  192. "buyer": 1,
  193. "s_winner": 1,
  194. "agency": 1,
  195. "property_form": 1,
  196. "multipackage": 1,
  197. "area": 1,
  198. "city": 1,
  199. "district": 1,
  200. "zbtime": 1,
  201. "jgtime": 1,
  202. "bidstatus": 1,
  203. //
  204. "firsttime": 1,
  205. "ids": 1,
  206. "pici": 1,
  207. "sourceinfoid": 1,
  208. "tag_subinformation": 1,
  209. "tag_subinformation_ai": 1,
  210. "tag_topinformation": 1,
  211. "tag_topinformation_ai": 1,
  212. }
  213. arr := []map[string]interface{}{}
  214. it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter()
  215. n := 0
  216. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  217. ch <- true
  218. wg.Add(1)
  219. go func(tmp map[string]interface{}) {
  220. defer func() {
  221. <-ch
  222. wg.Done()
  223. }()
  224. if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
  225. return
  226. }
  227. result := DealTransactionForPro(tmp)
  228. lock.Lock()
  229. if len(result) > 0 {
  230. arr = append(arr, result)
  231. }
  232. if len(arr) > 50 {
  233. MgoPro.SaveBulk("projectset_wy_newback", arr...)
  234. arr = []map[string]interface{}{}
  235. }
  236. lock.Unlock()
  237. }(tmp)
  238. if n%10000 == 0 {
  239. fmt.Println("current:", n)
  240. }
  241. tmp = map[string]interface{}{}
  242. }
  243. wg.Wait()
  244. if len(arr) > 0 {
  245. MgoPro.SaveBulk("projectset_wy_newback", arr...)
  246. arr = []map[string]interface{}{}
  247. }
  248. fmt.Println("结束")
  249. }
  250. // HisTransactionDataAddInformation 补充字段信息
  251. func HisTransactionDataAddInformation() {
  252. sess := MgoPro.GetMgoConn()
  253. defer MgoPro.DestoryMongoConn(sess)
  254. ch := make(chan bool, 1)
  255. wg := &sync.WaitGroup{}
  256. lock := &sync.Mutex{}
  257. query := map[string]interface{}{
  258. //"_id": mongodb.StringTOBsonId("662f01d8397fa006e2e75e6c"),
  259. //项目
  260. //"_id": map[string]interface{}{
  261. // "$gte": mongodb.StringTOBsonId("66308fa06f6c86a3960ae83f"),
  262. // "$lte": mongodb.StringTOBsonId("66308feb6f6c86a3960b0f4e"),
  263. //},
  264. //拟建
  265. //"project_bidstatus": 4,
  266. //"_id": map[string]interface{}{
  267. // "$lte": mongodb.StringTOBsonId("6627227819c5408c474c3802"),
  268. //},
  269. //采购意向
  270. //"project_bidstatus": 3,
  271. //"_id": map[string]interface{}{
  272. // "$lte": mongodb.StringTOBsonId("661f798b5a4e6cc01349dad0"),
  273. //},
  274. //历史projectset_wy
  275. //"project_id": map[string]interface{}{
  276. // //"$gt": "662143800000000000000000",
  277. // "$gt": "667c3b5166cf0db42ae965e6",
  278. //},
  279. "project_id": "6637ae0866cf0db42aeeb5d4",
  280. //历史projectset_wy_back
  281. //"update_time": map[string]interface{}{
  282. // "$gte": 1714959573,
  283. // "$lte": 1719795791,
  284. //},
  285. }
  286. count := MgoPro.Count("projectset_wy_back", query)
  287. fmt.Println("count:", count)
  288. it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
  289. n := 0
  290. arr := [][]map[string]interface{}{}
  291. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  292. ch <- true
  293. wg.Add(1)
  294. go func(tmp map[string]interface{}) {
  295. defer func() {
  296. <-ch
  297. wg.Done()
  298. }()
  299. id := mongodb.BsonIdToSId(tmp["_id"])
  300. update := []map[string]interface{}{
  301. {"_id": mongodb.StringTOBsonId(id)},
  302. }
  303. set := map[string]interface{}{}
  304. //法人信息
  305. buyer_id, agency_id, winner_ids := FindEntInfoData(id, gconv.String(tmp["buyer"]), gconv.String(tmp["agency"]), gconv.Strings(tmp["winner"]))
  306. //更新
  307. set["buyer_id"] = buyer_id
  308. set["agency_id"] = agency_id
  309. set["winner_id"] = winner_ids
  310. //保存
  311. tmp["buyer_id"] = buyer_id
  312. tmp["agency_id"] = agency_id
  313. tmp["winner_id"] = winner_ids
  314. if from := gconv.String(tmp["from"]); from == "project" {
  315. //项目信息补充业态
  316. //project_id := gconv.String(tmp["project_id"])
  317. //pro, _ := MgoPro.FindById("projectset_20230904", project_id, map[string]interface{}{"property_form": 1})
  318. //if len(*pro) > 0 && (*pro)["property_form"] != nil {
  319. // //更新
  320. // set["property_form"] = (*pro)["property_form"]
  321. // //保存
  322. // tmp["property_form"] = (*pro)["property_form"]
  323. //}
  324. //查询情报信息
  325. ids := gconv.Strings(tmp["info_ids"])
  326. info := FindInfomationData(ids...) //情报信息查询
  327. //更新
  328. set["information_id"] = info.Id
  329. set["starttime"] = info.Starttime
  330. set["endtime"] = info.Endtime
  331. //保存
  332. tmp["information_id"] = info.Id
  333. tmp["starttime"] = info.Starttime
  334. tmp["endtime"] = info.Endtime
  335. } else {
  336. if project_bidstatus := gconv.Int(tmp["project_bidstatus"]); project_bidstatus == 4 { //拟建新增物业项目,补充情报信息
  337. //查询情报信息
  338. id := gconv.String(tmp["info_id"])
  339. info := FindInfomationData(id) //情报信息查询
  340. //更新
  341. set["information_id"] = info.Id
  342. set["starttime"] = info.Starttime
  343. set["endtime"] = info.Endtime
  344. //保存
  345. tmp["information_id"] = info.Id
  346. tmp["starttime"] = info.Starttime
  347. tmp["endtime"] = info.Endtime
  348. }
  349. }
  350. delete(tmp, "from") //无用字段删除
  351. delete(tmp, "_id") //无用字段删除
  352. if !SaveDataToEs(tmp) { //保存、更新es
  353. fmt.Println("数据保存es失败,数据类型 项目project_id", tmp["project_id"])
  354. }
  355. var err error
  356. err = UpdateOrSaveDataToClickHouse(tmp) //保存、更新clickhouse
  357. if err != nil {
  358. fmt.Println("数据迁移失败,数据类型 项目project_id", tmp["project_id"], err)
  359. }
  360. //更新
  361. update = append(update, map[string]interface{}{"$set": set})
  362. lock.Lock()
  363. arr = append(arr, update)
  364. if len(arr) > 100 {
  365. MgoPro.UpdateBulk("projectset_wy_back", arr...)
  366. arr = [][]map[string]interface{}{}
  367. }
  368. lock.Unlock()
  369. }(tmp)
  370. if n%100 == 0 {
  371. fmt.Println("current:", n)
  372. }
  373. tmp = map[string]interface{}{}
  374. }
  375. wg.Wait()
  376. if len(arr) > 0 {
  377. MgoPro.UpdateBulk("projectset_wy_back", arr...)
  378. arr = [][]map[string]interface{}{}
  379. }
  380. fmt.Println("迁移结束...")
  381. }