clickhouse.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. ckgo "github.com/ClickHouse/clickhouse-go/v2"
  6. "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
  7. _ "github.com/gogf/gf/contrib/drivers/clickhouse/v2"
  8. "github.com/gogf/gf/v2/frame/g"
  9. "github.com/gogf/gf/v2/os/gctx"
  10. "github.com/xuri/excelize/v2"
  11. "gorm.io/driver/clickhouse"
  12. "gorm.io/driver/mysql"
  13. "gorm.io/gorm"
  14. "gorm.io/gorm/logger"
  15. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  16. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  17. "log"
  18. "net/url"
  19. "strconv"
  20. "strings"
  21. "sync"
  22. "time"
  23. )
  24. // ExampleModel 定义数据模型
  25. type ExampleModel struct {
  26. ID uint `gorm:"column:id"`
  27. Name string `gorm:"column:name"`
  28. Age uint `gorm:"column:age"`
  29. CreateTime time.Time `gorm:"create_time"`
  30. UpdateTime time.Time `gorm:"update_time"`
  31. }
  32. type EnterpriseBaseInfo struct {
  33. ID int `json:"id"`
  34. Name string `json:"name"`
  35. NameID string `json:"name_id"`
  36. CompanyID string `json:"company_id"`
  37. Address string `json:"address"`
  38. AreaCode string `json:"area_code"`
  39. CityCode string `json:"city_code"`
  40. DistrictCode string `json:"district_code"`
  41. IdentityType []uint8 `json:"identity_type"`
  42. Createtime time.Time `json:"createtime"`
  43. Updatetime time.Time `json:"updatetime"`
  44. LatestTime time.Time `json:"latest_time"`
  45. SEOID string `json:"seo_id"`
  46. EnterpriseType string `json:"enterprise_type"`
  47. }
  48. func (EnterpriseBaseInfo) TableName() string {
  49. return "dws_f_ent_baseinfo"
  50. }
  51. type WccBeijing struct {
  52. ProjectID string `json:"project_id"`
  53. CompanyID string `json:"company_id"`
  54. CreditNo *string `json:"credit_no,omitempty"`
  55. CompanyName *string `json:"company_name,omitempty"`
  56. RegisterArea *string `json:"register_area,omitempty"`
  57. CompanyType string `json:"company_type"`
  58. IndustryName *string `json:"industry_name,omitempty"`
  59. BidStatus *string `json:"bidstatus,omitempty"`
  60. Jgtime int32 `json:"jgtime"`
  61. BidYear *string `json:"bid_year,omitempty"`
  62. BidQuarter *string `json:"bid_quarter,omitempty"`
  63. BidMonth *string `json:"bid_month,omitempty"`
  64. Area *string `json:"area,omitempty"`
  65. City *string `json:"city,omitempty"`
  66. District *string `json:"district,omitempty"`
  67. Buyer *string `json:"buyer,omitempty"`
  68. BuyerClass *string `json:"buyerclass,omitempty"`
  69. Topscopeclass *string `json:"topscopeclass,omitempty"`
  70. Budget *float32 `json:"budget,omitempty"`
  71. BidAmount *float32 `json:"bidamount,omitempty"`
  72. }
  73. // click clickhouse 北京-京津翼长三角数据
  74. func click() {
  75. //top.wcc_beijing
  76. Mgo := &mongodb.MongodbSim{
  77. //MongodbAddr: "127.0.0.1:27080",
  78. MongodbAddr: "172.17.4.85:27080",
  79. DbName: "top",
  80. Size: 10,
  81. //Direct: true,
  82. }
  83. Mgo.InitPool()
  84. //181
  85. Mgo2 := &mongodb.MongodbSim{
  86. MongodbAddr: "172.17.4.181:27001",
  87. //MongodbAddr: "127.0.0.1:27001",
  88. DbName: "mixdata",
  89. Size: 10,
  90. UserName: "",
  91. Password: "",
  92. //Direct: true,
  93. }
  94. Mgo2.InitPool()
  95. //mixdata.qyxy_std
  96. MgoS := &mongodb.MongodbSim{
  97. //MongodbAddr: "127.0.0.1:27083",
  98. MongodbAddr: "172.17.189.140:27080",
  99. DbName: "mixdata",
  100. Size: 10,
  101. UserName: "SJZY_RWbid_ES",
  102. Password: "SJZY@B4i4D5e6S",
  103. //Direct: true,
  104. }
  105. MgoS.InitPool()
  106. //tidb
  107. username := "datascbi"
  108. password := "Da#Bi20221111SC"
  109. //host := "127.0.0.1:4001"
  110. host := "172.17.162.25:4000"
  111. database := "global_common_data"
  112. dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database)
  113. // 连接到数据库
  114. db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  115. if err != nil {
  116. log.Println("Failed to connect to database:", err)
  117. return
  118. }
  119. fmt.Println("Connected to the database!")
  120. //1.
  121. sess := Mgo.GetMgoConn()
  122. defer Mgo.DestoryMongoConn(sess)
  123. query := sess.DB("top").C("wcc_allcity_2024Q2").Find(nil).Select(map[string]interface{}{"list": 0}).Iter()
  124. count := 0
  125. ch := make(chan bool, 10)
  126. wg := &sync.WaitGroup{}
  127. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  128. if count%1000 == 0 {
  129. log.Println("current ---", count, tmp["s_winner"])
  130. }
  131. ch <- true
  132. wg.Add(1)
  133. go func(tmp map[string]interface{}) {
  134. defer func() {
  135. <-ch
  136. wg.Done()
  137. }()
  138. //处理数据
  139. beijing := WccBeijing{
  140. ProjectID: util.ObjToString(tmp["id"]),
  141. }
  142. companyName := util.ObjToString(tmp["s_winner"])
  143. if companyName == "" {
  144. return
  145. }
  146. beijing.CompanyName = &companyName
  147. if bidstatus, ok := tmp["bidstatus"].(string); ok {
  148. beijing.BidStatus = &bidstatus
  149. }
  150. beijing.Jgtime = int32(util.IntAll(tmp["jgtime"]))
  151. if util.IntAll(tmp["jgtime"]) > 0 {
  152. // 转换为 time.Time 对象
  153. t := time.Unix(util.Int64All(tmp["jgtime"]), 0)
  154. // 获取年份、月份、季度
  155. year := strconv.Itoa(t.Year())
  156. monthStringTwoDigits := year + fmt.Sprintf("%02d", int(t.Month()))
  157. quarter := year + "Q" + strconv.Itoa((int(t.Month())-1)/3+1)
  158. beijing.BidYear = &year
  159. beijing.BidMonth = &monthStringTwoDigits
  160. beijing.BidQuarter = &quarter
  161. }
  162. area := util.ObjToString(tmp["area"])
  163. beijing.Area = &area
  164. city := util.ObjToString(tmp["city"])
  165. beijing.City = &city
  166. district := util.ObjToString(tmp["district"])
  167. beijing.District = &district
  168. buyer := util.ObjToString(tmp["buyer"])
  169. beijing.Buyer = &buyer
  170. buyerclass := util.ObjToString(tmp["buyerclass"])
  171. beijing.BuyerClass = &buyerclass
  172. if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
  173. topclass := util.ObjToString(topscopeclass[0])
  174. beijing.Topscopeclass = &topclass
  175. }
  176. budget := float32(util.Float64All(tmp["budget"]))
  177. beijing.Budget = &budget
  178. bidamount := float32(util.Float64All(tmp["bidamount"]))
  179. beijing.BidAmount = &bidamount
  180. data, _ := MgoS.FindOne("qyxy_std", map[string]interface{}{"company_name": companyName})
  181. credit_no := util.ObjToString((*data)["credit_no"])
  182. beijing.CreditNo = &credit_no
  183. companyID := util.ObjToString((*data)["_id"])
  184. beijing.CompanyID = companyID
  185. companyArea := util.ObjToString((*data)["company_area"])
  186. beijing.RegisterArea = &companyArea
  187. //
  188. var baseInfo EnterpriseBaseInfo
  189. db.Where(&EnterpriseBaseInfo{Name: companyName}).First(&baseInfo)
  190. companyType := baseInfo.EnterpriseType
  191. beijing.CompanyType = companyType
  192. //
  193. indusWhere := map[string]interface{}{
  194. "company_id": companyID,
  195. }
  196. indus, _ := Mgo2.Find("company_industry", &indusWhere, nil, nil, false, -1, -1)
  197. if len(*indus) > 0 {
  198. indusName := util.ObjToString((*indus)[0]["industry_l1_name"])
  199. beijing.IndustryName = &indusName
  200. }
  201. _, err := g.DB().Insert(gctx.New(), "wcc_allcity_2024Q2", beijing)
  202. if err != nil {
  203. log.Println("clickhouse 写入失败;", err, companyName, companyID)
  204. }
  205. }(tmp)
  206. tmp = make(map[string]interface{})
  207. }
  208. wg.Wait()
  209. log.Println("over")
  210. // 替换为你的 MySQL 数据库连接信息
  211. //dsn := "user:password@tcp(localhost:3306)/database?charset=utf8mb4&parseTime=True&loc=Local"
  212. //dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database)
  213. //// 连接到数据库
  214. //db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  215. //if err != nil {
  216. // fmt.Println("Failed to connect to database:", err)
  217. // return
  218. //}
  219. //
  220. //fmt.Println(db)
  221. //res, err := g.DB().Query(gctx.New(), `SELECT * FROM seo_siteKeywords_splicing order by site_code asc`)
  222. //if err == nil && len(res.List()) > 0 {
  223. // for _, m := range res.List() {
  224. // log.Println(m)
  225. // }
  226. //}
  227. }
  228. // click2 gorm 操作clickhouse
  229. func click2() {
  230. //无密码连接
  231. //dsn := "clickhouse://localhost:9090/test?dial_timeout=10s&read_timeout=20s"
  232. dsn := "clickhouse://wcc:123@localhost:9090/test?dial_timeout=10s&read_timeout=20s"
  233. db, err := gorm.Open(clickhouse.Open(dsn), &gorm.Config{})
  234. if err != nil {
  235. log.Printf("打开数据库失败:%v", err)
  236. }
  237. //// 创建数据表
  238. //if err := db.AutoMigrate(&ExampleModel{}); err != nil {
  239. // log.Fatal(err)
  240. //}
  241. // 写入数据
  242. //example := &ExampleModel{ID: 33, Name: "wangchengcheng", Age: 32, CreateTime: time.Now(), UpdateTime: time.Now()}
  243. //if err := db.Create(example).Error; err != nil {
  244. // log.Fatal(err)
  245. //}
  246. // 查询单个数据
  247. //var result ExampleModel
  248. //if err := db.First(&result, "id = ?", 1).Error; err != nil {
  249. // log.Fatal(err)
  250. //}
  251. //fmt.Printf("ID: %d, Name: %s, Age: %d\n", result.ID, result.Name, result.Age)
  252. // 查询所有数据
  253. //var exas []ExampleModel
  254. //db.Find(&exas, "age > ? ", 25)
  255. //fmt.Println(exas)
  256. //批量保存
  257. //e1 := ExampleModel{ID: 14, Name: "name4", Age: 31}
  258. //e2 := ExampleModel{ID: 15, Name: "name5", Age: 32}
  259. //e3 := ExampleModel{ID: 16, Name: "name6", Age: 33}
  260. //es := []ExampleModel{e1, e2, e3}
  261. //db.Create(&es)
  262. //更新数据
  263. var result ExampleModel
  264. err = db.Model(&ExampleModel{}).Find(&result, "name = ? and age = ?", "wangchengcheng", 32).Error
  265. if err != nil {
  266. log.Println("find err", err)
  267. }
  268. db.Model(&result).Select("name", "age", "update_time").Updates(ExampleModel{Name: "wwwwww", Age: 333, UpdateTime: time.Now()})
  269. }
  270. // ClickhouseData 处理数据到Clickhouse
  271. func ClickhouseData() {
  272. //正式环境
  273. host := "cc-2ze9tv451wov14w9e.clickhouse.ads.aliyuncs.com:9000"
  274. username := "biservice"
  275. password := "Bi_top95215#"
  276. //-----------------------------//
  277. // 测试环境
  278. //host := "192.168.3.207:19000"
  279. //username := "jytop"
  280. //password := "pwdTopJy123"
  281. ////
  282. encodedPassword := url.QueryEscape(password)
  283. dn := fmt.Sprintf("clickhouse://%s:%s@%s/pub_tags?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
  284. db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
  285. Logger: logger.Default.LogMode(logger.Silent),
  286. })
  287. if err != nil {
  288. log.Fatal("打开数据库失败:", err)
  289. } else {
  290. log.Println("连接数据库成功", db.Name())
  291. }
  292. f, err := excelize.OpenFile("./主体代码表2.xlsx")
  293. if err != nil {
  294. fmt.Println(err)
  295. return
  296. }
  297. defer func() {
  298. f.Save()
  299. if err := f.Close(); err != nil {
  300. fmt.Println(err)
  301. }
  302. }()
  303. }
  304. // connectClickhouse 连接Clickhouse,其它方式连接;bitmap字段无法处理,需要使用下面方法
  305. func connectClickhouse(host, username, password, dbname string) (driver.Conn, error) {
  306. //host := "192.168.3.207:19000"
  307. //username := "jytop"
  308. //password := "pwdTopJy123"
  309. //dbname := "pub_tags"
  310. var (
  311. ctx = context.Background()
  312. conn, err = ckgo.Open(&ckgo.Options{
  313. Addr: []string{host},
  314. DialTimeout: 10 * time.Second,
  315. MaxIdleConns: 3,
  316. MaxOpenConns: 30,
  317. Auth: ckgo.Auth{
  318. Database: dbname,
  319. Username: username,
  320. Password: password,
  321. },
  322. Debugf: func(format string, v ...interface{}) {
  323. log.Println(format, v)
  324. },
  325. })
  326. )
  327. if err != nil {
  328. return nil, err
  329. }
  330. if err := conn.Ping(ctx); err != nil {
  331. if exception, ok := err.(*ckgo.Exception); ok {
  332. log.Println("Exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
  333. }
  334. return nil, err
  335. }
  336. return conn, nil
  337. }
  338. // dealClickhouse 处理法人库标签数组bitmap到Clickhouse
  339. func dealClickhouse() {
  340. //正式环境
  341. host := "cc-2ze9tv451wov14w9e.clickhouse.ads.aliyuncs.com:9000"
  342. username := "biservice"
  343. password := "Bi_top95215#"
  344. // 测试环境
  345. //host := "192.168.3.207:19000"
  346. //username := "jytop"
  347. //password := "pwdTopJy123"
  348. //-----//
  349. dbname := "pub_tags"
  350. ClickHouseConn, _ := connectClickhouse(host, username, password, dbname)
  351. //
  352. sql := `
  353. INSERT INTO pub_tags.dwd_d_tag (id, code, name, bitobj, groupid, created_at, created_by, bit_num)
  354. VALUES (?, ?, ?, bitmapBuild(?), ?, ?, ?, ?)
  355. `
  356. bitMapCodeArr := []uint64{}
  357. if len(bitMapCodeArr) == 0 {
  358. bitMapCodeArr = []uint64{uint64(0)}
  359. }
  360. f, err := excelize.OpenFile("./主体代码表2.xlsx")
  361. if err != nil {
  362. fmt.Println(err)
  363. return
  364. }
  365. defer func() {
  366. f.Save()
  367. if err := f.Close(); err != nil {
  368. fmt.Println(err)
  369. }
  370. }()
  371. //2.专项债详情
  372. rows, err := f.GetRows("法人库标签")
  373. if err != nil {
  374. fmt.Println(err)
  375. return
  376. }
  377. for i := 1; i < len(rows); i++ {
  378. if i%100 == 0 {
  379. log.Println("iiiii", i, rows[i][1])
  380. }
  381. //插入数据
  382. err = ClickHouseConn.Exec(context.Background(), sql,
  383. util.Int64All(rows[i][3]), // ID
  384. "", // Code
  385. strings.TrimSpace(rows[i][1]), // Name
  386. bitMapCodeArr, // 初始化 bitobj 的 UInt64 值
  387. util.Int64All(rows[i][0]), // GroupID
  388. time.Now(), // CreatedAt
  389. "admin", // CreatedBy
  390. util.Int64All(rows[i][2]), // BitNum
  391. )
  392. if err != nil {
  393. log.Println(err)
  394. }
  395. }
  396. log.Println("数据处理完毕")
  397. }
  398. func testUpdateBitmap() {
  399. // 测试环境
  400. host := "192.168.3.207:19000"
  401. username := "jytop"
  402. password := "pwdTopJy123"
  403. //dbname := "pub_tags"
  404. dbname := "information"
  405. ClickHouseConn, _ := connectClickhouse(host, username, password, dbname)
  406. rows, err := ClickHouseConn.Query(context.Background(), "SELECT company_name,bitmapToArray(company_label) labels FROM ent_info where company_name = '长春市建工建设监理有限公司'")
  407. if err != nil {
  408. log.Fatal(err)
  409. }
  410. for rows.Next() {
  411. var companyName string
  412. var companyLabels = make([]uint64, 0)
  413. var addLabels = make([]uint64, 0)
  414. if err := rows.Scan(&companyName, &companyLabels); err != nil {
  415. log.Println("eerrr", err)
  416. }
  417. addLabels = append(addLabels, uint64(13), uint64(14), uint64(15))
  418. log.Println(companyName, companyLabels)
  419. // 构建 toUInt64 数组字符串
  420. toUInt64Array := buildToUInt64Array(addLabels)
  421. // SQL 动态生成
  422. sql := fmt.Sprintf(`
  423. ALTER TABLE information.ent_info
  424. UPDATE company_label = bitmapOr(company_label, bitmapBuild(%s))
  425. WHERE company_name = ?
  426. `, toUInt64Array)
  427. log.Println(sql)
  428. err = ClickHouseConn.Exec(context.Background(), sql, companyName)
  429. if err != nil {
  430. fmt.Printf("SQL execution failed: %v\n", err)
  431. } else {
  432. fmt.Println("Update succeeded!")
  433. }
  434. }
  435. /**
  436. alter table message_user_summary UPDATE readMsg = bitmapOr(readMsg,bitmapBuild([toUInt64(%d)])) where userId = '%s'
  437. */
  438. // sql := `
  439. //alter table pub_tags.dwd_d_tag_wcc UPDATE bitobj = bitmapOr(readMsg,bitmapBuild(%v) where name = '通用仓储'
  440. //`
  441. }
  442. // 动态构建 toUInt64 数组字符串
  443. func buildToUInt64Array(labels []uint64) string {
  444. if len(labels) == 0 {
  445. return "[]"
  446. }
  447. toUInt64Labels := make([]string, len(labels))
  448. for i, label := range labels {
  449. toUInt64Labels[i] = fmt.Sprintf("toUInt64(%d)", label)
  450. }
  451. return fmt.Sprintf("[%s]", strings.Join(toUInt64Labels, ", "))
  452. }