| | from types import SimpleNamespace |
| | import pdfplumber |
| | import logging |
| | from langchain.docstore.document import Document |
| |
|
def prepare_table_config(crop_page):
    """Build explicit-line table-finder settings for a page.

    The candidate lines are gathered from ``crop_page.root_page`` (the
    original, uncropped page), not from the crop itself.

    From https://github.com/jsvine/pdfplumber/issues/242

    Args:
        crop_page: Page-like object exposing ``root_page``, whose ``curves``
            and ``edges`` lists are concatenated and converted.

    Returns:
        A dict of table settings using the "explicit" strategy in both
        directions. The same list object is used for the vertical and the
        horizontal explicit lines.
    """
    root = crop_page.root_page
    shapes = root.curves + root.edges

    # Expand every curve/edge through rect_to_edges.
    # See https://github.com/jsvine/pdfplumber/issues/127
    lines = [
        line
        for shape in shapes
        for line in pdfplumber.utils.rect_to_edges(shape)
    ]

    settings = {
        "vertical_strategy": "explicit",
        "horizontal_strategy": "explicit",
    }
    settings["explicit_vertical_lines"] = lines
    settings["explicit_horizontal_lines"] = lines
    settings["intersection_y_tolerance"] = 10
    return settings
| |
|
def get_text_outside_table(crop_page):
    """Return ``crop_page`` without the objects located inside tables.

    Tables are searched on ``crop_page.root_page`` using the settings from
    ``prepare_table_config``. When those settings contain no explicit lines,
    ``crop_page`` is returned unchanged.

    Args:
        crop_page: Page-like object exposing ``root_page`` and ``filter``.

    Returns:
        Either ``crop_page`` itself or the result of ``crop_page.filter``.
    """
    settings = prepare_table_config(crop_page)
    has_lines = (
        len(settings["explicit_vertical_lines"]) > 0
        and len(settings["explicit_horizontal_lines"]) > 0
    )
    if not has_lines:
        # Nothing to build a table grid from; keep the page as it is.
        return crop_page

    table_boxes = []
    for table in crop_page.root_page.find_tables(table_settings=settings):
        table_boxes.append(table.bbox)

    def not_within_bboxes(obj):
        """Return True when the object's midpoint is outside every table bbox.

        See https://github.com/jsvine/pdfplumber/blob/stable/pdfplumber/table.py#L404
        """
        for left, upper, right, lower in table_boxes:
            center_y = (obj["top"] + obj["bottom"]) / 2
            center_x = (obj["x0"] + obj["x1"]) / 2
            # Left/top bounds are inclusive, right/bottom bounds exclusive.
            if left <= center_x < right and upper <= center_y < lower:
                return False
        return True

    return crop_page.filter(not_within_bboxes)
| | |
| |
|
def extract_words(page):
    """Extract words from ``page`` with the settings shared by this file.

    Args:
        page: Object exposing an ``extract_words`` method that accepts the
            keyword arguments used below.

    Returns:
        Whatever ``page.extract_words`` returns. The callers in this file
        treat it as an iterable of dicts carrying at least ``text``,
        ``size``, ``top`` and ``bottom``.
    """
    return page.extract_words(
        keep_blank_chars=True,
        y_tolerance=0,
        x_tolerance=1,
        # Font name and size are requested so callers can tell headings
        # from body text by size.
        extra_attrs=["fontname", "size", "object_type"],
    )
| | |
| |
|
def get_title_with_cropped_page(first_page):
    """Split the first page into title words, author info and the body crop.

    Words with a font size of at least 14 are collected as the title. The
    top of a word whose text is exactly "Abstract" marks where the body
    begins; if no such word exists the body starts at the top of the page.
    The words between the bottom of the last title word and the start of
    the body are returned as author information.

    Args:
        first_page: Page-like object exposing ``bbox`` and ``within_bbox``
            and accepted by ``extract_words``.

    Returns:
        A tuple ``(title, user_info, body_page)`` where ``title`` and
        ``user_info`` are lists of strings and ``body_page`` is
        ``first_page`` cropped to start at the body.
    """
    title = []
    x0, top, x1, bottom = first_page.bbox
    # Default to the page top so the value is defined even when no
    # title-sized word is found.
    title_bottom = top

    for word in extract_words(first_page):
        word = SimpleNamespace(**word)

        if word.size >= 14:
            title.append(word.text)
            title_bottom = word.bottom
        elif word.text == "Abstract":
            top = word.top

    user_info = []
    # Only crop the author-information band when it has a positive height.
    # Without an "Abstract" marker (or a title) the band would be empty or
    # inverted; pdfplumber is expected to reject such a bbox - confirm.
    if title_bottom < top:
        info_region = first_page.within_bbox((x0, title_bottom, x1, top))
        user_info = [item["text"] for item in extract_words(info_region)]

    # Drop everything above the body start from the returned page.
    return title, user_info, first_page.within_bbox((x0, top, x1, bottom))
| |
|
def get_column_cropped_pages(pages, two_column=True):
    """Return the pages, optionally split into left and right halves.

    Args:
        pages: Iterable of page-like objects exposing ``width``, ``height``
            and ``within_bbox``.
        two_column: When truthy, each page is replaced by its left half
            followed by its right half; otherwise pages are passed through.

    Returns:
        A new list of pages in reading order.
    """
    if not two_column:
        return list(pages)

    columns = []
    for page in pages:
        middle = page.width / 2
        # Boxes are given relative to the page being cropped.
        left_half = page.within_bbox((0, 0, middle, page.height), relative=True)
        right_half = page.within_bbox(
            (middle, 0, page.width, page.height), relative=True
        )
        columns.extend((left_half, right_half))
    return columns
| |
|
def _create_chapter(page_start, name_top, name_bottom):
    """Return an empty chapter record whose heading starts at the given position."""
    return SimpleNamespace(
        name=[],
        name_top=name_top,
        name_bottom=name_bottom,
        # True while heading words are still being collected.
        record_chapter_name=True,
        page_start=page_start,
        page_stop=None,
        text=[],
    )


def parse_pdf(filename, two_column=True):
    """Parse a PDF into a single ``Document`` organised by chapter.

    The first page is split into title, author information and body. Every
    page (optionally split into two columns) is then scanned word by word
    with table content removed: words with a font size of at least 11 are
    treated as chapter headings, smaller words as chapter text.

    Args:
        filename: Value passed to ``pdfplumber.open``.
        two_column: Forwarded to ``get_column_cropped_pages``.

    Returns:
        A ``Document`` whose ``page_content`` holds the title line followed
        by one line per chapter, and whose metadata holds ``"title"``.
    """
    root_logger = logging.getLogger()
    level = root_logger.level
    # Raise a DEBUG root logger to INFO while parsing; the original level is
    # restored in the ``finally`` below, even if parsing fails.
    if level == logging.DEBUG:
        root_logger.setLevel(logging.INFO)

    try:
        chapters = []
        with pdfplumber.open(filename) as pdf:
            title, user_info, first_page = get_title_with_cropped_page(pdf.pages[0])
            new_pages = get_column_cropped_pages(
                [first_page] + pdf.pages[1:], two_column
            )

            cur_chapter = None
            last_page_number = None

            for page in new_pages:
                page = get_text_outside_table(page)
                last_page_number = page.page_number

                for word in extract_words(page):
                    word = SimpleNamespace(**word)

                    if word.size >= 11:
                        # Heading-sized word.
                        if cur_chapter is None:
                            cur_chapter = _create_chapter(
                                page.page_number, word.top, word.bottom
                            )
                        elif not cur_chapter.record_chapter_name or (
                            # A heading word at a different vertical position
                            # than the current heading starts a new chapter.
                            # (The previous code compared the chapter with
                            # itself, so this clause could never be true.)
                            cur_chapter.name_bottom != word.bottom
                            and cur_chapter.name_top != word.top
                        ):
                            # Close the current chapter and start a new one.
                            cur_chapter.page_stop = page.page_number
                            chapters.append(cur_chapter)
                            cur_chapter = _create_chapter(
                                page.page_number, word.top, word.bottom
                            )

                        cur_chapter.name.append(word.text)
                    else:
                        # Body text before any heading gets an unnamed chapter
                        # instead of dereferencing None.
                        if cur_chapter is None:
                            cur_chapter = _create_chapter(
                                page.page_number, word.top, word.bottom
                            )
                        cur_chapter.record_chapter_name = False
                        cur_chapter.text.append(word.text)

            # Flush the last chapter, if the document produced any words.
            if cur_chapter is not None:
                cur_chapter.page_stop = last_page_number
                chapters.append(cur_chapter)

        for chapter in chapters:
            logging.info(
                f"section: {chapter.name} pages:{chapter.page_start, chapter.page_stop} word-count:{len(chapter.text)}"
            )
            logging.debug(" ".join(chapter.text))

        title = " ".join(title)
        user_info = " ".join(user_info)
        parts = [f"Article Title: {title}, Information:{user_info}\n"]
        for idx, chapter in enumerate(chapters):
            chapter.name = " ".join(chapter.name)
            parts.append(
                f"The {idx}th Chapter {chapter.name}: "
                + " ".join(chapter.text)
                + "\n"
            )
        text = "".join(parts)

        return Document(page_content=text, metadata={"title": title})
    finally:
        root_logger.setLevel(level)
| |
|
| |
|
if __name__ == '__main__':
    # Manual smoke test: parse a sample PDF and print what was extracted.
    z = parse_pdf("./build/test.pdf")
    # parse_pdf returns a Document, not a dict: the author information is
    # embedded in page_content and only the title is stored in metadata.
    print(z.page_content)
    print(z.metadata["title"])