bank_poc.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/xuri/excelize/v2"
  5. "go.mongodb.org/mongo-driver/bson"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  8. "log"
  9. "sort"
  10. "strings"
  11. "sync"
  12. "unicode/utf8"
  13. )
  14. // getCompany 获取企业
  15. func getCompany() {
  16. Mgo := &mongodb.MongodbSim{
  17. MongodbAddr: "172.17.189.140:27080",
  18. //MongodbAddr: "127.0.0.1:27083",
  19. DbName: "mixdata",
  20. Size: 10,
  21. UserName: "SJZY_RWbid_ES",
  22. Password: "SJZY@B4i4D5e6S",
  23. //Direct: true,
  24. }
  25. Mgo.InitPool()
  26. where := map[string]interface{}{
  27. "company_area": map[string]interface{}{
  28. "$in": []string{"北京", "上海", "浙江", "江苏", "广东"},
  29. },
  30. //"company_status": "存续",
  31. }
  32. defer util.Catch()
  33. sess := Mgo.GetMgoConn()
  34. defer Mgo.DestoryMongoConn(sess)
  35. //pool := make(chan bool, 10) //处理协程
  36. //wg := &sync.WaitGroup{}
  37. selected := map[string]interface{}{"company_name": 1, "company_area": 1, "credit_no": 1, "company_status": 1, "company_city": 1}
  38. it := sess.DB("mixdata").C("qyxy_std").Find(&where).Select(&selected).Iter()
  39. log.Println("开始")
  40. count := 0
  41. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  42. if count%10000 == 0 {
  43. log.Println("current: ", count)
  44. }
  45. if util.ObjToString(tmp["credit_no"]) == "" {
  46. continue
  47. }
  48. Mgo.SaveByOriID("wcc_bank_company", tmp)
  49. tmp = make(map[string]interface{})
  50. }
  51. log.Println("结束")
  52. }
  53. // bankPOC bankPOC
  54. func bankPOC() {
  55. Mgo := &mongodb.MongodbSim{
  56. MongodbAddr: "172.17.189.140:27080",
  57. //MongodbAddr: "127.0.0.1:27083",
  58. DbName: "qfw",
  59. Size: 10,
  60. UserName: "SJZY_RWbid_ES",
  61. Password: "SJZY@B4i4D5e6S",
  62. //Direct: true,
  63. }
  64. Mgo.InitPool()
  65. MgoC := &mongodb.MongodbSim{
  66. MongodbAddr: "172.17.189.140:27080",
  67. //MongodbAddr: "127.0.0.1:27083",
  68. DbName: "mixdata",
  69. Size: 10,
  70. UserName: "SJZY_RWbid_ES",
  71. Password: "SJZY@B4i4D5e6S",
  72. //Direct: true,
  73. }
  74. MgoC.InitPool()
  75. defer util.Catch()
  76. sess := Mgo.GetMgoConn()
  77. defer Mgo.DestoryMongoConn(sess)
  78. pool := make(chan bool, 20) //处理协程
  79. wg := &sync.WaitGroup{}
  80. //查询条件
  81. // 设置查询条件
  82. filter := bson.D{
  83. {"comeintime", bson.M{"$gte": 1640966400, "$lte": 1703952000}},
  84. {"subtype", bson.M{"$in": []string{"中标", "单一", "成交", "合同"}}},
  85. }
  86. selected := map[string]interface{}{
  87. "contenthtml": 0, // 0表示不返回该字段
  88. "attach_text": 0, // 0表示不返回该字段
  89. "detail": 0, // 0表示不返回该字段
  90. "purchasingsource": 0, // 0表示不返回该字段
  91. "jsondata": 0, // 0表示不返回该字段
  92. "package": 0, // 0表示不返回该字段
  93. }
  94. it := sess.DB("qfw").C("bidding").Find(&filter).Select(&selected).Iter()
  95. //total, _ := sess.DB("qfw").C("bidding").Find(filter).Count()
  96. //fmt.Println("开始", "总数是:", total)
  97. count := 0
  98. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  99. if count%10000 == 0 {
  100. log.Println("CURRENT :", count)
  101. }
  102. pool <- true
  103. wg.Add(1)
  104. go func(tmp map[string]interface{}) {
  105. defer func() {
  106. <-pool
  107. wg.Done()
  108. }()
  109. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  110. tmp = make(map[string]interface{})
  111. return
  112. }
  113. // 针对存量数据,重复数据不进索引
  114. if util.IntAll(tmp["extracttype"]) == -1 {
  115. return
  116. }
  117. projectName := util.ObjToString(tmp["projectname"])
  118. if strings.Contains(projectName, "非政府") {
  119. return
  120. }
  121. buyerclass := util.ObjToString(tmp["buyerclass"])
  122. if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  123. return
  124. }
  125. swinner := util.ObjToString(tmp["s_winner"])
  126. if swinner == "" {
  127. return
  128. }
  129. if utf8.RuneCountInString(swinner) < 4 {
  130. return
  131. }
  132. if strings.Contains(swinner, ",") {
  133. winners := strings.Split(swinner, ",")
  134. for _, v := range winners {
  135. if utf8.RuneCountInString(v) < 4 {
  136. continue
  137. } else {
  138. da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": v})
  139. if da == nil || (*da)["credit_no"] == nil || util.ObjToString((*da)["credit_no"]) == "" {
  140. return
  141. }
  142. area := util.ObjToString((*da)["company_area"])
  143. areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  144. if !IsInStringArray(area, areas) {
  145. continue
  146. }
  147. if !IsInStringArray(area, areas) {
  148. continue
  149. }
  150. insert := map[string]interface{}{
  151. "winner": v,
  152. "credit_no": (*da)["credit_no"],
  153. "bidding_id": mongodb.BsonIdToSId(tmp["_id"]),
  154. "projectname": projectName,
  155. "company_type": (*da)["company_type"],
  156. "company_status": (*da)["company_status"],
  157. "company_area": (*da)["company_area"],
  158. "company_city": (*da)["company_city"],
  159. }
  160. Mgo.Save("wcc_bank_winner_new", insert)
  161. }
  162. }
  163. } else {
  164. da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": swinner})
  165. if da == nil || (*da)["credit_no"] == nil || util.ObjToString((*da)["credit_no"]) == "" {
  166. return
  167. }
  168. area := util.ObjToString((*da)["company_area"])
  169. areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  170. if !IsInStringArray(area, areas) {
  171. return
  172. }
  173. if !IsInStringArray(area, areas) {
  174. return
  175. }
  176. insert := map[string]interface{}{
  177. "winner": swinner,
  178. "credit_no": (*da)["credit_no"],
  179. "bidding_id": mongodb.BsonIdToSId(tmp["_id"]),
  180. "projectname": projectName,
  181. }
  182. Mgo.Save("wcc_bank_winner", insert)
  183. }
  184. }(tmp)
  185. tmp = make(map[string]interface{})
  186. }
  187. wg.Wait()
  188. fmt.Println("结束")
  189. }
  190. // bankWinnerStatistic 统计企业信息;企业出现数量
  191. func bankWinnerStatistic() {
  192. Mgo := &mongodb.MongodbSim{
  193. MongodbAddr: "172.17.189.140:27080",
  194. //MongodbAddr: "127.0.0.1:27083",
  195. DbName: "qfw",
  196. Size: 10,
  197. UserName: "SJZY_RWbid_ES",
  198. Password: "SJZY@B4i4D5e6S",
  199. //Direct: true,
  200. }
  201. Mgo.InitPool()
  202. MgoC := &mongodb.MongodbSim{
  203. MongodbAddr: "172.17.189.140:27080",
  204. //MongodbAddr: "127.0.0.1:27083",
  205. DbName: "mixdata",
  206. Size: 10,
  207. UserName: "SJZY_RWbid_ES",
  208. Password: "SJZY@B4i4D5e6S",
  209. //Direct: true,
  210. }
  211. MgoC.InitPool()
  212. defer util.Catch()
  213. sess := Mgo.GetMgoConn()
  214. defer Mgo.DestoryMongoConn(sess)
  215. it := sess.DB("qfw").C("wcc_bank_winner").Find(nil).Select(nil).Iter()
  216. statisticMap := make(map[string]int)
  217. noMap := make(map[string]interface{})
  218. count := 0
  219. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  220. if count%10000 == 0 {
  221. log.Println("current:", count)
  222. }
  223. winner := util.ObjToString(tmp["winner"])
  224. da, _ := MgoC.FindOne("qyxy_std", map[string]interface{}{"company_name": winner})
  225. if util.ObjToString((*da)["company_type"]) == "个体工商户" {
  226. continue
  227. }
  228. statisticMap[winner]++
  229. noMap[winner] = tmp["credit_no"]
  230. }
  231. //
  232. log.Println("开始保存")
  233. for k, v := range statisticMap {
  234. insert := map[string]interface{}{
  235. "winner": k,
  236. "num": v,
  237. "credit_no": noMap[k],
  238. }
  239. Mgo.Save("wcc_bank_winner_statistic_company", insert)
  240. }
  241. log.Println("保存结束")
  242. }
  243. // getPengWinner 根据 龚文华需求, 碰撞提供的企业名单,获取相关标讯信息
  244. func getPengWinner() {
  245. Mgo := &mongodb.MongodbSim{
  246. //MongodbAddr: "172.17.189.140:27080",
  247. MongodbAddr: "127.0.0.1:27083",
  248. DbName: "qfw",
  249. Size: 10,
  250. UserName: "SJZY_RWbid_ES",
  251. Password: "SJZY@B4i4D5e6S",
  252. Direct: true,
  253. }
  254. Mgo.InitPool()
  255. f, err := excelize.OpenFile("政采测试100户名单0118.xlsx")
  256. if err != nil {
  257. fmt.Println(err)
  258. return
  259. }
  260. defer func() {
  261. f.Save()
  262. if err := f.Close(); err != nil {
  263. fmt.Println(err)
  264. }
  265. }()
  266. // 获取 Sheet1 上所有单元格
  267. rows, err := f.GetRows("Sheet1")
  268. if err != nil {
  269. fmt.Println(err)
  270. return
  271. }
  272. for i := 1; i < len(rows); i++ {
  273. name := rows[i][0]
  274. where := map[string]interface{}{
  275. "winner": name,
  276. }
  277. log.Println(name)
  278. datas, _ := Mgo.Find("wcc_bank_winner", where, nil, nil, false, -1, -1)
  279. //没有找到中标单位 标讯信息
  280. if len(*datas) == 0 {
  281. f.SetCellValue("Sheet1", fmt.Sprintf("C%v", i+1), "无")
  282. } else {
  283. f.SetCellValue("Sheet1", fmt.Sprintf("C%v", i+1), "有")
  284. }
  285. //if len(*datas) > 0 {
  286. // for k, _ := range *datas {
  287. // biddingID := util.ObjToString((*datas)[k]["bidding_id"])
  288. // biddingData, _ := Mgo.FindById("bidding", biddingID, nil)
  289. // //fmt.Println(biddingData)
  290. // Mgo.SaveByOriID("wcc_bank_winner_bidding", biddingData)
  291. // }
  292. //}
  293. }
  294. log.Println("over")
  295. }
  296. /**
  297. // exportPocN 导出合作渠道银行POC需求;龚文华 需求;函数已作废
  298. func exportPocN() {
  299. //InitMgo()
  300. username := "SJZY_RWbid_ES"
  301. password := "SJZY@B4i4D5e6S"
  302. addr := "172.17.189.140:27080"
  303. //addr := "127.0.0.1:27083"
  304. direct := true
  305. if !strings.Contains(addr, "127") {
  306. direct = false
  307. }
  308. // Escape special characters in username and password
  309. escapedUsername := url.QueryEscape(username)
  310. escapedPassword := url.QueryEscape(password)
  311. // Construct MongoDB connection string
  312. urls := fmt.Sprintf("mongodb://%s:%s@%s", escapedUsername, escapedPassword, addr)
  313. clientOptions := options.Client().ApplyURI(urls).SetDirect(direct)
  314. //clientOptions := options.Client().ApplyURI("mongodb://SJZY_RWbid_ES:SJZY%40B4i4D5e6S@172.17.145.163:27083")
  315. // 连接到MongoDB
  316. client, err := mongo.Connect(context.TODO(), clientOptions)
  317. if err != nil {
  318. log.Fatal(err)
  319. }
  320. defer func() {
  321. if err := client.Disconnect(context.TODO()); err != nil {
  322. log.Fatal(err)
  323. }
  324. }()
  325. // 获取要查询的集合
  326. sourceCollection := client.Database("qfw").Collection("bidding")
  327. //写入新表
  328. targetCollection := client.Database("qfw").Collection("wcc_bank_poc_new")
  329. // 设置查询条件
  330. filter := bson.D{
  331. {"comeintime", bson.M{"$gte": 1640966400, "$lte": 1703952000}},
  332. {"subtype", bson.M{"$in": []string{"中标", "单一", "成交", "合同"}}},
  333. }
  334. // 设置投影,排除 contenthtml 字段
  335. projection := bson.D{
  336. {"contenthtml", 0}, // 0表示不返回该字段
  337. {"attach_text", 0}, // 0表示不返回该字段
  338. {"detail", 0}, // 0表示不返回该字段
  339. {"purchasingsource", 0}, // 0表示不返回该字段
  340. {"jsondata", 0}, // 0表示不返回该字段
  341. {"package", 0}, // 0表示不返回该字段
  342. }
  343. // 获取查询结果的总文档数
  344. totalCount, err := sourceCollection.EstimatedDocumentCount(context.Background(), nil)
  345. if err != nil {
  346. log.Fatal(err)
  347. }
  348. log.Println("总数量:", totalCount)
  349. findOptions := options.Find().SetProjection(projection)
  350. // 执行查询
  351. cursor, err := sourceCollection.Find(context.TODO(), filter, findOptions)
  352. if err != nil {
  353. log.Fatal(err)
  354. }
  355. defer cursor.Close(context.TODO())
  356. count := 0
  357. // 迭代查询结果
  358. for cursor.Next(context.TODO()) {
  359. var result = make(map[string]interface{})
  360. if err := cursor.Decode(&result); err != nil {
  361. log.Fatal(err)
  362. }
  363. count++
  364. if count%10000 == 0 {
  365. log.Println("current count", count)
  366. }
  367. //过滤重复数据
  368. if util.IntAll(result["extracttype"]) != 1 {
  369. continue
  370. }
  371. // 处理查询结果
  372. area := util.ObjToString(result["area"])
  373. areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  374. if !IsInStringArray(area, areas) {
  375. continue
  376. }
  377. projectName := util.ObjToString(result["projectname"])
  378. if strings.Contains(projectName, "非政府") {
  379. continue
  380. }
  381. buyerclass := util.ObjToString(result["buyerclass"])
  382. if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  383. continue
  384. }
  385. //存入新表
  386. _, err := targetCollection.InsertOne(context.TODO(), result)
  387. if err != nil {
  388. log.Fatal(err)
  389. }
  390. if err := cursor.Err(); err != nil {
  391. log.Fatal(err)
  392. }
  393. }
  394. log.Println("over")
  395. }
  396. // staticBank 统计中标单位中标项目数量;函数已作废
  397. func staticBank() {
  398. username := "SJZY_RWbid_ES"
  399. password := "SJZY@B4i4D5e6S"
  400. addr := "172.17.189.140:27080"
  401. //addr := "127.0.0.1:27083"
  402. direct := true
  403. if !strings.Contains(addr, "127") {
  404. direct = false
  405. }
  406. escapedUsername := url.QueryEscape(username)
  407. escapedPassword := url.QueryEscape(password)
  408. urls := fmt.Sprintf("mongodb://%s:%s@%s", escapedUsername, escapedPassword, addr)
  409. clientOptions := options.Client().ApplyURI(urls).SetDirect(direct)
  410. // 连接到MongoDB
  411. client, err := mongo.Connect(context.TODO(), clientOptions)
  412. if err != nil {
  413. log.Fatal(err)
  414. }
  415. defer func() {
  416. if err := client.Disconnect(context.TODO()); err != nil {
  417. log.Fatal(err)
  418. }
  419. }()
  420. statisticMap := make(map[string]int)
  421. // 获取要查询的集合
  422. sourceCollection := client.Database("qfw").Collection("wcc_bank_poc3")
  423. //写入新表
  424. targetCollection := client.Database("qfw").Collection("wcc_bank_poc3_statistic")
  425. // 设置查询条件
  426. filter := bson.D{
  427. //{"comeintime", bson.M{"$gte": 1640966400, "$lte": 1703952000}},
  428. //{"subtype", bson.M{"$in": []string{"中标", "单一", "成交", "合同"}}},
  429. }
  430. // 设置投影,排除 contenthtml 字段
  431. projection := bson.D{
  432. {"s_winner", 1}, // 0表示不返回该字段
  433. {"projectname", 1}, // 0表示不返回该字段
  434. }
  435. // 获取查询结果的总文档数
  436. totalCount, err := sourceCollection.EstimatedDocumentCount(context.Background(), nil)
  437. if err != nil {
  438. log.Fatal(err)
  439. }
  440. log.Println("总数量:", totalCount)
  441. findOptions := options.Find().SetProjection(projection)
  442. // 执行查询
  443. cursor, err := sourceCollection.Find(context.TODO(), filter, findOptions)
  444. if err != nil {
  445. log.Fatal(err)
  446. }
  447. defer cursor.Close(context.TODO())
  448. //凭安企业库
  449. MgoC := &mongodb.MongodbSim{
  450. MongodbAddr: "172.17.4.181:27001",
  451. DbName: "mixdata",
  452. Size: 10,
  453. UserName: "",
  454. Password: "",
  455. //Direct: true,
  456. }
  457. MgoC.InitPool()
  458. count := 0
  459. // 迭代查询结果
  460. for cursor.Next(context.TODO()) {
  461. var result = make(map[string]interface{})
  462. if err := cursor.Decode(&result); err != nil {
  463. log.Fatal(err)
  464. }
  465. count++
  466. if count%10000 == 0 {
  467. log.Println("current count", count)
  468. }
  469. projectName := util.ObjToString(result["projectname"])
  470. if projectName == "" {
  471. continue
  472. }
  473. winner := util.ObjToString(result["s_winner"])
  474. if winner == "" {
  475. continue
  476. }
  477. if strings.Contains(winner, ",") {
  478. winners := strings.Split(winner, ",")
  479. for _, v := range winners {
  480. statisticMap[v]++
  481. }
  482. }
  483. }
  484. for k, v := range statisticMap {
  485. insert := map[string]interface{}{
  486. "winner": k,
  487. "num": v,
  488. }
  489. //存入新表
  490. _, err := targetCollection.InsertOne(context.TODO(), insert)
  491. if err != nil {
  492. log.Fatal(err)
  493. }
  494. }
  495. log.Println("over")
  496. }
  497. // exportPoc 导出合作渠道银行POC需求;龚文华 需求;函数作废
  498. func exportPoc() {
  499. //InitMgo()
  500. username := "SJZY_RWbid_ES"
  501. password := "SJZY@B4i4D5e6S"
  502. addr := "172.17.189.140:27080"
  503. //addr := "127.0.0.1:27083"
  504. direct := true
  505. //url := fmt.Sprintf("mongodb://%s:%s@%s", username, password, addr)
  506. //clientOptions := options.Client().ApplyURI(url)
  507. if !strings.Contains(addr, "127") {
  508. direct = false
  509. }
  510. // Escape special characters in username and password
  511. escapedUsername := url.QueryEscape(username)
  512. escapedPassword := url.QueryEscape(password)
  513. // Construct MongoDB connection string
  514. urls := fmt.Sprintf("mongodb://%s:%s@%s", escapedUsername, escapedPassword, addr)
  515. clientOptions := options.Client().ApplyURI(urls).SetDirect(direct)
  516. //clientOptions := options.Client().ApplyURI("mongodb://SJZY_RWbid_ES:SJZY%40B4i4D5e6S@172.17.145.163:27083")
  517. // 连接到MongoDB
  518. client, err := mongo.Connect(context.TODO(), clientOptions)
  519. if err != nil {
  520. log.Fatal(err)
  521. }
  522. defer func() {
  523. if err := client.Disconnect(context.TODO()); err != nil {
  524. log.Fatal(err)
  525. }
  526. }()
  527. // 获取要查询的集合
  528. sourceCollection := client.Database("qfw").Collection("bidding")
  529. //写入新表
  530. targetCollection := client.Database("qfw").Collection("wcc_bank_poc")
  531. // 设置查询条件
  532. filter := bson.D{
  533. {"comeintime", bson.M{"$gte": 1640966400, "$lte": 1703952000}},
  534. {"subtype", bson.M{"$in": []string{"中标", "单一", "成交", "合同"}}},
  535. }
  536. // 设置投影,排除 contenthtml 字段
  537. projection := bson.D{
  538. {"contenthtml", 0}, // 0表示不返回该字段
  539. {"attach_text", 0}, // 0表示不返回该字段
  540. }
  541. // 获取查询结果的总文档数
  542. totalCount, err := sourceCollection.EstimatedDocumentCount(context.Background(), nil)
  543. if err != nil {
  544. log.Fatal(err)
  545. }
  546. log.Println("总数量:", totalCount)
  547. // 设置每批次查询的文档数量和当前批次数
  548. batchSize := 1000000
  549. currentBatch := 1 //当前处理批次
  550. recordCount := 0
  551. // 执行分批查询和导出
  552. for skip := 0; skip < int(totalCount); skip += batchSize {
  553. // 设置查询选项,包括批次大小和偏移量,过滤查询字段
  554. findOptions := options.Find().SetSkip(int64(skip)).SetLimit(int64(batchSize)).SetProjection(projection)
  555. // 执行查询
  556. cursor, err := sourceCollection.Find(context.TODO(), filter, findOptions)
  557. if err != nil {
  558. log.Fatal(err)
  559. }
  560. defer cursor.Close(context.TODO())
  561. // 迭代查询结果
  562. for cursor.Next(context.TODO()) {
  563. var result = make(map[string]interface{})
  564. if err := cursor.Decode(&result); err != nil {
  565. log.Fatal(err)
  566. }
  567. recordCount++
  568. if recordCount%10000 == 0 {
  569. log.Printf("已处理 %d 批次 %d 条记录\n", currentBatch, recordCount)
  570. }
  571. //过滤重复数据
  572. if util.IntAll(result["extracttype"]) != 1 {
  573. continue
  574. }
  575. // 处理查询结果
  576. area := util.ObjToString(result["area"])
  577. areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  578. if !IsInStringArray(area, areas) {
  579. continue
  580. }
  581. projectName := util.ObjToString(result["projectname"])
  582. if strings.Contains(projectName, "非政府") {
  583. continue
  584. }
  585. buyerclass := util.ObjToString(result["buyerclass"])
  586. if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  587. continue
  588. }
  589. //存入新表
  590. _, err := targetCollection.InsertOne(context.TODO(), result)
  591. if err != nil {
  592. log.Fatal(err)
  593. }
  594. }
  595. if err := cursor.Err(); err != nil {
  596. log.Fatal(err)
  597. }
  598. log.Printf("批次 %d 完成\n", currentBatch)
  599. // 增加当前批次数
  600. currentBatch++
  601. // 可选:如果查询结果较大,可以使用游标的超时机制,以防止游标在处理较大数据集时被自动关闭
  602. //time.Sleep(5 * time.Second)
  603. }
  604. log.Println("over")
  605. }
  606. **/
  607. // IsInStringArray 判断数组中是否存在字符串
  608. func IsInStringArray(str string, arr []string) bool {
  609. // 先对字符串数组进行排序
  610. sort.Strings(arr)
  611. // 使用二分查找算法查找字符串
  612. pos := sort.SearchStrings(arr, str)
  613. // 如果找到了则返回 true,否则返回 false
  614. return pos < len(arr) && arr[pos] == str
  615. }