clickhouse.go 8.8 KB


  1. package main
  2. import (
  3. "fmt"
  4. _ "github.com/gogf/gf/contrib/drivers/clickhouse/v2"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/os/gctx"
  7. "gorm.io/driver/clickhouse"
  8. "gorm.io/driver/mysql"
  9. "gorm.io/gorm"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "log"
  13. "strconv"
  14. "sync"
  15. "time"
  16. )
  17. // ExampleModel 定义数据模型
  18. type ExampleModel struct {
  19. ID uint `gorm:"column:id"`
  20. Name string `gorm:"column:name"`
  21. Age uint `gorm:"column:age"`
  22. CreateTime time.Time `gorm:"create_time"`
  23. UpdateTime time.Time `gorm:"update_time"`
  24. }
  25. type EnterpriseBaseInfo struct {
  26. ID int `json:"id"`
  27. Name string `json:"name"`
  28. NameID string `json:"name_id"`
  29. CompanyID string `json:"company_id"`
  30. Address string `json:"address"`
  31. AreaCode string `json:"area_code"`
  32. CityCode string `json:"city_code"`
  33. DistrictCode string `json:"district_code"`
  34. IdentityType []uint8 `json:"identity_type"`
  35. Createtime time.Time `json:"createtime"`
  36. Updatetime time.Time `json:"updatetime"`
  37. LatestTime time.Time `json:"latest_time"`
  38. SEOID string `json:"seo_id"`
  39. EnterpriseType string `json:"enterprise_type"`
  40. }
  41. func (EnterpriseBaseInfo) TableName() string {
  42. return "dws_f_ent_baseinfo"
  43. }
  44. type WccBeijing struct {
  45. ProjectID string `json:"project_id"`
  46. CompanyID string `json:"company_id"`
  47. CreditNo *string `json:"credit_no,omitempty"`
  48. CompanyName *string `json:"company_name,omitempty"`
  49. RegisterArea *string `json:"register_area,omitempty"`
  50. CompanyType string `json:"company_type"`
  51. IndustryName *string `json:"industry_name,omitempty"`
  52. BidStatus *string `json:"bidstatus,omitempty"`
  53. Jgtime int32 `json:"jgtime"`
  54. BidYear *string `json:"bid_year,omitempty"`
  55. BidQuarter *string `json:"bid_quarter,omitempty"`
  56. BidMonth *string `json:"bid_month,omitempty"`
  57. Area *string `json:"area,omitempty"`
  58. City *string `json:"city,omitempty"`
  59. District *string `json:"district,omitempty"`
  60. Buyer *string `json:"buyer,omitempty"`
  61. BuyerClass *string `json:"buyerclass,omitempty"`
  62. Topscopeclass *string `json:"topscopeclass,omitempty"`
  63. Budget *float32 `json:"budget,omitempty"`
  64. BidAmount *float32 `json:"bidamount,omitempty"`
  65. }
  66. // click clickhouse 北京-京津翼长三角数据
  67. func click() {
  68. //top.wcc_beijing
  69. Mgo := &mongodb.MongodbSim{
  70. //MongodbAddr: "127.0.0.1:27080",
  71. MongodbAddr: "172.17.4.85:27080",
  72. DbName: "top",
  73. Size: 10,
  74. //Direct: true,
  75. }
  76. Mgo.InitPool()
  77. //181
  78. Mgo2 := &mongodb.MongodbSim{
  79. MongodbAddr: "172.17.4.181:27001",
  80. //MongodbAddr: "127.0.0.1:27001",
  81. DbName: "mixdata",
  82. Size: 10,
  83. UserName: "",
  84. Password: "",
  85. //Direct: true,
  86. }
  87. Mgo2.InitPool()
  88. //mixdata.qyxy_std
  89. MgoS := &mongodb.MongodbSim{
  90. //MongodbAddr: "127.0.0.1:27083",
  91. MongodbAddr: "172.17.189.140:27080",
  92. DbName: "mixdata",
  93. Size: 10,
  94. UserName: "SJZY_RWbid_ES",
  95. Password: "SJZY@B4i4D5e6S",
  96. //Direct: true,
  97. }
  98. MgoS.InitPool()
  99. //tidb
  100. username := "datascbi"
  101. password := "Da#Bi20221111SC"
  102. //host := "127.0.0.1:4001"
  103. host := "172.17.4.242:4000"
  104. database := "global_common_data"
  105. dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database)
  106. // 连接到数据库
  107. db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  108. if err != nil {
  109. log.Println("Failed to connect to database:", err)
  110. return
  111. }
  112. fmt.Println("Connected to the database!")
  113. //1.
  114. sess := Mgo.GetMgoConn()
  115. defer Mgo.DestoryMongoConn(sess)
  116. query := sess.DB("top").C("wcc_allcity_2024Q1").Find(nil).Select(map[string]interface{}{"list": 0}).Iter()
  117. count := 0
  118. ch := make(chan bool, 10)
  119. wg := &sync.WaitGroup{}
  120. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  121. if count%1000 == 0 {
  122. log.Println("current ---", count, tmp["s_winner"])
  123. }
  124. ch <- true
  125. wg.Add(1)
  126. go func(tmp map[string]interface{}) {
  127. defer func() {
  128. <-ch
  129. wg.Done()
  130. }()
  131. //处理数据
  132. beijing := WccBeijing{
  133. ProjectID: util.ObjToString(tmp["id"]),
  134. }
  135. companyName := util.ObjToString(tmp["s_winner"])
  136. if companyName == "" {
  137. return
  138. }
  139. beijing.CompanyName = &companyName
  140. if bidstatus, ok := tmp["bidstatus"].(string); ok {
  141. beijing.BidStatus = &bidstatus
  142. }
  143. beijing.Jgtime = int32(util.IntAll(tmp["jgtime"]))
  144. if util.IntAll(tmp["jgtime"]) > 0 {
  145. // 转换为 time.Time 对象
  146. t := time.Unix(util.Int64All(tmp["jgtime"]), 0)
  147. // 获取年份、月份、季度
  148. year := strconv.Itoa(t.Year())
  149. monthStringTwoDigits := year + fmt.Sprintf("%02d", int(t.Month()))
  150. quarter := year + "Q" + strconv.Itoa((int(t.Month())-1)/3+1)
  151. beijing.BidYear = &year
  152. beijing.BidMonth = &monthStringTwoDigits
  153. beijing.BidQuarter = &quarter
  154. }
  155. area := util.ObjToString(tmp["area"])
  156. beijing.Area = &area
  157. city := util.ObjToString(tmp["city"])
  158. beijing.City = &city
  159. district := util.ObjToString(tmp["district"])
  160. beijing.District = &district
  161. buyer := util.ObjToString(tmp["buyer"])
  162. beijing.Buyer = &buyer
  163. buyerclass := util.ObjToString(tmp["buyerclass"])
  164. beijing.BuyerClass = &buyerclass
  165. if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
  166. topclass := util.ObjToString(topscopeclass[0])
  167. beijing.Topscopeclass = &topclass
  168. }
  169. budget := float32(util.Float64All(tmp["budget"]))
  170. beijing.Budget = &budget
  171. bidamount := float32(util.Float64All(tmp["bidamount"]))
  172. beijing.BidAmount = &bidamount
  173. data, _ := MgoS.FindOne("qyxy_std", map[string]interface{}{"company_name": companyName})
  174. credit_no := util.ObjToString((*data)["credit_no"])
  175. beijing.CreditNo = &credit_no
  176. companyID := util.ObjToString((*data)["_id"])
  177. beijing.CompanyID = companyID
  178. companyArea := util.ObjToString((*data)["company_area"])
  179. beijing.RegisterArea = &companyArea
  180. //
  181. var baseInfo EnterpriseBaseInfo
  182. db.Where(&EnterpriseBaseInfo{Name: companyName}).First(&baseInfo)
  183. companyType := baseInfo.EnterpriseType
  184. beijing.CompanyType = companyType
  185. //
  186. indusWhere := map[string]interface{}{
  187. "company_id": companyID,
  188. }
  189. indus, _ := Mgo2.Find("company_industry", &indusWhere, nil, nil, false, -1, -1)
  190. if len(*indus) > 0 {
  191. indusName := util.ObjToString((*indus)[0]["industry_l1_name"])
  192. beijing.IndustryName = &indusName
  193. }
  194. _, err := g.DB().Insert(gctx.New(), "wcc_allcity_2024Q1", beijing)
  195. if err != nil {
  196. log.Println("clickhouse 写入失败;", err, companyName, companyID)
  197. }
  198. }(tmp)
  199. tmp = make(map[string]interface{})
  200. }
  201. wg.Wait()
  202. log.Println("over")
  203. // 替换为你的 MySQL 数据库连接信息
  204. //dsn := "user:password@tcp(localhost:3306)/database?charset=utf8mb4&parseTime=True&loc=Local"
  205. //dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database)
  206. //// 连接到数据库
  207. //db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  208. //if err != nil {
  209. // fmt.Println("Failed to connect to database:", err)
  210. // return
  211. //}
  212. //
  213. //fmt.Println(db)
  214. //res, err := g.DB().Query(gctx.New(), `SELECT * FROM seo_siteKeywords_splicing order by site_code asc`)
  215. //if err == nil && len(res.List()) > 0 {
  216. // for _, m := range res.List() {
  217. // log.Println(m)
  218. // }
  219. //}
  220. }
  221. // click2 gorm 操作clickhouse
  222. func click2() {
  223. //无密码连接
  224. //dsn := "clickhouse://localhost:9090/test?dial_timeout=10s&read_timeout=20s"
  225. dsn := "clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s"
  226. db, err := gorm.Open(clickhouse.Open(dsn), &gorm.Config{})
  227. if err != nil {
  228. log.Printf("打开数据库失败:%v", err)
  229. }
  230. //// 创建数据表
  231. //if err := db.AutoMigrate(&ExampleModel{}); err != nil {
  232. // log.Fatal(err)
  233. //}
  234. // 写入数据
  235. //example := &ExampleModel{ID: 33, Name: "wangchengcheng", Age: 32, CreateTime: time.Now(), UpdateTime: time.Now()}
  236. //if err := db.Create(example).Error; err != nil {
  237. // log.Fatal(err)
  238. //}
  239. // 查询单个数据
  240. //var result ExampleModel
  241. //if err := db.First(&result, "id = ?", 1).Error; err != nil {
  242. // log.Fatal(err)
  243. //}
  244. //fmt.Printf("ID: %d, Name: %s, Age: %d\n", result.ID, result.Name, result.Age)
  245. // 查询所有数据
  246. //var exas []ExampleModel
  247. //db.Find(&exas, "age > ? ", 25)
  248. //fmt.Println(exas)
  249. //批量保存
  250. //e1 := ExampleModel{ID: 14, Name: "name4", Age: 31}
  251. //e2 := ExampleModel{ID: 15, Name: "name5", Age: 32}
  252. //e3 := ExampleModel{ID: 16, Name: "name6", Age: 33}
  253. //es := []ExampleModel{e1, e2, e3}
  254. //db.Create(&es)
  255. //更新数据
  256. var result ExampleModel
  257. err = db.Model(&ExampleModel{}).Find(&result, "name = ? and age = ?", "wangchengcheng", 32).Error
  258. if err != nil {
  259. log.Println("find err", err)
  260. }
  261. db.Model(&result).Select("name", "age", "update_time").Updates(ExampleModel{Name: "wwwwww", Age: 333, UpdateTime: time.Now()})
  262. }