123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- # -*- coding: utf-8 -*-
- """
- Created on 2024-11-11
- ---------
- @summary:
- ---------
- @author: Dzr
- """
- import time
- from concurrent.futures import ThreadPoolExecutor
- from bson import Int64
- from pymongo import MongoClient
- import net
- import setting
- from db.RedisDB import RedisFilter
- from log import logger
def spider(url):
    """Download one list page and map its rows to MongoDB documents.

    :param url: list-page URL to fetch
    :return: list of document dicts ready for insertion, or ``None``
        when the download failed (caller treats a falsy result as empty)
    """
    items = net.download_list(url, proxies=net.get_proxy())
    if not items:
        logger.error(f'列表数据|下载失败|{url}')
        return

    logger.info(f'列表数据|下载成功|{url}')

    # Stamp every document from the same page with a single capture time;
    # the original called time.time() per item, so one page's batch could
    # straddle a second boundary and carry mixed timestamps.
    comeintime = Int64(int(time.time()))
    return [
        {
            'site': '供应商网',
            'channel': '最新采购',
            'spidercode': 'a_gysw_zxcg',
            'area': item['area'],
            'city': item['city'],
            'district': item['district'],
            'href': item['href'],
            'title': item['title'],
            'publishtime': item['publishtime'],
            'l_np_publishtime': Int64(item['l_np_publishtime']),
            'comeintime': comeintime,
            'T': 'bidding',
            'infoformat': 1,
            'sendflag': 'false',
            'iscompete': True,
            '_d': 'comeintime',
            'publishdept': '',
            'type': '',
            'is_mixed': True,
        }
        for item in items
    ]
def main():
    """Crawl pages 1-100 of the purchase list forever, dedupe and store.

    Opens one MongoDB connection and one Redis dedup filter up front —
    the original re-created both on every cycle and never closed the
    client, leaking a connection per 10-minute iteration — then loops:
    fan the page downloads out over a small thread pool, filter already
    seen hrefs, bulk-insert the remainder in batches, and sleep 10
    minutes before the next sweep. Runs until interrupted.
    """
    client = MongoClient(setting.MONGO_HOST, setting.MONGO_PORT)
    to_coll = client[setting.MONGO_DB][setting.MONGO_LIST_COLL]
    to_dedup = RedisFilter(url=setting.REDIS_URL)
    try:
        while True:
            urls = (f'https://www.gys.cn/buy/purchase/{i}.html' for i in range(1, 101))
            with ThreadPoolExecutor(max_workers=4) as executor:
                # executor.map yields spider() results directly (not futures);
                # a failed page yields None, which we treat as empty.
                for page_items in executor.map(spider, urls):
                    items = page_items or []
                    data_count = 0
                    dedupe_count = 0
                    unique = []
                    inserts = []
                    for item in items:
                        href = item['href']
                        if to_dedup.get(href):
                            dedupe_count += 1
                            continue
                        inserts.append(item)
                        unique.append(href)
                        # Flush in batches of 50 to bound memory and the
                        # number of MongoDB/Redis round-trips.
                        if len(inserts) == 50:
                            to_coll.insert_many(inserts, ordered=False)
                            to_dedup.add(unique)
                            data_count += len(inserts)
                            inserts = []
                            unique = []
                    # Flush the final partial batch, if any.
                    if inserts:
                        to_coll.insert_many(inserts, ordered=False)
                        to_dedup.add(unique)
                        data_count += len(inserts)
                    logger.info(f'列表数据|数据处理|重复{dedupe_count}条|入库{data_count}条')
            logger.info(f'列表数据|数据下载|10m后执行...')
            time.sleep(600)
    finally:
        # Release the pooled MongoDB sockets on any exit path.
        client.close()
if __name__ == '__main__':
    # Run the crawler until it stops, and always raise an alert on exit.
    try:
        main()
    except KeyboardInterrupt:
        # Manual stop (Ctrl-C) is an expected shutdown — no traceback.
        pass
    except Exception as err:
        # Log unexpected failures with the full stack trace.
        logger.exception(err)
    finally:
        # Notify via WeChat whenever collection halts, for any reason.
        net.send_wechat_warning('列表采集被中止')
|