clickhouse.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package main
  2. import (
  3. "fmt"
  4. _ "github.com/gogf/gf/contrib/drivers/clickhouse/v2"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/os/gctx"
  7. "gorm.io/driver/mysql"
  8. "gorm.io/gorm"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "log"
  12. "strconv"
  13. "sync"
  14. "time"
  15. )
  // EnterpriseBaseInfo is the row model for the dws_f_ent_baseinfo table (see
  // TableName). Within this file, click looks a row up by Name and reads its
  // EnterpriseType.
  type EnterpriseBaseInfo struct {
  	// Identifiers.
  	ID        int    `json:"id"`
  	Name      string `json:"name"`
  	NameID    string `json:"name_id"`
  	CompanyID string `json:"company_id"`

  	// Address and region codes.
  	Address      string `json:"address"`
  	AreaCode     string `json:"area_code"`
  	CityCode     string `json:"city_code"`
  	DistrictCode string `json:"district_code"`

  	// Raw bytes; the encoding is not visible in this file.
  	IdentityType []byte `json:"identity_type"`

  	// Row timestamps.
  	Createtime time.Time `json:"createtime"`
  	Updatetime time.Time `json:"updatetime"`
  	LatestTime time.Time `json:"latest_time"`

  	SEOID          string `json:"seo_id"`
  	EnterpriseType string `json:"enterprise_type"`
  }

  // TableName returns the fixed table name "dws_f_ent_baseinfo".
  // NOTE(review): presumably consumed by gorm's table-name lookup, since the
  // type is queried through gorm in click — confirm.
  func (EnterpriseBaseInfo) TableName() string {
  	const table = "dws_f_ent_baseinfo"
  	return table
  }
  // WccBeijing is the row that click assembles from one source bid document
  // and inserts into the wcc_beijing_2024Q1 table. Pointer fields carry
  // `omitempty`, so a nil pointer is left out of the JSON encoding.
  type WccBeijing struct {
  	// Keys: source document id and the matched company's id.
  	ProjectID string `json:"project_id"`
  	CompanyID string `json:"company_id"`

  	// Winning company attributes.
  	CreditNo     *string `json:"credit_no,omitempty"`
  	CompanyName  *string `json:"company_name,omitempty"`
  	RegisterArea *string `json:"register_area,omitempty"`
  	CompanyType  string  `json:"company_type"`
  	IndustryName *string `json:"industry_name,omitempty"`

  	// Bid status and time. click treats Jgtime as Unix seconds and derives
  	// BidYear ("2024"), BidQuarter ("2024Q1") and BidMonth ("202401") from it.
  	BidStatus  *string `json:"bidstatus,omitempty"`
  	Jgtime     int32   `json:"jgtime"`
  	BidYear    *string `json:"bid_year,omitempty"`
  	BidQuarter *string `json:"bid_quarter,omitempty"`
  	BidMonth   *string `json:"bid_month,omitempty"`

  	// Project location.
  	Area     *string `json:"area,omitempty"`
  	City     *string `json:"city,omitempty"`
  	District *string `json:"district,omitempty"`

  	// Buyer and classification; Topscopeclass holds only the first entry of
  	// the source document's topscopeclass list.
  	Buyer         *string `json:"buyer,omitempty"`
  	BuyerClass    *string `json:"buyerclass,omitempty"`
  	Topscopeclass *string `json:"topscopeclass,omitempty"`

  	// Amounts.
  	Budget    *float32 `json:"budget,omitempty"`
  	BidAmount *float32 `json:"bidamount,omitempty"`
  }
  57. // click clickhouse 北京-京津翼长三角数据
  58. func click() {
  59. //top.wcc_beijing
  60. Mgo := &mongodb.MongodbSim{
  61. //MongodbAddr: "127.0.0.1:27080",
  62. MongodbAddr: "172.17.4.85:27080",
  63. DbName: "top",
  64. Size: 10,
  65. //Direct: true,
  66. }
  67. Mgo.InitPool()
  68. //181
  69. Mgo2 := &mongodb.MongodbSim{
  70. MongodbAddr: "172.17.4.181:27001",
  71. //MongodbAddr: "127.0.0.1:27001",
  72. DbName: "mixdata",
  73. Size: 10,
  74. UserName: "",
  75. Password: "",
  76. //Direct: true,
  77. }
  78. Mgo2.InitPool()
  79. //mixdata.qyxy_std
  80. MgoS := &mongodb.MongodbSim{
  81. //MongodbAddr: "127.0.0.1:27083",
  82. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  83. DbName: "mixdata",
  84. Size: 10,
  85. UserName: "SJZY_RWbid_ES",
  86. Password: "SJZY@B4i4D5e6S",
  87. //Direct: true,
  88. }
  89. MgoS.InitPool()
  90. //tidb
  91. username := "datascbi"
  92. password := "Da#Bi20221111SC"
  93. //host := "127.0.0.1:4001"
  94. host := "172.17.162.25:4000"
  95. database := "global_common_data"
  96. dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database)
  97. // 连接到数据库
  98. db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  99. if err != nil {
  100. log.Println("Failed to connect to database:", err)
  101. return
  102. }
  103. fmt.Println("Connected to the database!", db)
  104. //1.
  105. sess := Mgo.GetMgoConn()
  106. defer Mgo.DestoryMongoConn(sess)
  107. query := sess.DB("top").C("wcc_allcity_2025Q2").Find(nil).Select(map[string]interface{}{"list": 0}).Iter()
  108. count := 0
  109. ch := make(chan bool, 20)
  110. wg := &sync.WaitGroup{}
  111. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  112. if count%1000 == 0 {
  113. log.Println("current ---", count, tmp["s_winner"])
  114. }
  115. ch <- true
  116. wg.Add(1)
  117. go func(tmp map[string]interface{}) {
  118. defer func() {
  119. <-ch
  120. wg.Done()
  121. }()
  122. //处理数据
  123. beijing := WccBeijing{
  124. ProjectID: util.ObjToString(tmp["id"]),
  125. }
  126. companyName := util.ObjToString(tmp["s_winner"])
  127. if companyName == "" {
  128. return
  129. }
  130. beijing.CompanyName = &companyName
  131. if bidstatus, ok := tmp["bidstatus"].(string); ok {
  132. beijing.BidStatus = &bidstatus
  133. }
  134. beijing.Jgtime = int32(util.IntAll(tmp["jgtime"]))
  135. if util.IntAll(tmp["jgtime"]) > 0 {
  136. // 转换为 time.Time 对象
  137. t := time.Unix(util.Int64All(tmp["jgtime"]), 0)
  138. // 获取年份、月份、季度
  139. year := strconv.Itoa(t.Year())
  140. monthStringTwoDigits := year + fmt.Sprintf("%02d", int(t.Month()))
  141. quarter := year + "Q" + strconv.Itoa((int(t.Month())-1)/3+1)
  142. beijing.BidYear = &year
  143. beijing.BidMonth = &monthStringTwoDigits
  144. beijing.BidQuarter = &quarter
  145. }
  146. area := util.ObjToString(tmp["area"])
  147. beijing.Area = &area
  148. city := util.ObjToString(tmp["city"])
  149. beijing.City = &city
  150. district := util.ObjToString(tmp["district"])
  151. beijing.District = &district
  152. buyer := util.ObjToString(tmp["buyer"])
  153. beijing.Buyer = &buyer
  154. buyerclass := util.ObjToString(tmp["buyerclass"])
  155. beijing.BuyerClass = &buyerclass
  156. if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
  157. topclass := util.ObjToString(topscopeclass[0])
  158. beijing.Topscopeclass = &topclass
  159. }
  160. budget := float32(util.Float64All(tmp["budget"]))
  161. beijing.Budget = &budget
  162. bidamount := float32(util.Float64All(tmp["bidamount"]))
  163. beijing.BidAmount = &bidamount
  164. data, _ := MgoS.FindOne("qyxy_std", map[string]interface{}{"company_name": companyName})
  165. credit_no := util.ObjToString((*data)["credit_no"])
  166. beijing.CreditNo = &credit_no
  167. companyID := util.ObjToString((*data)["_id"])
  168. beijing.CompanyID = companyID
  169. companyArea := util.ObjToString((*data)["company_area"])
  170. beijing.RegisterArea = &companyArea
  171. //
  172. var baseInfo EnterpriseBaseInfo
  173. db.Where(&EnterpriseBaseInfo{Name: companyName}).First(&baseInfo)
  174. companyType := baseInfo.EnterpriseType
  175. beijing.CompanyType = companyType
  176. //
  177. indusWhere := map[string]interface{}{
  178. "company_id": companyID,
  179. }
  180. indus, _ := Mgo2.Find("company_industry", &indusWhere, nil, nil, false, -1, -1)
  181. if len(*indus) > 0 {
  182. indusName := util.ObjToString((*indus)[0]["industry_l1_name"])
  183. beijing.IndustryName = &indusName
  184. }
  185. _, err := g.DB().Insert(gctx.New(), "wcc_beijing_2024Q1", beijing)
  186. if err != nil {
  187. log.Println("clickhouse 写入失败;", err, companyName, companyID)
  188. }
  189. }(tmp)
  190. tmp = make(map[string]interface{})
  191. }
  192. wg.Wait()
  193. log.Println("over")
  194. // 替换为你的 MySQL 数据库连接信息
  195. //dsn := "user:password@tcp(localhost:3306)/database?charset=utf8mb4&parseTime=True&loc=Local"
  196. //dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database)
  197. //// 连接到数据库
  198. //db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  199. //if err != nil {
  200. // fmt.Println("Failed to connect to database:", err)
  201. // return
  202. //}
  203. //
  204. //fmt.Println(db)
  205. //res, err := g.DB().Query(gctx.New(), `SELECT * FROM seo_siteKeywords_splicing order by site_code asc`)
  206. //if err == nil && len(res.List()) > 0 {
  207. // for _, m := range res.List() {
  208. // log.Println(m)
  209. // }
  210. //}
  211. }