123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- package main
- import (
- "context"
- "fmt"
- "github.com/google/uuid"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "strings"
- "time"
- )
- // dealIncQyData 处理凭安增量数据
- func dealIncQyData() {
- log.Info("dealIncQyData", zap.String("开始处理", "-------凭安增量数据"))
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- //获取本周一的时间
- monday := getMondayOfCurrentWeek()
- where := map[string]interface{}{
- "updatetime": map[string]interface{}{
- "$gt": monday.Unix(),
- //"$gt": 1733068800,
- },
- }
- log.Info("dealIncQyData", zap.Any("where", where))
- //query := `INSERT INTO information.ent_info (id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email) VALUES(?, ?, ?, bitmapBuild(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
- //1、处理qyxy_std 凭安增量数据
- count := 0
- it := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(where).Limit(100).Select(nil).Iter()
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- log.Info("dealIncQyData", zap.Int("current:", count), zap.Any("company_name", tmp["company_name"]))
- }
- use_flag := util.IntAll(tmp["use_flag"])
- is_history := util.IntAll(tmp["is_history"])
- company_type := util.ObjToString(tmp["company_type"])
- company_name := util.ObjToString(tmp["company_name"])
- //
- if use_flag > 5 || is_history != 0 || company_name == "" || company_type == "个体工商户" {
- continue
- }
- ctx := context.Background()
- //1.判断是否存在
- query := `SELECT id FROM information.ent_info WHERE company_name = ?`
- rows, err := ClickHouseConn.Query(ctx, query, company_name)
- if err != nil {
- log.Info("dealIncQyData", zap.Error(err))
- }
- count2 := 0
- var id string
- for rows.Next() {
- if err = rows.Scan(
- &id,
- ); err != nil {
- log.Info("dealIncQyData", zap.Error(err))
- }
- count2++
- }
- //2.已经存在的法人库,直接诶跳过
- if count2 > 0 {
- continue
- }
- id = uuid.New().String()
- id = strings.ReplaceAll(id, "-", "")
- company_id := util.ObjToString(tmp["_id"])
- area := util.ObjToString(tmp["company_area"])
- city := util.ObjToString(tmp["company_city"])
- district := util.ObjToString(tmp["company_district"])
- area_code, city_code, district_code := CalculateRegionCode(area, city, district)
- company_address := util.ObjToString(tmp["company_address"])
- //获取Bitmap 和对应字符串
- company_label, company_label_str := getCompanyLabelBitmap(util.ObjToString(tmp["credit_no"]), company_name)
- company_code := util.ObjToString(tmp["company_code"])
- credit_no := util.ObjToString(tmp["credit_no"])
- org_code := util.ObjToString(tmp["org_code"])
- tax_code := util.ObjToString(tmp["tax_code"])
- establish_date := util.Int64All(tmp["establish_date"])
- legal_person := util.ObjToString(tmp["legal_person"])
- legal_person_caption := util.ObjToString(tmp["legal_person_caption"])
- company_status := util.ObjToString(tmp["company_status"])
- authority := util.ObjToString(tmp["authority"])
- issue_date := util.Int64All(tmp["issue_date"])
- operation_startdate := util.ObjToString(tmp["operation_startdate"])
- operation_enddate := util.ObjToString(tmp["operation_enddate"])
- capital := util.ObjToString(tmp["capital"])
- business_scope := util.ObjToString(tmp["business_scope"])
- comeintime := util.Int64All(tmp["comeintime"])
- updatetime := util.Int64All(tmp["updatetime"])
- legal_person_type := util.IntAll(tmp["legal_person_type"])
- real_capital := util.ObjToString(tmp["real_capital"])
- en_name := util.ObjToString(tmp["en_name"])
- list_code := util.ObjToString(tmp["list_code"])
- employee_no := util.IntAll(tmp["employee_no"])
- website := util.ObjToString(tmp["website"])
- company_phone := util.ObjToString(tmp["company_phone"])
- company_email := util.ObjToString(tmp["company_email"])
- //插入Clickhouse的数据
- query2 := `INSERT INTO information.ent_info (id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email) VALUES(?, ?, ?, bitmapBuild(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
- err = ClickHouseConn.Exec(context.Background(), query2, id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email)
- if err != nil {
- log.Info("dealIncQyData", zap.String("company_name", company_name), zap.Error(err))
- } else {
- //保存到es
- data := map[string]interface{}{
- "_id": id,
- "id": id,
- "company_name": company_name,
- "company_id": company_id,
- "company_address": company_address,
- "area_code": area_code,
- "city_code": city_code,
- "district_code": district_code,
- "company_label": company_label_str,
- "company_code": company_code,
- "credit_no": credit_no,
- "org_code": org_code,
- "tax_code": tax_code,
- "establish_date": establish_date,
- "legal_person": legal_person,
- "legal_person_caption": legal_person_caption,
- "company_status": company_status,
- "company_type": company_type,
- "authority": authority,
- "issue_date": issue_date,
- "operation_startdate": operation_startdate,
- "operation_enddate": operation_enddate,
- "capital": capital,
- "business_scope": business_scope,
- "comeintime": comeintime,
- "updatetime": updatetime,
- "legal_person_type": legal_person_type,
- "real_capital": real_capital,
- "en_name": en_name,
- "list_code": list_code,
- "employee_no": employee_no,
- "website": website,
- "company_phone": company_phone,
- "company_email": company_email,
- }
- saveEsPool <- data
- log.Info("dealIncQyData", zap.String("name", company_name), zap.String("company_id", company_id))
- }
- }
- log.Info("dealIncQyData", zap.String("凭安增量数据", "-------处理完毕"))
- }
- // dealIncEntInfo 处理法人库增量数据
- func dealIncEntInfo() {
- log.Info("dealIncEntInfo", zap.String("开始处理标签", "--------"))
- now := time.Now()
- yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
- ctx := context.Background()
- // 批量处理数据
- batchSize := 100 // 每批处理的数据量
- offset := 0
- count := 0
- for {
- query := fmt.Sprintf(`
- SELECT id,company_name,credit_no,org_tags,bitmapToArray(company_label) labels FROM ent_info where updatetime >= %v LIMIT %d OFFSET %d
- `, yesterday.Unix(), batchSize, offset)
- rows, err := ClickHouseConn.Query(ctx, query)
- if err != nil {
- log.Info("dealIncEntInfo", zap.Error(err))
- }
- if !rows.Next() {
- break
- }
- for rows.Next() {
- count++
- var id, companyName, creditNo, orgTags string
- var addLabels = make([]uint64, 0)
- var oldLabels = make([]uint64, 0)
- if err = rows.Scan(&id, &companyName, &creditNo, &orgTags, &oldLabels); err != nil {
- log.Info("dealIncEntInfo", zap.Error(err))
- }
- if count%10000 == 0 {
- log.Info("dealIncEntInfo:", zap.Int("current", offset), zap.String("id", id), zap.String("companyName", companyName))
- }
- //1 dealCompanyNo
- tags1 := dealCompanyNo(creditNo)
- if len(tags1) > 0 {
- addLabels = append(addLabels, tags1...)
- }
- //2.处理 org_tags
- tags2 := dealOrgTags(orgTags)
- if len(tags2) > 0 {
- addLabels = append(addLabels, tags2...)
- }
- if len(addLabels) > 0 {
- // 构建 toUInt64 数组字符串
- toUInt64Array := buildToUInt64Array(addLabels)
- // SQL 动态生成
- sql := fmt.Sprintf(`
- ALTER TABLE information.ent_info
- UPDATE company_label = bitmapOr(company_label, bitmapBuild(%s))
- WHERE company_name = ?
- `, toUInt64Array)
- err = ClickHouseConn.Exec(context.Background(), sql, companyName)
- if err != nil {
- log.Info("dealIncEntInfo", zap.Error(err))
- }
- }
- //2.更新es
- var labelNames = make([]string, 0)
- oldLabels = append(oldLabels, addLabels...)
- for _, v := range oldLabels {
- if name, ok := entLabelMap[v]; ok {
- labelNames = append(labelNames, name)
- }
- }
- if len(labelNames) > 0 {
- labelNames = removeDuplicates(labelNames) //去重
- esUpdate := map[string]interface{}{
- "company_label": strings.Join(labelNames, ","),
- }
- updateEsPool <- []map[string]interface{}{
- {"_id": id},
- esUpdate,
- }
- }
- }
- offset += batchSize
- }
- log.Info("dealIncEntInfo", zap.Int("数据迭代完毕", offset))
- }
|