123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
import time

try:
    # The ABC aliases were removed from ``collections`` in Python 3.10;
    # ``collections.abc`` is the supported location.
    from collections.abc import Iterable
except ImportError:  # pragma: no cover - legacy interpreters only
    from collections import Iterable

from func_timeout import func_set_timeout

import feapder.utils.tools as tools
from feapder.db.mongodb import MongoDB
from feapder.db.redisdb import RedisDB
from feapder.network.cookie_pool import (
    CookiePoolInterface,
    PageCookiePool,
    User,
)
from feapder.utils.log import log
from feapder.utils.redis_lock import RedisLock
from feapder.utils.tools import get_current_date
# Public names of this module: two helpers re-exported from
# feapder.network.cookie_pool plus the login-based pool defined below.
__all__ = ["PageCookiePool", "User", "LoginCookiePool"]
class LoginCookiePool(CookiePoolInterface):
    """
    Cookie pool for sites that require a login.

    User records (account name, password, state flags) are read from
    MongoDB; the cookies produced by logging in are queued in a Redis list.
    Subclasses must implement ``create_cookie``.
    """

    def __init__(
        self,
        redis_key,
        *,
        login_site,
        table_userbase="feapder_login",
        table_login_record="feapder_login_record",
        login_state_key="login_state",
        lock_state_key="lock_state",
        username_key="username",
        password_key="password",
        login_retry_times=10,
    ):
        """
        @param redis_key: project name; used as prefix of the Redis list key
        @param login_site: name of the target web site
        @param table_userbase: name of the collection holding the users
        @param table_login_record: name of the collection holding login records
        @param login_state_key: column name of the login state
        @param lock_state_key: column name of the lock (banned) state
        @param username_key: column name of the login name
        @param password_key: column name of the password
        @param login_retry_times: number of login attempts per user
        """
        self._tab_cookie_pool = "{}:l_cookie_pool".format(redis_key)
        self._login_retry_times = login_retry_times
        self._table_userbase = table_userbase
        self._table_login_record = table_login_record
        self._login_state_key = login_state_key
        self._lock_state_key = lock_state_key
        self._username_key = username_key
        self._password_key = password_key
        self._login_site = login_site

        self._redisdb = RedisDB()
        self._mongo = MongoDB(db='user_login')

    def create_cookie(self, username, password):
        """
        Log in and build the cookie for one user. Must be overridden.
        @param username: login name
        @param password: password
        @return: the cookie, or None when the login did not yield one
        """
        raise NotImplementedError

    def get_user_info(self):
        """
        Query the user collection for users of this site whose lock state
        and login state are both 0.
        @return: the result of the MongoDB ``find`` call; ``run`` requires it
                 to be iterable and reads the username / password columns
                 from every item
        """
        query = {
            "site": self._login_site,
            self._lock_state_key: 0,
            self._login_state_key: 0,
        }
        return self._mongo.find(self._table_userbase, query)

    def handle_login_failed_user(self, username, password):
        """
        Hook called for a user whose login failed. Does nothing by default.
        @param username: login name
        @param password: password
        @return:
        """
        pass

    def handel_exception(self, e):
        """
        Hook called with every exception raised during a login attempt.
        Logs the exception by default.
        @param e: the exception
        @return:
        """
        log.exception(e)

    def save_cookie(self, username, cookie):
        """
        Push the cookie onto the Redis pool and write a "create" login record.
        @param username: login name the cookie belongs to
        @param cookie: the cookie returned by ``create_cookie``
        @return:
        """
        user_cookie = {"username": username, "cookie": cookie}
        self._redisdb.lpush(self._tab_cookie_pool, user_cookie)

        login_time = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(int(round(time.time())))
        )
        self._mongo.add(
            coll_name=self._table_login_record,
            data={
                self._login_state_key: 1,
                "status": "create",
                "site": self._login_site,
                "login_time": login_time,
            },
            update_columns=self._username_key,
            update_columns_value=username,
        )

    # NOTE(review): presumably aborts the call after 60 seconds - confirm
    # against the func_timeout documentation.
    @func_set_timeout(60)
    def get_cookie(self, wait_when_null=True) -> User:
        """
        Fetch one user cookie from the Redis pool.

        Any exception raised while fetching or logging in is logged and the
        attempt is repeated after a delay.
        @param wait_when_null: when the pool is empty, run ``login`` and try
                               again instead of returning None
        @return: a ``User`` built from the stored entry, or None when the
                 pool is empty and ``wait_when_null`` is false
        """
        while True:
            try:
                user_cookie = self._redisdb.rpoplpush(self._tab_cookie_pool)
                if not user_cookie and wait_when_null:
                    log.info("暂无cookie 生产中...")
                    self.login()
                    continue

                if user_cookie:
                    # NOTE(security): eval() executes whatever string is
                    # stored under the pool key. This is only safe while
                    # Redis is trusted; consider a non-executing parser
                    # (json / ast.literal_eval) if the stored format allows.
                    user_cookie = eval(user_cookie)
                    return User(**user_cookie)

                return None
            except Exception as e:
                log.exception(e)
                tools.delay_time(1)

    def del_cookie(self, user: User):
        """
        Remove an invalid cookie from the Redis pool and write a "remove"
        login record.
        @param user: the user whose cookie is to be removed
        @return:
        """
        user_info = {"username": user.username, "cookie": user.cookie}
        self._redisdb.lrem(self._tab_cookie_pool, user_info)

        self._mongo.add(
            coll_name=self._table_login_record,
            data={
                self._login_state_key: 1,
                "status": "remove",
                "site": self._login_site,
                "login_time": get_current_date(),
            },
            update_columns=self._username_key,
            update_columns_value=user.username,
        )

    def user_is_locked(self, user: User):
        """
        Write a login record that sets the lock state of the user to 1.
        @param user: the locked user
        @return:
        """
        self._mongo.add(
            coll_name=self._table_login_record,
            data={
                self._lock_state_key: 1,
                "site": self._login_site,
                "login_time": get_current_date(),
            },
            update_columns=self._username_key,
            update_columns_value=user.username,
        )

    def run(self):
        """
        Log in every user returned by ``get_user_info`` and store the cookies.

        The work is done only if the Redis lock on the pool key reports it is
        held. Per user, ``create_cookie`` is tried up to
        ``login_retry_times`` times: a truthy cookie is saved, a falsy one is
        reported through ``handle_login_failed_user`` without retrying, and
        an exception is passed to ``handel_exception`` and retried. When every
        attempt raised, ``handle_login_failed_user`` is called once.
        @raise ValueError: when ``get_user_info`` returns a non-iterable
        """
        with RedisLock(
            key=self._tab_cookie_pool, lock_timeout=3600, wait_timeout=100
        ) as _lock:
            if _lock.locked:
                user_infos = self.get_user_info()
                if not isinstance(user_infos, Iterable):
                    raise ValueError("get_user_info 返回值必须可迭代")

                if not user_infos:
                    log.info("无可用用户")

                for info in user_infos:
                    # Read the credentials through the configured column
                    # names instead of hard-coded "username" / "password",
                    # so that username_key / password_key are honoured.
                    username = info.get(self._username_key)
                    password = info.get(self._password_key)
                    for _ in range(self._login_retry_times):
                        try:
                            cookie = self.create_cookie(username, password)
                            if cookie:
                                self.save_cookie(username, cookie)
                            else:
                                self.handle_login_failed_user(
                                    username, password
                                )

                            # A completed attempt (cookie or not) ends the
                            # retries; only exceptions trigger another try.
                            break
                        except Exception as e:
                            self.handel_exception(e)

                    else:
                        # Every attempt raised an exception.
                        self.handle_login_failed_user(username, password)

    # get_cookie() refills an empty pool through this alias.
    login = run
|