|
- package main
- import (
- "context"
- "database/sql"
- "encoding/json"
- "errors"
- "fmt"
- "strings"
- "time"
- "github.com/RoaringBitmap/roaring"
- "go.uber.org/zap"
- "gorm.io/gorm"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- jlog "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- )
- // dealAllFromCompanyBase 从company_base 处理惬意数据存量
- func dealAllFromCompanyBase() {
- jlog.Info("dealAllFromCompanyBase", zap.String("开始处理", "-------企业库存量数据"))
- defer util.Catch()
- sess := MgoQY.GetMgoConn()
- defer MgoQY.DestoryMongoConn(sess)
- where := map[string]interface{}{
- "company_type": map[string]interface{}{
- "$ne": "个体工商户",
- },
- }
- count := 0
- batchSize := 100
- ents := make([]EntInfo, 0, batchSize)
- it := sess.DB(GF.MongoQy.DB).C("company_base").Find(where).Select(nil).Iter()
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- jlog.Info("dealAllFromCompanyBase", zap.Any("current:", count), zap.Any("company_name", tmp["company_name"]))
- }
- company_status := util.ObjToString(tmp["company_status"])
- if strings.Contains(company_status, "注销") || strings.Contains(company_status, "吊销") {
- continue
- }
- if util.IntAll(tmp["use_flag"]) > 0 {
- continue
- }
- var ent EntInfo
- ent.CompanyID = util.ObjToString(tmp["company_id"])
- ent.CompanyName = util.ObjToString(tmp["company_name"])
- ent.CompanyCode = util.ObjToString(tmp["company_code"])
- ent.CreditNo = util.ObjToString(tmp["credit_no"])
- ent.OrgCode = util.ObjToString(tmp["org_code"])
- ent.TaxCode = util.ObjToString(tmp["tax_code"])
- ent.EstablishDate = util.ObjToString(tmp["establish_date"])
- ent.LegalPerson = util.ObjToString(tmp["legal_person"])
- ent.LegalPersonCaption = util.ObjToString(tmp["legal_person_caption"])
- ent.CompanyStatus = util.ObjToString(tmp["company_status"])
- ent.CompanyType = util.ObjToString(tmp["company_type"])
- ent.Authority = util.ObjToString(tmp["authority"])
- ent.IssueDate = util.ObjToString(tmp["issue_date"])
- ent.OperationStartDate = util.ObjToString(tmp["operation_startdate"])
- ent.OperationEndDate = util.ObjToString(tmp["operation_enddate"])
- ent.Capital = util.ObjToString(tmp["capital"])
- ent.CompanyAddress = util.ObjToString(tmp["company_address"])
- ent.BusinessScope = util.ObjToString(tmp["business_scope"])
- ent.ComeInTime = time.Now().Unix()
- ent.UpdateTime = time.Now().Unix()
- ent.LegalPersonType = int8(util.IntAll(tmp["legal_person_type"]))
- ent.RealCapital = util.ObjToString(tmp["real_capital"])
- ent.EnName = util.ObjToString(tmp["en_name"])
- ent.ListCode = util.ObjToString(tmp["list_code"])
- //annual_reports
- std := getQyxyStd(util.ObjToString(tmp["company_name"]))
- if std != nil && len(std) > 0 {
- // 取出 annual_reports 字段
- reports, ok := std["annual_reports"].([]interface{})
- if ok {
- var maxYear float64
- var employeeNo string
- // 遍历 annual_reports 数组
- for i, r := range reports {
- if reportMap, ok := r.(map[string]interface{}); ok {
- year := util.Float64All(reportMap["report_year"])
- emp := util.ObjToString(reportMap["employee_no"])
- if i == 0 || year > maxYear {
- maxYear = year
- employeeNo = emp
- }
- }
- }
- if maxYear > 0 {
- ent.EmployeeNo = util.IntAll(employeeNo)
- }
- }
- }
- //
- ent.Website = util.ObjToString(tmp["website_url"])
- ent.CompanyPhone = util.ObjToString(tmp["company_phone"])
- ent.CompanyEmail = util.ObjToString(tmp["company_email"])
- //company_industry_tags
- whereIndustry := map[string]interface{}{
- "company_id": util.ObjToString(tmp["company_id"]),
- }
- indus, _ := MgoQY.FindOne("company_industry", whereIndustry)
- ent.CompanyIndustryTags = "{}" // 先给个默认值
- if indus != nil && len(*indus) > 0 {
- name_path := make([]string, 0)
- name_code := make([]string, 0)
- name_path = append(name_path, util.ObjToString((*indus)["industry_l1_name"]))
- name_path = append(name_path, util.ObjToString((*indus)["industry_l2_name"]))
- name_path = append(name_path, util.ObjToString((*indus)["industry_l3_name"]))
- name_path = append(name_path, util.ObjToString((*indus)["industry_l4_name"]))
- //
- name_code = append(name_code, util.ObjToString((*indus)["industry_l1_code"]))
- name_code = append(name_code, util.ObjToString((*indus)["industry_l2_code"]))
- name_code = append(name_code, util.ObjToString((*indus)["industry_l3_code"]))
- name_code = append(name_code, util.ObjToString((*indus)["industry_l4_code"]))
- industry := map[string]interface{}{
- "name_path": name_path,
- "code_path": name_code,
- }
- // map 转 JSON
- jsonBytes, _ := json.Marshal(industry)
- ent.CompanyIndustryTags = string(jsonBytes)
- }
- //
- area, city, district := util.ObjToString((std)["company_area"]), util.ObjToString((std)["company_city"]), util.ObjToString((std)["company_district"])
- area_code, city_code, district_code := CalculateRegionCode(area, city, district)
- ent.JYAreaCode = area_code
- ent.JYCityCode = city_code
- ent.JYDistrictCode = district_code
- //
- query := `
- SELECT bitmapToArray(company_label)
- FROM ent_info
- WHERE company_id = ?
- `
- var oldLabels = make([]uint64, 0)
- row := ClickHouseConn.QueryRow(context.Background(), query, ent.CompanyID)
- err := row.Scan(&oldLabels)
- if err != nil {
- if errors.Is(err, sql.ErrNoRows) {
- //jlog.Info("dealIncEntInfo: 没查到数据", zap.String("company_id", ent.CompanyID))
- } else {
- jlog.Info("dealIncEntInfo: 查询出错", zap.Error(err))
- }
- }
- // 转 RoaringBitmap
- rbm := roaring.NewBitmap()
- for _, v := range oldLabels {
- rbm.Add(uint32(v))
- }
- bin, _ := rbm.ToBytes()
- ent.JYCompanyLabel = bin
- ent.JYOrgTopType = "企业"
- company_type := util.ObjToString(tmp["company_type"])
- if info, ok := nameNorm[company_type]; ok {
- ent.JYCompanyTypeOriginCode = info.Code
- ent.JYCompanyTypeIsLeaf = 1
- ent.JYCompanyTypeLeafCode = info.Code
- ent.JYCompanyTypeLeafName = info.Name
- ent.JYCompanyTypeLeafTag = info.Tag
- ent.JYOrgPropertyOneTag = "工商"
- ent.JYOrgPropertyTwoTag = "企业"
- }
- //保存tidb
- //if err := MysqlDB.Create(&ent).Error; err != nil {
- // jlog.Info("insert failed: %v", zap.Error(err))
- //}
- ents = append(ents, ent)
- if len(ents) >= batchSize {
- if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil {
- jlog.Error("批量插入失败", zap.Error(err))
- }
- ents = ents[:0] // 清空 slice
- }
- }
- // 循环结束后如果还有数据
- if len(ents) > 0 {
- if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil {
- jlog.Error("批量插入失败", zap.Error(err))
- }
- }
- }
- // dealLeaf 处理存量非叶子节点的企业数据标签
- func dealLeaf() {
- const batchSize = 50
- lastID := uint64(0)
- for {
- var companies []EntInfo
- // 分批查询
- if err := MysqlDB.Model(&EntInfo{}).
- Select("id, company_name, credit_no, company_type, jy_company_type_is_leaf").
- Where("jy_company_type_is_leaf = ?", 0).
- Order("id ASC").
- Limit(batchSize).
- Find(&companies).Error; err != nil {
- panic(err)
- }
- if len(companies) == 0 {
- fmt.Println("处理完成 ✅")
- break
- }
- if lastID%1000 == 0 {
- jlog.Info("dealLeaf", zap.Any("lastID", lastID), zap.Any("id", companies[0].ID))
- }
- // 只存储有变化的公司
- updates := make(map[uint64]map[string]interface{})
- for i := range companies {
- if companies[i].JYCompanyTypeIsLeaf == 1 {
- continue
- }
- company_name := util.ObjToString(companies[i].CompanyName)
- top_names := getTopNames(company_name)
- for _, top_name := range top_names {
- topwhere := map[string]interface{}{
- "use_flag": 0,
- "company_status": map[string]interface{}{
- "$nin": []string{"注销", "吊销", "吊销,已注销"},
- },
- "company_name": top_name,
- }
- top_bases, _ := MgoQY.FindOne("company_base", topwhere)
- if top_bases != nil && len(*top_bases) > 0 {
- //获取上级企业类型
- top_company_type := util.ObjToString((*top_bases)["company_type"])
- if norm_info, ok := nameNorm[top_company_type]; ok {
- // 这里判断:如果已有字段不一样,才算变更
- if companies[i].JYCompanyTypeLeafCode != norm_info.Code ||
- companies[i].JYCompanyTypeLeafName != norm_info.Name ||
- companies[i].JYCompanyTypeLeafTag != norm_info.Tag ||
- companies[i].JYOrgPropertyThreeTag != norm_info.Tag2 {
- updates[companies[i].ID] = map[string]interface{}{
- "jy_company_type_leaf_code": norm_info.Code,
- "jy_company_type_leaf_name": norm_info.Name,
- "jy_company_type_leaf_tag": norm_info.Tag,
- "jy_org_property_three_tag": norm_info.Tag2,
- }
- }
- break
- }
- } else {
- // 去其他库查
- where2 := map[string]interface{}{"company_name": top_name}
- enterprise, _ := MgoQY.FindOne("special_enterprise", where2)
- if enterprise != nil && len(*enterprise) > 0 {
- if companies[i].JYOrgPropertyThreeTag != "国企" {
- updates[companies[i].ID] = map[string]interface{}{
- "jy_org_property_three_tag": "国企",
- }
- }
- break
- } else {
- gov, _ := MgoQY.FindOne("special_gov_unit", where2)
- if gov != nil && len(*gov) > 0 {
- if companies[i].JYOrgPropertyThreeTag != "国企" {
- updates[companies[i].ID] = map[string]interface{}{
- "jy_org_property_three_tag": "国企",
- }
- }
- break
- }
- }
- }
- }
- }
- // 批量更新 (只更新有变化的)
- if len(updates) > 0 {
- if err := batchUpdateFields(MysqlDB, (EntInfo{}).TableName(), updates); err != nil {
- panic(err)
- }
- }
- // 更新游标
- lastID = companies[len(companies)-1].ID
- }
- }
// allowedColumns is the allowlist of column names that batchUpdateFields may
// write. Column names are spliced into the generated SQL text verbatim (only
// values are bound as placeholders), so this set is what keeps arbitrary
// identifiers out of the statement. Add entries here when more columns need
// to be batch-updated.
var allowedColumns = map[string]struct{}{
	// jy_company_type_leaf_* columns
	"jy_company_type_leaf_code": {},
	"jy_company_type_leaf_name": {},
	"jy_company_type_leaf_tag":  {},

	// jy_org_property_* columns
	"jy_org_property_three_tag": {},
}
- // batchUpdateFields 批量更新
- func batchUpdateFields(db *gorm.DB, tableName string, updates map[uint64]map[string]interface{}) error {
- if len(updates) == 0 {
- return nil
- }
- // 1) 收集字段(按白名单过滤)
- fieldSet := make(map[string]struct{})
- for _, m := range updates {
- for col := range m {
- if _, ok := allowedColumns[col]; ok {
- fieldSet[col] = struct{}{}
- }
- }
- }
- if len(fieldSet) == 0 {
- return nil
- }
- fields := make([]string, 0, len(fieldSet))
- for col := range fieldSet {
- fields = append(fields, col)
- }
- // 2) 构造 CASE 语句和参数
- cases := make([]string, 0, len(fields))
- args := make([]interface{}, 0, len(updates)*len(fields)*2)
- idSet := make(map[uint64]struct{}, len(updates))
- for _, field := range fields {
- var sb strings.Builder
- sb.WriteString(field)
- sb.WriteString(" = CASE id ")
- hasWhen := false
- for id, m := range updates {
- if val, ok := m[field]; ok {
- sb.WriteString("WHEN ? THEN ? ")
- args = append(args, id, val)
- idSet[id] = struct{}{}
- hasWhen = true
- }
- }
- // 如果这个字段对所有 id 都没有需要更新的值,就跳过它
- if !hasWhen {
- continue
- }
- sb.WriteString("ELSE ")
- sb.WriteString(field)
- sb.WriteString(" END")
- cases = append(cases, sb.String())
- }
- // 如果所有字段都被跳过了(比如全被白名单过滤),直接返回
- if len(cases) == 0 {
- return nil
- }
- // 3) WHERE IN 使用占位符
- ids := make([]uint64, 0, len(idSet))
- for id := range idSet {
- ids = append(ids, id)
- }
- placeholders := make([]string, 0, len(ids))
- for range ids {
- placeholders = append(placeholders, "?")
- }
- for _, id := range ids {
- args = append(args, id)
- }
- // 4) 组装最终 SQL
- sql := fmt.Sprintf(
- "UPDATE %s SET %s WHERE id IN (%s)",
- tableName,
- strings.Join(cases, ", "),
- strings.Join(placeholders, ","),
- )
- // 5) 建议放在事务里执行
- return db.Transaction(func(tx *gorm.DB) error {
- return tx.Exec(sql, args...).Error
- })
- }
- // get 通过companyID 获取法人库数据
- func get() {
- // 2. 查询一条数据
- var ent EntInfo
- if err := MysqlDB.Where("company_id = ?", "001c2e9882ae982abf6e1e9ed06e2654").First(&ent).Error; err != nil {
- panic(err)
- }
- // 3. 反序列化 RoaringBitmap
- rbm := roaring.NewBitmap()
- if len(ent.JYCompanyLabel) > 0 {
- if err := rbm.UnmarshalBinary(ent.JYCompanyLabel); err != nil {
- panic(err)
- }
- }
- // 4. 转成 []uint64
- ids := make([]uint64, 0, rbm.GetCardinality())
- it := rbm.Iterator()
- for it.HasNext() {
- ids = append(ids, uint64(it.Next()))
- }
- fmt.Println("CompanyID:", ent.CompanyID)
- fmt.Println("标签ID集合:", ids)
- }
|