package main

import (
	"errors"
	"fmt"
	"log"
	"strconv"
	"sync"
	"time"

	_ "github.com/gogf/gf/contrib/drivers/clickhouse/v2"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/gctx"
	"gorm.io/driver/clickhouse"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// ExampleModel is the demo data model used by click2.
type ExampleModel struct {
	ID         uint      `gorm:"column:id"`
	Name       string    `gorm:"column:name"`
	Age        uint      `gorm:"column:age"`
	CreateTime time.Time `gorm:"column:create_time"`
	UpdateTime time.Time `gorm:"column:update_time"`
}

// EnterpriseBaseInfo maps a row of the dws_f_ent_baseinfo table (see
// TableName). The fields carry only json tags, so GORM column names come from
// its default naming strategy.
type EnterpriseBaseInfo struct {
	ID             int       `json:"id"`
	Name           string    `json:"name"`
	NameID         string    `json:"name_id"`
	CompanyID      string    `json:"company_id"`
	Address        string    `json:"address"`
	AreaCode       string    `json:"area_code"`
	CityCode       string    `json:"city_code"`
	DistrictCode   string    `json:"district_code"`
	IdentityType   []uint8   `json:"identity_type"`
	Createtime     time.Time `json:"createtime"`
	Updatetime     time.Time `json:"updatetime"`
	LatestTime     time.Time `json:"latest_time"`
	SEOID          string    `json:"seo_id"`
	EnterpriseType string    `json:"enterprise_type"`
}

// TableName tells GORM which table EnterpriseBaseInfo is stored in.
func (EnterpriseBaseInfo) TableName() string {
	return "dws_f_ent_baseinfo"
}

// WccBeijing is the record click inserts into the wcc_allcity_2024Q2 table,
// one per source document that has a non-empty winner name.
type WccBeijing struct {
	ProjectID     string   `json:"project_id"`
	CompanyID     string   `json:"company_id"`
	CreditNo      *string  `json:"credit_no,omitempty"`
	CompanyName   *string  `json:"company_name,omitempty"`
	RegisterArea  *string  `json:"register_area,omitempty"`
	CompanyType   string   `json:"company_type"`
	IndustryName  *string  `json:"industry_name,omitempty"`
	BidStatus     *string  `json:"bidstatus,omitempty"`
	Jgtime        int32    `json:"jgtime"`
	BidYear       *string  `json:"bid_year,omitempty"`
	BidQuarter    *string  `json:"bid_quarter,omitempty"`
	BidMonth      *string  `json:"bid_month,omitempty"`
	Area          *string  `json:"area,omitempty"`
	City          *string  `json:"city,omitempty"`
	District      *string  `json:"district,omitempty"`
	Buyer         *string  `json:"buyer,omitempty"`
	BuyerClass    *string  `json:"buyerclass,omitempty"`
	Topscopeclass *string  `json:"topscopeclass,omitempty"`
	Budget        *float32 `json:"budget,omitempty"`
	BidAmount     *float32 `json:"bidamount,omitempty"`
}

// wccBidPeriods derives the year ("2024"), month ("202404") and quarter
// ("2024Q2") labels for a Unix timestamp given in seconds. time.Unix yields
// local time, so the labels follow the process time zone.
func wccBidPeriods(ts int64) (year, month, quarter string) {
	t := time.Unix(ts, 0)
	year = strconv.Itoa(t.Year())
	month = year + fmt.Sprintf("%02d", int(t.Month()))
	quarter = year + "Q" + strconv.Itoa((int(t.Month())-1)/3+1)
	return year, month, quarter
}

// wccDoc unwraps a document pointer returned by the Mongo helper. A nil
// pointer becomes a nil map, which is safe to read from, so a failed lookup
// yields empty field values instead of a nil-pointer panic.
func wccDoc(p *map[string]interface{}) map[string]interface{} {
	if p == nil {
		return nil
	}
	return *p
}

// wccStr returns a pointer to a copy of s, for filling optional string columns.
func wccStr(s string) *string {
	return &s
}

// wccProcessRow turns one source document into a WccBeijing record, enriches
// it with company data and inserts it into wcc_allcity_2024Q2.
//
// row is the source document; mgoStd is queried for the qyxy_std collection,
// mgoIndustry for company_industry, and db for dws_f_ent_baseinfo. Documents
// without an "s_winner" value are skipped. Failures are logged, not returned,
// so a single bad document does not stop the batch.
func wccProcessRow(row map[string]interface{}, mgoStd, mgoIndustry *mongodb.MongodbSim, db *gorm.DB) {
	companyName := util.ObjToString(row["s_winner"])
	if companyName == "" {
		return
	}

	budget := float32(util.Float64All(row["budget"]))
	bidAmount := float32(util.Float64All(row["bidamount"]))
	rec := WccBeijing{
		ProjectID:   util.ObjToString(row["id"]),
		CompanyName: &companyName,
		Jgtime:      int32(util.IntAll(row["jgtime"])),
		Area:        wccStr(util.ObjToString(row["area"])),
		City:        wccStr(util.ObjToString(row["city"])),
		District:    wccStr(util.ObjToString(row["district"])),
		Buyer:       wccStr(util.ObjToString(row["buyer"])),
		BuyerClass:  wccStr(util.ObjToString(row["buyerclass"])),
		Budget:      &budget,
		BidAmount:   &bidAmount,
	}

	// bidstatus is only set when the stored value really is a string.
	if bidStatus, ok := row["bidstatus"].(string); ok {
		rec.BidStatus = &bidStatus
	}

	// Period labels are only meaningful for a positive timestamp.
	if util.IntAll(row["jgtime"]) > 0 {
		year, month, quarter := wccBidPeriods(util.Int64All(row["jgtime"]))
		rec.BidYear = &year
		rec.BidMonth = &month
		rec.BidQuarter = &quarter
	}

	// Only the first top-level scope class is kept; an empty list must not be
	// indexed.
	if classes, ok := row["topscopeclass"].([]interface{}); ok && len(classes) > 0 {
		rec.Topscopeclass = wccStr(util.ObjToString(classes[0]))
	}

	// Company registration data. The second result of FindOne is ignored as in
	// the original code; a nil document is handled by wccDoc.
	stdPtr, _ := mgoStd.FindOne("qyxy_std", map[string]interface{}{"company_name": companyName})
	std := wccDoc(stdPtr)
	companyID := util.ObjToString(std["_id"])
	rec.CompanyID = companyID
	rec.CreditNo = wccStr(util.ObjToString(std["credit_no"]))
	rec.RegisterArea = wccStr(util.ObjToString(std["company_area"]))

	// Enterprise type. A missing row simply leaves CompanyType empty; any
	// other error is reported.
	var baseInfo EnterpriseBaseInfo
	err := db.Where(&EnterpriseBaseInfo{Name: companyName}).First(&baseInfo).Error
	if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
		log.Println("dws_f_ent_baseinfo lookup failed:", err, companyName)
	}
	rec.CompanyType = baseInfo.EnterpriseType

	// Industry: first matching document, if any. The second result of Find is
	// ignored as in the original code; a nil slice pointer is guarded below.
	indusWhere := map[string]interface{}{
		"company_id": companyID,
	}
	indus, _ := mgoIndustry.Find("company_industry", &indusWhere, nil, nil, false, -1, -1)
	if indus != nil && len(*indus) > 0 {
		rec.IndustryName = wccStr(util.ObjToString((*indus)[0]["industry_l1_name"]))
	}

	if _, err := g.DB().Insert(gctx.New(), "wcc_allcity_2024Q2", rec); err != nil {
		log.Println("clickhouse 写入失败;", err, companyName, companyID)
	}
}

// click copies the top.wcc_allcity_2024Q2 Mongo collection into the
// wcc_allcity_2024Q2 table through g.DB(), enriching each document with
// company data from qyxy_std, dws_f_ent_baseinfo and company_industry.
// (Original note: ClickHouse, Beijing / Jing-Jin-Ji / Yangtze River Delta data.)
//
// Documents are processed by at most 10 goroutines at a time. The function
// returns early if the TiDB/MySQL connection cannot be opened.
//
// NOTE(review): hosts and credentials are hard-coded below; they should be
// moved to configuration or environment variables.
func click() {
	// Source collection: top.wcc_allcity_2024Q2.
	mgoTop := &mongodb.MongodbSim{
		//MongodbAddr: "127.0.0.1:27080",
		MongodbAddr: "172.17.4.85:27080",
		DbName:      "top",
		Size:        10,
		//Direct: true,
	}
	mgoTop.InitPool()

	// Host .181: mixdata.company_industry.
	mgoIndustry := &mongodb.MongodbSim{
		MongodbAddr: "172.17.4.181:27001",
		//MongodbAddr: "127.0.0.1:27001",
		DbName:   "mixdata",
		Size:     10,
		UserName: "",
		Password: "",
		//Direct: true,
	}
	mgoIndustry.InitPool()

	// mixdata.qyxy_std.
	mgoStd := &mongodb.MongodbSim{
		//MongodbAddr: "127.0.0.1:27083",
		MongodbAddr: "172.17.189.140:27080",
		DbName:      "mixdata",
		Size:        10,
		UserName:    "SJZY_RWbid_ES",
		Password:    "SJZY@B4i4D5e6S",
		//Direct: true,
	}
	mgoStd.InitPool()

	// TiDB, accessed through the MySQL driver.
	username := "datascbi"
	password := "Da#Bi20221111SC"
	//host := "127.0.0.1:4001"
	host := "172.17.162.25:4000"
	database := "global_common_data"
	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database)

	// Connect to the database.
	db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Println("Failed to connect to database:", err)
		return
	}
	fmt.Println("Connected to the database!")

	// 1. Iterate over the source collection, excluding the "list" field.
	sess := mgoTop.GetMgoConn()
	defer mgoTop.DestoryMongoConn(sess)
	iter := sess.DB("top").C("wcc_allcity_2024Q2").Find(nil).Select(map[string]interface{}{"list": 0}).Iter()

	count := 0
	sem := make(chan struct{}, 10) // bounds the number of in-flight workers
	var wg sync.WaitGroup
	for row := make(map[string]interface{}); iter.Next(row); count++ {
		if count%1000 == 0 {
			log.Println("current ---", count, row["s_winner"])
		}
		sem <- struct{}{}
		wg.Add(1)
		go func(row map[string]interface{}) {
			defer func() {
				<-sem
				wg.Done()
			}()
			wccProcessRow(row, mgoStd, mgoIndustry, db)
		}(row)
		// Each worker owns its map; give the iterator a fresh one.
		row = make(map[string]interface{})
	}
	wg.Wait()
	log.Println("over")

	// Replace with your own MySQL connection information.
	//dsn := "user:password@tcp(localhost:3306)/database?charset=utf8mb4&parseTime=True&loc=Local"
	//dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database)
	//// Connect to the database.
	//db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
	//if err != nil {
	//	fmt.Println("Failed to connect to database:", err)
	//	return
	//}
	//
	//fmt.Println(db)
	//res, err := g.DB().Query(gctx.New(), `SELECT * FROM seo_siteKeywords_splicing order by site_code asc`)
	//if err == nil && len(res.List()) > 0 {
	//	for _, m := range res.List() {
	//		log.Println(m)
	//	}
	//}
}

// click2 demonstrates driving ClickHouse through GORM: it looks up one
// ExampleModel row by name and age and updates its name, age and update_time.
// It returns early when the connection or the lookup fails, and logs a failed
// update.
func click2() {
	// Connection without a password:
	//dsn := "clickhouse://localhost:9090/test?dial_timeout=10s&read_timeout=20s"
	dsn := "clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s"
	db, err := gorm.Open(clickhouse.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Printf("打开数据库失败:%v", err)
		// Nothing below can work without a usable connection.
		return
	}

	//// Create the table.
	//if err := db.AutoMigrate(&ExampleModel{}); err != nil {
	//	log.Fatal(err)
	//}

	// Insert a row.
	//example := &ExampleModel{ID: 33, Name: "wangchengcheng", Age: 32, CreateTime: time.Now(), UpdateTime: time.Now()}
	//if err := db.Create(example).Error; err != nil {
	//	log.Fatal(err)
	//}

	// Query a single row.
	//var result ExampleModel
	//if err := db.First(&result, "id = ?", 1).Error; err != nil {
	//	log.Fatal(err)
	//}
	//fmt.Printf("ID: %d, Name: %s, Age: %d\n", result.ID, result.Name, result.Age)

	// Query all matching rows.
	//var exas []ExampleModel
	//db.Find(&exas, "age > ? ", 25)
	//fmt.Println(exas)

	// Batch insert.
	//e1 := ExampleModel{ID: 14, Name: "name4", Age: 31}
	//e2 := ExampleModel{ID: 15, Name: "name5", Age: 32}
	//e3 := ExampleModel{ID: 16, Name: "name6", Age: 33}
	//es := []ExampleModel{e1, e2, e3}
	//db.Create(&es)

	// Update a row.
	var result ExampleModel
	err = db.Model(&ExampleModel{}).Find(&result, "name = ? and age = ?", "wangchengcheng", 32).Error
	if err != nil {
		log.Println("find err", err)
		return
	}

	changes := ExampleModel{Name: "wwwwww", Age: 333, UpdateTime: time.Now()}
	if err := db.Model(&result).Select("name", "age", "update_time").Updates(changes).Error; err != nil {
		log.Println("update err", err)
	}
}