dzr 1 year ago
parent
revision
463057f971

+ 213 - 0
.gitignore

@@ -0,0 +1,213 @@
+### JetBrains template
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
+# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
+
+# User-specific stuff
+.idea/**/workspace.xml
+.idea/**/tasks.xml
+.idea/**/usage.statistics.xml
+.idea/**/dictionaries
+.idea/**/shelf
+
+# Generated files
+.idea/**/contentModel.xml
+
+# Sensitive or high-churn files
+.idea/**/dataSources/
+.idea/**/dataSources.ids
+.idea/**/dataSources.local.xml
+.idea/**/sqlDataSources.xml
+.idea/**/dynamic.xml
+.idea/**/uiDesigner.xml
+.idea/**/dbnavigator.xml
+
+# Gradle
+.idea/**/gradle.xml
+.idea/**/libraries
+
+# Gradle and Maven with auto-import
+# When using Gradle or Maven with auto-import, you should exclude module files,
+# since they will be recreated, and may cause churn.  Uncomment if using
+# auto-import.
+# .idea/artifacts
+# .idea/compiler.xml
+# .idea/jarRepositories.xml
+# .idea/modules.xml
+# .idea/*.iml
+# .idea/modules
+# *.iml
+# *.ipr
+
+# CMake
+cmake-build-*/
+
+# Mongo Explorer plugin
+.idea/**/mongoSettings.xml
+
+# File-based project format
+*.iws
+
+# IntelliJ
+out/
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+# Cursive Clojure plugin
+.idea/replstate.xml
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+fabric.properties
+
+# Editor-based Rest Client
+.idea/httpRequests
+
+# Android studio 3.1+ serialized cache file
+.idea/caches/build_file_checksums.ser
+
+### Python template
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+#   For a library or package, you might want to ignore these files since the code is
+#   intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+#   However, in case of collaboration, if having platform-specific dependencies or dependencies
+#   having no cross-platform support, pipenv may install dependencies that don't work, or not
+#   install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+

+ 44 - 0
Dockerfile

@@ -0,0 +1,44 @@
+# Base image
+FROM centos:centos7.9.2009
+
+# Configure the container timezone
+RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo 'Asia/Shanghai' >/etc/timezone
+# Add a convenience alias
+RUN echo "alias ll='ls -hall'" >> ~/.bashrc && source ~/.bashrc
+
+# Switch to the Aliyun yum mirrors
+RUN curl -o /etc/yum.repos.d/CentOS7-Aliyun.repo http://mirrors.aliyun.com/repo/Centos-7.repo && curl -o /etc/yum.repos.d/epel-7-Aliyun.repo http://mirrors.aliyun.com/repo/epel-7.repo
+RUN yum clean all && yum makecache && yum -y update
+RUN yum install -y kde-l10n-Chinese
+
+# Set the system locale
+ENV LANG=zh_CN.UTF-8
+# Set the vi encoding (prevents garbled Chinese text)
+RUN grep -qxF 'set encoding=utf8' /etc/virc || echo 'set encoding=utf8' >> /etc/virc
+
+WORKDIR /opt
+# Install gcc and the other build dependencies for Python 3.8.10
+RUN yum --exclude=kernel* update -y && yum groupinstall -y 'Development Tools' && yum install -y gcc openssl-devel bzip2-devel libffi-devel mesa-libGL
+# Download and unpack Python 3.8.10
+RUN curl -o python3.8.10.tgz https://mirrors.huaweicloud.com/python/3.8.10/Python-3.8.10.tgz && tar -zxvf python3.8.10.tgz
+# Build Python 3.8.10
+WORKDIR /opt/Python-3.8.10
+# Create the install directory and configure the build prefix
+RUN mkdir /usr/local/python38 && ./configure --prefix=/usr/local/python38
+# Compile and install (altinstall leaves the system python untouched)
+RUN make -j 8 && make altinstall
+# Symlink python3 and pip3
+RUN rm -rf /usr/bin/python3 /usr/bin/pip3 && ln -s /usr/local/python38/bin/python3.8 /usr/bin/python3 && ln -s /usr/local/python38/bin/pip3.8 /usr/bin/pip3
+# Switch the pip index to Aliyun and upgrade pip
+RUN pip3 config set global.index-url https://mirrors.aliyun.com/pypi/simple && pip3 install --upgrade pip
+# Put the Python 3 environment on PATH
+ENV PATH="/usr/local/python38/bin:$PATH"
+
+WORKDIR /app
+# Copy the Python dependency manifest
+COPY requirements.txt requirements.txt
+# Install the project dependencies
+RUN pip3 install -r requirements.txt
+
+# Final working directory
+WORKDIR /mnt

+ 21 - 0
README.md

@@ -0,0 +1,21 @@
+# pyspider - General-purpose crawling service
+
+#### Build the Docker image
+```shell
+docker build -t pytools:latest .
+```
+
+#### Create and start the containers
+```shell
+docker-compose up -d
+```
+
+#### Stop and remove the containers
+```shell
+docker-compose down
+```
+
+#### Restart all container services
+```shell
+docker-compose restart
+```

+ 89 - 0
base_server.py

@@ -0,0 +1,89 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-05-09 
+---------
+@summary: base class for management services
+---------
+@author: Dzr
+"""
+import common.utils as tools
+from common.databases import mongo_table, redis_client
+
+
+class BaseServer:
+
+    def __init__(self, server: str, label: str, db=None, table=None, **kwargs):
+        self.server = server
+
+        if db and table:
+            self.mgo_db_name = db
+            self.mgo_table_name = table
+            self.mgo_db = mongo_table(db, table)
+
+        self.task_fingerprint = f"{label}_task_fingerprint"
+        self.request_tasks = f"{label}_task_list"
+        self.redis_db = redis_client(cfg=kwargs.pop('redis_cfg', None))
+
+        self._unique_key = ('_id',)
+
+    @property
+    def unique_key(self):
+        return self._unique_key
+
+    @unique_key.setter
+    def unique_key(self, keys):
+        if isinstance(keys, (tuple, list)):
+            self._unique_key = keys
+        else:
+            self._unique_key = (keys,)
+
+    def fingerprint(self, **kwargs):
+        args = []
+        for key, value in kwargs.items():
+            if value:
+                if (self.unique_key and key in self.unique_key) or not self.unique_key:
+                    args.append(str(value))
+
+        if args:
+            args = sorted(args)
+            return tools.get_md5(*args)
+
+    def exists(self, fingerprint):
+        """判断元素是否存在redis集合"""
+        return self.redis_db.hexists(self.task_fingerprint, fingerprint)
+
+    def add(self, fingerprint, default=""):
+        """添加元素到redis集合"""
+        self.redis_db.hset(self.task_fingerprint, fingerprint, default)
+
+    def remove(self, fingerprint):
+        """从redis集合中移除元素"""
+        self.redis_db.hdel(self.task_fingerprint, fingerprint)
+
+    def rpush(self, task: str):
+        self.redis_db.rpush(self.request_tasks, task)
+
+    def lpop(self):
+        return self.redis_db.lpop(self.request_tasks)
+
+    @property
+    def task_total(self):
+        return self.redis_db.llen(self.request_tasks)
+
+    def upload_data_to_mongodb(self, data, bulk_size=100, **kwargs):
+        db = kwargs.pop('db', None) or self.mgo_db_name
+        table = kwargs.pop('table', None) or self.mgo_table_name
+
+        inserted_count = 0
+        caches = []
+        documents = data if isinstance(data, list) else [data]
+        for document in documents:
+            caches.append(document)
+            inserted_count += 1
+            if len(caches) % bulk_size == 0:
+                mongo_table(db, table).insert_many(caches)
+                caches.clear()
+
+        if len(caches) > 0:
+            mongo_table(db, table).insert_many(caches)
+        return inserted_count

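To make the contract of this base class concrete, here is a minimal sketch of the dedup-and-queue flow its subclasses rely on, assuming the Redis instance from the active yaml config is reachable; the 'demo-site' label, collection names, and task fields below are illustrative, not part of the repository:

```python
from base_server import BaseServer
import common.utils as tools

# Hypothetical server whose tasks are deduplicated by the 'href' field.
server = BaseServer('demo-site', 'demo', db='py_spider', table='demo_list')
server.unique_key = ('href',)

task = {'href': 'http://example.com/item/1', 'title': 'demo'}
fp = server.fingerprint(**task)  # md5 over the unique_key fields only
if not server.exists(fp):        # skip tasks that are already queued
    server.rpush(tools.json_dumps(task))  # onto the demo_task_list queue
    server.add(fp)               # record in the demo_task_fingerprint hash

print(server.task_total)  # number of pending tasks in the Redis list
```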
+ 185 - 0
build_tools.py

@@ -0,0 +1,185 @@
+from flask import Flask, request, jsonify, abort
+from flask_httpauth import HTTPBasicAuth
+from werkzeug.security import generate_password_hash, check_password_hash
+
+from common.databases import mongo_table
+from common.log import logger
+from services import (
+    accountManagePool,
+    get_base_url,
+    socks5ProxyPool,
+    httpProxyPool,
+)
+
+'''The following modules are loaded dynamically into the global namespace; do not delete them'''
+try:
+    from services import zbytb
+    from services import ybw
+    # from services import nmpa
+    from services import site_monitor
+except ImportError as e:
+    print(f"Missing dynamically loaded module, reason: {e.args}")
+
+app = Flask(__name__)
+
+'''Authentication'''
+auth = HTTPBasicAuth()
+'''Per-client Chrome proxy state'''
+ChromeUser: dict = {}
+'''User collection'''
+Users = mongo_table('py_spider', 'spider_scheduler_auth')
+
+
+@auth.verify_password
+def verify_password(username, password):
+    item = Users.find_one({'username': username})
+    if item is not None:
+        user = {
+            item['username']: generate_password_hash(item['password'])
+        }
+        if username in user and check_password_hash(user.get(username), password):
+            return username
+    return None
+
+
+@app.route('/')
+@auth.login_required
+def index():
+    return '  Hello, {}!<br><br> <a href="{}">Proxy pool usage</a>'.format(
+        auth.username(),
+        get_base_url() + '/crawl/proxy/query'
+    )
+
+
+@app.route('/proxy', methods=['GET'])
+def chrome_proxy_plugin():
+    global ChromeUser
+    client = request.args.get('clientid')
+    ip = request.remote_addr
+    if client is None:
+        return jsonify(data={})
+
+    if client not in ChromeUser:
+        ChromeUser.setdefault(client, {'chrome_use_proxy': True, 'ip': ip})
+    else:
+        config: dict = ChromeUser.get(client)
+        config.update({'chrome_use_proxy': True})
+        ChromeUser.update({client: config})
+
+    logger.info(f"ChromeUser: {ChromeUser}")
+    return jsonify(data=ChromeUser.get(client))
+
+
+@app.route('/proxy/test', methods=['GET'])
+def chrome_proxy_plugin_check():
+    global ChromeUser
+    client = request.args.get('clientid')
+    if client is None or client not in ChromeUser:
+        return 'false'
+    else:
+        config: dict = ChromeUser.get(client)
+        if config.get('chrome_use_proxy'):
+            config.update({'chrome_use_proxy': False})
+            ChromeUser.update({client: config})
+            return 'true'
+        else:
+            return 'false'
+
+
+@app.route('/proxy/user/show', methods=['GET'])
+@auth.login_required
+def show_chrome_proxy_plugin_user():
+    return jsonify(data=ChromeUser)
+
+
+@app.route('/crawl/proxy/<scheme>/fetch', methods=['GET'])
+@auth.login_required
+def get_proxy(scheme):
+    # logger.info(f'[visitor ip]{request.remote_addr}, class:{scheduler_class_name}')
+    result = {}
+    try:
+        proxies = None
+        if scheme == 'http':
+            proxies = httpProxyPool.proxies()
+        elif scheme == 'socks5':
+            proxies = socks5ProxyPool.proxies()
+        else:
+            abort(404)
+        logger.info(f'[fetched {scheme} proxy]{proxies}')
+        if proxies is not None:
+            result.update(proxies)
+    except KeyError:
+        pass
+    return jsonify(data=result)
+
+
+@app.route('/crawl/proxy/query', methods=['GET'])
+@auth.login_required
+def show_proxy():
+    socks_pool = socks5ProxyPool.get_proxy_pool()
+    http_pool = httpProxyPool.get_proxy_pool()
+    pool = [*socks_pool, *http_pool]
+    return jsonify(data=pool)
+
+
+@app.route('/upload/data/<scheduler_class_name>/<table>', methods=['POST'])
+@auth.login_required
+def upload_data(scheduler_class_name, table):
+    data_json = request.json
+    logger.info(f"[接收数据]{data_json}")
+    try:
+        scheduler_class = globals()[scheduler_class_name]
+        scheduler_class.save_data(table, data_json)
+        return 'success'
+    except KeyError:
+        return 'failure'
+
+
+@app.route('/crawl/<scheduler_class_name>/task/fetch', methods=['GET'])
+def get_crawl_task(scheduler_class_name):
+    task = {}
+    try:
+        scheduler_class = globals()[scheduler_class_name]
+        result = scheduler_class.get_crawl_task()
+        if result is not None:
+            task = result
+    except KeyError:
+        pass
+    return jsonify(data=task)
+
+
+@app.route('/crawl/<scheduler_class_name>/task/total', methods=['GET'])
+def get_crawl_task_total(scheduler_class_name):
+    total = {'total': 0}
+    try:
+        scheduler_class = globals()[scheduler_class_name]
+        total.update({'total': scheduler_class.task_total})
+    except KeyError:
+        pass
+    return jsonify(data=total)
+
+
+@app.route('/competing_goods/account/fetch', methods=['GET'])
+@auth.login_required
+def competing_goods_account_lock():
+    req_ip = request.remote_addr
+    site = request.args.get('site')
+    crawl_type = request.args.get('crawl_type')
+    result = accountManagePool.lock_account(site, crawl_type, req_ip)
+    return jsonify(data=result)
+
+
+@app.route('/competing_goods/account/release', methods=['GET'])
+@auth.login_required
+def competing_goods_account_release():
+    req_ip = request.remote_addr
+    uid = request.args.get('uid')
+    crawl_type = request.args.get('crawl_type')
+    if uid in [None, '']:
+        abort(404)  # Not Found -- uid is required
+    res = accountManagePool.release_account(uid, crawl_type, req_ip)
+    return jsonify(data=res)
+
+
+if __name__ == '__main__':
+    app.run(host='0.0.0.0', port=1405, debug=True, use_reloader=False)

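The routes above can be exercised with any HTTP client. A minimal sketch using `requests`, assuming the service is reachable on the port configured in `gunicorn.conf.py`; the credentials are placeholders for an account stored in `spider_scheduler_auth`:

```python
import requests

BASE = 'http://127.0.0.1:1405'
AUTH = ('username', 'password')  # placeholder spider_scheduler_auth account

# Pull a crawl task for the 'ybw' scheduler (this route needs no auth).
task = requests.get(f'{BASE}/crawl/ybw/task/fetch').json()['data']

# Fetch an http proxy (basic auth required).
proxies = requests.get(f'{BASE}/crawl/proxy/http/fetch', auth=AUTH).json()['data']
```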
+ 0 - 0
common/__init__.py


+ 93 - 0
common/databases.py

@@ -0,0 +1,93 @@
+import bson
+import pymongo
+import redis
+from redis._compat import unicode, long, basestring
+from redis.connection import Encoder as RedisEncoder
+from redis.exceptions import DataError
+
+# import config.load as settings
+import setting as settings
+
+# ---------------------------------- mongo ----------------------------------
+MONGO_URI_CLIENTS = {}    # a dictionary holding all clients, keyed by uri
+
+
+def mongo_client(cfg=None, host=None, port=None, fork=False, **kwargs):
+    if host is not None and port is not None:
+        uri = f'mongodb://{host}:{port}'
+    else:
+        _cfg = (cfg or settings.mongo_conf)
+        uri = f'mongodb://{_cfg["host"]}:{_cfg["port"]}'
+
+    if fork:
+        return pymongo.MongoClient(uri, **kwargs)
+    global MONGO_URI_CLIENTS
+    matched_client = MONGO_URI_CLIENTS.get(uri)
+    if matched_client is None:
+        new_client = pymongo.MongoClient(uri, **kwargs)
+        if new_client is not None:
+            MONGO_URI_CLIENTS[uri] = new_client
+        return new_client
+    return matched_client
+
+
+def mongo_database(name: str, **kw):
+    client = mongo_client(**kw)
+    return client.get_database(name)
+
+
+def mongo_table(db: str, name: str, **kw):
+    database = mongo_database(db, **kw)
+    return database.get_collection(name)
+
+
+def int2long(param: int):
+    """int 转换成 long """
+    return bson.int64.Int64(param)
+
+
+def object_id(_id: str):
+    return bson.objectid.ObjectId(_id)
+
+
+# ---------------------------------- redis ----------------------------------
+def redis_client(cfg=None):
+
+    class Encoder(RedisEncoder):
+
+        def encode(self, value):
+            "Return a bytestring or bytes-like representation of the value"
+            if isinstance(value, (bytes, memoryview)):
+                return value
+            # elif isinstance(value, bool):
+            #     # special case bool since it is a subclass of int
+            #     raise DataError(
+            #         "Invalid input of type: 'bool'. Convert to a "
+            #         "bytes, string, int or float first."
+            #     )
+            elif isinstance(value, float):
+                value = repr(value).encode()
+            elif isinstance(value, (int, long)):
+                # python 2 repr() on longs is '123L', so use str() instead
+                value = str(value).encode()
+            elif isinstance(value, (list, dict, tuple)):
+                value = unicode(value)
+            elif not isinstance(value, basestring):
+                # a value we don't know how to deal with. throw an error
+                typename = type(value).__name__
+                raise DataError(
+                    "Invalid input of type: '%s'. Convert to a "
+                    "bytes, string, int or float first." % typename
+                )
+            if isinstance(value, unicode):
+                value = value.encode(self.encoding, self.encoding_errors)
+            return value
+
+    redis.connection.Encoder = Encoder
+
+    if cfg is None:
+        cfg = settings.redis_conf
+
+    return redis.StrictRedis(host=cfg['host'], port=cfg['port'],
+                             password=cfg['pwd'], db=cfg['db'],
+                             decode_responses=True)

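A short usage sketch of the two factories above, assuming the hosts in the active yaml config are reachable; the collection and queue names are illustrative:

```python
from common.databases import mongo_table, redis_client

coll = mongo_table('py_spider', 'ybw_list')  # MongoClient instances are cached per uri
doc = coll.find_one({'crawl': False})

r = redis_client()  # StrictRedis with decode_responses=True and the patched Encoder
r.rpush('demo_task_list', {'href': 'http://example.com'})  # dict accepted by the patched Encoder
print(r.llen('demo_task_list'))
```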
+ 14 - 0
common/log.py

@@ -0,0 +1,14 @@
+from pathlib import Path
+
+from loguru import logger
+
+_absolute = Path(__file__).absolute().parent.parent
+_log_path = (_absolute / 'logs/log_{time:YYYY-MM-DD}.log').resolve()
+logger.add(
+    _log_path,
+    format='{time:YYYY-MM-DD HH:mm:ss} - {level} - {message}',
+    level='INFO',
+    rotation='00:00',
+    retention='1 week',
+    encoding='utf-8',
+)

+ 54 - 0
common/redis_lock.py

@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2019/9/30 22:03
+# @Author  : CRJ
+# @File    : redis_lock.py
+# @Software: PyCharm
+# @Python3.6
+import math
+import time
+import uuid
+
+import redis
+
+
+# Acquire a lock (optimistic locking)
+def acquire_lock_with_timeout(conn, lockname, acquire_timeout=5, lock_timeout=10):
+    # 128-bit random identifier
+    identifier = str(uuid.uuid4())
+    lockname = 'lock:' + lockname
+    lock_timeout = int(math.ceil(lock_timeout))  # ensure expire receives an integer
+
+    end = time.time() + acquire_timeout
+    while time.time() < end:
+        if conn.setnx(lockname, identifier):
+            conn.expire(lockname, lock_timeout)
+            return identifier
+        elif not conn.ttl(lockname):  # set a timeout on locks that have none
+            conn.expire(lockname, lock_timeout)
+
+        time.sleep(0.001)
+    return False
+
+
+# Release a lock (optimistic locking)
+def release_lock(conn, lockname, identifier):
+    lockname = 'lock:' + lockname
+    with conn.pipeline(True) as pipe:
+        while True:
+            try:
+                pipe.watch(lockname)
+                value = pipe.get(lockname)
+                # check that the identifier still matches
+                if value is not None and value == identifier:
+                    pipe.multi()
+                    pipe.delete(lockname)
+                    pipe.execute()
+                    return True
+
+                # identifier changed -- bail out and return False
+                pipe.unwatch()
+                break
+            except redis.exceptions.WatchError:
+                pass
+
+        return False

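Typical usage of this lock pair, mirroring how `ProxyPoolClient.proxies` calls it; the connection parameters are placeholders. Note that `decode_responses=True` matters here, since `release_lock` compares the stored value against the string identifier:

```python
import redis

from common.redis_lock import acquire_lock_with_timeout, release_lock

conn = redis.StrictRedis(host='127.0.0.1', port=6379, decode_responses=True)

identifier = acquire_lock_with_timeout(conn, 'demo', acquire_timeout=5, lock_timeout=10)
if identifier:
    try:
        ...  # critical section: at most one holder of 'lock:demo' at a time
    finally:
        release_lock(conn, 'demo', identifier)
```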
+ 131 - 0
common/utils.py

@@ -0,0 +1,131 @@
+import datetime
+import hashlib
+import json
+import socket
+import time
+from datetime import date, timedelta
+from json import JSONEncoder
+
+from bson import ObjectId
+
+
+def delay(wait):
+    time.sleep(wait)
+
+
+def json_loads(s, **kwargs):
+    """
+    Deserialize a JSON document into a Python object
+
+    @param str s: JSON string
+    @param kwargs: mapping of field names to converter types
+
+    For example:
+        > s = '{"oid": "123456"}'  # convert the 'oid' value '123456' to ObjectId('123456')
+        > import bson
+        > result = json_loads(s, oid=bson.ObjectId)
+    """
+    if not s:
+        return None
+
+    result = json.loads(s)
+    for key, impl in kwargs.items():
+        result[key] = impl(result[key])
+    return result
+
+
+def json_dumps(obj: dict):
+    """Python 对象序列化为 json 文档"""
+    class JsonEncoder(JSONEncoder):
+        def default(self, o):
+            if isinstance(o, ObjectId):
+                return str(o)
+            return super().default(o)
+
+    return json.dumps(obj, ensure_ascii=False, cls=JsonEncoder)
+
+
+def document2dict(items: dict):
+    """mongo 文档转成 python dict"""
+    return json_loads(json_dumps(items))
+
+
+def dict2document(items: dict, **kwargs):
+    """python dict 转成 mongo 文档"""
+    if len(kwargs) == 0:
+        kwargs['_id'] = ObjectId
+    return json_loads(json_dumps(items), **kwargs)
+
+
+def ts2dt(ts: float, fmt='%Y-%m-%d %H:%M:%S'):
+    """时间戳转换日期"""
+    return datetime.datetime.fromtimestamp(ts).strftime(fmt)
+
+
+def dt2ts(dt: str, fmt='%Y-%m-%d'):
+    """日期转换时间戳"""
+    # 转换成时间数组
+    time_array = time.strptime(dt, fmt)
+    # 转换成时间戳
+    timestamp = time.mktime(time_array)
+    return int(timestamp)
+
+
+def delta_t(days: int):
+    """时间差"""
+    return dt2ts((date.today() - timedelta(days=days)).strftime("%Y-%m-%d"))
+
+
+def now_date(fmt='%Y-%m-%d %H:%M:%S'):
+    return datetime.datetime.today().strftime(fmt)
+
+
+def now_ts():
+    """当前时间戳"""
+    return int(datetime.datetime.now().timestamp())
+
+
+def sha1(text: str):
+    """
+    Hex digest of the SHA-1 hash
+
+    @param text: input string
+    @return: digest value
+    """
+    _sha1 = hashlib.sha1()
+    _sha1.update(text.encode("utf-8"))
+    return _sha1.hexdigest()
+
+
+def get_md5(*args):
+    """
+    @summary: unique 32-character md5 digest
+    ---------
+    @param args: values combined for deduplication
+    ---------
+    @result: 7c8684bcbdfcea6697650aa53d7b1405
+    """
+
+    m = hashlib.md5()
+    for arg in args:
+        m.update(str(arg).encode())
+
+    return m.hexdigest()
+
+
+def get_localhost_ip():
+    """
+    Uses UDP: "connect" a UDP socket to an external address and read the local IP from the socket name.
+    No packet is actually sent, so nothing shows up in a packet capture.
+    :return:
+    """
+    s = None
+    try:
+        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        s.connect(("8.8.8.8", 80))
+        ip = s.getsockname()[0]
+    finally:
+        if s:
+            s.close()
+
+    return ip

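A quick round trip through the JSON helpers above, showing how an `_id` survives serialization; the ObjectId value is made up:

```python
from bson import ObjectId

import common.utils as tools

doc = {'_id': ObjectId('645ab0f31f2a4b6c8d9e0f11'), 'title': 'demo'}
s = tools.json_dumps(doc)              # ObjectId rendered as its hex string
plain = tools.document2dict(doc)       # {'_id': '645ab0f3...', 'title': 'demo'}
restored = tools.dict2document(plain)  # '_id' converted back to ObjectId
assert restored['_id'] == doc['_id']
```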
+ 40 - 0
docker-compose.yml

@@ -0,0 +1,40 @@
+version: "3"
+services:
+  server:
+    container_name: pytools-main
+    image: pytools:latest
+    volumes: # mounted folders
+      - /mnt/pytools:/mnt
+    restart: always
+    privileged: true
+    shm_size: 4GB
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200k"
+        max-file: "10"
+    deploy:
+      resources:
+        reservations:
+          memory: 200M
+    command: 'python3 produce_task.py'
+
+  client:
+    container_name: pytools-server
+    image: pytools:latest
+    volumes: # mounted folders
+      - /mnt/pytools:/mnt
+    network_mode: "host"
+    restart: always
+    privileged: true
+    shm_size: 10GB
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "200k"
+        max-file: "10"
+    deploy:
+      resources:
+        reservations:
+          memory: 200M
+    command: 'python3 -m gunicorn -c gunicorn.conf.py build_tools:app'

+ 26 - 0
gunicorn.conf.py

@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-04-24 
+---------
+@summary: gunicorn configuration
+---------
+@author: Dzr
+"""
+import multiprocessing
+
+# Bind address
+bind = '0.0.0.0:1405'
+# Restart workers on code changes (for development/testing)
+reload = False
+# Number of worker processes
+workers = multiprocessing.cpu_count() * 2 + 1
+# Worker class
+worker_class = 'gevent'
+# Number of worker threads (with gevent or eventlet workers, threads become greenlet-based tasks, so the threads setting has no effect)
+# threads = multiprocessing.cpu_count() * 2
+# Log level
+loglevel = 'info'
+# Send the access log to stdout
+accesslog = '-'
+# Access log format
+access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'

+ 38 - 0
produce_task.py

@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-04-24 
+---------
+@summary: distributed task management service
+---------
+@author: Dzr
+"""
+from common.log import logger
+from services import (
+    CompetitiveProductServer,
+    SiteMonitorServer,
+    ProxyPoolServer,
+    NMPAServer,
+)
+
+
+def create_server():
+    logger.info("开启服务")
+    services = [
+        # NMPAServer('国家药品监督管理局', 'py_theme', 'nmpa_c', 100),
+        CompetitiveProductServer('元博网', 'ybw', 'py_spider', 'ybw_list', 1000),
+        CompetitiveProductServer('中国招标与采购网', 'zbytb', 'py_spider', 'zbytb_list', 1000),
+        SiteMonitorServer('网站监控', 'monitor', 'py_spider', 'site_monitor'),
+        ProxyPoolServer('代理池', 'proxy', 'socks5'),
+        ProxyPoolServer('代理池', 'proxy', 'http')
+    ]
+    manage_services = []
+    for server in services:
+        server.start()
+        manage_services.append(server)
+
+    for server in manage_services:
+        server.join()
+
+
+if __name__ == '__main__':
+    create_server()

+ 13 - 0
requirements.txt

@@ -0,0 +1,13 @@
+Flask==2.0.2
+Flask-HTTPAuth==4.5.0
+loguru~=0.5.3
+pymongo~=3.12.0
+redis~=3.5.3
+requests==2.28.1
+pyyaml~=5.4.1
+werkzeug~=2.0.2
+tldextract>=3.3.1
+urllib3==1.26.13
+PySocks==1.7.1
+gevent==22.10.2
+gunicorn==20.1.0

+ 23 - 0
services/__init__.py

@@ -0,0 +1,23 @@
+from .account_pool import AccountManagePool
+from .competitive_product import (
+    CompetitiveProductClient,
+    CompetitiveProductServer
+)
+from .nmpa import NMPAClient, NMPAServer
+from .proxy import ProxyPoolClient, ProxyPoolServer, get_base_url
+from .site_monitor import SiteMonitorClient, SiteMonitorServer
+
+# Site monitoring
+site_monitor = SiteMonitorClient('网站监控', 'monitor', 'py_spider', 'site_monitor')
+
+# Topic data crawler
+nmpa = NMPAClient('国家药品监督管理局', 'py_theme', 'nmpa_c')
+
+# Competitor management
+ybw = CompetitiveProductClient('元博网', 'ybw', 'py_spider', 'ybw_list')
+zbytb = CompetitiveProductClient('中国招标与采购网', 'zbytb', 'py_spider', 'zbytb_list')
+accountManagePool = AccountManagePool('py_spider', 'match_account')
+
+# Proxy pools
+socks5ProxyPool = ProxyPoolClient('代理池', 'proxy', 'socks5')
+httpProxyPool = ProxyPoolClient('代理池', 'proxy', 'http')

+ 75 - 0
services/account_pool.py

@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-04-24
+---------
+@summary: competitor account management
+---------
+@author: Dzr
+"""
+from common.databases import mongo_table, object_id
+from common.log import logger
+from common.utils import now_date
+
+
+class AccountManagePool:
+
+    def __init__(self, db: str, table: str):
+        self.account_tab = mongo_table(db, table)
+        self.sleep_interval = 300
+        self.output_log = True
+
+        list_attr = dict(
+            name='crawl_list',
+            lock=dict(crawl_list=True),
+            release=dict(crawl_list=False),
+        )
+        detail_attr = dict(
+            name='crawl_detail',
+            lock=dict(crawl_detail=True),
+            release=dict(crawl_detail=False),
+        )
+        self._schedule = {'list': list_attr, 'detail': detail_attr}
+
+    def _update_account(self, uid, **kwargs):
+        kwargs['update_time'] = now_date()
+        self.account_tab.update_one({'_id': uid}, {'$set': kwargs})
+
+    def _find_account(self, query, *args, **kwargs):
+        return self.account_tab.find_one(query, *args, **kwargs)
+
+    def _find_and_update_account(self, query, update, *args, **kwargs):
+        update['$set']['update_time'] = now_date()
+        return self.account_tab.find_one_and_update(query, update, *args, **kwargs)
+
+    def release_account(self, uid, crawl_type, ip):
+        if isinstance(uid, str):
+            uid = object_id(uid)
+
+        query = dict(_id=uid)
+        account = self._find_account(query)
+        application = self._schedule[crawl_type]['name']
+        response = {'status': 'duplicate'}
+        if account is not None and account[application]:
+            release = self._schedule[crawl_type]['release']
+            self._update_account(uid, **release)
+            logger.info(f'[account pool] released account: {uid}, usage: {crawl_type}, {ip}')
+            response = {'status': 'ok'}
+        return response
+
+    def lock_account(self, site, crawl_type, ip):
+        release = self._schedule[crawl_type]['release']
+        query = dict(site=site, **release)
+        lock = self._schedule[crawl_type]['lock']
+        update = {'$set': lock}
+        sort = [('update_time', 1)]
+        account = self._find_and_update_account(query, update, sort=sort)
+        if account is not None:
+            _account = {}
+            for key, val in account.items():
+                if key == '_id':
+                    _account[key] = str(val)
+                else:
+                    _account[key] = val
+            account = _account
+            logger.info(f'[account pool] locked account: {account["_id"]}, usage: {crawl_type}, {ip}')
+        return account

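The lock/release pair is meant to bracket a crawl session; a sketch using the pool instance from `services/__init__.py`, with the caller IP as a placeholder. `crawl_type` must be 'list' or 'detail', matching the `_schedule` table above:

```python
from services import accountManagePool

account = accountManagePool.lock_account('元博网', 'list', '127.0.0.1')
if account:
    try:
        ...  # crawl with the credentials stored on the account document
    finally:
        # _id comes back as a string; release_account converts it internally
        accountManagePool.release_account(account['_id'], 'list', '127.0.0.1')
```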
+ 106 - 0
services/competitive_product.py

@@ -0,0 +1,106 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-04-24
+---------
+@summary: competitor task management
+---------
+@author: Dzr
+"""
+import threading
+
+from base_server import BaseServer, tools
+from common.log import logger
+
+
+class CompetitiveProductServer(BaseServer, threading.Thread):
+
+    def __init__(self, site: str, redis_label: str, db: str, table: str, maxsize=None):
+        threading.Thread.__init__(self)
+        super(CompetitiveProductServer, self).__init__(site, redis_label, db, table)
+        self.label = f'{self.server}_{self.getName()}'
+
+        self.maxsize = maxsize or 10
+        self.sleep_interval = 300
+        self.output_log = True
+        self.push_interval = 12 * 3600  # delay before a record becomes eligible as a crawl task
+        self.now_date = tools.now_date('%Y-%m-%d')  # startup date
+
+        self._mutex = threading.Lock()
+
+    def put_latest_tasks(self):
+        self.put_tasks(1)  # first, query yesterday's data once
+
+    def put_tasks(self, day):
+        is_add = True
+        query = {'l_np_publishtime': tools.delta_t(day)}
+        with self.mgo_db.find(query) as cursor:
+            for item in cursor:
+                if self.task_total == self.maxsize:
+                    is_add = False
+                    break
+                else:
+                    if all([
+                        item['count'] == 0,
+                        item['crawl'] is False,
+                        item.get('crawl_status') is None,
+                        tools.now_ts() - item['comeintime'] >= self.push_interval
+                    ]):
+                        fingerprint = self.fingerprint(**item)
+                        if not self.exists(fingerprint):
+                            task_str = tools.json_dumps(item)
+                            self.rpush(task_str)
+                            self.add(fingerprint)
+        return is_add
+
+    def task_pool(self):
+        with self._mutex:
+            days = 2
+            while True:
+                self.put_latest_tasks()
+                add_task = self.put_tasks(days)
+                if not add_task:
+                    self.output_log = True
+                    break
+                elif days > 3:
+                    # all remaining daily data within the 3-day window has been fetched
+                    logger.info(f'[{self.label}]no crawl tasks for now')
+                    break
+                else:
+                    days += 1
+
+        if self.output_log:
+            logger.info(f'[{self.label}]{self.task_total} tasks pending')
+            self.output_log = False
+
+    def flush(self):
+        now_date = tools.now_date('%Y-%m-%d')
+        if self.now_date != now_date:
+            tables = [self.request_tasks, self.task_fingerprint]
+            for table in tables:
+                self.redis_db.delete(table)
+            self.now_date = now_date
+            logger.info(f"[{self.label}]重置任务")
+
+    def run(self):
+        logger.info(f'[{self.label}]start producing tasks')
+        while True:
+            self.flush()
+            if self.task_total < 3:
+                self.task_pool()
+            tools.delay(self.sleep_interval)
+
+
+class CompetitiveProductClient(BaseServer):
+
+    def __init__(self, site: str, redis_label: str, db: str, table: str):
+        super(CompetitiveProductClient, self).__init__(site, redis_label, db, table)
+
+    def get_crawl_task(self):
+        task = tools.json_loads(self.lpop(), _id=tools.ObjectId)
+        if task is not None:
+            self.remove(self.fingerprint(**task))
+            task = tools.document2dict(task)
+        return task
+
+    def save_data(self, table, documents):
+        pass

+ 85 - 0
services/nmpa.py

@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-04-24
+---------
+@summary: NMPA (drug administration) task management
+---------
+@author: Dzr
+"""
+import threading
+
+from base_server import BaseServer, tools
+from common.log import logger
+
+redis_label = "NMPA"  # redis任务标签
+
+
+class NMPAServer(BaseServer, threading.Thread):
+
+    def __init__(self, site: str, db: str, table: str, maxsize=None):
+        threading.Thread.__init__(self)
+        super(NMPAServer, self).__init__(site, redis_label, db, table)
+        self.label = f'{self.server}_{self.getName()}'
+
+        self.maxsize = maxsize or 10
+        self.sleep_interval = 5
+        self.output_log = True
+
+    def task_pool(self):
+        query = {"status": False, "crawl_status": False}
+        with self.mgo_db.find(query) as cursor:
+            self.output_log = True
+            for item in cursor:
+                if "page" not in item:
+                    item['page'] = 1
+
+                if self.task_total >= self.maxsize:
+                    break
+
+                fingerprint = self.fingerprint(**item)
+                if not self.exists(fingerprint):
+                    task_str = tools.json_dumps(item)
+                    self.rpush(task_str)
+                    self.add(fingerprint)
+
+        if self.output_log:
+            logger.info(f'[{self.label}]{self.task_total} tasks pending')
+            self.output_log = False
+
+    def run(self):
+        logger.info(f'[{self.label}]start producing tasks')
+        while True:
+            if self.task_total < 3:
+                self.task_pool()
+            tools.delay(self.sleep_interval)
+
+
+class NMPAClient(BaseServer):
+
+    def __init__(self, site: str, db: str, table: str):
+        super(NMPAClient, self).__init__(site, redis_label, db, table)
+
+    def get_crawl_task(self):
+        task = tools.json_loads(self.lpop(), _id=tools.ObjectId)
+        if task is not None:
+            self.remove(self.fingerprint(**task))
+            self.record_history(task)
+            task = tools.document2dict(task)
+        return task
+
+    def record_history(self, task: dict):
+        items: dict = self.mgo_db.find_one(task['_id'])
+        record: list = items.get('crawl_record', list())
+        record.append(task['page'])
+        self.mgo_db.update_one(
+            {'_id': task['_id']},
+            {'$set': {'crawl_record': record}},
+            upsert=True
+        )
+
+    def save_data(self, table, documents):
+        try:
+            inserted_total = self.upload_data_to_mongodb(documents, table=table)
+            logger.info(f"[{self.server}]采集成功 {inserted_total} 条 --列表页")
+        except TypeError:
+            pass

+ 274 - 0
services/proxy.py

@@ -0,0 +1,274 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-05-11
+---------
+@summary: proxy pool
+---------
+@author: Dzr
+"""
+import ast
+import multiprocessing
+import random
+import threading
+from collections import deque
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from operator import itemgetter
+from urllib.parse import urlparse
+
+import requests
+
+import setting as settings
+from base_server import BaseServer, tools
+from common.log import logger
+from common.redis_lock import acquire_lock_with_timeout, release_lock
+
+DEFAULT_UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36'
+
+
+def decrypt(input_str: str) -> str:
+    """
+    Decode a base64-like string that uses a custom alphabet
+
+    :param input_str:
+    :return:
+    """
+    # look up each non-'=' character in the key alphabet and convert the index to binary
+    key = settings.jy_proxy['socks5']['decrypt']
+    ascii_list = ['{:0>6}'.format(str(bin(key.index(i))).replace('0b', '')) for i in input_str if i != '=']
+    output_str = ''
+    # count the '=' padding characters
+    equal_num = input_str.count('=')
+    while ascii_list:
+        temp_list = ascii_list[:4]
+        # join into one binary string
+        temp_str = ''.join(temp_list)
+        # drop the padding bits so the length is a multiple of 8
+        if len(temp_str) % 8 != 0:
+            temp_str = temp_str[0:-1 * equal_num * 2]
+        # regroup four 6-bit chunks into three 8-bit chunks
+        temp_str_list = [temp_str[x:x + 8] for x in [0, 8, 16]]
+        # binary to decimal
+        temp_str_list = [int(x, 2) for x in temp_str_list if x]
+        # join the decoded characters
+        output_str += ''.join([chr(x) for x in temp_str_list])
+        ascii_list = ascii_list[4:]
+    return output_str
+
+
+def get_base_url():
+    return settings.jy_proxy['socks5']['base_url']
+
+
+def get_netloc(proxy, default=None):
+    proxies = None
+    if isinstance(proxy, dict):
+        proxies = proxy.get('proxies')
+        if isinstance(proxies, str):
+            proxies = tools.json_loads(proxies)
+    # proxies = proxy.get('proxies') if isinstance(proxy, dict) else None
+    if proxies is not None:
+        parser = urlparse(proxies.get('http'))
+        default = parser.netloc
+    return default
+
+
+class BaseProxyPool(BaseServer):
+
+    def __init__(self, name, redis_label, scheme):
+        super(BaseProxyPool, self).__init__(server=name, label=redis_label)
+        self.scheme = scheme.lower()
+        self.proxy_name = self.scheme + self.server
+        self.proxy_queue = f'{redis_label}_{self.scheme}'
+        self.unique_key = ('ip', 'port')  # fields combined into the proxy fingerprint
+
+    def get_redis_name(self, proxy):
+        return f"{self.proxy_queue}_{proxy['fingerprint']}"
+
+    def get_redis_name_lst(self, pattern='*'):
+        return self.redis_db.keys(self.proxy_queue + pattern)
+
+    def get_proxy(self, name):
+        items = self.redis_db.hgetall(name)
+        if items is None or 'proxies' not in items:
+            return None
+
+        proxy = {
+            'proxies': ast.literal_eval(items['proxies']),
+            'fingerprint': items['fingerprint'],
+            'start_time': int(items['start_time']),
+            'end_time': int(items['end_time']),
+            'last_time': int(items['last_time']),
+            'usage': int(items['usage']),
+        }
+        return proxy
+
+    def get(self, name, key):
+        return self.redis_db.hget(name, key)
+
+    def exists(self, proxy):
+        return self.redis_db.exists(self.get_redis_name(proxy))
+
+    def check(self, proxy):
+        is_ok = False
+        url = 'https://myip.ipip.net'
+        netloc = get_netloc(proxy)
+        try:
+            requests_param = {
+                "headers": {'User-Agent': DEFAULT_UA},
+                "proxies": proxy['proxies'],
+                "timeout": 5
+            }
+            requests.get(url, **requests_param)
+            is_ok = True
+        except requests.RequestException:
+            pass
+        msg = "正常" if is_ok else "失效"
+        logger.debug(f"[{self.proxy_name}]检查代理Ip - {netloc} --通信{msg}")
+        return proxy, is_ok
+
+    def remove_proxy(self, proxy):
+        netloc = get_netloc(proxy)
+        logger.debug(f"[{self.proxy_name}]代理Ip - {netloc} --删除")
+        if self.exists(proxy):
+            redis_name = self.get_redis_name(proxy)
+            self.redis_db.delete(redis_name)
+
+    def add_proxy(self, proxy):
+        netloc = get_netloc(proxy)
+        logger.debug(f"[{self.proxy_name}]代理Ip - {netloc} --添加")
+        if not self.exists(proxy):
+            redis_name = self.get_redis_name(proxy)
+            self.redis_db.hset(redis_name, None, None, mapping=proxy)
+            expire_ts = proxy['end_time'] - tools.now_ts()
+            self.redis_db.expire(redis_name, expire_ts)
+
+
+class ProxyPoolServer(BaseProxyPool, threading.Thread):
+
+    def __init__(self, name, redis_label, scheme: str):
+        threading.Thread.__init__(self)
+        super(ProxyPoolServer, self).__init__(name, redis_label, scheme)
+
+        self.label = f'{self.proxy_name}_{self.getName()}'
+        self.ports = ['8862', '8863'] if self.scheme == "http" else ['8860', '8861']
+        self.load_interval = 60  # polling interval for the vps proxy service
+
+    def remove_failure_proxy(self, proxy_lst):
+        """删除失效/故障代理ip"""
+        logger.info(f"[{self.label}]清除无效代理Ip")
+        proxy_fingerprints = set([proxy['fingerprint'] for proxy in proxy_lst])
+        for redis_name in self.get_redis_name_lst():
+            fingerprint = self.get(redis_name, 'fingerprint')
+            if fingerprint not in proxy_fingerprints:
+                self.redis_db.delete(redis_name)
+
+    def request_proxy(self):
+        logger.info(f"[{self.label}]请求vps服务")
+        proxy_lst = []
+        try:
+            url = settings.jy_proxy['socks5']['url']
+            response = requests.get(url, timeout=10)
+            for item in response.json():
+                ports = list(filter(lambda p: p in self.ports, item['ports']))
+                if not ports:
+                    continue
+
+                ip = decrypt(item['ip'])
+                port = int(ports[random.randint(0, len(ports) - 1)])
+                start_time = tools.now_ts()
+                end_time = item['lifetime']
+                if end_time - start_time > 0:
+                    proxy = {
+                        'proxies': {
+                            'http': '{}://{}:{}'.format(self.scheme, ip, port),
+                            'https': '{}://{}:{}'.format(self.scheme, ip, port)
+                        },
+                        'fingerprint': self.fingerprint(ip=ip, port=port),
+                        'start_time': start_time,
+                        'end_time': end_time,
+                        'last_time': 0,
+                        'usage': 0,
+                    }
+                    proxy_lst.append(proxy)
+
+        except Exception as e:
+            logger.error(f"[{self.label}]vps服务访问异常,原因:{e.args}")
+        return proxy_lst
+
+    def manage_proxy(self, proxy_lst: list, workers=1):
+        self.remove_failure_proxy(proxy_lst)
+        with ThreadPoolExecutor(max_workers=workers) as Executor:
+            fs = [Executor.submit(self.check, proxy) for proxy in proxy_lst]
+            for f in as_completed(fs):
+                proxy, is_ok = f.result()
+                if is_ok:
+                    self.add_proxy(proxy)
+                else:
+                    self.remove_proxy(proxy)
+
+    def run(self):
+        logger.info(f'[{self.label}]start producing proxy ips')
+        while True:
+            proxy_lst = self.request_proxy()
+            if not proxy_lst:
+                tools.delay(2)
+                continue
+            dynamic_workers = min((int(len(proxy_lst) / 2) or 1), 10)
+            self.manage_proxy(proxy_lst, workers=dynamic_workers)  # thread pool capped at 10
+            tools.delay(self.load_interval)
+
+
+class ProxyPoolClient(BaseProxyPool):
+
+    def __init__(self, name: str, redis_label: str, scheme: str):
+        super(ProxyPoolClient, self).__init__(name, redis_label, scheme)
+        current_process = multiprocessing.current_process()
+        sub_label = f'{tools.get_localhost_ip()}:{current_process.pid}'
+        self.lock_label = f'{redis_label}:{sub_label}'
+
+    @property
+    def proxy_total(self):
+        return len(self.get_redis_name_lst())
+
+    def get_all_proxy(self):
+        proxy_lst = deque([])
+        for redis_name in self.get_redis_name_lst():
+            proxy = self.get_proxy(redis_name)
+            if isinstance(proxy, dict):
+                proxy_lst.append(proxy)
+
+        if len(proxy_lst) > 0:
+            '''sort by usage count, ascending (least used first)'''
+            proxy_lst = deque(sorted(proxy_lst, key=itemgetter('usage')))
+
+        return proxy_lst
+
+    def get_proxy_pool(self):
+        _pool_proxy = []
+        for proxy in self.get_all_proxy():
+            last_time = proxy['last_time']
+            end_time = proxy['end_time']
+            expire = end_time - tools.now_ts()
+            _pool_proxy.append({
+                'proxies': proxy['proxies'],
+                'start_time': tools.ts2dt(proxy['start_time']),
+                'end_time': tools.ts2dt(end_time),
+                'last_time': tools.ts2dt(last_time) if last_time != 0 else '',
+                'expire': expire,
+                'usage': proxy['usage'],
+            })
+        # display sorted by remaining lifetime, descending
+        return list(sorted(_pool_proxy, key=lambda x: x['expire'], reverse=True))
+
+    def proxies(self):
+        lock = acquire_lock_with_timeout(self.redis_db, self.lock_label)
+        if lock:
+            proxy = {}
+            if self.proxy_total > 0:
+                proxy_lst = self.get_all_proxy()
+                proxy = proxy_lst.popleft()
+                name = self.get_redis_name(proxy)
+                self.redis_db.hset(name, 'usage', proxy['usage'] + 1)
+                self.redis_db.hset(name, 'last_time', tools.now_ts())
+            release_lock(self.redis_db, self.lock_label, lock)
+            return proxy.get('proxies')

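On the consumer side, `ProxyPoolClient.proxies` hands back a ready-to-use mapping for `requests`; a sketch using the pool instance from `services/__init__.py`, with the target URL as an example:

```python
import requests

from services import httpProxyPool  # ProxyPoolClient('代理池', 'proxy', 'http')

proxies = httpProxyPool.proxies()  # least-used live proxy, or None when the pool is empty
if proxies:
    resp = requests.get('https://myip.ipip.net', proxies=proxies, timeout=5)
    print(resp.text)
```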
+ 214 - 0
services/site_monitor.py

@@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-05-09
+---------
+@summary: source-website monitoring
+---------
+@author: Dzr
+"""
+import multiprocessing
+import threading
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+import tldextract
+from urllib3 import get_host
+
+import setting as settings
+from base_server import BaseServer, tools, mongo_table
+from common.log import logger
+from common.redis_lock import acquire_lock_with_timeout, release_lock
+
+
+def extract_domain(url):
+    """返回完全限定的域名"""
+    ext = tldextract.extract(url)
+    return ext.fqdn or ext.ipv4
+
+
+def extract_host(url):
+    """
+
+    # >>> base_url = extract_host('http://192.168.3.207:8080/')
+    """
+    _s, _h, _p = get_host(url)
+    return f"{_s}://{_h}/" if _p is None else f"{_s}://{_h}:{_p}/"
+
+
+class SiteMonitorServer(BaseServer, threading.Thread):
+
+    def __init__(self, site: str, redis_label: str, db: str, table: str):
+        threading.Thread.__init__(self)
+        super(SiteMonitorServer, self).__init__(site, redis_label, db, table,
+                                                redis_cfg=settings.redis2_conf)
+        self.label = f'{self.server}_{self.getName()}'
+
+        self.sleep_interval = 3600 * 4
+        self.expire_time = 3600 * 24 * 15
+        self.output_log = True
+
+    def get_monitor_lst(self, conditions=None, projection=None):
+        results = []
+        table = mongo_table('editor', 'luaconfig')
+        with table.find(conditions, projection=projection) as cursor:
+            for items in cursor:
+                _id = items['_id']
+                fingerprint = f'monitor_{self.fingerprint(_id=_id)}'
+                results.append({'fingerprint': fingerprint, 'document': items})
+        yield from results
+
+    def get_monitor_task(self, conditions, projection=None):
+        results = []
+        with self.mgo_db.find(conditions, projection=projection) as cursor:
+            for item in cursor:
+                _id = item["luaconfig_id"]
+                fingerprint = f'monitor_{self.fingerprint(_id=_id)}'
+                results.append({'fingerprint': fingerprint, 'document': item})
+        yield from results
+
+    def process_refresh(self, monitor, count, **kwargs):
+        fingerprint = monitor['fingerprint']
+        if self.redis_db.exists(fingerprint):
+            luaconfig_id = monitor['document']['_id']
+            self.mgo_db.delete_many({'luaconfig_id': luaconfig_id})
+            self.redis_db.delete(fingerprint)
+            count += 1
+        return count
+
+    def process_monitor_item(self, monitor, count, **kwargs):
+        document = monitor['document']
+        _id = document['_id']
+        domain = extract_domain(document['href'])
+        href = document['param_common'][11]
+        is_subdomain = (extract_domain(href) == domain)
+        fingerprint = monitor['fingerprint']
+        if not self.redis_db.exists(fingerprint):
+            data = {
+                "luaconfig_id": _id,
+                "site": document["site"],
+                "channel": document["channel"],
+                "spidercode": document["code"],
+                "platform": document["platform"],
+                "state": document["state"],
+                "domain": domain,
+                "host": extract_host(href),
+                "is_subdomain": is_subdomain,
+                "url": href,
+                "tags_count": 0,  # 栏目页面标签数量
+                "tags_count_diff": 0,  # 栏目页面标签计数差额
+                "tags_count_diff_lst": [],  # 栏目页面标签计数差额记录表
+                "channel_ischange": False,  # 栏目页面是否改版
+                "status_code": -1,  # 栏目访问状态码
+                "visit_count": 0,  # 栏目日访问次数
+                "failure_count": 0,  # 栏目日访问失败次数
+                "create_at": tools.now_ts(),
+                "update_at": tools.now_ts()
+            }
+            self.mgo_db.find_one_and_update(
+                {'luaconfig_id': _id},
+                {'$set': data},
+                upsert=True
+            )
+            count += 1
+        self.redis_db.setex(fingerprint, self.expire_time, str(_id))
+        return count
+
+    def process_monitor_task(self, task, **kwargs):
+        fingerprint = task['fingerprint']
+        if self.redis_db.exists(fingerprint):
+            document = task['document']
+            task_str = tools.json_dumps(document)
+            self.rpush(task_str)
+
+    def refresh_monitor(self, thread_num=1):
+        logger.info(f"[{self.label}]刷新监控任务")
+        q = {'state': {'$in': [0, 4, 6, 7, 10]}}
+        projection = {'_id': 1}
+        monitor_lst = self.get_monitor_lst(q, projection)
+        process_func = self.process_refresh
+        result = self.multi_thread_worker(monitor_lst, process_func, thread_num)
+        logger.info(f"[{self.label}]无效监控任务清除 {result} 条")
+
+    def create_monitor_item(self, thread_num=1):
+        logger.info(f"[{self.label}]扫描爬虫任务表")
+        q = {'state': {'$nin': [0, 4, 6, 7, 10]}}
+        projection = {
+            'param_common': 1,
+            'channel': 1,
+            'site': 1,
+            'href': 1,
+            'code': 1,
+            'platform': 1,
+            'state': 1
+        }
+        monitor_lst = self.get_monitor_lst(q, projection)
+        process_func = self.process_monitor_item
+        result = self.multi_thread_worker(monitor_lst, process_func, thread_num)
+        logger.info(f"[{self.label}]新增监控爬虫 {result} 条")
+
+    def create_monitor_task(self, thread_num=1):
+        logger.info(f"[{self.label}]创建监控任务")
+        query = {'is_subdomain': True, 'channel_ischange': False}
+        projection = {
+            'luaconfig_id': 1,
+            'domain': 1,
+            'host': 1,
+            'url': 1,
+            'tags_count': 1,  # number of tags on the channel page
+            'tags_count_diff': 1,  # tag count delta for the channel page
+            'tags_count_diff_lst': 1,  # history of tag count deltas
+            'channel_ischange': 1,  # whether the channel page layout changed
+            'status_code': 1,  # channel response status code
+            'visit_count': 1,  # daily channel visit count
+            'failure_count': 1,  # daily channel failure count
+            'update_at': 1
+        }
+        task_lst = self.get_monitor_task(query, projection)
+        process_func = self.process_monitor_task
+        self.multi_thread_worker(task_lst, process_func, thread_num)
+        logger.info(f'[{self.label}]{self.task_total} tasks pending')
+
+    @staticmethod
+    def multi_thread_worker(iterable, func, workers=1):
+        count = 0
+        with ThreadPoolExecutor(max_workers=workers) as executor:
+            fs = []
+            for monitor in iterable:
+                f = executor.submit(func, monitor, count=count)
+                fs.append(f)
+
+            for f in as_completed(fs):
+                result = f.result()
+                if result is not None and isinstance(result, int):
+                    count += result
+        return count
+
+    def run(self):
+        logger.info(f'[{self.label}]start producing tasks')
+        while True:
+            self.refresh_monitor(10)
+            self.create_monitor_item(20)
+            self.create_monitor_task(12)
+            tools.delay(self.sleep_interval)
+
+
+class SiteMonitorClient(BaseServer):
+
+    def __init__(self, site: str, redis_label: str, db: str, table: str):
+        super(SiteMonitorClient, self).__init__(site, redis_label, db, table,
+                                                redis_cfg=settings.redis2_conf)
+        current_process = multiprocessing.current_process()
+        self.lock_label = f'{redis_label}:{current_process.pid}'
+
+    def get_crawl_task(self):
+        lock = acquire_lock_with_timeout(self.redis_db, self.lock_label)
+        if lock:
+            convert_fields = dict(luaconfig_id=tools.ObjectId, _id=tools.ObjectId)
+            task = tools.json_loads(self.lpop(), **convert_fields)
+            if task is not None:
+                task = tools.document2dict(task)
+                del task['luaconfig_id']
+            release_lock(self.redis_db, self.lock_label, lock)
+            return task
+
+    def save_data(self, table, documents):
+        pass

+ 33 - 0
setting.py

@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2023-11-09 
+---------
+@summary: configuration loader (selects a yaml file by platform)
+---------
+@author: Dzr
+"""
+import platform
+from pathlib import Path
+
+import yaml
+
+__all__ = [
+    'mongo_conf',
+    'redis_conf', 'redis2_conf',
+    'jy_proxy',
+]
+
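+# Linux hosts (the deployment targets) load dev.yaml; macOS/Windows dev machines load test.yaml.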
+if platform.system() not in ['Darwin', 'Windows']:
+    ENV = 'dev.yaml'
+else:
+    ENV = 'test.yaml'
+
+_base_path = Path(__file__).parent
+_yaml_conf = (_base_path / 'yml' / ENV).resolve()
+
+with open(_yaml_conf, encoding="utf-8") as f:
+    _conf = yaml.safe_load(f)
+    mongo_conf = _conf['mongo']
+    redis_conf = _conf['redis']
+    redis2_conf = _conf['redis2']
+    jy_proxy: dict = _conf['proxy']

+ 21 - 0
yml/dev.yaml

@@ -0,0 +1,21 @@
+proxy:
+  socks5:
+    url: http://proxy.spdata.jianyu360.com/proxy/getallip
+    decrypt: ABNOPqrceQRSTklmUDEFGXYZabnopfghHVWdijstuvwCIJKLMxyz0123456789+/
+    base_url: http://cc.spdata.jianyu360.com
+
+mongo:
+  host: 172.17.4.87
+  port: !!int 27080
+
+redis:
+  host: 172.17.4.232
+  port: !!int 7361
+  pwd: "k5ZJR5KV4q7DRZ92DQ"
+  db: !!int 1
+
+redis2:
+  host: 172.17.4.232
+  port: !!int 7361
+  pwd: "k5ZJR5KV4q7DRZ92DQ"
+  db: !!int 4

+ 21 - 0
yml/test.yaml

@@ -0,0 +1,21 @@
+proxy:
+  socks5:
+    url: http://proxy.spdata.jianyu360.com/proxy/getallip
+    decrypt: ABNOPqrceQRSTklmUDEFGXYZabnopfghHVWdijstuvwCIJKLMxyz0123456789+/
+    base_url: http://127.0.0.1:1405
+
+mongo:
+  host: 192.168.3.166
+  port: !!int 27082
+
+redis:
+  host: 192.168.3.165
+  port: !!int 8165
+  pwd: "top@123"
+  db: !!int 1
+
+redis2:
+  host: 192.168.3.165
+  port: !!int 8165
+  pwd: "top@123"
+  db: !!int 4