file_pdf.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. # coding:utf-8
  2. import pdfplumber
  3. from file_processing.models.picture_extract import extract_image
  4. from file_processing.models.table_extract import TableStruct
  5. from loguru import logger
  6. from docs.config import error_number
  7. from file_processing.file_picture import ocr
  8. from file_processing.file_picture import ocr_patch
  9. from file_processing.models.picture_extract import Picture
  10. import os
  11. from pdf2image import convert_from_path
  12. def get_table_block(page):
  13. """
  14. 表格查询
  15. :param page:page对象
  16. :return:
  17. """
  18. parse_tables = []
  19. tables = page.find_tables(table_settings={})
  20. for table in tables:
  21. # 创建表格结构对象
  22. table_bbox = TableStruct()
  23. table_bbox.parse(table)
  24. parse_tables.append(table_bbox)
  25. parse_tables = sorted(parse_tables, key=lambda x: x.min_x)
  26. return parse_tables
  27. def split_block(tables, words):
  28. """
  29. 通过坐标对比生成表格块
  30. :param tables:表格列表
  31. :param words:word列表
  32. :return:
  33. """
  34. replace_blocks = []
  35. for table in tables:
  36. blocks = []
  37. for index, word in enumerate(words):
  38. min_y = word['top']
  39. max_y = word['bottom']
  40. # 坐标对比
  41. if table.min_y < min_y and table.max_y > (max_y + min_y) / 2:
  42. blocks.append(index)
  43. blocks.sort() # 排序
  44. # 添加块
  45. replace_blocks.append((blocks[0], blocks[-1], table.contents))
  46. return replace_blocks
  47. def section(words: list):
  48. """
  49. pdf文章划分段落
  50. :param words:
  51. :return:
  52. """
  53. l_y0, l_y1 = 0, 0 # 初始化y轴坐标值
  54. result = []
  55. for word in words:
  56. y0, y1 = word['top'], word['bottom']
  57. # 同一段落检测
  58. if y1 - l_y1 < (l_y1 - l_y0) / 2:
  59. result[-1] += word.get("text", "")
  60. l_y0, l_y1 = y0, y1
  61. continue
  62. # 更新y轴坐标值
  63. l_y0, l_y1 = y0, y1
  64. result.append(word.get("text", ""))
  65. return "\n".join(result)
  66. class PdfRead(object):
  67. def __init__(self):
  68. self._images = {}
  69. self._base_dir = ""
  70. def read_pdf(self, file_path: str) -> tuple:
  71. pdf = pdfplumber.open(file_path)
  72. screenshots_pages = []
  73. self._base_dir = os.path.dirname(file_path)
  74. results = []
  75. for page_number, page in enumerate(pdf.pages):
  76. # 获取当前页面的全部文本信息,包括表格中的文字(x,y,x,y)
  77. tables = get_table_block(page)
  78. words = page.extract_words()
  79. contents = self.__merge_table(tables, words)
  80. images = self.__extract_images(page)
  81. state = self.__check_images(images)
  82. if state and images and contents:
  83. contents = self.__merge_picture(contents, images)
  84. elif not contents:
  85. results.append(f"{page_number}->image")
  86. screenshots_pages.append(page_number)
  87. continue
  88. paragraph = section(contents)
  89. results.append(paragraph)
  90. pdf.close()
  91. if screenshots_pages:
  92. screenshots_result = self.__to_screenshots(file_path, screenshots_pages)
  93. ocr_result = ocr_patch(screenshots_result)
  94. for key, val in ocr_result.items():
  95. if key < len(results):
  96. results[key] = val
  97. results = [result for result in results if result]
  98. return "\n".join(results), error_number["成功"]
  99. def __extract_images(self, page):
  100. """
  101. 抽取图片
  102. :param page:
  103. :return:
  104. """
  105. images_result = []
  106. images = page.images # 页面图片
  107. page_width = page.width # 页面宽度
  108. page_number = page.page_number # 页码
  109. for image_number, image in enumerate(images):
  110. image_stream = image.get("stream", "")
  111. if not image_stream:
  112. continue
  113. image_name = f"{page_number}-{image_number}"
  114. image_path = extract_image(image_stream, self._base_dir, image_name)
  115. image_width = image.get("width", 0)
  116. image_high = image.get('height', 0)
  117. if image_path and image_width > page_width / 2:
  118. attr = {"min_y": image.get("y0", 0), "max_y": image.get("y1", 0),
  119. "width": image_width, "height": image_high, "image_path": image_path}
  120. picture = Picture(**attr)
  121. images_result.append(picture)
  122. return images_result
  123. @staticmethod
  124. def __check_images(images_result):
  125. """
  126. 检查图片是不是重叠
  127. :param images_result:
  128. :return:
  129. """
  130. for base_ind in range(len(images_result)):
  131. base_img = images_result[base_ind]
  132. for cont_ind in range(base_ind + 1, len(images_result)):
  133. cont_img = images_result[cont_ind]
  134. if base_img.min_y < base_img.max_y < cont_img.min_y or cont_img.max_y < base_img.min_y < base_img.max_y:
  135. continue
  136. else:
  137. return False
  138. return True
  139. @staticmethod
  140. def __merge_picture(contents, images):
  141. images = sorted(images, key=lambda x: x.min_y)
  142. for image in images:
  143. img_min_y, img_max_y = image.min_y, image.max_y
  144. for ind in range(len(contents) - 1):
  145. word, last_word = contents[ind], contents[ind + 1]
  146. y0, y1 = word['top'], word['bottom']
  147. l_y0, l_y1 = last_word['top'], last_word['bottom']
  148. if y1 < img_min_y and img_max_y < l_y1:
  149. contents.insert(ind + 1, {"text": ocr(image.image_path), 'top': image.max_y, "bottom": image.max_y})
  150. if not contents:
  151. contents.append({"text": ocr(image.image_path), 'top': image.max_y, "bottom": image.min_y})
  152. continue
  153. end_word = contents[-1]
  154. e_y0, e_y1 = end_word['top'], end_word['bottom']
  155. if not contents or e_y1 < img_min_y:
  156. contents.append({"text": ocr(image.image_path), 'top': image.max_y, "bottom": image.min_y})
  157. return contents
  158. @staticmethod
  159. def __merge_table(tables, words):
  160. """
  161. 合并表格
  162. :param tables:
  163. :param words:
  164. :return:
  165. """
  166. contents = []
  167. start = 0
  168. replace_blocks = split_block(tables, words)
  169. for block in replace_blocks:
  170. contents.extend(words[start:block[0]])
  171. contents.append({"text": block[2], 'top': block[0], "bottom": block[1], "type": "table"})
  172. start = block[1] + 1
  173. contents.extend(words[start:])
  174. return contents
  175. def __to_screenshots(self, pdf_path, pages):
  176. """
  177. 筛选需要识别的截图
  178. :param pdf_path:
  179. :param pages:
  180. :return:
  181. """
  182. image_real = []
  183. images = convert_from_path(pdf_path)
  184. for ind, image in enumerate(images):
  185. if ind not in pages:
  186. continue
  187. try:
  188. image_name = "test{}.png".format(ind) # 生成图片的名称
  189. image_path = os.path.join(self._base_dir, image_name)
  190. image.save(image_path, 'PNG')
  191. image_real.append((ind, image_path))
  192. except Exception as e:
  193. print(e)
  194. continue
  195. # 释放内存
  196. image = None
  197. return image_real
  198. if __name__ == '__main__':
  199. pdf_obj = PdfRead()
  200. result01 = pdf_obj.read_pdf('./docs/yc.pdf')
  201. print(result01)