# -*- coding: utf-8 -*-
"""
Created on 2023-12-07
---------
@summary: Government department website data mining
---------
@author: Dzr
"""

import datetime
import re
from concurrent.futures import ThreadPoolExecutor, as_completed

import bson

import crawler.utils as tools
from common.databases import mongo_table, redis_client
from common.log import logger
from common.tools import sha1
from crawler.analysis import parser_items
from crawler.download import Downloader

gov_lst = mongo_table('dzr', 'BasicDataList')
gov_task_lst = mongo_table('dzr', 'GovDataList')

r = redis_client()
r_key = 'gov_2023'

downloader = Downloader(max_retries=0, disable_debug_log=False)

# maximum link-mining (excavation) depth
max_excavate_depth = 3


def to_mongodb(host, title, href, depth, **kwargs):
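    """Insert a mining task into MongoDB and record its fingerprint in Redis."""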
    gov_task_lst.insert_one({
        'host': host,
        'href': href,
        'title': title,
        'depth': depth,
        'is_crawl': False,
        'create_at': bson.Int64(int(datetime.datetime.now().timestamp())),
        **kwargs
    })
    r.hset(r_key, sha1(href), '')  # record the data fingerprint


def deduplicate(href):
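    """Return True if the href fingerprint is already recorded in Redis, False otherwise."""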
    if not r.hexists(r_key, sha1(href)):
        return False
    return True


def deduplicate_task_add_to_mongodb(host, title, href, depth, **kwargs):
    """
    Insert the task only if its href has not been seen before.

    :param str host:
    :param str title:
    :param str href:
    :param int depth:
    :param kwargs:
    """
    if not deduplicate(href):
        to_mongodb(host, title, href, depth, **kwargs)


def production_data_excavate_tasks():
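    """Build depth-1 mining tasks from the not-yet-collected sources in BasicDataList."""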
    data_lst = []
    query = {"collect": "否"}  # "否" marks sources that have not been collected yet
    with gov_lst.find(query, projection={"site": 1, "href": 1}) as cursor:
        for doc in cursor:
            site = str(doc["site"]).strip()
            href = str(doc["href"]).strip()
            if not tools.is_url(href):
                continue

            args = list(filter(lambda x: x is not None, tools.get_host(href)))
            if len(args) > 2:
                host = "{0}://{1}:{2}".format(*args)
            else:
                host = "{0}://{1}".format(*args)

            if not re.search(r"^https?", href):
                href = host

            task = {
                'href': href,
                'origin': href,
                'host': host,
                'title': site,
                'site': site,
                'depth': 1,
                'datalist_id': str(doc['_id'])  # primary key of the data source record
            }
            data_lst.append(task)

    return data_lst


def get_tasks(query, projection=None, limit=100):
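    """Load up to ``limit`` tasks matching ``query``, sorted by ascending depth."""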
    with gov_task_lst.find(query, projection=projection, limit=limit) as cursor:
        data_lst = [item for item in cursor.sort([('depth', 1)])]
    return data_lst


def get_response_by_request(url, host):
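    """Fetch ``url`` and, on success, return same-origin links plus bidding-text prediction stats; otherwise return None."""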
    response = downloader.get(url, timeout=10)

    # Decode the response body using its declared encoding.
    try:
        content = str(response.content, response.encoding, errors="replace")
    except (LookupError, TypeError):
        content = str(response.content, errors="replace")

    if response.status_code == 200 and content not in ["", None]:
        items = parser_items(content, url=host, mode=1)  # same-origin link extraction
        text_lst = tools.extract_text(content, parser="bs4").split()
        # drop fragments that contain no Chinese characters
        text_lst = list(filter(lambda x: re.search('[\u4e00-\u9fa5]', x) is not None, text_lst))
        # drop short phrases (length <= 10)
        text_lst = list(filter(lambda x: len(x) > 10, text_lst))
        # number of texts the bidding/tender classifier predicts as hits
        hits = tools.predict_bidding_model_v2(text_lst) if text_lst else 0
        result = {
            'href': url,
            'host': host,
            'total': len(text_lst),  # total texts fed to the bidding classifier
            'hits': hits,  # number of predicted hits
            'items': items
        }
        return result


def spider(task):
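    """Process one task: fetch its page, queue newly discovered links up to max_excavate_depth, and update the task record."""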
    success, dedup = 0, 0
    update = {
        'is_crawl': True,
        'fetch': 0,  # page fetch status: 0 = failed, 1 = succeeded
        'depth': task['depth'],
    }
    try:
        response = get_response_by_request(task['href'], task['host'])
        if response:
            update['docs'] = response['total']  # number of mined text fragments
            update['hits'] = response['hits']  # number of texts predicted as bidding-related
            update['fetch'] = 1  # mark the fetch as successful
            for ret in response['items']:
                if deduplicate(ret['href']):
                    dedup += 1
                    continue

                excavate = {
                    'title': ret['title'],
                    'href': ret['href'],
                    'depth': update['depth'] + 1,
                    'host': task['host'],
                    'origin': task['origin'],
                    'datalist_id': task['datalist_id']
                }
                if excavate['depth'] > max_excavate_depth:
                    continue

                to_mongodb(**excavate, site=task['site'])
                success += 1
    except Exception as e:
        logger.exception(e)

    # update the task record
    update['update_at'] = bson.Int64(int(datetime.datetime.now().timestamp()))
    gov_task_lst.update_one({'_id': task['_id']}, {'$set': update})
    return success, dedup


def start(query, workers=1, init_excavate=False):
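    """Optionally seed depth-1 tasks, then repeatedly load pending tasks and mine them with a thread pool until none remain."""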
    if init_excavate:
        logger.info("Creating data-mining tasks...")
        task_lst = production_data_excavate_tasks()
        for task in task_lst:
            deduplicate_task_add_to_mongodb(**task)

    while True:
        tasks = get_tasks(query, limit=1000)
        logger.info(f"Loaded {len(tasks)} data-mining tasks")
        with ThreadPoolExecutor(max_workers=workers) as executor:
            fs = [executor.submit(spider, task) for task in tasks]
            logger.info(f"{len(fs)} data-mining tasks pending")
            for f in as_completed(fs):
                pubs, dupl = f.result()
                tips = [f"published {pubs} tasks", f"{dupl} duplicate tasks"]
                logger.info(" ".join(tips))

        logger.info("Data-mining batch finished, waiting for new tasks...")
        if not tasks:
            break


if __name__ == '__main__':
    start(
        init_excavate=True,
        query={"is_crawl": False},  # task selection criteria
        workers=20,
    )