main.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gin-gonic/gin"
  7. "github.com/olivere/elastic/v7"
  8. nebula "github.com/vesoft-inc/nebula-go/v3"
  9. "io"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "log"
  13. "net/http"
  14. "regexp"
  15. "strconv"
  16. "strings"
  17. )
  18. var (
  19. re = regexp.MustCompile(`[-+]?[0-9]*\.?[0-9]+`)
  20. HostList = []nebula.HostAddress{{Host: "114.116.213.97", Port: 9669}}
  21. //HostList = []nebula.HostAddress{{Host: "127.0.0.1", Port: 9669}}
  22. UserName = "root"
  23. PassWord = "jianyu@123"
  24. Mgo181 *mongodb.MongodbSim
  25. Table_Space = "legal_profile"
  26. WorkerCount = 5
  27. BatchSize = 100
  28. )
  29. // Legal 代表公司节点的结构体
  30. type Legal struct {
  31. Id string
  32. Name string
  33. Code string
  34. Type string //类型,企业,事业单哪位,政府部门
  35. State string //状态,有效/无效;不是存续、在营、开业、在册
  36. }
  37. // Invest 代表公司之间的投资关系边的结构体
  38. type Invest struct {
  39. FromCode string
  40. ToCode string
  41. Amount float64
  42. Ratio float64
  43. }
  44. type InsertJob struct {
  45. Companies []Legal
  46. Relations []Invest
  47. }
  48. type InvestVertex struct { //顶点
  49. id string
  50. company_id string
  51. company_name string
  52. credit_no string
  53. }
  54. type InvestEdge struct { //边
  55. company_id string
  56. company_name string
  57. stock_id string
  58. stock_name string
  59. stock_rate float64
  60. stock_amount float64
  61. stock_level int
  62. stock_type int //0企业股东 1自然人股东
  63. }
  64. // ConnectToNebula 封装数据库连接函数
  65. func ConnectToNebula(hosts []nebula.HostAddress, username, password string) (*nebula.Session, *nebula.ConnectionPool, error) {
  66. // 创建连接池配置
  67. config := nebula.GetDefaultConf()
  68. config.UseHTTP2 = false
  69. config.HandshakeKey = ""
  70. // 初始化连接池
  71. pool, err := nebula.NewConnectionPool(hosts, config, nebula.DefaultLogger{})
  72. if err != nil {
  73. return nil, nil, err
  74. }
  75. // 获取会话
  76. session, err := pool.GetSession(username, password)
  77. if err != nil {
  78. pool.Close()
  79. return nil, nil, err
  80. }
  81. return session, pool, nil
  82. }
  83. type CheckRequest struct {
  84. Names []string `json:"names"`
  85. Deep int `json:"deep"`
  86. }
  87. func main() {
  88. //InitMgo()
  89. //dda()
  90. //dealCompanyBase22()
  91. //dealCompanyBase() //迭代company_base 处理企业数据
  92. //batchDealGraph() // 迭代es 处理企业数据;
  93. //
  94. log.Println("数据处理完毕!!!!!!!")
  95. //封装对外提供的HTTP
  96. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  97. if err != nil {
  98. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  99. }
  100. defer pool.Close()
  101. defer session.Release()
  102. // 初始化 Gin 路由
  103. r := gin.Default()
  104. // 注册 POST 接口
  105. r.POST("/check-relations", func(c *gin.Context) {
  106. var req CheckRequest
  107. if err := c.ShouldBindJSON(&req); err != nil {
  108. c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数无效"})
  109. return
  110. }
  111. results, err := CheckLegalRelations(session, req.Names, req.Deep)
  112. if err != nil {
  113. c.JSON(http.StatusInternalServerError, gin.H{"error": "查询失败", "details": err.Error()})
  114. return
  115. }
  116. c.JSON(http.StatusOK, results)
  117. })
  118. // 启动服务
  119. r.Run(":8080")
  120. }
  121. func dda() {
  122. name := "北京拓普丰联信息科技股份有限公司"
  123. rea, resb := GetInvByLevel(name, 5, 0, false)
  124. // 调用封装的连接函数
  125. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  126. if err != nil {
  127. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  128. }
  129. defer pool.Close()
  130. defer session.Release()
  131. for _, v := range rea {
  132. d := Legal{
  133. Id: v.company_id,
  134. Name: v.company_name,
  135. Code: v.credit_no,
  136. Type: "企业",
  137. }
  138. res, err := InsertCompany(session, d)
  139. if err != nil {
  140. log.Println(err, res)
  141. }
  142. }
  143. for _, v := range resb {
  144. d := Invest{
  145. FromCode: v.stock_id,
  146. ToCode: v.company_id,
  147. Amount: v.stock_amount,
  148. Ratio: v.stock_rate,
  149. }
  150. err := InsertInvestRel(session, d)
  151. if err != nil {
  152. log.Println(err, d)
  153. }
  154. }
  155. }
  156. // getQyxytData 获取企业数据
  157. func getQyxytData() {
  158. // 调用封装的连接函数
  159. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  160. if err != nil {
  161. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  162. }
  163. defer pool.Close()
  164. defer session.Release()
  165. url := "http://172.17.4.184:19908"
  166. //url := "http://127.0.0.1:19908"
  167. username := "jybid"
  168. password := "Top2023_JEB01i@31"
  169. index := "qyxy" //索引名称
  170. // 创建 Elasticsearch 客户端
  171. client, err := elastic.NewClient(
  172. elastic.SetURL(url),
  173. elastic.SetBasicAuth(username, password),
  174. elastic.SetSniff(false),
  175. )
  176. if err != nil {
  177. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  178. }
  179. //---------------------------//
  180. //query := elastic.NewBoolQuery()
  181. //query.Must(elastic.NewMatchQuery("business_scope", "招投标代理"))
  182. //query.Must(elastic.NewTermQuery("company_city", "北京市"))
  183. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  184. query := elastic.NewBoolQuery().
  185. //北京,天津,河北,上海,江苏,浙江,安徽
  186. //Must(elastic.NewTermQuery("area", "北京市")).
  187. //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  188. MustNot(
  189. elastic.NewTermQuery("company_type", "个体工商户"),
  190. ).
  191. //Must(elastic.NewTermQuery("company_name", "河南拓普计算机网络工程有限公司"))
  192. Must(elastic.NewTermsQuery("company_area", "河南"))
  193. ctx := context.Background()
  194. //开始滚动搜索
  195. scrollID := ""
  196. scroll := "10m"
  197. searchSource := elastic.NewSearchSource().
  198. Query(query).
  199. Size(10000).
  200. Sort("_doc", true) //升序排序
  201. //Sort("_doc", false) //降序排序
  202. searchService := client.Scroll(index).
  203. Size(10000).
  204. Scroll(scroll).
  205. SearchSource(searchSource)
  206. res, err := searchService.Do(ctx)
  207. if err != nil {
  208. if err == io.EOF {
  209. fmt.Println("没有数据")
  210. } else {
  211. panic(err)
  212. }
  213. }
  214. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  215. fmt.Println("总数是:", res.TotalHits())
  216. total := 0
  217. for len(res.Hits.Hits) > 0 {
  218. for _, hit := range res.Hits.Hits {
  219. var doc map[string]interface{}
  220. err := json.Unmarshal(hit.Source, &doc)
  221. if err != nil {
  222. log.Printf("解析文档失败:%s", err)
  223. continue
  224. }
  225. company1 := Legal{
  226. Name: util.ObjToString(doc["company_name"]),
  227. Code: util.ObjToString(doc["credit_no"]),
  228. Type: "企业",
  229. }
  230. /**
  231. 1.stock_name_id 为空,直接跳过
  232. 2.stock_name 为空,直接跳过
  233. 3.stock_name 含有 已除名/不适宜/待清理/拟吊销 ,直接跳过
  234. 4.stock_name 不含中文,跳过
  235. */
  236. if util.ObjToString(doc["company_name"]) == "" || util.ObjToString(doc["credit_no"]) == "" {
  237. continue
  238. }
  239. if strings.Contains(util.ObjToString(doc["company_name"]), "已除名") {
  240. continue
  241. }
  242. res1, err1 := InsertCompany(session, company1)
  243. if err != nil {
  244. log.Println("InsertCompany err", res1, err1)
  245. }
  246. //边
  247. if partners, ok := doc["partners"].([]interface{}); ok {
  248. for _, partner := range partners {
  249. if da, ok := partner.(map[string]interface{}); ok {
  250. if util.ObjToString(da["stock_type"]) == "企业法人" {
  251. if util.ObjToString(da["stock_name"]) == "" || util.ObjToString(da["identify_no"]) == "" {
  252. continue
  253. } else {
  254. company2 := Legal{
  255. Name: util.ObjToString(da["stock_name"]),
  256. Code: util.ObjToString(da["identify_no"]),
  257. Type: "企业",
  258. }
  259. res2, err2 := InsertCompany(session, company2)
  260. if err2 != nil {
  261. log.Println("InsertCompany err", res2, err2)
  262. }
  263. //
  264. if err1 != nil || err2 != nil {
  265. continue
  266. }
  267. where := map[string]interface{}{
  268. "company_name": util.ObjToString(doc["company_name"]),
  269. "stock_name": util.ObjToString(da["stock_name"]),
  270. }
  271. ddd, _ := Mgo181.FindOne("company_partner", where)
  272. if len(*ddd) > 0 {
  273. par := *ddd
  274. amount := ParseStockCapital(util.ObjToString(par["stock_capital"]))
  275. investRel := Invest{FromCode: util.ObjToString(da["stock_name"]), ToCode: util.ObjToString(doc["company_name"]), Ratio: util.Float64All(par["stock_proportion"]), Amount: amount}
  276. err = InsertInvestRel(session, investRel)
  277. if err != nil {
  278. log.Println("InsertInvestRel", err, investRel)
  279. }
  280. }
  281. }
  282. }
  283. }
  284. }
  285. }
  286. }
  287. total = total + len(res.Hits.Hits)
  288. scrollID = res.ScrollId
  289. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  290. log.Println("current count:", total)
  291. if err != nil {
  292. if err == io.EOF {
  293. // 滚动到最后一批数据,退出循环
  294. break
  295. }
  296. log.Println("滚动搜索失败:", err, res)
  297. break // 处理错误时退出循环
  298. }
  299. }
  300. // 在循环外调用 ClearScroll
  301. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  302. if err != nil {
  303. log.Printf("清理滚动搜索失败:%s", err)
  304. }
  305. log.Println("结束~~~~~~~~~~~~~~~")
  306. }
  307. // dealCompanyPartner 处理企业投资关系
  308. func dealCompanyPartner() {
  309. // 调用封装的连接函数
  310. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  311. if err != nil {
  312. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  313. }
  314. defer pool.Close()
  315. defer session.Release()
  316. //log.Println("session", session)
  317. defer util.Catch()
  318. sess := Mgo181.GetMgoConn()
  319. defer Mgo181.DestoryMongoConn(sess)
  320. it := sess.DB("mixdata").C("company_partner").Find(nil).Select(nil).Iter()
  321. count := 0
  322. //realNum := 0
  323. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  324. if count%10000 == 0 {
  325. log.Println("current:", count, tmp["stock_name"], tmp["company_name"])
  326. }
  327. //个人企业跳过
  328. if util.IntAll(tmp["is_personal"]) == 1 {
  329. continue
  330. }
  331. if util.IntAll(tmp["use_flag"]) > 0 {
  332. continue
  333. }
  334. company1 := Legal{
  335. Name: util.ObjToString(tmp["stock_name"]),
  336. Code: util.ObjToString(tmp["stock_name_id"]),
  337. }
  338. /**
  339. 1.stock_name_id 为空,直接跳过
  340. 2.stock_name 为空,直接跳过
  341. 3.stock_name 含有 已除名/不适宜/待清理/拟吊销 ,直接跳过
  342. 4.stock_name 不含中文,跳过
  343. */
  344. company2 := Legal{
  345. Name: util.ObjToString(tmp["company_name"]),
  346. Code: util.ObjToString(tmp["company_id"]),
  347. }
  348. res1, err1 := InsertCompany(session, company1)
  349. if err != nil {
  350. log.Println("InsertCompany err", res1, err1)
  351. }
  352. res2, err2 := InsertCompany(session, company2)
  353. if err != nil {
  354. log.Println("InsertCompany err", res2, err2)
  355. }
  356. if err1 != nil || err2 != nil {
  357. continue
  358. }
  359. //边
  360. amount := ParseStockCapital(util.ObjToString(tmp["stock_capital"]))
  361. investRel := Invest{FromCode: res1, ToCode: res2, Ratio: util.Float64All(tmp["stock_proportion"]), Amount: amount}
  362. err = InsertInvestRel(session, investRel)
  363. if err != nil {
  364. log.Println("InsertInvestRel", err, investRel)
  365. }
  366. }
  367. }
  368. // InsertCompany 插入公司节点的方法
  369. func InsertCompany(session *nebula.Session, company Legal) (string, error) {
  370. // 构建插入公司节点的查询
  371. //insertCompanyStmt := `
  372. // USE ` + Table_Space + `;
  373. // INSERT VERTEX company(company_id,name) VALUES "%s":("%s", "%s");
  374. //`
  375. //insertCompanyStmt = fmt.Sprintf(insertCompanyStmt, inv.id, inv.company_id, inv.company_name)
  376. query := fmt.Sprintf(`
  377. USE `+Table_Space+`;
  378. INSERT VERTEX Legal(name, code, type, state ) VALUES "%s":("%s", "%s", "%s", "%s")
  379. `, company.Id, company.Name, company.Code, company.Type, company.State)
  380. // 执行查询
  381. result, err := session.Execute(query)
  382. if err != nil {
  383. log.Println("InsertCompany", result)
  384. return "", err
  385. }
  386. // 打印返回结果
  387. //fmt.Println("Insert Company Result:", result)
  388. // 返回节点ID(通常可以通过返回的结果中的 "_vid" 字段获取)
  389. return company.Name, nil
  390. }
  391. // InsertInvestRel 插入投资关系边的方法
  392. func InsertInvestRel(session *nebula.Session, investRel Invest) error {
  393. // 构建插入投资关系边的查询
  394. query := fmt.Sprintf(`
  395. USE `+Table_Space+`;
  396. INSERT EDGE Invest(amount, ratio) VALUES "%s"->"%s":(%f, %f)
  397. `, investRel.FromCode, investRel.ToCode, investRel.Amount, investRel.Ratio)
  398. // 执行查询
  399. result, err := session.Execute(query)
  400. if err != nil {
  401. log.Println("InsertInvestRel", result)
  402. return err
  403. }
  404. // 打印返回结果
  405. //fmt.Println("Insert InvestRel Result:", result)
  406. return nil
  407. }
  408. func ParseStockCapital(raw string) float64 {
  409. raw = strings.TrimSpace(raw)
  410. // 默认单位:万元人民币
  411. exchangeRateUSD := 7.0
  412. // 匹配数值部分(可能带小数)
  413. re := regexp.MustCompile(`([\d.]+)`)
  414. match := re.FindStringSubmatch(raw)
  415. if len(match) < 2 {
  416. return 0
  417. }
  418. value, _ := strconv.ParseFloat(match[1], 64)
  419. // 判断单位并转换
  420. switch {
  421. case strings.Contains(raw, "万美元"):
  422. value *= exchangeRateUSD // 转换成人民币
  423. case strings.Contains(raw, "元") || strings.Contains(raw, "人民币"):
  424. if strings.Contains(raw, "万元") || strings.Contains(raw, "万") {
  425. // 已经是万元单位,无需处理
  426. } else {
  427. // 是“元”,需要除以1万
  428. value = value / 10000
  429. }
  430. default:
  431. // 可能是纯数字,默认视为“万元”
  432. }
  433. return value
  434. }
  435. /*
  436. 根据公司名称和层级向上挖掘,获取顶点和边;
  437. maxLevel 挖掘层级数量;
  438. direction 0:双向挖掘 -1:向上挖掘 1:向下挖掘
  439. person true:保留自然人股东 false:不保留自然人股东
  440. */
  441. func GetInvByLevel(company_name string, maxLevel int, direction int, person bool) (map[string]InvestVertex, []InvestEdge) {
  442. verter := map[string]InvestVertex{}
  443. edges := []InvestEdge{}
  444. if direction == 0 {
  445. v1, e1 := getInvByLevel(company_name, maxLevel, 1, person)
  446. v2, e2 := getInvByLevel(company_name, maxLevel, -1, person)
  447. for k, v := range v1 {
  448. verter[k] = v
  449. }
  450. for k, v := range v2 {
  451. verter[k] = v
  452. }
  453. edges = append(edges, e1...)
  454. edges = append(edges, e2...)
  455. } else {
  456. verter, edges = getInvByLevel(company_name, maxLevel, direction, person)
  457. }
  458. return verter, edges
  459. }
  460. func getInvByLevel(company_name string, maxLevel int, direction int, person bool) (map[string]InvestVertex, []InvestEdge) {
  461. data, _ := Mgo181.FindOne("company_base", map[string]interface{}{
  462. "company_name": company_name,
  463. })
  464. company_id := fmt.Sprint((*data)["company_id"])
  465. credit_no := fmt.Sprint((*data)["credit_no"])
  466. var edges = []InvestEdge{} //记录边
  467. var verter = map[string]InvestVertex{} //有效顶点
  468. // 初始化队列和访问记录
  469. type node struct {
  470. companyID, companyName, creditNo string
  471. level int
  472. }
  473. queue := []node{{companyID: company_id, companyName: company_name, creditNo: credit_no, level: 1}}
  474. visited := make(map[string]bool)
  475. for len(queue) > 0 {
  476. current := queue[0]
  477. if _, ok := verter[current.companyID]; !ok {
  478. verter[current.companyID] = InvestVertex{
  479. id: current.companyID,
  480. company_id: current.companyID,
  481. company_name: current.companyName,
  482. credit_no: current.creditNo,
  483. }
  484. }
  485. queue = queue[1:]
  486. if visited[current.companyID] || current.level > maxLevel { // 防止重复处理和超过最大层级
  487. continue
  488. }
  489. visited[current.companyID] = true
  490. query := map[string]interface{}{"company_id": current.companyID}
  491. if direction > 0 {
  492. query = map[string]interface{}{"stock_name_id": current.companyID}
  493. }
  494. partners, _ := Mgo181.Find("company_partner", query, nil, nil, false, -1, -1)
  495. // 处理股东数据
  496. for _, p := range *partners {
  497. //log.Println(direction, p)
  498. if fmt.Sprint(p["is_history"]) == "1" {
  499. continue
  500. }
  501. // 构建投资关系
  502. inv := InvestEdge{
  503. company_id: fmt.Sprint(p["company_id"]),
  504. company_name: fmt.Sprint(p["company_name"]),
  505. stock_id: fmt.Sprint(p["stock_name_id"]),
  506. stock_name: fmt.Sprint(p["stock_name"]),
  507. stock_rate: convertStockCapitalToFloat(fmt.Sprint(p["stock_proportion"])),
  508. stock_amount: convertStockCapitalToFloat(fmt.Sprint(p["stock_capital"])),
  509. stock_level: current.level,
  510. stock_type: 0, // 默认机构股东
  511. }
  512. edges = append(edges, inv)
  513. // 根据股东类型是否继续挖掘
  514. if fmt.Sprint(p["stock_type"]) == "自然人股东" || convertStockCapitalToFloat(fmt.Sprint(p["is_personal"])) > 0 {
  515. inv.stock_type = 1
  516. if _, ok := verter[inv.stock_id]; !ok && person {
  517. verter[inv.stock_id] = InvestVertex{
  518. id: inv.stock_id,
  519. company_id: inv.stock_id,
  520. company_name: inv.stock_name,
  521. }
  522. }
  523. } else {
  524. where1 := map[string]interface{}{
  525. "company_name": inv.company_name,
  526. }
  527. where2 := map[string]interface{}{
  528. "company_name": inv.stock_name,
  529. }
  530. company, _ := Mgo181.FindOne("company_base", where1)
  531. stock, _ := Mgo181.FindOne("company_base", where2)
  532. // 机构股东加入队列继续穿透
  533. if direction > 0 { //向下挖掘
  534. if !visited[inv.company_id] {
  535. queue = append(queue, node{
  536. companyID: inv.company_id,
  537. companyName: inv.company_name,
  538. creditNo: util.ObjToString((*company)["credit_no"]),
  539. level: current.level + 1,
  540. })
  541. }
  542. } else { //向上挖掘
  543. if !visited[inv.stock_id] {
  544. queue = append(queue, node{
  545. companyID: inv.stock_id,
  546. companyName: inv.stock_name,
  547. creditNo: util.ObjToString((*stock)["credit_no"]),
  548. level: current.level + 1,
  549. })
  550. }
  551. }
  552. }
  553. }
  554. //log.Printf("已处理层级%d,当前队列深度%d", current.level, len(queue))
  555. }
  556. return verter, edges
  557. }
  558. func convertStockCapitalToFloat(str string) float64 {
  559. // 查找匹配的数字
  560. match := re.FindString(str)
  561. if match == "" {
  562. return 0
  563. }
  564. // 将匹配到的数字字符串转换为浮点数
  565. result, err := strconv.ParseFloat(match, 64)
  566. if err != nil {
  567. return 0
  568. }
  569. return result
  570. }