package main import ( "context" "encoding/json" "fmt" "github.com/dgraph-io/badger/v3" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "log" "strings" "sync" "unicode/utf8" ) type Employee struct { EmployeeName string `bson:"employee_name"` Position string `bson:"position"` } type CompanyD struct { CreditNo string `bson:"credit_no"` CompanyID string `bson:"_id"` UseFlag int `bson:"use_flag"` CompanyName string `bson:"company_name"` Employees []Employee `bson:"employees"` CompanyStatus string `bson:"company_status"` } type EmployeeMatch struct { CreditNo1 string `bson:"credit_no_1"` CreditNo2 string `bson:"credit_no_2"` Name1 string `bson:"company_name_1"` Name2 string `bson:"company_name_2"` EmployeeName string `bson:"employee_name"` Position1 string `bson:"position_1"` Position2 string `bson:"position_2"` CompanyID1 string `bson:"company_id1"` CompanyID2 string `bson:"company_id2"` } // dealTsDongShi 处理企业 董事关系 到 MongoDB数据库 func dealTsDongShi() { ctx := context.Background() username := "SJZY_RWbid_ES" password := "SJZY@B4i4D5e6S" hosts := []string{"172.31.31.202:27081", "172.20.45.128:27080"} //hosts := []string{"127.0.0.1:27083"} optionsSet := map[string]string{} uri, err := BuildMongoURI(username, password, hosts, optionsSet) if err != nil { panic(err) } clientOptions := options.Client().ApplyURI(uri) //clientOptions.SetReadPreference(readpref.Primary()) //clientOptions.SetDirect(true) // 连接MongoDB client, err := mongo.Connect(context.Background(), clientOptions) if err != nil { log.Println(err) } srcColl := client.Database("mixdata").Collection("qyxy_std") dstColl := client.Database("mixdata").Collection("employee_relation0424") // 打开 BadgerDB db, err := badger.Open(badger.DefaultOptions("wcc_employee_cache")) if err != nil { log.Fatal(err) } defer db.Close() // 设置并发处理 taskChan := make(chan CompanyD, 10000) var wg sync.WaitGroup worker := 10 // 启动 worker 处理任务 for i := 0; i < worker; i++ { wg.Add(1) go func() { defer wg.Done() for company := range taskChan { for _, emp := range company.Employees { name := strings.TrimSpace(emp.EmployeeName) if name == "" || strings.Contains(name, "无") || strings.Contains(name, "*") { continue } key := "employee_" + name currentInfo, _ := json.Marshal(map[string]string{ "credit_no": company.CreditNo, "company_id": company.CompanyID, "company_name": company.CompanyName, "position": emp.Position, }) err := db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(key)) if err == badger.ErrKeyNotFound { return nil } return item.Value(func(val []byte) error { var prev map[string]string json.Unmarshal(val, &prev) if prev["credit_no"] != company.CreditNo { match := EmployeeMatch{ CreditNo1: prev["credit_no"], Name1: prev["company_name"], Position1: prev["position"], CreditNo2: company.CreditNo, Name2: company.CompanyName, Position2: emp.Position, EmployeeName: name, CompanyID1: prev["company_id"], CompanyID2: company.CompanyID, } _, err := dstColl.InsertOne(ctx, match) if err != nil { log.Println("insert error:", err) } } return nil }) }) if err != nil { log.Println("badger read error:", err) } // 更新缓存 _ = db.Update(func(txn *badger.Txn) error { return txn.Set([]byte(key), currentInfo) }) } } }() } // 分发任务 // 查询条件 where := map[string]interface{}{ "company_type": map[string]interface{}{ "$ne": "个体工商户", }, } cursor, _ := srcColl.Find(ctx, where) defer cursor.Close(ctx) count := 0 for cursor.Next(ctx) { count++ var c CompanyD if err := cursor.Decode(&c); err != nil { log.Println("Decode err", err) continue } if count%10000 == 0 { log.Println("current:", count, c) } if c.CreditNo == "" || len(c.Employees) == 0 { continue } if strings.Contains(c.CompanyStatus, "注销") || strings.Contains(c.CompanyStatus, "吊销") { continue } if c.CompanyName == "" || strings.Contains(c.CompanyName, "已除名") { continue } if utf8.RuneCountInString(c.CompanyName) < 5 { continue } if c.UseFlag > 0 { continue } if !strings.Contains(c.CompanyName, "公司") { continue } taskChan <- c } close(taskChan) wg.Wait() log.Println("done.") } // dealDsGraph 处理 董事 高管数据到 graph 图形数据库 func dealDsGraph() { session, pool, err := ConnectToNebula(HostList, UserName, PassWord) if err != nil { log.Fatalf("Failed to connect to Nebula Graph: %v", err) } defer pool.Close() defer session.Release() // sess := MgoQy.GetMgoConn() defer MgoQy.DestoryMongoConn(sess) it := sess.DB("mixdata").C("wcc_employee_relation0424").Find(nil).Sort("_id").Select(nil).Iter() jobChan := make(chan ExecutivesInvest, WorkerCount*2) var wg sync.WaitGroup // 启动工作协程 for i := 0; i < 5; i++ { wg.Add(1) go BatchInsertExecutivesInvestWork(session, &wg, jobChan) } count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%10000 == 0 { log.Println("current:", count, tmp["company_name_1"], tmp["company_name_2"]) } // if util.ObjToString(tmp["company_name_1"]) == "" || util.ObjToString(tmp["company_name_2"]) == "" || util.ObjToString(tmp["employee_name"]) == "" { continue } job := ExecutivesInvest{ FromCode: util.ObjToString(tmp["company_id1"]), ToCode: util.ObjToString(tmp["company_id2"]), Name: util.ObjToString(tmp["employee_name"]), } jobChan <- job } close(jobChan) wg.Wait() log.Println(" dealTsGraph 完成!") } //------------------------------------------// type ExecutivesInvestRelation struct { FromName string ToName string Name string // 边属性 } func (c *NebulaClient) FindExecutivesInvestRelationsByNames(names []string) ([]ExecutivesInvestRelation, error) { if len(names) == 0 { return nil, nil } // 1. 企业名称 -> vid var nameList strings.Builder for i, name := range names { nameList.WriteString(fmt.Sprintf("\"%s\"", name)) if i < len(names)-1 { nameList.WriteString(", ") } } vidQuery := fmt.Sprintf(` USE %s; LOOKUP ON Legal WHERE Legal.name IN [%s] YIELD id(vertex) AS vid, Legal.name AS name `, Table_Space, nameList.String()) resp, err := c.ExecuteWithReconnect(vidQuery) if err != nil { return nil, err } if !resp.IsSucceed() { return nil, fmt.Errorf("vid lookup failed: %s", resp.GetErrorMsg()) } nameToVid := make(map[string]string) vidToName := make(map[string]string) rows := resp.GetRows() for _, row := range rows { vid := row.Values[0].GetSVal() name := row.Values[1].GetSVal() nameToVid[string(name)] = string(vid) vidToName[string(vid)] = string(name) } if len(nameToVid) == 0 { return nil, nil } // 2. 用 vid 查询 ExecutivesInvest 边 var vidList strings.Builder i := 0 for _, vid := range nameToVid { vidList.WriteString(fmt.Sprintf("\"%s\"", vid)) if i < len(nameToVid)-1 { vidList.WriteString(", ") } i++ } edgeQuery := fmt.Sprintf(` USE %s; GO FROM %s OVER ExecutivesInvest YIELD ExecutivesInvest._src AS from_id, ExecutivesInvest._dst AS to_id, ExecutivesInvest.name AS name `, Table_Space, vidList.String()) resp2, err := c.ExecuteWithReconnect(edgeQuery) if err != nil { return nil, err } if !resp2.IsSucceed() { return nil, fmt.Errorf("edge query failed: %s", resp2.GetErrorMsg()) } // 3. 解析查询结果,筛选起点和终点都在输入列表里的边 vidSet := make(map[string]struct{}) for _, vid := range nameToVid { vidSet[vid] = struct{}{} } existingRelations := make(map[string]struct{}) var relations []ExecutivesInvestRelation rows2 := resp2.GetRows() for _, row := range rows2 { fromVid := row.Values[0].GetSVal() toVid := row.Values[1].GetSVal() name := row.Values[2].GetSVal() if _, ok1 := vidSet[string(fromVid)]; ok1 { if _, ok2 := vidSet[string(toVid)]; ok2 { fromName := vidToName[string(fromVid)] toName := vidToName[string(toVid)] key := fromName + "->" + toName if _, exists := existingRelations[key]; exists { continue } existingRelations[key] = struct{}{} relations = append(relations, ExecutivesInvestRelation{ FromName: fromName, ToName: toName, Name: string(name), }) } } } return relations, nil }