parserPDF / file_handler /file_utils.py
semmyk's picture
initial commit
0fd441a
raw
history blame
12 kB
# file_handler/file_utils.py
#import os
from pathlib import Path
from itertools import chain
from typing import List, Union, Any, Mapping
from PIL import Image
import utils.config as config
##SMY: May be deprecated as a duplicate. See marker/marker/config/parser.py ~ https://github.com/datalab-to/marker/blob/master/marker/config/parser.py#L169
#def create_outputdir(root: Union[str, Path], out_dir:Union[str, Path] = None) -> Path: #List[Path]:
def create_outputdir(root: Union[str, Path], output_dir_string: Union[str, None] = None) -> Path:
    """
    Create (if needed) and return the output directory ``data/<output_dir_string>``.

    Parameters
    ----------
    root : str | Path
        Currently unused. Kept so existing callers keep working; an earlier
        implementation placed the output folder next to this path.
    output_dir_string : str, optional
        Name of the folder created under ``data``. Falls back to
        ``"output_dir"`` when empty or ``None``.

    Returns
    -------
    pathlib.Path
        The relative path ``data/<output_dir_string>`` (resolved against the
        current working directory by the OS).
    """
    ## Outputs are grouped under one shared folder rather than next to each source folder.
    folder_name = output_dir_string if output_dir_string else "output_dir"
    output_dir = Path("data") / folder_name

    ## A directory needs the execute (search) bit to be entered or written into.
    ## The previous mode 0o2644 lacked it, which makes the folder unusable on POSIX.
    output_dir.mkdir(mode=0o2755, parents=True, exist_ok=True)
    return output_dir
def is_file_with_extension(path_obj: Union[str, Path]) -> bool:
    """
    Return True when *path_obj* is an existing file whose name has a non-empty suffix.

    Accepts a ``pathlib.Path``, a string, or any other path-like object.
    Values that cannot be turned into a path (e.g. ``None``) yield ``False``
    instead of raising.
    """
    try:
        candidate = path_obj if isinstance(path_obj, Path) else Path(path_obj)
    except TypeError:
        # Not a str / os.PathLike: it cannot be a file on disk.
        return False
    return candidate.is_file() and bool(candidate.suffix)
def process_dicts_data(data: Union[dict, list[dict]]) -> str:
    """
    Return *data* (a dictionary or a list of dictionaries) as a JSON string.

    The output is indented by four spaces. ``pathlib`` path objects found in
    *data* are serialised as their string form; any other non-JSON type still
    raises ``TypeError`` from the ``json`` module.
    """
    import json
    from pathlib import PurePath

    class PathEncoder(json.JSONEncoder):
        """JSON encoder that renders pathlib paths as plain strings."""

        def default(self, obj):
            # PurePath is the common base of PosixPath and WindowsPath, so this
            # works on every platform (checking WindowsPath alone does not).
            if isinstance(obj, PurePath):
                return str(obj)
            # Let the base class raise TypeError for unsupported types.
            return super().default(obj)

    return json.dumps(data, indent=4, cls=PathEncoder)
##NB: Python =>3.10, X | Y equiv to the type checker as Union[X, Y]
def collect_pdf_html_paths(root: Union[str, Path]) -> List[Path]:
    """
    Recursively walk *root* and return all PDF and HTML files.

    Files matching ``*.pdf`` are listed first, followed by those matching
    ``*.html``. Directories whose names happen to match are skipped.

    Raises
    ------
    FileNotFoundError
        If *root* does not exist.
    """
    root = Path(root)
    if not root.exists():
        raise FileNotFoundError(f"Root path {root} does not exist.")

    patterns = ("*.pdf", "*.html")
    # rglob accepts a single pattern, so chain one lazy rglob per pattern.
    matches = chain.from_iterable(root.rglob(pattern) for pattern in patterns)
    return [p for p in matches if p.is_file()]
def collect_pdf_paths(root: Union[str, Path]) -> List[Path]:
    """
    Return every ``*.pdf`` file found anywhere below *root*.

    Raises ``FileNotFoundError`` when *root* does not exist.
    """
    root = Path(root)
    if root.exists():
        found = []
        for candidate in root.rglob("*.pdf"):
            # Keep real files only; a directory may also be named "*.pdf".
            if candidate.is_file():
                found.append(candidate)
        return found
    raise FileNotFoundError(f"Root path {root} does not exist.")
def collect_html_paths(root: Union[str, Path]) -> List[Path]:
    """
    Recursively walk *root* and return all HTML files (``*.html`` and ``*.htm``).

    Raises
    ------
    FileNotFoundError
        If *root* does not exist.
    """
    root = Path(root)
    if not root.exists():
        raise FileNotFoundError(f"Root path {root} does not exist.")

    # rglob takes exactly one pattern, so each extension is globbed separately.
    htmls = [
        p
        for pattern in ("*.html", "*.htm")
        for p in root.rglob(pattern)
        if p.is_file()
    ]
    ## SMY: TODO: convert htmls to PDF. Marker will by default attempt weasyprint which typically raise 'libgobject-2' error on Win
    return htmls
def collect_markdown_paths(root: Union[str, Path]) -> List[Path]:
    """
    Return every ``*.md`` file found anywhere below *root*.
    """
    found = []
    for candidate in Path(root).rglob("*.md"):
        # Skip directories that merely carry a ".md" name.
        if candidate.is_file():
            found.append(candidate)
    return found
#from __future__ import annotations
def write_markdown(
    src_path: Union[str, Path],
    output_dir: Union[str, Path],
    rendered: Any,
) -> Path:
    """
    Write the Markdown representation of a source file to an output directory.

    The file is written to ``<base>/<src stem>/<src stem>.md`` where ``<base>``
    is *output_dir* itself when it is a ``Path``, or ``data/<output_dir>`` when
    it is a string.

    Parameters
    ----------
    src_path : str | Path
        Path to the original source file. Only its stem is used for naming
        the resulting folder and Markdown file.
    output_dir : str | Path
        Base output directory (see above). Missing folders, including the
        per-document sub-folder, are created.
    rendered : object
        Object that provides a ``markdown`` attribute containing the text to write.

    Returns
    -------
    pathlib.Path
        The full path of the written Markdown file.

    Raises
    ------
    FileNotFoundError
        If *src_path* does not point to an existing file.
    AttributeError
        If *rendered* does not expose a ``markdown`` attribute.
    OSError
        If creating the folder or writing the file fails (e.g. permission denied).

    Notes
    -----
    The function only handles path resolution, directory creation, and file
    I/O. All rendering must be performed before calling this helper.
    """
    src = Path(src_path)
    if not src.is_file():
        raise FileNotFoundError(f"Source file does not exist: {src}")

    # Validate the payload first so a bad *rendered* object leaves no empty folders behind.
    try:
        markdown_text = getattr(rendered, "markdown")  ##SMY: get extracted markdown
    except AttributeError as exc:  # pragma: no cover
        raise AttributeError(
            "Extractor Rendered object must have a 'markdown' attribute"
        ) from exc

    ## Outputs go into one shared output folder rather than individual source folders.
    base_dir = output_dir if isinstance(output_dir, Path) else Path("data") / output_dir
    md_path = base_dir / src.stem / f"{src.stem}.md"

    ## Default permissions are used on purpose: the former mode=0o2644 plus chmod(0)
    ## removed every permission from the folder, so the write below failed on POSIX.
    md_path.parent.mkdir(parents=True, exist_ok=True)

    with md_path.open(mode="w", encoding="utf-8") as md_f:
        md_f.write(markdown_text)  ##SMY: write markdown content to markdown file

    return md_path
# Dump Markdown extracted images
def dump_images(
    src_path: Union[str, Path],
    output_dir: Union[str, Path],
    rendered: Any,
) -> tuple[int, List[Path]]:
    """
    Save the images extracted alongside the Markdown of a source file.

    Each image is written to ``<base>/<src stem>/<image name>`` where ``<base>``
    is *output_dir* itself when it is a ``Path``, or ``data/<output_dir>`` when
    it is a string. This is the same folder layout ``write_markdown`` uses.

    Parameters
    ----------
    src_path : str | Path
        Path to the original source file. Only its stem is used, to name the
        per-document folder.
    output_dir : str | Path
        Base output directory (see above). Missing folders are created.
    rendered : object
        Object that provides an ``images`` attribute: a mapping of image file
        name to an object with a ``save(path)`` method (e.g. ``PIL.Image.Image``).

    Returns
    -------
    tuple[int, list[pathlib.Path]]
        The number of images saved and the paths they were saved to.

    Raises
    ------
    AttributeError
        If *rendered* does not expose an ``images`` attribute.
    """
    try:
        images: Mapping[str, Any] = getattr(rendered, "images")
    except AttributeError as exc:  # pragma: no cover
        raise AttributeError(
            "Extracted images from rendered.images must be a mapping of str -> PIL.Image"
        ) from exc

    ##SMY: See marker.output.save_output() : https://github.com/datalab-to/marker/blob/master/marker/output.py
    src = Path(src_path)  ##SMY: keep uniform with write_markdown. The file need not exist.
    base_dir = output_dir if isinstance(output_dir, Path) else Path("data") / output_dir
    target_dir = base_dir / src.stem

    img_path_list = []
    for img_name, img in images.items():
        img_path = target_dir / img_name
        # An image name may contain sub-folders; make sure they exist before saving.
        img_path.parent.mkdir(parents=True, exist_ok=True)
        img.save(img_path)  ##SMY: save images (of type PIL.Image.Image) to markdown folder
        img_path_list.append(img_path)

    return len(img_path_list), img_path_list  ##SMY: return number of images and path
# Dump Markdown extracted images ##SMY: Marked for deprecation
'''
def dump_images(
src_path: Union[str, Path],
output_dir: Union[str, Path],
rendered: Any,
) -> int:
"""
Dump the images of the Markdown representation of a source file to an output directory.
Parameters
----------
src_path : str | Path
Path to the original source file. Only its base name is used for naming
the resulting Markdown file.
output_dir : str | Path
Directory where the Markdown file will be written. It was created if it does not
exist with create_outputdir().
rendered : object
Object that provides a ``markdown`` attribute containing the text to write.
Returns
-------
Number of images dumped from the Markdown file.
"""
try:
images: Mapping[str, bytes] = getattr(rendered, "images")
except TypeError as exc: # pragma: no cover
raise AttributeError(
"Extracted images from rendered.images must be a mapping of str -> bytes"
) from exc
images_count = 0
##SMY: See marker.output.save_output() : https://github.com/datalab-to/marker/blob/master/marker/output.py
#for img_name, img_bytes in images.items():
for img_name, img in images.items():
# Resolve the full path and make sure any sub‑directories exist.
img_path = Path(output_dir) / src_path / img_name ##SMY: image files ##concatenate Path + str
img_path.parent.mkdir(parents=True, exist_ok=True)
#'' '
#with img_path.open("wb") as fp:
# fp.write(img_bytes) ##SMY: write images to markdown folder
#images_count += 1
#'' '
img.save(img_path) ##SMY: save images (of type PIL.Image.Image) to markdown folder
images_count += 1
return images_count ##SMY: return number of images
#return images.items().count
#return len(images)
'''