# file_handler/file_utils.py
"""Filesystem helpers for locating source documents and writing extracted output.

The module collects PDF/HTML/Markdown files below a root folder and writes the
Markdown text and images produced by an extractor into a per-document folder.
"""

import json
from itertools import chain
from pathlib import Path, PurePath
from typing import Any, Iterable, List, Mapping, Optional, Union

from PIL import Image

import utils.config as config

##NB: Python =>3.10, X | Y is equivalent to Union[X, Y] for the type checker.

# Base folder for outputs that are addressed by a plain folder *name*.
_OUTPUT_BASE_DIR = Path("data")
# Folder name used when the caller does not supply one.
_DEFAULT_OUTPUT_DIR_NAME = "output_dir"
# setgid + rwxr-xr-x. A directory needs the execute (search) bit to be entered;
# a mode such as 0o2644 yields a folder nothing can be written into on POSIX.
_DIR_MODE = 0o2755


def _ensure_dir(directory: Path) -> Path:
    """Create *directory* (and missing parents) if needed and return it."""
    directory.mkdir(mode=_DIR_MODE, parents=True, exist_ok=True)
    return directory


def _resolve_doc_dir(src: Path, output_dir: Union[str, Path]) -> Path:
    """Return the per-document output folder ``<output_dir>/<src.stem>``.

    A ``Path`` is used as given; a plain string is treated as a folder name
    below ``data`` so that all outputs share one folder instead of being
    scattered next to the individual source files.
    """
    base = output_dir if isinstance(output_dir, Path) else _OUTPUT_BASE_DIR / output_dir
    return base / src.stem


def _collect_files(
    root: Union[str, Path],
    patterns: Iterable[str],
    *,
    require_root: bool = True,
) -> List[Path]:
    """Recursively collect the regular files below *root* matching any pattern.

    Results are grouped by pattern, in the order the patterns are given.
    """
    root = Path(root)
    if require_root and not root.exists():
        raise FileNotFoundError(f"Root path {root} does not exist.")
    # rglob() accepts a single pattern, so chain one generator per pattern.
    matches = chain.from_iterable(root.rglob(pattern) for pattern in patterns)
    return [p for p in matches if p.is_file()]


##SMY: Might be deprecated vis duplicated. See marker/marker/config/parser.py ~
## https://github.com/datalab-to/marker/blob/master/marker/config/parser.py#L169
def create_outputdir(root: Union[str, Path], output_dir_string: Optional[str] = None) -> Path:
    """
    Create the output directory ``data/<output_dir_string>`` and return it.

    Parameters
    ----------
    root : str | Path
        Currently unused; kept so existing callers keep working. Output is
        placed in one shared folder rather than inside each source folder.
    output_dir_string : str, optional
        Name of the output folder. Falls back to ``"output_dir"`` when it is
        ``None`` or empty.

    Returns
    -------
    pathlib.Path
        The (relative) path of the output directory, which exists on return.
    """
    return _ensure_dir(_OUTPUT_BASE_DIR / (output_dir_string or _DEFAULT_OUTPUT_DIR_NAME))


def is_file_with_extension(path_obj: Union[str, Path]) -> bool:
    """
    Check that *path_obj* is an existing file with a non-empty extension.

    Values that cannot be interpreted as a path (e.g. ``None``) yield ``False``
    instead of raising.
    """
    if not isinstance(path_obj, Path):
        try:
            path_obj = Path(path_obj)
        except TypeError:
            return False
    return path_obj.is_file() and bool(path_obj.suffix)


class _PathEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``pathlib`` paths as plain strings."""

    def default(self, o):
        # PurePath covers Windows and POSIX flavours alike.
        if isinstance(o, PurePath):
            return str(o)
        # Let the base class raise TypeError for every other unsupported type.
        return super().default(o)


def process_dicts_data(data: Union[dict, list[dict]]) -> str:
    """Return a formatted JSON string for a dictionary or a list of dictionaries."""
    return json.dumps(data, indent=4, cls=_PathEncoder)


def collect_pdf_html_paths(root: Union[str, Path]) -> List[Path]:
    """
    Recursively walk *root* and return all PDF files followed by all HTML files.

    Raises
    ------
    FileNotFoundError
        If *root* does not exist.
    """
    return _collect_files(root, ("*.pdf", "*.html"))


def collect_pdf_paths(root: Union[str, Path]) -> List[Path]:
    """
    Recursively walk *root* and return a list of all PDF files.

    Raises
    ------
    FileNotFoundError
        If *root* does not exist.
    """
    return _collect_files(root, ("*.pdf",))


def collect_html_paths(root: Union[str, Path]) -> List[Path]:
    """
    Recursively walk *root* and return a list of all HTML (.html, .htm) files.

    Raises
    ------
    FileNotFoundError
        If *root* does not exist.
    """
    ## SMY: TODO: convert htmls to PDF. Marker will by default attempt weasyprint
    ## which typically raise 'libgobject-2' error on Win
    return _collect_files(root, ("*.html", "*.htm"))


def collect_markdown_paths(root: Union[str, Path]) -> List[Path]:
    """
    Recursively walk *root* and return a list of all Markdown files.

    Unlike the other collectors this does not check that *root* exists.
    """
    return _collect_files(root, ("*.md",), require_root=False)


def write_markdown(
    src_path: Union[str, Path],
    output_dir: Union[str, Path],
    rendered: Any,
) -> Path:
    """
    Write the Markdown representation of a source file to an output directory.

    Parameters
    ----------
    src_path : str | Path
        Path to the original source file. Only its base name is used for
        naming the resulting Markdown file and its folder.
    output_dir : str | Path
        Where to write. A ``Path`` is used as given; a string is treated as a
        folder name below ``data``. The per-document sub-folder is created if
        it does not exist.
    rendered : object
        Object that provides a ``markdown`` attribute containing the text to
        write.

    Returns
    -------
    pathlib.Path
        The full path of the written Markdown file,
        ``<output_dir>/<stem>/<stem>.md``.

    Raises
    ------
    FileNotFoundError
        If *src_path* does not point to an existing file.
    OSError
        If writing the file fails for any reason (e.g. permission denied).
    AttributeError
        If *rendered* does not expose a ``markdown`` attribute.

    Notes
    -----
    The function is intentionally lightweight: it only handles path
    resolution, directory creation, and file I/O. All rendering logic should
    be performed before calling this helper.
    """
    src = Path(src_path)
    if not src.is_file():
        raise FileNotFoundError(f"Source file does not exist: {src}")

    # Validate the payload first so a bad object leaves no empty folder behind.
    try:
        markdown_text = rendered.markdown
    except AttributeError as exc:  # pragma: no cover
        raise AttributeError(
            "Extractor Rendered object must have a 'markdown' attribute"
        ) from exc

    md_path = _resolve_doc_dir(src, output_dir) / f"{src.stem}.md"
    # The folder must stay writable: stripping its permissions (chmod(0)) makes
    # the open() below fail with PermissionError on POSIX.
    _ensure_dir(md_path.parent)

    with md_path.open(mode="w", encoding="utf-8") as md_f:
        md_f.write(markdown_text)

    return md_path


def dump_images(
    src_path: Union[str, Path],
    output_dir: Union[str, Path],
    rendered: Any,
) -> tuple[int, List[Path]]:
    """
    Save the images extracted alongside a document's Markdown.

    Images are written into the same per-document folder that
    ``write_markdown`` uses, ``<output_dir>/<stem>/``.

    Parameters
    ----------
    src_path : str | Path
        Path to the original source file. Only its stem is used, to name the
        per-document folder; the file itself is not checked.
    output_dir : str | Path
        Where to write. A ``Path`` is used as given; a string is treated as a
        folder name below ``data``.
    rendered : object
        Object that provides an ``images`` attribute: a mapping of image file
        name to an object with a ``save(path)`` method (``PIL.Image.Image``).
        See ``marker.output.save_output()``.

    Returns
    -------
    tuple[int, list[pathlib.Path]]
        The number of images saved and their paths. ``(0, [])`` when there are
        no images.

    Raises
    ------
    AttributeError
        If *rendered* does not expose an ``images`` attribute.
    """
    try:
        images: Mapping[str, Image.Image] = rendered.images
    except AttributeError as exc:  # pragma: no cover
        raise AttributeError(
            "Extractor Rendered object must have an 'images' attribute "
            "(mapping of str -> PIL.Image)"
        ) from exc

    if not images:
        return 0, []

    img_dir = _resolve_doc_dir(Path(src_path), output_dir)
    img_path_list: List[Path] = []
    for img_name, img in images.items():
        img_path = img_dir / img_name
        # The image name may itself contain sub-folders; make sure they exist.
        _ensure_dir(img_path.parent)
        img.save(img_path)
        img_path_list.append(img_path)

    return len(img_path_list), img_path_list