123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487 |
- package main
- import (
- "context"
- "fmt"
- "github.com/xuri/excelize/v2"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/mongo"
- "go.mongodb.org/mongo-driver/mongo/options"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "net/url"
- "strings"
- "time"
- )
- func get181Data() {
- //181 凭安库
- MgoQY := &mongodb.MongodbSim{
- MongodbAddr: "172.17.4.181:27001",
- //MongodbAddr: "127.0.0.1:27001",
- DbName: "mixdata",
- Size: 10,
- UserName: "",
- Password: "",
- //Direct: true,
- }
- MgoQY.InitPool()
- sess := MgoQY.GetMgoConn()
- defer MgoQY.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "use_flag": 0,
- "company_status": map[string]interface{}{
- "$nin": []string{"注销", "吊销", "吊销,已注销"},
- },
- }
- selected := map[string]interface{}{
- "company_name": 1,
- "company_status": 1,
- "company_type": 1,
- "use_flag": 1,
- }
- query := sess.DB("mixdata").C("company_base").Find(where).Select(selected).Iter()
- count := 0
- typeCount := make(map[string]int)
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count, tmp["company_name"], tmp["company_status"], tmp["company_type"], len(typeCount))
- }
- if util.ObjToString(tmp["company_name"]) == "" || util.ObjToString(tmp["company_type"]) == "" {
- continue
- }
- company_status := util.ObjToString(tmp["company_status"])
- if strings.Contains(company_status, "注销") || strings.Contains(company_status, "吊销") {
- continue
- }
- company_type := util.ObjToString(tmp["company_type"])
- typeCount[company_type]++
- }
- log.Println("len types", len(typeCount))
- for k, v := range typeCount {
- sa := map[string]interface{}{
- "type": k,
- "count": v,
- }
- MgoQY.Save("wcc_company_type_static_0712", sa)
- }
- log.Println("get181Data --------- over ")
- }
- func exportByTraverseAndCount() {
- ctx := context.Background()
- username := ""
- password := ""
- //hosts := []string{"172.17.4.181:27001"}
- hosts := []string{"172.17.4.181:27001"}
- uri, err := BuildMongoURI(username, password, hosts, nil)
- if err != nil {
- panic(err)
- }
- clientOptions := options.Client().ApplyURI(uri)
- client, err := mongo.Connect(ctx, clientOptions)
- if err != nil {
- log.Fatal(err)
- }
- defer client.Disconnect(ctx)
- collection := client.Database("mixdata").Collection("company_base")
- // 查询条件
- filter := bson.D{
- {"use_flag", 0},
- {"company_status", bson.D{{"$nin", bson.A{"注销", "吊销", "吊销,已注销"}}}},
- }
- // 只查询 company_type 字段
- opts := options.Find().SetProjection(bson.D{{"company_type", 1}})
- // 创建游标
- cursor, err := collection.Find(ctx, filter, opts)
- if err != nil {
- log.Fatalf("查询失败: %v", err)
- }
- defer cursor.Close(ctx)
- // 用 map 统计
- typeCount := make(map[string]int64)
- count := 0
- // 遍历
- for cursor.Next(ctx) {
- var doc struct {
- CompanyType string `bson:"company_type"`
- }
- count++
- if count%10000 == 0 {
- log.Println("current:", count)
- }
- if err := cursor.Decode(&doc); err != nil {
- log.Printf("解码失败: %v", err)
- continue
- }
- if doc.CompanyType == "" {
- continue
- }
- typeCount[doc.CompanyType]++
- }
- if err := cursor.Err(); err != nil {
- log.Fatalf("遍历出错: %v", err)
- }
- // 写 Excel
- f := excelize.NewFile()
- sheet := "TypeCounts"
- f.SetCellValue(sheet, "A1", "company_type")
- f.SetCellValue(sheet, "B1", "count")
- row := 2
- for companyType, count := range typeCount {
- f.SetCellValue(sheet, fmt.Sprintf("A%d", row), companyType)
- f.SetCellValue(sheet, fmt.Sprintf("B%d", row), count)
- row++
- }
- filename := fmt.Sprintf("company_type_counts_traverse_%d.xlsx", time.Now().Unix())
- if err := f.SaveAs(filename); err != nil {
- log.Fatalf("保存 Excel 失败: %v", err)
- }
- fmt.Println("\n导出完成,文件名:", filename)
- }
- func exportCompanyType2() {
- ctx := context.Background()
- username := ""
- password := ""
- hosts := []string{"172.17.4.181:27001"}
- uri, err := BuildMongoURI(username, password, hosts, nil)
- if err != nil {
- panic(err)
- }
- clientOptions := options.Client().ApplyURI(uri)
- client, err := mongo.Connect(ctx, clientOptions)
- if err != nil {
- log.Fatal(err)
- }
- defer client.Disconnect(ctx)
- db := client.Database("mixdata")
- collection := db.Collection("company_base")
- // 公共 filter:use_flag=0 且 company_status != 注销 且 company_status != 吊销
- filter := bson.M{
- "use_flag": 0,
- "company_status": bson.M{
- "$nin": []string{"注销", "吊销", "吊销,已注销"},
- },
- }
- // 第一步:distinct 不重复的 company_type
- types, err := collection.Distinct(ctx, "company_type", filter)
- if err != nil {
- log.Fatalf("查询 distinct 失败: %v", err)
- }
- fmt.Printf("共找到 %d 个不同的 company_type\n", len(types))
- // 创建 Excel 文件
- f := excelize.NewFile()
- sheet := "TypeCounts"
- f.SetCellValue(sheet, "A1", "company_type")
- f.SetCellValue(sheet, "B1", "count")
- // 第二步:对每个类型 countDocuments,也带上相同 filter
- row := 2
- for _, t := range types {
- strType, ok := t.(string)
- if !ok {
- continue
- }
- // 在 filter 上再加一个 company_type 条件
- typeFilter := bson.M{
- "use_flag": 0,
- "company_status": bson.M{
- "$nin": []string{"注销", "吊销", "吊销,已注销"},
- },
- "company_type": strType,
- }
- count, err := collection.CountDocuments(ctx, typeFilter)
- if err != nil {
- log.Printf("统计 %s 出错: %v", strType, err)
- continue
- }
- log.Println(strType, count)
- // 写到 Excel
- f.SetCellValue(sheet, fmt.Sprintf("A%d", row), strType)
- f.SetCellValue(sheet, fmt.Sprintf("B%d", row), count)
- row++
- }
- // 保存 Excel
- if err := f.SaveAs("company_type_counts_filtered.xlsx"); err != nil {
- log.Fatalf("保存 Excel 失败: %v", err)
- }
- fmt.Println("导出完成,文件名:company_type_counts_filtered.xlsx")
- }
- func exportCompanyType() {
- ctx := context.Background()
- username := ""
- password := ""
- //hosts := []string{"127.0.0.1:27001"}
- hosts := []string{"172.17.4.181:27001"}
- uri, err := BuildMongoURI(username, password, hosts, nil)
- if err != nil {
- panic(err)
- }
- clientOptions := options.Client().ApplyURI(uri)
- //clientOptions.SetDirect(true) // 如果需要 direct
- client, err := mongo.Connect(ctx, clientOptions)
- if err != nil {
- log.Fatal(err)
- }
- defer client.Disconnect(ctx)
- db := client.Database("mixdata")
- collection := db.Collection("company_base")
- // 第一步:distinct 不重复的 company_type
- types, err := collection.Distinct(ctx, "company_type", bson.D{})
- if err != nil {
- log.Fatalf("查询 distinct 失败: %v", err)
- }
- fmt.Printf("共找到 %d 个不同的 company_type\n", len(types))
- // 创建 Excel 文件
- f := excelize.NewFile()
- sheet := "TypeCounts"
- f.SetCellValue(sheet, "A1", "company_type")
- f.SetCellValue(sheet, "B1", "count")
- // 第二步:对每个类型 countDocuments
- row := 2
- for _, t := range types {
- strType, ok := t.(string)
- if !ok {
- continue
- }
- count, err := collection.CountDocuments(ctx, bson.M{"company_type": strType})
- if err != nil {
- log.Printf("统计 %s 出错: %v", strType, err)
- continue
- }
- log.Println(strType, count)
- // 写到 Excel
- f.SetCellValue(sheet, fmt.Sprintf("A%d", row), strType)
- f.SetCellValue(sheet, fmt.Sprintf("B%d", row), count)
- row++
- }
- // 保存 Excel
- if err := f.SaveAs("company_type_counts_fast.xlsx"); err != nil {
- log.Fatalf("保存 Excel 失败: %v", err)
- }
- fmt.Println("导出完成,文件名:company_type_counts_fast.xlsx")
- }
// Enterprise is one special_enterprise record as handled by
// matchSpecialEnterprise, together with the rule that matched it. The bson
// tags give the field names used when a value is written to MongoDB.
type Enterprise struct {
	CompanyName string `bson:"company_name"`
	CompanyType string `bson:"company_type"`
	CreditNo    string `bson:"credit_no"`
	Organizer   string `bson:"organizer"`

	// UseFlag is kept as float64; getFloat64 widens int32/int64 values to it.
	UseFlag float64 `bson:"use_flag"`

	// RuleName is the matched rule text, taken from column B of the Excel sheet.
	RuleName string `bson:"rule_name"`
}
- // matchSpecialEnterprise 匹配特殊企业
- func matchSpecialEnterprise() {
- ctx := context.Background()
- /**
- MgoQY := &mongodb.MongodbSim{
- MongodbAddr: "172.17.4.181:27001",
- //MongodbAddr: "127.0.0.1:27001",
- DbName: "mixdata",
- Size: 10,
- UserName: "",
- Password: "",
- //Direct: true,
- }
- MgoQY.InitPool()
- */
- username := ""
- password := ""
- hosts := []string{"172.17.4.181:27001"}
- uri, err := BuildMongoURI(username, password, hosts, nil)
- if err != nil {
- panic(err)
- }
- // MongoDB连接
- client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
- if err != nil {
- log.Fatal(err)
- }
- defer client.Disconnect(ctx)
- db := client.Database("mixdata")
- sourceCol := db.Collection("special_enterprise")
- targetCol := db.Collection("special_enterprise_temp_03")
- // 读取Excel
- f, err := excelize.OpenFile("./政府、学校、医院、银行等.xlsx")
- if err != nil {
- log.Fatal(err)
- }
- defer f.Close()
- rows, err := f.GetRows("Sheet1")
- if err != nil {
- log.Fatal(err)
- }
- var ruleNames []string
- for idx, row := range rows {
- if idx == 0 {
- continue // 跳过标题
- }
- if len(row) > 1 {
- ruleNames = append(ruleNames, strings.TrimSpace(row[1]))
- }
- }
- log.Printf("读取到 %d 条规则", len(ruleNames))
- // 从MongoDB一次性读出 use_flag=0 的数据
- cursor, err := sourceCol.Find(ctx, bson.M{"use_flag": 0.0})
- if err != nil {
- log.Fatal("查询Mongo失败:", err)
- }
- defer cursor.Close(ctx)
- var allEnterprises []Enterprise
- for cursor.Next(ctx) {
- var doc bson.M
- if err := cursor.Decode(&doc); err != nil {
- log.Println("解码失败:", err)
- continue
- }
- ent := Enterprise{
- CompanyName: getString(doc, "company_name"),
- CompanyType: getString(doc, "company_type"),
- CreditNo: getString(doc, "credit_no"),
- Organizer: getString(doc, "organizer"),
- UseFlag: getFloat64(doc, "use_flag"),
- }
- allEnterprises = append(allEnterprises, ent)
- }
- log.Printf("从MongoDB读取到 %d 条数据", len(allEnterprises))
- var matched []interface{}
- for idx, ent := range allEnterprises {
- if idx%10000 == 0 {
- log.Println("当前匹配企业", idx, ent.CompanyName, "还剩余", len(allEnterprises)-idx)
- }
- if ent.Organizer == "" {
- continue
- }
- for _, rule := range ruleNames {
- if rule == "" {
- continue
- }
- if strings.Contains(ent.Organizer, rule) || strings.Contains(rule, ent.Organizer) {
- newEnt := ent
- newEnt.RuleName = rule
- matched = append(matched, newEnt)
- break // 匹配到一个就退出,继续下一个企业
- }
- }
- }
- log.Printf("匹配到 %d 条记录,准备写入临时表", len(matched))
- // 分批写入 MongoDB
- const batchSize = 1000
- for i := 0; i < len(matched); i += batchSize {
- end := i + batchSize
- if end > len(matched) {
- end = len(matched)
- }
- _, err := targetCol.InsertMany(ctx, matched[i:end])
- if err != nil {
- log.Println("插入失败:", err)
- }
- }
- log.Println("全部完成!")
- }
- // 工具函数:安全获取字符串
- func getString(m bson.M, key string) string {
- if val, ok := m[key]; ok {
- if s, ok := val.(string); ok {
- return s
- }
- }
- return ""
- }
- // 工具函数:安全获取float64
- func getFloat64(m bson.M, key string) float64 {
- if val, ok := m[key]; ok {
- switch v := val.(type) {
- case float64:
- return v
- case int32:
- return float64(v)
- case int64:
- return float64(v)
- }
- }
- return 0
- }
// BuildMongoURI assembles a "mongodb://" connection string.
//
// Credentials are emitted as "user:pass@" only when username is non-empty; an
// empty password then yields "user:@". When username is empty, password is
// ignored. Both values are strictly percent-encoded (a space becomes "%20").
// hosts are joined with commas. When options is non-empty it is appended as
// "/?key=value&..." with keys in sorted order; the "/" is required by the
// MongoDB URI format before the option query.
//
// It returns an error only when hosts is empty.
func BuildMongoURI(username, password string, hosts []string, options map[string]string) (string, error) {
	if len(hosts) == 0 {
		return "", fmt.Errorf("hosts cannot be empty")
	}

	// url.QueryEscape encodes every reserved character but writes a space as
	// "+", which is form encoding rather than percent-encoding. Literal "+"
	// is already escaped to "%2B", so any remaining "+" is a space.
	escape := func(s string) string {
		return strings.ReplaceAll(url.QueryEscape(s), "+", "%20")
	}

	var b strings.Builder
	b.WriteString("mongodb://")

	if username != "" {
		b.WriteString(escape(username))
		b.WriteByte(':')
		b.WriteString(escape(password))
		b.WriteByte('@')
	}

	b.WriteString(strings.Join(hosts, ","))

	if len(options) > 0 {
		query := url.Values{}
		for k, v := range options {
			query.Set(k, v)
		}
		b.WriteString("/?")
		b.WriteString(query.Encode())
	}

	return b.String(), nil
}
|