123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- package main
- import (
- "context"
- "fmt"
- "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
- "github.com/robfig/cron/v3"
- "go.uber.org/zap"
- "gorm.io/gorm"
- es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- jlog "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "strings"
- "time"
- )
- var (
- GF GlobalConf
- MysqlDB *gorm.DB
- MgoQY *mongodb.MongodbSim //181凭安
- Mgo *mongodb.MongodbSim //标讯地址-凭安库mixdata
- Esa *es.Elastic
- Esb *es.Elastic
- ClickHouseConn driver.Conn
- entLabelMap = make(map[uint64]string)
- nameBitMap = make(map[string]uint64) //和上面k-v 反过来
- //更新es
- updateEsPool = make(chan []map[string]interface{}, 5000)
- updateEsSp = make(chan bool, 5) //保存协程
- RegionCodeData = map[string]string{}
- //保存到es
- saveEsPool = make(chan map[string]interface{}, 5000) //保存binding数据到es
- saveEsSp = make(chan bool, 5)
- EsBulkSize = 50
- )
- func main() {
- //定时任务
- local, _ := time.LoadLocation("Asia/Shanghai")
- c := cron.New(cron.WithLocation(local), cron.WithSeconds())
- // 每天执行一次更新法人库信息
- _, err := c.AddFunc(GF.Env.Spec, dealIncDataEntInfo)
- if err != nil {
- jlog.Info("AddFunc dealIncData err", zap.Error(err))
- }
- //每周执行一次凭安数据
- _, err = c.AddFunc(GF.Env.Spec2, dealIncDataQY)
- if err != nil {
- jlog.Info("AddFunc dealIncDataQY err", zap.Error(err))
- }
- c.Start()
- defer c.Stop()
- //dealIncEntInfo()
- //dealIncData() //处理增量数据
- //dealAllData()// 处理存量数据
- select {}
- }
- // dealIncDataQY 处理凭安增量数据
- func dealIncDataQY() {
- go SaveEsMethod()
- go dealIncQyData() //处理凭安增量数据
- }
- // dealIncDataEntInfo 处理法人库增量数据
- func dealIncDataEntInfo() {
- go updateEsMethod()
- go dealIncEntInfo()
- }
- // dealAllData 处理法人库存量数据
- func dealAllData() {
- go updateEsMethod()
- go allUpdateBitmap2() //单个更新
- }
- // 处理存量法人库数据,单个更新bitmap;
- func allUpdateBitmap2() {
- jlog.Info("allUpdateBitmap2", zap.String("开始处理标签", "vvvvvvvvvvvvvv"))
- ctx := context.Background()
- // 批量处理数据
- batchSize := 100 // 每批处理的数据量
- offset := 0
- count := 0
- for {
- query := fmt.Sprintf(`
- SELECT id,company_name,credit_no,org_tags,bitmapToArray(company_label) labels FROM ent_info LIMIT %d OFFSET %d
- `, batchSize, offset)
- rows, err := ClickHouseConn.Query(ctx, query)
- if err != nil {
- jlog.Info("allUpdateBitmap2", zap.Error(err))
- }
- if !rows.Next() {
- break
- }
- for rows.Next() {
- count++
- var id, companyName, creditNo, orgTags string
- var addLabels = make([]uint64, 0)
- var oldLabels = make([]uint64, 0)
- if err = rows.Scan(&id, &companyName, &creditNo, &orgTags, &oldLabels); err != nil {
- jlog.Info("allUpdateBitmap2", zap.Error(err))
- }
- if count%10000 == 0 {
- jlog.Info("allUpdateBitmap2:", zap.Int("current", offset), zap.String("id", id), zap.String("companyName", companyName))
- }
- //1 dealCompanyNo
- tags1 := dealCompanyNo(creditNo)
- if len(tags1) > 0 {
- addLabels = append(addLabels, tags1...)
- }
- //2.处理 org_tags
- tags2 := dealOrgTags(orgTags)
- if len(tags2) > 0 {
- addLabels = append(addLabels, tags2...)
- }
- if len(addLabels) > 0 {
- // 构建 toUInt64 数组字符串
- toUInt64Array := buildToUInt64Array(addLabels)
- // SQL 动态生成
- sql := fmt.Sprintf(`
- ALTER TABLE information.ent_info
- UPDATE company_label = bitmapOr(company_label, bitmapBuild(%s))
- WHERE company_name = ?
- `, toUInt64Array)
- err = ClickHouseConn.Exec(context.Background(), sql, companyName)
- if err != nil {
- jlog.Info("allUpdateBitmap2", zap.Error(err))
- }
- }
- //2.更新es
- var labelNames = make([]string, 0)
- oldLabels = append(oldLabels, addLabels...)
- for _, v := range oldLabels {
- if name, ok := entLabelMap[v]; ok {
- labelNames = append(labelNames, name)
- }
- }
- if len(labelNames) > 0 {
- labelNames = removeDuplicates(labelNames) //去重
- esUpdate := map[string]interface{}{
- "company_label": strings.Join(labelNames, ","),
- }
- updateEsPool <- []map[string]interface{}{
- {"_id": id},
- esUpdate,
- }
- }
- }
- offset += batchSize
- }
- jlog.Info("allUpdateBitmap2", zap.Int("数据迭代完毕", offset))
- }
// creditNoPrefixTags maps the first two characters of a unified social credit
// code to the corresponding label bit. Built once at package initialisation
// instead of on every call.
var creditNoPrefixTags = map[string]uint64{
	"11": 151, "12": 152, "13": 153, "19": 154,
	"21": 155, "29": 156, "31": 157, "32": 158,
	"33": 159, "34": 160, "35": 161, "39": 162,
	"41": 163, "49": 164, "51": 165, "52": 166,
	"53": 167, "59": 168, "61": 169, "62": 170,
	"69": 171, "71": 172, "72": 173, "79": 174,
	"81": 175, "89": 176, "91": 177, "92": 178,
	"93": 179, "A1": 180, "A2": 181, "N1": 182,
	"N2": 183, "N3": 184, "N9": 185, "Y1": 186,
}

// dealCompanyNo derives label bits from a unified social credit code.
//
// The first two bytes of companyNo are looked up in creditNoPrefixTags. The
// result holds the single matching tag, or is nil when companyNo is shorter
// than two bytes or its prefix is unknown. Matching is case-sensitive.
func dealCompanyNo(companyNo string) (newTags []uint64) {
	// Every known prefix is exactly two bytes long, so a direct lookup is
	// equivalent to testing each prefix with strings.HasPrefix.
	if len(companyNo) < 2 {
		return nil
	}
	if tag, ok := creditNoPrefixTags[companyNo[:2]]; ok {
		newTags = append(newTags, tag)
	}
	return newTags
}
- // dealOrgTags 处理国民经济行业分类
- func dealOrgTags(org_tags string) (newTags []uint64) {
- var categoryMap = map[string]uint64{
- "外交": 2193,
- "发展和改革": 2194,
- "科学技术/科技": 2195,
- "民族事务": 2196,
- "保密局": 2197,
- "国安局": 2198,
- "司法": 2199,
- "法院": 2200,
- "检察院": 2201,
- "人力资源和社会保障": 2202,
- "生态环境": 2203,
- "交通运输": 2204,
- "农业农村": 2205,
- "退役军人事务": 2206,
- "人民银行": 2207,
- "国防": 2208,
- "教育": 2209,
- "党校": 2210,
- "工业和信息化": 2211,
- "公安": 2212,
- "民政": 2213,
- "财政": 2214,
- "自然资源(包含规划)": 2215,
- "住建": 2216,
- "水利": 2217,
- "商务": 2218,
- "卫生健康": 2219,
- "应急管理": 2220,
- "审计": 2221,
- "国有资产监督管理": 2222,
- "海关": 2223,
- "市场监督": 2224,
- "证券监督管理": 2225,
- "体育": 2226,
- "统计": 2227,
- "国际发展合作": 2228,
- "税务": 2229,
- "金融": 2230,
- "广播电视": 2231,
- "信访": 2232,
- "知识产权": 2233,
- "医疗保障": 2234,
- "新华通讯社": 2235,
- "气象": 2236,
- "科学院": 2237,
- "工程院": 2238,
- "粮食和物资储备": 2239,
- "数据": 2240,
- "烟草专卖": 2241,
- "林业和草原": 2242,
- "民用航空": 2243,
- "文物": 2244,
- "疾病预防控制": 2245,
- "消防救援": 2246,
- "药品监督": 2247,
- "能源": 2248,
- "移民": 2249,
- "铁路": 2250,
- "邮政": 2251,
- "中医药": 2252,
- "外汇": 2253,
- "供销合作社": 2254,
- "公共资源交易中心": 2255,
- "监狱": 2256,
- "城乡建设": 2257,
- "文旅": 2258,
- "人民防空": 2259,
- "园林": 2260,
- "物流口岸": 2261,
- "大数据": 2262,
- "政务服务": 2263,
- "地方史志": 2264,
- "住房公积金管理中心": 2265,
- "仲裁": 2266,
- "招商": 2267,
- "社保中心": 2268,
- "管委会": 2269,
- "人民政府": 2270,
- "工商联": 2271,
- "残联": 2272,
- "妇联": 2273,
- "艺术联": 2274,
- "侨联": 2275,
- "台联": 2276,
- "城管": 2277,
- "编办": 2278,
- "政协": 2279,
- "民主党派": 2280,
- "党委": 2281,
- "团委": 2282,
- "人大": 2283,
- "档案局": 2284,
- "武装": 2285,
- "医院": 2286,
- "渔业": 2287,
- "学校": 2288,
- }
- orgTags := strings.TrimSpace(org_tags)
- if strings.Contains(orgTags, "-") {
- classifications := readFile() //读取国标行业分类
- if len(classifications) > 0 {
- ss := findEntCodesByLabel(orgTags, classifications)
- newTags = append(newTags, ss...)
- }
- } else {
- //其它单独标签
- for prefix, tag := range categoryMap {
- if orgTags == prefix {
- newTags = append(newTags, tag)
- }
- }
- }
- return
- }
- // 根据label返回对应的EntCode数组
- func findEntCodesByLabel(label string, classifications []IndustryClassification) []uint64 {
- // 将label分割成多个层级
- labels := strings.Split(label, "-")
- var result []uint64
- if len(labels) > 0 {
- for k, v := range labels {
- rs := findIndustryClassification(k+1, v, classifications)
- if rs != nil {
- result = append(result, rs.EntCode)
- }
- }
- }
- return result
- }
- // 根据level和name查找对应的IndustryClassification
- func findIndustryClassification(level int, name string, classifications []IndustryClassification) *IndustryClassification {
- // 遍历每个分类
- for _, classification := range classifications {
- // 如果当前分类的level和name匹配,返回当前分类
- if classification.Level == level && classification.Name == name {
- return &classification
- }
- // 如果当前分类有子分类,则递归查找子分类
- if len(classification.Children) > 0 {
- if result := findIndustryClassification(level, name, classification.Children); result != nil {
- return result
- }
- }
- }
- // 如果没有找到匹配的分类,返回nil
- return nil
- }
- // updateEsMethod 更新es
- func updateEsMethod() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updateEsPool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Esa.UpdateBulk(GF.Env.Esindex, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Esa.UpdateBulk(GF.Env.Esindex, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
- // SaveEsMethod 保存es
- func SaveEsMethod() {
- arru := make([]map[string]interface{}, EsBulkSize)
- indexu := 0
- for {
- select {
- case v := <-saveEsPool:
- arru[indexu] = v
- indexu++
- if indexu == EsBulkSize {
- saveEsSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveEsSp
- }()
- Esa.BulkSave(GF.Env.Esindex, arru)
- }(arru)
- arru = make([]map[string]interface{}, EsBulkSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveEsSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveEsSp
- }()
- Esa.BulkSave(GF.Env.Esindex, arru)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, EsBulkSize)
- indexu = 0
- }
- }
- }
- }
|