main.py

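"""Fetch basic company info for each company document stored in MongoDB.

A ThreadPoolExecutor feeds company documents to crawl_spider; an
InvalidProxiesException triggers a proxy switch and a retry, and the crawled
records are written back with insert_many.
"""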
from concurrent.futures import ThreadPoolExecutor

from crawl_spiders import crawl_spider
from exceptions import InvalidProxiesException
from utils.databases import MongoDBS
from utils.log import logger
from utils.socks5 import Proxy

# Shared proxy handle; every worker thread reuses it and switches it on failure.
proxy = Proxy(True)

# Earlier variant: update each company's document in place instead of inserting new records.
# def save_data_by_mongo(collection: str, results: list, item: dict):
#     success = MongoDBS('py_spider', collection).collection.update_one(
#         {"_id": item["_id"]},
#         {'$set': {'basic_info': results}}
#     )
#     msg = "[MongoDB] {}: fetched {} records, updated {}".format(
#         item['_id'],
#         len(results),
#         success.modified_count,
#     )
#     logger.info(msg)


def save_data_by_mongo(collection: str, results: list, item: dict):
    """Insert the crawled records into MongoDB and log how many were added."""
    success = MongoDBS('py_spider', collection).collection.insert_many(results)
    msg = "[MongoDB] {}: fetched {} records, inserted {}".format(
        item['company_name'],
        len(results),
        len(success.inserted_ids),
    )
    logger.info(msg)


def crawl_spiders(item: dict):
    """Crawl one company's basic info, switching proxies until the request gets through."""
    global proxy
    while True:
        try:
            # results = crawl_spider(item['name'], proxy.proxies)
            results = crawl_spider(item['company_name'], proxy.proxies)
            if len(results) > 0:
                # save_data_by_mongo('buyer_err', results, item)
                save_data_by_mongo('company_basic_info_all', results, item)
            break  # crawl succeeded; leave the retry loop
        except InvalidProxiesException:
            proxy.switch()  # proxy was rejected, switch to a fresh one and retry


# def main():
#     query = {'basic_info': {'$exists': False}}
#     with ThreadPoolExecutor(max_workers=10) as Executor:
#         with MongoDBS('py_spider', 'buyer_err') as coll:
#             with coll.find(query, no_cursor_timeout=True, batch_size=10) as cursor:
#                 # task = []
#                 # for item in cursor.limit(10):
#                 #     task.append(Executor.submit(crawl_spiders, item))
#                 # wait(task, return_when=ALL_COMPLETED)
#                 Executor.map(crawl_spiders, cursor)


def main():
    query = {'basic_info': {'$exists': False}}  # leftover filter from the earlier variant; not passed to find() below
    with ThreadPoolExecutor(max_workers=10) as Executor:
        # with MongoDBS('py_spider', 'company_basic_info_all') as coll:
        with MongoDBS('py_spider', 'company_name') as coll:
            with coll.find(no_cursor_timeout=True, batch_size=10) as cursor:
                Executor.map(crawl_spiders, cursor)


if __name__ == '__main__':
    main()