Browse Source

Merge branch 'master' of https://jygit.jydev.jianyu360.cn/data_processing/data_ent_wuye

# Conflicts:
#	main.go
zhengkun 1 năm trước cách đây
mục cha
commit
b8bb49ff77
4 tập tin đã thay đổi với 864 bổ sung và 63 xóa
  1. 78 38
      ent_legal/legal_add.go
  2. 677 3
      ent_legal/legal_full.go
  3. 45 13
      ent_util/init.go
  4. 64 9
      main.go

+ 78 - 38
ent_legal/legal_add.go

@@ -2,12 +2,14 @@ package ent_legal
 
 import (
 	"context"
+	"data_ent_wuye/ent_contact"
 	ul "data_ent_wuye/ent_util"
 	"fmt"
 	"strconv"
 	"strings"
 	"sync"
 	"time"
+	"unicode/utf8"
 
 	"log"
 
@@ -68,8 +70,7 @@ func RunLegalInfo(gtid string, lteid string) {
 				qu.ObjToString(tmp["owner"]) == "" {
 				return
 			}
-			//处理构建法人信息
-			CreateLegalInfo(tmp)
+			FindLegal(tmp)
 		}(tmp)
 		tmp = make(map[string]interface{})
 	}
@@ -77,52 +78,103 @@ func RunLegalInfo(gtid string, lteid string) {
 	log.Println("is over ~ ", total)
 }
 
-func CreateLegalInfo(tmp map[string]interface{}) {
-	//构建法人信息...
-	query := fmt.Sprintf(`SELECT id FROM information.ent_info  WHERE company_name = '%s'`, tmp["buyer"])
+// FindLegal reads the buyer, winner, agency and owner names from one record
+// and runs FindLegalInfo for each of them together with the matching contact
+// person and phone fields.
+//
+// Phone values longer than 60 runes are blanked. Buyer, agency and owner are
+// only looked up when they are non-empty and shorter than 30 runes. Winner
+// names come from ul.SegmentationEntName and are looked up without a length
+// check; the winner contact is passed only for entries whose flag is true.
+func FindLegal(tmp map[string]interface{}) {
+	str := func(key string) string {
+		return qu.ObjToString(tmp[key])
+	}
+	// tel returns the field as text, or "" when it is longer than 60 runes.
+	tel := func(key string) string {
+		val := str(key)
+		if utf8.RuneCountInString(val) > 60 {
+			return ""
+		}
+		return val
+	}
+	// lookup forwards to FindLegalInfo for non-empty names under 30 runes.
+	lookup := func(entName, person, phone string) {
+		if entName == "" || utf8.RuneCountInString(entName) >= 30 {
+			return
+		}
+		FindLegalInfo(entName, person, phone)
+	}
+
+	buyerName, agencyName := str("buyer"), str("agency")
+	winnerName, ownerName := str("winner"), str("owner")
+	subWinner := str("s_winner")
+	buyerPerson, buyerTel := str("buyerperson"), tel("buyertel")
+	agencyPerson, agencyTel := str("agencyperson"), tel("agencytel")
+	winnerPerson, winnerTel := str("winnerperson"), tel("winnertel")
+	ownerPerson, ownerTel := str("project_person"), tel("project_phone")
+
+	winners, hasContact := ul.SegmentationEntName(winnerName, subWinner)
+
+	lookup(buyerName, buyerPerson, buyerTel)
+
+	// Winning bidders.
+	for idx, entName := range winners {
+		person, phone := "", ""
+		if hasContact[idx] {
+			person, phone = winnerPerson, winnerTel
+		}
+		FindLegalInfo(entName, person, phone)
+	}
+
+	lookup(agencyName, agencyPerson, agencyTel)
+	lookup(ownerName, ownerPerson, ownerTel)
+}
+
+// FindLegalInfo looks up name in information.ent_info by exact company_name
+// match. When at least one row is returned it calls UpdateContact with the
+// last scanned id; otherwise it calls CreateLegalInfo in "save" mode.
+//
+// NOTE(review): the query text is built by interpolating name between single
+// quotes, so a name containing a quote changes the SQL — consider a bound
+// parameter.
+func FindLegalInfo(name, peason, phone string) {
+	query := fmt.Sprintf(`SELECT id FROM information.ent_info  WHERE company_name = '%s'`, name)
 	rows, err := ul.ClickHouseConn.Query(context.Background(), query)
 	if err != nil {
 		log.Println(err)
 	}
-	isok := 0
-	type Res struct {
-		id string
-	}
+	// NOTE(review): rows is still iterated below after a query error was only
+	// logged, and it is never closed in this function.
+	count := 0
+	var id string
 	for rows.Next() {
-		var res Res
 		if err := rows.Scan(
-			&res.id,
+			&id,
 		); err != nil {
 			log.Println(err)
 		}
-		isok++
+		count++
 	}
-	方法名字ABC("徐氏企业")
-	//winner_arr, winner_bool := ul.SegmentationEntName(winner, s_winner)
+	if count > 0 {
+		UpdateContact(id, name, peason, phone)
+	} else {
+		CreateLegalInfo("save", name, peason, phone)
+	}
+}
+
+// UpdateContact forwards the enterprise id, name, contact person and phone to
+// ent_contact.InjectContactAddTask.
+func UpdateContact(id, name, peason, phone string) {
+	ent_contact.InjectContactAddTask(id, name, peason, phone)
+	// Update labels (placeholder: nothing is done here yet).
 }
 
-func 方法名字ABC(name string) {
-	//法人是否存在更新?
-	//......
-	info := map[string]interface{}{}
-	if info == nil { //新增法人信息
+func CreateLegalInfo(dataType, name, peason, phone string) {
+	if dataType == "save" {
 		base_info := map[string]interface{}{}
 		//法人信息所需要的大部分企业信息
 		qyxy_info := GetQyxyInfo(name)
-		base_info["name"] = name
+		base_info["company_name"] = name
 		area := qu.ObjToString(qyxy_info["company_area"])
 		city := qu.ObjToString(qyxy_info["company_city"])
 		district := qu.ObjToString(qyxy_info["company_district"])
 		base_info["area_code"], base_info["city_code"], base_info["district_code"] = ul.CalculateRegionCode(area, city, district)
-		name_id := uuid.New().String()
-		name_id = strings.ReplaceAll(name_id, "-", "")
-		base_info["name_id"] = name_id
-		base_info["createtime"] = time.Unix(time.Now().Unix(), 0).Format(ul.TimeLayout)
+		id := uuid.New().String()
+		id = strings.ReplaceAll(id, "-", "")
+		base_info["id"] = id
+		base_info["createtime"] = time.Now().Unix()
 		/*
 			···
 			···
 		*/
-		//告知通讯录...
+		ent_contact.InjectContactAddTask(id, name, peason, phone)
 	} else {
 		//告知通讯录...
 	}
@@ -131,19 +183,7 @@ func 方法名字ABC(name string) {
 // 创建企业信息
 func GetQyxyInfo(name string) map[string]interface{} {
 	qyxy_info := map[string]interface{}{}
-	dataArr, _ := ul.QyxyMgo.Find("qyxy_std", map[string]interface{}{"company_name": name}, map[string]interface{}{"updatetime": -1}, map[string]interface{}{
-		"_id":              1,
-		"company_name":     1,
-		"company_address":  1,
-		"company_area":     1,
-		"company_city":     1,
-		"company_district": 1,
-		"legal_person":     1,
-		"company_phone":    1,
-		"company_email":    1,
-		"company_type_old": 1,
-		"字段很多":             1,
-	})
+	dataArr, _ := ul.QyxyMgo.Find("qyxy_std", map[string]interface{}{"company_name": name}, map[string]interface{}{"updatetime": -1}, nil)
 	if len(dataArr) > 0 {
 		qyxy_info = dataArr[0] //补充企业信息
 	} else {

+ 677 - 3
ent_legal/legal_full.go

@@ -1,10 +1,684 @@
 package ent_legal
 
-func 全量方法ABC() {
-	//从现有主题库...直接获取
+import (
+	"context"
+	"data_ent_wuye/ent_contact"
+	"data_ent_wuye/ent_util"
+	"encoding/json"
+	"fmt"
+	"log"
+	"regexp"
+	"strconv"
+	"strings"
+	"sync"
 
-	//凭安全量企业···查到数据告知通讯录
+	"github.com/google/uuid"
+	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+)
 
+// RegClean matches runs of characters that fall outside the allowed set:
+// the ASCII range from space to '~', the listed CJK/Kana/Hangul ranges and
+// the listed full-width punctuation marks. getCompanyLabel uses it to strip
+// such characters from a company name.
+var RegClean = regexp.MustCompile("[^ -~\u2E80-\u2FDF\u3040-\u318F\u31A0-\u31BF\u31F0-\u31FF\u3400-\u4DB5\u4E00-\u9FFF\uA960-\uA97F\uAC00-\uD7FF\u3002\u00a5\uff1f\uff01\uff0c\u3001\uff1b\uff1a\u201c\u201d\u2018\u2019\uff08\uff09\u300a\u300b\u3008\u3009\u3010\u3011\u300e\u300f\u300c\u300d\ufe43\ufe44\u3014\u3015\u2026\u2014\uff5e\ufe4f\uffe5\u00a5]+")
+
+// LegalFull runs the full (non-incremental) load. Only the entity-library
+// step getZhuTi is enabled; the other sources are commented out.
+func LegalFull() {
+	// Load directly from the existing entity library.
+	getZhuTi()
+	// PingAn full enterprise set: rows that are found are passed on to the contact book.
+	// getPingAn()
+	// Special enterprises.
+	// getTQ()
 	// MK information.
+	// getMK()
+}
+
+// getZhuTi copies rows from the MySQL table dws_f_ent_baseinfo into the
+// ClickHouse table information.ent_info and, for every row stored there
+// successfully, into the "ent_info" Elasticsearch index.
+//
+// Rows are read in pages of 50000 ordered by id and each page is processed by
+// at most 10 concurrent workers. Every row is enriched with the fields that
+// GetQyxyInfo returns for its company name; rows with an empty name are
+// skipped. The function waits for all workers of a page before fetching the
+// next page, so it only returns once every row has been handled.
+func getZhuTi() {
+	const pageSize = 50000
+
+	log.Println("主体库全量任务开始")
+	index, total := 0, 0
+	// NOTE(review): presumably "lt" is translated into "createtime < value" by
+	// MysqlGlobalTool.Find — confirm against that helper.
+	q := map[string]interface{}{
+		"createtime": map[string]interface{}{
+			"lt": "2024-04-19 00:00:00",
+		},
+	}
+	for {
+		dataArr := ent_util.MysqlGlobalTool.Find("dws_f_ent_baseinfo", q, "name_id,name,company_id,address,area_code,city_code,district_code", "id", index*pageSize, pageSize)
+		// A nil result or an empty page both end the scan.
+		if dataArr == nil || len(*dataArr) == 0 {
+			break
+		}
+		pool := make(chan bool, 10) // bounds the number of concurrent workers
+		wg := &sync.WaitGroup{}
+		for _, v := range *dataArr {
+			pool <- true
+			wg.Add(1)
+			go func(v map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				id := qu.ObjToString(v["name_id"])
+				company_name := qu.ObjToString(v["name"])
+				if company_name == "" {
+					return
+				}
+				company_id := qu.ObjToString(v["company_id"])
+				company_address := qu.ObjToString(v["address"])
+				area_code := qu.ObjToString(v["area_code"])
+				city_code := qu.ObjToString(v["city_code"])
+				district_code := qu.ObjToString(v["district_code"])
+				// Label lookup is disabled for this source; a single 0 code and
+				// an empty label string are stored instead.
+				company_label, company_label_str := []uint64{uint64(0)}, ""
+				qyxy_info := GetQyxyInfo(company_name)
+				// if qyxy_info != nil {
+				// 	company_label, company_label_str = getCompanyLabel(company_name)
+				// }
+				company_code := qu.ObjToString(qyxy_info["company_code"])
+				credit_no := qu.ObjToString(qyxy_info["credit_no"])
+				org_code := qu.ObjToString(qyxy_info["org_code"])
+				tax_code := qu.ObjToString(qyxy_info["tax_code"])
+				establish_date := qu.Int64All(qyxy_info["establish_date"])
+				legal_person := qu.ObjToString(qyxy_info["legal_person"])
+				legal_person_caption := qu.ObjToString(qyxy_info["legal_person_caption"])
+				company_status := qu.ObjToString(qyxy_info["company_status"])
+				company_type := qu.ObjToString(qyxy_info["company_type"])
+				authority := qu.ObjToString(qyxy_info["authority"])
+				issue_date := qu.Int64All(qyxy_info["issue_date"])
+				operation_startdate := qu.ObjToString(qyxy_info["operation_startdate"])
+				operation_enddate := qu.ObjToString(qyxy_info["operation_enddate"])
+				capital := qu.ObjToString(qyxy_info["capital"])
+				business_scope := qu.ObjToString(qyxy_info["business_scope"])
+				comeintime := qu.Int64All(qyxy_info["comeintime"])
+				updatetime := qu.Int64All(qyxy_info["updatetime"])
+				legal_person_type := qu.IntAll(qyxy_info["legal_person_type"])
+				real_capital := qu.ObjToString(qyxy_info["real_capital"])
+				en_name := qu.ObjToString(qyxy_info["en_name"])
+				list_code := qu.ObjToString(qyxy_info["list_code"])
+				employee_no := qu.IntAll(qyxy_info["employee_no"])
+				website := qu.ObjToString(qyxy_info["website"])
+				company_phone := qu.ObjToString(qyxy_info["company_phone"])
+				company_email := qu.ObjToString(qyxy_info["company_email"])
+				query := `INSERT INTO information.ent_info (id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email) VALUES(?, ?, ?, bitmapBuild(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
+				err := ent_util.ClickHouseConn.Exec(context.Background(), query, id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email)
+				if err != nil {
+					log.Println(err, "clickhouse存入失败", company_name)
+					return
+				}
+				// Mirror the stored row into Elasticsearch only after the
+				// ClickHouse insert succeeded.
+				data := map[string]interface{}{
+					"_id":                  id,
+					"company_name":         company_name,
+					"company_id":           company_id,
+					"company_address":      company_address,
+					"area_code":            area_code,
+					"city_code":            city_code,
+					"district_code":        district_code,
+					"company_label":        company_label_str,
+					"company_code":         company_code,
+					"credit_no":            credit_no,
+					"org_code":             org_code,
+					"tax_code":             tax_code,
+					"establish_date":       establish_date,
+					"legal_person":         legal_person,
+					"legal_person_caption": legal_person_caption,
+					"company_status":       company_status,
+					"company_type":         company_type,
+					"authority":            authority,
+					"issue_date":           issue_date,
+					"operation_startdate":  operation_startdate,
+					"operation_enddate":    operation_enddate,
+					"capital":              capital,
+					"business_scope":       business_scope,
+					"comeintime":           comeintime,
+					"updatetime":           updatetime,
+					"legal_person_type":    legal_person_type,
+					"real_capital":         real_capital,
+					"en_name":              en_name,
+					"list_code":            list_code,
+					"employee_no":          employee_no,
+					"website":              website,
+					"company_phone":        company_phone,
+					"company_email":        company_email,
+				}
+				if ent_util.EsClinet.Save("ent_info", "", data) {
+					log.Println("es存入成功")
+				} else {
+					log.Println("es存入失败!!!!", company_name)
+				}
+			}(v)
+		}
+		// Wait for this page's workers; without this the function could return
+		// (and the next page could start) while rows were still being written.
+		wg.Wait()
+		index++
+		total += len(*dataArr)
+		if index%5 == 0 {
+			log.Println("cur idx ", total)
+		}
+	}
+	log.Println("主体库全量任务结束")
+}
+
+// getPingAn walks the Mongo collection mixdata.qyxy_std in _id order and adds
+// every enterprise that is not yet present in information.ent_info.
+//
+// Records are skipped when use_flag > 5, is_history != 0, the company name is
+// empty, or company_type is "个体工商户". For each remaining record whose name
+// has no row in ent_info, a new id is generated, the row is inserted into
+// ClickHouse, mirrored into the "ent_info" Elasticsearch index, and passed to
+// ent_contact.InjectContactPingAnInfo. At most 10 records are processed
+// concurrently and the function waits for all of them before returning.
+func getPingAn() {
+	sess := ent_util.QyxyMgo.GetMgoConn()
+	defer ent_util.QyxyMgo.DestoryMongoConn(sess)
+	q := map[string]interface{}{}
+	it := sess.DB("mixdata").C("qyxy_std").Find(&q).Sort("_id").Iter()
+	pool := make(chan bool, 10) // bounds the number of concurrent workers
+	wg := &sync.WaitGroup{}
+	total := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total%1000 == 0 {
+			log.Println("cur index ", total)
+		}
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			use_flag := qu.IntAll(tmp["use_flag"])
+			is_history := qu.IntAll(tmp["is_history"])
+			company_type := qu.ObjToString(tmp["company_type"])
+			company_name := qu.ObjToString(tmp["company_name"])
+			if use_flag > 5 || is_history != 0 || company_name == "" || company_type == "个体工商户" {
+				return
+			}
+			// Bind the name as a parameter so quotes in it cannot alter the SQL.
+			rows, err := ent_util.ClickHouseConn.Query(context.Background(), `SELECT id FROM information.ent_info WHERE company_name = ?`, company_name)
+			if err != nil {
+				// rows is unusable after a failed query; skip this record.
+				log.Println(fmt.Errorf("query ent_info for %q: %w", company_name, err))
+				return
+			}
+			defer rows.Close()
+			count := 0
+			var id string
+			for rows.Next() {
+				if err := rows.Scan(
+					&id,
+				); err != nil {
+					log.Println(err)
+				}
+				count++
+			}
+			if err := rows.Err(); err != nil {
+				// An aborted read cannot prove the name is absent; do not insert.
+				log.Println(fmt.Errorf("read ent_info rows for %q: %w", company_name, err))
+				return
+			}
+			if count > 0 {
+				return
+			}
+			id = strings.ReplaceAll(uuid.New().String(), "-", "")
+			area := qu.ObjToString(tmp["company_area"])
+			city := qu.ObjToString(tmp["company_city"])
+			district := qu.ObjToString(tmp["company_district"])
+			area_code, city_code, district_code := ent_util.CalculateRegionCode(area, city, district)
+			company_id := ent_util.BsonTOStringId(tmp["_id"])
+			company_address := qu.ObjToString(tmp["company_address"])
+			company_label, company_label_str := getCompanyLabel(company_name)
+			company_code := qu.ObjToString(tmp["company_code"])
+			credit_no := qu.ObjToString(tmp["credit_no"])
+			org_code := qu.ObjToString(tmp["org_code"])
+			tax_code := qu.ObjToString(tmp["tax_code"])
+			establish_date := qu.Int64All(tmp["establish_date"])
+			legal_person := qu.ObjToString(tmp["legal_person"])
+			legal_person_caption := qu.ObjToString(tmp["legal_person_caption"])
+			company_status := qu.ObjToString(tmp["company_status"])
+			authority := qu.ObjToString(tmp["authority"])
+			issue_date := qu.Int64All(tmp["issue_date"])
+			operation_startdate := qu.ObjToString(tmp["operation_startdate"])
+			operation_enddate := qu.ObjToString(tmp["operation_enddate"])
+			capital := qu.ObjToString(tmp["capital"])
+			business_scope := qu.ObjToString(tmp["business_scope"])
+			comeintime := qu.Int64All(tmp["comeintime"])
+			updatetime := qu.Int64All(tmp["updatetime"])
+			legal_person_type := qu.IntAll(tmp["legal_person_type"])
+			real_capital := qu.ObjToString(tmp["real_capital"])
+			en_name := qu.ObjToString(tmp["en_name"])
+			list_code := qu.ObjToString(tmp["list_code"])
+			employee_no := qu.IntAll(tmp["employee_no"])
+			website := qu.ObjToString(tmp["website"])
+			company_phone := qu.ObjToString(tmp["company_phone"])
+			company_email := qu.ObjToString(tmp["company_email"])
+			query := `INSERT INTO information.ent_info (id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email) VALUES(?, ?, ?, bitmapBuild(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
+			err = ent_util.ClickHouseConn.Exec(context.Background(), query, id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email)
+			if err != nil {
+				log.Println(err, "clickhouse存入失败", company_name)
+				return
+			}
+			// Mirror into Elasticsearch and notify the contact module only
+			// after the ClickHouse insert succeeded.
+			data := map[string]interface{}{
+				"_id":                  id,
+				"company_name":         company_name,
+				"company_id":           company_id,
+				"company_address":      company_address,
+				"area_code":            area_code,
+				"city_code":            city_code,
+				"district_code":        district_code,
+				"company_label":        company_label_str,
+				"company_code":         company_code,
+				"credit_no":            credit_no,
+				"org_code":             org_code,
+				"tax_code":             tax_code,
+				"establish_date":       establish_date,
+				"legal_person":         legal_person,
+				"legal_person_caption": legal_person_caption,
+				"company_status":       company_status,
+				"company_type":         company_type,
+				"authority":            authority,
+				"issue_date":           issue_date,
+				"operation_startdate":  operation_startdate,
+				"operation_enddate":    operation_enddate,
+				"capital":              capital,
+				"business_scope":       business_scope,
+				"comeintime":           comeintime,
+				"updatetime":           updatetime,
+				"legal_person_type":    legal_person_type,
+				"real_capital":         real_capital,
+				"en_name":              en_name,
+				"list_code":            list_code,
+				"employee_no":          employee_no,
+				"website":              website,
+				"company_phone":        company_phone,
+				"company_email":        company_email,
+			}
+			ent_util.EsClinet.Save("ent_info", "", data)
+			ent_contact.InjectContactPingAnInfo(id, tmp)
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Println("is over ~ ", total)
+}
+
+// getTQ walks each of the special-enterprise Mongo collections listed in
+// tqarr (database mixdata, _id order) and adds every enterprise that is not
+// yet present in information.ent_info.
+//
+// Records with use_flag > 5 or an empty company name are skipped. For each
+// remaining record whose name has no row in ent_info, a new id is generated,
+// the row is inserted into ClickHouse, mirrored into the "ent_info"
+// Elasticsearch index, and passed to ent_contact.InjectContactPingAnInfo.
+// Each collection is processed by at most 10 concurrent workers, and its
+// Mongo session is released as soon as that collection is finished.
+func getTQ() {
+	tqarr := []string{"special_enterprise", "special_foundation", "special_gov_unit", "special_hongkong_company", "special_hongkong_company_history", "special_law_office", "special_social_organ", "special_trade_union"}
+	for _, db := range tqarr {
+		sess := ent_util.SpiMgo.GetMgoConn()
+		q := map[string]interface{}{}
+		it := sess.DB("mixdata").C(db).Find(&q).Sort("_id").Iter()
+		pool := make(chan bool, 10) // bounds the number of concurrent workers
+		wg := &sync.WaitGroup{}
+		total := 0
+		for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+			if total%1000 == 0 {
+				log.Println("cur index ", total)
+			}
+			pool <- true
+			wg.Add(1)
+			go func(tmp map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				use_flag := qu.IntAll(tmp["use_flag"])
+				company_name := qu.ObjToString(tmp["company_name"])
+				if use_flag > 5 || company_name == "" {
+					return
+				}
+				// Bind the name as a parameter so quotes in it cannot alter the SQL.
+				rows, err := ent_util.ClickHouseConn.Query(context.Background(), `SELECT id FROM information.ent_info WHERE company_name = ?`, company_name)
+				if err != nil {
+					// rows is unusable after a failed query; skip this record.
+					log.Println(fmt.Errorf("query ent_info for %q: %w", company_name, err))
+					return
+				}
+				defer rows.Close()
+				count := 0
+				var id string
+				for rows.Next() {
+					if err := rows.Scan(
+						&id,
+					); err != nil {
+						log.Println(err)
+					}
+					count++
+				}
+				if err := rows.Err(); err != nil {
+					// An aborted read cannot prove the name is absent; do not insert.
+					log.Println(fmt.Errorf("read ent_info rows for %q: %w", company_name, err))
+					return
+				}
+				if count > 0 {
+					return
+				}
+				id = strings.ReplaceAll(uuid.New().String(), "-", "")
+				area := qu.ObjToString(tmp["company_area"])
+				city := qu.ObjToString(tmp["company_city"])
+				district := qu.ObjToString(tmp["company_district"])
+				area_code, city_code, district_code := ent_util.CalculateRegionCode(area, city, district)
+				company_id := ent_util.BsonTOStringId(tmp["_id"])
+				company_address := qu.ObjToString(tmp["company_address"])
+				company_label, company_label_str := getCompanyLabel(company_name)
+				company_code := qu.ObjToString(tmp["company_code"])
+				credit_no := qu.ObjToString(tmp["credit_no"])
+				org_code := qu.ObjToString(tmp["org_code"])
+				tax_code := qu.ObjToString(tmp["tax_code"])
+				establish_date := qu.Int64All(tmp["establish_date"])
+				legal_person := qu.ObjToString(tmp["legal_person"])
+				legal_person_caption := qu.ObjToString(tmp["legal_person_caption"])
+				company_status := qu.ObjToString(tmp["company_status"])
+				company_type := qu.ObjToString(tmp["company_type"])
+				authority := qu.ObjToString(tmp["authority"])
+				issue_date := qu.Int64All(tmp["issue_date"])
+				operation_startdate := qu.ObjToString(tmp["operation_startdate"])
+				operation_enddate := qu.ObjToString(tmp["operation_enddate"])
+				capital := qu.ObjToString(tmp["capital"])
+				business_scope := qu.ObjToString(tmp["business_scope"])
+				comeintime := qu.Int64All(tmp["comeintime"])
+				updatetime := qu.Int64All(tmp["updatetime"])
+				legal_person_type := qu.IntAll(tmp["legal_person_type"])
+				real_capital := qu.ObjToString(tmp["real_capital"])
+				en_name := qu.ObjToString(tmp["en_name"])
+				list_code := qu.ObjToString(tmp["list_code"])
+				employee_no := qu.IntAll(tmp["employee_no"])
+				website := qu.ObjToString(tmp["website"])
+				company_phone := qu.ObjToString(tmp["company_phone"])
+				company_email := qu.ObjToString(tmp["company_email"])
+				query := `INSERT INTO information.ent_info (id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email) VALUES(?, ?, ?, bitmapBuild(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
+				err = ent_util.ClickHouseConn.Exec(context.Background(), query, id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email)
+				if err != nil {
+					log.Println(err, "clickhouse存入失败", company_name)
+					return
+				}
+				// Mirror into Elasticsearch and notify the contact module only
+				// after the ClickHouse insert succeeded.
+				data := map[string]interface{}{
+					"_id":                  id,
+					"company_name":         company_name,
+					"company_id":           company_id,
+					"company_address":      company_address,
+					"area_code":            area_code,
+					"city_code":            city_code,
+					"district_code":        district_code,
+					"company_label":        company_label_str,
+					"company_code":         company_code,
+					"credit_no":            credit_no,
+					"org_code":             org_code,
+					"tax_code":             tax_code,
+					"establish_date":       establish_date,
+					"legal_person":         legal_person,
+					"legal_person_caption": legal_person_caption,
+					"company_status":       company_status,
+					"company_type":         company_type,
+					"authority":            authority,
+					"issue_date":           issue_date,
+					"operation_startdate":  operation_startdate,
+					"operation_enddate":    operation_enddate,
+					"capital":              capital,
+					"business_scope":       business_scope,
+					"comeintime":           comeintime,
+					"updatetime":           updatetime,
+					"legal_person_type":    legal_person_type,
+					"real_capital":         real_capital,
+					"en_name":              en_name,
+					"list_code":            list_code,
+					"employee_no":          employee_no,
+					"website":              website,
+					"company_phone":        company_phone,
+					"company_email":        company_email,
+				}
+				ent_util.EsClinet.Save("ent_info", "", data)
+				ent_contact.InjectContactPingAnInfo(id, tmp)
+			}(tmp)
+			tmp = make(map[string]interface{})
+		}
+		wg.Wait()
+		// Release the session per collection; a defer here would keep every
+		// session open until the whole function returned.
+		ent_util.SpiMgo.DestoryMongoConn(sess)
+		log.Println("is over ~ ", total)
+	}
+}
+
+// getMK walks the Mongo collection mixdata.qyxy_xzh in _id order and adds
+// every enterprise that is not yet present in information.ent_info.
+//
+// Records with an empty company name are skipped. For each remaining record
+// whose name has no row in ent_info, a new id is generated, the row is
+// inserted into ClickHouse, mirrored into the "ent_info" Elasticsearch index,
+// and passed to ent_contact.InjectContactPingAnInfo. At most 10 records are
+// processed concurrently and the function waits for all of them before
+// returning.
+func getMK() {
+	sess := ent_util.QyxyMgo.GetMgoConn()
+	defer ent_util.QyxyMgo.DestoryMongoConn(sess)
+	q := map[string]interface{}{}
+	it := sess.DB("mixdata").C("qyxy_xzh").Find(&q).Sort("_id").Iter()
+	pool := make(chan bool, 10) // bounds the number of concurrent workers
+	wg := &sync.WaitGroup{}
+	total := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
+		if total%1000 == 0 {
+			log.Println("cur index ", total)
+		}
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			company_name := qu.ObjToString(tmp["company_name"])
+			if company_name == "" {
+				return
+			}
+			// Bind the name as a parameter so quotes in it cannot alter the SQL.
+			rows, err := ent_util.ClickHouseConn.Query(context.Background(), `SELECT id FROM information.ent_info WHERE company_name = ?`, company_name)
+			if err != nil {
+				// rows is unusable after a failed query; skip this record.
+				log.Println(fmt.Errorf("query ent_info for %q: %w", company_name, err))
+				return
+			}
+			defer rows.Close()
+			count := 0
+			var id string
+			for rows.Next() {
+				if err := rows.Scan(
+					&id,
+				); err != nil {
+					log.Println(err)
+				}
+				count++
+			}
+			if err := rows.Err(); err != nil {
+				// An aborted read cannot prove the name is absent; do not insert.
+				log.Println(fmt.Errorf("read ent_info rows for %q: %w", company_name, err))
+				return
+			}
+			if count > 0 {
+				return
+			}
+			id = strings.ReplaceAll(uuid.New().String(), "-", "")
+			area := qu.ObjToString(tmp["company_area"])
+			city := qu.ObjToString(tmp["company_city"])
+			district := qu.ObjToString(tmp["company_district"])
+			area_code, city_code, district_code := ent_util.CalculateRegionCode(area, city, district)
+			company_id := ent_util.BsonTOStringId(tmp["_id"])
+			company_address := qu.ObjToString(tmp["company_address"])
+			company_label, company_label_str := getCompanyLabel(company_name)
+			company_code := qu.ObjToString(tmp["company_code"])
+			credit_no := qu.ObjToString(tmp["credit_no"])
+			org_code := qu.ObjToString(tmp["org_code"])
+			tax_code := qu.ObjToString(tmp["tax_code"])
+			establish_date := qu.Int64All(tmp["establish_date"])
+			legal_person := qu.ObjToString(tmp["legal_person"])
+			legal_person_caption := qu.ObjToString(tmp["legal_person_caption"])
+			company_status := qu.ObjToString(tmp["company_status"])
+			company_type := qu.ObjToString(tmp["company_type"])
+			authority := qu.ObjToString(tmp["authority"])
+			issue_date := qu.Int64All(tmp["issue_date"])
+			operation_startdate := qu.ObjToString(tmp["operation_startdate"])
+			operation_enddate := qu.ObjToString(tmp["operation_enddate"])
+			capital := qu.ObjToString(tmp["capital"])
+			business_scope := qu.ObjToString(tmp["business_scope"])
+			comeintime := qu.Int64All(tmp["comeintime"])
+			updatetime := qu.Int64All(tmp["updatetime"])
+			legal_person_type := qu.IntAll(tmp["legal_person_type"])
+			real_capital := qu.ObjToString(tmp["real_capital"])
+			en_name := qu.ObjToString(tmp["en_name"])
+			list_code := qu.ObjToString(tmp["list_code"])
+			employee_no := qu.IntAll(tmp["employee_no"])
+			website := qu.ObjToString(tmp["website"])
+			company_phone := qu.ObjToString(tmp["company_phone"])
+			company_email := qu.ObjToString(tmp["company_email"])
+			query := `INSERT INTO information.ent_info (id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email) VALUES(?, ?, ?, bitmapBuild(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
+			err = ent_util.ClickHouseConn.Exec(context.Background(), query, id, company_id, company_name, company_label, company_code, credit_no, org_code, tax_code, establish_date, legal_person, legal_person_caption, company_status, company_type, authority, issue_date, operation_startdate, operation_enddate, capital, company_address, business_scope, comeintime, updatetime, legal_person_type, real_capital, en_name, area_code, city_code, district_code, list_code, employee_no, website, company_phone, company_email)
+			if err != nil {
+				log.Println(err, "clickhouse存入失败", company_name)
+				return
+			}
+			// Mirror into Elasticsearch and notify the contact module only
+			// after the ClickHouse insert succeeded.
+			data := map[string]interface{}{
+				"_id":                  id,
+				"company_name":         company_name,
+				"company_id":           company_id,
+				"company_address":      company_address,
+				"area_code":            area_code,
+				"city_code":            city_code,
+				"district_code":        district_code,
+				"company_label":        company_label_str,
+				"company_code":         company_code,
+				"credit_no":            credit_no,
+				"org_code":             org_code,
+				"tax_code":             tax_code,
+				"establish_date":       establish_date,
+				"legal_person":         legal_person,
+				"legal_person_caption": legal_person_caption,
+				"company_status":       company_status,
+				"company_type":         company_type,
+				"authority":            authority,
+				"issue_date":           issue_date,
+				"operation_startdate":  operation_startdate,
+				"operation_enddate":    operation_enddate,
+				"capital":              capital,
+				"business_scope":       business_scope,
+				"comeintime":           comeintime,
+				"updatetime":           updatetime,
+				"legal_person_type":    legal_person_type,
+				"real_capital":         real_capital,
+				"en_name":              en_name,
+				"list_code":            list_code,
+				"employee_no":          employee_no,
+				"website":              website,
+				"company_phone":        company_phone,
+				"company_email":        company_email,
+			}
+			ent_util.EsClinet.Save("ent_info", "", data)
+			ent_contact.InjectContactPingAnInfo(id, tmp)
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Println("is over ~ ", total)
+}
 
+// getCompanyLabel derives the label codes of a company from aggregations over
+// the "bidding" index.
+//
+// The name is first cleaned with RegClean and trimmed. Two aggregation
+// queries are then run:
+//   - documents where the name is the buyer, s_winner or agency, bucketed by
+//     tag_subinformation, tag_subinformation_ai and property_form;
+//   - documents where the name is the buyer, bucketed by buyerclass.
+//
+// Bucket keys are translated to codes through ent_util.BitMapCode2
+// (sub-information tags), BitMapCode3 (property form) and BitMapCode1 (buyer
+// class). It returns the codes as a slice and as a comma-joined string; when
+// no code was found the slice is []uint64{0} and the string is empty.
+//
+// NOTE(review): keys missing from the BitMapCode maps yield code 0, which is
+// still appended — confirm that is intended.
+func getCompanyLabel(name string) ([]uint64, string) {
+	name = strings.TrimSpace(RegClean.ReplaceAllString(name, ""))
+	// Encode the name as a JSON string literal so quotes or backslashes in it
+	// cannot break the query body. Marshalling a string cannot fail.
+	quoted, _ := json.Marshal(name)
+	term := string(quoted)
+	sql := `{
+	  "query": {
+	    "bool": {
+	      "should": [
+	        {
+	          "term": {
+	            "buyer": ` + term + `
+	          }
+	        },
+	        {
+	          "term": {
+	            "s_winner": ` + term + `
+	          }
+	        },
+	        {
+	          "term": {
+	            "agency": ` + term + `
+	          }
+	        }
+	      ]
+	    }
+	  },
+	  "aggs": {
+	    "total_sum_a": {
+	      "terms": {
+	        "field": "tag_subinformation"
+	      }
+	    },
+	    "total_sum_b": {
+	      "terms": {
+	        "field": "tag_subinformation_ai"
+	      }
+	    },
+	    "total_sum_c": {
+	      "terms": {
+	        "field": "property_form"
+	      }
+	    }
+	  },
+	  "size": 0
+	}`
+	sql2 := `{
+	  "query": {
+	    "bool": {
+	      "must": [
+	        {
+	          "term": {
+	            "buyer": ` + term + `
+	          }
+	        }
+	      ]
+	    }
+	  },
+	  "aggs": {
+	    "total_sum_a": {
+	      "terms": {
+	        "field": "buyerclass"
+	      }
+	    }
+	  },
+	  "size": 0
+	}`
+	subMap := map[string]bool{} // de-duplicates the two sub-information tag fields
+	var propertyFormArr, buyerClassArr, bitMapStrArr []string
+	var bitMapCodeArr []uint64
+
+	res, _, _ := ent_util.Es.GetAggs("bidding", "bidding", sql)
+	if res != nil {
+		if agg, found := res.Children("total_sum_a"); found {
+			raw, _ := agg.Aggregations["buckets"].MarshalJSON()
+			for _, key := range aggBucketKeys(raw) {
+				subMap[key] = true //tag_subinformation
+			}
+		}
+		if agg, found := res.Children("total_sum_b"); found {
+			raw, _ := agg.Aggregations["buckets"].MarshalJSON()
+			for _, key := range aggBucketKeys(raw) {
+				subMap[key] = true //tag_subinformation_ai
+			}
+		}
+		if agg, found := res.Children("total_sum_c"); found {
+			raw, _ := agg.Aggregations["buckets"].MarshalJSON()
+			propertyFormArr = append(propertyFormArr, aggBucketKeys(raw)...) //property_form
+		}
+	} else {
+		log.Println("sql", sql)
+	}
+
+	ress, _, _ := ent_util.Es.GetAggs("bidding", "bidding", sql2)
+	if ress != nil {
+		// Read the buyer-class buckets from the second response (ress), not
+		// from the first one.
+		if agg, found := ress.Children("total_sum_a"); found {
+			raw, _ := agg.Aggregations["buckets"].MarshalJSON()
+			buyerClassArr = append(buyerClassArr, aggBucketKeys(raw)...) //buyerclass
+		}
+	} else {
+		log.Println("sql2", sql2)
+	}
+
+	for s := range subMap {
+		bitMapStrArr = append(bitMapStrArr, strconv.Itoa(ent_util.BitMapCode2[s]))
+		bitMapCodeArr = append(bitMapCodeArr, uint64(ent_util.BitMapCode2[s]))
+	}
+	for _, p := range propertyFormArr {
+		bitMapStrArr = append(bitMapStrArr, strconv.Itoa(ent_util.BitMapCode3[p]))
+		bitMapCodeArr = append(bitMapCodeArr, uint64(ent_util.BitMapCode3[p]))
+	}
+	// Buyer classes are translated through BitMapCode1.
+	for _, b := range buyerClassArr {
+		bitMapStrArr = append(bitMapStrArr, strconv.Itoa(ent_util.BitMapCode1[b]))
+		bitMapCodeArr = append(bitMapCodeArr, uint64(ent_util.BitMapCode1[b]))
+	}
+	if len(bitMapCodeArr) == 0 {
+		bitMapCodeArr = []uint64{uint64(0)}
+	}
+	return bitMapCodeArr, strings.Join(bitMapStrArr, ",")
+}
+
+// aggBucketKeys decodes a JSON array of aggregation buckets and returns the
+// "key" of every bucket as a string. It returns nil for empty input or when
+// the input cannot be decoded.
+func aggBucketKeys(raw []byte) []string {
+	if len(raw) == 0 {
+		return nil
+	}
+	var buckets []map[string]interface{}
+	if err := json.Unmarshal(raw, &buckets); err != nil {
+		log.Println("decode aggregation buckets:", err)
+		return nil
+	}
+	keys := make([]string, 0, len(buckets))
+	for _, bucket := range buckets {
+		keys = append(keys, qu.ObjToString(bucket["key"]))
+	}
+	return keys
 }

+ 45 - 13
ent_util/init.go

@@ -2,15 +2,14 @@ package ent_util
 
 import (
 	"context"
+	"log"
 	"runtime"
 	"time"
 
-	"log"
-
+	elastic "app.yhyue.com/moapp/jybase/es"
 	"github.com/ClickHouse/clickhouse-go/v2"
 	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
 	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
-	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
 )
 
 var (
@@ -20,9 +19,13 @@ var (
 	SysConfig          map[string]interface{}
 	IsLocal            bool
 	ClickHouseConn     driver.Conn
-	EsClinet           *elastic.Elastic
+	EsClinet           elastic.Es
+	Es                 elastic.Es
 	BuyerClassData     = map[string]string{}
 	RegionCodeData     = map[string]string{}
+	BitMapCode1        = map[string]int{}
+	BitMapCode2        = map[string]int{}
+	BitMapCode3        = map[string]int{}
 	TimeLayout         = "2006年01月02日"
 	TimeLayout_New     = "2006-01-02 15:04:05"
 	Url                = "https://www.jianyu360.cn/article/content/%s.html"
@@ -143,7 +146,9 @@ func initVCode() {
 // 创建clickhouse连接
 func initClickHouse() {
 	ClickHouseConn, _ = connectClickhouse()
+	getClickHouseCode()
 }
+
 func connectClickhouse() (driver.Conn, error) {
 	ck := *qu.ObjToMap(SysConfig["clickhouse"])
 	var (
@@ -175,22 +180,49 @@ func connectClickhouse() (driver.Conn, error) {
 	return conn, nil
 }
 
+// getClickHouseCode loads the label-to-bitmap-number table from
+// information.ent_label into the package-level maps BitMapCode1, BitMapCode2
+// and BitMapCode3, keyed by label_name and selected by label_type (1, 2 or 3).
+// Rows with any other label_type are ignored.
+//
+// Failures are logged rather than returned: a missing connection or a failed
+// query leaves the maps untouched, and a row that cannot be scanned is skipped.
+func getClickHouseCode() {
+	// initClickHouse discards the error from connectClickhouse, so the
+	// connection may be nil here.
+	if ClickHouseConn == nil {
+		log.Println("clickhouse connection is nil, bitmap code tables not loaded")
+		return
+	}
+	query := `SELECT label_type,label_name,bitmap_num FROM information.ent_label`
+	rows, err := ClickHouseConn.Query(context.Background(), query)
+	if err != nil {
+		// rows is unusable when the query fails; iterating it would panic.
+		log.Println(err)
+		return
+	}
+	defer rows.Close()
+	var (
+		labelType int8
+		labelName string
+		bitmapNum int8
+	)
+	for rows.Next() {
+		if err := rows.Scan(
+			&labelType,
+			&labelName,
+			&bitmapNum,
+		); err != nil {
+			log.Println(err)
+			continue
+		}
+		switch labelType {
+		case 1:
+			BitMapCode1[labelName] = qu.IntAll(bitmapNum)
+		case 2:
+			BitMapCode2[labelName] = qu.IntAll(bitmapNum)
+		case 3:
+			BitMapCode3[labelName] = qu.IntAll(bitmapNum)
+		}
+	}
+	// Surface an error that terminated the iteration early.
+	if err := rows.Err(); err != nil {
+		log.Println(err)
+	}
+	log.Println("bitmap占位代码表数量~", len(BitMapCode1), len(BitMapCode2), len(BitMapCode3))
+}
+
 // initEs creates the package-level Elasticsearch clients from the "es"
 // section of SysConfig (keys "addr", "username" and "password").
 // EsClinet and Es are built by two separate elastic.NewEs calls that receive
 // identical arguments.
 // NOTE(review): "07" and 100 are presumably the ES version tag and the
 // connection pool size — confirm against the jybase/es package.
 func initEs() {
 	es := *qu.ObjToMap(SysConfig["es"])
-	EsClinet = &elastic.Elastic{
-		S_esurl:  qu.ObjToString(es["addr"]),
-		I_size:   10,
-		Username: qu.ObjToString(es["username"]),
-		Password: qu.ObjToString(es["password"]),
-	}
-	EsClinet.InitElasticSize()
+	EsClinet = elastic.NewEs("07", qu.ObjToString(es["addr"]), 100, qu.ObjToString(es["username"]), qu.ObjToString(es["password"]))
+	Es = elastic.NewEs("07", qu.ObjToString(es["addr"]), 100, qu.ObjToString(es["username"]), qu.ObjToString(es["password"]))
 }
 
 // ...
 func GetNewInfo(index, query string) map[string]interface{} {
 	//log.Println("query  -- ", query)
-	client := EsClinet.GetEsConn()
-	defer EsClinet.DestoryEsConn(client)
+	esCon := elastic.VarEs.(*elastic.EsV7)
+	client := esCon.GetEsConn()
+	defer esCon.DestoryEsConn(client)
 	res := map[string]interface{}{}
 	if client != nil {
 		defer func() {

+ 64 - 9
main.go

@@ -17,15 +17,18 @@ import (
 
 // init runs ent_util.InitGlobalVar when the package is loaded, before main
 // starts.
 func init() {
 	ent_util.InitGlobalVar()
-	log.Println("init over ...")
 }
 
 func main() {
-	ent_contact.InjectContactTidbFullInfo()
-	return
-	//tidb全量
-	mode := flag.String("m", "1", "")
-	if *mode == "1" {
+	mode := flag.Int("m", 1, "")
+	flag.Parse()
+	if *mode == 1 {
+		//全量
+		log.Println("全量任务")
+		ent_legal.LegalFull()
+		// projectT()
+
+	} else {
 		//增量
 		a := cron.New()
 		a.AddFunc("0 0 20 * * ?", func() {
@@ -33,8 +36,6 @@ func main() {
 		})
 		a.Start()
 		select {}
-	} else {
-		//全量
 	}
 }
 
@@ -83,7 +84,7 @@ func test1() {
 func test2() {
 
 	query := `ALTER TABLE information.information DELETE WHERE id='676470119ae64a18bab5d1fdb5f06bb3' `
-	query = `TRUNCATE TABLE information.ent_contact`
+	query = `-- TRUNCATE TABLE information.information_copy;`
 	err := ent_util.ClickHouseConn.Exec(context.Background(), query)
 	if err != nil {
 		log.Println(err)
@@ -103,3 +104,57 @@ func test2() {
 		}
 	}
 }
+
+// projectT reads every row of information.transaction_info from ClickHouse
+// and passes each one to ent_util.EsClinet.UpdateNewDoc for the
+// "transaction_info" index, using project_id as "_id". A running row count is
+// logged after every row, including rows that failed to scan.
+//
+// Failures are logged rather than returned: a missing connection or a failed
+// query aborts the run, and a row that cannot be scanned is skipped.
+func projectT() {
+	if ent_util.ClickHouseConn == nil {
+		log.Println("clickhouse connection is nil, transaction_info not exported")
+		return
+	}
+	query := `SELECT project_id,project_name,business_type,property_form,subclass,area,city,district,buyer,create_time FROM information.transaction_info`
+	rows, err := ent_util.ClickHouseConn.Query(context.Background(), query)
+	if err != nil {
+		// rows is unusable when the query fails; iterating it would panic.
+		log.Println(err)
+		return
+	}
+	defer rows.Close()
+	count := 0
+	var (
+		projectID    string
+		projectName  string
+		businessType string
+		propertyForm []string
+		subclass     []string
+		area         string
+		city         string
+		district     string
+		createTime   int64
+		buyer        string
+	)
+	for rows.Next() {
+		// Scan targets follow the column order of the SELECT above.
+		if err := rows.Scan(
+			&projectID,
+			&projectName,
+			&businessType,
+			&propertyForm,
+			&subclass,
+			&area,
+			&city,
+			&district,
+			&buyer,
+			&createTime,
+		); err != nil {
+			log.Println(err)
+		} else {
+			// NOTE(review): winner and agency are filled with the buyer value;
+			// confirm this is intended and not placeholder data.
+			ent_util.EsClinet.UpdateNewDoc("transaction_info", "", map[string]interface{}{
+				"_id":           projectID,
+				"project_name":  projectName,
+				"business_type": businessType,
+				"property_form": propertyForm,
+				"subclass":      subclass,
+				"area":          area,
+				"city":          city,
+				"district":      district,
+				"create_time":   createTime,
+				"buyer":         buyer,
+				"winner":        buyer,
+				"agency":        buyer,
+			})
+		}
+		count++
+		log.Println("count ", count)
+	}
+	// Surface an error that terminated the iteration early.
+	if err := rows.Err(); err != nil {
+		log.Println(err)
+	}
+}