package main import ( "fmt" _ "github.com/gogf/gf/contrib/drivers/clickhouse/v2" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gctx" "gorm.io/driver/mysql" "gorm.io/gorm" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "strconv" "sync" "time" ) type EnterpriseBaseInfo struct { ID int `json:"id"` Name string `json:"name"` NameID string `json:"name_id"` CompanyID string `json:"company_id"` Address string `json:"address"` AreaCode string `json:"area_code"` CityCode string `json:"city_code"` DistrictCode string `json:"district_code"` IdentityType []uint8 `json:"identity_type"` Createtime time.Time `json:"createtime"` Updatetime time.Time `json:"updatetime"` LatestTime time.Time `json:"latest_time"` SEOID string `json:"seo_id"` EnterpriseType string `json:"enterprise_type"` } func (EnterpriseBaseInfo) TableName() string { return "dws_f_ent_baseinfo" } type WccBeijing struct { ProjectID string `json:"project_id"` CompanyID string `json:"company_id"` CreditNo *string `json:"credit_no,omitempty"` CompanyName *string `json:"company_name,omitempty"` RegisterArea *string `json:"register_area,omitempty"` CompanyType string `json:"company_type"` IndustryName *string `json:"industry_name,omitempty"` BidStatus *string `json:"bidstatus,omitempty"` Jgtime int32 `json:"jgtime"` BidYear *string `json:"bid_year,omitempty"` BidQuarter *string `json:"bid_quarter,omitempty"` BidMonth *string `json:"bid_month,omitempty"` Area *string `json:"area,omitempty"` City *string `json:"city,omitempty"` District *string `json:"district,omitempty"` Buyer *string `json:"buyer,omitempty"` BuyerClass *string `json:"buyerclass,omitempty"` Topscopeclass *string `json:"topscopeclass,omitempty"` Budget *float32 `json:"budget,omitempty"` BidAmount *float32 `json:"bidamount,omitempty"` } // click clickhouse 北京-京津翼长三角数据 func click() { //top.wcc_beijing Mgo := &mongodb.MongodbSim{ //MongodbAddr: "127.0.0.1:27080", MongodbAddr: "172.17.4.85:27080", DbName: "top", Size: 10, //Direct: true, } Mgo.InitPool() //181 Mgo2 := &mongodb.MongodbSim{ MongodbAddr: "172.17.4.181:27001", //MongodbAddr: "127.0.0.1:27001", DbName: "mixdata", Size: 10, UserName: "", Password: "", //Direct: true, } Mgo2.InitPool() //mixdata.qyxy_std MgoS := &mongodb.MongodbSim{ //MongodbAddr: "127.0.0.1:27083", MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080", DbName: "mixdata", Size: 10, UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", //Direct: true, } MgoS.InitPool() //tidb username := "datascbi" password := "Da#Bi20221111SC" //host := "127.0.0.1:4001" host := "172.17.162.25:4000" database := "global_common_data" dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database) // 连接到数据库 db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) if err != nil { log.Println("Failed to connect to database:", err) return } fmt.Println("Connected to the database!", db) //1. sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) query := sess.DB("top").C("wcc_allcity_2025Q2").Find(nil).Select(map[string]interface{}{"list": 0}).Iter() count := 0 ch := make(chan bool, 20) wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Println("current ---", count, tmp["s_winner"]) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() //处理数据 beijing := WccBeijing{ ProjectID: util.ObjToString(tmp["id"]), } companyName := util.ObjToString(tmp["s_winner"]) if companyName == "" { return } beijing.CompanyName = &companyName if bidstatus, ok := tmp["bidstatus"].(string); ok { beijing.BidStatus = &bidstatus } beijing.Jgtime = int32(util.IntAll(tmp["jgtime"])) if util.IntAll(tmp["jgtime"]) > 0 { // 转换为 time.Time 对象 t := time.Unix(util.Int64All(tmp["jgtime"]), 0) // 获取年份、月份、季度 year := strconv.Itoa(t.Year()) monthStringTwoDigits := year + fmt.Sprintf("%02d", int(t.Month())) quarter := year + "Q" + strconv.Itoa((int(t.Month())-1)/3+1) beijing.BidYear = &year beijing.BidMonth = &monthStringTwoDigits beijing.BidQuarter = &quarter } area := util.ObjToString(tmp["area"]) beijing.Area = &area city := util.ObjToString(tmp["city"]) beijing.City = &city district := util.ObjToString(tmp["district"]) beijing.District = &district buyer := util.ObjToString(tmp["buyer"]) beijing.Buyer = &buyer buyerclass := util.ObjToString(tmp["buyerclass"]) beijing.BuyerClass = &buyerclass if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok { topclass := util.ObjToString(topscopeclass[0]) beijing.Topscopeclass = &topclass } budget := float32(util.Float64All(tmp["budget"])) beijing.Budget = &budget bidamount := float32(util.Float64All(tmp["bidamount"])) beijing.BidAmount = &bidamount data, _ := MgoS.FindOne("qyxy_std", map[string]interface{}{"company_name": companyName}) credit_no := util.ObjToString((*data)["credit_no"]) beijing.CreditNo = &credit_no companyID := util.ObjToString((*data)["_id"]) beijing.CompanyID = companyID companyArea := util.ObjToString((*data)["company_area"]) beijing.RegisterArea = &companyArea // var baseInfo EnterpriseBaseInfo db.Where(&EnterpriseBaseInfo{Name: companyName}).First(&baseInfo) companyType := baseInfo.EnterpriseType beijing.CompanyType = companyType // indusWhere := map[string]interface{}{ "company_id": companyID, } indus, _ := Mgo2.Find("company_industry", &indusWhere, nil, nil, false, -1, -1) if len(*indus) > 0 { indusName := util.ObjToString((*indus)[0]["industry_l1_name"]) beijing.IndustryName = &indusName } _, err := g.DB().Insert(gctx.New(), "wcc_beijing_2024Q1", beijing) if err != nil { log.Println("clickhouse 写入失败;", err, companyName, companyID) } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Println("over") // 替换为你的 MySQL 数据库连接信息 //dsn := "user:password@tcp(localhost:3306)/database?charset=utf8mb4&parseTime=True&loc=Local" //dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database) //// 连接到数据库 //db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) //if err != nil { // fmt.Println("Failed to connect to database:", err) // return //} // //fmt.Println(db) //res, err := g.DB().Query(gctx.New(), `SELECT * FROM seo_siteKeywords_splicing order by site_code asc`) //if err == nil && len(res.List()) > 0 { // for _, m := range res.List() { // log.Println(m) // } //} }