package main

import (
	"fmt"
	"log"
	"sort"
	"strings"
	"sync"
	"unicode/utf8"

	"github.com/xuri/excelize/v2"
	"go.mongodb.org/mongo-driver/bson"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// getCompany reads companies from mixdata.qyxy_std whose company_area is one
// of the five target provinces and stores every record that has a non-empty
// credit_no into wcc_bank_company via SaveByOriID.
//
// NOTE(review): the MongoDB address and credentials are hard-coded in this
// file; they should come from configuration or the environment.
func getCompany() {
	Mgo := &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080",
		//MongodbAddr: "127.0.0.1:27083",
		DbName:   "mixdata",
		Size:     10,
		UserName: "SJZY_RWbid_ES",
		Password: "SJZY@B4i4D5e6S",
		//Direct: true,
	}
	Mgo.InitPool()

	where := map[string]interface{}{
		"company_area": map[string]interface{}{
			"$in": []string{"北京", "上海", "浙江", "江苏", "广东"},
		},
		//"company_status": "存续",
	}
	defer util.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)
	//pool := make(chan bool, 10) // worker goroutines
	//wg := &sync.WaitGroup{}

	// Only the fields needed downstream are fetched.
	selected := map[string]interface{}{
		"company_name":   1,
		"company_area":   1,
		"credit_no":      1,
		"company_status": 1,
		"company_city":   1,
	}
	it := sess.DB("mixdata").C("qyxy_std").Find(&where).Select(&selected).Iter()
	log.Println("开始")

	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%10000 == 0 {
			log.Println("current: ", count)
		}
		// Companies without a credit number are not stored.
		if util.ObjToString(tmp["credit_no"]) == "" {
			continue
		}
		Mgo.SaveByOriID("wcc_bank_company", tmp)
		tmp = make(map[string]interface{})
	}
	log.Println("结束")
}

// bankPOC scans qfw.bidding for award-type notices (subtype 中标/单一/成交/合同)
// with comeintime between 1640966400 and 1703952000, resolves each winner in
// mixdata.qyxy_std and records winners located in the five target provinces.
//
// Notices with several comma-separated winners are written to
// wcc_bank_winner_new (one row per winner, with extra company fields); notices
// with a single winner are written to wcc_bank_winner. At most 20 notices are
// processed concurrently.
//
// NOTE(review): the MongoDB address and credentials are hard-coded in this
// file; they should come from configuration or the environment.
func bankPOC() {
	Mgo := &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080",
		//MongodbAddr: "127.0.0.1:27083",
		DbName:   "qfw",
		Size:     10,
		UserName: "SJZY_RWbid_ES",
		Password: "SJZY@B4i4D5e6S",
		//Direct: true,
	}
	Mgo.InitPool()

	MgoC := &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080",
		//MongodbAddr: "127.0.0.1:27083",
		DbName:   "mixdata",
		Size:     10,
		UserName: "SJZY_RWbid_ES",
		Password: "SJZY@B4i4D5e6S",
		//Direct: true,
	}
	MgoC.InitPool()

	defer util.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	pool := make(chan bool, 20) // caps the number of in-flight workers
	wg := &sync.WaitGroup{}

	// Query conditions.
	filter := bson.D{
		{Key: "comeintime", Value: bson.M{"$gte": 1640966400, "$lte": 1703952000}},
		{Key: "subtype", Value: bson.M{"$in": []string{"中标", "单一", "成交", "合同"}}},
	}
	// A value of 0 excludes the field from the returned documents.
	selected := map[string]interface{}{
		"contenthtml":      0,
		"attach_text":      0,
		"detail":           0,
		"purchasingsource": 0,
		"jsondata":         0,
		"package":          0,
	}

	it := sess.DB("qfw").C("bidding").Find(&filter).Select(&selected).Iter()
	//total, _ := sess.DB("qfw").C("bidding").Find(filter).Count()
	//fmt.Println("开始", "总数是:", total)

	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%10000 == 0 {
			log.Println("CURRENT :", count)
		}
		pool <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()

			// Notices flagged as test data are skipped.
			if util.ObjToString(tmp["sensitive"]) == "测试" {
				return
			}
			// extracttype == -1 marks duplicated historical data.
			if util.IntAll(tmp["extracttype"]) == -1 {
				return
			}
			projectName := util.ObjToString(tmp["projectname"])
			if strings.Contains(projectName, "非政府") {
				return
			}
			switch util.ObjToString(tmp["buyerclass"]) {
			case "批发零售", "住宿餐饮", "信息技术":
				return
			}
			// Winner names shorter than four characters (including the empty
			// string) are ignored.
			swinner := util.ObjToString(tmp["s_winner"])
			if utf8.RuneCountInString(swinner) < 4 {
				return
			}

			// Local to this goroutine: never shared between workers.
			areas := []string{"北京", "上海", "广东", "江苏", "浙江"}

			if strings.Contains(swinner, ",") {
				for _, v := range strings.Split(swinner, ",") {
					if utf8.RuneCountInString(v) < 4 {
						continue
					}
					da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": v})
					// A winner that cannot be resolved must not prevent the
					// remaining winners of the same notice from being
					// processed, so skip only this one.
					if da == nil || (*da)["credit_no"] == nil || util.ObjToString((*da)["credit_no"]) == "" {
						continue
					}
					if !IsInStringArray(util.ObjToString((*da)["company_area"]), areas) {
						continue
					}
					insert := map[string]interface{}{
						"winner":         v,
						"credit_no":      (*da)["credit_no"],
						"bidding_id":     mongodb.BsonIdToSId(tmp["_id"]),
						"projectname":    projectName,
						"company_type":   (*da)["company_type"],
						"company_status": (*da)["company_status"],
						"company_area":   (*da)["company_area"],
						"company_city":   (*da)["company_city"],
					}
					Mgo.Save("wcc_bank_winner_new", insert)
				}
				return
			}

			da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": swinner})
			if da == nil || (*da)["credit_no"] == nil || util.ObjToString((*da)["credit_no"]) == "" {
				return
			}
			if !IsInStringArray(util.ObjToString((*da)["company_area"]), areas) {
				return
			}
			insert := map[string]interface{}{
				"winner":      swinner,
				"credit_no":   (*da)["credit_no"],
				"bidding_id":  mongodb.BsonIdToSId(tmp["_id"]),
				"projectname": projectName,
			}
			Mgo.Save("wcc_bank_winner", insert)
		}(tmp)
		// The worker now owns the previous map; decode the next document into
		// a fresh one.
		tmp = make(map[string]interface{})
	}
	wg.Wait()
	fmt.Println("结束")
}

// bankWinnerStatistic counts how many times each winner appears in
// qfw.wcc_bank_winner and writes one {winner, num, credit_no} row per winner
// to wcc_bank_winner_statistic_company. Winners whose company_type in
// mixdata.qyxy_std is "个体工商户" are excluded.
//
// NOTE(review): the MongoDB address and credentials are hard-coded in this
// file; they should come from configuration or the environment.
func bankWinnerStatistic() {
	Mgo := &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080",
		//MongodbAddr: "127.0.0.1:27083",
		DbName:   "qfw",
		Size:     10,
		UserName: "SJZY_RWbid_ES",
		Password: "SJZY@B4i4D5e6S",
		//Direct: true,
	}
	Mgo.InitPool()

	MgoC := &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080",
		//MongodbAddr: "127.0.0.1:27083",
		DbName:   "mixdata",
		Size:     10,
		UserName: "SJZY_RWbid_ES",
		Password: "SJZY@B4i4D5e6S",
		//Direct: true,
	}
	MgoC.InitPool()

	defer util.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	it := sess.DB("qfw").C("wcc_bank_winner").Find(nil).Select(nil).Iter()
	statisticMap := make(map[string]int)      // winner -> number of occurrences
	noMap := make(map[string]interface{})     // winner -> last seen credit_no
	count := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%10000 == 0 {
			log.Println("current:", count)
		}
		winner := util.ObjToString(tmp["winner"])
		da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": winner})
		// Guard against a nil lookup result before dereferencing it; an
		// unresolved company is counted rather than aborting the whole run.
		if da != nil && util.ObjToString((*da)["company_type"]) == "个体工商户" {
			continue
		}
		statisticMap[winner]++
		noMap[winner] = tmp["credit_no"]
	}

	log.Println("开始保存")
	for k, v := range statisticMap {
		insert := map[string]interface{}{
			"winner":    k,
			"num":       v,
			"credit_no": noMap[k],
		}
		Mgo.Save("wcc_bank_winner_statistic_company", insert)
	}
	log.Println("保存结束")
}

// getPengWinner matches a supplied company list against recorded winners
// (requested by Gong Wenhua). For every row after the first in Sheet1 of
// "政采测试100户名单0118.xlsx" it looks the name in column A up in
// wcc_bank_winner and writes "有" (found) or "无" (not found) to column C.
// The workbook is saved when the function returns.
//
// NOTE(review): the MongoDB address and credentials are hard-coded in this
// file; they should come from configuration or the environment.
func getPengWinner() {
	Mgo := &mongodb.MongodbSim{
		//MongodbAddr: "172.17.189.140:27080",
		MongodbAddr: "127.0.0.1:27083",
		DbName:      "qfw",
		Size:        10,
		UserName:    "SJZY_RWbid_ES",
		Password:    "SJZY@B4i4D5e6S",
		Direct:      true,
	}
	Mgo.InitPool()

	f, err := excelize.OpenFile("政采测试100户名单0118.xlsx")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() {
		// Report a failed save instead of dropping the results silently.
		if err := f.Save(); err != nil {
			fmt.Println(err)
		}
		if err := f.Close(); err != nil {
			fmt.Println(err)
		}
	}()

	// Fetch every row of Sheet1.
	rows, err := f.GetRows("Sheet1")
	if err != nil {
		fmt.Println(err)
		return
	}

	// Row 0 is skipped (presumably the header row).
	for i := 1; i < len(rows); i++ {
		// Blank rows come back as empty slices; indexing them would panic.
		if len(rows[i]) == 0 {
			continue
		}
		name := rows[i][0]
		where := map[string]interface{}{
			"winner": name,
		}
		log.Println(name)

		datas, _ := Mgo.Find("wcc_bank_winner", where, nil, nil, false, -1, -1)
		// "无" when no bidding information exists for this winner, "有" otherwise.
		mark := "无"
		if datas != nil && len(*datas) > 0 {
			mark = "有"
		}
		if err := f.SetCellValue("Sheet1", fmt.Sprintf("C%v", i+1), mark); err != nil {
			fmt.Println(err)
		}

		//if len(*datas) > 0 {
		//	for k, _ := range *datas {
		//		biddingID := util.ObjToString((*datas)[k]["bidding_id"])
		//		biddingData, _ := Mgo.FindById("bidding", biddingID, nil)
		//		//fmt.Println(biddingData)
		//		Mgo.SaveByOriID("wcc_bank_winner_bidding", biddingData)
		//	}
		//}
	}
	log.Println("over")
}

/**
// exportPocN exports data for the partner-channel bank POC requirement
// (requested by Gong Wenhua); this function is obsolete.
func exportPocN() {
	//InitMgo()
	username := "SJZY_RWbid_ES"
	password := "SJZY@B4i4D5e6S"
	addr := "172.17.189.140:27080"
	//addr := "127.0.0.1:27083"
	direct := true
	if !strings.Contains(addr, "127") {
		direct = false
	}
	// Escape special characters in username and password
	escapedUsername := url.QueryEscape(username)
	escapedPassword := url.QueryEscape(password)
	// Construct MongoDB connection string
	urls := fmt.Sprintf("mongodb://%s:%s@%s", escapedUsername, escapedPassword, addr)
	clientOptions := options.Client().ApplyURI(urls).SetDirect(direct)
	//clientOptions := 
options.Client().ApplyURI("mongodb://SJZY_RWbid_ES:SJZY%40B4i4D5e6S@172.17.145.163:27083") // 连接到MongoDB client, err := mongo.Connect(context.TODO(), clientOptions) if err != nil { log.Fatal(err) } defer func() { if err := client.Disconnect(context.TODO()); err != nil { log.Fatal(err) } }() // 获取要查询的集合 sourceCollection := client.Database("qfw").Collection("bidding") //写入新表 targetCollection := client.Database("qfw").Collection("wcc_bank_poc_new") // 设置查询条件 filter := bson.D{ {"comeintime", bson.M{"$gte": 1640966400, "$lte": 1703952000}}, {"subtype", bson.M{"$in": []string{"中标", "单一", "成交", "合同"}}}, } // 设置投影,排除 contenthtml 字段 projection := bson.D{ {"contenthtml", 0}, // 0表示不返回该字段 {"attach_text", 0}, // 0表示不返回该字段 {"detail", 0}, // 0表示不返回该字段 {"purchasingsource", 0}, // 0表示不返回该字段 {"jsondata", 0}, // 0表示不返回该字段 {"package", 0}, // 0表示不返回该字段 } // 获取查询结果的总文档数 totalCount, err := sourceCollection.EstimatedDocumentCount(context.Background(), nil) if err != nil { log.Fatal(err) } log.Println("总数量:", totalCount) findOptions := options.Find().SetProjection(projection) // 执行查询 cursor, err := sourceCollection.Find(context.TODO(), filter, findOptions) if err != nil { log.Fatal(err) } defer cursor.Close(context.TODO()) count := 0 // 迭代查询结果 for cursor.Next(context.TODO()) { var result = make(map[string]interface{}) if err := cursor.Decode(&result); err != nil { log.Fatal(err) } count++ if count%10000 == 0 { log.Println("current count", count) } //过滤重复数据 if util.IntAll(result["extracttype"]) != 1 { continue } // 处理查询结果 area := util.ObjToString(result["area"]) areas := []string{"北京", "上海", "广东", "江苏", "浙江"} if !IsInStringArray(area, areas) { continue } projectName := util.ObjToString(result["projectname"]) if strings.Contains(projectName, "非政府") { continue } buyerclass := util.ObjToString(result["buyerclass"]) if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" { continue } //存入新表 _, err := targetCollection.InsertOne(context.TODO(), result) if err != nil { 
log.Fatal(err) } if err := cursor.Err(); err != nil { log.Fatal(err) } } log.Println("over") } // staticBank 统计中标单位中标项目数量;函数已作废 func staticBank() { username := "SJZY_RWbid_ES" password := "SJZY@B4i4D5e6S" addr := "172.17.189.140:27080" //addr := "127.0.0.1:27083" direct := true if !strings.Contains(addr, "127") { direct = false } escapedUsername := url.QueryEscape(username) escapedPassword := url.QueryEscape(password) urls := fmt.Sprintf("mongodb://%s:%s@%s", escapedUsername, escapedPassword, addr) clientOptions := options.Client().ApplyURI(urls).SetDirect(direct) // 连接到MongoDB client, err := mongo.Connect(context.TODO(), clientOptions) if err != nil { log.Fatal(err) } defer func() { if err := client.Disconnect(context.TODO()); err != nil { log.Fatal(err) } }() statisticMap := make(map[string]int) // 获取要查询的集合 sourceCollection := client.Database("qfw").Collection("wcc_bank_poc3") //写入新表 targetCollection := client.Database("qfw").Collection("wcc_bank_poc3_statistic") // 设置查询条件 filter := bson.D{ //{"comeintime", bson.M{"$gte": 1640966400, "$lte": 1703952000}}, //{"subtype", bson.M{"$in": []string{"中标", "单一", "成交", "合同"}}}, } // 设置投影,排除 contenthtml 字段 projection := bson.D{ {"s_winner", 1}, // 0表示不返回该字段 {"projectname", 1}, // 0表示不返回该字段 } // 获取查询结果的总文档数 totalCount, err := sourceCollection.EstimatedDocumentCount(context.Background(), nil) if err != nil { log.Fatal(err) } log.Println("总数量:", totalCount) findOptions := options.Find().SetProjection(projection) // 执行查询 cursor, err := sourceCollection.Find(context.TODO(), filter, findOptions) if err != nil { log.Fatal(err) } defer cursor.Close(context.TODO()) //凭安企业库 MgoC := &mongodb.MongodbSim{ MongodbAddr: "172.17.4.181:27001", DbName: "mixdata", Size: 10, UserName: "", Password: "", //Direct: true, } MgoC.InitPool() count := 0 // 迭代查询结果 for cursor.Next(context.TODO()) { var result = make(map[string]interface{}) if err := cursor.Decode(&result); err != nil { log.Fatal(err) } count++ if count%10000 == 0 { log.Println("current 
count", count) } projectName := util.ObjToString(result["projectname"]) if projectName == "" { continue } winner := util.ObjToString(result["s_winner"]) if winner == "" { continue } if strings.Contains(winner, ",") { winners := strings.Split(winner, ",") for _, v := range winners { statisticMap[v]++ } } } for k, v := range statisticMap { insert := map[string]interface{}{ "winner": k, "num": v, } //存入新表 _, err := targetCollection.InsertOne(context.TODO(), insert) if err != nil { log.Fatal(err) } } log.Println("over") } // exportPoc 导出合作渠道银行POC需求;龚文华 需求;函数作废 func exportPoc() { //InitMgo() username := "SJZY_RWbid_ES" password := "SJZY@B4i4D5e6S" addr := "172.17.189.140:27080" //addr := "127.0.0.1:27083" direct := true //url := fmt.Sprintf("mongodb://%s:%s@%s", username, password, addr) //clientOptions := options.Client().ApplyURI(url) if !strings.Contains(addr, "127") { direct = false } // Escape special characters in username and password escapedUsername := url.QueryEscape(username) escapedPassword := url.QueryEscape(password) // Construct MongoDB connection string urls := fmt.Sprintf("mongodb://%s:%s@%s", escapedUsername, escapedPassword, addr) clientOptions := options.Client().ApplyURI(urls).SetDirect(direct) //clientOptions := options.Client().ApplyURI("mongodb://SJZY_RWbid_ES:SJZY%40B4i4D5e6S@172.17.145.163:27083") // 连接到MongoDB client, err := mongo.Connect(context.TODO(), clientOptions) if err != nil { log.Fatal(err) } defer func() { if err := client.Disconnect(context.TODO()); err != nil { log.Fatal(err) } }() // 获取要查询的集合 sourceCollection := client.Database("qfw").Collection("bidding") //写入新表 targetCollection := client.Database("qfw").Collection("wcc_bank_poc") // 设置查询条件 filter := bson.D{ {"comeintime", bson.M{"$gte": 1640966400, "$lte": 1703952000}}, {"subtype", bson.M{"$in": []string{"中标", "单一", "成交", "合同"}}}, } // 设置投影,排除 contenthtml 字段 projection := bson.D{ {"contenthtml", 0}, // 0表示不返回该字段 {"attach_text", 0}, // 0表示不返回该字段 } // 获取查询结果的总文档数 totalCount, err := 
// **/
/**
(The line above terminates the block comment that is still open at this point
in the file; what follows is the remainder of the obsolete, commented-out
exportPoc body.)

sourceCollection.EstimatedDocumentCount(context.Background(), nil)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("总数量:", totalCount)
	// Number of documents per batch and the current batch number
	batchSize := 1000000
	currentBatch := 1 // batch currently being processed
	recordCount := 0
	// Run the batched query and export
	for skip := 0; skip < int(totalCount); skip += batchSize {
		// Query options: batch size, offset and field projection
		findOptions := options.Find().SetSkip(int64(skip)).SetLimit(int64(batchSize)).SetProjection(projection)
		// Execute the query
		cursor, err := sourceCollection.Find(context.TODO(), filter, findOptions)
		if err != nil {
			log.Fatal(err)
		}
		defer cursor.Close(context.TODO())
		// Iterate over the query results
		for cursor.Next(context.TODO()) {
			var result = make(map[string]interface{})
			if err := cursor.Decode(&result); err != nil {
				log.Fatal(err)
			}
			recordCount++
			if recordCount%10000 == 0 {
				log.Printf("已处理 %d 批次 %d 条记录\n", currentBatch, recordCount)
			}
			// Filter out duplicate data
			if util.IntAll(result["extracttype"]) != 1 {
				continue
			}
			// Process the query result
			area := util.ObjToString(result["area"])
			areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
			if !IsInStringArray(area, areas) {
				continue
			}
			projectName := util.ObjToString(result["projectname"])
			if strings.Contains(projectName, "非政府") {
				continue
			}
			buyerclass := util.ObjToString(result["buyerclass"])
			if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
				continue
			}
			// Store into the new collection
			_, err := targetCollection.InsertOne(context.TODO(), result)
			if err != nil {
				log.Fatal(err)
			}
		}
		if err := cursor.Err(); err != nil {
			log.Fatal(err)
		}
		log.Printf("批次 %d 完成\n", currentBatch)
		// Advance the batch counter
		currentBatch++
		// Optional: for large result sets a cursor timeout mechanism can be
		// used so the cursor is not closed automatically during processing
		//time.Sleep(5 * time.Second)
	}
	log.Println("over")
}
**/

// IsInStringArray reports whether str is an element of arr.
//
// The lookup sorts a private copy of arr and binary-searches it, so the
// caller's slice is never reordered and may safely be shared between
// goroutines. A nil or empty arr yields false.
func IsInStringArray(str string, arr []string) bool {
	// Sorting arr itself would silently reorder the caller's data, so work on
	// a copy instead.
	sorted := make([]string, len(arr))
	copy(sorted, arr)
	sort.Strings(sorted)

	// SearchStrings returns the insertion index; it is a hit only when that
	// index is in range and holds exactly str.
	pos := sort.SearchStrings(sorted, str)
	return pos < len(sorted) && sorted[pos] == str
}