# spiders.py

import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List, Mapping

from common.databases import insert_one, int2long
from common.execptions import ValidatorError, HostsRetrieveError
from common.log import logger
from common.tools import delay_by
from crawler.Task import Task
from crawler.analysis import parse_urls
from crawler.download import Downloader
from crawler.retrieve import Validator
from crawler.schedule import Scheduler
from crawler.search_engine import JySearchEngine
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    is_url,
    err_details
)
from settings import (
    REDIS_KEY,
    SPECIAL_ENCODINGS,
    MGO_LUA_SPIDERS,
    MGO_DOMAIN,
    MGO_URLS,
    MGO_ORGS,
    MGO_KEYWORDS,
    MGO_REMOVAL_DUPLICATE
)
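
# Module overview (summarizing the classes below):
#   BasicSearch  - shared seed loaders, task weights and helpers
#   SyncData     - pushes seed URLs, keywords and organizations into the task queue
#   SearchEngine - feeds query tasks to the registered JySearchEngine instances
#   VisitDomain  - visits candidate URLs and records newly discovered domains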


class BasicSearch:

    def __init__(
        self,
        scheduler=None,
        validator=None,
        downloader=None,
        parser=None,
        org_weight=9,
        url_weight=8,
        keyword_weight=7,
        **kwargs
    ):
        self.scheduler = (scheduler or Scheduler())
        self.validator = (validator or Validator())
        self.downloader = (downloader or Downloader())
        self.parser = (parser or parse_urls)
        # MongoDB query conditions
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # task classifications
        self.visit_classify = 'visit'
        self.query_classify = 'query'
        # task weights
        self.org_weight = org_weight
        self.url_weight = url_weight
        self.keyword_weight = keyword_weight
        # task groups
        self.org_groups = 'organization'
        self.keyword_groups = 'keyword'
        self.url_groups = 'seed_url'
        self.competing_groups = 'competing_goods'

    @staticmethod
    def loops_interval(label, interval):
        interval = interval or 300
        next_run_time = delay_by(interval)
        logger.info(f'执行:<{label}>,下次运行时间:{next_run_time}')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs):
        """Build a Task object."""
        return Task(**kwargs)

    def seed_orgs(self) -> List[Mapping]:
        """Organizations / institutions."""
        search_orgs = []
        cursor = MGO_ORGS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_orgs.append(item)
        return search_orgs

    def seed_keywords(self):
        """Keywords."""
        search_keywords = []
        cursor = MGO_KEYWORDS.find(projection=self.projection)
        for item in cursor.sort(self.sort):
            search_keywords.append(item['name'])
        return search_keywords

    def seed_urls(self) -> List[Mapping]:
        """Seed URLs."""
        search_urls = []
        cursor = MGO_URLS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_urls.append(item)
        return search_urls

    def lua_common_domains(self):
        """Fetch site names and domains from the lua spider config table and sync them to the dedup collection."""
        parm_commons = []
        projection = {'param_common': 1}
        cursor = MGO_LUA_SPIDERS.find(projection=projection)
        for item in cursor.sort(self.sort):
            name = item['param_common'][1]
            try:
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
            except IndexError:
                continue
            exists_domain = self.validator.url(domain)
            if not exists_domain:
                parm_commons.append({'name': name, 'domain': domain})
                self.validator.add_url(domain)
        return parm_commons


class SyncData(BasicSearch):

    def __init__(self, allow_load_filter=False, **kwargs):
        super(SyncData, self).__init__(**kwargs)
        self._init(allow_load_filter)

    def _init(self, allow_load_filter=False):
        threading.Thread(
            target=self.sync_data,
            name='SyncData'
        ).start()
        if allow_load_filter:
            self.validator.load_filter()

    def sync_data_urls(self):
        """Sync seed URL data."""
        # _interval = 7200
        # while True:
        items = self.seed_urls()
        # keep only records whose name is a valid URL
        items = [item for item in items if is_url(item['name'])]
        lst = []
        for item in items:
            lst.append(self.make_task(
                url=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight))
        self.scheduler.insert_tasks(lst, level=self.url_weight)
        for item in items:
            MGO_URLS.update_many(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[同步数据]更新{len(items)}条网址数据')
        # self.loops_interval(self.sync_data_urls.__name__, _interval)

    def sync_data_keywords(self):
        """Sync keyword data."""
        # _interval = 1800
        # while True:
        words = self.seed_keywords()
        # normalize keyword format and push to the task queue
        words = [str(word).replace('&nbsp;', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight))
        self.scheduler.insert_tasks(lst, level=self.keyword_weight)
        logger.info(f'[同步数据]更新{len(words)}条关键词数据')
        # self.loops_interval(self.sync_data_keywords.__name__, _interval)

    def sync_data_orgs(self):
        """Sync organization data."""
        # _interval = 3600
        # while True:
        items = self.seed_orgs()
        # normalize organization names and push to the task queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.warning(f'[异常的单位组织]{item}')
                continue
            word = str(name).replace('&nbsp;', '').strip()
            orgs.append(word)
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight))
        self.scheduler.insert_tasks(lst, level=self.org_weight)
        # mark organizations that have been added so they are not pushed to the task queue again
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[同步数据]更新{len(items)}个单位组织数据')
        # self.loops_interval(self.sync_data_orgs.__name__, _interval)

    def sync_lua_commons(self):
        """Sync site URLs and names used by the lua spiders."""
        _interval = 3600
        # while True:
        items = self.lua_common_domains()
        for item in items:
            item['create_at'] = int2long(int(time.time()))
            MGO_REMOVAL_DUPLICATE.insert_one(item)
        logger.info(f'[同步数据]更新{len(items)}个网站域名数据')
        # self.loops_interval(self.sync_lua_commons.__name__, _interval)

    def sync_data(self):
        """Run all sync jobs on a fixed interval."""
        # threading.Thread(
        #     target=self.sync_data_urls,
        #     name='LoadingSeedUrls'
        # ).start()
        # threading.Thread(
        #     target=self.sync_data_keywords,
        #     name='LoadingSeedKeyWords'
        # ).start()
        # threading.Thread(
        #     target=self.sync_data_orgs,
        #     name='LoadingSeedOrgs'
        # ).start()
        # threading.Thread(
        #     target=self.sync_lua_commons,
        #     name='LoadingLuaCommons'
        # ).start()
        logger.info('[同步数据]初始化加载')
        _interval = 600
        while True:
            self.sync_lua_commons()
            self.sync_data_orgs()
            self.sync_data_urls()
            self.sync_data_keywords()
            self.loops_interval(self.sync_data.__name__, _interval)


class SearchEngine(BasicSearch):

    def __init__(self, wait_task_interval=20, **kwargs):
        super(SearchEngine, self).__init__(**kwargs)
        self._wait_task_interval = wait_task_interval
        self._engines = []

    def set_search_engine(self, engine=None):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'添加搜索引擎<{engine.__class__.__name__}>完成')
        return self

    def set_search_engines(self, engines):
        for engine in engines:
            self.set_search_engine(engine)
        return self

    def enable_engine(self, engine):
        fname = self.enable_engine.__name__
        ename = engine.__class__.__name__
        logger.info(f'[搜索引擎]成功启动 - <{ename}>')
        while True:
            tasks = self.scheduler.get_task()
            if len(tasks) == 0:
                # no pending task: wait and poll again
                self.loops_interval(fname, self._wait_task_interval)
                continue
            task_key, task = tasks
            task['update_at'] = int2long(int(time.time()))
            if task['classify'] == self.visit_classify:
                # visit tasks are handled elsewhere; put them back on the queue
                self.scheduler.insert_task(task, level=task['weight'])
            else:
                word = task['search']
                logger.info(f"<{ename}> {task['groups']} >>> {word}")
                urls = engine.search(word)
                lst = []
                for url in urls:
                    lst.append(self.make_task(
                        url=url,
                        groups=task['groups'],
                        classify=self.visit_classify,
                        weight=self.url_weight
                    ))
                self.scheduler.insert_tasks(lst, level=self.url_weight)

    def load_engines(self):
        logger.info('[搜索引擎]初始化加载')
        # size the thread pool to the number of registered search engines
        max_workers = len(self._engines)
        with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='SearchEngine') as executor:
            futures = []
            for engine in self._engines:
                f = executor.submit(self.enable_engine, engine)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)


class VisitDomain(BasicSearch):

    def __init__(self, **kwargs):
        super(VisitDomain, self).__init__(**kwargs)

    def push_new_domain(self, task: Task):
        # new source
        insert_one(MGO_DOMAIN, task)
        # add to the filter
        self.validator.add_url(task['domain'])
        # add to the dedup collection
        item = dict(
            domain=task['domain'],
            create_at=task['update_at']
        )
        insert_one(MGO_REMOVAL_DUPLICATE, item)
        logger.info(f"[录入新域]{task['domain']} - {task['name']}")

    def verify(self, task: Task):
        valid_words = self.validator.words(task['name'], task)
        if valid_words:
            self.push_new_domain(task)
        elif any([task['sensitive'], task['duplication']]):
            raise ValidatorError(f"特征检验未通过:{task['name']}")

    def search_domains(self):
        while True:
            _redis_key = REDIS_KEY + '-' + str(self.url_weight)
            tasks = self.scheduler.get_task(_redis_key)
            if len(tasks) == 0:
                logger.info('关闭寻源爬虫')
                break
            task_key, task = tasks
            task['update_at'] = int2long(int(time.time()))
            if task['classify'] != self.visit_classify:
                # query tasks belong to the search engines; put them back on the queue
                self.scheduler.insert_task(task, level=task['weight'])
            else:
                domain = extract_domain(task['url'])
                exists_domain = self.validator.url(domain)
                if exists_domain:
                    continue
                logger.info(f'request web site -> {task["url"]}')
                response = self.downloader.get(task['url'])
                if response.status_code != 200 or response.text in ['', None]:
                    continue
                response.encoding = response.apparent_encoding
                if response.encoding in SPECIAL_ENCODINGS:
                    response.encoding = 'utf-8'
                task['domain'] = domain
                base_url = extract_base_url(task['url'])
                task['base_url'] = base_url
                page_source = response.text
                title = extract_page_title(page_source)
                task['name'] = title
                try:
                    self.verify(task)
                    urls = self.parser(page_source, base_url)
                    new_tasks = []
                    for url in urls:
                        new_tasks.append(self.make_task(
                            url=url,
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=task['weight']
                        ))
                    self.scheduler.insert_tasks(new_tasks, level=self.url_weight)
                except HostsRetrieveError:
                    pass
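

# Minimal wiring sketch (an assumption; this module does not define its own entry
# point). It shows how the classes above are intended to fit together; the concrete
# JySearchEngine subclasses to register are project specific, so `engines` is left
# empty here.
if __name__ == '__main__':
    engines = []  # fill with JySearchEngine instances for the target search sites
    SyncData(allow_load_filter=True)  # starts its own background sync thread
    runner = SearchEngine(wait_task_interval=20).set_search_engines(engines)
    if engines:
        threading.Thread(target=runner.load_engines, name='LoadEngines').start()
    VisitDomain().search_domains()  # consume visit tasks on the current thread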