# spiders.py
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List, Mapping

from common.databases import insert_one, int2long
from common.execptions import HostsRetrieveError
from common.log import logger
from common.tools import delay_by
from crawler.Task import Task
from crawler.analysis import Parser
from crawler.download import Downloader
from crawler.retrieve import Validator
from crawler.schedule import Scheduler
from crawler.search import JySearchEngine
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    is_url,
    err_details,
)
from settings import (
    REDIS_KEY,
    MGO_LUA_SPIDERS,
    MGO_SEARCH,
    MGO_DOMAIN,
    MGO_URLS,
    MGO_ORGS,
    MGO_KEYWORDS,
    MGO_COMPETING_GOODS,
    MGO_REMOVAL_DUPLICATE,
)


class BasicSearch:

    def __init__(
        self,
        keyword_weight=9,
        url_weight=8,
        org_weight=7,
        scheduler=None,
        validator=None,
        downloader=None,
        parser=None,
        **kwargs
    ):
        self.scheduler = scheduler or Scheduler()
        self.validator = validator or Validator()
        self.downloader = downloader or Downloader()
        self.parser = parser or Parser()
        # mongo query defaults
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # task classifications
        self.visit_classify = 'visit'
        self.query_classify = 'query'
        # weights
        self.org_weight = org_weight
        self.url_weight = url_weight
        self.keyword_weight = keyword_weight
        self.retrieve_weight = 0
        # task groups
        self.org_groups = 'organization'
        self.keyword_groups = 'keyword'
        self.url_groups = 'seed_url'
        self.competing_groups = 'competing_goods'

    @staticmethod
    def loops_interval(interval):
        t_name = threading.current_thread().name
        next_run_time = delay_by(interval or 300)
        logger.info(f'Thread loop finished: <{t_name}>, next run at: {next_run_time}')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs):
        """Build a Task object"""
        return Task(**kwargs)

    def seed_orgs(self) -> List[Mapping]:
        """Organizations / institutions"""
        search_orgs = []
        cursor = MGO_ORGS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_orgs.append(item)
        return search_orgs

    def seed_keywords(self):
        """Keywords"""
        search_keywords = []
        cursor = MGO_KEYWORDS.find(projection=self.projection)
        for item in cursor.sort(self.sort):
            search_keywords.append(item['name'])
        return search_keywords

    def seed_urls(self) -> List[Mapping]:
        """Seed URLs"""
        search_urls = []
        cursor = MGO_URLS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_urls.append(item)
        return search_urls

    def seed_competing_goods(self):
        """Competitor URLs"""
        competing_goods = []
        cursor = MGO_COMPETING_GOODS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            competing_goods.append(item)
        return competing_goods

    def lua_common_domains(self):
        """Fetch site names and their domains from the lua spider config collection and sync them to the dedup store."""
        parm_commons = []
        projection = {'param_common': 1}
        cursor = MGO_LUA_SPIDERS.find(projection=projection)
        for item in cursor.sort(self.sort):
            name = item['param_common'][1]
            try:
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
            except IndexError:
                continue
            if not self.validator.url(domain):
                parm_commons.append({'name': name, 'domain': domain})
                self.validator.add_url(domain)
        return parm_commons

    def push_data(self, purpose: str, task: Task, collection):
        if purpose == 'save':
            insert_one(collection, self.make_retrieve_item(task))
        else:
            insert_one(collection, self.make_duplicate_removal(task))

    @staticmethod
    def make_retrieve_item(task: Task):
        item = {
            'name': task['name'],
            'url': task['url'],
            'domain': task['domain'],
            'origin': task['origin'],
            'groups': task['groups'],
            'create_at': task['create_at'],
            'update_at': task['update_at'],
        }
        return item

    @staticmethod
    def make_duplicate_removal(task: Task):
        item = {
            'domain': task['domain'],
            'url': task['url'],
            'create_at': task['update_at'],
        }
        return item


class SyncData(BasicSearch):

    def __init__(self, allow_load_filter=False, **kwargs):
        super(SyncData, self).__init__(**kwargs)
        self._init(allow_load_filter)

    def _init(self, allow_load_filter=False):
        threading.Thread(
            target=self.sync_data,
            name='SyncData'
        ).start()
        if allow_load_filter:
            self.validator.load_filter()

    def sync_data_urls(self):
        """Sync seed URL data"""
        logger.info('[SyncData] loading seed URLs')
        items = self.seed_urls()
        # Skip invalid or already-known URLs, push the rest to the task queue
        lst = []
        valid_items = []
        for item in items:
            if not is_url(item['name']):
                continue
            if self.validator.url(item['name']):
                continue
            valid_items.append(item)
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.insert_tasks(lst, level=self.url_weight)
        # Mark pushed URLs so they are not queued again
        for item in valid_items:
            MGO_URLS.update_many(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[SyncData] updated {len(valid_items)} seed URLs')

    def sync_data_keywords(self):
        """Sync keyword data"""
        logger.info('[SyncData] loading keywords')
        words = self.seed_keywords()
        # Normalize keywords and push them to the task queue
        words = [str(word).replace('&nbsp;', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight
            ))
        self.scheduler.insert_tasks(lst, level=self.keyword_weight)
        logger.info(f'[SyncData] updated {len(words)} keywords')

    def sync_data_orgs(self):
        """Sync organization data"""
        logger.info('[SyncData] loading organization data')
        items = self.seed_orgs()
        # Normalize organization names and push them to the task queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.warning(f'[invalid organization]{item}')
                continue
            word = str(name).replace('&nbsp;', '').strip()
            orgs.append(word)
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                origin=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight
            ))
        self.scheduler.insert_tasks(lst, level=self.org_weight)
        # Mark added organizations so they are not pushed to the task queue again
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[SyncData] updated {len(items)} organizations')

    def sync_data_competing_goods(self):
        """Sync competitor URLs"""
        logger.info('[SyncData] loading competitor list data')
        items = self.seed_competing_goods()
        # Filter competitor URLs and push them to the task queue
        lst = []
        valid_items = []
        for item in items:
            if not is_url(item['name']):
                continue
            if self.validator.url(item['name']):
                continue
            valid_items.append(item)
            lst.append(self.make_task(
                url=item['name'],
                origin=item['name'],
                groups=self.competing_groups,
                classify=self.visit_classify,
                weight=self.url_weight
            ))
        self.scheduler.insert_tasks(lst, level=self.url_weight)
        # Mark competitor URLs that have been pushed
        for item in valid_items:
            MGO_COMPETING_GOODS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[SyncData] updated {len(valid_items)} competitor source URLs')

    def sync_lua_commons(self):
        """Sync site names and domains used by the lua spiders"""
        logger.info('[SyncData] loading lua_commons data')
        items = self.lua_common_domains()
        for item in items:
            item['create_at'] = int2long(int(time.time()))
            MGO_REMOVAL_DUPLICATE.insert_one(item)
        logger.info(f'[SyncData] updated {len(items)} site domains')

    def sync_data(self):
        """Run all sync jobs in a loop"""
        logger.info('[SyncData] initial load')
        _interval = 600
        while True:
            try:
                self.sync_lua_commons()
                self.sync_data_keywords()
                self.sync_data_orgs()
                self.sync_data_competing_goods()
                self.sync_data_urls()
            except Exception as e:
                logger.exception(e)
            self.loops_interval(_interval)


class SearchEngine(BasicSearch):

    def __init__(
        self,
        engines=None,
        max_search_page=1,
        loop_search_interval=60,
        **kwargs
    ):
        super(SearchEngine, self).__init__(**kwargs)
        self._max_pages = max_search_page
        self._interval = loop_search_interval
        self._engines = []
        self.set_engines(engines)

    def _init(self):
        self.set_engines(self._engines)

    def _set_engine(self, engine):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'[SearchEngine] added <{engine.__class__.__name__}>')
        return self

    def set_engines(self, engines):
        if isinstance(engines, list):
            for engine in engines:
                self._set_engine(engine)
        else:
            self._set_engine(engines)
        return self

    def search(self, engine):
        ename = engine.__class__.__name__
        threading.current_thread().name = ename
        logger.info(f'[SearchEngine] started - <{ename}>')
        while True:
            tasks = self.scheduler.get_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            task['update_at'] = int2long(int(time.time()))
            if task['classify'] == self.visit_classify:
                self.scheduler.insert_tasks(task, level=task['weight'])
            elif task['groups'] == self.org_groups:
                # Resolve the organization's site through the Qichacha (企查查) service
                logger.info(f"<QccSearch> {task['groups']} >>> {word}")
                try:
                    url = engine.by_org_get_site(task['search'])
                    task['url'] = url
                    task['name'] = word
                    task['domain'] = extract_domain(task['url'])
                    # Save the result
                    self.push_data('save', task, MGO_SEARCH)
                    if not is_url(url):
                        continue
                    if self.validator.url(task['domain']):
                        continue
                    # domain - add to the filter
                    self.validator.add_url(task['domain'])
                    # Push to the data-mining queue
                    task['classify'] = self.visit_classify
                    task['weight'] = self.url_weight
                    self.scheduler.insert_tasks(task, level=self.url_weight)
                except HostsRetrieveError as e:
                    logger.exception(e)
            else:
                # Query the keyword through the search engine
                logger.info(f"<{ename}> {task['groups']} >>> {word}")
                cur_page = 0
                while cur_page < self._max_pages:
                    cur_page += 1
                    # Retrieve search results for this page
                    lst = []
                    urls = engine.search(word, cur_page)
                    # Build data-mining tasks
                    for url in urls:
                        domain = extract_domain(url)
                        if self.validator.url(domain):
                            continue
                        lst.append(self.make_task(
                            url=extract_base_url(url),
                            origin=task['origin'],
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=self.url_weight,
                        ))
                    # Push to the data-mining queue
                    self.scheduler.insert_tasks(lst, level=self.url_weight)
                    logger.info(f'<{ename}> {word} - page {cur_page} - {len(lst)} items')

    def start(self):
        if len(self._engines) > 0:
            logger.info('[SearchEngine] initial load')
            # Size the thread pool to the number of search engines
            max_workers = len(self._engines)
            with ThreadPoolExecutor(max_workers, 'SearchEngine') as executor:
                futures = []
                for engine in self._engines:
                    f = executor.submit(self.search, engine)
                    f.add_done_callback(err_details)
                    futures.append(f)
                wait(futures)


class DataExcavate(BasicSearch):

    def __init__(self, excavate_workers=1, loop_excavate_interval=60, **kwargs):
        super(DataExcavate, self).__init__(**kwargs)
        self._interval = loop_excavate_interval
        self._workers = excavate_workers

    def save(self, task: Task):
        """Save a site that meets the data-mining requirements"""
        self.push_data('save', task, MGO_DOMAIN)
        # Add the url to the dedup collection
        self.push_data('remove', task, MGO_REMOVAL_DUPLICATE)
        logger.info(f"[upload data]{task['name']} - {task['domain']}")

    def retrieve_site(self, task: Task):
        if self.validator.url(task['url']):
            return
        logger.info(f'request host -> {task["url"]}')
        response = self.downloader.get(task['url'])
        if response.status_code != 200 or response.text in ['', None]:
            return
        task['domain'] = extract_domain(task['url'])
        page_source = response.text
        title = extract_page_title(page_source)
        task['name'] = title
        base_url = extract_base_url(task['url'])
        task['base_url'] = base_url
        items = self.parser.site_items(page_source, base_url)
        lst = []
        _c = 0  # counter of requirement keywords found on the page
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.requirement_word(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=task['weight']
                ))
                _c += 1
        if _c > 1:
            self.save(task)
        self.scheduler.insert_tasks(lst, level=self.url_weight)
        # domain - add to the filter
        self.validator.add_url(task['domain'])
        # url - add to the filter
        self.validator.add_url(task['url'])

    def excavate(self):
        t_name = threading.current_thread().name
        logger.info(f'[DataExcavate] started - {t_name}')
        while True:
            _redis_key = REDIS_KEY + '-' + str(self.url_weight)
            tasks = self.scheduler.get_task(_redis_key)
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            task['update_at'] = int2long(int(time.time()))
            self.retrieve_site(task)

    def start(self):
        logger.info('[DataExcavate] initial load')
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
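

# Usage sketch (an assumption, not part of the original module): one way the three
# components might be wired together. `QccSearchEngine` stands in for a concrete
# JySearchEngine subclass from crawler.search; that class name, its constructor
# arguments, and the worker counts below are illustrative only.
#
# if __name__ == '__main__':
#     from crawler.search import QccSearchEngine  # hypothetical concrete engine
#
#     SyncData(allow_load_filter=True)              # spawns its own 'SyncData' thread
#     searcher = SearchEngine(engines=[QccSearchEngine()], max_search_page=3)
#     threading.Thread(target=searcher.start, name='SearchEngineMain').start()
#     DataExcavate(excavate_workers=4).start()      # blocks in its worker pool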