main.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gin-gonic/gin"
  7. "github.com/olivere/elastic/v7"
  8. nebula "github.com/vesoft-inc/nebula-go/v3"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "log"
  13. "net/http"
  14. "regexp"
  15. "strconv"
  16. "strings"
  17. )
  18. var (
  19. re = regexp.MustCompile(`[-+]?[0-9]*\.?[0-9]+`)
  20. HostList = []nebula.HostAddress{{Host: "114.116.213.97", Port: 9669}}
  21. //HostList = []nebula.HostAddress{{Host: "127.0.0.1", Port: 9669}}
  22. UserName = "root"
  23. PassWord = "jianyu@123"
  24. Mgo181 *mongodb.MongodbSim
  25. //Table_Space = "legal_profile"
  26. Table_Space = "legal_profile"
  27. WorkerCount = 5
  28. BatchSize = 100
  29. )
  30. // Legal 代表公司节点的结构体
  31. type Legal struct {
  32. Id string
  33. Name string
  34. Code string
  35. Type string //类型,企业,事业单哪位,政府部门
  36. State string //状态,有效/无效;不是存续、在营、开业、在册
  37. }
  38. // Invest 代表公司之间的投资关系边的结构体
  39. type Invest struct {
  40. FromCode string
  41. ToCode string
  42. Amount float64
  43. Ratio float64
  44. }
  45. type InsertJob struct {
  46. Companies []Legal
  47. Relations []Invest
  48. }
  49. type InvestVertex struct { //顶点
  50. id string
  51. company_id string
  52. company_name string
  53. credit_no string
  54. }
  55. type InvestEdge struct { //边
  56. company_id string
  57. company_name string
  58. stock_id string
  59. stock_name string
  60. stock_rate float64
  61. stock_amount float64
  62. stock_level int
  63. stock_type int //0企业股东 1自然人股东
  64. }
  65. // ConnectToNebula 封装数据库连接函数
  66. func ConnectToNebula(hosts []nebula.HostAddress, username, password string) (*nebula.Session, *nebula.ConnectionPool, error) {
  67. // 创建连接池配置
  68. config := nebula.GetDefaultConf()
  69. config.UseHTTP2 = false
  70. config.HandshakeKey = ""
  71. // 初始化连接池
  72. pool, err := nebula.NewConnectionPool(hosts, config, nebula.DefaultLogger{})
  73. if err != nil {
  74. return nil, nil, err
  75. }
  76. // 获取会话
  77. session, err := pool.GetSession(username, password)
  78. if err != nil {
  79. pool.Close()
  80. return nil, nil, err
  81. }
  82. return session, pool, nil
  83. }
  84. type CheckRequest struct {
  85. Names []string `json:"names"`
  86. Deep int `json:"deep"`
  87. Stype int `json:"stype"` //0.简易模式,匹配到直接返回;1.匹配完所有的数据
  88. }
  89. type CheckResponse struct {
  90. Code int `json:"code"`
  91. Data []string `json:"data"`
  92. Msg string `json:"msg"`
  93. }
  94. func main() {
  95. //InitMgo()
  96. //dda()
  97. //dealCompanyBase22()
  98. //dealCompanyBase() //迭代company_base 处理企业数据
  99. //batchDealGraph() // 迭代es 处理企业数据;
  100. //
  101. //log.Println("数据处理完毕!!!!!!!")
  102. //return
  103. //2、封装对外提供的HTTP
  104. //session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  105. //if err != nil {
  106. // log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  107. //}
  108. //defer pool.Close()
  109. //defer session.Release()
  110. //// 初始化 Gin 路由
  111. //r := gin.Default()
  112. //// 注册 POST 接口
  113. //r.POST("/check-relations", func(c *gin.Context) {
  114. // var req CheckRequest
  115. // if err := c.ShouldBindJSON(&req); err != nil {
  116. // c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数无效"})
  117. // return
  118. // }
  119. //
  120. // has, results, err := CheckLegalRelationships4(session, req.Names, req.Deep, req.Stype)
  121. // if err != nil {
  122. // res := CheckResponse{
  123. // Code: -1,
  124. // Data: results,
  125. // Msg: "请求失败",
  126. // }
  127. // c.JSON(http.StatusInternalServerError, res)
  128. // return
  129. // }
  130. //
  131. // res := CheckResponse{
  132. // Code: 200,
  133. // Data: results,
  134. // }
  135. // if has {
  136. // res.Msg = "存在投资关系"
  137. // } else {
  138. // res.Msg = "不存在投资关系"
  139. // }
  140. //
  141. // c.JSON(http.StatusOK, res)
  142. //})
  143. //3、改造方法,使用连接池,避免session过去//
  144. // 初始化 Gin 路由
  145. r := gin.Default()
  146. // 加载模板文件(你可以自定义路径)
  147. r.LoadHTMLGlob("templates/*")
  148. client, err := NewNebulaClient(HostList, UserName, PassWord)
  149. if err != nil {
  150. log.Fatal("连接失败:", err)
  151. }
  152. defer client.Close()
  153. // 注册 POST 接口
  154. // 提供 HTML 页面
  155. r.GET("/legal/graph", func(c *gin.Context) {
  156. c.HTML(http.StatusOK, "graph.html", nil)
  157. })
  158. r.POST("/check-relations", func(c *gin.Context) {
  159. var req CheckRequest
  160. if err := c.ShouldBindJSON(&req); err != nil {
  161. c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数无效"})
  162. return
  163. }
  164. has, results, err := client.CheckLegalRelationships(req.Names, req.Deep, req.Stype)
  165. if err != nil {
  166. res := CheckResponse{
  167. Code: -1,
  168. Data: results,
  169. Msg: "请求失败;" + err.Error(),
  170. }
  171. c.JSON(http.StatusInternalServerError, res)
  172. return
  173. }
  174. res := CheckResponse{
  175. Code: 200,
  176. Data: results,
  177. }
  178. if has {
  179. res.Msg = "存在投资关系"
  180. } else {
  181. res.Msg = "不存在投资关系"
  182. }
  183. c.JSON(http.StatusOK, res)
  184. })
  185. // 启动服务
  186. r.Run(":8080")
  187. }
  188. // dda 针对某个企业,单独生节点数据
  189. func dda() {
  190. name := "北京拓普丰联信息科技股份有限公司"
  191. rea, resb := GetInvByLevel(name, 5, 0, false)
  192. // 调用封装的连接函数
  193. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  194. if err != nil {
  195. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  196. }
  197. defer pool.Close()
  198. defer session.Release()
  199. for _, v := range rea {
  200. d := Legal{
  201. Id: v.company_id,
  202. Name: v.company_name,
  203. Code: v.credit_no,
  204. Type: "企业",
  205. }
  206. res, err := InsertCompany(session, d)
  207. if err != nil {
  208. log.Println(err, res)
  209. }
  210. }
  211. for _, v := range resb {
  212. d := Invest{
  213. FromCode: v.stock_name,
  214. ToCode: v.company_name,
  215. Amount: v.stock_amount,
  216. Ratio: v.stock_rate,
  217. }
  218. err := InsertInvestRel(session, d)
  219. if err != nil {
  220. log.Println(err, d)
  221. }
  222. }
  223. }
  224. // getQyxytData 获取企业数据
  225. func getQyxytData() {
  226. // 调用封装的连接函数
  227. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  228. if err != nil {
  229. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  230. }
  231. defer pool.Close()
  232. defer session.Release()
  233. url := "http://172.17.4.184:19908"
  234. //url := "http://127.0.0.1:19908"
  235. username := "jybid"
  236. password := "Top2023_JEB01i@31"
  237. index := "qyxy" //索引名称
  238. // 创建 Elasticsearch 客户端
  239. client, err := elastic.NewClient(
  240. elastic.SetURL(url),
  241. elastic.SetBasicAuth(username, password),
  242. elastic.SetSniff(false),
  243. )
  244. if err != nil {
  245. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  246. }
  247. //---------------------------//
  248. //query := elastic.NewBoolQuery()
  249. //query.Must(elastic.NewMatchQuery("business_scope", "招投标代理"))
  250. //query.Must(elastic.NewTermQuery("company_city", "北京市"))
  251. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  252. query := elastic.NewBoolQuery().
  253. //北京,天津,河北,上海,江苏,浙江,安徽
  254. //Must(elastic.NewTermQuery("area", "北京市")).
  255. //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  256. MustNot(
  257. elastic.NewTermQuery("company_type", "个体工商户"),
  258. ).
  259. //Must(elastic.NewTermQuery("company_name", "河南拓普计算机网络工程有限公司"))
  260. Must(elastic.NewTermsQuery("company_area", "河南"))
  261. ctx := context.Background()
  262. //开始滚动搜索
  263. scrollID := ""
  264. scroll := "10m"
  265. searchSource := elastic.NewSearchSource().
  266. Query(query).
  267. Size(10000).
  268. Sort("_doc", true) //升序排序
  269. //Sort("_doc", false) //降序排序
  270. searchService := client.Scroll(index).
  271. Size(10000).
  272. Scroll(scroll).
  273. SearchSource(searchSource)
  274. res, err := searchService.Do(ctx)
  275. if err != nil {
  276. if err == io.EOF {
  277. fmt.Println("没有数据")
  278. } else {
  279. panic(err)
  280. }
  281. }
  282. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  283. fmt.Println("总数是:", res.TotalHits())
  284. total := 0
  285. for len(res.Hits.Hits) > 0 {
  286. for _, hit := range res.Hits.Hits {
  287. var doc map[string]interface{}
  288. err := json.Unmarshal(hit.Source, &doc)
  289. if err != nil {
  290. log.Printf("解析文档失败:%s", err)
  291. continue
  292. }
  293. company1 := Legal{
  294. Name: util.ObjToString(doc["company_name"]),
  295. Code: util.ObjToString(doc["credit_no"]),
  296. Type: "企业",
  297. }
  298. /**
  299. 1.stock_name_id 为空,直接跳过
  300. 2.stock_name 为空,直接跳过
  301. 3.stock_name 含有 已除名/不适宜/待清理/拟吊销 ,直接跳过
  302. 4.stock_name 不含中文,跳过
  303. */
  304. if util.ObjToString(doc["company_name"]) == "" || util.ObjToString(doc["credit_no"]) == "" {
  305. continue
  306. }
  307. if strings.Contains(util.ObjToString(doc["company_name"]), "已除名") {
  308. continue
  309. }
  310. res1, err1 := InsertCompany(session, company1)
  311. if err != nil {
  312. log.Println("InsertCompany err", res1, err1)
  313. }
  314. //边
  315. if partners, ok := doc["partners"].([]interface{}); ok {
  316. for _, partner := range partners {
  317. if da, ok := partner.(map[string]interface{}); ok {
  318. if util.ObjToString(da["stock_type"]) == "企业法人" {
  319. if util.ObjToString(da["stock_name"]) == "" || util.ObjToString(da["identify_no"]) == "" {
  320. continue
  321. } else {
  322. company2 := Legal{
  323. Name: util.ObjToString(da["stock_name"]),
  324. Code: util.ObjToString(da["identify_no"]),
  325. Type: "企业",
  326. }
  327. res2, err2 := InsertCompany(session, company2)
  328. if err2 != nil {
  329. log.Println("InsertCompany err", res2, err2)
  330. }
  331. //
  332. if err1 != nil || err2 != nil {
  333. continue
  334. }
  335. where := map[string]interface{}{
  336. "company_name": util.ObjToString(doc["company_name"]),
  337. "stock_name": util.ObjToString(da["stock_name"]),
  338. }
  339. ddd, _ := Mgo181.FindOne("company_partner", where)
  340. if len(*ddd) > 0 {
  341. par := *ddd
  342. amount := ParseStockCapital(util.ObjToString(par["stock_capital"]))
  343. investRel := Invest{FromCode: util.ObjToString(da["stock_name"]), ToCode: util.ObjToString(doc["company_name"]), Ratio: util.Float64All(par["stock_proportion"]), Amount: amount}
  344. err = InsertInvestRel(session, investRel)
  345. if err != nil {
  346. log.Println("InsertInvestRel", err, investRel)
  347. }
  348. }
  349. }
  350. }
  351. }
  352. }
  353. }
  354. }
  355. total = total + len(res.Hits.Hits)
  356. scrollID = res.ScrollId
  357. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  358. log.Println("current count:", total)
  359. if err != nil {
  360. if err == io.EOF {
  361. // 滚动到最后一批数据,退出循环
  362. break
  363. }
  364. log.Println("滚动搜索失败:", err, res)
  365. break // 处理错误时退出循环
  366. }
  367. }
  368. // 在循环外调用 ClearScroll
  369. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  370. if err != nil {
  371. log.Printf("清理滚动搜索失败:%s", err)
  372. }
  373. log.Println("结束~~~~~~~~~~~~~~~")
  374. }
  375. // dealCompanyPartner 处理企业投资关系
  376. func dealCompanyPartner() {
  377. // 调用封装的连接函数
  378. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  379. if err != nil {
  380. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  381. }
  382. defer pool.Close()
  383. defer session.Release()
  384. //log.Println("session", session)
  385. defer util.Catch()
  386. sess := Mgo181.GetMgoConn()
  387. defer Mgo181.DestoryMongoConn(sess)
  388. it := sess.DB("mixdata").C("company_partner").Find(nil).Select(nil).Iter()
  389. count := 0
  390. //realNum := 0
  391. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  392. if count%10000 == 0 {
  393. log.Println("current:", count, tmp["stock_name"], tmp["company_name"])
  394. }
  395. //个人企业跳过
  396. if util.IntAll(tmp["is_personal"]) == 1 {
  397. continue
  398. }
  399. if util.IntAll(tmp["use_flag"]) > 0 {
  400. continue
  401. }
  402. company1 := Legal{
  403. Name: util.ObjToString(tmp["stock_name"]),
  404. Code: util.ObjToString(tmp["stock_name_id"]),
  405. }
  406. /**
  407. 1.stock_name_id 为空,直接跳过
  408. 2.stock_name 为空,直接跳过
  409. 3.stock_name 含有 已除名/不适宜/待清理/拟吊销 ,直接跳过
  410. 4.stock_name 不含中文,跳过
  411. */
  412. company2 := Legal{
  413. Name: util.ObjToString(tmp["company_name"]),
  414. Code: util.ObjToString(tmp["company_id"]),
  415. }
  416. res1, err1 := InsertCompany(session, company1)
  417. if err != nil {
  418. log.Println("InsertCompany err", res1, err1)
  419. }
  420. res2, err2 := InsertCompany(session, company2)
  421. if err != nil {
  422. log.Println("InsertCompany err", res2, err2)
  423. }
  424. if err1 != nil || err2 != nil {
  425. continue
  426. }
  427. //边
  428. amount := ParseStockCapital(util.ObjToString(tmp["stock_capital"]))
  429. investRel := Invest{FromCode: res1, ToCode: res2, Ratio: util.Float64All(tmp["stock_proportion"]), Amount: amount}
  430. err = InsertInvestRel(session, investRel)
  431. if err != nil {
  432. log.Println("InsertInvestRel", err, investRel)
  433. }
  434. }
  435. }
  436. // InsertCompany 插入公司节点的方法
  437. func InsertCompany(session *nebula.Session, company Legal) (string, error) {
  438. // 构建插入公司节点的查询
  439. //insertCompanyStmt := `
  440. // USE ` + Table_Space + `;
  441. // INSERT VERTEX company(company_id,name) VALUES "%s":("%s", "%s");
  442. //`
  443. //insertCompanyStmt = fmt.Sprintf(insertCompanyStmt, inv.id, inv.company_id, inv.company_name)
  444. query := fmt.Sprintf(`
  445. USE `+Table_Space+`;
  446. INSERT VERTEX Legal(name, code, type, state ) VALUES "%s":("%s", "%s", "%s", "%s")
  447. `, company.Name, company.Name, company.Code, company.Type, company.State)
  448. // 执行查询
  449. result, err := session.Execute(query)
  450. if err != nil {
  451. log.Println("InsertCompany", result)
  452. return "", err
  453. }
  454. // 打印返回结果
  455. //fmt.Println("Insert Company Result:", result)
  456. // 返回节点ID(通常可以通过返回的结果中的 "_vid" 字段获取)
  457. return company.Name, nil
  458. }
  459. // InsertInvestRel 插入投资关系边的方法
  460. func InsertInvestRel(session *nebula.Session, investRel Invest) error {
  461. // 构建插入投资关系边的查询
  462. query := fmt.Sprintf(`
  463. USE `+Table_Space+`;
  464. INSERT EDGE Invest(amount, ratio) VALUES "%s"->"%s":(%f, %f)
  465. `, investRel.FromCode, investRel.ToCode, investRel.Amount, investRel.Ratio)
  466. // 执行查询
  467. result, err := session.Execute(query)
  468. if err != nil {
  469. log.Println("InsertInvestRel", result)
  470. return err
  471. }
  472. // 打印返回结果
  473. //fmt.Println("Insert InvestRel Result:", result)
  474. return nil
  475. }
  476. func ParseStockCapital(raw string) float64 {
  477. raw = strings.TrimSpace(raw)
  478. // 默认单位:万元人民币
  479. exchangeRateUSD := 7.0
  480. // 匹配数值部分(可能带小数)
  481. re := regexp.MustCompile(`([\d.]+)`)
  482. match := re.FindStringSubmatch(raw)
  483. if len(match) < 2 {
  484. return 0
  485. }
  486. value, _ := strconv.ParseFloat(match[1], 64)
  487. // 判断单位并转换
  488. switch {
  489. case strings.Contains(raw, "万美元"):
  490. value *= exchangeRateUSD // 转换成人民币
  491. case strings.Contains(raw, "元") || strings.Contains(raw, "人民币"):
  492. if strings.Contains(raw, "万元") || strings.Contains(raw, "万") {
  493. // 已经是万元单位,无需处理
  494. } else {
  495. // 是“元”,需要除以1万
  496. value = value / 10000
  497. }
  498. default:
  499. // 可能是纯数字,默认视为“万元”
  500. }
  501. return value
  502. }
  503. /*
  504. 根据公司名称和层级向上挖掘,获取顶点和边;
  505. maxLevel 挖掘层级数量;
  506. direction 0:双向挖掘 -1:向上挖掘 1:向下挖掘
  507. person true:保留自然人股东 false:不保留自然人股东
  508. */
  509. func GetInvByLevel(company_name string, maxLevel int, direction int, person bool) (map[string]InvestVertex, []InvestEdge) {
  510. verter := map[string]InvestVertex{}
  511. edges := []InvestEdge{}
  512. if direction == 0 {
  513. v1, e1 := getInvByLevel(company_name, maxLevel, 1, person)
  514. v2, e2 := getInvByLevel(company_name, maxLevel, -1, person)
  515. for k, v := range v1 {
  516. verter[k] = v
  517. }
  518. for k, v := range v2 {
  519. verter[k] = v
  520. }
  521. edges = append(edges, e1...)
  522. edges = append(edges, e2...)
  523. } else {
  524. verter, edges = getInvByLevel(company_name, maxLevel, direction, person)
  525. }
  526. return verter, edges
  527. }
  528. func getInvByLevel(company_name string, maxLevel int, direction int, person bool) (map[string]InvestVertex, []InvestEdge) {
  529. data, _ := Mgo181.FindOne("company_base", map[string]interface{}{
  530. "company_name": company_name,
  531. })
  532. company_id := fmt.Sprint((*data)["company_id"])
  533. credit_no := fmt.Sprint((*data)["credit_no"])
  534. var edges = []InvestEdge{} //记录边
  535. var verter = map[string]InvestVertex{} //有效顶点
  536. // 初始化队列和访问记录
  537. type node struct {
  538. companyID, companyName, creditNo string
  539. level int
  540. }
  541. queue := []node{{companyID: company_id, companyName: company_name, creditNo: credit_no, level: 1}}
  542. visited := make(map[string]bool)
  543. for len(queue) > 0 {
  544. current := queue[0]
  545. if _, ok := verter[current.companyID]; !ok {
  546. verter[current.companyID] = InvestVertex{
  547. id: current.companyID,
  548. company_id: current.companyID,
  549. company_name: current.companyName,
  550. credit_no: current.creditNo,
  551. }
  552. }
  553. queue = queue[1:]
  554. if visited[current.companyID] || current.level > maxLevel { // 防止重复处理和超过最大层级
  555. continue
  556. }
  557. visited[current.companyID] = true
  558. query := map[string]interface{}{"company_id": current.companyID}
  559. if direction > 0 {
  560. query = map[string]interface{}{"stock_name_id": current.companyID}
  561. }
  562. partners, _ := Mgo181.Find("company_partner", query, nil, nil, false, -1, -1)
  563. // 处理股东数据
  564. for _, p := range *partners {
  565. //log.Println(direction, p)
  566. if fmt.Sprint(p["is_history"]) == "1" {
  567. continue
  568. }
  569. // 构建投资关系
  570. inv := InvestEdge{
  571. company_id: fmt.Sprint(p["company_id"]),
  572. company_name: fmt.Sprint(p["company_name"]),
  573. stock_id: fmt.Sprint(p["stock_name_id"]),
  574. stock_name: fmt.Sprint(p["stock_name"]),
  575. stock_rate: convertStockCapitalToFloat(fmt.Sprint(p["stock_proportion"])),
  576. stock_amount: convertStockCapitalToFloat(fmt.Sprint(p["stock_capital"])),
  577. stock_level: current.level,
  578. stock_type: 0, // 默认机构股东
  579. }
  580. edges = append(edges, inv)
  581. // 根据股东类型是否继续挖掘
  582. if fmt.Sprint(p["stock_type"]) == "自然人股东" || convertStockCapitalToFloat(fmt.Sprint(p["is_personal"])) > 0 {
  583. inv.stock_type = 1
  584. if _, ok := verter[inv.stock_id]; !ok && person {
  585. verter[inv.stock_id] = InvestVertex{
  586. id: inv.stock_id,
  587. company_id: inv.stock_id,
  588. company_name: inv.stock_name,
  589. }
  590. }
  591. } else {
  592. where1 := map[string]interface{}{
  593. "company_name": inv.company_name,
  594. }
  595. where2 := map[string]interface{}{
  596. "company_name": inv.stock_name,
  597. }
  598. company, _ := Mgo181.FindOne("company_base", where1)
  599. stock, _ := Mgo181.FindOne("company_base", where2)
  600. // 机构股东加入队列继续穿透
  601. if direction > 0 { //向下挖掘
  602. if !visited[inv.company_id] {
  603. queue = append(queue, node{
  604. companyID: inv.company_id,
  605. companyName: inv.company_name,
  606. creditNo: util.ObjToString((*company)["credit_no"]),
  607. level: current.level + 1,
  608. })
  609. }
  610. } else { //向上挖掘
  611. if !visited[inv.stock_id] {
  612. queue = append(queue, node{
  613. companyID: inv.stock_id,
  614. companyName: inv.stock_name,
  615. creditNo: util.ObjToString((*stock)["credit_no"]),
  616. level: current.level + 1,
  617. })
  618. }
  619. }
  620. }
  621. }
  622. //log.Printf("已处理层级%d,当前队列深度%d", current.level, len(queue))
  623. }
  624. return verter, edges
  625. }
  626. func convertStockCapitalToFloat(str string) float64 {
  627. // 查找匹配的数字
  628. match := re.FindString(str)
  629. if match == "" {
  630. return 0
  631. }
  632. // 将匹配到的数字字符串转换为浮点数
  633. result, err := strconv.ParseFloat(match, 64)
  634. if err != nil {
  635. return 0
  636. }
  637. return result
  638. }