123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- import copy
- import json
- import re
- import time
- from pathlib import Path
- import execjs
- import jsbeautifier
- import requests
- from requests.utils import dict_from_cookiejar
- from config.load import node_module_path
- from jsl import jsl
def save_js_script(js_code: str, allow_beautify_code=False):
    """Write JavaScript source to ``etx.js`` in the current working directory.

    Args:
        js_code: JavaScript source text to persist.
        allow_beautify_code: When true, the text is passed through
            ``jsbeautifier.beautify`` before being written.

    The file is overwritten on every call and receives exactly one copy of
    the script: the beautified text when requested, otherwise the raw text.
    (Previously the beautified text was followed by a second, raw copy.)
    """
    content = jsbeautifier.beautify(js_code) if allow_beautify_code else js_code
    with open('etx.js', 'w', encoding='utf-8') as f:
        f.write(content)
def load_js_script():
    """Return the entire contents of ``etx.js`` (current working directory), decoded as UTF-8."""
    return Path('etx.js').read_text(encoding='utf-8')
def modify_go_func(repl_js: str, js_func: str):
    """Replace a JavaScript fragment with its ``document[...]`` assignment plus a ``return``.

    The first ``document[...] = ...;`` statement found inside ``repl_js`` is
    kept, followed by ``return document[...];`` for the same property. Every
    occurrence of ``repl_js`` inside ``js_func`` is then replaced by that
    two-statement snippet (indented by six spaces).

    Args:
        repl_js: The fragment to be replaced; must contain a
            ``document[...] = ...;`` assignment.
        js_func: The JavaScript text in which ``repl_js`` is replaced.

    Returns:
        ``js_func`` with ``repl_js`` substituted.

    Raises:
        ValueError: If ``repl_js`` contains no ``document[...] = ...;`` statement.
    """
    # Raw strings: "\[" and "\s" are invalid escape sequences in normal literals.
    assignment = re.search(r'document\[.*?\]\s{0,1}=.*?;', repl_js)
    if assignment is None:
        raise ValueError('no "document[...] = ...;" assignment found in fragment')
    document_code = assignment.group()
    # Always matches, because document_code itself starts with "document[...]".
    property_name = re.search(r'document\[.*?\]\s{0,1}?', document_code).group()
    return_back = 'return {};'.format(property_name)
    new_js = '\n{a}{b}\n{a}{c}\n{a}'.format(
        a=' ' * 6,
        b=document_code,
        c=return_back,
    )
    return js_func.replace(repl_js, new_js)
def execute_js_script(script_js: str):
    """Rewrite a challenge script so it returns its result, then run it through execjs.

    Steps, in order:
      1. Prepend a jsdom bootstrap that defines ``window`` and ``document``.
      2. Beautify the combined source with ``jsbeautifier`` (the patterns
         below depend on the resulting line layout).
      3. Replace ``alert`` with ``console.log``.
      4. Turn the ``go({...})`` invocation into ``return go({...})``.
      5. Inside the ``function go(...)`` definition, replace every
         ``setTimeout`` construct matched by the patterns with the snippet
         produced by ``modify_go_func``.
      6. Wrap everything in ``function getCookies(){...}``, compile it with
         ``cwd=node_module_path`` and call ``getCookies``.

    Args:
        script_js: JavaScript source containing a ``go`` function and a
            ``go({...})`` call.

    Returns:
        Whatever the ``getCookies`` call returns.

    Raises:
        AttributeError: If no ``go({...})`` call or no ``function go(...)``
            definition is found (``re.search`` returns ``None``).
    """
    js_header = '''
const jsdom = require("jsdom");
const { JSDOM } = jsdom;
const dom = new JSDOM(`<!DOCTYPE html><p>Hello world</p>`);
window = dom.window;
document = window.document;
'''
    # Beautify the script so the line-oriented patterns below can match.
    js_script = jsbeautifier.beautify(js_header + script_js)
    # Turn pop-up alerts into console output.
    js_script = js_script.replace('alert', 'console.log')

    # Make the go({...}) invocation return its value instead of discarding it.
    go_etx = re.search(r'go\(\{.*?\}\)', js_script, flags=re.S).group()
    js_script = js_script.replace(go_etx, 'return ' + go_etx)

    # Replace the setTimeout constructs inside the go function.
    go_func = re.search(r'function go\(.*\};', js_script, flags=re.S).group()
    # (pattern, flags) in application order. The last two need DOTALL; the
    # original author notes that the first pattern can mis-detect the third case.
    patterns = (
        (r'\n[ ]+.{10,100}\(setTimeout.*\n[ ]+document.*\n[ ]+location.*?\n.*?\n[ ]+', 0),
        (r'\n[ ]+setTimeout.*\n[ ]+.*document.*\n[ ]+location.*?\n.*?\n[ ]+', 0),
        (r'[ ]+.{10,100}\(setTimeout.*[ ]+if \(.*\) \{.*\}.*, _\w{8}\);', re.S),
        (r'[ ]+setTimeout.*_\w{8}\);', re.S),
    )
    go_func_new = go_func
    for pattern, flags in patterns:
        # The match list is computed once per pattern, before any replacement.
        for obj_js in re.findall(pattern, go_func_new, flags=flags):
            go_func_new = modify_go_func(obj_js, go_func_new)
    js_script = js_script.replace(go_func, go_func_new)

    js_script = 'function getCookies(){' + js_script + '}'
    etx = execjs.compile(js_script, cwd=node_module_path)
    return etx.call('getCookies')
def extract_clearance(js_code: str):
    """Pull the clearance value out of a cookie string.

    Searches for the span from the first ``_s=`` up to the last ``%3D`` and
    returns it with every ``_s=`` removed, or ``None`` when there is no such
    span. Example inputs:

        '__jsl_clearance_s=1641259814.553|-1|LlKSd3QgHj0KliuCI5cEMbwU7HU%3D;max-age=3600;path=/'
        '__jsl_clearance_s=1641259382.821|0|ThOeD4stO5usoh9oC0MP5%2Fx3SPc%3D'
    """
    found = re.search('_s=.*%3D', js_code)
    if found is not None:
        return found.group().replace('_s=', '')
    return None
def extract_cookies_js(js_html: str):
    """Extract the JavaScript payload from a ``<script>`` tag in the page.

    Two shapes are tried in order:
      1. ``<script>document.cookie=<expr>;location.href=location.pathname+location.search</script>``
         which yields ``<expr>``.
      2. Any ``<script>...</script>``, which yields the whole script body.

    ``.`` does not match newlines here, so the script must sit on one line.

    Args:
        js_html: Response body to search.

    Returns:
        The captured JavaScript text, or ``None`` if neither shape matches.
    """
    # Raw strings: "\." and "\+" are invalid escape sequences in normal literals.
    patterns = (
        r'<script>document\.cookie=(.*);location\.href=location\.pathname\+location\.search</script>',
        r'<script>(.*)</script>',
    )
    for pattern in patterns:
        match = re.search(pattern, js_html)
        if match is not None:
            return match.group(1)
    # Only reached after every pattern has been tried.
    return None
def http_session_521_old(url: str, headers: dict, **kwargs):
    """Fetch ``url`` and, on HTTP 521, work through the two-stage cookie challenge.

    Flow:
      1. GET ``url`` without any ``Cookie`` header. A non-521 status ends the
         flow with the cookies of that response.
      2. The first 521 body carries a ``document.cookie=...`` expression; it
         is evaluated and the resulting clearance value is sent on a second GET.
      3. The second body carries a script that is rewritten and executed by
         ``execute_js_script`` to obtain the final clearance value.

    Args:
        url: Address to request.
        headers: Request headers. A ``Cookie`` entry is left out of the
            requests; the caller's dict itself is not modified.
        **kwargs: Only ``proxies`` and ``timeout`` (default 60) are used; any
            other keyword, including ``cookies``, is ignored.

    Returns:
        A ``(ok, session, cookies)`` tuple. ``ok`` is ``False`` and
        ``cookies`` is ``None`` when a request fails, no script can be
        extracted, or no clearance value can be derived.
    """
    # Filter into a new dict rather than deleting from the caller's mapping.
    request_headers = {k: v for k, v in headers.items() if k != 'Cookie'}
    request_params = {
        'proxies': kwargs.get('proxies'),
        'timeout': kwargs.get('timeout') or 60,
    }
    http_session = requests.Session()

    try:
        resp1 = http_session.get(url, headers=request_headers, **request_params)
    except requests.RequestException:
        return False, http_session, None
    if resp1.status_code != 521:
        # No challenge was issued.
        return True, http_session, dict_from_cookiejar(resp1.cookies)

    cookies_js1 = extract_cookies_js(resp1.text)
    if cookies_js1 is None:
        return False, http_session, None
    # NOTE(review): this evaluates JavaScript taken from the remote response.
    clearance1 = extract_clearance(execjs.eval(cookies_js1))
    if clearance1 is None:
        return False, http_session, None
    resp1_cookies = dict_from_cookiejar(resp1.cookies)
    resp1_cookies['__jsl_clearance_s'] = clearance1

    try:
        resp2 = http_session.get(
            url, headers=request_headers, cookies=resp1_cookies, **request_params
        )
    except requests.RequestException:
        return False, http_session, None

    cookies_js2 = extract_cookies_js(resp2.text)
    if cookies_js2 is None:
        return False, http_session, None
    js_script = cookies_js2.replace('失败', '')
    clearance2 = extract_clearance(execute_js_script(js_script))
    if clearance2 is None:
        # Do not report success with an unusable cookie value.
        return False, http_session, None
    resp2_cookies = dict_from_cookiejar(resp2.cookies)
    resp2_cookies['__jsl_clearance_s'] = clearance2
    return True, http_session, resp2_cookies
def _iter_cookie_pairs(cookie_string):
    """Yield ``(name, value)`` for each ``name=value`` segment of a ``;``-separated string."""
    for item in cookie_string.split(";"):
        if '=' in item:
            # partition keeps any further "=" inside the value intact.
            name, _, value = item.partition("=")
            yield name, value


def create_cookie_old(page_url, headers, proxies=None, is_save_js=False):
    """Obtain cookies for ``page_url`` by solving its two JavaScript challenges.

    Each attempt:
      1. GETs the page, evaluates the ``document.cookie=...`` expression of
         the first response and merges the result with the response cookies.
      2. GETs the page again with those cookies, strips the script of its
         ``location`` / ``setTimeout`` parts, makes ``go`` return
         ``document['cookie']`` and runs it under jsdom via execjs.
      3. Stores the resulting pairs in the session (existing names are kept),
         GETs the page once more and returns the session cookies.

    Up to 9 attempts are made. Any exception prints a message, sleeps three
    seconds and starts the next attempt.

    Args:
        page_url: Address to request.
        headers: Request headers used for every request.
        proxies: Assigned to ``session.proxies``.
        is_save_js: When true, the raw second response is written to
            ``./source_code.js`` and the rewritten script to ``./clean_code.js``.

    Returns:
        A dict of session cookies, or ``None`` if every attempt failed.

    NOTE(review): all requests are sent with ``verify=False``, and JavaScript
    taken from the remote responses is executed locally.
    """
    jsdom_header = '''const jsdom = require("jsdom");
const {JSDOM} = jsdom;
const dom = new JSDOM(`<!DOCTYPE html><p>Hello world</p>`,
{
url: "https://example.org/",
referrer: "https://example.com/",
contentType: "text/html",
});
window = dom.window;
document = window.document;
location = window.location;
'''
    for _ in range(9):
        try:
            session = requests.Session()
            session.proxies = proxies
            start_url = page_url

            # Stage 1: evaluate the inline document.cookie expression.
            res = session.get(start_url, headers=headers, timeout=60, verify=False)
            js_func = "".join(re.findall("document.cookie=(.*?)location.href", res.text))
            js_func = 'function sd() { return ' + js_func + "}"
            first_cookie = execjs.compile(js_func).call("sd")
            cookie = dict(res.cookies.get_dict())
            cookie.update(_iter_cookie_pairs(first_cookie))

            # Stage 2: fetch and rewrite the obfuscated script.
            res = session.get(start_url, cookies=cookie, headers=headers, timeout=60, verify=False)
            html_str = res.content.decode()
            if "<!DOCTYPE html>" in html_str:
                # flags must be passed by keyword: the fourth positional
                # argument of re.sub is ``count``, not ``flags``.
                html_str = re.sub(
                    r"<!DOCTYPE html>[\s\S]*?</html>", "", html_str.strip(), flags=re.S
                )
            if is_save_js:
                with open('./source_code.js', 'w+', encoding='utf-8') as f:
                    f.write(html_str)

            js_do_data = "".join(re.findall(r'};go\((.*?)\)', html_str))
            js_func = re.sub("<(/*?)script>", "", html_str)
            # The order of these substitutions is significant.
            js_func = re.sub('location(.*?)}}else', "}}else", js_func)
            js_func = re.sub('location(.*?)}else', "}else", js_func)
            js_func = re.sub(r'0x5dc;}}(.*?)setTimeout,function\(\)\{', "0x5dc;}}", js_func)
            js_func = re.sub(r'0x5dc;}(.*?)setTimeout\(function\(\)\{', "0x5dc;}", js_func)
            js_func = re.sub(r'};go(.*?)\)', "return document['cookie']\n};", js_func)
            js_func = jsdom_header + js_func

            ctx = execjs.compile(js_func, cwd=node_module_path)
            if is_save_js:
                with open('./clean_code.js', 'w+', encoding='utf-8') as f:
                    f.write(js_func)

            second_cookie = ctx.call("go", json.loads(js_do_data))
            for name, value in _iter_cookie_pairs(second_cookie):
                session.cookies.setdefault(name, value)

            # Stage 3: a final request with the cookies now held by the session.
            session.get(start_url, headers=headers, timeout=60, verify=False)
            return session.cookies.get_dict()
        except Exception:
            # Best effort by design: report, back off, try again.
            print("cookie生产错误!")
            time.sleep(3)
    return None
def create_cookie(page_url, headers, **kwargs):
    """Fetch cookies through ``jsl.get_jsl_cookies``, translating its failure modes.

    Args:
        page_url: Address passed to ``jsl.get_jsl_cookies``.
        headers: Request headers passed to ``jsl.get_jsl_cookies``.
        **kwargs: Forwarded unchanged to ``jsl.get_jsl_cookies``.

    Returns:
        The value returned by ``jsl.get_jsl_cookies``, or an empty dict when
        it raises ``AssertionError``.

    Raises:
        requests.exceptions.ContentDecodingError: When ``jsl.get_jsl_cookies``
            raises ``IndexError``; the original error is attached as the cause.
    """
    try:
        return jsl.get_jsl_cookies(page_url, headers, **kwargs)
    except IndexError as exc:
        raise requests.exceptions.ContentDecodingError('jsl cookies fetch failed') from exc
    except AssertionError:
        return {}
def http_session_521(url, headers, proxies=None, storage=True):
    """Create cookies for ``url`` and optionally persist them as JSON.

    Args:
        url: Address passed to ``create_cookie`` as ``page_url``.
        headers: Request headers passed to ``create_cookie``.
        proxies: Proxy settings passed to ``create_cookie``.
        storage: When true, the cookies are dumped as JSON to
            ``config/jsl_ck.json`` two directory levels above this file.

    Returns:
        The value returned by ``create_cookie``.
    """
    jsl_cookies = create_cookie(page_url=url, headers=headers, proxies=proxies)
    if not storage:
        return jsl_cookies

    project_root = Path(__file__).parent.parent
    target = (project_root / "config/jsl_ck.json").resolve()
    target.write_text(json.dumps(jsl_cookies), encoding='utf-8')
    return jsl_cookies
|