12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/gin-gonic/gin"
- "github.com/olivere/elastic/v7"
- nebula "github.com/vesoft-inc/nebula-go/v3"
- "io"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "net/http"
- "regexp"
- "strconv"
- "strings"
- )
- var (
- re = regexp.MustCompile(`[-+]?[0-9]*\.?[0-9]+`)
- HostList = []nebula.HostAddress{{Host: "114.116.213.97", Port: 9669}}
- //HostList = []nebula.HostAddress{{Host: "127.0.0.1", Port: 9669}}
- UserName = "root"
- PassWord = "jianyu@123"
- Mgo181 *mongodb.MongodbSim
- //Table_Space = "legal_profile"
- Table_Space = "legal_profile"
- WorkerCount = 5
- BatchSize = 100
- )
type (
	// Legal describes a legal-entity vertex (tag Legal) in the Nebula graph.
	Legal struct {
		Id    string
		Name  string // InsertCompany uses this as the vertex ID as well as the name property
		Code  string // credit / registration code
		Type  string // entity type, e.g. 企业 (enterprise), 事业单位 (public institution), 政府部门 (government department)
		State string // 有效/无效 (valid/invalid) — not registry business states such as 存续、在营、开业、在册
	}

	// Invest describes an Invest edge from an investor vertex to an investee vertex.
	Invest struct {
		FromCode string // investor vertex ID
		ToCode   string // investee vertex ID
		Amount   float64
		Ratio    float64
	}

	// InsertJob groups vertices and edges; presumably a unit of work for
	// batched inserts (it is not used in the visible part of this file).
	InsertJob struct {
		Companies []Legal
		Relations []Invest
	}
)
type (
	// InvestVertex is a vertex collected by GetInvByLevel: a company or, when
	// requested, a natural-person shareholder.
	InvestVertex struct {
		id           string
		company_id   string
		company_name string
		credit_no    string
	}

	// InvestEdge is one shareholding record (shareholder -> company) collected
	// by GetInvByLevel from company_partner.
	InvestEdge struct {
		company_id   string  // invested company
		company_name string  // invested company
		stock_id     string  // shareholder
		stock_name   string  // shareholder
		stock_rate   float64 // parsed from stock_proportion
		stock_amount float64 // parsed from stock_capital
		stock_level  int     // traversal level at which the record was found
		stock_type   int     // 0 = corporate shareholder, 1 = natural-person shareholder
	}
)
- // ConnectToNebula 封装数据库连接函数
- func ConnectToNebula(hosts []nebula.HostAddress, username, password string) (*nebula.Session, *nebula.ConnectionPool, error) {
- // 创建连接池配置
- config := nebula.GetDefaultConf()
- config.UseHTTP2 = false
- config.HandshakeKey = ""
- // 初始化连接池
- pool, err := nebula.NewConnectionPool(hosts, config, nebula.DefaultLogger{})
- if err != nil {
- return nil, nil, err
- }
- // 获取会话
- session, err := pool.GetSession(username, password)
- if err != nil {
- pool.Close()
- return nil, nil, err
- }
- return session, pool, nil
- }
type (
	// CheckRequest is the JSON body accepted by POST /check-relations.
	CheckRequest struct {
		Names []string `json:"names"`
		Deep  int      `json:"deep"`
		Stype int      `json:"stype"` // 0: quick mode, return as soon as a match is found; 1: evaluate all data
	}

	// CheckResponse is the JSON envelope returned by POST /check-relations.
	CheckResponse struct {
		Code int      `json:"code"`
		Data []string `json:"data"`
		Msg  string   `json:"msg"`
	}
)
- func main() {
- //InitMgo()
- //dda()
- //dealCompanyBase22()
- //dealCompanyBase() //迭代company_base 处理企业数据
- //batchDealGraph() // 迭代es 处理企业数据;
- //
- //log.Println("数据处理完毕!!!!!!!")
- //return
- //2、封装对外提供的HTTP
- //session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
- //if err != nil {
- // log.Fatalf("Failed to connect to Nebula Graph: %v", err)
- //}
- //defer pool.Close()
- //defer session.Release()
- //// 初始化 Gin 路由
- //r := gin.Default()
- //// 注册 POST 接口
- //r.POST("/check-relations", func(c *gin.Context) {
- // var req CheckRequest
- // if err := c.ShouldBindJSON(&req); err != nil {
- // c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数无效"})
- // return
- // }
- //
- // has, results, err := CheckLegalRelationships4(session, req.Names, req.Deep, req.Stype)
- // if err != nil {
- // res := CheckResponse{
- // Code: -1,
- // Data: results,
- // Msg: "请求失败",
- // }
- // c.JSON(http.StatusInternalServerError, res)
- // return
- // }
- //
- // res := CheckResponse{
- // Code: 200,
- // Data: results,
- // }
- // if has {
- // res.Msg = "存在投资关系"
- // } else {
- // res.Msg = "不存在投资关系"
- // }
- //
- // c.JSON(http.StatusOK, res)
- //})
- //3、改造方法,使用连接池,避免session过去//
- // 初始化 Gin 路由
- r := gin.Default()
- // 加载模板文件(你可以自定义路径)
- r.LoadHTMLGlob("templates/*")
- client, err := NewNebulaClient(HostList, UserName, PassWord)
- if err != nil {
- log.Fatal("连接失败:", err)
- }
- defer client.Close()
- // 注册 POST 接口
- // 提供 HTML 页面
- r.GET("/legal/graph", func(c *gin.Context) {
- c.HTML(http.StatusOK, "graph.html", nil)
- })
- r.POST("/check-relations", func(c *gin.Context) {
- var req CheckRequest
- if err := c.ShouldBindJSON(&req); err != nil {
- c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数无效"})
- return
- }
- has, results, err := client.CheckLegalRelationships(req.Names, req.Deep, req.Stype)
- if err != nil {
- res := CheckResponse{
- Code: -1,
- Data: results,
- Msg: "请求失败;" + err.Error(),
- }
- c.JSON(http.StatusInternalServerError, res)
- return
- }
- res := CheckResponse{
- Code: 200,
- Data: results,
- }
- if has {
- res.Msg = "存在投资关系"
- } else {
- res.Msg = "不存在投资关系"
- }
- c.JSON(http.StatusOK, res)
- })
- // 启动服务
- r.Run(":8080")
- }
- // dda 针对某个企业,单独生节点数据
- func dda() {
- name := "北京拓普丰联信息科技股份有限公司"
- rea, resb := GetInvByLevel(name, 5, 0, false)
- // 调用封装的连接函数
- session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
- if err != nil {
- log.Fatalf("Failed to connect to Nebula Graph: %v", err)
- }
- defer pool.Close()
- defer session.Release()
- for _, v := range rea {
- d := Legal{
- Id: v.company_id,
- Name: v.company_name,
- Code: v.credit_no,
- Type: "企业",
- }
- res, err := InsertCompany(session, d)
- if err != nil {
- log.Println(err, res)
- }
- }
- for _, v := range resb {
- d := Invest{
- FromCode: v.stock_name,
- ToCode: v.company_name,
- Amount: v.stock_amount,
- Ratio: v.stock_rate,
- }
- err := InsertInvestRel(session, d)
- if err != nil {
- log.Println(err, d)
- }
- }
- }
- // getQyxytData 获取企业数据
- func getQyxytData() {
- // 调用封装的连接函数
- session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
- if err != nil {
- log.Fatalf("Failed to connect to Nebula Graph: %v", err)
- }
- defer pool.Close()
- defer session.Release()
- url := "http://172.17.4.184:19908"
- //url := "http://127.0.0.1:19908"
- username := "jybid"
- password := "Top2023_JEB01i@31"
- index := "qyxy" //索引名称
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- //---------------------------//
- //query := elastic.NewBoolQuery()
- //query.Must(elastic.NewMatchQuery("business_scope", "招投标代理"))
- //query.Must(elastic.NewTermQuery("company_city", "北京市"))
- //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
- query := elastic.NewBoolQuery().
- //北京,天津,河北,上海,江苏,浙江,安徽
- //Must(elastic.NewTermQuery("area", "北京市")).
- //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
- MustNot(
- elastic.NewTermQuery("company_type", "个体工商户"),
- ).
- //Must(elastic.NewTermQuery("company_name", "河南拓普计算机网络工程有限公司"))
- Must(elastic.NewTermsQuery("company_area", "河南"))
- ctx := context.Background()
- //开始滚动搜索
- scrollID := ""
- scroll := "10m"
- searchSource := elastic.NewSearchSource().
- Query(query).
- Size(10000).
- Sort("_doc", true) //升序排序
- //Sort("_doc", false) //降序排序
- searchService := client.Scroll(index).
- Size(10000).
- Scroll(scroll).
- SearchSource(searchSource)
- res, err := searchService.Do(ctx)
- if err != nil {
- if err == io.EOF {
- fmt.Println("没有数据")
- } else {
- panic(err)
- }
- }
- //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
- fmt.Println("总数是:", res.TotalHits())
- total := 0
- for len(res.Hits.Hits) > 0 {
- for _, hit := range res.Hits.Hits {
- var doc map[string]interface{}
- err := json.Unmarshal(hit.Source, &doc)
- if err != nil {
- log.Printf("解析文档失败:%s", err)
- continue
- }
- company1 := Legal{
- Name: util.ObjToString(doc["company_name"]),
- Code: util.ObjToString(doc["credit_no"]),
- Type: "企业",
- }
- /**
- 1.stock_name_id 为空,直接跳过
- 2.stock_name 为空,直接跳过
- 3.stock_name 含有 已除名/不适宜/待清理/拟吊销 ,直接跳过
- 4.stock_name 不含中文,跳过
- */
- if util.ObjToString(doc["company_name"]) == "" || util.ObjToString(doc["credit_no"]) == "" {
- continue
- }
- if strings.Contains(util.ObjToString(doc["company_name"]), "已除名") {
- continue
- }
- res1, err1 := InsertCompany(session, company1)
- if err != nil {
- log.Println("InsertCompany err", res1, err1)
- }
- //边
- if partners, ok := doc["partners"].([]interface{}); ok {
- for _, partner := range partners {
- if da, ok := partner.(map[string]interface{}); ok {
- if util.ObjToString(da["stock_type"]) == "企业法人" {
- if util.ObjToString(da["stock_name"]) == "" || util.ObjToString(da["identify_no"]) == "" {
- continue
- } else {
- company2 := Legal{
- Name: util.ObjToString(da["stock_name"]),
- Code: util.ObjToString(da["identify_no"]),
- Type: "企业",
- }
- res2, err2 := InsertCompany(session, company2)
- if err2 != nil {
- log.Println("InsertCompany err", res2, err2)
- }
- //
- if err1 != nil || err2 != nil {
- continue
- }
- where := map[string]interface{}{
- "company_name": util.ObjToString(doc["company_name"]),
- "stock_name": util.ObjToString(da["stock_name"]),
- }
- ddd, _ := Mgo181.FindOne("company_partner", where)
- if len(*ddd) > 0 {
- par := *ddd
- amount := ParseStockCapital(util.ObjToString(par["stock_capital"]))
- investRel := Invest{FromCode: util.ObjToString(da["stock_name"]), ToCode: util.ObjToString(doc["company_name"]), Ratio: util.Float64All(par["stock_proportion"]), Amount: amount}
- err = InsertInvestRel(session, investRel)
- if err != nil {
- log.Println("InsertInvestRel", err, investRel)
- }
- }
- }
- }
- }
- }
- }
- }
- total = total + len(res.Hits.Hits)
- scrollID = res.ScrollId
- res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
- log.Println("current count:", total)
- if err != nil {
- if err == io.EOF {
- // 滚动到最后一批数据,退出循环
- break
- }
- log.Println("滚动搜索失败:", err, res)
- break // 处理错误时退出循环
- }
- }
- // 在循环外调用 ClearScroll
- _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
- if err != nil {
- log.Printf("清理滚动搜索失败:%s", err)
- }
- log.Println("结束~~~~~~~~~~~~~~~")
- }
- // dealCompanyPartner 处理企业投资关系
- func dealCompanyPartner() {
- // 调用封装的连接函数
- session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
- if err != nil {
- log.Fatalf("Failed to connect to Nebula Graph: %v", err)
- }
- defer pool.Close()
- defer session.Release()
- //log.Println("session", session)
- defer util.Catch()
- sess := Mgo181.GetMgoConn()
- defer Mgo181.DestoryMongoConn(sess)
- it := sess.DB("mixdata").C("company_partner").Find(nil).Select(nil).Iter()
- count := 0
- //realNum := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%10000 == 0 {
- log.Println("current:", count, tmp["stock_name"], tmp["company_name"])
- }
- //个人企业跳过
- if util.IntAll(tmp["is_personal"]) == 1 {
- continue
- }
- if util.IntAll(tmp["use_flag"]) > 0 {
- continue
- }
- company1 := Legal{
- Name: util.ObjToString(tmp["stock_name"]),
- Code: util.ObjToString(tmp["stock_name_id"]),
- }
- /**
- 1.stock_name_id 为空,直接跳过
- 2.stock_name 为空,直接跳过
- 3.stock_name 含有 已除名/不适宜/待清理/拟吊销 ,直接跳过
- 4.stock_name 不含中文,跳过
- */
- company2 := Legal{
- Name: util.ObjToString(tmp["company_name"]),
- Code: util.ObjToString(tmp["company_id"]),
- }
- res1, err1 := InsertCompany(session, company1)
- if err != nil {
- log.Println("InsertCompany err", res1, err1)
- }
- res2, err2 := InsertCompany(session, company2)
- if err != nil {
- log.Println("InsertCompany err", res2, err2)
- }
- if err1 != nil || err2 != nil {
- continue
- }
- //边
- amount := ParseStockCapital(util.ObjToString(tmp["stock_capital"]))
- investRel := Invest{FromCode: res1, ToCode: res2, Ratio: util.Float64All(tmp["stock_proportion"]), Amount: amount}
- err = InsertInvestRel(session, investRel)
- if err != nil {
- log.Println("InsertInvestRel", err, investRel)
- }
- }
- }
- // InsertCompany 插入公司节点的方法
- func InsertCompany(session *nebula.Session, company Legal) (string, error) {
- // 构建插入公司节点的查询
- //insertCompanyStmt := `
- // USE ` + Table_Space + `;
- // INSERT VERTEX company(company_id,name) VALUES "%s":("%s", "%s");
- //`
- //insertCompanyStmt = fmt.Sprintf(insertCompanyStmt, inv.id, inv.company_id, inv.company_name)
- query := fmt.Sprintf(`
- USE `+Table_Space+`;
- INSERT VERTEX Legal(name, code, type, state ) VALUES "%s":("%s", "%s", "%s", "%s")
- `, company.Name, company.Name, company.Code, company.Type, company.State)
- // 执行查询
- result, err := session.Execute(query)
- if err != nil {
- log.Println("InsertCompany", result)
- return "", err
- }
- // 打印返回结果
- //fmt.Println("Insert Company Result:", result)
- // 返回节点ID(通常可以通过返回的结果中的 "_vid" 字段获取)
- return company.Name, nil
- }
- // InsertInvestRel 插入投资关系边的方法
- func InsertInvestRel(session *nebula.Session, investRel Invest) error {
- // 构建插入投资关系边的查询
- query := fmt.Sprintf(`
- USE `+Table_Space+`;
- INSERT EDGE Invest(amount, ratio) VALUES "%s"->"%s":(%f, %f)
- `, investRel.FromCode, investRel.ToCode, investRel.Amount, investRel.Ratio)
- // 执行查询
- result, err := session.Execute(query)
- if err != nil {
- log.Println("InsertInvestRel", result)
- return err
- }
- // 打印返回结果
- //fmt.Println("Insert InvestRel Result:", result)
- return nil
- }
var (
	// stockCapitalNumRe matches the first decimal number in a capital string
	// ("100", "12.5", ".5"); compiled once rather than on every call.
	stockCapitalNumRe = regexp.MustCompile(`\d*\.?\d+`)
	// stockCapitalSeparators strips ASCII and full-width thousands separators.
	stockCapitalSeparators = strings.NewReplacer(",", "", "，", "")
)

// ParseStockCapital converts a free-text capital string into an amount in
// 万元 (units of 10,000 RMB).
//
// Rules, in order:
//   - thousands separators ("," and "，") are ignored;
//   - amounts mentioning "美元" are converted to RMB at a fixed rate of 7.0,
//     then divided by 10,000 unless the amount is already in 万 ("万美元");
//   - amounts mentioning "元" or "人民币" without "万" are treated as yuan and
//     divided by 10,000;
//   - anything else, including bare numbers, is assumed to be in 万元 already.
//
// It returns 0 when no number can be found.
func ParseStockCapital(raw string) float64 {
	const usdToCNY = 7.0 // fixed approximation, not a live exchange rate

	raw = strings.TrimSpace(raw)
	match := stockCapitalNumRe.FindString(stockCapitalSeparators.Replace(raw))
	if match == "" {
		return 0
	}
	value, err := strconv.ParseFloat(match, 64)
	if err != nil {
		return 0
	}

	inWan := strings.Contains(raw, "万")
	switch {
	case strings.Contains(raw, "美元"):
		value *= usdToCNY
		if !inWan {
			value /= 10000
		}
	case strings.Contains(raw, "元") || strings.Contains(raw, "人民币"):
		if !inWan {
			value /= 10000
		}
	}
	return value
}
- /*
- 根据公司名称和层级向上挖掘,获取顶点和边;
- maxLevel 挖掘层级数量;
- direction 0:双向挖掘 -1:向上挖掘 1:向下挖掘
- person true:保留自然人股东 false:不保留自然人股东
- */
- func GetInvByLevel(company_name string, maxLevel int, direction int, person bool) (map[string]InvestVertex, []InvestEdge) {
- verter := map[string]InvestVertex{}
- edges := []InvestEdge{}
- if direction == 0 {
- v1, e1 := getInvByLevel(company_name, maxLevel, 1, person)
- v2, e2 := getInvByLevel(company_name, maxLevel, -1, person)
- for k, v := range v1 {
- verter[k] = v
- }
- for k, v := range v2 {
- verter[k] = v
- }
- edges = append(edges, e1...)
- edges = append(edges, e2...)
- } else {
- verter, edges = getInvByLevel(company_name, maxLevel, direction, person)
- }
- return verter, edges
- }
- func getInvByLevel(company_name string, maxLevel int, direction int, person bool) (map[string]InvestVertex, []InvestEdge) {
- data, _ := Mgo181.FindOne("company_base", map[string]interface{}{
- "company_name": company_name,
- })
- company_id := fmt.Sprint((*data)["company_id"])
- credit_no := fmt.Sprint((*data)["credit_no"])
- var edges = []InvestEdge{} //记录边
- var verter = map[string]InvestVertex{} //有效顶点
- // 初始化队列和访问记录
- type node struct {
- companyID, companyName, creditNo string
- level int
- }
- queue := []node{{companyID: company_id, companyName: company_name, creditNo: credit_no, level: 1}}
- visited := make(map[string]bool)
- for len(queue) > 0 {
- current := queue[0]
- if _, ok := verter[current.companyID]; !ok {
- verter[current.companyID] = InvestVertex{
- id: current.companyID,
- company_id: current.companyID,
- company_name: current.companyName,
- credit_no: current.creditNo,
- }
- }
- queue = queue[1:]
- if visited[current.companyID] || current.level > maxLevel { // 防止重复处理和超过最大层级
- continue
- }
- visited[current.companyID] = true
- query := map[string]interface{}{"company_id": current.companyID}
- if direction > 0 {
- query = map[string]interface{}{"stock_name_id": current.companyID}
- }
- partners, _ := Mgo181.Find("company_partner", query, nil, nil, false, -1, -1)
- // 处理股东数据
- for _, p := range *partners {
- //log.Println(direction, p)
- if fmt.Sprint(p["is_history"]) == "1" {
- continue
- }
- // 构建投资关系
- inv := InvestEdge{
- company_id: fmt.Sprint(p["company_id"]),
- company_name: fmt.Sprint(p["company_name"]),
- stock_id: fmt.Sprint(p["stock_name_id"]),
- stock_name: fmt.Sprint(p["stock_name"]),
- stock_rate: convertStockCapitalToFloat(fmt.Sprint(p["stock_proportion"])),
- stock_amount: convertStockCapitalToFloat(fmt.Sprint(p["stock_capital"])),
- stock_level: current.level,
- stock_type: 0, // 默认机构股东
- }
- edges = append(edges, inv)
- // 根据股东类型是否继续挖掘
- if fmt.Sprint(p["stock_type"]) == "自然人股东" || convertStockCapitalToFloat(fmt.Sprint(p["is_personal"])) > 0 {
- inv.stock_type = 1
- if _, ok := verter[inv.stock_id]; !ok && person {
- verter[inv.stock_id] = InvestVertex{
- id: inv.stock_id,
- company_id: inv.stock_id,
- company_name: inv.stock_name,
- }
- }
- } else {
- where1 := map[string]interface{}{
- "company_name": inv.company_name,
- }
- where2 := map[string]interface{}{
- "company_name": inv.stock_name,
- }
- company, _ := Mgo181.FindOne("company_base", where1)
- stock, _ := Mgo181.FindOne("company_base", where2)
- // 机构股东加入队列继续穿透
- if direction > 0 { //向下挖掘
- if !visited[inv.company_id] {
- queue = append(queue, node{
- companyID: inv.company_id,
- companyName: inv.company_name,
- creditNo: util.ObjToString((*company)["credit_no"]),
- level: current.level + 1,
- })
- }
- } else { //向上挖掘
- if !visited[inv.stock_id] {
- queue = append(queue, node{
- companyID: inv.stock_id,
- companyName: inv.stock_name,
- creditNo: util.ObjToString((*stock)["credit_no"]),
- level: current.level + 1,
- })
- }
- }
- }
- }
- //log.Printf("已处理层级%d,当前队列深度%d", current.level, len(queue))
- }
- return verter, edges
- }
- func convertStockCapitalToFloat(str string) float64 {
- // 查找匹配的数字
- match := re.FindString(str)
- if match == "" {
- return 0
- }
- // 将匹配到的数字字符串转换为浮点数
- result, err := strconv.ParseFloat(match, 64)
- if err != nil {
- return 0
- }
- return result
- }
|