|
- package main
- import (
- "fmt"
- "github.com/xuri/excelize/v2"
- "go.mongodb.org/mongo-driver/bson"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "sort"
- "strings"
- "sync"
- "unicode/utf8"
- )
- // getCompany 获取企业
- func getCompany() {
- Mgo := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- DbName: "mixdata",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- Mgo.InitPool()
- where := map[string]interface{}{
- "company_area": map[string]interface{}{
- "$in": []string{"北京", "上海", "浙江", "江苏", "广东"},
- },
- //"company_status": "存续",
- }
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- //pool := make(chan bool, 10) //处理协程
- //wg := &sync.WaitGroup{}
- selected := map[string]interface{}{"company_name": 1, "company_area": 1, "credit_no": 1, "company_status": 1, "company_city": 1}
- it := sess.DB("mixdata").C("qyxy_std").Find(&where).Select(&selected).Iter()
- log.Println("开始")
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Println("current: ", count)
- }
- if util.ObjToString(tmp["credit_no"]) == "" {
- continue
- }
- Mgo.SaveByOriID("wcc_bank_company", tmp)
- tmp = make(map[string]interface{})
- }
- log.Println("结束")
- }
- // bankPOC bankPOC
- func bankPOC() {
- Mgo := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- DbName: "qfw",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- Mgo.InitPool()
- MgoC := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- DbName: "mixdata",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- MgoC.InitPool()
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- pool := make(chan bool, 20) //处理协程
- wg := &sync.WaitGroup{}
- //查询条件
- // 设置查询条件
- filter := bson.D{
- {"comeintime", bson.M{"$gte": 1640966400, "$lte": 1703952000}},
- {"subtype", bson.M{"$in": []string{"中标", "单一", "成交", "合同"}}},
- }
- selected := map[string]interface{}{
- "contenthtml": 0, // 0表示不返回该字段
- "attach_text": 0, // 0表示不返回该字段
- "detail": 0, // 0表示不返回该字段
- "purchasingsource": 0, // 0表示不返回该字段
- "jsondata": 0, // 0表示不返回该字段
- "package": 0, // 0表示不返回该字段
- }
- it := sess.DB("qfw").C("bidding").Find(&filter).Select(&selected).Iter()
- //total, _ := sess.DB("qfw").C("bidding").Find(filter).Count()
- //fmt.Println("开始", "总数是:", total)
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Println("CURRENT :", count)
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
- tmp = make(map[string]interface{})
- return
- }
- // 针对存量数据,重复数据不进索引
- if util.IntAll(tmp["extracttype"]) == -1 {
- return
- }
- projectName := util.ObjToString(tmp["projectname"])
- if strings.Contains(projectName, "非政府") {
- return
- }
- buyerclass := util.ObjToString(tmp["buyerclass"])
- if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
- return
- }
- swinner := util.ObjToString(tmp["s_winner"])
- if swinner == "" {
- return
- }
- if utf8.RuneCountInString(swinner) < 4 {
- return
- }
- if strings.Contains(swinner, ",") {
- winners := strings.Split(swinner, ",")
- for _, v := range winners {
- if utf8.RuneCountInString(v) < 4 {
- continue
- } else {
- da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": v})
- if da == nil || (*da)["credit_no"] == nil || util.ObjToString((*da)["credit_no"]) == "" {
- return
- }
- area := util.ObjToString((*da)["company_area"])
- areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
- if !IsInStringArray(area, areas) {
- continue
- }
- if !IsInStringArray(area, areas) {
- continue
- }
- insert := map[string]interface{}{
- "winner": v,
- "credit_no": (*da)["credit_no"],
- "bidding_id": mongodb.BsonIdToSId(tmp["_id"]),
- "projectname": projectName,
- "company_type": (*da)["company_type"],
- "company_status": (*da)["company_status"],
- "company_area": (*da)["company_area"],
- "company_city": (*da)["company_city"],
- }
- Mgo.Save("wcc_bank_winner_new", insert)
- }
- }
- } else {
- da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": swinner})
- if da == nil || (*da)["credit_no"] == nil || util.ObjToString((*da)["credit_no"]) == "" {
- return
- }
- area := util.ObjToString((*da)["company_area"])
- areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
- if !IsInStringArray(area, areas) {
- return
- }
- if !IsInStringArray(area, areas) {
- return
- }
- insert := map[string]interface{}{
- "winner": swinner,
- "credit_no": (*da)["credit_no"],
- "bidding_id": mongodb.BsonIdToSId(tmp["_id"]),
- "projectname": projectName,
- }
- Mgo.Save("wcc_bank_winner", insert)
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- fmt.Println("结束")
- }
- // bankWinnerStatistic 统计企业信息;企业出现数量
- func bankWinnerStatistic() {
- Mgo := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- DbName: "qfw",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- Mgo.InitPool()
- MgoC := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- DbName: "mixdata",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- MgoC.InitPool()
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- it := sess.DB("qfw").C("wcc_bank_winner").Find(nil).Select(nil).Iter()
- statisticMap := make(map[string]int)
- noMap := make(map[string]interface{})
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count)
- }
- winner := util.ObjToString(tmp["winner"])
- da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": winner})
- if util.ObjToString((*da)["company_type"]) == "个体工商户" {
- continue
- }
- statisticMap[winner]++
- noMap[winner] = tmp["credit_no"]
- }
- //
- log.Println("开始保存")
- for k, v := range statisticMap {
- insert := map[string]interface{}{
- "winner": k,
- "num": v,
- "credit_no": noMap[k],
- }
- Mgo.Save("wcc_bank_winner_statistic_company", insert)
- }
- log.Println("保存结束")
- }
- // getPengWinner 根据 龚文华需求, 碰撞提供的企业名单,获取相关标讯信息
- func getPengWinner() {
- Mgo := &mongodb.MongodbSim{
- //MongodbAddr: "172.17.189.140:27080",
- MongodbAddr: "127.0.0.1:27083",
- DbName: "qfw",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- Direct: true,
- }
- Mgo.InitPool()
- f, err := excelize.OpenFile("政采测试100户名单0118.xlsx")
- if err != nil {
- fmt.Println(err)
- return
- }
- defer func() {
- f.Save()
- if err := f.Close(); err != nil {
- fmt.Println(err)
- }
- }()
- // 获取 Sheet1 上所有单元格
- rows, err := f.GetRows("Sheet1")
- if err != nil {
- fmt.Println(err)
- return
- }
- for i := 1; i < len(rows); i++ {
- name := rows[i][0]
- where := map[string]interface{}{
- "winner": name,
- }
- log.Println(name)
- datas, _ := Mgo.Find("wcc_bank_winner", where, nil, nil, false, -1, -1)
- //没有找到中标单位 标讯信息
- if len(*datas) == 0 {
- f.SetCellValue("Sheet1", fmt.Sprintf("C%v", i+1), "无")
- } else {
- f.SetCellValue("Sheet1", fmt.Sprintf("C%v", i+1), "有")
- }
- //if len(*datas) > 0 {
- // for k, _ := range *datas {
- // biddingID := util.ObjToString((*datas)[k]["bidding_id"])
- // biddingData, _ := Mgo.FindById("bidding", biddingID, nil)
- // //fmt.Println(biddingData)
- // Mgo.SaveByOriID("wcc_bank_winner_bidding", biddingData)
- // }
- //}
- }
- log.Println("over")
- }
- /**
- // exportPocN 导出合作渠道银行POC需求;龚文华 需求;函数已作废
- func exportPocN() {
- //InitMgo()
- username := "SJZY_RWbid_ES"
- password := "SJZY@B4i4D5e6S"
- addr := "172.17.189.140:27080"
- //addr := "127.0.0.1:27083"
- direct := true
- if !strings.Contains(addr, "127") {
- direct = false
- }
- // Escape special characters in username and password
- escapedUsername := url.QueryEscape(username)
- escapedPassword := url.QueryEscape(password)
- // Construct MongoDB connection string
- urls := fmt.Sprintf("mongodb://%s:%s@%s", escapedUsername, escapedPassword, addr)
- clientOptions := options.Client().ApplyURI(urls).SetDirect(direct)
- //clientOptions := options.Client().ApplyURI("mongodb://SJZY_RWbid_ES:SJZY%40B4i4D5e6S@172.17.145.163:27083")
- // 连接到MongoDB
- client, err := mongo.Connect(context.TODO(), clientOptions)
- if err != nil {
- log.Fatal(err)
- }
- defer func() {
- if err := client.Disconnect(context.TODO()); err != nil {
- log.Fatal(err)
- }
- }()
- // 获取要查询的集合
- sourceCollection := client.Database("qfw").Collection("bidding")
- //写入新表
- targetCollection := client.Database("qfw").Collection("wcc_bank_poc_new")
- // 设置查询条件
- filter := bson.D{
- {"comeintime", bson.M{"$gte": 1640966400, "$lte": 1703952000}},
- {"subtype", bson.M{"$in": []string{"中标", "单一", "成交", "合同"}}},
- }
- // 设置投影,排除 contenthtml 字段
- projection := bson.D{
- {"contenthtml", 0}, // 0表示不返回该字段
- {"attach_text", 0}, // 0表示不返回该字段
- {"detail", 0}, // 0表示不返回该字段
- {"purchasingsource", 0}, // 0表示不返回该字段
- {"jsondata", 0}, // 0表示不返回该字段
- {"package", 0}, // 0表示不返回该字段
- }
- // 获取查询结果的总文档数
- totalCount, err := sourceCollection.EstimatedDocumentCount(context.Background(), nil)
- if err != nil {
- log.Fatal(err)
- }
- log.Println("总数量:", totalCount)
- findOptions := options.Find().SetProjection(projection)
- // 执行查询
- cursor, err := sourceCollection.Find(context.TODO(), filter, findOptions)
- if err != nil {
- log.Fatal(err)
- }
- defer cursor.Close(context.TODO())
- count := 0
- // 迭代查询结果
- for cursor.Next(context.TODO()) {
- var result = make(map[string]interface{})
- if err := cursor.Decode(&result); err != nil {
- log.Fatal(err)
- }
- count++
- if count%10000 == 0 {
- log.Println("current count", count)
- }
- //过滤重复数据
- if util.IntAll(result["extracttype"]) != 1 {
- continue
- }
- // 处理查询结果
- area := util.ObjToString(result["area"])
- areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
- if !IsInStringArray(area, areas) {
- continue
- }
- projectName := util.ObjToString(result["projectname"])
- if strings.Contains(projectName, "非政府") {
- continue
- }
- buyerclass := util.ObjToString(result["buyerclass"])
- if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
- continue
- }
- //存入新表
- _, err := targetCollection.InsertOne(context.TODO(), result)
- if err != nil {
- log.Fatal(err)
- }
- if err := cursor.Err(); err != nil {
- log.Fatal(err)
- }
- }
- log.Println("over")
- }
- // staticBank 统计中标单位中标项目数量;函数已作废
- func staticBank() {
- username := "SJZY_RWbid_ES"
- password := "SJZY@B4i4D5e6S"
- addr := "172.17.189.140:27080"
- //addr := "127.0.0.1:27083"
- direct := true
- if !strings.Contains(addr, "127") {
- direct = false
- }
- escapedUsername := url.QueryEscape(username)
- escapedPassword := url.QueryEscape(password)
- urls := fmt.Sprintf("mongodb://%s:%s@%s", escapedUsername, escapedPassword, addr)
- clientOptions := options.Client().ApplyURI(urls).SetDirect(direct)
- // 连接到MongoDB
- client, err := mongo.Connect(context.TODO(), clientOptions)
- if err != nil {
- log.Fatal(err)
- }
- defer func() {
- if err := client.Disconnect(context.TODO()); err != nil {
- log.Fatal(err)
- }
- }()
- statisticMap := make(map[string]int)
- // 获取要查询的集合
- sourceCollection := client.Database("qfw").Collection("wcc_bank_poc3")
- //写入新表
- targetCollection := client.Database("qfw").Collection("wcc_bank_poc3_statistic")
- // 设置查询条件
- filter := bson.D{
- //{"comeintime", bson.M{"$gte": 1640966400, "$lte": 1703952000}},
- //{"subtype", bson.M{"$in": []string{"中标", "单一", "成交", "合同"}}},
- }
- // 设置投影,排除 contenthtml 字段
- projection := bson.D{
- {"s_winner", 1}, // 0表示不返回该字段
- {"projectname", 1}, // 0表示不返回该字段
- }
- // 获取查询结果的总文档数
- totalCount, err := sourceCollection.EstimatedDocumentCount(context.Background(), nil)
- if err != nil {
- log.Fatal(err)
- }
- log.Println("总数量:", totalCount)
- findOptions := options.Find().SetProjection(projection)
- // 执行查询
- cursor, err := sourceCollection.Find(context.TODO(), filter, findOptions)
- if err != nil {
- log.Fatal(err)
- }
- defer cursor.Close(context.TODO())
- //凭安企业库
- MgoC := &mongodb.MongodbSim{
- MongodbAddr: "172.17.4.181:27001",
- DbName: "mixdata",
- Size: 10,
- UserName: "",
- Password: "",
- //Direct: true,
- }
- MgoC.InitPool()
- count := 0
- // 迭代查询结果
- for cursor.Next(context.TODO()) {
- var result = make(map[string]interface{})
- if err := cursor.Decode(&result); err != nil {
- log.Fatal(err)
- }
- count++
- if count%10000 == 0 {
- log.Println("current count", count)
- }
- projectName := util.ObjToString(result["projectname"])
- if projectName == "" {
- continue
- }
- winner := util.ObjToString(result["s_winner"])
- if winner == "" {
- continue
- }
- if strings.Contains(winner, ",") {
- winners := strings.Split(winner, ",")
- for _, v := range winners {
- statisticMap[v]++
- }
- }
- }
- for k, v := range statisticMap {
- insert := map[string]interface{}{
- "winner": k,
- "num": v,
- }
- //存入新表
- _, err := targetCollection.InsertOne(context.TODO(), insert)
- if err != nil {
- log.Fatal(err)
- }
- }
- log.Println("over")
- }
- // exportPoc 导出合作渠道银行POC需求;龚文华 需求;函数作废
- func exportPoc() {
- //InitMgo()
- username := "SJZY_RWbid_ES"
- password := "SJZY@B4i4D5e6S"
- addr := "172.17.189.140:27080"
- //addr := "127.0.0.1:27083"
- direct := true
- //url := fmt.Sprintf("mongodb://%s:%s@%s", username, password, addr)
- //clientOptions := options.Client().ApplyURI(url)
- if !strings.Contains(addr, "127") {
- direct = false
- }
- // Escape special characters in username and password
- escapedUsername := url.QueryEscape(username)
- escapedPassword := url.QueryEscape(password)
- // Construct MongoDB connection string
- urls := fmt.Sprintf("mongodb://%s:%s@%s", escapedUsername, escapedPassword, addr)
- clientOptions := options.Client().ApplyURI(urls).SetDirect(direct)
- //clientOptions := options.Client().ApplyURI("mongodb://SJZY_RWbid_ES:SJZY%40B4i4D5e6S@172.17.145.163:27083")
- // 连接到MongoDB
- client, err := mongo.Connect(context.TODO(), clientOptions)
- if err != nil {
- log.Fatal(err)
- }
- defer func() {
- if err := client.Disconnect(context.TODO()); err != nil {
- log.Fatal(err)
- }
- }()
- // 获取要查询的集合
- sourceCollection := client.Database("qfw").Collection("bidding")
- //写入新表
- targetCollection := client.Database("qfw").Collection("wcc_bank_poc")
- // 设置查询条件
- filter := bson.D{
- {"comeintime", bson.M{"$gte": 1640966400, "$lte": 1703952000}},
- {"subtype", bson.M{"$in": []string{"中标", "单一", "成交", "合同"}}},
- }
- // 设置投影,排除 contenthtml 字段
- projection := bson.D{
- {"contenthtml", 0}, // 0表示不返回该字段
- {"attach_text", 0}, // 0表示不返回该字段
- }
- // 获取查询结果的总文档数
- totalCount, err := sourceCollection.EstimatedDocumentCount(context.Background(), nil)
- if err != nil {
- log.Fatal(err)
- }
- log.Println("总数量:", totalCount)
- // 设置每批次查询的文档数量和当前批次数
- batchSize := 1000000
- currentBatch := 1 //当前处理批次
- recordCount := 0
- // 执行分批查询和导出
- for skip := 0; skip < int(totalCount); skip += batchSize {
- // 设置查询选项,包括批次大小和偏移量,过滤查询字段
- findOptions := options.Find().SetSkip(int64(skip)).SetLimit(int64(batchSize)).SetProjection(projection)
- // 执行查询
- cursor, err := sourceCollection.Find(context.TODO(), filter, findOptions)
- if err != nil {
- log.Fatal(err)
- }
- defer cursor.Close(context.TODO())
- // 迭代查询结果
- for cursor.Next(context.TODO()) {
- var result = make(map[string]interface{})
- if err := cursor.Decode(&result); err != nil {
- log.Fatal(err)
- }
- recordCount++
- if recordCount%10000 == 0 {
- log.Printf("已处理 %d 批次 %d 条记录\n", currentBatch, recordCount)
- }
- //过滤重复数据
- if util.IntAll(result["extracttype"]) != 1 {
- continue
- }
- // 处理查询结果
- area := util.ObjToString(result["area"])
- areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
- if !IsInStringArray(area, areas) {
- continue
- }
- projectName := util.ObjToString(result["projectname"])
- if strings.Contains(projectName, "非政府") {
- continue
- }
- buyerclass := util.ObjToString(result["buyerclass"])
- if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
- continue
- }
- //存入新表
- _, err := targetCollection.InsertOne(context.TODO(), result)
- if err != nil {
- log.Fatal(err)
- }
- }
- if err := cursor.Err(); err != nil {
- log.Fatal(err)
- }
- log.Printf("批次 %d 完成\n", currentBatch)
- // 增加当前批次数
- currentBatch++
- // 可选:如果查询结果较大,可以使用游标的超时机制,以防止游标在处理较大数据集时被自动关闭
- //time.Sleep(5 * time.Second)
- }
- log.Println("over")
- }
- **/
// IsInStringArray reports whether str is an element of arr.
//
// arr may be nil or empty, in which case the result is false. The caller's
// slice is never modified: the lookup sorts a private copy and binary-searches
// it. Sorting arr itself, as this function used to do, reordered the caller's
// data as a side effect.
func IsInStringArray(str string, arr []string) bool {
	sorted := make([]string, len(arr))
	copy(sorted, arr)
	sort.Strings(sorted)

	// SearchStrings returns the insertion index, so the element at that index
	// still has to be compared with str.
	pos := sort.SearchStrings(sorted, str)
	return pos < len(sorted) && sorted[pos] == str
}
|