Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import torch | |
| import bitsandbytes | |
| import accelerate | |
| import scipy | |
| import copy | |
| from PIL import Image | |
| import torch.nn as nn | |
| import pandas as pd | |
| from my_model.object_detection import detect_and_draw_objects | |
| from my_model.captioner.image_captioning import get_caption | |
| from my_model.gen_utilities import free_gpu_resources | |
| from my_model.KBVQA import KBVQA, prepare_kbvqa_model | |
| from my_model.utilities.state_manager import StateManager | |
| state_manager = StateManager() | |
| def answer_question(caption, detected_objects_str, question, model): | |
| free_gpu_resources() | |
| answer = model.generate_answer(question, caption, detected_objects_str) | |
| free_gpu_resources() | |
| return answer | |
| # Sample images (assuming these are paths to your sample images) | |
| sample_images = ["Files/sample1.jpg", "Files/sample2.jpg", "Files/sample3.jpg", | |
| "Files/sample4.jpg", "Files/sample5.jpg", "Files/sample6.jpg", | |
| "Files/sample7.jpg"] | |
| def analyze_image(image, model): | |
| img = copy.deepcopy(image) # we dont wanna apply changes to the original image | |
| caption = model.get_caption(img) | |
| image_with_boxes, detected_objects_str = model.detect_objects(img) | |
| st.text("I am ready, let's talk!") | |
| free_gpu_resources() | |
| return caption, detected_objects_str, image_with_boxes | |
| def image_qa_app(kbvqa): | |
| # Display sample images as clickable thumbnails | |
| st.write("Choose from sample images:") | |
| cols = st.columns(len(sample_images)) | |
| for idx, sample_image_path in enumerate(sample_images): | |
| with cols[idx]: | |
| image = Image.open(sample_image_path) | |
| st.image(image, use_column_width=True) | |
| if st.button(f'Select Sample Image {idx + 1}', key=f'sample_{idx}'): | |
| state_manager.process_new_image(sample_image_path, image, kbvqa) | |
| # Image uploader | |
| uploaded_image = st.file_uploader("Or upload an Image", type=["png", "jpg", "jpeg"]) | |
| if uploaded_image is not None: | |
| state_manager.process_new_image(uploaded_image.name, Image.open(uploaded_image), kbvqa) | |
| # Display and interact with each uploaded/selected image | |
| for image_key, image_data in state_manager.get_images_data().items(): | |
| st.image(image_data['image'], caption=f'Uploaded Image: {image_key[-11:]}', use_column_width=True) | |
| if not image_data['analysis_done']: | |
| st.text("Cool image, please click 'Analyze Image'..") | |
| if st.button('Analyze Image', key=f'analyze_{image_key}'): | |
| caption, detected_objects_str, image_with_boxes = state_manager.analyze_image(image_data['image'], kbvqa) | |
| state_manager.update_image_data(image_key, caption, detected_objects_str, True) | |
| # Initialize qa_history for each image | |
| qa_history = image_data.get('qa_history', []) | |
| if image_data['analysis_done']: | |
| question = st.text_input(f"Ask a question about this image ({image_key[-11:]}):", key=f'question_{image_key}') | |
| if st.button('Get Answer', key=f'answer_{image_key}'): | |
| if question not in [q for q, _ in qa_history]: | |
| answer = answer_question(image_data['caption'], image_data['detected_objects_str'], question, kbvqa) | |
| state_manager.add_to_qa_history(image_key, question, answer) | |
| # Display Q&A history for each image | |
| for q, a in qa_history: | |
| st.text(f"Q: {q}\nA: {a}\n") | |
| def process_new_image(image_key, image, kbvqa): | |
| """Process a new image and update the session state.""" | |
| if image_key not in st.session_state['images_data']: | |
| st.session_state['images_data'][image_key] = { | |
| 'image': image, | |
| 'caption': '', | |
| 'detected_objects_str': '', | |
| 'qa_history': [], | |
| 'analysis_done': False | |
| } | |
| def run_inference(): | |
| st.title("Run Inference") | |
| state_manager.initialize_state() | |
| state_manager.set_up_widgets() | |
| st.session_state.button_label = "Reload Model" if state_manager.is_model_loaded() and state_manager.has_state_changed() else "Load Model" | |
| # state_manager.display_session_state() | |
| state_manager.display_model_settings() | |
| state_manager.display_session_state() | |
| if st.session_state.method == "Fine-Tuned Model": | |
| if st.button(st.session_state.button_label): | |
| if st.session_state.button_label == "Load Model": | |
| if state_manager.is_model_loaded(): | |
| st.text("Model already loaded and no settings were changed:)") | |
| else: state_manager.load_model() | |
| else: | |
| state_manager.reload_detection_model() | |
| st.success("Model reloaded with updated settings and ready for inference.") | |
| if state_manager.is_model_loaded() and st.session_state.kbvqa.all_models_loaded: | |
| image_qa_app(state_manager.get_model()) | |
| st.write(st.session_state.kbvqa.all_models_loaded) | |
| else: | |
| st.write('Model is not ready yet, will be updated later.') | |
| def display_model_settings(): | |
| st.write("### Current Model Settings:") | |
| st.table(pd.DataFrame(st.session_state['model_settings'], index=[0])) | |
| def display_session_state(): | |
| st.write("### Current Session State:") | |
| # Convert session state to a list of dictionaries, each representing a row | |
| data = [{'Key': key, 'Value': str(value)} for key, value in st.session_state.items()] | |
| # Create a DataFrame from the list | |
| df = pd.DataFrame(data) | |
| st.table(df) | |