main.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "go.mongodb.org/mongo-driver/bson"
  6. "go.mongodb.org/mongo-driver/bson/primitive"
  7. "go.mongodb.org/mongo-driver/mongo"
  8. "go.mongodb.org/mongo-driver/mongo/options"
  9. "go.mongodb.org/mongo-driver/mongo/readpref"
  10. "log"
  11. "time"
  12. )
  13. func main() {
  14. qqqq()
  15. //getHot()
  16. //QlmChannel()
  17. }
  18. // QlmChannel 千里马 channel
  19. func QlmChannel() {
  20. type Item struct {
  21. ID primitive.ObjectID `bson:"_id,omitempty"`
  22. Site string `bson:"site"`
  23. Channel string `bson:"channel"`
  24. Publishtime string `bson:"publishtime"`
  25. // Add other fields as needed
  26. }
  27. type GroupedData struct {
  28. Channel string `bson:"channel"`
  29. Count int `bson:"count"`
  30. }
  31. // 设置MongoDB连接选项
  32. //clientOptions := options.Client().ApplyURI("mongodb://127.0.0.1:27081")
  33. clientOptions := options.Client().ApplyURI("mongodb://172.17.4.87:27080")
  34. //clientOptions := options.Client().ApplyURI("mongodb://SJZY_RWbid_ES:SJZY%40B4i4D5e6S@172.17.145.163:27083")
  35. clientOptions.SetReadPreference(readpref.Primary())
  36. clientOptions.SetDirect(true)
  37. // 连接MongoDB
  38. client, err := mongo.Connect(context.Background(), clientOptions)
  39. if err != nil {
  40. log.Println(err)
  41. }
  42. // 检查连接
  43. err = client.Ping(context.Background(), nil)
  44. if err != nil {
  45. log.Fatal(err)
  46. }
  47. // 选择数据库和集合
  48. database := client.Database("qlm")
  49. collection := database.Collection("data_merge")
  50. // 查询条件
  51. filter := bson.D{
  52. {"site", "千里马"},
  53. {"publishtime", bson.D{{"$gt", "2022-01-01"}}},
  54. }
  55. // 执行查询
  56. cursor, err := collection.Find(context.Background(), filter)
  57. if err != nil {
  58. log.Fatal(err)
  59. }
  60. defer cursor.Close(context.Background())
  61. // 手动分组查看数据
  62. count := 0
  63. groupedData := make(map[string]int)
  64. for cursor.Next(context.Background()) {
  65. var item Item
  66. err := cursor.Decode(&item)
  67. if err != nil {
  68. log.Fatal(err)
  69. }
  70. // 在这里可以根据需要进行其他逻辑处理
  71. // 手动分组
  72. groupedData[item.Channel]++
  73. count++
  74. if count%10000 == 0 {
  75. log.Println("current:", count)
  76. }
  77. }
  78. // 输出结果
  79. //for channel, count := range groupedData {
  80. // fmt.Printf("Channel: %s, Count: %d\n", channel, count)
  81. //}
  82. // 将手动分组的数据保存到新的集合中
  83. var groupedDataList []interface{}
  84. for channel, count := range groupedData {
  85. groupedDataList = append(groupedDataList, GroupedData{Channel: channel, Count: count})
  86. }
  87. // 选择新的集合
  88. newCollection := database.Collection("wcc_qlm")
  89. // 插入数据
  90. _, err = newCollection.InsertMany(context.Background(), groupedDataList)
  91. if err != nil {
  92. log.Fatal(err)
  93. }
  94. fmt.Println("Data inserted into wcc_qlm collection.")
  95. }
  96. // MongoConfig 保存连接配置
  97. type MongoConfig struct {
  98. URI string
  99. Database string
  100. Username string
  101. Password string
  102. Direct bool
  103. }
  104. func connectMongo(cfg MongoConfig) (*mongo.Client, error) {
  105. //clientOpts := options.Client().ApplyURI(cfg.URI)
  106. clientOpts := options.Client().
  107. ApplyURI(cfg.URI).
  108. SetDirect(cfg.Direct) // 设置为直连模式,适合单节点连接
  109. if cfg.Username != "" && cfg.Password != "" {
  110. cred := options.Credential{
  111. Username: cfg.Username,
  112. Password: cfg.Password,
  113. }
  114. clientOpts.SetAuth(cred)
  115. }
  116. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  117. defer cancel()
  118. client, err := mongo.Connect(ctx, clientOpts)
  119. if err != nil {
  120. return nil, err
  121. }
  122. return client, nil
  123. }
  124. // qqqq 迁移MongoDB
  125. func qqqq() {
  126. srcCfg := MongoConfig{
  127. URI: "mongodb://127.0.0.1:27001",
  128. Database: "mixdata",
  129. Username: "",
  130. Password: "",
  131. Direct: true,
  132. }
  133. dstCfg := MongoConfig{
  134. URI: "mongodb://172.20.45.129:27002",
  135. Database: "mixdata",
  136. Username: "",
  137. Password: "",
  138. //Direct: true,
  139. }
  140. // 连接源和目标Mongo
  141. srcClient, err := connectMongo(srcCfg)
  142. if err != nil {
  143. log.Fatalf("连接源Mongo失败: %v", err)
  144. }
  145. defer srcClient.Disconnect(context.Background())
  146. dstClient, err := connectMongo(dstCfg)
  147. if err != nil {
  148. log.Fatalf("连接目标Mongo失败: %v", err)
  149. }
  150. defer dstClient.Disconnect(context.Background())
  151. srcDB := srcClient.Database(srcCfg.Database)
  152. dstDB := dstClient.Database(dstCfg.Database)
  153. // 获取源库集合列表
  154. ctx := context.Background()
  155. //collections, err := srcDB.ListCollectionNames(ctx, bson.M{})
  156. collections := []string{"special_trade_union"}
  157. if err != nil {
  158. log.Fatalf("获取集合列表失败: %v", err)
  159. }
  160. fmt.Printf("发现 %d 个集合,将逐个迁移\n", len(collections))
  161. batchSize := 1000 // 批量大小
  162. for _, collName := range collections {
  163. fmt.Printf("迁移集合: %s\n", collName)
  164. srcColl := srcDB.Collection(collName)
  165. dstColl := dstDB.Collection(collName)
  166. // 查询全部数据
  167. cursor, err := srcColl.Find(ctx, bson.M{})
  168. if err != nil {
  169. log.Printf("查询集合 %s 失败: %v", collName, err)
  170. continue
  171. }
  172. batchDocs := make([]interface{}, 0, batchSize)
  173. count := 0
  174. for cursor.Next(ctx) {
  175. var doc bson.M
  176. if err := cursor.Decode(&doc); err != nil {
  177. log.Printf("解析文档失败: %v", err)
  178. continue
  179. }
  180. batchDocs = append(batchDocs, doc)
  181. if len(batchDocs) >= batchSize {
  182. if err := insertBatch(ctx, dstColl, batchDocs); err != nil {
  183. log.Printf("批量写入失败: %v", err)
  184. }
  185. count += len(batchDocs)
  186. log.Println("current:", collName, count)
  187. batchDocs = batchDocs[:0]
  188. }
  189. }
  190. // 插入剩余文档
  191. if len(batchDocs) > 0 {
  192. if err := insertBatch(ctx, dstColl, batchDocs); err != nil {
  193. log.Printf("批量写入失败: %v", err)
  194. }
  195. count += len(batchDocs)
  196. }
  197. fmt.Printf("集合 %s 迁移完成,导入 %d 条数据\n", collName, count)
  198. cursor.Close(ctx)
  199. }
  200. }
  201. func insertBatch(ctx context.Context, coll *mongo.Collection, docs []interface{}) error {
  202. _, err := coll.InsertMany(ctx, docs)
  203. return err
  204. }