123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/dgraph-io/badger/v3"
- "go.mongodb.org/mongo-driver/mongo"
- "go.mongodb.org/mongo-driver/mongo/options"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "log"
- "strings"
- "sync"
- "unicode/utf8"
- )
- type Employee struct {
- EmployeeName string `bson:"employee_name"`
- Position string `bson:"position"`
- }
- type CompanyD struct {
- CreditNo string `bson:"credit_no"`
- CompanyID string `bson:"_id"`
- UseFlag int `bson:"use_flag"`
- CompanyName string `bson:"company_name"`
- Employees []Employee `bson:"employees"`
- CompanyStatus string `bson:"company_status"`
- }
- type EmployeeMatch struct {
- CreditNo1 string `bson:"credit_no_1"`
- CreditNo2 string `bson:"credit_no_2"`
- Name1 string `bson:"company_name_1"`
- Name2 string `bson:"company_name_2"`
- EmployeeName string `bson:"employee_name"`
- Position1 string `bson:"position_1"`
- Position2 string `bson:"position_2"`
- CompanyID1 string `bson:"company_id1"`
- CompanyID2 string `bson:"company_id2"`
- }
- // dealTsDongShi 处理企业 董事关系 到 MongoDB数据库
- func dealTsDongShi() {
- ctx := context.Background()
- username := "SJZY_RWbid_ES"
- password := "SJZY@B4i4D5e6S"
- hosts := []string{"172.31.31.202:27081", "172.20.45.128:27080"}
- //hosts := []string{"127.0.0.1:27083"}
- optionsSet := map[string]string{}
- uri, err := BuildMongoURI(username, password, hosts, optionsSet)
- if err != nil {
- panic(err)
- }
- clientOptions := options.Client().ApplyURI(uri)
- //clientOptions.SetReadPreference(readpref.Primary())
- //clientOptions.SetDirect(true)
- // 连接MongoDB
- client, err := mongo.Connect(context.Background(), clientOptions)
- if err != nil {
- log.Println(err)
- }
- srcColl := client.Database("mixdata").Collection("qyxy_std")
- dstColl := client.Database("mixdata").Collection("employee_relation0424")
- // 打开 BadgerDB
- db, err := badger.Open(badger.DefaultOptions("wcc_employee_cache"))
- if err != nil {
- log.Fatal(err)
- }
- defer db.Close()
- // 设置并发处理
- taskChan := make(chan CompanyD, 10000)
- var wg sync.WaitGroup
- worker := 10
- // 启动 worker 处理任务
- for i := 0; i < worker; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- for company := range taskChan {
- for _, emp := range company.Employees {
- name := strings.TrimSpace(emp.EmployeeName)
- if name == "" || strings.Contains(name, "无") || strings.Contains(name, "*") {
- continue
- }
- key := "employee_" + name
- currentInfo, _ := json.Marshal(map[string]string{
- "credit_no": company.CreditNo,
- "company_id": company.CompanyID,
- "company_name": company.CompanyName,
- "position": emp.Position,
- })
- err := db.View(func(txn *badger.Txn) error {
- item, err := txn.Get([]byte(key))
- if err == badger.ErrKeyNotFound {
- return nil
- }
- return item.Value(func(val []byte) error {
- var prev map[string]string
- json.Unmarshal(val, &prev)
- if prev["credit_no"] != company.CreditNo {
- match := EmployeeMatch{
- CreditNo1: prev["credit_no"],
- Name1: prev["company_name"],
- Position1: prev["position"],
- CreditNo2: company.CreditNo,
- Name2: company.CompanyName,
- Position2: emp.Position,
- EmployeeName: name,
- CompanyID1: prev["company_id"],
- CompanyID2: company.CompanyID,
- }
- _, err := dstColl.InsertOne(ctx, match)
- if err != nil {
- log.Println("insert error:", err)
- }
- }
- return nil
- })
- })
- if err != nil {
- log.Println("badger read error:", err)
- }
- // 更新缓存
- _ = db.Update(func(txn *badger.Txn) error {
- return txn.Set([]byte(key), currentInfo)
- })
- }
- }
- }()
- }
- // 分发任务
- // 查询条件
- where := map[string]interface{}{
- "company_type": map[string]interface{}{
- "$ne": "个体工商户",
- },
- }
- cursor, _ := srcColl.Find(ctx, where)
- defer cursor.Close(ctx)
- count := 0
- for cursor.Next(ctx) {
- count++
- var c CompanyD
- if err := cursor.Decode(&c); err != nil {
- log.Println("Decode err", err)
- continue
- }
- if count%10000 == 0 {
- log.Println("current:", count, c)
- }
- if c.CreditNo == "" || len(c.Employees) == 0 {
- continue
- }
- if strings.Contains(c.CompanyStatus, "注销") || strings.Contains(c.CompanyStatus, "吊销") {
- continue
- }
- if c.CompanyName == "" || strings.Contains(c.CompanyName, "已除名") {
- continue
- }
- if utf8.RuneCountInString(c.CompanyName) < 5 {
- continue
- }
- if c.UseFlag > 0 {
- continue
- }
- if !strings.Contains(c.CompanyName, "公司") {
- continue
- }
- taskChan <- c
- }
- close(taskChan)
- wg.Wait()
- log.Println("done.")
- }
- // dealDsGraph 处理 董事 高管数据到 graph 图形数据库
- func dealDsGraph() {
- session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
- if err != nil {
- log.Fatalf("Failed to connect to Nebula Graph: %v", err)
- }
- defer pool.Close()
- defer session.Release()
- //
- sess := MgoQy.GetMgoConn()
- defer MgoQy.DestoryMongoConn(sess)
- it := sess.DB("mixdata").C("wcc_employee_relation0424").Find(nil).Sort("_id").Select(nil).Iter()
- jobChan := make(chan ExecutivesInvest, WorkerCount*2)
- var wg sync.WaitGroup
- // 启动工作协程
- for i := 0; i < 5; i++ {
- wg.Add(1)
- go BatchInsertExecutivesInvestWork(session, &wg, jobChan)
- }
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count, tmp["company_name_1"], tmp["company_name_2"])
- }
- //
- if util.ObjToString(tmp["company_name_1"]) == "" || util.ObjToString(tmp["company_name_2"]) == "" || util.ObjToString(tmp["employee_name"]) == "" {
- continue
- }
- job := ExecutivesInvest{
- FromCode: util.ObjToString(tmp["company_id1"]),
- ToCode: util.ObjToString(tmp["company_id2"]),
- Name: util.ObjToString(tmp["employee_name"]),
- }
- jobChan <- job
- }
- close(jobChan)
- wg.Wait()
- log.Println(" dealTsGraph 完成!")
- }
- //------------------------------------------//
- type ExecutivesInvestRelation struct {
- FromName string
- ToName string
- Name string // 边属性
- }
- func (c *NebulaClient) FindExecutivesInvestRelationsByNames(names []string) ([]ExecutivesInvestRelation, error) {
- if len(names) == 0 {
- return nil, nil
- }
- // 1. 企业名称 -> vid
- var nameList strings.Builder
- for i, name := range names {
- nameList.WriteString(fmt.Sprintf("\"%s\"", name))
- if i < len(names)-1 {
- nameList.WriteString(", ")
- }
- }
- vidQuery := fmt.Sprintf(`
- USE %s;
- LOOKUP ON Legal WHERE Legal.name IN [%s] YIELD id(vertex) AS vid, Legal.name AS name
- `, Table_Space, nameList.String())
- resp, err := c.ExecuteWithReconnect(vidQuery)
- if err != nil {
- return nil, err
- }
- if !resp.IsSucceed() {
- return nil, fmt.Errorf("vid lookup failed: %s", resp.GetErrorMsg())
- }
- nameToVid := make(map[string]string)
- vidToName := make(map[string]string)
- rows := resp.GetRows()
- for _, row := range rows {
- vid := row.Values[0].GetSVal()
- name := row.Values[1].GetSVal()
- nameToVid[string(name)] = string(vid)
- vidToName[string(vid)] = string(name)
- }
- if len(nameToVid) == 0 {
- return nil, nil
- }
- // 2. 用 vid 查询 ExecutivesInvest 边
- var vidList strings.Builder
- i := 0
- for _, vid := range nameToVid {
- vidList.WriteString(fmt.Sprintf("\"%s\"", vid))
- if i < len(nameToVid)-1 {
- vidList.WriteString(", ")
- }
- i++
- }
- edgeQuery := fmt.Sprintf(`
- USE %s;
- GO FROM %s OVER ExecutivesInvest
- YIELD
- ExecutivesInvest._src AS from_id,
- ExecutivesInvest._dst AS to_id,
- ExecutivesInvest.name AS name
- `, Table_Space, vidList.String())
- resp2, err := c.ExecuteWithReconnect(edgeQuery)
- if err != nil {
- return nil, err
- }
- if !resp2.IsSucceed() {
- return nil, fmt.Errorf("edge query failed: %s", resp2.GetErrorMsg())
- }
- // 3. 解析查询结果,筛选起点和终点都在输入列表里的边
- vidSet := make(map[string]struct{})
- for _, vid := range nameToVid {
- vidSet[vid] = struct{}{}
- }
- existingRelations := make(map[string]struct{})
- var relations []ExecutivesInvestRelation
- rows2 := resp2.GetRows()
- for _, row := range rows2 {
- fromVid := row.Values[0].GetSVal()
- toVid := row.Values[1].GetSVal()
- name := row.Values[2].GetSVal()
- if _, ok1 := vidSet[string(fromVid)]; ok1 {
- if _, ok2 := vidSet[string(toVid)]; ok2 {
- fromName := vidToName[string(fromVid)]
- toName := vidToName[string(toVid)]
- key := fromName + "->" + toName
- if _, exists := existingRelations[key]; exists {
- continue
- }
- existingRelations[key] = struct{}{}
- relations = append(relations, ExecutivesInvestRelation{
- FromName: fromName,
- ToName: toName,
- Name: string(name),
- })
- }
- }
- }
- return relations, nil
- }
|