history.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/v2/util/gconv"
  5. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  6. "strings"
  7. "sync"
  8. )
  9. // HisTransactionDataFromBid 历史bidding(指定截止comeintime,采购意向)
  10. func HisTransactionDataFromBid() {
  11. sess := MgoB.GetMgoConn()
  12. defer MgoB.DestoryMongoConn(sess)
  13. ch := make(chan bool, 10)
  14. wg := &sync.WaitGroup{}
  15. lock := &sync.Mutex{}
  16. query := map[string]interface{}{
  17. "toptype": "采购意向",
  18. }
  19. fields := map[string]interface{}{
  20. "projectname": 1,
  21. "budget": 1,
  22. "bidamount": 1,
  23. "buyer": 1,
  24. "s_winner": 1,
  25. "agency": 1,
  26. "property_form": 1,
  27. "multipackage": 1,
  28. "area": 1,
  29. "city": 1,
  30. "district": 1,
  31. //
  32. "publishtime": 1,
  33. "comeintime": 1,
  34. "extracttype": 1,
  35. "tag_subinformation": 1,
  36. "tag_subinformation_ai": 1,
  37. "tag_topinformation": 1,
  38. "tag_topinformation_ai": 1,
  39. }
  40. arr := []map[string]interface{}{}
  41. it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
  42. n := 0
  43. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  44. ch <- true
  45. wg.Add(1)
  46. go func(tmp map[string]interface{}) {
  47. defer func() {
  48. <-ch
  49. wg.Done()
  50. }()
  51. if gconv.Int64(tmp["comeintime"]) >= 1713196800 { //截止时间1713196800
  52. return
  53. }
  54. if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
  55. return
  56. }
  57. if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
  58. return
  59. }
  60. result := DealTransactionForBid(tmp, "采购意向", 3)
  61. lock.Lock()
  62. if len(result) > 0 {
  63. arr = append(arr, result)
  64. }
  65. if len(arr) > 50 {
  66. MgoPro.SaveBulk("projectset_wy", arr...)
  67. arr = []map[string]interface{}{}
  68. }
  69. lock.Unlock()
  70. }(tmp)
  71. if n%10000 == 0 {
  72. fmt.Println("current:", n)
  73. }
  74. tmp = map[string]interface{}{}
  75. }
  76. wg.Wait()
  77. if len(arr) > 0 {
  78. MgoPro.SaveBulk("projectset_wy", arr...)
  79. arr = []map[string]interface{}{}
  80. }
  81. fmt.Println("结束")
  82. }
  83. // HisTransactionDataFromBid2 历史bidding(指定截止comeintime,新增项目)
  84. func HisTransactionDataFromBid2() {
  85. sess := MgoB.GetMgoConn()
  86. defer MgoB.DestoryMongoConn(sess)
  87. ch := make(chan bool, 20)
  88. wg := &sync.WaitGroup{}
  89. lock := &sync.Mutex{}
  90. query := map[string]interface{}{
  91. "comeintime": map[string]interface{}{
  92. "$gte": 1713715200,
  93. "$lt": 1713801600,
  94. },
  95. "toptype": "拟建",
  96. }
  97. fields := map[string]interface{}{
  98. "projectname": 1,
  99. "budget": 1,
  100. "bidamount": 1,
  101. "buyer": 1,
  102. "s_winner": 1,
  103. "agency": 1,
  104. "property_form": 1,
  105. "multipackage": 1,
  106. "area": 1,
  107. "city": 1,
  108. "district": 1,
  109. //
  110. "s_topscopeclass": 1,
  111. "publishtime": 1,
  112. "toptype": 1,
  113. "comeintime": 1,
  114. "extracttype": 1,
  115. "tag_subinformation": 1,
  116. "tag_subinformation_ai": 1,
  117. "tag_topinformation": 1,
  118. "tag_topinformation_ai": 1,
  119. }
  120. arr := []map[string]interface{}{}
  121. it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
  122. n := 0
  123. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  124. ch <- true
  125. wg.Add(1)
  126. go func(tmp map[string]interface{}) {
  127. defer func() {
  128. <-ch
  129. wg.Done()
  130. }()
  131. //comeintime := gconv.Int64(tmp["comeintime"])
  132. //if comeintime < 1609430400 || comeintime >= 1713715200 {
  133. // return
  134. //}
  135. if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
  136. return
  137. }
  138. if s_topscopeclass := gconv.String(tmp["s_topscopeclass"]); !strings.Contains(s_topscopeclass, "建筑工程") { //排除非建筑工程
  139. return
  140. }
  141. if tag_topinformation := gconv.String(tmp["tag_topinformation"]); strings.Contains(tag_topinformation, "物业") { //排除物业
  142. return
  143. } else if tag_topinformation_ai := gconv.String(tmp["tag_topinformation_ai"]); strings.Contains(tag_topinformation_ai, "物业") {
  144. return
  145. }
  146. //if tmp["tag_topinformation"] != nil || tmp["tag_topinformation_ai"] != nil { //不包含物业
  147. // return
  148. //}
  149. project_bidstatus := 4 //拟建
  150. business_type := "新增项目"
  151. result := DealTransactionForBid(tmp, business_type, project_bidstatus)
  152. lock.Lock()
  153. if len(result) > 0 {
  154. arr = append(arr, result)
  155. }
  156. if len(arr) > 50 {
  157. MgoPro.SaveBulk("projectset_wy_nj", arr...)
  158. arr = []map[string]interface{}{}
  159. }
  160. lock.Unlock()
  161. }(tmp)
  162. if n%10000 == 0 {
  163. fmt.Println("current:", n)
  164. }
  165. tmp = map[string]interface{}{}
  166. }
  167. wg.Wait()
  168. if len(arr) > 0 {
  169. MgoPro.SaveBulk("projectset_wy_nj", arr...)
  170. arr = []map[string]interface{}{}
  171. }
  172. fmt.Println("结束")
  173. }
  174. // HisTransactionDataFromProject 历史project(指定截止pici:1713196800)
  175. func HisTransactionDataFromProject() {
  176. sess := MgoPro.GetMgoConn()
  177. defer MgoPro.DestoryMongoConn(sess)
  178. ch := make(chan bool, 20)
  179. wg := &sync.WaitGroup{}
  180. lock := &sync.Mutex{}
  181. query := map[string]interface{}{
  182. "pici": map[string]interface{}{
  183. "$lt": 1713196800,
  184. //"$gt": 1711900800,
  185. },
  186. }
  187. fields := map[string]interface{}{
  188. "projectname": 1,
  189. "budget": 1,
  190. "bidamount": 1,
  191. "buyer": 1,
  192. "s_winner": 1,
  193. "agency": 1,
  194. "property_form": 1,
  195. "multipackage": 1,
  196. "area": 1,
  197. "city": 1,
  198. "district": 1,
  199. "zbtime": 1,
  200. "jgtime": 1,
  201. "bidstatus": 1,
  202. //
  203. "firsttime": 1,
  204. "ids": 1,
  205. "pici": 1,
  206. "sourceinfoid": 1,
  207. "tag_subinformation": 1,
  208. "tag_subinformation_ai": 1,
  209. "tag_topinformation": 1,
  210. "tag_topinformation_ai": 1,
  211. }
  212. arr := []map[string]interface{}{}
  213. it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter()
  214. n := 0
  215. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  216. ch <- true
  217. wg.Add(1)
  218. go func(tmp map[string]interface{}) {
  219. defer func() {
  220. <-ch
  221. wg.Done()
  222. }()
  223. if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
  224. return
  225. }
  226. result := DealTransactionForPro(tmp)
  227. lock.Lock()
  228. if len(result) > 0 {
  229. arr = append(arr, result)
  230. }
  231. if len(arr) > 50 {
  232. MgoPro.SaveBulk("projectset_wy_back", arr...)
  233. arr = []map[string]interface{}{}
  234. }
  235. lock.Unlock()
  236. }(tmp)
  237. if n%10000 == 0 {
  238. fmt.Println("current:", n)
  239. }
  240. tmp = map[string]interface{}{}
  241. }
  242. wg.Wait()
  243. if len(arr) > 0 {
  244. MgoPro.SaveBulk("projectset_wy_back", arr...)
  245. arr = []map[string]interface{}{}
  246. }
  247. fmt.Println("结束")
  248. }
  249. // HisTransactionDataAddInformation 补充字段信息
  250. func HisTransactionDataAddInformation() {
  251. sess := MgoPro.GetMgoConn()
  252. defer MgoPro.DestoryMongoConn(sess)
  253. ch := make(chan bool, 5)
  254. wg := &sync.WaitGroup{}
  255. lock := &sync.Mutex{}
  256. query := map[string]interface{}{
  257. //"_id": map[string]interface{}{
  258. // "$gte": mongodb.StringTOBsonId("6627289319c5408c478125d4"),
  259. //},
  260. }
  261. it := sess.DB(MgoPro.DbName).C("projectset_wy").Find(&query).Iter()
  262. n := 0
  263. arr := [][]map[string]interface{}{}
  264. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  265. ch <- true
  266. wg.Add(1)
  267. go func(tmp map[string]interface{}) {
  268. defer func() {
  269. <-ch
  270. wg.Done()
  271. }()
  272. id := mongodb.BsonIdToSId(tmp["_id"])
  273. update := []map[string]interface{}{
  274. {"_id": mongodb.StringTOBsonId(id)},
  275. }
  276. set := map[string]interface{}{}
  277. //法人信息
  278. buyer_id, agency_id, winner_ids := FindEntInfoData(id, gconv.String(tmp["buyer"]), gconv.String(tmp["agency"]), gconv.Strings(tmp["winner"]))
  279. set["buyer_id"] = buyer_id
  280. set["agency_id"] = agency_id
  281. set["winner_id"] = winner_ids
  282. //项目信息补充业态
  283. //if from := gconv.String(tmp["from"]); from == "project" {
  284. // project_id := gconv.String(tmp["project_id"])
  285. // pro, _ := MgoPro.FindById("projectset_20230904", project_id, map[string]interface{}{"property_form": 1})
  286. // if len(*pro) > 0 && (*pro)["property_form"] != nil {
  287. // set["property_form"] = (*pro)["property_form"]
  288. // }
  289. //}
  290. delete(tmp, "from") //无用字段删除
  291. delete(tmp, "_id") //无用字段删除
  292. tmp["buyer_id"] = buyer_id
  293. tmp["agency_id"] = agency_id
  294. tmp["winner_id"] = winner_ids
  295. if !SaveDataToEs(tmp) { //保存、更新es
  296. fmt.Println("数据保存es失败,数据类型 项目project_id", tmp["project_id"])
  297. }
  298. var err error
  299. err = UpdateOrSaveDataToClickHouse(tmp)
  300. if err != nil {
  301. fmt.Println("数据迁移失败,数据类型 项目project_id", tmp["project_id"], err)
  302. }
  303. //更新
  304. update = append(update, map[string]interface{}{"$set": set})
  305. lock.Lock()
  306. arr = append(arr, update)
  307. if len(arr) > 100 {
  308. MgoPro.UpdateBulk("projectset_wy", arr...)
  309. arr = [][]map[string]interface{}{}
  310. }
  311. lock.Unlock()
  312. }(tmp)
  313. if n%100 == 0 {
  314. fmt.Println("current:", n)
  315. }
  316. tmp = map[string]interface{}{}
  317. }
  318. wg.Wait()
  319. if len(arr) > 0 {
  320. MgoPro.UpdateBulk("projectset_wy", arr...)
  321. arr = [][]map[string]interface{}{}
  322. }
  323. fmt.Println("迁移结束...")
  324. }