181.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/xuri/excelize/v2"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "go.mongodb.org/mongo-driver/mongo"
  8. "go.mongodb.org/mongo-driver/mongo/options"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "log"
  12. "net/url"
  13. "strings"
  14. "time"
  15. )
  16. func get181Data() {
  17. //181 凭安库
  18. MgoQY := &mongodb.MongodbSim{
  19. MongodbAddr: "172.17.4.181:27001",
  20. //MongodbAddr: "127.0.0.1:27001",
  21. DbName: "mixdata",
  22. Size: 10,
  23. UserName: "",
  24. Password: "",
  25. //Direct: true,
  26. }
  27. MgoQY.InitPool()
  28. sess := MgoQY.GetMgoConn()
  29. defer MgoQY.DestoryMongoConn(sess)
  30. where := map[string]interface{}{
  31. "use_flag": 0,
  32. "company_status": map[string]interface{}{
  33. "$nin": []string{"注销", "吊销", "吊销,已注销"},
  34. },
  35. }
  36. selected := map[string]interface{}{
  37. "company_name": 1,
  38. "company_status": 1,
  39. "company_type": 1,
  40. "use_flag": 1,
  41. }
  42. query := sess.DB("mixdata").C("company_base").Find(where).Select(selected).Iter()
  43. count := 0
  44. typeCount := make(map[string]int)
  45. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  46. if count%10000 == 0 {
  47. log.Println("current:", count, tmp["company_name"], tmp["company_status"], tmp["company_type"], len(typeCount))
  48. }
  49. if util.ObjToString(tmp["company_name"]) == "" || util.ObjToString(tmp["company_type"]) == "" {
  50. continue
  51. }
  52. company_status := util.ObjToString(tmp["company_status"])
  53. if strings.Contains(company_status, "注销") || strings.Contains(company_status, "吊销") {
  54. continue
  55. }
  56. company_type := util.ObjToString(tmp["company_type"])
  57. typeCount[company_type]++
  58. }
  59. log.Println("len types", len(typeCount))
  60. for k, v := range typeCount {
  61. sa := map[string]interface{}{
  62. "type": k,
  63. "count": v,
  64. }
  65. MgoQY.Save("wcc_company_type_static_0712", sa)
  66. }
  67. log.Println("get181Data --------- over ")
  68. }
  69. func exportByTraverseAndCount() {
  70. ctx := context.Background()
  71. username := ""
  72. password := ""
  73. //hosts := []string{"172.17.4.181:27001"}
  74. hosts := []string{"172.17.4.181:27001"}
  75. uri, err := BuildMongoURI(username, password, hosts, nil)
  76. if err != nil {
  77. panic(err)
  78. }
  79. clientOptions := options.Client().ApplyURI(uri)
  80. client, err := mongo.Connect(ctx, clientOptions)
  81. if err != nil {
  82. log.Fatal(err)
  83. }
  84. defer client.Disconnect(ctx)
  85. collection := client.Database("mixdata").Collection("company_base")
  86. // 查询条件
  87. filter := bson.D{
  88. {"use_flag", 0},
  89. {"company_status", bson.D{{"$nin", bson.A{"注销", "吊销", "吊销,已注销"}}}},
  90. }
  91. // 只查询 company_type 字段
  92. opts := options.Find().SetProjection(bson.D{{"company_type", 1}})
  93. // 创建游标
  94. cursor, err := collection.Find(ctx, filter, opts)
  95. if err != nil {
  96. log.Fatalf("查询失败: %v", err)
  97. }
  98. defer cursor.Close(ctx)
  99. // 用 map 统计
  100. typeCount := make(map[string]int64)
  101. count := 0
  102. // 遍历
  103. for cursor.Next(ctx) {
  104. var doc struct {
  105. CompanyType string `bson:"company_type"`
  106. }
  107. count++
  108. if count%10000 == 0 {
  109. log.Println("current:", count)
  110. }
  111. if err := cursor.Decode(&doc); err != nil {
  112. log.Printf("解码失败: %v", err)
  113. continue
  114. }
  115. if doc.CompanyType == "" {
  116. continue
  117. }
  118. typeCount[doc.CompanyType]++
  119. }
  120. if err := cursor.Err(); err != nil {
  121. log.Fatalf("遍历出错: %v", err)
  122. }
  123. // 写 Excel
  124. f := excelize.NewFile()
  125. sheet := "TypeCounts"
  126. f.SetCellValue(sheet, "A1", "company_type")
  127. f.SetCellValue(sheet, "B1", "count")
  128. row := 2
  129. for companyType, count := range typeCount {
  130. f.SetCellValue(sheet, fmt.Sprintf("A%d", row), companyType)
  131. f.SetCellValue(sheet, fmt.Sprintf("B%d", row), count)
  132. row++
  133. }
  134. filename := fmt.Sprintf("company_type_counts_traverse_%d.xlsx", time.Now().Unix())
  135. if err := f.SaveAs(filename); err != nil {
  136. log.Fatalf("保存 Excel 失败: %v", err)
  137. }
  138. fmt.Println("\n导出完成,文件名:", filename)
  139. }
  140. func exportCompanyType2() {
  141. ctx := context.Background()
  142. username := ""
  143. password := ""
  144. hosts := []string{"172.17.4.181:27001"}
  145. uri, err := BuildMongoURI(username, password, hosts, nil)
  146. if err != nil {
  147. panic(err)
  148. }
  149. clientOptions := options.Client().ApplyURI(uri)
  150. client, err := mongo.Connect(ctx, clientOptions)
  151. if err != nil {
  152. log.Fatal(err)
  153. }
  154. defer client.Disconnect(ctx)
  155. db := client.Database("mixdata")
  156. collection := db.Collection("company_base")
  157. // 公共 filter:use_flag=0 且 company_status != 注销 且 company_status != 吊销
  158. filter := bson.M{
  159. "use_flag": 0,
  160. "company_status": bson.M{
  161. "$nin": []string{"注销", "吊销", "吊销,已注销"},
  162. },
  163. }
  164. // 第一步:distinct 不重复的 company_type
  165. types, err := collection.Distinct(ctx, "company_type", filter)
  166. if err != nil {
  167. log.Fatalf("查询 distinct 失败: %v", err)
  168. }
  169. fmt.Printf("共找到 %d 个不同的 company_type\n", len(types))
  170. // 创建 Excel 文件
  171. f := excelize.NewFile()
  172. sheet := "TypeCounts"
  173. f.SetCellValue(sheet, "A1", "company_type")
  174. f.SetCellValue(sheet, "B1", "count")
  175. // 第二步:对每个类型 countDocuments,也带上相同 filter
  176. row := 2
  177. for _, t := range types {
  178. strType, ok := t.(string)
  179. if !ok {
  180. continue
  181. }
  182. // 在 filter 上再加一个 company_type 条件
  183. typeFilter := bson.M{
  184. "use_flag": 0,
  185. "company_status": bson.M{
  186. "$nin": []string{"注销", "吊销", "吊销,已注销"},
  187. },
  188. "company_type": strType,
  189. }
  190. count, err := collection.CountDocuments(ctx, typeFilter)
  191. if err != nil {
  192. log.Printf("统计 %s 出错: %v", strType, err)
  193. continue
  194. }
  195. log.Println(strType, count)
  196. // 写到 Excel
  197. f.SetCellValue(sheet, fmt.Sprintf("A%d", row), strType)
  198. f.SetCellValue(sheet, fmt.Sprintf("B%d", row), count)
  199. row++
  200. }
  201. // 保存 Excel
  202. if err := f.SaveAs("company_type_counts_filtered.xlsx"); err != nil {
  203. log.Fatalf("保存 Excel 失败: %v", err)
  204. }
  205. fmt.Println("导出完成,文件名:company_type_counts_filtered.xlsx")
  206. }
  207. func exportCompanyType() {
  208. ctx := context.Background()
  209. username := ""
  210. password := ""
  211. //hosts := []string{"127.0.0.1:27001"}
  212. hosts := []string{"172.17.4.181:27001"}
  213. uri, err := BuildMongoURI(username, password, hosts, nil)
  214. if err != nil {
  215. panic(err)
  216. }
  217. clientOptions := options.Client().ApplyURI(uri)
  218. //clientOptions.SetDirect(true) // 如果需要 direct
  219. client, err := mongo.Connect(ctx, clientOptions)
  220. if err != nil {
  221. log.Fatal(err)
  222. }
  223. defer client.Disconnect(ctx)
  224. db := client.Database("mixdata")
  225. collection := db.Collection("company_base")
  226. // 第一步:distinct 不重复的 company_type
  227. types, err := collection.Distinct(ctx, "company_type", bson.D{})
  228. if err != nil {
  229. log.Fatalf("查询 distinct 失败: %v", err)
  230. }
  231. fmt.Printf("共找到 %d 个不同的 company_type\n", len(types))
  232. // 创建 Excel 文件
  233. f := excelize.NewFile()
  234. sheet := "TypeCounts"
  235. f.SetCellValue(sheet, "A1", "company_type")
  236. f.SetCellValue(sheet, "B1", "count")
  237. // 第二步:对每个类型 countDocuments
  238. row := 2
  239. for _, t := range types {
  240. strType, ok := t.(string)
  241. if !ok {
  242. continue
  243. }
  244. count, err := collection.CountDocuments(ctx, bson.M{"company_type": strType})
  245. if err != nil {
  246. log.Printf("统计 %s 出错: %v", strType, err)
  247. continue
  248. }
  249. log.Println(strType, count)
  250. // 写到 Excel
  251. f.SetCellValue(sheet, fmt.Sprintf("A%d", row), strType)
  252. f.SetCellValue(sheet, fmt.Sprintf("B%d", row), count)
  253. row++
  254. }
  255. // 保存 Excel
  256. if err := f.SaveAs("company_type_counts_fast.xlsx"); err != nil {
  257. log.Fatalf("保存 Excel 失败: %v", err)
  258. }
  259. fmt.Println("导出完成,文件名:company_type_counts_fast.xlsx")
  260. }
  // Enterprise is the record shape used when matching special enterprises: the
  // fields listed here are the ones carried through BSON encoding, each under
  // the key given in its struct tag.
  type Enterprise struct {
  	// CompanyName is stored under "company_name".
  	CompanyName string `bson:"company_name"`
  	// CompanyType is stored under "company_type".
  	CompanyType string `bson:"company_type"`
  	// CreditNo is stored under "credit_no".
  	CreditNo string `bson:"credit_no"`
  	// Organizer is stored under "organizer".
  	Organizer string `bson:"organizer"`
  	// UseFlag is stored under "use_flag" as a floating-point number.
  	UseFlag float64 `bson:"use_flag"`
  	// RuleName holds the content of column B of the Excel rule sheet.
  	RuleName string `bson:"rule_name"`
  }
  269. // matchSpecialEnterprise 匹配特殊企业
  270. func matchSpecialEnterprise() {
  271. ctx := context.Background()
  272. /**
  273. MgoQY := &mongodb.MongodbSim{
  274. MongodbAddr: "172.17.4.181:27001",
  275. //MongodbAddr: "127.0.0.1:27001",
  276. DbName: "mixdata",
  277. Size: 10,
  278. UserName: "",
  279. Password: "",
  280. //Direct: true,
  281. }
  282. MgoQY.InitPool()
  283. */
  284. username := ""
  285. password := ""
  286. hosts := []string{"172.17.4.181:27001"}
  287. uri, err := BuildMongoURI(username, password, hosts, nil)
  288. if err != nil {
  289. panic(err)
  290. }
  291. // MongoDB连接
  292. client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
  293. if err != nil {
  294. log.Fatal(err)
  295. }
  296. defer client.Disconnect(ctx)
  297. db := client.Database("mixdata")
  298. sourceCol := db.Collection("special_enterprise")
  299. targetCol := db.Collection("special_enterprise_temp_03")
  300. // 读取Excel
  301. f, err := excelize.OpenFile("./政府、学校、医院、银行等.xlsx")
  302. if err != nil {
  303. log.Fatal(err)
  304. }
  305. defer f.Close()
  306. rows, err := f.GetRows("Sheet1")
  307. if err != nil {
  308. log.Fatal(err)
  309. }
  310. var ruleNames []string
  311. for idx, row := range rows {
  312. if idx == 0 {
  313. continue // 跳过标题
  314. }
  315. if len(row) > 1 {
  316. ruleNames = append(ruleNames, strings.TrimSpace(row[1]))
  317. }
  318. }
  319. log.Printf("读取到 %d 条规则", len(ruleNames))
  320. // 从MongoDB一次性读出 use_flag=0 的数据
  321. cursor, err := sourceCol.Find(ctx, bson.M{"use_flag": 0.0})
  322. if err != nil {
  323. log.Fatal("查询Mongo失败:", err)
  324. }
  325. defer cursor.Close(ctx)
  326. var allEnterprises []Enterprise
  327. for cursor.Next(ctx) {
  328. var doc bson.M
  329. if err := cursor.Decode(&doc); err != nil {
  330. log.Println("解码失败:", err)
  331. continue
  332. }
  333. ent := Enterprise{
  334. CompanyName: getString(doc, "company_name"),
  335. CompanyType: getString(doc, "company_type"),
  336. CreditNo: getString(doc, "credit_no"),
  337. Organizer: getString(doc, "organizer"),
  338. UseFlag: getFloat64(doc, "use_flag"),
  339. }
  340. allEnterprises = append(allEnterprises, ent)
  341. }
  342. log.Printf("从MongoDB读取到 %d 条数据", len(allEnterprises))
  343. var matched []interface{}
  344. for idx, ent := range allEnterprises {
  345. if idx%10000 == 0 {
  346. log.Println("当前匹配企业", idx, ent.CompanyName, "还剩余", len(allEnterprises)-idx)
  347. }
  348. if ent.Organizer == "" {
  349. continue
  350. }
  351. for _, rule := range ruleNames {
  352. if rule == "" {
  353. continue
  354. }
  355. if strings.Contains(ent.Organizer, rule) || strings.Contains(rule, ent.Organizer) {
  356. newEnt := ent
  357. newEnt.RuleName = rule
  358. matched = append(matched, newEnt)
  359. break // 匹配到一个就退出,继续下一个企业
  360. }
  361. }
  362. }
  363. log.Printf("匹配到 %d 条记录,准备写入临时表", len(matched))
  364. // 分批写入 MongoDB
  365. const batchSize = 1000
  366. for i := 0; i < len(matched); i += batchSize {
  367. end := i + batchSize
  368. if end > len(matched) {
  369. end = len(matched)
  370. }
  371. _, err := targetCol.InsertMany(ctx, matched[i:end])
  372. if err != nil {
  373. log.Println("插入失败:", err)
  374. }
  375. }
  376. log.Println("全部完成!")
  377. }
  378. // 工具函数:安全获取字符串
  379. func getString(m bson.M, key string) string {
  380. if val, ok := m[key]; ok {
  381. if s, ok := val.(string); ok {
  382. return s
  383. }
  384. }
  385. return ""
  386. }
  387. // 工具函数:安全获取float64
  388. func getFloat64(m bson.M, key string) float64 {
  389. if val, ok := m[key]; ok {
  390. switch v := val.(type) {
  391. case float64:
  392. return v
  393. case int32:
  394. return float64(v)
  395. case int64:
  396. return float64(v)
  397. }
  398. }
  399. return 0
  400. }
  // BuildMongoURI assembles a MongoDB connection string of the form
  //
  //	mongodb://[username:password@]host1[,host2...][/?key=value&...]
  //
  // username and password are percent-encoded; credentials are emitted only
  // when username is non-empty, and an empty password still yields
  // "username:@". hosts are joined with commas as given. options, when
  // non-empty, are appended as a query string with keys in sorted order.
  //
  // It returns an error when hosts is empty.
  func BuildMongoURI(username, password string, hosts []string, options map[string]string) (string, error) {
  	if len(hosts) == 0 {
  		return "", fmt.Errorf("hosts cannot be empty")
  	}

  	// escape percent-encodes a credential. url.QueryEscape alone encodes a
  	// space as "+", which a decoder that path-unescapes user info reads as a
  	// literal plus; emitting "%20" (with a real "+" already encoded as "%2B")
  	// decodes correctly under both query and path unescaping.
  	escape := func(s string) string {
  		return strings.ReplaceAll(url.QueryEscape(s), "+", "%20")
  	}

  	var uri strings.Builder
  	uri.WriteString("mongodb://")
  	if username != "" {
  		uri.WriteString(escape(username))
  		uri.WriteByte(':')
  		uri.WriteString(escape(password))
  		uri.WriteByte('@')
  	}
  	uri.WriteString(strings.Join(hosts, ","))

  	if len(options) > 0 {
  		query := url.Values{}
  		for k, v := range options {
  			query.Set(k, v)
  		}
  		// The connection-string format puts a "/" between the host list and
  		// the "?" that starts the options.
  		uri.WriteString("/?")
  		uri.WriteString(query.Encode())
  	}
  	return uri.String(), nil
  }