Sfoglia il codice sorgente

法人库企业数据,协程处理

wangchengcheng 1 giorno fa
parent
commit
81d71ea2ae
1 ha cambiato i file con 210 aggiunte e 0 eliminazioni
  1. 210 0
      faren_tidb/all.go

+ 210 - 0
faren_tidb/all.go

@@ -7,6 +7,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/RoaringBitmap/roaring"
@@ -45,6 +46,9 @@ func dealAllFromCompanyBase() {
 		if util.IntAll(tmp["use_flag"]) > 0 {
 			continue
 		}
+		if util.ObjToString(tmp["company_type"]) == "事业单位" {
+			continue
+		}
 
 		var ent EntInfo
 		ent.CompanyID = util.ObjToString(tmp["company_id"])
@@ -189,6 +193,212 @@ func dealAllFromCompanyBase() {
 	}
 }
 
+// dealAllFromCompanyBase2  多协程批量数据
+func dealAllFromCompanyBase2() {
+	jlog.Info("dealAllFromCompanyBase", zap.String("开始处理", "-------企业库存量数据"))
+	defer util.Catch()
+	sess := MgoQY.GetMgoConn()
+	defer MgoQY.DestoryMongoConn(sess)
+
+	where := map[string]interface{}{
+		"company_type": map[string]interface{}{
+			"$ne": "个体工商户",
+		},
+	}
+	// channel 作为队列
+	jobCh := make(chan map[string]interface{}, 1000) // 缓冲队列
+	entCh := make(chan EntInfo, 1000)                // 结果队列
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// 启动 worker 处理数据
+	workerNum := 6 // 并发度可调
+	var wg sync.WaitGroup
+	for i := 0; i < workerNum; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for tmp := range jobCh {
+				ent, ok := processCompany(tmp)
+				if ok {
+					entCh <- ent
+				}
+			}
+		}()
+	}
+
+	// 启动一个写入 goroutine,专门负责批量写 DB
+	go func() {
+		batchSize := 100
+		ents := make([]EntInfo, 0, batchSize)
+		for ent := range entCh {
+			ents = append(ents, ent)
+			if len(ents) >= batchSize {
+				if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil {
+					jlog.Error("批量插入失败", zap.Error(err))
+				}
+				ents = ents[:0]
+			}
+		}
+		// flush
+		if len(ents) > 0 {
+			if err := MysqlDB.CreateInBatches(ents, batchSize).Error; err != nil {
+				jlog.Error("批量插入失败", zap.Error(err))
+			}
+		}
+	}()
+
+	// 主协程负责读 Mongo
+	it := sess.DB(GF.MongoQy.DB).C("company_base").Find(where).Sort("_id").Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
+		if count%1000 == 0 {
+			jlog.Info("dealAllFromCompanyBase", zap.Any("current:", count), zap.Any("company_name", tmp["company_name"]))
+		}
+		select {
+		case jobCh <- tmp:
+		case <-ctx.Done():
+			break
+		}
+	}
+	close(jobCh) // 生产完毕
+
+	wg.Wait()    // 等所有 worker 结束
+	close(entCh) // 再关掉结果通道,通知写入 goroutine flush 完成
+}
+
+// processCompany 处理单条 company_base 数据,生成 EntInfo
+func processCompany(tmp map[string]interface{}) (EntInfo, bool) {
+	// 过滤条件
+	company_status := util.ObjToString(tmp["company_status"])
+	if strings.Contains(company_status, "注销") || strings.Contains(company_status, "吊销") {
+		return EntInfo{}, false
+	}
+	if util.IntAll(tmp["use_flag"]) > 0 {
+		return EntInfo{}, false
+	}
+	if util.ObjToString(tmp["company_type"]) == "事业单位" {
+		return EntInfo{}, false
+	}
+
+	var ent EntInfo
+	ent.CompanyID = util.ObjToString(tmp["company_id"])
+	ent.CompanyName = util.ObjToString(tmp["company_name"])
+	ent.CompanyCode = util.ObjToString(tmp["company_code"])
+	ent.CreditNo = util.ObjToString(tmp["credit_no"])
+	ent.OrgCode = util.ObjToString(tmp["org_code"])
+	ent.TaxCode = util.ObjToString(tmp["tax_code"])
+	ent.EstablishDate = util.ObjToString(tmp["establish_date"])
+	ent.LegalPerson = util.ObjToString(tmp["legal_person"])
+	ent.LegalPersonCaption = util.ObjToString(tmp["legal_person_caption"])
+	ent.CompanyStatus = company_status
+	ent.CompanyType = util.ObjToString(tmp["company_type"])
+	ent.Authority = util.ObjToString(tmp["authority"])
+	ent.IssueDate = util.ObjToString(tmp["issue_date"])
+	ent.OperationStartDate = util.ObjToString(tmp["operation_startdate"])
+	ent.OperationEndDate = util.ObjToString(tmp["operation_enddate"])
+	ent.Capital = util.ObjToString(tmp["capital"])
+	ent.CompanyAddress = util.ObjToString(tmp["company_address"])
+	ent.BusinessScope = util.ObjToString(tmp["business_scope"])
+	ent.ComeInTime = time.Now().Unix()
+	ent.UpdateTime = time.Now().Unix()
+	ent.LegalPersonType = int8(util.IntAll(tmp["legal_person_type"]))
+	ent.RealCapital = util.ObjToString(tmp["real_capital"])
+	ent.EnName = util.ObjToString(tmp["en_name"])
+	ent.ListCode = util.ObjToString(tmp["list_code"])
+
+	// annual_reports
+	std := getQyxyStd(util.ObjToString(tmp["company_name"]))
+	if std != nil && len(std) > 0 {
+		reports, ok := std["annual_reports"].([]interface{})
+		if ok {
+			var maxYear float64
+			var employeeNo string
+			for i, r := range reports {
+				if reportMap, ok := r.(map[string]interface{}); ok {
+					year := util.Float64All(reportMap["report_year"])
+					emp := util.ObjToString(reportMap["employee_no"])
+					if i == 0 || year > maxYear {
+						maxYear = year
+						employeeNo = emp
+					}
+				}
+			}
+			if maxYear > 0 {
+				ent.EmployeeNo = util.IntAll(employeeNo)
+			}
+		}
+	}
+
+	ent.Website = util.ObjToString(tmp["website_url"])
+	ent.CompanyPhone = util.ObjToString(tmp["company_phone"])
+	ent.CompanyEmail = util.ObjToString(tmp["company_email"])
+
+	// company_industry_tags
+	whereIndustry := map[string]interface{}{
+		"company_id": util.ObjToString(tmp["company_id"]),
+	}
+	indus, _ := MgoQY.FindOne("company_industry", whereIndustry)
+	ent.CompanyIndustryTags = "{}"
+	if indus != nil && len(*indus) > 0 {
+		name_path := []string{
+			util.ObjToString((*indus)["industry_l1_name"]),
+			util.ObjToString((*indus)["industry_l2_name"]),
+			util.ObjToString((*indus)["industry_l3_name"]),
+			util.ObjToString((*indus)["industry_l4_name"]),
+		}
+		name_code := []string{
+			util.ObjToString((*indus)["industry_l1_code"]),
+			util.ObjToString((*indus)["industry_l2_code"]),
+			util.ObjToString((*indus)["industry_l3_code"]),
+			util.ObjToString((*indus)["industry_l4_code"]),
+		}
+		industry := map[string]interface{}{
+			"name_path": name_path,
+			"code_path": name_code,
+		}
+		jsonBytes, _ := json.Marshal(industry)
+		ent.CompanyIndustryTags = string(jsonBytes)
+	}
+
+	// 行政区划编码
+	area, city, district := util.ObjToString((std)["company_area"]), util.ObjToString((std)["company_city"]), util.ObjToString((std)["company_district"])
+	area_code, city_code, district_code := CalculateRegionCode(area, city, district)
+	ent.JYAreaCode = area_code
+	ent.JYCityCode = city_code
+	ent.JYDistrictCode = district_code
+
+	// ClickHouse 查询历史标签
+	query := `SELECT bitmapToArray(company_label) FROM ent_info WHERE company_id = ?`
+	var oldLabels = make([]uint64, 0)
+	row := ClickHouseConn.QueryRow(context.Background(), query, ent.CompanyID)
+	err := row.Scan(&oldLabels)
+	if err != nil && !errors.Is(err, sql.ErrNoRows) {
+		jlog.Info("dealIncEntInfo: 查询出错", zap.Error(err))
+	}
+	rbm := roaring.NewBitmap()
+	for _, v := range oldLabels {
+		rbm.Add(uint32(v))
+	}
+	bin, _ := rbm.ToBytes()
+	ent.JYCompanyLabel = bin
+
+	ent.JYOrgTopType = "企业"
+	company_type := util.ObjToString(tmp["company_type"])
+	if info, ok := nameNorm[company_type]; ok {
+		ent.JYCompanyTypeOriginCode = info.Code
+		ent.JYCompanyTypeIsLeaf = 1
+		ent.JYCompanyTypeLeafCode = info.Code
+		ent.JYCompanyTypeLeafName = info.Name
+		ent.JYCompanyTypeLeafTag = info.Tag
+		ent.JYOrgPropertyOneTag = "工商"
+		ent.JYOrgPropertyTwoTag = "企业"
+		ent.JYOrgPropertyThreeTag = info.Tag2
+	}
+
+	return ent, true
+}
+
 // dealLeaf 处理存量非叶子节点的企业数据标签
 func dealLeaf() {
 	const batchSize = 50