import os

from huggingface_hub import snapshot_download
import streamlit as st

from utils.help import get_intro, get_disclaimer
from utils.format import sec_to_time, fix_latex, get_youtube_embed
from utils.rag_utils import load_youtube_data, load_book_data, load_summary, embed_question_sentence_transformer, fixed_knn_retrieval, get_random_question
from utils.system_prompts import get_expert_system_prompt, get_synthesis_user_prompt, get_synthesis_system_prompt
from utils.openai_utils import embed_question_openai, openai_domain_specific_answer_generation, openai_context_integration
from utils.endpoint_utils import get_inference_endpoint_response, parse_thinking_response, get_custom_inference_endpoint_response
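
# st.set_page_config must be the first Streamlit call in the script.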
st.set_page_config(page_title="AI University")

st.markdown("""
<style>
    .video-wrapper {
        position: relative;
        padding-bottom: 56.25%;
        height: 0;
    }
    .video-wrapper iframe {
        position: absolute;
        top: 0;
        left: 0;
        width: 100%;
        height: 100%;
    }
</style>
""", unsafe_allow_html=True)

# ---------------------------------------
# Paths
# ---------------------------------------
HOME = "."
data_dir = HOME + "/data"
private_data_dir = HOME + "/private_data"  # relative path inside the Space

# Fetch the private data (HF_API_KEY must grant read access to the dataset repo)
os.makedirs(private_data_dir, exist_ok=True)
token = os.getenv("HF_API_KEY")
local_repo_path = snapshot_download(
    repo_id="my-ai-university/data",
    token=token,  # use_auth_token is deprecated in recent huggingface_hub releases
    repo_type="dataset",
    local_dir=private_data_dir,
)

# Expert model checkpoints (not used directly in this script)
adapter_path = HOME + "/LLaMA-TOMMI-1.0/"
base_model_path = "meta-llama/Llama-3.2-11B-Vision-Instruct"

# ---------------------------------------
st.title(":red[AI University] :gray[/] FEM")

st.markdown(get_intro(), unsafe_allow_html=True)
st.markdown(" ")
st.markdown(" ")

# Sidebar for settings
with st.sidebar:
    st.header("Settings")

    with st.expander('Embedding model', expanded=True):
        embedding_model = st.selectbox("Choose content embedding model", [
            "text-embedding-3-small",
            "all-MiniLM-L6-v2",
        ])

        st.divider()
        st.write('**Video lectures**')
        if embedding_model == "all-MiniLM-L6-v2":
            yt_token_choice = st.select_slider("Tokens per content", [128, 256], value=256, help="Larger values lead to an increase in the length of each retrieved piece of content", key="yt_token_len")
        elif embedding_model == "text-embedding-3-small":
            yt_token_choice = st.select_slider("Tokens per content", [256, 512, 1024], value=256, help="Larger values lead to an increase in the length of each retrieved piece of content", key="yt_token_len")
        yt_chunk_tokens = yt_token_choice
        # Larger chunks allow fewer retrievable pieces, keeping the total token budget roughly constant
        yt_max_content = {128: 32, 256: 16, 512: 8, 1024: 4}[yt_chunk_tokens]
        top_k_YT = st.slider("Number of content pieces to retrieve", 0, yt_max_content, 4, key="yt_token_num")
        yt_overlap_tokens = yt_chunk_tokens // 4  # 25% overlap between consecutive chunks

        st.divider()
        st.write('**Textbook**')
        show_textbook = False
        if embedding_model == "all-MiniLM-L6-v2":
            latex_token_choice = st.select_slider("Tokens per content", [128, 256], value=256, help="Larger values lead to an increase in the length of each retrieved piece of content", key="latex_token_len")
        elif embedding_model == "text-embedding-3-small":
            latex_token_choice = st.select_slider("Tokens per content", [128, 256, 512, 1024], value=256, help="Larger values lead to an increase in the length of each retrieved piece of content", key="latex_token_len")
        latex_chunk_tokens = latex_token_choice
        latex_max_content = {128: 32, 256: 16, 512: 8, 1024: 4}[latex_chunk_tokens]
        top_k_Latex = st.slider("Number of content pieces to retrieve", 0, latex_max_content, 4, key="latex_token_num")
        latex_overlap_tokens = 0

        st.write(' ')

    with st.expander('Expert model', expanded=True):
        # Preserve the toggle state across Streamlit reruns
        if 'activate_expert' in st.session_state:
            st.session_state.activate_expert = st.toggle("Use expert model", value=st.session_state.activate_expert)
        else:
            st.session_state.activate_expert = st.toggle("Use expert model", value=True)

        st.session_state.expert_model = st.selectbox(
            "Choose the LLM model",
            ["LLaMA-TOMMI-1.0-11B", "LLaMA-3.2-11B", "gpt-4o-mini"],
            index=0,  # default to LLaMA-TOMMI-1.0-11B
            key='a1model'
        )

        if st.session_state.expert_model in ["LLaMA-TOMMI-1.0-11B", "LLaMA-3.2-11B"]:
            expert_do_sample = st.toggle("Enable Sampling", value=False, key='expert_sample')
            if expert_do_sample:
                expert_temperature = st.slider("Temperature", 0.0, 1.0, 0.2, key='expert_temp')
                expert_top_k = st.slider("Top K", 0, 100, 50, key='expert_top_k')
                expert_top_p = st.slider("Top P", 0.0, 1.0, 0.1, key='expert_top_p')
            else:
                # Without sampling, decoding is greedy / beam search
                expert_num_beams = st.slider("Num Beams", 1, 4, 1, key='expert_num_beams')
            expert_max_new_tokens = st.slider("Max New Tokens", 100, 2000, 500, step=50, key='expert_max_new_tokens')
        else:
            expert_api_temperature = st.slider("Temperature", 0.0, 1.0, 0.2, key='a1t')
            expert_api_top_p = st.slider("Top P", 0.0, 1.0, 0.1, key='a1p')

    with st.expander('Synthesis model', expanded=True):
        st.session_state.synthesis_model = st.selectbox(
            "Choose the LLM model",
            ["DeepSeek-R1-0528-Qwen3-8B", "gpt-4o-mini", "gpt-4.1-mini"],
            index=0,  # default to DeepSeek-R1
            key='a2model'
        )

        if st.session_state.synthesis_model == "DeepSeek-R1-0528-Qwen3-8B":
            synthesis_deepseek_temperature = st.slider("Temperature", 0.0, 1.0, 0.2, key='synthesis_deepseek_temperature')
            synthesis_deepseek_top_p = st.slider("Top P", 0.0, 1.0, 0.1, key='synthesis_deepseek_top_p')
            # Reasoning models need headroom for the thinking tokens as well as the answer
            synthesis_deepseek_max_tokens = st.slider("Max Tokens", 1000, 10000, 4000, step=100, key='synthesis_deepseek_max_tokens')
        else:
            synthesis_api_temperature = st.slider("Temperature", 0.0, 0.5, 0.3, help="Defines the randomness in the next token prediction. Lower: More predictable and focused. Higher: More adventurous and diverse.", key='a2t')
            synthesis_api_top_p = st.slider("Top P", 0.1, 0.5, 0.3, help="Defines the range of token choices the model can consider in the next prediction. Lower: More focused and restricted to high-probability options. Higher: More creative, allowing consideration of less likely options.", key='a2p')

# Main content area
if "question" not in st.session_state:
    st.session_state.question = ""

text_area_placeholder = st.empty()
question_help = "Including details or instructions improves the answer."
st.session_state.question = text_area_placeholder.text_area(
    "**Enter your query about the Finite Element Method:**",
    height=120,
    value=st.session_state.question,
    help=question_help
)

_, col1, col2, _ = st.columns([4, 2, 4, 3])
with col1:
    submit_button_placeholder = st.empty()
with col2:
    if st.button("🎲 Random Question"):
        # Redraw until the new question differs from the current one
        while True:
            random_question = get_random_question(data_dir + "/questions.txt")
            if random_question != st.session_state.question:
                break
        st.session_state.question = random_question
        text_area_placeholder.text_area(
            "**Enter your query about the Finite Element Method:**",
            height=120,
            value=st.session_state.question,
            help=question_help
        )

# Load YouTube and LaTeX data
text_data_YT, context_embeddings_YT = load_youtube_data(data_dir, embedding_model, yt_chunk_tokens, yt_overlap_tokens)
text_data_Latex, context_embeddings_Latex = load_book_data(private_data_dir, embedding_model, latex_chunk_tokens, latex_overlap_tokens)
summary = load_summary(data_dir + '/KG_FEM_summary.json')

# Initialize session state variables
if 'question_answered' not in st.session_state:
    st.session_state.question_answered = False
if 'context_by_video' not in st.session_state:
    st.session_state.context_by_video = {}
if 'context_by_section' not in st.session_state:
    st.session_state.context_by_section = {}
if 'answer' not in st.session_state:
    st.session_state.answer = ""
if 'thinking' not in st.session_state:
    st.session_state.thinking = ""
if 'playing_video_id' not in st.session_state:
    st.session_state.playing_video_id = None
if 'yt_context_for_display' not in st.session_state:
    st.session_state.yt_context_for_display = ""
if 'latex_context_count' not in st.session_state:
    st.session_state.latex_context_count = 0
if 'video_context_count' not in st.session_state:
    st.session_state.video_context_count = 0
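
# ---------------------------------------
# Main pipeline: embed the query, retrieve context, draft an expert answer,
# then synthesize the final response
# ---------------------------------------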
if submit_button_placeholder.button("AI Answer", type="primary"):
    if st.session_state.question == "":
        st.markdown("")
        st.write("Please enter a query. :smirk:")
        st.session_state.question_answered = False
    else:
        with st.spinner("Finding relevant contexts..."):
            if embedding_model == "all-MiniLM-L6-v2":
                question_embedding = embed_question_sentence_transformer(st.session_state.question, model_name="all-MiniLM-L6-v2")
            elif embedding_model == "text-embedding-3-small":
                question_embedding = embed_question_openai(st.session_state.question, embedding_model)

            initial_max_k = int(0.1 * context_embeddings_YT.shape[0])  # note: currently unused

            idx_YT = fixed_knn_retrieval(question_embedding, context_embeddings_YT, top_k=top_k_YT, min_k=0)
            idx_Latex = fixed_knn_retrieval(question_embedding, context_embeddings_Latex, top_k=top_k_Latex, min_k=0)

            # Restore document order within each source
            relevant_contexts_YT = sorted([text_data_YT[i] for i in idx_YT], key=lambda x: x['order'])
            relevant_contexts_Latex = sorted([text_data_Latex[i] for i in idx_Latex], key=lambda x: x['order'])

            # Group video contexts by their source video
            st.session_state.context_by_video = {}
            for context_item in relevant_contexts_YT:
                video_id = context_item['video_id']
                if video_id not in st.session_state.context_by_video:
                    st.session_state.context_by_video[video_id] = []
                st.session_state.context_by_video[video_id].append(context_item)
            st.session_state.video_context_count = len(st.session_state.context_by_video)

            # Group textbook contexts by their source section
            st.session_state.context_by_section = {}
            for context_item in relevant_contexts_Latex:
                section_id = context_item['section']
                if section_id not in st.session_state.context_by_section:
                    st.session_state.context_by_section[section_id] = []
                st.session_state.context_by_section[section_id].append(context_item)

            # Build context strings
            yt_context_string = ''
            for i, (video_id, contexts) in enumerate(st.session_state.context_by_video.items(), start=1):
                yt_context_string += f"--- Video {i}: {contexts[0]['title']} ---\n"
                for context_item in contexts:
                    start_time = int(context_item['start'])
                    yt_context_string += f"Timestamp {sec_to_time(start_time)}: {context_item['text']}\n\n"

            latex_context_string = ''
            if top_k_Latex > 0:
                for i, (section_id, contexts) in enumerate(st.session_state.context_by_section.items(), start=1):
                    latex_context_string += f'--- Textbook Section {i} ({section_id}) ---\n'
                    for context_item in contexts:
                        latex_context_string += context_item['text'] + '\n\n'

            context_for_llm = yt_context_string + latex_context_string
            st.session_state.yt_context_for_display = fix_latex(yt_context_string)
            st.session_state.latex_context_count = len(st.session_state.context_by_section)
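
        # Two-stage answering: an optional domain-expert draft first, then a
        # synthesis model that grounds the draft in the retrieved context.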
| with st.spinner("Answering the question..."): | |
| if st.session_state.activate_expert: | |
| if st.session_state.expert_model in ["LLaMA-TOMMI-1.0-11B", "LLaMA-3.2-11B"]: | |
| if st.session_state.expert_model == "LLaMA-TOMMI-1.0-11B": | |
| use_expert = True | |
| elif st.session_state.expert_model == "LLaMA-3.2-11B": | |
| use_expert = False | |
| messages = [ | |
| {"role": "system", "content": get_expert_system_prompt()}, | |
| {"role": "user", "content": st.session_state.question} | |
| ] | |
| expert_answer = get_custom_inference_endpoint_response( | |
| messages=messages, | |
| use_expert=use_expert, | |
| tokenizer_max_length=500, | |
| do_sample=expert_do_sample, | |
| temperature=expert_temperature if expert_do_sample else None, | |
| top_k=expert_top_k if expert_do_sample else None, | |
| top_p=expert_top_p if expert_do_sample else None, | |
| num_beams=expert_num_beams if not expert_do_sample else 1, | |
| max_new_tokens=expert_max_new_tokens | |
| ) | |
| else: | |
| expert_answer = openai_domain_specific_answer_generation( | |
| get_expert_system_prompt(), | |
| st.session_state.question, | |
| model=st.session_state.expert_model, | |
| temperature=expert_api_temperature, | |
| top_p=expert_api_top_p | |
| ) | |
| st.session_state.expert_answer = fix_latex(expert_answer) | |
| else: | |
| st.session_state.expert_answer = 'No Expert Answer. Only use the context.' | |

            if st.session_state.synthesis_model == "DeepSeek-R1-0528-Qwen3-8B":
                messages = [
                    {"role": "system", "content": get_synthesis_system_prompt("Finite Element Method")},
                    {"role": "user", "content": get_synthesis_user_prompt(st.session_state.question, st.session_state.expert_answer, context_for_llm)}
                ]
                raw_synthesis_answer = get_inference_endpoint_response(
                    model="tgi",  # served-model alias; the endpoint hosts deepseek-ai/DeepSeek-R1-0528-Qwen3-8B
                    messages=messages,
                    temperature=synthesis_deepseek_temperature,
                    top_p=synthesis_deepseek_top_p,
                    max_tokens=synthesis_deepseek_max_tokens
                )
                # Separate the model's chain-of-thought from the final answer
                thinking, synthesis_answer = parse_thinking_response(raw_synthesis_answer)
                st.session_state.thinking = thinking
            else:
                synthesis_answer = openai_context_integration(
                    get_synthesis_system_prompt("Finite Element Method"),
                    st.session_state.question,
                    st.session_state.expert_answer,
                    context_for_llm,
                    model=st.session_state.synthesis_model,
                    temperature=synthesis_api_temperature,
                    top_p=synthesis_api_top_p
                )

            # Quick check after getting the answer
            if synthesis_answer.strip().startswith("NOT_ENOUGH_INFO"):
                st.markdown("")
                st.markdown("#### Query", unsafe_allow_html=True)
                st.markdown(fix_latex(st.session_state.question))
                st.markdown("#### Final Answer")
                st.write(":smiling_face_with_tear:")
                st.markdown(synthesis_answer.split('NOT_ENOUGH_INFO')[1])
                st.divider()
                st.caption(get_disclaimer())
                st.session_state.question_answered = False
                st.stop()
            else:
                st.session_state.answer = fix_latex(synthesis_answer)
                st.session_state.question_answered = True

if st.session_state.question_answered:
    st.divider()
    st.markdown("#### Query", unsafe_allow_html=True)
    st.markdown(fix_latex(st.session_state.question))

    st.markdown("#### Inference and Reasoning")

    # Expander for the initial expert answer
    if st.session_state.activate_expert and 'expert_answer' in st.session_state:
        with st.expander("Initial Expert Answer", expanded=False):
            st.info(f"This is the initial answer from the expert model ({st.session_state.expert_model}), used as a starting point for the final synthesis.", icon="🧑‍🏫")
            st.markdown(st.session_state.expert_answer)

    # Expander for the retrieved context
    if 'yt_context_for_display' in st.session_state and st.session_state.yt_context_for_display:
        with st.expander("Retrieved Context", expanded=False):
            st.info("This is the raw context retrieved from the knowledge base to inform the final answer.", icon="📚")
            if 'video_context_count' in st.session_state and st.session_state.video_context_count > 0:
                st.success(f"Found {st.session_state.video_context_count} relevant video transcript(s) containing retrieved content.", icon="📺")
                st.markdown(st.session_state.yt_context_for_display)
            if 'latex_context_count' in st.session_state and st.session_state.latex_context_count > 0:
                st.info(f"Additionally, {st.session_state.latex_context_count} relevant sections were found in the textbook *The Finite Element Method: Linear Static and Dynamic Finite Element Analysis* by Thomas J. R. Hughes (2012).", icon="📖")

    # Expander for the model's thinking process
    if st.session_state.synthesis_model == "DeepSeek-R1-0528-Qwen3-8B" and 'thinking' in st.session_state and st.session_state.thinking:
        with st.expander(":blue[**Model's Thinking Process**]", expanded=False):
            st.info(f"This is the reasoning from the synthesis model ({st.session_state.synthesis_model}) used to synthesize the final answer.", icon="🤔")
            st.markdown(st.session_state.thinking)

    st.markdown("#### Final Answer")
    st.markdown(st.session_state.answer)
    st.markdown(" ")

    if top_k_YT > 0:
        st.markdown("#### Retrieved content in lecture videos")
        for i, (video_id, contexts) in enumerate(st.session_state.context_by_video.items(), start=1):
            with st.container(border=True):
                st.markdown(f"**Video {i} | {contexts[0]['title']}**")

                video_placeholder = st.empty()
                video_placeholder.markdown(get_youtube_embed(video_id, 0, 0), unsafe_allow_html=True)
                st.markdown('')

                with st.container(border=False):
                    st.markdown("Retrieved Times")
                    # One narrow column per timestamp button, plus a filler column
                    # (guarded so the filler width never drops to zero or below)
                    cols = st.columns([1] * len(contexts) + [max(9 - len(contexts), 1)])
                    for j, context_item in enumerate(contexts):
                        start_time = int(context_item['start'])
                        label = sec_to_time(start_time)
                        if cols[j].button(label, key=f"{video_id}_{start_time}"):
                            if st.session_state.playing_video_id is not None:
                                st.session_state.playing_video_id = None
                            # Re-embed the video at the selected timestamp
                            # (the third argument is assumed to toggle autoplay)
                            video_placeholder.empty()
                            video_placeholder.markdown(get_youtube_embed(video_id, start_time, 1), unsafe_allow_html=True)
                            st.session_state.playing_video_id = video_id

                with st.expander("Video Summary", expanded=False):
                    st.markdown(summary[video_id])
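
    # The textbook display below is currently disabled: show_textbook is
    # hard-coded to False in the sidebar settings.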
    if show_textbook and top_k_Latex > 0:
        st.markdown("#### Retrieved content in textbook", help="The Finite Element Method: Linear Static and Dynamic Finite Element Analysis")
        for i, (section_id, contexts) in enumerate(st.session_state.context_by_section.items(), start=1):
            st.markdown(f"**Section {i} | {section_id}**")
            for context_item in contexts:
                st.markdown(context_item['text'])
            st.divider()
| st.markdown(" ") | |
| st.divider() | |
| st.caption(get_disclaimer()) | |