# -*- coding: utf-8 -*-
"""
Created on 2023-12-07
---------
@summary: Government department data mining
---------
@author: Dzr
"""
import datetime
import re
from concurrent.futures import ThreadPoolExecutor, as_completed

import bson

import crawler.utils as tools
from common.databases import mongo_table, redis_client
from common.log import logger
from common.tools import sha1
from crawler.analysis import parser_items
from crawler.download import Downloader

gov_lst = mongo_table('dzr', 'BasicDataList')
gov_task_lst = mongo_table('dzr', 'GovDataList')

r = redis_client()
r_key = 'gov_2023'

downloader = Downloader(max_retries=0, disable_debug_log=False)

# maximum link-excavation (mining) depth
max_excavate_depth = 3


def to_mongodb(host, title, href, depth, **kwargs):
    """Insert a new excavation task and record its fingerprint in Redis."""
    gov_task_lst.insert_one({
        'host': host,
        'href': href,
        'title': title,
        'depth': depth,
        'is_crawl': False,
        'create_at': bson.Int64(int(datetime.datetime.now().timestamp())),
        **kwargs
    })
    r.hset(r_key, sha1(href), '')  # add the data fingerprint


def deduplicate(href):
    """Return True if the href fingerprint already exists (duplicate)."""
    return bool(r.hexists(r_key, sha1(href)))


def deduplicate_task_add_to_mongodb(host, title, href, depth, **kwargs):
    """
    :param str host:
    :param str title:
    :param str href:
    :param int depth:
    :param kwargs:
    """
    if not deduplicate(href):
        to_mongodb(host, title, href, depth, **kwargs)


def production_data_excavate_tasks():
    """Build the initial (depth=1) excavation tasks from the basic data list."""
    data_lst = []
    query = {"collect": "否"}  # "否" = not yet collected
    with gov_lst.find(query, projection={"site": 1, "href": 1}) as cursor:
        for doc in cursor:
            site = str(doc["site"]).strip()
            href = str(doc["href"]).strip()
            if not tools.is_url(href):
                continue

            args = list(filter(lambda x: x is not None, tools.get_host(href)))
            if len(args) > 2:
                host = "{0}://{1}:{2}".format(*args)
            else:
                host = "{0}://{1}".format(*args)

            if not re.search(r"^https?", href):
                href = host

            task = {
                'href': href,
                'origin': href,
                'host': host,
                'title': site,
                'site': site,
                'depth': 1,
                'datalist_id': str(doc['_id'])  # primary key of the data source
            }
            data_lst.append(task)

    return data_lst


def get_tasks(query, projection=None, limit=100):
    with gov_task_lst.find(query, projection=projection, limit=limit) as cursor:
        data_lst = [item for item in cursor.sort([('depth', 1)])]
    return data_lst


def get_response_by_request(url, host):
    response = downloader.get(url, timeout=10)
    # Decode unicode from the given encoding.
    try:
        content = str(response.content, response.encoding, errors="replace")
    except (LookupError, TypeError):
        content = str(response.content, errors="replace")

    if response.status_code == 200 and content not in ["", None]:
        items = parser_items(content, url=host, mode=1)  # same-origin link extraction
        text_lst = tools.extract_text(content, parser="bs4").split()
        # drop fragments that contain no Chinese characters
        text_lst = list(filter(lambda x: re.search('[\u4e00-\u9fa5]', x) is not None, text_lst))
        # drop short fragments (length <= 10)
        text_lst = list(filter(lambda x: len(x) > 10, text_lst))
        # number of fragments predicted to be bidding/tendering text
        hits = tools.predict_bidding_model_v2(text_lst) if text_lst else 0
        result = {
            'href': url,
            'host': host,
            'total': len(text_lst),  # total fragments fed to the bidding prediction
            'hits': hits,  # valid (hit) count
            'items': items
        }
        return result


def spider(task):
    """Fetch one task, publish newly discovered links, and update the task record."""
    success, dedup = 0, 0
    update = {
        'is_crawl': True,
        'fetch': 0,  # page fetch status: 0 = failed, 1 = succeeded
        'depth': task['depth'],
    }
    try:
        response = get_response_by_request(task['href'], task['host'])
        if response:
            update['docs'] = response['total']  # amount of mined text
            update['hits'] = response['hits']  # fragments predicted as bidding text
            update['fetch'] = 1  # mark the fetch as successful
            for ret in response['items']:
                if deduplicate(ret['href']):
                    dedup += 1
                    continue

                excavate = {
                    'title': ret['title'],
                    'href': ret['href'],
                    'depth': update['depth'] + 1,
                    'host': task['host'],
                    'origin': task['origin'],
                    'datalist_id': task['datalist_id']
                }
                if excavate['depth'] > max_excavate_depth:
                    continue

                to_mongodb(**excavate, site=task['site'])
                success += 1
    except Exception as e:
        logger.exception(e)

    # update the task record
    update['update_at'] = bson.Int64(int(datetime.datetime.now().timestamp()))
    gov_task_lst.update_one({'_id': task['_id']}, {'$set': update})
    return success, dedup


def start(query, workers=1, init_excavate=False):
    if init_excavate:
        logger.info("Creating data-mining tasks...")
        task_lst = production_data_excavate_tasks()
        for task in task_lst:
            deduplicate_task_add_to_mongodb(**task)

    while True:
        tasks = get_tasks(query, limit=1000)
        logger.info(f"Loaded {len(tasks)} data-mining tasks")
        with ThreadPoolExecutor(max_workers=workers) as executor:
            fs = [executor.submit(spider, task) for task in tasks]
            logger.info(f"{len(fs)} data-mining tasks pending")
            for f in as_completed(fs):
                pubs, dupl = f.result()
                tips = [f"published {pubs} tasks", f"skipped {dupl} duplicates"]
                logger.info(" ".join(tips))

        logger.info("Data-mining round finished, waiting for new tasks...")
        if not tasks:
            break


if __name__ == '__main__':
    start(
        init_excavate=True,
        query={"is_crawl": False},  # data-mining task filter
        workers=20,
    )