package main import ( "context" "encoding/json" "fmt" "github.com/olivere/elastic/v7" nebula "github.com/vesoft-inc/nebula-go/v3" "io" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "log" "strings" "sync" "time" "unicode/utf8" ) func dealCompanyBase22() { log.Println("dealCompanyBase", "开始处理数据") session, pool, err := ConnectToNebula(HostList, UserName, PassWord) if err != nil { log.Fatalf("Failed to connect to Nebula Graph: %v", err) } defer pool.Close() defer session.Release() defer util.Catch() sess := Mgo181.GetMgoConn() defer Mgo181.DestoryMongoConn(sess) it := sess.DB("mixdata").C("company_base").Find(nil).Sort("_id").Select(nil).Iter() //jobChan := make(chan InsertJob, WorkerCount*2) var wg sync.WaitGroup count := 0 // 新增一个 job 构建的协程池 buildChan := make(chan map[string]interface{}, WorkerCount*5) // 用于传递原始 Mongo 数据 var wgBuild sync.WaitGroup for i := 0; i < 2; i++ { wgBuild.Add(1) go func() { defer wgBuild.Done() count2 := 0 for tmp := range buildChan { count2++ if count2%10000 == 0 { log.Printf("已处理 %d 条,休息 1 秒...\n", count) time.Sleep(time.Second) } c1 := Legal{ Name: util.ObjToString(tmp["company_name"]), Code: util.ObjToString(tmp["credit_no"]), Type: "企业", } r1, err := InsertCompany(session, c1) if err != nil { log.Println("InsertCompany", r1, err) } // 耗时查询移到这里 rea, resb := GetInvByLevel(c1.Name, 1, 0, false) for _, v := range rea { d := Legal{ Name: v.company_name, Code: v.credit_no, Type: "企业", } r, err := InsertCompany(session, d) if err != nil { log.Println("InsertCompany", r, err) } } for _, v := range resb { d := Invest{ FromCode: v.stock_name, ToCode: v.company_name, Amount: v.stock_amount, Ratio: v.stock_rate, } err := InsertInvestRel(session, d) if err != nil { log.Println("InsertInvestRel", err, d) } } } }() } //realNum := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%10000 == 0 { log.Println("current:", count, tmp["company_name"], tmp["_id"]) } if util.IntAll(tmp["use_flag"]) > 0 { continue } if util.ObjToString(tmp["company_type"]) == "个体工商户" || util.ObjToString(tmp["company_type"]) == "个人独资企业" { continue } if util.ObjToString(tmp["company_name"]) == "" || util.ObjToString(tmp["credit_no"]) == "" { continue } // 注销;关闭 if strings.Contains(util.ObjToString(tmp["company_status"]), "吊销") || strings.Contains(util.ObjToString(tmp["company_status"]), "注销") || strings.Contains(util.ObjToString(tmp["company_status"]), "关闭") { continue } if utf8.RuneCountInString(util.ObjToString(tmp["company_name"])) < 5 { continue } buildChan <- tmp // 推送到异步处理构建 } close(buildChan) wgBuild.Wait() wg.Wait() log.Println("完成!") } func dealCompanyBase() { log.Println("dealCompanyBase", "开始处理数据") session, pool, err := ConnectToNebula(HostList, UserName, PassWord) if err != nil { log.Fatalf("Failed to connect to Nebula Graph: %v", err) } defer pool.Close() defer session.Release() defer util.Catch() sess := Mgo181.GetMgoConn() defer Mgo181.DestoryMongoConn(sess) it := sess.DB("mixdata").C("company_base").Find(nil).Select(nil).Iter() //jobChan := make(chan InsertJob, WorkerCount*2) var wg sync.WaitGroup count := 0 // 新增一个 job 构建的协程池 buildChan := make(chan map[string]interface{}, WorkerCount*10) // 用于传递原始 Mongo 数据 jobChan := make(chan InsertJob, WorkerCount*5) // 启动工作协程;存储数据 //for i := 0; i < WorkerCount; i++ { // wg.Add(1) // go insertWorker(session, &wg, jobChan) //} //写入图形数据库 for i := 0; i < WorkerCount; i++ { wg.Add(1) go func() { defer wg.Done() // 每个 worker 拿自己的 session localSession, err := pool.GetSession(UserName, PassWord) if err != nil { log.Println("获取 session 失败:", err) return } defer localSession.Release() insertWorker2(localSession, jobChan) }() } var wgBuild sync.WaitGroup for i := 0; i < WorkerCount; i++ { wgBuild.Add(1) go func() { defer wgBuild.Done() for tmp := range buildChan { c1 := Legal{ Id: util.ObjToString(tmp["company_id"]), Name: util.ObjToString(tmp["company_name"]), Code: util.ObjToString(tmp["credit_no"]), Type: "企业", } if utf8.RuneCountInString(c1.Name) < 5 { continue } job := InsertJob{} job.Companies = append(job.Companies, c1) // 耗时查询移到这里 rea, resb := GetInvByLevel(c1.Name, 2, 0, false) for _, v := range rea { d := Legal{ Id: v.company_id, Name: v.company_name, Code: v.credit_no, Type: "企业", } job.Companies = append(job.Companies, d) } for _, v := range resb { d := Invest{ FromCode: v.stock_id, ToCode: v.company_id, Amount: v.stock_amount, Ratio: v.stock_rate, } job.Relations = append(job.Relations, d) } jobChan <- job } }() } //realNum := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%10000 == 0 { log.Println("current:", count, tmp["company_name"]) } if util.IntAll(tmp["use_flag"]) > 0 { continue } if util.ObjToString(tmp["company_type"]) == "个体工商户" || util.ObjToString(tmp["company_type"]) == "个人独资企业" { continue } if util.ObjToString(tmp["company_name"]) == "" || util.ObjToString(tmp["credit_no"]) == "" { continue } // 注销;关闭 if strings.Contains(util.ObjToString(tmp["company_status"]), "吊销") || strings.Contains(util.ObjToString(tmp["company_status"]), "注销") || strings.Contains(util.ObjToString(tmp["company_status"]), "关闭") { continue } buildChan <- tmp // 推送到异步处理构建 //1、处理点 //job := InsertJob{} //c1 := Legal{ // Name: util.ObjToString(tmp["company_name"]), // Code: util.ObjToString(tmp["credit_no"]), // Type: "企业", //} //if utf8.RuneCountInString(c1.Name) < 5 { // continue //} //job.Companies = append(job.Companies, c1) ////2、处理变 //rea, resb := GetInvByLevel(c1.Name, 1, 0, false) //for _, v := range rea { // d := Legal{ // Name: v.company_name, // Code: v.credit_no, // Type: "企业", // } // job.Companies = append(job.Companies, d) //} // //for _, v := range resb { // d := Invest{ // FromCode: v.stock_name, // ToCode: v.company_name, // Amount: v.stock_amount, // Ratio: v.stock_rate, // } // job.Relations = append(job.Relations, d) //} // //jobChan <- job } close(buildChan) wgBuild.Wait() close(jobChan) wg.Wait() log.Println("完成!") } func batchDealGraph() { session, pool, err := ConnectToNebula(HostList, UserName, PassWord) if err != nil { log.Fatalf("Failed to connect to Nebula Graph: %v", err) } defer pool.Close() defer session.Release() client, err := elastic.NewClient( elastic.SetURL("http://172.17.4.184:19908"), //elastic.SetURL("http://127.0.0.1:19908"), elastic.SetBasicAuth("jybid", "Top2023_JEB01i@31"), elastic.SetSniff(false), ) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败:%s", err) } query := elastic.NewBoolQuery(). //北京,天津,河北,上海,江苏,浙江,安徽 //Must(elastic.NewTermQuery("area", "北京市")). //Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")). MustNot( elastic.NewTermQuery("company_type", "个体工商户"), elastic.NewTermsQuery("company_status", "吊销", "注销"), ) //Must(elastic.NewTermQuery("company_name", "北京剑鱼信息技术有限公司")) //Must(elastic.NewTermsQuery("company_area", "河南")) ctx := context.Background() searchSource := elastic.NewSearchSource(). Query(query). Size(10000). Sort("_doc", true) searchService := client.Scroll("qyxy"). Size(10000). Scroll("5m"). SearchSource(searchSource) jobChan := make(chan InsertJob, WorkerCount*2) var wg sync.WaitGroup // 启动工作协程 for i := 0; i < WorkerCount; i++ { wg.Add(1) go insertWorker(session, &wg, jobChan) } total := 0 for { res, err := searchService.Do(ctx) if err == io.EOF { break } if err != nil { log.Println("scroll error:", err) break } fmt.Println("总数是:", res.TotalHits()) if len(res.Hits.Hits) == 0 { break } job := InsertJob{} for _, hit := range res.Hits.Hits { var doc map[string]interface{} if err := json.Unmarshal(hit.Source, &doc); err != nil { log.Println("解析失败", err) continue } c1 := Legal{ Id: util.ObjToString(doc["id"]), Name: util.ObjToString(doc["company_name"]), Code: util.ObjToString(doc["credit_no"]), Type: "企业", } ////存续、在营、开业、在册 //if strings.Contains(util.ObjToString(doc["company_status"]), "存续") || strings.Contains(util.ObjToString(doc["company_status"]), "在营") || strings.Contains(util.ObjToString(doc["company_status"]), "在册") || strings.Contains(util.ObjToString(doc["company_status"]), "开业") { // c1.State = "有效" //} else { // c1.State = "无效" //} if !strings.Contains(c1.Name, "公司") { continue } if c1.Name == "" || c1.Code == "" || strings.Contains(c1.Name, "已除名") { continue } if strings.Contains(util.ObjToString(doc["company_status"]), "吊销") || strings.Contains(util.ObjToString(doc["company_status"]), "注销") { continue } if utf8.RuneCountInString(c1.Name) < 5 { continue } job.Companies = append(job.Companies, c1) if partners, ok := doc["partners"].([]interface{}); ok { for _, partner := range partners { if da, ok := partner.(map[string]interface{}); ok { if !strings.Contains(util.ObjToString(da["stock_type"]), "自然人") && !strings.Contains(util.ObjToString(da["stock_type"]), "个人") { if util.ObjToString(da["stock_name"]) == "" || util.ObjToString(da["identify_no"]) == "" { continue } //1 where1 := map[string]interface{}{ "company_name": util.ObjToString(da["stock_name"]), } tmpBase, _ := Mgo181.FindOne("company_base", where1) if len(*tmpBase) > 0 { c2 := Legal{ Id: util.ObjToString((*tmpBase)["company_id"]), Name: util.ObjToString(da["stock_name"]), Code: util.ObjToString(da["identify_no"]), Type: "企业", } //if strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "存续") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "在营") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "在册") || strings.Contains(util.ObjToString((*tmpBase)["company_status"]), "开业") { // c2.State = "有效" //} else { // c2.State = "无效" //} job.Companies = append(job.Companies, c2) //2 where := map[string]interface{}{ "company_name": util.ObjToString(doc["company_name"]), "stock_name": util.ObjToString(da["stock_name"]), } ddd, _ := Mgo181.FindOne("company_partner", where) if len(*ddd) > 0 { par := *ddd invest := Invest{ FromCode: c2.Id, ToCode: c1.Id, Ratio: util.Float64All(par["stock_proportion"]), Amount: ParseStockCapital(util.ObjToString(par["stock_capital"])), } job.Relations = append(job.Relations, invest) } } } } } } } jobChan <- job total += len(res.Hits.Hits) log.Println("处理总量:", total) } close(jobChan) wg.Wait() log.Println("完成!") } func insertWorker2(session *nebula.Session, jobs <-chan InsertJob) { for job := range jobs { // 分批插入公司 for i := 0; i < len(job.Companies); i += BatchSize { end := i + BatchSize if end > len(job.Companies) { end = len(job.Companies) } BatchInsertCompanies(session, job.Companies[i:end]) //time.Sleep(time.Second * 1) } // 分批插入投资关系 for i := 0; i < len(job.Relations); i += BatchSize { end := i + BatchSize if end > len(job.Relations) { end = len(job.Relations) } BatchInsertInvestRels(session, job.Relations[i:end]) //time.Sleep(time.Second * 1) } } } func insertWorker(session *nebula.Session, wg *sync.WaitGroup, jobs <-chan InsertJob) { defer wg.Done() for job := range jobs { // 分批插入公司 for i := 0; i < len(job.Companies); i += BatchSize { end := i + BatchSize if end > len(job.Companies) { end = len(job.Companies) } BatchInsertCompanies(session, job.Companies[i:end]) } // 分批插入投资关系 for i := 0; i < len(job.Relations); i += BatchSize { end := i + BatchSize if end > len(job.Relations) { end = len(job.Relations) } BatchInsertInvestRels(session, job.Relations[i:end]) } } } func BatchInsertCompanies(session *nebula.Session, companies []Legal) { if len(companies) == 0 { return } var sb strings.Builder sb.WriteString("USE " + Table_Space + "; ") for _, c := range companies { sb.WriteString(fmt.Sprintf(`INSERT VERTEX Legal(name, code, type, state) VALUES "%s":("%s", "%s", "%s", "%s");`, c.Id, c.Name, c.Code, c.Type, c.State)) } _, err := session.Execute(sb.String()) if err != nil { log.Println("批量插入公司失败:", err) } } func BatchInsertInvestRels(session *nebula.Session, rels []Invest) { if len(rels) == 0 { return } var sb strings.Builder sb.WriteString("USE " + Table_Space + "; ") for _, r := range rels { sb.WriteString(fmt.Sprintf(`INSERT EDGE Invest(amount, ratio) VALUES "%s"->"%s":(%f, %f);`, r.FromCode, r.ToCode, r.Amount, r.Ratio)) } _, err := session.Execute(sb.String()) if err != nil { log.Println("批量插入投资关系失败:", err) } } type InvestRelationResult struct { Related bool Paths []map[string]string CommonNodes []CommonNodeInfo } type CommonNodeInfo struct { VID string Name string } func CheckInvestRelationWithIntersection(session *nebula.Session, names []string, depth int) (bool, []map[string]string, []string, error) { if len(names) == 0 || depth <= 0 { return false, nil, nil, fmt.Errorf("invalid input") } // Step 1: 获取所有企业的 VID vids := make([]string, 0) vidToName := make(map[string]string) inputVIDSet := make(map[string]bool) for _, name := range names { query := fmt.Sprintf(`LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name) resp, err := session.Execute(query) if err != nil || !resp.IsSucceed() { log.Printf("lookup failed for name %s: %v", name, err) continue } for _, row := range resp.GetRows() { vid := string(row.Values[0].GetSVal()) vids = append(vids, fmt.Sprintf(`"%s"`, vid)) vidToName[vid] = name inputVIDSet[vid] = true } } if len(vids) < 2 { return false, nil, nil, nil // 不足两个公司参与判断 } // Step 2: 查找路径(双向 Invest) fromClause := strings.Join(vids, ",") query := fmt.Sprintf(` GO FROM %s OVER Invest BIDIRECT UPTO %d STEPS YIELD src(edge) AS from, dst(edge) AS to `, fromClause, depth) resp, err := session.Execute(query) if err != nil || !resp.IsSucceed() { return false, nil, nil, fmt.Errorf("GO query failed: %v", err) } // Step 3: 统计路径和交集节点 relationPaths := make([]map[string]string, 0) nodeToSources := make(map[string]map[string]bool) // key: node, value: set of inputVIDs for _, row := range resp.GetRows() { from := string(row.Values[0].GetSVal()) to := string(row.Values[1].GetSVal()) // 记录路径 relationPaths = append(relationPaths, map[string]string{ "from": from, "to": to, }) // 记录 from 节点来源 if !inputVIDSet[from] { if nodeToSources[from] == nil { nodeToSources[from] = make(map[string]bool) } for _, vid := range vids { if from == strings.Trim(vid, `"`) { continue } if strings.Contains(query, vid) { nodeToSources[from][strings.Trim(vid, `"`)] = true } } } // 记录 to 节点来源 if !inputVIDSet[to] { if nodeToSources[to] == nil { nodeToSources[to] = make(map[string]bool) } for _, vid := range vids { if to == strings.Trim(vid, `"`) { continue } if strings.Contains(query, vid) { nodeToSources[to][strings.Trim(vid, `"`)] = true } } } } // Step 4: 找出出现在多个输入公司路径中的中间节点(交集) commonNodeVIDs := make([]string, 0) for node, sourceSet := range nodeToSources { if len(sourceSet) >= 2 { commonNodeVIDs = append(commonNodeVIDs, node) } } // Step 5: 查名称 intersectionNames := make([]string, 0) if len(commonNodeVIDs) > 0 { query := fmt.Sprintf(`FETCH PROP ON Legal %s YIELD Legal.name`, strings.Join(wrapInQuotes(commonNodeVIDs), ",")) resp, err := session.Execute(query) if err == nil && resp.IsSucceed() { for _, row := range resp.GetRows() { name := string(row.Values[1].GetSVal()) intersectionNames = append(intersectionNames, name) } } } found := len(intersectionNames) > 0 return found, relationPaths, intersectionNames, nil } func wrapInQuotes(ids []string) []string { result := make([]string, len(ids)) for i, id := range ids { result[i] = fmt.Sprintf(`"%s"`, id) } return result } func CheckInvestRelation1(session *nebula.Session, names []string, depth int) (bool, []map[string]string, error) { if len(names) == 0 || depth <= 0 { return false, nil, fmt.Errorf("invalid input") } // Step 1: 获取所有企业的 VID vids := make([]string, 0) nameSet := make(map[string]bool) for _, name := range names { query := fmt.Sprintf(`LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name) resp, err := session.Execute(query) if err != nil || !resp.IsSucceed() { log.Printf("lookup failed for name %s: %v", name, err) continue } for _, row := range resp.GetRows() { //vid := row.Values[0].GetSVal() vid := string(row.Values[0].GetSVal()) vids = append(vids, fmt.Sprintf(`"%s"`, vid)) nameSet[vid] = true } } if len(vids) == 0 { return false, nil, nil // 没有查出任何节点 } // Step 2: 构造 GO 查询路径 fromClause := strings.Join(vids, ",") query := fmt.Sprintf(` GO FROM %s OVER Invest UPTO %d STEPS YIELD src(edge) AS from, dst(edge) AS to `, fromClause, depth) resp, err := session.Execute(query) if err != nil { return false, nil, fmt.Errorf("GO query failed: %v", err) } if !resp.IsSucceed() { return false, nil, fmt.Errorf("Nebula error: %s", resp.GetErrorMsg()) } // Step 3: 分析路径结果 resultPaths := make([]map[string]string, 0) found := false for _, row := range resp.GetRows() { from := string(row.Values[0].GetSVal()) to := string(row.Values[1].GetSVal()) if nameSet[from] && nameSet[to] && from != to { found = true } resultPaths = append(resultPaths, map[string]string{ "from": from, "to": to, }) } return found, resultPaths, nil } func FindInvestmentRelations() { } //type PathRelation struct { // Companies []string // Paths []string //} // //func CheckLegalRelationsGraph(session *nebula.Session, names []string, deep int) (*PathRelation, error) { // // 查询 name -> vid 映射 // nameToVid := make(map[string]string) // vidToName := make(map[string]string) // for _, name := range names { // vid, err := getVidByName(session, name) // if err != nil { // log.Printf("获取 %s 的 VID 失败: %v", name, err) // continue // } // nameToVid[name] = vid // vidToName[vid] = name // } // // allPaths := [][]string{} // checked := make(map[string]bool) // // // 遍历所有组合 // for i := 0; i < len(names); i++ { // for j := i + 1; j < len(names); j++ { // a, b := names[i], names[j] // vidA, okA := nameToVid[a] // vidB, okB := nameToVid[b] // if !okA || !okB { // continue // } // key := vidA + "|" + vidB // if checked[key] { // continue // } // checked[key] = true // // if pathAB, _ := findPath(session, vidA, vidB, deep); len(pathAB) > 0 { // allPaths = append(allPaths, pathAB) // } // if pathBA, _ := findPath(session, vidB, vidA, deep); len(pathBA) > 0 { // allPaths = append(allPaths, pathBA) // } // // // 共同上级路径 // common, commonPaths := checkCommonAncestor(session, vidA, vidB, deep) // if common { // allPaths = append(allPaths, commonPaths) // } // } // } // // // 1. 收集所有涉及的 VID // vidSet := make(map[string]struct{}) // for _, path := range allPaths { // for _, vid := range path { // vidSet[vid] = struct{}{} // } // } // // // 2. 获取所有 VID 的公司名 // for vid := range vidSet { // if _, ok := vidToName[vid]; ok { // continue // } // query := fmt.Sprintf(`FETCH PROP ON Legal "%s" YIELD Legal.name`, vid) // resp, err := session.Execute(query) // if err != nil || resp.IsEmpty() { // continue // } // rows := resp.GetRows() // if len(rows) > 0 && len(rows[0].Values) > 0 && rows[0].Values[0].SVal != nil { // vidToName[vid] = string(rows[0].Values[0].SVal) // } // } // // // 3. 清洗路径并格式化输出 // companySet := make(map[string]struct{}) // result := &PathRelation{ // Companies: []string{}, // Paths: []string{}, // } // // for _, path := range allPaths { // namesPath := []string{} // last := "" // for _, vid := range path { // name, ok := vidToName[vid] // if !ok { // continue // } // if name == last { // continue // 去除重复节点 // } // namesPath = append(namesPath, name) // last = name // companySet[name] = struct{}{} // } // if len(namesPath) >= 2 { // result.Paths = append(result.Paths, strings.Join(namesPath, "->")) // } // } // // for name := range companySet { // result.Companies = append(result.Companies, name) // } // sort.Strings(result.Companies) // return result, nil //} // //func checkCommonAncestor(session *nebula.Session, aVid, bVid string, deep int) (bool, []string) { // query := fmt.Sprintf(` // ( // GO 1 TO %d STEPS FROM "%s" OVER Invest REVERSELY YIELD dst(edge) AS ancestor // ) // INTERSECT // ( // GO 1 TO %d STEPS FROM "%s" OVER Invest REVERSELY YIELD dst(edge) AS ancestor // ); // `, deep, aVid, deep, bVid) // // resp, err := session.Execute(query) // if err != nil { // return false, nil // } // ancestors, err := getFirstColumnStrings(resp) // if err != nil || len(ancestors) == 0 { // return false, nil // } // // // 只返回第一个共同祖先的简单路径:a->ancestor->b // return true, []string{aVid, ancestors[0], bVid} //} // //func findPath(session *nebula.Session, fromVid, toVid string, maxStep int) ([]string, error) { // query := fmt.Sprintf(`FIND ALL PATH FROM "%s" TO "%s" OVER Invest UPTO %d STEPS YIELD path as p`, fromVid, toVid, maxStep) // resp, err := session.Execute(query) // if err != nil { // return nil, err // } // return getFirstColumnStrings(resp) //} // //func getVidByName(session *nebula.Session, name string) (string, error) { // query := fmt.Sprintf(` //USE `+Table_Space+`; //LOOKUP ON Legal WHERE Legal.name == "%s" YIELD id(vertex)`, name) // resp, err := session.Execute(query) // if err != nil { // return "", err // } // // values, err := getFirstColumnStrings(resp) // if err != nil || len(values) == 0 { // return "", fmt.Errorf("未找到公司: %s", name) // } // return values[0], nil //} // //func getFirstColumnStrings(resp *nebula.ResultSet) ([]string, error) { // if resp == nil { // return nil, fmt.Errorf("result set is nil") // } // // var values []string // for _, row := range resp.GetRows() { // if len(row.Values) == 0 { // continue // } // val := row.Values[0] // switch { // case val.SVal != nil: // values = append(values, string(val.SVal)) // case val.IVal != nil: // values = append(values, fmt.Sprintf("%d", *val.IVal)) // case val.BVal != nil: // values = append(values, fmt.Sprintf("%v", *val.BVal)) // default: // log.Printf("未知类型值: %+v", val) // } // } // return values, nil //}