import os
import json
import logging
import signal
import psutil
import time
import subprocess
import torch  # used for CUDA availability detection and GPU warm-up
import threading
import queue
import uuid
from typing import Iterator, Any
from datetime import datetime

from flask import Response
from openai import OpenAI

from lpm_kernel.api.domains.kernel2.dto.server_dto import ServerStatus, ProcessInfo
from lpm_kernel.configs.config import Config

logger = logging.getLogger(__name__)


class LocalLLMService:
    """Service for managing the local LLM client and server."""

    def __init__(self):
        self._client = None
        self._stopping_server = False

    def client(self) -> OpenAI:
        """Get the OpenAI client for the local LLM server."""
        config = Config.from_env()
        if self._client is None:
            base_url = config.get("LOCAL_LLM_SERVICE_URL")
            if not base_url:
                raise ValueError("LOCAL_LLM_SERVICE_URL environment variable is not set")
            self._client = OpenAI(
                base_url=base_url,
                api_key="sk-no-key-required",
            )
        return self._client
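
    # A minimal usage sketch (not executed here). Assumptions: the llama-server started by
    # start_server() is reachable at LOCAL_LLM_SERVICE_URL and exposes an OpenAI-compatible API;
    # the model name is arbitrary for llama-server and mirrors the "models/lpm" label used below.
    #
    #   client = local_llm_service.client()
    #   stream = client.chat.completions.create(
    #       model="models/lpm",
    #       messages=[{"role": "user", "content": "Hello"}],
    #       stream=True,
    #   )
    #   for chunk in stream:
    #       print(chunk.choices[0].delta.content or "", end="")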

    def start_server(self, model_path: str, use_gpu: bool = True) -> bool:
        """
        Start the llama-server service with GPU acceleration when available.

        Args:
            model_path: Path to the GGUF model file
            use_gpu: Whether to use GPU acceleration if available

        Returns:
            bool: True if the server started successfully, False otherwise
        """
        try:
            # Check if the server is already running
            status = self.get_server_status()
            if status.is_running:
                logger.info("LLama server is already running")
                return True

            # Check for CUDA availability if GPU was requested
            cuda_available = torch.cuda.is_available() if use_gpu else False
| gpu_info = "" | |
| if use_gpu and cuda_available: | |
| gpu_device = torch.cuda.current_device() | |
| gpu_info = f" using GPU: {torch.cuda.get_device_name(gpu_device)}" | |
| gpu_memory = torch.cuda.get_device_properties(gpu_device).total_memory / (1024**3) | |
| logger.info(f"CUDA is available. Using GPU acceleration{gpu_info}") | |
| logger.info(f"CUDA device capabilities: {torch.cuda.get_device_capability(gpu_device)}") | |
| logger.info(f"CUDA memory: {gpu_memory:.2f} GB") | |
| # Pre-initialize CUDA to speed up first inference | |
| logger.info("Pre-initializing CUDA context to speed up first inference") | |
| torch.cuda.init() | |
| torch.cuda.empty_cache() | |
| elif use_gpu and not cuda_available: | |
| logger.warning("CUDA was requested but is not available. Using CPU instead.") | |
| else: | |
| logger.info("Using CPU for inference (GPU not requested)") | |
| # Check for GPU optimization marker | |
| gpu_optimized = False | |
| model_dir = os.path.dirname(model_path) | |
| gpu_marker_path = os.path.join(model_dir, "gpu_optimized.json") | |
| if os.path.exists(gpu_marker_path): | |
| try: | |
| with open(gpu_marker_path, 'r') as f: | |
| gpu_data = json.load(f) | |
| if gpu_data.get("gpu_optimized", False): | |
| gpu_optimized = True | |
| logger.info(f"Found GPU optimization marker created on {gpu_data.get('optimized_on', 'unknown date')}") | |
| except Exception as e: | |
| logger.warning(f"Error reading GPU marker file: {e}") | |
| # Get the correct path to the llama-server executable | |
| base_dir = os.getcwd() | |
| server_path = os.path.join(base_dir, "llama.cpp", "build", "bin", "llama-server") | |
| # For Windows, add .exe extension if needed | |
| if os.name == 'nt' and not server_path.endswith('.exe'): | |
| server_path += '.exe' | |
| # Verify executable exists | |
| if not os.path.exists(server_path): | |
| logger.error(f"llama-server executable not found at: {server_path}") | |
| return False | |

            # Start the server with parameters tuned for faster startup
            cmd = [
                server_path,
                "-m", model_path,
                "--host", "0.0.0.0",
                "--port", "8080",
                "--ctx-size", "2048",  # Default context size (adjust based on needs)
                "--parallel", "2",     # Enable request parallelism
                "--cont-batching",     # Enable continuous batching
            ]

            # Set up the environment with CUDA variables to ensure GPU detection
            env = os.environ.copy()
            env["CUDA_VISIBLE_DEVICES"] = ""

            # Add GPU-related parameters if CUDA is available
            if cuda_available and use_gpu:
                # Force GPU usage with parameters tuned for faster loads
                cmd.extend([
                    "--n-gpu-layers", "999",  # Offload all layers to the GPU
                    "--tensor-split", "0",    # Use the first GPU for all operations
                    "--main-gpu", "0",        # Use GPU 0 as the primary device
                    "--mlock",                # Lock memory to prevent swapping during inference
                ])
                # Set CUDA environment variables to help with GPU detection
                env["CUDA_VISIBLE_DEVICES"] = "0"  # Force using the first GPU

                # Ensure comprehensive library paths for CUDA
                cuda_lib_paths = [
                    "/usr/local/cuda/lib64",
                    "/usr/lib/cuda/lib64",
                    "/usr/local/lib",
                    "/usr/lib/x86_64-linux-gnu",
                    "/usr/lib/wsl/lib",  # For Windows WSL environments
                ]
                # Build a comprehensive LD_LIBRARY_PATH
                current_ld_path = env.get("LD_LIBRARY_PATH", "")
                for path in cuda_lib_paths:
                    if os.path.exists(path) and path not in current_ld_path:
                        current_ld_path = f"{path}:{current_ld_path}" if current_ld_path else path
                env["LD_LIBRARY_PATH"] = current_ld_path
                logger.info(f"Setting LD_LIBRARY_PATH to: {current_ld_path}")

                # On Windows, use a different approach for CUDA libraries
                if os.name == "nt":
                    # Windows typically has CUDA in PATH already if installed
                    logger.info("Windows system detected, using system CUDA libraries")
                else:
                    # On Linux, try to find CUDA libraries in common locations
                    for cuda_path in [
                        # Common CUDA paths
                        "/usr/local/cuda/lib64",
                        "/usr/lib/cuda/lib64",
                        "/usr/local/lib/python3.12/site-packages/nvidia/cuda_runtime/lib",
                        "/usr/local/lib/python3.10/site-packages/nvidia/cuda_runtime/lib",
                    ]:
                        if os.path.exists(cuda_path):
                            # Add the CUDA path to the library path
                            env["LD_LIBRARY_PATH"] = f"{cuda_path}:{env.get('LD_LIBRARY_PATH', '')}"
                            env["CUDA_HOME"] = os.path.dirname(cuda_path)
                            logger.info(f"Found CUDA at {cuda_path}, setting environment variables")
                            break

                # NOTE: CUDA support and rebuild should be handled at build/setup time (e.g., Docker build
                # or setup script). The runtime check and rebuild logic is intentionally not done here.
                # Ensure llama.cpp is built with CUDA support before running the server if GPU is required.

                # Pre-heat the GPU to ensure a faster initial response
                if torch.cuda.is_available():
                    logger.info("Pre-warming GPU to reduce initial latency...")
                    dummy_tensor = torch.zeros(1, 1).cuda()
                    del dummy_tensor
                    torch.cuda.synchronize()
                    torch.cuda.empty_cache()
                    logger.info("GPU warm-up complete")
                logger.info("Using GPU acceleration for inference with optimized settings")
            else:
                # If GPU isn't available or supported, optimize for CPU
                cmd.extend([
                    "--threads", str(max(1, os.cpu_count() - 1)),  # Use all CPU cores except one
                ])
                logger.info(f"Using CPU-only mode with {max(1, os.cpu_count() - 1)} threads")
| logger.info(f"Starting llama-server with command: {' '.join(cmd)}") | |
| process = subprocess.Popen( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| universal_newlines=True, | |
| env=env | |
| ) | |
| # Wait for server to start (longer wait for GPU initialization) | |
| wait_time = 5 if cuda_available and use_gpu else 3 | |
| logger.info(f"Waiting {wait_time} seconds for server to start...") | |
| time.sleep(wait_time) | |
| # Check if process is still running | |
| if process.poll() is None: | |
| # Log initialization success | |
| if cuda_available and use_gpu: | |
| logger.info(f"✅ LLama server started successfully with GPU acceleration{gpu_info}") | |
| else: | |
| logger.info("✅ LLama server started successfully in CPU-only mode") | |
| return True | |
| else: | |
| stdout, stderr = process.communicate() | |
| logger.error(f"Failed to start llama-server: {stderr}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error starting llama-server: {str(e)}") | |
| return False | |
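
    # For reference, the GPU-enabled launch assembled above amounts to roughly the following
    # command line (path and model abbreviated; flags taken directly from the cmd list built
    # in start_server):
    #
    #   llama.cpp/build/bin/llama-server -m <model.gguf> --host 0.0.0.0 --port 8080 \
    #       --ctx-size 2048 --parallel 2 --cont-batching \
    #       --n-gpu-layers 999 --tensor-split 0 --main-gpu 0 --mlock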

    def stop_server(self) -> ServerStatus:
        """
        Stop the llama-server service.

        Finds and forcibly terminates all llama-server processes.

        Returns:
            ServerStatus: Status object describing whether any processes are still running
        """
        try:
            if self._stopping_server:
                logger.info("Server is already in the process of stopping")
                return self.get_server_status()

            self._stopping_server = True
            try:
                # Find all llama-server processes and forcibly terminate them
                terminated_pids = []
                for proc in psutil.process_iter(["pid", "name", "cmdline"]):
                    try:
                        cmdline = proc.cmdline()
                        if any("llama-server" in cmd for cmd in cmdline):
                            pid = proc.pid
                            logger.info(f"Force terminating llama-server process, PID: {pid}")
                            # Send a kill signal directly to forcibly terminate the process
                            proc.kill()
                            # Ensure the process has been terminated
                            try:
                                proc.wait(timeout=0.2)
                                terminated_pids.append(pid)
                                logger.info(f"Successfully terminated llama-server process {pid}")
                            except psutil.TimeoutExpired:
                                # If the wait times out, send a system-level SIGKILL
                                logger.warning(f"Process {pid} still running, sending SIGKILL again")
                                try:
                                    os.kill(pid, signal.SIGKILL)
                                    terminated_pids.append(pid)
                                    logger.info(f"Successfully force killed llama-server process {pid} with SIGKILL")
                                except ProcessLookupError:
                                    # Process no longer exists
                                    terminated_pids.append(pid)
                                    logger.info(f"Process {pid} no longer exists after kill attempt")
                    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                        continue

                if terminated_pids:
                    logger.info(f"Terminated llama-server processes: {terminated_pids}")
                else:
                    logger.info("No running llama-server process found")

                # Check again whether any llama-server processes are still running
                return self.get_server_status()
            finally:
                self._stopping_server = False

        except Exception as e:
            logger.error(f"Error stopping llama-server: {str(e)}")
            self._stopping_server = False
            return ServerStatus.not_running()

    def get_server_status(self) -> ServerStatus:
        """
        Get the current status of llama-server.

        Returns:
            ServerStatus object
        """
        try:
            base_dir = os.getcwd()
            server_path = os.path.join(base_dir, "llama.cpp", "build", "bin", "llama-server")
            server_exec_name = os.path.basename(server_path)

            for proc in psutil.process_iter(["pid", "name", "cmdline"]):
                try:
                    cmdline = proc.cmdline()
                    # Check for both the executable name and the full path
                    if any(server_exec_name in cmd for cmd in cmdline) or any("llama-server" in cmd for cmd in cmdline):
                        with proc.oneshot():
                            process_info = ProcessInfo(
                                pid=proc.pid,
                                cpu_percent=proc.cpu_percent(),
                                memory_percent=proc.memory_percent(),
                                create_time=proc.create_time(),
                                cmdline=cmdline,
                            )
                            return ServerStatus.running(process_info)
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    continue

            return ServerStatus.not_running()
        except Exception as e:
            logger.error(f"Error checking llama-server status: {str(e)}")
            return ServerStatus.not_running()

    def _parse_response_chunk(self, chunk):
        """Parse different response chunk formats into a standardized format."""
        try:
            if chunk is None:
                logger.warning("Received None chunk")
                return None

            # logger.info(f"Parsing response chunk: {chunk}")

            # Handle the custom response format
            if isinstance(chunk, dict) and "type" in chunk and chunk["type"] == "chat_response":
                logger.info(f"Processing custom format response: {chunk}")
                return {
                    "id": str(uuid.uuid4()),  # Generate a unique ID
                    "object": "chat.completion.chunk",
                    "created": int(datetime.now().timestamp()),
                    "model": "models/lpm",
                    "system_fingerprint": None,
                    "choices": [
                        {
                            "index": 0,
                            "delta": {
                                "content": chunk.get("content", "")
                            },
                            "finish_reason": "stop" if chunk.get("done", False) else None,
                        }
                    ],
                }

            # Handle the OpenAI format
            if not hasattr(chunk, "choices"):
                logger.warning(f"Chunk has no choices attribute: {chunk}")
                return None

            choices = getattr(chunk, "choices", [])
            if not choices:
                logger.warning("Chunk has empty choices")
                return None

            # logger.info(f"Processing OpenAI format response: choices={choices}")
            delta = choices[0].delta

            # Create the standard response structure
            response_data = {
                "id": chunk.id,
                "object": "chat.completion.chunk",
                "created": int(datetime.now().timestamp()),
                "model": "models/lpm",
                "system_fingerprint": chunk.system_fingerprint if hasattr(chunk, "system_fingerprint") else None,
                "choices": [
                    {
                        "index": 0,
                        "delta": {
                            # Keep the key even if content is None; let the client handle it
                            "content": delta.content if hasattr(delta, "content") else ""
                        },
                        "finish_reason": choices[0].finish_reason,
                    }
                ],
            }

            # Skip chunks that carry neither content nor a finish_reason
            if not (hasattr(delta, "content") or choices[0].finish_reason):
                logger.debug("Skipping chunk with no content and no finish_reason")
                return None

            return response_data
        except Exception as e:
            logger.error(f"Error parsing response chunk: {e}, chunk: {chunk}")
            return None
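
    # For illustration: a custom chunk such as
    #   {"type": "chat_response", "content": "Hi", "done": False}
    # is normalized by _parse_response_chunk() into an OpenAI-style chunk along the lines of
    #   {"id": "<uuid>", "object": "chat.completion.chunk", "created": <timestamp>,
    #    "model": "models/lpm", "system_fingerprint": None,
    #    "choices": [{"index": 0, "delta": {"content": "Hi"}, "finish_reason": None}]}
    # which handle_stream_response() below serializes as an SSE line: b'data: {...}\n\n'.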

    def handle_stream_response(self, response_iter: Iterator[Any]) -> Response:
        """Handle a streaming response from the LLM server."""
        # Queue for passing messages between the worker threads and the generator
        message_queue = queue.Queue()
        # Event flag used to signal that model processing is complete
        completion_event = threading.Event()

        def heartbeat_thread():
            """Thread function for sending heartbeats."""
            start_time = time.time()
            heartbeat_interval = 10  # Send a heartbeat every 10 seconds
            heartbeat_count = 0
            logger.info("[STREAM_DEBUG] Heartbeat thread started")

            try:
                # Send an initial heartbeat
                message_queue.put((b": initial heartbeat\n\n", "[INITIAL_HEARTBEAT]"))
                last_heartbeat_time = time.time()

                while not completion_event.is_set():
                    current_time = time.time()
                    # Check whether it is time to send another heartbeat
                    if current_time - last_heartbeat_time >= heartbeat_interval:
                        heartbeat_count += 1
                        elapsed = current_time - start_time
                        logger.info(f"[STREAM_DEBUG] Sending heartbeat #{heartbeat_count} at {elapsed:.2f}s")
                        message_queue.put((f": heartbeat #{heartbeat_count}\n\n".encode("utf-8"), "[HEARTBEAT]"))
                        last_heartbeat_time = current_time
                    # Short sleep to prevent CPU spinning
                    time.sleep(0.1)

                logger.info(f"[STREAM_DEBUG] Heartbeat thread stopping after {heartbeat_count} heartbeats")
            except Exception as e:
                logger.error(f"[STREAM_DEBUG] Error in heartbeat thread: {str(e)}", exc_info=True)
                message_queue.put((f"data: {{\"error\": \"Heartbeat error: {str(e)}\"}}\n\n".encode("utf-8"), "[ERROR]"))

        def model_response_thread():
            """Thread function for processing model responses."""
            chunk = None
            start_time = time.time()
            elapsed_time = 0.0  # Keep defined even if no chunks arrive
            chunk_count = 0

            try:
                logger.info("[STREAM_DEBUG] Model response thread started")

                # Process model responses
                for chunk in response_iter:
                    current_time = time.time()
                    elapsed_time = current_time - start_time
                    chunk_count += 1
                    logger.info(f"[STREAM_DEBUG] Received chunk #{chunk_count} after {elapsed_time:.2f}s")

                    if chunk is None:
                        logger.warning("[STREAM_DEBUG] Received None chunk, skipping")
                        continue

                    # Check whether this is the end marker
                    if chunk == "[DONE]":
                        logger.info(f"[STREAM_DEBUG] Received [DONE] marker after {elapsed_time:.2f}s")
                        message_queue.put((b"data: [DONE]\n\n", "[DONE]"))
                        break

                    # Handle error responses
                    if isinstance(chunk, dict) and "error" in chunk:
                        logger.warning(f"[STREAM_DEBUG] Received error response: {chunk}")
                        data_str = json.dumps(chunk)
                        message_queue.put((f"data: {data_str}\n\n".encode("utf-8"), "[ERROR]"))
                        message_queue.put((b"data: [DONE]\n\n", "[DONE]"))
                        break

                    # Handle normal responses
                    response_data = self._parse_response_chunk(chunk)
                    if response_data:
                        data_str = json.dumps(response_data)
                        content = response_data.get("choices", [{}])[0].get("delta", {}).get("content", "")
                        content_length = len(content) if content else 0
                        logger.info(f"[STREAM_DEBUG] Sending chunk #{chunk_count}, content length: {content_length}, elapsed: {elapsed_time:.2f}s")
                        message_queue.put((f"data: {data_str}\n\n".encode("utf-8"), "[CONTENT]"))
                    else:
                        logger.warning(f"[STREAM_DEBUG] Parsed response data is None for chunk #{chunk_count}")

                # Handle the case where no responses were received at all
                if chunk_count == 0:
                    logger.info("[STREAM_DEBUG] No chunks received, sending empty message")
                    thinking_message = {
                        "id": str(uuid.uuid4()),
                        "object": "chat.completion.chunk",
                        "created": int(datetime.now().timestamp()),
                        "model": "models/lpm",
                        "system_fingerprint": None,
                        "choices": [
                            {
                                "index": 0,
                                "delta": {
                                    "content": ""  # Empty content won't affect frontend display
                                },
                                "finish_reason": None,
                            }
                        ],
                    }
                    data_str = json.dumps(thinking_message)
                    message_queue.put((f"data: {data_str}\n\n".encode("utf-8"), "[THINKING]"))

                # Model processing is complete; send the end marker if it hasn't been sent yet
                if chunk != "[DONE]":
                    logger.info(f"[STREAM_DEBUG] Sending final [DONE] marker after {elapsed_time:.2f}s")
                    message_queue.put((b"data: [DONE]\n\n", "[DONE]"))
            except Exception as e:
                logger.error(f"[STREAM_DEBUG] Error processing model response: {str(e)}", exc_info=True)
                message_queue.put((f"data: {{\"error\": \"{str(e)}\"}}\n\n".encode("utf-8"), "[ERROR]"))
                message_queue.put((b"data: [DONE]\n\n", "[DONE]"))
            finally:
                # Signal the heartbeat thread to stop
                completion_event.set()
                logger.info(f"[STREAM_DEBUG] Model response thread completed with {chunk_count} chunks")

        def generate():
            """Main generator that yields SSE messages to the client."""
            # Start the heartbeat thread
            heart_thread = threading.Thread(target=heartbeat_thread, daemon=True)
            heart_thread.start()

            # Start the model response processing thread
            model_thread = threading.Thread(target=model_response_thread, daemon=True)
            model_thread.start()

            try:
                # Pull messages from the queue and forward them to the client
                while True:
                    try:
                        # Use a short timeout so the loop never blocks for long
                        message, message_type = message_queue.get(timeout=0.1)
                        logger.debug(f"[STREAM_DEBUG] Yielding message type: {message_type}")
                        yield message

                        # If the end marker is received, exit the loop
                        if message_type == "[DONE]":
                            logger.info("[STREAM_DEBUG] Received [DONE] marker, ending generator")
                            break
                    except queue.Empty:
                        # Queue is empty; check whether the model thread finished without sending [DONE]
                        if completion_event.is_set() and not model_thread.is_alive():
                            logger.warning("[STREAM_DEBUG] Model thread completed without [DONE], ending generator")
                            yield b"data: [DONE]\n\n"
                            break
            except GeneratorExit:
                # Client closed the connection
                logger.info("[STREAM_DEBUG] Client closed connection (GeneratorExit)")
                completion_event.set()
            except Exception as e:
                logger.error(f"[STREAM_DEBUG] Error in generator: {str(e)}", exc_info=True)
                try:
                    yield f"data: {{\"error\": \"Generator error: {str(e)}\"}}\n\n".encode("utf-8")
                    yield b"data: [DONE]\n\n"
                except Exception:
                    pass
                completion_event.set()
            finally:
                # Ensure the completion event is set
                completion_event.set()
                # Wait briefly for the worker threads to finish
                if heart_thread.is_alive():
                    heart_thread.join(timeout=1.0)
                if model_thread.is_alive():
                    model_thread.join(timeout=1.0)
                logger.info("[STREAM_DEBUG] Generator completed")

        # Return the streaming response
        return Response(
            generate(),
            mimetype="text/event-stream",
            headers={
                "Cache-Control": "no-cache, no-transform",
                "X-Accel-Buffering": "no",
                "Connection": "keep-alive",
                "Transfer-Encoding": "chunked",
            },
        )


# Global instance
local_llm_service = LocalLLMService()
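

# Illustrative smoke test only; a sketch, not part of the service API. The model path below is an
# assumption and must point to a real GGUF file for the server to actually start.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    example_model_path = "models/model.gguf"  # hypothetical path
    if local_llm_service.start_server(example_model_path, use_gpu=False):
        status = local_llm_service.get_server_status()
        logger.info(f"Server running: {status.is_running}")
        local_llm_service.stop_server()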