package main

import (
	"context"
	"fmt"
	"log"
	"net/url"
	"sort"
	"strings"
	"time"

	"github.com/xuri/excelize/v2"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// get181Data iterates over mixdata.company_base on 172.17.4.181:27001 (the
// Ping'an company database), tallies documents per company_type and saves one
// {type, count} document per type into wcc_company_type_static_0712.
//
// Documents are skipped when company_name or company_type is empty, or when
// company_status contains "注销" or "吊销".
func get181Data() {
	mgoQY := &mongodb.MongodbSim{
		MongodbAddr: "172.17.4.181:27001",
		DbName:      "mixdata",
		Size:        10,
		UserName:    "",
		Password:    "",
	}
	mgoQY.InitPool()
	sess := mgoQY.GetMgoConn()
	defer mgoQY.DestoryMongoConn(sess)

	where := map[string]interface{}{
		"use_flag": 0,
		"company_status": map[string]interface{}{
			"$nin": []string{"注销", "吊销", "吊销,已注销"},
		},
	}
	selected := map[string]interface{}{
		"company_name":   1,
		"company_status": 1,
		"company_type":   1,
		"use_flag":       1,
	}

	// NOTE(review): the iterator's error/Close result is never inspected here;
	// confirm against the session API whether a failed scan can go unnoticed.
	query := sess.DB("mixdata").C("company_base").Find(where).Select(selected).Iter()

	count := 0
	typeCount := make(map[string]int)
	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
		if count%10000 == 0 {
			log.Println("current:", count, tmp["company_name"], tmp["company_status"], tmp["company_type"], len(typeCount))
		}

		companyType := util.ObjToString(tmp["company_type"])
		if util.ObjToString(tmp["company_name"]) == "" || companyType == "" {
			continue
		}

		// The $nin filter above only excludes exact matches; this also drops
		// any status that merely contains one of the two words.
		companyStatus := util.ObjToString(tmp["company_status"])
		if strings.Contains(companyStatus, "注销") || strings.Contains(companyStatus, "吊销") {
			continue
		}

		typeCount[companyType]++
	}

	log.Println("len types", len(typeCount))
	for k, v := range typeCount {
		sa := map[string]interface{}{
			"type":  k,
			"count": v,
		}
		mgoQY.Save("wcc_company_type_static_0712", sa)
	}
	log.Println("get181Data --------- over ")
}

// mustConnect181 opens a client for the unauthenticated MongoDB instance at
// 172.17.4.181:27001. It panics when the URI cannot be built and terminates
// the process via log.Fatal when the connection cannot be created, which is
// how every caller in this file treated those failures.
func mustConnect181(ctx context.Context) *mongo.Client {
	hosts := []string{"172.17.4.181:27001"}
	uri, err := BuildMongoURI("", "", hosts, nil)
	if err != nil {
		panic(err)
	}
	client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
	if err != nil {
		log.Fatal(err)
	}
	return client
}

// saveTypeCountsXLSX writes a two-column workbook (company_type, count) to
// filename. Rows are emitted in the order given by names; the count of each
// row is looked up in counts.
//
// excelize.NewFile only creates the default worksheet "Sheet1", and writing a
// cell to a worksheet that does not exist returns an error, so the default
// sheet is renamed to "TypeCounts" first and every cell write is checked.
func saveTypeCountsXLSX(filename string, names []string, counts map[string]int64) error {
	const sheet = "TypeCounts"

	f := excelize.NewFile()
	// The result of SetSheetName is deliberately not captured: if the rename
	// did not take effect, the first SetCellValue below reports it.
	f.SetSheetName("Sheet1", sheet)

	if err := f.SetCellValue(sheet, "A1", "company_type"); err != nil {
		return fmt.Errorf("writing header A1: %w", err)
	}
	if err := f.SetCellValue(sheet, "B1", "count"); err != nil {
		return fmt.Errorf("writing header B1: %w", err)
	}

	for i, name := range names {
		row := i + 2 // row 1 holds the header
		if err := f.SetCellValue(sheet, fmt.Sprintf("A%d", row), name); err != nil {
			return fmt.Errorf("writing cell A%d: %w", row, err)
		}
		if err := f.SetCellValue(sheet, fmt.Sprintf("B%d", row), counts[name]); err != nil {
			return fmt.Errorf("writing cell B%d: %w", row, err)
		}
	}

	return f.SaveAs(filename)
}

// exportByTraverseAndCount scans mixdata.company_base (use_flag = 0 and
// company_status not one of the listed cancelled/revoked values), counts the
// documents per non-empty company_type in memory and writes the result to
// company_type_counts_traverse_<unix-time>.xlsx, sorted by type name.
func exportByTraverseAndCount() {
	ctx := context.Background()
	client := mustConnect181(ctx)
	defer client.Disconnect(ctx)

	collection := client.Database("mixdata").Collection("company_base")

	// Query filter.
	filter := bson.D{
		{Key: "use_flag", Value: 0},
		{Key: "company_status", Value: bson.D{
			{Key: "$nin", Value: bson.A{"注销", "吊销", "吊销,已注销"}},
		}},
	}
	// Only the company_type field is needed.
	opts := options.Find().SetProjection(bson.D{{Key: "company_type", Value: 1}})

	cursor, err := collection.Find(ctx, filter, opts)
	if err != nil {
		log.Fatalf("查询失败: %v", err)
	}
	defer cursor.Close(ctx)

	typeCount := make(map[string]int64)
	scanned := 0
	for cursor.Next(ctx) {
		scanned++
		if scanned%10000 == 0 {
			log.Println("current:", scanned)
		}

		var doc struct {
			CompanyType string `bson:"company_type"`
		}
		if err := cursor.Decode(&doc); err != nil {
			log.Printf("解码失败: %v", err)
			continue
		}
		if doc.CompanyType == "" {
			continue
		}
		typeCount[doc.CompanyType]++
	}
	if err := cursor.Err(); err != nil {
		log.Fatalf("遍历出错: %v", err)
	}

	// Map iteration order is random; sort so the sheet is reproducible.
	names := make([]string, 0, len(typeCount))
	for name := range typeCount {
		names = append(names, name)
	}
	sort.Strings(names)

	filename := fmt.Sprintf("company_type_counts_traverse_%d.xlsx", time.Now().Unix())
	if err := saveTypeCountsXLSX(filename, names, typeCount); err != nil {
		log.Fatalf("保存 Excel 失败: %v", err)
	}
	fmt.Println("\n导出完成,文件名:", filename)
}

// exportDistinctTypeCounts lists the distinct company_type values matching
// base, runs one CountDocuments per string-valued type (base plus
// company_type = <type>) and writes the results to filename in the order the
// distinct values were returned. Types whose count query fails are logged and
// left out of the sheet; non-string distinct values are skipped.
func exportDistinctTypeCounts(ctx context.Context, collection *mongo.Collection, base bson.M, filename string) {
	// Step 1: distinct company_type values.
	types, err := collection.Distinct(ctx, "company_type", base)
	if err != nil {
		log.Fatalf("查询 distinct 失败: %v", err)
	}
	fmt.Printf("共找到 %d 个不同的 company_type\n", len(types))

	// Step 2: count the documents of every type under the same base filter.
	names := make([]string, 0, len(types))
	counts := make(map[string]int64, len(types))
	for _, t := range types {
		name, ok := t.(string)
		if !ok {
			continue
		}

		typeFilter := make(bson.M, len(base)+1)
		for k, v := range base {
			typeFilter[k] = v
		}
		typeFilter["company_type"] = name

		n, err := collection.CountDocuments(ctx, typeFilter)
		if err != nil {
			log.Printf("统计 %s 出错: %v", name, err)
			continue
		}
		log.Println(name, n)

		names = append(names, name)
		counts[name] = n
	}

	if err := saveTypeCountsXLSX(filename, names, counts); err != nil {
		log.Fatalf("保存 Excel 失败: %v", err)
	}
	fmt.Println("导出完成,文件名:" + filename)
}

// exportCompanyType2 exports per-type document counts of mixdata.company_base
// to company_type_counts_filtered.xlsx, restricted to use_flag = 0 and a
// company_status that is none of "注销", "吊销", "吊销,已注销".
func exportCompanyType2() {
	ctx := context.Background()
	client := mustConnect181(ctx)
	defer client.Disconnect(ctx)

	collection := client.Database("mixdata").Collection("company_base")

	// Shared filter: use_flag = 0 and company_status not cancelled/revoked.
	filter := bson.M{
		"use_flag": 0,
		"company_status": bson.M{
			"$nin": []string{"注销", "吊销", "吊销,已注销"},
		},
	}
	exportDistinctTypeCounts(ctx, collection, filter, "company_type_counts_filtered.xlsx")
}

// exportCompanyType exports per-type document counts of the whole
// mixdata.company_base collection (no filter) to company_type_counts_fast.xlsx.
func exportCompanyType() {
	ctx := context.Background()
	client := mustConnect181(ctx)
	defer client.Disconnect(ctx)

	collection := client.Database("mixdata").Collection("company_base")
	exportDistinctTypeCounts(ctx, collection, bson.M{}, "company_type_counts_fast.xlsx")
}

// Enterprise is the subset of a special_enterprise document that is copied
// into the temporary result collection, plus the rule that matched it.
type Enterprise struct {
	CompanyName string  `bson:"company_name"`
	CompanyType string  `bson:"company_type"`
	CreditNo    string  `bson:"credit_no"`
	Organizer   string  `bson:"organizer"`
	UseFlag     float64 `bson:"use_flag"`
	RuleName    string  `bson:"rule_name"` // content of Excel column B for the matched rule
}

// matchSpecialEnterprise matches special enterprises against a rule list.
//
// It reads the second column of Sheet1 in ./政府、学校、医院、银行等.xlsx as
// rule names (skipping the header row), loads every special_enterprise
// document with use_flag = 0, and keeps an enterprise when its organizer
// contains a rule or a rule contains its organizer (first matching rule
// wins). Matches are inserted into special_enterprise_temp_03 in batches of
// 1000; a failed batch is logged and the remaining batches are still tried.
func matchSpecialEnterprise() {
	ctx := context.Background()
	client := mustConnect181(ctx)
	defer client.Disconnect(ctx)

	db := client.Database("mixdata")
	sourceCol := db.Collection("special_enterprise")
	targetCol := db.Collection("special_enterprise_temp_03")

	// Read the rules from Excel.
	f, err := excelize.OpenFile("./政府、学校、医院、银行等.xlsx")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	rows, err := f.GetRows("Sheet1")
	if err != nil {
		log.Fatal(err)
	}
	var ruleNames []string
	for idx, row := range rows {
		if idx == 0 {
			continue // skip the header row
		}
		if len(row) > 1 {
			ruleNames = append(ruleNames, strings.TrimSpace(row[1]))
		}
	}
	log.Printf("读取到 %d 条规则", len(ruleNames))

	// Load all use_flag = 0 documents from MongoDB in one pass.
	cursor, err := sourceCol.Find(ctx, bson.M{"use_flag": 0.0})
	if err != nil {
		log.Fatal("查询Mongo失败:", err)
	}
	defer cursor.Close(ctx)

	var allEnterprises []Enterprise
	for cursor.Next(ctx) {
		var doc bson.M
		if err := cursor.Decode(&doc); err != nil {
			log.Println("解码失败:", err)
			continue
		}
		allEnterprises = append(allEnterprises, Enterprise{
			CompanyName: getString(doc, "company_name"),
			CompanyType: getString(doc, "company_type"),
			CreditNo:    getString(doc, "credit_no"),
			Organizer:   getString(doc, "organizer"),
			UseFlag:     getFloat64(doc, "use_flag"),
		})
	}
	// Without this check a cursor failure would look like a normal end of
	// data and the match would silently run on a truncated set.
	if err := cursor.Err(); err != nil {
		log.Fatalf("遍历出错: %v", err)
	}
	log.Printf("从MongoDB读取到 %d 条数据", len(allEnterprises))

	var matched []interface{}
	for idx, ent := range allEnterprises {
		if idx%10000 == 0 {
			log.Println("当前匹配企业", idx, ent.CompanyName, "还剩余", len(allEnterprises)-idx)
		}
		// An empty organizer would be "contained" in every rule.
		if ent.Organizer == "" {
			continue
		}
		for _, rule := range ruleNames {
			// Likewise an empty rule would match every organizer.
			if rule == "" {
				continue
			}
			if strings.Contains(ent.Organizer, rule) || strings.Contains(rule, ent.Organizer) {
				hit := ent
				hit.RuleName = rule
				matched = append(matched, hit)
				break // first match wins; move on to the next enterprise
			}
		}
	}
	log.Printf("匹配到 %d 条记录,准备写入临时表", len(matched))

	// Write to MongoDB in batches.
	const batchSize = 1000
	for start := 0; start < len(matched); start += batchSize {
		end := start + batchSize
		if end > len(matched) {
			end = len(matched)
		}
		if _, err := targetCol.InsertMany(ctx, matched[start:end]); err != nil {
			log.Println("插入失败:", err)
		}
	}
	log.Println("全部完成!")
}

// getString returns m[key] when it is present and holds a string, and ""
// otherwise.
func getString(m bson.M, key string) string {
	if val, ok := m[key]; ok {
		if s, ok := val.(string); ok {
			return s
		}
	}
	return ""
}

// getFloat64 returns m[key] as a float64 when it is present and holds a
// float64, int32 or int64, and 0 otherwise.
func getFloat64(m bson.M, key string) float64 {
	if val, ok := m[key]; ok {
		switch v := val.(type) {
		case float64:
			return v
		case int32:
			return float64(v)
		case int64:
			return float64(v)
		}
	}
	return 0
}

// escapeMongoUserInfo percent-encodes s for use as the username or password
// of a MongoDB URI. url.QueryEscape encodes a space as "+", which is only
// read back as a space in query strings, so those are rewritten to "%20".
// A literal "+" in s is already encoded as "%2B" and is therefore unaffected.
func escapeMongoUserInfo(s string) string {
	return strings.ReplaceAll(url.QueryEscape(s), "+", "%20")
}

// BuildMongoURI assembles a mongodb:// connection string.
//
// hosts are joined with commas and must not be empty, otherwise an error is
// returned. When username is non-empty, percent-encoded credentials are
// placed before the hosts; an empty password yields "username:@host".
// options, when non-empty, are appended as a URL-encoded query string after
// "/?" (keys in sorted order). Note that inside this function the options
// parameter shadows the imported mongo options package.
func BuildMongoURI(username, password string, hosts []string, options map[string]string) (string, error) {
	if len(hosts) == 0 {
		return "", fmt.Errorf("hosts cannot be empty")
	}
	hostList := strings.Join(hosts, ",")

	var authPart string
	if username != "" {
		authPart = fmt.Sprintf("%s:%s@", escapeMongoUserInfo(username), escapeMongoUserInfo(password))
	}

	var optionStr string
	if len(options) > 0 {
		query := url.Values{}
		for k, v := range options {
			query.Set(k, v)
		}
		// The connection string format puts a "/" between the host list and
		// the "?" that starts the options.
		optionStr = "/?" + query.Encode()
	}

	return fmt.Sprintf("mongodb://%s%s%s", authPart, hostList, optionStr), nil
}