history.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/v2/util/gconv"
  5. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  6. "sync"
  7. )
  8. // HisTransactionDataFromBid 历史bidding(指定截止comeintime)
  9. func HisTransactionDataFromBid() {
  10. sess := MgoB.GetMgoConn()
  11. defer MgoB.DestoryMongoConn(sess)
  12. ch := make(chan bool, 10)
  13. wg := &sync.WaitGroup{}
  14. lock := &sync.Mutex{}
  15. query := map[string]interface{}{
  16. "toptype": "采购意向",
  17. }
  18. fields := map[string]interface{}{
  19. "projectname": 1,
  20. "budget": 1,
  21. "bidamount": 1,
  22. "buyer": 1,
  23. "s_winner": 1,
  24. "agency": 1,
  25. "property_form": 1,
  26. "multipackage": 1,
  27. "area": 1,
  28. "city": 1,
  29. "district": 1,
  30. //
  31. "publishtime": 1,
  32. "comeintime": 1,
  33. "extracttype": 1,
  34. "tag_subinformation": 1,
  35. "tag_subinformation_ai": 1,
  36. "tag_topinformation": 1,
  37. "tag_topinformation_ai": 1,
  38. }
  39. arr := []map[string]interface{}{}
  40. it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
  41. n := 0
  42. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  43. ch <- true
  44. wg.Add(1)
  45. go func(tmp map[string]interface{}) {
  46. defer func() {
  47. <-ch
  48. wg.Done()
  49. }()
  50. if gconv.Int64(tmp["comeintime"]) >= 1713196800 { //截止时间1713196800
  51. return
  52. }
  53. if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
  54. return
  55. }
  56. if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
  57. return
  58. }
  59. result := DealTransactionForBid(tmp)
  60. lock.Lock()
  61. if len(result) > 0 {
  62. arr = append(arr, result)
  63. }
  64. if len(arr) > 50 {
  65. MgoPro.SaveBulk("projectset_wy", arr...)
  66. arr = []map[string]interface{}{}
  67. }
  68. lock.Unlock()
  69. }(tmp)
  70. if n%10000 == 0 {
  71. fmt.Println("current:", n)
  72. }
  73. tmp = map[string]interface{}{}
  74. }
  75. wg.Wait()
  76. if len(arr) > 0 {
  77. MgoPro.SaveBulk("projectset_wy", arr...)
  78. arr = []map[string]interface{}{}
  79. }
  80. fmt.Println("结束")
  81. }
  82. // HisTransactionDataFromProject 历史project(指定截止pici:1713196800)
  83. func HisTransactionDataFromProject() {
  84. sess := MgoPro.GetMgoConn()
  85. defer MgoPro.DestoryMongoConn(sess)
  86. ch := make(chan bool, 20)
  87. wg := &sync.WaitGroup{}
  88. lock := &sync.Mutex{}
  89. query := map[string]interface{}{
  90. "pici": map[string]interface{}{
  91. "$lt": 1713196800,
  92. //"$gt": 1711900800,
  93. },
  94. }
  95. fields := map[string]interface{}{
  96. "projectname": 1,
  97. "budget": 1,
  98. "bidamount": 1,
  99. "buyer": 1,
  100. "s_winner": 1,
  101. "agency": 1,
  102. "property_form": 1,
  103. "multipackage": 1,
  104. "area": 1,
  105. "city": 1,
  106. "district": 1,
  107. "zbtime": 1,
  108. "jgtime": 1,
  109. "bidstatus": 1,
  110. //
  111. "firsttime": 1,
  112. "ids": 1,
  113. "pici": 1,
  114. "sourceinfoid": 1,
  115. "tag_subinformation": 1,
  116. "tag_subinformation_ai": 1,
  117. "tag_topinformation": 1,
  118. "tag_topinformation_ai": 1,
  119. }
  120. arr := []map[string]interface{}{}
  121. it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter()
  122. n := 0
  123. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  124. ch <- true
  125. wg.Add(1)
  126. go func(tmp map[string]interface{}) {
  127. defer func() {
  128. <-ch
  129. wg.Done()
  130. }()
  131. if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
  132. return
  133. }
  134. result := DealTransactionForPro(tmp)
  135. lock.Lock()
  136. if len(result) > 0 {
  137. arr = append(arr, result)
  138. }
  139. if len(arr) > 50 {
  140. MgoPro.SaveBulk("projectset_wy_back", arr...)
  141. arr = []map[string]interface{}{}
  142. }
  143. lock.Unlock()
  144. }(tmp)
  145. if n%10000 == 0 {
  146. fmt.Println("current:", n)
  147. }
  148. tmp = map[string]interface{}{}
  149. }
  150. wg.Wait()
  151. if len(arr) > 0 {
  152. MgoPro.SaveBulk("projectset_wy_back", arr...)
  153. arr = []map[string]interface{}{}
  154. }
  155. fmt.Println("结束")
  156. }
  157. // HisTransactionDataAddInformation 补充字段信息
  158. func HisTransactionDataAddInformation() {
  159. sess := MgoPro.GetMgoConn()
  160. defer MgoPro.DestoryMongoConn(sess)
  161. ch := make(chan bool, 20)
  162. wg := &sync.WaitGroup{}
  163. lock := &sync.Mutex{}
  164. query := map[string]interface{}{}
  165. it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
  166. n := 0
  167. arr := [][]map[string]interface{}{}
  168. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  169. ch <- true
  170. wg.Add(1)
  171. go func(tmp map[string]interface{}) {
  172. defer func() {
  173. <-ch
  174. wg.Done()
  175. }()
  176. id := mongodb.BsonIdToSId(tmp["_id"])
  177. update := []map[string]interface{}{
  178. {"_id": tmp["_id"]},
  179. }
  180. set := map[string]interface{}{}
  181. //法人信息
  182. buyer_id, agency_id, winner_ids := FindEntInfoData(id, gconv.String(tmp["buyer"]), gconv.String(tmp["agency"]), gconv.Strings(tmp["winner"]))
  183. set["buyer_id"] = buyer_id
  184. set["agency_id"] = agency_id
  185. set["winner_ids"] = winner_ids
  186. //项目信息补充业态
  187. if from := gconv.String(tmp["from"]); from == "project" {
  188. project_id := gconv.String(tmp["project_id"])
  189. pro, _ := MgoPro.FindById("projectset_20230904", project_id, map[string]interface{}{"property_form": 1})
  190. if len(*pro) > 0 && (*pro)["property_form"] != nil {
  191. set["property_form"] = (*pro)["property_form"]
  192. }
  193. }
  194. update = append(update, map[string]interface{}{"$set": set})
  195. lock.Lock()
  196. arr = append(arr, update)
  197. if len(arr) > 100 {
  198. MgoPro.UpdateBulk("projectset_wy_back", arr...)
  199. arr = [][]map[string]interface{}{}
  200. }
  201. lock.Unlock()
  202. }(tmp)
  203. if n%100 == 0 {
  204. fmt.Println("current:", n)
  205. }
  206. tmp = map[string]interface{}{}
  207. }
  208. wg.Wait()
  209. if len(arr) > 0 {
  210. MgoPro.UpdateBulk("projectset_wy_back", arr...)
  211. arr = [][]map[string]interface{}{}
  212. }
  213. fmt.Println("迁移结束...")
  214. }