|
@@ -6,62 +6,16 @@ import (
|
|
|
"encoding/json"
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
+ "strings"
|
|
|
+ "time"
|
|
|
+
|
|
|
"github.com/RoaringBitmap/roaring"
|
|
|
"go.uber.org/zap"
|
|
|
+ "gorm.io/gorm"
|
|
|
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
jlog "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
|
|
|
- "strings"
|
|
|
- "time"
|
|
|
)
|
|
|
|
|
|
-// dealAllData 处理存量数据
|
|
|
-func dealAllData() {
|
|
|
- jlog.Info("dealAllData", zap.String("开始处理法人库存量数据", "-------------"))
|
|
|
- ctx := context.Background()
|
|
|
- // 批量处理数据
|
|
|
- batchSize := 100 // 每批处理的数据量
|
|
|
- offset := 0
|
|
|
- count := 0
|
|
|
-
|
|
|
- for {
|
|
|
- query := fmt.Sprintf(`
|
|
|
- SELECT company_id,company_name,company_code,credit_no,org_tags,bitmapToArray(company_label) labels,org_code, tax_code,establish_date,
|
|
|
-
|
|
|
- FROM ent_info LIMIT %d OFFSET %d
|
|
|
- `, batchSize, offset)
|
|
|
-
|
|
|
- rows, err := ClickHouseConn.Query(ctx, query)
|
|
|
- if err != nil {
|
|
|
- jlog.Info("allUpdateBitmap2", zap.Error(err))
|
|
|
- }
|
|
|
-
|
|
|
- if !rows.Next() {
|
|
|
- break
|
|
|
- }
|
|
|
-
|
|
|
- for rows.Next() {
|
|
|
- count++
|
|
|
- var ent EntInfo
|
|
|
- //var bitmapVals []uint64
|
|
|
- var company_id, companyName, creditNo, orgTags string
|
|
|
-
|
|
|
- var oldLabels = make([]uint64, 0)
|
|
|
- if err = rows.Scan(&company_id, &companyName, &creditNo, &orgTags, &oldLabels); err != nil {
|
|
|
- jlog.Info("dealIncEntInfo", zap.Error(err))
|
|
|
- }
|
|
|
-
|
|
|
- if count%10000 == 0 {
|
|
|
- jlog.Info("allUpdateBitmap2:", zap.Int("current", offset), zap.String("name", ent.CompanyName))
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- offset += batchSize
|
|
|
- }
|
|
|
-
|
|
|
- jlog.Info("dealAllData", zap.Int("存量数据迭代完毕", offset))
|
|
|
-}
|
|
|
-
|
|
|
// dealAllFromCompanyBase 从company_base 处理企业数据存量
|
|
|
func dealAllFromCompanyBase() {
|
|
|
jlog.Info("dealAllFromCompanyBase", zap.String("开始处理", "-------企业库存量数据"))
|
|
@@ -235,6 +189,205 @@ func dealAllFromCompanyBase() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// dealLeaf 处理存量非叶子节点的企业数据标签
|
|
|
+func dealLeaf() {
|
|
|
+ const batchSize = 50
|
|
|
+ lastID := uint64(0)
|
|
|
+
|
|
|
+ for {
|
|
|
+ var companies []EntInfo
|
|
|
+ // 分批查询
|
|
|
+ if err := MysqlDB.Model(&EntInfo{}).
|
|
|
+ Select("id, company_name, credit_no, company_type, jy_company_type_is_leaf").
|
|
|
+ Where("jy_company_type_is_leaf = ?", 0).
|
|
|
+ Order("id ASC").
|
|
|
+ Limit(batchSize).
|
|
|
+ Find(&companies).Error; err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(companies) == 0 {
|
|
|
+ fmt.Println("处理完成 ✅")
|
|
|
+ break
|
|
|
+ }
|
|
|
+
|
|
|
+ if lastID%1000 == 0 {
|
|
|
+ jlog.Info("dealLeaf", zap.Any("lastID", lastID), zap.Any("id", companies[0].ID))
|
|
|
+ }
|
|
|
+
|
|
|
+ // 只存储有变化的公司
|
|
|
+ updates := make(map[uint64]map[string]interface{})
|
|
|
+ for i := range companies {
|
|
|
+ if companies[i].JYCompanyTypeIsLeaf == 1 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ company_name := util.ObjToString(companies[i].CompanyName)
|
|
|
+ top_names := getTopNames(company_name)
|
|
|
+
|
|
|
+ for _, top_name := range top_names {
|
|
|
+ topwhere := map[string]interface{}{
|
|
|
+ "use_flag": 0,
|
|
|
+ "company_status": map[string]interface{}{
|
|
|
+ "$nin": []string{"注销", "吊销", "吊销,已注销"},
|
|
|
+ },
|
|
|
+ "company_name": top_name,
|
|
|
+ }
|
|
|
+
|
|
|
+ top_bases, _ := MgoQY.FindOne("company_base", topwhere)
|
|
|
+ if top_bases != nil && len(*top_bases) > 0 {
|
|
|
+ //获取上级企业类型
|
|
|
+ top_company_type := util.ObjToString((*top_bases)["company_type"])
|
|
|
+ if norm_info, ok := nameNorm[top_company_type]; ok {
|
|
|
+ // 这里判断:如果已有字段不一样,才算变更
|
|
|
+ if companies[i].JYCompanyTypeLeafCode != norm_info.Code ||
|
|
|
+ companies[i].JYCompanyTypeLeafName != norm_info.Name ||
|
|
|
+ companies[i].JYCompanyTypeLeafTag != norm_info.Tag ||
|
|
|
+ companies[i].JYOrgPropertyThreeTag != norm_info.Tag2 {
|
|
|
+ updates[companies[i].ID] = map[string]interface{}{
|
|
|
+ "jy_company_type_leaf_code": norm_info.Code,
|
|
|
+ "jy_company_type_leaf_name": norm_info.Name,
|
|
|
+ "jy_company_type_leaf_tag": norm_info.Tag,
|
|
|
+ "jy_org_property_three_tag": norm_info.Tag2,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // 去其他库查
|
|
|
+ where2 := map[string]interface{}{"company_name": top_name}
|
|
|
+ enterprise, _ := MgoQY.FindOne("special_enterprise", where2)
|
|
|
+ if enterprise != nil && len(*enterprise) > 0 {
|
|
|
+ if companies[i].JYOrgPropertyThreeTag != "国企" {
|
|
|
+ updates[companies[i].ID] = map[string]interface{}{
|
|
|
+ "jy_org_property_three_tag": "国企",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ gov, _ := MgoQY.FindOne("special_gov_unit", where2)
|
|
|
+ if gov != nil && len(*gov) > 0 {
|
|
|
+ if companies[i].JYOrgPropertyThreeTag != "国企" {
|
|
|
+ updates[companies[i].ID] = map[string]interface{}{
|
|
|
+ "jy_org_property_three_tag": "国企",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 批量更新 (只更新有变化的)
|
|
|
+ if len(updates) > 0 {
|
|
|
+ if err := batchUpdateFields(MysqlDB, (EntInfo{}).TableName(), updates); err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 更新游标
|
|
|
+ lastID = companies[len(companies)-1].ID
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// allowedColumns is the whitelist of column names that batchUpdateFields may
// write. Column names are spliced into the SQL text, so restricting them to
// this set is what guards the statement against injection. Add further
// columns to the list below when they need to be batch-updatable.
var allowedColumns = func() map[string]struct{} {
	names := []string{
		"jy_company_type_leaf_code",
		"jy_company_type_leaf_name",
		"jy_company_type_leaf_tag",
		"jy_org_property_three_tag",
	}
	set := make(map[string]struct{}, len(names))
	for _, name := range names {
		set[name] = struct{}{}
	}
	return set
}()
|
|
|
+
|
|
|
+// batchUpdateFields 批量更新
|
|
|
+func batchUpdateFields(db *gorm.DB, tableName string, updates map[uint64]map[string]interface{}) error {
|
|
|
+ if len(updates) == 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ // 1) 收集字段(按白名单过滤)
|
|
|
+ fieldSet := make(map[string]struct{})
|
|
|
+ for _, m := range updates {
|
|
|
+ for col := range m {
|
|
|
+ if _, ok := allowedColumns[col]; ok {
|
|
|
+ fieldSet[col] = struct{}{}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(fieldSet) == 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ fields := make([]string, 0, len(fieldSet))
|
|
|
+ for col := range fieldSet {
|
|
|
+ fields = append(fields, col)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 2) 构造 CASE 语句和参数
|
|
|
+ cases := make([]string, 0, len(fields))
|
|
|
+ args := make([]interface{}, 0, len(updates)*len(fields)*2)
|
|
|
+ idSet := make(map[uint64]struct{}, len(updates))
|
|
|
+
|
|
|
+ for _, field := range fields {
|
|
|
+ var sb strings.Builder
|
|
|
+ sb.WriteString(field)
|
|
|
+ sb.WriteString(" = CASE id ")
|
|
|
+
|
|
|
+ hasWhen := false
|
|
|
+ for id, m := range updates {
|
|
|
+ if val, ok := m[field]; ok {
|
|
|
+ sb.WriteString("WHEN ? THEN ? ")
|
|
|
+ args = append(args, id, val)
|
|
|
+ idSet[id] = struct{}{}
|
|
|
+ hasWhen = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 如果这个字段对所有 id 都没有需要更新的值,就跳过它
|
|
|
+ if !hasWhen {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ sb.WriteString("ELSE ")
|
|
|
+ sb.WriteString(field)
|
|
|
+ sb.WriteString(" END")
|
|
|
+
|
|
|
+ cases = append(cases, sb.String())
|
|
|
+ }
|
|
|
+
|
|
|
+ // 如果所有字段都被跳过了(比如全被白名单过滤),直接返回
|
|
|
+ if len(cases) == 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ // 3) WHERE IN 使用占位符
|
|
|
+ ids := make([]uint64, 0, len(idSet))
|
|
|
+ for id := range idSet {
|
|
|
+ ids = append(ids, id)
|
|
|
+ }
|
|
|
+ placeholders := make([]string, 0, len(ids))
|
|
|
+ for range ids {
|
|
|
+ placeholders = append(placeholders, "?")
|
|
|
+ }
|
|
|
+ for _, id := range ids {
|
|
|
+ args = append(args, id)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 4) 组装最终 SQL
|
|
|
+ sql := fmt.Sprintf(
|
|
|
+ "UPDATE %s SET %s WHERE id IN (%s)",
|
|
|
+ tableName,
|
|
|
+ strings.Join(cases, ", "),
|
|
|
+ strings.Join(placeholders, ","),
|
|
|
+ )
|
|
|
+
|
|
|
+ // 5) 建议放在事务里执行
|
|
|
+ return db.Transaction(func(tx *gorm.DB) error {
|
|
|
+ return tx.Exec(sql, args...).Error
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
// get 通过companyID 获取法人库数据
|
|
|
func get() {
|
|
|
// 2. 查询一条数据
|