package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"github.com/robfig/cron/v3"
	"go.uber.org/zap"
	"gorm.io/gorm"
	es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
	jlog "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

var (
	GF             GlobalConf
	MysqlDB        *gorm.DB
	MgoQY          *mongodb.MongodbSim // "181" Pingan (凭安) MongoDB instance
	Mgo            *mongodb.MongodbSim // bid-notice address / Pingan "mixdata" database
	Esa            *es.Elastic
	Esb            *es.Elastic
	ClickHouseConn driver.Conn

	entLabelMap = make(map[uint64]string) // label code -> label name
	nameBitMap  = make(map[string]uint64) // inverse of entLabelMap: label name -> label code

	// Queue of pending ES partial updates; each item is {{"_id": id}, {fields...}}.
	updateEsPool = make(chan []map[string]interface{}, 5000)
	// Semaphore bounding the number of in-flight UpdateBulk goroutines.
	updateEsSp = make(chan bool, 5)

	RegionCodeData = map[string]string{}

	// Queue of documents ("binding" data) to be saved to ES.
	saveEsPool = make(chan map[string]interface{}, 5000)
	// Semaphore bounding the number of in-flight BulkSave goroutines.
	saveEsSp   = make(chan bool, 5)
	EsBulkSize = 50

	// The ES consumers loop forever, so they must be started at most once per
	// process no matter how many times the cron jobs fire.
	updateEsOnce sync.Once
	saveEsOnce   sync.Once
)

// creditPrefixTags maps the first two characters of a unified social credit
// code to a label code. Every key is exactly two ASCII bytes.
var creditPrefixTags = map[string]uint64{
	"11": 151, "12": 152, "13": 153, "19": 154,
	"21": 155, "29": 156,
	"31": 157, "32": 158, "33": 159, "34": 160, "35": 161, "39": 162,
	"41": 163, "49": 164,
	"51": 165, "52": 166, "53": 167, "59": 168,
	"61": 169, "62": 170, "69": 171,
	"71": 172, "72": 173, "79": 174,
	"81": 175, "89": 176,
	"91": 177, "92": 178, "93": 179,
	"A1": 180, "A2": 181,
	"N1": 182, "N2": 183, "N3": 184, "N9": 185,
	"Y1": 186,
}

// orgCategoryTags maps a standalone org_tags value (one without "-") to its
// label code.
var orgCategoryTags = map[string]uint64{
	"外交": 2193, "发展和改革": 2194, "科学技术/科技": 2195, "民族事务": 2196,
	"保密局": 2197, "国安局": 2198, "司法": 2199, "法院": 2200,
	"检察院": 2201, "人力资源和社会保障": 2202, "生态环境": 2203, "交通运输": 2204,
	"农业农村": 2205, "退役军人事务": 2206, "人民银行": 2207, "国防": 2208,
	"教育": 2209, "党校": 2210, "工业和信息化": 2211, "公安": 2212,
	"民政": 2213, "财政": 2214, "自然资源(包含规划)": 2215, "住建": 2216,
	"水利": 2217, "商务": 2218, "卫生健康": 2219, "应急管理": 2220,
	"审计": 2221, "国有资产监督管理": 2222, "海关": 2223, "市场监督": 2224,
	"证券监督管理": 2225, "体育": 2226, "统计": 2227, "国际发展合作": 2228,
	"税务": 2229, "金融": 2230, "广播电视": 2231, "信访": 2232,
	"知识产权": 2233, "医疗保障": 2234, "新华通讯社": 2235, "气象": 2236,
	"科学院": 2237, "工程院": 2238, "粮食和物资储备": 2239, "数据": 2240,
	"烟草专卖": 2241, "林业和草原": 2242, "民用航空": 2243, "文物": 2244,
	"疾病预防控制": 2245, "消防救援": 2246, "药品监督": 2247, "能源": 2248,
	"移民": 2249, "铁路": 2250, "邮政": 2251, "中医药": 2252,
	"外汇": 2253, "供销合作社": 2254, "公共资源交易中心": 2255, "监狱": 2256,
	"城乡建设": 2257, "文旅": 2258, "人民防空": 2259, "园林": 2260,
	"物流口岸": 2261, "大数据": 2262, "政务服务": 2263, "地方史志": 2264,
	"住房公积金管理中心": 2265, "仲裁": 2266, "招商": 2267, "社保中心": 2268,
	"管委会": 2269, "人民政府": 2270, "工商联": 2271, "残联": 2272,
	"妇联": 2273, "艺术联": 2274, "侨联": 2275, "台联": 2276,
	"城管": 2277, "编办": 2278, "政协": 2279, "民主党派": 2280,
	"党委": 2281, "团委": 2282, "人大": 2283, "档案局": 2284,
	"武装": 2285, "医院": 2286, "渔业": 2287, "学校": 2288,
}

// main registers the two scheduled jobs (schedules come from GF.Env.Spec and
// GF.Env.Spec2, parsed with a seconds field) and then blocks forever.
func main() {
	local, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// Hosts without a tz database cannot resolve the zone name; a fixed
		// UTC+8 offset keeps the schedule on the intended wall clock instead
		// of handing cron a nil location.
		jlog.Info("LoadLocation Asia/Shanghai err", zap.Error(err))
		local = time.FixedZone("CST", 8*60*60)
	}
	c := cron.New(cron.WithLocation(local), cron.WithSeconds())

	// Corporate-entity (ent_info) incremental update.
	if _, err := c.AddFunc(GF.Env.Spec, dealIncDataEntInfo); err != nil {
		jlog.Info("AddFunc dealIncData err", zap.Error(err))
	}
	// Pingan incremental data.
	if _, err := c.AddFunc(GF.Env.Spec2, dealIncDataQY); err != nil {
		jlog.Info("AddFunc dealIncDataQY err", zap.Error(err))
	}

	c.Start()
	defer c.Stop()

	// Manual entry points, enabled by hand when needed:
	//dealIncEntInfo()
	//dealIncData() // process incremental data
	//dealAllData() // process the full existing data set

	select {}
}

// dealIncDataQY processes Pingan incremental data. The ES save consumer is
// started on the first call only; later calls reuse it.
func dealIncDataQY() {
	saveEsOnce.Do(func() { go SaveEsMethod() })
	go dealIncQyData()
}

// dealIncDataEntInfo processes incremental corporate-entity data. The ES
// update consumer is started on the first call only; later calls reuse it.
func dealIncDataEntInfo() {
	updateEsOnce.Do(func() { go updateEsMethod() })
	go dealIncEntInfo()
}

// dealAllData processes the full existing corporate-entity data set, updating
// rows one at a time.
func dealAllData() {
	updateEsOnce.Do(func() { go updateEsMethod() })
	go allUpdateBitmap2()
}

// allUpdateBitmap2 pages through ent_info in fixed-size batches. For every row
// it derives label codes from credit_no and org_tags, ORs them into the row's
// company_label bitmap in ClickHouse, and queues the resulting label names as
// an ES partial update on updateEsPool.
//
// Iteration stops at the first empty batch or the first query/iteration error.
//
// NOTE(review): the SELECT pages with LIMIT/OFFSET but has no ORDER BY, so the
// paging order is whatever the server happens to return — confirm that is
// acceptable while mutations are running against the same table.
func allUpdateBitmap2() {
	jlog.Info("allUpdateBitmap2", zap.String("开始处理标签", "vvvvvvvvvvvvvv"))
	ctx := context.Background()

	const batchSize = 100 // rows fetched per query
	offset := 0
	count := 0

	// processBatch handles the batch at the current offset and reports how many
	// rows it fetched. It is a closure so rows.Close runs once per batch.
	processBatch := func() (int, error) {
		query := fmt.Sprintf(`
			SELECT id,company_name,credit_no,org_tags,bitmapToArray(company_label) labels
			FROM ent_info
			LIMIT %d OFFSET %d
		`, batchSize, offset)

		rows, err := ClickHouseConn.Query(ctx, query)
		if err != nil {
			return 0, err
		}
		defer rows.Close()

		fetched := 0
		// A single Next loop: probing with an extra Next call beforehand would
		// consume, and thereby skip, the first row of the batch.
		for rows.Next() {
			fetched++
			count++

			var id, companyName, creditNo, orgTags string
			var oldLabels []uint64
			if err := rows.Scan(&id, &companyName, &creditNo, &orgTags, &oldLabels); err != nil {
				// Do not act on a partially scanned row.
				jlog.Info("allUpdateBitmap2", zap.Error(err))
				continue
			}
			if count%10000 == 0 {
				jlog.Info("allUpdateBitmap2:", zap.Int("current", offset), zap.String("id", id), zap.String("companyName", companyName))
			}

			// 1. Labels derived from the credit code prefix.
			var addLabels []uint64
			addLabels = append(addLabels, dealCompanyNo(creditNo)...)
			// 2. Labels derived from org_tags.
			addLabels = append(addLabels, dealOrgTags(orgTags)...)

			if len(addLabels) > 0 {
				// The label list is spliced into the statement text; the
				// company name is passed as a bound parameter.
				stmt := fmt.Sprintf(`
					ALTER TABLE information.ent_info
					UPDATE company_label = bitmapOr(company_label, bitmapBuild(%s))
					WHERE company_name = ?
				`, buildToUInt64Array(addLabels))
				if err := ClickHouseConn.Exec(ctx, stmt, companyName); err != nil {
					jlog.Info("allUpdateBitmap2", zap.Error(err))
				}
			}

			// 3. Queue the ES update with the names of all old and new labels.
			var labelNames []string
			for _, code := range append(oldLabels, addLabels...) {
				if name, ok := entLabelMap[code]; ok {
					labelNames = append(labelNames, name)
				}
			}
			if len(labelNames) > 0 {
				labelNames = removeDuplicates(labelNames)
				updateEsPool <- []map[string]interface{}{
					{"_id": id},
					{"company_label": strings.Join(labelNames, ",")},
				}
			}
		}
		return fetched, rows.Err()
	}

	for {
		fetched, err := processBatch()
		if err != nil {
			jlog.Info("allUpdateBitmap2", zap.Error(err))
			break
		}
		if fetched == 0 {
			break
		}
		offset += batchSize
	}
	jlog.Info("allUpdateBitmap2", zap.Int("数据迭代完毕", offset))
}

// dealCompanyNo returns the label code matching the two-character prefix of a
// unified social credit code, or nil when the code is shorter than two bytes
// or its prefix is not in the table.
func dealCompanyNo(companyNo string) (newTags []uint64) {
	if len(companyNo) < 2 {
		return nil
	}
	// Every table key is two bytes long, so one lookup on the leading two
	// bytes is equivalent to testing each key with strings.HasPrefix.
	if tag, ok := creditPrefixTags[companyNo[:2]]; ok {
		newTags = append(newTags, tag)
	}
	return newTags
}

// dealOrgTags converts an org_tags value into label codes. A value containing
// "-" is treated as a hierarchical industry-classification path and resolved
// against the classifications returned by readFile; any other value is looked
// up as a standalone category name. The input is trimmed of surrounding
// whitespace first.
func dealOrgTags(org_tags string) (newTags []uint64) {
	orgTags := strings.TrimSpace(org_tags)

	if strings.Contains(orgTags, "-") {
		// NOTE(review): readFile is invoked on every hierarchical value; if it
		// re-reads a file each time, caching its result would be worthwhile.
		classifications := readFile()
		if len(classifications) > 0 {
			newTags = append(newTags, findEntCodesByLabel(orgTags, classifications)...)
		}
		return newTags
	}

	if tag, ok := orgCategoryTags[orgTags]; ok {
		newTags = append(newTags, tag)
	}
	return newTags
}

// findEntCodesByLabel splits label on "-" and, treating the i-th segment
// (1-based) as the name of a level-i classification, collects the EntCode of
// each segment that can be found. Segments without a match are skipped.
func findEntCodesByLabel(label string, classifications []IndustryClassification) []uint64 {
	var result []uint64
	for i, name := range strings.Split(label, "-") {
		if found := findIndustryClassification(i+1, name, classifications); found != nil {
			result = append(result, found.EntCode)
		}
	}
	return result
}

// findIndustryClassification searches the classification tree depth-first for
// the first node whose Level and Name both match. It returns a pointer to a
// copy of that node, or nil when there is no match.
func findIndustryClassification(level int, name string, classifications []IndustryClassification) *IndustryClassification {
	for i := range classifications {
		node := &classifications[i]
		if node.Level == level && node.Name == name {
			// Hand back a copy so callers cannot mutate the shared tree.
			match := *node
			return &match
		}
		if found := findIndustryClassification(level, name, node.Children); found != nil {
			return found
		}
	}
	return nil
}

// updateEsMethod drains updateEsPool forever and sends the updates to ES in
// bulk. A batch is flushed when it reaches 200 items or when no item has
// arrived for one second. Flushes run in their own goroutines, with
// updateEsSp limiting how many are in flight.
func updateEsMethod() {
	const bulkSize = 200
	batch := make([][]map[string]interface{}, 0, bulkSize)

	// flush hands the current batch to a worker and starts a fresh one.
	flush := func() {
		pending := batch
		batch = make([][]map[string]interface{}, 0, bulkSize)

		updateEsSp <- true // blocks while the maximum number of workers is busy
		go func() {
			defer func() { <-updateEsSp }()
			Esa.UpdateBulk(GF.Env.Esindex, pending...)
		}()
	}

	for {
		select {
		case item := <-updateEsPool:
			batch = append(batch, item)
			if len(batch) == bulkSize {
				flush()
			}
		case <-time.After(1000 * time.Millisecond):
			if len(batch) > 0 {
				flush()
			}
		}
	}
}

// SaveEsMethod drains saveEsPool forever and saves the documents to ES in
// bulk. A batch is flushed when it reaches EsBulkSize items or when no item
// has arrived for one second. Flushes run in their own goroutines, with
// saveEsSp limiting how many are in flight.
func SaveEsMethod() {
	batch := make([]map[string]interface{}, 0, EsBulkSize)

	// flush hands the current batch to a worker and starts a fresh one.
	flush := func() {
		pending := batch
		batch = make([]map[string]interface{}, 0, EsBulkSize)

		saveEsSp <- true // blocks while the maximum number of workers is busy
		go func() {
			defer func() { <-saveEsSp }()
			Esa.BulkSave(GF.Env.Esindex, pending)
		}()
	}

	for {
		select {
		case doc := <-saveEsPool:
			batch = append(batch, doc)
			if len(batch) == EsBulkSize {
				flush()
			}
		case <-time.After(1000 * time.Millisecond):
			if len(batch) > 0 {
				flush()
			}
		}
	}
}