package main import ( "context" "database/sql" "encoding/json" "errors" "fmt" "strings" "time" "github.com/RoaringBitmap/roaring" "go.uber.org/zap" "gorm.io/gorm" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" jlog "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" ) // dealAllFromCompanyBase 从company_base 处理惬意数据存量 func dealAllFromCompanyBase() { jlog.Info("dealAllFromCompanyBase", zap.String("开始处理", "-------企业库存量数据")) defer util.Catch() sess := MgoQY.GetMgoConn() defer MgoQY.DestoryMongoConn(sess) where := map[string]interface{}{ "company_type": map[string]interface{}{ "$ne": "个体工商户", }, } count := 0 batchSize := 100 ents := make([]EntInfo, 0, batchSize) it := sess.DB(GF.MongoQy.DB).C("company_base").Find(where).Select(nil).Iter() for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%1000 == 0 { jlog.Info("dealAllFromCompanyBase", zap.Any("current:", count), zap.Any("company_name", tmp["company_name"])) } company_status := util.ObjToString(tmp["company_status"]) if strings.Contains(company_status, "注销") || strings.Contains(company_status, "吊销") { continue } if util.IntAll(tmp["use_flag"]) > 0 { continue } var ent EntInfo ent.CompanyID = util.ObjToString(tmp["company_id"]) ent.CompanyName = util.ObjToString(tmp["company_name"]) ent.CompanyCode = util.ObjToString(tmp["company_code"]) ent.CreditNo = util.ObjToString(tmp["credit_no"]) ent.OrgCode = util.ObjToString(tmp["org_code"]) ent.TaxCode = util.ObjToString(tmp["tax_code"]) ent.EstablishDate = util.ObjToString(tmp["establish_date"]) ent.LegalPerson = util.ObjToString(tmp["legal_person"]) ent.LegalPersonCaption = util.ObjToString(tmp["legal_person_caption"]) ent.CompanyStatus = util.ObjToString(tmp["company_status"]) ent.CompanyType = util.ObjToString(tmp["company_type"]) ent.Authority = util.ObjToString(tmp["authority"]) ent.IssueDate = util.ObjToString(tmp["issue_date"]) ent.OperationStartDate = util.ObjToString(tmp["operation_startdate"]) ent.OperationEndDate = util.ObjToString(tmp["operation_enddate"]) ent.Capital = util.ObjToString(tmp["capital"]) ent.CompanyAddress = util.ObjToString(tmp["company_address"]) ent.BusinessScope = util.ObjToString(tmp["business_scope"]) ent.ComeInTime = time.Now().Unix() ent.UpdateTime = time.Now().Unix() ent.LegalPersonType = int8(util.IntAll(tmp["legal_person_type"])) ent.RealCapital = util.ObjToString(tmp["real_capital"]) ent.EnName = util.ObjToString(tmp["en_name"]) ent.ListCode = util.ObjToString(tmp["list_code"]) //annual_reports std := getQyxyStd(util.ObjToString(tmp["company_name"])) if std != nil && len(std) > 0 { // 取出 annual_reports 字段 reports, ok := std["annual_reports"].([]interface{}) if ok { var maxYear float64 var employeeNo string // 遍历 annual_reports 数组 for i, r := range reports { if reportMap, ok := r.(map[string]interface{}); ok { year := util.Float64All(reportMap["report_year"]) emp := util.ObjToString(reportMap["employee_no"]) if i == 0 || year > maxYear { maxYear = year employeeNo = emp } } } if maxYear > 0 { ent.EmployeeNo = util.IntAll(employeeNo) } } } // ent.Website = util.ObjToString(tmp["website_url"]) ent.CompanyPhone = util.ObjToString(tmp["company_phone"]) ent.CompanyEmail = util.ObjToString(tmp["company_email"]) //company_industry_tags whereIndustry := map[string]interface{}{ "company_id": util.ObjToString(tmp["company_id"]), } indus, _ := MgoQY.FindOne("company_industry", whereIndustry) ent.CompanyIndustryTags = "{}" // 先给个默认值 if indus != nil && len(*indus) > 0 { name_path := make([]string, 0) name_code := make([]string, 0) name_path = append(name_path, util.ObjToString((*indus)["industry_l1_name"])) name_path = append(name_path, util.ObjToString((*indus)["industry_l2_name"])) name_path = append(name_path, util.ObjToString((*indus)["industry_l3_name"])) name_path = append(name_path, util.ObjToString((*indus)["industry_l4_name"])) // name_code = append(name_code, util.ObjToString((*indus)["industry_l1_code"])) name_code = append(name_code, util.ObjToString((*indus)["industry_l2_code"])) name_code = append(name_code, util.ObjToString((*indus)["industry_l3_code"])) name_code = append(name_code, util.ObjToString((*indus)["industry_l4_code"])) industry := map[string]interface{}{ "name_path": name_path, "code_path": name_code, } // map 转 JSON jsonBytes, _ := json.Marshal(industry) ent.CompanyIndustryTags = string(jsonBytes) } // area, city, district := util.ObjToString((std)["company_area"]), util.ObjToString((std)["company_city"]), util.ObjToString((std)["company_district"]) area_code, city_code, district_code := CalculateRegionCode(area, city, district) ent.JYAreaCode = area_code ent.JYCityCode = city_code ent.JYDistrictCode = district_code // query := ` SELECT bitmapToArray(company_label) FROM ent_info WHERE company_id = ? ` var oldLabels = make([]uint64, 0) row := ClickHouseConn.QueryRow(context.Background(), query, ent.CompanyID) err := row.Scan(&oldLabels) if err != nil { if errors.Is(err, sql.ErrNoRows) { //jlog.Info("dealIncEntInfo: 没查到数据", zap.String("company_id", ent.CompanyID)) } else { jlog.Info("dealIncEntInfo: 查询出错", zap.Error(err)) } } // 转 RoaringBitmap rbm := roaring.NewBitmap() for _, v := range oldLabels { rbm.Add(uint32(v)) } bin, _ := rbm.ToBytes() ent.JYCompanyLabel = bin ent.JYOrgTopType = "企业" company_type := util.ObjToString(tmp["company_type"]) if info, ok := nameNorm[company_type]; ok { ent.JYCompanyTypeOriginCode = info.Code ent.JYCompanyTypeIsLeaf = 1 ent.JYCompanyTypeLeafCode = info.Code ent.JYCompanyTypeLeafName = info.Name ent.JYCompanyTypeLeafTag = info.Tag ent.JYOrgPropertyOneTag = "工商" ent.JYOrgPropertyTwoTag = "企业" } //保存tidb //if err := MysqlDB.Create(&ent).Error; err != nil { // jlog.Info("insert failed: %v", zap.Error(err)) //} ents = append(ents, ent) if len(ents) >= batchSize { if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil { jlog.Error("批量插入失败", zap.Error(err)) } ents = ents[:0] // 清空 slice } } // 循环结束后如果还有数据 if len(ents) > 0 { if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil { jlog.Error("批量插入失败", zap.Error(err)) } } } // dealLeaf 处理存量非叶子节点的企业数据标签 func dealLeaf() { const batchSize = 50 lastID := uint64(0) for { var companies []EntInfo // 分批查询 if err := MysqlDB.Model(&EntInfo{}). Select("id, company_name, credit_no, company_type, jy_company_type_is_leaf"). Where("jy_company_type_is_leaf = ?", 0). Order("id ASC"). Limit(batchSize). Find(&companies).Error; err != nil { panic(err) } if len(companies) == 0 { fmt.Println("处理完成 ✅") break } if lastID%1000 == 0 { jlog.Info("dealLeaf", zap.Any("lastID", lastID), zap.Any("id", companies[0].ID)) } // 只存储有变化的公司 updates := make(map[uint64]map[string]interface{}) for i := range companies { if companies[i].JYCompanyTypeIsLeaf == 1 { continue } company_name := util.ObjToString(companies[i].CompanyName) top_names := getTopNames(company_name) for _, top_name := range top_names { topwhere := map[string]interface{}{ "use_flag": 0, "company_status": map[string]interface{}{ "$nin": []string{"注销", "吊销", "吊销,已注销"}, }, "company_name": top_name, } top_bases, _ := MgoQY.FindOne("company_base", topwhere) if top_bases != nil && len(*top_bases) > 0 { //获取上级企业类型 top_company_type := util.ObjToString((*top_bases)["company_type"]) if norm_info, ok := nameNorm[top_company_type]; ok { // 这里判断:如果已有字段不一样,才算变更 if companies[i].JYCompanyTypeLeafCode != norm_info.Code || companies[i].JYCompanyTypeLeafName != norm_info.Name || companies[i].JYCompanyTypeLeafTag != norm_info.Tag || companies[i].JYOrgPropertyThreeTag != norm_info.Tag2 { updates[companies[i].ID] = map[string]interface{}{ "jy_company_type_leaf_code": norm_info.Code, "jy_company_type_leaf_name": norm_info.Name, "jy_company_type_leaf_tag": norm_info.Tag, "jy_org_property_three_tag": norm_info.Tag2, } } break } } else { // 去其他库查 where2 := map[string]interface{}{"company_name": top_name} enterprise, _ := MgoQY.FindOne("special_enterprise", where2) if enterprise != nil && len(*enterprise) > 0 { if companies[i].JYOrgPropertyThreeTag != "国企" { updates[companies[i].ID] = map[string]interface{}{ "jy_org_property_three_tag": "国企", } } break } else { gov, _ := MgoQY.FindOne("special_gov_unit", where2) if gov != nil && len(*gov) > 0 { if companies[i].JYOrgPropertyThreeTag != "国企" { updates[companies[i].ID] = map[string]interface{}{ "jy_org_property_three_tag": "国企", } } break } } } } } // 批量更新 (只更新有变化的) if len(updates) > 0 { if err := batchUpdateFields(MysqlDB, (EntInfo{}).TableName(), updates); err != nil { panic(err) } } // 更新游标 lastID = companies[len(companies)-1].ID } } // 允许更新的字段白名单(非常重要,防注入) var allowedColumns = map[string]struct{}{ "jy_company_type_leaf_code": {}, "jy_company_type_leaf_name": {}, "jy_company_type_leaf_tag": {}, "jy_org_property_three_tag": {}, // 需要的话继续补充其它允许批量更新的字段 } // batchUpdateFields 批量更新 func batchUpdateFields(db *gorm.DB, tableName string, updates map[uint64]map[string]interface{}) error { if len(updates) == 0 { return nil } // 1) 收集字段(按白名单过滤) fieldSet := make(map[string]struct{}) for _, m := range updates { for col := range m { if _, ok := allowedColumns[col]; ok { fieldSet[col] = struct{}{} } } } if len(fieldSet) == 0 { return nil } fields := make([]string, 0, len(fieldSet)) for col := range fieldSet { fields = append(fields, col) } // 2) 构造 CASE 语句和参数 cases := make([]string, 0, len(fields)) args := make([]interface{}, 0, len(updates)*len(fields)*2) idSet := make(map[uint64]struct{}, len(updates)) for _, field := range fields { var sb strings.Builder sb.WriteString(field) sb.WriteString(" = CASE id ") hasWhen := false for id, m := range updates { if val, ok := m[field]; ok { sb.WriteString("WHEN ? THEN ? ") args = append(args, id, val) idSet[id] = struct{}{} hasWhen = true } } // 如果这个字段对所有 id 都没有需要更新的值,就跳过它 if !hasWhen { continue } sb.WriteString("ELSE ") sb.WriteString(field) sb.WriteString(" END") cases = append(cases, sb.String()) } // 如果所有字段都被跳过了(比如全被白名单过滤),直接返回 if len(cases) == 0 { return nil } // 3) WHERE IN 使用占位符 ids := make([]uint64, 0, len(idSet)) for id := range idSet { ids = append(ids, id) } placeholders := make([]string, 0, len(ids)) for range ids { placeholders = append(placeholders, "?") } for _, id := range ids { args = append(args, id) } // 4) 组装最终 SQL sql := fmt.Sprintf( "UPDATE %s SET %s WHERE id IN (%s)", tableName, strings.Join(cases, ", "), strings.Join(placeholders, ","), ) // 5) 建议放在事务里执行 return db.Transaction(func(tx *gorm.DB) error { return tx.Exec(sql, args...).Error }) } // get 通过companyID 获取法人库数据 func get() { // 2. 查询一条数据 var ent EntInfo if err := MysqlDB.Where("company_id = ?", "001c2e9882ae982abf6e1e9ed06e2654").First(&ent).Error; err != nil { panic(err) } // 3. 反序列化 RoaringBitmap rbm := roaring.NewBitmap() if len(ent.JYCompanyLabel) > 0 { if err := rbm.UnmarshalBinary(ent.JYCompanyLabel); err != nil { panic(err) } } // 4. 转成 []uint64 ids := make([]uint64, 0, rbm.GetCardinality()) it := rbm.Iterator() for it.HasNext() { ids = append(ids, uint64(it.Next())) } fmt.Println("CompanyID:", ent.CompanyID) fmt.Println("标签ID集合:", ids) }