12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997 |
- package main
- import (
- "compress/gzip"
- "encoding/json"
- "fmt"
- "go.mongodb.org/mongo-driver/bson"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "math"
- "os"
- "path/filepath"
- "strings"
- "sync"
- "time"
- "unicode/utf8"
- )
- // dealXmData 处理中标企业注册地为厦门的数据;陈伟铭 需求
- /**
- 中标方企业注册地为厦门的企业数量一共是多少,可以分别看下22年度,23年度的两个数据总量
- */
- func dealXmData() {
- Mgo := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- DbName: "qfw",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- Mgo.InitPool()
- MgoC := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- DbName: "mixdata",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- MgoC.InitPool()
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- pool := make(chan bool, 10) //处理协程
- wg := &sync.WaitGroup{}
- // 设置查询条件
- filter := bson.D{
- //{"publishtime", bson.M{"$gte": 1640966400, "$lt": 1672502400}},
- {"publishtime", bson.M{"$gte": 1640966400}},
- {"subtype", bson.M{"$in": []string{"中标", "单一", "成交", "合同"}}},
- }
- selected := map[string]interface{}{
- "contenthtml": 0, // 0表示不返回该字段
- "attach_text": 0, // 0表示不返回该字段
- "detail": 0, // 0表示不返回该字段
- "purchasingsource": 0, // 0表示不返回该字段
- "jsondata": 0, // 0表示不返回该字段
- "package": 0, // 0表示不返回该字段
- }
- it := sess.DB("qfw").C("bidding").Find(&filter).Select(&selected).Iter()
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Println("current :", count)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
- tmp = make(map[string]interface{})
- return
- }
- // 针对存量数据,重复数据不进索引
- if util.IntAll(tmp["extracttype"]) == -1 {
- return
- }
- swinner := util.ObjToString(tmp["s_winner"])
- if swinner == "" {
- return
- }
- if utf8.RuneCountInString(swinner) < 4 {
- return
- }
- publishtime := util.Int64All(tmp["publishtime"])
- projectName := util.ObjToString(tmp["projectname"])
- if strings.Contains(swinner, ",") {
- winners := strings.Split(swinner, ",")
- for _, v := range winners {
- if utf8.RuneCountInString(v) < 4 {
- continue
- } else {
- da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": v})
- if da == nil || (*da)["credit_no"] == nil || util.ObjToString((*da)["credit_no"]) == "" {
- continue
- }
- city := util.ObjToString((*da)["company_city"])
- if city != "厦门市" {
- continue
- }
- insert := map[string]interface{}{
- "winner": v,
- "credit_no": (*da)["credit_no"],
- "bidding_id": mongodb.BsonIdToSId(tmp["_id"]),
- "projectname": projectName,
- "company_type": (*da)["company_type"],
- "company_status": (*da)["company_status"],
- "company_area": (*da)["company_area"],
- "company_city": (*da)["company_city"],
- "year": time.Unix(publishtime, 0).Year(),
- }
- Mgo.Save("wcc_xiamen_winner", insert)
- }
- }
- } else {
- da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": swinner})
- if da == nil || (*da)["credit_no"] == nil || util.ObjToString((*da)["credit_no"]) == "" {
- return
- }
- city := util.ObjToString((*da)["company_city"])
- if city != "厦门市" {
- return
- }
- insert := map[string]interface{}{
- "winner": swinner,
- "credit_no": (*da)["credit_no"],
- "bidding_id": mongodb.BsonIdToSId(tmp["_id"]),
- "projectname": projectName,
- "company_type": (*da)["company_type"],
- "company_status": (*da)["company_status"],
- "company_area": (*da)["company_area"],
- "company_city": (*da)["company_city"],
- "year": time.Unix(publishtime, 0).Year(),
- }
- Mgo.Save("wcc_xiamen_winner", insert)
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Println("结束")
- }
- // CountBidamount 统计厦门中标单位,中标金额总数
- func CountBidamount() {
- Mgo := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- DbName: "qfw",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- Mgo.InitPool()
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- var BidMap = make(map[string]bool)
- it := sess.DB("qfw").C("wcc_xiamen_winner").Find(nil).Select(nil).Iter()
- count := 0
- var total = float64(0)
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- biddingID := util.ObjToString(tmp["bidding_id"])
- if BidMap[biddingID] {
- continue
- }
- data, _ := Mgo.FindById("bidding", biddingID, nil)
- BidMap[biddingID] = true
- bid := util.Float64All((*data)["bidamount"])
- total += bid
- tmp = make(map[string]interface{})
- }
- log.Println("total", total)
- }
- // exportXiaMenJson 导出厦门数据,JSON格式导出
- func exportXiaMenJson() {
- Mgo := &mongodb.MongodbSim{
- MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
- //MongodbAddr: "127.0.0.1:27083",
- DbName: "qfw",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- Mgo.InitPool()
- MgoC := &mongodb.MongodbSim{
- MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
- //MongodbAddr: "127.0.0.1:27083",
- DbName: "mixdata",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- MgoC.InitPool()
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- pool := make(chan bool, 10) //处理协程
- wg := &sync.WaitGroup{}
- // 设置查询条件
- filter := bson.D{
- //{"publishtime", bson.M{"$gte": 1640966400, "$lt": 1672502400}},
- {"publishtime", bson.M{"$gte": 1609430400}}, //2021-1-1
- {"subtype", bson.M{"$in": []string{"中标", "成交", "合同", "单一"}}},
- }
- selected := map[string]interface{}{
- "detail": 1,
- "href": 1,
- "title": 1,
- "publishtime": 1,
- "area": 1,
- "bidopentime": 1,
- "budget": 1,
- "buyer": 1,
- "bidamount": 1,
- "buyertel": 1,
- "buyerperson": 1,
- "city": 1,
- "subtype": 1,
- "projectname": 1,
- "projectcode": 1,
- "projectscope": 1,
- "agency": 1,
- "s_winner": 1,
- "winnerperson ": 1,
- "winnertel ": 1,
- }
- it := sess.DB("qfw").C("bidding").Find(&filter).Select(&selected).Iter()
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Println("current :", count)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- // 创建一个新的map用于goroutine,避免重用
- docCopy := make(map[string]interface{})
- for k, v := range tmp {
- docCopy[k] = v
- }
- docCopy["id"] = mongodb.BsonIdToSId(tmp["_id"])
- docCopy["jybxhref"] = GetJyURLByID(mongodb.BsonIdToSId(tmp["_id"]))
- //
- if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
- tmp = make(map[string]interface{})
- return
- }
- // 针对存量数据,重复数据不进索引
- if util.IntAll(tmp["extracttype"]) == -1 {
- return
- }
- swinner := util.ObjToString(tmp["s_winner"])
- if swinner == "" {
- return
- }
- if utf8.RuneCountInString(swinner) < 4 {
- return
- }
- if strings.Contains(swinner, ",") {
- winner_credit_no_arr := make([]string, 0)
- var personMap = make([]string, 0)
- var phoneMap = make([]string, 0)
- var emaolMap = make([]string, 0)
- winners := strings.Split(swinner, ",")
- for _, v := range winners {
- if utf8.RuneCountInString(v) < 4 {
- continue
- } else {
- da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": v})
- winner_credit_no_arr = append(winner_credit_no_arr, util.ObjToString((*da)["credit_no"]))
- //2.联系人,连线方式
- if reports, ok := (*da)["annual_reports"]; ok {
- if rs, ok := reports.([]interface{}); ok {
- // 2. 初始化变量来保存最新的年份及对应的报告信息
- var latestReport map[string]interface{}
- latestYear := math.MinInt // 初始设置为最小值
- // 3. 遍历年度报告
- for _, report := range rs {
- if r, ok := report.(map[string]interface{}); ok {
- // 获取报告年份
- if year, ok := r["report_year"].(float64); ok {
- reportYear := int(year) // 转换为int
- if reportYear > latestYear {
- latestYear = reportYear
- latestReport = r
- }
- }
- }
- }
- // 4. 输出最新的报告信息
- if latestReport != nil {
- personMap = append(personMap, util.ObjToString(latestReport["operator_name"]))
- phoneMap = append(phoneMap, util.ObjToString(latestReport["company_phone"]))
- emaolMap = append(emaolMap, util.ObjToString(latestReport["company_email"]))
- //fmt.Println("最新的报告信息:", latestReport)
- }
- }
- }
- }
- }
- docCopy["winner_credit_no"] = strings.Join(winner_credit_no_arr, ",")
- docCopy["legal_person"] = strings.Join(personMap, ",")
- docCopy["company_phone"] = strings.Join(phoneMap, ",")
- docCopy["company_email"] = strings.Join(emaolMap, ",")
- Mgo.Save("wcc_xiamen_winner_1221", docCopy)
- } else {
- da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": swinner})
- docCopy["winner_credit_no"] = util.ObjToString((*da)["credit_no"])
- // 1. 检查是否存在 "annual_reports" 字段,并且它是一个切片类型
- if reports, ok := (*da)["annual_reports"]; ok {
- if rs, ok := reports.([]interface{}); ok {
- // 2. 初始化变量来保存最新的年份及对应的报告信息
- var latestReport map[string]interface{}
- latestYear := math.MinInt // 初始设置为最小值
- // 3. 遍历年度报告
- for _, report := range rs {
- if r, ok := report.(map[string]interface{}); ok {
- // 获取报告年份
- if year, ok := r["report_year"].(float64); ok {
- reportYear := int(year) // 转换为int
- if reportYear > latestYear {
- latestYear = reportYear
- latestReport = r
- }
- }
- }
- }
- // 4. 输出最新的报告信息
- if latestReport != nil {
- docCopy["legal_person"] = latestReport["operator_name"]
- docCopy["company_phone"] = latestReport["company_phone"]
- docCopy["company_email"] = latestReport["company_email"]
- //fmt.Println("最新的报告信息:", latestReport)
- }
- }
- }
- //
- Mgo.Save("wcc_xiamen_winner_1221", docCopy)
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Println("结束")
- }
- const MaxRecordsPerFile = 1000000 // 每个文件的最大记录数
- // exportXiaMenJsonFile 导出JSON文件
- func exportXiaMenJsonFile() {
- // 配置文件夹路径
- outputDir := "./output" // 你可以根据需要设置这个路径
- // 创建文件夹,如果不存在的话
- if _, err := os.Stat(outputDir); os.IsNotExist(err) {
- err := os.MkdirAll(outputDir, os.ModePerm)
- if err != nil {
- log.Fatalf("无法创建文件夹: %v", err)
- }
- }
- // 模拟数据和其他初始化
- Mgo := &mongodb.MongodbSim{
- MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
- //MongodbAddr: "127.0.0.1:27083",
- DbName: "qfw",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- Mgo.InitPool()
- MgoC := &mongodb.MongodbSim{
- MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
- //MongodbAddr: "127.0.0.1:27083",
- DbName: "mixdata",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- MgoC.InitPool()
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- pool := make(chan bool, 10) // 处理协程
- wg := &sync.WaitGroup{}
- // 用于存储数据
- var allRecords []map[string]interface{}
- var mu sync.Mutex // 用于同步访问 allRecords
- // 设置查询条件
- filter := bson.D{
- {"publishtime", bson.M{"$gte": 1609430400}},
- {"subtype", bson.M{"$in": []string{"中标", "成交", "合同", "单一"}}},
- }
- selected := map[string]interface{}{
- "detail": 1,
- "href": 1,
- "title": 1,
- "publishtime": 1,
- "area": 1,
- "bidopentime": 1,
- "budget": 1,
- "buyer": 1,
- "bidamount": 1,
- "buyertel": 1,
- "buyerperson": 1,
- "city": 1,
- "subtype": 1,
- "projectname": 1,
- "projectcode": 1,
- "projectscope": 1,
- "agency": 1,
- "s_winner": 1,
- "winnerperson": 1,
- "winnertel": 1,
- "extracttype": 1,
- "sensitive": 1,
- }
- SE := util.SimpleEncrypt{Key: "topJYBX2019"}
- curren := 0
- it := sess.DB("qfw").C("bidding").Find(&filter).Select(&selected).Limit(101000).Iter()
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Println("current :", count, tmp["title"], tmp["_id"], curren)
- }
- // 过滤敏感数据
- if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" {
- continue
- }
- // 针对存量数据,重复数据不进索引
- if util.IntAll(tmp["extracttype"]) == -1 {
- continue
- }
- mu.Lock()
- // 每当累积到 MaxRecordsPerFile 条数据时,保存文件并清空 allRecords
- if len(allRecords) == MaxRecordsPerFile {
- log.Println("len allRecords", len(allRecords), curren)
- saveToFile2(outputDir, allRecords)
- allRecords = nil // 清空数据
- }
- mu.Unlock()
- // 创建一个新的map用于goroutine,避免重用
- docCopy2 := make(map[string]interface{})
- for k, v := range tmp {
- docCopy2[k] = v
- }
- delete(docCopy2, "sensitive")
- delete(docCopy2, "extracttype")
- curren++
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- docCopy := make(map[string]interface{})
- for k, v := range tmp {
- docCopy[k] = v
- }
- docCopy["id"] = SE.EncodeString(mongodb.BsonIdToSId(tmp["_id"]))
- docCopy["jybxhref"] = GetJyURLByID(mongodb.BsonIdToSId(tmp["_id"]))
- // 合并数据
- swinner := util.ObjToString(tmp["s_winner"])
- if strings.Contains(swinner, ",") {
- winner_credit_no_arr := make([]string, 0)
- var personMap = make([]string, 0)
- var phoneMap = make([]string, 0)
- var emaolMap = make([]string, 0)
- winners := strings.Split(swinner, ",")
- for _, v := range winners {
- if utf8.RuneCountInString(v) < 4 {
- continue
- } else {
- da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": v})
- winner_credit_no_arr = append(winner_credit_no_arr, util.ObjToString((*da)["credit_no"]))
- personMap = append(personMap, util.ObjToString((*da)["legal_person"]))
- //2.联系人,连线方式
- if reports, ok := (*da)["annual_reports"]; ok {
- if rs, ok := reports.([]interface{}); ok {
- // 2. 初始化变量来保存最新的年份及对应的报告信息
- var latestReport map[string]interface{}
- latestYear := math.MinInt // 初始设置为最小值
- // 3. 遍历年度报告
- for _, report := range rs {
- if r, ok := report.(map[string]interface{}); ok {
- // 获取报告年份
- if year, ok := r["report_year"].(float64); ok {
- reportYear := int(year) // 转换为int
- if reportYear > latestYear {
- latestYear = reportYear
- latestReport = r
- }
- }
- }
- }
- // 4. 输出最新的报告信息
- if latestReport != nil {
- phoneMap = append(phoneMap, util.ObjToString(latestReport["company_phone"]))
- emaolMap = append(emaolMap, util.ObjToString(latestReport["company_email"]))
- //fmt.Println("最新的报告信息:", latestReport)
- }
- }
- }
- }
- }
- docCopy["winner_credit_no"] = strings.Join(winner_credit_no_arr, ",")
- docCopy["legal_person"] = strings.Join(personMap, ",")
- docCopy["company_phone"] = strings.Join(phoneMap, ",")
- docCopy["company_email"] = strings.Join(emaolMap, ",")
- } else {
- da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": swinner})
- docCopy["winner_credit_no"] = util.ObjToString((*da)["credit_no"])
- docCopy["legal_person"] = util.ObjToString((*da)["legal_person"])
- // 1. 检查是否存在 "annual_reports" 字段,并且它是一个切片类型
- if reports, ok := (*da)["annual_reports"]; ok {
- if rs, ok := reports.([]interface{}); ok {
- // 2. 初始化变量来保存最新的年份及对应的报告信息
- var latestReport map[string]interface{}
- latestYear := math.MinInt // 初始设置为最小值
- // 3. 遍历年度报告
- for _, report := range rs {
- if r, ok := report.(map[string]interface{}); ok {
- // 获取报告年份
- if year, ok := r["report_year"].(float64); ok {
- reportYear := int(year) // 转换为int
- if reportYear > latestYear {
- latestYear = reportYear
- latestReport = r
- }
- }
- }
- }
- // 4. 输出最新的报告信息
- if latestReport != nil {
- docCopy["company_phone"] = latestReport["company_phone"]
- docCopy["company_email"] = latestReport["company_email"]
- }
- }
- }
- }
- // 加锁,确保并发安全地修改 allRecords
- //Mgo.SaveByOriID("wcc_xiamen_winner_1227", docCopy)
- mu.Lock()
- delete(docCopy, "_id")
- allRecords = append(allRecords, docCopy)
- mu.Unlock()
- }(docCopy2)
- tmp = make(map[string]interface{})
- }
- // 等待所有协程处理完
- wg.Wait()
- // 如果还有剩余的数据,保存最后一个文件
- if len(allRecords) > 0 {
- log.Println("len allRecords", len(allRecords), curren)
- saveToFile2(outputDir, allRecords)
- }
- log.Println("所有数据导出完成", curren)
- }
// saveToFile2 writes records to a new gzip-compressed file in outputDir.
//
// The file is named xiaMenData_<YYYYMMDD_HHMMSS>.json.gz after the current
// local time. If that name is already taken (two saves within the same
// second), a numeric suffix such as _1 is appended instead of truncating the
// earlier export. Records are encoded in batches of 10000, each batch as one
// indented JSON array, so the file is a sequence of concatenated JSON arrays.
// Read it with a streaming json.Decoder.
//
// Any I/O error terminates the process via log.Fatalf. This includes errors
// from closing the gzip stream and the file, which are what finally flush the
// data to disk.
func saveToFile2(outputDir string, records []map[string]interface{}) {
	stamp := time.Now().Format("20060102_150405")
	fileName := fmt.Sprintf("%s_%s.json.gz", "xiaMenData", stamp)
	filePath := filepath.Join(outputDir, fileName)
	// O_EXCL makes creation fail instead of silently overwriting a file.
	file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
	for n := 1; os.IsExist(err); n++ {
		fileName = fmt.Sprintf("%s_%s_%d.json.gz", "xiaMenData", stamp, n)
		filePath = filepath.Join(outputDir, fileName)
		file, err = os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
	}
	if err != nil {
		log.Fatalf("无法创建文件: %v", err)
	}
	log.Println("开始写入文件:", fileName, "数据总量", len(records))

	gzipWriter := gzip.NewWriter(file)
	jsonEncoder := json.NewEncoder(gzipWriter)
	jsonEncoder.SetIndent("", " ")

	const batchSize = 10000 // records per Encode call (one JSON array each)
	for start := 0; start < len(records); start += batchSize {
		end := start + batchSize
		if end > len(records) {
			end = len(records)
		}
		if err := jsonEncoder.Encode(records[start:end]); err != nil {
			log.Fatalf("无法写入 JSON 数据: %v", err)
		}
		log.Printf("成功写入 %d/%d 条数据", end, len(records))
	}

	// Close the gzip stream first: it flushes pending data and writes the
	// trailer. Then close the file. A failure at either step leaves a
	// truncated archive, so these errors must not be dropped.
	if err := gzipWriter.Close(); err != nil {
		log.Fatalf("无法完成 gzip 压缩: %v", err)
	}
	if err := file.Close(); err != nil {
		log.Fatalf("无法关闭文件: %v", err)
	}
	log.Printf("数据成功写入压缩文件: %s", filePath)
}
- // exportXiaMenJsonFile 导出JSON文件
- func exportXiaMenJsonFile2() {
- // 配置文件夹路径
- var outputDir = "./output" // 你可以根据需要设置这个路径
- // 创建文件夹,如果不存在的话
- if _, err := os.Stat(outputDir); os.IsNotExist(err) {
- err := os.MkdirAll(outputDir, os.ModePerm)
- if err != nil {
- log.Fatalf("无法创建文件夹: %v", err)
- }
- }
- // 模拟数据和其他初始化
- Mgo := &mongodb.MongodbSim{
- MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
- DbName: "qfw",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- }
- Mgo.InitPool()
- MgoC := &mongodb.MongodbSim{
- MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
- DbName: "mixdata",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- }
- MgoC.InitPool()
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- pool := make(chan bool, 10) // 处理协程
- wg := &sync.WaitGroup{}
- // 用于存储数据
- var allRecords []map[string]interface{}
- var mu sync.Mutex // 用于同步访问 allRecords
- // 设置查询条件
- filter := bson.D{
- {"publishtime", bson.M{"$gte": 1609430400}},
- {"subtype", bson.M{"$in": []string{"中标", "成交", "合同", "单一"}}},
- }
- selected := map[string]interface{}{
- "detail": 1,
- "href": 1,
- "title": 1,
- "publishtime": 1,
- "area": 1,
- "bidopentime": 1,
- "budget": 1,
- "buyer": 1,
- "bidamount": 1,
- "buyertel": 1,
- "buyerperson": 1,
- "city": 1,
- "subtype": 1,
- "projectname": 1,
- "projectcode": 1,
- "projectscope": 1,
- "agency": 1,
- "s_winner": 1,
- "winnerperson": 1,
- "winnertel": 1,
- "extracttype": 1,
- "sensitive": 1,
- }
- SE := util.SimpleEncrypt{Key: "topJYBX2019"}
- curren := 0
- it := sess.DB("qfw").C("bidding").Find(&filter).Select(&selected).Iter()
- count := 0
- // 使用通道来传递记录
- dataChannel := make(chan map[string]interface{}, 10000)
- // 保存数据的协程
- go func() {
- for records := range dataChannel {
- mu.Lock()
- allRecords = append(allRecords, records)
- // 每当累积到 MaxRecordsPerFile 条数据时,保存文件并清空 allRecords
- if len(allRecords) >= MaxRecordsPerFile {
- log.Println("保存文件,记录数:", len(allRecords))
- saveToFile3(outputDir, allRecords)
- allRecords = nil // 清空数据
- }
- mu.Unlock()
- }
- // 在通道关闭后,检查是否有剩余数据需要保存
- mu.Lock()
- if len(allRecords) > 0 {
- log.Println("保存最后一批数据,记录数:", len(allRecords))
- saveToFile3(outputDir, allRecords)
- allRecords = nil // 清空数据
- }
- mu.Unlock()
- }()
- //读取数据库
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Println("current :", count, tmp["title"], tmp["_id"], curren)
- }
- // 过滤敏感数据
- if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" {
- continue
- }
- // 针对存量数据,重复数据不进索引
- if util.IntAll(tmp["extracttype"]) == -1 {
- continue
- }
- // 创建一个新的map用于goroutine,避免重用
- docCopy2 := make(map[string]interface{})
- for k, v := range tmp {
- docCopy2[k] = v
- }
- delete(docCopy2, "sensitive")
- delete(docCopy2, "extracttype")
- curren++
- // 启动一个新的协程来处理每条数据
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- docCopy := make(map[string]interface{})
- for k, v := range tmp {
- docCopy[k] = v
- }
- docCopy["id"] = SE.EncodeString(mongodb.BsonIdToSId(tmp["_id"]))
- docCopy["jybxhref"] = GetJyURLByID(mongodb.BsonIdToSId(tmp["_id"]))
- // 合并数据
- swinner := util.ObjToString(tmp["s_winner"])
- if strings.Contains(swinner, ",") {
- // 处理多个赢家的情况
- winner_credit_no_arr := make([]string, 0)
- var personMap = make([]string, 0)
- var phoneMap = make([]string, 0)
- var emaolMap = make([]string, 0)
- winners := strings.Split(swinner, ",")
- for _, v := range winners {
- if utf8.RuneCountInString(v) < 4 {
- continue
- } else {
- da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": v})
- winner_credit_no_arr = append(winner_credit_no_arr, util.ObjToString((*da)["credit_no"]))
- personMap = append(personMap, util.ObjToString((*da)["legal_person"]))
- //2.联系人,连线方式
- if reports, ok := (*da)["annual_reports"]; ok {
- if rs, ok := reports.([]interface{}); ok {
- // 2. 初始化变量来保存最新的年份及对应的报告信息
- var latestReport map[string]interface{}
- latestYear := math.MinInt // 初始设置为最小值
- // 3. 遍历年度报告
- for _, report := range rs {
- if r, ok := report.(map[string]interface{}); ok {
- // 获取报告年份
- if year, ok := r["report_year"].(float64); ok {
- reportYear := int(year) // 转换为int
- if reportYear > latestYear {
- latestYear = reportYear
- latestReport = r
- }
- }
- }
- }
- // 4. 输出最新的报告信息
- if latestReport != nil {
- phoneMap = append(phoneMap, util.ObjToString(latestReport["company_phone"]))
- emaolMap = append(emaolMap, util.ObjToString(latestReport["company_email"]))
- }
- }
- }
- }
- }
- docCopy["winner_credit_no"] = strings.Join(winner_credit_no_arr, ",")
- docCopy["legal_person"] = strings.Join(personMap, ",")
- docCopy["company_phone"] = strings.Join(phoneMap, ",")
- docCopy["company_email"] = strings.Join(emaolMap, ",")
- } else {
- da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": swinner})
- docCopy["winner_credit_no"] = util.ObjToString((*da)["credit_no"])
- docCopy["legal_person"] = util.ObjToString((*da)["legal_person"])
- // 1. 检查是否存在 "annual_reports" 字段,并且它是一个切片类型
- if reports, ok := (*da)["annual_reports"]; ok {
- if rs, ok := reports.([]interface{}); ok {
- // 2. 初始化变量来保存最新的年份及对应的报告信息
- var latestReport map[string]interface{}
- latestYear := math.MinInt // 初始设置为最小值
- // 3. 遍历年度报告
- for _, report := range rs {
- if r, ok := report.(map[string]interface{}); ok {
- // 获取报告年份
- if year, ok := r["report_year"].(float64); ok {
- reportYear := int(year) // 转换为int
- if reportYear > latestYear {
- latestYear = reportYear
- latestReport = r
- }
- }
- }
- }
- // 4. 输出最新的报告信息
- if latestReport != nil {
- docCopy["company_phone"] = latestReport["company_phone"]
- docCopy["company_email"] = latestReport["company_email"]
- }
- }
- }
- }
- // 通过channel发送数据
- delete(docCopy, "_id")
- dataChannel <- docCopy
- }(docCopy2)
- tmp = make(map[string]interface{})
- }
- // 等待所有协程处理完
- wg.Wait()
- // 关闭channel,确保保存线程能够结束
- close(dataChannel)
- log.Println("所有数据导出完成", curren)
- select {}
- }
// saveToFile3 writes records to a new gzip-compressed file in outputDir as
// newline-delimited JSON: one compact JSON object per line.
//
// The file is named xiaMenData_<YYYYMMDD_HHMMSS>.json.gz after the current
// local time. If that name is already taken (two saves within the same
// second), a numeric suffix such as _1 is appended instead of truncating the
// earlier export. Progress is logged every 1000 records.
//
// Any I/O error terminates the process via log.Fatalf. This includes errors
// from closing the gzip stream and the file, which are what finally flush the
// data to disk.
func saveToFile3(outputDir string, records []map[string]interface{}) {
	stamp := time.Now().Format("20060102_150405")
	fileName := fmt.Sprintf("%s_%s.json.gz", "xiaMenData", stamp)
	filePath := filepath.Join(outputDir, fileName)
	// O_EXCL makes creation fail instead of silently overwriting a file.
	file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
	for n := 1; os.IsExist(err); n++ {
		fileName = fmt.Sprintf("%s_%s_%d.json.gz", "xiaMenData", stamp, n)
		filePath = filepath.Join(outputDir, fileName)
		file, err = os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
	}
	if err != nil {
		log.Fatalf("无法创建文件: %v", err)
	}
	log.Println("开始写入文件:", fileName, "数据总量", len(records))

	gzipWriter := gzip.NewWriter(file)
	// No indentation: each Encode call emits one compact object plus "\n".
	jsonEncoder := json.NewEncoder(gzipWriter)
	for k, record := range records {
		if err := jsonEncoder.Encode(record); err != nil {
			log.Fatalf("无法写入 JSON 数据: %v", err)
		}
		if k%1000 == 0 {
			log.Println("成功写入文件行数:", k, fileName)
		}
	}

	// Close the gzip stream first: it flushes pending data and writes the
	// trailer. Then close the file. A failure at either step leaves a
	// truncated archive, so these errors must not be dropped.
	if err := gzipWriter.Close(); err != nil {
		log.Fatalf("无法完成 gzip 压缩: %v", err)
	}
	if err := file.Close(); err != nil {
		log.Fatalf("无法关闭文件: %v", err)
	}
	log.Printf("数据成功写入压缩文件: %s", filePath)
}
// saveToFile writes all records as a single indented JSON array to a new
// gzip-compressed file in outputDir.
//
// The file is named xiaMenData_<YYYYMMDD_HHMMSS>.json.gz after the current
// local time. If that name is already taken (two saves within the same
// second), a numeric suffix such as _1 is appended instead of truncating the
// earlier export.
//
// Any I/O error terminates the process via log.Fatalf. This includes errors
// from closing the gzip stream and the file, which are what finally flush the
// data to disk.
func saveToFile(outputDir string, records []map[string]interface{}) {
	stamp := time.Now().Format("20060102_150405")
	fileName := fmt.Sprintf("%s_%s.json.gz", "xiaMenData", stamp)
	filePath := filepath.Join(outputDir, fileName)
	// O_EXCL makes creation fail instead of silently overwriting a file.
	file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
	for n := 1; os.IsExist(err); n++ {
		fileName = fmt.Sprintf("%s_%s_%d.json.gz", "xiaMenData", stamp, n)
		filePath = filepath.Join(outputDir, fileName)
		file, err = os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
	}
	if err != nil {
		log.Fatalf("无法创建文件: %v", err)
	}

	gzipWriter := gzip.NewWriter(file)
	jsonEncoder := json.NewEncoder(gzipWriter)
	jsonEncoder.SetIndent("", " ")
	if err := jsonEncoder.Encode(records); err != nil {
		log.Fatalf("无法写入 JSON 数据: %v", err)
	}

	// Close the gzip stream first: it flushes pending data and writes the
	// trailer. Then close the file. A failure at either step leaves a
	// truncated archive, so these errors must not be dropped.
	if err := gzipWriter.Close(); err != nil {
		log.Fatalf("无法完成 gzip 压缩: %v", err)
	}
	if err := file.Close(); err != nil {
		log.Fatalf("无法关闭文件: %v", err)
	}
	log.Printf("数据成功写入压缩文件: %s", filePath)
}
|