fetch_list.py 3.0 KB

# -*- coding: utf-8 -*-
"""
Created on 2024-11-11
---------
@summary: Fetch list pages of the gys.cn (供应商网) latest-procurement channel,
          dedupe by href via Redis, and write new records to MongoDB.
---------
@author: Dzr
"""
import time
from concurrent.futures import ThreadPoolExecutor

from bson import Int64
from pymongo import MongoClient

import net
import setting
from db.RedisDB import RedisFilter
from log import logger


def spider(url):
    """Download one listing page and map each entry to a storage document."""
    items = net.download_list(url, proxies=net.get_proxy())
    if not items:
        logger.error(f'列表数据|下载失败|{url}')
        return

    logger.info(f'列表数据|下载成功|{url}')

    results = []
    for item in items:
        results.append({
            'site': '供应商网',
            'channel': '最新采购',
            'spidercode': 'a_gysw_zxcg',
            'area': item['area'],
            'city': item['city'],
            'district': item['district'],
            'href': item['href'],
            'title': item['title'],
            'publishtime': item['publishtime'],
            'l_np_publishtime': Int64(item['l_np_publishtime']),
            'comeintime': Int64(int(time.time())),
            'T': 'bidding',
            'infoformat': 1,
            'sendflag': 'false',
            'iscompete': True,
            '_d': 'comeintime',
            'publishdept': '',
            'type': '',
            'is_mixed': True,
        })

    return results


def main():
    # Create the MongoDB collection handle and the Redis-based URL filter once,
    # then reuse them across polling rounds instead of reopening connections.
    client = MongoClient(setting.MONGO_HOST, setting.MONGO_PORT)
    to_coll = client[setting.MONGO_DB][setting.MONGO_LIST_COLL]
    to_dedup = RedisFilter(url=setting.REDIS_URL)

    while True:
        # First 100 pages of the latest-procurement listing.
        urls = (f'https://www.gys.cn/buy/purchase/{i}.html' for i in range(1, 101))
        with ThreadPoolExecutor(max_workers=4) as executor:
            fs = executor.map(spider, urls)
            for f in fs:
                items = f or []
                data_count = 0
                dedupe_count = 0
                unique = []
                inserts = []
                for item in items:
                    href = item['href']
                    if not to_dedup.get(href):
                        # Unseen URL: queue the document and remember its href.
                        inserts.append(item)
                        unique.append(href)
                    else:
                        dedupe_count += 1

                    # Flush to MongoDB in batches of 50 and record the hrefs in Redis.
                    if len(inserts) == 50:
                        to_coll.insert_many(inserts, ordered=False)
                        to_dedup.add(unique)
                        data_count += len(inserts)
                        inserts = []
                        unique = []

                # Flush the remaining partial batch.
                if inserts:
                    to_coll.insert_many(inserts, ordered=False)
                    to_dedup.add(unique)
                    data_count += len(inserts)

                logger.info(f'列表数据|数据处理|重复{dedupe_count}条|入库{data_count}条')

        logger.info('列表数据|数据下载|10m后执行...')
        time.sleep(600)


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.exception(e)
    finally:
        # Notify via WeChat whenever the collector stops, for any reason.
        net.send_wechat_warning('列表采集被中止')