123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- # coding:utf-8
- import pdfplumber
- from file_processing.models.picture_extract import extract_image
- from file_processing.models.table_extract import TableStruct
- from loguru import logger
- from docs.config import error_number
- from file_processing.file_picture import ocr
- from file_processing.file_picture import ocr_patch
- from file_processing.models.picture_extract import Picture
- import os
- from pdf2image import convert_from_path
def get_table_block(page):
    """
    Find the tables on a page and wrap each one in a ``TableStruct``.

    :param page: page object exposing ``find_tables`` (called with empty table settings)
    :return: list of parsed ``TableStruct`` objects, ordered by ascending ``min_x``
    """
    parsed = []
    for raw_table in page.find_tables(table_settings={}):
        # One structure object per detected table.
        struct = TableStruct()
        struct.parse(raw_table)
        parsed.append(struct)
    parsed.sort(key=lambda item: item.min_x)
    return parsed
def split_block(tables, words):
    """
    Work out which run of words each table covers, by comparing y coordinates.

    A word belongs to a table when its ``top`` lies below the table's
    ``min_y`` and its vertical midpoint lies above the table's ``max_y``.

    :param tables: table objects exposing ``min_y``, ``max_y`` and ``contents``
    :param words: list of word dicts carrying ``top`` and ``bottom`` keys
    :return: list of ``(first_word_index, last_word_index, table.contents)``
        tuples, one per table that covers at least one word, in the order of
        ``tables``
    """
    replace_blocks = []
    for table in tables:
        # enumerate() yields ascending indices, so the list is already sorted.
        covered = [
            index
            for index, word in enumerate(words)
            if table.min_y < word['top']
            and table.max_y > (word['bottom'] + word['top']) / 2
        ]
        if not covered:
            # A table without any word inside it has nothing to replace;
            # indexing the empty list would raise IndexError.
            continue
        replace_blocks.append((covered[0], covered[-1], table.contents))
    return replace_blocks
def section(words: list):
    """
    Group the words of a PDF page into paragraphs.

    A word is glued onto the current paragraph when its ``bottom`` is less
    than half of the previous word's height below the previous word's
    ``bottom``; otherwise it starts a new paragraph.

    :param words: list of dicts carrying ``top`` and ``bottom`` and,
        optionally, ``text`` (a missing ``text`` counts as an empty string)
    :return: the paragraphs joined with newlines (``""`` for no words)
    """
    paragraphs = []
    prev_top, prev_bottom = 0, 0  # y extent of the previously seen word
    for word in words:
        top, bottom = word['top'], word['bottom']
        same_paragraph = bottom - prev_bottom < (prev_bottom - prev_top) / 2
        # The very first word must always open a paragraph: with the (0, 0)
        # initial state a negative ``bottom`` would otherwise try to extend a
        # paragraph that does not exist yet.
        if paragraphs and same_paragraph:
            paragraphs[-1] += word.get("text", "")
        else:
            paragraphs.append(word.get("text", ""))
        prev_top, prev_bottom = top, bottom
    return "\n".join(paragraphs)
class PdfRead(object):
    """Extract the text of a PDF, merging tables and OCR-ing large images and text-less pages."""

    def __init__(self):
        self._images = {}
        # Directory of the PDF being read; extracted images and page
        # screenshots are written here.
        self._base_dir = ""

    def read_pdf(self, file_path: str) -> tuple:
        """
        Read a PDF and return its text.

        Pages that yield no words and no tables are rendered to images and
        passed to ``ocr_patch`` after the document has been closed.

        :param file_path: path of the PDF file
        :return: ``(text, error_number["成功"])`` where ``text`` is the
            non-empty page results joined with newlines
        """
        screenshots_pages = []
        self._base_dir = os.path.dirname(file_path)
        results = []
        # The context manager closes the document even when a page fails.
        with pdfplumber.open(file_path) as pdf:
            for page_number, page in enumerate(pdf.pages):
                # All words of the page, including the text inside tables.
                tables = get_table_block(page)
                words = page.extract_words()
                contents = self.__merge_table(tables, words)
                images = self.__extract_images(page)
                state = self.__check_images(images)
                if state and images and contents:
                    contents = self.__merge_picture(contents, images)
                elif not contents:
                    # Placeholder keeps ``results`` aligned with page indices
                    # so the OCR text can be written back by position.
                    results.append(f"{page_number}->image")
                    screenshots_pages.append(page_number)
                    continue
                results.append(section(contents))
        if screenshots_pages:
            screenshots_result = self.__to_screenshots(file_path, screenshots_pages)
            # NOTE(review): presumably ocr_patch returns {page_index: text};
            # pages missing from it keep their "N->image" placeholder - confirm.
            ocr_result = ocr_patch(screenshots_result)
            for key, val in ocr_result.items():
                if key < len(results):
                    results[key] = val
        results = [result for result in results if result]
        return "\n".join(results), error_number["成功"]

    def __extract_images(self, page):
        """
        Extract the images of a page that are wider than half the page.

        Every image with a stream is passed to ``extract_image``; only those
        that were saved and are wide enough are returned.

        :param page: page object exposing ``images``, ``width`` and ``page_number``
        :return: list of ``Picture`` objects
        """
        images_result = []
        page_width = page.width
        page_number = page.page_number
        for image_number, image in enumerate(page.images):
            image_stream = image.get("stream", "")
            if not image_stream:
                continue
            image_name = f"{page_number}-{image_number}"
            image_path = extract_image(image_stream, self._base_dir, image_name)
            image_width = image.get("width", 0)
            image_high = image.get('height', 0)
            if image_path and image_width > page_width / 2:
                # NOTE(review): pdfplumber documents y0/y1 as measured from the
                # bottom of the page, while word top/bottom are measured from
                # the top - confirm these are comparable in __merge_picture.
                attr = {"min_y": image.get("y0", 0), "max_y": image.get("y1", 0),
                        "width": image_width, "height": image_high, "image_path": image_path}
                images_result.append(Picture(**attr))
        return images_result

    @staticmethod
    def __check_images(images_result):
        """
        Check that no two images overlap vertically.

        :param images_result: objects exposing ``min_y`` and ``max_y``
        :return: ``True`` when every pair is vertically separated (also for
            zero or one image), ``False`` otherwise
        """
        for base_ind, base_img in enumerate(images_result):
            for cont_img in images_result[base_ind + 1:]:
                separated = (
                    base_img.min_y < base_img.max_y < cont_img.min_y
                    or cont_img.max_y < base_img.min_y < base_img.max_y
                )
                if not separated:
                    return False
        return True

    @staticmethod
    def __merge_picture(contents, images):
        """
        Insert the OCR text of each image into the content list by y position.

        Each image is placed once: between the first pair of neighbouring
        entries that enclose it, or at the end when it lies below the last
        entry. An image that fits neither is left out. ``contents`` is
        modified in place.

        :param contents: list of dicts carrying ``top`` and ``bottom``
        :param images: objects exposing ``min_y``, ``max_y`` and ``image_path``
        :return: the same ``contents`` list
        """
        for image in sorted(images, key=lambda x: x.min_y):
            img_min_y, img_max_y = image.min_y, image.max_y
            if not contents:
                contents.append({"text": ocr(image.image_path), 'top': image.max_y, "bottom": image.min_y})
                continue
            inserted = False
            for ind in range(len(contents) - 1):
                above, below = contents[ind], contents[ind + 1]
                if above['bottom'] < img_min_y and img_max_y < below['bottom']:
                    contents.insert(ind + 1, {"text": ocr(image.image_path), 'top': image.max_y, "bottom": image.max_y})
                    inserted = True
                    # Stop at the first gap: the list just changed under the
                    # loop, and continuing could place (and OCR) the image again.
                    break
            if not inserted and contents[-1]['bottom'] < img_min_y:
                contents.append({"text": ocr(image.image_path), 'top': image.max_y, "bottom": image.min_y})
        return contents

    @staticmethod
    def __merge_table(tables, words):
        """
        Replace the words covered by each table with a single table entry.

        :param tables: table objects accepted by ``split_block``
        :param words: list of word dicts of the page
        :return: new list with the words outside tables and one
            ``{"type": "table"}`` entry per block from ``split_block``
        """
        contents = []
        start = 0
        # Tables arrive ordered by x; process blocks in word order so the
        # cursor below only moves forward.
        replace_blocks = sorted(split_block(tables, words), key=lambda block: (block[0], block[1]))
        for first, last, table_text in replace_blocks:
            # Empty slice when this block starts inside the previous one.
            contents.extend(words[start:first])
            # NOTE(review): 'top'/'bottom' hold word indices here, whereas the
            # other entries hold y coordinates - confirm whether intended.
            contents.append({"text": table_text, 'top': first, "bottom": last, "type": "table"})
            # Never step backwards, otherwise words would be emitted twice.
            start = max(start, last + 1)
        contents.extend(words[start:])
        return contents

    def __to_screenshots(self, pdf_path, pages):
        """
        Render the requested pages to PNG files next to the PDF.

        :param pdf_path: path of the PDF file
        :param pages: zero-based indices of the pages to render
        :return: list of ``(page_index, image_path)`` for the pages that were
            saved; pages whose save fails are logged and skipped
        """
        wanted = set(pages)
        image_real = []
        for ind, image in enumerate(convert_from_path(pdf_path)):
            if ind not in wanted:
                continue
            image_path = os.path.join(self._base_dir, "test{}.png".format(ind))
            try:
                image.save(image_path, 'PNG')
            except Exception as e:
                # Best effort: a page that cannot be saved is skipped.
                logger.error("failed to save screenshot of page {}: {}", ind, e)
                continue
            image_real.append((ind, image_path))
        return image_real
if __name__ == '__main__':
    # Manual smoke run: parse the sample document and print the returned tuple.
    reader = PdfRead()
    outcome = reader.read_pdf('./docs/yc.pdf')
    print(outcome)
|