# -*- coding: utf-8 -*- """ Created on 2024-10-10 --------- @summary: --------- @author: Dzr """ import json from pathlib import Path from DrissionPage import ChromiumPage, ChromiumOptions from DrissionPage._functions.tools import PortFinder account_pool = [ ('18530014520', 'qp!4LXH_'), ] def auto_login(username, password, proxy=False, headless=False, auto_quit=False, accident_url=None): co = ChromiumOptions() port, _ = PortFinder(path='./').get_port() co.set_paths( local_port=port, user_data_path=f'./chrome/{username}', download_path=f'./download/{username}' ) # 禁用密码保存弹窗 co.set_argument('--disable-extensions') if proxy: proxies = { 'https': 'socks5://58.221.59.179:8860', 'http': 'socks5://58.221.59.179:8860' } co.set_argument('--proxy-server', value=proxies['https']) else: proxies = None if headless: co.set_argument('--headless', value='new') co.set_user_agent('Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36') page = ChromiumPage(addr_or_opts=co) try: targets = [ 'website-seo/v2/cm/getcatid', 'rest/detail/alltypesdetail/detail', 'rest/account/companySpace/checkNewUser' ] page.listen.start(targets=targets, res_type=['Document', 'XHR']) # 开启路由监听 success = page.get('https://vip.qianlima.com/') # 访问用户管理界面 if not success: return login = page.wait.ele_displayed('x://span[text()="登录"]', timeout=5) if login: page.ele('x://input[@name="username"]').input(username, clear=True) page.ele('x://input[@name="password"]').input(password, clear=True) page.ele('x://span[text()="登录"]/parent::*').click() loaded = page.wait.ele_displayed(f'x://p[contains(text(), "{username}")]') # 等待页面加载完成 if not loaded: print(f'登录失败>{username}') return # page.get('http://www.qianlima.com/zb/detail/20241016_454396207.html') packet = page.listen.wait() root = Path(__file__).parent if not (root / 'account').exists(): (root / 'account').mkdir(exist_ok=True) file = (root / f'account/{username}.json').absolute() with open(file, 'w') as f: print(packet.url) # 打印数据包url # print(packet.response.body) headers = dict(packet.request.headers) print(f'** headers ** \n{json.dumps(headers, indent=4)}') cookies = page.cookies(as_dict=True) print(f'** cookies ** \n{json.dumps(cookies, indent=4)}') user = { 'cookies': cookies, 'headers': headers, 'proxies': proxies } f.write(json.dumps(user, indent=4)) if accident_url is not None: while True: page.get(accident_url) # 人工处理意外情况 if input('异常已处理?[Y|N]').upper() == 'Y': break if not auto_quit: f.flush() while True: if input("退出? >>>"): break except KeyboardInterrupt: pass finally: page.quit() print('关闭浏览器') if __name__ == '__main__': for username, password in account_pool: auto_login(username, password, proxy=True, auto_quit=False, headless=False)