all.go 8.3 KB


  1. package main
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "github.com/RoaringBitmap/roaring"
  9. "go.uber.org/zap"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. jlog "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "strings"
  13. "time"
  14. )
  15. // dealAllData 处理存量数据
  16. func dealAllData() {
  17. jlog.Info("dealAllData", zap.String("开始处理法人库存量数据", "-------------"))
  18. ctx := context.Background()
  19. // 批量处理数据
  20. batchSize := 100 // 每批处理的数据量
  21. offset := 0
  22. count := 0
  23. for {
  24. query := fmt.Sprintf(`
  25. SELECT company_id,company_name,company_code,credit_no,org_tags,bitmapToArray(company_label) labels,org_code, tax_code,establish_date,
  26. FROM ent_info LIMIT %d OFFSET %d
  27. `, batchSize, offset)
  28. rows, err := ClickHouseConn.Query(ctx, query)
  29. if err != nil {
  30. jlog.Info("allUpdateBitmap2", zap.Error(err))
  31. }
  32. if !rows.Next() {
  33. break
  34. }
  35. for rows.Next() {
  36. count++
  37. var ent EntInfo
  38. //var bitmapVals []uint64
  39. var company_id, companyName, creditNo, orgTags string
  40. var oldLabels = make([]uint64, 0)
  41. if err = rows.Scan(&company_id, &companyName, &creditNo, &orgTags, &oldLabels); err != nil {
  42. jlog.Info("dealIncEntInfo", zap.Error(err))
  43. }
  44. if count%10000 == 0 {
  45. jlog.Info("allUpdateBitmap2:", zap.Int("current", offset), zap.String("name", ent.CompanyName))
  46. }
  47. }
  48. offset += batchSize
  49. }
  50. jlog.Info("dealAllData", zap.Int("存量数据迭代完毕", offset))
  51. }
  52. // dealAllFromCompanyBase 从company_base 处理惬意数据存量
  53. func dealAllFromCompanyBase() {
  54. jlog.Info("dealAllFromCompanyBase", zap.String("开始处理", "-------企业库存量数据"))
  55. defer util.Catch()
  56. sess := MgoQY.GetMgoConn()
  57. defer MgoQY.DestoryMongoConn(sess)
  58. where := map[string]interface{}{
  59. "company_type": map[string]interface{}{
  60. "$ne": "个体工商户",
  61. },
  62. }
  63. count := 0
  64. batchSize := 100
  65. ents := make([]EntInfo, 0, batchSize)
  66. it := sess.DB(GF.MongoQy.DB).C("company_base").Find(where).Select(nil).Iter()
  67. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  68. if count%1000 == 0 {
  69. jlog.Info("dealAllFromCompanyBase", zap.Any("current:", count), zap.Any("company_name", tmp["company_name"]))
  70. }
  71. company_status := util.ObjToString(tmp["company_status"])
  72. if strings.Contains(company_status, "注销") || strings.Contains(company_status, "吊销") {
  73. continue
  74. }
  75. if util.IntAll(tmp["use_flag"]) > 0 {
  76. continue
  77. }
  78. var ent EntInfo
  79. ent.CompanyID = util.ObjToString(tmp["company_id"])
  80. ent.CompanyName = util.ObjToString(tmp["company_name"])
  81. ent.CompanyCode = util.ObjToString(tmp["company_code"])
  82. ent.CreditNo = util.ObjToString(tmp["credit_no"])
  83. ent.OrgCode = util.ObjToString(tmp["org_code"])
  84. ent.TaxCode = util.ObjToString(tmp["tax_code"])
  85. ent.EstablishDate = util.ObjToString(tmp["establish_date"])
  86. ent.LegalPerson = util.ObjToString(tmp["legal_person"])
  87. ent.LegalPersonCaption = util.ObjToString(tmp["legal_person_caption"])
  88. ent.CompanyStatus = util.ObjToString(tmp["company_status"])
  89. ent.CompanyType = util.ObjToString(tmp["company_type"])
  90. ent.Authority = util.ObjToString(tmp["authority"])
  91. ent.IssueDate = util.ObjToString(tmp["issue_date"])
  92. ent.OperationStartDate = util.ObjToString(tmp["operation_startdate"])
  93. ent.OperationEndDate = util.ObjToString(tmp["operation_enddate"])
  94. ent.Capital = util.ObjToString(tmp["capital"])
  95. ent.CompanyAddress = util.ObjToString(tmp["company_address"])
  96. ent.BusinessScope = util.ObjToString(tmp["business_scope"])
  97. ent.ComeInTime = time.Now().Unix()
  98. ent.UpdateTime = time.Now().Unix()
  99. ent.LegalPersonType = int8(util.IntAll(tmp["legal_person_type"]))
  100. ent.RealCapital = util.ObjToString(tmp["real_capital"])
  101. ent.EnName = util.ObjToString(tmp["en_name"])
  102. ent.ListCode = util.ObjToString(tmp["list_code"])
  103. //annual_reports
  104. std := getQyxyStd(util.ObjToString(tmp["company_name"]))
  105. if std != nil && len(std) > 0 {
  106. // 取出 annual_reports 字段
  107. reports, ok := std["annual_reports"].([]interface{})
  108. if ok {
  109. var maxYear float64
  110. var employeeNo string
  111. // 遍历 annual_reports 数组
  112. for i, r := range reports {
  113. if reportMap, ok := r.(map[string]interface{}); ok {
  114. year := util.Float64All(reportMap["report_year"])
  115. emp := util.ObjToString(reportMap["employee_no"])
  116. if i == 0 || year > maxYear {
  117. maxYear = year
  118. employeeNo = emp
  119. }
  120. }
  121. }
  122. if maxYear > 0 {
  123. ent.EmployeeNo = util.IntAll(employeeNo)
  124. }
  125. }
  126. }
  127. //
  128. ent.Website = util.ObjToString(tmp["website_url"])
  129. ent.CompanyPhone = util.ObjToString(tmp["company_phone"])
  130. ent.CompanyEmail = util.ObjToString(tmp["company_email"])
  131. //company_industry_tags
  132. whereIndustry := map[string]interface{}{
  133. "company_id": util.ObjToString(tmp["company_id"]),
  134. }
  135. indus, _ := MgoQY.FindOne("company_industry", whereIndustry)
  136. ent.CompanyIndustryTags = "{}" // 先给个默认值
  137. if indus != nil && len(*indus) > 0 {
  138. name_path := make([]string, 0)
  139. name_code := make([]string, 0)
  140. name_path = append(name_path, util.ObjToString((*indus)["industry_l1_name"]))
  141. name_path = append(name_path, util.ObjToString((*indus)["industry_l2_name"]))
  142. name_path = append(name_path, util.ObjToString((*indus)["industry_l3_name"]))
  143. name_path = append(name_path, util.ObjToString((*indus)["industry_l4_name"]))
  144. //
  145. name_code = append(name_code, util.ObjToString((*indus)["industry_l1_code"]))
  146. name_code = append(name_code, util.ObjToString((*indus)["industry_l2_code"]))
  147. name_code = append(name_code, util.ObjToString((*indus)["industry_l3_code"]))
  148. name_code = append(name_code, util.ObjToString((*indus)["industry_l4_code"]))
  149. industry := map[string]interface{}{
  150. "name_path": name_path,
  151. "code_path": name_code,
  152. }
  153. // map 转 JSON
  154. jsonBytes, _ := json.Marshal(industry)
  155. ent.CompanyIndustryTags = string(jsonBytes)
  156. }
  157. //
  158. area, city, district := util.ObjToString((std)["company_area"]), util.ObjToString((std)["company_city"]), util.ObjToString((std)["company_district"])
  159. area_code, city_code, district_code := CalculateRegionCode(area, city, district)
  160. ent.JYAreaCode = area_code
  161. ent.JYCityCode = city_code
  162. ent.JYDistrictCode = district_code
  163. //
  164. query := `
  165. SELECT bitmapToArray(company_label)
  166. FROM ent_info
  167. WHERE company_id = ?
  168. `
  169. var oldLabels = make([]uint64, 0)
  170. row := ClickHouseConn.QueryRow(context.Background(), query, ent.CompanyID)
  171. err := row.Scan(&oldLabels)
  172. if err != nil {
  173. if errors.Is(err, sql.ErrNoRows) {
  174. //jlog.Info("dealIncEntInfo: 没查到数据", zap.String("company_id", ent.CompanyID))
  175. } else {
  176. jlog.Info("dealIncEntInfo: 查询出错", zap.Error(err))
  177. }
  178. }
  179. // 转 RoaringBitmap
  180. rbm := roaring.NewBitmap()
  181. for _, v := range oldLabels {
  182. rbm.Add(uint32(v))
  183. }
  184. bin, _ := rbm.ToBytes()
  185. ent.JYCompanyLabel = bin
  186. ent.JYOrgTopType = "企业"
  187. company_type := util.ObjToString(tmp["company_type"])
  188. if info, ok := nameNorm[company_type]; ok {
  189. ent.JYCompanyTypeOriginCode = info.Code
  190. ent.JYCompanyTypeIsLeaf = 1
  191. ent.JYCompanyTypeLeafCode = info.Code
  192. ent.JYCompanyTypeLeafName = info.Name
  193. ent.JYCompanyTypeLeafTag = info.Tag
  194. ent.JYOrgPropertyOneTag = "工商"
  195. ent.JYOrgPropertyTwoTag = "企业"
  196. }
  197. //保存tidb
  198. //if err := MysqlDB.Create(&ent).Error; err != nil {
  199. // jlog.Info("insert failed: %v", zap.Error(err))
  200. //}
  201. ents = append(ents, ent)
  202. if len(ents) >= batchSize {
  203. if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil {
  204. jlog.Error("批量插入失败", zap.Error(err))
  205. }
  206. ents = ents[:0] // 清空 slice
  207. }
  208. }
  209. // 循环结束后如果还有数据
  210. if len(ents) > 0 {
  211. if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil {
  212. jlog.Error("批量插入失败", zap.Error(err))
  213. }
  214. }
  215. }
  216. // get 通过companyID 获取法人库数据
  217. func get() {
  218. // 2. 查询一条数据
  219. var ent EntInfo
  220. if err := MysqlDB.Where("company_id = ?", "001c2e9882ae982abf6e1e9ed06e2654").First(&ent).Error; err != nil {
  221. panic(err)
  222. }
  223. // 3. 反序列化 RoaringBitmap
  224. rbm := roaring.NewBitmap()
  225. if len(ent.JYCompanyLabel) > 0 {
  226. if err := rbm.UnmarshalBinary(ent.JYCompanyLabel); err != nil {
  227. panic(err)
  228. }
  229. }
  230. // 4. 转成 []uint64
  231. ids := make([]uint64, 0, rbm.GetCardinality())
  232. it := rbm.Iterator()
  233. for it.HasNext() {
  234. ids = append(ids, uint64(it.Next()))
  235. }
  236. fmt.Println("CompanyID:", ent.CompanyID)
  237. fmt.Println("标签ID集合:", ids)
  238. }