# File size: 11,991 Bytes
# Revision: 0fd441a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# file_handler/file_utils.py
#import os
from pathlib import Path
from itertools import chain
from typing import List, Union, Any, Mapping
from PIL import Image

import utils.config as config

##SMY: May be deprecated as a duplicate of marker's own output-dir handling. See marker/marker/config/parser.py  ~ https://github.com/datalab-to/marker/blob/master/marker/config/parser.py#L169
#def create_outputdir(root: Union[str, Path], out_dir:Union[str, Path] = None) -> Path:  #List[Path]:
def create_outputdir(root: Union[str, Path], output_dir_string: Union[str, None] = None) -> Path:
    """
    Create (if needed) and return the shared output directory ``data/<output_dir_string>``.

    Parameters
    ----------
    root : str | Path
        Currently unused. Kept in the signature so existing callers keep working
        (an earlier implementation derived the output folder from this path).
    output_dir_string : str, optional
        Name of the output folder created under ``data``. Falls back to
        ``"output_dir"`` when ``None`` or empty.

    Returns
    -------
    pathlib.Path
        The output directory, relative to the current working directory.

    Raises
    ------
    OSError
        If the directory cannot be created (e.g. permission denied).
    """
    # Outputs are grouped in one shared folder under "data" rather than being
    # placed next to each individual source file.
    output_dir = Path("data") / (output_dir_string or "output_dir")

    # 0o755 instead of the former 0o2644: a directory needs the execute (search)
    # bit, otherwise no file can be created or opened inside it on POSIX.
    output_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
    return output_dir

def is_file_with_extension(path_obj: Union[str, Path]) -> bool:
    """
    Check whether *path_obj* is an existing file whose name has a non-empty extension.

    Parameters
    ----------
    path_obj : str | Path
        Path to test. Strings are converted to ``pathlib.Path``.

    Returns
    -------
    bool
        ``True`` only if the path is a regular file and has a suffix.
        ``False`` for directories, missing paths, extension-less files and for
        any input that is neither ``str`` nor ``Path`` (e.g. ``None``).
    """
    if isinstance(path_obj, str):
        path_obj = Path(path_obj)
    elif not isinstance(path_obj, Path):
        # Previously this case produced None and crashed with AttributeError.
        return False
    return path_obj.is_file() and bool(path_obj.suffix)

def process_dicts_data(data: Union[dict, list[dict]]) -> str:
    """
    Return a formatted (4-space indented) JSON string for a dict or a list of dicts.

    Any ``pathlib`` path object found in *data* is serialised as its string form.

    Parameters
    ----------
    data : dict | list[dict]
        The data to serialise.

    Returns
    -------
    str
        The JSON text.

    Raises
    ------
    TypeError
        If *data* contains a value that is neither JSON-serialisable nor a path.
    """
    import json
    from pathlib import PurePath

    class PathEncoder(json.JSONEncoder):
        """JSON encoder that converts pathlib paths to strings."""

        def default(self, obj):
            # PurePath is the common base of WindowsPath and PosixPath, so this
            # works on every platform (checking WindowsPath alone made every
            # path raise TypeError on Linux/macOS).
            if isinstance(obj, PurePath):
                return str(obj)
            # Let the base class raise the TypeError for other types.
            return super().default(obj)

    return json.dumps(data, indent=4, cls=PathEncoder)

##NB: In Python >= 3.10, `X | Y` is equivalent to Union[X, Y] for the type checker
def collect_pdf_html_paths(root: Union[str, Path]) -> List[Path]:
    """
    Recursively walk *root* and return a list of all PDF and HTML files.

    Parameters
    ----------
    root : str | Path
        Directory to search.

    Returns
    -------
    list[pathlib.Path]
        Matching ``*.pdf`` files first, followed by ``*.html`` files.

    Raises
    ------
    FileNotFoundError
        If *root* does not exist.
    """
    root = Path(root)
    if not root.exists():
        raise FileNotFoundError(f"Root path {root} does not exist.")

    patterns = ("*.pdf", "*.html")
    # rglob accepts a single pattern, so run it once per pattern and chain the results.
    candidates = chain.from_iterable(root.rglob(pattern) for pattern in patterns)
    # Keep regular files only (a directory named e.g. "x.pdf" is skipped),
    # matching the other collect_* helpers.
    return [p for p in candidates if p.is_file()]

def collect_pdf_paths(root: Union[str, Path]) -> List[Path]:
    """
    Gather every ``*.pdf`` file found anywhere below *root*.

    Raises FileNotFoundError when *root* does not exist.
    """
    base = Path(root)
    if base.exists():
        # Keep regular files only; directories matching the pattern are dropped.
        return list(filter(Path.is_file, base.rglob("*.pdf")))
    raise FileNotFoundError(f"Root path {base} does not exist.")

def collect_html_paths(root: Union[str, Path]) -> List[Path]:
    """
    Recursively walk *root* and return a list of all HTML files (``*.html`` and ``*.htm``).

    Parameters
    ----------
    root : str | Path
        Directory to search.

    Returns
    -------
    list[pathlib.Path]
        Matching ``*.html`` files first, followed by ``*.htm`` files.

    Raises
    ------
    FileNotFoundError
        If *root* does not exist.
    """
    root = Path(root)
    if not root.exists():
        raise FileNotFoundError(f"Root path {root} does not exist.")

    # Path.rglob takes exactly one pattern; the former rglob("*.html", ".htm")
    # call raised TypeError. Run one rglob per pattern and chain the results.
    patterns = ("*.html", "*.htm")
    candidates = chain.from_iterable(root.rglob(pattern) for pattern in patterns)
    htmls = [p for p in candidates if p.is_file()]

    ## SMY: TODO: convert htmls to PDF. Marker will by default attempt weasyprint which typically raises a 'libgobject-2' error on Windows

    return htmls

def collect_markdown_paths(root: Union[str, Path]) -> List[Path]:
    """
    Gather every ``*.md`` file found anywhere below *root*.
    """
    found: List[Path] = []
    for candidate in Path(root).rglob("*.md"):
        # Keep regular files only; directories matching the pattern are dropped.
        if candidate.is_file():
            found.append(candidate)
    return found

#from __future__ import annotations  ##disabled: a __future__ import is only valid at the very top of the module
def write_markdown(
    src_path: Union[str, Path],
    output_dir: Union[str, Path],
    rendered: Any,
) -> Path:
    """
    Write the Markdown representation of a source file to an output directory.

    The file is written to ``<base>/<src stem>/<src stem>.md`` where ``<base>``
    is *output_dir* itself when it is a ``Path``, or ``data/<output_dir>`` when
    it is a plain string.

    Parameters
    ----------
    src_path : str | Path
        Path to the original source file. Only its stem is used for naming
        the resulting Markdown folder and file.
    output_dir : str | Path
        Output location (see above). The target folder is created, including
        parents, if it does not exist yet.
    rendered : object
        Object that provides a ``markdown`` attribute containing the text to write.

    Returns
    -------
    pathlib.Path
        The full path of the written Markdown file.

    Raises
    ------
    FileNotFoundError
        If *src_path* does not point to an existing file.
    OSError
        If writing the file fails for any reason (e.g. permission denied).
    AttributeError
        If *rendered* does not expose a ``markdown`` attribute.

    Notes
    -----
    The function is intentionally lightweight: it only handles path resolution,
    directory creation, and file I/O. All rendering logic should be performed before
    calling this helper.
    """
    src = Path(src_path)
    if not src.is_file():
        raise FileNotFoundError(f"Source file does not exist: {src}")

    # Validate the rendered object before touching the filesystem so a bad
    # argument does not leave empty directories behind.
    try:
        markdown_text = rendered.markdown
    except AttributeError as exc:  # pragma: no cover
        raise AttributeError(
            "Extractor Rendered object must have a 'markdown' attribute"
        ) from exc

    # Outputs are grouped under one shared folder rather than next to each source file.
    if isinstance(output_dir, Path):
        base_dir = output_dir
    else:
        base_dir = Path("data") / output_dir
    md_path = base_dir / src.stem / f"{src.stem}.md"

    # 0o755 instead of the former 0o2644 + chmod(0): a directory needs the
    # execute (search) bit, and chmod(0) removed every permission, so the
    # write below failed with "Permission denied" on POSIX.
    md_path.parent.mkdir(mode=0o755, parents=True, exist_ok=True)

    with md_path.open(mode="w", encoding="utf-8") as md_f:
        md_f.write(markdown_text)

    return md_path

# Dump images extracted alongside the Markdown
def dump_images(
    src_path: Union[str, Path],
    output_dir: Union[str, Path],
    rendered: Any,
) -> tuple[int, List[Path]]:
    """
    Save the images extracted for a source file into its Markdown output folder.

    Images are written to ``<base>/<src stem>/<image name>`` where ``<base>``
    is *output_dir* itself when it is a ``Path``, or ``data/<output_dir>`` when
    it is a plain string (the same layout ``write_markdown`` uses).

    Parameters
    ----------
    src_path : str | Path
        Path to the original source file. Only its stem is used, to name the
        per-document output folder. The file does not need to exist.
    output_dir : str | Path
        Output location (see above). Missing folders are created.
    rendered : object
        Object that provides an ``images`` attribute: a mapping of image file
        name to an object with a ``save(path)`` method (e.g. ``PIL.Image.Image``).

    Returns
    -------
    tuple[int, list[pathlib.Path]]
        The number of images saved and the list of paths they were saved to.

    Raises
    ------
    AttributeError
        If *rendered* does not expose an ``images`` attribute.
    OSError
        If a folder cannot be created or an image cannot be written.
    """
    # getattr raises AttributeError (not TypeError) when the attribute is missing.
    try:
        images: Mapping[str, Any] = rendered.images
    except AttributeError as exc:  # pragma: no cover
        raise AttributeError(
            "Extracted images from rendered.images must be a mapping of str -> PIL.Image"
        ) from exc

    ##SMY: See marker.output.save_output()  : https://github.com/datalab-to/marker/blob/master/marker/output.py
    src = Path(src_path)  # kept uniform with write_markdown; existence is not required here

    # The former Path branch computed `output_dir.stem / img_name` (str / str),
    # which raised TypeError; both branches now target the per-document folder.
    if isinstance(output_dir, Path):
        target_dir = output_dir / src.stem
    else:
        target_dir = Path("data") / output_dir / src.stem

    img_path_list: List[Path] = []
    for img_name, img in images.items():
        img_path = target_dir / img_name
        # Make sure the destination exists even when write_markdown was not
        # called first, or when the image name contains sub-directories.
        img_path.parent.mkdir(parents=True, exist_ok=True)
        img.save(img_path)  # images are expected to be PIL.Image.Image-like
        img_path_list.append(img_path)

    return len(img_path_list), img_path_list

# Dump images extracted alongside the Markdown  ##SMY: Previous implementation, marked as deprecated (kept below as an inert string)
'''
def dump_images(
    src_path: Union[str, Path],
    output_dir: Union[str, Path],
    rendered: Any,
) -> int:
    
    """
    Dump the images  of the Markdown representation of a source file to an output directory.

    Parameters
    ----------
    src_path : str | Path
        Path to the original source file. Only its base name is used for naming
        the resulting Markdown file.
    output_dir : str | Path
        Directory where the Markdown file will be written. It was created if it does not
        exist with create_outputdir().
    rendered : object
        Object that provides a ``markdown`` attribute containing the text to write.

    Returns
    -------
    Number of images dumped from the  Markdown file.
    """

    try:
        images: Mapping[str, bytes] = getattr(rendered, "images")
    except TypeError as exc:  # pragma: no cover
        raise AttributeError(
            "Extracted images from rendered.images must be a mapping of str -> bytes"
        ) from exc

    images_count = 0
    ##SMY: See marker.output.save_output()  : https://github.com/datalab-to/marker/blob/master/marker/output.py
    #for img_name, img_bytes in images.items():
    for img_name, img in images.items():
        # Resolve the full path and make sure any sub‑directories exist.
        img_path = Path(output_dir) / src_path / img_name    ##SMY: image files  ##concatenate Path + str
        img_path.parent.mkdir(parents=True, exist_ok=True)

        #'' '
        #with img_path.open("wb") as fp:
        #    fp.write(img_bytes)    ##SMY: write images to markdown folder
        #images_count += 1
        #'' '
        img.save(img_path)    ##SMY: save images (of type PIL.Image.Image) to markdown folder
        images_count += 1

    return images_count        ##SMY: return number of images
    #return images.items().count
    #return len(images)
'''