from flask import Flask, request, jsonify, abort
from flask_httpauth import HTTPBasicAuth
from werkzeug.security import generate_password_hash, check_password_hash

from common.databases import mongo_table
from common.log import logger
from services import (
    accountManagePool,
    get_base_url,
    socks5ProxyPool,
    httpProxyPool,
)

# The names imported below are resolved dynamically (by name, through the
# module globals) when a request addresses a scheduler. Do not delete them
# even though nothing references them statically.
try:
    from services import zbytb
    from services import ybw
    # from services import nmpa
    from services import site_monitor
except ImportError as e:
    print(f"缺少全局变量, 原因:{e.args}")

app = Flask(__name__)

# Authentication scheme (HTTP Basic).
auth = HTTPBasicAuth()

# Per-client state of the Chrome proxy plugin, keyed by the `clientid`
# query parameter.
ChromeUser: dict = {}

# User collection used to verify HTTP Basic credentials.
Users = mongo_table('py_spider', 'spider_scheduler_auth')

# Scheduler names that clients may address through a URL segment. Keep this
# in sync with the dynamic imports above; anything not listed here is treated
# as unknown so that arbitrary module globals (app, Users, auth, ...) cannot be
# reached through a request path.
_SCHEDULER_NAMES = frozenset({'zbytb', 'ybw', 'site_monitor'})


def _get_scheduler(name):
    """Return the scheduler object registered under *name*.

    Raises:
        KeyError: if *name* is not an allowed scheduler name, or the
            scheduler could not be imported at start-up and is therefore
            missing from the module globals.
    """
    if name not in _SCHEDULER_NAMES:
        raise KeyError(name)
    return globals()[name]


@auth.verify_password
def verify_password(username, password):
    """Check HTTP Basic credentials against the user collection.

    Returns the username when a matching user document exists and the
    supplied password matches the stored one, otherwise ``None``.
    """
    item = Users.find_one({'username': username})
    if item is None:
        return None
    # NOTE(review): the stored value is hashed on every request before being
    # compared, which implies the collection holds plaintext passwords.
    # Consider storing hashes and calling check_password_hash on them directly.
    stored_hash = generate_password_hash(item['password'])
    if item['username'] == username and check_password_hash(stored_hash, password):
        return username
    return None


@app.route('/')
@auth.login_required
def index():
    """Greet the authenticated user and link to the proxy-pool overview."""
    # The markup is reconstructed: the literal previously carried a single
    # placeholder for two format arguments, so the overview URL was dropped.
    return ' Hello, {}!<br><br><a href="{}">代理池使用情况</a>'.format(
        auth.username(),
        get_base_url() + '/crawl/proxy/query'
    )


@app.route('/proxy', methods=['GET'])
def chrome_proxy_plugin():
    """Mark the client given by ``clientid`` as requesting the Chrome proxy.

    Responds with an empty ``data`` object when ``clientid`` is missing,
    otherwise with the client's stored configuration.
    """
    client = request.args.get('clientid')
    ip = request.remote_addr
    if client is None:
        return jsonify(data={})

    # First sighting registers the client together with its IP; later calls
    # only re-arm the flag and keep the originally recorded IP.
    config = ChromeUser.setdefault(client, {'chrome_use_proxy': True, 'ip': ip})
    config['chrome_use_proxy'] = True
    logger.info(f"ChromeUser: {ChromeUser}")
    return jsonify(data=ChromeUser.get(client))


@app.route('/proxy/test', methods=['GET'])
def chrome_proxy_plugin_check():
    """Report whether the client's proxy flag is set, clearing it if so.

    Returns the string ``'true'`` exactly once per arming of the flag and
    ``'false'`` for unknown clients or when the flag is already cleared.
    """
    client = request.args.get('clientid')
    if client is None or client not in ChromeUser:
        return 'false'

    config = ChromeUser[client]
    if not config.get('chrome_use_proxy'):
        return 'false'

    config['chrome_use_proxy'] = False
    return 'true'


@app.route('/proxy/user/show', methods=['GET'])
@auth.login_required
def show_chrome_proxy_plugin_user():
    """Return the recorded state of every Chrome proxy plugin client."""
    return jsonify(data=ChromeUser)


@app.route('/crawl/proxy/<scheme>/fetch', methods=['GET'])
@auth.login_required
def get_proxy(scheme):
    """Return a proxy from the pool matching *scheme*.

    *scheme* must be ``http`` or ``socks5``; any other value yields a 404.
    The ``data`` object is empty when the pool returns ``None`` or raises
    ``KeyError``.
    """
    # logger.info(f'[访问ip]{request.remote_addr}, class:{scheduler_class_name}')
    pools = {'http': httpProxyPool, 'socks5': socks5ProxyPool}
    pool = pools.get(scheme)
    if pool is None:
        abort(404)

    result = {}
    try:
        proxies = pool.proxies()
        logger.info(f'[调用{scheme}代理]{proxies}')
        if proxies is not None:
            result.update(proxies)
    except KeyError:
        # Best effort: a KeyError from the pool yields an empty result.
        pass
    return jsonify(data=result)


@app.route('/crawl/proxy/query', methods=['GET'])
@auth.login_required
def show_proxy():
    """Return the contents of the socks5 pool followed by the http pool."""
    socks_pool = socks5ProxyPool.get_proxy_pool()
    http_pool = httpProxyPool.get_proxy_pool()
    return jsonify(data=[*socks_pool, *http_pool])


@app.route('/upload/data/<scheduler_class_name>/<table>', methods=['POST'])
@auth.login_required
def upload_data(scheduler_class_name, table):
    """Pass the JSON request body to the named scheduler's ``save_data``.

    Returns ``'success'`` once ``save_data(table, data)`` has returned, and
    ``'failure'`` when the scheduler name is unknown or a ``KeyError`` is
    raised while saving.
    """
    data_json = request.json
    logger.info(f"[接收数据]{data_json}")
    try:
        scheduler = _get_scheduler(scheduler_class_name)
        scheduler.save_data(table, data_json)
    except KeyError:
        return 'failure'
    return 'success'


@app.route('/crawl/<scheduler_class_name>/task/fetch', methods=['GET'])
def get_crawl_task(scheduler_class_name):
    """Return the next crawl task of the named scheduler.

    The ``data`` object is empty when the scheduler is unknown, when it
    returns ``None``, or when a ``KeyError`` is raised while fetching.
    """
    task = {}
    try:
        scheduler = _get_scheduler(scheduler_class_name)
        result = scheduler.get_crawl_task()
        if result is not None:
            task = result
    except KeyError:
        pass
    return jsonify(data=task)


@app.route('/crawl/<scheduler_class_name>/task/total', methods=['GET'])
def get_crawl_task_total(scheduler_class_name):
    """Return the ``task_total`` of the named scheduler (0 if unknown)."""
    total = {'total': 0}
    try:
        scheduler = _get_scheduler(scheduler_class_name)
        total['total'] = scheduler.task_total
    except KeyError:
        pass
    return jsonify(data=total)


@app.route('/competing_goods/account/fetch', methods=['GET'])
@auth.login_required
def competing_goods_account_lock():
    """Lock an account via ``accountManagePool.lock_account``.

    Forwards the ``site`` and ``crawl_type`` query parameters and the
    caller's remote address, and returns the pool's result as ``data``.
    """
    req_ip = request.remote_addr
    site = request.args.get('site')
    crawl_type = request.args.get('crawl_type')
    result = accountManagePool.lock_account(site, crawl_type, req_ip)
    return jsonify(data=result)


@app.route('/competing_goods/account/release', methods=['GET'])
@auth.login_required
def competing_goods_account_release():
    """Release an account via ``accountManagePool.release_account``.

    Forwards the ``uid`` and ``crawl_type`` query parameters and the
    caller's remote address. Responds with 404 when ``uid`` is missing or
    empty.
    """
    req_ip = request.remote_addr
    uid = request.args.get('uid')
    crawl_type = request.args.get('crawl_type')
    if not uid:
        # A missing/empty uid is answered with 404 (not 401).
        abort(404)

    res = accountManagePool.release_account(uid, crawl_type, req_ip)
    return jsonify(data=res)


# if __name__ == '__main__':
#     app.run(host='0.0.0.0', port=1405, debug=True, use_reloader=False)