clickhouse.go 8.4 KB


  1. package main
  2. import (
  3. "fmt"
  4. _ "github.com/gogf/gf/contrib/drivers/clickhouse/v2"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/os/gctx"
  7. "gorm.io/driver/clickhouse"
  8. "gorm.io/driver/mysql"
  9. "gorm.io/gorm"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "log"
  13. "strconv"
  14. "sync"
  15. "time"
  16. )
  17. // ExampleModel 定义数据模型
  18. type ExampleModel struct {
  19. ID uint `gorm:"column:id"`
  20. Name string `gorm:"column:name"`
  21. Age uint `gorm:"column:age"`
  22. }
  23. type EnterpriseBaseInfo struct {
  24. ID int `json:"id"`
  25. Name string `json:"name"`
  26. NameID string `json:"name_id"`
  27. CompanyID string `json:"company_id"`
  28. Address string `json:"address"`
  29. AreaCode string `json:"area_code"`
  30. CityCode string `json:"city_code"`
  31. DistrictCode string `json:"district_code"`
  32. IdentityType []uint8 `json:"identity_type"`
  33. Createtime time.Time `json:"createtime"`
  34. Updatetime time.Time `json:"updatetime"`
  35. LatestTime time.Time `json:"latest_time"`
  36. SEOID string `json:"seo_id"`
  37. EnterpriseType string `json:"enterprise_type"`
  38. }
  39. func (EnterpriseBaseInfo) TableName() string {
  40. return "dws_f_ent_baseinfo"
  41. }
  42. type WccBeijing struct {
  43. ProjectID string `json:"project_id"`
  44. CompanyID string `json:"company_id"`
  45. CreditNo *string `json:"credit_no,omitempty"`
  46. CompanyName *string `json:"company_name,omitempty"`
  47. RegisterArea *string `json:"register_area,omitempty"`
  48. CompanyType string `json:"company_type"`
  49. IndustryName *string `json:"industry_name,omitempty"`
  50. BidStatus *string `json:"bidstatus,omitempty"`
  51. Jgtime int32 `json:"jgtime"`
  52. BidYear *string `json:"bid_year,omitempty"`
  53. BidQuarter *string `json:"bid_quarter,omitempty"`
  54. BidMonth *string `json:"bid_month,omitempty"`
  55. Area *string `json:"area,omitempty"`
  56. City *string `json:"city,omitempty"`
  57. District *string `json:"district,omitempty"`
  58. Buyer *string `json:"buyer,omitempty"`
  59. BuyerClass *string `json:"buyerclass,omitempty"`
  60. Topscopeclass *string `json:"topscopeclass,omitempty"`
  61. Budget *float32 `json:"budget,omitempty"`
  62. BidAmount *float32 `json:"bidamount,omitempty"`
  63. }
  64. // click clickhouse 北京-京津翼长三角数据
  65. func click() {
  66. //top.wcc_beijing
  67. Mgo := &mongodb.MongodbSim{
  68. //MongodbAddr: "127.0.0.1:27080",
  69. MongodbAddr: "172.17.4.85:27080",
  70. DbName: "top",
  71. Size: 10,
  72. //Direct: true,
  73. }
  74. Mgo.InitPool()
  75. //181
  76. Mgo2 := &mongodb.MongodbSim{
  77. MongodbAddr: "172.17.4.181:27001",
  78. //MongodbAddr: "127.0.0.1:27001",
  79. DbName: "mixdata",
  80. Size: 10,
  81. UserName: "",
  82. Password: "",
  83. //Direct: true,
  84. }
  85. Mgo2.InitPool()
  86. //mixdata.qyxy_std
  87. MgoS := &mongodb.MongodbSim{
  88. //MongodbAddr: "127.0.0.1:27083",
  89. MongodbAddr: "172.17.189.140:27080",
  90. DbName: "mixdata",
  91. Size: 10,
  92. UserName: "SJZY_RWbid_ES",
  93. Password: "SJZY@B4i4D5e6S",
  94. //Direct: true,
  95. }
  96. MgoS.InitPool()
  97. //tidb
  98. username := "datascbi"
  99. password := "Da#Bi20221111SC"
  100. //host := "127.0.0.1:4001"
  101. host := "172.17.4.242:4000"
  102. database := "global_common_data"
  103. dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database)
  104. // 连接到数据库
  105. db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  106. if err != nil {
  107. log.Println("Failed to connect to database:", err)
  108. return
  109. }
  110. fmt.Println("Connected to the database!")
  111. //1.
  112. sess := Mgo.GetMgoConn()
  113. defer Mgo.DestoryMongoConn(sess)
  114. query := sess.DB("top").C("wcc_beijing_0110").Find(nil).Select(map[string]interface{}{"list": 0}).Iter()
  115. count := 0
  116. ch := make(chan bool, 10)
  117. wg := &sync.WaitGroup{}
  118. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  119. if count%1000 == 0 {
  120. log.Println("current ---", count, tmp["s_winner"])
  121. }
  122. ch <- true
  123. wg.Add(1)
  124. go func(tmp map[string]interface{}) {
  125. defer func() {
  126. <-ch
  127. wg.Done()
  128. }()
  129. //处理数据
  130. beijing := WccBeijing{
  131. ProjectID: util.ObjToString(tmp["id"]),
  132. }
  133. companyName := util.ObjToString(tmp["s_winner"])
  134. if companyName == "" {
  135. return
  136. }
  137. beijing.CompanyName = &companyName
  138. if bidstatus, ok := tmp["bidstatus"].(string); ok {
  139. beijing.BidStatus = &bidstatus
  140. }
  141. beijing.Jgtime = int32(util.IntAll(tmp["jgtime"]))
  142. if util.IntAll(tmp["jgtime"]) > 0 {
  143. // 转换为 time.Time 对象
  144. t := time.Unix(util.Int64All(tmp["jgtime"]), 0)
  145. // 获取年份、月份、季度
  146. year := strconv.Itoa(t.Year())
  147. monthStringTwoDigits := year + fmt.Sprintf("%02d", int(t.Month()))
  148. quarter := year + "Q" + strconv.Itoa((int(t.Month())-1)/3+1)
  149. beijing.BidYear = &year
  150. beijing.BidMonth = &monthStringTwoDigits
  151. beijing.BidQuarter = &quarter
  152. }
  153. area := util.ObjToString(tmp["area"])
  154. beijing.Area = &area
  155. city := util.ObjToString(tmp["city"])
  156. beijing.City = &city
  157. district := util.ObjToString(tmp["district"])
  158. beijing.District = &district
  159. buyer := util.ObjToString(tmp["buyer"])
  160. beijing.Buyer = &buyer
  161. buyerclass := util.ObjToString(tmp["buyerclass"])
  162. beijing.BuyerClass = &buyerclass
  163. if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
  164. topclass := util.ObjToString(topscopeclass[0])
  165. beijing.Topscopeclass = &topclass
  166. }
  167. budget := float32(util.Float64All(tmp["budget"]))
  168. beijing.Budget = &budget
  169. bidamount := float32(util.Float64All(tmp["bidamount"]))
  170. beijing.BidAmount = &bidamount
  171. data, _ := MgoS.FindOne("qyxy_std", map[string]interface{}{"company_name": companyName})
  172. credit_no := util.ObjToString((*data)["credit_no"])
  173. beijing.CreditNo = &credit_no
  174. companyID := util.ObjToString((*data)["_id"])
  175. beijing.CompanyID = companyID
  176. companyArea := util.ObjToString((*data)["company_area"])
  177. beijing.RegisterArea = &companyArea
  178. //
  179. var baseInfo EnterpriseBaseInfo
  180. db.Where(&EnterpriseBaseInfo{Name: companyName}).First(&baseInfo)
  181. companyType := baseInfo.EnterpriseType
  182. beijing.CompanyType = companyType
  183. //
  184. indusWhere := map[string]interface{}{
  185. "company_id": companyID,
  186. }
  187. indus, _ := Mgo2.Find("company_industry", &indusWhere, nil, nil, false, -1, -1)
  188. if len(*indus) > 0 {
  189. indusName := util.ObjToString((*indus)[0]["industry_l1_name"])
  190. beijing.IndustryName = &indusName
  191. }
  192. _, err := g.DB().Insert(gctx.New(), "wcc_beijing_2013", beijing)
  193. if err != nil {
  194. log.Println("clickhouse 写入失败;", err, companyName, companyID)
  195. }
  196. }(tmp)
  197. tmp = make(map[string]interface{})
  198. }
  199. wg.Wait()
  200. log.Println("over")
  201. // 替换为你的 MySQL 数据库连接信息
  202. //dsn := "user:password@tcp(localhost:3306)/database?charset=utf8mb4&parseTime=True&loc=Local"
  203. //dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database)
  204. //// 连接到数据库
  205. //db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  206. //if err != nil {
  207. // fmt.Println("Failed to connect to database:", err)
  208. // return
  209. //}
  210. //
  211. //fmt.Println(db)
  212. //res, err := g.DB().Query(gctx.New(), `SELECT * FROM seo_siteKeywords_splicing order by site_code asc`)
  213. //if err == nil && len(res.List()) > 0 {
  214. // for _, m := range res.List() {
  215. // log.Println(m)
  216. // }
  217. //}
  218. }
  219. // click2 gorm 操作clickhouse
  220. func click2() {
  221. //无密码连接
  222. //dsn := "clickhouse://localhost:9090/test?dial_timeout=10s&read_timeout=20s"
  223. dsn := "clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s"
  224. db, err := gorm.Open(clickhouse.Open(dsn), &gorm.Config{})
  225. if err != nil {
  226. log.Printf("打开数据库失败:%v", err)
  227. }
  228. //// 创建数据表
  229. //if err := db.AutoMigrate(&ExampleModel{}); err != nil {
  230. // log.Fatal(err)
  231. //}
  232. // 写入数据
  233. //example := &ExampleModel{ID: 2, Name: "Wang Lei", Age: 30}
  234. //if err := db.Create(example).Error; err != nil {
  235. // log.Fatal(err)
  236. //}
  237. // 查询单个数据
  238. //var result ExampleModel
  239. //if err := db.First(&result, "id = ?", 1).Error; err != nil {
  240. // log.Fatal(err)
  241. //}
  242. //fmt.Printf("ID: %d, Name: %s, Age: %d\n", result.ID, result.Name, result.Age)
  243. // 查询所有数据
  244. //var exas []ExampleModel
  245. //db.Find(&exas, "age > ? ", 25)
  246. //fmt.Println(exas)
  247. //批量保存
  248. e1 := ExampleModel{ID: 4, Name: "name4", Age: 31}
  249. e2 := ExampleModel{ID: 5, Name: "name5", Age: 32}
  250. e3 := ExampleModel{ID: 6, Name: "name6", Age: 33}
  251. es := []ExampleModel{e1, e2, e3}
  252. db.Create(&es)
  253. }