import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List, Mapping

from common.databases import insert_one, int2long
from common.execptions import HostsRetrieveError
from common.log import logger
from common.tools import delay_by
from crawler.Task import Task
from crawler.analysis import Parser
from crawler.download import Downloader
from crawler.retrieve import Validator
from crawler.schedule import Scheduler
from crawler.search import JySearchEngine
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    is_url,
    err_details,
)
from settings import (
    MGO_LUA_SPIDERS,
    MGO_SEARCH,
    MGO_DOMAIN,
    MGO_URLS,
    MGO_ORGS,
    MGO_KEYWORDS,
    MGO_COMPETING_GOODS,
    MGO_REMOVAL_DUPLICATE,
)


class BasicSearch:

    def __init__(
            self,
            keyword_weight=9,
            url_weight=8,
            org_weight=7,
            scheduler=None,
            validator=None,
            downloader=None,
            parser=None,
            **kwargs
    ):
        self.scheduler = scheduler or Scheduler()
        self.validator = validator or Validator()
        self.downloader = downloader or Downloader()
        self.parser = parser or Parser()
        # mongo query conditions
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # task classification
        self.visit_classify = 'visit'
        self.query_classify = 'query'
        # task weights
        self.org_weight = org_weight
        self.url_weight = url_weight
        self.keyword_weight = keyword_weight
        self.retrieve_weight = 0
        # task groups
        self.org_groups = 'organization'
        self.keyword_groups = 'keyword'
        self.url_groups = 'seed_url'
        self.competing_groups = 'competing_goods'

    @staticmethod
    def loops_interval(interval):
        t_name = threading.currentThread().getName()
        next_run_time = delay_by(interval or 300)
        logger.info(f'Thread run finished: <{t_name}>, next run at: {next_run_time}')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs):
        """Create a Task object"""
        return Task(**kwargs)

    def seed_orgs(self) -> List[Mapping]:
        """Organizations / institutions"""
        search_orgs = []
        cursor = MGO_ORGS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_orgs.append(item)
        return search_orgs

    def seed_keywords(self):
        """Keywords"""
        search_keywords = []
        cursor = MGO_KEYWORDS.find(projection=self.projection)
        for item in cursor.sort(self.sort):
            search_keywords.append(item['name'])
        return search_keywords

    def seed_urls(self) -> List[Mapping]:
        """Seed urls"""
        search_urls = []
        cursor = MGO_URLS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_urls.append(item)
        return search_urls

    def seed_competing_goods(self):
        """Competitor urls"""
        competing_goods = []
        cursor = MGO_COMPETING_GOODS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            competing_goods.append(item)
        return competing_goods

    def lua_common_domains(self):
        """Fetch site names and domains from the lua spider config collection and sync them to the de-duplication store"""
        parm_commons = []
        projection = {'param_common': 1}
        cursor = MGO_LUA_SPIDERS.find(projection=projection)
        for item in cursor.sort(self.sort):
            name = item['param_common'][1]
            try:
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
            except IndexError:
                continue
            if not self.validator.url(domain):
                parm_commons.append({'name': name, 'domain': domain})
                self.validator.add_url(domain)
        return parm_commons

    def push_data(self, purpose: str, task: Task, collection):
        if purpose == 'save':
            insert_one(collection, self.make_retrieve_item(task))
        else:
            insert_one(collection, self.make_duplicate_removal(task))

    @staticmethod
    def make_retrieve_item(task: Task):
        item = {
            'name': task['name'],
            'url': task['url'],
            'domain': task['domain'],
            'origin': task['origin'],
            'groups': task['groups'],
            'create_at': task['create_at'],
            'update_at': task['update_at'],
        }
        return item

    @staticmethod
    def make_duplicate_removal(task: Task):
        item = {
            'domain': task['domain'],
            'url': task['url'],
            'create_at': task['update_at'],
        }
        return item


class SyncData(BasicSearch):

    def __init__(self, allow_load_filter=False, **kwargs):
        super(SyncData, self).__init__(**kwargs)
        self._init(allow_load_filter)

    def _init(self, allow_load_filter=False):
        threading.Thread(
            target=self.sync_data,
            name='SyncData'
        ).start()
        if allow_load_filter:
            self.validator.load_filter()

    def sync_data_keywords(self):
        """Sync keyword data"""
        logger.info('[SyncData] loading keywords')
        words = self.seed_keywords()
        # normalize keywords and push them to the query queue
        words = [str(word).replace('&nbsp;', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            ))
        self.scheduler.add_query(lst, level=self.keyword_weight)
        logger.info(f'[SyncData] updated {len(words)} keywords')

    def sync_data_orgs(self):
        """Sync organization data"""
        logger.info('[SyncData] loading organization data')
        items = self.seed_orgs()
        # normalize organization names and push them to the query queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.warning(f'[invalid organization] {item}')
                continue
            word = str(name).replace('&nbsp;', '').strip()
            orgs.append(word)
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            ))
        self.scheduler.add_query(lst, level=self.org_weight)
        # mark organizations that were queued so they are not pushed again
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[SyncData] updated {len(items)} organizations')

    def sync_data_urls(self):
        """Sync seed url data"""
        logger.info('[SyncData] loading seed url list')
        items = self.seed_urls()
        lst = []
        # iterate over a copy so removing invalid entries does not skip items
        for item in items[:]:
            if not is_url(item['name']):
                items.remove(item)
                continue
            exists_url = self.validator.url(item['name'])
            if exists_url:
                items.remove(item)
                continue
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        for item in items:
            MGO_URLS.update_many(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[SyncData] updated {len(items)} seed urls')

    def sync_data_competing_goods(self):
        """Sync competitor urls"""
        logger.info('[SyncData] loading competitor url list')
        items = self.seed_competing_goods()
        # filter competitor urls and push them to the excavation queue
        lst = []
        # iterate over a copy so removing invalid entries does not skip items
        for item in items[:]:
            if not is_url(item['name']):
                items.remove(item)
                continue
            exists_url = self.validator.url(item['name'])
            if exists_url:
                items.remove(item)
                continue
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.add_excavate(lst, level=self.url_weight)
        # mark competitor urls that have been pushed
        for item in items:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[SyncData] updated {len(items)} competitor urls')

    def sync_lua_commons(self):
        """Sync site names and domains used by the lua spiders"""
        logger.info('[SyncData] loading lua_commons data')
        items = self.lua_common_domains()
        for item in items:
            item['create_at'] = int2long(int(time.time()))
            MGO_REMOVAL_DUPLICATE.insert_one(item)
        logger.info(f'[SyncData] updated {len(items)} site domains')

    def sync_data(self):
        """Sync all data sources on a fixed interval"""
        logger.info('[SyncData] initial load')
        _interval = 600
        while True:
            try:
                self.sync_lua_commons()
                self.sync_data_keywords()
                self.sync_data_orgs()
                self.sync_data_competing_goods()
                self.sync_data_urls()
            except Exception as e:
                logger.exception(e)
            self.loops_interval(_interval)


class SearchEngine(BasicSearch):

    def __init__(
            self,
            engines=None,
            max_search_page=1,
            loop_search_interval=60,
            **kwargs
    ):
        super(SearchEngine, self).__init__(**kwargs)
        self._max_pages = max_search_page
        self._interval = loop_search_interval
        self._engines = []
        self.set_engines(engines)

    def _init(self):
        self.set_engines(self._engines)

    def _set_engine(self, engine):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'[SearchEngine] added <{engine.__class__.__name__}>')
        return self

    def set_engines(self, engines):
        if isinstance(engines, list):
            for engine in engines:
                self._set_engine(engine)
        else:
            self._set_engine(engines)
        return self

    def search(self, engine):
        ename = engine.__class__.__name__
        threading.currentThread().setName(ename)
        logger.info(f'[SearchEngine] started - <{ename}>')
        while True:
            tasks = self.scheduler.get_query_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            task['update_at'] = int2long(int(time.time()))
            if task['groups'] == self.org_groups:
                # look up the organization's site through the Qcc (Qichacha) service
                logger.info(f"<QccSearch> {task['groups']} >>> {word}")
                try:
                    url = engine.by_org_get_site(task['search'])
                    task['url'] = extract_base_url(url)
                    task['name'] = word
                    task['domain'] = extract_domain(task['url'])
                    # save the search result
                    self.push_data('save', task, MGO_SEARCH)
                    if not is_url(task['url']):
                        continue
                    if self.validator.url(task['domain']):
                        continue
                    # domain - add to the filter
                    self.validator.add_url(task['domain'])
                    # push to the data excavation queue
                    task['classify'] = self.visit_classify
                    self.scheduler.add_excavate(task, level=task['weight'])
                except HostsRetrieveError as e:
                    logger.exception(e)
            else:
                # query the keyword through the search engine
                logger.info(f"<{ename}> {task['groups']} >>> {word}")
                cur_page = 0
                while cur_page < self._max_pages:
                    cur_page += 1
                    # retrieve search results for this page
                    lst = []
                    urls = engine.search(word, cur_page)
                    # build data excavation tasks
                    for url in urls:
                        domain = extract_domain(url)
                        if self.validator.url(domain):
                            continue
                        lst.append(self.make_task(
                            url=extract_base_url(url),
                            origin=task['origin'],
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=task['weight'],
                        ))
                    # push to the data excavation queue
                    self.scheduler.add_excavate(lst, level=task['weight'])
                    logger.info(f'<{ename}> {word} - page {cur_page} - {len(lst)} results')

    def start(self):
        if len(self._engines) > 0:
            logger.info('[SearchEngine] initial load')
            # size the thread pool to the number of search engines
            max_workers = len(self._engines)
            with ThreadPoolExecutor(max_workers, 'SearchEngine') as executor:
                futures = []
                for engine in self._engines:
                    f = executor.submit(self.search, engine)
                    f.add_done_callback(err_details)
                    futures.append(f)
                wait(futures)


class DataExcavate(BasicSearch):

    def __init__(self, excavate_workers=1, loop_excavate_interval=60, **kwargs):
        super(DataExcavate, self).__init__(**kwargs)
        self._interval = loop_excavate_interval
        self._workers = excavate_workers

    def save(self, task: Task):
        """Save sites that meet the excavation requirements"""
        self.push_data('save', task, MGO_DOMAIN)
        # add the url to the de-duplication store
        self.push_data('remove', task, MGO_REMOVAL_DUPLICATE)
        logger.info(f"[upload data] {task['name']} - {task['domain']}")

    def retrieve_site(self, task: Task):
        if self.validator.url(task['url']):
            return
        logger.info(f'request host -> {task["url"]}')
        response = self.downloader.get(task['url'])
        if response.status_code != 200 or response.text in ['', None]:
            return
        task['domain'] = extract_domain(task['url'])
        page_source = response.text
        title = extract_page_title(page_source)
        task['name'] = title
        base_url = extract_base_url(task['url'])
        task['base_url'] = base_url
        items = self.parser.site_items(page_source, base_url)
        lst = []
        _c = 0  # counts links whose text matches a required keyword
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.requirement_word(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=task['weight']
                ))
                _c += 1
        if _c > 1:
            self.save(task)
        self.scheduler.add_excavate(lst, level=task['weight'])
        # domain - add to the filter
        self.validator.add_url(task['domain'])
        # url - add to the filter
        self.validator.add_url(task['url'])

    def excavate(self):
        t_name = threading.currentThread().getName()
        logger.info(f'[DataExcavate] started - {t_name}')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            task['update_at'] = int2long(int(time.time()))
            self.retrieve_site(task)

    def start(self):
        logger.info('[DataExcavate] initial load')
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
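

# Minimal usage sketch (an assumption, not part of the original module): it
# presumes the default Scheduler/Validator/Downloader/Parser are usable as-is
# and that a concrete subclass of JySearchEngine is available. It only
# illustrates how the three workers are typically wired together: SyncData
# feeds the query/excavate queues, SearchEngine resolves query tasks into
# urls, and DataExcavate visits and stores the discovered sites.
#
# if __name__ == '__main__':
#     SyncData(allow_load_filter=True)  # starts its own background sync thread
#     engines = [JySearchEngine()]      # replace with a concrete engine subclass
#     searcher = SearchEngine(engines=engines, max_search_page=2)
#     threading.Thread(target=searcher.start, name='SearchEngineMain').start()
#     DataExcavate(excavate_workers=4).start()  # blocks in its worker pool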