|
@@ -2577,6 +2577,9 @@ def ensure_int64(n):
|
|
|
return bson.int64.Int64(n)
|
|
|
|
|
|
|
|
|
+######### custom ##########
|
|
|
+
|
|
|
+
|
|
|
def literal_eval(node_or_string):
|
|
|
"""
|
|
|
安全地计算表达式节点或包含Python表达式的字符串。
|
|
@@ -2606,3 +2609,58 @@ def chinese_character(text: str):
|
|
|
|
|
|
# 列表长度即是中文的字数
|
|
|
return SearchText(len(re.findall('[\u4e00-\u9fa5]', text, re.S)))
|
|
|
+
|
|
|
+
|
|
|
def repair_json(data, limit=10):
    """
    Parse a JSON-like string, applying a bounded number of textual repairs.

    The text is first evaluated as a Python literal (so single-quoted
    dict/list text is accepted) and normalised through a JSON round trip.
    If it is not a valid Python literal, it is parsed with ``json.loads``;
    after every failed parse one repair heuristic is applied and the parse
    is retried.

    Repair heuristics, tried in this order after each failed parse:
      1. drop a single trailing comma;
      2. append the ``}]`` / ``]`` closers missing for every ``[{`` opener;
      3. append the ``}`` closers missing for every ``{`` opener;
      4. replace single quotes with double quotes when the decoder asks
         for a double-quoted property name;
      5. insert the ``,`` or ``:`` the decoder reports as missing, at the
         reported error position.

    NOTE: heuristics 2-4 work on raw text, so brackets or apostrophes that
    appear inside string values are counted/replaced as well.

    @param str data: JSON (or Python-literal) text to parse
    @param int limit: upper bound on the number of ``json.loads`` attempts
    @return: the parsed object, or None when the text could not be
             repaired within ``limit`` attempts
    """
    try:
        literal = ast.literal_eval(data)
    except (SyntaxError, ValueError):
        # literal_eval raises ValueError (not SyntaxError) for the JSON
        # literals true/false/null, so both must lead to the JSON path.
        # Non-string input cannot be repaired textually: keep raising.
        if not isinstance(data, str):
            raise
    else:
        return json.loads(json.dumps(literal, ensure_ascii=False))

    for _ in range(limit):
        try:
            return json.loads(data)
        except json.JSONDecodeError as exc:
            error_pos = exc.pos
            err_msg = exc.msg

        before_repair = data
        if data.endswith(","):
            data = data[:-1]

        unclosed_lists = data.count('[{') - data.count('}]')
        unclosed_braces = data.count('{') - data.count('}')
        if unclosed_lists > 0:
            # Close '[{...}]' structures; when the text already ends with
            # '}', the first closer only needs the ']'.
            first_closer = ']' if data.endswith("}") else '}]'
            data += first_closer + '}]' * (unclosed_lists - 1)
        elif unclosed_braces > 0:
            data += '}' * unclosed_braces
        elif 'Expecting property name enclosed in double quotes' == err_msg:
            data = data.replace("'", '"')
            try:
                data = json.dumps(ast.literal_eval(data), ensure_ascii=False)
            except (SyntaxError, ValueError):
                # Still not a Python literal; keep the quote-replaced text
                # and let the next json.loads attempt judge it.
                pass
        elif 'delimiter' in err_msg:
            # Decoder messages look like "Expecting ',' delimiter".
            found = re.search('Expecting(.*?)delimiter', err_msg)
            if found is not None:
                delimiter = found.group(1).strip().replace("'", '')
                data = "".join([data[:error_pos], delimiter, data[error_pos:]])

        if data == before_repair:
            # No heuristic changed the text; further attempts would fail
            # in exactly the same way.
            break

    return None
|