main.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gin-gonic/gin"
  7. "github.com/olivere/elastic/v7"
  8. nebula "github.com/vesoft-inc/nebula-go/v3"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "log"
  13. "net/http"
  14. "regexp"
  15. "strconv"
  16. "strings"
  17. )
  18. var (
  19. re = regexp.MustCompile(`[-+]?[0-9]*\.?[0-9]+`)
  20. HostList = []nebula.HostAddress{{Host: "114.116.213.97", Port: 9669}}
  21. //HostList = []nebula.HostAddress{{Host: "127.0.0.1", Port: 9669}}
  22. UserName = "root"
  23. PassWord = "jianyu@123"
  24. Mgo181 *mongodb.MongodbSim
  25. //Table_Space = "legal_profile"
  26. Table_Space = "legal_profile"
  27. WorkerCount = 5
  28. BatchSize = 100
  29. )
  30. // Legal 代表公司节点的结构体
  31. type Legal struct {
  32. Id string
  33. Name string
  34. Code string
  35. Type string //类型,企业,事业单哪位,政府部门
  36. State string //状态,有效/无效;不是存续、在营、开业、在册
  37. }
  38. // Invest 代表公司之间的投资关系边的结构体
  39. type Invest struct {
  40. FromCode string
  41. ToCode string
  42. Amount float64
  43. Ratio float64
  44. }
  45. // SuspectInvest 疑似关系
  46. type SuspectInvest struct {
  47. FromCode string `json:"from_code"`
  48. ToCode string `json:"to_code"`
  49. Reason string `json:"reason"` //原因,手机、邮箱相同
  50. }
  51. // InsertSuspectJob 处理疑似投资
  52. type InsertSuspectJob struct {
  53. Relations SuspectInvest
  54. }
  55. // ExecutivesInvest 高管关系结构体
  56. type ExecutivesInvest struct {
  57. FromCode string `json:"from_code"`
  58. ToCode string `json:"to_code"`
  59. Name string `json:"name"` //姓名
  60. }
  61. // InsertJob 批量处理投资关系的点 和 边关系
  62. type InsertJob struct {
  63. Companies []Legal
  64. Relations []Invest
  65. }
  66. type InvestVertex struct { //顶点
  67. id string
  68. company_id string
  69. company_name string
  70. credit_no string
  71. }
  72. type InvestEdge struct { //边
  73. company_id string
  74. company_name string
  75. stock_id string
  76. stock_name string
  77. stock_rate float64
  78. stock_amount float64
  79. stock_level int
  80. stock_type int //0企业股东 1自然人股东
  81. }
  82. // ConnectToNebula 封装数据库连接函数
  83. func ConnectToNebula(hosts []nebula.HostAddress, username, password string) (*nebula.Session, *nebula.ConnectionPool, error) {
  84. // 创建连接池配置
  85. config := nebula.GetDefaultConf()
  86. config.UseHTTP2 = false
  87. config.HandshakeKey = ""
  88. // 初始化连接池
  89. pool, err := nebula.NewConnectionPool(hosts, config, nebula.DefaultLogger{})
  90. if err != nil {
  91. return nil, nil, err
  92. }
  93. // 获取会话
  94. session, err := pool.GetSession(username, password)
  95. if err != nil {
  96. pool.Close()
  97. return nil, nil, err
  98. }
  99. return session, pool, nil
  100. }
  101. type CheckRequest struct {
  102. Names []string `json:"names"`
  103. Deep int `json:"deep"`
  104. Stype int `json:"stype"` //0.简易模式,匹配到直接返回;1.匹配完所有的数据
  105. }
  106. type CheckResponse struct {
  107. Code int `json:"code"`
  108. Data []string `json:"data"`
  109. Msg string `json:"msg"`
  110. }
  111. // SuspectInvestResponse 意思关系返回结果
  112. type SuspectInvestResponse struct {
  113. Code int `json:"code"`
  114. Data []interface{} `json:"data"`
  115. Msg string `json:"msg"`
  116. }
  117. func main() {
  118. handHttp()
  119. return
  120. //dealYS()
  121. //dealTsDongShi() //董事高管
  122. //log.Println("处理完毕")
  123. //return
  124. //InitMgo() //初始化 MongoDB
  125. //dealTsGraph() //处理疑似关系图谱
  126. //dealDsGraph() //处理高管 关系到 图形数据库
  127. //---------------//
  128. //dda()
  129. //dealCompanyBase22()
  130. //dealCompanyBase() //迭代company_base 处理企业数据
  131. //batchDealGraph() // 迭代es 处理企业数据;
  132. //
  133. //log.Println("数据处理完毕!!!!!!!")
  134. //return
  135. //-----------------------------------------//
  136. //3、改造方法,使用连接池,避免session过去//
  137. // 初始化 Gin 路由
  138. //r := gin.Default()
  139. //
  140. //client, err := NewNebulaClient(HostList, UserName, PassWord)
  141. //if err != nil {
  142. // log.Fatal("连接失败:", err)
  143. //}
  144. //defer client.Close()
  145. //// 注册 POST 接口
  146. //// 提供 HTML 页面
  147. ////r.GET("/legal/graph", func(c *gin.Context) {
  148. //// c.HTML(http.StatusOK, "graph.html", nil)
  149. ////})
  150. //r.POST("/check-relations", func(c *gin.Context) {
  151. // var req CheckRequest
  152. // if err := c.ShouldBindJSON(&req); err != nil {
  153. // c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数无效"})
  154. // return
  155. // }
  156. //
  157. // has, results, err := client.CheckLegalRelationships(req.Names, req.Deep, req.Stype)
  158. // if err != nil {
  159. // res := CheckResponse{
  160. // Code: -1,
  161. // Data: results,
  162. // Msg: "请求失败;" + err.Error(),
  163. // }
  164. // c.JSON(http.StatusInternalServerError, res)
  165. // return
  166. // }
  167. //
  168. // res := CheckResponse{
  169. // Code: 200,
  170. // Data: results,
  171. // }
  172. // if has {
  173. // res.Msg = "存在投资关系"
  174. // } else {
  175. // res.Msg = "不存在投资关系"
  176. // }
  177. //
  178. // c.JSON(http.StatusOK, res)
  179. //})
  180. //// 启动服务
  181. //r.Run(":8080")
  182. }
  183. // handHttp 对外提供 HTTP接口
  184. func handHttp() {
  185. // 初始化 Gin 路由
  186. r := gin.Default()
  187. client, err := NewNebulaClient(HostList, UserName, PassWord)
  188. if err != nil {
  189. log.Fatal("连接失败:", err)
  190. }
  191. defer client.Close()
  192. //1、投资关系查询
  193. r.POST("/check-relations", func(c *gin.Context) {
  194. var req CheckRequest
  195. if err := c.ShouldBindJSON(&req); err != nil {
  196. c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数无效"})
  197. return
  198. }
  199. has, results, err := client.CheckLegalRelationships(req.Names, req.Deep, req.Stype)
  200. if err != nil {
  201. res := CheckResponse{
  202. Code: -1,
  203. Data: results,
  204. Msg: "请求失败;" + err.Error(),
  205. }
  206. c.JSON(http.StatusInternalServerError, res)
  207. return
  208. }
  209. res := CheckResponse{
  210. Code: 200,
  211. Data: results,
  212. }
  213. if has {
  214. res.Msg = "存在投资关系"
  215. } else {
  216. res.Msg = "不存在投资关系"
  217. }
  218. c.JSON(http.StatusOK, res)
  219. })
  220. //2、疑似关系 查询
  221. r.POST("/yisi-relations", func(c *gin.Context) {
  222. var req CheckRequest
  223. if err := c.ShouldBindJSON(&req); err != nil {
  224. c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数无效"})
  225. return
  226. }
  227. results, err := client.FindSuspectInvestRelationsByNames(req.Names)
  228. if err != nil {
  229. res := map[string]interface{}{
  230. "code": 200,
  231. "msg": "请求失败",
  232. "data": err.Error(),
  233. }
  234. c.JSON(http.StatusInternalServerError, res)
  235. return
  236. } else {
  237. res := map[string]interface{}{
  238. "code": 200,
  239. "msg": "请求成功",
  240. "data": results,
  241. }
  242. c.JSON(http.StatusOK, res)
  243. }
  244. })
  245. //--------------------------//
  246. // 启动服务
  247. r.Run(":8080")
  248. }
  249. // dda 针对某个企业,单独生节点数据
  250. func dda() {
  251. name := "北京拓普丰联信息科技股份有限公司"
  252. rea, resb := GetInvByLevel(name, 5, 0, false)
  253. // 调用封装的连接函数
  254. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  255. if err != nil {
  256. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  257. }
  258. defer pool.Close()
  259. defer session.Release()
  260. for _, v := range rea {
  261. d := Legal{
  262. Id: v.company_id,
  263. Name: v.company_name,
  264. Code: v.credit_no,
  265. Type: "企业",
  266. }
  267. res, err := InsertCompany(session, d)
  268. if err != nil {
  269. log.Println(err, res)
  270. }
  271. }
  272. for _, v := range resb {
  273. d := Invest{
  274. FromCode: v.stock_name,
  275. ToCode: v.company_name,
  276. Amount: v.stock_amount,
  277. Ratio: v.stock_rate,
  278. }
  279. err := InsertInvestRel(session, d)
  280. if err != nil {
  281. log.Println(err, d)
  282. }
  283. }
  284. }
  285. // getQyxytData 获取企业数据
  286. func getQyxytData() {
  287. // 调用封装的连接函数
  288. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  289. if err != nil {
  290. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  291. }
  292. defer pool.Close()
  293. defer session.Release()
  294. url := "http://172.17.4.184:19908"
  295. //url := "http://127.0.0.1:19908"
  296. username := "jybid"
  297. password := "Top2023_JEB01i@31"
  298. index := "qyxy" //索引名称
  299. // 创建 Elasticsearch 客户端
  300. client, err := elastic.NewClient(
  301. elastic.SetURL(url),
  302. elastic.SetBasicAuth(username, password),
  303. elastic.SetSniff(false),
  304. )
  305. if err != nil {
  306. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  307. }
  308. //---------------------------//
  309. //query := elastic.NewBoolQuery()
  310. //query.Must(elastic.NewMatchQuery("business_scope", "招投标代理"))
  311. //query.Must(elastic.NewTermQuery("company_city", "北京市"))
  312. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  313. query := elastic.NewBoolQuery().
  314. //北京,天津,河北,上海,江苏,浙江,安徽
  315. //Must(elastic.NewTermQuery("area", "北京市")).
  316. //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  317. MustNot(
  318. elastic.NewTermQuery("company_type", "个体工商户"),
  319. ).
  320. //Must(elastic.NewTermQuery("company_name", "河南拓普计算机网络工程有限公司"))
  321. Must(elastic.NewTermsQuery("company_area", "河南"))
  322. ctx := context.Background()
  323. //开始滚动搜索
  324. scrollID := ""
  325. scroll := "10m"
  326. searchSource := elastic.NewSearchSource().
  327. Query(query).
  328. Size(10000).
  329. Sort("_doc", true) //升序排序
  330. //Sort("_doc", false) //降序排序
  331. searchService := client.Scroll(index).
  332. Size(10000).
  333. Scroll(scroll).
  334. SearchSource(searchSource)
  335. res, err := searchService.Do(ctx)
  336. if err != nil {
  337. if err == io.EOF {
  338. fmt.Println("没有数据")
  339. } else {
  340. panic(err)
  341. }
  342. }
  343. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  344. fmt.Println("总数是:", res.TotalHits())
  345. total := 0
  346. for len(res.Hits.Hits) > 0 {
  347. for _, hit := range res.Hits.Hits {
  348. var doc map[string]interface{}
  349. err := json.Unmarshal(hit.Source, &doc)
  350. if err != nil {
  351. log.Printf("解析文档失败:%s", err)
  352. continue
  353. }
  354. company1 := Legal{
  355. Name: util.ObjToString(doc["company_name"]),
  356. Code: util.ObjToString(doc["credit_no"]),
  357. Type: "企业",
  358. }
  359. /**
  360. 1.stock_name_id 为空,直接跳过
  361. 2.stock_name 为空,直接跳过
  362. 3.stock_name 含有 已除名/不适宜/待清理/拟吊销 ,直接跳过
  363. 4.stock_name 不含中文,跳过
  364. */
  365. if util.ObjToString(doc["company_name"]) == "" || util.ObjToString(doc["credit_no"]) == "" {
  366. continue
  367. }
  368. if strings.Contains(util.ObjToString(doc["company_name"]), "已除名") {
  369. continue
  370. }
  371. res1, err1 := InsertCompany(session, company1)
  372. if err != nil {
  373. log.Println("InsertCompany err", res1, err1)
  374. }
  375. //边
  376. if partners, ok := doc["partners"].([]interface{}); ok {
  377. for _, partner := range partners {
  378. if da, ok := partner.(map[string]interface{}); ok {
  379. if util.ObjToString(da["stock_type"]) == "企业法人" {
  380. if util.ObjToString(da["stock_name"]) == "" || util.ObjToString(da["identify_no"]) == "" {
  381. continue
  382. } else {
  383. company2 := Legal{
  384. Name: util.ObjToString(da["stock_name"]),
  385. Code: util.ObjToString(da["identify_no"]),
  386. Type: "企业",
  387. }
  388. res2, err2 := InsertCompany(session, company2)
  389. if err2 != nil {
  390. log.Println("InsertCompany err", res2, err2)
  391. }
  392. //
  393. if err1 != nil || err2 != nil {
  394. continue
  395. }
  396. where := map[string]interface{}{
  397. "company_name": util.ObjToString(doc["company_name"]),
  398. "stock_name": util.ObjToString(da["stock_name"]),
  399. }
  400. ddd, _ := Mgo181.FindOne("company_partner", where)
  401. if len(*ddd) > 0 {
  402. par := *ddd
  403. amount := ParseStockCapital(util.ObjToString(par["stock_capital"]))
  404. investRel := Invest{FromCode: util.ObjToString(da["stock_name"]), ToCode: util.ObjToString(doc["company_name"]), Ratio: util.Float64All(par["stock_proportion"]), Amount: amount}
  405. err = InsertInvestRel(session, investRel)
  406. if err != nil {
  407. log.Println("InsertInvestRel", err, investRel)
  408. }
  409. }
  410. }
  411. }
  412. }
  413. }
  414. }
  415. }
  416. total = total + len(res.Hits.Hits)
  417. scrollID = res.ScrollId
  418. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  419. log.Println("current count:", total)
  420. if err != nil {
  421. if err == io.EOF {
  422. // 滚动到最后一批数据,退出循环
  423. break
  424. }
  425. log.Println("滚动搜索失败:", err, res)
  426. break // 处理错误时退出循环
  427. }
  428. }
  429. // 在循环外调用 ClearScroll
  430. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  431. if err != nil {
  432. log.Printf("清理滚动搜索失败:%s", err)
  433. }
  434. log.Println("结束~~~~~~~~~~~~~~~")
  435. }
  436. // dealCompanyPartner 处理企业投资关系
  437. func dealCompanyPartner() {
  438. // 调用封装的连接函数
  439. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  440. if err != nil {
  441. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  442. }
  443. defer pool.Close()
  444. defer session.Release()
  445. //log.Println("session", session)
  446. defer util.Catch()
  447. sess := Mgo181.GetMgoConn()
  448. defer Mgo181.DestoryMongoConn(sess)
  449. it := sess.DB("mixdata").C("company_partner").Find(nil).Select(nil).Iter()
  450. count := 0
  451. //realNum := 0
  452. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  453. if count%10000 == 0 {
  454. log.Println("current:", count, tmp["stock_name"], tmp["company_name"])
  455. }
  456. //个人企业跳过
  457. if util.IntAll(tmp["is_personal"]) == 1 {
  458. continue
  459. }
  460. if util.IntAll(tmp["use_flag"]) > 0 {
  461. continue
  462. }
  463. company1 := Legal{
  464. Name: util.ObjToString(tmp["stock_name"]),
  465. Code: util.ObjToString(tmp["stock_name_id"]),
  466. }
  467. /**
  468. 1.stock_name_id 为空,直接跳过
  469. 2.stock_name 为空,直接跳过
  470. 3.stock_name 含有 已除名/不适宜/待清理/拟吊销 ,直接跳过
  471. 4.stock_name 不含中文,跳过
  472. */
  473. company2 := Legal{
  474. Name: util.ObjToString(tmp["company_name"]),
  475. Code: util.ObjToString(tmp["company_id"]),
  476. }
  477. res1, err1 := InsertCompany(session, company1)
  478. if err != nil {
  479. log.Println("InsertCompany err", res1, err1)
  480. }
  481. res2, err2 := InsertCompany(session, company2)
  482. if err != nil {
  483. log.Println("InsertCompany err", res2, err2)
  484. }
  485. if err1 != nil || err2 != nil {
  486. continue
  487. }
  488. //边
  489. amount := ParseStockCapital(util.ObjToString(tmp["stock_capital"]))
  490. investRel := Invest{FromCode: res1, ToCode: res2, Ratio: util.Float64All(tmp["stock_proportion"]), Amount: amount}
  491. err = InsertInvestRel(session, investRel)
  492. if err != nil {
  493. log.Println("InsertInvestRel", err, investRel)
  494. }
  495. }
  496. }
  497. // InsertCompany 插入公司节点的方法
  498. func InsertCompany(session *nebula.Session, company Legal) (string, error) {
  499. // 构建插入公司节点的查询
  500. //insertCompanyStmt := `
  501. // USE ` + Table_Space + `;
  502. // INSERT VERTEX company(company_id,name) VALUES "%s":("%s", "%s");
  503. //`
  504. //insertCompanyStmt = fmt.Sprintf(insertCompanyStmt, inv.id, inv.company_id, inv.company_name)
  505. query := fmt.Sprintf(`
  506. USE `+Table_Space+`;
  507. INSERT VERTEX Legal(name, code, type, state ) VALUES "%s":("%s", "%s", "%s", "%s")
  508. `, company.Name, company.Name, company.Code, company.Type, company.State)
  509. // 执行查询
  510. result, err := session.Execute(query)
  511. if err != nil {
  512. log.Println("InsertCompany", result)
  513. return "", err
  514. }
  515. // 打印返回结果
  516. //fmt.Println("Insert Company Result:", result)
  517. // 返回节点ID(通常可以通过返回的结果中的 "_vid" 字段获取)
  518. return company.Name, nil
  519. }
  520. // InsertInvestRel 插入投资关系边的方法
  521. func InsertInvestRel(session *nebula.Session, investRel Invest) error {
  522. // 构建插入投资关系边的查询
  523. query := fmt.Sprintf(`
  524. USE `+Table_Space+`;
  525. INSERT EDGE Invest(amount, ratio) VALUES "%s"->"%s":(%f, %f)
  526. `, investRel.FromCode, investRel.ToCode, investRel.Amount, investRel.Ratio)
  527. // 执行查询
  528. result, err := session.Execute(query)
  529. if err != nil {
  530. log.Println("InsertInvestRel", result)
  531. return err
  532. }
  533. // 打印返回结果
  534. //fmt.Println("Insert InvestRel Result:", result)
  535. return nil
  536. }
  537. func ParseStockCapital(raw string) float64 {
  538. raw = strings.TrimSpace(raw)
  539. // 默认单位:万元人民币
  540. exchangeRateUSD := 7.0
  541. // 匹配数值部分(可能带小数)
  542. re := regexp.MustCompile(`([\d.]+)`)
  543. match := re.FindStringSubmatch(raw)
  544. if len(match) < 2 {
  545. return 0
  546. }
  547. value, _ := strconv.ParseFloat(match[1], 64)
  548. // 判断单位并转换
  549. switch {
  550. case strings.Contains(raw, "万美元"):
  551. value *= exchangeRateUSD // 转换成人民币
  552. case strings.Contains(raw, "元") || strings.Contains(raw, "人民币"):
  553. if strings.Contains(raw, "万元") || strings.Contains(raw, "万") {
  554. // 已经是万元单位,无需处理
  555. } else {
  556. // 是“元”,需要除以1万
  557. value = value / 10000
  558. }
  559. default:
  560. // 可能是纯数字,默认视为“万元”
  561. }
  562. return value
  563. }
  564. /*
  565. 根据公司名称和层级向上挖掘,获取顶点和边;
  566. maxLevel 挖掘层级数量;
  567. direction 0:双向挖掘 -1:向上挖掘 1:向下挖掘
  568. person true:保留自然人股东 false:不保留自然人股东
  569. */
  570. func GetInvByLevel(company_name string, maxLevel int, direction int, person bool) (map[string]InvestVertex, []InvestEdge) {
  571. verter := map[string]InvestVertex{}
  572. edges := []InvestEdge{}
  573. if direction == 0 {
  574. v1, e1 := getInvByLevel(company_name, maxLevel, 1, person)
  575. v2, e2 := getInvByLevel(company_name, maxLevel, -1, person)
  576. for k, v := range v1 {
  577. verter[k] = v
  578. }
  579. for k, v := range v2 {
  580. verter[k] = v
  581. }
  582. edges = append(edges, e1...)
  583. edges = append(edges, e2...)
  584. } else {
  585. verter, edges = getInvByLevel(company_name, maxLevel, direction, person)
  586. }
  587. return verter, edges
  588. }
  589. func getInvByLevel(company_name string, maxLevel int, direction int, person bool) (map[string]InvestVertex, []InvestEdge) {
  590. data, _ := Mgo181.FindOne("company_base", map[string]interface{}{
  591. "company_name": company_name,
  592. })
  593. company_id := fmt.Sprint((*data)["company_id"])
  594. credit_no := fmt.Sprint((*data)["credit_no"])
  595. var edges = []InvestEdge{} //记录边
  596. var verter = map[string]InvestVertex{} //有效顶点
  597. // 初始化队列和访问记录
  598. type node struct {
  599. companyID, companyName, creditNo string
  600. level int
  601. }
  602. queue := []node{{companyID: company_id, companyName: company_name, creditNo: credit_no, level: 1}}
  603. visited := make(map[string]bool)
  604. for len(queue) > 0 {
  605. current := queue[0]
  606. if _, ok := verter[current.companyID]; !ok {
  607. verter[current.companyID] = InvestVertex{
  608. id: current.companyID,
  609. company_id: current.companyID,
  610. company_name: current.companyName,
  611. credit_no: current.creditNo,
  612. }
  613. }
  614. queue = queue[1:]
  615. if visited[current.companyID] || current.level > maxLevel { // 防止重复处理和超过最大层级
  616. continue
  617. }
  618. visited[current.companyID] = true
  619. query := map[string]interface{}{"company_id": current.companyID}
  620. if direction > 0 {
  621. query = map[string]interface{}{"stock_name_id": current.companyID}
  622. }
  623. partners, _ := Mgo181.Find("company_partner", query, nil, nil, false, -1, -1)
  624. // 处理股东数据
  625. for _, p := range *partners {
  626. //log.Println(direction, p)
  627. if fmt.Sprint(p["is_history"]) == "1" {
  628. continue
  629. }
  630. // 构建投资关系
  631. inv := InvestEdge{
  632. company_id: fmt.Sprint(p["company_id"]),
  633. company_name: fmt.Sprint(p["company_name"]),
  634. stock_id: fmt.Sprint(p["stock_name_id"]),
  635. stock_name: fmt.Sprint(p["stock_name"]),
  636. stock_rate: convertStockCapitalToFloat(fmt.Sprint(p["stock_proportion"])),
  637. stock_amount: convertStockCapitalToFloat(fmt.Sprint(p["stock_capital"])),
  638. stock_level: current.level,
  639. stock_type: 0, // 默认机构股东
  640. }
  641. edges = append(edges, inv)
  642. // 根据股东类型是否继续挖掘
  643. if fmt.Sprint(p["stock_type"]) == "自然人股东" || convertStockCapitalToFloat(fmt.Sprint(p["is_personal"])) > 0 {
  644. inv.stock_type = 1
  645. if _, ok := verter[inv.stock_id]; !ok && person {
  646. verter[inv.stock_id] = InvestVertex{
  647. id: inv.stock_id,
  648. company_id: inv.stock_id,
  649. company_name: inv.stock_name,
  650. }
  651. }
  652. } else {
  653. where1 := map[string]interface{}{
  654. "company_name": inv.company_name,
  655. }
  656. where2 := map[string]interface{}{
  657. "company_name": inv.stock_name,
  658. }
  659. company, _ := Mgo181.FindOne("company_base", where1)
  660. stock, _ := Mgo181.FindOne("company_base", where2)
  661. // 机构股东加入队列继续穿透
  662. if direction > 0 { //向下挖掘
  663. if !visited[inv.company_id] {
  664. queue = append(queue, node{
  665. companyID: inv.company_id,
  666. companyName: inv.company_name,
  667. creditNo: util.ObjToString((*company)["credit_no"]),
  668. level: current.level + 1,
  669. })
  670. }
  671. } else { //向上挖掘
  672. if !visited[inv.stock_id] {
  673. queue = append(queue, node{
  674. companyID: inv.stock_id,
  675. companyName: inv.stock_name,
  676. creditNo: util.ObjToString((*stock)["credit_no"]),
  677. level: current.level + 1,
  678. })
  679. }
  680. }
  681. }
  682. }
  683. //log.Printf("已处理层级%d,当前队列深度%d", current.level, len(queue))
  684. }
  685. return verter, edges
  686. }
  687. func convertStockCapitalToFloat(str string) float64 {
  688. // 查找匹配的数字
  689. match := re.FindString(str)
  690. if match == "" {
  691. return 0
  692. }
  693. // 将匹配到的数字字符串转换为浮点数
  694. result, err := strconv.ParseFloat(match, 64)
  695. if err != nil {
  696. return 0
  697. }
  698. return result
  699. }