dongzhaorui@topnet.net.cn, 3 years ago
Commit ccc0ae2e94
3 files changed, 118 insertions and 98 deletions

  1. ybw/crawler/crawl_scheduler.py  +102 -78
  2. ybw/detail_spider.py  +13 -15
  3. ybw/list_spider.py  +3 -5

ybw/crawler/crawl_scheduler.py  +102 -78

@@ -7,19 +7,21 @@ from datetime import date, timedelta
 import requests
 
 from crawler.login import User
-from utils.databases import MongoDBS
+from utils.databases import mongo_table, int2long, object_id
 from utils.execptions import JyBasicException
 from utils.log import logger
-from utils.tools import int2long, object_id
+from utils.tools import get_host_ip
 
 
 class Scheduler:
 
     def __init__(self, query: dict):
         self.query = query
-        self.crawl_account_tab = MongoDBS('py_spider', 'match_account').coll
-        self.crawl_error_tab = MongoDBS('py_spider', 'crawl_error').coll
-        self.crawl_start = False
+        self.account_tab = mongo_table('py_spider', 'match_account')
+        self.error_tab = mongo_table('py_spider', 'crawl_error')
+        self.crawl_start = False  # 控制调度的状态
+        self.count = None  # 日采集数量
+        self.total = None  # 日采集上限
         self.account_id = None
         self.user = None
         self.spider_code = None
@@ -27,7 +29,60 @@ class Scheduler:
         self.crawl_params = None
         self.crawl_exception = None
         self.crawl_type = None
-        self.__records = None
+
+    @property
+    def today(self):
+        return datetime.datetime.today().strftime('%Y-%m-%d')
+
+    @property
+    def yesterday(self):
+        return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
+
+    @property
+    def crawl_task(self):
+        results = {}
+        url = 'http://cc.spdata.jianyu360.com/schedule/crawl_task/ybw_scheduler'
+        # url = 'http://127.0.0.1:1405/schedule/crawl_task/ybw_scheduler'
+        try:
+            response = requests.get(url, timeout=10)
+            if response.status_code == 200:
+                data = response.json()['data']
+                if len(data) > 0:
+                    results['_id'] = object_id(data['_id'])
+                    for key, val in data.items():
+                        if key != '_id':
+                            results[key] = val
+            return results
+        except requests.RequestException:
+            return results
+
+    def finished(self, execute_next_time=None):
+        logger.info("任务结束")
+        self._release_account()
+        self.sleep(execute_next_time)
+
+    def err_record(self, e: JyBasicException):
+        rows = {
+            'account': self.user.phone if self.user is not None else '',
+            'spidercode': self.spider_code,
+            'url': self.crawl_url,
+            'status_code': e.code,
+            'reason': e.reason,
+            'params': getattr(e, 'title', ''),
+            'crawl_time': int2long(int(time.time())),
+            'crawl_type': self.crawl_type,
+        }
+        self.error_tab.insert_one(rows)
+
+    @staticmethod
+    def wait_for_next_task(wait_time=None):
+        _sleep = (wait_time or random.choice(range(5, 11)))
+        time.sleep(_sleep)
+
+    @staticmethod
+    def sleep(wait_time=None):
+        sleep_time = (wait_time or 600)
+        time.sleep(sleep_time)
 
     def _release_account(self):
         rows = dict(
@@ -35,31 +90,54 @@ class Scheduler:
             update_time=datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
         )
         if self.account_id is not None:
-            self.crawl_account_tab.update_one(
-                {'_id': self.account_id},
-                {'$set': rows}
-            )
+            self._update_data(rows)
+
+    def _update_data(self, item):
+        """
+        更新账号所属的采集数据信息
+
+        :param item: 最新数据
+        """
+        item['ip'] = get_host_ip()
+        item['update_time'] = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
+        self.account_tab.update_one(
+            {'_id': self.account_id},
+            {'$set': item}
+        )
+
+    def update_count(self, number):
+        rows = self.account_tab.find_one({'_id': self.account_id})
+        records = rows.get('records', {self.today: 0})
+        '''采集记录历史保存7天'''
+        count = records.get(self.today, 0)
+        count += number
+        self.count = count
+        if len(records) > 7:
+            records.clear()
+            records.setdefault(self.today, count)
+        else:
+            records.update({self.today: count})
+        rows.update({'records': records})
+        self._update_data(rows)
 
     def __enter__(self):
-        # 取出一个空闲并且使用次数较少的账号
-        rows = self.crawl_account_tab.find_one(self.query, sort=[('usage', 1)])
+        logger.info(f'[开启调度]')
+        '''获取一个闲置时间较久的账号'''
+        rows = self.account_tab.find_one(self.query, sort=[('update_time', 1)])
         if rows is not None:
             self.account_id = rows['_id']
             self.user = User(rows['account'], rows['password'])
-            logger.info(f'[开启调度]启用账号: {self.user.phone}')
-            usage = int(rows['usage'])
-            rows['usage'] = usage + 1
-            use_time = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
-            rows['update_time'] = use_time
+            logger.info(f'[启用账号]{self.user.phone}')
             rows['used'] = True
-            self.crawl_account_tab.update_one(
-                {'_id': self.account_id},
-                {'$set': rows}
-            )
-            self.crawl_start = True  # 控制调度的状态
+            '''初始化采集数据记录'''
+            records = rows.get('records', {self.today: 0})
+            rows.update({'records': records})
+            self._update_data(rows)
+            self.total = rows['total']
+            self.count = records.get(self.today, 0)
+            self.crawl_start = True
         else:
-            # TODO 没有空闲账号时,取出使用次数最少的账号,暂未实现
-            logger.warning(f'请检查mongo表 {self.crawl_account_tab.name} 账号状态')
+            logger.warning(f'[{self.query.get("site")}采集]暂无闲置账号')
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
@@ -77,57 +155,3 @@ class Scheduler:
             self.err_record(e)
             logger.error(f'错误类型: {exc_type}, 错误内容: {exc_val}, 错误详情: {errmsg}')
         return True
-
-    def finished(self, execute_next_time=None):
-        logger.info("任务完成")
-        self._release_account()
-        self.sleep(execute_next_time)
-
-    @staticmethod
-    def wait_for_next_task(wait_time=None):
-        _sleep = (wait_time or random.choice(range(5, 15)))
-        time.sleep(_sleep)
-
-    @staticmethod
-    def sleep(wait_time=None):
-        sleep_time = (wait_time or 600)
-        time.sleep(sleep_time)
-
-    @property
-    def today(self):
-        return datetime.datetime.today().strftime('%Y-%m-%d')
-
-    @property
-    def yesterday(self):
-        return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
-
-    def err_record(self, e: JyBasicException):
-        rows = {
-            'account': self.user.phone if self.user is not None else '',
-            'spidercode': self.spider_code,
-            'url': self.crawl_url,
-            'status_code': e.code,
-            'reason': e.reason,
-            'params': getattr(e, 'title', ''),
-            'crawl_time': int2long(int(time.time())),
-            'crawl_type': self.crawl_type,
-        }
-        self.crawl_error_tab.insert_one(rows)
-
-    @property
-    def crawl_task(self):
-        results = {}
-        url = 'http://cc.spdata.jianyu360.com/schedule/crawl_task/ybw_scheduler'
-        # url = 'http://127.0.0.1:1405/schedule/crawl_task/ybw_scheduler'
-        try:
-            response = requests.get(url, timeout=10)
-            if response.status_code == 200:
-                data = response.json()['data']
-                if len(data) > 0:
-                    results['_id'] = object_id(data['_id'])
-                    for key, val in data.items():
-                        if key != '_id':
-                            results[key] = val
-            return results
-        except requests.RequestException:
-            return results

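With this change the Scheduler itself carries the daily quota (`total`), the running counter (`count`) and the remote `crawl_task` feed, so spiders no longer need the old `crawl_record` helpers or a `crawl_total` argument. A minimal usage sketch follows; the query dict is hypothetical, since the actual query fields passed to `Scheduler` are not part of this commit:

    from crawler.crawl_scheduler import Scheduler

    query = {'site': 'ybw', 'used': False}  # hypothetical account query

    with Scheduler(query) as sc:
        if sc.crawl_start:                  # an idle account was found and locked
            while sc.count < sc.total:      # stop once the daily quota is reached
                task = sc.crawl_task        # pull one task from the remote scheduler
                if not task:
                    break
                # ... crawl the task, call sc.err_record(e) on failure ...
                sc.update_count(1)          # add the result to today's record
                sc.wait_for_next_task()
        sc.finished()                       # release the account and sleep
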
ybw/detail_spider.py  +13 -15

@@ -6,15 +6,13 @@ from lxml.html.clean import Cleaner
 from pymongo.errors import DuplicateKeyError
 
 from crawler.check_utils import CheckText, CheckTask
-from crawler.clean_html import clean_html
-from crawler.crawl_record import update_records, load_records
+from crawler.clean_html import cleaner
 from crawler.crawl_scheduler import Scheduler
 from crawler.login import login, load_login_cookies, login_check
-from utils.databases import MongoDBS
+from utils.databases import mongo_table, int2long
 from utils.execptions import VoidCrawlError, JyBasicException
 from utils.log import logger
 from utils.socks5 import Proxy
-from utils.tools import int2long
 
 
 def iter_node(element: HtmlElement):
@@ -75,11 +73,9 @@ class DetailSpider:
             db: str,
             crawl_tab: str,
             save_tab: str,
-            crawl_total=None,
     ):
-        self.crawl_tab = MongoDBS(db, crawl_tab).coll
-        self.save_tab = MongoDBS(db, save_tab).coll
-        self.crawl_total = crawl_total or 6000
+        self.crawl_tab = mongo_table(db, crawl_tab)
+        self.save_tab = mongo_table(db, save_tab)
         self.user = None
 
     def crawl_request(self, url):
@@ -166,7 +162,7 @@ class DetailSpider:
         '''检查文本内容'''
         CheckText(html)
         item["contenthtml"] = html
-        item["detail"] = clean_html(html)
+        item["detail"] = cleaner(html)
         item["comeintime"] = int2long(int(time.time()))
         del item['count'], item['crawl']
         if 'crawl_status' in item:
@@ -177,7 +173,7 @@ class DetailSpider:
             pass
         logger.info('[采集成功]{}-{}'.format(item['title'], item['publishtime']))
 
-    def set_crawl_status(self, item: dict, status: bool):
+    def update_crawl_status(self, item: dict, status: bool):
         self.crawl_tab.update_one(
             {'_id': item['_id']},
             {'$set': {'crawl': status}}
@@ -185,12 +181,12 @@ class DetailSpider:
 
     def crawl_spider(self, sc: Scheduler):
         while True:
-            if load_records(self.user.phone, sc.today) >= self.crawl_total:
+            if sc.count >= sc.total:
                 return True
             item = sc.crawl_task
             if len(item) == 0:
                 return False
-            self.set_crawl_status(item, True)
+            self.update_crawl_status(item, True)
             '''使用调度器记录采集内容,出现错误时错误写入数据库'''
             sc.spider_code = item['spidercode']
             sc.crawl_url = item['competehref']
@@ -205,7 +201,7 @@ class DetailSpider:
                         {"_id": item["_id"]},
                         {'$set': {'crawl_status': 'finished'}}
                     )
-                    update_records(self.user.phone, 1)
+                    sc.update_count(1)
             except JyBasicException as e:
                 if e.code == 10105:
                     '''检查出该异常时,程序会将es查询结果更新采集表'''
@@ -213,14 +209,17 @@ class DetailSpider:
                         {"_id": item["_id"]},
                         {'$set': {'count': item['count']}}
                     )
+                    logger.info('[重复数据]{}-{}'.format(item['title'], item['publishtime']))
                 else:
                     sc.err_record(e)
                     self.crawl_tab.update_one(
                         {"_id": item["_id"]},
                         {'$set': {'crawl_status': 'error'}}
                     )
+                    logger.info('[问题数据]{}-{}'.format(item['title'], item['publishtime']))
+                sc.update_count(0)
             finally:
-                self.set_crawl_status(item, False)
+                self.update_crawl_status(item, False)
                 sc.wait_for_next_task()
 
     def start(self):
@@ -244,5 +243,4 @@ if __name__ == '__main__':
         db='py_spider',
         crawl_tab='ybw_list',
         save_tab='data_bak',
-        crawl_total=6000,
     ).start()

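The per-account quota now lives in the account document itself: `__enter__` reads `total` and today's entry in `records`, and `update_count` writes the new counter back through `_update_data`. A sketch of the assumed document shape in `py_spider.match_account`; field names are taken from the diff, all values are purely illustrative:

    # Assumed shape of one document in py_spider.match_account (illustrative values only)
    {
        '_id': ObjectId('5f0000000000000000000000'),
        'account': '138xxxxxxxx',            # login phone, wrapped by crawler.login.User
        'password': '******',
        'used': True,                         # set while a spider instance holds the account
        'total': 6000,                        # daily quota, replaces the old crawl_total argument
        'records': {'2022-05-10': 1234},      # per-day counters, reset once more than 7 days accumulate
        'update_time': '2022-05-10 12:00:00',
        'ip': '10.0.0.1',                     # host that last updated the account (get_host_ip)
    }
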
ybw/list_spider.py  +3 -5

@@ -9,12 +9,10 @@ from lxml.html import fromstring, HtmlElement
 from config.load import crawler_url, region
 from crawler.crawl_scheduler import Scheduler
 from crawler.login import login, load_login_cookies, login_session_check
-from utils.databases import MongoDBS
-from utils.es_query import get_es
+from utils.databases import mongo_table, int2long, es_query
 from utils.execptions import CustomCheckError, VoidCrawlError, JyBasicException
 from utils.log import logger
 from utils.socks5 import Proxy
-from utils.tools import int2long
 
 CrawlMenu = namedtuple('CrawlMenu', ['channel', 'spidercode', 'table_type'])
 
@@ -32,7 +30,7 @@ class ListSpider:
             CrawlMenu('工程招标', 'a_ybwcgyzbw_gczb', '1%2C'),
         ]
         self.crawl_max_page = crawl_max_page or 1
-        self.crawl_tab = MongoDBS(db, crawl_tab).coll
+        self.crawl_tab = mongo_table(db, crawl_tab)
         self.user = None
         self.session = None
 
@@ -116,7 +114,7 @@ class ListSpider:
             }
             if title is None:
                 raise CustomCheckError(code=10107, reason='发布标题解析空值错误')
-            item['count'] = get_es(item["title"], item["l_np_publishtime"])
+            item['count'] = es_query(item["title"], item["l_np_publishtime"])
             item['crawl'] = False
             # print(f'>>> {title} - {competehref}')
             results.append(item)
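
All three spiders now import their database helpers from a single `utils.databases` module instead of `MongoDBS`, `utils.tools` and `utils.es_query`. That module is not part of this diff; the sketch below only reflects how the helpers are called here, so the connection settings and the `es_query` body are assumptions:

    # utils/databases.py -- sketch, not the actual module in the repository
    from bson.int64 import Int64
    from bson.objectid import ObjectId
    from pymongo import MongoClient

    _client = MongoClient('127.0.0.1', 27017)  # assumed connection settings

    def mongo_table(db: str, coll: str):
        """Return a pymongo collection, e.g. mongo_table('py_spider', 'ybw_list')."""
        return _client[db][coll]

    def int2long(value: int) -> Int64:
        """Store integers as 64-bit values in MongoDB (used for comeintime/crawl_time)."""
        return Int64(value)

    def object_id(value: str) -> ObjectId:
        """Convert the string _id returned by the task API back into an ObjectId."""
        return ObjectId(value)

    def es_query(title: str, publish_time) -> int:
        """Count near-duplicate documents in Elasticsearch; the query body used by
        the real helper is not shown in this commit, so only the signature is sketched."""
        raise NotImplementedError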