123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/olivere/elastic/v7"
- nebula "github.com/vesoft-inc/nebula-go/v3"
- "io"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "log"
- "strings"
- "sync"
- "time"
- "unicode/utf8"
- )
- func dealCompanyBase22() {
- log.Println("dealCompanyBase", "开始处理数据")
- session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
- if err != nil {
- log.Fatalf("Failed to connect to Nebula Graph: %v", err)
- }
- defer pool.Close()
- defer session.Release()
- defer util.Catch()
- sess := Mgo181.GetMgoConn()
- defer Mgo181.DestoryMongoConn(sess)
- it := sess.DB("mixdata").C("company_base").Find(nil).Sort("_id").Select(nil).Iter()
- //jobChan := make(chan InsertJob, WorkerCount*2)
- var wg sync.WaitGroup
- count := 0
- // 新增一个 job 构建的协程池
- buildChan := make(chan map[string]interface{}, WorkerCount*5) // 用于传递原始 Mongo 数据
- var wgBuild sync.WaitGroup
- for i := 0; i < 2; i++ {
- wgBuild.Add(1)
- go func() {
- defer wgBuild.Done()
- count2 := 0
- for tmp := range buildChan {
- count2++
- if count2%10000 == 0 {
- log.Printf("已处理 %d 条,休息 1 秒...\n", count)
- time.Sleep(time.Second)
- }
- c1 := Legal{
- Name: util.ObjToString(tmp["company_name"]),
- Code: util.ObjToString(tmp["credit_no"]),
- Type: "企业",
- }
- r1, err := InsertCompany(session, c1)
- if err != nil {
- log.Println("InsertCompany", r1, err)
- }
- // 耗时查询移到这里
- rea, resb := GetInvByLevel(c1.Name, 1, 0, false)
- for _, v := range rea {
- d := Legal{
- Name: v.company_name,
- Code: v.credit_no,
- Type: "企业",
- }
- r, err := InsertCompany(session, d)
- if err != nil {
- log.Println("InsertCompany", r, err)
- }
- }
- for _, v := range resb {
- d := Invest{
- FromCode: v.stock_name,
- ToCode: v.company_name,
- Amount: v.stock_amount,
- Ratio: v.stock_rate,
- }
- err := InsertInvestRel(session, d)
- if err != nil {
- log.Println("InsertInvestRel", err, d)
- }
- }
- }
- }()
- }
- //realNum := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count, tmp["company_name"], tmp["_id"])
- }
- if util.IntAll(tmp["use_flag"]) > 0 {
- continue
- }
- if util.ObjToString(tmp["company_type"]) == "个体工商户" || util.ObjToString(tmp["company_type"]) == "个人独资企业" {
- continue
- }
- if util.ObjToString(tmp["company_name"]) == "" || util.ObjToString(tmp["credit_no"]) == "" {
- continue
- }
- // 注销;关闭
- if strings.Contains(util.ObjToString(tmp["company_status"]), "吊销") || strings.Contains(util.ObjToString(tmp["company_status"]), "注销") || strings.Contains(util.ObjToString(tmp["company_status"]), "关闭") {
- continue
- }
- if utf8.RuneCountInString(util.ObjToString(tmp["company_name"])) < 5 {
- continue
- }
- buildChan <- tmp // 推送到异步处理构建
- }
- close(buildChan)
- wgBuild.Wait()
- wg.Wait()
- log.Println("完成!")
- }
- func dealCompanyBase() {
- log.Println("dealCompanyBase", "开始处理数据")
- session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
- if err != nil {
- log.Fatalf("Failed to connect to Nebula Graph: %v", err)
- }
- defer pool.Close()
- defer session.Release()
- defer util.Catch()
- sess := Mgo181.GetMgoConn()
- defer Mgo181.DestoryMongoConn(sess)
- it := sess.DB("mixdata").C("company_base").Find(nil).Select(nil).Iter()
- //jobChan := make(chan InsertJob, WorkerCount*2)
- var wg sync.WaitGroup
- count := 0
- // 新增一个 job 构建的协程池
- buildChan := make(chan map[string]interface{}, WorkerCount*10) // 用于传递原始 Mongo 数据
- jobChan := make(chan InsertJob, WorkerCount*5)
- // 启动工作协程;存储数据
- //for i := 0; i < WorkerCount; i++ {
- // wg.Add(1)
- // go insertWorker(session, &wg, jobChan)
- //}
- //写入图形数据库
- for i := 0; i < WorkerCount; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- // 每个 worker 拿自己的 session
- localSession, err := pool.GetSession(UserName, PassWord)
- if err != nil {
- log.Println("获取 session 失败:", err)
- return
- }
- defer localSession.Release()
- insertWorker2(localSession, jobChan)
- }()
- }
- var wgBuild sync.WaitGroup
- for i := 0; i < WorkerCount; i++ {
- wgBuild.Add(1)
- go func() {
- defer wgBuild.Done()
- for tmp := range buildChan {
- c1 := Legal{
- Id: util.ObjToString(tmp["company_id"]),
- Name: util.ObjToString(tmp["company_name"]),
- Code: util.ObjToString(tmp["credit_no"]),
- Type: "企业",
- }
- if utf8.RuneCountInString(c1.Name) < 5 {
- continue
- }
- job := InsertJob{}
- job.Companies = append(job.Companies, c1)
- // 耗时查询移到这里
- rea, resb := GetInvByLevel(c1.Name, 2, 0, false)
- for _, v := range rea {
- d := Legal{
- Id: v.company_id,
- Name: v.company_name,
- Code: v.credit_no,
- Type: "企业",
- }
- job.Companies = append(job.Companies, d)
- }
- for _, v := range resb {
- d := Invest{
- FromCode: v.stock_id,
- ToCode: v.company_id,
- Amount: v.stock_amount,
- Ratio: v.stock_rate,
- }
- job.Relations = append(job.Relations, d)
- }
- jobChan <- job
- }
- }()
- }
- //realNum := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count, tmp["company_name"])
- }
- if util.IntAll(tmp["use_flag"]) > 0 {
- continue
- }
- if util.ObjToString(tmp["company_type"]) == "个体工商户" || util.ObjToString(tmp["company_type"]) == "个人独资企业" {
- continue
- }
- if util.ObjToString(tmp["company_name"]) == "" || util.ObjToString(tmp["credit_no"]) == "" {
- continue
- }
- // 注销;关闭
- if strings.Contains(util.ObjToString(tmp["company_status"]), "吊销") || strings.Contains(util.ObjToString(tmp["company_status"]), "注销") || strings.Contains(util.ObjToString(tmp["company_status"]), "关闭") {
- continue
- }
- buildChan <- tmp // 推送到异步处理构建
- //1、处理点
- //job := InsertJob{}
- //c1 := Legal{
- // Name: util.ObjToString(tmp["company_name"]),
- // Code: util.ObjToString(tmp["credit_no"]),
- // Type: "企业",
- //}
- //if utf8.RuneCountInString(c1.Name) < 5 {
- // continue
- //}
- //job.Companies = append(job.Companies, c1)
- ////2、处理变
- //rea, resb := GetInvByLevel(c1.Name, 1, 0, false)
- //for _, v := range rea {
- // d := Legal{
- // Name: v.company_name,
- // Code: v.credit_no,
- // Type: "企业",
- // }
- // job.Companies = append(job.Companies, d)
- //}
- //
- //for _, v := range resb {
- // d := Invest{
- // FromCode: v.stock_name,
- // ToCode: v.company_name,
- // Amount: v.stock_amount,
- // Ratio: v.stock_rate,
- // }
- // job.Relations = append(job.Relations, d)
- //}
- //
- //jobChan <- job
- }
- close(buildChan)
- wgBuild.Wait()
- close(jobChan)
- wg.Wait()
- log.Println("完成!")
- }
- func batchDealGraph() {
- session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
- if err != nil {
- log.Fatalf("Failed to connect to Nebula Graph: %v", err)
- }
- defer pool.Close()
- defer session.Release()
- client, err := elastic.NewClient(
- elastic.SetURL("http://172.17.4.184:19908"),
- //elastic.SetURL("http://127.0.0.1:19908"),
- elastic.SetBasicAuth("jybid", "Top2023_JEB01i@31"),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- query := elastic.NewBoolQuery().
- //北京,天津,河北,上海,江苏,浙江,安徽
- //Must(elastic.NewTermQuery("area", "北京市")).
- //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
- MustNot(
- elastic.NewTermQuery("company_type", "个体工商户"),
- elastic.NewTermsQuery("company_status", "吊销", "注销"),
- )
- //Must(elastic.NewTermQuery("company_name", "北京剑鱼信息技术有限公司"))
- //Must(elastic.NewTermsQuery("company_area", "河南"))
- ctx := context.Background()
- searchSource := elastic.NewSearchSource().
- Query(query).
- Size(10000).
- Sort("_doc", true)
- searchService := client.Scroll("qyxy").
- Size(10000).
- Scroll("5m").
- SearchSource(searchSource)
- jobChan := make(chan InsertJob, WorkerCount*2)
- var wg sync.WaitGroup
- // 启动工作协程
- for i := 0; i < WorkerCount; i++ {
- wg.Add(1)
- go insertWorker(session, &wg, jobChan)
- }
- total := 0
- for {
- res, err := searchService.Do(ctx)
- if err == io.EOF {
- break
- }
- if err != nil {
- log.Println("scroll error:", err)
- break
- }
- fmt.Println("总数是:", res.TotalHits())
- if len(res.Hits.Hits) == 0 {
- break
- }
- job := InsertJob{}
- for _, hit := range res.Hits.Hits {
- var doc map[string]interface{}
- if err := json.Unmarshal(hit.Source, &doc); err != nil {
- log.Println("解析失败", err)
- continue
- }
- c1 := Legal{
- Id: util.ObjToString(doc["id"]),
- Name: util.ObjToString(doc["company_name"]),
- Code: util.ObjToString(doc["credit_no"]),
- Type: "企业",
- }
- ////存续、在营、开业、在册
- //if strings.Contains(util.ObjToString(doc["company_status"]), "存续") || strings.Contains(util.ObjToString(doc["company_status"]), "在营") || strings.Contains(util.ObjToString(doc["company_status"]), "在册") || strings.Contains(util.ObjToString(doc["company_status"]), "开业") {
- // c1.State = "有效"
- //} else {
- // c1.State = "无效"
- //}
- if !strings.Contains(c1.Name, "公司") {
- continue
- }
- if c1.Name == "" || c1.Code == "" || strings.Contains(c1.Name, "已除名") {
- continue
- }
- if strings.Contains(util.ObjToString(doc["company_status"]), "吊销") || strings.Contains(util.ObjToString(doc["company_status"]), "注销") {
- continue
- }
- if utf8.RuneCountInString(c1.Name) < 5 {
- continue
- }
- job.Companies = append(job.Companies, c1)
- if partners, ok := doc["partners"].([]interface{}); ok {
- for _, partner := range partners {
- if da, ok := partner.(map[string]interface{}); ok {
- if !strings.Contains(util.ObjToString(da["stock_type"]), "自然人") && !strings.Contains(util.ObjToString(da["stock_type"]), "个人") {
- if util.ObjToString(da["stock_name"]) == "" || util.ObjToString(da["identify_no"]) == "" {
- continue
- }
- //1
- where1 := map[string]interface{}{
- "company_name": util.ObjToString(da["stock_name"]),
- }
- tmpBase, _ := Mgo181.FindOne("company_base", where1)
- if len(*tmpBase) > 0 {
- c2 := Legal{
- Id: util.ObjToString((*tmpBase)["company_id"]),
- Name: util.ObjToString(da["stock_name"]),
- Code: util.ObjToString(da["identify_no"]),
- Type: "企业",
- }
- //if strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "存续") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "在营") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "在册") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "开业") {
- // c2.State = "有效"
- //} else {
- // c2.State = "无效"
- //}
- job.Companies = append(job.Companies, c2)
- //2
- where := map[string]interface{}{
- "company_name": util.ObjToString(doc["company_name"]),
- "stock_name": util.ObjToString(da["stock_name"]),
- }
- ddd, _ := Mgo181.FindOne("company_partner", where)
- if len(*ddd) > 0 {
- par := *ddd
- invest := Invest{
- FromCode: c2.Id,
- ToCode: c1.Id,
- Ratio: util.Float64All(par["stock_proportion"]),
- Amount: ParseStockCapital(util.ObjToString(par["stock_capital"])),
- }
- job.Relations = append(job.Relations, invest)
- }
- }
- }
- }
- }
- }
- }
- jobChan <- job
- total += len(res.Hits.Hits)
- log.Println("处理总量:", total)
- }
- close(jobChan)
- wg.Wait()
- log.Println("完成!")
- }
- func insertWorker2(session *nebula.Session, jobs <-chan InsertJob) {
- for job := range jobs {
- // 分批插入公司
- for i := 0; i < len(job.Companies); i += BatchSize {
- end := i + BatchSize
- if end > len(job.Companies) {
- end = len(job.Companies)
- }
- BatchInsertCompanies(session, job.Companies[i:end])
- //time.Sleep(time.Second * 1)
- }
- // 分批插入投资关系
- for i := 0; i < len(job.Relations); i += BatchSize {
- end := i + BatchSize
- if end > len(job.Relations) {
- end = len(job.Relations)
- }
- BatchInsertInvestRels(session, job.Relations[i:end])
- //time.Sleep(time.Second * 1)
- }
- }
- }
- func insertWorker(session *nebula.Session, wg *sync.WaitGroup, jobs <-chan InsertJob) {
- defer wg.Done()
- for job := range jobs {
- // 分批插入公司
- for i := 0; i < len(job.Companies); i += BatchSize {
- end := i + BatchSize
- if end > len(job.Companies) {
- end = len(job.Companies)
- }
- BatchInsertCompanies(session, job.Companies[i:end])
- }
- // 分批插入投资关系
- for i := 0; i < len(job.Relations); i += BatchSize {
- end := i + BatchSize
- if end > len(job.Relations) {
- end = len(job.Relations)
- }
- BatchInsertInvestRels(session, job.Relations[i:end])
- }
- }
- }
- func BatchInsertCompanies(session *nebula.Session, companies []Legal) {
- if len(companies) == 0 {
- return
- }
- var sb strings.Builder
- sb.WriteString("USE " + Table_Space + "; ")
- for _, c := range companies {
- sb.WriteString(fmt.Sprintf(`INSERT VERTEX Legal(name, code, type, state) VALUES "%s":("%s", "%s", "%s", "%s");`, c.Id, c.Name, c.Code, c.Type, c.State))
- }
- _, err := session.Execute(sb.String())
- if err != nil {
- log.Println("批量插入公司失败:", err)
- }
- }
- func BatchInsertInvestRels(session *nebula.Session, rels []Invest) {
- if len(rels) == 0 {
- return
- }
- var sb strings.Builder
- sb.WriteString("USE " + Table_Space + "; ")
- for _, r := range rels {
- sb.WriteString(fmt.Sprintf(`INSERT EDGE Invest(amount, ratio) VALUES "%s"->"%s":(%f, %f);`, r.FromCode, r.ToCode, r.Amount, r.Ratio))
- }
- _, err := session.Execute(sb.String())
- if err != nil {
- log.Println("批量插入投资关系失败:", err)
- }
- }
- type InvestRelationResult struct {
- Related bool
- Paths []map[string]string
- CommonNodes []CommonNodeInfo
- }
- type CommonNodeInfo struct {
- VID string
- Name string
- }
- func CheckInvestRelationWithIntersection(session *nebula.Session, names []string, depth int) (bool, []map[string]string, []string, error) {
- if len(names) == 0 || depth <= 0 {
- return false, nil, nil, fmt.Errorf("invalid input")
- }
- // Step 1: 获取所有企业的 VID
- vids := make([]string, 0)
- vidToName := make(map[string]string)
- inputVIDSet := make(map[string]bool)
- for _, name := range names {
- query := fmt.Sprintf(`LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name)
- resp, err := session.Execute(query)
- if err != nil || !resp.IsSucceed() {
- log.Printf("lookup failed for name %s: %v", name, err)
- continue
- }
- for _, row := range resp.GetRows() {
- vid := string(row.Values[0].GetSVal())
- vids = append(vids, fmt.Sprintf(`"%s"`, vid))
- vidToName[vid] = name
- inputVIDSet[vid] = true
- }
- }
- if len(vids) < 2 {
- return false, nil, nil, nil // 不足两个公司参与判断
- }
- // Step 2: 查找路径(双向 Invest)
- fromClause := strings.Join(vids, ",")
- query := fmt.Sprintf(`
- GO FROM %s OVER Invest BIDIRECT UPTO %d STEPS
- YIELD src(edge) AS from, dst(edge) AS to
- `, fromClause, depth)
- resp, err := session.Execute(query)
- if err != nil || !resp.IsSucceed() {
- return false, nil, nil, fmt.Errorf("GO query failed: %v", err)
- }
- // Step 3: 统计路径和交集节点
- relationPaths := make([]map[string]string, 0)
- nodeToSources := make(map[string]map[string]bool) // key: node, value: set of inputVIDs
- for _, row := range resp.GetRows() {
- from := string(row.Values[0].GetSVal())
- to := string(row.Values[1].GetSVal())
- // 记录路径
- relationPaths = append(relationPaths, map[string]string{
- "from": from,
- "to": to,
- })
- // 记录 from 节点来源
- if !inputVIDSet[from] {
- if nodeToSources[from] == nil {
- nodeToSources[from] = make(map[string]bool)
- }
- for _, vid := range vids {
- if from == strings.Trim(vid, `"`) {
- continue
- }
- if strings.Contains(query, vid) {
- nodeToSources[from][strings.Trim(vid, `"`)] = true
- }
- }
- }
- // 记录 to 节点来源
- if !inputVIDSet[to] {
- if nodeToSources[to] == nil {
- nodeToSources[to] = make(map[string]bool)
- }
- for _, vid := range vids {
- if to == strings.Trim(vid, `"`) {
- continue
- }
- if strings.Contains(query, vid) {
- nodeToSources[to][strings.Trim(vid, `"`)] = true
- }
- }
- }
- }
- // Step 4: 找出出现在多个输入公司路径中的中间节点(交集)
- commonNodeVIDs := make([]string, 0)
- for node, sourceSet := range nodeToSources {
- if len(sourceSet) >= 2 {
- commonNodeVIDs = append(commonNodeVIDs, node)
- }
- }
- // Step 5: 查名称
- intersectionNames := make([]string, 0)
- if len(commonNodeVIDs) > 0 {
- query := fmt.Sprintf(`FETCH PROP ON Legal %s YIELD Legal.name`, strings.Join(wrapInQuotes(commonNodeVIDs), ","))
- resp, err := session.Execute(query)
- if err == nil && resp.IsSucceed() {
- for _, row := range resp.GetRows() {
- name := string(row.Values[1].GetSVal())
- intersectionNames = append(intersectionNames, name)
- }
- }
- }
- found := len(intersectionNames) > 0
- return found, relationPaths, intersectionNames, nil
- }
- func wrapInQuotes(ids []string) []string {
- result := make([]string, len(ids))
- for i, id := range ids {
- result[i] = fmt.Sprintf(`"%s"`, id)
- }
- return result
- }
- func CheckInvestRelation1(session *nebula.Session, names []string, depth int) (bool, []map[string]string, error) {
- if len(names) == 0 || depth <= 0 {
- return false, nil, fmt.Errorf("invalid input")
- }
- // Step 1: 获取所有企业的 VID
- vids := make([]string, 0)
- nameSet := make(map[string]bool)
- for _, name := range names {
- query := fmt.Sprintf(`LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name)
- resp, err := session.Execute(query)
- if err != nil || !resp.IsSucceed() {
- log.Printf("lookup failed for name %s: %v", name, err)
- continue
- }
- for _, row := range resp.GetRows() {
- //vid := row.Values[0].GetSVal()
- vid := string(row.Values[0].GetSVal())
- vids = append(vids, fmt.Sprintf(`"%s"`, vid))
- nameSet[vid] = true
- }
- }
- if len(vids) == 0 {
- return false, nil, nil // 没有查出任何节点
- }
- // Step 2: 构造 GO 查询路径
- fromClause := strings.Join(vids, ",")
- query := fmt.Sprintf(`
- GO FROM %s OVER Invest UPTO %d STEPS
- YIELD src(edge) AS from, dst(edge) AS to
- `, fromClause, depth)
- resp, err := session.Execute(query)
- if err != nil {
- return false, nil, fmt.Errorf("GO query failed: %v", err)
- }
- if !resp.IsSucceed() {
- return false, nil, fmt.Errorf("Nebula error: %s", resp.GetErrorMsg())
- }
- // Step 3: 分析路径结果
- resultPaths := make([]map[string]string, 0)
- found := false
- for _, row := range resp.GetRows() {
- from := string(row.Values[0].GetSVal())
- to := string(row.Values[1].GetSVal())
- if nameSet[from] && nameSet[to] && from != to {
- found = true
- }
- resultPaths = append(resultPaths, map[string]string{
- "from": from,
- "to": to,
- })
- }
- return found, resultPaths, nil
- }
- func FindInvestmentRelations() {
- }
- //type PathRelation struct {
- // Companies []string
- // Paths []string
- //}
- //
- //func CheckLegalRelationsGraph(session *nebula.Session, names []string, deep int) (*PathRelation, error) {
- // // 查询 name -> vid 映射
- // nameToVid := make(map[string]string)
- // vidToName := make(map[string]string)
- // for _, name := range names {
- // vid, err := getVidByName(session, name)
- // if err != nil {
- // log.Printf("获取 %s 的 VID 失败: %v", name, err)
- // continue
- // }
- // nameToVid[name] = vid
- // vidToName[vid] = name
- // }
- //
- // allPaths := [][]string{}
- // checked := make(map[string]bool)
- //
- // // 遍历所有组合
- // for i := 0; i < len(names); i++ {
- // for j := i + 1; j < len(names); j++ {
- // a, b := names[i], names[j]
- // vidA, okA := nameToVid[a]
- // vidB, okB := nameToVid[b]
- // if !okA || !okB {
- // continue
- // }
- // key := vidA + "|" + vidB
- // if checked[key] {
- // continue
- // }
- // checked[key] = true
- //
- // if pathAB, _ := findPath(session, vidA, vidB, deep); len(pathAB) > 0 {
- // allPaths = append(allPaths, pathAB)
- // }
- // if pathBA, _ := findPath(session, vidB, vidA, deep); len(pathBA) > 0 {
- // allPaths = append(allPaths, pathBA)
- // }
- //
- // // 共同上级路径
- // common, commonPaths := checkCommonAncestor(session, vidA, vidB, deep)
- // if common {
- // allPaths = append(allPaths, commonPaths)
- // }
- // }
- // }
- //
- // // 1. 收集所有涉及的 VID
- // vidSet := make(map[string]struct{})
- // for _, path := range allPaths {
- // for _, vid := range path {
- // vidSet[vid] = struct{}{}
- // }
- // }
- //
- // // 2. 获取所有 VID 的公司名
- // for vid := range vidSet {
- // if _, ok := vidToName[vid]; ok {
- // continue
- // }
- // query := fmt.Sprintf(`FETCH PROP ON Legal "%s" YIELD Legal.name`, vid)
- // resp, err := session.Execute(query)
- // if err != nil || resp.IsEmpty() {
- // continue
- // }
- // rows := resp.GetRows()
- // if len(rows) > 0 && len(rows[0].Values) > 0 && rows[0].Values[0].SVal != nil {
- // vidToName[vid] = string(rows[0].Values[0].SVal)
- // }
- // }
- //
- // // 3. 清洗路径并格式化输出
- // companySet := make(map[string]struct{})
- // result := &PathRelation{
- // Companies: []string{},
- // Paths: []string{},
- // }
- //
- // for _, path := range allPaths {
- // namesPath := []string{}
- // last := ""
- // for _, vid := range path {
- // name, ok := vidToName[vid]
- // if !ok {
- // continue
- // }
- // if name == last {
- // continue // 去除重复节点
- // }
- // namesPath = append(namesPath, name)
- // last = name
- // companySet[name] = struct{}{}
- // }
- // if len(namesPath) >= 2 {
- // result.Paths = append(result.Paths, strings.Join(namesPath, "->"))
- // }
- // }
- //
- // for name := range companySet {
- // result.Companies = append(result.Companies, name)
- // }
- // sort.Strings(result.Companies)
- // return result, nil
- //}
- //
- //func checkCommonAncestor(session *nebula.Session, aVid, bVid string, deep int) (bool, []string) {
- // query := fmt.Sprintf(`
- // (
- // GO 1 TO %d STEPS FROM "%s" OVER Invest REVERSELY YIELD dst(edge) AS ancestor
- // )
- // INTERSECT
- // (
- // GO 1 TO %d STEPS FROM "%s" OVER Invest REVERSELY YIELD dst(edge) AS ancestor
- // );
- // `, deep, aVid, deep, bVid)
- //
- // resp, err := session.Execute(query)
- // if err != nil {
- // return false, nil
- // }
- // ancestors, err := getFirstColumnStrings(resp)
- // if err != nil || len(ancestors) == 0 {
- // return false, nil
- // }
- //
- // // 只返回第一个共同祖先的简单路径:a->ancestor->b
- // return true, []string{aVid, ancestors[0], bVid}
- //}
- //
- //func findPath(session *nebula.Session, fromVid, toVid string, maxStep int) ([]string, error) {
- // query := fmt.Sprintf(`FIND ALL PATH FROM "%s" TO "%s" OVER Invest UPTO %d STEPS YIELD path as p`, fromVid, toVid, maxStep)
- // resp, err := session.Execute(query)
- // if err != nil {
- // return nil, err
- // }
- // return getFirstColumnStrings(resp)
- //}
- //
- //func getVidByName(session *nebula.Session, name string) (string, error) {
- // query := fmt.Sprintf(`
- //USE `+Table_Space+`;
- //LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name)
- // resp, err := session.Execute(query)
- // if err != nil {
- // return "", err
- // }
- //
- // values, err := getFirstColumnStrings(resp)
- // if err != nil || len(values) == 0 {
- // return "", fmt.Errorf("未找到公司: %s", name)
- // }
- // return values[0], nil
- //}
- //
- //func getFirstColumnStrings(resp *nebula.ResultSet) ([]string, error) {
- // if resp == nil {
- // return nil, fmt.Errorf("result set is nil")
- // }
- //
- // var values []string
- // for _, row := range resp.GetRows() {
- // if len(row.Values) == 0 {
- // continue
- // }
- // val := row.Values[0]
- // switch {
- // case val.SVal != nil:
- // values = append(values, string(val.SVal))
- // case val.IVal != nil:
- // values = append(values, fmt.Sprintf("%d", *val.IVal))
- // case val.BVal != nil:
- // values = append(values, fmt.Sprintf("%v", *val.BVal))
- // default:
- // log.Printf("未知类型值: %+v", val)
- // }
- // }
- // return values, nil
- //}
|