batch.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. nebula "github.com/vesoft-inc/nebula-go/v3"
  8. "io"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "log"
  11. "strings"
  12. "sync"
  13. "time"
  14. "unicode/utf8"
  15. )
  16. func dealCompanyBase22() {
  17. log.Println("dealCompanyBase", "开始处理数据")
  18. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  19. if err != nil {
  20. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  21. }
  22. defer pool.Close()
  23. defer session.Release()
  24. defer util.Catch()
  25. sess := Mgo181.GetMgoConn()
  26. defer Mgo181.DestoryMongoConn(sess)
  27. it := sess.DB("mixdata").C("company_base").Find(nil).Sort("_id").Select(nil).Iter()
  28. //jobChan := make(chan InsertJob, WorkerCount*2)
  29. var wg sync.WaitGroup
  30. count := 0
  31. // 新增一个 job 构建的协程池
  32. buildChan := make(chan map[string]interface{}, WorkerCount*5) // 用于传递原始 Mongo 数据
  33. var wgBuild sync.WaitGroup
  34. for i := 0; i < 2; i++ {
  35. wgBuild.Add(1)
  36. go func() {
  37. defer wgBuild.Done()
  38. count2 := 0
  39. for tmp := range buildChan {
  40. count2++
  41. if count2%10000 == 0 {
  42. log.Printf("已处理 %d 条,休息 1 秒...\n", count)
  43. time.Sleep(time.Second)
  44. }
  45. c1 := Legal{
  46. Name: util.ObjToString(tmp["company_name"]),
  47. Code: util.ObjToString(tmp["credit_no"]),
  48. Type: "企业",
  49. }
  50. r1, err := InsertCompany(session, c1)
  51. if err != nil {
  52. log.Println("InsertCompany", r1, err)
  53. }
  54. // 耗时查询移到这里
  55. rea, resb := GetInvByLevel(c1.Name, 1, 0, false)
  56. for _, v := range rea {
  57. d := Legal{
  58. Name: v.company_name,
  59. Code: v.credit_no,
  60. Type: "企业",
  61. }
  62. r, err := InsertCompany(session, d)
  63. if err != nil {
  64. log.Println("InsertCompany", r, err)
  65. }
  66. }
  67. for _, v := range resb {
  68. d := Invest{
  69. FromCode: v.stock_name,
  70. ToCode: v.company_name,
  71. Amount: v.stock_amount,
  72. Ratio: v.stock_rate,
  73. }
  74. err := InsertInvestRel(session, d)
  75. if err != nil {
  76. log.Println("InsertInvestRel", err, d)
  77. }
  78. }
  79. }
  80. }()
  81. }
  82. //realNum := 0
  83. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  84. if count%10000 == 0 {
  85. log.Println("current:", count, tmp["company_name"], tmp["_id"])
  86. }
  87. if util.IntAll(tmp["use_flag"]) > 0 {
  88. continue
  89. }
  90. if util.ObjToString(tmp["company_type"]) == "个体工商户" || util.ObjToString(tmp["company_type"]) == "个人独资企业" {
  91. continue
  92. }
  93. if util.ObjToString(tmp["company_name"]) == "" || util.ObjToString(tmp["credit_no"]) == "" {
  94. continue
  95. }
  96. // 注销;关闭
  97. if strings.Contains(util.ObjToString(tmp["company_status"]), "吊销") || strings.Contains(util.ObjToString(tmp["company_status"]), "注销") || strings.Contains(util.ObjToString(tmp["company_status"]), "关闭") {
  98. continue
  99. }
  100. if utf8.RuneCountInString(util.ObjToString(tmp["company_name"])) < 5 {
  101. continue
  102. }
  103. buildChan <- tmp // 推送到异步处理构建
  104. }
  105. close(buildChan)
  106. wgBuild.Wait()
  107. wg.Wait()
  108. log.Println("完成!")
  109. }
  110. func dealCompanyBase() {
  111. log.Println("dealCompanyBase", "开始处理数据")
  112. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  113. if err != nil {
  114. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  115. }
  116. defer pool.Close()
  117. defer session.Release()
  118. defer util.Catch()
  119. sess := Mgo181.GetMgoConn()
  120. defer Mgo181.DestoryMongoConn(sess)
  121. it := sess.DB("mixdata").C("company_base").Find(nil).Select(nil).Iter()
  122. //jobChan := make(chan InsertJob, WorkerCount*2)
  123. var wg sync.WaitGroup
  124. count := 0
  125. // 新增一个 job 构建的协程池
  126. buildChan := make(chan map[string]interface{}, WorkerCount*10) // 用于传递原始 Mongo 数据
  127. jobChan := make(chan InsertJob, WorkerCount*5)
  128. // 启动工作协程;存储数据
  129. //for i := 0; i < WorkerCount; i++ {
  130. // wg.Add(1)
  131. // go insertWorker(session, &wg, jobChan)
  132. //}
  133. //写入图形数据库
  134. for i := 0; i < WorkerCount; i++ {
  135. wg.Add(1)
  136. go func() {
  137. defer wg.Done()
  138. // 每个 worker 拿自己的 session
  139. localSession, err := pool.GetSession(UserName, PassWord)
  140. if err != nil {
  141. log.Println("获取 session 失败:", err)
  142. return
  143. }
  144. defer localSession.Release()
  145. insertWorker2(localSession, jobChan)
  146. }()
  147. }
  148. var wgBuild sync.WaitGroup
  149. for i := 0; i < WorkerCount; i++ {
  150. wgBuild.Add(1)
  151. go func() {
  152. defer wgBuild.Done()
  153. for tmp := range buildChan {
  154. c1 := Legal{
  155. Id: util.ObjToString(tmp["company_id"]),
  156. Name: util.ObjToString(tmp["company_name"]),
  157. Code: util.ObjToString(tmp["credit_no"]),
  158. Type: "企业",
  159. }
  160. if utf8.RuneCountInString(c1.Name) < 5 {
  161. continue
  162. }
  163. job := InsertJob{}
  164. job.Companies = append(job.Companies, c1)
  165. // 耗时查询移到这里
  166. rea, resb := GetInvByLevel(c1.Name, 2, 0, false)
  167. for _, v := range rea {
  168. d := Legal{
  169. Id: v.company_id,
  170. Name: v.company_name,
  171. Code: v.credit_no,
  172. Type: "企业",
  173. }
  174. job.Companies = append(job.Companies, d)
  175. }
  176. for _, v := range resb {
  177. d := Invest{
  178. FromCode: v.stock_id,
  179. ToCode: v.company_id,
  180. Amount: v.stock_amount,
  181. Ratio: v.stock_rate,
  182. }
  183. job.Relations = append(job.Relations, d)
  184. }
  185. jobChan <- job
  186. }
  187. }()
  188. }
  189. //realNum := 0
  190. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  191. if count%10000 == 0 {
  192. log.Println("current:", count, tmp["company_name"])
  193. }
  194. if util.IntAll(tmp["use_flag"]) > 0 {
  195. continue
  196. }
  197. if util.ObjToString(tmp["company_type"]) == "个体工商户" || util.ObjToString(tmp["company_type"]) == "个人独资企业" {
  198. continue
  199. }
  200. if util.ObjToString(tmp["company_name"]) == "" || util.ObjToString(tmp["credit_no"]) == "" {
  201. continue
  202. }
  203. // 注销;关闭
  204. if strings.Contains(util.ObjToString(tmp["company_status"]), "吊销") || strings.Contains(util.ObjToString(tmp["company_status"]), "注销") || strings.Contains(util.ObjToString(tmp["company_status"]), "关闭") {
  205. continue
  206. }
  207. buildChan <- tmp // 推送到异步处理构建
  208. //1、处理点
  209. //job := InsertJob{}
  210. //c1 := Legal{
  211. // Name: util.ObjToString(tmp["company_name"]),
  212. // Code: util.ObjToString(tmp["credit_no"]),
  213. // Type: "企业",
  214. //}
  215. //if utf8.RuneCountInString(c1.Name) < 5 {
  216. // continue
  217. //}
  218. //job.Companies = append(job.Companies, c1)
  219. ////2、处理变
  220. //rea, resb := GetInvByLevel(c1.Name, 1, 0, false)
  221. //for _, v := range rea {
  222. // d := Legal{
  223. // Name: v.company_name,
  224. // Code: v.credit_no,
  225. // Type: "企业",
  226. // }
  227. // job.Companies = append(job.Companies, d)
  228. //}
  229. //
  230. //for _, v := range resb {
  231. // d := Invest{
  232. // FromCode: v.stock_name,
  233. // ToCode: v.company_name,
  234. // Amount: v.stock_amount,
  235. // Ratio: v.stock_rate,
  236. // }
  237. // job.Relations = append(job.Relations, d)
  238. //}
  239. //
  240. //jobChan <- job
  241. }
  242. close(buildChan)
  243. wgBuild.Wait()
  244. close(jobChan)
  245. wg.Wait()
  246. log.Println("完成!")
  247. }
  248. // batchDealGraph 批量读取ES 企业库,写入 图形数据库,company_id 作为主键
  249. func batchDealGraph() {
  250. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  251. if err != nil {
  252. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  253. }
  254. defer pool.Close()
  255. defer session.Release()
  256. client, err := elastic.NewClient(
  257. elastic.SetURL("http://172.17.4.184:19908"),
  258. //elastic.SetURL("http://127.0.0.1:19908"),
  259. elastic.SetBasicAuth("jybid", "Top2023_JEB01i@31"),
  260. elastic.SetSniff(false),
  261. )
  262. if err != nil {
  263. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  264. }
  265. query := elastic.NewBoolQuery().
  266. //北京,天津,河北,上海,江苏,浙江,安徽
  267. //Must(elastic.NewTermQuery("area", "北京市")).
  268. //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  269. MustNot(
  270. elastic.NewTermQuery("company_type", "个体工商户"),
  271. elastic.NewTermsQuery("company_status", "吊销", "注销"),
  272. )
  273. //Must(elastic.NewTermQuery("company_name", "北京剑鱼信息技术有限公司"))
  274. //Must(elastic.NewTermsQuery("company_area", "河南"))
  275. ctx := context.Background()
  276. searchSource := elastic.NewSearchSource().
  277. Query(query).
  278. Size(10000).
  279. Sort("_doc", true)
  280. searchService := client.Scroll("qyxy").
  281. Size(10000).
  282. Scroll("5m").
  283. SearchSource(searchSource)
  284. jobChan := make(chan InsertJob, WorkerCount*2)
  285. var wg sync.WaitGroup
  286. // 启动工作协程
  287. for i := 0; i < WorkerCount; i++ {
  288. wg.Add(1)
  289. go insertWorker(session, &wg, jobChan)
  290. }
  291. total := 0
  292. for {
  293. res, err := searchService.Do(ctx)
  294. if err == io.EOF {
  295. break
  296. }
  297. if err != nil {
  298. log.Println("scroll error:", err)
  299. break
  300. }
  301. fmt.Println("总数是:", res.TotalHits())
  302. if len(res.Hits.Hits) == 0 {
  303. break
  304. }
  305. job := InsertJob{}
  306. for _, hit := range res.Hits.Hits {
  307. var doc map[string]interface{}
  308. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  309. log.Println("解析失败", err)
  310. continue
  311. }
  312. c1 := Legal{
  313. Id: util.ObjToString(doc["id"]),
  314. Name: util.ObjToString(doc["company_name"]),
  315. Code: util.ObjToString(doc["credit_no"]),
  316. Type: "企业",
  317. }
  318. ////存续、在营、开业、在册
  319. //if strings.Contains(util.ObjToString(doc["company_status"]), "存续") || strings.Contains(util.ObjToString(doc["company_status"]), "在营") || strings.Contains(util.ObjToString(doc["company_status"]), "在册") || strings.Contains(util.ObjToString(doc["company_status"]), "开业") {
  320. // c1.State = "有效"
  321. //} else {
  322. // c1.State = "无效"
  323. //}
  324. if !strings.Contains(c1.Name, "公司") {
  325. continue
  326. }
  327. if c1.Name == "" || c1.Code == "" || strings.Contains(c1.Name, "已除名") {
  328. continue
  329. }
  330. if strings.Contains(util.ObjToString(doc["company_status"]), "吊销") || strings.Contains(util.ObjToString(doc["company_status"]), "注销") {
  331. continue
  332. }
  333. if utf8.RuneCountInString(c1.Name) < 5 {
  334. continue
  335. }
  336. job.Companies = append(job.Companies, c1)
  337. if partners, ok := doc["partners"].([]interface{}); ok {
  338. for _, partner := range partners {
  339. if da, ok := partner.(map[string]interface{}); ok {
  340. if !strings.Contains(util.ObjToString(da["stock_type"]), "自然人") && !strings.Contains(util.ObjToString(da["stock_type"]), "个人") {
  341. if util.ObjToString(da["stock_name"]) == "" || util.ObjToString(da["identify_no"]) == "" {
  342. continue
  343. }
  344. //1
  345. where1 := map[string]interface{}{
  346. "company_name": util.ObjToString(da["stock_name"]),
  347. }
  348. tmpBase, _ := Mgo181.FindOne("company_base", where1)
  349. if len(*tmpBase) > 0 {
  350. c2 := Legal{
  351. Id: util.ObjToString((*tmpBase)["company_id"]),
  352. Name: util.ObjToString(da["stock_name"]),
  353. Code: util.ObjToString(da["identify_no"]),
  354. Type: "企业",
  355. }
  356. //if strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "存续") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "在营") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "在册") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "开业") {
  357. // c2.State = "有效"
  358. //} else {
  359. // c2.State = "无效"
  360. //}
  361. job.Companies = append(job.Companies, c2)
  362. //2
  363. where := map[string]interface{}{
  364. "company_name": util.ObjToString(doc["company_name"]),
  365. "stock_name": util.ObjToString(da["stock_name"]),
  366. }
  367. ddd, _ := Mgo181.FindOne("company_partner", where)
  368. if len(*ddd) > 0 {
  369. par := *ddd
  370. invest := Invest{
  371. FromCode: c2.Id,
  372. ToCode: c1.Id,
  373. Ratio: util.Float64All(par["stock_proportion"]),
  374. Amount: ParseStockCapital(util.ObjToString(par["stock_capital"])),
  375. }
  376. job.Relations = append(job.Relations, invest)
  377. }
  378. }
  379. }
  380. }
  381. }
  382. }
  383. }
  384. jobChan <- job
  385. total += len(res.Hits.Hits)
  386. log.Println("处理总量:", total)
  387. }
  388. close(jobChan)
  389. wg.Wait()
  390. log.Println("完成!")
  391. }
  392. func insertWorker2(session *nebula.Session, jobs <-chan InsertJob) {
  393. for job := range jobs {
  394. // 分批插入公司
  395. for i := 0; i < len(job.Companies); i += BatchSize {
  396. end := i + BatchSize
  397. if end > len(job.Companies) {
  398. end = len(job.Companies)
  399. }
  400. BatchInsertCompanies(session, job.Companies[i:end])
  401. //time.Sleep(time.Second * 1)
  402. }
  403. // 分批插入投资关系
  404. for i := 0; i < len(job.Relations); i += BatchSize {
  405. end := i + BatchSize
  406. if end > len(job.Relations) {
  407. end = len(job.Relations)
  408. }
  409. BatchInsertInvestRels(session, job.Relations[i:end])
  410. //time.Sleep(time.Second * 1)
  411. }
  412. }
  413. }
  414. // BatchInsertSuspectInvestWork 批量处理 疑似关系的边
  415. func BatchInsertSuspectInvestWork(session *nebula.Session, wg *sync.WaitGroup, jobs <-chan InsertSuspectJob) {
  416. defer wg.Done()
  417. batch := make([]SuspectInvest, 0, BatchSize)
  418. for job := range jobs {
  419. batch = append(batch, job.Relations)
  420. if len(batch) >= BatchSize {
  421. // 批量插入
  422. BatchInsertSuspectInvest(session, batch)
  423. // 清空 batch
  424. batch = batch[:0]
  425. }
  426. }
  427. // 最后一批不足 BatchSize 的数据也插入
  428. if len(batch) > 0 {
  429. BatchInsertSuspectInvest(session, batch)
  430. }
  431. }
  432. // BatchInsertExecutivesInvestWork 批量插入 高管关系 到图形数据局
  433. func BatchInsertExecutivesInvestWork(session *nebula.Session, wg *sync.WaitGroup, jobs <-chan ExecutivesInvest) {
  434. defer wg.Done()
  435. batch := make([]ExecutivesInvest, 0, BatchSize)
  436. for job := range jobs {
  437. batch = append(batch, job)
  438. if len(batch) >= BatchSize {
  439. // 批量插入
  440. BatchInsertExecutivesInvest(session, batch)
  441. // 清空 batch
  442. batch = batch[:0]
  443. }
  444. }
  445. // 最后一批不足 BatchSize 的数据也插入
  446. if len(batch) > 0 {
  447. BatchInsertExecutivesInvest(session, batch)
  448. }
  449. }
  450. func insertWorker(session *nebula.Session, wg *sync.WaitGroup, jobs <-chan InsertJob) {
  451. defer wg.Done()
  452. for job := range jobs {
  453. // 分批插入公司
  454. for i := 0; i < len(job.Companies); i += BatchSize {
  455. end := i + BatchSize
  456. if end > len(job.Companies) {
  457. end = len(job.Companies)
  458. }
  459. BatchInsertCompanies(session, job.Companies[i:end])
  460. }
  461. // 分批插入投资关系
  462. for i := 0; i < len(job.Relations); i += BatchSize {
  463. end := i + BatchSize
  464. if end > len(job.Relations) {
  465. end = len(job.Relations)
  466. }
  467. BatchInsertInvestRels(session, job.Relations[i:end])
  468. }
  469. }
  470. }
  471. func BatchInsertCompanies(session *nebula.Session, companies []Legal) {
  472. if len(companies) == 0 {
  473. return
  474. }
  475. var sb strings.Builder
  476. sb.WriteString("USE " + Table_Space + "; ")
  477. for _, c := range companies {
  478. sb.WriteString(fmt.Sprintf(`INSERT VERTEX Legal(name, code, type, state) VALUES "%s":("%s", "%s", "%s", "%s");`, c.Id, c.Name, c.Code, c.Type, c.State))
  479. }
  480. _, err := session.Execute(sb.String())
  481. if err != nil {
  482. log.Println("批量插入公司失败:", err)
  483. }
  484. }
  485. // BatchInsertInvestRels 批量处理 投资关系-边数据
  486. func BatchInsertInvestRels(session *nebula.Session, rels []Invest) {
  487. if len(rels) == 0 {
  488. return
  489. }
  490. var sb strings.Builder
  491. sb.WriteString("USE " + Table_Space + "; ")
  492. for _, r := range rels {
  493. sb.WriteString(fmt.Sprintf(`INSERT EDGE Invest(amount, ratio) VALUES "%s"->"%s":(%f, %f);`, r.FromCode, r.ToCode, r.Amount, r.Ratio))
  494. }
  495. _, err := session.Execute(sb.String())
  496. if err != nil {
  497. log.Println("批量插入投资关系失败:", err)
  498. }
  499. }
  500. // BatchInsertSuspectInvest 批量处理疑似关系的-边数据
  501. func BatchInsertSuspectInvest(session *nebula.Session, rels []SuspectInvest) {
  502. if len(rels) == 0 {
  503. return
  504. }
  505. var sb strings.Builder
  506. sb.WriteString("USE " + Table_Space + "; ")
  507. for _, r := range rels {
  508. //sb.WriteString(fmt.Sprintf(`INSERT EDGE SuspectInvest(reason) VALUES "%s"->"%s":(%s);`, r.FromCode, r.ToCode, r.Reason))
  509. sb.WriteString(fmt.Sprintf(`INSERT EDGE SuspectInvest(reason) VALUES "%s"->"%s":("%s");`,
  510. r.FromCode, r.ToCode, r.Reason))
  511. }
  512. _, err := session.Execute(sb.String())
  513. if err != nil {
  514. log.Println("批量插入 疑似关系 失败:", err)
  515. }
  516. }
  517. // BatchInsertExecutivesInvest 批量处理数据到 高管 图形数据库
  518. func BatchInsertExecutivesInvest(session *nebula.Session, rels []ExecutivesInvest) {
  519. if len(rels) == 0 {
  520. return
  521. }
  522. var sb strings.Builder
  523. sb.WriteString("USE " + Table_Space + "; ")
  524. for _, r := range rels {
  525. sb.WriteString(fmt.Sprintf(`INSERT EDGE ExecutivesInvest(name) VALUES "%s"->"%s":("%s");`,
  526. r.FromCode, r.ToCode, r.Name))
  527. }
  528. _, err := session.Execute(sb.String())
  529. if err != nil {
  530. log.Println("批量插入 高管关系 失败:", err)
  531. }
  532. }
  533. type InvestRelationResult struct {
  534. Related bool
  535. Paths []map[string]string
  536. CommonNodes []CommonNodeInfo
  537. }
  538. type CommonNodeInfo struct {
  539. VID string
  540. Name string
  541. }
  542. func CheckInvestRelationWithIntersection(session *nebula.Session, names []string, depth int) (bool, []map[string]string, []string, error) {
  543. if len(names) == 0 || depth <= 0 {
  544. return false, nil, nil, fmt.Errorf("invalid input")
  545. }
  546. // Step 1: 获取所有企业的 VID
  547. vids := make([]string, 0)
  548. vidToName := make(map[string]string)
  549. inputVIDSet := make(map[string]bool)
  550. for _, name := range names {
  551. query := fmt.Sprintf(`LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name)
  552. resp, err := session.Execute(query)
  553. if err != nil || !resp.IsSucceed() {
  554. log.Printf("lookup failed for name %s: %v", name, err)
  555. continue
  556. }
  557. for _, row := range resp.GetRows() {
  558. vid := string(row.Values[0].GetSVal())
  559. vids = append(vids, fmt.Sprintf(`"%s"`, vid))
  560. vidToName[vid] = name
  561. inputVIDSet[vid] = true
  562. }
  563. }
  564. if len(vids) < 2 {
  565. return false, nil, nil, nil // 不足两个公司参与判断
  566. }
  567. // Step 2: 查找路径(双向 Invest)
  568. fromClause := strings.Join(vids, ",")
  569. query := fmt.Sprintf(`
  570. GO FROM %s OVER Invest BIDIRECT UPTO %d STEPS
  571. YIELD src(edge) AS from, dst(edge) AS to
  572. `, fromClause, depth)
  573. resp, err := session.Execute(query)
  574. if err != nil || !resp.IsSucceed() {
  575. return false, nil, nil, fmt.Errorf("GO query failed: %v", err)
  576. }
  577. // Step 3: 统计路径和交集节点
  578. relationPaths := make([]map[string]string, 0)
  579. nodeToSources := make(map[string]map[string]bool) // key: node, value: set of inputVIDs
  580. for _, row := range resp.GetRows() {
  581. from := string(row.Values[0].GetSVal())
  582. to := string(row.Values[1].GetSVal())
  583. // 记录路径
  584. relationPaths = append(relationPaths, map[string]string{
  585. "from": from,
  586. "to": to,
  587. })
  588. // 记录 from 节点来源
  589. if !inputVIDSet[from] {
  590. if nodeToSources[from] == nil {
  591. nodeToSources[from] = make(map[string]bool)
  592. }
  593. for _, vid := range vids {
  594. if from == strings.Trim(vid, `"`) {
  595. continue
  596. }
  597. if strings.Contains(query, vid) {
  598. nodeToSources[from][strings.Trim(vid, `"`)] = true
  599. }
  600. }
  601. }
  602. // 记录 to 节点来源
  603. if !inputVIDSet[to] {
  604. if nodeToSources[to] == nil {
  605. nodeToSources[to] = make(map[string]bool)
  606. }
  607. for _, vid := range vids {
  608. if to == strings.Trim(vid, `"`) {
  609. continue
  610. }
  611. if strings.Contains(query, vid) {
  612. nodeToSources[to][strings.Trim(vid, `"`)] = true
  613. }
  614. }
  615. }
  616. }
  617. // Step 4: 找出出现在多个输入公司路径中的中间节点(交集)
  618. commonNodeVIDs := make([]string, 0)
  619. for node, sourceSet := range nodeToSources {
  620. if len(sourceSet) >= 2 {
  621. commonNodeVIDs = append(commonNodeVIDs, node)
  622. }
  623. }
  624. // Step 5: 查名称
  625. intersectionNames := make([]string, 0)
  626. if len(commonNodeVIDs) > 0 {
  627. query := fmt.Sprintf(`FETCH PROP ON Legal %s YIELD Legal.name`, strings.Join(wrapInQuotes(commonNodeVIDs), ","))
  628. resp, err := session.Execute(query)
  629. if err == nil && resp.IsSucceed() {
  630. for _, row := range resp.GetRows() {
  631. name := string(row.Values[1].GetSVal())
  632. intersectionNames = append(intersectionNames, name)
  633. }
  634. }
  635. }
  636. found := len(intersectionNames) > 0
  637. return found, relationPaths, intersectionNames, nil
  638. }
  639. func wrapInQuotes(ids []string) []string {
  640. result := make([]string, len(ids))
  641. for i, id := range ids {
  642. result[i] = fmt.Sprintf(`"%s"`, id)
  643. }
  644. return result
  645. }
  646. func CheckInvestRelation1(session *nebula.Session, names []string, depth int) (bool, []map[string]string, error) {
  647. if len(names) == 0 || depth <= 0 {
  648. return false, nil, fmt.Errorf("invalid input")
  649. }
  650. // Step 1: 获取所有企业的 VID
  651. vids := make([]string, 0)
  652. nameSet := make(map[string]bool)
  653. for _, name := range names {
  654. query := fmt.Sprintf(`LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name)
  655. resp, err := session.Execute(query)
  656. if err != nil || !resp.IsSucceed() {
  657. log.Printf("lookup failed for name %s: %v", name, err)
  658. continue
  659. }
  660. for _, row := range resp.GetRows() {
  661. //vid := row.Values[0].GetSVal()
  662. vid := string(row.Values[0].GetSVal())
  663. vids = append(vids, fmt.Sprintf(`"%s"`, vid))
  664. nameSet[vid] = true
  665. }
  666. }
  667. if len(vids) == 0 {
  668. return false, nil, nil // 没有查出任何节点
  669. }
  670. // Step 2: 构造 GO 查询路径
  671. fromClause := strings.Join(vids, ",")
  672. query := fmt.Sprintf(`
  673. GO FROM %s OVER Invest UPTO %d STEPS
  674. YIELD src(edge) AS from, dst(edge) AS to
  675. `, fromClause, depth)
  676. resp, err := session.Execute(query)
  677. if err != nil {
  678. return false, nil, fmt.Errorf("GO query failed: %v", err)
  679. }
  680. if !resp.IsSucceed() {
  681. return false, nil, fmt.Errorf("Nebula error: %s", resp.GetErrorMsg())
  682. }
  683. // Step 3: 分析路径结果
  684. resultPaths := make([]map[string]string, 0)
  685. found := false
  686. for _, row := range resp.GetRows() {
  687. from := string(row.Values[0].GetSVal())
  688. to := string(row.Values[1].GetSVal())
  689. if nameSet[from] && nameSet[to] && from != to {
  690. found = true
  691. }
  692. resultPaths = append(resultPaths, map[string]string{
  693. "from": from,
  694. "to": to,
  695. })
  696. }
  697. return found, resultPaths, nil
  698. }
  699. func FindInvestmentRelations() {
  700. }
  701. //type PathRelation struct {
  702. // Companies []string
  703. // Paths []string
  704. //}
  705. //
  706. //func CheckLegalRelationsGraph(session *nebula.Session, names []string, deep int) (*PathRelation, error) {
  707. // // 查询 name -> vid 映射
  708. // nameToVid := make(map[string]string)
  709. // vidToName := make(map[string]string)
  710. // for _, name := range names {
  711. // vid, err := getVidByName(session, name)
  712. // if err != nil {
  713. // log.Printf("获取 %s 的 VID 失败: %v", name, err)
  714. // continue
  715. // }
  716. // nameToVid[name] = vid
  717. // vidToName[vid] = name
  718. // }
  719. //
  720. // allPaths := [][]string{}
  721. // checked := make(map[string]bool)
  722. //
  723. // // 遍历所有组合
  724. // for i := 0; i < len(names); i++ {
  725. // for j := i + 1; j < len(names); j++ {
  726. // a, b := names[i], names[j]
  727. // vidA, okA := nameToVid[a]
  728. // vidB, okB := nameToVid[b]
  729. // if !okA || !okB {
  730. // continue
  731. // }
  732. // key := vidA + "|" + vidB
  733. // if checked[key] {
  734. // continue
  735. // }
  736. // checked[key] = true
  737. //
  738. // if pathAB, _ := findPath(session, vidA, vidB, deep); len(pathAB) > 0 {
  739. // allPaths = append(allPaths, pathAB)
  740. // }
  741. // if pathBA, _ := findPath(session, vidB, vidA, deep); len(pathBA) > 0 {
  742. // allPaths = append(allPaths, pathBA)
  743. // }
  744. //
  745. // // 共同上级路径
  746. // common, commonPaths := checkCommonAncestor(session, vidA, vidB, deep)
  747. // if common {
  748. // allPaths = append(allPaths, commonPaths)
  749. // }
  750. // }
  751. // }
  752. //
  753. // // 1. 收集所有涉及的 VID
  754. // vidSet := make(map[string]struct{})
  755. // for _, path := range allPaths {
  756. // for _, vid := range path {
  757. // vidSet[vid] = struct{}{}
  758. // }
  759. // }
  760. //
  761. // // 2. 获取所有 VID 的公司名
  762. // for vid := range vidSet {
  763. // if _, ok := vidToName[vid]; ok {
  764. // continue
  765. // }
  766. // query := fmt.Sprintf(`FETCH PROP ON Legal "%s" YIELD Legal.name`, vid)
  767. // resp, err := session.Execute(query)
  768. // if err != nil || resp.IsEmpty() {
  769. // continue
  770. // }
  771. // rows := resp.GetRows()
  772. // if len(rows) > 0 && len(rows[0].Values) > 0 && rows[0].Values[0].SVal != nil {
  773. // vidToName[vid] = string(rows[0].Values[0].SVal)
  774. // }
  775. // }
  776. //
  777. // // 3. 清洗路径并格式化输出
  778. // companySet := make(map[string]struct{})
  779. // result := &PathRelation{
  780. // Companies: []string{},
  781. // Paths: []string{},
  782. // }
  783. //
  784. // for _, path := range allPaths {
  785. // namesPath := []string{}
  786. // last := ""
  787. // for _, vid := range path {
  788. // name, ok := vidToName[vid]
  789. // if !ok {
  790. // continue
  791. // }
  792. // if name == last {
  793. // continue // 去除重复节点
  794. // }
  795. // namesPath = append(namesPath, name)
  796. // last = name
  797. // companySet[name] = struct{}{}
  798. // }
  799. // if len(namesPath) >= 2 {
  800. // result.Paths = append(result.Paths, strings.Join(namesPath, "->"))
  801. // }
  802. // }
  803. //
  804. // for name := range companySet {
  805. // result.Companies = append(result.Companies, name)
  806. // }
  807. // sort.Strings(result.Companies)
  808. // return result, nil
  809. //}
  810. //
  811. //func checkCommonAncestor(session *nebula.Session, aVid, bVid string, deep int) (bool, []string) {
  812. // query := fmt.Sprintf(`
  813. // (
  814. // GO 1 TO %d STEPS FROM "%s" OVER Invest REVERSELY YIELD dst(edge) AS ancestor
  815. // )
  816. // INTERSECT
  817. // (
  818. // GO 1 TO %d STEPS FROM "%s" OVER Invest REVERSELY YIELD dst(edge) AS ancestor
  819. // );
  820. // `, deep, aVid, deep, bVid)
  821. //
  822. // resp, err := session.Execute(query)
  823. // if err != nil {
  824. // return false, nil
  825. // }
  826. // ancestors, err := getFirstColumnStrings(resp)
  827. // if err != nil || len(ancestors) == 0 {
  828. // return false, nil
  829. // }
  830. //
  831. // // 只返回第一个共同祖先的简单路径:a->ancestor->b
  832. // return true, []string{aVid, ancestors[0], bVid}
  833. //}
  834. //
  835. //func findPath(session *nebula.Session, fromVid, toVid string, maxStep int) ([]string, error) {
  836. // query := fmt.Sprintf(`FIND ALL PATH FROM "%s" TO "%s" OVER Invest UPTO %d STEPS YIELD path as p`, fromVid, toVid, maxStep)
  837. // resp, err := session.Execute(query)
  838. // if err != nil {
  839. // return nil, err
  840. // }
  841. // return getFirstColumnStrings(resp)
  842. //}
  843. //
  844. //func getVidByName(session *nebula.Session, name string) (string, error) {
  845. // query := fmt.Sprintf(`
  846. //USE `+Table_Space+`;
  847. //LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name)
  848. // resp, err := session.Execute(query)
  849. // if err != nil {
  850. // return "", err
  851. // }
  852. //
  853. // values, err := getFirstColumnStrings(resp)
  854. // if err != nil || len(values) == 0 {
  855. // return "", fmt.Errorf("未找到公司: %s", name)
  856. // }
  857. // return values[0], nil
  858. //}
  859. //
  860. //func getFirstColumnStrings(resp *nebula.ResultSet) ([]string, error) {
  861. // if resp == nil {
  862. // return nil, fmt.Errorf("result set is nil")
  863. // }
  864. //
  865. // var values []string
  866. // for _, row := range resp.GetRows() {
  867. // if len(row.Values) == 0 {
  868. // continue
  869. // }
  870. // val := row.Values[0]
  871. // switch {
  872. // case val.SVal != nil:
  873. // values = append(values, string(val.SVal))
  874. // case val.IVal != nil:
  875. // values = append(values, fmt.Sprintf("%d", *val.IVal))
  876. // case val.BVal != nil:
  877. // values = append(values, fmt.Sprintf("%v", *val.BVal))
  878. // default:
  879. // log.Printf("未知类型值: %+v", val)
  880. // }
  881. // }
  882. // return values, nil
  883. //}