# -*- coding: utf-8 -*-
"""
Created on 2024-11-11
---------
@summary: 供应商网 (gys.cn) latest-purchase list spider: downloads list pages, dedupes via Redis, and batch-inserts into MongoDB
---------
@author: Dzr
"""
import time
from concurrent.futures import ThreadPoolExecutor

from bson import Int64
from pymongo import MongoClient

import net
import setting
from db.RedisDB import RedisFilter
from log import logger


def spider(url):
    """Download one list page and map its rows to storage documents."""
    items = net.download_list(url, proxies=net.get_proxy())
    if not items:
        logger.error(f'列表数据|下载失败|{url}')
        return

    logger.info(f'列表数据|下载成功|{url}')
    results = []
    for item in items:
        results.append({
            'site': '供应商网',
            'channel': '最新采购',
            'spidercode': 'a_gysw_zxcg',
            'area': item['area'],
            'city': item['city'],
            'district': item['district'],
            'href': item['href'],
            'title': item['title'],
            'publishtime': item['publishtime'],
            # Store timestamps as BSON Int64 so the field type stays stable.
            'l_np_publishtime': Int64(item['l_np_publishtime']),
            'comeintime': Int64(int(time.time())),
            'T': 'bidding',
            'infoformat': 1,
            'sendflag': 'false',
            'iscompete': True,
            '_d': 'comeintime',
            'publishdept': '',
            'type': '',
            'is_mixed': True,
        })

    return results


def main():
    while True:
        client = MongoClient(setting.MONGO_HOST, setting.MONGO_PORT)
        to_coll = client[setting.MONGO_DB][setting.MONGO_LIST_COLL]
        to_dedup = RedisFilter(url=setting.REDIS_URL)

        # List pages 1-100 of the "最新采购" (latest purchases) channel.
        urls = (f'https://www.gys.cn/buy/purchase/{i}.html' for i in range(1, 101))
        with ThreadPoolExecutor(max_workers=4) as executor:
            # executor.map yields each spider() result in order; a failed
            # download returns None, which is normalised to an empty list.
            for items in executor.map(spider, urls):
                items = items or []
                data_count = 0
                dedupe_count = 0

                unique = []
                inserts = []
                for item in items:
                    href = item['href']
                    if not to_dedup.get(href):
                        # Unseen link: queue the document and remember the href
                        # so the filter is updated after a successful insert.
                        inserts.append(item)
                        unique.append(href)
                    else:
                        dedupe_count += 1

                    # Flush to MongoDB in batches of 50.
                    if len(inserts) == 50:
                        to_coll.insert_many(inserts, ordered=False)
                        to_dedup.add(unique)
                        data_count += len(inserts)
                        inserts = []
                        unique = []

                # Flush the final partial batch.
                if len(inserts) > 0:
                    to_coll.insert_many(inserts, ordered=False)
                    to_dedup.add(unique)
                    data_count += len(inserts)

                logger.info(f'列表数据|数据处理|重复{dedupe_count}条|入库{data_count}条')

        client.close()  # release the connection before the next round
        logger.info('列表数据|数据下载|10m后执行...')
        time.sleep(600)


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.exception(e)
    finally:
        net.send_wechat_warning('列表采集被中止')
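
# ---------------------------------------------------------------------------
# Illustrative sketch only (assumption, not part of this repository): the
# constants below show the shape of the local ``setting`` module this script
# reads. The attribute names come from the references above; every value is a
# placeholder, not the project's real configuration.
#
#     # setting.py
#     MONGO_HOST = '127.0.0.1'       # MongoDB server host (placeholder)
#     MONGO_PORT = 27017             # MongoDB server port (placeholder)
#     MONGO_DB = 'spider'            # target database name (placeholder)
#     MONGO_LIST_COLL = 'list_data'  # list-data collection name (placeholder)
#     REDIS_URL = 'redis://127.0.0.1:6379/0'  # dedupe filter backend (placeholder)
# ---------------------------------------------------------------------------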