dongshi.go 8.7 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/dgraph-io/badger/v3"
  7. "go.mongodb.org/mongo-driver/mongo"
  8. "go.mongodb.org/mongo-driver/mongo/options"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "log"
  11. "strings"
  12. "sync"
  13. "unicode/utf8"
  14. )
  15. type Employee struct {
  16. EmployeeName string `bson:"employee_name"`
  17. Position string `bson:"position"`
  18. }
  19. type CompanyD struct {
  20. CreditNo string `bson:"credit_no"`
  21. CompanyID string `bson:"_id"`
  22. UseFlag int `bson:"use_flag"`
  23. CompanyName string `bson:"company_name"`
  24. Employees []Employee `bson:"employees"`
  25. CompanyStatus string `bson:"company_status"`
  26. }
  27. type EmployeeMatch struct {
  28. CreditNo1 string `bson:"credit_no_1"`
  29. CreditNo2 string `bson:"credit_no_2"`
  30. Name1 string `bson:"company_name_1"`
  31. Name2 string `bson:"company_name_2"`
  32. EmployeeName string `bson:"employee_name"`
  33. Position1 string `bson:"position_1"`
  34. Position2 string `bson:"position_2"`
  35. CompanyID1 string `bson:"company_id1"`
  36. CompanyID2 string `bson:"company_id2"`
  37. }
  38. // dealTsDongShi 处理企业 董事关系 到 MongoDB数据库
  39. func dealTsDongShi() {
  40. ctx := context.Background()
  41. username := "SJZY_RWbid_ES"
  42. password := "SJZY@B4i4D5e6S"
  43. hosts := []string{"172.31.31.202:27081", "172.20.45.128:27080"}
  44. //hosts := []string{"127.0.0.1:27083"}
  45. optionsSet := map[string]string{}
  46. uri, err := BuildMongoURI(username, password, hosts, optionsSet)
  47. if err != nil {
  48. panic(err)
  49. }
  50. clientOptions := options.Client().ApplyURI(uri)
  51. //clientOptions.SetReadPreference(readpref.Primary())
  52. //clientOptions.SetDirect(true)
  53. // 连接MongoDB
  54. client, err := mongo.Connect(context.Background(), clientOptions)
  55. if err != nil {
  56. log.Println(err)
  57. }
  58. srcColl := client.Database("mixdata").Collection("qyxy_std")
  59. dstColl := client.Database("mixdata").Collection("employee_relation0424")
  60. // 打开 BadgerDB
  61. db, err := badger.Open(badger.DefaultOptions("wcc_employee_cache"))
  62. if err != nil {
  63. log.Fatal(err)
  64. }
  65. defer db.Close()
  66. // 设置并发处理
  67. taskChan := make(chan CompanyD, 10000)
  68. var wg sync.WaitGroup
  69. worker := 10
  70. // 启动 worker 处理任务
  71. for i := 0; i < worker; i++ {
  72. wg.Add(1)
  73. go func() {
  74. defer wg.Done()
  75. for company := range taskChan {
  76. for _, emp := range company.Employees {
  77. name := strings.TrimSpace(emp.EmployeeName)
  78. if name == "" || strings.Contains(name, "无") || strings.Contains(name, "*") {
  79. continue
  80. }
  81. key := "employee_" + name
  82. currentInfo, _ := json.Marshal(map[string]string{
  83. "credit_no": company.CreditNo,
  84. "company_id": company.CompanyID,
  85. "company_name": company.CompanyName,
  86. "position": emp.Position,
  87. })
  88. err := db.View(func(txn *badger.Txn) error {
  89. item, err := txn.Get([]byte(key))
  90. if err == badger.ErrKeyNotFound {
  91. return nil
  92. }
  93. return item.Value(func(val []byte) error {
  94. var prev map[string]string
  95. json.Unmarshal(val, &prev)
  96. if prev["credit_no"] != company.CreditNo {
  97. match := EmployeeMatch{
  98. CreditNo1: prev["credit_no"],
  99. Name1: prev["company_name"],
  100. Position1: prev["position"],
  101. CreditNo2: company.CreditNo,
  102. Name2: company.CompanyName,
  103. Position2: emp.Position,
  104. EmployeeName: name,
  105. CompanyID1: prev["company_id"],
  106. CompanyID2: company.CompanyID,
  107. }
  108. _, err := dstColl.InsertOne(ctx, match)
  109. if err != nil {
  110. log.Println("insert error:", err)
  111. }
  112. }
  113. return nil
  114. })
  115. })
  116. if err != nil {
  117. log.Println("badger read error:", err)
  118. }
  119. // 更新缓存
  120. _ = db.Update(func(txn *badger.Txn) error {
  121. return txn.Set([]byte(key), currentInfo)
  122. })
  123. }
  124. }
  125. }()
  126. }
  127. // 分发任务
  128. // 查询条件
  129. where := map[string]interface{}{
  130. "company_type": map[string]interface{}{
  131. "$ne": "个体工商户",
  132. },
  133. }
  134. cursor, _ := srcColl.Find(ctx, where)
  135. defer cursor.Close(ctx)
  136. count := 0
  137. for cursor.Next(ctx) {
  138. count++
  139. var c CompanyD
  140. if err := cursor.Decode(&c); err != nil {
  141. log.Println("Decode err", err)
  142. continue
  143. }
  144. if count%10000 == 0 {
  145. log.Println("current:", count, c)
  146. }
  147. if c.CreditNo == "" || len(c.Employees) == 0 {
  148. continue
  149. }
  150. if strings.Contains(c.CompanyStatus, "注销") || strings.Contains(c.CompanyStatus, "吊销") {
  151. continue
  152. }
  153. if c.CompanyName == "" || strings.Contains(c.CompanyName, "已除名") {
  154. continue
  155. }
  156. if utf8.RuneCountInString(c.CompanyName) < 5 {
  157. continue
  158. }
  159. if c.UseFlag > 0 {
  160. continue
  161. }
  162. if !strings.Contains(c.CompanyName, "公司") {
  163. continue
  164. }
  165. taskChan <- c
  166. }
  167. close(taskChan)
  168. wg.Wait()
  169. log.Println("done.")
  170. }
  171. // dealDsGraph 处理 董事 高管数据到 graph 图形数据库
  172. func dealDsGraph() {
  173. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  174. if err != nil {
  175. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  176. }
  177. defer pool.Close()
  178. defer session.Release()
  179. //
  180. sess := MgoQy.GetMgoConn()
  181. defer MgoQy.DestoryMongoConn(sess)
  182. it := sess.DB("mixdata").C("wcc_employee_relation0424").Find(nil).Sort("_id").Select(nil).Iter()
  183. jobChan := make(chan ExecutivesInvest, WorkerCount*2)
  184. var wg sync.WaitGroup
  185. // 启动工作协程
  186. for i := 0; i < 5; i++ {
  187. wg.Add(1)
  188. go BatchInsertExecutivesInvestWork(session, &wg, jobChan)
  189. }
  190. count := 0
  191. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  192. if count%10000 == 0 {
  193. log.Println("current:", count, tmp["company_name_1"], tmp["company_name_2"])
  194. }
  195. //
  196. if util.ObjToString(tmp["company_name_1"]) == "" || util.ObjToString(tmp["company_name_2"]) == "" || util.ObjToString(tmp["employee_name"]) == "" {
  197. continue
  198. }
  199. job := ExecutivesInvest{
  200. FromCode: util.ObjToString(tmp["company_id1"]),
  201. ToCode: util.ObjToString(tmp["company_id2"]),
  202. Name: util.ObjToString(tmp["employee_name"]),
  203. }
  204. jobChan <- job
  205. }
  206. close(jobChan)
  207. wg.Wait()
  208. log.Println(" dealTsGraph 完成!")
  209. }
  210. //------------------------------------------//
  211. type ExecutivesInvestRelation struct {
  212. FromName string
  213. ToName string
  214. Name string // 边属性
  215. }
  216. func (c *NebulaClient) FindExecutivesInvestRelationsByNames(names []string) ([]ExecutivesInvestRelation, error) {
  217. if len(names) == 0 {
  218. return nil, nil
  219. }
  220. // 1. 企业名称 -> vid
  221. var nameList strings.Builder
  222. for i, name := range names {
  223. nameList.WriteString(fmt.Sprintf("\"%s\"", name))
  224. if i < len(names)-1 {
  225. nameList.WriteString(", ")
  226. }
  227. }
  228. vidQuery := fmt.Sprintf(`
  229. USE %s;
  230. LOOKUP ON Legal WHERE Legal.name IN [%s] YIELD id(vertex) AS vid, Legal.name AS name
  231. `, Table_Space, nameList.String())
  232. resp, err := c.ExecuteWithReconnect(vidQuery)
  233. if err != nil {
  234. return nil, err
  235. }
  236. if !resp.IsSucceed() {
  237. return nil, fmt.Errorf("vid lookup failed: %s", resp.GetErrorMsg())
  238. }
  239. nameToVid := make(map[string]string)
  240. vidToName := make(map[string]string)
  241. rows := resp.GetRows()
  242. for _, row := range rows {
  243. vid := row.Values[0].GetSVal()
  244. name := row.Values[1].GetSVal()
  245. nameToVid[string(name)] = string(vid)
  246. vidToName[string(vid)] = string(name)
  247. }
  248. if len(nameToVid) == 0 {
  249. return nil, nil
  250. }
  251. // 2. 用 vid 查询 ExecutivesInvest 边
  252. var vidList strings.Builder
  253. i := 0
  254. for _, vid := range nameToVid {
  255. vidList.WriteString(fmt.Sprintf("\"%s\"", vid))
  256. if i < len(nameToVid)-1 {
  257. vidList.WriteString(", ")
  258. }
  259. i++
  260. }
  261. edgeQuery := fmt.Sprintf(`
  262. USE %s;
  263. GO FROM %s OVER ExecutivesInvest
  264. YIELD
  265. ExecutivesInvest._src AS from_id,
  266. ExecutivesInvest._dst AS to_id,
  267. ExecutivesInvest.name AS name
  268. `, Table_Space, vidList.String())
  269. resp2, err := c.ExecuteWithReconnect(edgeQuery)
  270. if err != nil {
  271. return nil, err
  272. }
  273. if !resp2.IsSucceed() {
  274. return nil, fmt.Errorf("edge query failed: %s", resp2.GetErrorMsg())
  275. }
  276. // 3. 解析查询结果,筛选起点和终点都在输入列表里的边
  277. vidSet := make(map[string]struct{})
  278. for _, vid := range nameToVid {
  279. vidSet[vid] = struct{}{}
  280. }
  281. existingRelations := make(map[string]struct{})
  282. var relations []ExecutivesInvestRelation
  283. rows2 := resp2.GetRows()
  284. for _, row := range rows2 {
  285. fromVid := row.Values[0].GetSVal()
  286. toVid := row.Values[1].GetSVal()
  287. name := row.Values[2].GetSVal()
  288. if _, ok1 := vidSet[string(fromVid)]; ok1 {
  289. if _, ok2 := vidSet[string(toVid)]; ok2 {
  290. fromName := vidToName[string(fromVid)]
  291. toName := vidToName[string(toVid)]
  292. key := fromName + "->" + toName
  293. if _, exists := existingRelations[key]; exists {
  294. continue
  295. }
  296. existingRelations[key] = struct{}{}
  297. relations = append(relations, ExecutivesInvestRelation{
  298. FromName: fromName,
  299. ToName: toName,
  300. Name: string(name),
  301. })
  302. }
  303. }
  304. }
  305. return relations, nil
  306. }