
update: optimize various content and methods

dongzhaorui 2 years ago
Commit
1f66ec7266

+ 34 - 0
FworkSpider/README.md

@@ -0,0 +1,34 @@
+# Local Environment Setup Guide
+### 1. Python installation
+
+    Python 3.8 is recommended. After installing Python, replace the local feapder package with this one; it modifies parts of the original framework to match project requirements.
+### 2. Node.js installation
+
+    No special notes.
+### 3. Firefox + geckodriver installation
+    The project uses Firefox 78.14. Locally, use Firefox (and a matching geckodriver) older than version 88 so your setup stays consistent with the project environment;
+    otherwise the project may not run correctly.
+
+# Spider Development Workflow
+## 1. Create a spider file with feapder
+    feapder create -s <spider_name> <spider_type>   (spider_type 2 is recommended; the new package ships modified templates)
+            (spider_type=1 AirSpider; spider_type=2 Spider; spider_type=3 BatchSpider)
+
+## 2. Fill in parameters and parse:
+    feapder.Request(url, item=menu._asdict(), cookies, callback, render, auto_request)
+    Most parameters are the same as scrapy's Request; the key ones are described below.
+#### 1) render
+    When True, Selenium is used. The setting configures headless Firefox with images disabled and a default browser count of 1, i.e. one spider opens a single browser.
+#### 2) auto_request
+    Whether the request is issued automatically. Recommended when downloading files or images yourself; in the current framework, this field must be set to False when calling the attachment download method.
+#### 3) is_abandoned
+    Whether to give up retrying after an exception, True/False. Default False; enable as needed.
+#### 4) random_user_agent
+    Random user_agent. When a specific user_agent must be used, set this to False.
+#### 5) use_session
+    Whether to use a session.
+#### 6) request_sync
+    Whether to download the page synchronously; asynchronous by default.
+    If the request URL expires quickly, set it to True so the yielded request is handled immediately instead of waiting in the queue.
+#### 7) priority
+    Priority. Smaller values run first. Default 300.
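
A minimal, hedged sketch (not part of this commit) of how these parameters combine on a single request; the spider class, URL, callback, and redis_key are placeholder assumptions:

```python
import feapder


class ParamDemo(feapder.Spider):  # hypothetical spider used only for illustration
    def start_requests(self):
        yield feapder.Request(
            "https://example.com/list",  # placeholder URL
            callback=self.parse,
            render=True,              # drive the headless Firefox configured in setting
            auto_request=False,       # must be False when the attachment download method is used
            is_abandoned=False,       # keep retrying after exceptions
            random_user_agent=False,  # pin a fixed user_agent instead of a random one
            use_session=False,
            request_sync=True,        # handle the yielded request immediately (short-lived URL)
            priority=100,             # smaller value = higher priority (default 300)
        )

    def parse(self, request, response):
        print(response.text)


if __name__ == "__main__":
    ParamDemo(redis_key="demo:param_demo").start()
```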

+ 4 - 0
FworkSpider/__init__.py

@@ -0,0 +1,4 @@
+__all__ = [
+    "setting",
+    "feapder",
+]

+ 26 - 0
FworkSpider/create.py

@@ -0,0 +1,26 @@
+import fire
+
+from feapder.commands.create.create_spider import CreateSpider
+
+
+def create_spider(spider_name, spider_type):
+    """
+
+    4 bidding spider list-page template (ztbpc_feapder)
+    5 bidding spider detail-page template (T_details)
+    6 plan-to-build spider list-page template (njpc_feapder)
+    7 plan-to-build spider detail-page template (njpc_details)
+
+    :param spider_name: class name
+    :param spider_type: spider template type
+    :return:
+    """
+    creator = CreateSpider()
+    creator.create(spider_name=spider_name, spider_type=spider_type, author="swordFish")
+
+
+if __name__ == '__main__':
+    # fire.Fire(create_spider('ztbpc_feapder', 4))
+    fire.Fire(create_spider('ztbpc_feapder', 5))
+    # fire.Fire(create_spider('njpc_list', 6))
+    # fire.Fire(create_spider('njpc_detail', 7))

+ 95 - 0
FworkSpider/feapder/core/handle_failed_items.py

@@ -0,0 +1,95 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2022/11/18 11:33 AM
+---------
+@summary:
+---------
+@author: Boris
+@email: boris_liu@foxmail.com
+"""
+import bson
+
+import feapder.setting as setting
+import feapder.utils.tools as tools
+from feapder.buffer.item_buffer import ItemBuffer
+from feapder.db.redisdb import RedisDB
+from feapder.network.item import Item, UpdateItem
+from feapder.utils.log import log
+
+# dynamic names available when eval is executed
+try:
+    from bson import ObjectId
+except ImportError:
+    pass
+
+
+class HandleFailedItems:
+    def __init__(self, redis_key, task_table=None, item_buffer=None):
+        if redis_key.endswith(":s_failed_items"):
+            redis_key = redis_key.replace(":s_failed_items", "")
+
+        self._redisdb = RedisDB()
+        self._item_buffer = item_buffer or ItemBuffer(redis_key, task_table=task_table)
+
+        self._table_failed_items = setting.TAB_FAILED_ITEMS.format(redis_key=redis_key)
+
+    def get_failed_items(self, count=1):
+        failed_items = self._redisdb.sget(
+            self._table_failed_items, count=count, is_pop=False
+        )
+        return failed_items
+
+    def reput_failed_items_to_db(self):
+        log.debug("Re-writing failed items...")
+        total_count = 0
+        while True:
+            try:
+                failed_items = self.get_failed_items()
+                if not failed_items:
+                    break
+
+                for data_str in failed_items:
+                    data = eval(data_str)
+
+                    for add in data.get("add"):
+                        table = add.get("table")
+                        datas = add.get("datas")
+                        for _data in datas:
+                            _data = dict(**_data)
+                            _data.pop("_id", None)  # drop the Mongo ObjectId
+                            if "comeintime" in _data:
+                                _data["comeintime"] = bson.Int64(tools.get_current_timestamp())  # reset the insert time
+
+                            item = Item(**_data)
+                            item.table_name = table
+                            self._item_buffer.put_item(item)  # pushed asynchronously
+                            total_count += 1
+
+                    for update in data.get("update"):
+                        table = update.get("table")
+                        datas = update.get("datas")
+                        update_keys = update.get("update_keys")
+                        for _data in datas:
+                            item = UpdateItem(**_data)
+                            item.table_name = table
+                            item.update_keys = update_keys
+                            self._item_buffer.put_item(item)
+                            total_count += 1
+
+                    # delete from the failed set after a successful write
+                    def delete_item():
+                        self._redisdb.srem(self._table_failed_items, data_str)
+
+                    self._item_buffer.put_item(delete_item)
+                    self._item_buffer.flush()
+
+            except Exception as e:
+                log.exception(e)
+
+        if total_count:
+            log.debug("Re-imported %s failed items into the database" % total_count)
+        else:
+            log.debug("No failed items")
+
+    def close(self):
+        self._item_buffer.close()
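
A hedged usage sketch (not part of this commit) of the class above; the redis_key is an assumed example:

```python
from feapder.core.handle_failed_items import HandleFailedItems

# Re-queue items that previously failed to save, then flush and release the buffer.
handler = HandleFailedItems(redis_key="swordfish:spider_demo")  # assumed redis_key
handler.reput_failed_items_to_db()
handler.close()
```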

+ 109 - 0
FworkSpider/feapder/dedup/README.md

@@ -0,0 +1,109 @@
+# Dedup
+
+Dedup is feapder's large-scale deduplication module. It ships three deduplication mechanisms behind a single interface; how much data it can hold depends on available memory. Unlike a plain BloomFilter, whose deduplication is limited by the number of slots, Dedup uses an elastic mechanism and can deduplicate massive volumes of data.
+
+
+## Deduplication modes
+
+### Temporary deduplication
+
+> Redis-based, supports batches, entries expire. Deduplicating 10,000 records takes about 0.26 s; 100 million records occupy about 1.43 GB of memory.
+
+```python
+from feapder.dedup import Dedup
+
+data = {"xxx": 123, "xxxx": "xxxx"}
+datas = ["xxx", "bbb"]
+
+def test_ExpireFilter():
+    dedup = Dedup(
+        Dedup.ExpireFilter, expire_time=10, redis_url="redis://@localhost:6379/0"
+    )
+
+    # deduplicate one record at a time
+    assert dedup.add(data) == 1
+    assert dedup.get(data) == 1
+
+    # deduplicate in batches
+    assert dedup.add(datas) == [1, 1]
+    assert dedup.get(datas) == [1, 1]
+```
+
+
+### In-memory deduplication
+
+> Memory-based, supports batches. Deduplicating 10,000 records takes about 0.5 s; 100 million records occupy about 285 MB of memory.
+
+```python
+from feapder.dedup import Dedup
+
+data = {"xxx": 123, "xxxx": "xxxx"}
+datas = ["xxx", "bbb"]
+
+def test_MemoryFilter():
+    dedup = Dedup(Dedup.MemoryFilter)  # state is kept in process memory only
+
+    # 逐条去重
+    # deduplicate one record at a time
+    assert dedup.get(data) == 1
+
+    # 批量去重
+    # deduplicate in batches
+    assert dedup.get(datas) == [1, 1]
+```
+
+### Permanent deduplication
+
+> Redis-based, supports batches, entries never expire. Deduplicating 10,000 records takes about 3.5 s; 100 million records occupy about 285 MB of memory.
+
+    from feapder.dedup import Dedup
+
+    datas = {
+        "xxx": 123,
+        "xxxx": "xxxx",
+    }
+
+    dedup = Dedup()
+
+    print(dedup) # <ScalableBloomFilter: RedisBitArray: dedup:bloomfilter:bloomfilter>
+    print(dedup.add(datas)) # 0 if already present, 1 if newly added
+    print(dedup.get(datas)) # 1 if present, 0 if not
+
+## Filtering data
+
+Dedup can filter out data that already exists using the methods below.
+
+
+```python
+from feapder.dedup import Dedup
+
+def test_filter():
+    dedup = Dedup(Dedup.BloomFilter, redis_url="redis://@localhost:6379/0")
+
+    # seed data that already exists
+    datas = ["xxx", "bbb"]
+    dedup.add(datas)
+
+    # filter out the existing data "xxx", "bbb"
+    datas = ["xxx", "bbb", "ccc"]
+    dedup.filter_exist_data(datas)
+    assert datas == ["ccc"]
+```
+
+```python
+# deduplication on a redis cluster
+from feapder.dedup import Dedup
+
+def test_filter():
+    dedup = Dedup(Dedup.SwordFishFilter, redis_url=["192.168.3.207:2179", "192.168.3.166:2379"], expire_time=20)
+
+    # seed data that already exists
+    datas = ["xxx", "bbb"]
+    dedup.add(datas)
+
+    # filter out the existing data "xxx", "bbb"
+    datas = ["xxx", "bbb", "ccc"]
+    ss = dedup.filter_exist_data(datas)
+    print(ss)
+    assert datas == ["ccc"]
+```
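
As a hedged sketch of the typical list-page pattern (mirroring the filter_exist_data/add calls used elsewhere in this project); the URLs and redis_url are placeholder assumptions:

```python
from feapder.dedup import Dedup

dedup = Dedup(Dedup.BloomFilter, redis_url="redis://@localhost:6379/0")

hrefs = ["https://example.com/notice/1", "https://example.com/notice/2"]
dedup.filter_exist_data(hrefs)  # drops urls that were already collected (in place)
dedup.add(hrefs)                # record the remaining new urls
```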

+ 41 - 0
FworkSpider/feapder/dedup/basefilter.py

@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2022/9/21 11:17 AM
+---------
+@summary:
+---------
+@author: Boris
+@email: boris_liu@foxmail.com
+"""
+import abc
+from typing import List, Union
+
+
+class BaseFilter(abc.ABC):
+    @abc.abstractmethod
+    def add(
+        self, keys: Union[List[str], str], *args, **kwargs
+    ) -> Union[List[bool], bool]:
+        """
+
+        Args:
+            keys: list / single value
+            *args:
+            **kwargs:
+
+        Returns:
+            list / single value (returns 0 if the data already exists, otherwise 1; i.e. whether the add succeeded)
+        """
+        pass
+
+    @abc.abstractmethod
+    def get(self, keys: Union[List[str], str]) -> Union[List[bool], bool]:
+        """
+        Check whether the data already exists
+        Args:
+            keys: list / single value
+
+        Returns:
+            list / single value (returns 1 if the data already exists, otherwise 0)
+        """
+        pass

+ 70 - 0
FworkSpider/feapder/dedup/litefilter.py

@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2022/9/21 11:28 AM
+---------
+@summary:
+---------
+@author: Boris
+@email: boris_liu@foxmail.com
+"""
+from typing import List, Union, Set
+
+from feapder.dedup.basefilter import BaseFilter
+
+
+class LiteFilter(BaseFilter):
+    def __init__(self):
+        self.datas: Set[str] = set()
+
+    def add(
+        self, keys: Union[List[str], str], *args, **kwargs
+    ) -> Union[List[int], int]:
+        """
+
+        Args:
+            keys: list / single value
+            *args:
+            **kwargs:
+
+        Returns:
+            list / single value (returns 0 if the data already exists, otherwise 1; i.e. whether the add succeeded)
+        """
+        if isinstance(keys, list):
+            is_add = []
+            for key in keys:
+                if key not in self.datas:
+                    self.datas.add(key)
+                    is_add.append(1)
+                else:
+                    is_add.append(0)
+        else:
+            if keys not in self.datas:
+                is_add = 1
+                self.datas.add(keys)
+            else:
+                is_add = 0
+        return is_add
+
+    def get(self, keys: Union[List[str], str]) -> Union[List[int], int]:
+        """
+        Check whether the data already exists
+        Args:
+            keys: list / single value
+
+        Returns:
+            list / single value (returns 1 if the data already exists, otherwise 0)
+        """
+        if isinstance(keys, list):
+            temp_set = set()
+            is_exist = []
+            for key in keys:
+                # the key repeats within this batch or already exists in the dedup store
+                if key in temp_set or key in self.datas:
+                    is_exist.append(1)
+                else:
+                    is_exist.append(0)
+                    temp_set.add(key)
+
+            return is_exist
+        else:
+            return int(keys in self.datas)
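
A hedged usage sketch (keys are placeholders): LiteFilter keeps its state in a plain in-process set, so deduplication only lasts for the lifetime of the process:

```python
from feapder.dedup.litefilter import LiteFilter

f = LiteFilter()
print(f.add(["a", "b", "a"]))  # [1, 1, 0] -- the third key was already added
print(f.get(["a", "c"]))       # [1, 0]
print(f.get("c"))              # 0
```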

+ 99 - 0
FworkSpider/feapder/dedup/swordfishfilter.py

@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-03-01
+---------
+@summary:
+---------
+@author: dzr
+@email: dongzhaorui@topnet.net.cn
+"""
+from Crypto.Hash import SHA256
+
+from feapder.db.redisdb import RedisDB
+from feapder.dedup.basefilter import BaseFilter
+
+
+class SwordFishFilter(BaseFilter):
+    redis_cluster = None
+
+    def __init__(self, redis_url, expire_time=None):
+        if not redis_url:
+            raise ValueError("redis_url can't be None")
+
+        self.startup_nodes = redis_url
+        if self.__class__.redis_cluster is None:
+            self.__class__.redis_cluster = RedisDB(
+                ip_ports=self.startup_nodes,
+                decode_responses=True,
+                user_pass='',
+            )
+
+        self._ex = expire_time or 86400 * 365 * 2  # default expiry: two years
+        self._prefix = 'pylist_'
+
+    def __repr__(self):
+        return "<RedisCluster: {}>".format(self.startup_nodes)
+
+    @staticmethod
+    def sha256_encrypt(info):
+        if info is None:
+            return ''
+        res = SHA256.new(info.encode('utf-8'))
+        data = res.hexdigest()
+        return data
+
+    def encrypt_datas(self, datas):
+        return [self.sha256_encrypt(data) for data in datas]
+
+    def _exists(self, key):
+        return self.redis_cluster.exists(key)
+
+    def exists(self, key):
+        """check both the full-volume key and the 'pylist_'-prefixed list-page key"""
+        if self._exists(key) > 0 or self._exists(self._prefix + key) > 0:
+            return True
+        return False
+
+    def add(self, keys, *args, **kwargs):
+        """
+        Add data
+        @param keys: keys to check for existence in redis_cluster; a list is supported for batches
+        @return: list / single value
+        """
+        is_list = isinstance(keys, list)
+        keys = keys if is_list else [keys]
+        encrypt_keys = self.encrypt_datas(keys)
+
+        is_added = []
+        for key in encrypt_keys:
+            if not self.exists(key):
+                is_added.append(
+                    self.redis_cluster.set(self._prefix + key, 1, ex=self._ex))
+            else:
+                is_added.append(True)
+
+        return is_added if is_list else is_added[0]
+
+    def get(self, keys):
+        """
+        Check whether the data exists
+        @param keys: list / single value
+        @return: list / single value (True if it exists, False otherwise)
+        """
+        is_list = isinstance(keys, list)
+        keys = keys if is_list else [keys]
+        encrypt_keys = self.encrypt_datas(keys)
+
+        is_exist = []
+        for key in encrypt_keys:
+            is_exist.append(self.exists(key))
+
+        # flag duplicates within the batch itself
+        temp_set = set()
+        for i, key in enumerate(encrypt_keys):
+            if key in temp_set:
+                is_exist[i] = True
+            else:
+                temp_set.add(key)
+
+        return is_exist if is_list else is_exist[0]
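
A hedged usage sketch of the filter on its own (the cluster nodes repeat the ones from the dedup README above and are assumptions): keys are SHA256-hashed and written with the 'pylist_' prefix and an expiry:

```python
from feapder.dedup.swordfishfilter import SwordFishFilter

sf = SwordFishFilter(
    redis_url=["192.168.3.207:2179", "192.168.3.166:2379"],  # assumed cluster nodes
    expire_time=86400,
)
sf.add(["detail-url-1", "detail-url-2"])
print(sf.get("detail-url-1"))  # True once the key exists in the cluster
print(sf.get("never-seen"))    # False
```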

+ 0 - 0
FworkSpider/feapder/pipelines/swordfish/__init__.py


+ 46 - 0
FworkSpider/feapder/pipelines/swordfish/redis_pipeline.py

@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2021-04-18 14:12:21
+---------
+@summary: Export data (write to Redis instead of saving directly to MongoDB)
+---------
+@author: 马国鹏
+@email:  305021384@qq.com
+"""
+from typing import Dict, List
+
+from feapder.db.redisdb import RedisDB
+from feapder.pipelines import BasePipeline
+from feapder.utils.tools import log
+
+
+class RedisPipeline(BasePipeline):
+    def __init__(self):
+        self._to_db = None
+
+    @property
+    def to_db(self):
+        if not self._to_db:
+            self._to_db = RedisDB()
+
+        return self._to_db
+
+    def save_items(self, table, items: List[Dict]) -> bool:
+        """
+        Save data
+        Args:
+            table: table name
+            items: data, [{},{},...]
+
+        Returns: whether the save succeeded, True / False
+                 If False, this batch is not added to the dedup store, so it can be stored again later
+        """
+        try:
+            table_name = "savemongo:" + table
+            self.to_db.lpush(table=table_name, values=items)
+            datas_size = len(items)
+            log.info("Exported %s records to %s" % (datas_size, table_name))
+            return True
+        except Exception as e:
+            log.exception(e)
+            return False
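
A hedged sketch of enabling this pipeline, assuming feapder's standard ITEM_PIPELINES hook in setting.py; the module path follows this commit's layout:

```python
# setting.py (sketch)
ITEM_PIPELINES = [
    "feapder.pipelines.swordfish.redis_pipeline.RedisPipeline",
]
```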

+ 1 - 57
FworkSpider/feapder/templates/air_spider_template.tmpl

@@ -16,21 +16,11 @@ from feapder.utils.tools import wechat_warning
 from untils.attachment import AttachmentDownloader
 import execjs
 from items.spider_item import DataBakItem, MgpListItem
-from feapder.db.mongodb import MongoDB
 from feapder.utils.log import log
 
 
 
-class Details(feapder.Spider):
-    _to_db = None
-    db_name = 'mgp_list'
-    send_list = []
-    # define the mongo connection
-    @property
-    def to_db(self):
-        if not self._to_db:
-            self._to_db = MongoDB()
-        return self._to_db
+class Details(feapder.BiddingDetailSpider):
 
     def start_requests(self):
         while True:
@@ -123,51 +113,5 @@ class Details(feapder.Spider):
         yield list_item
 
 
-    def failed_request(self, request, response):
-        '''After request/parse retries exceed the limit, re-save the original info to mongo and update the failed field'''
-        if response is None:
-            code = 0
-        else:
-            code = response.status_code
-        err_dic = {"200":"analysis","400":"download","500":"servers","300":"download"}
-        if 200<=code<300:
-            err = 'analysis'
-        elif 300<=code<400:
-            err = 'download'
-        elif 400<=code<500:
-            err = 'download'
-        elif 500<=code:
-            err = "servers"
-        else:
-            err = "timeout"
-        mgp = MgpListItem()
-        mgp.code=code
-        mgp.error=err
-        items = request.base_info
-        for key in items:
-            mgp.__setitem__(key,items[key])
-        mgp.failed +=1
-        if mgp.pri is None:
-            mgp.pri = 0
-
-        if mgp.pri > 5:
-            if mgp.failed in(10,30,50,100,200)or mgp.failed>200:
-                if self.send_list.count(mgp.item.get("site")) == mgp.pri - 5:
-                    '''
-                    alert according to the spider's priority level'''
-                    info= f'''`
-        Your spider has tasks that failed request/parse more than <font color="#FF0000">{mgp.failed}</font> times.
-        > **Spider name:** {mgp.item.get("site")}
-        > **Channel:** {mgp.item.get("channel")}
-        > **Spider code:** {mgp.item.get("spidercode")}
-        > **Spider level:** {mgp.pri}
-        > **Owner:** {mgp.author}
-        Please log in to the SwordFish spider management platform for details.
-        `'''
-                    wechat_warning(info)
-                    self.send_list.append(mgp.item.get("site"))
-        yield mgp
-
-
 if __name__ == "__main__":
     Details(redis_key="{USER}:${spider_name}").start()

+ 175 - 0
FworkSpider/feapder/templates/njpc_detail_template.tmpl

@@ -0,0 +1,175 @@
+# -*- coding: utf-8 -*-
+"""
+Created on {DATE}
+---------
+@summary: Generic snapshot (detail) page for plan-to-build spiders
+---------
+@author: njpc_feapder
+"""
+import sys
+# sys.path.append('C:/Users/topnet/Desktop/FworkSpider')
+sys.path.append('/app/spiders/sword_feapder/FworkSpider')
+import feapder
+import re
+import json
+import time,random
+from items.njpc_item import DataNjpcItem,NjpcListItem
+from untils.attachment import AttachmentDownloader as AD
+from untils.attachment_res import AttachmentDownloader as ADres
+from lxml.html import fromstring
+from feapder.utils.log import log
+
+
+redis_key = "njpc_details"
+
+
+# Attachment download for plan-to-build spiders
+def njpc_get_files(html,file_type="",s_key="http",proxies=False):
+    root = fromstring(html)
+    file_info = root.xpath('//a[@href]')
+    if file_info:
+        attachments = {}
+        for info in file_info:
+            file_url = "".join(info.xpath('./@href'))
+            file_types = ['zip', 'docx', 'ftp', 'pdf', 'doc', 'rar', 'gzzb', 'jpg',
+                          'png', 'zbid', 'xls', 'xlsx', 'swp', 'dwg', 'wps']
+            file_name = "".join(info.xpath('./@title') or info.xpath('.//text()'))
+            if file_type.lower() == "res":
+                file_type_name = "content-disposition"
+                get_file_type = '''
+file_type = file_type_txt.split('.')[-1].replace('"','').replace("'","")
+file_types.append(file_type)
+'''
+                if s_key in file_url and file_name:
+                    file_name = file_name.strip()
+                    attachment = ADres().fetch_attachment(get_file_type=get_file_type,file_type_name=file_type_name,
+                                    proxies=proxies,file_name=file_name,download_url=file_url,enable_proxy=False,)
+                    attachments[str(len(attachments) + 1)] = attachment
+            else:
+                if file_type.lower() in file_types:
+                    file_tp = file_type
+                else:
+                    file_tp = file_url.split(".")[-1].lower()
+                    if file_tp not in file_types and file_name:
+                        file_tp = file_name.strip().split(".")[-1].lower()
+
+                if file_tp in file_types and s_key in file_url and file_name:
+                    file_name = file_name.strip()
+                    attachment = AD().fetch_attachment(
+                        file_name=file_name, file_type=file_tp, download_url=file_url,
+                        enable_proxy=False, proxies=proxies)
+                    attachments[str(len(attachments) + 1)] = attachment
+        return attachments
+
+
+# Strip invalid data from the detail page
+def remover_htmldata(request,response,html):
+    extra_html_info = request.extra_html
+    if html and extra_html_info:
+        for extra_item in extra_html_info:
+            if re.search('^//.*', extra_item):
+                extra_html = response.xpath(extra_item).extract_first()
+            else:
+                extra_html = extra_item
+            if extra_html:
+                html = html.replace(extra_html, '')
+    return html
+
+
+class Details(feapder.PlanToBuildDetailSpider):
+
+    def start_requests(self):
+        data_list = self.to_db.find(self.db_name,{"parser_name":f"{redis_key}","failed":{"$eq":0}},limit=50)
+        for item in data_list:
+            log.debug(item)
+            request_params = item.get("request_params")
+            timeout = request_params.pop('timeout',10)
+            is_join_html = item.get("is_join_html")      # whether to join the body from multiple xpaths
+            extra_html = item.get("extra_html")          # invalid content to strip
+            title_xpath = item.get("title_xpath")        # third-level page title
+            render = item.get("render") or False         # whether to launch the browser
+            render_time = item.get("render_time") or 3   # browser render time
+            extra_activity = item.get("extra_activity")  # extra custom actions
+            file_params = item.get("file_params")        # attachment download config
+            if item.get("proxies"):
+                yield feapder.Request(url=item.get("parser_url"), item=item, deal_detail=item.get("deal_detail"),
+                                      is_join_html=is_join_html, extra_html=extra_html,title_xpath=title_xpath,
+                                      callback=item.get("parser"), render=render, render_time=render_time,
+                                      file_params=file_params,
+                                      extra_activity=extra_activity, timeout=timeout, **request_params)
+            else:
+                yield feapder.Request(url=item.get("parser_url"), item=item,deal_detail=item.get("deal_detail"),
+                                      is_join_html=is_join_html, extra_html=extra_html,title_xpath=title_xpath,
+                                      callback=item.get("parser"), render=render, render_time=render_time,
+                                      file_params=file_params,
+                                      extra_activity=extra_activity, proxies=False, timeout=timeout, **request_params)
+            self.to_db.delete(self.db_name, {"_id": item.get("_id")})
+
+    def detail_get(self,request,response):
+        items = request.item
+        data_item = DataNjpcItem()
+        for key in items:
+            if key not in ('failed','parser_name','parser_url','_id','code','proxies','deal_detail',
+                           'error','request_params','is_join_html','extra_html','parser','title_xpath',
+                           'render','render_time','extra_activity','file_params'):
+                data_item.__setitem__(key, items[key])
+
+        html = ''
+        for xpath in request.deal_detail:
+            htmls = response.xpath(xpath).extract_first()  # detailed notice body
+            if request.is_join_html:
+                if htmls is not None:
+                    html += htmls
+            else:
+                if htmls is not None:
+                    html = htmls
+                    break
+
+        if request.title_xpath:
+            for sxpath in request.title_xpath:
+                title = response.xpath(sxpath).extract_first() # third-level page title
+                if title:
+                    data_item.title = title.strip()
+                    if "..." in data_item.projectname or "…" in data_item.projectname:
+                        data_item.projectname = title.strip()
+                    break
+
+        try:
+            if request.extra_activity:
+                from untils.tools import njpc_fields_extract
+                exec(request.extra_activity)
+        except:
+            pass
+
+        data_item.contenthtml = remover_htmldata(request,response,html)
+
+        fp = request.file_params or {}
+        attachments = njpc_get_files(
+            html,
+            file_type=fp.get("file_type", ""),
+            s_key=fp.get("s_key", "http"),
+            proxies=fp.get("proxies", False)
+        )
+        if attachments:
+            data_item.projectinfo = {"attachments": attachments}
+
+        yield data_item
+
+
+    def detail_json(self,request,response):
+        items = request.item
+        data_item = DataNjpcItem()
+        for key in items:
+            if key not in ('failed','parser_name','parser_url','_id','code','proxies','deal_detail',
+               'error','request_params','is_join_html','extra_html','parser','title_xpath',
+               'render','render_time','extra_activity','file_params'):
+                data_item.__setitem__(key, items[key])
+
+        exec(request.deal_detail)
+
+        yield data_item
+
+
+if __name__ == '__main__':
+    Details(redis_key=f"njpc:details:{redis_key}").start()
+

+ 90 - 0
FworkSpider/feapder/templates/njpc_list_template.tmpl

@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+"""
+Created on {DATE}
+---------
+@summary: ${spider_name}
+---------
+@author: {USER}
+"""
+import sys
+sys.path.append('/app/spiders/sword_feapder/FworkSpider')
+import feapder
+from items.njpc_item import NjpcListItem
+from collections import namedtuple
+import time,random
+
+
+class Njpc_Feapder(feapder.PlanToBuildListSpider):
+
+    def start_callback(self):
+
+         self.site = ""
+
+         #               --- --- crawl_page is required and must be an integer (int) --- ---
+         Menu = namedtuple('Menu', ['channel', 'code', 'types', 'crawl_page'])
+
+         self.menus = [
+             Menu('${spider_name} channel', '${spider_name} spider code', "custom param", 1),
+             Menu('${spider_name} channel', '${spider_name} spider code', "custom param", 1),
+         ]
+
+         self.headers = {}
+
+    def start_requests(self):
+         start_url = ''
+         for menu in self.menus:
+             yield feapder.Request(url=start_url,item=menu._asdict(),page=1,real_page=0,proxies=False)
+
+    def download_midware(self, request):
+        page = request.page
+        request.headers = self.headers
+
+    def parse(self, request, response):
+        menu = request.item
+        info_list = response.xpath('')       # node list (html structure)
+        for info in info_list:
+            detail_href = info.xpath('').extract_first().strip()
+            projectname = info.xpath('').extract_first().strip()
+            publish_time = info.xpath('').extract_first().strip()
+
+            area = ""      # province
+            city = ""      # city
+            district = ""  # district/county
+
+            data_item = NjpcListItem()  # item that carries the data
+            data_item.unique_key = ("href", "publishtime") # dedup keys
+            data_item.channel = menu.get("channel")    # channel defined above (set by the editor)
+            data_item.spidercode = menu.get("code")    # spider code defined above (set by the editor)
+            data_item.projectname = projectname        # project name
+            data_item.publishtime = publish_time       # publish time
+
+            data_item.site = self.site
+            data_item.area = area or "全国"             # area (province), defaults to "全国"
+            data_item.city = city                      # city, empty by default
+            data_item.district = district              # district, empty by default
+            data_item.parser_name = "njpc_details"     # detail spider to invoke
+            data_item.parser_url = detail_href         # detail-page data url
+            data_item.href = detail_href               # detail link
+            data_item.request_params = {"headers":self.headers}
+            data_item.parser = "detail_get"            # method invoked by the snapshot spider
+            data_item.deal_detail = ['//div[@class="***"]']  # body extraction rules
+
+            # data_item.proxies = True               # whether the snapshot page uses a proxy
+            # data_item.is_join_html = True          # whether to join the body from multiple xpaths
+            # data_item.extra_html = []              # invalid body data to remove (xpath list or literal text)
+            # data_item.title_xpath = []             # third-level title xpath list
+            # data_item.file_params = {"file_type":"", "s_key":"http", "proxies":False}
+                                                     # attachment download config
+            # data_item.render = True                # whether to launch the browser
+            # data_item.render_time = 3              # render time
+            # data_item.extra_activity = '''***'''   # extra actions (write statements flush-left inside the triple quotes)
+            yield data_item
+
+        # endless pagination
+        time.sleep(random.randint(2,5))
+        request = self.infinite_pages(request,response)
+        yield request
+
+
+if __name__ == "__main__":
+    Njpc_Feapder(redis_key="{USER}:${spider_name}").start()

+ 10 - 18
FworkSpider/feapder/templates/project_template/CHECK_DATA.md

@@ -15,7 +15,7 @@ from collections import namedtuple
 
 
 
-class ${spider_name}(feapder.Spider):
+class ${spider_name}(feapder.BiddingListSpider):
 
     def start_callback(self):
 
@@ -36,6 +36,9 @@ class ${spider_name}(feapder.Spider):
              start_url = ''
              yield feapder.Request(url=start_url,item=menu._asdict(),page=1,real_page=0,proxies=False)
 
+    def download_midware(self, request):
+        page = request.page
+        request.headers = self.headers
 
     def parse(self, request, response):
         real_count = 0
@@ -47,8 +50,9 @@ class ${spider_name}(feapder.Spider):
             title = info.xpath('').extract_first().strip()
             publish_time = info.xpath('').extract_first().strip()
 
-            area = ""   # province
-            city = ""   # city
+            area = ""      # province
+            city = ""      # city
+            district = ""  # district/county
 
             data_item = DataBakItem()                # item that carries the data
             data_item.href = href                    # bid notice link
@@ -59,19 +63,16 @@ class ${spider_name}(feapder.Spider):
             data_item.site = self.site
             data_item.area = area or "全国"           # province, default: 全国
             data_item.city = city                    # city, empty by default
-
-            undedup_data = dedup.filter_exist_data([href])    # dedup
-            if undedup_data == []:
-                continue
+            data_item.district = district            # district/county, empty by default
 
             list_item =  MgpListItem()
+            list_item.unique_key = ('href',)
             list_item.parse = "self.detail_get"      # detail-page callback method
             list_item.parser_name = "details"        # detail spider id, defaults to the generic detail spider
-            list_item.item = data_item.to_dict
+            list_item.item = data_item
             list_item.deal_detail = ['//div[@class="****"]']   # body extraction xpath
             list_item.proxies = False
             list_item.parse_url = href               # detail-page request url
-            list_item.pri = 1                        # execution level
 
             list_item.files={                        # attachment collection rules
                 "list_xpath":'//div[@class="***"]//a[@href]',
@@ -84,21 +85,12 @@ class ${spider_name}(feapder.Spider):
                 "host":'',                           # 需要拼接url的host
             }
 
-            dedup.add(href)
             yield list_item
-            real_count += 1
-
-
 
         # endless pagination
-
         request = self.infinite_pages(request,response)
         yield request
 
-    def download_midware(self, request):
-        page = request.page
-        request.headers = self.headers
-
 
 if __name__ == "__main__":
     ${spider_name}(redis_key="{USER}:${spider_name}").start()

+ 48 - 0
FworkSpider/items/base_item.py

@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+
+import feapder.utils.tools as tools
+from feapder import Item
+
+
+class SwordFishProjectItem(Item):
+
+    def __init__(self):
+        super(SwordFishProjectItem, self).__init__(
+            save=True  # whether to store in the database; True = store (default), False = skip
+        )
+
+    @property
+    def save(self) -> bool:
+        return self.__dict__["save"]
+
+    @save.setter
+    def save(self, enable: bool):
+        self.__dict__["save"] = enable
+
+    @property
+    def fingerprint(self):
+        args = []
+        # 1. dedup on stored fields
+        for key, value in self.to_dict.items():
+            if value:
+                if (self.unique_key and key in self.unique_key) or not self.unique_key:
+                    args.append(str(value))
+
+        # 2. dedup on stored fields - bidding ("item") fields
+        bidding = self.to_dict.get("item", {})
+        for key, value in bidding.items():
+            if value:
+                if (self.unique_key and key in self.unique_key) or not self.unique_key:
+                    args.append(str(value))
+
+        # 3. dedup on non-stored fields
+        if self.unique_key:
+            for key in filter(lambda x: x not in bidding and x not in self.to_dict, self.unique_key):
+                if key not in args:
+                    args.append(str(key))
+
+        if args:
+            args = sorted(args)
+            return tools.get_md5(*args)
+        else:
+            return None
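
A hedged sketch (field names are assumptions) of how unique_key controls the md5 fingerprint computed above:

```python
item = SwordFishProjectItem()
item.href = "https://example.com/notice/1"      # assumed fields
item.publishtime = "2023-03-01"
item.unique_key = ("href", "publishtime")       # only these fields feed the fingerprint
print(item.fingerprint)                         # md5 over the sorted unique_key values
item.save = False                               # mark the item so it is not written to the db
```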

+ 3 - 0
FworkSpider/requirements.txt

@@ -0,0 +1,3 @@
+feapder==1.6.9
+pymongo~=3.10.1
+redis==3.3.6

+ 303 - 0
FworkSpider/untils/attachment_res.py

@@ -0,0 +1,303 @@
+import hashlib
+import os
+import re
+import traceback
+import uuid
+from urllib.parse import urlparse, unquote
+
+import requests
+import urllib3
+
+from feapder.utils.log import log as logger
+from untils.aliyun import AliYunService
+from untils.execptions import AttachmentNullError
+from untils.proxy_pool import ProxyPool
+
+urllib3.disable_warnings()
+# document file types
+DOCTYPE = {
+    'txt', 'rtf', 'dps', 'et', 'ett', 'xls',
+    'xlsx', 'xlsb', 'xlsm', 'xlt', 'ods', 'pmd', 'pmdx',
+    'doc', 'docm', 'docx', 'dot', 'dotm', 'dotx',
+    'odt', 'wps', 'csv', 'xml', 'xps'
+}
+# archive types
+COMPRESSION_TYPE = {
+    'rar', 'zip', 'gzzb', '7z', 'tar', 'gz', 'bz2', 'jar', 'iso', 'cab',
+    'arj', 'lzh', 'ace', 'uue', 'edxz',
+}
+# image types
+IMAGE_TYPE = {
+    'jpg', 'png', 'jpeg', 'tiff', 'gif', 'psd', 'raw', 'eps', 'svg', 'bmp',
+    'pdf'
+}
+# other types
+OTHER_TYPE = {
+    'swf', 'nxzf', 'xezf', 'nxcf'
+}
+
+
+headers = {
+    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36',
+    'Accept': '*/*'
+}
+
+
+def sha1(val):
+    _sha1 = hashlib.sha1()
+    if isinstance(val, bytes):
+        _sha1.update(str(val).encode("utf-8"))
+    elif isinstance(val, str):
+        _sha1.update(val.encode("utf-8"))
+    return _sha1.hexdigest()
+
+
+def remove(file_path: str):
+    try:
+        os.remove(file_path)
+    except FileNotFoundError:
+        pass
+
+
+def getsize(file):
+    try:
+        return os.path.getsize(file)
+    except FileNotFoundError:
+        return 0
+
+
+def discern_file_format(text, allow_show_waring=False):
+    """
+    Identify the file format
+
+    @param text: text to inspect
+    @param allow_show_waring: whether to log a warning
+    @return: file format
+    """
+    file_types = {
+        *DOCTYPE,
+        *COMPRESSION_TYPE,
+        *IMAGE_TYPE,
+        *OTHER_TYPE
+    }
+    for file_type in file_types:
+        all_file_format = [file_type, file_type.upper()]
+        for t in all_file_format:
+            result = re.match(f'.*{t}$', text, re.S)
+            if result is not None:
+                return t
+    else:
+        unknown_type = re.findall('[^.\\/:*?"<>|\r\n]+$', text, re.S)
+        if allow_show_waring:
+            logger.warning(f'[unrecognized file type]{unknown_type}')
+        return None
+
+
+def extract_file_type(text):
+    if text is None:
+        return None
+    return discern_file_format(text)
+
+
+def extract_file_name_by_href(href: str, file_type: str):
+    """Extract the file name from a url"""
+    # Chinese punctuation: [\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b]
+    # Chinese characters: [\u4e00-\u9fa5]
+    zh_char_pattern = '[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b\u4e00-\u9fa5]+'
+    parser = urlparse(href)
+    query = (parser.query or parser.path)
+    result = re.search(f'.*\\.{file_type}', query, re.S)
+    if result is not None:
+        encode_str = unquote(result.group())
+        name = re.search(zh_char_pattern, encode_str)
+        if name is not None:
+            return unquote(name.group())
+    return None
+
+
+def extract_file_name(text):
+    file_type = discern_file_format(text)
+    if file_type is not None:
+        repl = '.{}'.format(file_type)
+        text = text.replace(repl, '')
+    return text
+
+
+def verify_file_name(name):
+    if extract_file_type(name) is None:
+        raise ValueError
+
+
+# strip whitespace and duplicated suffixes from the attachment name
+def clean_file_name(file_name: str, file_type: str):
+    file_name = file_name.strip()
+    if file_type in file_name:
+        file_name = file_name.replace(f'.{file_type}', '')
+    return file_name
+
+
+# size limit: attachments smaller than 5 kb are not stored in the database
+def limit_file_size(file_size: str):
+    _pattern = '^[0-9]\d*\.\d*|[1-9]\d*'
+    if "M" in file_size or "m" in file_size:
+        file_size = float("".join(re.findall(_pattern, file_size))) * 1000
+    else:
+        file_size = "".join(re.findall(_pattern, file_size))
+    if float(file_size) < 5:
+        return False
+    else:
+        return True
+
+
+# normalize the attachment url
+def judge_file_url(file_url: str):
+    file_url = file_url.strip()
+    if " " in file_url:
+        file_url = file_url.split(" ")[0]
+    return file_url
+
+
+class AttachmentDownloader(AliYunService):
+
+    def __init__(self):
+        super(AttachmentDownloader, self).__init__()
+        self.dir_name = 'file'
+
+    def _create_file(self, filename, filetype):
+        os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
+        file = "{filename}.{filetype}".format(
+            filename=sha1("{}_{}".format(filename, uuid.uuid4())),
+            filetype=filetype
+        )
+        return "{}/{}".format(self.dir_name, file)
+
+    @staticmethod
+    def _create_fid(file_stream: bytes):
+        return sha1(file_stream)
+
+    @staticmethod
+    def _origin_filename(fid: str, filetype: str):
+        return "{}.{}".format(fid, filetype)
+
+    @staticmethod
+    def _file_size(file: str):
+        _kb = float(getsize(file)) / 1024
+        if _kb >= 1024:
+            _M = _kb / 1024
+            if _M >= 1024:
+                _G = _M / 1024
+                return "{:.1f} G".format(_G)
+            else:
+                return "{:.1f} M".format(_M)
+        else:
+            return "{:.1f} kb".format(_kb)
+
+    @staticmethod
+    def _fetch_attachment(
+            get_file_type:str,
+            file_type_name:str,
+            url: str,
+            enable_proxy=False,
+            proxy={},
+            allow_show_exception=False,
+            **kwargs
+    ):
+        request_params = {}
+        request_params.setdefault('headers', kwargs.get('headers') or headers)
+        request_params.setdefault('proxies', kwargs.get('proxies'))
+        request_params.setdefault('timeout', kwargs.get('timeout') or 60)
+        request_params.setdefault('stream', kwargs.get('stream') or True)
+        request_params.setdefault('verify', kwargs.get('verify') or False)
+        if enable_proxy:
+            proxy = ProxyPool()
+        else:
+            proxy = proxy
+        retries = 0
+        while retries < 3:
+            try:
+                with requests.get(url, **request_params) as req:
+                    if req.status_code == 200:
+                        stream = req.content
+                        '''
+                        file_type_name  response-header key that carries the attachment suffix
+                        get_file_type   rule for extracting the attachment suffix
+                        file_type_txt   the attachment response-header value
+                        '''
+                        if len(get_file_type) > 10:
+                            file_types = []
+                            file_type_txt = req.headers.get(file_type_name)
+                            exec(get_file_type)
+                            if file_types:
+                                file_type = file_types[0]
+                            else:
+                                file_type = ''
+                            return stream,file_type
+                        else:
+                            return stream, get_file_type
+                    else:
+                        retries += 1
+            except requests.RequestException:
+                if allow_show_exception:
+                    traceback.print_exc()
+                if enable_proxy:
+                    request_params.update({'proxies': proxy.get()})
+                retries += 1
+        return b'', ''  # after all retries fail, return an empty stream and type (matches the success-path tuple)
+
+    def fetch_attachment(
+            self,
+            get_file_type:str,
+            file_name: str,
+            file_type_name: str,
+            download_url: str,
+            enable_proxy=False,
+            allow_show_exception=False,
+            **kwargs
+    ):
+        if not file_name  or not download_url:
+            raise AttachmentNullError
+
+        file_stream = self._fetch_attachment(
+            get_file_type,
+            file_type_name,
+            download_url,
+            enable_proxy,
+            allow_show_exception=allow_show_exception,
+            **kwargs
+        )
+
+        if len(file_stream) == 2:
+            file_type = file_stream[-1]
+        else:
+            file_type = ''
+
+        file_name = clean_file_name(file_name,file_type)
+        download_url = judge_file_url(download_url)
+
+        local_tmp_file = self._create_file(file_name, file_type)
+        with open(local_tmp_file, 'wb') as f:
+            f.write(file_stream[0])
+
+        result = {
+            'filename': '{}.{}'.format(file_name, file_type),
+            'org_url': download_url
+        }
+        if len(file_stream[0]) > 0:
+            try:
+                fid = self._create_fid(file_stream[0])
+                key = self._origin_filename(fid, file_type)
+                result.setdefault('fid', key)
+                result.setdefault('ftype', file_type)
+                result.setdefault('size', self._file_size(local_tmp_file))
+                result.setdefault('url', 'oss')
+                super().push_oss_from_local(key, local_tmp_file)
+            except Exception as e:
+                logger.warning(
+                    "[{}] download failed, reason: {}".format(file_name, e.__class__.__name__)
+                )
+        remove(local_tmp_file)
+        '''Attachment info must be returned whether the upload/download succeeds or fails'''
+        if "size" not in result or limit_file_size(result.get('size')):
+            return result
+        else:
+            return {}
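
A hedged usage sketch (URL and file name are placeholders) mirroring the call in the njpc detail template: the suffix is taken from the Content-Disposition response header via the exec'd get_file_type rule:

```python
from untils.attachment_res import AttachmentDownloader

get_file_type = '''
file_type = file_type_txt.split('.')[-1].replace('"','').replace("'","")
file_types.append(file_type)
'''

attachment = AttachmentDownloader().fetch_attachment(
    get_file_type=get_file_type,
    file_type_name="content-disposition",              # header that carries the suffix
    file_name="notice attachment",                     # placeholder name
    download_url="http://example.com/download?id=1",   # placeholder url
    enable_proxy=False,
)
print(attachment)  # {'filename': ..., 'fid': ..., 'size': ...} or {} when under 5 kb
```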

+ 56 - 0
FworkSpider/untils/check_data.py

@@ -0,0 +1,56 @@
+import re
+
+
+class CheckData:
+    """Check the text content of collected data"""
+    __bidding_title_set = {'招标', '流标', '评标', '询价', '中标候选人', '抽签',
+                           '谈判', '中选', '意见征询', '更正公告', '废标',
+                           '补遗', '议价', '邀请', '资格预审', '竞标', '变更',
+                           '遴选', '磋商', '项目', '评审', '询比', '开标',
+                           '澄清', '比选', '中止', '采购', '竟价', '招投标',
+                           '拟建', '成交', '中标', '竞争性谈判', '工程',
+                           '验收公告', '更正', '单一来源', '变更公告', '合同',
+                           '违规', '评判', '监理', '竞价', '答疑', '终止',
+                           '系统'}
+
+    __bidding_channel_set = {"通知公告", "公告公示"}
+
+    __plan_to_build_title_set = {'项目', '工程', '验收', '评价', '设计', '调查',
+                                 '审核', '审批', '批复', '批后', '批前', '核准',
+                                 '备案', '立项', '规划设计', '环评', }
+
+    __plan_to_build_channel_set = {"通知公告", "公示公告", "部门文件",
+                                   "发布公示", "公告信息",
+                                   "公示公开", "公开公示", "公示通知",
+                                   "公示信息", "公告信息", "公示专区",
+                                   "公告专区", "公司公告", "公司通知",
+                                   "公司新闻", "其他公示", "通知公示",
+                                   "最新公告", "最新公示", "最新资讯"}
+
+    @classmethod
+    def title(cls, name: str, group=None):
+        check_texts = cls.__bidding_title_set
+        if group and group.lower() in ["njpc", "plan_to_build"]:
+            check_texts = cls.__plan_to_build_title_set
+
+        for text in check_texts:
+            valid_text = re.search(text, name)
+            if valid_text is not None:
+                break
+        else:
+            return 10106, 'no collection keyword found in the title'
+        return 200, 'ok'
+
+    @classmethod
+    def channel(cls, name: str, group=None):
+        check_texts = cls.__bidding_channel_set
+        if group and group.lower() in ["njpc", "plan_to_build"]:
+            check_texts = cls.__plan_to_build_channel_set
+
+        for text in check_texts:
+            valid_text = re.search(text, name)
+            if valid_text is not None:
+                break
+        else:
+            return False
+        return True
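
A hedged usage sketch (the title strings are made-up examples): title() returns (200, 'ok') when a keyword is found, otherwise (10106, reason); channel() returns a bool:

```python
code, reason = CheckData.title("某某项目招标公告")
assert code == 200

print(CheckData.channel("通知公告"))                    # True
print(CheckData.title("关于放假的通知", group="njpc"))  # (10106, ...) -- no plan-to-build keyword
```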

+ 146 - 0
FworkSpider/untils/clean_html.py

@@ -0,0 +1,146 @@
+import re
+__all__ = ['cleaner']
+
+# standalone elements
+INDEPENDENT_TAGS = {
+    '<head>[\s\S]*?</head>': '',
+    '<html>|<html [^>]*>|</html>': '',
+    '<body>|<body [^>]*>|</body>': '',
+    '<meta[^<>]*>|<meta [^<>]*>|<meta[^<>]*>[\s\S]*?</meta>|</meta>': '',  # metadata
+    '&(nbsp|e[mn]sp|thinsp|zwn?j|#13);': '',  # whitespace entities
+    '\\xa0|\\u3000': '',  # whitespace
+    '<!--[\s\S]*?-->': '',  # comments
+    '<style[^<>]*>[\s\S]*?</style>': '',  # styles
+    '<script[^<>]*>[\s\S]*?</script>': '',  # JavaScript
+    '<input>': '',  # input boxes
+    '<img[^>]*>': '<br>',  # images
+}
+# inline elements
+INLINE_TAGS = {
+    '<a>|<a [^>]*>|</a>': '',  # hyperlinks
+    '<link>|<link [^>]*>|</link>': '',  # hyperlinks
+    '<span>|<span [^>]*>|</span>': '',  # span
+    '<label>|<label [^>]*>|</label>': '<br>',  # label
+    '<font>|<font [^>]*>|</font>': '',  # font
+}
+# block-level elements
+BLOCK_TAGS = {
+    '<div>\s*?</div>':'',
+    '<h[1-6][^>]*>|</h[1-6]>': '',  # headings
+    '<p>|<p [^>]*>': '<br>',  # paragraphs
+    '</p>': '',  # paragraphs
+    '<div>|<div [^>]*>': '<br>',  # division
+    '</div>': '',  # division
+    '<o:p>|<o:p [^>]*>|</o:p>': ''  # Microsoft Word (Office) paragraphs
+}
+# other
+OTHER = {
+    '<?xml[^>]*>|<?xml [^>]*>|<?xml:.*?>': '',
+    '<epointform>': '',
+    '<!doctype html>|<!doctype html [^>]*>': '',
+    '【关闭】|关闭': '',
+    '【打印】|打印本页': '',
+    '【字体:[\s\S]*】': '',
+    '文章来源:[\u4e00-\u9fa5]+': '',
+    '浏览次数:.*[<]+': '',
+    '(责任编辑:.*?)': '',
+    '分享到[:]': '',
+
+}
+# styles
+CSS_STYLE = {
+    'style="[\s\S]*?"|style ="[\s\S]*?"': '',
+    'bgcolor="[\s\S]*?"|bgcolor ="[\s\S]*?"': '',
+    'bordercolor="[\s\S]*?"|bordercolor ="[\s\S]*?"': '',
+    'class="[\s\S]*?"|class ="[\s\S]*?"': '',
+    'align="[\s\S]*?"|align ="[\s\S]*?"': '',
+    'cellpadding="(\d+)"|cellspacing="(\d+)"': '',
+
+}
+# whitespace
+BLANKS = {
+    '\n\s*\n': '\n',
+    '\s*\n\s*': '\n',
+    '[^\S\n]': ' ',
+    '\s+': ' ',
+}
+# css tag set
+TAGS = {'table', 'tr', 'td', 'div', 'span', 'p'}
+# css attribute set
+ATTRS = {'id', 'class', 'style', 'width'}
+
+
+def _repair_tag():
+    """Abnormal tag combinations used to repair tags on non-standard pages"""
+    _repairs = {}
+    for tag in TAGS:
+        for attr in ATTRS:
+            key = '{}{}'.format(tag, attr)
+            val = '{} {}'.format(tag, attr)
+            _repairs[key] = val
+    return _repairs
+
+
+def _escape_character(html):
+    """Unescape HTML entities"""
+    html = html.replace('&lt;', '<')
+    html = html.replace('&gt;', '>')
+    html = html.replace('&quot;', '"')
+    html = html.replace('&amp;', '&')
+    # hide the input box border
+    html = html.replace('<input', '<input style="border-color: transparent;"')
+    return html
+
+
+def _lowercase_tag(html):
+    """Normalize tags (lowercase all tags + repair malformed ones)"""
+    tags = re.findall("<[^>]+>", html)
+    tag_sets = set(tags)
+
+    if len(tag_sets) > 10000:
+        from bs4 import BeautifulSoup
+        soup = BeautifulSoup(html, "lxml")
+        html = str(soup.body.next_element)
+    else:
+        for tag in tag_sets:
+            html = html.replace(tag, str(tag).lower())
+
+    repair_tags = _repair_tag()
+    for err, right in repair_tags.items():
+        html = html.replace(err, right)
+
+    return html
+
+
+def cleaner(html, special=None, completely=False):
+    """
+    Clean page data
+
+    :param html: the page to clean
+    :param special: extra page-specific cleaning rules
+    :param completely: whether to clean the page aggressively
+    :return: the cleaned page source
+    """
+    if special is None:
+        special = {}
+
+    OTHER.update(special)
+    remove_tags = {
+        **INDEPENDENT_TAGS,
+        **INLINE_TAGS,
+        **BLOCK_TAGS,
+        **OTHER,
+        **CSS_STYLE,
+        **BLANKS,
+    }
+    html = _lowercase_tag(html)
+    for tag, repl in remove_tags.items():
+        html = re.sub(tag, repl, html)
+
+    if completely:
+        html = re.sub(r'<canvas[^<>]*>[\s\S]*?</canvas>', '', html)  # canvas
+        html = re.sub(r'<iframe[^<>]*>[\s\S]*?</iframe>', '', html)  # iframes
+        html = re.sub('<([^<>\u4e00-\u9fa5]|微软雅黑|宋体|仿宋)+>', '', html)
+
+    html = _escape_character(html)
+    return html
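
A hedged usage sketch (the input html is a made-up example): scripts and styles are stripped and block tags collapse into <br> breaks:

```python
from untils.clean_html import cleaner

raw = '<html><body><div class="main"><p>公告内容</p><script>alert(1)</script></div></body></html>'
print(cleaner(raw))  # roughly '<br><br>公告内容'
```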