|
- # -*- coding: utf-8 -*-
- """
- Created on 2018/12/27 11:32 AM
- ---------
- @summary: cookie池
- ---------
- @author: Boris
- @email: boris_liu@foxmail.com
- """
- import abc
- import datetime
- import random
- import time
- import warnings
- from collections import Iterable
- from enum import Enum, unique
- import requests
- from feapder.db.mongodb import MongoDB
- import feapder.utils.tools as tools
- from feapder import setting
- from feapder.network import user_agent
- from feapder.db.mysqldb import MysqlDB
- from feapder.db.redisdb import RedisDB
- from feapder.utils import metrics
- from feapder.utils.log import log
- from feapder.utils.redis_lock import RedisLock
- from feapder.utils.tools import send_msg
- from feapder.utils.webdriver import WebDriver
class CookiePoolInterface(metaclass=abc.ABCMeta):
    """
    Abstract contract shared by the cookie pools in this module.

    All four methods are abstract, so a subclass can only be instantiated
    once it overrides every one of them.
    """

    @abc.abstractmethod
    def create_cookie(self, *args, **kwargs):
        """Produce a new cookie."""
        raise NotImplementedError

    @abc.abstractmethod
    def get_cookie(self, *args, **kwargs):
        """Hand out a cookie from the pool."""
        raise NotImplementedError

    @abc.abstractmethod
    def del_cookie(self, *args, **kwargs):
        """Remove a cookie from the pool."""
        raise NotImplementedError

    @abc.abstractmethod
    def run(self):
        """Run the cookie production routine."""
        raise NotImplementedError
class PageCookiePool(CookiePoolInterface):
    """
    Pool of cookies produced by requesting a page; no user login is needed.

    Cookies are kept in the redis list ``<redis_key>:l_cookie_pool``.
    """

    def __init__(
        self,
        redis_key,
        page_url=None,
        min_cookies=10000,
        must_contained_keys=(),
        keep_alive=False,
        **kwargs,
    ):
        """
        @param redis_key: project name, used as prefix of the redis keys
        @param page_url: url requested to produce a cookie
        @param min_cookies: minimum number of cookies kept in the pool
        @param must_contained_keys: keys a produced cookie must contain
        @param keep_alive: whether to stay on standby producing cookies once the
            pool holds enough of them; when False the producer exits at that point
        ---
        @param kwargs: some WebDriver parameters (the default create_cookie only
            reads ``timeout``)
            load_images: whether to load images
            user_agent_pool: user-agent pool, not used when None
            proxies_pool: proxy pool, not used when None
            headless: whether to enable headless mode
            driver_type: web driver type
            timeout: request timeout in seconds, default 16
            window_size: screen resolution (width, height)
        """
        self._redisdb = RedisDB()

        self._tab_cookie_pool = "{}:l_cookie_pool".format(redis_key)
        # Remembers when the cookie count was last sampled, as "timestamp:count".
        self._tab_cookie_pool_last_count = "{}:str_cookie_pool_count".format(
            redis_key
        )

        self._page_url = page_url
        self._min_cookies = min_cookies
        self._must_contained_keys = must_contained_keys
        self._keep_alive = keep_alive

        self._kwargs = kwargs
        self._kwargs.setdefault("load_images", False)
        self._kwargs.setdefault("headless", True)

    def create_cookie(self):
        """
        Request the page and return the cookies it sets. May be overridden.

        @return: cookie dict, or None when one of must_contained_keys is missing
        """
        headers = {
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": user_agent.get(),
        }
        # A request without timeout can block the producer forever.
        res = requests.get(
            self._page_url,
            headers=headers,
            timeout=self._kwargs.get("timeout", 16),
        )
        cookies = requests.utils.dict_from_cookiejar(res.cookies)

        if any(key not in cookies for key in self._must_contained_keys):
            log.error("获取cookie失败 cookies = {}".format(cookies))
            return None
        return cookies

    def add_cookies(self, cookies):
        """Push one cookie dict onto the redis pool list."""
        log.info("添加cookie {}".format(cookies))
        self._redisdb.lpush(self._tab_cookie_pool, cookies)

    def _save_pool_count(self, cookie_count):
        """Store the current time and pool size as "timestamp:count"."""
        self._redisdb.strset(
            self._tab_cookie_pool_last_count,
            "{}:{}".format(time.time(), cookie_count),
        )

    def _pool_is_idle(self, cookie_count):
        """Return True when the pool size is unchanged since a sample older than 60s."""
        last_count_info = self._redisdb.strget(self._tab_cookie_pool_last_count)
        if not last_count_info:
            self._save_pool_count(cookie_count)
            return False

        last_time, last_count = last_count_info.split(":")
        last_time = float(last_time)
        last_count = int(last_count)

        if time.time() - last_time <= 60:
            return False
        if cookie_count == last_count:
            return True
        self._save_pool_count(cookie_count)
        return False

    def run(self):
        """
        Produce cookies until the pool holds ``min_cookies`` of them.

        Once the pool is full the loop exits, unless ``keep_alive`` is set, in
        which case it re-checks every 10 seconds and only exits when the pool
        size has not changed for over a minute (the pool is considered unused).
        """
        while True:
            try:
                now_cookie_count = self._redisdb.lget_count(self._tab_cookie_pool)

                if self._min_cookies - now_cookie_count > 0:
                    log.info(
                        "当前cookie数为 {} 小于 {}, 生产cookie".format(
                            now_cookie_count, self._min_cookies
                        )
                    )
                    try:
                        cookies = self.create_cookie()
                        if cookies:
                            self.add_cookies(cookies)
                        else:
                            # Nothing usable produced: pause instead of hammering the page.
                            tools.delay_time(1)
                    except Exception as e:
                        log.exception(e)
                        tools.delay_time(1)
                    continue

                log.info("当前cookie数为 {} 数量足够 暂不生产".format(now_cookie_count))

                if self._pool_is_idle(now_cookie_count):
                    log.info("近一分钟,cookie池数量无变化,判定爬虫未使用,退出生产")
                    break

                if not self._keep_alive:
                    break
                log.info("sleep 10")
                tools.delay_time(10)

            except Exception as e:
                log.exception(e)
                tools.delay_time(1)

    def get_cookie(self, wait_when_null=True):
        """
        Take a cookie from the pool, rotating it to the other end of the list.

        @param wait_when_null: when the pool is empty, produce a cookie (under a
            redis lock) and retry instead of returning an empty dict
        @return: cookie dict, or {} when the pool is empty and wait_when_null is False
        """
        while True:
            try:
                cookie_info = self._redisdb.rpoplpush(self._tab_cookie_pool)
                if not cookie_info and wait_when_null:
                    log.info("暂无cookie 生产中...")
                    # One cookie is enough to unblock the caller; do not linger.
                    self._keep_alive = False
                    self._min_cookies = 1
                    with RedisLock(
                        key=self._tab_cookie_pool, lock_timeout=3600, wait_timeout=5
                    ) as _lock:
                        if _lock.locked:
                            self.run()
                    continue
                # SECURITY NOTE(review): eval() executes whatever is stored in
                # redis; acceptable only while the pool is written by trusted code.
                return eval(cookie_info) if cookie_info else {}
            except Exception as e:
                log.exception(e)
                tools.delay_time(1)

    def del_cookie(self, cookies):
        """Remove the given cookie from the redis pool list."""
        self._redisdb.lrem(self._tab_cookie_pool, cookies)
- # PageCookiePool('cookie_1',page_url="https://www.whzbtb.com/V2PRTS/PrequalificationPublicityInfoListInit.do").create_cookie()
class User:
    """Pairs a user name with the cookie obtained for that user."""

    def __init__(self, username, cookie):
        self.username, self.cookie = username, cookie
class LoginCookiePool(CookiePoolInterface):
    """
    Cookie pool for sites that require login.

    User records (name, password, login/lock state) are read from and written
    to MongoDB; the cookies themselves are kept in a redis list.
    """

    def __init__(
        self,
        redis_key,
        *,
        table_userbase,
        login_state_key="login_state",
        lock_state_key="lock_state",
        username_key="username",
        password_key="password",
        login_retry_times=10,
    ):
        """
        @param redis_key: project name, used as prefix of the redis key
        @param table_userbase: name of the user collection
        @param login_state_key: field holding the login state
        @param lock_state_key: field holding the lock (banned) state
        @param username_key: field holding the user name
        @param password_key: field holding the password
        @param login_retry_times: how many times a login is retried on exception
        """
        self._tab_cookie_pool = "{}:l_cookie_pool".format(redis_key)
        self._login_retry_times = login_retry_times
        self._table_userbase = table_userbase
        self._login_state_key = login_state_key
        self._lock_state_key = lock_state_key
        self._username_key = username_key
        self._password_key = password_key

        self._redisdb = RedisDB()
        self._mongo = MongoDB(db='user_login')

    def create_cookie(self, username, password):
        """
        Log the user in and return the cookie. Must be overridden.

        @param username: user name
        @param password: password
        @return: cookie / None
        """
        raise NotImplementedError

    def get_user_info(self):
        """
        Fetch the users whose lock state and login state are both 0.

        @return: result of the MongoDB query; ``run`` iterates it and reads the
            user name and password from each record with ``.get``
        """
        return self._mongo.find(
            self._table_userbase,
            {self._lock_state_key: 0, self._login_state_key: 0},
        )

    def handle_login_failed_user(self, username, password):
        """
        Hook called for a user whose login failed; does nothing by default.

        @param username:
        @param password:
        @return:
        """
        pass

    def handel_exception(self, e):
        """
        Hook called when create_cookie raises; logs the exception by default.

        @param e:
        @return:
        """
        log.exception(e)

    def save_cookie(self, username, cookie):
        """Push the user's cookie onto the pool and set the login state to 1."""
        user_cookie = {"username": username, "cookie": cookie}
        self._redisdb.lpush(self._tab_cookie_pool, user_cookie)
        self._mongo.add(
            coll_name=self._table_userbase,
            data={self._login_state_key: 1},
            update_columns=self._username_key,
            update_columns_value=username,
        )

    def get_cookie(self, wait_when_null=True) -> User:
        """
        Take a user cookie from the pool, rotating it to the other end of the list.

        @param wait_when_null: when the pool is empty, run the login routine and
            retry instead of returning None
        @return: User, or None when the pool is empty and wait_when_null is False
        """
        while True:
            try:
                user_cookie = self._redisdb.rpoplpush(self._tab_cookie_pool)
                if not user_cookie and wait_when_null:
                    log.info("暂无cookie 生产中...")
                    self.login()
                    continue

                if not user_cookie:
                    return None
                # SECURITY NOTE(review): eval() executes whatever is stored in
                # redis; acceptable only while the pool is written by trusted code.
                return User(**eval(user_cookie))
            except Exception as e:
                log.exception(e)
                tools.delay_time(1)

    def del_cookie(self, user: User):
        """
        Remove an invalid cookie from the pool.

        @param user:
        @return:
        """
        user_info = {"username": user.username, "cookie": user.cookie}
        self._redisdb.lrem(self._tab_cookie_pool, user_info)
        # NOTE(review): the login state is written as 1 here, the same value
        # save_cookie writes, and get_user_info only selects state 0 — so this
        # user will not be picked for re-login. Confirm this is intended.
        self._mongo.add(
            coll_name=self._table_userbase,
            data={self._login_state_key: 1},
            update_columns=self._username_key,
            update_columns_value=user.username,
        )

    def user_is_locked(self, user: User):
        """Write lock state 1 for the given user."""
        self._mongo.add(
            coll_name=self._table_userbase,
            data={self._lock_state_key: 1},
            update_columns=self._username_key,
            update_columns_value=user.username,
        )

    def run(self):
        """
        Log in every available user and store the resulting cookies.

        Runs under a redis lock; when the lock cannot be acquired nothing is
        done. For each user, create_cookie is retried up to login_retry_times
        when it raises; a falsy cookie or exhausted retries are reported through
        handle_login_failed_user.

        @raise ValueError: when get_user_info returns a non-iterable
        """
        # collections.Iterable was removed in Python 3.10; use the abc module.
        from collections.abc import Iterable

        with RedisLock(
            key=self._tab_cookie_pool, lock_timeout=3600, wait_timeout=100
        ) as _lock:
            if not _lock.locked:
                return

            user_infos = self.get_user_info()
            if not isinstance(user_infos, Iterable):
                raise ValueError("get_user_info 返回值必须可迭代")

            if not user_infos:
                log.info("无可用用户")

            for info in user_infos:
                # Read the fields through the configured names so that custom
                # username_key / password_key values are honoured.
                username = info.get(self._username_key)
                password = info.get(self._password_key)
                for _ in range(self._login_retry_times):
                    try:
                        cookie = self.create_cookie(username, password)
                        if cookie:
                            self.save_cookie(username, cookie)
                        else:
                            self.handle_login_failed_user(username, password)
                        break
                    except Exception as e:
                        self.handel_exception(e)
                else:
                    # Every attempt raised.
                    self.handle_login_failed_user(username, password)

    login = run
@unique
class LimitTimesUserStatus(Enum):
    """Status labels recorded for a limited-use account."""

    # --- usage states ---
    USED = "used"
    SUCCESS = "success"
    OVERDUE = "overdue"  # cookie expired
    SLEEP = "sleep"
    EXCEPTION = "exception"

    # --- login states ---
    LOGIN_SUCCESS = "login_success"
    LOGIN_FALIED = "login_failed"
class LimitTimesUser:
    """
    Account with a limited number of uses.

    State is cached locally; calling from multiple processes is not supported.
    Two instances compare equal (and hash equal) when their user names match.
    """

    ACCOUNT_INFO_KEY = "accounts:h_account_info"  # redis hash storing the account info (cookies included)
    SITE_NAME = ""  # site name, used in alert messages
    LOGIN_INTERVAL = 40 * 60  # seconds that must pass after a login before logging in again

    redisdb = None

    def __init__(
        self,
        username,
        password,
        max_search_times,
        proxies=None,
        search_interval=0,
        **kwargs,
    ):
        """
        @param username:
        @param password:
        @param max_search_times:
        @param proxies:
        @param search_interval: interval between uses in seconds; either a number
            or a two-item tuple/list giving a range, e.g. (5, 10) for 5 to 10 seconds
        @param kwargs: extra values, set as instance attributes
        @raise ValueError: when search_interval is a tuple/list not of length 2
        """
        self.__dict__.update(kwargs)
        self.username = username
        self.password = password
        self.max_search_times = max_search_times
        self.proxies = proxies
        self.search_interval = search_interval
        self.delay_use = 0  # delayed use, for accounts waiting to be unblocked

        if isinstance(search_interval, (tuple, list)):
            if len(search_interval) != 2:
                raise ValueError("search_interval 需传递两个值的元组或列表。如(5,10)即5到10秒")
            longest_interval = search_interval[1]
        else:
            longest_interval = search_interval
        # Time a spider holds the cookie exclusively; other spiders cannot
        # take it over within this window.
        self.used_for_time_length = longest_interval * 5

        self.account_info = {
            "login_time": 0,
            "cookies": {},
            "search_times": 0,
            "last_search_time": 0,
            "used_for_spider_name": None,  # used by one spider only; others may not use it
            "init_search_times_time": 0,  # when the search counter was last reset
        }

        if not self.__class__.redisdb:
            self.__class__.redisdb = RedisDB()

        self.sync_account_info_from_redis()

        self.__init_metrics()

    def __init_metrics(self):
        """
        Initialise the metrics system.

        @return:
        """
        metrics.init(**setting.METRICS_OTHER_ARGS)

    def record_user_status(self, status: "LimitTimesUserStatus"):
        """Emit a counter named ``<username>:<status value>``."""
        metrics.emit_counter(f"{self.username}:{status.value}", 1, classify="users")

    def __repr__(self):
        return "<LimitTimesUser {} | cookies:{}>".format(self.username, self.cookies)

    def __eq__(self, other):
        # Comparing with a foreign type must not raise AttributeError.
        if not isinstance(other, LimitTimesUser):
            return NotImplemented
        return self.username == other.username

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable.
        return hash(self.username)

    def sync_account_info_from_redis(self):
        """Merge the account info stored in redis into the local cache."""
        account_info = self.redisdb.hget(self.ACCOUNT_INFO_KEY, self.username)
        if account_info:
            # SECURITY NOTE(review): eval() executes whatever is stored in
            # redis; acceptable only while the hash is written by trusted code.
            account_info = eval(account_info)
            self.account_info.update(account_info)

    def _save_account_info(self):
        """Write the locally cached account info back to the redis hash."""
        return self.redisdb.hset(
            self.ACCOUNT_INFO_KEY, self.username, self.account_info
        )

    @property
    def cookies(self):
        return self.account_info.get("cookies")

    def set_cookies(self, cookies):
        """Store the cookies locally and in redis."""
        self.account_info["cookies"] = cookies
        return self._save_account_info()

    def set_login_time(self, login_time=None):
        """Store the login time (now when not given) locally and in redis."""
        self.account_info["login_time"] = login_time or time.time()
        return self._save_account_info()

    def get_login_time(self):
        return self.account_info.get("login_time")

    def is_time_to_login(self):
        """Return True when more than LOGIN_INTERVAL seconds passed since the last login."""
        return time.time() - self.get_login_time() > self.LOGIN_INTERVAL

    def get_last_search_time(self):
        return self.account_info.get("last_search_time", 0)

    def is_time_to_search(self):
        """
        Return True when enough time passed since the last use.

        A pending ``delay_use`` takes precedence over ``search_interval`` and is
        cleared once it has elapsed.
        """
        elapsed = time.time() - self.get_last_search_time()

        if self.delay_use:
            is_time = elapsed > self.delay_use
            if is_time:
                self.delay_use = 0
            return is_time

        if isinstance(self.search_interval, (tuple, list)):
            interval = random.randint(*self.search_interval)
        else:
            interval = self.search_interval
        return elapsed > interval

    @property
    def used_for_spider_name(self):
        return self.account_info.get("used_for_spider_name")

    @used_for_spider_name.setter
    def used_for_spider_name(self, spider_name):
        self.account_info["used_for_spider_name"] = spider_name

    def update_status(self):
        """
        Count one use and stamp the last-use time, locally and in redis.

        @return:
        """
        self.account_info["search_times"] += 1
        self.account_info["last_search_time"] = time.time()
        return self._save_account_info()

    @property
    def search_times(self):
        """Number of uses counted; reset to 0 when the last reset is a day or more old."""
        init_search_times_time = self.account_info.get("init_search_times_time")
        current_time = time.time()
        if current_time - init_search_times_time >= 86400:
            self.account_info["search_times"] = 0
            self.account_info["init_search_times_time"] = current_time
            self._save_account_info()

        return self.account_info["search_times"]

    def is_overwork(self):
        """Return True when the use counter exceeds max_search_times."""
        # NOTE(review): with ">" the account is still handed out when the
        # counter equals max_search_times — confirm whether ">=" is intended.
        if self.search_times > self.max_search_times:
            log.warning("账号 {} 请求次数超限制".format(self.username))
            return True

        return False

    def is_at_work_time(self):
        """Return True when the local hour is within 7..22."""
        if 7 <= datetime.datetime.now().hour < 23:
            return True

        log.warning("账号 {} 不再工作时间内".format(self.username))
        return False

    def del_cookie(self):
        """Clear the cookies locally and in redis."""
        self.account_info["cookies"] = {}
        return self._save_account_info()

    def create_cookie(self):
        """
        Produce the cookie; exceptions must be raised to the caller.

        @return: cookie_dict
        """
        raise NotImplementedError

    def login(self):
        """
        Log in and store the cookie, unless the last login is too recent.

        Failures are logged, reported through send_msg and recorded as a
        login-failed status.

        @return: 1 on success, 0 on failure
        """
        try:
            # Pre-check: skip the login while the last one is still recent.
            if not self.is_time_to_login():
                log.info("此账号尚未到登陆时间: {}".format(self.username))
                time.sleep(5)
                return 0

            cookies = self.create_cookie()
            if not cookies:
                raise Exception("登陆失败 未获取到合法cookie")

            if not isinstance(cookies, dict):
                raise Exception("cookie 必须为字典格式")

            # Persist the cookie.
            self.set_login_time()
            self.set_cookies(cookies)
            log.info("登录成功 {}".format(self.username))
            self.record_user_status(LimitTimesUserStatus.LOGIN_SUCCESS)
            return 1

        except Exception as e:
            log.exception(e)
            send_msg(
                msg=f"{self.SITE_NAME} {self.username} 账号登陆异常 exception: {str(e)}",
                level="error",
                message_prefix=f"{self.SITE_NAME} {self.username} 账号登陆异常",
            )

        log.info("登录失败 {}".format(self.username))
        self.record_user_status(LimitTimesUserStatus.LOGIN_FALIED)
        return 0
class LimitTimesUserPool:
    """
    Pool of users whose number of queries is limited.

    State is cached locally; calling from multiple processes is not supported.
    """

    LOAD_USER_INTERVAL = 60  # seconds between two reloads of the available users

    def __init__(self, *, accounts_dict, limit_user_class, support_more_client=True):
        """
        @param accounts_dict: account information, keyed by user name
            {
                "15011300228": {
                    "password": "300228",
                    "proxies": {},
                    "max_search_times": 500,
                    "search_interval": 1, # interval between uses
                    # any other values to carry along
                }
            }
        @param limit_user_class: the user's LimitTimesUser subclass
        @param support_more_client: whether several clients (threads / processes)
            are supported; counts and frequency may then be slightly inaccurate
        """
        self.accounts_dict = accounts_dict
        self.limit_user_class = limit_user_class

        self.limit_times_users = []
        self.current_user_index = -1

        self.support_more_client = support_more_client

        self.last_load_user_time = 0
        self._is_show_warning = False  # whether the single-client warning was already shown

    def __load_users(self, username=None):
        """Build users from accounts_dict and add the usable ones to the pool.

        @param username: when given, only this account is considered
        """
        log.info("更新可用用户")

        for _username, detail in self.accounts_dict.items():
            if username and username != _username:
                continue

            limit_times_users = self.limit_user_class(username=_username, **detail)
            if limit_times_users in self.limit_times_users:
                continue

            if limit_times_users.is_overwork():
                continue

            # Usable when a cookie is already present or the login succeeds.
            if limit_times_users.cookies or limit_times_users.login():
                self.limit_times_users.append(limit_times_users)

        self.last_load_user_time = time.time()

    def _find_user(self, username):
        """Return the pooled user with the given user name, or None."""
        for limit_times_user in self.limit_times_users:
            if limit_times_user.username == username:
                return limit_times_user
        return None

    def get_user(
        self,
        username=None,
        used_for_spider_name=None,
        wait_when_null=True,
        not_limit_frequence=False,
    ) -> "LimitTimesUser":
        """
        Pick the next usable user, round-robin.

        @params username: only return this user
        @params used_for_spider_name: name of the spider that uses the account
            exclusively; other spiders cannot take it over during the
            exclusive window
        @params wait_when_null: whether to wait when no user is available
        @params not_limit_frequence: do not limit the usage frequency
        @return: LimitTimesUser, or None when nothing is available and
            wait_when_null is False
        """
        if not self.support_more_client and not getattr(
            self, "_is_show_warning", False
        ):
            # Shown once per pool instead of on every call.
            warnings.warn(
                "LimitTimesUserCookiePool 取查询次数等信息时基于本地做的缓存,不支持多进程或多线程",
                category=Warning,
            )
            self._is_show_warning = True

        while True:
            if (
                not self.limit_times_users
                or time.time() - self.last_load_user_time >= self.LOAD_USER_INTERVAL
            ):
                self.__load_users(username)
                if not self.limit_times_users:
                    log.warning("无可用的用户")
                    if wait_when_null:
                        time.sleep(1)
                        continue
                    return None

            self.current_user_index = (self.current_user_index + 1) % len(
                self.limit_times_users
            )
            limit_times_user = self.limit_times_users[self.current_user_index]

            if self.support_more_client:  # sync the latest data first
                limit_times_user.sync_account_info_from_redis()

            if username and limit_times_user.username != username:
                log.info(
                    "{} 为非指定用户 {}, 获取下一个用户".format(limit_times_user.username, username)
                )
                time.sleep(1)
                continue

            # Exclusive use: another spider may only take the account once the
            # exclusive window since the last use has passed.
            owner = limit_times_user.used_for_spider_name
            if owner and owner != used_for_spider_name:
                wait_time = time.time() - limit_times_user.get_last_search_time()
                if wait_time < limit_times_user.used_for_time_length:
                    log.info(
                        "用户{} 被 {} 爬虫独占,需等待 {} 秒后才可使用".format(
                            limit_times_user.username,
                            limit_times_user.used_for_spider_name,
                            limit_times_user.used_for_time_length - wait_time,
                        )
                    )
                    time.sleep(1)
                    continue

            # Evaluate each check once; both log on failure.
            overworked = limit_times_user.is_overwork()
            at_work_time = limit_times_user.is_at_work_time()

            if overworked or not at_work_time:
                self.limit_times_users.remove(limit_times_user)
                self.current_user_index -= 1

                if not at_work_time:
                    log.warning("用户 {} 不在工作时间".format(limit_times_user.username))
                    if wait_when_null:
                        time.sleep(30)
                        continue
                    return None
                continue

            if not limit_times_user.cookies:
                self.limit_times_users.remove(limit_times_user)
                # Step back so the user that slid into this slot is not skipped.
                self.current_user_index -= 1
                continue

            if not_limit_frequence or limit_times_user.is_time_to_search():
                limit_times_user.used_for_spider_name = used_for_spider_name

                limit_times_user.update_status()
                log.info("使用用户 {}".format(limit_times_user.username))
                limit_times_user.record_user_status(LimitTimesUserStatus.USED)
                return limit_times_user

            log.info("{} 用户使用间隔过短 查看下一个用户".format(limit_times_user.username))
            time.sleep(1)

    def del_user(self, username):
        """Drop the user's cookie, remove it from the pool and try to reload it."""
        limit_times_user = self._find_user(username)
        if limit_times_user is None:
            return
        limit_times_user.del_cookie()
        self.limit_times_users.remove(limit_times_user)
        limit_times_user.record_user_status(LimitTimesUserStatus.OVERDUE)
        self.__load_users(username)

    def update_cookies(self, username, cookies):
        """Replace the cookies of the pooled user with the given name."""
        limit_times_user = self._find_user(username)
        if limit_times_user is not None:
            limit_times_user.set_cookies(cookies)

    def delay_use(self, username, delay_seconds):
        """Postpone the next use of the pooled user by delay_seconds."""
        limit_times_user = self._find_user(username)
        if limit_times_user is not None:
            limit_times_user.delay_use = delay_seconds
            limit_times_user.record_user_status(LimitTimesUserStatus.SLEEP)

    def record_success_user(self, username):
        """Record a success status for the pooled user."""
        limit_times_user = self._find_user(username)
        if limit_times_user is not None:
            limit_times_user.record_user_status(LimitTimesUserStatus.SUCCESS)

    def record_exception_user(self, username):
        """Record an exception status for the pooled user."""
        limit_times_user = self._find_user(username)
        if limit_times_user is not None:
            limit_times_user.record_user_status(LimitTimesUserStatus.EXCEPTION)
- # if __name__ == '__main__':
- # cookiepool = PageCookiePool(redis_key='fwork:gszfcg',
- # page_url='http://www.ccgp-hubei.gov.cn/notice/cgyxgg/index_1.html',
- # driver_type='FIREFOX',
- # executable_path="D:\\geckodriver.exe")
- # cookiepool.create_cookie()
|