# coding:utf-8
import os

import pdfplumber
from loguru import logger
from pdf2image import convert_from_path

from docs.config import error_number
from file_processing.file_picture import ocr
from file_processing.file_picture import ocr_patch
from file_processing.models.picture_extract import Picture
from file_processing.models.picture_extract import extract_image
from file_processing.models.table_extract import TableStruct


def get_table_block(page):
    """Find the tables on a page and wrap each one in a ``TableStruct``.

    :param page: page object; only ``page.find_tables`` is called on it.
    :return: list of parsed ``TableStruct`` objects, sorted by ``min_x``.
    """
    parse_tables = []
    for table in page.find_tables(table_settings={}):
        table_bbox = TableStruct()
        table_bbox.parse(table)
        parse_tables.append(table_bbox)
    parse_tables.sort(key=lambda item: item.min_x)
    return parse_tables


def split_block(tables, words):
    """Locate, for every table, the run of words that lies inside it.

    A word belongs to a table when its ``top`` is below the table's
    ``min_y`` and its vertical midpoint is above the table's ``max_y``.
    NOTE(review): this assumes ``TableStruct.min_y`` / ``max_y`` use the
    same top-down coordinates as the word ``top`` / ``bottom`` -- confirm.

    :param tables: objects exposing ``min_y``, ``max_y`` and ``contents``.
    :param words: list of dicts with at least ``top`` and ``bottom`` keys.
    :return: list of ``(first_index, last_index, table.contents)`` tuples,
        in the order of ``tables``. Tables that contain no word are
        skipped, because they cannot be positioned in the word stream
        (previously this case raised ``IndexError``).
    """
    replace_blocks = []
    for table in tables:
        # enumerate() yields ascending indices, so no extra sort is needed.
        inside = [
            index
            for index, word in enumerate(words)
            if table.min_y < word['top']
            and table.max_y > (word['bottom'] + word['top']) / 2
        ]
        if not inside:
            continue
        replace_blocks.append((inside[0], inside[-1], table.contents))
    return replace_blocks


def section(words: list):
    """Group consecutive entries into text chunks joined by newlines.

    An entry is concatenated (without separator) to the previous chunk
    when its ``bottom`` advances by less than half the height of the
    previous entry; otherwise it starts a new chunk.

    :param words: list of dicts with ``top``, ``bottom`` and, optionally,
        ``text`` (missing text counts as an empty string).
    :return: the chunks joined with ``"\\n"``; ``""`` for empty input.
    """
    prev_top, prev_bottom = 0, 0
    result = []
    for word in words:
        top, bottom = word['top'], word['bottom']
        text = word.get("text", "")
        same_chunk = bottom - prev_bottom < (prev_bottom - prev_top) / 2
        # ``result`` is checked so that a first entry which happens to pass
        # the test (e.g. a negative ``bottom``) cannot index an empty list.
        if result and same_chunk:
            result[-1] += text
        else:
            result.append(text)
        prev_top, prev_bottom = top, bottom
    return "\n".join(result)


class PdfRead(object):
    """Extract text from a PDF, merging in table contents and OCR'd pictures."""

    def __init__(self):
        self._images = {}
        # Directory of the PDF being read; extracted pictures and page
        # screenshots are written next to it.
        self._base_dir = ""

    def read_pdf(self, file_path: str) -> tuple:
        """Read a PDF file and return its text.

        For each page the words and tables are merged; wide pictures are
        OCR'd and merged in when they do not overlap each other. Pages
        that yield no content at all are rendered to a screenshot and
        passed to ``ocr_patch`` afterwards.

        :param file_path: path of the PDF file.
        :return: ``(text, error_number["成功"])`` where ``text`` is the
            non-empty page results joined with newlines.
        """
        self._base_dir = os.path.dirname(file_path)
        screenshots_pages = []
        results = []
        # The context manager closes the file even if a page fails to parse.
        with pdfplumber.open(file_path) as pdf:
            for page_number, page in enumerate(pdf.pages):
                # All words of the page, including the text inside tables.
                tables = get_table_block(page)
                words = page.extract_words()
                contents = self.__merge_table(tables, words)
                images = self.__extract_images(page)
                state = self.__check_images(images)
                if state and images and contents:
                    contents = self.__merge_picture(contents, images)
                elif not contents:
                    # Placeholder keeps ``results`` aligned with page indices
                    # until the screenshot OCR result replaces it below.
                    results.append(f"{page_number}->image")
                    screenshots_pages.append(page_number)
                    continue
                results.append(section(contents))

        if screenshots_pages:
            screenshots_result = self.__to_screenshots(file_path, screenshots_pages)
            # NOTE(review): ``ocr_patch`` is presumably keyed by the page
            # index passed in -- confirm against its implementation.
            ocr_result = ocr_patch(screenshots_result)
            for key, val in ocr_result.items():
                if key < len(results):
                    results[key] = val
        results = [result for result in results if result]
        return "\n".join(results), error_number["成功"]

    def __extract_images(self, page):
        """Save the page's pictures to disk and describe the wide ones.

        Only pictures wider than half the page are returned.

        :param page: page object providing ``images``, ``width`` and
            ``page_number``.
        :return: list of ``Picture`` objects.
        """
        images_result = []
        page_width = page.width
        page_number = page.page_number
        for image_number, image in enumerate(page.images):
            image_stream = image.get("stream", "")
            if not image_stream:
                continue
            image_name = f"{page_number}-{image_number}"
            image_path = extract_image(image_stream, self._base_dir, image_name)
            image_width = image.get("width", 0)
            image_high = image.get('height', 0)
            if not image_path or image_width <= page_width / 2:
                continue
            # ``top`` / ``bottom`` are measured from the top of the page, like
            # the word coordinates they are later compared with. ``y0`` /
            # ``y1`` are measured from the bottom of the page and must not be
            # mixed with them.
            picture = Picture(
                min_y=image.get("top", 0),
                max_y=image.get("bottom", 0),
                width=image_width,
                height=image_high,
                image_path=image_path,
            )
            images_result.append(picture)
        return images_result

    @staticmethod
    def __check_images(images_result):
        """Check that no two pictures overlap vertically.

        :param images_result: objects exposing ``min_y`` and ``max_y``.
        :return: ``True`` when every pair is strictly separated on the y
            axis (also for zero or one picture), ``False`` otherwise.
        """
        for position, base_img in enumerate(images_result):
            for cont_img in images_result[position + 1:]:
                base_is_first = base_img.min_y < base_img.max_y < cont_img.min_y
                base_is_second = cont_img.max_y < base_img.min_y < base_img.max_y
                if not (base_is_first or base_is_second):
                    return False
        return True

    @staticmethod
    def __merge_picture(contents, images):
        """Insert the OCR text of each picture at its vertical position.

        A picture is inserted between two neighbouring entries when the
        first ends above it and the second ends below it; otherwise it is
        appended when it lies below the last entry (or ``contents`` is
        empty). Pictures that fit neither case, e.g. one above the first
        entry, are left out. Each picture is OCR'd and inserted at most
        once.

        :param contents: list of entry dicts with ``top`` / ``bottom``;
            modified in place.
        :param images: objects exposing ``min_y``, ``max_y``, ``image_path``.
        :return: the same ``contents`` list.
        """

        def picture_entry(picture):
            # Zero-height entry at the picture's lower edge: ``section``
            # then never merges the following line into the picture text.
            return {"text": ocr(picture.image_path), 'top': picture.max_y, "bottom": picture.max_y}

        for image in sorted(images, key=lambda item: item.min_y):
            img_min_y, img_max_y = image.min_y, image.max_y
            insert_at = None
            for ind in range(len(contents) - 1):
                above = contents[ind]['bottom'] < img_min_y
                below = img_max_y < contents[ind + 1]['bottom']
                if above and below:
                    insert_at = ind + 1
                    break  # first matching gap wins; avoids duplicate inserts
            if insert_at is not None:
                contents.insert(insert_at, picture_entry(image))
            elif not contents or contents[-1]['bottom'] < img_min_y:
                contents.append(picture_entry(image))
        return contents

    @staticmethod
    def __merge_table(tables, words):
        """Replace the words inside each table with one table entry.

        :param tables: table objects as accepted by ``split_block``.
        :param words: list of word dicts with ``top`` / ``bottom``.
        :return: new list of words and table entries
            (``{"text", "top", "bottom", "type": "table"}``).
        """
        contents = []
        start = 0
        # ``tables`` may be ordered by x; the word stream must be consumed
        # in ascending index order or words get duplicated / dropped.
        blocks = sorted(split_block(tables, words), key=lambda block: block[0])
        for first, last, table_text in blocks:
            contents.extend(words[start:first])
            # Use a real y coordinate (not the word index) because consumers
            # compare ``bottom`` with picture and word coordinates. ``top``
            # equals ``bottom`` so that ``section`` does not merge the line
            # that follows the table into the table text.
            table_bottom = words[last]['bottom']
            contents.append({"text": table_text, 'top': table_bottom, "bottom": table_bottom, "type": "table"})
            # max() keeps overlapping blocks from rewinding the cursor.
            start = max(start, last + 1)
        contents.extend(words[start:])
        return contents

    def __to_screenshots(self, pdf_path, pages):
        """Render the requested pages to PNG files in the PDF's directory.

        Saving is best effort: a page whose screenshot cannot be written
        is logged and skipped.

        :param pdf_path: path of the PDF file.
        :param pages: zero-based indices of the pages to render.
        :return: list of ``(page_index, image_path)`` tuples.
        """
        wanted = set(pages)
        image_real = []
        for ind, image in enumerate(convert_from_path(pdf_path)):
            if ind not in wanted:
                continue
            image_path = os.path.join(self._base_dir, "test{}.png".format(ind))
            try:
                image.save(image_path, 'PNG')
            except Exception:
                logger.exception("Failed to save screenshot of page {} to {}", ind, image_path)
                continue
            image_real.append((ind, image_path))
        return image_real


if __name__ == '__main__':
    pdf_obj = PdfRead()
    result01 = pdf_obj.read_pdf('./docs/yc.pdf')
    print(result01)