parserPDF / converters /extraction_converter.py
semmyk's picture
baseline08_beta0.4.0_06Oct25: Refactored. Now runs without ProcessPoolExecutor; Marker inherently handles ThreadPoolExecutor and ProcessPoolExecutor. Gradio UI separated from Gradio processing logic.
c6fb648
import os
from pathlib import Path
import traceback
#import time
from typing import Dict, Any, Type, Optional, Union, Literal #, BaseModel
from pydantic import BaseModel
from marker.models import create_model_dict
#from marker.converters.extraction import ExtractionConverter as MarkerExtractor ## structured pydantic extraction
from marker.converters.pdf import PdfConverter as MarkerConverter ## full document convertion/extraction
from marker.config.parser import ConfigParser ## Process custom configuration
from marker.services.openai import OpenAIService as MarkerOpenAIService
from marker.settings import settings
#from sympy import Union
from utils.logger import get_logger
logger = get_logger(__name__)
# Build Marker's models in one place so callers can reuse them instead of reloading per instance.
def load_models():
    """Return the model dict built by Marker's ``create_model_dict``.

    NOTE(review): the original note says this call initiates the model
    download -- presumably only when the models are not cached; confirm
    against Marker.
    """
    marker_models = create_model_dict()
    return marker_models
# Full document converter
class DocumentConverter:
    """
    Business logic wrapper using Marker OpenAI LLM Services to
    convert documents (PDF, HTML files) into markdowns + assets.

    Constructing an instance:
      1. builds a Marker option dict from the arguments (``get_config_dict``),
      2. feeds it to Marker's ``ConfigParser``,
      3. obtains the model dict (reusing ``globals.config_load_models.model_dict``
         when it is populated, otherwise calling ``load_models()``),
      4. instantiates Marker's ``PdfConverter`` and stores it on ``self.converter``.
    """

    def __init__(self,
                 model_id: str,
                 hf_provider: str,
                 temperature: float,
                 top_p: float,
                 api_token: str,
                 openai_base_url: str = "https://router.huggingface.co/v1",
                 openai_image_format: Optional[str] = "webp",
                 max_workers: Optional[int] = 1,  # forwarded as config_dict["pdftext_workers"]
                 max_retries: Optional[int] = 2,
                 debug: Optional[bool] = None,
                 output_format: Literal["markdown", "json", "html"] = "markdown",
                 output_dir: Optional[Union[str, Path]] = "output_dir",
                 use_llm: Optional[bool] = None,
                 force_ocr: Optional[bool] = None,
                 strip_existing_ocr: Optional[bool] = None,
                 disable_ocr_math: Optional[bool] = None,
                 page_range: Optional[str] = None,
                 ):
        """Configure and instantiate the underlying Marker ``PdfConverter``.

        Args:
            model_id: Model name, sent to Marker as ``openai_model`` when the LLM is enabled.
            hf_provider: Currently unused; kept so existing callers keep working.
            temperature: Sampling temperature, forwarded only when the LLM is enabled.
            top_p: Nucleus-sampling value, forwarded only when the LLM is enabled.
            api_token: API key. Forwarded as ``openai_api_key`` and, when the LLM is
                enabled, exported as the ``OPENAI_API_KEY`` environment variable.
            openai_base_url: Base URL forwarded as ``openai_base_url``.
            openai_image_format: Image format forwarded as ``openai_image_format``.
            max_workers: Forwarded as ``pdftext_workers``.
            max_retries: Stored on the instance only; not part of the config dict.
            debug: When truthy, ``settings.DEBUG_DATA_FOLDER`` replaces ``output_dir``.
            output_format: One of ``"markdown"``, ``"json"``, ``"html"``.
            output_dir: Output directory used when ``debug`` is falsy.
            use_llm: Enables the LLM options; falsy values and the string
                ``"False"`` both disable them.
            force_ocr: Forwarded only when truthy (and not the string ``"False"``).
            strip_existing_ocr: Forwarded as-is.
            disable_ocr_math: Forwarded as-is.
            page_range: Page selection string, e.g. ``"0,4-8,16"``; dropped from the
                config when empty.

        Raises:
            RuntimeError: If building or parsing the config, loading the models
                (other than ``OSError``), or instantiating the converter fails.
        """
        self.model_id = model_id
        self.openai_api_key = api_token
        self.openai_base_url = openai_base_url
        self.temperature = temperature
        self.top_p = top_p
        self.llm_service = MarkerOpenAIService
        self.openai_image_format = openai_image_format
        self.max_workers = max_workers
        self.max_retries = max_retries
        self.debug = debug
        self.output_format = output_format
        # No trailing comma: a trailing comma here silently turned the value into a 1-tuple.
        self.output_dir = settings.DEBUG_DATA_FOLDER if debug else output_dir
        self.use_llm = use_llm if use_llm else False
        self.force_ocr = force_ocr if force_ocr else False
        self.strip_existing_ocr = strip_existing_ocr
        self.disable_ocr_math = disable_ocr_math
        self.page_range = page_range if page_range else None
        self.converter = None

        # 0) A provider-agnostic chat client (OpenAIChatClient) is planned but not wired in;
        #    Marker's own OpenAI service is used instead.

        # 1) Build the custom configuration and drop options that are unset.
        try:
            self.config_dict: Dict[str, Any] = self.get_config_dict()
            if not self.config_dict.get("page_range"):
                self.config_dict.pop("page_range", None)
            force_ocr_option = self.config_dict.get("force_ocr")
            if not force_ocr_option or force_ocr_option == 'False':
                self.config_dict.pop("force_ocr", None)
            # level 20 == logging.INFO
            logger.log(level=20, msg="✔️ config_dict custom configured:", extra={"service": "openai"})
        except Exception as exc:
            tb = traceback.format_exc()
            logger.exception(f"✗ Error configuring custom config_dict: {exc}\n{tb}")
            raise RuntimeError(f"✗ Error configuring custom config_dict: {exc}\n{tb}") from exc

        # 2) Let Marker's ConfigParser process the configuration.
        try:
            config_parser: ConfigParser = ConfigParser(self.config_dict)
            logger.log(level=20, msg="✔️ parsed/processed custom config_dict:", extra={"config": str(config_parser)})
        except Exception as exc:
            tb = traceback.format_exc()
            logger.exception(f"✗ Error parsing/processing custom config_dict: {exc}\n{tb}")
            raise RuntimeError(f"✗ Error parsing/processing custom config_dict: {exc}\n{tb}") from exc

        # 3) Reuse already-loaded models when available, otherwise load them now.
        from globals import config_load_models
        # Pre-bind so the OSError path below still reaches the fallback in step 4
        # instead of failing with NameError.
        model_dict = None
        try:
            model_dict = config_load_models.model_dict or load_models()
        except OSError as exc_ose:
            tb = traceback.format_exc()
            logger.warning(f"⚠️ OSError: the paging file is too small (to complete reload): {exc_ose}\n{tb}")
        except Exception as exc:
            tb = traceback.format_exc()
            logger.exception(f"✗ Error loading models (reload): {exc}\n{tb}")
            raise RuntimeError(f"✗ Error loading models (reload): {exc}\n{tb}") from exc

        # 4) Instantiate Marker's PdfConverter with the parsed configuration.
        try:
            # Marker takes the service as a dotted class path string, not a class object.
            llm_service_str = (
                f"{self.llm_service.__module__}.{self.llm_service.__qualname__}"
                if self._llm_enabled()
                else None
            )
            if llm_service_str:
                # Marker asserts that an OpenAI key is configured; fall back to known env vars.
                api_key = (
                    api_token
                    or os.getenv("OPENAI_API_KEY")
                    or os.getenv("GEMINI_API_KEY")
                    or os.getenv("HF_TOKEN")
                    or os.getenv("HUGGINGFACEHUB_API_TOKEN")
                )
                if api_key:
                    os.environ["OPENAI_API_KEY"] = api_key
                else:
                    # os.environ only accepts strings; assigning None would raise TypeError.
                    logger.warning("⚠️ use_llm is enabled but no API key was found (api_token or environment)")

            config_dict = config_parser.generate_config_dict()
            self.converter = MarkerConverter(
                artifact_dict=model_dict if model_dict else create_model_dict(),
                config=config_dict,
                llm_service=llm_service_str,
                processor_list=config_parser.get_processors(),
                renderer=config_parser.get_renderer(),
            )
            logger.log(level=20, msg="✔️ MarkerConverter instantiated successfully:", extra={"converter.config": str(self.converter.config.get("openai_base_url")), "use_llm":self.converter.use_llm})
        except Exception as exc:
            tb = traceback.format_exc()
            logger.exception(f"✗ Error initialising MarkerExtractor: {exc}\n{tb}")
            raise RuntimeError(f"✗ Error initialising MarkerExtractor: {exc}\n{tb}") from exc

    def _llm_enabled(self) -> bool:
        """Return True unless ``self.use_llm`` is falsy or the string ``"False"``."""
        return bool(self.use_llm) and self.use_llm != "False"

    def get_config_dict(self) -> Dict[str, Any]:
        """Define the custom configuration for the Hugging Face LLM: combining Marker's cli_options and LLM.

        ``self.use_llm`` and ``self.page_range`` are unwrapped in place when they
        arrive as tuples (first element is kept). The OpenAI/LLM keys and
        ``use_llm`` are included only when the LLM is enabled.

        Returns:
            The option dict to hand to Marker's ``ConfigParser``.

        Raises:
            RuntimeError: If assembling the dict fails.
        """
        try:
            # Defensive unwrap: these values have been observed to arrive wrapped in a tuple.
            if isinstance(self.use_llm, tuple):
                self.use_llm = self.use_llm[0]
            if isinstance(self.page_range, tuple):
                self.page_range = self.page_range[0]

            llm_enabled = self._llm_enabled()
            llm_options: Dict[str, Any] = {
                "openai_model": self.model_id,
                "openai_api_key": self.openai_api_key,
                "openai_base_url": self.openai_base_url,
                "temperature": self.temperature,
                "top_p": self.top_p,
                "openai_image_format": self.openai_image_format,
            } if llm_enabled else {}

            # Key order matches the previous two hand-written literals.
            return {
                "output_format": self.output_format,
                **llm_options,
                "pdftext_workers": self.max_workers,  # number of workers to use for pdftext
                "debug": self.debug,
                "output_dir": self.output_dir,
                **({"use_llm": self.use_llm} if llm_enabled else {}),
                "force_ocr": self.force_ocr,
                "strip_existing_ocr": self.strip_existing_ocr,
                "disable_ocr_math": self.disable_ocr_math,
                "page_range": self.page_range,
            }
        except Exception as exc:
            tb = traceback.format_exc()
            logger.exception(f"✗ Error configuring custom config_dict: {exc}\n{tb}")
            raise RuntimeError(f"✗ Error configuring custom config_dict: {exc}\n{tb}") from exc