all.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/RoaringBitmap/roaring"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "go.mongodb.org/mongo-driver/mongo"
  8. "go.mongodb.org/mongo-driver/mongo/options"
  9. "go.uber.org/zap"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "os"
  13. "strings"
  14. "time"
  15. )
  16. type BitmapStore struct {
  17. bitmap *roaring.Bitmap
  18. path string
  19. }
  20. func NewBitmapStore(path string) (*BitmapStore, error) {
  21. b := roaring.New()
  22. if data, err := os.ReadFile(path); err == nil {
  23. if err := b.UnmarshalBinary(data); err != nil {
  24. return nil, err
  25. }
  26. }
  27. return &BitmapStore{bitmap: b, path: path}, nil
  28. }
  29. func (bs *BitmapStore) Contains(hash uint32) bool {
  30. return bs.bitmap.Contains(hash)
  31. }
  32. func (bs *BitmapStore) Add(hash uint32) {
  33. bs.bitmap.Add(hash)
  34. }
  35. func (bs *BitmapStore) Save() error {
  36. data, err := bs.bitmap.ToBytes()
  37. if err != nil {
  38. return err
  39. }
  40. return os.WriteFile(bs.path, data, 0644)
  41. }
  42. func mapMongoToCompanyBase(m map[string]interface{}) (*CompanyBase, string) {
  43. companyID, ok := m["company_id"].(string)
  44. if !ok || companyID == "" {
  45. return nil, ""
  46. }
  47. parseDate := func(v interface{}) *time.Time {
  48. if str, ok := v.(string); ok && str != "" {
  49. t, err := time.Parse("2006-01-02", str)
  50. if err == nil {
  51. return &t
  52. }
  53. }
  54. return nil
  55. }
  56. parseDateTime := func(v interface{}) *time.Time {
  57. if str, ok := v.(string); ok && str != "" {
  58. t, err := time.Parse("2006-01-02 15:04:05", str)
  59. if err == nil {
  60. return &t
  61. }
  62. }
  63. return nil
  64. }
  65. getInt8Ptr := func(v interface{}) *int8 {
  66. switch t := v.(type) {
  67. case int32:
  68. v := int8(t)
  69. return &v
  70. case float64:
  71. v := int8(t)
  72. return &v
  73. }
  74. return nil
  75. }
  76. return &CompanyBase{
  77. CompanyID: companyID,
  78. ProvinceShort: fmt.Sprint(m["province_short"]),
  79. CompanyName: fmt.Sprint(m["company_name"]),
  80. CompanyCode: fmt.Sprint(m["company_code"]),
  81. CreditNo: fmt.Sprint(m["credit_no"]),
  82. OrgCode: fmt.Sprint(m["org_code"]),
  83. TaxCode: fmt.Sprint(m["tax_code"]),
  84. EstablishDate: parseDate(m["establish_date"]),
  85. LegalPerson: fmt.Sprint(m["legal_person"]),
  86. LegalPersonCaption: fmt.Sprint(m["legal_person_caption"]),
  87. CompanyStatus: fmt.Sprint(m["company_status"]),
  88. CompanyType: fmt.Sprint(m["company_type"]),
  89. Authority: fmt.Sprint(m["authority"]),
  90. IssueDate: parseDate(m["issue_date"]),
  91. OperationStartDate: fmt.Sprint(m["operation_startdate"]),
  92. OperationEndDate: fmt.Sprint(m["operation_enddate"]),
  93. Capital: fmt.Sprint(m["capital"]),
  94. CompanyAddress: fmt.Sprint(m["company_address"]),
  95. BusinessScope: fmt.Sprint(m["business_scope"]),
  96. CancelDate: parseDate(m["cancel_date"]),
  97. CancelReason: fmt.Sprint(m["cancel_reason"]),
  98. RevokeDate: parseDate(m["revoke_date"]),
  99. RevokeReason: fmt.Sprint(m["revoke_reason"]),
  100. UseFlag: getInt8Ptr(m["use_flag"]),
  101. CreateTime: parseDateTime(m["create_time"]),
  102. UpdateTime: parseDateTime(m["update_time"]),
  103. LegalPersonType: getInt8Ptr(m["legal_person_type"]),
  104. RealCapital: fmt.Sprint(m["real_capital"]),
  105. EnName: fmt.Sprint(m["en_name"]),
  106. ListCode: fmt.Sprint(m["list_code"]),
  107. LegalPersonID: fmt.Sprint(m["legal_person_id"]),
  108. }, companyID
  109. }
  110. // SyncCompanyBaseToMySQL 同步 基本信息
  111. func SyncCompanyBaseToMySQL() error {
  112. ctx := context.Background()
  113. // Bitmap 路径默认当前目录
  114. bitmapFile := "./company_sync_bitmap.bin"
  115. // MongoDB
  116. //mongoClient, _ := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://172.17.4.181:27001"))
  117. //clientOptions := options.Client().ApplyURI("mongodb://172.17.4.181:27001")
  118. host := GF.Mongoqy.Host
  119. username := GF.Mongoqy.Username
  120. password := GF.Mongoqy.Password
  121. authSource := "admin" // 通常是 "admin",也可以是你的数据库名
  122. // 构造 MongoDB URI
  123. var mongoURI string
  124. if username != "" && password != "" {
  125. mongoURI = fmt.Sprintf("mongodb://%s:%s@%s/?authSource=%s", username, password, host, authSource)
  126. } else {
  127. mongoURI = fmt.Sprintf("mongodb://%s", host)
  128. }
  129. clientOptions := options.Client().ApplyURI(mongoURI)
  130. //clientOptions.SetReadPreference(readpref.Primary())
  131. //clientOptions.SetDirect(true)
  132. // 连接MongoDB
  133. client, err := mongo.Connect(context.Background(), clientOptions)
  134. if err != nil {
  135. log.Info("SyncCompanyBaseToMySQL", zap.Error(err))
  136. }
  137. // 检查连接
  138. err = client.Ping(context.Background(), nil)
  139. if err != nil {
  140. log.Info("SyncCompanyBaseToMySQL", zap.Error(err))
  141. }
  142. //mongoClient, _ := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://127.0.0.1:27001"))
  143. mongoColl := client.Database("mixdata").Collection("company_base")
  144. bitmapStore, err := NewBitmapStore(bitmapFile)
  145. if err != nil {
  146. return err
  147. }
  148. defer bitmapStore.Save()
  149. // 不是 个体工商户的数据
  150. filter := map[string]interface{}{
  151. "company_type": map[string]interface{}{
  152. "$nin": []string{"个体工商户", "个人独资企业"},
  153. },
  154. }
  155. cur, err := mongoColl.Find(ctx, filter)
  156. if err != nil {
  157. return err
  158. }
  159. defer cur.Close(ctx)
  160. var batch []*CompanyBase
  161. batchSize := 200
  162. num := 0
  163. for cur.Next(ctx) {
  164. num++
  165. var raw bson.M
  166. if err := cur.Decode(&raw); err != nil {
  167. continue
  168. }
  169. if num%1000 == 0 {
  170. log.Info("main", zap.Any("current", num), zap.Any("_id", raw["_id"]), zap.Any("company_name", raw["company_name"]))
  171. }
  172. company_status := util.ObjToString(raw["company_status"])
  173. if strings.Contains(company_status, "注销") || strings.Contains(company_status, "吊销") {
  174. continue
  175. }
  176. entity, companyID := mapMongoToCompanyBase(raw)
  177. if companyID == "" {
  178. continue
  179. }
  180. hash := hashCompanyID(companyID)
  181. if bitmapStore.Contains(hash) {
  182. continue
  183. }
  184. bitmapStore.Add(hash)
  185. batch = append(batch, entity)
  186. if len(batch) >= batchSize {
  187. if err := MysqlDB.Create(&batch).Error; err != nil {
  188. return err
  189. }
  190. batch = batch[:0]
  191. }
  192. }
  193. if len(batch) > 0 {
  194. if err := MysqlDB.Create(&batch).Error; err != nil {
  195. return err
  196. }
  197. }
  198. return nil
  199. }