123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- # coding:utf-8
- from tables.ai import org_ner
- from tables import clear_spacing
- from html_table_extractor.extractor import Extractor
- from tables import CatchContentObject, fsc
- from util.sensitive_word import AcAutomation
- from tables import match_company_index, key_value_header
- from tables import tfc_object
- from docs.config import ai2config, abnormal_config
- from docs.config import DEBUG
- import csv
- import re
# Sentence delimiters used to split a paragraph before entity recognition.
pattern = r',|。|\?|!|;'


class WinnerChecker(object):
    """Consistency checks for the extracted "winning bidder" field.

    Each check receives the extracted winner string and reports ``True`` when
    the value looks abnormal and ``False`` when it could be validated.
    """

    # Attachments longer than this many characters are skipped entirely.
    MAX_ATTACHMENT_CHARS = 300000

    def __init__(self):
        """Register the active checks and load the winner-header keyword corpus."""
        self.errors_tables = {
            # Disabled checks; the implementations are kept below and can be
            # re-registered by uncommenting the entries.
            # "0101": {
            #     "name": "实体识别",
            #     "parent_name": "名称错误",
            #     "parent_code": "01",
            #     "checkFn": self.check0101
            # },
            # "0201": {
            #     "name": "看数据的标签是不是之中标单位",
            #     "parent_name": "数据标签错误",
            #     "parent_code": "02",
            #     "checkFn": self.check0201
            # },
            "0103": {
                "name": "包含叠词,异常词汇,特殊词汇",
                "parent_name": "名称错误",
                "parent_code": "01",
                "checkFn": self.check0103
            }
        }
        self.winner_ac = AcAutomation()
        with open(ai2config["table_field_config"]["corpus_path"], "r") as f:
            for row in csv.reader(f):
                # Blank or single-column rows cannot carry a label; skip them
                # instead of failing with IndexError.
                if len(row) < 2:
                    continue
                word = row[0].strip()
                if word and "中标单位" in row[1]:
                    self.winner_ac.add_word(word)

    @staticmethod
    def _company_regex(companies):
        """Return a compiled alternation matching any company name literally."""
        return re.compile("|".join(re.escape(name) for name in companies))

    @staticmethod
    def _remove_companies(companies, matched):
        """Remove every name in *matched* from *companies* in place, if present."""
        for name in matched:
            if name in companies:
                companies.remove(name)

    def intention_check(self, header):
        """Tell whether *header* denotes the winning bidder.

        The keyword corpus is consulted first; the ``tfc_object`` classifier is
        used as a fallback.

        :param header: header / label text to classify
        :return: True when the header is recognised as a winner label,
            otherwise False
        """
        if header in self.winner_ac.catch:
            if DEBUG:
                print(f"中标单位意图:::>> **{header}**==> [中标单位]")
            return True
        tags = tfc_object.predict([header])
        if tags and "中标单位" in tags[0]:
            if DEBUG:
                print(f"中标单位意图:::>> **{header}**==> {tags}")
            return True
        return False

    def winner_intention_table(self, tables: list, companies):
        """Validate companies against the headers of a parsed table.

        For every cell that contains one or more of the pending companies, the
        cell to its left and the cell above it are treated as candidate
        headers. When a header is recognised as a winner label, the companies
        found in that cell are removed from *companies* (in place).

        :param tables: table as a list of rows, each a list of cell strings
        :param companies: pending company names; mutated in place
        :return: False once every company has been validated, otherwise the
            list of companies still pending
        """
        if not companies:
            return False
        for row_ind, row in enumerate(tables):
            for col_ind, column in enumerate(row):
                # Names are escaped so "(", ")" and "." in company names are
                # matched literally rather than as regex syntax.
                found = self._company_regex(companies).findall(column)
                if found:
                    # Header candidate 1: the cell to the left.
                    if col_ind > 0 and self.intention_check(row[col_ind - 1]):
                        # Only drop the companies seen in this cell; removing
                        # from the list while iterating it skips entries.
                        self._remove_companies(companies, found)
                        if not companies:
                            return False
                    # Header candidate 2: the cell directly above.
                    if row_ind > 0 and len(tables[row_ind - 1]) > col_ind:
                        if self.intention_check(tables[row_ind - 1][col_ind]):
                            self._remove_companies(companies, found)
                            if not companies:
                                return False
                # The cell itself may hold "label: company" style text.
                if self.winner_ac.search(column):
                    companies = self.winner_intention_content(companies, column)
                    if not companies:
                        return False
        return companies

    def winner_intention_content(self, companies, column):
        """Validate companies against label text found around them in free text.

        :param companies: pending company names; mutated in place
        :param column: text to inspect
        :return: False once every company has been validated, otherwise the
            list of companies still pending
        """
        # (start, end, company_name) triples for the companies found in the text.
        indexes = match_company_index(companies, column)
        if not indexes:
            return companies
        prev_end = 0
        for start, end, company_name in indexes:
            if company_name not in companies:
                prev_end = end
                continue
            # Look at up to 10 characters before the name (never reaching back
            # past the previous match) and 10 characters after it.
            window_start = max(prev_end, start - 10)
            text_ = column[window_start:end + 10]
            prev_end = end
            for val, _ind in key_value_header(text_):
                if self.intention_check(val):
                    # list.pop() takes an index, not a value; remove by value.
                    companies.remove(company_name)
                    if not companies:
                        return False
                    break
        return companies

    @staticmethod
    def check_company_name(contents: list, companies: list):
        """Validate companies by running entity recognition on the text.

        Tables are flattened into their cells, other paragraphs have spaces
        removed; every sentence mentioning a pending company is passed to
        ``org_ner`` and each recognised organisation is removed from
        *companies* (in place).

        :param contents: document split into paragraphs / table HTML fragments
        :param companies: pending company names; mutated in place
        :return: False once every company has been validated, otherwise the
            list of companies still pending
        """
        if not companies:
            return False
        segments = []
        for con in contents:
            if "<table" in con:
                table = Extractor(con).parse()
                rows = clear_spacing(table.return_list())
                for row in rows:
                    segments.extend(row)
            else:
                segments.append(con.replace(" ", ""))
        for text in segments:
            # re.escape covers every metacharacter, not just "(", ")" and ".".
            company_re = re.compile("|".join(re.escape(name) for name in companies))
            for sentence in re.split(pattern, text):
                if not company_re.search(sentence):
                    continue
                for company in org_ner(sentence):
                    if company in companies:
                        if DEBUG:
                            print(f"中标单位实体识别:::>> **{text}**==> {company}")
                        companies.remove(company)
                        if not companies:
                            return False
        return companies

    def check_intention(self, contents: list, companies: list):
        """Validate companies by the label ("intention") attached to them.

        :param contents: document split into paragraphs / table HTML fragments
        :param companies: pending company names; mutated in place
        :return: False once every company has been validated, otherwise the
            list of companies still pending
        """
        for content in contents:
            if "<table" in content:
                table = Extractor(content).parse()
                _tables = clear_spacing(table.return_list())
                _table_str = str(_tables)
                # Cheap pre-filter: only walk the table if a pending company
                # occurs in it. Substring tests avoid regex-escaping issues.
                if any(name in _table_str for name in companies):
                    companies = self.winner_intention_table(_tables, companies)
                    if not companies:
                        return False
                continue
            # Plain (non-table) text.
            companies = self.winner_intention_content(companies, content)
            if not companies:
                return False
        return companies

    def _check_announcement_and_attachments(self, check_fn, winner, detail, attach_text, catch_content) -> bool:
        """Run *check_fn* over the announcement, then over each attachment.

        :param check_fn: callable ``(contents, companies)`` returning False
            when all companies are validated, else the pending list
        :param winner: winner field, multiple names separated by ","
        :param detail: announcement body
        :param attach_text: parsed attachment metadata (nested dicts whose
            leaves may carry an ``attach_url``)
        :param catch_content: per-record cache used to split documents
        :return: True when at least one company could not be validated
        """
        companies = [company for company in winner.split(",") if company]
        contents = catch_content.public_attachment_catch(detail, platform="html", document_id="公告")
        companies = check_fn(contents, companies)
        if not companies:
            return False
        for attach_content in (attach_text or {}).values():
            if not attach_content:
                continue
            for topic_detail in attach_content.values():
                attach_url = topic_detail.get("attach_url", "")
                if not attach_url:
                    continue
                st, content = fsc.download_text_content(attach_url)
                # Skip failed downloads and empty documents.
                if not st or not content.strip():
                    continue
                # Over-long attachments are not processed.
                if len(content) > self.MAX_ATTACHMENT_CHARS:
                    continue
                contents = catch_content.public_attachment_catch(content, platform="attach",
                                                                 document_id=attach_url)
                companies = check_fn(contents, companies)
                if not companies:
                    return False
        return True

    def check0101(self, winner: str, detail: str, attach_text: dict, catch_content: "CatchContentObject") -> bool:
        """Check that every winner is recognised as an organisation in the text.

        :param winner: winner field, multiple names separated by ","
        :param detail: announcement body
        :param attach_text: parsed attachment metadata
        :param catch_content: per-record cache used to split documents
        :return: True means abnormal
        """
        return self._check_announcement_and_attachments(
            self.check_company_name, winner, detail, attach_text, catch_content)

    def check0201(self, winner: str, detail: str, attach_text: dict, catch_content: "CatchContentObject") -> bool:
        """Check that every winner appears under a winner label in the text.

        :param winner: winner field, multiple names separated by ","
        :param detail: announcement body
        :param attach_text: parsed attachment metadata
        :param catch_content: per-record cache used to split documents
        :return: True means abnormal
        """
        return self._check_announcement_and_attachments(
            self.check_intention, winner, detail, attach_text, catch_content)

    @staticmethod
    def _first_column_words(path):
        """Return the non-empty first-column values of the CSV file at *path*."""
        with open(path, "r") as f:
            # Blank rows and empty cells are dropped: an empty word would
            # match every winner.
            return [row[0] for row in csv.reader(f) if row and row[0]]

    def check0103(self, winner: str) -> bool:
        """Check the winner name against the abnormal-word lists.

        Three CSV word lists are consulted in order: words the name must not
        start with, words it must not contain, and words it must not end with.
        Prefix and suffix entries are interpreted as regular-expression
        fragments; "contains" entries are plain substrings.

        :param winner: winner name to inspect
        :return: True means abnormal
        """
        paths = abnormal_config["table_field_config"]
        # Name starts with an abnormal word.
        for word in self._first_column_words(paths["path1"]):
            if re.match("^" + word, winner):
                return True
        # Name contains an abnormal word.
        for word in self._first_column_words(paths["path2"]):
            if word in winner:
                return True
        # Name ends with an abnormal word.
        for word in self._first_column_words(paths["path3"]):
            if re.search(f"{word}$", winner):
                return True
        return False
|