package main import ( "context" "encoding/json" "fmt" "github.com/gin-gonic/gin" "github.com/olivere/elastic/v7" nebula "github.com/vesoft-inc/nebula-go/v3" "io" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "net/http" "regexp" "strconv" "strings" ) var ( re = regexp.MustCompile(`[-+]?[0-9]*\.?[0-9]+`) HostList = []nebula.HostAddress{{Host: "114.116.213.97", Port: 9669}} //HostList = []nebula.HostAddress{{Host: "127.0.0.1", Port: 9669}} UserName = "root" PassWord = "jianyu@123" Mgo181 *mongodb.MongodbSim //Table_Space = "legal_profile" Table_Space = "legal_profile" WorkerCount = 5 BatchSize = 100 ) // Legal 代表公司节点的结构体 type Legal struct { Id string Name string Code string Type string //类型,企业,事业单哪位,政府部门 State string //状态,有效/无效;不是存续、在营、开业、在册 } // Invest 代表公司之间的投资关系边的结构体 type Invest struct { FromCode string ToCode string Amount float64 Ratio float64 } // SuspectInvest 疑似关系 type SuspectInvest struct { FromCode string `json:"from_code"` ToCode string `json:"to_code"` Reason string `json:"reason"` //原因,手机、邮箱相同 } // InsertSuspectJob 处理疑似投资 type InsertSuspectJob struct { Relations SuspectInvest } // ExecutivesInvest 高管关系结构体 type ExecutivesInvest struct { FromCode string `json:"from_code"` ToCode string `json:"to_code"` Name string `json:"name"` //姓名 } // InsertJob 批量处理投资关系的点 和 边关系 type InsertJob struct { Companies []Legal Relations []Invest } type InvestVertex struct { //顶点 id string company_id string company_name string credit_no string } type InvestEdge struct { //边 company_id string company_name string stock_id string stock_name string stock_rate float64 stock_amount float64 stock_level int stock_type int //0企业股东 1自然人股东 } // ConnectToNebula 封装数据库连接函数 func ConnectToNebula(hosts []nebula.HostAddress, username, password string) (*nebula.Session, *nebula.ConnectionPool, error) { // 创建连接池配置 config := nebula.GetDefaultConf() config.UseHTTP2 = false config.HandshakeKey = "" // 初始化连接池 pool, err := nebula.NewConnectionPool(hosts, config, nebula.DefaultLogger{}) if err != nil { return nil, nil, err } // 获取会话 session, err := pool.GetSession(username, password) if err != nil { pool.Close() return nil, nil, err } return session, pool, nil } type CheckRequest struct { Names []string `json:"names"` Deep int `json:"deep"` Stype int `json:"stype"` //0.简易模式,匹配到直接返回;1.匹配完所有的数据 } type CheckResponse struct { Code int `json:"code"` Data []string `json:"data"` Msg string `json:"msg"` } type GraphResponse struct { Code int `json:"code"` Data []string `json:"data"` Msg string `json:"msg"` Echars *GraphResult `json:"echars"` } // SuspectInvestResponse 意思关系返回结果 type SuspectInvestResponse struct { Code int `json:"code"` Data []interface{} `json:"data"` Msg string `json:"msg"` } func main() { handHttp() return //dealYS() //dealTsDongShi() //董事高管 //log.Println("处理完毕") //return //InitMgo() //初始化 MongoDB //dealTsGraph() //处理疑似关系图谱 //dealDsGraph() //处理高管 关系到 图形数据库 //---------------// //dda() //dealCompanyBase22() //dealCompanyBase() //迭代company_base 处理企业数据 //batchDealGraph() // 迭代es 处理企业数据; // //log.Println("数据处理完毕!!!!!!!") //return //-----------------------------------------// //3、改造方法,使用连接池,避免session过去// // 初始化 Gin 路由 //r := gin.Default() // //client, err := NewNebulaClient(HostList, UserName, PassWord) //if err != nil { // log.Fatal("连接失败:", err) //} //defer client.Close() //// 注册 POST 接口 //// 提供 HTML 页面 ////r.GET("/legal/graph", func(c *gin.Context) { //// c.HTML(http.StatusOK, "graph.html", nil) ////}) //r.POST("/check-relations", func(c *gin.Context) { // var req CheckRequest // if err := c.ShouldBindJSON(&req); err != nil { // c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数无效"}) // return // } // // has, results, err := client.CheckLegalRelationships(req.Names, req.Deep, req.Stype) // if err != nil { // res := CheckResponse{ // Code: -1, // Data: results, // Msg: "请求失败;" + err.Error(), // } // c.JSON(http.StatusInternalServerError, res) // return // } // // res := CheckResponse{ // Code: 200, // Data: results, // } // if has { // res.Msg = "存在投资关系" // } else { // res.Msg = "不存在投资关系" // } // // c.JSON(http.StatusOK, res) //}) //// 启动服务 //r.Run(":8080") } // handHttp 对外提供 HTTP接口 func handHttp() { // 初始化 Gin 路由 r := gin.Default() client, err := NewNebulaClient(HostList, UserName, PassWord) if err != nil { log.Fatal("连接失败:", err) } defer client.Close() //1、投资关系查询 r.POST("/check-relations", func(c *gin.Context) { var req CheckRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数无效"}) return } has, results, resb, err := client.CheckLegalRelationships(req.Names, req.Deep, req.Stype) if err != nil { res := GraphResponse{ Code: -1, Data: results, Msg: "请求失败;" + err.Error(), } c.JSON(http.StatusInternalServerError, res) return } res := GraphResponse{ Code: 200, Data: results, Echars: resb, } if has { res.Msg = "存在投资关系" } else { res.Msg = "不存在投资关系" } c.JSON(http.StatusOK, res) }) //2、疑似关系 查询 r.POST("/yisi-relations", func(c *gin.Context) { var req CheckRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数无效"}) return } results, err := client.FindSuspectInvestRelationsByNames(req.Names) if err != nil { res := map[string]interface{}{ "code": 200, "msg": "请求失败", "data": err.Error(), } c.JSON(http.StatusInternalServerError, res) return } else { res := map[string]interface{}{ "code": 200, "msg": "请求成功", "data": results, } c.JSON(http.StatusOK, res) } }) //3.董高监 关系 r.POST("/dgj-relations", func(c *gin.Context) { var req CheckRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数无效"}) return } results, err := client.FindExecutivesInvestRelationsByNames(req.Names) if err != nil { res := map[string]interface{}{ "code": 200, "msg": "请求失败", "data": err.Error(), } c.JSON(http.StatusInternalServerError, res) return } else { res := map[string]interface{}{ "code": 200, "msg": "请求成功", "data": results, } c.JSON(http.StatusOK, res) } }) //--------------------------// // 启动服务 r.Run(":8080") } // dda 针对某个企业,单独生节点数据 func dda() { name := "北京拓普丰联信息科技股份有限公司" rea, resb := GetInvByLevel(name, 5, 0, false) // 调用封装的连接函数 session, pool, err := ConnectToNebula(HostList, UserName, PassWord) if err != nil { log.Fatalf("Failed to connect to Nebula Graph: %v", err) } defer pool.Close() defer session.Release() for _, v := range rea { d := Legal{ Id: v.company_id, Name: v.company_name, Code: v.credit_no, Type: "企业", } res, err := InsertCompany(session, d) if err != nil { log.Println(err, res) } } for _, v := range resb { d := Invest{ FromCode: v.stock_name, ToCode: v.company_name, Amount: v.stock_amount, Ratio: v.stock_rate, } err := InsertInvestRel(session, d) if err != nil { log.Println(err, d) } } } // getQyxytData 获取企业数据 func getQyxytData() { // 调用封装的连接函数 session, pool, err := ConnectToNebula(HostList, UserName, PassWord) if err != nil { log.Fatalf("Failed to connect to Nebula Graph: %v", err) } defer pool.Close() defer session.Release() url := "http://172.17.4.184:19908" //url := "http://127.0.0.1:19908" username := "jybid" password := "Top2023_JEB01i@31" index := "qyxy" //索引名称 // 创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(url), elastic.SetBasicAuth(username, password), elastic.SetSniff(false), ) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败:%s", err) } //---------------------------// //query := elastic.NewBoolQuery() //query.Must(elastic.NewMatchQuery("business_scope", "招投标代理")) //query.Must(elastic.NewTermQuery("company_city", "北京市")) //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000") query := elastic.NewBoolQuery(). //北京,天津,河北,上海,江苏,浙江,安徽 //Must(elastic.NewTermQuery("area", "北京市")). //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")). MustNot( elastic.NewTermQuery("company_type", "个体工商户"), ). //Must(elastic.NewTermQuery("company_name", "河南拓普计算机网络工程有限公司")) Must(elastic.NewTermsQuery("company_area", "河南")) ctx := context.Background() //开始滚动搜索 scrollID := "" scroll := "10m" searchSource := elastic.NewSearchSource(). Query(query). Size(10000). Sort("_doc", true) //升序排序 //Sort("_doc", false) //降序排序 searchService := client.Scroll(index). Size(10000). Scroll(scroll). SearchSource(searchSource) res, err := searchService.Do(ctx) if err != nil { if err == io.EOF { fmt.Println("没有数据") } else { panic(err) } } //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源 fmt.Println("总数是:", res.TotalHits()) total := 0 for len(res.Hits.Hits) > 0 { for _, hit := range res.Hits.Hits { var doc map[string]interface{} err := json.Unmarshal(hit.Source, &doc) if err != nil { log.Printf("解析文档失败:%s", err) continue } company1 := Legal{ Name: util.ObjToString(doc["company_name"]), Code: util.ObjToString(doc["credit_no"]), Type: "企业", } /** 1.stock_name_id 为空,直接跳过 2.stock_name 为空,直接跳过 3.stock_name 含有 已除名/不适宜/待清理/拟吊销 ,直接跳过 4.stock_name 不含中文,跳过 */ if util.ObjToString(doc["company_name"]) == "" || util.ObjToString(doc["credit_no"]) == "" { continue } if strings.Contains(util.ObjToString(doc["company_name"]), "已除名") { continue } res1, err1 := InsertCompany(session, company1) if err != nil { log.Println("InsertCompany err", res1, err1) } //边 if partners, ok := doc["partners"].([]interface{}); ok { for _, partner := range partners { if da, ok := partner.(map[string]interface{}); ok { if util.ObjToString(da["stock_type"]) == "企业法人" { if util.ObjToString(da["stock_name"]) == "" || util.ObjToString(da["identify_no"]) == "" { continue } else { company2 := Legal{ Name: util.ObjToString(da["stock_name"]), Code: util.ObjToString(da["identify_no"]), Type: "企业", } res2, err2 := InsertCompany(session, company2) if err2 != nil { log.Println("InsertCompany err", res2, err2) } // if err1 != nil || err2 != nil { continue } where := map[string]interface{}{ "company_name": util.ObjToString(doc["company_name"]), "stock_name": util.ObjToString(da["stock_name"]), } ddd, _ := Mgo181.FindOne("company_partner", where) if len(*ddd) > 0 { par := *ddd amount := ParseStockCapital(util.ObjToString(par["stock_capital"])) investRel := Invest{FromCode: util.ObjToString(da["stock_name"]), ToCode: util.ObjToString(doc["company_name"]), Ratio: util.Float64All(par["stock_proportion"]), Amount: amount} err = InsertInvestRel(session, investRel) if err != nil { log.Println("InsertInvestRel", err, investRel) } } } } } } } } total = total + len(res.Hits.Hits) scrollID = res.ScrollId res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx) log.Println("current count:", total) if err != nil { if err == io.EOF { // 滚动到最后一批数据,退出循环 break } log.Println("滚动搜索失败:", err, res) break // 处理错误时退出循环 } } // 在循环外调用 ClearScroll _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx) if err != nil { log.Printf("清理滚动搜索失败:%s", err) } log.Println("结束~~~~~~~~~~~~~~~") } // dealCompanyPartner 处理企业投资关系 func dealCompanyPartner() { // 调用封装的连接函数 session, pool, err := ConnectToNebula(HostList, UserName, PassWord) if err != nil { log.Fatalf("Failed to connect to Nebula Graph: %v", err) } defer pool.Close() defer session.Release() //log.Println("session", session) defer util.Catch() sess := Mgo181.GetMgoConn() defer Mgo181.DestoryMongoConn(sess) it := sess.DB("mixdata").C("company_partner").Find(nil).Select(nil).Iter() count := 0 //realNum := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%10000 == 0 { log.Println("current:", count, tmp["stock_name"], tmp["company_name"]) } //个人企业跳过 if util.IntAll(tmp["is_personal"]) == 1 { continue } if util.IntAll(tmp["use_flag"]) > 0 { continue } company1 := Legal{ Name: util.ObjToString(tmp["stock_name"]), Code: util.ObjToString(tmp["stock_name_id"]), } /** 1.stock_name_id 为空,直接跳过 2.stock_name 为空,直接跳过 3.stock_name 含有 已除名/不适宜/待清理/拟吊销 ,直接跳过 4.stock_name 不含中文,跳过 */ company2 := Legal{ Name: util.ObjToString(tmp["company_name"]), Code: util.ObjToString(tmp["company_id"]), } res1, err1 := InsertCompany(session, company1) if err != nil { log.Println("InsertCompany err", res1, err1) } res2, err2 := InsertCompany(session, company2) if err != nil { log.Println("InsertCompany err", res2, err2) } if err1 != nil || err2 != nil { continue } //边 amount := ParseStockCapital(util.ObjToString(tmp["stock_capital"])) investRel := Invest{FromCode: res1, ToCode: res2, Ratio: util.Float64All(tmp["stock_proportion"]), Amount: amount} err = InsertInvestRel(session, investRel) if err != nil { log.Println("InsertInvestRel", err, investRel) } } } // InsertCompany 插入公司节点的方法 func InsertCompany(session *nebula.Session, company Legal) (string, error) { // 构建插入公司节点的查询 //insertCompanyStmt := ` // USE ` + Table_Space + `; // INSERT VERTEX company(company_id,name) VALUES "%s":("%s", "%s"); //` //insertCompanyStmt = fmt.Sprintf(insertCompanyStmt, inv.id, inv.company_id, inv.company_name) query := fmt.Sprintf(` USE `+Table_Space+`; INSERT VERTEX Legal(name, code, type, state ) VALUES "%s":("%s", "%s", "%s", "%s") `, company.Name, company.Name, company.Code, company.Type, company.State) // 执行查询 result, err := session.Execute(query) if err != nil { log.Println("InsertCompany", result) return "", err } // 打印返回结果 //fmt.Println("Insert Company Result:", result) // 返回节点ID(通常可以通过返回的结果中的 "_vid" 字段获取) return company.Name, nil } // InsertInvestRel 插入投资关系边的方法 func InsertInvestRel(session *nebula.Session, investRel Invest) error { // 构建插入投资关系边的查询 query := fmt.Sprintf(` USE `+Table_Space+`; INSERT EDGE Invest(amount, ratio) VALUES "%s"->"%s":(%f, %f) `, investRel.FromCode, investRel.ToCode, investRel.Amount, investRel.Ratio) // 执行查询 result, err := session.Execute(query) if err != nil { log.Println("InsertInvestRel", result) return err } // 打印返回结果 //fmt.Println("Insert InvestRel Result:", result) return nil } func ParseStockCapital(raw string) float64 { raw = strings.TrimSpace(raw) // 默认单位:万元人民币 exchangeRateUSD := 7.0 // 匹配数值部分(可能带小数) re := regexp.MustCompile(`([\d.]+)`) match := re.FindStringSubmatch(raw) if len(match) < 2 { return 0 } value, _ := strconv.ParseFloat(match[1], 64) // 判断单位并转换 switch { case strings.Contains(raw, "万美元"): value *= exchangeRateUSD // 转换成人民币 case strings.Contains(raw, "元") || strings.Contains(raw, "人民币"): if strings.Contains(raw, "万元") || strings.Contains(raw, "万") { // 已经是万元单位,无需处理 } else { // 是“元”,需要除以1万 value = value / 10000 } default: // 可能是纯数字,默认视为“万元” } return value } /* 根据公司名称和层级向上挖掘,获取顶点和边; maxLevel 挖掘层级数量; direction 0:双向挖掘 -1:向上挖掘 1:向下挖掘 person true:保留自然人股东 false:不保留自然人股东 */ func GetInvByLevel(company_name string, maxLevel int, direction int, person bool) (map[string]InvestVertex, []InvestEdge) { verter := map[string]InvestVertex{} edges := []InvestEdge{} if direction == 0 { v1, e1 := getInvByLevel(company_name, maxLevel, 1, person) v2, e2 := getInvByLevel(company_name, maxLevel, -1, person) for k, v := range v1 { verter[k] = v } for k, v := range v2 { verter[k] = v } edges = append(edges, e1...) edges = append(edges, e2...) } else { verter, edges = getInvByLevel(company_name, maxLevel, direction, person) } return verter, edges } func getInvByLevel(company_name string, maxLevel int, direction int, person bool) (map[string]InvestVertex, []InvestEdge) { data, _ := Mgo181.FindOne("company_base", map[string]interface{}{ "company_name": company_name, }) company_id := fmt.Sprint((*data)["company_id"]) credit_no := fmt.Sprint((*data)["credit_no"]) var edges = []InvestEdge{} //记录边 var verter = map[string]InvestVertex{} //有效顶点 // 初始化队列和访问记录 type node struct { companyID, companyName, creditNo string level int } queue := []node{{companyID: company_id, companyName: company_name, creditNo: credit_no, level: 1}} visited := make(map[string]bool) for len(queue) > 0 { current := queue[0] if _, ok := verter[current.companyID]; !ok { verter[current.companyID] = InvestVertex{ id: current.companyID, company_id: current.companyID, company_name: current.companyName, credit_no: current.creditNo, } } queue = queue[1:] if visited[current.companyID] || current.level > maxLevel { // 防止重复处理和超过最大层级 continue } visited[current.companyID] = true query := map[string]interface{}{"company_id": current.companyID} if direction > 0 { query = map[string]interface{}{"stock_name_id": current.companyID} } partners, _ := Mgo181.Find("company_partner", query, nil, nil, false, -1, -1) // 处理股东数据 for _, p := range *partners { //log.Println(direction, p) if fmt.Sprint(p["is_history"]) == "1" { continue } // 构建投资关系 inv := InvestEdge{ company_id: fmt.Sprint(p["company_id"]), company_name: fmt.Sprint(p["company_name"]), stock_id: fmt.Sprint(p["stock_name_id"]), stock_name: fmt.Sprint(p["stock_name"]), stock_rate: convertStockCapitalToFloat(fmt.Sprint(p["stock_proportion"])), stock_amount: convertStockCapitalToFloat(fmt.Sprint(p["stock_capital"])), stock_level: current.level, stock_type: 0, // 默认机构股东 } edges = append(edges, inv) // 根据股东类型是否继续挖掘 if fmt.Sprint(p["stock_type"]) == "自然人股东" || convertStockCapitalToFloat(fmt.Sprint(p["is_personal"])) > 0 { inv.stock_type = 1 if _, ok := verter[inv.stock_id]; !ok && person { verter[inv.stock_id] = InvestVertex{ id: inv.stock_id, company_id: inv.stock_id, company_name: inv.stock_name, } } } else { where1 := map[string]interface{}{ "company_name": inv.company_name, } where2 := map[string]interface{}{ "company_name": inv.stock_name, } company, _ := Mgo181.FindOne("company_base", where1) stock, _ := Mgo181.FindOne("company_base", where2) // 机构股东加入队列继续穿透 if direction > 0 { //向下挖掘 if !visited[inv.company_id] { queue = append(queue, node{ companyID: inv.company_id, companyName: inv.company_name, creditNo: util.ObjToString((*company)["credit_no"]), level: current.level + 1, }) } } else { //向上挖掘 if !visited[inv.stock_id] { queue = append(queue, node{ companyID: inv.stock_id, companyName: inv.stock_name, creditNo: util.ObjToString((*stock)["credit_no"]), level: current.level + 1, }) } } } } //log.Printf("已处理层级%d,当前队列深度%d", current.level, len(queue)) } return verter, edges } func convertStockCapitalToFloat(str string) float64 { // 查找匹配的数字 match := re.FindString(str) if match == "" { return 0 } // 将匹配到的数字字符串转换为浮点数 result, err := strconv.ParseFloat(match, 64) if err != nil { return 0 } return result }