dongshi.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "github.com/dgraph-io/badger/v3"
  6. "go.mongodb.org/mongo-driver/mongo"
  7. "go.mongodb.org/mongo-driver/mongo/options"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "log"
  10. "strings"
  11. "sync"
  12. "unicode/utf8"
  13. )
  14. type Employee struct {
  15. EmployeeName string `bson:"employee_name"`
  16. Position string `bson:"position"`
  17. }
  18. type CompanyD struct {
  19. CreditNo string `bson:"credit_no"`
  20. CompanyID string `bson:"_id"`
  21. UseFlag int `bson:"use_flag"`
  22. CompanyName string `bson:"company_name"`
  23. Employees []Employee `bson:"employees"`
  24. CompanyStatus string `bson:"company_status"`
  25. }
  26. type EmployeeMatch struct {
  27. CreditNo1 string `bson:"credit_no_1"`
  28. CreditNo2 string `bson:"credit_no_2"`
  29. Name1 string `bson:"company_name_1"`
  30. Name2 string `bson:"company_name_2"`
  31. EmployeeName string `bson:"employee_name"`
  32. Position1 string `bson:"position_1"`
  33. Position2 string `bson:"position_2"`
  34. CompanyID1 string `bson:"company_id1"`
  35. CompanyID2 string `bson:"company_id2"`
  36. }
  37. // dealTsDongShi 处理企业 董事关系 到 MongoDB数据库
  38. func dealTsDongShi() {
  39. ctx := context.Background()
  40. username := "SJZY_RWbid_ES"
  41. password := "SJZY@B4i4D5e6S"
  42. hosts := []string{"172.31.31.202:27081", "172.20.45.128:27080"}
  43. //hosts := []string{"127.0.0.1:27083"}
  44. optionsSet := map[string]string{}
  45. uri, err := BuildMongoURI(username, password, hosts, optionsSet)
  46. if err != nil {
  47. panic(err)
  48. }
  49. clientOptions := options.Client().ApplyURI(uri)
  50. //clientOptions.SetReadPreference(readpref.Primary())
  51. //clientOptions.SetDirect(true)
  52. // 连接MongoDB
  53. client, err := mongo.Connect(context.Background(), clientOptions)
  54. if err != nil {
  55. log.Println(err)
  56. }
  57. srcColl := client.Database("mixdata").Collection("qyxy_std")
  58. dstColl := client.Database("mixdata").Collection("employee_relation0424")
  59. // 打开 BadgerDB
  60. db, err := badger.Open(badger.DefaultOptions("wcc_employee_cache"))
  61. if err != nil {
  62. log.Fatal(err)
  63. }
  64. defer db.Close()
  65. // 设置并发处理
  66. taskChan := make(chan CompanyD, 10000)
  67. var wg sync.WaitGroup
  68. worker := 10
  69. // 启动 worker 处理任务
  70. for i := 0; i < worker; i++ {
  71. wg.Add(1)
  72. go func() {
  73. defer wg.Done()
  74. for company := range taskChan {
  75. for _, emp := range company.Employees {
  76. name := strings.TrimSpace(emp.EmployeeName)
  77. if name == "" || strings.Contains(name, "无") || strings.Contains(name, "*") {
  78. continue
  79. }
  80. key := "employee_" + name
  81. currentInfo, _ := json.Marshal(map[string]string{
  82. "credit_no": company.CreditNo,
  83. "company_id": company.CompanyID,
  84. "company_name": company.CompanyName,
  85. "position": emp.Position,
  86. })
  87. err := db.View(func(txn *badger.Txn) error {
  88. item, err := txn.Get([]byte(key))
  89. if err == badger.ErrKeyNotFound {
  90. return nil
  91. }
  92. return item.Value(func(val []byte) error {
  93. var prev map[string]string
  94. json.Unmarshal(val, &prev)
  95. if prev["credit_no"] != company.CreditNo {
  96. match := EmployeeMatch{
  97. CreditNo1: prev["credit_no"],
  98. Name1: prev["company_name"],
  99. Position1: prev["position"],
  100. CreditNo2: company.CreditNo,
  101. Name2: company.CompanyName,
  102. Position2: emp.Position,
  103. EmployeeName: name,
  104. CompanyID1: prev["company_id"],
  105. CompanyID2: company.CompanyID,
  106. }
  107. _, err := dstColl.InsertOne(ctx, match)
  108. if err != nil {
  109. log.Println("insert error:", err)
  110. }
  111. }
  112. return nil
  113. })
  114. })
  115. if err != nil {
  116. log.Println("badger read error:", err)
  117. }
  118. // 更新缓存
  119. _ = db.Update(func(txn *badger.Txn) error {
  120. return txn.Set([]byte(key), currentInfo)
  121. })
  122. }
  123. }
  124. }()
  125. }
  126. // 分发任务
  127. // 查询条件
  128. where := map[string]interface{}{
  129. "company_type": map[string]interface{}{
  130. "$ne": "个体工商户",
  131. },
  132. }
  133. cursor, _ := srcColl.Find(ctx, where)
  134. defer cursor.Close(ctx)
  135. count := 0
  136. for cursor.Next(ctx) {
  137. count++
  138. var c CompanyD
  139. if err := cursor.Decode(&c); err != nil {
  140. log.Println("Decode err", err)
  141. continue
  142. }
  143. if count%10000 == 0 {
  144. log.Println("current:", count, c)
  145. }
  146. if c.CreditNo == "" || len(c.Employees) == 0 {
  147. continue
  148. }
  149. if strings.Contains(c.CompanyStatus, "注销") || strings.Contains(c.CompanyStatus, "吊销") {
  150. continue
  151. }
  152. if c.CompanyName == "" || strings.Contains(c.CompanyName, "已除名") {
  153. continue
  154. }
  155. if utf8.RuneCountInString(c.CompanyName) < 5 {
  156. continue
  157. }
  158. if c.UseFlag > 0 {
  159. continue
  160. }
  161. if !strings.Contains(c.CompanyName, "公司") {
  162. continue
  163. }
  164. taskChan <- c
  165. }
  166. close(taskChan)
  167. wg.Wait()
  168. log.Println("done.")
  169. }
  170. // dealDsGraph 处理 董事 高管数据到 graph 图形数据库
  171. func dealDsGraph() {
  172. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  173. if err != nil {
  174. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  175. }
  176. defer pool.Close()
  177. defer session.Release()
  178. //
  179. sess := MgoQy.GetMgoConn()
  180. defer MgoQy.DestoryMongoConn(sess)
  181. it := sess.DB("mixdata").C("wcc_employee_relation0424").Find(nil).Sort("_id").Select(nil).Iter()
  182. jobChan := make(chan ExecutivesInvest, WorkerCount*2)
  183. var wg sync.WaitGroup
  184. // 启动工作协程
  185. for i := 0; i < 5; i++ {
  186. wg.Add(1)
  187. go BatchInsertExecutivesInvestWork(session, &wg, jobChan)
  188. }
  189. count := 0
  190. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  191. if count%10000 == 0 {
  192. log.Println("current:", count, tmp["company_name_1"], tmp["company_name_2"])
  193. }
  194. //
  195. if util.ObjToString(tmp["company_name_1"]) == "" || util.ObjToString(tmp["company_name_2"]) == "" || util.ObjToString(tmp["employee_name"]) == "" {
  196. continue
  197. }
  198. job := ExecutivesInvest{
  199. FromCode: util.ObjToString(tmp["company_id1"]),
  200. ToCode: util.ObjToString(tmp["company_id2"]),
  201. Name: util.ObjToString(tmp["employee_name"]),
  202. }
  203. jobChan <- job
  204. }
  205. close(jobChan)
  206. wg.Wait()
  207. log.Println(" dealTsGraph 完成!")
  208. }