| | """ |
| | Gradio frontend for the text processing pipeline. |
| | |
| | Provides drag-and-drop file upload, URL fetching, search across |
| | Project Gutenberg / MIT Classics / Internet Archive, and corpus |
| | management with HuggingFace push. |
| | |
| | Usage: |
| | python app.py # Launch on http://localhost:7860 |
| | python app.py --share # Launch with public Gradio link |
| | """ |
| |
|
| | import argparse |
| | import logging |
| | import os |
| | import shutil |
| | import sys |
| | import tempfile |
| | from pathlib import Path |
| |
|
| | |
# Make sibling modules (pipeline, sources.*) importable regardless of the
# working directory the script is launched from.
SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(SCRIPT_DIR))

from pipeline import Pipeline

logger = logging.getLogger("app")

# Lazily-constructed Pipeline singleton shared by all UI callbacks
# (see get_pipeline()).
_pipeline: Pipeline | None = None
| |
|
| |
|
def get_pipeline() -> Pipeline:
    """Return the process-wide Pipeline, constructing it lazily on first use."""
    global _pipeline
    pipe = _pipeline
    if pipe is None:
        pipe = _pipeline = Pipeline()
    return pipe
| |
|
| |
|
| | |
| | |
| | |
| |
|
def process_uploaded_files(files) -> str:
    """Copy uploaded files into the inbox, run the pipeline, and report.

    Args:
        files: Gradio upload payload — a list of tempfile wrappers exposing
            a ``.name`` path attribute, or (in newer Gradio versions) plain
            path strings.

    Returns:
        Human-readable multi-line status message.
    """
    if not files:
        return "No files uploaded."

    pipeline = get_pipeline()
    results = []

    for file_obj in files:
        # Gradio may hand us either an object with .name or a bare path
        # string; handle both so the UI works across Gradio versions.
        src = Path(getattr(file_obj, "name", file_obj))
        dest = pipeline.inbox / src.name
        shutil.copy2(src, dest)  # copy2 also preserves file timestamps
        results.append(f"Copied {src.name} to inbox/")

    new_chunks = pipeline.process_inbox()
    train_n, val_n = pipeline.rebuild_output()

    results.append(f"\nProcessed: {new_chunks} new chunks")
    results.append(f"Total corpus: {train_n} train / {val_n} val")

    return "\n".join(results)
| |
|
| |
|
def fetch_url(url: str) -> str:
    """Download text from a URL, save it into the inbox, and process it.

    Args:
        url: Direct link to a plain-text resource.

    Returns:
        Status message describing the download and processing outcome,
        or an error message on failure.
    """
    if not url.strip():
        return "Please enter a URL."

    import requests
    from urllib.parse import urlparse

    pipeline = get_pipeline()
    url = url.strip()

    try:
        resp = requests.get(
            url,
            timeout=30,
            headers={"User-Agent": "PhilosophyCorpus-Pipeline/1.0"},
        )
        resp.raise_for_status()

        # Derive a filename from the URL *path* only, so query strings and
        # fragments never leak into the name; fall back to a generic name
        # for URLs that end in "/" (the old split("/")[-1] produced ".txt").
        fname = Path(urlparse(url).path).name or "downloaded"
        if not fname.endswith(".txt"):
            fname = fname.replace(".", "_") + ".txt"

        dest = pipeline.inbox / fname
        dest.write_text(resp.text, encoding="utf-8")

        new_chunks = pipeline.process_inbox()
        train_n, val_n = pipeline.rebuild_output()

        return (
            f"Downloaded: {fname} ({len(resp.text):,} chars)\n"
            f"Processed: {new_chunks} new chunks\n"
            f"Total corpus: {train_n} train / {val_n} val"
        )
    except Exception as e:
        # Broad catch at the UI boundary: surface the error to the user
        # instead of crashing the Gradio callback.
        return f"Error: {e}"
| |
|
| |
|
| | |
| | |
| | |
| |
|
def search_archive(query: str, subject: str) -> list[list]:
    """Search Internet Archive and return results as table rows."""
    if not query.strip():
        return []

    from sources.ia_search import search_ia

    results = search_ia(
        query,
        subject=None if subject == "All" else subject.lower(),
        rows=20,
    )

    table = []
    for item in results:
        author = item["creator"]
        if isinstance(author, list):
            author = ", ".join(author)
        date = item["date"]
        table.append([
            item["identifier"],
            item["title"],
            author,
            str(date)[:10] if date else "",
            str(item["downloads"]),
        ])
    return table
| |
|
| |
|
def add_ia_text(identifier: str) -> str:
    """Download an IA text and process it through the pipeline."""
    if not identifier.strip():
        return "Please enter an Internet Archive identifier."

    from sources.ia_search import get_ia_text

    pipeline = get_pipeline()
    ident = identifier.strip()

    try:
        text = get_ia_text(ident)

        dest = pipeline.inbox / f"ia_{ident}.txt"
        dest.write_text(text, encoding="utf-8")

        new_chunks = pipeline.process_inbox()
        train_n, val_n = pipeline.rebuild_output()

        return (
            f"Downloaded: {identifier} ({len(text):,} chars)\n"
            f"Processed: {new_chunks} new chunks\n"
            f"Total corpus: {train_n} train / {val_n} val"
        )
    except Exception as e:
        return f"Error: {e}"
| |
|
| |
|
| | |
| | |
| | |
| |
|
def search_gutenberg_ui(query: str, topic: str) -> list[list]:
    """Search Gutenberg via Gutendex and return results as table rows."""
    if not query.strip():
        return []

    from sources.gutenberg_search import search_gutenberg

    results = search_gutenberg(
        query,
        topic=None if topic == "All" else topic.lower(),
        rows=20,
    )

    return [
        [
            str(book["id"]),
            book["title"],
            book["author"],
            book["subjects"][:60],
            str(book["download_count"]),
        ]
        for book in results
    ]
| |
|
| |
|
def add_gutenberg_text(book_id: str) -> str:
    """Download a Gutenberg text and process it through the pipeline.

    Args:
        book_id: Numeric Project Gutenberg book ID as a string (e.g. "1497").

    Returns:
        Status message describing the download and processing outcome,
        or an error message on failure.
    """
    if not book_id.strip():
        return "Please enter a Gutenberg book ID."

    # Validate the ID up front so we never construct the pipeline or hit
    # the network for obviously bad input. This also stops a ValueError
    # raised *inside* the fetcher from being mislabeled as a bad ID
    # (previously both fell into the same except ValueError branch).
    try:
        bid = int(book_id.strip())
    except ValueError:
        return f"Error: Invalid book ID '{book_id}' — enter a number (e.g. 1497)"

    from sources.gutenberg_search import get_gutenberg_text

    pipeline = get_pipeline()

    try:
        text = get_gutenberg_text(bid)

        dest = pipeline.inbox / f"gutenberg_{bid}.txt"
        dest.write_text(text, encoding="utf-8")

        new_chunks = pipeline.process_inbox()
        train_n, val_n = pipeline.rebuild_output()

        return (
            f"Downloaded: Gutenberg #{bid} ({len(text):,} chars)\n"
            f"Processed: {new_chunks} new chunks\n"
            f"Total corpus: {train_n} train / {val_n} val"
        )
    except Exception as e:
        return f"Error: {e}"
| |
|
| |
|
| | |
| | |
| | |
| |
|
def search_mit_ui(query: str, author: str) -> list[list]:
    """Search MIT Classics catalog and return results as table rows."""
    from sources.mit_classics_search import search_mit_classics

    results = search_mit_classics(
        query=query.strip(),
        author="" if author == "All" else author,
    )

    return [[work["author"], work["title"], work["work_path"]] for work in results]
| |
|
| |
|
def get_mit_authors_list() -> list[str]:
    """Return author choices for the dropdown, falling back to just "All"."""
    authors = ["All"]
    try:
        from sources.mit_classics_search import get_authors
        authors = ["All"] + get_authors()
    except Exception:
        # Catalog unavailable (e.g. offline) — dropdown still renders.
        pass
    return authors
| |
|
| |
|
def add_mit_text(work_path: str) -> str:
    """Download an MIT Classics text and process it through the pipeline."""
    if not work_path.strip():
        return "Please enter a work path (e.g. /Plato/republic.html)."

    from sources.mit_classics_search import get_mit_text

    pipeline = get_pipeline()

    try:
        text = get_mit_text(work_path.strip())

        # Turn "/Plato/republic.html" into a slug like "mit_plato_republic.txt".
        slug = "_".join(work_path.strip("/").replace(".html", "").split("/"))
        dest = pipeline.inbox / f"mit_{slug.lower()}.txt"
        dest.write_text(text, encoding="utf-8")

        new_chunks = pipeline.process_inbox()
        train_n, val_n = pipeline.rebuild_output()

        return (
            f"Downloaded: {work_path} ({len(text):,} chars)\n"
            f"Processed: {new_chunks} new chunks\n"
            f"Total corpus: {train_n} train / {val_n} val"
        )
    except Exception as e:
        return f"Error: {e}"
| |
|
| |
|
| | |
| | |
| | |
| |
|
def get_corpus_stats() -> str:
    """Build a plain-text report of per-file and aggregate corpus stats.

    Returns:
        A fixed-width table of parsed files (chunk and char counts),
        totals, the average chunk length, and — when the output split
        exists — the train/val sizes and train.txt character vocabulary.
    """
    pipeline = get_pipeline()
    parsed_files = sorted(pipeline.parsed.glob("*.txt"))

    if not parsed_files:
        return "No parsed files yet. Add texts to get started."

    lines_out = ["File Chunks Chars", "-" * 60]
    total_chunks = 0
    total_chars = 0

    for pf in parsed_files:
        # One chunk per non-blank line in each parsed file.
        chunks = [ln for ln in pf.read_text(encoding="utf-8").splitlines() if ln.strip()]
        chars = sum(len(ln) for ln in chunks)
        total_chunks += len(chunks)
        total_chars += chars
        lines_out.append(f"{pf.name:<40} {len(chunks):>8} {chars:>10}")

    lines_out.append("-" * 60)
    lines_out.append(f"{'TOTAL':<40} {total_chunks:>8} {total_chars:>10}")

    if total_chunks > 0:
        avg = total_chars / total_chunks
        lines_out.append(f"\nAverage chunk length: {avg:.0f} chars")

    train_path = pipeline.output / "train.txt"
    val_path = pipeline.output / "val.txt"
    if train_path.exists() and val_path.exists():
        # Read train.txt once and reuse it for both the split count and
        # the vocabulary (the original re-read the file from disk).
        train_text = train_path.read_text(encoding="utf-8")
        train_n = len([ln for ln in train_text.splitlines() if ln.strip()])
        val_n = len([ln for ln in val_path.read_text(encoding="utf-8").splitlines() if ln.strip()])
        lines_out.append(f"\nOutput split: {train_n} train / {val_n} val")

        vocab = sorted(set(train_text) - {"\n"})
        lines_out.append(f"Vocabulary: {len(vocab)} chars -> {''.join(vocab)}")

    return "\n".join(lines_out)
| |
|
| |
|
def get_sample_chunks() -> str:
    """Get sample chunks from the training data."""
    import random

    pipeline = get_pipeline()
    train_path = pipeline.output / "train.txt"

    if not train_path.exists():
        return "No training data yet. Process some texts first."

    chunks = [ln.strip() for ln in train_path.read_text(encoding="utf-8").splitlines() if ln.strip()]
    if not chunks:
        return "Training file is empty."

    picked = random.sample(chunks, min(10, len(chunks)))
    return "\n\n---\n\n".join(f"[{idx + 1}] {chunk}" for idx, chunk in enumerate(picked))
| |
|
| |
|
def rebuild_dataset() -> str:
    """Rebuild train/val split from existing parsed chunks."""
    train_n, val_n = get_pipeline().rebuild_output()
    return f"Rebuilt: {train_n} train / {val_n} val chunks"
| |
|
| |
|
def push_to_hf(repo_id: str) -> str:
    """Push dataset to HuggingFace Hub."""
    repo = repo_id.strip()
    if not repo:
        return "Please enter a HuggingFace repo ID (e.g. username/philosophy-corpus)."

    try:
        url = get_pipeline().push_to_hub(repo_id=repo)
        return f"Dataset pushed successfully!\n{url}"
    except Exception as e:
        return f"Error: {e}"
| |
|
| |
|
| | |
| | |
| | |
| |
|
def build_ui():
    """Construct and return the Gradio Blocks application.

    gradio is imported lazily so the rest of this module can be imported
    without the UI dependency installed.
    """
    import gradio as gr

    with gr.Blocks(title="Philosophy Corpus Pipeline", theme=gr.themes.Soft()) as app:
        gr.Markdown("# Philosophy Corpus Pipeline\nBuild training data for JuliaGPT")

        # --- Tab: direct ingestion via file upload or URL fetch ----------
        with gr.Tab("Add Texts"):
            gr.Markdown("### Upload Files")
            file_upload = gr.File(
                label="Drag and drop .txt, .epub, or .zip files",
                file_count="multiple",
                file_types=[".txt", ".epub", ".zip"],
            )
            upload_btn = gr.Button("Process Uploaded Files", variant="primary")
            upload_output = gr.Textbox(label="Result", lines=6)
            upload_btn.click(process_uploaded_files, inputs=[file_upload], outputs=[upload_output])

            gr.Markdown("### Fetch from URL")
            url_input = gr.Textbox(
                label="Text URL (Gutenberg, MIT Classics, Internet Archive, or any .txt URL)",
                placeholder="https://www.gutenberg.org/cache/epub/21076/pg21076.txt",
            )
            fetch_btn = gr.Button("Fetch and Process")
            fetch_output = gr.Textbox(label="Result", lines=4)
            fetch_btn.click(fetch_url, inputs=[url_input], outputs=[fetch_output])

        # --- Tab: Project Gutenberg search and add -----------------------
        with gr.Tab("Search Gutenberg"):
            gr.Markdown("### Search Project Gutenberg for public domain texts")
            with gr.Row():
                gut_query = gr.Textbox(label="Search Query", placeholder="aristotle philosophy")
                gut_topic = gr.Dropdown(
                    choices=["All", "Philosophy", "Ethics", "Politics",
                             "Metaphysics", "Science", "Mathematics",
                             "Classical", "Religion", "History"],
                    value="Philosophy",
                    label="Topic Filter",
                )
            gut_search_btn = gr.Button("Search", variant="primary")
            gut_results = gr.Dataframe(
                headers=["ID", "Title", "Author", "Subjects", "Downloads"],
                label="Search Results",
                interactive=False,
            )
            gut_search_btn.click(
                search_gutenberg_ui,
                inputs=[gut_query, gut_topic],
                outputs=[gut_results],
            )

            # The user copies an ID out of the results table manually —
            # the Dataframe itself is not wired for row selection.
            gr.Markdown("### Add a text to the corpus")
            gut_id_input = gr.Textbox(
                label="Gutenberg Book ID",
                placeholder="Paste a book ID from the search results above (e.g. 1497)",
            )
            gut_add_btn = gr.Button("Download and Process")
            gut_add_output = gr.Textbox(label="Result", lines=4)
            gut_add_btn.click(add_gutenberg_text, inputs=[gut_id_input], outputs=[gut_add_output])

        # --- Tab: MIT Internet Classics Archive browse and add -----------
        with gr.Tab("Browse MIT Classics"):
            gr.Markdown("### Search the MIT Internet Classics Archive (441 works by 59 authors)")
            with gr.Row():
                mit_query = gr.Textbox(label="Search Query", placeholder="republic")
                mit_author = gr.Dropdown(
                    # Author list is fetched at build time; falls back to
                    # ["All"] if the catalog is unavailable.
                    choices=get_mit_authors_list(),
                    value="All",
                    label="Author Filter",
                )
            mit_search_btn = gr.Button("Search", variant="primary")
            mit_results = gr.Dataframe(
                headers=["Author", "Title", "Work Path"],
                label="Search Results",
                interactive=False,
            )
            mit_search_btn.click(
                search_mit_ui,
                inputs=[mit_query, mit_author],
                outputs=[mit_results],
            )

            gr.Markdown("### Add a text to the corpus")
            mit_path_input = gr.Textbox(
                label="Work Path",
                placeholder="Paste a work path from the results above (e.g. /Plato/republic.html)",
            )
            mit_add_btn = gr.Button("Download and Process")
            mit_add_output = gr.Textbox(label="Result", lines=4)
            mit_add_btn.click(add_mit_text, inputs=[mit_path_input], outputs=[mit_add_output])

        # --- Tab: Internet Archive search and add ------------------------
        with gr.Tab("Search Internet Archive"):
            gr.Markdown("### Search the Internet Archive for classical texts")
            with gr.Row():
                search_input = gr.Textbox(label="Search Query", placeholder="aristotle philosophy")
                subject_dropdown = gr.Dropdown(
                    choices=["All", "Philosophy", "Mathematics", "Rhetoric",
                             "Logic", "Ethics", "Metaphysics", "Politics", "Classical"],
                    value="Philosophy",
                    label="Subject Filter",
                )
            search_btn = gr.Button("Search", variant="primary")
            search_results = gr.Dataframe(
                headers=["Identifier", "Title", "Author", "Date", "Downloads"],
                label="Search Results",
                interactive=False,
            )
            search_btn.click(
                search_archive,
                inputs=[search_input, subject_dropdown],
                outputs=[search_results],
            )

            gr.Markdown("### Add a text to the corpus")
            ia_id_input = gr.Textbox(
                label="Internet Archive Identifier",
                placeholder="Paste an identifier from the search results above",
            )
            add_btn = gr.Button("Download and Process")
            add_output = gr.Textbox(label="Result", lines=4)
            add_btn.click(add_ia_text, inputs=[ia_id_input], outputs=[add_output])

        # --- Tab: corpus stats, samples, rebuild, and HF push ------------
        with gr.Tab("Corpus"):
            gr.Markdown("### Corpus Statistics")
            # value is a callable, so the stats are computed when the UI
            # renders rather than once at build time.
            stats_output = gr.Textbox(label="Statistics", lines=15, value=get_corpus_stats)
            refresh_btn = gr.Button("Refresh Stats")
            refresh_btn.click(get_corpus_stats, outputs=[stats_output])

            gr.Markdown("### Sample Chunks")
            sample_output = gr.Textbox(label="Random samples from training data", lines=15)
            sample_btn = gr.Button("Show Samples")
            sample_btn.click(get_sample_chunks, outputs=[sample_output])

            gr.Markdown("### Actions")
            with gr.Row():
                rebuild_btn = gr.Button("Rebuild Dataset")
                rebuild_output = gr.Textbox(label="Result", lines=2)
            rebuild_btn.click(rebuild_dataset, outputs=[rebuild_output])

            with gr.Row():
                hf_repo_input = gr.Textbox(
                    label="HuggingFace Repo ID",
                    placeholder="username/philosophy-corpus",
                )
                push_btn = gr.Button("Push to HuggingFace", variant="primary")
            push_output = gr.Textbox(label="Result", lines=2)
            push_btn.click(push_to_hf, inputs=[hf_repo_input], outputs=[push_output])

    return app
| |
|
| |
|
| | |
| | |
| | |
| |
|
def main():
    """CLI entry point: parse flags, then build and serve the Gradio UI."""
    cli = argparse.ArgumentParser(description="Philosophy Corpus Pipeline UI")
    cli.add_argument("--share", action="store_true", help="Create a public Gradio link")
    cli.add_argument("--port", type=int, default=7860, help="Port to run on")
    opts = cli.parse_args()

    ui = build_ui()
    ui.queue()
    ui.launch(share=opts.share, server_name="0.0.0.0", server_port=opts.port)
| |
|
| |
|
# Script entry point: launch the Gradio app when run directly.
if __name__ == "__main__":
    main()
| |
|