wcc 1 月之前
父節點
當前提交
d995418879
共有 13 個文件被更改,包括 1542 次插入32 次删除
  1. 二進制
      getEs/getQyxyNationToFiles
  2. 6 3
      getEs/main.go
  3. 178 0
      getEs/qyxy.go
  4. 157 0
      getEs/qyxy_nation_type.go
  5. 9 0
      getEs/tools.go
  6. 127 2
      mongodb-test/main.go
  7. 13 6
      qyxy-mysql/config.toml
  8. 89 0
      qyxy-mysql/entity.go
  9. 4 0
      qyxy-mysql/main.go
  10. 707 0
      qyxy-mysql/test.go
  11. 197 1
      qyxy-mysql/tools.go
  12. 55 20
      updateBidding/main.go
  13. 二進制
      updateBidding/updateBidding

二進制
getEs/getQyxyNationToFiles


+ 6 - 3
getEs/main.go

@@ -34,12 +34,15 @@ func InitMgo() {
 }
 
 func main() {
+
+	getQyxyNationToFiles()
+	//exportQyxy() //导出企业数据
 	//dealXlsx()
-	//getQyxyNation()
+	//getQyxyNation() //导出 国标行业分类,注册资金靠前的企业
 	//getQyxyNation()
 
-	InitMgo()
-	getDataFromFile()
+	//InitMgo()
+	//getDataFromFile()
 	//updateXlsxDa()
 
 	return

+ 178 - 0
getEs/qyxy.go

@@ -1,17 +1,195 @@
 package main
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/elastic/go-elasticsearch/v7"
 	"github.com/olivere/elastic/v7"
+	"github.com/xuri/excelize/v2"
 	"io"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
 	"log"
 	"strings"
+	"time"
 )
 
+// exportQyxy 导出企业数据
+func exportQyxy() {
+
+	type Hit struct {
+		ID     string                 `json:"_id"`
+		Source map[string]interface{} `json:"_source"`
+	}
+	type ESResponse struct {
+		ScrollID string `json:"_scroll_id"`
+		Hits     struct {
+			Hits []Hit `json:"hits"`
+		} `json:"hits"`
+	}
+
+	// 1. 配置 ES 连接信息
+	url := "http://172.17.4.184:19908"
+	username := "jybid"
+	password := "Top2023_JEB01i@31"
+	cfg := elasticsearch.Config{
+		Addresses: []string{
+			url, // 替换为你的 ES 地址
+		},
+		Username: username, // 替换为你的用户名
+		Password: password, // 替换为你的密码
+	}
+	es, err := elasticsearch.NewClient(cfg)
+	if err != nil {
+		log.Fatalf("Error creating the client: %s", err)
+	}
+
+	scrollTime := 2 * time.Minute
+	indexName := "qyxy"
+	batchSize := 5000
+	maxRowsPerFile := 500000
+
+	headers := []string{
+		"_id", "cancel_date", "company_name", "history_name", "cancel_reason",
+		"company_type", "business_scope", "legal_person", "capital", "credit_no",
+		"tax_code", "company_code", "org_code", "establish_date", "authority",
+		"issue_date", "company_area", "company_city", "company_district",
+		"company_phone", "company_address", "company_email", "employee_num",
+	}
+
+	query := map[string]interface{}{
+		"size":             batchSize,
+		"track_total_hits": true,
+		"query": map[string]interface{}{
+			"bool": map[string]interface{}{
+				"must": []interface{}{
+					map[string]interface{}{
+						"range": map[string]interface{}{
+							"cancel_date_unix": map[string]interface{}{
+								"gte": 1704038400,
+							},
+						},
+					},
+					map[string]interface{}{
+						"terms": map[string]interface{}{
+							"company_status": []string{"注销", "吊销"},
+						},
+					},
+					map[string]interface{}{
+						"term": map[string]interface{}{
+							"company_area": "河南",
+						},
+					},
+				},
+			},
+		},
+		"_source": headers,
+	}
+
+	var buf bytes.Buffer
+	if err := json.NewEncoder(&buf).Encode(query); err != nil {
+		log.Fatalf("query encode failed: %v", err)
+	}
+
+	res, err := es.Search(
+		es.Search.WithContext(context.Background()),
+		es.Search.WithIndex(indexName),
+		es.Search.WithBody(&buf),
+		es.Search.WithScroll(scrollTime),
+	)
+	if err != nil {
+		log.Fatalf("initial scroll search error: %v", err)
+	}
+	defer res.Body.Close()
+
+	// 状态追踪变量
+	scrollID := ""
+	fileIndex := 1
+	rowNum := 2
+	totalCount := 0
+
+	f := excelize.NewFile()
+	sheet := "Sheet1"
+
+	writeHeader := func(f *excelize.File) {
+		for i, h := range headers {
+			cell, _ := excelize.CoordinatesToCellName(i+1, 1)
+			f.SetCellValue(sheet, cell, h)
+		}
+	}
+
+	writeHeader(f)
+
+	for {
+		var r ESResponse
+		if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
+			log.Fatalf("decode error: %v", err)
+		}
+		if len(r.Hits.Hits) == 0 {
+			break
+		}
+		scrollID = r.ScrollID
+
+		for _, hit := range r.Hits.Hits {
+			for col, h := range headers {
+				cell, _ := excelize.CoordinatesToCellName(col+1, rowNum)
+				val := hit.Source[h]
+				if h == "_id" {
+					val = hit.ID
+				}
+				f.SetCellValue(sheet, cell, val)
+			}
+			rowNum++
+			totalCount++
+
+			// 满 50 万行,保存并开启新文件
+			if rowNum > maxRowsPerFile {
+				filename := fmt.Sprintf("qyxy_export_%d.xlsx", fileIndex)
+				if err := f.SaveAs(filename); err != nil {
+					log.Fatalf("failed to save file %s: %v", filename, err)
+				}
+				fmt.Printf("✅ 导出文件 [%s] 完成,累计 %d 条\n", filename, totalCount)
+
+				fileIndex++
+				rowNum = 2
+				f = excelize.NewFile()
+				writeHeader(f)
+			}
+		}
+
+		fmt.Printf("当前已处理 %d 条...\n", totalCount)
+
+		// 拉下一页
+		res, err = es.Scroll(
+			es.Scroll.WithScrollID(scrollID),
+			es.Scroll.WithScroll(scrollTime),
+		)
+		if err != nil {
+			log.Fatalf("scroll next error: %v", err)
+		}
+		defer res.Body.Close()
+	}
+
+	// 保存最后一批文件
+	if rowNum > 2 {
+		filename := fmt.Sprintf("qyxy_export_%d.xlsx", fileIndex)
+		if err := f.SaveAs(filename); err != nil {
+			log.Fatalf("failed to save final file %s: %v", filename, err)
+		}
+		fmt.Printf("✅ 最后文件 [%s] 完成,累计 %d 条\n", filename, totalCount)
+	}
+
+	// 清理 scroll
+	if scrollID != "" {
+		_, _ = es.ClearScroll(es.ClearScroll.WithScrollID(scrollID))
+	}
+
+	fmt.Println("🎉 全部导出完成,总数:", totalCount)
+
+}
+
 // getGD 获取广东企业数据
 func getGD() {
 	url := "http://172.17.4.184:19908"

+ 157 - 0
getEs/qyxy_nation_type.go

@@ -10,6 +10,8 @@ import (
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"log"
 	"math"
+	"os"
+	"regexp"
 )
 
 type Company struct {
@@ -23,6 +25,39 @@ var (
 	indexName = "qyxy"
 )
 
+func getQyxyNationToFiles() {
+	url := "http://172.17.4.184:19908"
+	//url := "http://127.0.0.1:19908"
+	username := "jybid"
+	password := "Top2023_JEB01i@31"
+	// 创建 Elasticsearch 客户端
+	client, err := elastic.NewClient(
+		elastic.SetURL(url),
+		elastic.SetBasicAuth(username, password),
+		elastic.SetSniff(false),
+	)
+
+	if err != nil {
+		log.Fatalf("Elasticsearch 连接失败: %v", err)
+	}
+
+	//nationalSubs := []string{"石油和天然气开采业", "文教、工美、体育和娱乐用品制造业", "石油、煤炭及其他燃料加工业", "化学原料和化学制品制造业",
+	//	"医药制造业", "通用设备制造业", "专用设备制造业", "汽车制造业", "电信、广播电视和卫星传输服务", "互联网和相关服务", "软件和信息技术服务业", "货币金融服务",
+	//	"资本市场服务", "保险业", "其他金融业", "其他金融业", "教育", "新闻和出版业","生态保护和环境治理业"}
+
+	nationalSubs := []string{"生态保护和环境治理业"}
+	outputDir := "output"
+	// 确保输出目录存在
+	os.MkdirAll(outputDir, 0755)
+	err = exportSelectedGroupsToFiles(client, "national_sub", nationalSubs, outputDir)
+	if err != nil {
+		log.Fatalf("导出失败: %v", err)
+	}
+
+	log.Println("🎉 所有分组导出完成")
+}
+
+// getQyxyNation getQyxyNation
 func getQyxyNation() {
 	url := "http://172.17.4.184:19908"
 	//url := "http://127.0.0.1:19908"
@@ -550,6 +585,128 @@ func writeHeaders(f *excelize.File, sheetName string, headers []string) {
 	}
 }
 
+const maxRowsPerSheet = 1048576
+
+// exportSelectedGroupsToFiles 导出多文件
+func exportSelectedGroupsToFiles(client *elastic.Client, groupField string, groupValues []string, outputDir string) error {
+	ctx := context.Background()
+	headers := []string{groupField, "company_name", "capital", "company_type", "company_status"}
+
+	for _, groupVal := range groupValues {
+		// 构造子查询
+		subQuery := elastic.NewBoolQuery().
+			Must(
+				elastic.NewTermQuery("company_status", "存续"),
+				elastic.NewTermQuery(groupField, groupVal),
+			).
+			MustNot(elastic.NewTermQuery("company_type", "个体工商户")).
+			Must(elastic.NewExistsQuery("capital"))
+
+		// 获取该分组的总数
+		countRes, err := client.Count(indexName).Query(subQuery).Do(ctx)
+		if err != nil || countRes == 0 {
+			log.Printf("跳过分组 %s(无数据或查询失败)\n", groupVal)
+			continue
+		}
+		size := int(math.Max(1, float64(countRes)*0.2))
+
+		sourceCtx := elastic.NewFetchSourceContext(true).Include(
+			"company_name", "capital", "company_type", "company_status", groupField,
+		)
+
+		scroll := client.Scroll(indexName).
+			Query(subQuery).
+			SortWithInfo(elastic.SortInfo{
+				Field:     "capital",
+				Ascending: false,
+				Missing:   "_last",
+			}).
+			Size(500).
+			FetchSourceContext(sourceCtx)
+		defer scroll.Clear(ctx)
+
+		part := 1
+		row := 2
+		f := excelize.NewFile()
+		sheetName := "数据"
+		f.NewSheet(sheetName)
+		writeHeaders(f, sheetName, headers)
+
+		fetched := 0
+
+		for fetched < size {
+			res, err := scroll.Do(ctx)
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				log.Printf("scroll 查询失败 group=%v, err=%v\n", groupVal, err)
+				break
+			}
+
+			for _, hit := range res.Hits.Hits {
+				if fetched >= size {
+					break
+				}
+
+				if row > maxRowsPerSheet {
+					// 超出最大行数,保存当前文件,开启新文件
+					filename := fmt.Sprintf("%s/%s_top5%%_part%d.xlsx", outputDir, sanitizeFileName(groupVal), part)
+					f.DeleteSheet("Sheet1")
+					if err := f.SaveAs(filename); err != nil {
+						log.Printf("❌ 保存文件失败 [%s]: %v\n", filename, err)
+					} else {
+						log.Printf("✅ 已保存:%s(%d 条)", filename, row-2)
+					}
+
+					// 重置
+					part++
+					row = 2
+					f = excelize.NewFile()
+					f.NewSheet(sheetName)
+					writeHeaders(f, sheetName, headers)
+				}
+
+				var c map[string]interface{}
+				if err := json.Unmarshal(hit.Source, &c); err != nil {
+					continue
+				}
+				data := []interface{}{
+					groupVal,
+					c["company_name"],
+					c["capital"],
+					c["company_type"],
+					c["company_status"],
+				}
+				for col, val := range data {
+					cell, _ := excelize.CoordinatesToCellName(col+1, row)
+					f.SetCellValue(sheetName, cell, val)
+				}
+				row++
+				fetched++
+			}
+		}
+
+		// 保存最后一个文件
+		if row > 2 {
+			filename := fmt.Sprintf("%s/%s_top5%%_part%d.xlsx", outputDir, sanitizeFileName(groupVal), part)
+			f.DeleteSheet("Sheet1")
+			if err := f.SaveAs(filename); err != nil {
+				log.Printf("❌ 保存文件失败 [%s]: %v\n", filename, err)
+			} else {
+				log.Printf("✅ 完成导出:%s(%d 条)", filename, row-2)
+			}
+		}
+	}
+	return nil
+}
+
+func sanitizeFileName(name string) string {
+	// 替换非法文件名字符
+	re := regexp.MustCompile(`[\\/:*?"<>|]`)
+	return re.ReplaceAllString(name, "_")
+}
+
 func dealXlsx() {
 	log.Println("开始处理数据")
 	// 打开原始文件

+ 9 - 0
getEs/tools.go

@@ -130,3 +130,12 @@ func ZpAI1(apiKey, model, name string, businessScope string) (rest map[string]in
 	}
 	return
 }
+
+func contains(list []string, target string) bool {
+	for _, item := range list {
+		if item == target {
+			return true
+		}
+	}
+	return false
+}

+ 127 - 2
mongodb-test/main.go

@@ -9,14 +9,16 @@ import (
 	"go.mongodb.org/mongo-driver/mongo/options"
 	"go.mongodb.org/mongo-driver/mongo/readpref"
 	"log"
+	"time"
 )
 
 func main() {
-	getHot()
+	qqqq()
+	//getHot()
 	//QlmChannel()
 }
 
-//QlmChannel 千里马 channel
+// QlmChannel 千里马 channel
 func QlmChannel() {
 	type Item struct {
 		ID          primitive.ObjectID `bson:"_id,omitempty"`
@@ -109,3 +111,126 @@ func QlmChannel() {
 
 	fmt.Println("Data inserted into wcc_qlm collection.")
 }
+
+// MongoConfig 保存连接配置
+type MongoConfig struct {
+	URI      string
+	Database string
+	Username string
+	Password string
+	Direct   bool
+}
+
+func connectMongo(cfg MongoConfig) (*mongo.Client, error) {
+	//clientOpts := options.Client().ApplyURI(cfg.URI)
+	clientOpts := options.Client().
+		ApplyURI(cfg.URI).
+		SetDirect(cfg.Direct) // 设置为直连模式,适合单节点连接
+	if cfg.Username != "" && cfg.Password != "" {
+		cred := options.Credential{
+			Username: cfg.Username,
+			Password: cfg.Password,
+		}
+		clientOpts.SetAuth(cred)
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	client, err := mongo.Connect(ctx, clientOpts)
+	if err != nil {
+		return nil, err
+	}
+	return client, nil
+}
+
+// qqqq 迁移MongoDB
+func qqqq() {
+	srcCfg := MongoConfig{
+		URI:      "mongodb://127.0.0.1:27001",
+		Database: "mixdata",
+		Username: "",
+		Password: "",
+		Direct:   true,
+	}
+	dstCfg := MongoConfig{
+		URI:      "mongodb://172.20.45.129:27002",
+		Database: "mixdata",
+		Username: "",
+		Password: "",
+		//Direct:   true,
+	}
+
+	// 连接源和目标Mongo
+	srcClient, err := connectMongo(srcCfg)
+	if err != nil {
+		log.Fatalf("连接源Mongo失败: %v", err)
+	}
+	defer srcClient.Disconnect(context.Background())
+
+	dstClient, err := connectMongo(dstCfg)
+	if err != nil {
+		log.Fatalf("连接目标Mongo失败: %v", err)
+	}
+	defer dstClient.Disconnect(context.Background())
+
+	srcDB := srcClient.Database(srcCfg.Database)
+	dstDB := dstClient.Database(dstCfg.Database)
+
+	// 获取源库集合列表
+	ctx := context.Background()
+	//collections, err := srcDB.ListCollectionNames(ctx, bson.M{})
+	collections := []string{"special_trade_union"}
+	if err != nil {
+		log.Fatalf("获取集合列表失败: %v", err)
+	}
+	fmt.Printf("发现 %d 个集合,将逐个迁移\n", len(collections))
+
+	batchSize := 1000 // 批量大小
+
+	for _, collName := range collections {
+		fmt.Printf("迁移集合: %s\n", collName)
+
+		srcColl := srcDB.Collection(collName)
+		dstColl := dstDB.Collection(collName)
+
+		// 查询全部数据
+		cursor, err := srcColl.Find(ctx, bson.M{})
+		if err != nil {
+			log.Printf("查询集合 %s 失败: %v", collName, err)
+			continue
+		}
+
+		batchDocs := make([]interface{}, 0, batchSize)
+		count := 0
+		for cursor.Next(ctx) {
+			var doc bson.M
+			if err := cursor.Decode(&doc); err != nil {
+				log.Printf("解析文档失败: %v", err)
+				continue
+			}
+
+			batchDocs = append(batchDocs, doc)
+			if len(batchDocs) >= batchSize {
+				if err := insertBatch(ctx, dstColl, batchDocs); err != nil {
+					log.Printf("批量写入失败: %v", err)
+				}
+				count += len(batchDocs)
+				log.Println("current:", collName, count)
+				batchDocs = batchDocs[:0]
+			}
+		}
+		// 插入剩余文档
+		if len(batchDocs) > 0 {
+			if err := insertBatch(ctx, dstColl, batchDocs); err != nil {
+				log.Printf("批量写入失败: %v", err)
+			}
+			count += len(batchDocs)
+		}
+		fmt.Printf("集合 %s 迁移完成,导入 %d 条数据\n", collName, count)
+		cursor.Close(ctx)
+	}
+}
+
+func insertBatch(ctx context.Context, coll *mongo.Collection, docs []interface{}) error {
+	_, err := coll.InsertMany(ctx, docs)
+	return err
+}

+ 13 - 6
qyxy-mysql/config.toml

@@ -17,14 +17,21 @@
 
 
 [mysql] ## 数据库MySQL
-    address = "127.0.0.1:4001" ## 本地
-#address = "172.17.162.29:14000" ## 线上
-    dbname=   "qyxy" ##
-    username = "wangchengcheng"
-    password= "Wcc#20221209P"
-    prefix = "" ## mysql 数据表前缀
+#    address = "127.0.0.1:4001" ## 本地
+##address = "172.17.162.29:14000" ## 线上
+#    dbname=   "qyxy" ##
+#    username = "wangchengcheng"
+#    password= "Wcc#20221209P"
+#    prefix = "" ## mysql 数据表前缀
 
 
+## 测试环境
+address = "172.20.45.129:4000" ## 本地
+#address = "172.17.162.29:14000" ## 线上
+dbname=   "qyxy" ##
+username = "root"
+password= "=PDT49#80Z!RVv52_z"
+prefix = "" ## mysql 数据表前缀
 
 
 ## 日志

+ 89 - 0
qyxy-mysql/entity.go

@@ -997,3 +997,92 @@ type CompanyBaseClean struct {
 func (CompanyBaseClean) TableName() string {
 	return "company_base_clean"
 }
+
+// CompanyEmployee 工商-主要人员
+type CompanyEmployee struct {
+	ID               int64      `gorm:"column:id;primaryKey;autoIncrement" json:"id"`
+	CompanyID        string     `gorm:"column:company_id;type:char(32);not null;uniqueIndex:uk_company_id" json:"company_id"`                      // 主体唯一键
+	EmployeeName     string     `gorm:"column:employee_name;type:varchar(255);not null;index:idx_employee_name" json:"employee_name"`              // 高管名称
+	Position         string     `gorm:"column:position;type:varchar(200);not null;index:idx_position" json:"position"`                             // 职位
+	IsHistory        *int8      `gorm:"column:is_history;type:tinyint;default:0" json:"is_history"`                                                // 状态(0: 有效,1: 历史)
+	UseFlag          *int8      `gorm:"column:use_flag;type:tinyint;default:0" json:"use_flag"`                                                    // 使用标记
+	CreateTime       *time.Time `gorm:"column:create_time;type:datetime;default:CURRENT_TIMESTAMP" json:"create_time"`                             // 入库时间
+	UpdateTime       *time.Time `gorm:"column:update_time;type:datetime;default:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" json:"update_time"` // 更新时间
+	Education        *string    `gorm:"column:education;type:varchar(20)" json:"education"`                                                        // 学历
+	Age              *string    `gorm:"column:age;type:varchar(20)" json:"age"`                                                                    // 年龄
+	Sex              *string    `gorm:"column:sex;type:varchar(20)" json:"sex"`                                                                    // 性别
+	PersonID         *string    `gorm:"column:person_id;type:char(32)" json:"person_id"`                                                           // 高管id
+	EmployeeNameType *int8      `gorm:"column:employee_name_type;type:tinyint" json:"employee_name_type"`                                          // 高管名称类型(0自然人,1非自然人)
+}
+
+// 表名绑定
+func (CompanyEmployee) TableName() string {
+	return "company_employee"
+}
+
+// CompanyPartner 工商-股东信息
+type CompanyPartner struct {
+	ID               int64      `gorm:"column:id;primaryKey;autoIncrement" json:"id"`
+	CompanyID        string     `gorm:"column:company_id;type:char(32);not null;uniqueIndex:uk_company_id" json:"company_id"`                      // 主体唯一键
+	StockName        string     `gorm:"column:stock_name;type:varchar(255);not null;index:idx_stock_name" json:"stock_name"`                       // 股东名称
+	CompanyName      *string    `gorm:"column:company_name;type:varchar(255);index:idx_company_name" json:"company_name"`                          // 企业名称
+	StockType        *string    `gorm:"column:stock_type;type:varchar(30)" json:"stock_type"`                                                      // 股东类型
+	IsPersonal       *int8      `gorm:"column:is_personal;type:tinyint" json:"is_personal"`                                                        // 0机构;1自然人
+	StockNameID      *string    `gorm:"column:stock_name_id;type:char(32)" json:"stock_name_id"`                                                   // 企业股东id
+	IdentifyType     *string    `gorm:"column:identify_type;type:varchar(50);index:idx_identify_type" json:"identify_type"`                        // 股东证件类型
+	IdentifyNo       *string    `gorm:"column:identify_no;type:varchar(50)" json:"identify_no"`                                                    // 股东证件号码
+	StockCapital     *string    `gorm:"column:stock_capital;type:varchar(20)" json:"stock_capital"`                                                // 应缴出资额
+	StockRealCapital *string    `gorm:"column:stock_realcapital;type:varchar(20);index:idx_stock_realcapital" json:"stock_realcapital"`            // 实缴出资额
+	StockProportion  *float64   `gorm:"column:stock_proportion;type:double(10,6)" json:"stock_proportion"`                                         // 股权占比
+	InvestType       *string    `gorm:"column:invest_type;type:varchar(20)" json:"invest_type"`                                                    // 应缴出资方式
+	InvestRealType   *string    `gorm:"column:invest_realtype;type:varchar(20)" json:"invest_realtype"`                                            // 实缴出资方式
+	UseFlag          *int8      `gorm:"column:use_flag;type:tinyint;default:0" json:"use_flag"`                                                    // 使用标记
+	IsHistory        *int8      `gorm:"column:is_history;type:tinyint;default:0" json:"is_history"`                                                // 状态
+	CreateTime       *time.Time `gorm:"column:create_time;type:datetime;default:CURRENT_TIMESTAMP" json:"create_time"`                             // 入库时间
+	UpdateTime       *time.Time `gorm:"column:update_time;type:datetime;default:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" json:"update_time"` // 更新时间
+}
+
+// TableName specifies the table name for GORM
+func (CompanyPartner) TableName() string {
+	return "company_partner"
+}
+
+// CompanyPartnerPay 工商-股东应缴明细
+type CompanyPartnerPay struct {
+	ID           int64      `gorm:"column:id;primaryKey;autoIncrement" json:"id"`                                                              // 自增主键
+	CompanyID    string     `gorm:"column:company_id;type:char(32);not null;uniqueIndex:uk_company_id" json:"company_id"`                      // 主体唯一键
+	StockName    string     `gorm:"column:stock_name;type:varchar(255);not null;index:idx_stock_name" json:"stock_name"`                       // 股东名称
+	PayRecord    string     `gorm:"column:pay_record;type:char(32);not null;index:idx_pay_record" json:"pay_record"`                           // 应缴记录(出资日期+方式)
+	StockCapital *string    `gorm:"column:stock_capital;type:varchar(20)" json:"stock_capital"`                                                // 应缴出资额
+	InvestType   *string    `gorm:"column:invest_type;type:varchar(20)" json:"invest_type"`                                                    // 应缴出资方式
+	StockDate    *time.Time `gorm:"column:stock_date;type:date" json:"stock_date"`                                                             // 应缴出资日期
+	UseFlag      *int8      `gorm:"column:use_flag;type:tinyint;default:0" json:"use_flag"`                                                    // 使用标记
+	IsHistory    *int8      `gorm:"column:is_history;type:tinyint;default:0" json:"is_history"`                                                // 状态(0: 有效, 1: 历史)
+	CreateTime   *time.Time `gorm:"column:create_time;type:datetime;default:CURRENT_TIMESTAMP" json:"create_time"`                             // 入库时间
+	UpdateTime   *time.Time `gorm:"column:update_time;type:datetime;default:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" json:"update_time"` // 更新时间
+}
+
+// TableName sets the insert table name for this struct type
+func (CompanyPartnerPay) TableName() string {
+	return "company_partner_pay"
+}
+
+// CompanyPartnerRealPay 工商-股东实缴明细
+type CompanyPartnerRealPay struct {
+	ID               int64      `gorm:"column:id;primaryKey;autoIncrement" json:"id"`                                                              // 自增主键
+	CompanyID        string     `gorm:"column:company_id;type:char(32);not null;uniqueIndex:uk_company_id" json:"company_id"`                      // 主体唯一键
+	StockName        string     `gorm:"column:stock_name;type:varchar(255);not null;index:idx_stock_name" json:"stock_name"`                       // 股东名称
+	RealpayRecord    string     `gorm:"column:realpay_record;type:char(32);not null;index:idx_realpay_record" json:"realpay_record"`               // 实缴记录(出资日期+方式)
+	StockRealCapital *string    `gorm:"column:stock_realcapital;type:varchar(20)" json:"stock_realcapital"`                                        // 实缴出资额
+	InvestRealType   *string    `gorm:"column:invest_realtype;type:varchar(20)" json:"invest_realtype"`                                            // 实缴出资方式
+	StockRealDate    *time.Time `gorm:"column:stock_realdate;type:date" json:"stock_realdate"`                                                     // 实缴出资日期
+	UseFlag          *int8      `gorm:"column:use_flag;type:tinyint;default:0" json:"use_flag"`                                                    // 使用标记
+	IsHistory        *int8      `gorm:"column:is_history;type:tinyint;default:0" json:"is_history"`                                                // 状态(0: 有效, 1: 历史)
+	CreateTime       *time.Time `gorm:"column:create_time;type:datetime;default:CURRENT_TIMESTAMP" json:"create_time"`                             // 入库时间
+	UpdateTime       *time.Time `gorm:"column:update_time;type:datetime;default:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" json:"update_time"` // 更新时间
+}
+
+// TableName sets the insert table name for this struct type
+func (CompanyPartnerRealPay) TableName() string {
+	return "company_partner_realpay"
+}

+ 4 - 0
qyxy-mysql/main.go

@@ -14,6 +14,10 @@ var (
 
 func main() {
 	Init()
+
+	dealTestData()
+
+	return
 	log.Info("main", zap.String("SyncCompanyBaseToMySQL", "开始处理企业基本信息"))
 	err := SyncCompanyBaseToMySQL()
 	if err != nil {

+ 707 - 0
qyxy-mysql/test.go

@@ -0,0 +1,707 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+	"log"
+)
+
+//处理测试环境,处理一批测试数据
+
+func dealTestData() {
+	// 获取 company_id 列表
+	var companyIDs []string
+	if err := MysqlDB.Table("company_base").Select("company_id").Find(&companyIDs).Error; err != nil {
+		log.Fatalf("查询 company_id 失败: %v", err)
+	}
+
+	host := GF.Mongoqy.Host
+	username := GF.Mongoqy.Username
+	password := GF.Mongoqy.Password
+	authSource := "admin" // 通常是 "admin",也可以是你的数据库名
+	// 构造 MongoDB URI
+	var mongoURI string
+	if username != "" && password != "" {
+		mongoURI = fmt.Sprintf("mongodb://%s:%s@%s/?authSource=%s", username, password, host, authSource)
+	} else {
+		mongoURI = fmt.Sprintf("mongodb://%s", host)
+	}
+
+	clientOptions := options.Client().ApplyURI(mongoURI)
+	//clientOptions.SetReadPreference(readpref.Primary())
+	//clientOptions.SetDirect(true)
+	// 连接MongoDB
+	client, err := mongo.Connect(context.Background(), clientOptions)
+	if err != nil {
+		log.Println(err)
+	}
+
+	mongoColl := client.Database("mixdata").Collection("company_change")
+	ctx := context.Background()
+	// 分批从 MongoDB 查询并写入 MySQL
+	batchSize := 500
+	for i := 0; i < len(companyIDs); i += batchSize {
+		end := i + batchSize
+		if end > len(companyIDs) {
+			end = len(companyIDs)
+		}
+
+		// 构造查询
+		filter := bson.M{"company_id": bson.M{"$in": companyIDs[i:end]}}
+
+		cursor, err := mongoColl.Find(ctx, filter)
+		if err != nil {
+			log.Fatalf("MongoDB 查询失败: %v", err)
+		}
+		defer cursor.Close(ctx)
+
+		//var results []AnnualReportBase
+		//var results []CompanyIndustry
+		//var results []CompanyHistoryName
+		var results []CompanyChange
+
+		for cursor.Next(ctx) {
+			var m map[string]interface{}
+			if err := cursor.Decode(&m); err != nil {
+				log.Printf("解码单条数据失败: %v", err)
+				continue
+			}
+			//if report := ConvertAnnualReport(m); report != nil {
+			//	results = append(results, *report)
+			//}
+			//if report := ConvertCompanyIndustry(m); report != nil {
+			//	results = append(results, *report)
+			//}
+			//if report := ConvertCompanyHistoryName(m); report != nil {
+			//	results = append(results, *report)
+			//}
+
+			if report := ConvertCompanyChange(m); report != nil {
+				results = append(results, *report)
+			}
+
+		}
+
+		if err := cursor.Err(); err != nil {
+			log.Fatalf("遍历 cursor 时发生错误: %v", err)
+		}
+
+		if len(results) > 0 {
+			if err := MysqlDB.Create(&results).Error; err != nil {
+				log.Fatalf("写入 MySQL annual_report_base 失败: %v", err)
+			}
+			fmt.Printf("成功插入 %d 条记录\n", len(results))
+		} else {
+			fmt.Println("没有符合条件的数据写入")
+		}
+	}
+
+	fmt.Println("完成数据迁移")
+
+}
+
+// ConvertAnnualReport ConvertAnnualReport
+func ConvertAnnualReport(m map[string]interface{}) *AnnualReportBase {
+	return &AnnualReportBase{
+		CompanyID:             fmt.Sprint(m["company_id"]),
+		CreditNo:              strPtr(m["credit_no"]),
+		CompanyName:           strPtr(m["company_name"]),
+		CompanyCode:           strPtr(m["company_code"]),
+		ReportYear:            parseInt16(m["report_year"]),
+		OperatorName:          strPtr(m["operator_name"]),
+		CompanyStatus:         strPtr(m["company_status"]),
+		CompanyAddress:        strPtr(m["company_address"]),
+		BusinessScope:         strPtr(m["business_scope"]),
+		CompanyPhone:          strPtr(m["company_phone"]),
+		CompanyEmail:          strPtr(m["company_email"]),
+		ZipCode:               strPtr(m["zip_code"]),
+		EmployeeNo:            strPtr(m["employee_no"]),
+		WomenEmployeeNo:       strPtr(m["women_employee_no"]),
+		MemberNo:              strPtr(m["member_no"]),
+		MemberFarmerNo:        strPtr(m["member_farmer_no"]),
+		MemberIncreaseNo:      strPtr(m["member_increase_no"]),
+		MemberOutNo:           strPtr(m["member_out_no"]),
+		CompanyHolding:        strPtr(m["company_holding"]),
+		HasInvest:             strPtr(m["has_invest"]),
+		HasGuarantees:         strPtr(m["has_guarantees"]),
+		StockSell:             strPtr(m["stock_sell"]),
+		SubjectionCreditNo:    strPtr(m["subjection_credit_no"]),
+		SubjectionCompanyName: strPtr(m["subjection_company_name"]),
+		HasWebsite:            strPtr(m["has_website"]),
+		ReportDate:            parseDate(m["report_date"]),
+		UseFlag:               getInt8Ptr(m["use_flag"]),
+		CreateTime:            fallbackNow(parseDate(m["create_time"])),
+		UpdateTime:            fallbackNow(parseDate(m["update_time"])),
+	}
+}
+
+func ConvertCompanyIndustry(m map[string]interface{}) *CompanyIndustry {
+	return &CompanyIndustry{
+		CompanyID:      fmt.Sprint(m["company_id"]),
+		Industry:       strVal(m["industry"]),
+		IndustryL1Code: strVal(m["industry_l1_code"]),
+		IndustryL1Name: strVal(m["industry_l1_name"]),
+		IndustryL2Code: strVal(m["industry_l2_code"]),
+		IndustryL2Name: strVal(m["industry_l2_name"]),
+		IndustryL3Code: strVal(m["industry_l3_code"]),
+		IndustryL3Name: strVal(m["industry_l3_name"]),
+		IndustryL4Code: strVal(m["industry_l4_code"]),
+		IndustryL4Name: strVal(m["industry_l4_name"]),
+		UseFlag:        getInt8(m["use_flag"]),
+		CreateTime:     fallbackNow(parseDate(m["create_time"])),
+		UpdateTime:     fallbackNow(parseDate(m["update_time"])),
+	}
+}
+
+func ConvertCompanyHistoryName(m map[string]interface{}) *CompanyHistoryName {
+	return &CompanyHistoryName{
+		CompanyID:   strVal(m["company_id"]),
+		HistoryName: strVal(m["history_name"]),
+		StartDate:   parseDate(m["start_date"]),
+		EndDate:     parseDate(m["end_date"]),
+		UseFlag:     getInt8Ptr(m["use_flag"]),
+		ChangeDate:  parseDate(m["change_date"]),
+		CreateTime:  fallbackNow(parseDate(m["create_time"])),
+		UpdateTime:  fallbackNow(parseDate(m["update_time"])),
+	}
+}
+
+func ConvertCompanyChange(m map[string]interface{}) *CompanyChange {
+	return &CompanyChange{
+		CompanyID:     strVal(m["company_id"]),
+		ChangeDate:    parseDate(m["change_date"]),
+		ChangeType:    strVal(m["change_type"]),
+		ChangeField:   strVal(m["change_field"]),
+		ContentBefore: strVal(m["content_before"]),
+		ContentAfter:  strVal(m["content_after"]),
+		UseFlag:       getInt8Ptr(m["use_flag"]),
+		ChangeRecord:  strVal(m["change_record"]),
+		CreateTime:    fallbackNow(parseDate(m["create_time"])),
+		UpdateTime:    fallbackNow(parseDate(m["update_time"])),
+	}
+}
+
+func ConvertCompanyBranch(m map[string]interface{}) *CompanyBranch {
+	return &CompanyBranch{
+		CompanyID:       strVal(m["company_id"]),
+		BranchCompanyID: strPtr(m["branch_company_id"]),
+		BranchName:      strVal(m["branch_name"]),
+		BranchCreditNo:  strPtr(m["branch_credit_no"]),
+		BranchCode:      strPtr(m["branch_code"]),
+		LegalPerson:     strPtr(m["legal_person"]),
+		CompanyStatus:   strPtr(m["company_status"]),
+		NCompanyStatus:  strPtr(m["n_company_status"]),
+		EstablishDate:   parseDate(m["establish_date"]),
+		Authority:       strPtr(m["authority"]),
+		CancelDate:      parseDate(m["cancel_date"]),
+		RevokeDate:      parseDate(m["revoke_date"]),
+		UseFlag:         getInt8Ptr(m["use_flag"]),
+		CreateTime:      fallbackNow(parseDate(m["create_time"])),
+		UpdateTime:      fallbackNow(parseDate(m["update_time"])),
+	}
+}
+
+func ConvertAnnualReportWebsite(m map[string]interface{}) *AnnualReportWebsite {
+	return &AnnualReportWebsite{
+		CompanyID:   strVal(m["company_id"]),
+		ReportYear:  int16Val(m["report_year"]),
+		WebsiteURL:  strVal(m["website_url"]),
+		WebsiteName: strPtr(m["website_name"]),
+		WebsiteType: strPtr(m["website_type"]),
+		IsHistory:   getInt8Ptr(m["is_history"]),
+		UseFlag:     getInt8Ptr(m["use_flag"]),
+		URLMD5:      strVal(m["url_md5"]),
+		CreateTime:  parseDate(m["create_time"]),
+		UpdateTime:  parseDate(m["update_time"]),
+	}
+}
+
+func ConvertAnnualReportPartner(m map[string]interface{}) *AnnualReportPartner {
+	return &AnnualReportPartner{
+		CompanyID:        strVal(m["company_id"]),
+		ReportYear:       int16Val(m["report_year"]),
+		StockName:        strVal(m["stock_name"]),
+		IsPersonal:       getInt8Ptr(m["is_personal"]),
+		StockNameID:      strPtr(m["stock_name_id"]),
+		StockCapital:     strPtr(m["stock_capital"]),
+		StockDate:        parseDate(m["stock_date"]),
+		InvestType:       strPtr(m["invest_type"]),
+		StockRealCapital: strPtr(m["stock_real_capital"]),
+		StockRealDate:    parseDate(m["stock_real_date"]),
+		InvestRealType:   strPtr(m["invest_real_type"]),
+		IsHistory:        getInt8Ptr(m["is_history"]),
+		UseFlag:          getInt8Ptr(m["use_flag"]),
+		PartnerRecord:    strVal(m["partner_record"]),
+		CreateTime:       parseDate(m["create_time"]),
+		UpdateTime:       parseDate(m["update_time"]),
+	}
+}
+
+// ConvertAnnualReportInvest 企业年报对外投资信息表
+func ConvertAnnualReportInvest(m map[string]interface{}) *AnnualReportInvest {
+	return &AnnualReportInvest{
+		CompanyID:      strVal(m["company_id"]),
+		ReportYear:     int16Val(m["report_year"]),
+		CreditNo:       strPtr(m["credit_no"]),
+		InvesteeName:   strVal(m["investee_name"]),
+		InvesteeNameID: strPtr(m["investee_name_id"]),
+		InvesteeCode:   strPtr(m["investee_code"]),
+		IsHistory:      getInt8Ptr(m["is_history"]),
+		UseFlag:        getInt8Ptr(m["use_flag"]),
+		CreateTime:     parseDate(m["create_time"]),
+		UpdateTime:     parseDate(m["update_time"]),
+	}
+}
+
+// ConvertAnnualReportSocialSecurity 企业年报社保信息表
+func ConvertAnnualReportSocialSecurity(m map[string]interface{}) *AnnualReportSocialSecurity {
+	return &AnnualReportSocialSecurity{
+		CompanyID:            strVal(m["company_id"]),
+		ReportYear:           int16Val(m["report_year"]),
+		InsuranceName:        strVal(m["insurance_name"]),
+		InsuranceAmount:      strPtr(m["insurance_amount"]),
+		InsuranceBase:        strPtr(m["insurance_base"]),
+		InsuranceRealCapital: strPtr(m["insurance_real_capital"]),
+		InsuranceArrearage:   strPtr(m["insurance_arrearage"]),
+		UseFlag:              getInt8Ptr(m["use_flag"]),
+		CreateTime:           parseDate(m["create_time"]),
+		UpdateTime:           parseDate(m["update_time"]),
+	}
+}
+
+// ConvertAnnualReportEquityChange 企业年报股权变更信息表
+func ConvertAnnualReportEquityChange(m map[string]interface{}) *AnnualReportEquityChange {
+	return &AnnualReportEquityChange{
+		CompanyID:        strVal(m["company_id"]),
+		ReportYear:       int16Val(m["report_year"]),
+		StockName:        strVal(m["stock_name"]),
+		ChangeDate:       parseDate(m["change_date"]),
+		ProportionBefore: strPtr(m["proportion_change_before"]),
+		ProportionAfter:  strPtr(m["proportion_change_after"]),
+		IsHistory:        getInt8Ptr(m["is_history"]),
+		UseFlag:          getInt8Ptr(m["use_flag"]),
+		ChangeRecord:     strVal(m["change_record"]),
+		CreateTime:       parseDate(m["create_time"]),
+		UpdateTime:       parseDate(m["update_time"]),
+	}
+}
+
+// ConvertAnnualReportChange 企业年报变更信息表
+func ConvertAnnualReportChange(m map[string]interface{}) *AnnualReportChange {
+	return &AnnualReportChange{
+		CompanyID:     strVal(m["company_id"]),
+		ReportYear:    int16Val(m["report_year"]),
+		ChangeDate:    parseDate(m["change_date"]),
+		ChangeField:   strPtr(m["change_field"]),
+		ContentBefore: strPtr(m["content_before"]),
+		ContentAfter:  strPtr(m["content_after"]),
+		UseFlag:       getInt8Ptr(m["use_flag"]),
+		ChangeRecord:  strVal(m["change_record"]),
+		CreateTime:    parseDate(m["create_time"]),
+		UpdateTime:    parseDate(m["update_time"]),
+	}
+}
+
+// ConvertAnnualReportGuarantee 企业年报对外提供保证担保信息表
+func ConvertAnnualReportGuarantee(m map[string]interface{}) *AnnualReportGuarantee {
+	return &AnnualReportGuarantee{
+		CompanyID:       strVal(m["company_id"]),
+		ReportYear:      int16Val(m["report_year"]),
+		Creditor:        strPtr(m["creditor"]),
+		Debtor:          strPtr(m["debtor"]),
+		DebtType:        strPtr(m["debt_type"]),
+		GuaranteeAmount: strPtr(m["guarantee_amount"]),
+		PerformTime:     strPtr(m["perform_time"]),
+		GuaranteeTerm:   strPtr(m["guarantee_term"]),
+		GuaranteeType:   strPtr(m["guarantee_type"]),
+		GuaranteeScope:  strPtr(m["guarantee_scope"]),
+		IsHistory:       getInt8Ptr(m["is_history"]),
+		UseFlag:         getInt8Ptr(m["use_flag"]),
+		GuaranteeRecord: strVal(m["guarantee_record"]),
+		CreateTime:      parseDate(m["create_time"]),
+		UpdateTime:      parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyAllow 行政许可信息表
+func ConvertCompanyAllow(m map[string]interface{}) *CompanyAllow {
+	return &CompanyAllow{
+		CompanyID:      strVal(m["company_id"]),
+		AllowCode:      strPtr(m["allow_code"]),
+		AllowFilename:  strPtr(m["allow_filename"]),
+		AllowContent:   strPtr(m["allow_content"]),
+		AllowStartDate: strPtr(m["allow_startdate"]),
+		AllowEndDate:   strPtr(m["allow_enddate"]),
+		AllowAuthority: strPtr(m["allow_authority"]),
+		IsHistory:      getInt8Ptr(m["is_history"]),
+		UseFlag:        getInt8Ptr(m["use_flag"]),
+		AllowRecord:    strVal(m["allow_record"]),
+		CreateTime:     parseDate(m["create_time"]),
+		UpdateTime:     parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyAbnormal 经营异常记录表
+func ConvertCompanyAbnormal(m map[string]interface{}) *CompanyAbnormal {
+	return &CompanyAbnormal{
+		CompanyID:         strVal(m["company_id"]),
+		IncludedDate:      parseDate(m["included_date"]),
+		IncludedReason:    strPtr(m["included_reason"]),
+		IncludedAuthority: strPtr(m["included_authority"]),
+		RemovedDate:       parseDate(m["removed_date"]),
+		RemovedReason:     strPtr(m["removed_reason"]),
+		RemovedAuthority:  strPtr(m["removed_authority"]),
+		UseFlag:           getInt8Ptr(m["use_flag"]),
+		IsHistory:         getInt8Ptr(m["is_history"]),
+		AbnormalRecord:    strVal(m["abnormal_record"]),
+		CreateTime:        parseDate(m["create_time"]),
+		UpdateTime:        parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyIllegal 企业严重违法信息表
+func ConvertCompanyIllegal(m map[string]interface{}) *CompanyIllegal {
+	return &CompanyIllegal{
+		CompanyID:         strVal(m["company_id"]),
+		IncludedDate:      parseDate(m["included_date"]),
+		IllegalType:       strPtr(m["illegal_type"]),
+		IncludedReason:    strPtr(m["included_reason"]),
+		IncludedAuthority: strPtr(m["included_authority"]),
+		RemovedDate:       parseDate(m["removed_date"]),
+		RemovedReason:     strPtr(m["removed_reason"]),
+		RemovedAuthority:  strPtr(m["removed_authority"]),
+		IsHistory:         getInt8Ptr(m["is_history"]),
+		UseFlag:           getInt8Ptr(m["use_flag"]),
+		IllegalRecord:     strVal(m["illegal_record"]),
+		CreateTime:        parseDate(m["create_time"]),
+		UpdateTime:        parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyCheck 企业抽查检查信息表
+func ConvertCompanyCheck(m map[string]interface{}) *CompanyCheck {
+	return &CompanyCheck{
+		CompanyID:   strVal(m["company_id"]),
+		CheckDate:   parseDate(m["check_date"]),
+		CheckType:   strPtr(m["check_type"]),
+		CheckResult: strPtr(m["check_result"]),
+		Authority:   strPtr(m["authority"]),
+		IsHistory:   getInt8Ptr(m["is_history"]),
+		UseFlag:     getInt8Ptr(m["use_flag"]),
+		CheckRecord: strVal(m["check_record"]),
+		CreateTime:  parseDate(m["create_time"]),
+		UpdateTime:  parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyPledge 企业股权出质信息表
+func ConvertCompanyPledge(m map[string]interface{}) *CompanyPledge {
+	return &CompanyPledge{
+		CompanyID:         strVal(m["company_id"]),
+		ProvinceShort:     strPtr(m["province_short"]),
+		PledgeCode:        strPtr(m["pledge_code"]),
+		Pledgor:           strPtr(m["pledgor"]),
+		PledgorIdentifyNo: strPtr(m["pledgor_identify_no"]),
+		PledgorNameID:     strPtr(m["pledgor_name_id"]),
+		PledgorIsPersonal: getInt8Ptr(m["pledgor_is_personal"]),
+		Pawnee:            strPtr(m["pawnee"]),
+		PawneeIdentifyNo:  strPtr(m["pawnee_identify_no"]),
+		PawneeNameID:      strPtr(m["pawnee_name_id"]),
+		PawneeIsPersonal:  getInt8Ptr(m["pawnee_is_personal"]),
+		PledgeEquity:      strPtr(m["pledge_equity"]),
+		PledgeDate:        parseDate(m["pledge_date"]),
+		PledgeStatus:      strPtr(m["pledge_status"]),
+		PublicDate:        parseDate(m["public_date"]),
+		RevokeDate:        parseDate(m["revoke_date"]),
+		RevokeReason:      strPtr(m["revoke_reason"]),
+		InvalidDate:       parseDate(m["invalid_date"]),
+		InvalidReason:     strPtr(m["invalid_reason"]),
+		UseFlag:           getInt8Ptr(m["use_flag"]),
+		IsHistory:         getInt8Ptr(m["is_history"]),
+		PledgeRecord:      strVal(m["pledge_record"]),
+		CreateTime:        parseDate(m["create_time"]),
+		UpdateTime:        parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyPledgeExtend 企业股权出质扩展信息表
+func ConvertCompanyPledgeExtend(m map[string]interface{}) *CompanyPledgeExtend {
+	return &CompanyPledgeExtend{
+		CompanyID:    strVal(m["company_id"]),
+		PledgeRecord: strVal(m["pledge_record"]),
+		EquityAmount: strPtr(m["equity_amount"]),
+		EquityUnit:   strPtr(m["equity_unit"]),
+		UseFlag:      getInt8Ptr(m["use_flag"]),
+		CreateTime:   parseDate(m["create_time"]),
+		UpdateTime:   parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyPunish 企业行政处罚信息表
+func ConvertCompanyPunish(m map[string]interface{}) *CompanyPunish {
+	return &CompanyPunish{
+		CompanyID:        strVal(m["company_id"]),
+		PunishDate:       parseDate(m["punish_date"]),
+		PunishCode:       strPtr(m["punish_code"]),
+		IllegalType:      strPtr(m["illegal_type"]),
+		PunishContent:    strPtr(m["punish_content"]),
+		IllegalFact:      strPtr(m["illegal_fact"]),
+		PunishType:       strPtr(m["punish_type"]),
+		PunishAmount:     strPtr(m["punish_amount"]),
+		AmountForfeiture: strPtr(m["amount_forfeiture"]),
+		PunishValidity:   strPtr(m["punish_validity"]),
+		PublicDate:       parseDate(m["public_date"]),
+		PublicDeadline:   strPtr(m["public_deadline"]),
+		PunishBasis:      strPtr(m["punish_basis"]),
+		Authority:        strPtr(m["authority"]),
+		RevokeNameCode:   strPtr(m["revoke_name_code"]),
+		Mark:             strPtr(m["mark"]),
+		IsHistory:        getInt8Ptr(m["is_history"]),
+		UseFlag:          getInt8Ptr(m["use_flag"]),
+		PunishRecord:     strVal(m["punish_record"]),
+		CreateTime:       parseDate(m["create_time"]),
+		UpdateTime:       parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyIntellectual 企业知识产权出质信息表
+func ConvertCompanyIntellectual(m map[string]interface{}) *CompanyIntellectual {
+	return &CompanyIntellectual{
+		CompanyID:            strVal(m["company_id"]),
+		PublicDate:           parseDate(m["public_date"]),
+		IntellectualCode:     strPtr(m["intellectual_code"]),
+		Pledgor:              strPtr(m["pledgor"]),
+		IntellectualName:     strPtr(m["intellectual_name"]),
+		IntellectualType:     strPtr(m["intellectual_type"]),
+		Pledgee:              strPtr(m["pledgee"]),
+		IntellectualStatus:   strPtr(m["intellectual_status"]),
+		IntellectualDeadline: strPtr(m["intellectual_deadline"]),
+		CancelDate:           parseDate(m["cancel_date"]),
+		CancelReason:         strPtr(m["cancel_reason"]),
+		IsHistory:            getInt8Ptr(m["is_history"]),
+		UseFlag:              getInt8Ptr(m["use_flag"]),
+		IntellectualRecord:   strVal(m["intellectual_record"]),
+		CreateTime:           parseDate(m["create_time"]),
+		UpdateTime:           parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyIntellectualChange 知识产权出质变更信息表
+func ConvertCompanyIntellectualChange(m map[string]interface{}) *CompanyIntellectualChange {
+	return &CompanyIntellectualChange{
+		CompanyID:          strVal(m["company_id"]),
+		ChangeDate:         parseDate(m["change_date"]),
+		ChangeField:        strPtr(m["change_field"]),
+		ContentBefore:      strPtr(m["content_before"]),
+		ContentAfter:       strPtr(m["content_after"]),
+		ChangeRecord:       strVal(m["change_record"]),
+		IsHistory:          getInt8Ptr(m["is_history"]),
+		UseFlag:            getInt8Ptr(m["use_flag"]),
+		IntellectualRecord: strVal(m["intellectual_record"]),
+		CreateTime:         parseDate(m["create_time"]),
+		UpdateTime:         parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyChattel 动产抵押登记信息表
+func ConvertCompanyChattel(m map[string]interface{}) *CompanyChattel {
+	return &CompanyChattel{
+		CompanyID:      strVal(m["company_id"]),
+		ChattelDate:    parseDate(m["chattel_date"]),
+		ChattelCode:    strVal(m["chattel_code"]),
+		DebtType:       strPtr(m["debt_type"]),
+		DebtAmount:     strPtr(m["debt_amount"]),
+		GuaranteeScope: strPtr(m["guarantee_scope"]),
+		DebtTerm:       strPtr(m["debt_term"]),
+		Remark:         strPtr(m["remark"]),
+		ChattelStatus:  strPtr(m["chattel_status"]),
+		Authority:      strPtr(m["authority"]),
+		PublicDate:     parseDate(m["public_date"]),
+		RevokeDate:     parseDate(m["revoke_date"]),
+		RevokeReason:   strPtr(m["revoke_reason"]),
+		IsHistory:      getInt8Ptr(m["is_history"]),
+		UseFlag:        getInt8Ptr(m["use_flag"]),
+		CreateTime:     parseDate(m["create_time"]),
+		UpdateTime:     parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyChattelChange 动产抵押变更信息表
+func ConvertCompanyChattelChange(m map[string]interface{}) *CompanyChattelChange {
+	return &CompanyChattelChange{
+		CompanyID:     strVal(m["company_id"]),
+		ChangeDate:    parseDate(m["change_date"]),
+		ChattelCode:   strPtr(m["chattel_code"]),
+		ChangeContent: strPtr(m["change_content"]),
+		IsHistory:     getInt8Ptr(m["is_history"]),
+		UseFlag:       getInt8Ptr(m["use_flag"]),
+		ChangeRecord:  strVal(m["change_record"]),
+		CreateTime:    parseDate(m["create_time"]),
+		UpdateTime:    parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyChattelMortgage 动产抵押权人信息表
+func ConvertCompanyChattelMortgage(m map[string]interface{}) *CompanyChattelMortgage {
+	return &CompanyChattelMortgage{
+		CompanyID:             strVal(m["company_id"]),
+		ChattelCode:           strPtr(m["chattel_code"]),
+		Mortgagee:             strPtr(m["mortgagee"]),
+		MortgageeIdentifyType: strPtr(m["mortgagee_identify_type"]),
+		MortgageeIdentifyNo:   strPtr(m["mortgagee_identify_no"]),
+		MortgageeAddress:      strPtr(m["mortgagee_address"]),
+		IsHistory:             getInt8Ptr(m["is_history"]),
+		UseFlag:               getInt8Ptr(m["use_flag"]),
+		CreateTime:            parseDate(m["create_time"]),
+		UpdateTime:            parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyChattelPawn 动产抵押物信息表
+func ConvertCompanyChattelPawn(m map[string]interface{}) *CompanyChattelPawn {
+	return &CompanyChattelPawn{
+		CompanyID:   strVal(m["company_id"]),
+		ChattelCode: strPtr(m["chattel_code"]),
+		PawnNo:      getIntPtr(m["pawn_no"]),
+		PawnName:    strPtr(m["pawn_name"]),
+		PawnInfo:    strPtr(m["pawn_info"]),
+		PawnOwner:   strPtr(m["pawn_owner"]),
+		Remark:      strPtr(m["remark"]),
+		IsHistory:   getInt8Ptr(m["is_history"]),
+		UseFlag:     getInt8Ptr(m["use_flag"]),
+		CreateTime:  parseDate(m["create_time"]),
+		UpdateTime:  parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyJustice 司法协助信息表
+func ConvertCompanyJustice(m map[string]interface{}) *CompanyJustice {
+	return &CompanyJustice{
+		ID:            intVal(m["id"]),
+		CompanyID:     strVal(m["company_id"]),
+		DocNo:         strPtr(m["doc_no"]),
+		Executee:      strPtr(m["executee"]),
+		EquityAmount:  strPtr(m["equity_amount"]),
+		EquityStatus:  strPtr(m["equity_status"]),
+		ExecCourt:     strPtr(m["exec_court"]),
+		IsHistory:     getInt8Ptr(m["is_history"]),
+		UseFlag:       getInt8Ptr(m["use_flag"]),
+		JusticeRecord: strVal(m["justice_record"]),
+		CreateTime:    parseDate(m["create_time"]),
+		UpdateTime:    parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyJusticeEquityChange 司法协助股权变更信息表
+func ConvertCompanyJusticeEquityChange(m map[string]interface{}) *CompanyJusticeEquityChange {
+	return &CompanyJusticeEquityChange{
+		CompanyID:            strVal(m["company_id"]),
+		ExecDate:             parseDate(m["exec_date"]),
+		DocNo:                strPtr(m["doc_no"]),
+		Executee:             strPtr(m["executee"]),
+		ExecuteeIdentifyType: strPtr(m["executee_identify_type"]),
+		ExecuteeIdentifyNo:   strPtr(m["executee_identify_no"]),
+		ExecItem:             strPtr(m["exec_item"]),
+		EquityAmount:         strPtr(m["equity_amount"]),
+		Accepter:             strPtr(m["accepter"]),
+		AccepterIdentifyType: strPtr(m["accepter_identify_type"]),
+		AccepterIdentifyNo:   strPtr(m["accepter_identify_no"]),
+		ExecCourt:            strPtr(m["exec_court"]),
+		ExecNo:               strPtr(m["exec_no"]),
+		IsHistory:            getInt8Ptr(m["is_history"]),
+		UseFlag:              getInt8Ptr(m["use_flag"]),
+		JusticeRecord:        strVal(m["justice_record"]),
+		CreateTime:           parseDate(m["create_time"]),
+		UpdateTime:           parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyJusticeFreeze 司法协助冻结信息表
+func ConvertCompanyJusticeFreeze(m map[string]interface{}) *CompanyJusticeFreeze {
+	return &CompanyJusticeFreeze{
+		CompanyID:            strVal(m["company_id"]),
+		PublicDate:           parseDate(m["public_date"]),
+		DocNo:                strPtr(m["doc_no"]),
+		Executee:             strPtr(m["executee"]),
+		ExecuteeIdentifyType: strPtr(m["executee_identify_type"]),
+		ExecuteeIdentifyNo:   strPtr(m["executee_identify_no"]),
+		ExecItem:             strPtr(m["exec_item"]),
+		EquityAmount:         strPtr(m["equity_amount"]),
+		FreezeStartDate:      parseDate(m["freeze_start_date"]),
+		FreezeEndDate:        parseDate(m["freeze_end_date"]),
+		FreezeYears:          strPtr(m["freeze_years"]),
+		ExecCourt:            strPtr(m["exec_court"]),
+		ExecNo:               strPtr(m["exec_no"]),
+		IsHistory:            getInt8Ptr(m["is_history"]),
+		UseFlag:              getInt8Ptr(m["use_flag"]),
+		JusticeRecord:        strVal(m["justice_record"]),
+		CreateTime:           parseDate(m["create_time"]),
+		UpdateTime:           parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyJusticeInvalid 司法协助失效信息表
+func ConvertCompanyJusticeInvalid(m map[string]interface{}) *CompanyJusticeInvalid {
+	return &CompanyJusticeInvalid{
+		CompanyID:     strVal(m["company_id"]),
+		InvalidDate:   parseDate(m["invalid_date"]),
+		InvalidReason: strPtr(m["invalid_reason"]),
+		IsHistory:     getInt8Ptr(m["is_history"]),
+		UseFlag:       getInt8Ptr(m["use_flag"]),
+		JusticeRecord: strVal(m["justice_record"]),
+		CreateTime:    parseDate(m["create_time"]),
+		UpdateTime:    parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyJusticeUnfreeze 司法协助解除冻结信息表
+func ConvertCompanyJusticeUnfreeze(m map[string]interface{}) *CompanyJusticeUnfreeze {
+	return &CompanyJusticeUnfreeze{
+		CompanyID:                    strVal(m["company_id"]),
+		UnfreezeDate:                 parseDate(m["unfreeze_date"]),
+		UnfreezeDocNo:                strPtr(m["unfreeze_doc_no"]),
+		UnfreezeExecutee:             strPtr(m["unfreeze_executee"]),
+		UnfreezeExecuteeIdentifyType: strPtr(m["unfreeze_executee_identify_type"]),
+		UnfreezeExecuteeIdentifyNo:   strPtr(m["unfreeze_executee_identify_no"]),
+		UnfreezeExecItem:             strPtr(m["unfreeze_exec_item"]),
+		UnfreezeEquityAmount:         strPtr(m["unfreeze_equity_amount"]),
+		UnfreezePublicDate:           parseDate(m["unfreeze_public_date"]),
+		UnfreezeExecCourt:            strPtr(m["unfreeze_exec_court"]),
+		UnfreezeExecNo:               strPtr(m["unfreeze_exec_no"]),
+		IsHistory:                    getInt8Ptr(m["is_history"]),
+		UseFlag:                      getInt8Ptr(m["use_flag"]),
+		JusticeRecord:                strVal(m["justice_record"]),
+		CreateTime:                   parseDate(m["create_time"]),
+		UpdateTime:                   parseDate(m["update_time"]),
+	}
+}
+
+// ConvertCompanyJusticeKeepFreeze 司法协助续行冻结信息表
+func ConvertCompanyJusticeKeepFreeze(m map[string]interface{}) *CompanyJusticeKeepFreeze {
+	return &CompanyJusticeKeepFreeze{
+		CompanyID:            strVal(m["company_id"]),
+		PublicDate:           parseDate(m["public_date"]),
+		DocNo:                strPtr(m["doc_no"]),
+		Executee:             strPtr(m["executee"]),
+		ExecuteeIdentifyType: strPtr(m["executee_identify_type"]),
+		ExecuteeIdentifyNo:   strPtr(m["executee_identify_no"]),
+		ExecItem:             strPtr(m["exec_item"]),
+		EquityAmount:         strPtr(m["equity_amount"]),
+		FreezeStartDate:      parseDate(m["freeze_start_date"]),
+		FreezeEndDate:        parseDate(m["freeze_end_date"]),
+		FreezeYears:          strPtr(m["freeze_years"]),
+		ExecCourt:            strPtr(m["exec_court"]),
+		ExecNo:               strPtr(m["exec_no"]),
+		JusticeRecord:        strVal(m["justice_record"]),
+		IsHistory:            getInt8Ptr(m["is_history"]),
+		UseFlag:              getInt8Ptr(m["use_flag"]),
+		KeepFreezeRecord:     strVal(m["keep_freeze_record"]),
+		CreateTime:           parseDate(m["create_time"]),
+		UpdateTime:           parseDate(m["update_time"]),
+	}
+}
+
+//

+ 197 - 1
qyxy-mysql/tools.go

@@ -1,9 +1,205 @@
 package main
 
-import "hash/fnv"
+import (
+	"hash/fnv"
+	"strconv"
+	"time"
+)
 
 func hashCompanyID(companyID string) uint32 {
 	h := fnv.New32a()
 	h.Write([]byte(companyID))
 	return h.Sum32()
 }
+
+// parseDate 尝试解析时间字符串,支持 "2006-01-02 15:04:05" 和 "2006-01-02"
+func parseDate(v interface{}) *time.Time {
+	if str, ok := v.(string); ok && str != "" {
+		t, err := time.Parse("2006-01-02 15:04:05", str)
+		if err == nil {
+			return &t
+		}
+		t, err = time.Parse("2006-01-02", str)
+		if err == nil {
+			return &t
+		}
+	}
+	return nil
+}
+
+// strVal 返回字符串值,类型不是字符串或为空时返回空字符串
+func strVal(v interface{}) string {
+	if s, ok := v.(string); ok {
+		return s
+	}
+	return ""
+}
+
+func strPtr(v interface{}) *string {
+	if s, ok := v.(string); ok && s != "" {
+		return &s
+	}
+	return nil
+}
+
+// fallbackNow 如果时间指针非空,返回其值,否则返回当前时间
+func fallbackNow(t *time.Time) time.Time {
+	if t != nil {
+		return *t
+	}
+	return time.Now()
+}
+
+func parseInt16(v interface{}) int16 {
+	switch t := v.(type) {
+	case int32:
+		return int16(t)
+	case int:
+		return int16(t)
+	case float64:
+		return int16(t)
+	case int64:
+		return int16(t)
+	case string:
+		if i, err := strconv.Atoi(t); err == nil {
+			return int16(i)
+		}
+	}
+	return 0
+}
+
+func getInt8Ptr(v interface{}) *int8 {
+	if v == nil {
+		return nil
+	}
+	switch val := v.(type) {
+	case int8:
+		return &val
+	case int:
+		tmp := int8(val)
+		return &tmp
+	case int32:
+		tmp := int8(val)
+		return &tmp
+	case int64:
+		tmp := int8(val)
+		return &tmp
+	case float64:
+		tmp := int8(val)
+		return &tmp
+	case string:
+		if i, err := strconv.Atoi(val); err == nil {
+			tmp := int8(i)
+			return &tmp
+		}
+	}
+	return nil
+}
+
+// getInt8 将多种类型转换为 int8,无法转换时返回 0
+func getInt8(v interface{}) int8 {
+	switch t := v.(type) {
+	case int:
+		return int8(t)
+	case int32:
+		return int8(t)
+	case int64:
+		return int8(t)
+	case float64:
+		return int8(t)
+	case string:
+		if i, err := strconv.Atoi(t); err == nil {
+			return int8(i)
+		}
+	}
+	return 0
+}
+
+func int16Val(v interface{}) int16 {
+	if v == nil {
+		return 0
+	}
+	switch val := v.(type) {
+	case int:
+		return int16(val)
+	case int64:
+		return int16(val)
+	case float64:
+		return int16(val)
+	case string:
+		if i, err := strconv.Atoi(val); err == nil {
+			return int16(i)
+		}
+	}
+	return 0
+}
+
+func getIntPtr(v interface{}) *int {
+	switch val := v.(type) {
+	case int:
+		return &val
+	case int8:
+		x := int(val)
+		return &x
+	case int16:
+		x := int(val)
+		return &x
+	case int32:
+		x := int(val)
+		return &x
+	case int64:
+		x := int(val)
+		return &x
+	case float32:
+		x := int(val)
+		return &x
+	case float64:
+		x := int(val)
+		return &x
+	case string:
+		if i, err := strconv.Atoi(val); err == nil {
+			return &i
+		}
+	}
+	return nil
+}
+
+func intVal(v interface{}) int {
+	switch val := v.(type) {
+	case int:
+		return val
+	case int8:
+		return int(val)
+	case int16:
+		return int(val)
+	case int32:
+		return int(val)
+	case int64:
+		return int(val)
+	case uint:
+		return int(val)
+	case uint8:
+		return int(val)
+	case uint16:
+		return int(val)
+	case uint32:
+		return int(val)
+	case uint64:
+		return int(val)
+	case float32:
+		return int(val)
+	case float64:
+		return int(val)
+	case string:
+		if i, err := strconv.Atoi(val); err == nil {
+			return i
+		}
+	case []byte:
+		if i, err := strconv.Atoi(string(val)); err == nil {
+			return i
+		}
+	case nil:
+		return 0
+	}
+	return 0
+}

+ 55 - 20
updateBidding/main.go

@@ -12,6 +12,7 @@ import (
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
 	"reflect"
+	"strings"
 	"time"
 )
 
@@ -39,16 +40,16 @@ var (
 )
 
 func Init() {
-	MgoB = &mongodb.MongodbSim{
-		MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
-		//MongodbAddr: "127.0.0.1:27083",
-		DbName:   "qfw",
-		Size:     10,
-		UserName: "SJZY_RWbid_ES",
-		Password: "SJZY@B4i4D5e6S",
-		//Direct:      true,
-	}
-	MgoB.InitPool()
+	//MgoB = &mongodb.MongodbSim{
+	//	MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
+	//	//MongodbAddr: "127.0.0.1:27083",
+	//	DbName:   "qfw",
+	//	Size:     10,
+	//	UserName: "SJZY_RWbid_ES",
+	//	Password: "SJZY@B4i4D5e6S",
+	//	//Direct:      true,
+	//}
+	//MgoB.InitPool()
 
 	//MgoBAi = &mongodb.MongodbSim{
 	//	//MongodbAddr: "172.17.189.140:27080",
@@ -74,14 +75,14 @@ func Init() {
 	//Mgo.InitPool()
 
 	//85
-	//MgoR = &mongodb.MongodbSim{
-	//	MongodbAddr: "127.0.0.1:27080",
-	//	//MongodbAddr: "172.17.4.85:27080",
-	//	DbName: "qfw",
-	//	Size:   10,
-	//	Direct: true,
-	//}
-	//MgoR.InitPool()
+	MgoR = &mongodb.MongodbSim{
+		//MongodbAddr: "127.0.0.1:27080",
+		MongodbAddr: "172.17.4.85:27080",
+		DbName:      "qfw",
+		Size:        10,
+		//Direct: true,
+	}
+	MgoR.InitPool()
 
 	//测试环境MongoDB
 	//MgoT = &mongodb.MongodbSim{
@@ -130,12 +131,12 @@ func main() {
 	Init()
 	//InitEsBiddingField()
 	//go updateMethod()   //更新mongodb
-	go updateEsMethod() //更新es
+	//go updateEsMethod() //更新es
 	//go updateEsHrefMethod() //更新es href 字段
 	//go updateProjectEsMethod()
 	//taskRunProject()
 	//taskRunBidding()
-	dealBidding() //正式环境bidding数据处理
+	//dealBidding() //正式环境bidding数据处理
 	//dealBiddingAi() //正式环境bidding数据处理
 	//dealBiddingTest() // 测试环境数据处理
 
@@ -143,11 +144,45 @@ func main() {
 	//dealBiddingNiJian() //更新拟建数据中buyer = owner
 	//updateBiddingBidamount()
 	//updateProject()
+	//-------------------------------//
+	fixBiddingEs()
 	log.Info("over")
 	c := make(chan bool, 1)
 	<-c
 }
 
+// fixBiddingEs 修复bidding 索引数据,
+func fixBiddingEs() {
+	defer util.Catch()
+	sess := MgoR.GetMgoConn()
+	defer MgoR.DestoryMongoConn(sess)
+
+	where := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gte": mongodb.StringTOBsonId("6847fc265f834436f08ef4fe"),
+			"$lte": mongodb.StringTOBsonId("6848e42b5f834436f092f645"),
+		},
+	}
+
+	it := sess.DB("qfw").C("result_20220219").Find(where).Select(nil).Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
+		if count%1000 == 0 {
+			log.Info("current", log.Int("count", count), log.Any("_id", tmp["_id"]))
+		}
+
+		biddingID := mongodb.BsonIdToSId(tmp["_id"])
+		repeat := util.IntAll(tmp["repeat"])
+		repeat_reason := util.ObjToString(tmp["repeat_reason"])
+		if repeat == 1 && strings.Contains(repeat_reason, "采集源重复") {
+			Es.DeleteByID("bidding", biddingID)
+			log.Info("fixBiddingEs", zap.String("biddingID", biddingID))
+			EsNew.DeleteByID("bidding", biddingID)
+		}
+
+	}
+}
+
 func InitEsBiddingField() {
 	now := time.Now()
 	info, _ := MgoB.Find("bidding_processing_field", `{"stype": "bidding"}`, nil, nil, false, -1, -1)

二進制
updateBidding/updateBidding