main.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "go.mongodb.org/mongo-driver/bson"
  6. "go.mongodb.org/mongo-driver/bson/primitive"
  7. "go.mongodb.org/mongo-driver/mongo"
  8. "go.mongodb.org/mongo-driver/mongo/options"
  9. "go.mongodb.org/mongo-driver/mongo/readpref"
  10. "log"
  11. )
  12. func main() {
  13. getHot()
  14. //QlmChannel()
  15. }
  16. //QlmChannel 千里马 channel
  17. func QlmChannel() {
  18. type Item struct {
  19. ID primitive.ObjectID `bson:"_id,omitempty"`
  20. Site string `bson:"site"`
  21. Channel string `bson:"channel"`
  22. Publishtime string `bson:"publishtime"`
  23. // Add other fields as needed
  24. }
  25. type GroupedData struct {
  26. Channel string `bson:"channel"`
  27. Count int `bson:"count"`
  28. }
  29. // 设置MongoDB连接选项
  30. //clientOptions := options.Client().ApplyURI("mongodb://127.0.0.1:27081")
  31. clientOptions := options.Client().ApplyURI("mongodb://172.17.4.87:27080")
  32. //clientOptions := options.Client().ApplyURI("mongodb://SJZY_RWbid_ES:SJZY%40B4i4D5e6S@172.17.145.163:27083")
  33. clientOptions.SetReadPreference(readpref.Primary())
  34. clientOptions.SetDirect(true)
  35. // 连接MongoDB
  36. client, err := mongo.Connect(context.Background(), clientOptions)
  37. if err != nil {
  38. log.Println(err)
  39. }
  40. // 检查连接
  41. err = client.Ping(context.Background(), nil)
  42. if err != nil {
  43. log.Fatal(err)
  44. }
  45. // 选择数据库和集合
  46. database := client.Database("qlm")
  47. collection := database.Collection("data_merge")
  48. // 查询条件
  49. filter := bson.D{
  50. {"site", "千里马"},
  51. {"publishtime", bson.D{{"$gt", "2022-01-01"}}},
  52. }
  53. // 执行查询
  54. cursor, err := collection.Find(context.Background(), filter)
  55. if err != nil {
  56. log.Fatal(err)
  57. }
  58. defer cursor.Close(context.Background())
  59. // 手动分组查看数据
  60. count := 0
  61. groupedData := make(map[string]int)
  62. for cursor.Next(context.Background()) {
  63. var item Item
  64. err := cursor.Decode(&item)
  65. if err != nil {
  66. log.Fatal(err)
  67. }
  68. // 在这里可以根据需要进行其他逻辑处理
  69. // 手动分组
  70. groupedData[item.Channel]++
  71. count++
  72. if count%10000 == 0 {
  73. log.Println("current:", count)
  74. }
  75. }
  76. // 输出结果
  77. //for channel, count := range groupedData {
  78. // fmt.Printf("Channel: %s, Count: %d\n", channel, count)
  79. //}
  80. // 将手动分组的数据保存到新的集合中
  81. var groupedDataList []interface{}
  82. for channel, count := range groupedData {
  83. groupedDataList = append(groupedDataList, GroupedData{Channel: channel, Count: count})
  84. }
  85. // 选择新的集合
  86. newCollection := database.Collection("wcc_qlm")
  87. // 插入数据
  88. _, err = newCollection.InsertMany(context.Background(), groupedDataList)
  89. if err != nil {
  90. log.Fatal(err)
  91. }
  92. fmt.Println("Data inserted into wcc_qlm collection.")
  93. }