|
|
def caption_slide(image_path, slide_name, prompt="Diagnosis:", output_dir="./output"):
    """Caption a Whole Slide Image (WSI) using Virchow2 tile embeddings and PRISM.

    Files inside ``image_path`` whose base name contains ``slide_name`` are
    collected. If exactly one file matches and it is an ``.svs`` slide, the
    slide is tiled with LazySlide and every tile is embedded; otherwise each
    matching file is opened as an already-extracted tile image.

    Parameters
    ----------
    image_path: str
        Directory that contains the slide file or the pre-extracted tile images.
    slide_name: str
        Name (or name fragment) used to select files inside ``image_path``.
    prompt: str
        Starting prompt reported in the research log (default: "Diagnosis:").
    output_dir: str, optional
        Directory to save output files (default: "./output"). Tiles cut from
        an ``.svs`` slide are written to ``<output_dir>/tiles``.

    Returns
    -------
    str
        Research log summarizing analysis and results, or a string starting
        with "Error" when no input file or tile could be found.
    """
    import os
    import glob
    import timm
    import torch
    from PIL import Image
    import lazyslide as zs
    from pathlib import Path
    from datetime import datetime
    from transformers import AutoModel
    from timm.layers import SwiGLUPacked
    from timm.data import resolve_data_config
    from huggingface_hub import login, whoami
    from timm.data.transforms_factory import create_transform

    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # Resolve the inputs first so a bad path fails before any model download.
    # Sorting makes the tile order independent of the directory listing order.
    files = sorted(
        f for f in glob.glob(f"{image_path}/*") if slide_name in os.path.basename(f)
    )
    if not files:
        return f"Error: no files matching '{slide_name}' found in {image_path}"

    login(token=os.getenv("HUGGINGFACE_ACCESS_TOKEN"))
    hf_user = whoami()
    username = hf_user['name']

    virchow2 = timm.create_model(
        "hf-hub:paige-ai/Virchow2",
        pretrained=True,
        mlp_layer=SwiGLUPacked,
        act_layer=torch.nn.SiLU,
    )
    # Run the tile encoder on the same device as PRISM.
    virchow2 = virchow2.to(device).eval()
    prism = AutoModel.from_pretrained('paige-ai/Prism', trust_remote_code=True)
    prism = prism.to(device)
    transforms = create_transform(**resolve_data_config(virchow2.pretrained_cfg, model=virchow2))

    def embed_tile(tile_img):
        """Return the Virchow2 embedding (class token + mean patch token) of one tile."""
        tile_tensor = transforms(tile_img).unsqueeze(0).to(device)
        # No gradients are needed; without this every embedding would keep its
        # autograd graph alive for the whole slide.
        with torch.inference_mode():
            output = virchow2(tile_tensor)
        class_token = output[:, 0]
        # Tokens 1-4 of Virchow2 are register tokens (per the model card), so
        # patch tokens start at index 5.
        patch_tokens = output[:, 5:]
        return torch.cat([class_token, patch_tokens.mean(1)], dim=-1)

    tile_embeddings = []

    if len(files) == 1 and files[0].endswith(".svs"):
        # Open the file that was actually matched; slide_name may be only a
        # fragment of the real file name.
        wsi = zs.open_wsi(files[0])
        tiles, tile_spec = zs.pp.tile_tissues(wsi, 224, mpp=0.5, return_tiles=True)

        tile_dir = Path(output_dir) / "tiles"
        tile_dir.mkdir(parents=True, exist_ok=True)
        for _, row in tiles.iterrows():
            tile_id = row["tile_id"]
            minx, miny, maxx, maxy = row["geometry"].bounds
            width = int(maxx - minx)
            height = int(maxy - miny)

            tile_img = wsi.read_region(int(minx), int(miny), width, height, tile_spec.ops_level)
            tile_img = Image.fromarray(tile_img, 'RGB')
            tile_embeddings.append(embed_tile(tile_img))

            tile_path = tile_dir / f"tile_{tile_id:05d}.png"
            tile_img.save(tile_path)
    else:
        # Every matching file is treated as a pre-extracted tile image.
        for file in files:
            tile_img = Image.open(file).convert('RGB')
            tile_embeddings.append(embed_tile(tile_img))

    if not tile_embeddings:
        return f"Error: no tiles could be extracted for '{slide_name}' in {image_path}"

    tile_embeddings = torch.cat(tile_embeddings, dim=0).unsqueeze(0).to(device)
    # NOTE(review): `prompt` is only reported in the log below; it is not
    # passed to prism.generate. Confirm whether generation should be seeded
    # with it.
    with torch.autocast(device, torch.float16), torch.inference_mode():
        reprs = prism.slide_representations(tile_embeddings)
        genned_ids = prism.generate(
            key_value_states=reprs['image_latents'],
            do_sample=False,
            num_beams=5,
            num_beam_groups=1,
        )
        generated_caption = prism.untokenize(genned_ids)

    log = f"""
Research Log: Whole Slide Image Captioning
Date: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
Image Path: {os.path.basename(image_path)}
Slide Name: {slide_name}

Analysis Steps:
1. Logged into HuggingFace as {username}
2. Load in PRISM and Virchow2 models for encoding and captioning
3. Initialized, processed, tiled, and encode slide file(s)
4. Generated the caption with "{prompt}" as initial prompt

Results:

Caption
-------
{generated_caption}
"""

    return log
|
|
|
|
|
|
|
|
def segment_slide(image_path, seg_type, model, output_dir="./output"):
    """Segment a Whole Slide Image (WSI).

    Parameters
    ----------
    image_path: str
        Path to the whole slide image file.
    seg_type: str
        Type of segmentation to perform. One of "cells", "cell_type",
        "semantic", "tissue" or "artifact".
    model: str
        Segmentation model to use. It must be listed by
        ``zs.models.list_models("segmentation")`` and be compatible with
        ``seg_type`` (see ``allowed_models`` below).
    output_dir: str, optional
        Directory to save output files (default: "./output"). Only reported
        in the log; this function does not write files itself.

    Returns
    -------
    str or None
        Research log summarizing analysis and results, or ``None`` when
        ``seg_type`` / ``model`` is not a supported combination.
    """
    import os
    import lazyslide as zs
    from datetime import datetime
    from huggingface_hub import login, whoami

    # Models accepted for each segmentation type; None means "any model that
    # LazySlide lists as a segmentation model".
    allowed_models = {
        "cells": {"instanseg", "cellpose"},
        "cell_type": {"nulite"},
        "semantic": None,
        "tissue": {"grandqc", "pathprofiler"},
        "artifact": {"grandqc"},
    }
    # Name of the zs.seg function that implements each segmentation type.
    # "cell_type" maps to zs.seg.cell_types, the name used elsewhere in this
    # file for cell-type segmentation.
    seg_functions = {
        "cells": "cells",
        "cell_type": "cell_types",
        "semantic": "semantic",
        "tissue": "tissue",
        "artifact": "artifact",
    }

    if seg_type not in allowed_models:
        return None
    if model not in set(zs.models.list_models("segmentation")):
        return None
    compatible = allowed_models[seg_type]
    if compatible is not None and model not in compatible:
        return None

    login(token=os.getenv("HUGGINGFACE_ACCESS_TOKEN"))
    hf_user = whoami()
    username = hf_user['name']

    wsi = zs.open_wsi(image_path)
    zs.pp.find_tissues(wsi)
    # Tiles must exist before the tile graph can be built from them.
    zs.pp.tile_tissues(wsi, 512, background_fraction=0.95, mpp=0.5)
    zs.pp.tile_graph(wsi)

    # Looked up lazily so only the requested segmentation function has to exist.
    segment = getattr(zs.seg, seg_functions[seg_type])
    segment(wsi, model=model)

    # NOTE(review): step 5 of the log mentions results in `output_dir`, but
    # nothing is written there by this function — confirm the intended output.
    log = f"""
Research Log: Whole Slide Image Segmentation
Date: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
Image: {os.path.basename(image_path)}

Analysis Steps:
1. Performed validity checking
2. Logged into HuggingFace as {username}
3. Open WSI, find, tile and graph tissues
4. Segmented tissues using {model}
5. Generated and displayed segmentation results in {output_dir}

Results:

Output Files
"""
    return log
|
|
|
|
|
def zero_shot_classification(image_path, labels, output_dir="./output"):
    """Run zero-shot classification on a Whole Slide Image (WSI).

    The slide is opened, its tissue is detected and tiled, tile features are
    extracted with the "virchow" model and aggregated with the "prism"
    encoder, and the given labels are scored against those features.

    Parameters
    ----------
    image_path: str
        Path to the whole slide image file.
    labels: list
        Labels of the classes to score.
    output_dir: str, optional
        Directory to save output files (default: "./output"). Not used by
        this function.

    Returns
    -------
    str
        Research log summarizing analysis and results. The log is also
        printed to standard output.
    """
    import os
    import lazyslide as zs
    from datetime import datetime
    from huggingface_hub import login, whoami

    # Authenticate with the token from the environment and look up the account name.
    login(token=os.getenv("HUGGINGFACE_ACCESS_TOKEN"))
    username = whoami()['name']

    # Preprocessing: open the slide, detect tissue, cut it into tiles.
    slide = zs.open_wsi(image_path)
    zs.pp.find_tissues(slide)
    zs.pp.tile_tissues(slide, 512, background_fraction=0.95, mpp=0.5)

    # Tile-level features, slide-level aggregation, then label scoring.
    zs.tl.feature_extraction(slide, "virchow")
    zs.tl.feature_aggregation(slide, feature_key="virchow", encoder="prism")
    scores = zs.tl.zero_shot_score(slide, labels, feature_key="virchow_tiles")

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    image_name = os.path.basename(image_path)
    report = f"""
Research Log: Zero-Shot Classification
Date: {timestamp}
Image: {image_name}

Analysis Steps:
1. Logged in as user {username} to HuggingFace
2. Loaded WSI: {slide}
3. Found tissues
4. Tiled tissues
5. Extracted features
6. Aggregated features


Results:
{scores}

Output Files:

"""
    print(report)
    return report
|
|
|
|
|
def quantify_tumor_infiltrating_lymphocites(image_path, tile_size=256, tile_step=128, batch_size=4, output_dir="./output"):
    """Quantify Tumor-Infiltrating Lymphocytes (TILs) from a Whole-Slide Image (WSI).

    Parameters
    ----------
    image_path: str
        Path to the whole slide image file.
    tile_size: int, optional
        Size of inference tiles (default: 256). Currently not forwarded to
        the segmentation call.
    tile_step: int, optional
        Step size between inference tiles (default: 128). Currently not
        forwarded to the segmentation call.
    batch_size: int, optional
        Simultaneous inference tiles (default: 4)
    output_dir: str, optional
        Directory to save output files (default: "./output"). Created if it
        does not exist.

    Returns
    -------
    str
        Research log summarizing analysis and results, or a string starting
        with "Error" when loading, tissue detection or segmentation fails.
    """
    import os
    import numpy as np
    import pandas as pd
    import lazyslide as zs
    from datetime import datetime
    import matplotlib.pyplot as plt

    try:
        wsi = zs.open_wsi(image_path)
    except Exception as e:
        return f"Error loading WSI: {str(e)}"

    try:
        tissue_mask = zs.pp.find_tissues(wsi, refine_level=0, to_hsv=True)
    except Exception as e:
        # The exception must be bound here; a bare `except:` left `e`
        # undefined and turned the error report into a NameError.
        return f"Error building tissue mask: {str(e)}"
    if tissue_mask is None:
        return "Error building tissue mask: no tissue mask was returned"

    try:
        zs.seg.cell_types(wsi, batch_size=batch_size)
    except Exception as e:
        return f"Error during cell type segmentation: {str(e)}"

    instance_map = np.asarray(zs.io.load_annotations(wsi, "instance_map"))
    type_map = np.asarray(zs.io.load_annotations(wsi, "cell_types"))

    # The output directory may not exist yet; every save below writes into it.
    os.makedirs(output_dir, exist_ok=True)

    instance_map_path = os.path.join(output_dir, "instance_map.npy")
    type_map_path = os.path.join(output_dir, "cell_type_map.npy")
    np.save(instance_map_path, instance_map)
    np.save(type_map_path, type_map)

    # NOTE(review): assumes class id 1 is the inflammatory/lymphocyte class of
    # the cell-type model — confirm against the model's class mapping.
    til_type_id = 1

    def count_nuclei(mask):
        """Count distinct non-zero instance labels inside ``mask``."""
        labels = np.unique(np.where(mask, instance_map, 0))
        return int(np.count_nonzero(labels))

    in_tissue = np.asarray(tissue_mask, dtype=bool)
    # All nuclei inside tissue, regardless of type. The original restricted
    # this mask to TILs, which made the TIL fraction always equal to 1.
    nuclei_mask = in_tissue & (instance_map > 0)
    til_mask = nuclei_mask & (type_map == til_type_id)

    # Count nuclei (instance labels), not pixels.
    total_cells = count_nuclei(nuclei_mask)
    til_cells = count_nuclei(til_mask)

    # mpp is microns per pixel; 1 mm^2 = 1e6 um^2.
    pixel_area_mm2 = (wsi.mpp ** 2) / 1e6
    roi_area_mm2 = np.count_nonzero(in_tissue) * pixel_area_mm2
    til_density = til_cells / roi_area_mm2 if roi_area_mm2 > 0 else float("nan")
    total_density = total_cells / roi_area_mm2 if roi_area_mm2 > 0 else float("nan")
    til_fraction = til_cells / total_cells if total_cells > 0 else float("nan")

    metrics = {
        "total_nuclei": total_cells,
        "til_nuclei": til_cells,
        "til_fraction": til_fraction,
        "til_density_per_mm2": til_density,
        "total_density_per_mm2": total_density,
        "roi_area_mm2": roi_area_mm2,
    }
    metrics_df = pd.DataFrame([metrics])
    metrics_path = os.path.join(output_dir, "metrics.csv")
    metrics_df.to_csv(metrics_path, index=False)

    # Overlay: TIL pixels in red, every other segmented nucleus in green.
    overlay = np.zeros((*type_map.shape, 3), dtype=np.uint8)
    overlay[type_map == til_type_id] = [255, 0, 0]
    overlay[(type_map != til_type_id) & (instance_map > 0)] = [0, 255, 0]
    overlay_path = os.path.join(output_dir, "overlay.png")
    plt.imsave(overlay_path, overlay)

    log = f"""
Research Log: Quantification of Tumor-Infiltrating Lymphocytes
Date: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
Image: {os.path.basename(image_path)}

Analysis Steps:
1. Loaded and preprocessed the whole slide image into upscaled tiles
2. Applied NuLite Nucleus Instance Segmentation and Classification on tiles
3. Computed and quantified TIL (based on inflammed cell class) and total nuclear density

Results:
- Total Nuclei: {int(total_cells)}
- Total Inflammed Nuclei: {int(til_cells)}
- TIL Density (nuclei per mm^2): {til_density:.2f}

Output Files:
- Segmented Image: {os.path.basename(overlay_path)}
- Measurements: {os.path.basename(metrics_path)}
"""

    return log
|
|
|
|
|
def quantify_fibrosis(image_path, model="grandqc", output_dir="./output"):
    """Quantify fibrosis from a Whole Slide Image (WSI).

    Only the preparatory steps are implemented so far: the slide is opened
    and tissue is segmented with the chosen model. No fibrosis metric is
    computed yet, and the returned log says so explicitly.

    Parameters
    ----------
    image_path: str
        Path to the image file.
    model: str, optional
        Tissue segmentation model to use (default: grandqc)
    output_dir: str, optional
        Directory to save output files (default: "./output"). Not used yet;
        no files are written.

    Returns
    -------
    str
        Research log summarizing analysis and results, or a string starting
        with "Error" when loading or segmentation fails.
    """
    import os
    import lazyslide as zs
    from datetime import datetime

    try:
        wsi = zs.open_wsi(image_path)
    except Exception as e:
        return f"Error loading WSI: {str(e)}"

    # Report segmentation failures the same way as loading failures instead
    # of letting the exception escape.
    try:
        zs.seg.tissue(wsi, model=model)
    except Exception as e:
        return f"Error during tissue segmentation: {str(e)}"

    log = f"""
Research Log: Fibrosis Quantification
Date: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
Image: {os.path.basename(image_path)}

Analysis Steps:
1. Loaded the whole slide image
2. Segmented tissue using {model}

Results:
- Tissue segmentation completed
- Fibrosis quantification is not implemented yet; no fibrosis metrics were computed

Output Files:
- None
"""
    return log
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|