package main import ( "context" "encoding/json" "github.com/dgraph-io/badger/v3" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "log" "strings" "sync" "unicode/utf8" ) type Employee struct { EmployeeName string `bson:"employee_name"` Position string `bson:"position"` } type CompanyD struct { CreditNo string `bson:"credit_no"` CompanyID string `bson:"_id"` UseFlag int `bson:"use_flag"` CompanyName string `bson:"company_name"` Employees []Employee `bson:"employees"` CompanyStatus string `bson:"company_status"` } type EmployeeMatch struct { CreditNo1 string `bson:"credit_no_1"` CreditNo2 string `bson:"credit_no_2"` Name1 string `bson:"company_name_1"` Name2 string `bson:"company_name_2"` EmployeeName string `bson:"employee_name"` Position1 string `bson:"position_1"` Position2 string `bson:"position_2"` CompanyID1 string `bson:"company_id1"` CompanyID2 string `bson:"company_id2"` } // dealTsDongShi 处理企业 董事关系 到 MongoDB数据库 func dealTsDongShi() { ctx := context.Background() username := "SJZY_RWbid_ES" password := "SJZY@B4i4D5e6S" hosts := []string{"172.31.31.202:27081", "172.20.45.128:27080"} //hosts := []string{"127.0.0.1:27083"} optionsSet := map[string]string{} uri, err := BuildMongoURI(username, password, hosts, optionsSet) if err != nil { panic(err) } clientOptions := options.Client().ApplyURI(uri) //clientOptions.SetReadPreference(readpref.Primary()) //clientOptions.SetDirect(true) // 连接MongoDB client, err := mongo.Connect(context.Background(), clientOptions) if err != nil { log.Println(err) } srcColl := client.Database("mixdata").Collection("qyxy_std") dstColl := client.Database("mixdata").Collection("employee_relation0424") // 打开 BadgerDB db, err := badger.Open(badger.DefaultOptions("wcc_employee_cache")) if err != nil { log.Fatal(err) } defer db.Close() // 设置并发处理 taskChan := make(chan CompanyD, 10000) var wg sync.WaitGroup worker := 10 // 启动 worker 处理任务 for i := 0; i < worker; i++ { wg.Add(1) go func() { defer wg.Done() for company := range taskChan { for _, emp := range company.Employees { name := strings.TrimSpace(emp.EmployeeName) if name == "" || strings.Contains(name, "无") || strings.Contains(name, "*") { continue } key := "employee_" + name currentInfo, _ := json.Marshal(map[string]string{ "credit_no": company.CreditNo, "company_id": company.CompanyID, "company_name": company.CompanyName, "position": emp.Position, }) err := db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(key)) if err == badger.ErrKeyNotFound { return nil } return item.Value(func(val []byte) error { var prev map[string]string json.Unmarshal(val, &prev) if prev["credit_no"] != company.CreditNo { match := EmployeeMatch{ CreditNo1: prev["credit_no"], Name1: prev["company_name"], Position1: prev["position"], CreditNo2: company.CreditNo, Name2: company.CompanyName, Position2: emp.Position, EmployeeName: name, CompanyID1: prev["company_id"], CompanyID2: company.CompanyID, } _, err := dstColl.InsertOne(ctx, match) if err != nil { log.Println("insert error:", err) } } return nil }) }) if err != nil { log.Println("badger read error:", err) } // 更新缓存 _ = db.Update(func(txn *badger.Txn) error { return txn.Set([]byte(key), currentInfo) }) } } }() } // 分发任务 // 查询条件 where := map[string]interface{}{ "company_type": map[string]interface{}{ "$ne": "个体工商户", }, } cursor, _ := srcColl.Find(ctx, where) defer cursor.Close(ctx) count := 0 for cursor.Next(ctx) { count++ var c CompanyD if err := cursor.Decode(&c); err != nil { log.Println("Decode err", err) continue } if count%10000 == 0 { log.Println("current:", count, c) } if c.CreditNo == "" || len(c.Employees) == 0 { continue } if strings.Contains(c.CompanyStatus, "注销") || strings.Contains(c.CompanyStatus, "吊销") { continue } if c.CompanyName == "" || strings.Contains(c.CompanyName, "已除名") { continue } if utf8.RuneCountInString(c.CompanyName) < 5 { continue } if c.UseFlag > 0 { continue } if !strings.Contains(c.CompanyName, "公司") { continue } taskChan <- c } close(taskChan) wg.Wait() log.Println("done.") } // dealDsGraph 处理 董事 高管数据到 graph 图形数据库 func dealDsGraph() { session, pool, err := ConnectToNebula(HostList, UserName, PassWord) if err != nil { log.Fatalf("Failed to connect to Nebula Graph: %v", err) } defer pool.Close() defer session.Release() // sess := MgoQy.GetMgoConn() defer MgoQy.DestoryMongoConn(sess) it := sess.DB("mixdata").C("wcc_employee_relation0424").Find(nil).Sort("_id").Select(nil).Iter() jobChan := make(chan ExecutivesInvest, WorkerCount*2) var wg sync.WaitGroup // 启动工作协程 for i := 0; i < 5; i++ { wg.Add(1) go BatchInsertExecutivesInvestWork(session, &wg, jobChan) } count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%10000 == 0 { log.Println("current:", count, tmp["company_name_1"], tmp["company_name_2"]) } // if util.ObjToString(tmp["company_name_1"]) == "" || util.ObjToString(tmp["company_name_2"]) == "" || util.ObjToString(tmp["employee_name"]) == "" { continue } job := ExecutivesInvest{ FromCode: util.ObjToString(tmp["company_id1"]), ToCode: util.ObjToString(tmp["company_id2"]), Name: util.ObjToString(tmp["employee_name"]), } jobChan <- job } close(jobChan) wg.Wait() log.Println(" dealTsGraph 完成!") }