123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- # -*- coding: utf-8 -*-
- """
- Created on 2024-10-10
- ---------
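- @summary: Log in to vip.qianlima.com through a DrissionPage-driven Chromium browser and save each account's cookies, request headers and proxy settings to account/<username>.json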
- ---------
- @author: Dzr
- """
- import json
- from pathlib import Path
- from DrissionPage import ChromiumPage, ChromiumOptions
- from DrissionPage._functions.tools import PortFinder
# (username, password) pairs; the __main__ entry point logs in each one in order.
# NOTE(review): these credentials are committed in plain text -- consider
# loading them from the environment or a secrets store instead.
account_pool = [("18530014520", "qp!4LXH_")]
def auto_login(username, password, proxy=False, headless=False, auto_quit=False, accident_url=None):
    """Log in to vip.qianlima.com and save the session to ``account/<username>.json``.

    A Chromium instance is started with a per-user profile directory
    (``./chrome/<username>``) and download directory (``./download/<username>``).
    After the login form is submitted, the first captured request matching one
    of the listened targets supplies the request headers; these are stored
    together with the browser cookies and the proxy settings as JSON next to
    this source file.

    Args:
        username: Account name typed into the login form; also used to name
            the profile directory, download directory and session file.
        password: Password typed into the login form.
        proxy: When truthy, route the browser through the built-in SOCKS5
            proxy and record that proxy in the session file.
        headless: When truthy, start Chromium with ``--headless=new``.
        auto_quit: When falsy, keep the browser open until the operator
            enters any non-empty text at the quit prompt.
        accident_url: Optional URL that is (re)opened until the operator
            answers ``Y``, so an unexpected page can be handled by hand.

    Returns:
        None. The function returns early, without writing a session file,
        when the start page fails to load or the login is not confirmed.
    """
    proxy_url = 'socks5://58.221.59.179:8860'
    user_agent = (
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36'
    )

    co = ChromiumOptions()
    port, _ = PortFinder(path='./').get_port()
    co.set_paths(
        local_port=port,
        user_data_path=f'./chrome/{username}',
        download_path=f'./download/{username}',
    )
    # Author's stated intent: suppress the "save password" popup.
    # NOTE(review): the flag itself disables browser extensions -- confirm it
    # really has the intended effect.
    co.set_argument('--disable-extensions')

    if proxy:
        # Key order is kept as-is because this dict is serialised into the session file.
        proxies = {'https': proxy_url, 'http': proxy_url}
        co.set_argument('--proxy-server', value=proxies['https'])
    else:
        proxies = None

    if headless:
        co.set_argument('--headless', value='new')

    co.set_user_agent(user_agent)

    page = ChromiumPage(addr_or_opts=co)
    try:
        targets = [
            'website-seo/v2/cm/getcatid',
            'rest/detail/alltypesdetail/detail',
            'rest/account/companySpace/checkNewUser',
        ]
        # Start listening before navigating so requests fired during login are captured.
        page.listen.start(targets=targets, res_type=['Document', 'XHR'])

        if not page.get('https://vip.qianlima.com/'):
            return

        # The login form is only filled in when the login button shows up in time.
        if page.wait.ele_displayed('x://span[text()="登录"]', timeout=5):
            page.ele('x://input[@name="username"]').input(username, clear=True)
            page.ele('x://input[@name="password"]').input(password, clear=True)
            page.ele('x://span[text()="登录"]/parent::*').click()

            # The page is treated as loaded once the user name is displayed.
            if not page.wait.ele_displayed(f'x://p[contains(text(), "{username}")]'):
                print(f'登录失败>{username}')
                return

        # Take the headers from the first captured request that matches a target.
        packet = page.listen.wait()
        print(packet.url)
        headers = dict(packet.request.headers)
        print(f'** headers ** \n{json.dumps(headers, indent=4)}')
        cookies = page.cookies(as_dict=True)
        print(f'** cookies ** \n{json.dumps(cookies, indent=4)}')
        user = {
            'cookies': cookies,
            'headers': headers,
            'proxies': proxies,
        }

        # Write the session only after all data has been collected, and close
        # the file right away: a failure above can no longer truncate an
        # existing session file, and no handle stays open during the prompts.
        account_dir = Path(__file__).parent / 'account'
        account_dir.mkdir(exist_ok=True)
        session_file = (account_dir / f'{username}.json').absolute()
        session_file.write_text(json.dumps(user, indent=4), encoding='utf-8')

        if accident_url is not None:
            # Re-open the URL until the operator confirms the problem is handled.
            while True:
                page.get(accident_url)
                if input('异常已处理?[Y|N]').upper() == 'Y':
                    break

        if not auto_quit:
            # Keep the browser alive until the operator types anything non-empty.
            while not input("退出? >>>"):
                pass
    except KeyboardInterrupt:
        # Ctrl-C is an accepted way to leave; the browser is still closed below.
        pass
    finally:
        page.quit()
        print('关闭浏览器')
if __name__ == '__main__':
    # Log in every configured account in turn: through the proxy, with a
    # visible browser window, waiting for the operator before closing.
    for login_name, secret in account_pool:
        auto_login(login_name, secret, proxy=True, headless=False, auto_quit=False)
|