"""
Gradio frontend for the text processing pipeline.

Provides drag-and-drop file upload, URL fetching, search across
Project Gutenberg / MIT Classics / Internet Archive, and corpus
management with HuggingFace push.

Usage:
    python app.py                    # Launch on http://localhost:7860
    python app.py --share            # Launch with public Gradio link
    python app.py --host 127.0.0.1   # Only accept local connections
"""

import argparse
import logging
import os
import random
import re
import shutil
import sys
import tempfile
from pathlib import Path
from urllib.parse import urlsplit

# Ensure the script directory is on the path for imports
SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(SCRIPT_DIR))

from pipeline import Pipeline

logger = logging.getLogger("app")

# ---------------------------------------------------------------------------
# Pipeline singleton
# ---------------------------------------------------------------------------

_pipeline: Pipeline | None = None


def get_pipeline() -> Pipeline:
    """Return the shared Pipeline instance, creating it on first use."""
    global _pipeline
    if _pipeline is None:
        _pipeline = Pipeline()
    return _pipeline


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

# Anything that is not a word character, dot or hyphen is replaced so that
# user-supplied names can never contain path separators or shell-hostile
# characters.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^\w.\-]+")


def _safe_filename(name: str, fallback: str = "download") -> str:
    """Return *name* reduced to a single safe path component.

    Runs of unsafe characters become ``_`` and leading dots are dropped (so
    the result is never hidden and never ``..``). If nothing is left,
    *fallback* is returned instead.
    """
    cleaned = _UNSAFE_FILENAME_CHARS.sub("_", name).lstrip(".")
    return cleaned or fallback


def _filename_from_url(url: str) -> str:
    """Derive an inbox filename (always ending in ``.txt``) from *url*.

    Only the path component is considered, so query strings and fragments do
    not leak into the name. When the path has no final segment the host name
    is used. A non-``.txt`` name has its dots replaced by underscores before
    ``.txt`` is appended, e.g. ``republic.html`` -> ``republic_html.txt``.
    """
    parts = urlsplit(url)
    raw = parts.path.rstrip("/").rsplit("/", 1)[-1] or parts.netloc
    if raw.endswith(".txt"):
        stem = raw.removesuffix(".txt")
    else:
        stem = raw.replace(".", "_")
    return _safe_filename(stem) + ".txt"


def _ingest_text(pipeline: Pipeline, fname: str, text: str, label: str) -> str:
    """Write *text* into the pipeline inbox, process it and rebuild the output.

    Args:
        pipeline: The pipeline whose ``inbox`` receives the file.
        fname: File name (single path component) to create inside the inbox.
        text: Downloaded text, written as UTF-8.
        label: Human-readable name of the download used in the report.

    Returns:
        A three-line report: what was downloaded, the value returned by
        ``process_inbox()`` and the train/val counts from ``rebuild_output()``.
    """
    (pipeline.inbox / fname).write_text(text, encoding="utf-8")
    new_chunks = pipeline.process_inbox()
    train_n, val_n = pipeline.rebuild_output()
    return (
        f"Downloaded: {label} ({len(text):,} chars)\n"
        f"Processed: {new_chunks} new chunks\n"
        f"Total corpus: {train_n} train / {val_n} val"
    )


def _count_nonblank_lines(text: str) -> int:
    """Return the number of lines in *text* that are not whitespace-only."""
    return sum(1 for line in text.splitlines() if line.strip())


# ---------------------------------------------------------------------------
# Add Texts tab
# ---------------------------------------------------------------------------

def process_uploaded_files(files) -> str:
    """Copy uploaded files into the inbox and run them through the pipeline.

    Args:
        files: Uploads from the Gradio file component. Each item may be an
            object exposing the temp-file path as ``.name`` or a plain path
            string; both are accepted.

    Returns:
        A multi-line report listing each copied file followed by the
        processing totals, or a notice when nothing was uploaded.
    """
    if not files:
        return "No files uploaded."

    pipeline = get_pipeline()
    results = []

    for file_obj in files:
        # Fall back to the object itself when it is already a path string.
        src = Path(getattr(file_obj, "name", file_obj))
        dest = pipeline.inbox / src.name
        # Copy to inbox
        shutil.copy2(str(src), str(dest))
        results.append(f"Copied {src.name} to inbox/")

    # Process inbox
    new_chunks = pipeline.process_inbox()
    # Rebuild output
    train_n, val_n = pipeline.rebuild_output()

    results.append(f"\nProcessed: {new_chunks} new chunks")
    results.append(f"Total corpus: {train_n} train / {val_n} val")
    return "\n".join(results)


def fetch_url(url: str) -> str:
    """Download text from a URL and process it.

    Returns a report string on success, a prompt when *url* is blank, or
    ``"Error: ..."`` when the download or processing fails.
    """
    if not url.strip():
        return "Please enter a URL."

    import requests

    pipeline = get_pipeline()
    url = url.strip()

    try:
        resp = requests.get(url, timeout=30, headers={
            "User-Agent": "PhilosophyCorpus-Pipeline/1.0",
        })
        resp.raise_for_status()

        fname = _filename_from_url(url)
        return _ingest_text(pipeline, fname, resp.text, fname)
    except Exception as e:
        # UI boundary: keep the traceback in the log, show a short message.
        logger.exception("Failed to fetch %s", url)
        return f"Error: {e}"


# ---------------------------------------------------------------------------
# Internet Archive tab
# ---------------------------------------------------------------------------

def search_archive(query: str, subject: str) -> list[list]:
    """Search Internet Archive and return results as table rows.

    Each row is ``[identifier, title, creator, date, downloads]``. A blank
    query returns an empty list without calling the search backend.
    """
    if not query.strip():
        return []

    from sources.ia_search import search_ia

    # "All" in the dropdown means no subject filter.
    subject_key = subject.lower() if subject != "All" else None
    results = search_ia(query, subject=subject_key, rows=20)

    rows = []
    for r in results:
        creator = r["creator"]
        if isinstance(creator, list):
            creator = ", ".join(creator)
        rows.append([
            r["identifier"],
            r["title"],
            creator,
            # Keep only the first 10 characters of the date for display.
            str(r["date"])[:10] if r["date"] else "",
            str(r["downloads"]),
        ])
    return rows


def add_ia_text(identifier: str) -> str:
    """Download an IA text and process it through the pipeline.

    Returns a report string on success, a prompt when *identifier* is blank,
    or ``"Error: ..."`` when the download or processing fails.
    """
    ident = identifier.strip()
    if not ident:
        return "Please enter an Internet Archive identifier."

    from sources.ia_search import get_ia_text

    pipeline = get_pipeline()

    try:
        text = get_ia_text(ident)
        # The identifier is user input: sanitise it before using it on disk.
        fname = f"ia_{_safe_filename(ident, fallback='item')}.txt"
        return _ingest_text(pipeline, fname, text, identifier)
    except Exception as e:
        logger.exception("Failed to add Internet Archive item %r", ident)
        return f"Error: {e}"


# ---------------------------------------------------------------------------
# Project Gutenberg tab
# ---------------------------------------------------------------------------

def search_gutenberg_ui(query: str, topic: str) -> list[list]:
    """Search Gutenberg via Gutendex and return results as table rows.

    Each row is ``[id, title, author, subjects, download_count]``. A blank
    query returns an empty list without calling the search backend.
    """
    if not query.strip():
        return []

    from sources.gutenberg_search import search_gutenberg

    # "All" in the dropdown means no topic filter.
    topic_key = topic.lower() if topic != "All" else None
    results = search_gutenberg(query, topic=topic_key, rows=20)

    return [
        [
            str(r["id"]),
            r["title"],
            r["author"],
            r["subjects"][:60],  # truncated to keep the table readable
            str(r["download_count"]),
        ]
        for r in results
    ]


def add_gutenberg_text(book_id: str) -> str:
    """Download a Gutenberg text and process it through the pipeline.

    Returns a report string on success, a prompt when *book_id* is blank, an
    "Invalid book ID" message when it is not an integer, or ``"Error: ..."``
    when the download or processing fails.
    """
    if not book_id.strip():
        return "Please enter a Gutenberg book ID."

    from sources.gutenberg_search import get_gutenberg_text

    pipeline = get_pipeline()

    # Parse on its own so that a ValueError raised later by the download or
    # the pipeline is not misreported as a malformed ID.
    try:
        bid = int(book_id.strip())
    except ValueError:
        return f"Error: Invalid book ID '{book_id}' — enter a number (e.g. 1497)"

    try:
        text = get_gutenberg_text(bid)
        return _ingest_text(
            pipeline, f"gutenberg_{bid}.txt", text, f"Gutenberg #{bid}"
        )
    except Exception as e:
        logger.exception("Failed to add Gutenberg book %s", bid)
        return f"Error: {e}"


# ---------------------------------------------------------------------------
# MIT Classics tab
# ---------------------------------------------------------------------------

def search_mit_ui(query: str, author: str) -> list[list]:
    """Search MIT Classics catalog and return results as table rows.

    Each row is ``[author, title, work_path]``. Unlike the other searches a
    blank query is passed through to the backend.
    """
    from sources.mit_classics_search import search_mit_classics

    # "All" in the dropdown means no author filter.
    author_key = author if author != "All" else ""
    results = search_mit_classics(query=query.strip(), author=author_key)

    return [[r["author"], r["title"], r["work_path"]] for r in results]


def get_mit_authors_list() -> list[str]:
    """Get author list for the dropdown (lazy-loaded).

    Best effort: if the catalog cannot be loaded the dropdown still works
    with just ``"All"``; the failure is logged rather than raised.
    """
    try:
        from sources.mit_classics_search import get_authors
        return ["All"] + get_authors()
    except Exception:
        logger.warning("Could not load MIT Classics author list", exc_info=True)
        return ["All"]


def add_mit_text(work_path: str) -> str:
    """Download an MIT Classics text and process it through the pipeline.

    Returns a report string on success, a prompt when *work_path* is blank,
    or ``"Error: ..."`` when the download or processing fails.
    """
    path = work_path.strip()
    if not path:
        return "Please enter a work path (e.g. /Plato/republic.html)."

    from sources.mit_classics_search import get_mit_text

    pipeline = get_pipeline()

    try:
        text = get_mit_text(path)

        # Build filename from path: /Aristotle/rhetoric.html -> mit_aristotle_rhetoric.txt
        # The stripped path is used so stray whitespace never reaches the name.
        parts = path.strip("/").replace(".html", "").split("/")
        stem = _safe_filename("_".join(parts).lower(), fallback="work")
        return _ingest_text(pipeline, f"mit_{stem}.txt", text, work_path)
    except Exception as e:
        logger.exception("Failed to add MIT Classics work %r", path)
        return f"Error: {e}"


# ---------------------------------------------------------------------------
# Corpus tab
# ---------------------------------------------------------------------------

def get_corpus_stats() -> str:
    """Get current corpus statistics.

    Reports, for every ``*.txt`` file under ``pipeline.parsed``, the number
    of non-blank lines (chunks) and their total length, followed by totals.
    When both ``train.txt`` and ``val.txt`` exist in ``pipeline.output`` the
    split sizes and the character vocabulary of the training file are added.
    """
    pipeline = get_pipeline()

    parsed_files = sorted(pipeline.parsed.glob("*.txt"))
    if not parsed_files:
        return "No parsed files yet. Add texts to get started."

    # Header uses the same column widths as the rows below (40 + 8 + 10 plus
    # two separating spaces = the 60-character rule).
    lines_out = [f"{'File':<40} {'Chunks':>8} {'Chars':>10}", "-" * 60]
    total_chunks = 0
    total_chars = 0

    for pf in parsed_files:
        file_lines = [
            line
            for line in pf.read_text(encoding="utf-8").splitlines()
            if line.strip()
        ]
        chars = sum(len(line) for line in file_lines)
        total_chunks += len(file_lines)
        total_chars += chars
        lines_out.append(f"{pf.name:<40} {len(file_lines):>8} {chars:>10}")

    lines_out.append("-" * 60)
    lines_out.append(f"{'TOTAL':<40} {total_chunks:>8} {total_chars:>10}")

    if total_chunks > 0:
        avg = total_chars / total_chunks
        lines_out.append(f"\nAverage chunk length: {avg:.0f} chars")

    # Output split info
    train_path = pipeline.output / "train.txt"
    val_path = pipeline.output / "val.txt"
    if train_path.exists() and val_path.exists():
        # Read the training file once; it feeds both the count and the vocab.
        train_text = train_path.read_text(encoding="utf-8")
        train_n = _count_nonblank_lines(train_text)
        val_n = _count_nonblank_lines(val_path.read_text(encoding="utf-8"))
        lines_out.append(f"\nOutput split: {train_n} train / {val_n} val")

        # Vocabulary check
        vocab = sorted(set(train_text) - {"\n"})
        lines_out.append(f"Vocabulary: {len(vocab)} chars -> {''.join(vocab)}")

    return "\n".join(lines_out)


def get_sample_chunks() -> str:
    """Get sample chunks from the training data.

    Returns up to 10 randomly chosen non-blank lines of ``train.txt``,
    numbered and separated by ``---`` rules, or a notice when the file is
    missing or empty.
    """
    pipeline = get_pipeline()
    train_path = pipeline.output / "train.txt"

    if not train_path.exists():
        return "No training data yet. Process some texts first."

    lines = [
        line.strip()
        for line in train_path.read_text(encoding="utf-8").splitlines()
        if line.strip()
    ]
    if not lines:
        return "Training file is empty."

    samples = random.sample(lines, min(10, len(lines)))
    return "\n\n---\n\n".join(f"[{i+1}] {s}" for i, s in enumerate(samples))


def rebuild_dataset() -> str:
    """Rebuild train/val split from existing parsed chunks."""
    pipeline = get_pipeline()
    train_n, val_n = pipeline.rebuild_output()
    return f"Rebuilt: {train_n} train / {val_n} val chunks"


def push_to_hf(repo_id: str) -> str:
    """Push dataset to HuggingFace Hub.

    Returns a success message containing the value returned by
    ``pipeline.push_to_hub``, a prompt when *repo_id* is blank, or
    ``"Error: ..."`` when the push fails.
    """
    if not repo_id.strip():
        return "Please enter a HuggingFace repo ID (e.g. username/philosophy-corpus)."

    pipeline = get_pipeline()
    try:
        url = pipeline.push_to_hub(repo_id=repo_id.strip())
        return f"Dataset pushed successfully!\n{url}"
    except Exception as e:
        logger.exception("Failed to push dataset to %r", repo_id.strip())
        return f"Error: {e}"


# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------

def build_ui():
    """Build and return the Gradio Blocks app with all five tabs wired up."""
    import gradio as gr

    with gr.Blocks(title="Philosophy Corpus Pipeline", theme=gr.themes.Soft()) as app:
        gr.Markdown("# Philosophy Corpus Pipeline\nBuild training data for JuliaGPT")

        with gr.Tab("Add Texts"):
            gr.Markdown("### Upload Files")
            file_upload = gr.File(
                label="Drag and drop .txt, .epub, or .zip files",
                file_count="multiple",
                file_types=[".txt", ".epub", ".zip"],
            )
            upload_btn = gr.Button("Process Uploaded Files", variant="primary")
            upload_output = gr.Textbox(label="Result", lines=6)
            upload_btn.click(process_uploaded_files, inputs=[file_upload], outputs=[upload_output])

            gr.Markdown("### Fetch from URL")
            url_input = gr.Textbox(
                label="Text URL (Gutenberg, MIT Classics, Internet Archive, or any .txt URL)",
                placeholder="https://www.gutenberg.org/cache/epub/21076/pg21076.txt",
            )
            fetch_btn = gr.Button("Fetch and Process")
            fetch_output = gr.Textbox(label="Result", lines=4)
            fetch_btn.click(fetch_url, inputs=[url_input], outputs=[fetch_output])

        with gr.Tab("Search Gutenberg"):
            gr.Markdown("### Search Project Gutenberg for public domain texts")
            with gr.Row():
                gut_query = gr.Textbox(label="Search Query", placeholder="aristotle philosophy")
                gut_topic = gr.Dropdown(
                    choices=["All", "Philosophy", "Ethics", "Politics", "Metaphysics",
                             "Science", "Mathematics", "Classical", "Religion", "History"],
                    value="Philosophy",
                    label="Topic Filter",
                )
            gut_search_btn = gr.Button("Search", variant="primary")
            gut_results = gr.Dataframe(
                headers=["ID", "Title", "Author", "Subjects", "Downloads"],
                label="Search Results",
                interactive=False,
            )
            gut_search_btn.click(
                search_gutenberg_ui,
                inputs=[gut_query, gut_topic],
                outputs=[gut_results],
            )

            gr.Markdown("### Add a text to the corpus")
            gut_id_input = gr.Textbox(
                label="Gutenberg Book ID",
                placeholder="Paste a book ID from the search results above (e.g. 1497)",
            )
            gut_add_btn = gr.Button("Download and Process")
            gut_add_output = gr.Textbox(label="Result", lines=4)
            gut_add_btn.click(add_gutenberg_text, inputs=[gut_id_input], outputs=[gut_add_output])

        with gr.Tab("Browse MIT Classics"):
            gr.Markdown("### Search the MIT Internet Classics Archive (441 works by 59 authors)")
            with gr.Row():
                mit_query = gr.Textbox(label="Search Query", placeholder="republic")
                mit_author = gr.Dropdown(
                    choices=get_mit_authors_list(),
                    value="All",
                    label="Author Filter",
                )
            mit_search_btn = gr.Button("Search", variant="primary")
            mit_results = gr.Dataframe(
                headers=["Author", "Title", "Work Path"],
                label="Search Results",
                interactive=False,
            )
            mit_search_btn.click(
                search_mit_ui,
                inputs=[mit_query, mit_author],
                outputs=[mit_results],
            )

            gr.Markdown("### Add a text to the corpus")
            mit_path_input = gr.Textbox(
                label="Work Path",
                placeholder="Paste a work path from the results above (e.g. /Plato/republic.html)",
            )
            mit_add_btn = gr.Button("Download and Process")
            mit_add_output = gr.Textbox(label="Result", lines=4)
            mit_add_btn.click(add_mit_text, inputs=[mit_path_input], outputs=[mit_add_output])

        with gr.Tab("Search Internet Archive"):
            gr.Markdown("### Search the Internet Archive for classical texts")
            with gr.Row():
                search_input = gr.Textbox(label="Search Query", placeholder="aristotle philosophy")
                subject_dropdown = gr.Dropdown(
                    choices=["All", "Philosophy", "Mathematics", "Rhetoric", "Logic",
                             "Ethics", "Metaphysics", "Politics", "Classical"],
                    value="Philosophy",
                    label="Subject Filter",
                )
            search_btn = gr.Button("Search", variant="primary")
            search_results = gr.Dataframe(
                headers=["Identifier", "Title", "Author", "Date", "Downloads"],
                label="Search Results",
                interactive=False,
            )
            search_btn.click(
                search_archive,
                inputs=[search_input, subject_dropdown],
                outputs=[search_results],
            )

            gr.Markdown("### Add a text to the corpus")
            ia_id_input = gr.Textbox(
                label="Internet Archive Identifier",
                placeholder="Paste an identifier from the search results above",
            )
            add_btn = gr.Button("Download and Process")
            add_output = gr.Textbox(label="Result", lines=4)
            add_btn.click(add_ia_text, inputs=[ia_id_input], outputs=[add_output])

        with gr.Tab("Corpus"):
            gr.Markdown("### Corpus Statistics")
            # Passing the function itself (not its result) as the initial value.
            stats_output = gr.Textbox(label="Statistics", lines=15, value=get_corpus_stats)
            refresh_btn = gr.Button("Refresh Stats")
            refresh_btn.click(get_corpus_stats, outputs=[stats_output])

            gr.Markdown("### Sample Chunks")
            sample_output = gr.Textbox(label="Random samples from training data", lines=15)
            sample_btn = gr.Button("Show Samples")
            sample_btn.click(get_sample_chunks, outputs=[sample_output])

            gr.Markdown("### Actions")
            with gr.Row():
                rebuild_btn = gr.Button("Rebuild Dataset")
                rebuild_output = gr.Textbox(label="Result", lines=2)
                rebuild_btn.click(rebuild_dataset, outputs=[rebuild_output])

            with gr.Row():
                hf_repo_input = gr.Textbox(
                    label="HuggingFace Repo ID",
                    placeholder="username/philosophy-corpus",
                )
                push_btn = gr.Button("Push to HuggingFace", variant="primary")
                push_output = gr.Textbox(label="Result", lines=2)
                push_btn.click(push_to_hf, inputs=[hf_repo_input], outputs=[push_output])

    return app


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def main():
    """Parse command-line options, build the UI and launch the server."""
    parser = argparse.ArgumentParser(description="Philosophy Corpus Pipeline UI")
    parser.add_argument("--share", action="store_true", help="Create a public Gradio link")
    parser.add_argument("--port", type=int, default=7860, help="Port to run on")
    # The default keeps the previous behaviour of listening on every
    # interface; pass --host 127.0.0.1 to restrict access to this machine.
    parser.add_argument(
        "--host",
        default="0.0.0.0",
        help="Interface to bind to (default: 0.0.0.0, all interfaces)",
    )
    args = parser.parse_args()

    app = build_ui()
    app.queue()
    app.launch(share=args.share, server_name=args.host, server_port=args.port)


if __name__ == "__main__":
    main()