# Source: Hugging Face Space by LisaMegaWatts, commit a9bec47 (verified):
# "Enable Gradio queue for streaming UI updates"
"""
Gradio frontend for the text processing pipeline.

Provides drag-and-drop file upload, URL fetching, search across
Project Gutenberg / MIT Classics / Internet Archive, and corpus
management with HuggingFace push.

Usage:
    python app.py              # Launch on http://localhost:7860
    python app.py --share      # Launch with public Gradio link
    python app.py --port 8080  # Launch on a different port
"""
import argparse
import logging
import os
import shutil
import sys
import tempfile
from pathlib import Path
# Ensure the script directory is on the path for imports
SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(SCRIPT_DIR))
from pipeline import Pipeline
logger = logging.getLogger("app")
# ---------------------------------------------------------------------------
# Pipeline singleton
# ---------------------------------------------------------------------------
# Process-wide Pipeline instance; stays None until the first get_pipeline() call.
_pipeline: Pipeline | None = None


def get_pipeline() -> Pipeline:
    """Return the shared Pipeline, constructing it on first use."""
    global _pipeline
    if _pipeline is not None:
        return _pipeline
    _pipeline = Pipeline()
    return _pipeline
# ---------------------------------------------------------------------------
# Tab 1: Add Texts
# ---------------------------------------------------------------------------
def process_uploaded_files(files) -> str:
    """Copy uploaded files into the pipeline inbox and process them.

    Args:
        files: Value of the multi-file upload component. Each entry may be
            a filesystem path (``str`` or ``os.PathLike``) or a file-like
            wrapper that exposes the temp-file path as ``.name``.

    Returns:
        A multi-line report listing every copied file followed by the
        number of new chunks and the resulting train/val totals, or a
        short notice when nothing was uploaded.
    """
    if not files:
        return "No files uploaded."

    pipeline = get_pipeline()
    report = []
    for file_obj in files:
        # Depending on the Gradio version / the component's `type` setting,
        # uploads arrive either as plain path strings or as temp-file
        # wrappers; accept both instead of assuming `.name` exists.
        if isinstance(file_obj, (str, os.PathLike)):
            src = Path(file_obj)
        else:
            src = Path(file_obj.name)
        dest = pipeline.inbox / src.name
        # Copy to inbox (copy2 keeps the file metadata).
        shutil.copy2(str(src), str(dest))
        report.append(f"Copied {src.name} to inbox/")

    # Process everything now sitting in the inbox, then refresh the split.
    new_chunks = pipeline.process_inbox()
    train_n, val_n = pipeline.rebuild_output()
    report.append(f"\nProcessed: {new_chunks} new chunks")
    report.append(f"Total corpus: {train_n} train / {val_n} val")
    return "\n".join(report)
def _filename_from_url(url: str) -> str:
    """Derive a filesystem-safe ``.txt`` filename from the path of *url*."""
    import re
    from urllib.parse import unquote, urlsplit

    parts = urlsplit(url)
    # Only the path component names the document; the query string and
    # fragment must never leak into the filename.
    name = unquote(parts.path.rstrip("/").rsplit("/", 1)[-1])
    if name.endswith(".txt"):
        stem = name[: -len(".txt")]
    else:
        # Keep a trace of the original extension: republic.html -> republic_html
        stem = name.replace(".", "_")
    # Collapse anything that is not a word character, dot or dash, and never
    # start/end with a dot (avoids hidden files and ".." style names).
    stem = re.sub(r"[^\w.-]+", "_", stem).strip("._")
    if not stem:
        # Directory-style or bare-host URL: fall back to the host name.
        stem = re.sub(r"[^\w-]+", "_", parts.netloc).strip("_") or "download"
    return f"{stem}.txt"


def fetch_url(url: str) -> str:
    """Download text from a URL, store it in the inbox and process it.

    Args:
        url: Address of the text to download; surrounding whitespace is
            ignored.

    Returns:
        A human-readable summary (saved filename, character count, new
        chunks, train/val totals), a prompt when *url* is blank, or an
        ``"Error: ..."`` string when the download or processing fails.
    """
    url = url.strip()
    if not url:
        return "Please enter a URL."

    import requests

    pipeline = get_pipeline()
    try:
        resp = requests.get(url, timeout=30, headers={
            "User-Agent": "PhilosophyCorpus-Pipeline/1.0",
        })
        resp.raise_for_status()
        text = resp.text

        # Save to inbox under a sanitized name derived from the URL path.
        fname = _filename_from_url(url)
        dest = pipeline.inbox / fname
        dest.write_text(text, encoding="utf-8")

        # Process the inbox and refresh the train/val split.
        new_chunks = pipeline.process_inbox()
        train_n, val_n = pipeline.rebuild_output()
        return (
            f"Downloaded: {fname} ({len(text):,} chars)\n"
            f"Processed: {new_chunks} new chunks\n"
            f"Total corpus: {train_n} train / {val_n} val"
        )
    except Exception as e:
        # UI boundary: report the failure in the result box instead of raising.
        return f"Error: {e}"
# ---------------------------------------------------------------------------
# Tab 2: Internet Archive Search
# ---------------------------------------------------------------------------
def search_archive(query: str, subject: str) -> list[list]:
    """Query the Internet Archive and shape the hits into table rows.

    Each row is ``[identifier, title, creator, date, downloads]``. A blank
    query yields an empty table without contacting the search backend.
    """
    if not query.strip():
        return []

    from sources.ia_search import search_ia

    # "All" in the dropdown means "no subject filter".
    subject_key = None if subject == "All" else subject.lower()

    table = []
    for hit in search_ia(query, subject=subject_key, rows=20):
        author = hit["creator"]
        if isinstance(author, list):
            # Several creators are shown as one comma-separated cell.
            author = ", ".join(author)
        row = [hit["identifier"], hit["title"], author]
        # Only the first ten characters of the date are displayed.
        row.append(str(hit["date"])[:10] if hit["date"] else "")
        row.append(str(hit["downloads"]))
        table.append(row)
    return table
def add_ia_text(identifier: str) -> str:
    """Fetch one Internet Archive text and feed it through the pipeline.

    Returns a summary of the download and the updated corpus size, a prompt
    when the identifier is blank, or an ``"Error: ..."`` string on failure.
    """
    ident = identifier.strip()
    if not ident:
        return "Please enter an Internet Archive identifier."

    from sources.ia_search import get_ia_text

    pipeline = get_pipeline()
    try:
        text = get_ia_text(ident)
        target = pipeline.inbox / f"ia_{ident}.txt"
        target.write_text(text, encoding="utf-8")

        new_chunks = pipeline.process_inbox()
        train_n, val_n = pipeline.rebuild_output()
        summary = [
            f"Downloaded: {identifier} ({len(text):,} chars)",
            f"Processed: {new_chunks} new chunks",
            f"Total corpus: {train_n} train / {val_n} val",
        ]
        return "\n".join(summary)
    except Exception as err:
        return f"Error: {err}"
# ---------------------------------------------------------------------------
# Tab 3: Search Project Gutenberg
# ---------------------------------------------------------------------------
def search_gutenberg_ui(query: str, topic: str) -> list[list]:
    """Run a Gutenberg search and return the hits as table rows.

    Each row is ``[id, title, author, subjects, download_count]`` with the
    subjects cell cut to 60 characters. A blank query yields an empty table.
    """
    if not query.strip():
        return []

    from sources.gutenberg_search import search_gutenberg

    # "All" in the dropdown means "no topic filter".
    topic_key = None if topic == "All" else topic.lower()
    hits = search_gutenberg(query, topic=topic_key, rows=20)
    return [
        [
            str(hit["id"]),
            hit["title"],
            hit["author"],
            hit["subjects"][:60],
            str(hit["download_count"]),
        ]
        for hit in hits
    ]
def add_gutenberg_text(book_id: str) -> str:
    """Download a Gutenberg text and process it through the pipeline.

    Args:
        book_id: Numeric Gutenberg book ID as typed by the user.

    Returns:
        A summary of the download and the updated corpus size, a prompt
        when *book_id* is blank, an "Invalid book ID" message when it is
        not an integer, or an ``"Error: ..."`` string when the download or
        processing fails.
    """
    if not book_id.strip():
        return "Please enter a Gutenberg book ID."

    # Validate the ID on its own so that a ValueError raised later by the
    # download or the pipeline is not misreported as a malformed ID.
    try:
        bid = int(book_id.strip())
    except ValueError:
        return f"Error: Invalid book ID '{book_id}' — enter a number (e.g. 1497)"

    from sources.gutenberg_search import get_gutenberg_text

    pipeline = get_pipeline()
    try:
        text = get_gutenberg_text(bid)
        fname = f"gutenberg_{bid}.txt"
        dest = pipeline.inbox / fname
        dest.write_text(text, encoding="utf-8")

        new_chunks = pipeline.process_inbox()
        train_n, val_n = pipeline.rebuild_output()
        return (
            f"Downloaded: Gutenberg #{bid} ({len(text):,} chars)\n"
            f"Processed: {new_chunks} new chunks\n"
            f"Total corpus: {train_n} train / {val_n} val"
        )
    except Exception as e:
        # UI boundary: report the failure in the result box instead of raising.
        return f"Error: {e}"
# ---------------------------------------------------------------------------
# Tab 4: Browse MIT Classics
# ---------------------------------------------------------------------------
def search_mit_ui(query: str, author: str) -> list[list]:
    """Look up works in the MIT Classics catalog and return table rows.

    Each row is ``[author, title, work_path]``. The query is passed on with
    surrounding whitespace removed; "All" disables the author filter.
    """
    from sources.mit_classics_search import search_mit_classics

    author_filter = "" if author == "All" else author
    matches = search_mit_classics(query=query.strip(), author=author_filter)
    return [[work["author"], work["title"], work["work_path"]] for work in matches]
def get_mit_authors_list() -> list[str]:
    """Return the author dropdown choices, always starting with "All".

    The catalog module is imported on demand. If importing it or listing
    the authors fails, the failure is logged and only ``["All"]`` is
    returned so the UI can still be built.
    """
    try:
        from sources.mit_classics_search import get_authors

        return ["All", *get_authors()]
    except Exception:
        # Best-effort: the dropdown must still render, but leave a trace of
        # why the author list is missing ("app" is this module's logger name).
        logging.getLogger("app").warning(
            "Could not load MIT Classics author list; falling back to 'All' only",
            exc_info=True,
        )
        return ["All"]
def add_mit_text(work_path: str) -> str:
    """Download an MIT Classics text and process it through the pipeline.

    Args:
        work_path: Catalog path of the work, e.g. ``/Plato/republic.html``;
            surrounding whitespace is ignored.

    Returns:
        A summary of the download and the updated corpus size, a prompt
        when *work_path* is blank, or an ``"Error: ..."`` string when the
        download or processing fails.
    """
    path = work_path.strip()
    if not path:
        return "Please enter a work path (e.g. /Plato/republic.html)."

    from sources.mit_classics_search import get_mit_text

    pipeline = get_pipeline()
    try:
        text = get_mit_text(path)

        # Build filename from path: /Aristotle/rhetoric.html -> mit_aristotle_rhetoric.txt
        # Use the whitespace-stripped path (the same one that was downloaded)
        # and drop empty segments so stray spaces or doubled slashes never
        # end up in the filename.
        parts = [seg for seg in path.replace(".html", "").split("/") if seg]
        fname = "mit_" + "_".join(parts).lower() + ".txt"

        dest = pipeline.inbox / fname
        dest.write_text(text, encoding="utf-8")

        new_chunks = pipeline.process_inbox()
        train_n, val_n = pipeline.rebuild_output()
        return (
            f"Downloaded: {work_path} ({len(text):,} chars)\n"
            f"Processed: {new_chunks} new chunks\n"
            f"Total corpus: {train_n} train / {val_n} val"
        )
    except Exception as e:
        # UI boundary: report the failure in the result box instead of raising.
        return f"Error: {e}"
# ---------------------------------------------------------------------------
# Tab 5: Corpus Management
# ---------------------------------------------------------------------------
def get_corpus_stats() -> str:
    """Build a plain-text report describing the current corpus.

    Returns:
        A table with one row per parsed ``*.txt`` file (non-blank line
        count and character count), a totals row and the average chunk
        length. When both ``train.txt`` and ``val.txt`` exist in the output
        directory, the split sizes and the character vocabulary of the
        training file are appended. If there are no parsed files, a short
        notice is returned instead.
    """
    pipeline = get_pipeline()
    parsed_files = sorted(pipeline.parsed.glob("*.txt"))
    if not parsed_files:
        return "No parsed files yet. Add texts to get started."

    rule = "-" * 60
    # Header uses the same column widths as the data rows below so the
    # table lines up (40 + 1 + 8 + 1 + 10 = 60 characters).
    lines_out = [f"{'File':<40} {'Chunks':>8} {'Chars':>10}", rule]
    total_chunks = 0
    total_chars = 0

    for pf in parsed_files:
        # Every non-blank line of a parsed file counts as one chunk.
        chunks = [ln for ln in pf.read_text(encoding="utf-8").splitlines() if ln.strip()]
        chars = sum(len(ln) for ln in chunks)
        total_chunks += len(chunks)
        total_chars += chars
        lines_out.append(f"{pf.name:<40} {len(chunks):>8} {chars:>10}")

    lines_out.append(rule)
    lines_out.append(f"{'TOTAL':<40} {total_chunks:>8} {total_chars:>10}")

    if total_chunks > 0:
        avg = total_chars / total_chunks
        lines_out.append(f"\nAverage chunk length: {avg:.0f} chars")

    # Output split info
    train_path = pipeline.output / "train.txt"
    val_path = pipeline.output / "val.txt"
    if train_path.exists() and val_path.exists():
        # Read each file once; the training text is reused for the vocabulary.
        train_text = train_path.read_text(encoding="utf-8")
        val_text = val_path.read_text(encoding="utf-8")
        train_n = sum(1 for ln in train_text.splitlines() if ln.strip())
        val_n = sum(1 for ln in val_text.splitlines() if ln.strip())
        lines_out.append(f"\nOutput split: {train_n} train / {val_n} val")

        # Vocabulary check: distinct characters in the training text,
        # ignoring the newline separator.
        vocab = sorted(set(train_text) - {"\n"})
        lines_out.append(f"Vocabulary: {len(vocab)} chars -> {''.join(vocab)}")

    return "\n".join(lines_out)
def get_sample_chunks() -> str:
    """Return up to ten randomly chosen chunks from ``train.txt``.

    The chunks are numbered from 1 and separated by ``---`` rules. A short
    notice is returned when the training file is missing or has no
    non-blank lines.
    """
    train_path = get_pipeline().output / "train.txt"
    if not train_path.exists():
        return "No training data yet. Process some texts first."

    chunks = []
    for raw in train_path.read_text(encoding="utf-8").splitlines():
        cleaned = raw.strip()
        if cleaned:
            chunks.append(cleaned)
    if not chunks:
        return "Training file is empty."

    import random

    picked = random.sample(chunks, min(10, len(chunks)))
    numbered = [f"[{num}] {chunk}" for num, chunk in enumerate(picked, start=1)]
    return "\n\n---\n\n".join(numbered)
def rebuild_dataset() -> str:
    """Regenerate the train/val output via the pipeline and report the sizes."""
    train_count, val_count = get_pipeline().rebuild_output()
    return f"Rebuilt: {train_count} train / {val_count} val chunks"
def push_to_hf(repo_id: str) -> str:
    """Upload the dataset to the given HuggingFace Hub repository.

    Returns a success message containing whatever the pipeline's
    ``push_to_hub`` returned, a prompt when *repo_id* is blank, or an
    ``"Error: ..."`` string when the push fails.
    """
    target = repo_id.strip()
    if target == "":
        return "Please enter a HuggingFace repo ID (e.g. username/philosophy-corpus)."

    pipeline = get_pipeline()
    try:
        hub_url = pipeline.push_to_hub(repo_id=target)
        return f"Dataset pushed successfully!\n{hub_url}"
    except Exception as err:
        return f"Error: {err}"
# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------
def _build_add_texts_tab(gr) -> None:
    """Lay out the "Add Texts" tab: file upload and fetch-by-URL."""
    with gr.Tab("Add Texts"):
        gr.Markdown("### Upload Files")
        file_upload = gr.File(
            label="Drag and drop .txt, .epub, or .zip files",
            file_count="multiple",
            file_types=[".txt", ".epub", ".zip"],
        )
        upload_btn = gr.Button("Process Uploaded Files", variant="primary")
        upload_output = gr.Textbox(label="Result", lines=6)
        upload_btn.click(process_uploaded_files, inputs=[file_upload], outputs=[upload_output])

        gr.Markdown("### Fetch from URL")
        url_input = gr.Textbox(
            label="Text URL (Gutenberg, MIT Classics, Internet Archive, or any .txt URL)",
            placeholder="https://www.gutenberg.org/cache/epub/21076/pg21076.txt",
        )
        fetch_btn = gr.Button("Fetch and Process")
        fetch_output = gr.Textbox(label="Result", lines=4)
        fetch_btn.click(fetch_url, inputs=[url_input], outputs=[fetch_output])


def _build_gutenberg_tab(gr) -> None:
    """Lay out the "Search Gutenberg" tab: search table plus add-by-ID form."""
    with gr.Tab("Search Gutenberg"):
        gr.Markdown("### Search Project Gutenberg for public domain texts")
        with gr.Row():
            gut_query = gr.Textbox(label="Search Query", placeholder="aristotle philosophy")
            gut_topic = gr.Dropdown(
                choices=["All", "Philosophy", "Ethics", "Politics",
                         "Metaphysics", "Science", "Mathematics",
                         "Classical", "Religion", "History"],
                value="Philosophy",
                label="Topic Filter",
            )
        gut_search_btn = gr.Button("Search", variant="primary")
        gut_results = gr.Dataframe(
            headers=["ID", "Title", "Author", "Subjects", "Downloads"],
            label="Search Results",
            interactive=False,
        )
        gut_search_btn.click(
            search_gutenberg_ui,
            inputs=[gut_query, gut_topic],
            outputs=[gut_results],
        )

        gr.Markdown("### Add a text to the corpus")
        gut_id_input = gr.Textbox(
            label="Gutenberg Book ID",
            placeholder="Paste a book ID from the search results above (e.g. 1497)",
        )
        gut_add_btn = gr.Button("Download and Process")
        gut_add_output = gr.Textbox(label="Result", lines=4)
        gut_add_btn.click(add_gutenberg_text, inputs=[gut_id_input], outputs=[gut_add_output])


def _build_mit_tab(gr) -> None:
    """Lay out the "Browse MIT Classics" tab: search table plus add-by-path form."""
    with gr.Tab("Browse MIT Classics"):
        gr.Markdown("### Search the MIT Internet Classics Archive (441 works by 59 authors)")
        with gr.Row():
            mit_query = gr.Textbox(label="Search Query", placeholder="republic")
            mit_author = gr.Dropdown(
                # Evaluated once, while the UI is being built.
                choices=get_mit_authors_list(),
                value="All",
                label="Author Filter",
            )
        mit_search_btn = gr.Button("Search", variant="primary")
        mit_results = gr.Dataframe(
            headers=["Author", "Title", "Work Path"],
            label="Search Results",
            interactive=False,
        )
        mit_search_btn.click(
            search_mit_ui,
            inputs=[mit_query, mit_author],
            outputs=[mit_results],
        )

        gr.Markdown("### Add a text to the corpus")
        mit_path_input = gr.Textbox(
            label="Work Path",
            placeholder="Paste a work path from the results above (e.g. /Plato/republic.html)",
        )
        mit_add_btn = gr.Button("Download and Process")
        mit_add_output = gr.Textbox(label="Result", lines=4)
        mit_add_btn.click(add_mit_text, inputs=[mit_path_input], outputs=[mit_add_output])


def _build_archive_tab(gr) -> None:
    """Lay out the "Search Internet Archive" tab: search table plus add-by-identifier form."""
    with gr.Tab("Search Internet Archive"):
        gr.Markdown("### Search the Internet Archive for classical texts")
        with gr.Row():
            search_input = gr.Textbox(label="Search Query", placeholder="aristotle philosophy")
            subject_dropdown = gr.Dropdown(
                choices=["All", "Philosophy", "Mathematics", "Rhetoric",
                         "Logic", "Ethics", "Metaphysics", "Politics", "Classical"],
                value="Philosophy",
                label="Subject Filter",
            )
        search_btn = gr.Button("Search", variant="primary")
        search_results = gr.Dataframe(
            headers=["Identifier", "Title", "Author", "Date", "Downloads"],
            label="Search Results",
            interactive=False,
        )
        search_btn.click(
            search_archive,
            inputs=[search_input, subject_dropdown],
            outputs=[search_results],
        )

        gr.Markdown("### Add a text to the corpus")
        ia_id_input = gr.Textbox(
            label="Internet Archive Identifier",
            placeholder="Paste an identifier from the search results above",
        )
        add_btn = gr.Button("Download and Process")
        add_output = gr.Textbox(label="Result", lines=4)
        add_btn.click(add_ia_text, inputs=[ia_id_input], outputs=[add_output])


def _build_corpus_tab(gr) -> None:
    """Lay out the "Corpus" tab: statistics, sample chunks, rebuild and Hub push."""
    with gr.Tab("Corpus"):
        gr.Markdown("### Corpus Statistics")
        # Passing the function itself (not its result) as `value` lets Gradio
        # compute the initial statistics instead of freezing them at build time.
        stats_output = gr.Textbox(label="Statistics", lines=15, value=get_corpus_stats)
        refresh_btn = gr.Button("Refresh Stats")
        refresh_btn.click(get_corpus_stats, outputs=[stats_output])

        gr.Markdown("### Sample Chunks")
        sample_output = gr.Textbox(label="Random samples from training data", lines=15)
        sample_btn = gr.Button("Show Samples")
        sample_btn.click(get_sample_chunks, outputs=[sample_output])

        gr.Markdown("### Actions")
        # NOTE(review): which components sit inside each Row below is assumed
        # (button/input and result side by side) — confirm the intended layout.
        with gr.Row():
            rebuild_btn = gr.Button("Rebuild Dataset")
            rebuild_output = gr.Textbox(label="Result", lines=2)
        rebuild_btn.click(rebuild_dataset, outputs=[rebuild_output])

        with gr.Row():
            hf_repo_input = gr.Textbox(
                label="HuggingFace Repo ID",
                placeholder="username/philosophy-corpus",
            )
            push_btn = gr.Button("Push to HuggingFace", variant="primary")
            push_output = gr.Textbox(label="Result", lines=2)
        push_btn.click(push_to_hf, inputs=[hf_repo_input], outputs=[push_output])


def build_ui():
    """Assemble the Gradio Blocks application and return it (not yet launched).

    Gradio is imported here rather than at module level, so the handler
    functions above can be imported without it. Each tab is built by its
    own helper; the helpers run inside the ``gr.Blocks`` context so their
    components attach to this app, in the order the tabs appear on screen.
    """
    import gradio as gr

    with gr.Blocks(title="Philosophy Corpus Pipeline", theme=gr.themes.Soft()) as app:
        gr.Markdown("# Philosophy Corpus Pipeline\nBuild training data for JuliaGPT")
        _build_add_texts_tab(gr)
        _build_gutenberg_tab(gr)
        _build_mit_tab(gr)
        _build_archive_tab(gr)
        _build_corpus_tab(gr)
    return app
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
    """Parse the command-line flags, build the UI and start the server."""
    cli = argparse.ArgumentParser(description="Philosophy Corpus Pipeline UI")
    cli.add_argument("--share", action="store_true", help="Create a public Gradio link")
    cli.add_argument("--port", type=int, default=7860, help="Port to run on")
    options = cli.parse_args()

    demo = build_ui()
    # Turn on Gradio's request queue before serving.
    demo.queue()
    demo.launch(
        share=options.share,
        server_name="0.0.0.0",
        server_port=options.port,
    )


if __name__ == "__main__":
    main()