package main

import (
	"context"
	"errors"
	"fmt"
	"math"
	"net/url"
	"os"
	"strings"
	"time"

	"github.com/RoaringBitmap/roaring"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
)

// BitmapStore is a roaring bitmap of 32-bit hashes that can be loaded from
// and persisted to a file on disk.
type BitmapStore struct {
	bitmap *roaring.Bitmap
	path   string
}

// NewBitmapStore returns a store backed by the file at path.
//
// A missing file is not an error: the store simply starts empty. Any other
// read failure, or a file that cannot be decoded, is returned to the caller
// so that an unreadable state file is never mistaken for "nothing recorded
// yet".
func NewBitmapStore(path string) (*BitmapStore, error) {
	b := roaring.New()

	data, err := os.ReadFile(path)
	switch {
	case err == nil:
		if err := b.UnmarshalBinary(data); err != nil {
			return nil, fmt.Errorf("decode bitmap %q: %w", path, err)
		}
	case errors.Is(err, os.ErrNotExist):
		// No state file yet; start with an empty bitmap.
	default:
		return nil, fmt.Errorf("read bitmap %q: %w", path, err)
	}

	return &BitmapStore{bitmap: b, path: path}, nil
}

// Contains reports whether hash is present in the bitmap.
func (bs *BitmapStore) Contains(hash uint32) bool {
	return bs.bitmap.Contains(hash)
}

// Add inserts hash into the in-memory bitmap. Nothing is written to disk
// until Save is called.
func (bs *BitmapStore) Add(hash uint32) {
	bs.bitmap.Add(hash)
}

// Save serializes the bitmap and writes it to the store's path.
//
// The data is written to a sibling temporary file first and then renamed
// over the target, so an interrupted write does not leave a truncated
// bitmap at the real path.
func (bs *BitmapStore) Save() error {
	data, err := bs.bitmap.ToBytes()
	if err != nil {
		return err
	}

	tmp := bs.path + ".tmp"
	if err := os.WriteFile(tmp, data, 0644); err != nil {
		return err
	}
	if err := os.Rename(tmp, bs.path); err != nil {
		// Best-effort cleanup; the rename error is the one worth reporting.
		_ = os.Remove(tmp)
		return err
	}
	return nil
}

// mapMongoToCompanyBase converts a decoded company_base document into a
// CompanyBase entity.
//
// It returns the entity together with its company ID. When "company_id" is
// absent, not a string, or empty, it returns (nil, "").
//
// Text fields that are absent or nil map to the empty string (rather than
// the literal "<nil>" that fmt.Sprint would produce). Date fields are only
// parsed from string values; any other type, or an unparsable string,
// yields nil.
func mapMongoToCompanyBase(m map[string]interface{}) (*CompanyBase, string) {
	companyID, ok := m["company_id"].(string)
	if !ok || companyID == "" {
		return nil, ""
	}

	// text renders the field as a string, mapping absent/nil to "".
	text := func(key string) string {
		v := m[key]
		if v == nil {
			return ""
		}
		if s, isStr := v.(string); isStr {
			return s
		}
		return fmt.Sprint(v)
	}

	// parseTime parses a non-empty string field with the given layout.
	parseTime := func(key, layout string) *time.Time {
		s, isStr := m[key].(string)
		if !isStr || s == "" {
			return nil
		}
		t, err := time.Parse(layout, s)
		if err != nil {
			return nil
		}
		return &t
	}
	date := func(key string) *time.Time { return parseTime(key, "2006-01-02") }
	dateTime := func(key string) *time.Time { return parseTime(key, "2006-01-02 15:04:05") }

	// int8Ptr converts a numeric field to *int8. Values that do not fit in
	// an int8 (or are NaN) yield nil instead of silently wrapping around.
	int8Ptr := func(key string) *int8 {
		var n int64
		switch t := m[key].(type) {
		case int32:
			n = int64(t)
		case int64:
			n = t
		case int:
			n = int64(t)
		case float64:
			// The negated form also rejects NaN.
			if !(t >= math.MinInt8 && t <= math.MaxInt8) {
				return nil
			}
			n = int64(t)
		default:
			return nil
		}
		if n < math.MinInt8 || n > math.MaxInt8 {
			return nil
		}
		v := int8(n)
		return &v
	}

	return &CompanyBase{
		CompanyID:          companyID,
		ProvinceShort:      text("province_short"),
		CompanyName:        text("company_name"),
		CompanyCode:        text("company_code"),
		CreditNo:           text("credit_no"),
		OrgCode:            text("org_code"),
		TaxCode:            text("tax_code"),
		EstablishDate:      date("establish_date"),
		LegalPerson:        text("legal_person"),
		LegalPersonCaption: text("legal_person_caption"),
		CompanyStatus:      text("company_status"),
		CompanyType:        text("company_type"),
		Authority:          text("authority"),
		IssueDate:          date("issue_date"),
		OperationStartDate: text("operation_startdate"),
		OperationEndDate:   text("operation_enddate"),
		Capital:            text("capital"),
		CompanyAddress:     text("company_address"),
		BusinessScope:      text("business_scope"),
		CancelDate:         date("cancel_date"),
		CancelReason:       text("cancel_reason"),
		RevokeDate:         date("revoke_date"),
		RevokeReason:       text("revoke_reason"),
		UseFlag:            int8Ptr("use_flag"),
		CreateTime:         dateTime("create_time"),
		UpdateTime:         dateTime("update_time"),
		LegalPersonType:    int8Ptr("legal_person_type"),
		RealCapital:        text("real_capital"),
		EnName:             text("en_name"),
		ListCode:           text("list_code"),
		LegalPersonID:      text("legal_person_id"),
	}, companyID
}

// SyncCompanyBaseToMySQL copies company base records from the MongoDB
// collection mixdata.company_base into MySQL.
//
// Documents whose company_type is "个体工商户" or "个人独资企业" are excluded
// by the query, and documents whose company_status contains "注销" or "吊销"
// are skipped. A bitmap of company-ID hashes, stored in
// ./company_sync_bitmap.bin, records what has already been inserted so that
// repeated runs do not insert the same company again.
//
// A hash is committed to the bitmap only after the batch containing it has
// been inserted successfully, so a failed insert is retried on the next run
// instead of being recorded as done. The bitmap is saved when the function
// returns, on both success and failure.
func SyncCompanyBaseToMySQL() (err error) {
	ctx := context.Background()

	// The bitmap file lives in the current working directory.
	bitmapFile := "./company_sync_bitmap.bin"

	host := GF.Mongoqy.Host
	username := GF.Mongoqy.Username
	password := GF.Mongoqy.Password
	authSource := "admin" // usually "admin"; may also be the database name

	// Build the MongoDB URI. Credentials are percent-encoded so that
	// characters such as '@', ':' or '/' in them cannot break the URI.
	mongoURI := fmt.Sprintf("mongodb://%s", host)
	if username != "" && password != "" {
		credentials := url.UserPassword(username, password).String()
		mongoURI = fmt.Sprintf("mongodb://%s@%s/?authSource=%s",
			credentials, host, url.QueryEscape(authSource))
	}

	client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoURI))
	if err != nil {
		return fmt.Errorf("connect mongodb: %w", err)
	}
	defer func() {
		if dErr := client.Disconnect(ctx); dErr != nil {
			log.Info("SyncCompanyBaseToMySQL", zap.Error(dErr))
		}
	}()

	// Verify the connection before starting the scan.
	if err := client.Ping(ctx, nil); err != nil {
		return fmt.Errorf("ping mongodb: %w", err)
	}

	mongoColl := client.Database("mixdata").Collection("company_base")

	bitmapStore, err := NewBitmapStore(bitmapFile)
	if err != nil {
		return err
	}
	defer func() {
		saveErr := bitmapStore.Save()
		if saveErr == nil {
			return
		}
		if err == nil {
			err = fmt.Errorf("save bitmap %q: %w", bitmapFile, saveErr)
			return
		}
		// An earlier error is already being returned; just record this one.
		log.Info("SyncCompanyBaseToMySQL", zap.Error(saveErr))
	}()

	// Exclude individual businesses and sole proprietorships.
	filter := map[string]interface{}{
		"company_type": map[string]interface{}{
			"$nin": []string{"个体工商户", "个人独资企业"},
		},
	}

	cur, err := mongoColl.Find(ctx, filter)
	if err != nil {
		return fmt.Errorf("query company_base: %w", err)
	}
	defer cur.Close(ctx)

	const batchSize = 200
	batch := make([]*CompanyBase, 0, batchSize)
	// pending holds the hashes of the records in the current batch. They
	// are moved into the bitmap only once the batch has been inserted.
	pending := make(map[uint32]struct{}, batchSize)

	// flush inserts the current batch and, on success, commits its hashes.
	flush := func() error {
		if len(batch) == 0 {
			return nil
		}
		if cErr := MysqlDB.Create(&batch).Error; cErr != nil {
			return fmt.Errorf("insert company_base batch: %w", cErr)
		}
		for h := range pending {
			bitmapStore.Add(h)
			delete(pending, h)
		}
		batch = batch[:0]
		return nil
	}

	num := 0
	for cur.Next(ctx) {
		num++

		var raw bson.M
		if dErr := cur.Decode(&raw); dErr != nil {
			// Skip the undecodable document but leave a trace of it.
			log.Info("SyncCompanyBaseToMySQL", zap.Int("current", num), zap.Error(dErr))
			continue
		}

		if num%1000 == 0 {
			log.Info("main", zap.Any("current", num), zap.Any("_id", raw["_id"]), zap.Any("company_name", raw["company_name"]))
		}

		// Skip companies whose status marks them as cancelled or revoked.
		status := util.ObjToString(raw["company_status"])
		if strings.Contains(status, "注销") || strings.Contains(status, "吊销") {
			continue
		}

		entity, companyID := mapMongoToCompanyBase(raw)
		if companyID == "" {
			continue
		}

		// NOTE(review): the dedup key is a 32-bit hash from hashCompanyID
		// (defined elsewhere); two distinct company IDs that hash to the
		// same value would be treated as duplicates — confirm acceptable.
		hash := hashCompanyID(companyID)
		if bitmapStore.Contains(hash) {
			continue
		}
		if _, queued := pending[hash]; queued {
			continue
		}

		pending[hash] = struct{}{}
		batch = append(batch, entity)

		if len(batch) >= batchSize {
			if fErr := flush(); fErr != nil {
				return fErr
			}
		}
	}

	// Insert whatever is left before reporting a cursor failure, so that
	// already-read records are not lost.
	if fErr := flush(); fErr != nil {
		return fErr
	}
	if cErr := cur.Err(); cErr != nil {
		return fmt.Errorf("iterate company_base cursor: %w", cErr)
	}
	return nil
}