package main import ( "context" "database/sql" "encoding/json" "errors" "fmt" "github.com/RoaringBitmap/roaring" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" jlog "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "strings" "time" ) // dealAllData 处理存量数据 func dealAllData() { jlog.Info("dealAllData", zap.String("开始处理法人库存量数据", "-------------")) ctx := context.Background() // 批量处理数据 batchSize := 100 // 每批处理的数据量 offset := 0 count := 0 for { query := fmt.Sprintf(` SELECT company_id,company_name,company_code,credit_no,org_tags,bitmapToArray(company_label) labels,org_code, tax_code,establish_date, FROM ent_info LIMIT %d OFFSET %d `, batchSize, offset) rows, err := ClickHouseConn.Query(ctx, query) if err != nil { jlog.Info("allUpdateBitmap2", zap.Error(err)) } if !rows.Next() { break } for rows.Next() { count++ var ent EntInfo //var bitmapVals []uint64 var company_id, companyName, creditNo, orgTags string var oldLabels = make([]uint64, 0) if err = rows.Scan(&company_id, &companyName, &creditNo, &orgTags, &oldLabels); err != nil { jlog.Info("dealIncEntInfo", zap.Error(err)) } if count%10000 == 0 { jlog.Info("allUpdateBitmap2:", zap.Int("current", offset), zap.String("name", ent.CompanyName)) } } offset += batchSize } jlog.Info("dealAllData", zap.Int("存量数据迭代完毕", offset)) } // dealAllFromCompanyBase 从company_base 处理惬意数据存量 func dealAllFromCompanyBase() { jlog.Info("dealAllFromCompanyBase", zap.String("开始处理", "-------企业库存量数据")) defer util.Catch() sess := MgoQY.GetMgoConn() defer MgoQY.DestoryMongoConn(sess) where := map[string]interface{}{ "company_type": map[string]interface{}{ "$ne": "个体工商户", }, } count := 0 batchSize := 100 ents := make([]EntInfo, 0, batchSize) it := sess.DB(GF.MongoQy.DB).C("company_base").Find(where).Select(nil).Iter() for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%1000 == 0 { jlog.Info("dealAllFromCompanyBase", zap.Any("current:", count), zap.Any("company_name", tmp["company_name"])) } company_status := util.ObjToString(tmp["company_status"]) if strings.Contains(company_status, "注销") || strings.Contains(company_status, "吊销") { continue } if util.IntAll(tmp["use_flag"]) > 0 { continue } var ent EntInfo ent.CompanyID = util.ObjToString(tmp["company_id"]) ent.CompanyName = util.ObjToString(tmp["company_name"]) ent.CompanyCode = util.ObjToString(tmp["company_code"]) ent.CreditNo = util.ObjToString(tmp["credit_no"]) ent.OrgCode = util.ObjToString(tmp["org_code"]) ent.TaxCode = util.ObjToString(tmp["tax_code"]) ent.EstablishDate = util.ObjToString(tmp["establish_date"]) ent.LegalPerson = util.ObjToString(tmp["legal_person"]) ent.LegalPersonCaption = util.ObjToString(tmp["legal_person_caption"]) ent.CompanyStatus = util.ObjToString(tmp["company_status"]) ent.CompanyType = util.ObjToString(tmp["company_type"]) ent.Authority = util.ObjToString(tmp["authority"]) ent.IssueDate = util.ObjToString(tmp["issue_date"]) ent.OperationStartDate = util.ObjToString(tmp["operation_startdate"]) ent.OperationEndDate = util.ObjToString(tmp["operation_enddate"]) ent.Capital = util.ObjToString(tmp["capital"]) ent.CompanyAddress = util.ObjToString(tmp["company_address"]) ent.BusinessScope = util.ObjToString(tmp["business_scope"]) ent.ComeInTime = time.Now().Unix() ent.UpdateTime = time.Now().Unix() ent.LegalPersonType = int8(util.IntAll(tmp["legal_person_type"])) ent.RealCapital = util.ObjToString(tmp["real_capital"]) ent.EnName = util.ObjToString(tmp["en_name"]) ent.ListCode = util.ObjToString(tmp["list_code"]) //annual_reports std := getQyxyStd(util.ObjToString(tmp["company_name"])) if std != nil && len(std) > 0 { // 取出 annual_reports 字段 reports, ok := std["annual_reports"].([]interface{}) if ok { var maxYear float64 var employeeNo string // 遍历 annual_reports 数组 for i, r := range reports { if reportMap, ok := r.(map[string]interface{}); ok { year := util.Float64All(reportMap["report_year"]) emp := util.ObjToString(reportMap["employee_no"]) if i == 0 || year > maxYear { maxYear = year employeeNo = emp } } } if maxYear > 0 { ent.EmployeeNo = util.IntAll(employeeNo) } } } // ent.Website = util.ObjToString(tmp["website_url"]) ent.CompanyPhone = util.ObjToString(tmp["company_phone"]) ent.CompanyEmail = util.ObjToString(tmp["company_email"]) //company_industry_tags whereIndustry := map[string]interface{}{ "company_id": util.ObjToString(tmp["company_id"]), } indus, _ := MgoQY.FindOne("company_industry", whereIndustry) ent.CompanyIndustryTags = "{}" // 先给个默认值 if indus != nil && len(*indus) > 0 { name_path := make([]string, 0) name_code := make([]string, 0) name_path = append(name_path, util.ObjToString((*indus)["industry_l1_name"])) name_path = append(name_path, util.ObjToString((*indus)["industry_l2_name"])) name_path = append(name_path, util.ObjToString((*indus)["industry_l3_name"])) name_path = append(name_path, util.ObjToString((*indus)["industry_l4_name"])) // name_code = append(name_code, util.ObjToString((*indus)["industry_l1_code"])) name_code = append(name_code, util.ObjToString((*indus)["industry_l2_code"])) name_code = append(name_code, util.ObjToString((*indus)["industry_l3_code"])) name_code = append(name_code, util.ObjToString((*indus)["industry_l4_code"])) industry := map[string]interface{}{ "name_path": name_path, "code_path": name_code, } // map 转 JSON jsonBytes, _ := json.Marshal(industry) ent.CompanyIndustryTags = string(jsonBytes) } // area, city, district := util.ObjToString((std)["company_area"]), util.ObjToString((std)["company_city"]), util.ObjToString((std)["company_district"]) area_code, city_code, district_code := CalculateRegionCode(area, city, district) ent.JYAreaCode = area_code ent.JYCityCode = city_code ent.JYDistrictCode = district_code // query := ` SELECT bitmapToArray(company_label) FROM ent_info WHERE company_id = ? ` var oldLabels = make([]uint64, 0) row := ClickHouseConn.QueryRow(context.Background(), query, ent.CompanyID) err := row.Scan(&oldLabels) if err != nil { if errors.Is(err, sql.ErrNoRows) { //jlog.Info("dealIncEntInfo: 没查到数据", zap.String("company_id", ent.CompanyID)) } else { jlog.Info("dealIncEntInfo: 查询出错", zap.Error(err)) } } // 转 RoaringBitmap rbm := roaring.NewBitmap() for _, v := range oldLabels { rbm.Add(uint32(v)) } bin, _ := rbm.ToBytes() ent.JYCompanyLabel = bin ent.JYOrgTopType = "企业" company_type := util.ObjToString(tmp["company_type"]) if info, ok := nameNorm[company_type]; ok { ent.JYCompanyTypeOriginCode = info.Code ent.JYCompanyTypeIsLeaf = 1 ent.JYCompanyTypeLeafCode = info.Code ent.JYCompanyTypeLeafName = info.Name ent.JYCompanyTypeLeafTag = info.Tag ent.JYOrgPropertyOneTag = "工商" ent.JYOrgPropertyTwoTag = "企业" } //保存tidb //if err := MysqlDB.Create(&ent).Error; err != nil { // jlog.Info("insert failed: %v", zap.Error(err)) //} ents = append(ents, ent) if len(ents) >= batchSize { if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil { jlog.Error("批量插入失败", zap.Error(err)) } ents = ents[:0] // 清空 slice } } // 循环结束后如果还有数据 if len(ents) > 0 { if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil { jlog.Error("批量插入失败", zap.Error(err)) } } } // get 通过companyID 获取法人库数据 func get() { // 2. 查询一条数据 var ent EntInfo if err := MysqlDB.Where("company_id = ?", "001c2e9882ae982abf6e1e9ed06e2654").First(&ent).Error; err != nil { panic(err) } // 3. 反序列化 RoaringBitmap rbm := roaring.NewBitmap() if len(ent.JYCompanyLabel) > 0 { if err := rbm.UnmarshalBinary(ent.JYCompanyLabel); err != nil { panic(err) } } // 4. 转成 []uint64 ids := make([]uint64, 0, rbm.GetCardinality()) it := rbm.Iterator() for it.HasNext() { ids = append(ids, uint64(it.Next())) } fmt.Println("CompanyID:", ent.CompanyID) fmt.Println("标签ID集合:", ids) }