wcc 3 月之前
父节点
当前提交
c6037bae5e
共有 5 个文件被更改,包括 98 次插入5 次删除
  1. 39 1
      graph/batch.go
  2. 47 1
      graph/dongshi.go
  3. 二进制
      graph/graph-dongshi-2
  4. 11 2
      graph/main.go
  5. 1 1
      graph/yisi.go

+ 39 - 1
graph/batch.go

@@ -472,6 +472,27 @@ func BatchInsertSuspectInvestWork(session *nebula.Session, wg *sync.WaitGroup, j
 	}
 }
 
+// BatchInsertExecutivesInvestWork 批量插入 高管关系 到图形数据局
+func BatchInsertExecutivesInvestWork(session *nebula.Session, wg *sync.WaitGroup, jobs <-chan ExecutivesInvest) {
+	defer wg.Done()
+	batch := make([]ExecutivesInvest, 0, BatchSize)
+
+	for job := range jobs {
+		batch = append(batch, job)
+		if len(batch) >= BatchSize {
+			// 批量插入
+			BatchInsertExecutivesInvest(session, batch)
+			// 清空 batch
+			batch = batch[:0]
+		}
+	}
+
+	// 最后一批不足 BatchSize 的数据也插入
+	if len(batch) > 0 {
+		BatchInsertExecutivesInvest(session, batch)
+	}
+}
+
 func insertWorker(session *nebula.Session, wg *sync.WaitGroup, jobs <-chan InsertJob) {
 	defer wg.Done()
 	for job := range jobs {
@@ -540,7 +561,24 @@ func BatchInsertSuspectInvest(session *nebula.Session, rels []SuspectInvest) {
 	}
 	_, err := session.Execute(sb.String())
 	if err != nil {
-		log.Println("批量插入投资关系失败:", err)
+		log.Println("批量插入 疑似关系 失败:", err)
+	}
+}
+
+// BatchInsertExecutivesInvest 批量处理数据到 高管 图形数据库
+func BatchInsertExecutivesInvest(session *nebula.Session, rels []ExecutivesInvest) {
+	if len(rels) == 0 {
+		return
+	}
+	var sb strings.Builder
+	sb.WriteString("USE " + Table_Space + "; ")
+	for _, r := range rels {
+		sb.WriteString(fmt.Sprintf(`INSERT EDGE ExecutivesInvest(name) VALUES "%s"->"%s":("%s");`,
+			r.FromCode, r.ToCode, r.Name))
+	}
+	_, err := session.Execute(sb.String())
+	if err != nil {
+		log.Println("批量插入  高管关系 失败:", err)
 	}
 }
 

+ 47 - 1
graph/dongshi.go

@@ -6,6 +6,7 @@ import (
 	"github.com/dgraph-io/badger/v3"
 	"go.mongodb.org/mongo-driver/mongo"
 	"go.mongodb.org/mongo-driver/mongo/options"
+	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"log"
 	"strings"
 	"sync"
@@ -38,7 +39,7 @@ type EmployeeMatch struct {
 	CompanyID2   string `bson:"company_id2"`
 }
 
-// dealTsDongShi 处理企业 董事关系
+// dealTsDongShi 处理企业 董事关系 到 MongoDB数据库
 func dealTsDongShi() {
 	ctx := context.Background()
 	username := "SJZY_RWbid_ES"
@@ -187,3 +188,48 @@ func dealTsDongShi() {
 	wg.Wait()
 	log.Println("done.")
 }
+
+// dealDsGraph 处理 董事 高管数据到 graph 图形数据库
+func dealDsGraph() {
+	session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
+	if err != nil {
+		log.Fatalf("Failed to connect to Nebula Graph: %v", err)
+	}
+	defer pool.Close()
+	defer session.Release()
+	//
+	sess := MgoQy.GetMgoConn()
+	defer MgoQy.DestoryMongoConn(sess)
+	it := sess.DB("mixdata").C("wcc_employee_relation0424").Find(nil).Sort("_id").Select(nil).Iter()
+
+	jobChan := make(chan ExecutivesInvest, WorkerCount*2)
+	var wg sync.WaitGroup
+	// 启动工作协程
+	for i := 0; i < 5; i++ {
+		wg.Add(1)
+		go BatchInsertExecutivesInvestWork(session, &wg, jobChan)
+	}
+
+	count := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
+		if count%10000 == 0 {
+			log.Println("current:", count, tmp["company_name_1"], tmp["company_name_2"])
+		}
+		//
+		if util.ObjToString(tmp["company_name_1"]) == "" || util.ObjToString(tmp["company_name_2"]) == "" || util.ObjToString(tmp["employee_name"]) == "" {
+			continue
+		}
+
+		job := ExecutivesInvest{
+			FromCode: util.ObjToString(tmp["company_id1"]),
+			ToCode:   util.ObjToString(tmp["company_id2"]),
+			Name:     util.ObjToString(tmp["employee_name"]),
+		}
+
+		jobChan <- job
+	}
+
+	close(jobChan)
+	wg.Wait()
+	log.Println(" dealTsGraph  完成!")
+}

二进制
graph/graph-dongshi-2


+ 11 - 2
graph/main.go

@@ -59,6 +59,13 @@ type InsertSuspectJob struct {
 	Relations SuspectInvest
 }
 
+// ExecutivesInvest 高管关系结构体
+type ExecutivesInvest struct {
+	FromCode string `json:"from_code"`
+	ToCode   string `json:"to_code"`
+	Name     string `json:"name"` //姓名
+}
+
 // InsertJob 批量处理投资关系的点 和 边关系
 type InsertJob struct {
 	Companies []Legal
@@ -123,8 +130,10 @@ func main() {
 	//dealTsDongShi() //董事高管
 	//log.Println("处理完毕")
 	//return
-	InitMgo()     //初始化 MongoDB
-	dealTsGraph() //处理疑似关系图谱
+	InitMgo() //初始化 MongoDB
+	//dealTsGraph() //处理疑似关系图谱
+	dealDsGraph() //处理高管  关系到 图形数据库
+	//---------------//
 	//dda()
 	//dealCompanyBase22()
 	//dealCompanyBase() //迭代company_base 处理企业数据

+ 1 - 1
graph/yisi.go

@@ -320,7 +320,7 @@ func dealTsGraph() {
 	jobChan := make(chan InsertSuspectJob, WorkerCount*2)
 	var wg sync.WaitGroup
 	// 启动工作协程
-	for i := 0; i < 1; i++ {
+	for i := 0; i < 5; i++ {
 		wg.Add(1)
 		go BatchInsertSuspectInvestWork(session, &wg, jobChan)
 	}