from concurrent.futures import ThreadPoolExecutor

from crawl_spiders import crawl_spider
from exceptions import InvalidProxiesException
from utils.databases import MongoDBS
from utils.log import logger
from utils.socks5 import Proxy

# Shared proxy helper; switched to a new endpoint whenever a request is rejected.
proxy = Proxy(True)


# def save_data_by_mongo(collection: str, results: list, item: dict):
#     success = MongoDBS('py_spider', collection).collection.update_one(
#         {"_id": item["_id"]},
#         {'$set': {'basic_info': results}}
#     )
#     msg = "[MongoDB] {}: queried {} records, updated {}".format(
#         item['_id'],
#         len(results),
#         success.modified_count,
#     )
#     logger.info(msg)


def save_data_by_mongo(collection: str, results: list, item: dict):
    """Insert the crawled records for one company into the given collection."""
    success = MongoDBS('py_spider', collection).collection.insert_many(results)
    msg = "[MongoDB] {}: queried {} records, inserted {}".format(
        item['company_name'],
        len(results),
        len(success.inserted_ids),
    )
    logger.info(msg)


def crawl_spiders(item: dict):
    """Crawl one company's basic info, switching proxies until the request succeeds."""
    while True:
        try:
            # results = crawl_spider(item['name'], proxy.proxies)
            results = crawl_spider(item['company_name'], proxy.proxies)
            if len(results) > 0:
                # save_data_by_mongo('buyer_err', results, item)
                save_data_by_mongo('company_basic_info_all', results, item)
            break
        except InvalidProxiesException:
            proxy.switch()


# def main():
#     query = {'basic_info': {'$exists': False}}
#     with ThreadPoolExecutor(max_workers=10) as executor:
#         with MongoDBS('py_spider', 'buyer_err') as coll:
#             with coll.find(query, no_cursor_timeout=True, batch_size=10) as cursor:
#                 # tasks = []
#                 # for item in cursor.limit(10):
#                 #     tasks.append(executor.submit(crawl_spiders, item))
#                 # wait(tasks, return_when=ALL_COMPLETED)
#                 executor.map(crawl_spiders, cursor)


def main():
    with ThreadPoolExecutor(max_workers=10) as executor:
        # with MongoDBS('py_spider', 'company_basic_info_all') as coll:
        with MongoDBS('py_spider', 'company_name') as coll:
            with coll.find(no_cursor_timeout=True, batch_size=10) as cursor:
                executor.map(crawl_spiders, cursor)


if __name__ == '__main__':
    main()
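
# ---------------------------------------------------------------------------
# Assumed interfaces of the imported helpers, inferred only from how this
# script calls them. The real modules live elsewhere in the project; the names
# and signatures below are a sketch of those assumptions, not authoritative
# definitions:
#
#   class Proxy:                          # utils.socks5
#       def __init__(self, enabled: bool): ...
#       proxies: dict                     # proxy mapping passed to crawl_spider
#       def switch(self) -> None: ...     # rotate to a different proxy endpoint
#
#   class MongoDBS:                       # utils.databases, usable as a context manager
#       def __init__(self, db: str, collection: str): ...
#       collection: "pymongo Collection"  # exposes insert_many / update_one
#       def find(self, *args, **kwargs): ...  # assumed to delegate to the collection
#
#   def crawl_spider(company_name: str, proxies: dict) -> list: ...
#       # crawl_spiders module; assumed to raise InvalidProxiesException
#       # when the current proxy is blocked
#
#   class InvalidProxiesException(Exception): ...   # exceptions module
# ---------------------------------------------------------------------------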