123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- package main
- import (
- "context"
- "fmt"
- "github.com/RoaringBitmap/roaring"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/mongo"
- "go.mongodb.org/mongo-driver/mongo/options"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "os"
- "strings"
- "time"
- )
- type BitmapStore struct {
- bitmap *roaring.Bitmap
- path string
- }
- func NewBitmapStore(path string) (*BitmapStore, error) {
- b := roaring.New()
- if data, err := os.ReadFile(path); err == nil {
- if err := b.UnmarshalBinary(data); err != nil {
- return nil, err
- }
- }
- return &BitmapStore{bitmap: b, path: path}, nil
- }
- func (bs *BitmapStore) Contains(hash uint32) bool {
- return bs.bitmap.Contains(hash)
- }
- func (bs *BitmapStore) Add(hash uint32) {
- bs.bitmap.Add(hash)
- }
- func (bs *BitmapStore) Save() error {
- data, err := bs.bitmap.ToBytes()
- if err != nil {
- return err
- }
- return os.WriteFile(bs.path, data, 0644)
- }
- func mapMongoToCompanyBase(m map[string]interface{}) (*CompanyBase, string) {
- companyID, ok := m["company_id"].(string)
- if !ok || companyID == "" {
- return nil, ""
- }
- parseDate := func(v interface{}) *time.Time {
- if str, ok := v.(string); ok && str != "" {
- t, err := time.Parse("2006-01-02", str)
- if err == nil {
- return &t
- }
- }
- return nil
- }
- parseDateTime := func(v interface{}) *time.Time {
- if str, ok := v.(string); ok && str != "" {
- t, err := time.Parse("2006-01-02 15:04:05", str)
- if err == nil {
- return &t
- }
- }
- return nil
- }
- getInt8Ptr := func(v interface{}) *int8 {
- switch t := v.(type) {
- case int32:
- v := int8(t)
- return &v
- case float64:
- v := int8(t)
- return &v
- }
- return nil
- }
- return &CompanyBase{
- CompanyID: companyID,
- ProvinceShort: fmt.Sprint(m["province_short"]),
- CompanyName: fmt.Sprint(m["company_name"]),
- CompanyCode: fmt.Sprint(m["company_code"]),
- CreditNo: fmt.Sprint(m["credit_no"]),
- OrgCode: fmt.Sprint(m["org_code"]),
- TaxCode: fmt.Sprint(m["tax_code"]),
- EstablishDate: parseDate(m["establish_date"]),
- LegalPerson: fmt.Sprint(m["legal_person"]),
- LegalPersonCaption: fmt.Sprint(m["legal_person_caption"]),
- CompanyStatus: fmt.Sprint(m["company_status"]),
- CompanyType: fmt.Sprint(m["company_type"]),
- Authority: fmt.Sprint(m["authority"]),
- IssueDate: parseDate(m["issue_date"]),
- OperationStartDate: fmt.Sprint(m["operation_startdate"]),
- OperationEndDate: fmt.Sprint(m["operation_enddate"]),
- Capital: fmt.Sprint(m["capital"]),
- CompanyAddress: fmt.Sprint(m["company_address"]),
- BusinessScope: fmt.Sprint(m["business_scope"]),
- CancelDate: parseDate(m["cancel_date"]),
- CancelReason: fmt.Sprint(m["cancel_reason"]),
- RevokeDate: parseDate(m["revoke_date"]),
- RevokeReason: fmt.Sprint(m["revoke_reason"]),
- UseFlag: getInt8Ptr(m["use_flag"]),
- CreateTime: parseDateTime(m["create_time"]),
- UpdateTime: parseDateTime(m["update_time"]),
- LegalPersonType: getInt8Ptr(m["legal_person_type"]),
- RealCapital: fmt.Sprint(m["real_capital"]),
- EnName: fmt.Sprint(m["en_name"]),
- ListCode: fmt.Sprint(m["list_code"]),
- LegalPersonID: fmt.Sprint(m["legal_person_id"]),
- }, companyID
- }
- // SyncCompanyBaseToMySQL 同步 基本信息
- func SyncCompanyBaseToMySQL() error {
- ctx := context.Background()
- // Bitmap 路径默认当前目录
- bitmapFile := "./company_sync_bitmap.bin"
- // MongoDB
- //mongoClient, _ := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://172.17.4.181:27001"))
- //clientOptions := options.Client().ApplyURI("mongodb://172.17.4.181:27001")
- host := GF.Mongoqy.Host
- username := GF.Mongoqy.Username
- password := GF.Mongoqy.Password
- authSource := "admin" // 通常是 "admin",也可以是你的数据库名
- // 构造 MongoDB URI
- var mongoURI string
- if username != "" && password != "" {
- mongoURI = fmt.Sprintf("mongodb://%s:%s@%s/?authSource=%s", username, password, host, authSource)
- } else {
- mongoURI = fmt.Sprintf("mongodb://%s", host)
- }
- clientOptions := options.Client().ApplyURI(mongoURI)
- //clientOptions.SetReadPreference(readpref.Primary())
- //clientOptions.SetDirect(true)
- // 连接MongoDB
- client, err := mongo.Connect(context.Background(), clientOptions)
- if err != nil {
- log.Info("SyncCompanyBaseToMySQL", zap.Error(err))
- }
- // 检查连接
- err = client.Ping(context.Background(), nil)
- if err != nil {
- log.Info("SyncCompanyBaseToMySQL", zap.Error(err))
- }
- //mongoClient, _ := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://127.0.0.1:27001"))
- mongoColl := client.Database("mixdata").Collection("company_base")
- bitmapStore, err := NewBitmapStore(bitmapFile)
- if err != nil {
- return err
- }
- defer bitmapStore.Save()
- // 不是 个体工商户的数据
- filter := map[string]interface{}{
- "company_type": map[string]interface{}{
- "$nin": []string{"个体工商户", "个人独资企业"},
- },
- }
- cur, err := mongoColl.Find(ctx, filter)
- if err != nil {
- return err
- }
- defer cur.Close(ctx)
- var batch []*CompanyBase
- batchSize := 200
- num := 0
- for cur.Next(ctx) {
- num++
- var raw bson.M
- if err := cur.Decode(&raw); err != nil {
- continue
- }
- if num%1000 == 0 {
- log.Info("main", zap.Any("current", num), zap.Any("_id", raw["_id"]), zap.Any("company_name", raw["company_name"]))
- }
- company_status := util.ObjToString(raw["company_status"])
- if strings.Contains(company_status, "注销") || strings.Contains(company_status, "吊销") {
- continue
- }
- entity, companyID := mapMongoToCompanyBase(raw)
- if companyID == "" {
- continue
- }
- hash := hashCompanyID(companyID)
- if bitmapStore.Contains(hash) {
- continue
- }
- bitmapStore.Add(hash)
- batch = append(batch, entity)
- if len(batch) >= batchSize {
- if err := MysqlDB.Create(&batch).Error; err != nil {
- return err
- }
- batch = batch[:0]
- }
- }
- if len(batch) > 0 {
- if err := MysqlDB.Create(&batch).Error; err != nil {
- return err
- }
- }
- return nil
- }
|