commit 49559576ba by dongzhaorui@topnet.net.cn, 3 years ago

+ 2 - 2
zbytb/config/conf.yaml

@@ -23,12 +23,12 @@ ali_oss:
   bucket_name: jy-datafile
 
 
-# es
 es:
   host: 172.17.145.170
-#  host: 192.168.3.206
 #  host: 127.0.0.1
+#  host: 192.168.3.206
   port: !!int 9800
+  db: bidding_all
 
 
 # 代理
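
A quick way to sanity-check this block: the `!!int` tag forces `port` to an integer and the new `db` key comes through as a plain string. A minimal, standalone sketch (the YAML is inlined here rather than read from zbytb/config/conf.yaml):

```python
# Minimal sketch: parse an es block like the one above and confirm the types.
import yaml

sample = """
es:
  host: 172.17.145.170
#  host: 127.0.0.1
#  host: 192.168.3.206
  port: !!int 9800
  db: bidding_all
"""

conf = yaml.safe_load(sample)
es = conf['es']
assert isinstance(es['port'], int)       # the !!int tag yields an int, not a str
print(es['host'], es['port'], es['db'])  # 172.17.145.170 9800 bidding_all
```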

+ 12 - 1
zbytb/config/load.py

@@ -1,3 +1,4 @@
+import sys
 from pathlib import Path
 
 import yaml
@@ -9,7 +10,9 @@ __all__ = [
     'jy_proxy',
     'node_module',
     'es_conf',
-    'headers'
+    'headers',
+    'analyze_url',
+    'node_module_path'
 ]
 
 base_path = Path(__file__).parent
@@ -25,6 +28,14 @@ with open(yaml_conf, encoding="utf-8") as f:
     jy_proxy: dict = conf['proxy']
     node_module: dict = conf['node_module']
 
+
 with open(yaml_constants, encoding="utf-8") as fp:
     constants = yaml.safe_load(fp)
     headers: dict = constants['headers']
+    analyze_url = f'http://{es_conf["host"]}:{es_conf["port"]}/{es_conf["db"]}/_analyze'
+
+
+if sys.platform == 'linux':
+    node_module_path = node_module['linux']
+else:
+    node_module_path = node_module['windows']
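
This hunk derives two module-level values at import time: `analyze_url`, built from the es host/port plus the new `db` key, and `node_module_path`, picked once by `sys.platform` so callers no longer branch themselves. A hedged sketch of the consumer side (the printed URL is what the conf.yaml values above would produce, not a runtime guarantee):

```python
# Sketch of what downstream modules now import; assumes config/load.py is on the path.
from config.load import analyze_url, node_module_path

# With the es block above this expands to:
#   http://172.17.145.170:9800/bidding_all/_analyze
print(analyze_url)

# Pre-resolved per platform: node_module['linux'] on Linux, node_module['windows'] otherwise.
print(node_module_path)
```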

+ 2 - 2
zbytb/crawler/check_utils.py

@@ -1,6 +1,6 @@
 import re
 
-from utils.es_query import get_es
+from utils.databases import es_query
 from utils.execptions import (
     CustomAccountPrivilegeError,
     CustomCheckError
@@ -76,7 +76,7 @@ class CheckPrePareRequest:
         :param publish_time: 发布时间的时间戳(l_np_publishtime)
         :param rows: 采集内容
         """
-        retrieved_result = get_es(title, publish_time)
+        retrieved_result = es_query(title, publish_time)
         if retrieved_result != 0:
             '''es查询数据结果'''
             rows['count'] = retrieved_result
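
Only the import and the call change here: `get_es` from the deleted utils/es_query.py becomes `es_query` from utils.databases, with the same signature. A simplified, hedged sketch of the duplicate check this call feeds (the real logic lives in CheckPrePareRequest; this standalone variant needs a reachable es):

```python
# Standalone sketch of the dedup lookup; assumes the es service from conf.yaml is reachable.
from utils.databases import es_query

def mark_duplicates(title: str, publish_time: int, rows: dict) -> None:
    """Record how many near-duplicate titles es already holds for this item."""
    retrieved_result = es_query(title, publish_time)
    if retrieved_result != 0:
        rows['count'] = retrieved_result  # es already has similar documents

item = {'title': '某某项目招标公告', 'l_np_publishtime': 1650000000}
mark_duplicates(item['title'], item['l_np_publishtime'], item)
```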

+ 83 - 70
zbytb/crawler/crawl_scheduler.py

@@ -7,18 +7,18 @@ from datetime import date, timedelta
 import requests
 
 from crawler.login import User
-from utils.databases import MongoDBS
+from utils.databases import mongo_table, int2long, object_id
 from utils.execptions import JyBasicException
 from utils.log import logger
-from utils.tools import int2long, object_id
+from utils.tools import get_host_ip
 
 
 class Scheduler:
 
     def __init__(self, query: dict):
         self.query = query
-        self.crawl_account_tab = MongoDBS('py_spider', 'match_account').coll
-        self.crawl_error_tab = MongoDBS('py_spider', 'crawl_error').coll
+        self.account_tab = mongo_table('py_spider', 'match_account')
+        self.crawl_error_tab = mongo_table('py_spider', 'crawl_error')
         self.crawl_start = False
         self.account_id = None
         self.user = None
@@ -29,64 +29,26 @@ class Scheduler:
         self.crawl_type = None
         self.__records = None
 
+    def _update_data(self, item):
+        """
+        更新账号所属的采集数据信息
+
+        :param item: 最新数据
+        """
+        item['ip'] = get_host_ip()
+        item['update_time'] = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
+        self.account_tab.update_one(
+            {'_id': self.account_id},
+            {'$set': item}
+        )
+
     def _release_account(self):
         rows = dict(
             used=False,
             update_time=datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
         )
         if self.account_id is not None:
-            self.crawl_account_tab.update_one(
-                {'_id': self.account_id},
-                {'$set': rows}
-            )
-
-    def _set_account(self, item: dict):
-        self.account_id = item['_id']
-        self.user = User(item['account'], item['password'])
-        logger.info(f'[开启调度]启用账号: {self.user.username}')
-        usage = int(item['usage'])
-        item['usage'] = usage + 1
-        use_time = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
-        item['update_time'] = use_time
-        item['used'] = True
-        self.crawl_account_tab.update_one(
-            {'_id': self.account_id},
-            {'$set': item}
-        )
-
-    def _query_account(self, query: dict):
-        return self.crawl_account_tab.find_one(query, sort=[('usage', 1)])
-
-    def __enter__(self):
-        rows = self._query_account(self.query)
-        if rows is not None:
-            self._set_account(rows)
-            self.crawl_start = True  # 控制调度的状态
-        else:
-            # TODO 没有空闲账号时,取出使用次数最少的账号,暂未实现
-            logger.warning(f'请检查mongo表 {self.crawl_account_tab.name} 账号状态')
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        logger.info(f'[关闭调度]')
-        self._release_account()
-        self.crawl_start = False
-
-        if exc_type is not None:
-            errmsg = traceback.extract_tb(exc_tb)
-            e = JyBasicException(
-                code=10500,
-                reason=str(exc_type),
-                title='未知系统错误'
-            )
-            self.err_record(e)
-            logger.error(f'错误类型: {exc_type}, 错误内容: {exc_val}, 错误详情: {errmsg}')
-        return True
-
-    def finished(self, execute_next_time=None):
-        logger.info("任务结束")
-        self._release_account()
-        self.sleep(execute_next_time)
+            self._update_data(rows)
 
     @staticmethod
     def sleep(wait_time=None):
@@ -106,19 +68,6 @@ class Scheduler:
     def yesterday(self):
         return (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
 
-    def err_record(self, e: JyBasicException):
-        rows = {
-            'account': self.user.username if self.user is not None else '',
-            'spidercode': self.spider_code,
-            'url': self.crawl_url,
-            'status_code': e.code,
-            'reason': e.reason,
-            'params': getattr(e, 'title', ''),
-            'crawl_time': int2long(int(time.time())),
-            'crawl_type': self.crawl_type,
-        }
-        self.crawl_error_tab.insert_one(rows)
-
     @property
     def crawl_task(self):
         results = {}
@@ -137,9 +86,73 @@ class Scheduler:
         except requests.RequestException:
             return results
 
+    def err_record(self, e: JyBasicException):
+        rows = {
+            'account': self.user.username if self.user is not None else '',
+            'spidercode': self.spider_code,
+            'url': self.crawl_url,
+            'status_code': e.code,
+            'reason': e.reason,
+            'params': getattr(e, 'title', ''),
+            'crawl_time': int2long(int(time.time())),
+            'crawl_type': self.crawl_type,
+        }
+        self.crawl_error_tab.insert_one(rows)
+
     def query_user(self, account: str):
         query = {'account': account}
-        rows = self.crawl_account_tab.find_one(query)
+        rows = self.account_tab.find_one(query)
         if rows is None:
             raise
         return User(rows['account'], rows['password'])
+
+    def finished(self, execute_next_time=None):
+        logger.info("任务结束")
+        self._release_account()
+        self.sleep(execute_next_time)
+
+    def update_count(self, number):
+        rows = self.account_tab.find_one({'_id': self.account_id})
+        records = rows.get('records', {self.today: 0})
+        '''采集记录历史保存7天'''
+        count = records.get(self.today, 0)
+        count += number
+        if len(records) > 7:
+            records.clear()
+            records.setdefault(self.today, count)
+        else:
+            records.update({self.today: count})
+        rows.update({'records': records})
+        self._update_data(rows)
+
+    def __enter__(self):
+        logger.info(f'[开启调度]')
+        rows = self.account_tab.find_one(self.query, sort=[('update_time', 1)])
+        if rows is not None:
+            self.account_id = rows['_id']
+            self.user = User(rows['account'], rows['password'])
+            logger.info(f'[启用账号] {self.user.username}')
+            rows['used'] = True
+            records = rows.get('records', {self.today: 0})
+            rows.update({'records': records})
+            self._update_data(rows)
+            self.crawl_start = True  # 控制调度的状态
+        else:
+            logger.warning(f'[{self.query.get("site")}采集]暂无闲置账号')
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        logger.info(f'[关闭调度]')
+        self._release_account()
+        self.crawl_start = False
+
+        if exc_type is not None:
+            errmsg = traceback.extract_tb(exc_tb)
+            e = JyBasicException(
+                code=10500,
+                reason=str(exc_type),
+                title='未知系统错误'
+            )
+            self.err_record(e)
+            logger.error(f'错误类型: {exc_type}, 错误内容: {exc_val}, 错误详情: {errmsg}')
+        return True
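
The Scheduler refactor centralizes account writes in `_update_data` (which now stamps `ip` and `update_time`), lets `__enter__` pick the least recently used account by sorting on `update_time`, and adds `update_count` to keep a per-day `records` map that is reset once it grows past 7 entries. A hedged usage sketch of the context-manager flow (the query values are illustrative and a reachable MongoDB is assumed):

```python
# Hedged usage sketch of the scheduler's context-manager protocol.
from crawler.crawl_scheduler import Scheduler

query = {'used': False, 'site': '中国招标与采购网'}   # illustrative account filter
with Scheduler(query) as sc:
    if sc.crawl_start:                  # an idle account was found and marked used
        task = sc.crawl_task            # dict; empty when the task feed is unreachable
        if task:
            sc.spider_code = task.get('spidercode')
            sc.crawl_url = task.get('competehref')
            # ... crawl the task here ...
            sc.update_count(1)          # add 1 to today's entry in the account records
# __exit__ always releases the account (used=False) and routes exceptions to err_record
```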

+ 1 - 7
zbytb/crawler/sessions_521.py

@@ -1,18 +1,12 @@
 import copy
 import re
-import sys
 
 import execjs
 import jsbeautifier
 import requests
 from requests.utils import dict_from_cookiejar
 
-from config.load import node_module
-
-if sys.platform == 'linux':
-    node_module_path = node_module['linux']
-else:
-    node_module_path = node_module['windows']
+from config.load import node_module_path
 
 
 def save_js_script(js_code: str, allow_beautify_code=False):
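
sessions_521.py now imports the pre-resolved `node_module_path` from config.load instead of branching on `sys.platform` locally; the anti-521 logic itself is unchanged. For reference, a hedged sketch of how such a path is commonly passed to PyExecJS so `require()` resolves against the project's node_modules (the hunk does not show the exact call, so treat this as an assumption):

```python
# Hedged sketch; assumes PyExecJS (execjs) and a node_modules tree at node_module_path.
import execjs
from config.load import node_module_path

js_code = "function add(a, b) { return a + b; }"
ctx = execjs.compile(js_code, cwd=node_module_path)  # node runs with this working dir
print(ctx.call("add", 1, 2))  # 3
```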

+ 14 - 17
zbytb/crawler/spiders/DetailPageSpider.py

@@ -16,14 +16,13 @@ from utils.attachment import (
     AttachmentDownloader,
     extract_file_name, extract_file_name_by_href
 )
-from utils.databases import MongoDBS
+from utils.databases import mongo_table, int2long
 from utils.execptions import (
     CustomAccountPrivilegeError,
     AttachmentNullError,
     CustomCheckError, JyBasicException
 )
 from utils.log import logger
-from utils.tools import int2long
 
 
 class CrawlDetailPageSpider:
@@ -35,13 +34,11 @@ class CrawlDetailPageSpider:
             save_tab: str,
             error_tab: str,
     ):
-        self.crawl_tab = MongoDBS(db, crawl_tab).coll
-        self.save_tab = MongoDBS(db, save_tab).coll
-        self.crawl_error_tab = MongoDBS(db, error_tab).coll
-
-        self.senior_account = 'runhekeji'
-
+        self.crawl_tab = mongo_table(db, crawl_tab)
+        self.save_tab = mongo_table(db, save_tab)
+        self.crawl_error_tab = mongo_table(db, error_tab)
         self.attachment_downloader = AttachmentDownloader()
+        self.senior_account = 'runhekeji'
 
     @staticmethod
     def select_user(rows: dict, sc: Scheduler):
@@ -160,19 +157,19 @@ class CrawlDetailPageSpider:
             pass
         logger.info("[采集成功]{}-{}".format(rows['title'], rows['publishtime']))
 
-    def set_senior_privilege(self, rows: dict):
+    def set_senior_privilege(self, item: dict):
         """
         设置高级账号
 
-        :param rows: 采集数据内容
+        :param item: 采集数据内容
         """
         '''需要高级会员才能查询的招标信息,设置高级账号'''
         self.crawl_tab.update_one(
-            {"_id": rows["_id"]},
+            {"_id": item["_id"]},
             {'$set': {'account': self.senior_account}}
         )
-        '''释放采集状态'''
-        self.set_crawl_status(rows, False)
+        '''采集状态'''
+        self.update_crawl_status(item, False)
 
     def crawl_error(
             self,
@@ -241,7 +238,7 @@ class CrawlDetailPageSpider:
                     break
         return success, response
 
-    def set_crawl_status(self, rows: dict, status: bool):
+    def update_crawl_status(self, rows: dict, status: bool):
         self.crawl_tab.update_one(
             {'_id': rows['_id']},
             {'$set': {'crawl': status}}
@@ -275,13 +272,13 @@ class CrawlDetailPageSpider:
             item = sc.crawl_task
             if len(item) == 0:
                 return False
-            self.set_crawl_status(item, True)
+            self.update_crawl_status(item, True)
             sc.spider_code = item['spidercode']
             sc.crawl_url = item['competehref']
             try:
                 CheckTask(item)
                 self.crawl_spider(item, sc)
-                self.set_crawl_status(item, False)
+                self.update_crawl_status(item, False)
                 sc.wait_for_next_task(10)
             except JyBasicException as e:
                 if e.code == 10105:
@@ -296,7 +293,7 @@ class CrawlDetailPageSpider:
                         {"_id": item["_id"]},
                         {'$set': {'crawl_status': 'error'}}
                     )
-                self.set_crawl_status(item, False)
+                self.update_crawl_status(item, False)
 
     def start(self):
         query = {'used': False, 'site': '中国招标与采购网'}
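
The detail spider now builds its collections with `mongo_table`, and `set_crawl_status` is renamed to `update_crawl_status`; it still just flips the task's `crawl` flag, and `set_senior_privilege` reassigns a row to the `runhekeji` account before clearing that flag. A hedged sketch of the claim/release bracket around one task (the collection name is illustrative, field names follow the hunk):

```python
# Hedged sketch of the crawl-flag bracket; collection name 'zbytb_list' is made up here.
from utils.databases import mongo_table

crawl_tab = mongo_table('py_spider', 'zbytb_list')

def update_crawl_status(rows: dict, status: bool) -> None:
    crawl_tab.update_one({'_id': rows['_id']}, {'$set': {'crawl': status}})

def process(item: dict) -> None:
    update_crawl_status(item, True)       # claim the task so other workers skip it
    try:
        pass                              # ... fetch and parse the detail page ...
    finally:
        update_crawl_status(item, False)  # release the claim even on failure
```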

+ 5 - 7
zbytb/crawler/spiders/ListPageSpider.py

@@ -6,11 +6,9 @@ from lxml.html import fromstring
 
 from config.load import headers
 from crawler.defaults import http_request_get
-from utils.databases import MongoDBS
-from utils.es_query import get_es
+from utils.databases import mongo_table, int2long, es_query
 from utils.log import logger
 from utils.socks5 import Proxy
-from utils.tools import int2long
 
 
 class CrawlListPageSpider:
@@ -23,8 +21,8 @@ class CrawlListPageSpider:
             enable_proxy=False,
             **kwargs
     ):
-        self.crawl_tab = MongoDBS(db, crawl_tab).coll
-        self.crawl_error_tab = MongoDBS(db, error_tab).coll
+        self.crawl_tab = mongo_table(db, crawl_tab)
+        self.crawl_error_tab = mongo_table(db, error_tab)
 
         self.host = 'https://www.zbytb.com/search'
         self.headers = kwargs.get('headers') or headers
@@ -71,7 +69,7 @@ class CrawlListPageSpider:
                 **label_info,
                 "crawl": False,
             }
-            info["count"] = get_es(info["title"], info["l_np_publishtime"])
+            info["count"] = es_query(info["title"], info["l_np_publishtime"])
             # print('>>> ', info['competehref'])
             results.append(info)
         self.crawl_tab.insert_many(results)
@@ -88,7 +86,7 @@ class CrawlListPageSpider:
             'crawl_type': 'list'
         }
         self.crawl_error_tab.insert_one(items)
-        logger.error('采集失败')
+        logger.error('[采集失败]列表页')
 
     def crawl_spider(self, task: tuple):
         module_id, area_id, page = task
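
List items now get their `count` from `es_query` before the batch `insert_many`, and the failure log gains the `[采集失败]列表页` prefix. A hedged sketch of the per-item enrichment step (sample data is made up; a reachable es is assumed):

```python
# Hedged sketch of the list-page enrichment; only fields visible in the hunk are set.
from utils.databases import es_query

def enrich(info: dict) -> dict:
    info['crawl'] = False
    info['count'] = es_query(info['title'], info['l_np_publishtime'])
    return info

parsed_items = [{'title': '某某设备采购招标公告', 'l_np_publishtime': 1650000000}]
rows = [enrich(i) for i in parsed_items]
# the spider then calls self.crawl_tab.insert_many(rows)
```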

+ 31 - 5
zbytb/utils/attachment.py

@@ -3,7 +3,6 @@ import os
 import re
 import traceback
 import uuid
-import warnings
 from urllib.parse import urlparse, unquote
 
 import requests
@@ -12,9 +11,31 @@ import urllib3
 from config.load import headers
 from utils.aliyun import AliYunService
 from utils.execptions import AttachmentNullError
+from utils.log import logger
 from utils.socks5 import Proxy
 
 urllib3.disable_warnings()
+# 文件文档类型
+DOCTYPE = {
+    'txt', 'rtf', 'dps', 'et', 'ett', 'xls',
+    'xlsx', 'xlsb', 'xlsm', 'xlt', 'ods', 'pmd', 'pmdx',
+    'doc', 'docm', 'docx', 'dot', 'dotm', 'dotx',
+    'odt', 'wps', 'csv', 'xml', 'xps'
+}
+# 压缩类型
+COMPRESSION_TYPE = {
+    'rar', 'zip', 'gzzb', '7z', 'tar', 'gz', 'bz2', 'jar', 'iso', 'cab',
+    'arj', 'lzh', 'ace', 'uue', 'edxz',
+}
+# 图片类型
+IMAGE_TYPE = {
+    'jpg', 'png', 'jpeg', 'tiff', 'gif', 'psd', 'raw', 'eps', 'svg', 'bmp',
+    'pdf'
+}
+# 其他类型
+OTHER_TYPE = {
+    'swf', 'nxzf', 'xezf', 'nxcf'
+}
 
 
 def sha1(val):
@@ -30,16 +51,19 @@ def remove(file_path: str):
     os.remove(file_path)
 
 
-def getsize(file_path: str):
+def getsize(file):
     try:
-        return os.path.getsize(file_path)
+        return os.path.getsize(file)
     except FileNotFoundError:
         return 0
 
 
 def discern_file_format(text):
     file_types = {
-        'pdf', 'doc', 'docx', 'rar', 'zip', 'gzzb', 'jpg', 'png', 'swf'
+        *DOCTYPE,
+        *COMPRESSION_TYPE,
+        *IMAGE_TYPE,
+        *OTHER_TYPE
     }
     for file_type in file_types:
         all_file_format = [file_type, file_type.upper()]
@@ -48,6 +72,8 @@ def discern_file_format(text):
             if result is not None:
                 return t
     else:
+        unknown_type = re.findall('[^.\\/:*?"<>|\r\n]+$', text, re.S)
+        logger.warning(f'[附件类型识别]未定义的文件类型{unknown_type}')
         return None
 
 
@@ -190,7 +216,7 @@ class AttachmentDownloader(AliYunService):
                 result.setdefault('url', 'oss')
                 super()._push_oss_from_local(key, local_tmp_file)
             except Exception as e:
-                warnings.warn(
+                logger.warning(
                     "[{}]下载异常,原因:{}".format(file_name, e.__class__.__name__)
                 )
         remove(local_tmp_file)
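
The single hard-coded extension set becomes four named groups (documents, archives, images, other), `discern_file_format` matches against their union and now logs unrecognized extensions, and the downloader's `warnings.warn` becomes `logger.warning`. A standalone, hedged sketch of the same suffix-matching idea (trimmed sets and a simplified regex, not the module's exact code):

```python
# Standalone sketch of the extension lookup; sets are trimmed and the regex simplified.
import re

DOCTYPE = {'txt', 'doc', 'docx', 'xls', 'xlsx', 'wps', 'csv'}
COMPRESSION_TYPE = {'rar', 'zip', '7z', 'tar', 'gz'}
IMAGE_TYPE = {'jpg', 'jpeg', 'png', 'pdf'}
OTHER_TYPE = {'swf'}

def discern_file_format(text: str):
    """Return the matched extension (preserving case) or None for unknown types."""
    for file_type in {*DOCTYPE, *COMPRESSION_TYPE, *IMAGE_TYPE, *OTHER_TYPE}:
        for t in (file_type, file_type.upper()):
            if re.search(t + '$', text) is not None:
                return t
    return None

print(discern_file_format('招标文件.PDF'))  # PDF
print(discern_file_format('附件.dat'))      # None -> the module logs a warning here
```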

+ 95 - 33
zbytb/utils/databases.py

@@ -1,47 +1,109 @@
-from typing import Optional
-
+import bson
 import pymongo
 import redis
-from pymongo.collection import Collection
-from pymongo.database import Database
+import requests
+from elasticsearch import Elasticsearch
+
+from config.load import mongo_conf, redis_conf, es_conf, analyze_url
+
+
+# ---------------------------------- mongo ----------------------------------
+def mongo_client(cfg=None):
+    if cfg is None:
+        cfg = mongo_conf
+    return pymongo.MongoClient(host=cfg['host'], port=cfg['port'])
+
+
+def mongo_database(db: str):
+    client = mongo_client()
+    return client[db]
+
+
+def mongo_table(db: str, coll: str):
+    client = mongo_client()
+    return client[db][coll]
+
+
+def int2long(param: int):
+    """int 转换成 long """
+    return bson.int64.Int64(param)
 
-from config.load import mongo_conf, redis_conf
 
-__all__ = ['MongoDBS', 'RedisDBS']
+def object_id(_id: str):
+    return bson.objectid.ObjectId(_id)
 
 
-class MongoDBS:
-    """ Mongo """
+# ---------------------------------- es ----------------------------------
+def es_client(cfg=None):
+    if cfg is None:
+        cfg = es_conf
+    return Elasticsearch([{"host": cfg['host'], "port": cfg['port']}])
 
-    def __init__(self, db: str, collection: str, cfg: dict = mongo_conf):
-        self.client = pymongo.MongoClient(host=cfg['host'], port=cfg['port'])
-        self.db: Database = self.client[db]
-        self.coll: Collection = self.db[collection]
 
-    def __enter__(self):
-        return self.coll
+def es_participles_service(text: str):
+    """
+    获取文本的分词列表
 
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        # 上下文管理器,实例调用完毕后,关闭客户端连接
-        self.client.close()
+    :param text: 需要分词的文本
+    :return: 分词列表
+    """
+    result = []
+    params = {"text": text, "analyzer": "ik_smart"}
+    res = requests.get(analyze_url, params=params, timeout=60)
+    if res.status_code == 200:
+        tokens = res.json().get('tokens', [])
+        for x in tokens:
+            if x["token"].encode('utf-8').isalpha():
+                continue
+            result.append(x["token"])
+    return result
 
-    def __del__(self):
-        # 实例调用完毕后,关闭客户端连接
-        self.client.close()
 
+def es_query(title: str, publish_time: int):
+    """
+    查询es
 
-class RedisDBS:
-    """ redis """
+    :param title: 标题
+    :param publish_time: 发布时间
+    :return:
+    """
+    client = es_client()
+    stime = publish_time - 432000  # 往前推5天
+    etime = publish_time + 432000
+    conditions = []
+    participles = es_participles_service(title)
+    for word in participles:
+        conditions.append({
+            "multi_match": {
+                "query": word,
+                "type": "phrase",
+                "fields": ["title"]
+            }
+        })
+    conditions.append({
+        "range": {"publishtime": {"from": stime, "to": etime}}
+    })
+    query = {
+        "query": {
+            "bool": {
+                "must": conditions,
+                "minimum_should_match": 1
+            }
+        }
+    }
+    result = client.search(index='bidding', body=query, request_timeout=100)
+    count = len(result['hits']['hits'])
+    return count
 
-    def __init__(self, cfg: Optional[dict] = redis_conf):
-        pool = redis.ConnectionPool(
-            host=cfg['host'],
-            port=cfg['port'],
-            password=cfg['pwd'],
-            db=cfg['db']
-        )
-        self.__r = redis.Redis(connection_pool=pool, decode_responses=True)
 
-    @property
-    def redis(self):
-        return self.__r
+# ---------------------------------- redis ----------------------------------
+def redis_client(cfg=None):
+    if cfg is None:
+        cfg = redis_conf
+    pool = redis.ConnectionPool(
+        host=cfg['host'],
+        port=cfg['port'],
+        password=cfg['pwd'],
+        db=cfg['db']
+    )
+    return redis.Redis(connection_pool=pool, decode_responses=True)
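
utils/databases.py is now the single home for storage helpers: thin factory functions for the Mongo, Elasticsearch and Redis clients, the `int2long`/`object_id` converters moved out of utils/tools.py, and `es_query` rebuilt from the deleted utils/es_query.py (same ±5-day window, ik_smart participles fetched through the new `analyze_url`). A hedged sketch of typical calls, assuming the services from conf.yaml are reachable:

```python
# Hedged usage sketch; hosts and credentials come from zbytb/config/conf.yaml.
import time

from utils.databases import mongo_table, int2long, es_query, redis_client

coll = mongo_table('py_spider', 'crawl_error')   # a pymongo Collection
coll.insert_one({
    'reason': 'demo',
    'crawl_time': int2long(int(time.time())),    # stored as BSON int64
})

# number of titles already indexed within ±5 days that match the participles
count = es_query('某某项目招标公告', int(time.time()))
print(count)

r = redis_client()
r.set('zbytb:demo', 1)
```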

+ 0 - 45
zbytb/utils/es_query.py

@@ -1,45 +0,0 @@
-import requests
-from elasticsearch import Elasticsearch
-
-from config.load import es_conf
-
-es = Elasticsearch([{"host": es_conf['host'], "port": es_conf['port']}])
-
-
-def httpAz(title):
-    url = "http://{}:{}/bidding/_analyze".format(es_conf['host'], es_conf['port'])
-    params = {"text": title, "analyzer": "ik_smart"}
-    arr = []
-    res = requests.get(url=url, params=params, timeout=60)
-    if res.status_code == 200:
-        tokens = res.json().get('tokens', [])
-        for x in tokens:
-            if x["token"].encode('utf-8').isalpha():
-                continue
-            arr.append(x["token"])
-
-    q = [{"multi_match": {"query": v, "type": "phrase", "fields": ["title"]}} for v in arr]
-    return q
-
-
-def get_es(title, publishtime):
-    """
-    :param title: 标题
-    :param publishtime: 发布时间
-    :return:
-    """
-    stime = publishtime - 432000    # 往前推5天
-    etime = publishtime + 432000
-    q1 = httpAz(title)
-    q1.append({"range": {"publishtime": {"from": stime, "to": etime}}})
-    esQuery = {
-        "query": {
-            "bool": {
-                "must": q1,
-                "minimum_should_match": 1
-            }
-        }
-    }
-    result = es.search(index='bidding', body=esQuery, request_timeout=100)
-    count = len(result['hits']['hits'])
-    return count

+ 5 - 2
zbytb/utils/log.py

@@ -1,11 +1,14 @@
+from pathlib import Path
+
 from loguru import logger
 
+_absolute = Path(__file__).absolute().parent.parent
+_log_path = (_absolute / 'logs/crawl-{time:YYYY-MM-DD}.log').resolve()
 logger.add(
-    'logs/crawl-{time:YYYY-MM-DD}.log',
+    _log_path,
     format='{time:YYYY-MM-DD HH:mm:ss} - {level} - {message}',
     level='INFO',
     rotation='00:00',
     retention='1 week',
     encoding='utf-8',
-    # filter=lambda x: '采集' in x['message']
 )
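
The log sink is now anchored to an absolute `logs/` directory under the project root instead of a path relative to the current working directory, so every entry point writes to the same files; the commented-out message filter is dropped. A short hedged usage sketch:

```python
# Importing utils.log registers the file sink configured above (plus loguru's default stderr sink).
from utils.log import logger

logger.info('[采集成功]demo-title')   # written to logs/crawl-YYYY-MM-DD.log, rotated at 00:00
logger.warning('[附件类型识别]demo')  # files are kept for one week
```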

+ 9 - 8
zbytb/utils/tools.py

@@ -1,10 +1,11 @@
-import bson
+import socket
 
 
-def int2long(param: int):
-    """int 转换成 long """
-    return bson.int64.Int64(param)
-
-
-def object_id(_id: str):
-    return bson.objectid.ObjectId(_id)
+def get_host_ip():
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    try:
+        s.connect(('8.8.8.8', 80))
+        ip = s.getsockname()[0]
+    finally:
+        s.close()
+    return ip
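
utils/tools.py now only exposes `get_host_ip`, which learns the outbound interface address by "connecting" a UDP socket toward a public IP; no packet is sent, but the call still needs a usable route and raises OSError without one. A hedged variant with an offline fallback (the default value is an assumption, not part of the original):

```python
# Hedged variant of get_host_ip with an offline fallback; 127.0.0.1 is an illustrative default.
import socket

def get_host_ip(default: str = '127.0.0.1') -> str:
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect(('8.8.8.8', 80))    # UDP connect only selects the local interface
        return s.getsockname()[0]
    except OSError:
        return default                # no route (offline host, container without network)
    finally:
        s.close()

print(get_host_ip())
```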