government.py

# -*- coding: utf-8 -*-
"""
Created on 2023-12-07
---------
@summary: government department data mining
---------
@author: Dzr
"""
import datetime
import re
from concurrent.futures import ThreadPoolExecutor, as_completed

import bson

import crawler.utils as tools
from common.databases import mongo_table, redis_client
from common.log import logger
from common.tools import sha1
from crawler.analysis import parser_items
from crawler.download import Downloader

gov_lst = mongo_table('dzr', 'BasicDataList')
gov_task_lst = mongo_table('dzr', 'GovDataList')

r = redis_client()
r_key = 'gov_2023'

downloader = Downloader(max_retries=0, disable_debug_log=False)

# maximum depth for data mining (link excavation)
max_excavate_depth = 3


def to_mongodb(host, title, href, depth, **kwargs):
    gov_task_lst.insert_one({
        'host': host,
        'href': href,
        'title': title,
        'depth': depth,
        'is_crawl': False,
        'create_at': bson.Int64(int(datetime.datetime.now().timestamp())),
        **kwargs
    })
    r.hset(r_key, sha1(href), '')  # record the URL fingerprint


def deduplicate(href):
    """Return True when the href fingerprint has already been recorded."""
    if not r.hexists(r_key, sha1(href)):
        return False
    return True
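
# Illustrative sketch (comments only, not executed): deduplication keys on the
# sha1 of the full href, stored as a field of the Redis hash ``r_key``. The URL
# below is hypothetical.
#
#   to_mongodb('http://www.example.gov.cn', 'example site', 'http://www.example.gov.cn/list', 1)
#   deduplicate('http://www.example.gov.cn/list')   # -> True, fingerprint already recorded
#   deduplicate('http://www.example.gov.cn/other')  # -> False, unseen href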


def deduplicate_task_add_to_mongodb(host, title, href, depth, **kwargs):
    """
    :param str host:
    :param str title:
    :param str href:
    :param int depth:
    :param kwargs:
    """
    if not deduplicate(href):
        to_mongodb(host, title, href, depth, **kwargs)


def production_data_excavate_tasks():
    data_lst = []
    query = {"collect": "否"}  # "否" = sites not yet collected
    with gov_lst.find(query, projection={"site": 1, "href": 1}) as cursor:
        for doc in cursor:
            site = str(doc["site"]).strip()
            href = str(doc["href"]).strip()
            if not tools.is_url(href):
                continue

            args = list(filter(lambda x: x is not None, tools.get_host(href)))
            if len(args) > 2:
                host = "{0}://{1}:{2}".format(*args)
            else:
                host = "{0}://{1}".format(*args)

            if not re.search("^https?", href):
                href = host

            task = {
                'href': href,
                'origin': href,
                'host': host,
                'title': site,
                'site': site,
                'depth': 1,
                'datalist_id': str(doc['_id'])  # primary key of the data source
            }
            data_lst.append(task)

    return data_lst


def get_tasks(query, projection=None, limit=100):
    with gov_task_lst.find(query, projection=projection, limit=limit) as cursor:
        data_lst = [item for item in cursor.sort([('depth', 1)])]
    return data_lst


def get_response_by_request(url, host):
    response = downloader.get(url, timeout=10)
    # Decode unicode from the response's declared encoding.
    try:
        content = str(response.content, response.encoding, errors="replace")
    except (LookupError, TypeError):
        content = str(response.content, errors="replace")

    if response.status_code == 200 and content not in ["", None]:
        items = parser_items(content, url=host, mode=1)  # same-origin link extraction
        text_lst = tools.extract_text(content, parser="bs4").split()
        # drop fragments that contain no Chinese characters
        text_lst = list(filter(lambda x: re.search('[\u4e00-\u9fa5]', x) is not None, text_lst))
        # drop short fragments (length <= 10)
        text_lst = list(filter(lambda x: len(x) > 10, text_lst))
        # number of fragments the bidding classifier marks as hits
        hits = tools.predict_bidding_model_v2(text_lst) if text_lst else 0
        result = {
            'href': url,
            'host': host,
            'total': len(text_lst),  # fragments fed to the bidding classifier
            'hits': hits,            # fragments predicted as bidding text
            'items': items
        }
        return result
    return None


def spider(task):
    success, dedup = 0, 0
    update = {
        'is_crawl': True,
        'fetch': 0,  # did the page fetch succeed? 0 = no, 1 = yes
        'depth': task['depth'],
    }
    try:
        response = get_response_by_request(task['href'], task['host'])
        if response:
            update['docs'] = response['total']  # amount of text mined from the page
            update['hits'] = response['hits']   # amount of text predicted as bidding content
            update['fetch'] = 1                 # fetch succeeded
            for ret in response['items']:
                if deduplicate(ret['href']):
                    dedup += 1
                    continue

                excavate = {
                    'title': ret['title'],
                    'href': ret['href'],
                    'depth': update['depth'] + 1,
                    'host': task['host'],
                    'origin': task['origin'],
                    'datalist_id': task['datalist_id']
                }
                if excavate['depth'] > max_excavate_depth:
                    continue

                to_mongodb(**excavate, site=task['site'])
                success += 1
    except Exception as e:
        logger.exception(e)

    # update the task record
    update['update_at'] = bson.Int64(int(datetime.datetime.now().timestamp()))
    gov_task_lst.update_one({'_id': task['_id']}, {'$set': update})
    return success, dedup


def start(query, workers=1, init_excavate=False):
    if init_excavate:
        logger.info("Creating data-mining tasks...")
        task_lst = production_data_excavate_tasks()
        for task in task_lst:
            deduplicate_task_add_to_mongodb(**task)

    while True:
        tasks = get_tasks(query, limit=1000)
        logger.info(f"Loaded {len(tasks)} data-mining tasks")
        with ThreadPoolExecutor(max_workers=workers) as executor:
            fs = [executor.submit(spider, task) for task in tasks]
            logger.info(f"{len(fs)} data-mining tasks pending")
            for f in as_completed(fs):
                pubs, dupl = f.result()
                tips = [f"published {pubs} tasks", f"skipped {dupl} duplicates"]
                logger.info(" ".join(tips))

        logger.info("Data-mining round finished, waiting to load new tasks...")
        if not tasks:
            break


if __name__ == '__main__':
    start(
        init_excavate=True,
        query={"is_crawl": False},  # data-mining task filter
        workers=20,
    )
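
# Hedged usage sketch: to resume mining without re-seeding from BasicDataList,
# leave ``init_excavate`` at its default (False) and, if desired, narrow the
# query. The depth filter below is illustrative only, not part of the original
# configuration.
#
#   start(query={"is_crawl": False, "depth": {"$lte": 2}}, workers=8)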