package main import ( "context" "fmt" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/readpref" "log" "time" ) func main() { qqqq() //getHot() //QlmChannel() } // QlmChannel 千里马 channel func QlmChannel() { type Item struct { ID primitive.ObjectID `bson:"_id,omitempty"` Site string `bson:"site"` Channel string `bson:"channel"` Publishtime string `bson:"publishtime"` // Add other fields as needed } type GroupedData struct { Channel string `bson:"channel"` Count int `bson:"count"` } // 设置MongoDB连接选项 //clientOptions := options.Client().ApplyURI("mongodb://127.0.0.1:27081") clientOptions := options.Client().ApplyURI("mongodb://172.17.4.87:27080") //clientOptions := options.Client().ApplyURI("mongodb://SJZY_RWbid_ES:SJZY%40B4i4D5e6S@172.17.145.163:27083") clientOptions.SetReadPreference(readpref.Primary()) clientOptions.SetDirect(true) // 连接MongoDB client, err := mongo.Connect(context.Background(), clientOptions) if err != nil { log.Println(err) } // 检查连接 err = client.Ping(context.Background(), nil) if err != nil { log.Fatal(err) } // 选择数据库和集合 database := client.Database("qlm") collection := database.Collection("data_merge") // 查询条件 filter := bson.D{ {"site", "千里马"}, {"publishtime", bson.D{{"$gt", "2022-01-01"}}}, } // 执行查询 cursor, err := collection.Find(context.Background(), filter) if err != nil { log.Fatal(err) } defer cursor.Close(context.Background()) // 手动分组查看数据 count := 0 groupedData := make(map[string]int) for cursor.Next(context.Background()) { var item Item err := cursor.Decode(&item) if err != nil { log.Fatal(err) } // 在这里可以根据需要进行其他逻辑处理 // 手动分组 groupedData[item.Channel]++ count++ if count%10000 == 0 { log.Println("current:", count) } } // 输出结果 //for channel, count := range groupedData { // fmt.Printf("Channel: %s, Count: %d\n", channel, count) //} // 将手动分组的数据保存到新的集合中 var groupedDataList []interface{} for channel, count := range groupedData { groupedDataList = append(groupedDataList, GroupedData{Channel: channel, Count: count}) } // 选择新的集合 newCollection := database.Collection("wcc_qlm") // 插入数据 _, err = newCollection.InsertMany(context.Background(), groupedDataList) if err != nil { log.Fatal(err) } fmt.Println("Data inserted into wcc_qlm collection.") } // MongoConfig 保存连接配置 type MongoConfig struct { URI string Database string Username string Password string Direct bool } func connectMongo(cfg MongoConfig) (*mongo.Client, error) { //clientOpts := options.Client().ApplyURI(cfg.URI) clientOpts := options.Client(). ApplyURI(cfg.URI). SetDirect(cfg.Direct) // 设置为直连模式,适合单节点连接 if cfg.Username != "" && cfg.Password != "" { cred := options.Credential{ Username: cfg.Username, Password: cfg.Password, } clientOpts.SetAuth(cred) } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() client, err := mongo.Connect(ctx, clientOpts) if err != nil { return nil, err } return client, nil } // qqqq 迁移MongoDB func qqqq() { srcCfg := MongoConfig{ URI: "mongodb://127.0.0.1:27001", Database: "mixdata", Username: "", Password: "", Direct: true, } dstCfg := MongoConfig{ URI: "mongodb://172.20.45.129:27002", Database: "mixdata", Username: "", Password: "", //Direct: true, } // 连接源和目标Mongo srcClient, err := connectMongo(srcCfg) if err != nil { log.Fatalf("连接源Mongo失败: %v", err) } defer srcClient.Disconnect(context.Background()) dstClient, err := connectMongo(dstCfg) if err != nil { log.Fatalf("连接目标Mongo失败: %v", err) } defer dstClient.Disconnect(context.Background()) srcDB := srcClient.Database(srcCfg.Database) dstDB := dstClient.Database(dstCfg.Database) // 获取源库集合列表 ctx := context.Background() //collections, err := srcDB.ListCollectionNames(ctx, bson.M{}) collections := []string{"special_trade_union"} if err != nil { log.Fatalf("获取集合列表失败: %v", err) } fmt.Printf("发现 %d 个集合,将逐个迁移\n", len(collections)) batchSize := 1000 // 批量大小 for _, collName := range collections { fmt.Printf("迁移集合: %s\n", collName) srcColl := srcDB.Collection(collName) dstColl := dstDB.Collection(collName) // 查询全部数据 cursor, err := srcColl.Find(ctx, bson.M{}) if err != nil { log.Printf("查询集合 %s 失败: %v", collName, err) continue } batchDocs := make([]interface{}, 0, batchSize) count := 0 for cursor.Next(ctx) { var doc bson.M if err := cursor.Decode(&doc); err != nil { log.Printf("解析文档失败: %v", err) continue } batchDocs = append(batchDocs, doc) if len(batchDocs) >= batchSize { if err := insertBatch(ctx, dstColl, batchDocs); err != nil { log.Printf("批量写入失败: %v", err) } count += len(batchDocs) log.Println("current:", collName, count) batchDocs = batchDocs[:0] } } // 插入剩余文档 if len(batchDocs) > 0 { if err := insertBatch(ctx, dstColl, batchDocs); err != nil { log.Printf("批量写入失败: %v", err) } count += len(batchDocs) } fmt.Printf("集合 %s 迁移完成,导入 %d 条数据\n", collName, count) cursor.Close(ctx) } } func insertBatch(ctx context.Context, coll *mongo.Collection, docs []interface{}) error { _, err := coll.InsertMany(ctx, docs) return err }