|
@@ -0,0 +1,884 @@
|
|
|
|
+package main
|
|
|
|
+
|
|
|
|
+import (
|
|
|
|
+ "context"
|
|
|
|
+ "encoding/json"
|
|
|
|
+ "fmt"
|
|
|
|
+ "github.com/olivere/elastic/v7"
|
|
|
|
+ nebula "github.com/vesoft-inc/nebula-go/v3"
|
|
|
|
+ "io"
|
|
|
|
+ util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
|
+ "log"
|
|
|
|
+ "strings"
|
|
|
|
+ "sync"
|
|
|
|
+ "time"
|
|
|
|
+ "unicode/utf8"
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+func dealCompanyBase22() {
|
|
|
|
+ log.Println("dealCompanyBase", "开始处理数据")
|
|
|
|
+ session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Fatalf("Failed to connect to Nebula Graph: %v", err)
|
|
|
|
+ }
|
|
|
|
+ defer pool.Close()
|
|
|
|
+ defer session.Release()
|
|
|
|
+ defer util.Catch()
|
|
|
|
+ sess := Mgo181.GetMgoConn()
|
|
|
|
+ defer Mgo181.DestoryMongoConn(sess)
|
|
|
|
+ it := sess.DB("mixdata").C("company_base").Find(nil).Sort("_id").Select(nil).Iter()
|
|
|
|
+
|
|
|
|
+ //jobChan := make(chan InsertJob, WorkerCount*2)
|
|
|
|
+ var wg sync.WaitGroup
|
|
|
|
+ count := 0
|
|
|
|
+
|
|
|
|
+ // 新增一个 job 构建的协程池
|
|
|
|
+ buildChan := make(chan map[string]interface{}, WorkerCount*5) // 用于传递原始 Mongo 数据
|
|
|
|
+ var wgBuild sync.WaitGroup
|
|
|
|
+ for i := 0; i < 2; i++ {
|
|
|
|
+ wgBuild.Add(1)
|
|
|
|
+ go func() {
|
|
|
|
+ defer wgBuild.Done()
|
|
|
|
+ count2 := 0
|
|
|
|
+ for tmp := range buildChan {
|
|
|
|
+ count2++
|
|
|
|
+ if count2%10000 == 0 {
|
|
|
|
+ log.Printf("已处理 %d 条,休息 1 秒...\n", count)
|
|
|
|
+ time.Sleep(time.Second)
|
|
|
|
+ }
|
|
|
|
+ c1 := Legal{
|
|
|
|
+ Name: util.ObjToString(tmp["company_name"]),
|
|
|
|
+ Code: util.ObjToString(tmp["credit_no"]),
|
|
|
|
+ Type: "企业",
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ r1, err := InsertCompany(session, c1)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Println("InsertCompany", r1, err)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 耗时查询移到这里
|
|
|
|
+ rea, resb := GetInvByLevel(c1.Name, 1, 0, false)
|
|
|
|
+ for _, v := range rea {
|
|
|
|
+ d := Legal{
|
|
|
|
+ Name: v.company_name,
|
|
|
|
+ Code: v.credit_no,
|
|
|
|
+ Type: "企业",
|
|
|
|
+ }
|
|
|
|
+ r, err := InsertCompany(session, d)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Println("InsertCompany", r, err)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for _, v := range resb {
|
|
|
|
+ d := Invest{
|
|
|
|
+ FromCode: v.stock_name,
|
|
|
|
+ ToCode: v.company_name,
|
|
|
|
+ Amount: v.stock_amount,
|
|
|
|
+ Ratio: v.stock_rate,
|
|
|
|
+ }
|
|
|
|
+ err := InsertInvestRel(session, d)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Println("InsertInvestRel", err, d)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ }()
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //realNum := 0
|
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
|
+ if count%10000 == 0 {
|
|
|
|
+ log.Println("current:", count, tmp["company_name"], tmp["_id"])
|
|
|
|
+ }
|
|
|
|
+ if util.IntAll(tmp["use_flag"]) > 0 {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if util.ObjToString(tmp["company_type"]) == "个体工商户" || util.ObjToString(tmp["company_type"]) == "个人独资企业" {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if util.ObjToString(tmp["company_name"]) == "" || util.ObjToString(tmp["credit_no"]) == "" {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ // 注销;关闭
|
|
|
|
+ if strings.Contains(util.ObjToString(tmp["company_status"]), "吊销") || strings.Contains(util.ObjToString(tmp["company_status"]), "注销") || strings.Contains(util.ObjToString(tmp["company_status"]), "关闭") {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if utf8.RuneCountInString(util.ObjToString(tmp["company_name"])) < 5 {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ buildChan <- tmp // 推送到异步处理构建
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ close(buildChan)
|
|
|
|
+ wgBuild.Wait()
|
|
|
|
+ wg.Wait()
|
|
|
|
+ log.Println("完成!")
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func dealCompanyBase() {
|
|
|
|
+ log.Println("dealCompanyBase", "开始处理数据")
|
|
|
|
+ session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Fatalf("Failed to connect to Nebula Graph: %v", err)
|
|
|
|
+ }
|
|
|
|
+ defer pool.Close()
|
|
|
|
+ defer session.Release()
|
|
|
|
+ defer util.Catch()
|
|
|
|
+ sess := Mgo181.GetMgoConn()
|
|
|
|
+ defer Mgo181.DestoryMongoConn(sess)
|
|
|
|
+ it := sess.DB("mixdata").C("company_base").Find(nil).Select(nil).Iter()
|
|
|
|
+
|
|
|
|
+ //jobChan := make(chan InsertJob, WorkerCount*2)
|
|
|
|
+ var wg sync.WaitGroup
|
|
|
|
+ count := 0
|
|
|
|
+
|
|
|
|
+ // 新增一个 job 构建的协程池
|
|
|
|
+ buildChan := make(chan map[string]interface{}, WorkerCount*10) // 用于传递原始 Mongo 数据
|
|
|
|
+ jobChan := make(chan InsertJob, WorkerCount*5)
|
|
|
|
+
|
|
|
|
+ // 启动工作协程;存储数据
|
|
|
|
+ //for i := 0; i < WorkerCount; i++ {
|
|
|
|
+ // wg.Add(1)
|
|
|
|
+ // go insertWorker(session, &wg, jobChan)
|
|
|
|
+ //}
|
|
|
|
+
|
|
|
|
+ //写入图形数据库
|
|
|
|
+ for i := 0; i < WorkerCount; i++ {
|
|
|
|
+ wg.Add(1)
|
|
|
|
+ go func() {
|
|
|
|
+ defer wg.Done()
|
|
|
|
+ // 每个 worker 拿自己的 session
|
|
|
|
+ localSession, err := pool.GetSession(UserName, PassWord)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Println("获取 session 失败:", err)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ defer localSession.Release()
|
|
|
|
+ insertWorker2(localSession, jobChan)
|
|
|
|
+ }()
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ var wgBuild sync.WaitGroup
|
|
|
|
+ for i := 0; i < WorkerCount; i++ {
|
|
|
|
+ wgBuild.Add(1)
|
|
|
|
+ go func() {
|
|
|
|
+ defer wgBuild.Done()
|
|
|
|
+ for tmp := range buildChan {
|
|
|
|
+ c1 := Legal{
|
|
|
|
+ Id: util.ObjToString(tmp["company_id"]),
|
|
|
|
+ Name: util.ObjToString(tmp["company_name"]),
|
|
|
|
+ Code: util.ObjToString(tmp["credit_no"]),
|
|
|
|
+ Type: "企业",
|
|
|
|
+ }
|
|
|
|
+ if utf8.RuneCountInString(c1.Name) < 5 {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ job := InsertJob{}
|
|
|
|
+ job.Companies = append(job.Companies, c1)
|
|
|
|
+
|
|
|
|
+ // 耗时查询移到这里
|
|
|
|
+ rea, resb := GetInvByLevel(c1.Name, 2, 0, false)
|
|
|
|
+ for _, v := range rea {
|
|
|
|
+ d := Legal{
|
|
|
|
+ Id: v.company_id,
|
|
|
|
+ Name: v.company_name,
|
|
|
|
+ Code: v.credit_no,
|
|
|
|
+ Type: "企业",
|
|
|
|
+ }
|
|
|
|
+ job.Companies = append(job.Companies, d)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for _, v := range resb {
|
|
|
|
+ d := Invest{
|
|
|
|
+ FromCode: v.stock_id,
|
|
|
|
+ ToCode: v.company_id,
|
|
|
|
+ Amount: v.stock_amount,
|
|
|
|
+ Ratio: v.stock_rate,
|
|
|
|
+ }
|
|
|
|
+ job.Relations = append(job.Relations, d)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ jobChan <- job
|
|
|
|
+ }
|
|
|
|
+ }()
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //realNum := 0
|
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
|
+ if count%10000 == 0 {
|
|
|
|
+ log.Println("current:", count, tmp["company_name"])
|
|
|
|
+ }
|
|
|
|
+ if util.IntAll(tmp["use_flag"]) > 0 {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if util.ObjToString(tmp["company_type"]) == "个体工商户" || util.ObjToString(tmp["company_type"]) == "个人独资企业" {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if util.ObjToString(tmp["company_name"]) == "" || util.ObjToString(tmp["credit_no"]) == "" {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ // 注销;关闭
|
|
|
|
+ if strings.Contains(util.ObjToString(tmp["company_status"]), "吊销") || strings.Contains(util.ObjToString(tmp["company_status"]), "注销") || strings.Contains(util.ObjToString(tmp["company_status"]), "关闭") {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ buildChan <- tmp // 推送到异步处理构建
|
|
|
|
+
|
|
|
|
+ //1、处理点
|
|
|
|
+ //job := InsertJob{}
|
|
|
|
+ //c1 := Legal{
|
|
|
|
+ // Name: util.ObjToString(tmp["company_name"]),
|
|
|
|
+ // Code: util.ObjToString(tmp["credit_no"]),
|
|
|
|
+ // Type: "企业",
|
|
|
|
+ //}
|
|
|
|
+ //if utf8.RuneCountInString(c1.Name) < 5 {
|
|
|
|
+ // continue
|
|
|
|
+ //}
|
|
|
|
+ //job.Companies = append(job.Companies, c1)
|
|
|
|
+ ////2、处理变
|
|
|
|
+ //rea, resb := GetInvByLevel(c1.Name, 1, 0, false)
|
|
|
|
+ //for _, v := range rea {
|
|
|
|
+ // d := Legal{
|
|
|
|
+ // Name: v.company_name,
|
|
|
|
+ // Code: v.credit_no,
|
|
|
|
+ // Type: "企业",
|
|
|
|
+ // }
|
|
|
|
+ // job.Companies = append(job.Companies, d)
|
|
|
|
+ //}
|
|
|
|
+ //
|
|
|
|
+ //for _, v := range resb {
|
|
|
|
+ // d := Invest{
|
|
|
|
+ // FromCode: v.stock_name,
|
|
|
|
+ // ToCode: v.company_name,
|
|
|
|
+ // Amount: v.stock_amount,
|
|
|
|
+ // Ratio: v.stock_rate,
|
|
|
|
+ // }
|
|
|
|
+ // job.Relations = append(job.Relations, d)
|
|
|
|
+ //}
|
|
|
|
+ //
|
|
|
|
+ //jobChan <- job
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ close(buildChan)
|
|
|
|
+ wgBuild.Wait()
|
|
|
|
+ close(jobChan)
|
|
|
|
+ wg.Wait()
|
|
|
|
+ log.Println("完成!")
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func batchDealGraph() {
|
|
|
|
+ session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Fatalf("Failed to connect to Nebula Graph: %v", err)
|
|
|
|
+ }
|
|
|
|
+ defer pool.Close()
|
|
|
|
+ defer session.Release()
|
|
|
|
+
|
|
|
|
+ client, err := elastic.NewClient(
|
|
|
|
+ elastic.SetURL("http://172.17.4.184:19908"),
|
|
|
|
+ //elastic.SetURL("http://127.0.0.1:19908"),
|
|
|
|
+ elastic.SetBasicAuth("jybid", "Top2023_JEB01i@31"),
|
|
|
|
+ elastic.SetSniff(false),
|
|
|
|
+ )
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ query := elastic.NewBoolQuery().
|
|
|
|
+ //北京,天津,河北,上海,江苏,浙江,安徽
|
|
|
|
+ //Must(elastic.NewTermQuery("area", "北京市")).
|
|
|
|
+ //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
|
|
|
|
+ MustNot(
|
|
|
|
+ elastic.NewTermQuery("company_type", "个体工商户"),
|
|
|
|
+ elastic.NewTermsQuery("company_status", "吊销", "注销"),
|
|
|
|
+ )
|
|
|
|
+ //Must(elastic.NewTermQuery("company_name", "北京剑鱼信息技术有限公司"))
|
|
|
|
+ //Must(elastic.NewTermsQuery("company_area", "河南"))
|
|
|
|
+
|
|
|
|
+ ctx := context.Background()
|
|
|
|
+ searchSource := elastic.NewSearchSource().
|
|
|
|
+ Query(query).
|
|
|
|
+ Size(10000).
|
|
|
|
+ Sort("_doc", true)
|
|
|
|
+
|
|
|
|
+ searchService := client.Scroll("qyxy").
|
|
|
|
+ Size(10000).
|
|
|
|
+ Scroll("5m").
|
|
|
|
+ SearchSource(searchSource)
|
|
|
|
+
|
|
|
|
+ jobChan := make(chan InsertJob, WorkerCount*2)
|
|
|
|
+ var wg sync.WaitGroup
|
|
|
|
+
|
|
|
|
+ // 启动工作协程
|
|
|
|
+ for i := 0; i < WorkerCount; i++ {
|
|
|
|
+ wg.Add(1)
|
|
|
|
+ go insertWorker(session, &wg, jobChan)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ total := 0
|
|
|
|
+ for {
|
|
|
|
+ res, err := searchService.Do(ctx)
|
|
|
|
+ if err == io.EOF {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Println("scroll error:", err)
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ fmt.Println("总数是:", res.TotalHits())
|
|
|
|
+ if len(res.Hits.Hits) == 0 {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ job := InsertJob{}
|
|
|
|
+ for _, hit := range res.Hits.Hits {
|
|
|
|
+ var doc map[string]interface{}
|
|
|
|
+ if err := json.Unmarshal(hit.Source, &doc); err != nil {
|
|
|
|
+ log.Println("解析失败", err)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ c1 := Legal{
|
|
|
|
+ Id: util.ObjToString(doc["id"]),
|
|
|
|
+ Name: util.ObjToString(doc["company_name"]),
|
|
|
|
+ Code: util.ObjToString(doc["credit_no"]),
|
|
|
|
+ Type: "企业",
|
|
|
|
+ }
|
|
|
|
+ ////存续、在营、开业、在册
|
|
|
|
+ //if strings.Contains(util.ObjToString(doc["company_status"]), "存续") || strings.Contains(util.ObjToString(doc["company_status"]), "在营") || strings.Contains(util.ObjToString(doc["company_status"]), "在册") || strings.Contains(util.ObjToString(doc["company_status"]), "开业") {
|
|
|
|
+ // c1.State = "有效"
|
|
|
|
+ //} else {
|
|
|
|
+ // c1.State = "无效"
|
|
|
|
+ //}
|
|
|
|
+ if !strings.Contains(c1.Name, "公司") {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if c1.Name == "" || c1.Code == "" || strings.Contains(c1.Name, "已除名") {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if strings.Contains(util.ObjToString(doc["company_status"]), "吊销") || strings.Contains(util.ObjToString(doc["company_status"]), "注销") {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if utf8.RuneCountInString(c1.Name) < 5 {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ job.Companies = append(job.Companies, c1)
|
|
|
|
+
|
|
|
|
+ if partners, ok := doc["partners"].([]interface{}); ok {
|
|
|
|
+ for _, partner := range partners {
|
|
|
|
+ if da, ok := partner.(map[string]interface{}); ok {
|
|
|
|
+ if !strings.Contains(util.ObjToString(da["stock_type"]), "自然人") && !strings.Contains(util.ObjToString(da["stock_type"]), "个人") {
|
|
|
|
+ if util.ObjToString(da["stock_name"]) == "" || util.ObjToString(da["identify_no"]) == "" {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //1
|
|
|
|
+ where1 := map[string]interface{}{
|
|
|
|
+ "company_name": util.ObjToString(da["stock_name"]),
|
|
|
|
+ }
|
|
|
|
+ tmpBase, _ := Mgo181.FindOne("company_base", where1)
|
|
|
|
+ if len(*tmpBase) > 0 {
|
|
|
|
+ c2 := Legal{
|
|
|
|
+ Id: util.ObjToString((*tmpBase)["company_id"]),
|
|
|
|
+ Name: util.ObjToString(da["stock_name"]),
|
|
|
|
+ Code: util.ObjToString(da["identify_no"]),
|
|
|
|
+ Type: "企业",
|
|
|
|
+ }
|
|
|
|
+ //if strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "存续") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "在营") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "在册") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "开业") {
|
|
|
|
+ // c2.State = "有效"
|
|
|
|
+ //} else {
|
|
|
|
+ // c2.State = "无效"
|
|
|
|
+ //}
|
|
|
|
+ job.Companies = append(job.Companies, c2)
|
|
|
|
+ //2
|
|
|
|
+ where := map[string]interface{}{
|
|
|
|
+ "company_name": util.ObjToString(doc["company_name"]),
|
|
|
|
+ "stock_name": util.ObjToString(da["stock_name"]),
|
|
|
|
+ }
|
|
|
|
+ ddd, _ := Mgo181.FindOne("company_partner", where)
|
|
|
|
+ if len(*ddd) > 0 {
|
|
|
|
+ par := *ddd
|
|
|
|
+ invest := Invest{
|
|
|
|
+ FromCode: c2.Id,
|
|
|
|
+ ToCode: c1.Id,
|
|
|
|
+ Ratio: util.Float64All(par["stock_proportion"]),
|
|
|
|
+ Amount: ParseStockCapital(util.ObjToString(par["stock_capital"])),
|
|
|
|
+ }
|
|
|
|
+ job.Relations = append(job.Relations, invest)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ jobChan <- job
|
|
|
|
+ total += len(res.Hits.Hits)
|
|
|
|
+ log.Println("处理总量:", total)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ close(jobChan)
|
|
|
|
+ wg.Wait()
|
|
|
|
+ log.Println("完成!")
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func insertWorker2(session *nebula.Session, jobs <-chan InsertJob) {
|
|
|
|
+ for job := range jobs {
|
|
|
|
+ // 分批插入公司
|
|
|
|
+ for i := 0; i < len(job.Companies); i += BatchSize {
|
|
|
|
+ end := i + BatchSize
|
|
|
|
+ if end > len(job.Companies) {
|
|
|
|
+ end = len(job.Companies)
|
|
|
|
+ }
|
|
|
|
+ BatchInsertCompanies(session, job.Companies[i:end])
|
|
|
|
+ //time.Sleep(time.Second * 1)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 分批插入投资关系
|
|
|
|
+ for i := 0; i < len(job.Relations); i += BatchSize {
|
|
|
|
+ end := i + BatchSize
|
|
|
|
+ if end > len(job.Relations) {
|
|
|
|
+ end = len(job.Relations)
|
|
|
|
+ }
|
|
|
|
+ BatchInsertInvestRels(session, job.Relations[i:end])
|
|
|
|
+ //time.Sleep(time.Second * 1)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func insertWorker(session *nebula.Session, wg *sync.WaitGroup, jobs <-chan InsertJob) {
|
|
|
|
+ defer wg.Done()
|
|
|
|
+ for job := range jobs {
|
|
|
|
+ // 分批插入公司
|
|
|
|
+ for i := 0; i < len(job.Companies); i += BatchSize {
|
|
|
|
+ end := i + BatchSize
|
|
|
|
+ if end > len(job.Companies) {
|
|
|
|
+ end = len(job.Companies)
|
|
|
|
+ }
|
|
|
|
+ BatchInsertCompanies(session, job.Companies[i:end])
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 分批插入投资关系
|
|
|
|
+ for i := 0; i < len(job.Relations); i += BatchSize {
|
|
|
|
+ end := i + BatchSize
|
|
|
|
+ if end > len(job.Relations) {
|
|
|
|
+ end = len(job.Relations)
|
|
|
|
+ }
|
|
|
|
+ BatchInsertInvestRels(session, job.Relations[i:end])
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func BatchInsertCompanies(session *nebula.Session, companies []Legal) {
|
|
|
|
+ if len(companies) == 0 {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ var sb strings.Builder
|
|
|
|
+ sb.WriteString("USE " + Table_Space + "; ")
|
|
|
|
+ for _, c := range companies {
|
|
|
|
+ sb.WriteString(fmt.Sprintf(`INSERT VERTEX Legal(name, code, type, state) VALUES "%s":("%s", "%s", "%s", "%s");`, c.Id, c.Name, c.Code, c.Type, c.State))
|
|
|
|
+ }
|
|
|
|
+ _, err := session.Execute(sb.String())
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Println("批量插入公司失败:", err)
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func BatchInsertInvestRels(session *nebula.Session, rels []Invest) {
|
|
|
|
+ if len(rels) == 0 {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ var sb strings.Builder
|
|
|
|
+ sb.WriteString("USE " + Table_Space + "; ")
|
|
|
|
+ for _, r := range rels {
|
|
|
|
+ sb.WriteString(fmt.Sprintf(`INSERT EDGE Invest(amount, ratio) VALUES "%s"->"%s":(%f, %f);`, r.FromCode, r.ToCode, r.Amount, r.Ratio))
|
|
|
|
+ }
|
|
|
|
+ _, err := session.Execute(sb.String())
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Println("批量插入投资关系失败:", err)
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
type (
	// InvestRelationResult bundles the outcome of an investment-relation check.
	InvestRelationResult struct {
		// Related reports whether a relation was found.
		Related bool
		// Paths holds one map per path entry.
		Paths []map[string]string
		// CommonNodes lists the nodes shared between the checked companies.
		CommonNodes []CommonNodeInfo
	}

	// CommonNodeInfo identifies a single graph node by vertex id and name.
	CommonNodeInfo struct {
		VID  string
		Name string
	}
)
|
|
|
|
+
|
|
|
|
+func CheckInvestRelationWithIntersection(session *nebula.Session, names []string, depth int) (bool, []map[string]string, []string, error) {
|
|
|
|
+ if len(names) == 0 || depth <= 0 {
|
|
|
|
+ return false, nil, nil, fmt.Errorf("invalid input")
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Step 1: 获取所有企业的 VID
|
|
|
|
+ vids := make([]string, 0)
|
|
|
|
+ vidToName := make(map[string]string)
|
|
|
|
+ inputVIDSet := make(map[string]bool)
|
|
|
|
+
|
|
|
|
+ for _, name := range names {
|
|
|
|
+ query := fmt.Sprintf(`LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name)
|
|
|
|
+ resp, err := session.Execute(query)
|
|
|
|
+ if err != nil || !resp.IsSucceed() {
|
|
|
|
+ log.Printf("lookup failed for name %s: %v", name, err)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ for _, row := range resp.GetRows() {
|
|
|
|
+ vid := string(row.Values[0].GetSVal())
|
|
|
|
+ vids = append(vids, fmt.Sprintf(`"%s"`, vid))
|
|
|
|
+ vidToName[vid] = name
|
|
|
|
+ inputVIDSet[vid] = true
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if len(vids) < 2 {
|
|
|
|
+ return false, nil, nil, nil // 不足两个公司参与判断
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Step 2: 查找路径(双向 Invest)
|
|
|
|
+ fromClause := strings.Join(vids, ",")
|
|
|
|
+ query := fmt.Sprintf(`
|
|
|
|
+ GO FROM %s OVER Invest BIDIRECT UPTO %d STEPS
|
|
|
|
+ YIELD src(edge) AS from, dst(edge) AS to
|
|
|
|
+ `, fromClause, depth)
|
|
|
|
+
|
|
|
|
+ resp, err := session.Execute(query)
|
|
|
|
+ if err != nil || !resp.IsSucceed() {
|
|
|
|
+ return false, nil, nil, fmt.Errorf("GO query failed: %v", err)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Step 3: 统计路径和交集节点
|
|
|
|
+ relationPaths := make([]map[string]string, 0)
|
|
|
|
+ nodeToSources := make(map[string]map[string]bool) // key: node, value: set of inputVIDs
|
|
|
|
+
|
|
|
|
+ for _, row := range resp.GetRows() {
|
|
|
|
+ from := string(row.Values[0].GetSVal())
|
|
|
|
+ to := string(row.Values[1].GetSVal())
|
|
|
|
+
|
|
|
|
+ // 记录路径
|
|
|
|
+ relationPaths = append(relationPaths, map[string]string{
|
|
|
|
+ "from": from,
|
|
|
|
+ "to": to,
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ // 记录 from 节点来源
|
|
|
|
+ if !inputVIDSet[from] {
|
|
|
|
+ if nodeToSources[from] == nil {
|
|
|
|
+ nodeToSources[from] = make(map[string]bool)
|
|
|
|
+ }
|
|
|
|
+ for _, vid := range vids {
|
|
|
|
+ if from == strings.Trim(vid, `"`) {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if strings.Contains(query, vid) {
|
|
|
|
+ nodeToSources[from][strings.Trim(vid, `"`)] = true
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 记录 to 节点来源
|
|
|
|
+ if !inputVIDSet[to] {
|
|
|
|
+ if nodeToSources[to] == nil {
|
|
|
|
+ nodeToSources[to] = make(map[string]bool)
|
|
|
|
+ }
|
|
|
|
+ for _, vid := range vids {
|
|
|
|
+ if to == strings.Trim(vid, `"`) {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if strings.Contains(query, vid) {
|
|
|
|
+ nodeToSources[to][strings.Trim(vid, `"`)] = true
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Step 4: 找出出现在多个输入公司路径中的中间节点(交集)
|
|
|
|
+ commonNodeVIDs := make([]string, 0)
|
|
|
|
+ for node, sourceSet := range nodeToSources {
|
|
|
|
+ if len(sourceSet) >= 2 {
|
|
|
|
+ commonNodeVIDs = append(commonNodeVIDs, node)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Step 5: 查名称
|
|
|
|
+ intersectionNames := make([]string, 0)
|
|
|
|
+ if len(commonNodeVIDs) > 0 {
|
|
|
|
+ query := fmt.Sprintf(`FETCH PROP ON Legal %s YIELD Legal.name`, strings.Join(wrapInQuotes(commonNodeVIDs), ","))
|
|
|
|
+ resp, err := session.Execute(query)
|
|
|
|
+ if err == nil && resp.IsSucceed() {
|
|
|
|
+ for _, row := range resp.GetRows() {
|
|
|
|
+ name := string(row.Values[1].GetSVal())
|
|
|
|
+ intersectionNames = append(intersectionNames, name)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ found := len(intersectionNames) > 0
|
|
|
|
+ return found, relationPaths, intersectionNames, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
// wrapInQuotes returns a new slice in which every id is surrounded by double
// quotes. The input is not modified and the result is never nil.
func wrapInQuotes(ids []string) []string {
	quoted := make([]string, 0, len(ids))
	for idx := range ids {
		quoted = append(quoted, fmt.Sprintf(`"%s"`, ids[idx]))
	}
	return quoted
}
|
|
|
|
+
|
|
|
|
+func CheckInvestRelation1(session *nebula.Session, names []string, depth int) (bool, []map[string]string, error) {
|
|
|
|
+ if len(names) == 0 || depth <= 0 {
|
|
|
|
+ return false, nil, fmt.Errorf("invalid input")
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Step 1: 获取所有企业的 VID
|
|
|
|
+ vids := make([]string, 0)
|
|
|
|
+ nameSet := make(map[string]bool)
|
|
|
|
+ for _, name := range names {
|
|
|
|
+ query := fmt.Sprintf(`LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name)
|
|
|
|
+ resp, err := session.Execute(query)
|
|
|
|
+ if err != nil || !resp.IsSucceed() {
|
|
|
|
+ log.Printf("lookup failed for name %s: %v", name, err)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ for _, row := range resp.GetRows() {
|
|
|
|
+ //vid := row.Values[0].GetSVal()
|
|
|
|
+ vid := string(row.Values[0].GetSVal())
|
|
|
|
+ vids = append(vids, fmt.Sprintf(`"%s"`, vid))
|
|
|
|
+ nameSet[vid] = true
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if len(vids) == 0 {
|
|
|
|
+ return false, nil, nil // 没有查出任何节点
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Step 2: 构造 GO 查询路径
|
|
|
|
+ fromClause := strings.Join(vids, ",")
|
|
|
|
+ query := fmt.Sprintf(`
|
|
|
|
+ GO FROM %s OVER Invest UPTO %d STEPS
|
|
|
|
+ YIELD src(edge) AS from, dst(edge) AS to
|
|
|
|
+ `, fromClause, depth)
|
|
|
|
+
|
|
|
|
+ resp, err := session.Execute(query)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return false, nil, fmt.Errorf("GO query failed: %v", err)
|
|
|
|
+ }
|
|
|
|
+ if !resp.IsSucceed() {
|
|
|
|
+ return false, nil, fmt.Errorf("Nebula error: %s", resp.GetErrorMsg())
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Step 3: 分析路径结果
|
|
|
|
+ resultPaths := make([]map[string]string, 0)
|
|
|
|
+ found := false
|
|
|
|
+ for _, row := range resp.GetRows() {
|
|
|
|
+ from := string(row.Values[0].GetSVal())
|
|
|
|
+ to := string(row.Values[1].GetSVal())
|
|
|
|
+
|
|
|
|
+ if nameSet[from] && nameSet[to] && from != to {
|
|
|
|
+ found = true
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ resultPaths = append(resultPaths, map[string]string{
|
|
|
|
+ "from": from,
|
|
|
|
+ "to": to,
|
|
|
|
+ })
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return found, resultPaths, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
// FindInvestmentRelations currently has no body: it takes no arguments,
// performs no work and returns nothing.
func FindInvestmentRelations() {
	// Intentionally left empty.
}
|
|
|
|
+
|
|
|
|
+//type PathRelation struct {
|
|
|
|
+// Companies []string
|
|
|
|
+// Paths []string
|
|
|
|
+//}
|
|
|
|
+//
|
|
|
|
+//func CheckLegalRelationsGraph(session *nebula.Session, names []string, deep int) (*PathRelation, error) {
|
|
|
|
+// // 查询 name -> vid 映射
|
|
|
|
+// nameToVid := make(map[string]string)
|
|
|
|
+// vidToName := make(map[string]string)
|
|
|
|
+// for _, name := range names {
|
|
|
|
+// vid, err := getVidByName(session, name)
|
|
|
|
+// if err != nil {
|
|
|
|
+// log.Printf("获取 %s 的 VID 失败: %v", name, err)
|
|
|
|
+// continue
|
|
|
|
+// }
|
|
|
|
+// nameToVid[name] = vid
|
|
|
|
+// vidToName[vid] = name
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// allPaths := [][]string{}
|
|
|
|
+// checked := make(map[string]bool)
|
|
|
|
+//
|
|
|
|
+// // 遍历所有组合
|
|
|
|
+// for i := 0; i < len(names); i++ {
|
|
|
|
+// for j := i + 1; j < len(names); j++ {
|
|
|
|
+// a, b := names[i], names[j]
|
|
|
|
+// vidA, okA := nameToVid[a]
|
|
|
|
+// vidB, okB := nameToVid[b]
|
|
|
|
+// if !okA || !okB {
|
|
|
|
+// continue
|
|
|
|
+// }
|
|
|
|
+// key := vidA + "|" + vidB
|
|
|
|
+// if checked[key] {
|
|
|
|
+// continue
|
|
|
|
+// }
|
|
|
|
+// checked[key] = true
|
|
|
|
+//
|
|
|
|
+// if pathAB, _ := findPath(session, vidA, vidB, deep); len(pathAB) > 0 {
|
|
|
|
+// allPaths = append(allPaths, pathAB)
|
|
|
|
+// }
|
|
|
|
+// if pathBA, _ := findPath(session, vidB, vidA, deep); len(pathBA) > 0 {
|
|
|
|
+// allPaths = append(allPaths, pathBA)
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// // 共同上级路径
|
|
|
|
+// common, commonPaths := checkCommonAncestor(session, vidA, vidB, deep)
|
|
|
|
+// if common {
|
|
|
|
+// allPaths = append(allPaths, commonPaths)
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// // 1. 收集所有涉及的 VID
|
|
|
|
+// vidSet := make(map[string]struct{})
|
|
|
|
+// for _, path := range allPaths {
|
|
|
|
+// for _, vid := range path {
|
|
|
|
+// vidSet[vid] = struct{}{}
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// // 2. 获取所有 VID 的公司名
|
|
|
|
+// for vid := range vidSet {
|
|
|
|
+// if _, ok := vidToName[vid]; ok {
|
|
|
|
+// continue
|
|
|
|
+// }
|
|
|
|
+// query := fmt.Sprintf(`FETCH PROP ON Legal "%s" YIELD Legal.name`, vid)
|
|
|
|
+// resp, err := session.Execute(query)
|
|
|
|
+// if err != nil || resp.IsEmpty() {
|
|
|
|
+// continue
|
|
|
|
+// }
|
|
|
|
+// rows := resp.GetRows()
|
|
|
|
+// if len(rows) > 0 && len(rows[0].Values) > 0 && rows[0].Values[0].SVal != nil {
|
|
|
|
+// vidToName[vid] = string(rows[0].Values[0].SVal)
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// // 3. 清洗路径并格式化输出
|
|
|
|
+// companySet := make(map[string]struct{})
|
|
|
|
+// result := &PathRelation{
|
|
|
|
+// Companies: []string{},
|
|
|
|
+// Paths: []string{},
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// for _, path := range allPaths {
|
|
|
|
+// namesPath := []string{}
|
|
|
|
+// last := ""
|
|
|
|
+// for _, vid := range path {
|
|
|
|
+// name, ok := vidToName[vid]
|
|
|
|
+// if !ok {
|
|
|
|
+// continue
|
|
|
|
+// }
|
|
|
|
+// if name == last {
|
|
|
|
+// continue // 去除重复节点
|
|
|
|
+// }
|
|
|
|
+// namesPath = append(namesPath, name)
|
|
|
|
+// last = name
|
|
|
|
+// companySet[name] = struct{}{}
|
|
|
|
+// }
|
|
|
|
+// if len(namesPath) >= 2 {
|
|
|
|
+// result.Paths = append(result.Paths, strings.Join(namesPath, "->"))
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// for name := range companySet {
|
|
|
|
+// result.Companies = append(result.Companies, name)
|
|
|
|
+// }
|
|
|
|
+// sort.Strings(result.Companies)
|
|
|
|
+// return result, nil
|
|
|
|
+//}
|
|
|
|
+//
|
|
|
|
+//func checkCommonAncestor(session *nebula.Session, aVid, bVid string, deep int) (bool, []string) {
|
|
|
|
+// query := fmt.Sprintf(`
|
|
|
|
+// (
|
|
|
|
+// GO 1 TO %d STEPS FROM "%s" OVER Invest REVERSELY YIELD dst(edge) AS ancestor
|
|
|
|
+// )
|
|
|
|
+// INTERSECT
|
|
|
|
+// (
|
|
|
|
+// GO 1 TO %d STEPS FROM "%s" OVER Invest REVERSELY YIELD dst(edge) AS ancestor
|
|
|
|
+// );
|
|
|
|
+// `, deep, aVid, deep, bVid)
|
|
|
|
+//
|
|
|
|
+// resp, err := session.Execute(query)
|
|
|
|
+// if err != nil {
|
|
|
|
+// return false, nil
|
|
|
|
+// }
|
|
|
|
+// ancestors, err := getFirstColumnStrings(resp)
|
|
|
|
+// if err != nil || len(ancestors) == 0 {
|
|
|
|
+// return false, nil
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// // 只返回第一个共同祖先的简单路径:a->ancestor->b
|
|
|
|
+// return true, []string{aVid, ancestors[0], bVid}
|
|
|
|
+//}
|
|
|
|
+//
|
|
|
|
+//func findPath(session *nebula.Session, fromVid, toVid string, maxStep int) ([]string, error) {
|
|
|
|
+// query := fmt.Sprintf(`FIND ALL PATH FROM "%s" TO "%s" OVER Invest UPTO %d STEPS YIELD path as p`, fromVid, toVid, maxStep)
|
|
|
|
+// resp, err := session.Execute(query)
|
|
|
|
+// if err != nil {
|
|
|
|
+// return nil, err
|
|
|
|
+// }
|
|
|
|
+// return getFirstColumnStrings(resp)
|
|
|
|
+//}
|
|
|
|
+//
|
|
|
|
+//func getVidByName(session *nebula.Session, name string) (string, error) {
|
|
|
|
+// query := fmt.Sprintf(`
|
|
|
|
+//USE `+Table_Space+`;
|
|
|
|
+//LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name)
|
|
|
|
+// resp, err := session.Execute(query)
|
|
|
|
+// if err != nil {
|
|
|
|
+// return "", err
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// values, err := getFirstColumnStrings(resp)
|
|
|
|
+// if err != nil || len(values) == 0 {
|
|
|
|
+// return "", fmt.Errorf("未找到公司: %s", name)
|
|
|
|
+// }
|
|
|
|
+// return values[0], nil
|
|
|
|
+//}
|
|
|
|
+//
|
|
|
|
+//func getFirstColumnStrings(resp *nebula.ResultSet) ([]string, error) {
|
|
|
|
+// if resp == nil {
|
|
|
|
+// return nil, fmt.Errorf("result set is nil")
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// var values []string
|
|
|
|
+// for _, row := range resp.GetRows() {
|
|
|
|
+// if len(row.Values) == 0 {
|
|
|
|
+// continue
|
|
|
|
+// }
|
|
|
|
+// val := row.Values[0]
|
|
|
|
+// switch {
|
|
|
|
+// case val.SVal != nil:
|
|
|
|
+// values = append(values, string(val.SVal))
|
|
|
|
+// case val.IVal != nil:
|
|
|
|
+// values = append(values, fmt.Sprintf("%d", *val.IVal))
|
|
|
|
+// case val.BVal != nil:
|
|
|
|
+// values = append(values, fmt.Sprintf("%v", *val.BVal))
|
|
|
|
+// default:
|
|
|
|
+// log.Printf("未知类型值: %+v", val)
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+// return values, nil
|
|
|
|
+//}
|