package main

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/google/uuid"
	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
)

// dealIncQyData processes the incremental enterprise data ("凭安增量数据").
//
// It iterates over the documents of the configured MongoDB collection whose
// updatetime is greater than the value returned by getMondayOfCurrentWeek,
// filters out records that should not be imported, and for every company
// name that is not yet present in information.ent_info it inserts a new row
// into ClickHouse and then queues the same record on saveEsPool.
//
// Failures on a single record are logged and the record is skipped; they do
// not stop the run. Panics are handled by the deferred util.Catch.
func dealIncQyData() {
	log.Info("dealIncQyData", zap.String("开始处理", "-------凭安增量数据"))
	defer util.Catch()

	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	// Lower bound for updatetime: the timestamp returned by getMondayOfCurrentWeek.
	monday := getMondayOfCurrentWeek()
	where := map[string]interface{}{
		"updatetime": map[string]interface{}{
			"$gt": monday.Unix(),
		},
	}
	log.Info("dealIncQyData", zap.Any("where", where))

	const (
		existsQuery = `SELECT id FROM information.ent_info WHERE company_name = ?`
		insertQuery = `INSERT INTO information.ent_info (id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email) VALUES(?, ?, ?, bitmapBuild(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
	)
	ctx := context.Background()

	// NOTE(review): the Mongo query is capped with Limit(100); confirm this
	// is intended and not a leftover from testing.
	count := 0
	it := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(where).Limit(100).Select(nil).Iter()
	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
		if count%1000 == 0 {
			log.Info("dealIncQyData", zap.Int("current:", count), zap.Any("company_name", tmp["company_name"]))
		}

		useFlag := util.IntAll(tmp["use_flag"])
		isHistory := util.IntAll(tmp["is_history"])
		companyType := util.ObjToString(tmp["company_type"])
		companyName := util.ObjToString(tmp["company_name"])
		// Skip records with use_flag > 5, history records, records without
		// a name and records whose company_type is "个体工商户".
		if useFlag > 5 || isHistory != 0 || companyName == "" || companyType == "个体工商户" {
			continue
		}

		// 1. Check whether the company already exists in ent_info.
		rows, err := ClickHouseConn.Query(ctx, existsQuery, companyName)
		if err != nil {
			// rows is not usable after a failed query; skip this record
			// instead of dereferencing it.
			log.Info("dealIncQyData", zap.String("company_name", companyName), zap.Error(err))
			continue
		}
		exists := false
		var existingID string
		for rows.Next() {
			if err = rows.Scan(&existingID); err != nil {
				log.Info("dealIncQyData", zap.Error(err))
			}
			exists = true
		}
		iterErr := rows.Err()
		// Close explicitly (not with defer): we are inside a loop and the
		// result set must be released before the next query.
		if err = rows.Close(); err != nil {
			log.Info("dealIncQyData", zap.Error(err))
		}
		if iterErr != nil {
			// Existence is unknown; skip rather than risk a duplicate insert.
			log.Info("dealIncQyData", zap.String("company_name", companyName), zap.Error(iterErr))
			continue
		}

		// 2. Already present in ent_info: nothing to do.
		if exists {
			continue
		}

		// 3. Build the new record. The id is a UUID with the dashes removed.
		id := strings.ReplaceAll(uuid.New().String(), "-", "")
		companyID := util.ObjToString(tmp["_id"])
		area := util.ObjToString(tmp["company_area"])
		city := util.ObjToString(tmp["company_city"])
		district := util.ObjToString(tmp["company_district"])
		areaCode, cityCode, districtCode := CalculateRegionCode(area, city, district)
		companyAddress := util.ObjToString(tmp["company_address"])
		creditNo := util.ObjToString(tmp["credit_no"])
		// companyLabel is bound to bitmapBuild(?) in ClickHouse,
		// companyLabelStr is the value stored in ES.
		companyLabel, companyLabelStr := getCompanyLabelBitmap(creditNo, companyName)
		companyCode := util.ObjToString(tmp["company_code"])
		orgCode := util.ObjToString(tmp["org_code"])
		taxCode := util.ObjToString(tmp["tax_code"])
		establishDate := util.Int64All(tmp["establish_date"])
		legalPerson := util.ObjToString(tmp["legal_person"])
		legalPersonCaption := util.ObjToString(tmp["legal_person_caption"])
		companyStatus := util.ObjToString(tmp["company_status"])
		authority := util.ObjToString(tmp["authority"])
		issueDate := util.Int64All(tmp["issue_date"])
		operationStartdate := util.ObjToString(tmp["operation_startdate"])
		operationEnddate := util.ObjToString(tmp["operation_enddate"])
		capital := util.ObjToString(tmp["capital"])
		businessScope := util.ObjToString(tmp["business_scope"])
		comeintime := util.Int64All(tmp["comeintime"])
		updatetime := util.Int64All(tmp["updatetime"])
		legalPersonType := util.IntAll(tmp["legal_person_type"])
		realCapital := util.ObjToString(tmp["real_capital"])
		enName := util.ObjToString(tmp["en_name"])
		listCode := util.ObjToString(tmp["list_code"])
		employeeNo := util.IntAll(tmp["employee_no"])
		website := util.ObjToString(tmp["website"])
		companyPhone := util.ObjToString(tmp["company_phone"])
		companyEmail := util.ObjToString(tmp["company_email"])

		// 4. Insert into ClickHouse; argument order must match insertQuery.
		err = ClickHouseConn.Exec(ctx, insertQuery,
			id, companyID, companyName, companyLabel, companyCode, creditNo, orgCode, taxCode,
			establishDate, legalPerson, legalPersonCaption, companyStatus, companyType, authority,
			issueDate, operationStartdate, operationEnddate, capital, companyAddress, businessScope,
			comeintime, updatetime, legalPersonType, realCapital, enName,
			areaCode, cityCode, districtCode, listCode, employeeNo, website, companyPhone, companyEmail)
		if err != nil {
			log.Info("dealIncQyData", zap.String("company_name", companyName), zap.Error(err))
			continue
		}

		// 5. Only records stored in ClickHouse are forwarded to ES.
		data := map[string]interface{}{
			"_id":                  id,
			"id":                   id,
			"company_name":         companyName,
			"company_id":           companyID,
			"company_address":      companyAddress,
			"area_code":            areaCode,
			"city_code":            cityCode,
			"district_code":        districtCode,
			"company_label":        companyLabelStr,
			"company_code":         companyCode,
			"credit_no":            creditNo,
			"org_code":             orgCode,
			"tax_code":             taxCode,
			"establish_date":       establishDate,
			"legal_person":         legalPerson,
			"legal_person_caption": legalPersonCaption,
			"company_status":       companyStatus,
			"company_type":         companyType,
			"authority":            authority,
			"issue_date":           issueDate,
			"operation_startdate":  operationStartdate,
			"operation_enddate":    operationEnddate,
			"capital":              capital,
			"business_scope":       businessScope,
			"comeintime":           comeintime,
			"updatetime":           updatetime,
			"legal_person_type":    legalPersonType,
			"real_capital":         realCapital,
			"en_name":              enName,
			"list_code":            listCode,
			"employee_no":          employeeNo,
			"website":              website,
			"company_phone":        companyPhone,
			"company_email":        companyEmail,
		}
		saveEsPool <- data
		log.Info("dealIncQyData", zap.String("name", companyName), zap.String("company_id", companyID))
	}
	// Surface any error that ended the Mongo iteration early.
	if err := it.Close(); err != nil {
		log.Info("dealIncQyData", zap.Error(err))
	}

	log.Info("dealIncQyData", zap.String("凭安增量数据", "-------处理完毕"))
}

// dealIncEntInfo processes the incremental ent_info data.
//
// It pages through the ent_info rows whose updatetime is at or after
// yesterday 00:00 (local time), batchSize rows at a time. For every row it
// derives additional labels from credit_no (dealCompanyNo) and org_tags
// (dealOrgTags), merges them into company_label in ClickHouse, and queues an
// ES update carrying the names of all labels known to entLabelMap.
//
// Every fetched row is processed; paging stops when a page comes back empty
// or when the page query itself fails.
func dealIncEntInfo() {
	log.Info("dealIncEntInfo", zap.String("开始处理标签", "--------"))

	now := time.Now()
	yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
	ctx := context.Background()

	const batchSize = 100 // rows fetched per page
	offset := 0
	count := 0
	for {
		// NOTE(review): LIMIT/OFFSET paging without ORDER BY does not
		// guarantee a stable row order between pages, and this SELECT reads
		// "ent_info" while the UPDATE below targets "information.ent_info";
		// confirm both are intended.
		query := fmt.Sprintf(`
			SELECT id,company_name,credit_no,org_tags,bitmapToArray(company_label) labels
			FROM ent_info
			where updatetime >= %v
			LIMIT %d OFFSET %d
		`, yesterday.Unix(), batchSize, offset)

		rows, err := ClickHouseConn.Query(ctx, query)
		if err != nil {
			// rows is not usable after a failed query; stop paging instead
			// of dereferencing it.
			log.Info("dealIncEntInfo", zap.Error(err))
			break
		}

		fetched := 0
		for rows.Next() {
			fetched++
			count++

			var id, companyName, creditNo, orgTags string
			var addLabels []uint64
			oldLabels := make([]uint64, 0)
			if err = rows.Scan(&id, &companyName, &creditNo, &orgTags, &oldLabels); err != nil {
				// Without a valid row there is nothing meaningful to update.
				log.Info("dealIncEntInfo", zap.Error(err))
				continue
			}
			if count%10000 == 0 {
				log.Info("dealIncEntInfo:", zap.Int("current", offset), zap.String("id", id), zap.String("companyName", companyName))
			}

			// 1. Labels derived from the credit number.
			addLabels = append(addLabels, dealCompanyNo(creditNo)...)
			// 2. Labels derived from org_tags.
			addLabels = append(addLabels, dealOrgTags(orgTags)...)

			// 3. Merge the new labels into the ClickHouse bitmap.
			if len(addLabels) > 0 {
				// The label list is spliced into the SQL text by
				// buildToUInt64Array; only the company name is bound.
				toUInt64Array := buildToUInt64Array(addLabels)
				updateSQL := fmt.Sprintf(`
					ALTER TABLE information.ent_info
					UPDATE company_label = bitmapOr(company_label, bitmapBuild(%s))
					WHERE company_name = ?
				`, toUInt64Array)
				if err = ClickHouseConn.Exec(ctx, updateSQL, companyName); err != nil {
					log.Info("dealIncEntInfo", zap.Error(err))
				}
			}

			// 4. Update ES with the names of the old and new labels.
			oldLabels = append(oldLabels, addLabels...)
			labelNames := make([]string, 0, len(oldLabels))
			for _, v := range oldLabels {
				if name, ok := entLabelMap[v]; ok {
					labelNames = append(labelNames, name)
				}
			}
			if len(labelNames) > 0 {
				labelNames = removeDuplicates(labelNames) // de-duplicate
				esUpdate := map[string]interface{}{
					"company_label": strings.Join(labelNames, ","),
				}
				updateEsPool <- []map[string]interface{}{
					{"_id": id},
					esUpdate,
				}
			}
		}
		if err = rows.Err(); err != nil {
			log.Info("dealIncEntInfo", zap.Error(err))
		}
		// Close explicitly (not with defer): we are inside a loop and the
		// result set must be released before the next page is queried.
		if err = rows.Close(); err != nil {
			log.Info("dealIncEntInfo", zap.Error(err))
		}

		// An empty page means everything has been read.
		if fetched == 0 {
			break
		}
		offset += batchSize
	}

	log.Info("dealIncEntInfo", zap.Int("数据迭代完毕", offset))
}