import json  # Added for TLDR JSON parsing
import logging
import os
import tempfile

from huggingface_hub import HfApi
from huggingface_hub.inference._generated.types import (
    ChatCompletionOutput,  # Added for type hinting
)

# Imports from other project modules
from llm_interface import (
    ERROR_503_DICT,
    parse_qwen_response,
    query_qwen_endpoint,
)
from prompts import format_privacy_prompt, format_summary_highlights_prompt
from utils import (  # Import constants for filenames
    PRIVACY_FILENAME,
    SUMMARY_FILENAME,
    TLDR_FILENAME,
    check_report_exists,
    download_cached_reports,
    get_space_code_files,
)

# Configure logging (inherits app.py's configuration when called from there,
# but setting it up here keeps the module usable on its own)
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Load environment variables - redundant if always called by app.py, which already loads them
# load_dotenv()

# Constants needed by the helper functions (could also be passed as arguments
# from app.py for clarity, or in case they change)
CACHE_INFO_MSG = "\n\n*(Report retrieved from cache)*"
TRUNCATION_WARNING = """**⚠️ Warning:** The input data (code and/or prior analysis) was too long for the AI model's context limit and had to be truncated. The analysis below may be incomplete or based on partial information.\n\n---\n\n"""

# --- Constants for TLDR Generation ---
TLDR_SYSTEM_PROMPT = (
    "You are an AI assistant specialized in summarizing privacy analysis reports for Hugging Face Spaces. "
    "You will receive two reports: a detailed privacy analysis and a summary/highlights report. "
    "Based **only** on the content of these two reports, generate a concise JSON object containing a structured TLDR (Too Long; Didn't Read). "
    "Do not use any information not present in the provided reports. "
    "The JSON object must have the following keys:\n"
    '- "app_description": A 1-2 sentence summary of what the application does from a user\'s perspective.\n'
    '- "privacy_tldr": A 2-3 sentence high-level overview of privacy. Mention if the analysis was conclusive based on available code, if data processing is local, or if/what data goes to external services.\n'
    '- "data_types": A list of JSON objects, where each object has two keys: \'name\' (a short, unique identifier string for the data type, e.g., "User Text") and \'description\' (a brief string explaining the data type in context, max 6-8 words, e.g., "Text prompt entered by the user").\n'
    "- \"user_input_data\": A list of strings, where each string is the 'name' of a data type defined in 'data_types' that is provided by the user to the app.\n"
    "- \"local_processing\": A list of strings describing data processed locally. Each string should start with the 'name' of a data type defined in 'data_types', followed by details (like the processing model) in parentheses if mentioned in the reports. Example: \"User Text (Local Model XYZ)\".\n"
    "- \"remote_processing\": A list of strings describing data sent to remote services. Each string should start with the 'name' of a data type defined in 'data_types', followed by the service/model name in parentheses if mentioned in the reports. Example: \"User Text (HF Inference API)\".\n"
    "- \"external_logging\": A list of strings describing data logged or saved externally. Each string should start with the 'name' of a data type defined in 'data_types', followed by the location/service in parentheses if mentioned. Example: \"User Text (External DB)\".\n"
    "Ensure the output is **only** a valid JSON object, starting with `{` and ending with `}`. Ensure all listed data types in the processing/logging lists exactly match a 'name' defined in the 'data_types' list."
)
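
# Illustrative example of the JSON shape the system prompt above asks for.
# The field values are hand-written documentation, not real model output.
_EXAMPLE_TLDR_JSON = {
    "app_description": "Lets the user enter a text prompt and returns a generated image.",
    "privacy_tldr": "Analysis was conclusive from the available code. Prompts are sent to a remote inference API; nothing is logged externally.",
    "data_types": [
        {"name": "User Text", "description": "Text prompt entered by the user"},
        {"name": "Generated Image", "description": "Image returned by the model"},
    ],
    "user_input_data": ["User Text"],
    "local_processing": [],
    "remote_processing": ["User Text (HF Inference API)"],
    "external_logging": [],
}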
# --- Analysis Pipeline Helper Functions ---


def check_cache_and_download(space_id: str, dataset_id: str, hf_token: str | None):
    """Checks cache and downloads if reports exist."""
    logging.info(f"Checking cache for '{space_id}'...")
    found_in_cache = False
    if hf_token:
        try:
            found_in_cache = check_report_exists(space_id, dataset_id, hf_token)
        except Exception as e:
            logging.warning(f"Cache check failed for {space_id}: {e}. Proceeding.")
            # Return cache_miss even if the check failed; proceed to live analysis
            return {"status": "cache_miss", "error_message": f"Cache check failed: {e}"}
    if found_in_cache:
        logging.info(f"Cache hit for {space_id}. Downloading.")
        try:
            cached_reports = download_cached_reports(space_id, dataset_id, hf_token)
            summary_report = (
                cached_reports.get("summary", "Error: Cached summary not found.")
                + CACHE_INFO_MSG
            )
            privacy_report = (
                cached_reports.get("privacy", "Error: Cached privacy report not found.")
                + CACHE_INFO_MSG
            )
            logging.info(f"Successfully downloaded cached reports for {space_id}.")
            return {
                "status": "cache_hit",
                "summary": summary_report,
                "privacy": privacy_report,
                "tldr_json_str": cached_reports.get("tldr_json_str"),
            }
        except Exception as e:
            error_msg = f"Cache download failed for {space_id}: {e}"
            logging.warning(f"{error_msg}. Proceeding with live analysis.")
            # Return the error, but let the caller decide if live analysis proceeds
            return {"status": "cache_error", "ui_message": error_msg}
    else:
        logging.info(f"Cache miss for {space_id}. Performing live analysis.")
        return {"status": "cache_miss"}
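
# A minimal sketch of how a caller might dispatch on the status dict returned
# above (illustrative only; the real orchestration lives in app.py, and
# DATASET_ID / HF_TOKEN / run_live_analysis are assumed placeholders):
#
#     cache_result = check_cache_and_download("user/space", DATASET_ID, HF_TOKEN)
#     if cache_result["status"] == "cache_hit":
#         display(cache_result["summary"], cache_result["privacy"])
#     else:
#         # "cache_miss" and "cache_error" both fall through to live analysis
#         run_live_analysis("user/space")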
def check_endpoint_status(
    endpoint_name: str, hf_token: str | None, error_503_user_message: str
):
    """Checks the status of the inference endpoint."""
    logging.info(f"Checking endpoint status for '{endpoint_name}'...")
    if not hf_token:
        # Allow proceeding if the token is missing; the endpoint may be public
        logging.warning("HF_TOKEN not set, cannot check endpoint status definitively.")
        return {"status": "ready", "warning": "HF_TOKEN not set"}
    try:
        api = HfApi(token=hf_token)
        endpoint = api.get_inference_endpoint(name=endpoint_name)
        status = endpoint.status
        logging.info(f"Endpoint '{endpoint_name}' status: {status}")
        if status == "running":
            return {"status": "ready"}
        else:
            logging.warning(
                f"Endpoint '{endpoint_name}' is not ready (Status: {status})."
            )
            if status == "scaledToZero":
                logging.info(
                    f"Endpoint '{endpoint_name}' is scaled to zero. Attempting to resume..."
                )
                try:
                    endpoint.resume()
                    # Still return an error message suggesting a retry, as resuming takes time.
                    # Keep this message concise, since the required action (wait) is specific.
                    msg = f"**Endpoint Resuming:** The analysis endpoint ('{endpoint_name}') was scaled to zero and is now restarting.\n\n{error_503_user_message}"
                    return {"status": "error", "ui_message": msg}
                except Exception as resume_error:
                    # Resume failed; provide a detailed message including the full explanation
                    logging.error(
                        f"Failed to resume endpoint {endpoint_name}: {resume_error}"
                    )
                    msg = f"**Endpoint Issue:** The analysis endpoint ('{endpoint_name}') is currently {status} and an attempt to resume it failed ({resume_error}).\n\n{error_503_user_message}"
                    return {"status": "error", "ui_message": msg}
            else:  # paused, failed, pending, etc.
                msg = f"**Endpoint Issue:** The analysis endpoint ('{endpoint_name}') status is currently <span style='color:red'>**{status}**</span>.\n\n{error_503_user_message}"
                return {"status": "error", "ui_message": msg}
    except Exception as e:
        error_msg = f"Error checking analysis endpoint status for {endpoint_name}: {e}"
        logging.error(error_msg)
        # Stop the analysis if the endpoint check fails critically
        return {"status": "error", "ui_message": f"Error checking endpoint status: {e}"}
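
# Caller-side handling sketch for the status dict above (illustrative;
# ENDPOINT_NAME and ERROR_503_MSG are assumed to be defined by the caller):
#
#     endpoint_result = check_endpoint_status(ENDPOINT_NAME, HF_TOKEN, ERROR_503_MSG)
#     if endpoint_result["status"] != "ready":
#         return endpoint_result["ui_message"]  # surface the message and stop
#     # otherwise continue with fetch_and_validate_code and report generation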
def fetch_and_validate_code(space_id: str):
    """Fetches and validates code files for the Space."""
    logging.info(f"Fetching code files for {space_id}...")
    code_files = get_space_code_files(space_id)
    if not code_files:
        error_msg = f"Could not retrieve code files for '{space_id}'. Check the ID and ensure it's a public Space."
        logging.warning(error_msg)
        return {
            "status": "error",
            "ui_message": f"**Error:**\n{error_msg}\nAnalysis Canceled.",
        }
    logging.info(f"Successfully fetched {len(code_files)} files for {space_id}.")
    return {"status": "success", "code_files": code_files}


def generate_detailed_report(
    space_id: str, code_files: dict, error_503_user_message: str
):
    """Generates the detailed privacy report using the LLM."""
    logging.info("Generating detailed privacy analysis report...")
    privacy_prompt_messages, privacy_truncated = format_privacy_prompt(
        space_id, code_files
    )
    privacy_api_response = query_qwen_endpoint(privacy_prompt_messages, max_tokens=3072)
    if privacy_api_response == ERROR_503_DICT:
        logging.warning("LLM Call 1 (Privacy) failed with 503.")
        return {"status": "error", "ui_message": error_503_user_message}
    detailed_privacy_report = parse_qwen_response(privacy_api_response)
    if "Error:" in detailed_privacy_report:
        error_msg = (
            f"Failed to generate detailed privacy report: {detailed_privacy_report}"
        )
        logging.error(error_msg)
        return {
            "status": "error",
            "ui_message": f"**Error Generating Detailed Privacy Report:**\n{detailed_privacy_report}\nAnalysis Halted.",
        }
    if privacy_truncated:
        detailed_privacy_report = TRUNCATION_WARNING + detailed_privacy_report
    logging.info("Successfully generated detailed privacy report.")
    return {
        "status": "success",
        "report": detailed_privacy_report,
        "truncated": privacy_truncated,
    }


def generate_summary_report(
    space_id: str,
    code_files: dict,
    detailed_privacy_report: str,
    error_503_user_message: str,
):
    """Generates the summary & highlights report using the LLM."""
    logging.info("Generating summary and highlights report...")
    # Remove any truncation warning from the detailed report before sending it to the next LLM call
    clean_detailed_report = detailed_privacy_report.replace(TRUNCATION_WARNING, "")
    summary_highlights_prompt_messages, summary_truncated = (
        format_summary_highlights_prompt(space_id, code_files, clean_detailed_report)
    )
    summary_highlights_api_response = query_qwen_endpoint(
        summary_highlights_prompt_messages, max_tokens=2048
    )
    if summary_highlights_api_response == ERROR_503_DICT:
        logging.warning("LLM Call 2 (Summary) failed with 503.")
        # Return a specific status to indicate partial success
        return {"status": "error_503_summary", "ui_message": error_503_user_message}
    summary_highlights_report = parse_qwen_response(summary_highlights_api_response)
    if "Error:" in summary_highlights_report:
        error_msg = (
            f"Failed to generate summary/highlights report: {summary_highlights_report}"
        )
        logging.error(error_msg)
        # Return a specific status to indicate partial success
        return {
            "status": "error_summary",
            "ui_message": f"**Error Generating Summary/Highlights:**\n{summary_highlights_report}",
        }
    if summary_truncated:
        summary_highlights_report = TRUNCATION_WARNING + summary_highlights_report
    logging.info("Successfully generated summary & highlights report.")
    return {
        "status": "success",
        "report": summary_highlights_report,
        "truncated": summary_truncated,
    }
def upload_results(
    space_id: str,
    summary_report: str,
    detailed_report: str,
    dataset_id: str,
    hf_token: str | None,
    tldr_json_data: dict | None = None,
):
    """Uploads the generated reports (Markdown and optional JSON TLDR) to the specified dataset repository."""
    if not hf_token:
        logging.warning("HF Token not provided, skipping dataset report upload.")
        return {"status": "skipped", "reason": "HF_TOKEN not set"}
    if "Error:" in detailed_report or "Error:" in summary_report:
        msg = "Skipping cache upload due to errors in generated reports."
        logging.warning(msg)
        return {"status": "skipped", "reason": msg}
    # Basic sanitization so the Space ID is safe to use as a path inside the repo
    safe_space_id = space_id.replace("..", "")
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            # Define local paths
            summary_path_local = os.path.join(tmpdir, SUMMARY_FILENAME)
            privacy_path_local = os.path.join(tmpdir, PRIVACY_FILENAME)
            tldr_json_path_local = os.path.join(tmpdir, TLDR_FILENAME)
            # Write Markdown reports
            with open(summary_path_local, "w", encoding="utf-8") as f:
                f.write(summary_report)
            with open(privacy_path_local, "w", encoding="utf-8") as f:
                f.write(detailed_report)
            # Prepare the commit message and write the JSON TLDR data if available
            commit_message = f"Add analysis reports for Space: {safe_space_id}"
            if tldr_json_data:
                commit_message += " (including TLDR JSON)"
                try:
                    with open(tldr_json_path_local, "w", encoding="utf-8") as f:
                        json.dump(tldr_json_data, f, indent=2, ensure_ascii=False)
                    logging.info(
                        f"Successfully wrote TLDR JSON locally for {safe_space_id}."
                    )
                except Exception as json_err:
                    logging.error(
                        f"Failed to write TLDR JSON locally for {safe_space_id}: {json_err}"
                    )
                    tldr_json_data = None  # Prevent the upload attempt if writing failed
            # Ensure the dataset repo exists
            api = HfApi(token=hf_token)
            repo_url = api.create_repo(
                repo_id=dataset_id,
                repo_type="dataset",
                exist_ok=True,
            )
            logging.info(f"Ensured dataset repo {repo_url} exists.")
            # Upload the summary report
            api.upload_file(
                path_or_fileobj=summary_path_local,
                path_in_repo=f"{safe_space_id}/{SUMMARY_FILENAME}",
                repo_id=dataset_id,
                repo_type="dataset",
                commit_message=commit_message,
            )
            logging.info(f"Successfully uploaded summary report for {safe_space_id}.")
            # Upload the detailed privacy report
            api.upload_file(
                path_or_fileobj=privacy_path_local,
                path_in_repo=f"{safe_space_id}/{PRIVACY_FILENAME}",
                repo_id=dataset_id,
                repo_type="dataset",
                commit_message=commit_message,
            )
            logging.info(
                f"Successfully uploaded detailed privacy report for {safe_space_id}."
            )
            # Upload the JSON TLDR if it was successfully written locally
            if tldr_json_data and os.path.exists(tldr_json_path_local):
                api.upload_file(
                    path_or_fileobj=tldr_json_path_local,
                    path_in_repo=f"{safe_space_id}/{TLDR_FILENAME}",
                    repo_id=dataset_id,
                    repo_type="dataset",
                    commit_message=commit_message,
                )
                logging.info(f"Successfully uploaded TLDR JSON for {safe_space_id}.")
        # Return success if all uploads finished without error
        return {"status": "success"}
    except Exception as e:
        error_msg = f"Non-critical error during report upload for {safe_space_id}: {e}"
        logging.error(error_msg)
        return {"status": "error", "message": error_msg}
# --- New TLDR Generation Functions ---


def format_tldr_prompt(
    detailed_report: str, summary_report: str
) -> list[dict[str, str]]:
    """Formats the prompt for the TLDR generation task."""
    # Clean potential cache/truncation markers from the input reports before sending them to the LLM
    cleaned_detailed = detailed_report.replace(CACHE_INFO_MSG, "").replace(
        TRUNCATION_WARNING, ""
    )
    cleaned_summary = summary_report.replace(CACHE_INFO_MSG, "").replace(
        TRUNCATION_WARNING, ""
    )
    user_content = (
        "Please generate a structured JSON TLDR based on the following reports:\n\n"
        "--- DETAILED PRIVACY ANALYSIS REPORT START ---\n"
        f"{cleaned_detailed}\n"
        "--- DETAILED PRIVACY ANALYSIS REPORT END ---\n\n"
        "--- SUMMARY & HIGHLIGHTS REPORT START ---\n"
        f"{cleaned_summary}\n"
        "--- SUMMARY & HIGHLIGHTS REPORT END ---"
    )
    # Note: truncation is not handled here, on the assumption that the input
    # reports are already reasonably sized by the previous steps. If they could
    # be extremely long, add truncation logic similar to the other format_* functions.
    messages = [
        {"role": "system", "content": TLDR_SYSTEM_PROMPT},
        {"role": "user", "content": user_content},
    ]
    return messages
def parse_tldr_json_response(
    response: ChatCompletionOutput | dict | None,
) -> dict | None:
    """Parses the LLM response, expecting JSON content for the TLDR."""
    if response is None:
        logging.error("TLDR Generation: Failed to get response from LLM.")
        return None
    # Check for the 503 error dict first
    if isinstance(response, dict) and response.get("error_type") == "503":
        logging.error(f"TLDR Generation: Received 503 error: {response.get('message')}")
        return None  # Treat a 503 as a failure for this specific task
    # --- Direct Content Extraction (replaces the call to parse_qwen_response) ---
    raw_content = ""
    try:
        # Check whether this looks like the expected ChatCompletionOutput structure
        if not hasattr(response, "choices"):
            logging.error(
                f"TLDR Generation: Unexpected response type received: {type(response)}. Content: {response}"
            )
            return None  # Return None if not the expected structure
        # Access the generated content according to the ChatCompletionOutput structure
        if response.choices and len(response.choices) > 0:
            content = response.choices[0].message.content
            if content:
                raw_content = content.strip()
                logging.info(
                    "TLDR Generation: Successfully extracted raw content from response."
                )
            else:
                logging.warning(
                    "TLDR Generation: Response received, but content is empty."
                )
                return None
        else:
            logging.warning("TLDR Generation: Response received, but no choices found.")
            return None
    except AttributeError as e:
        # Catches objects that look like the expected structure but lack expected attributes
        logging.error(
            f"TLDR Generation: Attribute error parsing response object: {e}. Response structure might be unexpected. Response: {response}"
        )
        return None
    except Exception as e:
        logging.error(
            f"TLDR Generation: Unexpected error extracting content from response object: {e}"
        )
        return None
    # --- End Direct Content Extraction ---
    # --- JSON Parsing Logic ---
    if not raw_content:  # Should be caught by the checks above, but belt and suspenders
        logging.error("TLDR Generation: Raw content is empty after extraction attempt.")
        return None
    try:
        # Strip potential markdown code-fence formatting. removeprefix/removesuffix
        # avoid clipping valid characters when a closing fence is missing.
        raw_content = raw_content.strip()
        if raw_content.startswith("```json"):
            raw_content = raw_content.removeprefix("```json").removesuffix("```").strip()
        elif raw_content.startswith("```"):
            raw_content = raw_content.removeprefix("```").removesuffix("```").strip()
        tldr_data = json.loads(raw_content)
        # Validate structure: must be a dict with all required keys
        required_keys = [
            "app_description",
            "privacy_tldr",
            "data_types",
            "user_input_data",
            "local_processing",
            "remote_processing",
            "external_logging",
        ]
        if not isinstance(tldr_data, dict):
            logging.error(
                f"TLDR Generation: Parsed content is not a dictionary. Content: {raw_content[:500]}..."
            )
            return None
        if not all(key in tldr_data for key in required_keys):
            missing_keys = [key for key in required_keys if key not in tldr_data]
            logging.error(
                f"TLDR Generation: Parsed JSON is missing required keys: {missing_keys}. Content: {raw_content[:500]}..."
            )
            return None
        # --- Validate the data_types structure ---
        data_types_list = tldr_data.get("data_types")
        if not isinstance(data_types_list, list):
            logging.error(
                f"TLDR Generation: 'data_types' is not a list. Content: {data_types_list}"
            )
            return None
        for item in data_types_list:
            if (
                not isinstance(item, dict)
                or "name" not in item
                or "description" not in item
            ):
                logging.error(
                    f"TLDR Generation: Invalid item found in 'data_types' list: {item}. Must be a dict with 'name' and 'description'."
                )
                return None
            if not isinstance(item["name"], str) or not isinstance(
                item["description"], str
            ):
                logging.error(
                    f"TLDR Generation: Invalid types for name/description in 'data_types' item: {item}. Must be strings."
                )
                return None
        # --- Basic validation for the other lists (should contain strings) ---
        validation_passed = True
        for key in [
            "user_input_data",
            "local_processing",
            "remote_processing",
            "external_logging",
        ]:
            data_list = tldr_data.get(key)
            if not isinstance(data_list, list):
                logging.error(
                    f"TLDR Generation Validation Error: Key '{key}' is not a list. Found type: {type(data_list)}, Value: {data_list}"
                )
                # Keep validating the other keys, but mark the overall check as failed
                validation_passed = False
            elif not all(isinstance(x, str) for x in data_list):
                # Non-string items are tolerated for now; warn rather than fail
                logging.warning(
                    f"TLDR Generation Validation Warning: Not all items in list '{key}' are strings. Content: {data_list}"
                )
        if not validation_passed:
            logging.error(
                "TLDR Generation: Validation failed due to incorrect list types."
            )
            return None  # Fail if any key wasn't a list
        logging.info("Successfully parsed and validated TLDR JSON response.")
        return tldr_data
    except json.JSONDecodeError as e:
        logging.error(
            f"TLDR Generation: Failed to decode JSON response: {e}. Content: {raw_content[:500]}..."
        )
        return None
    except Exception as e:
        logging.error(f"TLDR Generation: Unexpected error parsing JSON response: {e}")
        return None
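
# Quick self-check sketch for the parser above. It hand-builds a stand-in for
# the ChatCompletionOutput structure with types.SimpleNamespace; the JSON body
# is illustrative, not real model output.
def _selftest_parse_tldr_json_response() -> bool:
    from types import SimpleNamespace

    fake_json = json.dumps(
        {
            "app_description": "Demo app.",
            "privacy_tldr": "All processing is local.",
            "data_types": [
                {"name": "User Text", "description": "Text prompt entered by the user"}
            ],
            "user_input_data": ["User Text"],
            "local_processing": ["User Text (Local Model XYZ)"],
            "remote_processing": [],
            "external_logging": [],
        }
    )
    # Wrap the JSON in a markdown fence to exercise the fence-stripping path too
    fake_message = SimpleNamespace(content=f"```json\n{fake_json}\n```")
    fake_response = SimpleNamespace(choices=[SimpleNamespace(message=fake_message)])
    return parse_tldr_json_response(fake_response) is not None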
def render_tldr_markdown(tldr_data: dict | None, space_id: str | None = None) -> str:
    """Renders the top-level TLDR data (description, privacy) into a Markdown string.

    (Does not include the data lists.)
    """
    if not tldr_data:
        # Return a message specific to this part of the output
        return "*TLDR Summary could not be generated.*\n"
    output = []
    # Add a link to the Space if space_id is provided
    if space_id:
        output.append(
            f"**Source Space:** [`{space_id}`](https://huggingface.co/spaces/{space_id})\n"
        )
    output.append(f"**App Description:** {tldr_data.get('app_description', 'N/A')}\n")
    privacy_summary = tldr_data.get("privacy_tldr", "N/A")
    output.append(f"**Privacy TLDR:** {privacy_summary}")
    return "\n".join(output)
def render_data_details_markdown(tldr_data: dict | None) -> str:
    """Renders the data lists (types, input, processing, logging) from TLDR data."""
    if not tldr_data:
        return "*Data details could not be generated.*\n"
    output = []
    # Collect the defined names, longest first, so the prefix matching below
    # prefers the most specific name
    defined_names = sorted(
        [
            dt.get("name", "")
            for dt in tldr_data.get("data_types", [])
            if dt.get("name")
        ],
        key=len,
        reverse=True,
    )
    output.append("**Data Types Defined:**")
    data_types = tldr_data.get("data_types")
    # Note: check isinstance first so an empty list renders "None identified."
    # rather than falling through to the error branch
    if isinstance(data_types, list):
        if not data_types:
            output.append("- None identified.")
        else:
            for item in data_types:
                name = item.get("name", "Unnamed")
                desc = item.get("description", "No description")
                output.append(f"- `{name}`: {desc}")
    else:
        output.append("- (Error loading data types)")
    output.append("")  # Blank line for spacing

    # Reusable helper for rendering the remaining lists
    def render_list(title, key):
        output.append(f"**{title}:**")
        data_list = tldr_data.get(key)
        if isinstance(data_list, list):
            if not data_list:
                output.append("- None identified.")
            else:
                for item_str in data_list:
                    formatted_item = item_str  # Default: leave the item as-is
                    found_match = False
                    for name in defined_names:
                        if item_str == name:
                            formatted_item = f"`{name}`"
                            found_match = True
                            break
                        elif item_str.startswith(name + " "):
                            # Wrap the matching data-type name in backticks, keep the rest
                            formatted_item = f"`{name}`{item_str[len(name):]}"
                            found_match = True
                            break
                    if (
                        not found_match
                        and " " not in item_str
                        and not item_str.startswith("`")
                    ):
                        formatted_item = f"`{item_str}`"
                    output.append(f"- {formatted_item}")
        else:
            output.append("- (Error loading list)")
        output.append("")

    render_list("Data Sent by User to App", "user_input_data")
    render_list("Data Processed Locally within App", "local_processing")
    render_list("Data Processed Remotely", "remote_processing")
    render_list("Data Logged/Saved Externally", "external_logging")
    # Drop the trailing blank line
    if output and output[-1] == "":
        output.pop()
    return "\n".join(output)
# --- Combined TLDR Generation Function ---


def generate_and_parse_tldr(detailed_report: str, summary_report: str) -> dict | None:
    """Formats the prompt, queries the LLM, and parses the JSON response for the TLDR.

    Args:
        detailed_report: The detailed privacy report content.
        summary_report: The summary & highlights report content.

    Returns:
        A dictionary with the parsed TLDR data, or None if any step fails.
    """
    logging.info("Starting TLDR generation and parsing...")
    try:
        # Format
        tldr_prompt_messages = format_tldr_prompt(detailed_report, summary_report)
        if not tldr_prompt_messages:
            logging.error("TLDR Generation: Failed to format prompt.")
            return None
        # Query, with a smaller max_tokens budget than the report-generation calls
        llm_response = query_qwen_endpoint(tldr_prompt_messages, max_tokens=1024)
        if llm_response is None:  # The query itself failed critically
            logging.error("TLDR Generation: LLM query returned None.")
            return None
        # A 503 response is handled inside the parse function below
        # Parse
        parsed_data = parse_tldr_json_response(llm_response)
        if parsed_data:
            logging.info("Successfully generated and parsed TLDR.")
            return parsed_data
        else:
            logging.error("TLDR Generation: Failed to parse JSON response.")
            return None
    except Exception as e:
        logging.error(
            f"TLDR Generation: Unexpected error in generate_and_parse_tldr: {e}",
            exc_info=True,
        )
        return None
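
# A minimal sketch of how the helpers in this module compose into the full
# pipeline (illustrative only; the real orchestration, including per-step
# status handling and UI updates, lives in app.py; DATASET_ID, ENDPOINT_NAME,
# HF_TOKEN, and ERROR_503_MSG are assumed placeholders):
#
#     def run_analysis(space_id: str) -> tuple[str, str]:
#         cached = check_cache_and_download(space_id, DATASET_ID, HF_TOKEN)
#         if cached["status"] == "cache_hit":
#             return cached["summary"], cached["privacy"]
#         if check_endpoint_status(ENDPOINT_NAME, HF_TOKEN, ERROR_503_MSG)["status"] != "ready":
#             raise RuntimeError("Endpoint not ready")
#         code = fetch_and_validate_code(space_id)  # check code["status"] in real use
#         detailed = generate_detailed_report(space_id, code["code_files"], ERROR_503_MSG)
#         summary = generate_summary_report(
#             space_id, code["code_files"], detailed["report"], ERROR_503_MSG
#         )
#         tldr = generate_and_parse_tldr(detailed["report"], summary["report"])
#         upload_results(
#             space_id, summary["report"], detailed["report"], DATASET_ID, HF_TOKEN,
#             tldr_json_data=tldr,
#         )
#         return summary["report"], detailed["report"]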