xiamen.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997
  1. package main
  2. import (
  3. "compress/gzip"
  4. "encoding/json"
  5. "fmt"
  6. "go.mongodb.org/mongo-driver/bson"
  7. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "log"
  10. "math"
  11. "os"
  12. "path/filepath"
  13. "strings"
  14. "sync"
  15. "time"
  16. "unicode/utf8"
  17. )
  18. // dealXmData 处理中标企业注册地为厦门的数据;陈伟铭 需求
  19. /**
  20. 中标方企业注册地为厦门的企业数量一共是多少,可以分别看下22年度,23年度的两个数据总量
  21. */
  22. func dealXmData() {
  23. Mgo := &mongodb.MongodbSim{
  24. MongodbAddr: "172.17.189.140:27080",
  25. //MongodbAddr: "127.0.0.1:27083",
  26. DbName: "qfw",
  27. Size: 10,
  28. UserName: "SJZY_RWbid_ES",
  29. Password: "SJZY@B4i4D5e6S",
  30. //Direct: true,
  31. }
  32. Mgo.InitPool()
  33. MgoC := &mongodb.MongodbSim{
  34. MongodbAddr: "172.17.189.140:27080",
  35. //MongodbAddr: "127.0.0.1:27083",
  36. DbName: "mixdata",
  37. Size: 10,
  38. UserName: "SJZY_RWbid_ES",
  39. Password: "SJZY@B4i4D5e6S",
  40. //Direct: true,
  41. }
  42. MgoC.InitPool()
  43. defer util.Catch()
  44. sess := Mgo.GetMgoConn()
  45. defer Mgo.DestoryMongoConn(sess)
  46. pool := make(chan bool, 10) //处理协程
  47. wg := &sync.WaitGroup{}
  48. // 设置查询条件
  49. filter := bson.D{
  50. //{"publishtime", bson.M{"$gte": 1640966400, "$lt": 1672502400}},
  51. {"publishtime", bson.M{"$gte": 1640966400}},
  52. {"subtype", bson.M{"$in": []string{"中标", "单一", "成交", "合同"}}},
  53. }
  54. selected := map[string]interface{}{
  55. "contenthtml": 0, // 0表示不返回该字段
  56. "attach_text": 0, // 0表示不返回该字段
  57. "detail": 0, // 0表示不返回该字段
  58. "purchasingsource": 0, // 0表示不返回该字段
  59. "jsondata": 0, // 0表示不返回该字段
  60. "package": 0, // 0表示不返回该字段
  61. }
  62. it := sess.DB("qfw").C("bidding").Find(&filter).Select(&selected).Iter()
  63. count := 0
  64. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  65. if count%10000 == 0 {
  66. log.Println("current :", count)
  67. }
  68. pool <- true
  69. wg.Add(1)
  70. go func(tmp map[string]interface{}) {
  71. defer func() {
  72. <-pool
  73. wg.Done()
  74. }()
  75. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  76. tmp = make(map[string]interface{})
  77. return
  78. }
  79. // 针对存量数据,重复数据不进索引
  80. if util.IntAll(tmp["extracttype"]) == -1 {
  81. return
  82. }
  83. swinner := util.ObjToString(tmp["s_winner"])
  84. if swinner == "" {
  85. return
  86. }
  87. if utf8.RuneCountInString(swinner) < 4 {
  88. return
  89. }
  90. publishtime := util.Int64All(tmp["publishtime"])
  91. projectName := util.ObjToString(tmp["projectname"])
  92. if strings.Contains(swinner, ",") {
  93. winners := strings.Split(swinner, ",")
  94. for _, v := range winners {
  95. if utf8.RuneCountInString(v) < 4 {
  96. continue
  97. } else {
  98. da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": v})
  99. if da == nil || (*da)["credit_no"] == nil || util.ObjToString((*da)["credit_no"]) == "" {
  100. continue
  101. }
  102. city := util.ObjToString((*da)["company_city"])
  103. if city != "厦门市" {
  104. continue
  105. }
  106. insert := map[string]interface{}{
  107. "winner": v,
  108. "credit_no": (*da)["credit_no"],
  109. "bidding_id": mongodb.BsonIdToSId(tmp["_id"]),
  110. "projectname": projectName,
  111. "company_type": (*da)["company_type"],
  112. "company_status": (*da)["company_status"],
  113. "company_area": (*da)["company_area"],
  114. "company_city": (*da)["company_city"],
  115. "year": time.Unix(publishtime, 0).Year(),
  116. }
  117. Mgo.Save("wcc_xiamen_winner", insert)
  118. }
  119. }
  120. } else {
  121. da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": swinner})
  122. if da == nil || (*da)["credit_no"] == nil || util.ObjToString((*da)["credit_no"]) == "" {
  123. return
  124. }
  125. city := util.ObjToString((*da)["company_city"])
  126. if city != "厦门市" {
  127. return
  128. }
  129. insert := map[string]interface{}{
  130. "winner": swinner,
  131. "credit_no": (*da)["credit_no"],
  132. "bidding_id": mongodb.BsonIdToSId(tmp["_id"]),
  133. "projectname": projectName,
  134. "company_type": (*da)["company_type"],
  135. "company_status": (*da)["company_status"],
  136. "company_area": (*da)["company_area"],
  137. "company_city": (*da)["company_city"],
  138. "year": time.Unix(publishtime, 0).Year(),
  139. }
  140. Mgo.Save("wcc_xiamen_winner", insert)
  141. }
  142. }(tmp)
  143. tmp = make(map[string]interface{})
  144. }
  145. wg.Wait()
  146. log.Println("结束")
  147. }
  148. // CountBidamount 统计厦门中标单位,中标金额总数
  149. func CountBidamount() {
  150. Mgo := &mongodb.MongodbSim{
  151. MongodbAddr: "172.17.189.140:27080",
  152. //MongodbAddr: "127.0.0.1:27083",
  153. DbName: "qfw",
  154. Size: 10,
  155. UserName: "SJZY_RWbid_ES",
  156. Password: "SJZY@B4i4D5e6S",
  157. //Direct: true,
  158. }
  159. Mgo.InitPool()
  160. defer util.Catch()
  161. sess := Mgo.GetMgoConn()
  162. defer Mgo.DestoryMongoConn(sess)
  163. var BidMap = make(map[string]bool)
  164. it := sess.DB("qfw").C("wcc_xiamen_winner").Find(nil).Select(nil).Iter()
  165. count := 0
  166. var total = float64(0)
  167. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  168. biddingID := util.ObjToString(tmp["bidding_id"])
  169. if BidMap[biddingID] {
  170. continue
  171. }
  172. data, _ := Mgo.FindById("bidding", biddingID, nil)
  173. BidMap[biddingID] = true
  174. bid := util.Float64All((*data)["bidamount"])
  175. total += bid
  176. tmp = make(map[string]interface{})
  177. }
  178. log.Println("total", total)
  179. }
  180. // exportXiaMenJson 导出厦门数据,JSON格式导出
  181. func exportXiaMenJson() {
  182. Mgo := &mongodb.MongodbSim{
  183. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  184. //MongodbAddr: "127.0.0.1:27083",
  185. DbName: "qfw",
  186. Size: 10,
  187. UserName: "SJZY_RWbid_ES",
  188. Password: "SJZY@B4i4D5e6S",
  189. //Direct: true,
  190. }
  191. Mgo.InitPool()
  192. MgoC := &mongodb.MongodbSim{
  193. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  194. //MongodbAddr: "127.0.0.1:27083",
  195. DbName: "mixdata",
  196. Size: 10,
  197. UserName: "SJZY_RWbid_ES",
  198. Password: "SJZY@B4i4D5e6S",
  199. //Direct: true,
  200. }
  201. MgoC.InitPool()
  202. defer util.Catch()
  203. sess := Mgo.GetMgoConn()
  204. defer Mgo.DestoryMongoConn(sess)
  205. pool := make(chan bool, 10) //处理协程
  206. wg := &sync.WaitGroup{}
  207. // 设置查询条件
  208. filter := bson.D{
  209. //{"publishtime", bson.M{"$gte": 1640966400, "$lt": 1672502400}},
  210. {"publishtime", bson.M{"$gte": 1609430400}}, //2021-1-1
  211. {"subtype", bson.M{"$in": []string{"中标", "成交", "合同", "单一"}}},
  212. }
  213. selected := map[string]interface{}{
  214. "detail": 1,
  215. "href": 1,
  216. "title": 1,
  217. "publishtime": 1,
  218. "area": 1,
  219. "bidopentime": 1,
  220. "budget": 1,
  221. "buyer": 1,
  222. "bidamount": 1,
  223. "buyertel": 1,
  224. "buyerperson": 1,
  225. "city": 1,
  226. "subtype": 1,
  227. "projectname": 1,
  228. "projectcode": 1,
  229. "projectscope": 1,
  230. "agency": 1,
  231. "s_winner": 1,
  232. "winnerperson ": 1,
  233. "winnertel ": 1,
  234. }
  235. it := sess.DB("qfw").C("bidding").Find(&filter).Select(&selected).Iter()
  236. count := 0
  237. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  238. if count%10000 == 0 {
  239. log.Println("current :", count)
  240. }
  241. pool <- true
  242. wg.Add(1)
  243. go func(tmp map[string]interface{}) {
  244. defer func() {
  245. <-pool
  246. wg.Done()
  247. }()
  248. // 创建一个新的map用于goroutine,避免重用
  249. docCopy := make(map[string]interface{})
  250. for k, v := range tmp {
  251. docCopy[k] = v
  252. }
  253. docCopy["id"] = mongodb.BsonIdToSId(tmp["_id"])
  254. docCopy["jybxhref"] = GetJyURLByID(mongodb.BsonIdToSId(tmp["_id"]))
  255. //
  256. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  257. tmp = make(map[string]interface{})
  258. return
  259. }
  260. // 针对存量数据,重复数据不进索引
  261. if util.IntAll(tmp["extracttype"]) == -1 {
  262. return
  263. }
  264. swinner := util.ObjToString(tmp["s_winner"])
  265. if swinner == "" {
  266. return
  267. }
  268. if utf8.RuneCountInString(swinner) < 4 {
  269. return
  270. }
  271. if strings.Contains(swinner, ",") {
  272. winner_credit_no_arr := make([]string, 0)
  273. var personMap = make([]string, 0)
  274. var phoneMap = make([]string, 0)
  275. var emaolMap = make([]string, 0)
  276. winners := strings.Split(swinner, ",")
  277. for _, v := range winners {
  278. if utf8.RuneCountInString(v) < 4 {
  279. continue
  280. } else {
  281. da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": v})
  282. winner_credit_no_arr = append(winner_credit_no_arr, util.ObjToString((*da)["credit_no"]))
  283. //2.联系人,连线方式
  284. if reports, ok := (*da)["annual_reports"]; ok {
  285. if rs, ok := reports.([]interface{}); ok {
  286. // 2. 初始化变量来保存最新的年份及对应的报告信息
  287. var latestReport map[string]interface{}
  288. latestYear := math.MinInt // 初始设置为最小值
  289. // 3. 遍历年度报告
  290. for _, report := range rs {
  291. if r, ok := report.(map[string]interface{}); ok {
  292. // 获取报告年份
  293. if year, ok := r["report_year"].(float64); ok {
  294. reportYear := int(year) // 转换为int
  295. if reportYear > latestYear {
  296. latestYear = reportYear
  297. latestReport = r
  298. }
  299. }
  300. }
  301. }
  302. // 4. 输出最新的报告信息
  303. if latestReport != nil {
  304. personMap = append(personMap, util.ObjToString(latestReport["operator_name"]))
  305. phoneMap = append(phoneMap, util.ObjToString(latestReport["company_phone"]))
  306. emaolMap = append(emaolMap, util.ObjToString(latestReport["company_email"]))
  307. //fmt.Println("最新的报告信息:", latestReport)
  308. }
  309. }
  310. }
  311. }
  312. }
  313. docCopy["winner_credit_no"] = strings.Join(winner_credit_no_arr, ",")
  314. docCopy["legal_person"] = strings.Join(personMap, ",")
  315. docCopy["company_phone"] = strings.Join(phoneMap, ",")
  316. docCopy["company_email"] = strings.Join(emaolMap, ",")
  317. Mgo.Save("wcc_xiamen_winner_1221", docCopy)
  318. } else {
  319. da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": swinner})
  320. docCopy["winner_credit_no"] = util.ObjToString((*da)["credit_no"])
  321. // 1. 检查是否存在 "annual_reports" 字段,并且它是一个切片类型
  322. if reports, ok := (*da)["annual_reports"]; ok {
  323. if rs, ok := reports.([]interface{}); ok {
  324. // 2. 初始化变量来保存最新的年份及对应的报告信息
  325. var latestReport map[string]interface{}
  326. latestYear := math.MinInt // 初始设置为最小值
  327. // 3. 遍历年度报告
  328. for _, report := range rs {
  329. if r, ok := report.(map[string]interface{}); ok {
  330. // 获取报告年份
  331. if year, ok := r["report_year"].(float64); ok {
  332. reportYear := int(year) // 转换为int
  333. if reportYear > latestYear {
  334. latestYear = reportYear
  335. latestReport = r
  336. }
  337. }
  338. }
  339. }
  340. // 4. 输出最新的报告信息
  341. if latestReport != nil {
  342. docCopy["legal_person"] = latestReport["operator_name"]
  343. docCopy["company_phone"] = latestReport["company_phone"]
  344. docCopy["company_email"] = latestReport["company_email"]
  345. //fmt.Println("最新的报告信息:", latestReport)
  346. }
  347. }
  348. }
  349. //
  350. Mgo.Save("wcc_xiamen_winner_1221", docCopy)
  351. }
  352. }(tmp)
  353. tmp = make(map[string]interface{})
  354. }
  355. wg.Wait()
  356. log.Println("结束")
  357. }
const MaxRecordsPerFile = 1000000 // maximum number of records per file
  359. // exportXiaMenJsonFile 导出JSON文件
  360. func exportXiaMenJsonFile() {
  361. // 配置文件夹路径
  362. outputDir := "./output" // 你可以根据需要设置这个路径
  363. // 创建文件夹,如果不存在的话
  364. if _, err := os.Stat(outputDir); os.IsNotExist(err) {
  365. err := os.MkdirAll(outputDir, os.ModePerm)
  366. if err != nil {
  367. log.Fatalf("无法创建文件夹: %v", err)
  368. }
  369. }
  370. // 模拟数据和其他初始化
  371. Mgo := &mongodb.MongodbSim{
  372. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  373. //MongodbAddr: "127.0.0.1:27083",
  374. DbName: "qfw",
  375. Size: 10,
  376. UserName: "SJZY_RWbid_ES",
  377. Password: "SJZY@B4i4D5e6S",
  378. //Direct: true,
  379. }
  380. Mgo.InitPool()
  381. MgoC := &mongodb.MongodbSim{
  382. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  383. //MongodbAddr: "127.0.0.1:27083",
  384. DbName: "mixdata",
  385. Size: 10,
  386. UserName: "SJZY_RWbid_ES",
  387. Password: "SJZY@B4i4D5e6S",
  388. //Direct: true,
  389. }
  390. MgoC.InitPool()
  391. defer util.Catch()
  392. sess := Mgo.GetMgoConn()
  393. defer Mgo.DestoryMongoConn(sess)
  394. pool := make(chan bool, 10) // 处理协程
  395. wg := &sync.WaitGroup{}
  396. // 用于存储数据
  397. var allRecords []map[string]interface{}
  398. var mu sync.Mutex // 用于同步访问 allRecords
  399. // 设置查询条件
  400. filter := bson.D{
  401. {"publishtime", bson.M{"$gte": 1609430400}},
  402. {"subtype", bson.M{"$in": []string{"中标", "成交", "合同", "单一"}}},
  403. }
  404. selected := map[string]interface{}{
  405. "detail": 1,
  406. "href": 1,
  407. "title": 1,
  408. "publishtime": 1,
  409. "area": 1,
  410. "bidopentime": 1,
  411. "budget": 1,
  412. "buyer": 1,
  413. "bidamount": 1,
  414. "buyertel": 1,
  415. "buyerperson": 1,
  416. "city": 1,
  417. "subtype": 1,
  418. "projectname": 1,
  419. "projectcode": 1,
  420. "projectscope": 1,
  421. "agency": 1,
  422. "s_winner": 1,
  423. "winnerperson": 1,
  424. "winnertel": 1,
  425. "extracttype": 1,
  426. "sensitive": 1,
  427. }
  428. SE := util.SimpleEncrypt{Key: "topJYBX2019"}
  429. curren := 0
  430. it := sess.DB("qfw").C("bidding").Find(&filter).Select(&selected).Limit(101000).Iter()
  431. count := 0
  432. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  433. if count%10000 == 0 {
  434. log.Println("current :", count, tmp["title"], tmp["_id"], curren)
  435. }
  436. // 过滤敏感数据
  437. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" {
  438. continue
  439. }
  440. // 针对存量数据,重复数据不进索引
  441. if util.IntAll(tmp["extracttype"]) == -1 {
  442. continue
  443. }
  444. mu.Lock()
  445. // 每当累积到 MaxRecordsPerFile 条数据时,保存文件并清空 allRecords
  446. if len(allRecords) == MaxRecordsPerFile {
  447. log.Println("len allRecords", len(allRecords), curren)
  448. saveToFile2(outputDir, allRecords)
  449. allRecords = nil // 清空数据
  450. }
  451. mu.Unlock()
  452. // 创建一个新的map用于goroutine,避免重用
  453. docCopy2 := make(map[string]interface{})
  454. for k, v := range tmp {
  455. docCopy2[k] = v
  456. }
  457. delete(docCopy2, "sensitive")
  458. delete(docCopy2, "extracttype")
  459. curren++
  460. pool <- true
  461. wg.Add(1)
  462. go func(tmp map[string]interface{}) {
  463. defer func() {
  464. <-pool
  465. wg.Done()
  466. }()
  467. docCopy := make(map[string]interface{})
  468. for k, v := range tmp {
  469. docCopy[k] = v
  470. }
  471. docCopy["id"] = SE.EncodeString(mongodb.BsonIdToSId(tmp["_id"]))
  472. docCopy["jybxhref"] = GetJyURLByID(mongodb.BsonIdToSId(tmp["_id"]))
  473. // 合并数据
  474. swinner := util.ObjToString(tmp["s_winner"])
  475. if strings.Contains(swinner, ",") {
  476. winner_credit_no_arr := make([]string, 0)
  477. var personMap = make([]string, 0)
  478. var phoneMap = make([]string, 0)
  479. var emaolMap = make([]string, 0)
  480. winners := strings.Split(swinner, ",")
  481. for _, v := range winners {
  482. if utf8.RuneCountInString(v) < 4 {
  483. continue
  484. } else {
  485. da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": v})
  486. winner_credit_no_arr = append(winner_credit_no_arr, util.ObjToString((*da)["credit_no"]))
  487. personMap = append(personMap, util.ObjToString((*da)["legal_person"]))
  488. //2.联系人,连线方式
  489. if reports, ok := (*da)["annual_reports"]; ok {
  490. if rs, ok := reports.([]interface{}); ok {
  491. // 2. 初始化变量来保存最新的年份及对应的报告信息
  492. var latestReport map[string]interface{}
  493. latestYear := math.MinInt // 初始设置为最小值
  494. // 3. 遍历年度报告
  495. for _, report := range rs {
  496. if r, ok := report.(map[string]interface{}); ok {
  497. // 获取报告年份
  498. if year, ok := r["report_year"].(float64); ok {
  499. reportYear := int(year) // 转换为int
  500. if reportYear > latestYear {
  501. latestYear = reportYear
  502. latestReport = r
  503. }
  504. }
  505. }
  506. }
  507. // 4. 输出最新的报告信息
  508. if latestReport != nil {
  509. phoneMap = append(phoneMap, util.ObjToString(latestReport["company_phone"]))
  510. emaolMap = append(emaolMap, util.ObjToString(latestReport["company_email"]))
  511. //fmt.Println("最新的报告信息:", latestReport)
  512. }
  513. }
  514. }
  515. }
  516. }
  517. docCopy["winner_credit_no"] = strings.Join(winner_credit_no_arr, ",")
  518. docCopy["legal_person"] = strings.Join(personMap, ",")
  519. docCopy["company_phone"] = strings.Join(phoneMap, ",")
  520. docCopy["company_email"] = strings.Join(emaolMap, ",")
  521. } else {
  522. da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": swinner})
  523. docCopy["winner_credit_no"] = util.ObjToString((*da)["credit_no"])
  524. docCopy["legal_person"] = util.ObjToString((*da)["legal_person"])
  525. // 1. 检查是否存在 "annual_reports" 字段,并且它是一个切片类型
  526. if reports, ok := (*da)["annual_reports"]; ok {
  527. if rs, ok := reports.([]interface{}); ok {
  528. // 2. 初始化变量来保存最新的年份及对应的报告信息
  529. var latestReport map[string]interface{}
  530. latestYear := math.MinInt // 初始设置为最小值
  531. // 3. 遍历年度报告
  532. for _, report := range rs {
  533. if r, ok := report.(map[string]interface{}); ok {
  534. // 获取报告年份
  535. if year, ok := r["report_year"].(float64); ok {
  536. reportYear := int(year) // 转换为int
  537. if reportYear > latestYear {
  538. latestYear = reportYear
  539. latestReport = r
  540. }
  541. }
  542. }
  543. }
  544. // 4. 输出最新的报告信息
  545. if latestReport != nil {
  546. docCopy["company_phone"] = latestReport["company_phone"]
  547. docCopy["company_email"] = latestReport["company_email"]
  548. }
  549. }
  550. }
  551. }
  552. // 加锁,确保并发安全地修改 allRecords
  553. //Mgo.SaveByOriID("wcc_xiamen_winner_1227", docCopy)
  554. mu.Lock()
  555. delete(docCopy, "_id")
  556. allRecords = append(allRecords, docCopy)
  557. mu.Unlock()
  558. }(docCopy2)
  559. tmp = make(map[string]interface{})
  560. }
  561. // 等待所有协程处理完
  562. wg.Wait()
  563. // 如果还有剩余的数据,保存最后一个文件
  564. if len(allRecords) > 0 {
  565. log.Println("len allRecords", len(allRecords), curren)
  566. saveToFile2(outputDir, allRecords)
  567. }
  568. log.Println("所有数据导出完成", curren)
  569. }
  570. // 将数据保存到文件
  571. func saveToFile2(outputDir string, records []map[string]interface{}) {
  572. // 创建文件名,包含时间戳
  573. fileName := fmt.Sprintf("%s_%s.json.gz", "xiaMenData", time.Now().Format("20060102_150405"))
  574. filePath := filepath.Join(outputDir, fileName)
  575. log.Println("开始写入文件:", fileName, "数据总量", len(records))
  576. // 创建文件
  577. file, err := os.Create(filePath)
  578. if err != nil {
  579. log.Fatalf("无法创建文件: %v", err)
  580. }
  581. defer file.Close()
  582. // 使用gzip压缩
  583. gzipWriter := gzip.NewWriter(file)
  584. defer gzipWriter.Close()
  585. // 创建 JSON 编码器
  586. jsonEncoder := json.NewEncoder(gzipWriter)
  587. jsonEncoder.SetIndent("", " ")
  588. // 批量写入数据,逐条处理并写入文件
  589. batchSize := 10000 // 每次写入的数据条数
  590. for i := 0; i < len(records); i += batchSize {
  591. // 计算本次要写入的批次数据
  592. end := i + batchSize
  593. if end > len(records) {
  594. end = len(records)
  595. }
  596. batch := records[i:end] // 获取当前批次数据
  597. // 编码数据并写入压缩文件
  598. if err := jsonEncoder.Encode(batch); err != nil {
  599. log.Fatalf("无法写入 JSON 数据: %v", err)
  600. }
  601. // 打印进度,确保文件写入及时反馈
  602. log.Printf("成功写入 %d/%d 条数据", end, len(records))
  603. }
  604. log.Printf("数据成功写入压缩文件: %s", filePath)
  605. }
  606. // exportXiaMenJsonFile 导出JSON文件
  607. func exportXiaMenJsonFile2() {
  608. // 配置文件夹路径
  609. var outputDir = "./output" // 你可以根据需要设置这个路径
  610. // 创建文件夹,如果不存在的话
  611. if _, err := os.Stat(outputDir); os.IsNotExist(err) {
  612. err := os.MkdirAll(outputDir, os.ModePerm)
  613. if err != nil {
  614. log.Fatalf("无法创建文件夹: %v", err)
  615. }
  616. }
  617. // 模拟数据和其他初始化
  618. Mgo := &mongodb.MongodbSim{
  619. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  620. DbName: "qfw",
  621. Size: 10,
  622. UserName: "SJZY_RWbid_ES",
  623. Password: "SJZY@B4i4D5e6S",
  624. }
  625. Mgo.InitPool()
  626. MgoC := &mongodb.MongodbSim{
  627. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  628. DbName: "mixdata",
  629. Size: 10,
  630. UserName: "SJZY_RWbid_ES",
  631. Password: "SJZY@B4i4D5e6S",
  632. }
  633. MgoC.InitPool()
  634. defer util.Catch()
  635. sess := Mgo.GetMgoConn()
  636. defer Mgo.DestoryMongoConn(sess)
  637. pool := make(chan bool, 10) // 处理协程
  638. wg := &sync.WaitGroup{}
  639. // 用于存储数据
  640. var allRecords []map[string]interface{}
  641. var mu sync.Mutex // 用于同步访问 allRecords
  642. // 设置查询条件
  643. filter := bson.D{
  644. {"publishtime", bson.M{"$gte": 1609430400}},
  645. {"subtype", bson.M{"$in": []string{"中标", "成交", "合同", "单一"}}},
  646. }
  647. selected := map[string]interface{}{
  648. "detail": 1,
  649. "href": 1,
  650. "title": 1,
  651. "publishtime": 1,
  652. "area": 1,
  653. "bidopentime": 1,
  654. "budget": 1,
  655. "buyer": 1,
  656. "bidamount": 1,
  657. "buyertel": 1,
  658. "buyerperson": 1,
  659. "city": 1,
  660. "subtype": 1,
  661. "projectname": 1,
  662. "projectcode": 1,
  663. "projectscope": 1,
  664. "agency": 1,
  665. "s_winner": 1,
  666. "winnerperson": 1,
  667. "winnertel": 1,
  668. "extracttype": 1,
  669. "sensitive": 1,
  670. }
  671. SE := util.SimpleEncrypt{Key: "topJYBX2019"}
  672. curren := 0
  673. it := sess.DB("qfw").C("bidding").Find(&filter).Select(&selected).Iter()
  674. count := 0
  675. // 使用通道来传递记录
  676. dataChannel := make(chan map[string]interface{}, 10000)
  677. // 保存数据的协程
  678. go func() {
  679. for records := range dataChannel {
  680. mu.Lock()
  681. allRecords = append(allRecords, records)
  682. // 每当累积到 MaxRecordsPerFile 条数据时,保存文件并清空 allRecords
  683. if len(allRecords) >= MaxRecordsPerFile {
  684. log.Println("保存文件,记录数:", len(allRecords))
  685. saveToFile3(outputDir, allRecords)
  686. allRecords = nil // 清空数据
  687. }
  688. mu.Unlock()
  689. }
  690. // 在通道关闭后,检查是否有剩余数据需要保存
  691. mu.Lock()
  692. if len(allRecords) > 0 {
  693. log.Println("保存最后一批数据,记录数:", len(allRecords))
  694. saveToFile3(outputDir, allRecords)
  695. allRecords = nil // 清空数据
  696. }
  697. mu.Unlock()
  698. }()
  699. //读取数据库
  700. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  701. if count%10000 == 0 {
  702. log.Println("current :", count, tmp["title"], tmp["_id"], curren)
  703. }
  704. // 过滤敏感数据
  705. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" {
  706. continue
  707. }
  708. // 针对存量数据,重复数据不进索引
  709. if util.IntAll(tmp["extracttype"]) == -1 {
  710. continue
  711. }
  712. // 创建一个新的map用于goroutine,避免重用
  713. docCopy2 := make(map[string]interface{})
  714. for k, v := range tmp {
  715. docCopy2[k] = v
  716. }
  717. delete(docCopy2, "sensitive")
  718. delete(docCopy2, "extracttype")
  719. curren++
  720. // 启动一个新的协程来处理每条数据
  721. pool <- true
  722. wg.Add(1)
  723. go func(tmp map[string]interface{}) {
  724. defer func() {
  725. <-pool
  726. wg.Done()
  727. }()
  728. docCopy := make(map[string]interface{})
  729. for k, v := range tmp {
  730. docCopy[k] = v
  731. }
  732. docCopy["id"] = SE.EncodeString(mongodb.BsonIdToSId(tmp["_id"]))
  733. docCopy["jybxhref"] = GetJyURLByID(mongodb.BsonIdToSId(tmp["_id"]))
  734. // 合并数据
  735. swinner := util.ObjToString(tmp["s_winner"])
  736. if strings.Contains(swinner, ",") {
  737. // 处理多个赢家的情况
  738. winner_credit_no_arr := make([]string, 0)
  739. var personMap = make([]string, 0)
  740. var phoneMap = make([]string, 0)
  741. var emaolMap = make([]string, 0)
  742. winners := strings.Split(swinner, ",")
  743. for _, v := range winners {
  744. if utf8.RuneCountInString(v) < 4 {
  745. continue
  746. } else {
  747. da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": v})
  748. winner_credit_no_arr = append(winner_credit_no_arr, util.ObjToString((*da)["credit_no"]))
  749. personMap = append(personMap, util.ObjToString((*da)["legal_person"]))
  750. //2.联系人,连线方式
  751. if reports, ok := (*da)["annual_reports"]; ok {
  752. if rs, ok := reports.([]interface{}); ok {
  753. // 2. 初始化变量来保存最新的年份及对应的报告信息
  754. var latestReport map[string]interface{}
  755. latestYear := math.MinInt // 初始设置为最小值
  756. // 3. 遍历年度报告
  757. for _, report := range rs {
  758. if r, ok := report.(map[string]interface{}); ok {
  759. // 获取报告年份
  760. if year, ok := r["report_year"].(float64); ok {
  761. reportYear := int(year) // 转换为int
  762. if reportYear > latestYear {
  763. latestYear = reportYear
  764. latestReport = r
  765. }
  766. }
  767. }
  768. }
  769. // 4. 输出最新的报告信息
  770. if latestReport != nil {
  771. phoneMap = append(phoneMap, util.ObjToString(latestReport["company_phone"]))
  772. emaolMap = append(emaolMap, util.ObjToString(latestReport["company_email"]))
  773. }
  774. }
  775. }
  776. }
  777. }
  778. docCopy["winner_credit_no"] = strings.Join(winner_credit_no_arr, ",")
  779. docCopy["legal_person"] = strings.Join(personMap, ",")
  780. docCopy["company_phone"] = strings.Join(phoneMap, ",")
  781. docCopy["company_email"] = strings.Join(emaolMap, ",")
  782. } else {
  783. da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": swinner})
  784. docCopy["winner_credit_no"] = util.ObjToString((*da)["credit_no"])
  785. docCopy["legal_person"] = util.ObjToString((*da)["legal_person"])
  786. // 1. 检查是否存在 "annual_reports" 字段,并且它是一个切片类型
  787. if reports, ok := (*da)["annual_reports"]; ok {
  788. if rs, ok := reports.([]interface{}); ok {
  789. // 2. 初始化变量来保存最新的年份及对应的报告信息
  790. var latestReport map[string]interface{}
  791. latestYear := math.MinInt // 初始设置为最小值
  792. // 3. 遍历年度报告
  793. for _, report := range rs {
  794. if r, ok := report.(map[string]interface{}); ok {
  795. // 获取报告年份
  796. if year, ok := r["report_year"].(float64); ok {
  797. reportYear := int(year) // 转换为int
  798. if reportYear > latestYear {
  799. latestYear = reportYear
  800. latestReport = r
  801. }
  802. }
  803. }
  804. }
  805. // 4. 输出最新的报告信息
  806. if latestReport != nil {
  807. docCopy["company_phone"] = latestReport["company_phone"]
  808. docCopy["company_email"] = latestReport["company_email"]
  809. }
  810. }
  811. }
  812. }
  813. // 通过channel发送数据
  814. delete(docCopy, "_id")
  815. dataChannel <- docCopy
  816. }(docCopy2)
  817. tmp = make(map[string]interface{})
  818. }
  819. // 等待所有协程处理完
  820. wg.Wait()
  821. // 关闭channel,确保保存线程能够结束
  822. close(dataChannel)
  823. log.Println("所有数据导出完成", curren)
  824. select {}
  825. }
  826. func saveToFile3(outputDir string, records []map[string]interface{}) {
  827. // 创建文件名,包含时间戳
  828. fileName := fmt.Sprintf("%s_%s.json.gz", "xiaMenData", time.Now().Format("20060102_150405"))
  829. filePath := filepath.Join(outputDir, fileName)
  830. log.Println("开始写入文件:", fileName, "数据总量", len(records))
  831. // 创建文件
  832. file, err := os.Create(filePath)
  833. if err != nil {
  834. log.Fatalf("无法创建文件: %v", err)
  835. }
  836. defer file.Close()
  837. // 使用gzip压缩
  838. gzipWriter := gzip.NewWriter(file)
  839. defer gzipWriter.Close()
  840. // 创建 JSON 编码器
  841. jsonEncoder := json.NewEncoder(gzipWriter)
  842. // 不需要设置缩进,因为每条记录将是单独的 JSON 对象,通常不需要美化
  843. // jsonEncoder.SetIndent("", " ")
  844. // 批量写入数据,逐条处理并写入文件
  845. for k, record := range records {
  846. // 编码每条记录并写入压缩文件,每条记录一行
  847. if err := jsonEncoder.Encode(record); err != nil {
  848. log.Fatalf("无法写入 JSON 数据: %v", err)
  849. }
  850. if k%1000 == 0 {
  851. log.Println("成功写入文件行数:", k, fileName)
  852. }
  853. }
  854. log.Printf("数据成功写入压缩文件: %s", filePath)
  855. }
  856. // 将数据保存到文件
  857. func saveToFile(outputDir string, records []map[string]interface{}) {
  858. // 创建文件名,包含时间戳
  859. fileName := fmt.Sprintf("%s_%s.json.gz", "xiaMenData", time.Now().Format("20060102_150405"))
  860. filePath := filepath.Join(outputDir, fileName)
  861. // 创建文件
  862. file, err := os.Create(filePath)
  863. if err != nil {
  864. log.Fatalf("无法创建文件: %v", err)
  865. }
  866. defer file.Close()
  867. // 使用gzip压缩
  868. gzipWriter := gzip.NewWriter(file)
  869. defer gzipWriter.Close()
  870. // 创建 JSON 编码器
  871. jsonEncoder := json.NewEncoder(gzipWriter)
  872. jsonEncoder.SetIndent("", " ")
  873. // 编码数据
  874. if err := jsonEncoder.Encode(records); err != nil {
  875. log.Fatalf("无法写入 JSON 数据: %v", err)
  876. }
  877. log.Printf("数据成功写入压缩文件: %s", filePath)
  878. }