yisi.go 11 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/dgraph-io/badger/v3"
  7. "go.mongodb.org/mongo-driver/mongo"
  8. "go.mongodb.org/mongo-driver/mongo/options"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "log"
  11. "net/url"
  12. "regexp"
  13. "strings"
  14. "sync"
  15. "unicode/utf8"
  16. )
  // Contact-field validators, compiled once at package initialisation.
  var (
  	// phoneRegexp matches either a mobile number (optional "86" or "+86"
  	// prefix, then 1, a digit 3-9 and nine more digits) or a landline number
  	// (0 plus two or three digits, an optional hyphen, then seven or eight
  	// digits). The whole input must match.
  	phoneRegexp = regexp.MustCompile(`^((\+?86)?1[3-9]\d{9}$)|(^0\d{2,3}-?\d{7,8}$)`)

  	// emailRegexp matches a simple local@domain.tld address whose top-level
  	// part is at least two ASCII letters. The whole input must match.
  	emailRegexp = regexp.MustCompile(`^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$`)
  )
  type (
  	// AnnualReport carries the contact fields decoded from one element of a
  	// company's annual_reports array.
  	AnnualReport struct {
  		OperatorName string `bson:"operator_name"`
  		CompanyPhone string `bson:"company_phone"`
  		CompanyEmail string `bson:"company_email"`
  	}

  	// Company is the subset of a source company document that this file
  	// reads; CompanyID is decoded from the document's _id.
  	Company struct {
  		CreditNo      string         `bson:"credit_no"`
  		CompanyID     string         `bson:"_id"`
  		CompanyStatus string         `bson:"company_status"`
  		CompanyName   string         `bson:"company_name"`
  		AnnualReports []AnnualReport `bson:"annual_reports"`
  		UseFlag       int            `bson:"use_flag"`
  	}

  	// SuspectMatch records a pair of companies (suffix 1 and suffix 2) that
  	// share contact fields. MatchKeys names the shared fields and MatchValue
  	// holds the corresponding values in the same order.
  	SuspectMatch struct {
  		CreditNo1  string   `bson:"credit_no1"`
  		CreditNo2  string   `bson:"credit_no2"`
  		CompanyID1 string   `bson:"company_id1"`
  		CompanyID2 string   `bson:"company_id2"`
  		Name1      string   `bson:"company_name1"`
  		Name2      string   `bson:"company_name2"`
  		MatchKeys  []string `bson:"match_keys"`
  		MatchValue []string `bson:"match_values"`
  	}
  )
  // BuildMongoURI assembles a "mongodb://" connection URI.
  //
  // username and password are percent-encoded as URI userinfo; when both are
  // empty the credentials section is omitted entirely. hosts are seed
  // addresses ("host:port") joined with commas and must not be empty. options
  // are appended as a query string with keys in sorted order, preceded by the
  // "/" that the connection-string format requires between the host list and
  // "?".
  //
  // An error is returned only when hosts is empty.
  func BuildMongoURI(username, password string, hosts []string, options map[string]string) (string, error) {
  	if len(hosts) == 0 {
  		return "", fmt.Errorf("hosts cannot be empty")
  	}

  	var uri strings.Builder
  	uri.WriteString("mongodb://")

  	if username != "" || password != "" {
  		// url.UserPassword applies userinfo escaping ("@", ":", "/", "?" and
  		// spaces become %XX). url.QueryEscape is the wrong encoder here: it
  		// turns a space into "+", which is not decoded back to a space in the
  		// userinfo part of a URI.
  		uri.WriteString(url.UserPassword(username, password).String())
  		uri.WriteByte('@')
  	}

  	uri.WriteString(strings.Join(hosts, ","))

  	if len(options) > 0 {
  		query := make(url.Values, len(options))
  		for k, v := range options {
  			query.Set(k, v)
  		}
  		// Without the "/" the URI is rejected ("?" directly after the hosts).
  		uri.WriteString("/?")
  		uri.WriteString(query.Encode())
  	}

  	return uri.String(), nil
  }
  60. func dealYS() {
  61. ctx := context.Background()
  62. username := "SJZY_RWbid_ES"
  63. password := "SJZY@B4i4D5e6S"
  64. hosts := []string{"172.31.31.202:27081", "172.20.45.128:27080"}
  65. //hosts := []string{"127.0.0.1:27083"}
  66. optionsSet := map[string]string{}
  67. uri, err := BuildMongoURI(username, password, hosts, optionsSet)
  68. if err != nil {
  69. panic(err)
  70. }
  71. clientOptions := options.Client().ApplyURI(uri)
  72. //clientOptions.SetReadPreference(readpref.Primary())
  73. //clientOptions.SetDirect(true)
  74. // 连接MongoDB
  75. client, err := mongo.Connect(context.Background(), clientOptions)
  76. if err != nil {
  77. log.Println(err)
  78. }
  79. srcColl := client.Database("mixdata").Collection("qyxy_std")
  80. dstColl := client.Database("mixdata").Collection("wcc_qyxy_yisi0424")
  81. // BadgerDB
  82. dbCache, _ := badger.Open(badger.DefaultOptions("badgerdb"))
  83. defer dbCache.Close()
  84. worker := 8
  85. taskChan := make(chan Company, 10000)
  86. var wg sync.WaitGroup
  87. // 启动 workers
  88. for i := 0; i < worker; i++ {
  89. wg.Add(1)
  90. go func() {
  91. defer wg.Done()
  92. for c := range taskChan {
  93. processCompany(ctx, c, dbCache, dstColl)
  94. }
  95. }()
  96. }
  97. // 查询条件
  98. where := map[string]interface{}{
  99. "company_type": map[string]interface{}{
  100. "$ne": "个体工商户",
  101. },
  102. }
  103. cursor, err := srcColl.Find(ctx, where)
  104. if err != nil {
  105. log.Println("Find error:", err)
  106. return
  107. }
  108. defer cursor.Close(ctx)
  109. count := 0
  110. for cursor.Next(ctx) {
  111. var c Company
  112. if err := cursor.Decode(&c); err != nil {
  113. continue
  114. }
  115. count++
  116. if count%10000 == 0 {
  117. log.Println("current:", count, c)
  118. }
  119. if strings.Contains(c.CompanyStatus, "注销") || strings.Contains(c.CompanyStatus, "吊销") {
  120. continue
  121. }
  122. if c.CreditNo == "" || len(c.AnnualReports) == 0 {
  123. continue
  124. }
  125. if c.CompanyName == "" || strings.Contains(c.CompanyName, "已除名") {
  126. continue
  127. }
  128. if utf8.RuneCountInString(c.CompanyName) < 5 {
  129. continue
  130. }
  131. if c.UseFlag > 0 {
  132. continue
  133. }
  134. if !strings.Contains(c.CompanyName, "公司") {
  135. continue
  136. }
  137. taskChan <- c
  138. }
  139. if err := cursor.Err(); err != nil {
  140. log.Println("Cursor iteration error:", err)
  141. }
  142. close(taskChan)
  143. wg.Wait()
  144. log.Println("All done.")
  145. }
  146. func processCompany(ctx context.Context, c Company, db *badger.DB, dst *mongo.Collection) {
  147. for _, ar := range c.AnnualReports {
  148. phone := strings.TrimSpace(ar.CompanyPhone)
  149. email := strings.TrimSpace(ar.CompanyEmail)
  150. operator := strings.TrimSpace(ar.OperatorName)
  151. validFields := make([]string, 0)
  152. if isValidPhone(phone) {
  153. validFields = append(validFields, "company_phone")
  154. }
  155. if isValidEmail(email) {
  156. validFields = append(validFields, "company_email")
  157. }
  158. if isValidOperator(operator) {
  159. validFields = append(validFields, "operator_name")
  160. }
  161. if len(validFields) < 2 {
  162. continue
  163. }
  164. fields := map[string]string{
  165. "operator_name": ar.OperatorName,
  166. "company_phone": ar.CompanyPhone,
  167. "company_email": ar.CompanyEmail,
  168. }
  169. validKeys := make([]string, 0)
  170. for k, v := range fields {
  171. if strings.TrimSpace(v) != "" {
  172. validKeys = append(validKeys, k)
  173. }
  174. }
  175. if len(validKeys) < 2 {
  176. continue
  177. }
  178. key := generateKey(ar.OperatorName, ar.CompanyPhone, ar.CompanyEmail)
  179. if key == "" {
  180. continue
  181. }
  182. valMap := map[string]string{
  183. "credit_no": c.CreditNo,
  184. "company_name": c.CompanyName,
  185. "company_id": c.CompanyID,
  186. }
  187. val, _ := json.Marshal(valMap)
  188. // 查缓存
  189. existing := getFromBadger(db, key)
  190. if existing != nil {
  191. var e map[string]string
  192. _ = json.Unmarshal(existing, &e)
  193. if e["credit_no"] != c.CreditNo {
  194. match := SuspectMatch{
  195. CreditNo1: e["credit_no"],
  196. CreditNo2: c.CreditNo,
  197. Name1: e["company_name"],
  198. Name2: c.CompanyName,
  199. CompanyID1: e["company_id"],
  200. CompanyID2: c.CompanyID,
  201. MatchKeys: validKeys,
  202. MatchValue: getMatchValues(fields, validKeys),
  203. }
  204. _, err := dst.InsertOne(ctx, match)
  205. if err != nil {
  206. log.Println("Insert error:", err)
  207. }
  208. }
  209. } else {
  210. putToBadger(db, key, val)
  211. }
  212. }
  213. }
  // generateKey builds the lookup key for a set of contact fields.
  //
  // Each argument is trimmed of surrounding whitespace; non-empty values are
  // prefixed with "op:", "ph:" and "em:" respectively and joined with "|" in
  // that order. The empty string is returned when fewer than two values are
  // non-empty.
  func generateKey(operator, phone, email string) string {
  	parts := [...]struct{ prefix, value string }{
  		{"op:", operator},
  		{"ph:", phone},
  		{"em:", email},
  	}

  	items := make([]string, 0, len(parts))
  	for _, p := range parts {
  		// Put the trimmed value into the key, not just test it: otherwise
  		// " x" and "x" would yield different keys for the same contact.
  		if v := strings.TrimSpace(p.value); v != "" {
  			items = append(items, p.prefix+v)
  		}
  	}
  	if len(items) < 2 {
  		return ""
  	}
  	return strings.Join(items, "|")
  }
  // getMatchValues returns the values of fields for the given keys, in the
  // order of keys. A key missing from fields yields an empty string. The
  // result is never nil.
  func getMatchValues(fields map[string]string, keys []string) []string {
  	values := make([]string, len(keys))
  	for i := range keys {
  		values[i] = fields[keys[i]]
  	}
  	return values
  }
  237. func getFromBadger(db *badger.DB, key string) []byte {
  238. var val []byte
  239. err := db.View(func(txn *badger.Txn) error {
  240. item, err := txn.Get([]byte(key))
  241. if err == nil {
  242. val, _ = item.ValueCopy(nil)
  243. }
  244. return nil
  245. })
  246. if err != nil {
  247. return nil
  248. }
  249. return val
  250. }
  251. func putToBadger(db *badger.DB, key string, val []byte) {
  252. _ = db.Update(func(txn *badger.Txn) error {
  253. return txn.Set([]byte(key), val)
  254. })
  255. }
  256. func isValidPhone(phone string) bool {
  257. return phoneRegexp.MatchString(strings.TrimSpace(phone))
  258. }
  259. func isValidEmail(email string) bool {
  260. return emailRegexp.MatchString(strings.TrimSpace(email))
  261. }
  // isValidOperator reports whether op is a usable operator name: after
  // trimming it must be non-empty and contain neither "无" nor "*".
  func isValidOperator(op string) bool {
  	switch name := strings.TrimSpace(op); {
  	case name == "":
  		return false
  	case strings.Contains(name, "无"), strings.Contains(name, "*"):
  		return false
  	default:
  		return true
  	}
  }
  269. // dealTsGraph 处理疑似数据到图形数据库
  270. func dealTsGraph() {
  271. session, pool, err := ConnectToNebula(HostList, UserName, PassWord)
  272. if err != nil {
  273. log.Fatalf("Failed to connect to Nebula Graph: %v", err)
  274. }
  275. defer pool.Close()
  276. defer session.Release()
  277. //
  278. sess := MgoQy.GetMgoConn()
  279. defer MgoQy.DestoryMongoConn(sess)
  280. it := sess.DB("mixdata").C("wcc_qyxy_yisi0424").Find(nil).Sort("_id").Select(nil).Iter()
  281. jobChan := make(chan InsertSuspectJob, WorkerCount*2)
  282. var wg sync.WaitGroup
  283. // 启动工作协程
  284. for i := 0; i < 5; i++ {
  285. wg.Add(1)
  286. go BatchInsertSuspectInvestWork(session, &wg, jobChan)
  287. }
  288. count := 0
  289. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  290. if count%10000 == 0 {
  291. log.Println("current:", count, tmp["company_name1"], tmp["company_name2"])
  292. }
  293. //
  294. cname1 := util.ObjToString(tmp["company_name1"])
  295. cname2 := util.ObjToString(tmp["company_name2"])
  296. if (strings.Contains(cname1, "营业厅") && strings.Contains(cname2, "营业厅")) || (strings.Contains(cname1, "分公司") || strings.Contains(cname2, "分公司")) {
  297. continue
  298. }
  299. // 判断是否是同样的前缀,只是分公司、营业厅等不同
  300. if IsSameCompanySubject(cname1, cname2) {
  301. continue
  302. }
  303. //
  304. job := InsertSuspectJob{
  305. Relations: SuspectInvest{
  306. FromCode: util.ObjToString(tmp["company_id1"]),
  307. ToCode: util.ObjToString(tmp["company_id2"]),
  308. },
  309. }
  310. //拼接 疑似的原因
  311. result, err := concatMatchPairs(tmp)
  312. if err != nil {
  313. log.Println("concatMatchPairs Error:", err, result)
  314. continue
  315. }
  316. if result == "" {
  317. continue
  318. }
  319. job.Relations.Reason = result
  320. jobChan <- job
  321. }
  322. close(jobChan)
  323. wg.Wait()
  324. log.Println(" dealTsGraph 完成!")
  325. }
  // IsSameCompanySubject reports whether two company names share the same
  // leading subject and differ only towards the end.
  //
  // Both names are trimmed of surrounding whitespace. The result is true when
  // their longest common prefix covers at least 60% of the shorter name,
  // measured in runes. If either name is empty after trimming the result is
  // false, since an empty name carries no subject to compare.
  func IsSameCompanySubject(name1, name2 string) bool {
  	name1 = strings.TrimSpace(name1)
  	name2 = strings.TrimSpace(name2)

  	// Rune length of the shorter name.
  	shorter := utf8.RuneCountInString(name1)
  	if n := utf8.RuneCountInString(name2); n < shorter {
  		shorter = n
  	}
  	if shorter == 0 {
  		// Without this guard the threshold test below is 0 >= 0 and would
  		// declare an empty name the same subject as anything.
  		return false
  	}

  	prefixLen := utf8.RuneCountInString(longestCommonPrefix(name1, name2))
  	return float64(prefixLen) >= float64(shorter)*0.6
  }

  // longestCommonPrefix returns the longest prefix shared by s1 and s2,
  // compared rune by rune.
  func longestCommonPrefix(s1, s2 string) string {
  	r1, r2 := []rune(s1), []rune(s2)
  	n := 0
  	for n < len(r1) && n < len(r2) && r1[n] == r2[n] {
  		n++
  	}
  	return string(r1[:n])
  }
  358. // ------------------------------------//
  // extractStringSlice returns tmp[key] converted to a []string.
  //
  // The value must be a []interface{} whose elements are all strings. If it is
  // anything else (including absent), or if any element is not a string, a nil
  // slice and an error naming the key are returned.
  func extractStringSlice(tmp map[string]interface{}, key string) ([]string, error) {
  	raw, isSlice := tmp[key].([]interface{})
  	if !isSlice {
  		return nil, fmt.Errorf("%s is not a slice", key)
  	}

  	out := make([]string, len(raw))
  	for i := range raw {
  		s, isString := raw[i].(string)
  		if !isString {
  			return nil, fmt.Errorf("%s contains non-string element", key)
  		}
  		out[i] = s
  	}
  	return out, nil
  }
  375. // 拼接键值对字符串
  376. func concatMatchPairs(tmp map[string]interface{}) (string, error) {
  377. // 提取 match_keys 和 match_values
  378. keys, err := extractStringSlice(tmp, "match_keys")
  379. if err != nil {
  380. return "", fmt.Errorf("failed to get match_keys: %v", err)
  381. }
  382. values, err := extractStringSlice(tmp, "match_values")
  383. if err != nil {
  384. return "", fmt.Errorf("failed to get match_values: %v", err)
  385. }
  386. // 检查长度是否一致
  387. if len(keys) != len(values) {
  388. return "", fmt.Errorf("length mismatch: match_keys (%d) vs match_values (%d)", len(keys), len(values))
  389. }
  390. // 拼接字符串
  391. var builder strings.Builder
  392. for i := 0; i < len(keys); i++ {
  393. if i > 0 {
  394. builder.WriteString(", ")
  395. }
  396. builder.WriteString(keys[i])
  397. builder.WriteString(":")
  398. builder.WriteString(values[i])
  399. }
  400. return builder.String(), nil
  401. }