123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- package main
- import (
- "context"
- "database/sql"
- "encoding/json"
- "errors"
- "fmt"
- "github.com/RoaringBitmap/roaring"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- jlog "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "strings"
- "time"
- )
- // dealAllData 处理存量数据
- func dealAllData() {
- jlog.Info("dealAllData", zap.String("开始处理法人库存量数据", "-------------"))
- ctx := context.Background()
- // 批量处理数据
- batchSize := 100 // 每批处理的数据量
- offset := 0
- count := 0
- for {
- query := fmt.Sprintf(`
- SELECT company_id,company_name,company_code,credit_no,org_tags,bitmapToArray(company_label) labels,org_code, tax_code,establish_date,
-
- FROM ent_info LIMIT %d OFFSET %d
- `, batchSize, offset)
- rows, err := ClickHouseConn.Query(ctx, query)
- if err != nil {
- jlog.Info("allUpdateBitmap2", zap.Error(err))
- }
- if !rows.Next() {
- break
- }
- for rows.Next() {
- count++
- var ent EntInfo
- //var bitmapVals []uint64
- var company_id, companyName, creditNo, orgTags string
- var oldLabels = make([]uint64, 0)
- if err = rows.Scan(&company_id, &companyName, &creditNo, &orgTags, &oldLabels); err != nil {
- jlog.Info("dealIncEntInfo", zap.Error(err))
- }
- if count%10000 == 0 {
- jlog.Info("allUpdateBitmap2:", zap.Int("current", offset), zap.String("name", ent.CompanyName))
- }
- }
- offset += batchSize
- }
- jlog.Info("dealAllData", zap.Int("存量数据迭代完毕", offset))
- }
- // dealAllFromCompanyBase 从company_base 处理惬意数据存量
- func dealAllFromCompanyBase() {
- jlog.Info("dealAllFromCompanyBase", zap.String("开始处理", "-------企业库存量数据"))
- defer util.Catch()
- sess := MgoQY.GetMgoConn()
- defer MgoQY.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "company_type": map[string]interface{}{
- "$ne": "个体工商户",
- },
- }
- count := 0
- batchSize := 100
- ents := make([]EntInfo, 0, batchSize)
- it := sess.DB(GF.MongoQy.DB).C("company_base").Find(where).Select(nil).Iter()
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- jlog.Info("dealAllFromCompanyBase", zap.Any("current:", count), zap.Any("company_name", tmp["company_name"]))
- }
- company_status := util.ObjToString(tmp["company_status"])
- if strings.Contains(company_status, "注销") || strings.Contains(company_status, "吊销") {
- continue
- }
- if util.IntAll(tmp["use_flag"]) > 0 {
- continue
- }
- var ent EntInfo
- ent.CompanyID = util.ObjToString(tmp["company_id"])
- ent.CompanyName = util.ObjToString(tmp["company_name"])
- ent.CompanyCode = util.ObjToString(tmp["company_code"])
- ent.CreditNo = util.ObjToString(tmp["credit_no"])
- ent.OrgCode = util.ObjToString(tmp["org_code"])
- ent.TaxCode = util.ObjToString(tmp["tax_code"])
- ent.EstablishDate = util.ObjToString(tmp["establish_date"])
- ent.LegalPerson = util.ObjToString(tmp["legal_person"])
- ent.LegalPersonCaption = util.ObjToString(tmp["legal_person_caption"])
- ent.CompanyStatus = util.ObjToString(tmp["company_status"])
- ent.CompanyType = util.ObjToString(tmp["company_type"])
- ent.Authority = util.ObjToString(tmp["authority"])
- ent.IssueDate = util.ObjToString(tmp["issue_date"])
- ent.OperationStartDate = util.ObjToString(tmp["operation_startdate"])
- ent.OperationEndDate = util.ObjToString(tmp["operation_enddate"])
- ent.Capital = util.ObjToString(tmp["capital"])
- ent.CompanyAddress = util.ObjToString(tmp["company_address"])
- ent.BusinessScope = util.ObjToString(tmp["business_scope"])
- ent.ComeInTime = time.Now().Unix()
- ent.UpdateTime = time.Now().Unix()
- ent.LegalPersonType = int8(util.IntAll(tmp["legal_person_type"]))
- ent.RealCapital = util.ObjToString(tmp["real_capital"])
- ent.EnName = util.ObjToString(tmp["en_name"])
- ent.ListCode = util.ObjToString(tmp["list_code"])
- //annual_reports
- std := getQyxyStd(util.ObjToString(tmp["company_name"]))
- if std != nil && len(std) > 0 {
- // 取出 annual_reports 字段
- reports, ok := std["annual_reports"].([]interface{})
- if ok {
- var maxYear float64
- var employeeNo string
- // 遍历 annual_reports 数组
- for i, r := range reports {
- if reportMap, ok := r.(map[string]interface{}); ok {
- year := util.Float64All(reportMap["report_year"])
- emp := util.ObjToString(reportMap["employee_no"])
- if i == 0 || year > maxYear {
- maxYear = year
- employeeNo = emp
- }
- }
- }
- if maxYear > 0 {
- ent.EmployeeNo = util.IntAll(employeeNo)
- }
- }
- }
- //
- ent.Website = util.ObjToString(tmp["website_url"])
- ent.CompanyPhone = util.ObjToString(tmp["company_phone"])
- ent.CompanyEmail = util.ObjToString(tmp["company_email"])
- //company_industry_tags
- whereIndustry := map[string]interface{}{
- "company_id": util.ObjToString(tmp["company_id"]),
- }
- indus, _ := MgoQY.FindOne("company_industry", whereIndustry)
- ent.CompanyIndustryTags = "{}" // 先给个默认值
- if indus != nil && len(*indus) > 0 {
- name_path := make([]string, 0)
- name_code := make([]string, 0)
- name_path = append(name_path, util.ObjToString((*indus)["industry_l1_name"]))
- name_path = append(name_path, util.ObjToString((*indus)["industry_l2_name"]))
- name_path = append(name_path, util.ObjToString((*indus)["industry_l3_name"]))
- name_path = append(name_path, util.ObjToString((*indus)["industry_l4_name"]))
- //
- name_code = append(name_code, util.ObjToString((*indus)["industry_l1_code"]))
- name_code = append(name_code, util.ObjToString((*indus)["industry_l2_code"]))
- name_code = append(name_code, util.ObjToString((*indus)["industry_l3_code"]))
- name_code = append(name_code, util.ObjToString((*indus)["industry_l4_code"]))
- industry := map[string]interface{}{
- "name_path": name_path,
- "code_path": name_code,
- }
- // map 转 JSON
- jsonBytes, _ := json.Marshal(industry)
- ent.CompanyIndustryTags = string(jsonBytes)
- }
- //
- area, city, district := util.ObjToString((std)["company_area"]), util.ObjToString((std)["company_city"]), util.ObjToString((std)["company_district"])
- area_code, city_code, district_code := CalculateRegionCode(area, city, district)
- ent.JYAreaCode = area_code
- ent.JYCityCode = city_code
- ent.JYDistrictCode = district_code
- //
- query := `
- SELECT bitmapToArray(company_label)
- FROM ent_info
- WHERE company_id = ?
- `
- var oldLabels = make([]uint64, 0)
- row := ClickHouseConn.QueryRow(context.Background(), query, ent.CompanyID)
- err := row.Scan(&oldLabels)
- if err != nil {
- if errors.Is(err, sql.ErrNoRows) {
- //jlog.Info("dealIncEntInfo: 没查到数据", zap.String("company_id", ent.CompanyID))
- } else {
- jlog.Info("dealIncEntInfo: 查询出错", zap.Error(err))
- }
- }
- // 转 RoaringBitmap
- rbm := roaring.NewBitmap()
- for _, v := range oldLabels {
- rbm.Add(uint32(v))
- }
- bin, _ := rbm.ToBytes()
- ent.JYCompanyLabel = bin
- ent.JYOrgTopType = "企业"
- company_type := util.ObjToString(tmp["company_type"])
- if info, ok := nameNorm[company_type]; ok {
- ent.JYCompanyTypeOriginCode = info.Code
- ent.JYCompanyTypeIsLeaf = 1
- ent.JYCompanyTypeLeafCode = info.Code
- ent.JYCompanyTypeLeafName = info.Name
- ent.JYCompanyTypeLeafTag = info.Tag
- ent.JYOrgPropertyOneTag = "工商"
- ent.JYOrgPropertyTwoTag = "企业"
- }
- //保存tidb
- //if err := MysqlDB.Create(&ent).Error; err != nil {
- // jlog.Info("insert failed: %v", zap.Error(err))
- //}
- ents = append(ents, ent)
- if len(ents) >= batchSize {
- if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil {
- jlog.Error("批量插入失败", zap.Error(err))
- }
- ents = ents[:0] // 清空 slice
- }
- }
- // 循环结束后如果还有数据
- if len(ents) > 0 {
- if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil {
- jlog.Error("批量插入失败", zap.Error(err))
- }
- }
- }
- // get 通过companyID 获取法人库数据
- func get() {
- // 2. 查询一条数据
- var ent EntInfo
- if err := MysqlDB.Where("company_id = ?", "001c2e9882ae982abf6e1e9ed06e2654").First(&ent).Error; err != nil {
- panic(err)
- }
- // 3. 反序列化 RoaringBitmap
- rbm := roaring.NewBitmap()
- if len(ent.JYCompanyLabel) > 0 {
- if err := rbm.UnmarshalBinary(ent.JYCompanyLabel); err != nil {
- panic(err)
- }
- }
- // 4. 转成 []uint64
- ids := make([]uint64, 0, rbm.GetCardinality())
- it := rbm.Iterator()
- for it.HasNext() {
- ids = append(ids, uint64(it.Next()))
- }
- fmt.Println("CompanyID:", ent.CompanyID)
- fmt.Println("标签ID集合:", ids)
- }
|