|
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/dgraph-io/badger/v3"
- "go.mongodb.org/mongo-driver/mongo"
- "go.mongodb.org/mongo-driver/mongo/options"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "log"
- "net/url"
- "regexp"
- "strings"
- "sync"
- "unicode/utf8"
- )
// Validation patterns for annual-report contact fields, compiled once at
// package initialisation.
var (
	// phoneRegexp accepts either an 11-digit number starting with 1[3-9]
	// (optionally prefixed by "86" or "+86"), or a fixed-line number: a
	// leading 0, a 2–3 digit area code, an optional '-', then 7–8 digits.
	phoneRegexp = regexp.MustCompile(`^((\+?86)?1[3-9]\d{9}$)|(^0\d{2,3}-?\d{7,8}$)`)

	// emailRegexp is a pragmatic (not RFC 5322 complete) address check:
	// local part, '@', dotted domain, and an alphabetic TLD of 2+ letters.
	emailRegexp = regexp.MustCompile(`^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$`)
)
// AnnualReport holds the contact details taken from one entry of a company's
// annual_reports array.
type AnnualReport struct {
	OperatorName string `bson:"operator_name"`
	CompanyPhone string `bson:"company_phone"`
	CompanyEmail string `bson:"company_email"`
}
- type Company struct {
- CreditNo string `bson:"credit_no"`
- CompanyID string `bson:"_id"`
- CompanyStatus string `bson:"company_status"`
- CompanyName string `bson:"company_name"`
- AnnualReports []AnnualReport `bson:"annual_reports"`
- UseFlag int `bson:"use_flag"`
- }
// SuspectMatch records two distinct companies (by credit number) whose annual
// reports share the same contact-field key. MatchKeys[i] names the field whose
// value is MatchValue[i]; note the Go field is MatchValue while the stored key
// is "match_values".
type SuspectMatch struct {
	CreditNo1  string   `bson:"credit_no1"`
	CreditNo2  string   `bson:"credit_no2"`
	CompanyID1 string   `bson:"company_id1"`
	CompanyID2 string   `bson:"company_id2"`
	Name1      string   `bson:"company_name1"`
	Name2      string   `bson:"company_name2"`
	MatchKeys  []string `bson:"match_keys"`
	MatchValue []string `bson:"match_values"`
}
// BuildMongoURI builds a "mongodb://" connection URI.
//
// username and password are percent-encoded with URI userinfo rules
// (RFC 3986), so reserved characters such as '@', ':', '/' and spaces survive
// the driver's URI parsing. When both are empty the credentials section is
// omitted. hosts are joined with ","; at least one "host[:port]" entry is
// required. options, if non-empty, are appended as a query string (keys are
// sorted by url.Values.Encode, so the output is deterministic).
//
// It returns an error only when hosts is empty.
func BuildMongoURI(username, password string, hosts []string, options map[string]string) (string, error) {
	if len(hosts) == 0 {
		return "", fmt.Errorf("hosts cannot be empty")
	}

	var b strings.Builder
	b.WriteString("mongodb://")
	if username != "" || password != "" {
		// url.QueryEscape would encode ' ' as '+', which is not decoded back to
		// a space in the userinfo part; url.UserPassword applies userinfo
		// escaping (e.g. '@' -> %40, ' ' -> %20).
		b.WriteString(url.UserPassword(username, password).String())
		b.WriteByte('@')
	}
	b.WriteString(strings.Join(hosts, ","))

	if len(options) > 0 {
		query := make(url.Values, len(options))
		for k, v := range options {
			query.Set(k, v)
		}
		b.WriteByte('?')
		b.WriteString(query.Encode())
	}
	return b.String(), nil
}
- func dealYS() {
- ctx := context.Background()
- username := "SJZY_RWbid_ES"
- password := "SJZY@B4i4D5e6S"
- hosts := []string{"172.31.31.202:27081", "172.20.45.128:27080"}
- //hosts := []string{"127.0.0.1:27083"}
- optionsSet := map[string]string{}
- uri, err := BuildMongoURI(username, password, hosts, optionsSet)
- if err != nil {
- panic(err)
- }
- clientOptions := options.Client().ApplyURI(uri)
- //clientOptions.SetReadPreference(readpref.Primary())
- //clientOptions.SetDirect(true)
- // 连接MongoDB
- client, err := mongo.Connect(context.Background(), clientOptions)
- if err != nil {
- log.Println(err)
- }
- srcColl := client.Database("mixdata").Collection("qyxy_std")
- dstColl := client.Database("mixdata").Collection("wcc_qyxy_yisi0424")
- // BadgerDB
- dbCache, _ := badger.Open(badger.DefaultOptions("badgerdb"))
- defer dbCache.Close()
- worker := 8
- taskChan := make(chan Company, 10000)
- var wg sync.WaitGroup
- // 启动 workers
- for i := 0; i < worker; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- for c := range taskChan {
- processCompany(ctx, c, dbCache, dstColl)
- }
- }()
- }
- // 查询条件
- where := map[string]interface{}{
- "company_type": map[string]interface{}{
- "$ne": "个体工商户",
- },
- }
- cursor, err := srcColl.Find(ctx, where)
- if err != nil {
- log.Println("Find error:", err)
- return
- }
- defer cursor.Close(ctx)
- count := 0
- for cursor.Next(ctx) {
- var c Company
- if err := cursor.Decode(&c); err != nil {
- continue
- }
- count++
- if count%10000 == 0 {
- log.Println("current:", count, c)
- }
- if strings.Contains(c.CompanyStatus, "注销") || strings.Contains(c.CompanyStatus, "吊销") {
- continue
- }
- if c.CreditNo == "" || len(c.AnnualReports) == 0 {
- continue
- }
- if c.CompanyName == "" || strings.Contains(c.CompanyName, "已除名") {
- continue
- }
- if utf8.RuneCountInString(c.CompanyName) < 5 {
- continue
- }
- if c.UseFlag > 0 {
- continue
- }
- if !strings.Contains(c.CompanyName, "公司") {
- continue
- }
- taskChan <- c
- }
- if err := cursor.Err(); err != nil {
- log.Println("Cursor iteration error:", err)
- }
- close(taskChan)
- wg.Wait()
- log.Println("All done.")
- }
- func processCompany(ctx context.Context, c Company, db *badger.DB, dst *mongo.Collection) {
- for _, ar := range c.AnnualReports {
- phone := strings.TrimSpace(ar.CompanyPhone)
- email := strings.TrimSpace(ar.CompanyEmail)
- operator := strings.TrimSpace(ar.OperatorName)
- validFields := make([]string, 0)
- if isValidPhone(phone) {
- validFields = append(validFields, "company_phone")
- }
- if isValidEmail(email) {
- validFields = append(validFields, "company_email")
- }
- if isValidOperator(operator) {
- validFields = append(validFields, "operator_name")
- }
- if len(validFields) < 2 {
- continue
- }
- fields := map[string]string{
- "operator_name": ar.OperatorName,
- "company_phone": ar.CompanyPhone,
- "company_email": ar.CompanyEmail,
- }
- validKeys := make([]string, 0)
- for k, v := range fields {
- if strings.TrimSpace(v) != "" {
- validKeys = append(validKeys, k)
- }
- }
- if len(validKeys) < 2 {
- continue
- }
- key := generateKey(ar.OperatorName, ar.CompanyPhone, ar.CompanyEmail)
- if key == "" {
- continue
- }
- valMap := map[string]string{
- "credit_no": c.CreditNo,
- "company_name": c.CompanyName,
- "company_id": c.CompanyID,
- }
- val, _ := json.Marshal(valMap)
- // 查缓存
- existing := getFromBadger(db, key)
- if existing != nil {
- var e map[string]string
- _ = json.Unmarshal(existing, &e)
- if e["credit_no"] != c.CreditNo {
- match := SuspectMatch{
- CreditNo1: e["credit_no"],
- CreditNo2: c.CreditNo,
- Name1: e["company_name"],
- Name2: c.CompanyName,
- CompanyID1: e["company_id"],
- CompanyID2: c.CompanyID,
- MatchKeys: validKeys,
- MatchValue: getMatchValues(fields, validKeys),
- }
- _, err := dst.InsertOne(ctx, match)
- if err != nil {
- log.Println("Insert error:", err)
- }
- }
- } else {
- putToBadger(db, key, val)
- }
- }
- }
// generateKey builds the Badger index key for a set of annual-report contact
// fields. Each field is trimmed. Non-blank fields are tagged "op:", "ph:" or
// "em:" and joined with "|" in that fixed order. It returns "" when fewer than
// two fields are non-blank.
//
// Values are trimmed before use so that, e.g., " 13800138000" and
// "13800138000" produce the same key. Blank-ness was already judged on the
// trimmed value.
func generateKey(operator, phone, email string) string {
	tagged := [...]struct{ prefix, value string }{
		{"op:", operator},
		{"ph:", phone},
		{"em:", email},
	}
	parts := make([]string, 0, len(tagged))
	for _, f := range tagged {
		if v := strings.TrimSpace(f.value); v != "" {
			parts = append(parts, f.prefix+v)
		}
	}
	if len(parts) < 2 {
		return ""
	}
	return strings.Join(parts, "|")
}
// getMatchValues returns fields[k] for every k in keys, in the same order as
// keys. Keys absent from fields yield "". The returned slice is always
// non-nil, even when keys is empty.
func getMatchValues(fields map[string]string, keys []string) []string {
	values := make([]string, len(keys))
	for i := range keys {
		values[i] = fields[keys[i]]
	}
	return values
}
- func getFromBadger(db *badger.DB, key string) []byte {
- var val []byte
- err := db.View(func(txn *badger.Txn) error {
- item, err := txn.Get([]byte(key))
- if err == nil {
- val, _ = item.ValueCopy(nil)
- }
- return nil
- })
- if err != nil {
- return nil
- }
- return val
- }
- func putToBadger(db *badger.DB, key string, val []byte) {
- _ = db.Update(func(txn *badger.Txn) error {
- return txn.Set([]byte(key), val)
- })
- }
- func isValidPhone(phone string) bool {
- return phoneRegexp.MatchString(strings.TrimSpace(phone))
- }
- func isValidEmail(email string) bool {
- return emailRegexp.MatchString(strings.TrimSpace(email))
- }
// isValidOperator reports whether op, after trimming, is a usable operator
// name: it must be non-empty and contain neither "无" nor "*".
func isValidOperator(op string) bool {
	name := strings.TrimSpace(op)
	return name != "" && !strings.ContainsAny(name, "无*")
}
- // dealTsGraph 处理疑似数据到图形数据库
- func dealTsGraph() {
- session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
- if err != nil {
- log.Fatalf("Failed to connect to Nebula Graph: %v", err)
- }
- defer pool.Close()
- defer session.Release()
- //
- sess := MgoQy.GetMgoConn()
- defer MgoQy.DestoryMongoConn(sess)
- it := sess.DB("mixdata").C("wcc_qyxy_yisi0424").Find(nil).Sort("_id").Select(nil).Iter()
- jobChan := make(chan InsertSuspectJob, WorkerCount*2)
- var wg sync.WaitGroup
- // 启动工作协程
- for i := 0; i < 5; i++ {
- wg.Add(1)
- go BatchInsertSuspectInvestWork(session, &wg, jobChan)
- }
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count, tmp["company_name1"], tmp["company_name2"])
- }
- //
- cname1 := util.ObjToString(tmp["company_name1"])
- cname2 := util.ObjToString(tmp["company_name2"])
- if (strings.Contains(cname1, "营业厅") && strings.Contains(cname2, "营业厅")) || (strings.Contains(cname1, "分公司") || strings.Contains(cname2, "分公司")) {
- continue
- }
- // 判断是否是同样的前缀,只是分公司、营业厅等不同
- if IsSameCompanySubject(cname1, cname2) {
- continue
- }
- //
- job := InsertSuspectJob{
- Relations: SuspectInvest{
- FromCode: util.ObjToString(tmp["company_id1"]),
- ToCode: util.ObjToString(tmp["company_id2"]),
- },
- }
- //拼接 疑似的原因
- result, err := concatMatchPairs(tmp)
- if err != nil {
- log.Println("concatMatchPairs Error:", err, result)
- continue
- }
- if result == "" {
- continue
- }
- job.Relations.Reason = result
- jobChan <- job
- }
- close(jobChan)
- wg.Wait()
- log.Println(" dealTsGraph 完成!")
- }
// sameSubjectPrefixRatio is the fraction of the shorter name that the common
// prefix must cover for IsSameCompanySubject to report a match.
const sameSubjectPrefixRatio = 0.6

// IsSameCompanySubject reports whether two company names appear to share the
// same subject and differ only in their endings (e.g. a branch suffix).
//
// Both names are trimmed. They match when their longest common prefix, in
// runes, is at least sameSubjectPrefixRatio (60%) of the shorter name's rune
// count. A blank name never matches: with an empty name the required prefix
// length would be 0 and every pair would pass.
func IsSameCompanySubject(name1, name2 string) bool {
	name1 = strings.TrimSpace(name1)
	name2 = strings.TrimSpace(name2)
	if name1 == "" || name2 == "" {
		return false
	}

	r1, r2 := []rune(name1), []rune(name2)
	minLen := min(len(r1), len(r2))
	prefixLen := 0
	for prefixLen < minLen && r1[prefixLen] == r2[prefixLen] {
		prefixLen++
	}
	return float64(prefixLen) >= float64(minLen)*sameSubjectPrefixRatio
}
// longestCommonPrefix returns the longest common prefix of s1 and s2, compared
// rune by rune (so multi-byte characters are never split).
func longestCommonPrefix(s1, s2 string) string {
	a, b := []rune(s1), []rune(s2)
	n := 0
	for n < len(a) && n < len(b) && a[n] == b[n] {
		n++
	}
	return string(a[:n])
}
- // ------------------------------------//
// extractStringSlice returns tmp[key] as a []string. The value must be a
// []interface{} whose elements are all strings. Otherwise an error naming key
// is returned. An empty (or nil) []interface{} yields a non-nil empty slice.
func extractStringSlice(tmp map[string]interface{}, key string) ([]string, error) {
	raw, isSlice := tmp[key].([]interface{})
	if !isSlice {
		return nil, fmt.Errorf("%s is not a slice", key)
	}
	out := make([]string, len(raw))
	for i, elem := range raw {
		s, isString := elem.(string)
		if !isString {
			return nil, fmt.Errorf("%s contains non-string element", key)
		}
		out[i] = s
	}
	return out, nil
}
- // 拼接键值对字符串
- func concatMatchPairs(tmp map[string]interface{}) (string, error) {
- // 提取 match_keys 和 match_values
- keys, err := extractStringSlice(tmp, "match_keys")
- if err != nil {
- return "", fmt.Errorf("failed to get match_keys: %v", err)
- }
- values, err := extractStringSlice(tmp, "match_values")
- if err != nil {
- return "", fmt.Errorf("failed to get match_values: %v", err)
- }
- // 检查长度是否一致
- if len(keys) != len(values) {
- return "", fmt.Errorf("length mismatch: match_keys (%d) vs match_values (%d)", len(keys), len(values))
- }
- // 拼接字符串
- var builder strings.Builder
- for i := 0; i < len(keys); i++ {
- if i > 0 {
- builder.WriteString(", ")
- }
- builder.WriteString(keys[i])
- builder.WriteString(":")
- builder.WriteString(values[i])
- }
- return builder.String(), nil
- }
|