batch.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. nebula "github.com/vesoft-inc/nebula-go/v3"
  8. "io"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "log"
  11. "strings"
  12. "sync"
  13. "time"
  14. "unicode/utf8"
  15. )
  16. func dealCompanyBase22() {
  17. log.Println("dealCompanyBase", "开始处理数据")
  18. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  19. if err != nil {
  20. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  21. }
  22. defer pool.Close()
  23. defer session.Release()
  24. defer util.Catch()
  25. sess := Mgo181.GetMgoConn()
  26. defer Mgo181.DestoryMongoConn(sess)
  27. it := sess.DB("mixdata").C("company_base").Find(nil).Sort("_id").Select(nil).Iter()
  28. //jobChan := make(chan InsertJob, WorkerCount*2)
  29. var wg sync.WaitGroup
  30. count := 0
  31. // 新增一个 job 构建的协程池
  32. buildChan := make(chan map[string]interface{}, WorkerCount*5) // 用于传递原始 Mongo 数据
  33. var wgBuild sync.WaitGroup
  34. for i := 0; i < 2; i++ {
  35. wgBuild.Add(1)
  36. go func() {
  37. defer wgBuild.Done()
  38. count2 := 0
  39. for tmp := range buildChan {
  40. count2++
  41. if count2%10000 == 0 {
  42. log.Printf("已处理 %d 条,休息 1 秒...\n", count)
  43. time.Sleep(time.Second)
  44. }
  45. c1 := Legal{
  46. Name: util.ObjToString(tmp["company_name"]),
  47. Code: util.ObjToString(tmp["credit_no"]),
  48. Type: "企业",
  49. }
  50. r1, err := InsertCompany(session, c1)
  51. if err != nil {
  52. log.Println("InsertCompany", r1, err)
  53. }
  54. // 耗时查询移到这里
  55. rea, resb := GetInvByLevel(c1.Name, 1, 0, false)
  56. for _, v := range rea {
  57. d := Legal{
  58. Name: v.company_name,
  59. Code: v.credit_no,
  60. Type: "企业",
  61. }
  62. r, err := InsertCompany(session, d)
  63. if err != nil {
  64. log.Println("InsertCompany", r, err)
  65. }
  66. }
  67. for _, v := range resb {
  68. d := Invest{
  69. FromCode: v.stock_name,
  70. ToCode: v.company_name,
  71. Amount: v.stock_amount,
  72. Ratio: v.stock_rate,
  73. }
  74. err := InsertInvestRel(session, d)
  75. if err != nil {
  76. log.Println("InsertInvestRel", err, d)
  77. }
  78. }
  79. }
  80. }()
  81. }
  82. //realNum := 0
  83. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  84. if count%10000 == 0 {
  85. log.Println("current:", count, tmp["company_name"], tmp["_id"])
  86. }
  87. if util.IntAll(tmp["use_flag"]) > 0 {
  88. continue
  89. }
  90. if util.ObjToString(tmp["company_type"]) == "个体工商户" || util.ObjToString(tmp["company_type"]) == "个人独资企业" {
  91. continue
  92. }
  93. if util.ObjToString(tmp["company_name"]) == "" || util.ObjToString(tmp["credit_no"]) == "" {
  94. continue
  95. }
  96. // 注销;关闭
  97. if strings.Contains(util.ObjToString(tmp["company_status"]), "吊销") || strings.Contains(util.ObjToString(tmp["company_status"]), "注销") || strings.Contains(util.ObjToString(tmp["company_status"]), "关闭") {
  98. continue
  99. }
  100. if utf8.RuneCountInString(util.ObjToString(tmp["company_name"])) < 5 {
  101. continue
  102. }
  103. buildChan <- tmp // 推送到异步处理构建
  104. }
  105. close(buildChan)
  106. wgBuild.Wait()
  107. wg.Wait()
  108. log.Println("完成!")
  109. }
  110. func dealCompanyBase() {
  111. log.Println("dealCompanyBase", "开始处理数据")
  112. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  113. if err != nil {
  114. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  115. }
  116. defer pool.Close()
  117. defer session.Release()
  118. defer util.Catch()
  119. sess := Mgo181.GetMgoConn()
  120. defer Mgo181.DestoryMongoConn(sess)
  121. it := sess.DB("mixdata").C("company_base").Find(nil).Select(nil).Iter()
  122. //jobChan := make(chan InsertJob, WorkerCount*2)
  123. var wg sync.WaitGroup
  124. count := 0
  125. // 新增一个 job 构建的协程池
  126. buildChan := make(chan map[string]interface{}, WorkerCount*10) // 用于传递原始 Mongo 数据
  127. jobChan := make(chan InsertJob, WorkerCount*5)
  128. // 启动工作协程;存储数据
  129. //for i := 0; i < WorkerCount; i++ {
  130. // wg.Add(1)
  131. // go insertWorker(session, &wg, jobChan)
  132. //}
  133. //写入图形数据库
  134. for i := 0; i < WorkerCount; i++ {
  135. wg.Add(1)
  136. go func() {
  137. defer wg.Done()
  138. // 每个 worker 拿自己的 session
  139. localSession, err := pool.GetSession(UserName, PassWord)
  140. if err != nil {
  141. log.Println("获取 session 失败:", err)
  142. return
  143. }
  144. defer localSession.Release()
  145. insertWorker2(localSession, jobChan)
  146. }()
  147. }
  148. var wgBuild sync.WaitGroup
  149. for i := 0; i < WorkerCount; i++ {
  150. wgBuild.Add(1)
  151. go func() {
  152. defer wgBuild.Done()
  153. for tmp := range buildChan {
  154. c1 := Legal{
  155. Id: util.ObjToString(tmp["company_id"]),
  156. Name: util.ObjToString(tmp["company_name"]),
  157. Code: util.ObjToString(tmp["credit_no"]),
  158. Type: "企业",
  159. }
  160. if utf8.RuneCountInString(c1.Name) < 5 {
  161. continue
  162. }
  163. job := InsertJob{}
  164. job.Companies = append(job.Companies, c1)
  165. // 耗时查询移到这里
  166. rea, resb := GetInvByLevel(c1.Name, 2, 0, false)
  167. for _, v := range rea {
  168. d := Legal{
  169. Id: v.company_id,
  170. Name: v.company_name,
  171. Code: v.credit_no,
  172. Type: "企业",
  173. }
  174. job.Companies = append(job.Companies, d)
  175. }
  176. for _, v := range resb {
  177. d := Invest{
  178. FromCode: v.stock_id,
  179. ToCode: v.company_id,
  180. Amount: v.stock_amount,
  181. Ratio: v.stock_rate,
  182. }
  183. job.Relations = append(job.Relations, d)
  184. }
  185. jobChan <- job
  186. }
  187. }()
  188. }
  189. //realNum := 0
  190. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  191. if count%10000 == 0 {
  192. log.Println("current:", count, tmp["company_name"])
  193. }
  194. if util.IntAll(tmp["use_flag"]) > 0 {
  195. continue
  196. }
  197. if util.ObjToString(tmp["company_type"]) == "个体工商户" || util.ObjToString(tmp["company_type"]) == "个人独资企业" {
  198. continue
  199. }
  200. if util.ObjToString(tmp["company_name"]) == "" || util.ObjToString(tmp["credit_no"]) == "" {
  201. continue
  202. }
  203. // 注销;关闭
  204. if strings.Contains(util.ObjToString(tmp["company_status"]), "吊销") || strings.Contains(util.ObjToString(tmp["company_status"]), "注销") || strings.Contains(util.ObjToString(tmp["company_status"]), "关闭") {
  205. continue
  206. }
  207. buildChan <- tmp // 推送到异步处理构建
  208. //1、处理点
  209. //job := InsertJob{}
  210. //c1 := Legal{
  211. // Name: util.ObjToString(tmp["company_name"]),
  212. // Code: util.ObjToString(tmp["credit_no"]),
  213. // Type: "企业",
  214. //}
  215. //if utf8.RuneCountInString(c1.Name) < 5 {
  216. // continue
  217. //}
  218. //job.Companies = append(job.Companies, c1)
  219. ////2、处理变
  220. //rea, resb := GetInvByLevel(c1.Name, 1, 0, false)
  221. //for _, v := range rea {
  222. // d := Legal{
  223. // Name: v.company_name,
  224. // Code: v.credit_no,
  225. // Type: "企业",
  226. // }
  227. // job.Companies = append(job.Companies, d)
  228. //}
  229. //
  230. //for _, v := range resb {
  231. // d := Invest{
  232. // FromCode: v.stock_name,
  233. // ToCode: v.company_name,
  234. // Amount: v.stock_amount,
  235. // Ratio: v.stock_rate,
  236. // }
  237. // job.Relations = append(job.Relations, d)
  238. //}
  239. //
  240. //jobChan <- job
  241. }
  242. close(buildChan)
  243. wgBuild.Wait()
  244. close(jobChan)
  245. wg.Wait()
  246. log.Println("完成!")
  247. }
  248. func batchDealGraph() {
  249. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  250. if err != nil {
  251. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  252. }
  253. defer pool.Close()
  254. defer session.Release()
  255. client, err := elastic.NewClient(
  256. elastic.SetURL("http://172.17.4.184:19908"),
  257. //elastic.SetURL("http://127.0.0.1:19908"),
  258. elastic.SetBasicAuth("jybid", "Top2023_JEB01i@31"),
  259. elastic.SetSniff(false),
  260. )
  261. if err != nil {
  262. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  263. }
  264. query := elastic.NewBoolQuery().
  265. //北京,天津,河北,上海,江苏,浙江,安徽
  266. //Must(elastic.NewTermQuery("area", "北京市")).
  267. //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  268. MustNot(
  269. elastic.NewTermQuery("company_type", "个体工商户"),
  270. elastic.NewTermsQuery("company_status", "吊销", "注销"),
  271. )
  272. //Must(elastic.NewTermQuery("company_name", "北京剑鱼信息技术有限公司"))
  273. //Must(elastic.NewTermsQuery("company_area", "河南"))
  274. ctx := context.Background()
  275. searchSource := elastic.NewSearchSource().
  276. Query(query).
  277. Size(10000).
  278. Sort("_doc", true)
  279. searchService := client.Scroll("qyxy").
  280. Size(10000).
  281. Scroll("5m").
  282. SearchSource(searchSource)
  283. jobChan := make(chan InsertJob, WorkerCount*2)
  284. var wg sync.WaitGroup
  285. // 启动工作协程
  286. for i := 0; i < WorkerCount; i++ {
  287. wg.Add(1)
  288. go insertWorker(session, &wg, jobChan)
  289. }
  290. total := 0
  291. for {
  292. res, err := searchService.Do(ctx)
  293. if err == io.EOF {
  294. break
  295. }
  296. if err != nil {
  297. log.Println("scroll error:", err)
  298. break
  299. }
  300. fmt.Println("总数是:", res.TotalHits())
  301. if len(res.Hits.Hits) == 0 {
  302. break
  303. }
  304. job := InsertJob{}
  305. for _, hit := range res.Hits.Hits {
  306. var doc map[string]interface{}
  307. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  308. log.Println("解析失败", err)
  309. continue
  310. }
  311. c1 := Legal{
  312. Id: util.ObjToString(doc["id"]),
  313. Name: util.ObjToString(doc["company_name"]),
  314. Code: util.ObjToString(doc["credit_no"]),
  315. Type: "企业",
  316. }
  317. ////存续、在营、开业、在册
  318. //if strings.Contains(util.ObjToString(doc["company_status"]), "存续") || strings.Contains(util.ObjToString(doc["company_status"]), "在营") || strings.Contains(util.ObjToString(doc["company_status"]), "在册") || strings.Contains(util.ObjToString(doc["company_status"]), "开业") {
  319. // c1.State = "有效"
  320. //} else {
  321. // c1.State = "无效"
  322. //}
  323. if !strings.Contains(c1.Name, "公司") {
  324. continue
  325. }
  326. if c1.Name == "" || c1.Code == "" || strings.Contains(c1.Name, "已除名") {
  327. continue
  328. }
  329. if strings.Contains(util.ObjToString(doc["company_status"]), "吊销") || strings.Contains(util.ObjToString(doc["company_status"]), "注销") {
  330. continue
  331. }
  332. if utf8.RuneCountInString(c1.Name) < 5 {
  333. continue
  334. }
  335. job.Companies = append(job.Companies, c1)
  336. if partners, ok := doc["partners"].([]interface{}); ok {
  337. for _, partner := range partners {
  338. if da, ok := partner.(map[string]interface{}); ok {
  339. if !strings.Contains(util.ObjToString(da["stock_type"]), "自然人") && !strings.Contains(util.ObjToString(da["stock_type"]), "个人") {
  340. if util.ObjToString(da["stock_name"]) == "" || util.ObjToString(da["identify_no"]) == "" {
  341. continue
  342. }
  343. //1
  344. where1 := map[string]interface{}{
  345. "company_name": util.ObjToString(da["stock_name"]),
  346. }
  347. tmpBase, _ := Mgo181.FindOne("company_base", where1)
  348. if len(*tmpBase) > 0 {
  349. c2 := Legal{
  350. Id: util.ObjToString((*tmpBase)["company_id"]),
  351. Name: util.ObjToString(da["stock_name"]),
  352. Code: util.ObjToString(da["identify_no"]),
  353. Type: "企业",
  354. }
  355. //if strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "存续") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "在营") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "在册") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "开业") {
  356. // c2.State = "有效"
  357. //} else {
  358. // c2.State = "无效"
  359. //}
  360. job.Companies = append(job.Companies, c2)
  361. //2
  362. where := map[string]interface{}{
  363. "company_name": util.ObjToString(doc["company_name"]),
  364. "stock_name": util.ObjToString(da["stock_name"]),
  365. }
  366. ddd, _ := Mgo181.FindOne("company_partner", where)
  367. if len(*ddd) > 0 {
  368. par := *ddd
  369. invest := Invest{
  370. FromCode: c2.Id,
  371. ToCode: c1.Id,
  372. Ratio: util.Float64All(par["stock_proportion"]),
  373. Amount: ParseStockCapital(util.ObjToString(par["stock_capital"])),
  374. }
  375. job.Relations = append(job.Relations, invest)
  376. }
  377. }
  378. }
  379. }
  380. }
  381. }
  382. }
  383. jobChan <- job
  384. total += len(res.Hits.Hits)
  385. log.Println("处理总量:", total)
  386. }
  387. close(jobChan)
  388. wg.Wait()
  389. log.Println("完成!")
  390. }
  391. func insertWorker2(session *nebula.Session, jobs <-chan InsertJob) {
  392. for job := range jobs {
  393. // 分批插入公司
  394. for i := 0; i < len(job.Companies); i += BatchSize {
  395. end := i + BatchSize
  396. if end > len(job.Companies) {
  397. end = len(job.Companies)
  398. }
  399. BatchInsertCompanies(session, job.Companies[i:end])
  400. //time.Sleep(time.Second * 1)
  401. }
  402. // 分批插入投资关系
  403. for i := 0; i < len(job.Relations); i += BatchSize {
  404. end := i + BatchSize
  405. if end > len(job.Relations) {
  406. end = len(job.Relations)
  407. }
  408. BatchInsertInvestRels(session, job.Relations[i:end])
  409. //time.Sleep(time.Second * 1)
  410. }
  411. }
  412. }
  413. func insertWorker(session *nebula.Session, wg *sync.WaitGroup, jobs <-chan InsertJob) {
  414. defer wg.Done()
  415. for job := range jobs {
  416. // 分批插入公司
  417. for i := 0; i < len(job.Companies); i += BatchSize {
  418. end := i + BatchSize
  419. if end > len(job.Companies) {
  420. end = len(job.Companies)
  421. }
  422. BatchInsertCompanies(session, job.Companies[i:end])
  423. }
  424. // 分批插入投资关系
  425. for i := 0; i < len(job.Relations); i += BatchSize {
  426. end := i + BatchSize
  427. if end > len(job.Relations) {
  428. end = len(job.Relations)
  429. }
  430. BatchInsertInvestRels(session, job.Relations[i:end])
  431. }
  432. }
  433. }
  434. func BatchInsertCompanies(session *nebula.Session, companies []Legal) {
  435. if len(companies) == 0 {
  436. return
  437. }
  438. var sb strings.Builder
  439. sb.WriteString("USE " + Table_Space + "; ")
  440. for _, c := range companies {
  441. sb.WriteString(fmt.Sprintf(`INSERT VERTEX Legal(name, code, type, state) VALUES "%s":("%s", "%s", "%s", "%s");`, c.Id, c.Name, c.Code, c.Type, c.State))
  442. }
  443. _, err := session.Execute(sb.String())
  444. if err != nil {
  445. log.Println("批量插入公司失败:", err)
  446. }
  447. }
  448. func BatchInsertInvestRels(session *nebula.Session, rels []Invest) {
  449. if len(rels) == 0 {
  450. return
  451. }
  452. var sb strings.Builder
  453. sb.WriteString("USE " + Table_Space + "; ")
  454. for _, r := range rels {
  455. sb.WriteString(fmt.Sprintf(`INSERT EDGE Invest(amount, ratio) VALUES "%s"->"%s":(%f, %f);`, r.FromCode, r.ToCode, r.Amount, r.Ratio))
  456. }
  457. _, err := session.Execute(sb.String())
  458. if err != nil {
  459. log.Println("批量插入投资关系失败:", err)
  460. }
  461. }
  462. type InvestRelationResult struct {
  463. Related bool
  464. Paths []map[string]string
  465. CommonNodes []CommonNodeInfo
  466. }
  467. type CommonNodeInfo struct {
  468. VID string
  469. Name string
  470. }
  471. func CheckInvestRelationWithIntersection(session *nebula.Session, names []string, depth int) (bool, []map[string]string, []string, error) {
  472. if len(names) == 0 || depth <= 0 {
  473. return false, nil, nil, fmt.Errorf("invalid input")
  474. }
  475. // Step 1: 获取所有企业的 VID
  476. vids := make([]string, 0)
  477. vidToName := make(map[string]string)
  478. inputVIDSet := make(map[string]bool)
  479. for _, name := range names {
  480. query := fmt.Sprintf(`LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name)
  481. resp, err := session.Execute(query)
  482. if err != nil || !resp.IsSucceed() {
  483. log.Printf("lookup failed for name %s: %v", name, err)
  484. continue
  485. }
  486. for _, row := range resp.GetRows() {
  487. vid := string(row.Values[0].GetSVal())
  488. vids = append(vids, fmt.Sprintf(`"%s"`, vid))
  489. vidToName[vid] = name
  490. inputVIDSet[vid] = true
  491. }
  492. }
  493. if len(vids) < 2 {
  494. return false, nil, nil, nil // 不足两个公司参与判断
  495. }
  496. // Step 2: 查找路径(双向 Invest)
  497. fromClause := strings.Join(vids, ",")
  498. query := fmt.Sprintf(`
  499. GO FROM %s OVER Invest BIDIRECT UPTO %d STEPS
  500. YIELD src(edge) AS from, dst(edge) AS to
  501. `, fromClause, depth)
  502. resp, err := session.Execute(query)
  503. if err != nil || !resp.IsSucceed() {
  504. return false, nil, nil, fmt.Errorf("GO query failed: %v", err)
  505. }
  506. // Step 3: 统计路径和交集节点
  507. relationPaths := make([]map[string]string, 0)
  508. nodeToSources := make(map[string]map[string]bool) // key: node, value: set of inputVIDs
  509. for _, row := range resp.GetRows() {
  510. from := string(row.Values[0].GetSVal())
  511. to := string(row.Values[1].GetSVal())
  512. // 记录路径
  513. relationPaths = append(relationPaths, map[string]string{
  514. "from": from,
  515. "to": to,
  516. })
  517. // 记录 from 节点来源
  518. if !inputVIDSet[from] {
  519. if nodeToSources[from] == nil {
  520. nodeToSources[from] = make(map[string]bool)
  521. }
  522. for _, vid := range vids {
  523. if from == strings.Trim(vid, `"`) {
  524. continue
  525. }
  526. if strings.Contains(query, vid) {
  527. nodeToSources[from][strings.Trim(vid, `"`)] = true
  528. }
  529. }
  530. }
  531. // 记录 to 节点来源
  532. if !inputVIDSet[to] {
  533. if nodeToSources[to] == nil {
  534. nodeToSources[to] = make(map[string]bool)
  535. }
  536. for _, vid := range vids {
  537. if to == strings.Trim(vid, `"`) {
  538. continue
  539. }
  540. if strings.Contains(query, vid) {
  541. nodeToSources[to][strings.Trim(vid, `"`)] = true
  542. }
  543. }
  544. }
  545. }
  546. // Step 4: 找出出现在多个输入公司路径中的中间节点(交集)
  547. commonNodeVIDs := make([]string, 0)
  548. for node, sourceSet := range nodeToSources {
  549. if len(sourceSet) >= 2 {
  550. commonNodeVIDs = append(commonNodeVIDs, node)
  551. }
  552. }
  553. // Step 5: 查名称
  554. intersectionNames := make([]string, 0)
  555. if len(commonNodeVIDs) > 0 {
  556. query := fmt.Sprintf(`FETCH PROP ON Legal %s YIELD Legal.name`, strings.Join(wrapInQuotes(commonNodeVIDs), ","))
  557. resp, err := session.Execute(query)
  558. if err == nil && resp.IsSucceed() {
  559. for _, row := range resp.GetRows() {
  560. name := string(row.Values[1].GetSVal())
  561. intersectionNames = append(intersectionNames, name)
  562. }
  563. }
  564. }
  565. found := len(intersectionNames) > 0
  566. return found, relationPaths, intersectionNames, nil
  567. }
  568. func wrapInQuotes(ids []string) []string {
  569. result := make([]string, len(ids))
  570. for i, id := range ids {
  571. result[i] = fmt.Sprintf(`"%s"`, id)
  572. }
  573. return result
  574. }
  575. func CheckInvestRelation1(session *nebula.Session, names []string, depth int) (bool, []map[string]string, error) {
  576. if len(names) == 0 || depth <= 0 {
  577. return false, nil, fmt.Errorf("invalid input")
  578. }
  579. // Step 1: 获取所有企业的 VID
  580. vids := make([]string, 0)
  581. nameSet := make(map[string]bool)
  582. for _, name := range names {
  583. query := fmt.Sprintf(`LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name)
  584. resp, err := session.Execute(query)
  585. if err != nil || !resp.IsSucceed() {
  586. log.Printf("lookup failed for name %s: %v", name, err)
  587. continue
  588. }
  589. for _, row := range resp.GetRows() {
  590. //vid := row.Values[0].GetSVal()
  591. vid := string(row.Values[0].GetSVal())
  592. vids = append(vids, fmt.Sprintf(`"%s"`, vid))
  593. nameSet[vid] = true
  594. }
  595. }
  596. if len(vids) == 0 {
  597. return false, nil, nil // 没有查出任何节点
  598. }
  599. // Step 2: 构造 GO 查询路径
  600. fromClause := strings.Join(vids, ",")
  601. query := fmt.Sprintf(`
  602. GO FROM %s OVER Invest UPTO %d STEPS
  603. YIELD src(edge) AS from, dst(edge) AS to
  604. `, fromClause, depth)
  605. resp, err := session.Execute(query)
  606. if err != nil {
  607. return false, nil, fmt.Errorf("GO query failed: %v", err)
  608. }
  609. if !resp.IsSucceed() {
  610. return false, nil, fmt.Errorf("Nebula error: %s", resp.GetErrorMsg())
  611. }
  612. // Step 3: 分析路径结果
  613. resultPaths := make([]map[string]string, 0)
  614. found := false
  615. for _, row := range resp.GetRows() {
  616. from := string(row.Values[0].GetSVal())
  617. to := string(row.Values[1].GetSVal())
  618. if nameSet[from] && nameSet[to] && from != to {
  619. found = true
  620. }
  621. resultPaths = append(resultPaths, map[string]string{
  622. "from": from,
  623. "to": to,
  624. })
  625. }
  626. return found, resultPaths, nil
  627. }
// FindInvestmentRelations is an empty placeholder: it takes no arguments,
// returns nothing and currently has no body.
func FindInvestmentRelations() {
}
  630. //type PathRelation struct {
  631. // Companies []string
  632. // Paths []string
  633. //}
  634. //
  635. //func CheckLegalRelationsGraph(session *nebula.Session, names []string, deep int) (*PathRelation, error) {
  636. // // 查询 name -> vid 映射
  637. // nameToVid := make(map[string]string)
  638. // vidToName := make(map[string]string)
  639. // for _, name := range names {
  640. // vid, err := getVidByName(session, name)
  641. // if err != nil {
  642. // log.Printf("获取 %s 的 VID 失败: %v", name, err)
  643. // continue
  644. // }
  645. // nameToVid[name] = vid
  646. // vidToName[vid] = name
  647. // }
  648. //
  649. // allPaths := [][]string{}
  650. // checked := make(map[string]bool)
  651. //
  652. // // 遍历所有组合
  653. // for i := 0; i < len(names); i++ {
  654. // for j := i + 1; j < len(names); j++ {
  655. // a, b := names[i], names[j]
  656. // vidA, okA := nameToVid[a]
  657. // vidB, okB := nameToVid[b]
  658. // if !okA || !okB {
  659. // continue
  660. // }
  661. // key := vidA + "|" + vidB
  662. // if checked[key] {
  663. // continue
  664. // }
  665. // checked[key] = true
  666. //
  667. // if pathAB, _ := findPath(session, vidA, vidB, deep); len(pathAB) > 0 {
  668. // allPaths = append(allPaths, pathAB)
  669. // }
  670. // if pathBA, _ := findPath(session, vidB, vidA, deep); len(pathBA) > 0 {
  671. // allPaths = append(allPaths, pathBA)
  672. // }
  673. //
  674. // // 共同上级路径
  675. // common, commonPaths := checkCommonAncestor(session, vidA, vidB, deep)
  676. // if common {
  677. // allPaths = append(allPaths, commonPaths)
  678. // }
  679. // }
  680. // }
  681. //
  682. // // 1. 收集所有涉及的 VID
  683. // vidSet := make(map[string]struct{})
  684. // for _, path := range allPaths {
  685. // for _, vid := range path {
  686. // vidSet[vid] = struct{}{}
  687. // }
  688. // }
  689. //
  690. // // 2. 获取所有 VID 的公司名
  691. // for vid := range vidSet {
  692. // if _, ok := vidToName[vid]; ok {
  693. // continue
  694. // }
  695. // query := fmt.Sprintf(`FETCH PROP ON Legal "%s" YIELD Legal.name`, vid)
  696. // resp, err := session.Execute(query)
  697. // if err != nil || resp.IsEmpty() {
  698. // continue
  699. // }
  700. // rows := resp.GetRows()
  701. // if len(rows) > 0 && len(rows[0].Values) > 0 && rows[0].Values[0].SVal != nil {
  702. // vidToName[vid] = string(rows[0].Values[0].SVal)
  703. // }
  704. // }
  705. //
  706. // // 3. 清洗路径并格式化输出
  707. // companySet := make(map[string]struct{})
  708. // result := &PathRelation{
  709. // Companies: []string{},
  710. // Paths: []string{},
  711. // }
  712. //
  713. // for _, path := range allPaths {
  714. // namesPath := []string{}
  715. // last := ""
  716. // for _, vid := range path {
  717. // name, ok := vidToName[vid]
  718. // if !ok {
  719. // continue
  720. // }
  721. // if name == last {
  722. // continue // 去除重复节点
  723. // }
  724. // namesPath = append(namesPath, name)
  725. // last = name
  726. // companySet[name] = struct{}{}
  727. // }
  728. // if len(namesPath) >= 2 {
  729. // result.Paths = append(result.Paths, strings.Join(namesPath, "->"))
  730. // }
  731. // }
  732. //
  733. // for name := range companySet {
  734. // result.Companies = append(result.Companies, name)
  735. // }
  736. // sort.Strings(result.Companies)
  737. // return result, nil
  738. //}
  739. //
  740. //func checkCommonAncestor(session *nebula.Session, aVid, bVid string, deep int) (bool, []string) {
  741. // query := fmt.Sprintf(`
  742. // (
  743. // GO 1 TO %d STEPS FROM "%s" OVER Invest REVERSELY YIELD dst(edge) AS ancestor
  744. // )
  745. // INTERSECT
  746. // (
  747. // GO 1 TO %d STEPS FROM "%s" OVER Invest REVERSELY YIELD dst(edge) AS ancestor
  748. // );
  749. // `, deep, aVid, deep, bVid)
  750. //
  751. // resp, err := session.Execute(query)
  752. // if err != nil {
  753. // return false, nil
  754. // }
  755. // ancestors, err := getFirstColumnStrings(resp)
  756. // if err != nil || len(ancestors) == 0 {
  757. // return false, nil
  758. // }
  759. //
  760. // // 只返回第一个共同祖先的简单路径:a->ancestor->b
  761. // return true, []string{aVid, ancestors[0], bVid}
  762. //}
  763. //
  764. //func findPath(session *nebula.Session, fromVid, toVid string, maxStep int) ([]string, error) {
  765. // query := fmt.Sprintf(`FIND ALL PATH FROM "%s" TO "%s" OVER Invest UPTO %d STEPS YIELD path as p`, fromVid, toVid, maxStep)
  766. // resp, err := session.Execute(query)
  767. // if err != nil {
  768. // return nil, err
  769. // }
  770. // return getFirstColumnStrings(resp)
  771. //}
  772. //
  773. //func getVidByName(session *nebula.Session, name string) (string, error) {
  774. // query := fmt.Sprintf(`
  775. //USE `+Table_Space+`;
  776. //LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name)
  777. // resp, err := session.Execute(query)
  778. // if err != nil {
  779. // return "", err
  780. // }
  781. //
  782. // values, err := getFirstColumnStrings(resp)
  783. // if err != nil || len(values) == 0 {
  784. // return "", fmt.Errorf("未找到公司: %s", name)
  785. // }
  786. // return values[0], nil
  787. //}
  788. //
  789. //func getFirstColumnStrings(resp *nebula.ResultSet) ([]string, error) {
  790. // if resp == nil {
  791. // return nil, fmt.Errorf("result set is nil")
  792. // }
  793. //
  794. // var values []string
  795. // for _, row := range resp.GetRows() {
  796. // if len(row.Values) == 0 {
  797. // continue
  798. // }
  799. // val := row.Values[0]
  800. // switch {
  801. // case val.SVal != nil:
  802. // values = append(values, string(val.SVal))
  803. // case val.IVal != nil:
  804. // values = append(values, fmt.Sprintf("%d", *val.IVal))
  805. // case val.BVal != nil:
  806. // values = append(values, fmt.Sprintf("%v", *val.BVal))
  807. // default:
  808. // log.Printf("未知类型值: %+v", val)
  809. // }
  810. // }
  811. // return values, nil
  812. //}