|
@@ -0,0 +1,584 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "encoding/json"
|
|
|
+ "fmt"
|
|
|
+ "github.com/olivere/elastic/v7"
|
|
|
+ nebula "github.com/vesoft-inc/nebula-go/v3"
|
|
|
+ "io"
|
|
|
+ util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
+ "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
|
+ "log"
|
|
|
+ "regexp"
|
|
|
+ "strconv"
|
|
|
+ "strings"
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ re = regexp.MustCompile(`[-+]?[0-9]*\.?[0-9]+`)
|
|
|
+ HostList = []nebula.HostAddress{{Host: "114.116.213.97", Port: 9669}}
|
|
|
+ //HostList = []nebula.HostAddress{{Host: "127.0.0.1", Port: 9669}}
|
|
|
+ UserName = "root"
|
|
|
+ PassWord = "jianyu@123"
|
|
|
+ Mgo181 *mongodb.MongodbSim
|
|
|
+ Table_Space = "legal_profile"
|
|
|
+)
|
|
|
+
|
|
|
+// Legal 代表公司节点的结构体
|
|
|
+type Legal struct {
|
|
|
+ Name string
|
|
|
+ Code string
|
|
|
+ Type string
|
|
|
+}
|
|
|
+
|
|
|
+// Invest 代表公司之间的投资关系边的结构体
|
|
|
+type Invest struct {
|
|
|
+ FromCode string
|
|
|
+ ToCode string
|
|
|
+ Amount float64
|
|
|
+ Ratio float64
|
|
|
+}
|
|
|
+
|
|
|
+type InvestVertex struct { //顶点
|
|
|
+ id string
|
|
|
+ company_id string
|
|
|
+ company_name string
|
|
|
+ credit_no string
|
|
|
+}
|
|
|
+
|
|
|
+type InvestEdge struct { //边
|
|
|
+ company_id string
|
|
|
+ company_name string
|
|
|
+ stock_id string
|
|
|
+ stock_name string
|
|
|
+ stock_rate float64
|
|
|
+ stock_amount float64
|
|
|
+ stock_level int
|
|
|
+ stock_type int //0企业股东 1自然人股东
|
|
|
+}
|
|
|
+
|
|
|
+// ConnectToNebula 封装数据库连接函数
|
|
|
+func ConnectToNebula(hosts []nebula.HostAddress, username, password string) (*nebula.Session, *nebula.ConnectionPool, error) {
|
|
|
+ // 创建连接池配置
|
|
|
+ config := nebula.GetDefaultConf()
|
|
|
+ config.UseHTTP2 = false
|
|
|
+ config.HandshakeKey = ""
|
|
|
+
|
|
|
+ // 初始化连接池
|
|
|
+ pool, err := nebula.NewConnectionPool(hosts, config, nebula.DefaultLogger{})
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取会话
|
|
|
+ session, err := pool.GetSession(username, password)
|
|
|
+ if err != nil {
|
|
|
+ pool.Close()
|
|
|
+ return nil, nil, err
|
|
|
+ }
|
|
|
+ return session, pool, nil
|
|
|
+}
|
|
|
+
|
|
|
+func main() {
|
|
|
+ InitMgo()
|
|
|
+ getQyxytData()
|
|
|
+ //
|
|
|
+
|
|
|
+ log.Println("数据处理完毕!!!!!!!")
|
|
|
+}
|
|
|
+
|
|
|
+func dda() {
|
|
|
+ name := "北京拓普丰联信息科技股份有限公司"
|
|
|
+ rea, resb := GetInvByLevel(name, 5, 0, false)
|
|
|
+
|
|
|
+ // 调用封装的连接函数
|
|
|
+ session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("Failed to connect to Nebula Graph: %v", err)
|
|
|
+ }
|
|
|
+ defer pool.Close()
|
|
|
+ defer session.Release()
|
|
|
+ for _, v := range rea {
|
|
|
+ d := Legal{
|
|
|
+ Name: v.company_name,
|
|
|
+ Code: v.credit_no,
|
|
|
+ }
|
|
|
+ res, err := InsertCompany(session, d)
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err, res)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, v := range resb {
|
|
|
+ d := Invest{
|
|
|
+ FromCode: v.stock_name,
|
|
|
+ ToCode: v.company_name,
|
|
|
+ Amount: v.stock_amount,
|
|
|
+ Ratio: v.stock_rate,
|
|
|
+ }
|
|
|
+
|
|
|
+ err := InsertInvestRel(session, d)
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err, d)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// getQyxytData 获取企业数据
|
|
|
+func getQyxytData() {
|
|
|
+ // 调用封装的连接函数
|
|
|
+ session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("Failed to connect to Nebula Graph: %v", err)
|
|
|
+ }
|
|
|
+ defer pool.Close()
|
|
|
+ defer session.Release()
|
|
|
+
|
|
|
+ url := "http://172.17.4.184:19908"
|
|
|
+ //url := "http://127.0.0.1:19908"
|
|
|
+ username := "jybid"
|
|
|
+ password := "Top2023_JEB01i@31"
|
|
|
+ index := "qyxy" //索引名称
|
|
|
+ // 创建 Elasticsearch 客户端
|
|
|
+ client, err := elastic.NewClient(
|
|
|
+ elastic.SetURL(url),
|
|
|
+ elastic.SetBasicAuth(username, password),
|
|
|
+ elastic.SetSniff(false),
|
|
|
+ )
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ //---------------------------//
|
|
|
+ //query := elastic.NewBoolQuery()
|
|
|
+ //query.Must(elastic.NewMatchQuery("business_scope", "招投标代理"))
|
|
|
+ //query.Must(elastic.NewTermQuery("company_city", "北京市"))
|
|
|
+
|
|
|
+ //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
|
|
|
+ query := elastic.NewBoolQuery().
|
|
|
+ //北京,天津,河北,上海,江苏,浙江,安徽
|
|
|
+ //Must(elastic.NewTermQuery("area", "北京市")).
|
|
|
+ //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
|
|
|
+ MustNot(
|
|
|
+ elastic.NewTermQuery("company_type", "个体工商户"),
|
|
|
+ ).
|
|
|
+ //Must(elastic.NewTermQuery("company_name", "河南拓普计算机网络工程有限公司"))
|
|
|
+ Must(elastic.NewTermsQuery("company_area", "河南"))
|
|
|
+
|
|
|
+ ctx := context.Background()
|
|
|
+ //开始滚动搜索
|
|
|
+ scrollID := ""
|
|
|
+ scroll := "10m"
|
|
|
+ searchSource := elastic.NewSearchSource().
|
|
|
+ Query(query).
|
|
|
+ Size(10000).
|
|
|
+ Sort("_doc", true) //升序排序
|
|
|
+ //Sort("_doc", false) //降序排序
|
|
|
+
|
|
|
+ searchService := client.Scroll(index).
|
|
|
+ Size(10000).
|
|
|
+ Scroll(scroll).
|
|
|
+ SearchSource(searchSource)
|
|
|
+
|
|
|
+ res, err := searchService.Do(ctx)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ if err == io.EOF {
|
|
|
+ fmt.Println("没有数据")
|
|
|
+ } else {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
|
|
|
+ fmt.Println("总数是:", res.TotalHits())
|
|
|
+
|
|
|
+ total := 0
|
|
|
+ for len(res.Hits.Hits) > 0 {
|
|
|
+ for _, hit := range res.Hits.Hits {
|
|
|
+ var doc map[string]interface{}
|
|
|
+ err := json.Unmarshal(hit.Source, &doc)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("解析文档失败:%s", err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ company1 := Legal{
|
|
|
+ Name: util.ObjToString(doc["company_name"]),
|
|
|
+ Code: util.ObjToString(doc["credit_no"]),
|
|
|
+ Type: "企业",
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ 1.stock_name_id 为空,直接跳过
|
|
|
+ 2.stock_name 为空,直接跳过
|
|
|
+ 3.stock_name 含有 已除名/不适宜/待清理/拟吊销 ,直接跳过
|
|
|
+ 4.stock_name 不含中文,跳过
|
|
|
+ */
|
|
|
+
|
|
|
+ if util.ObjToString(doc["company_name"]) == "" || util.ObjToString(doc["credit_no"]) == "" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if strings.Contains(util.ObjToString(doc["company_name"]), "已除名") {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ res1, err1 := InsertCompany(session, company1)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("InsertCompany err", res1, err1)
|
|
|
+ }
|
|
|
+
|
|
|
+ //边
|
|
|
+ if partners, ok := doc["partners"].([]interface{}); ok {
|
|
|
+ for _, partner := range partners {
|
|
|
+ if da, ok := partner.(map[string]interface{}); ok {
|
|
|
+ if util.ObjToString(da["stock_type"]) == "企业法人" {
|
|
|
+ if util.ObjToString(da["stock_name"]) == "" || util.ObjToString(da["identify_no"]) == "" {
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ company2 := Legal{
|
|
|
+ Name: util.ObjToString(da["stock_name"]),
|
|
|
+ Code: util.ObjToString(da["identify_no"]),
|
|
|
+ Type: "企业",
|
|
|
+ }
|
|
|
+ res2, err2 := InsertCompany(session, company2)
|
|
|
+ if err2 != nil {
|
|
|
+ log.Println("InsertCompany err", res2, err2)
|
|
|
+ }
|
|
|
+ //
|
|
|
+
|
|
|
+ if err1 != nil || err2 != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ where := map[string]interface{}{
|
|
|
+ "company_name": util.ObjToString(doc["company_name"]),
|
|
|
+ "stock_name": util.ObjToString(da["stock_name"]),
|
|
|
+ }
|
|
|
+
|
|
|
+ ddd, _ := Mgo181.FindOne("company_partner", where)
|
|
|
+ if len(*ddd) > 0 {
|
|
|
+ par := *ddd
|
|
|
+ amount := ParseStockCapital(util.ObjToString(par["stock_capital"]))
|
|
|
+ investRel := Invest{FromCode: util.ObjToString(da["stock_name"]), ToCode: util.ObjToString(doc["company_name"]), Ratio: util.Float64All(par["stock_proportion"]), Amount: amount}
|
|
|
+ err = InsertInvestRel(session, investRel)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("InsertInvestRel", err, investRel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ total = total + len(res.Hits.Hits)
|
|
|
+ scrollID = res.ScrollId
|
|
|
+ res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
|
|
|
+ log.Println("current count:", total)
|
|
|
+ if err != nil {
|
|
|
+ if err == io.EOF {
|
|
|
+ // 滚动到最后一批数据,退出循环
|
|
|
+ break
|
|
|
+ }
|
|
|
+ log.Println("滚动搜索失败:", err, res)
|
|
|
+ break // 处理错误时退出循环
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 在循环外调用 ClearScroll
|
|
|
+ _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("清理滚动搜索失败:%s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Println("结束~~~~~~~~~~~~~~~")
|
|
|
+}
|
|
|
+
|
|
|
+// dealCompanyPartner 处理企业投资关系
|
|
|
+func dealCompanyPartner() {
|
|
|
+ // 调用封装的连接函数
|
|
|
+ session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("Failed to connect to Nebula Graph: %v", err)
|
|
|
+ }
|
|
|
+ defer pool.Close()
|
|
|
+ defer session.Release()
|
|
|
+
|
|
|
+ //log.Println("session", session)
|
|
|
+
|
|
|
+ defer util.Catch()
|
|
|
+ sess := Mgo181.GetMgoConn()
|
|
|
+ defer Mgo181.DestoryMongoConn(sess)
|
|
|
+
|
|
|
+ it := sess.DB("mixdata").C("company_partner").Find(nil).Select(nil).Iter()
|
|
|
+ count := 0
|
|
|
+ //realNum := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
+ if count%10000 == 0 {
|
|
|
+ log.Println("current:", count, tmp["stock_name"], tmp["company_name"])
|
|
|
+ }
|
|
|
+ //个人企业跳过
|
|
|
+ if util.IntAll(tmp["is_personal"]) == 1 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if util.IntAll(tmp["use_flag"]) > 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ company1 := Legal{
|
|
|
+ Name: util.ObjToString(tmp["stock_name"]),
|
|
|
+ Code: util.ObjToString(tmp["stock_name_id"]),
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ 1.stock_name_id 为空,直接跳过
|
|
|
+ 2.stock_name 为空,直接跳过
|
|
|
+ 3.stock_name 含有 已除名/不适宜/待清理/拟吊销 ,直接跳过
|
|
|
+ 4.stock_name 不含中文,跳过
|
|
|
+ */
|
|
|
+
|
|
|
+ company2 := Legal{
|
|
|
+ Name: util.ObjToString(tmp["company_name"]),
|
|
|
+ Code: util.ObjToString(tmp["company_id"]),
|
|
|
+ }
|
|
|
+ res1, err1 := InsertCompany(session, company1)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("InsertCompany err", res1, err1)
|
|
|
+ }
|
|
|
+ res2, err2 := InsertCompany(session, company2)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("InsertCompany err", res2, err2)
|
|
|
+ }
|
|
|
+ if err1 != nil || err2 != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ //边
|
|
|
+ amount := ParseStockCapital(util.ObjToString(tmp["stock_capital"]))
|
|
|
+ investRel := Invest{FromCode: res1, ToCode: res2, Ratio: util.Float64All(tmp["stock_proportion"]), Amount: amount}
|
|
|
+ err = InsertInvestRel(session, investRel)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("InsertInvestRel", err, investRel)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// InsertCompany 插入公司节点的方法
|
|
|
+func InsertCompany(session *nebula.Session, company Legal) (string, error) {
|
|
|
+ // 构建插入公司节点的查询
|
|
|
+ //insertCompanyStmt := `
|
|
|
+ // USE ` + Table_Space + `;
|
|
|
+ // INSERT VERTEX company(company_id,name) VALUES "%s":("%s", "%s");
|
|
|
+ //`
|
|
|
+ //insertCompanyStmt = fmt.Sprintf(insertCompanyStmt, inv.id, inv.company_id, inv.company_name)
|
|
|
+ query := fmt.Sprintf(`
|
|
|
+ USE `+Table_Space+`;
|
|
|
+ INSERT VERTEX Legal(name, code, type) VALUES "%s":("%s", "%s", "%s")
|
|
|
+ `, company.Name, company.Name, company.Code, company.Type)
|
|
|
+
|
|
|
+ // 执行查询
|
|
|
+ result, err := session.Execute(query)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("InsertCompany", result)
|
|
|
+ return "", err
|
|
|
+ }
|
|
|
+
|
|
|
+ // 打印返回结果
|
|
|
+ //fmt.Println("Insert Company Result:", result)
|
|
|
+
|
|
|
+ // 返回节点ID(通常可以通过返回的结果中的 "_vid" 字段获取)
|
|
|
+ return company.Name, nil
|
|
|
+}
|
|
|
+
|
|
|
+// InsertInvestRel 插入投资关系边的方法
|
|
|
+func InsertInvestRel(session *nebula.Session, investRel Invest) error {
|
|
|
+ // 构建插入投资关系边的查询
|
|
|
+ query := fmt.Sprintf(`
|
|
|
+ USE `+Table_Space+`;
|
|
|
+ INSERT EDGE Invest(amount, ratio) VALUES "%s"->"%s":(%f, %f)
|
|
|
+ `, investRel.FromCode, investRel.ToCode, investRel.Amount, investRel.Ratio)
|
|
|
+
|
|
|
+ // 执行查询
|
|
|
+ result, err := session.Execute(query)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("InsertInvestRel", result)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ // 打印返回结果
|
|
|
+ //fmt.Println("Insert InvestRel Result:", result)
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func ParseStockCapital(raw string) float64 {
|
|
|
+ raw = strings.TrimSpace(raw)
|
|
|
+ // 默认单位:万元人民币
|
|
|
+ exchangeRateUSD := 7.0
|
|
|
+ // 匹配数值部分(可能带小数)
|
|
|
+ re := regexp.MustCompile(`([\d.]+)`)
|
|
|
+ match := re.FindStringSubmatch(raw)
|
|
|
+ if len(match) < 2 {
|
|
|
+ return 0
|
|
|
+ }
|
|
|
+
|
|
|
+ value, _ := strconv.ParseFloat(match[1], 64)
|
|
|
+
|
|
|
+ // 判断单位并转换
|
|
|
+ switch {
|
|
|
+ case strings.Contains(raw, "万美元"):
|
|
|
+ value *= exchangeRateUSD // 转换成人民币
|
|
|
+ case strings.Contains(raw, "元") || strings.Contains(raw, "人民币"):
|
|
|
+ if strings.Contains(raw, "万元") || strings.Contains(raw, "万") {
|
|
|
+ // 已经是万元单位,无需处理
|
|
|
+ } else {
|
|
|
+ // 是“元”,需要除以1万
|
|
|
+ value = value / 10000
|
|
|
+ }
|
|
|
+ default:
|
|
|
+ // 可能是纯数字,默认视为“万元”
|
|
|
+ }
|
|
|
+
|
|
|
+ return value
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+根据公司名称和层级向上挖掘,获取顶点和边;
|
|
|
+maxLevel 挖掘层级数量;
|
|
|
+direction 0:双向挖掘 -1:向上挖掘 1:向下挖掘
|
|
|
+person true:保留自然人股东 false:不保留自然人股东
|
|
|
+*/
|
|
|
+func GetInvByLevel(company_name string, maxLevel int, direction int, person bool) (map[string]InvestVertex, []InvestEdge) {
|
|
|
+ verter := map[string]InvestVertex{}
|
|
|
+ edges := []InvestEdge{}
|
|
|
+ if direction == 0 {
|
|
|
+ v1, e1 := getInvByLevel(company_name, maxLevel, 1, person)
|
|
|
+ v2, e2 := getInvByLevel(company_name, maxLevel, -1, person)
|
|
|
+ for k, v := range v1 {
|
|
|
+ verter[k] = v
|
|
|
+ }
|
|
|
+ for k, v := range v2 {
|
|
|
+ verter[k] = v
|
|
|
+ }
|
|
|
+ edges = append(edges, e1...)
|
|
|
+ edges = append(edges, e2...)
|
|
|
+
|
|
|
+ } else {
|
|
|
+ verter, edges = getInvByLevel(company_name, maxLevel, direction, person)
|
|
|
+ }
|
|
|
+ return verter, edges
|
|
|
+}
|
|
|
+
|
|
|
+func getInvByLevel(company_name string, maxLevel int, direction int, person bool) (map[string]InvestVertex, []InvestEdge) {
|
|
|
+ data, _ := Mgo181.FindOne("company_base", map[string]interface{}{
|
|
|
+ "company_name": company_name,
|
|
|
+ })
|
|
|
+ company_id := fmt.Sprint((*data)["company_id"])
|
|
|
+ credit_no := fmt.Sprint((*data)["credit_no"])
|
|
|
+ var edges = []InvestEdge{} //记录边
|
|
|
+ var verter = map[string]InvestVertex{} //有效顶点
|
|
|
+ // 初始化队列和访问记录
|
|
|
+ type node struct {
|
|
|
+ companyID, companyName, creditNo string
|
|
|
+ level int
|
|
|
+ }
|
|
|
+ queue := []node{{companyID: company_id, companyName: company_name, creditNo: credit_no, level: 1}}
|
|
|
+ visited := make(map[string]bool)
|
|
|
+ for len(queue) > 0 {
|
|
|
+ current := queue[0]
|
|
|
+ if _, ok := verter[current.companyID]; !ok {
|
|
|
+ verter[current.companyID] = InvestVertex{
|
|
|
+ id: current.companyID,
|
|
|
+ company_id: current.companyID,
|
|
|
+ company_name: current.companyName,
|
|
|
+ credit_no: current.creditNo,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ queue = queue[1:]
|
|
|
+ if visited[current.companyID] || current.level > maxLevel { // 防止重复处理和超过最大层级
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ visited[current.companyID] = true
|
|
|
+ query := map[string]interface{}{"company_id": current.companyID}
|
|
|
+ if direction > 0 {
|
|
|
+ query = map[string]interface{}{"stock_name_id": current.companyID}
|
|
|
+ }
|
|
|
+ partners, _ := Mgo181.Find("company_partner", query, nil, nil, false, -1, -1)
|
|
|
+ // 处理股东数据
|
|
|
+ for _, p := range *partners {
|
|
|
+ //log.Println(direction, p)
|
|
|
+ if fmt.Sprint(p["is_history"]) == "1" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ // 构建投资关系
|
|
|
+ inv := InvestEdge{
|
|
|
+ company_id: fmt.Sprint(p["company_id"]),
|
|
|
+ company_name: fmt.Sprint(p["company_name"]),
|
|
|
+ stock_id: fmt.Sprint(p["stock_name_id"]),
|
|
|
+ stock_name: fmt.Sprint(p["stock_name"]),
|
|
|
+ stock_rate: convertStockCapitalToFloat(fmt.Sprint(p["stock_proportion"])),
|
|
|
+ stock_amount: convertStockCapitalToFloat(fmt.Sprint(p["stock_capital"])),
|
|
|
+ stock_level: current.level,
|
|
|
+ stock_type: 0, // 默认机构股东
|
|
|
+ }
|
|
|
+ edges = append(edges, inv)
|
|
|
+
|
|
|
+ // 根据股东类型是否继续挖掘
|
|
|
+ if fmt.Sprint(p["stock_type"]) == "自然人股东" || convertStockCapitalToFloat(fmt.Sprint(p["is_personal"])) > 0 {
|
|
|
+ inv.stock_type = 1
|
|
|
+ if _, ok := verter[inv.stock_id]; !ok && person {
|
|
|
+ verter[inv.stock_id] = InvestVertex{
|
|
|
+ id: inv.stock_id,
|
|
|
+ company_id: inv.stock_id,
|
|
|
+ company_name: inv.stock_name,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ where1 := map[string]interface{}{
|
|
|
+ "company_name": inv.company_name,
|
|
|
+ }
|
|
|
+ where2 := map[string]interface{}{
|
|
|
+ "company_name": inv.stock_name,
|
|
|
+ }
|
|
|
+ company, _ := Mgo181.FindOne("company_base", where1)
|
|
|
+ stock, _ := Mgo181.FindOne("company_base", where2)
|
|
|
+ // 机构股东加入队列继续穿透
|
|
|
+ if direction > 0 { //向下挖掘
|
|
|
+ if !visited[inv.company_id] {
|
|
|
+ queue = append(queue, node{
|
|
|
+ companyID: inv.company_id,
|
|
|
+ companyName: inv.company_name,
|
|
|
+ creditNo: util.ObjToString((*company)["credit_no"]),
|
|
|
+ level: current.level + 1,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ } else { //向上挖掘
|
|
|
+ if !visited[inv.stock_id] {
|
|
|
+ queue = append(queue, node{
|
|
|
+ companyID: inv.stock_id,
|
|
|
+ companyName: inv.stock_name,
|
|
|
+ creditNo: util.ObjToString((*stock)["credit_no"]),
|
|
|
+ level: current.level + 1,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.Printf("已处理层级%d,当前队列深度%d", current.level, len(queue))
|
|
|
+ }
|
|
|
+ return verter, edges
|
|
|
+}
|
|
|
+
|
|
|
+func convertStockCapitalToFloat(str string) float64 {
|
|
|
+ // 查找匹配的数字
|
|
|
+ match := re.FindString(str)
|
|
|
+ if match == "" {
|
|
|
+ return 0
|
|
|
+ }
|
|
|
+ // 将匹配到的数字字符串转换为浮点数
|
|
|
+ result, err := strconv.ParseFloat(match, 64)
|
|
|
+ if err != nil {
|
|
|
+ return 0
|
|
|
+ }
|
|
|
+ return result
|
|
|
+}
|