Kalpokoch committed
Commit 7e6e5a8 · verified · 1 Parent(s): b5cf3d3

Update app/app.py

Files changed (1):
  1. app/app.py (+74, -141)
app/app.py CHANGED
@@ -4,8 +4,6 @@ import asyncio
 import logging
 import uuid
 import re
-import multiprocessing as mp
-from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
 from fastapi import FastAPI, HTTPException, Request
 from pydantic import BaseModel
 from llama_cpp import Llama
@@ -24,28 +22,21 @@ class RequestIdAdapter(logging.LoggerAdapter):
 logger = logging.getLogger("app")

 # -----------------------------
-# ✅ Configuration - Optimized for CPU
+# ✅ Configuration
 # -----------------------------
 DB_PERSIST_DIRECTORY = os.getenv("DB_PERSIST_DIRECTORY", "/app/vector_database")
 CHUNKS_FILE_PATH = os.getenv("CHUNKS_FILE_PATH", "/app/granular_chunks_final.jsonl")
 MODEL_PATH = os.getenv("MODEL_PATH", "/app/tinyllama_dop_q4_k_m.gguf")
-LLM_TIMEOUT_SECONDS = int(os.getenv("LLM_TIMEOUT_SECONDS", "60"))  # Reduced timeout
+LLM_TIMEOUT_SECONDS = int(os.getenv("LLM_TIMEOUT_SECONDS", "90"))
 RELEVANCE_THRESHOLD = float(os.getenv("RELEVANCE_THRESHOLD", "0.3"))
 TOP_K_SEARCH = int(os.getenv("TOP_K_SEARCH", "3"))
 TOP_K_CONTEXT = int(os.getenv("TOP_K_CONTEXT", "1"))

-# CPU Optimization settings
-CPU_COUNT = mp.cpu_count()
-logger.info(f"Detected {CPU_COUNT} CPU cores")
-
 # -----------------------------
 # ✅ Initialize FastAPI App
 # -----------------------------
 app = FastAPI(title="NEEPCO DoP RAG Chatbot", version="2.1.0")

-# Thread pool for async operations
-thread_executor = ThreadPoolExecutor(max_workers=CPU_COUNT * 2)
-
 @app.middleware("http")
 async def add_request_id(request: Request, call_next):
     request_id = str(uuid.uuid4())
@@ -76,26 +67,19 @@ except Exception as e:
     db_ready = False

 # -----------------------------
-# ✅ Load TinyLlama GGUF Model - Optimized
+# ✅ Load TinyLlama GGUF Model
 # -----------------------------
 logger.info(f"Loading GGUF model from: {MODEL_PATH}")
 try:
     llm = Llama(
         model_path=MODEL_PATH,
-        n_ctx=2048,  # Reduced context window for speed
-        n_threads=CPU_COUNT,  # Use all CPU cores
-        n_batch=256,  # Optimized batch size
+        n_ctx=4096,
+        n_threads=4,
+        n_batch=512,
         use_mlock=True,
-        verbose=False,
-        # Additional optimizations
-        n_gpu_layers=0,  # Force CPU only
-        rope_scaling_type=-1,  # Disable rope scaling for speed
-        use_mmap=True,  # Enable memory mapping
-        low_vram=False,  # We're on CPU
-        # CPU-specific optimizations
-        numa=True,  # Enable NUMA awareness if available
+        verbose=False
     )
-    logger.info("GGUF model loaded successfully with CPU optimizations.")
+    logger.info("GGUF model loaded successfully.")
     model_ready = True
 except Exception as e:
     logger.error(f"FATAL: Failed to load GGUF model: {e}", exc_info=True)
@@ -116,49 +100,6 @@ class Feedback(BaseModel):
     feedback: str
     comment: str | None = None

-# -----------------------------
-# ✅ Optimized LLM Generation
-# -----------------------------
-async def generate_llm_response(prompt: str, request_id: str):
-    """Optimized LLM response generation with better CPU utilization."""
-    loop = asyncio.get_running_loop()
-
-    def generate_response():
-        return llm(
-            prompt,
-            max_tokens=1024,  # Reduced for faster generation
-            stop=["###", "Question:", "Context:", "</s>", "\n\n###"],
-            temperature=0.05,
-            echo=False,
-            # CPU optimizations
-            repeat_penalty=1.1,
-            top_p=0.9,
-            top_k=40,
-            # Faster inference settings
-            typical_p=1.0,
-            mirostat_mode=0,  # Disable for speed
-        )
-
-    # Use thread executor for better concurrency
-    response = await loop.run_in_executor(thread_executor, generate_response)
-
-    answer = response["choices"][0]["text"].strip()
-    if not answer:
-        raise ValueError("Empty response from LLM")
-    return answer
-
-# -----------------------------
-# ✅ Optimized Search Function
-# -----------------------------
-async def perform_optimized_search(query_text: str):
-    """Perform vector search in a separate thread to avoid blocking."""
-    loop = asyncio.get_running_loop()
-
-    def search_db():
-        return db.search(query_text, top_k=TOP_K_SEARCH)
-
-    return await loop.run_in_executor(thread_executor, search_db)
-
 # -----------------------------
 # ✅ Endpoints
 # -----------------------------
@@ -167,21 +108,30 @@ def get_logger_adapter(request: Request):

 @app.get("/")
 async def root():
-    return {"status": "✅ Server is running.", "cpu_cores": CPU_COUNT}
+    return {"status": "✅ Server is running."}

 @app.get("/health")
 async def health_check():
     status = {
         "status": "ok",
         "database_status": "ready" if db_ready else "error",
-        "model_status": "ready" if model_ready else "error",
-        "cpu_cores": CPU_COUNT,
-        "optimization": "enabled"
+        "model_status": "ready" if model_ready else "error"
     }
     if not db_ready or not model_ready:
         raise HTTPException(status_code=503, detail=status)
     return status

+async def generate_llm_response(prompt: str, request_id: str):
+    loop = asyncio.get_running_loop()
+    response = await loop.run_in_executor(
+        None,
+        lambda: llm(prompt, max_tokens=2048, stop=["###", "Question:", "Context:", "</s>"], temperature=0.05, echo=False)
+    )
+    answer = response["choices"][0]["text"].strip()
+    if not answer:
+        raise ValueError("Empty response from LLM")
+    return answer
+
 @app.post("/chat")
 async def chat(query: Query, request: Request):
     adapter = get_logger_adapter(request)
@@ -192,9 +142,9 @@ async def chat(query: Query, request: Request):
     if question_lower in greeting_keywords:
         adapter.info(f"Handling a greeting or introductory query: '{query.question}'")
         intro_message = (
-            f"Hello! I am an AI assistant specifically trained on NEEPCO's Delegation of Powers (DoP) policy document. "
-            f"My purpose is to help you find accurate information and answer questions based on this specific dataset. "
-            f"I am currently running optimized on a {CPU_COUNT}-core CPU environment. How can I assist you with the DoP policy today?"
+            "Hello! I am an AI assistant specifically trained on NEEPCO's Delegation of Powers (DoP) policy document. "
+            "My purpose is to help you find accurate information and answer questions based on this specific dataset. "
+            "I am currently running on a CPU-based environment. How can I assist you with the DoP policy today?"
         )
         return {
             "request_id": getattr(request.state, 'request_id', 'N/A'),
@@ -209,86 +159,75 @@ async def chat(query: Query, request: Request):

     adapter.info(f"Received query: '{query.question}'")

-    try:
-        # 1. Perform parallel search and prepare context
-        search_task = perform_optimized_search(query.question)
-        search_results = await search_task
+    # 1. Search Vector DB
+    search_results = db.search(query.question, top_k=TOP_K_SEARCH)

-        if not search_results:
-            adapter.warning("No relevant context found in vector DB.")
-            return {
-                "request_id": request.state.request_id,
-                "question": query.question,
-                "context_used": "No relevant context found.",
-                "answer": "Sorry, I could not find a relevant policy to answer that question. Please try rephrasing."
-            }
-
-        scores = [f"{result['relevance_score']:.4f}" for result in search_results]
-        adapter.info(f"Found {len(search_results)} relevant chunks with scores: {scores}")
+    if not search_results:
+        adapter.warning("No relevant context found in vector DB.")
+        return {
+            "question": query.question,
+            "context_used": "No relevant context found.",
+            "answer": "Sorry, I could not find a relevant policy to answer that question. Please try rephrasing."
+        }
+
+    scores = [f"{result['relevance_score']:.4f}" for result in search_results]
+    adapter.info(f"Found {len(search_results)} relevant chunks with scores: {scores}")

-        # 2. Prepare Context (limit context size for faster processing)
-        context_chunks = [result['text'] for result in search_results[:TOP_K_CONTEXT]]
-        context = "\n---\n".join(context_chunks)
-
-        # Truncate context if too long for faster processing
-        max_context_length = 1500  # Reduced for faster inference
-        if len(context) > max_context_length:
-            context = context[:max_context_length] + "..."
-
-        # 3. Build optimized prompt
-        prompt = f"""<|system|>
+    # 2. Prepare Context
+    context_chunks = [result['text'] for result in search_results[:TOP_K_CONTEXT]]
+    context = "\n---\n".join(context_chunks)
+
+    # 3. Build Prompt with Separator Instruction
+    prompt = f"""<|system|>
 You are a precise and factual assistant for NEEPCO's Delegation of Powers (DoP) policy.
 Your task is to answer the user's question based ONLY on the provided context.
+
 - **Formatting Rule:** If the answer contains a list of items or steps, you **MUST** separate each item with a pipe symbol (`|`). For example: `First item|Second item|Third item`.
 - **Content Rule:** If the information is not in the provided context, you **MUST** reply with the exact phrase: "The provided policy context does not contain information on this topic."
-- **Brevity Rule:** Keep your answer concise and to the point.
 </s>
 <|user|>
 ### Relevant Context:
 ```
 {context}
 ```
+
 ### Question:
 {query.question}
 </s>
 <|assistant|>
-### Answer:
+### Detailed Answer:
 """

-        # 4. Generate Response with timeout
-        answer = "An error occurred while processing your request."
-        try:
-            adapter.info("Sending prompt to LLM for generation...")
-            raw_answer = await asyncio.wait_for(
-                generate_llm_response(prompt, request.state.request_id),
-                timeout=LLM_TIMEOUT_SECONDS
-            )
-            adapter.info(f"LLM generation successful. Raw response: {raw_answer[:100]}...")
-
-            # --- POST-PROCESSING LOGIC ---
-            # Check if the model used the pipe separator, indicating a list.
-            if '|' in raw_answer:
-                adapter.info("Pipe separator found. Formatting response as a bulleted list.")
-                # Split the string into a list of items
-                items = raw_answer.split('|')
-                # Clean up each item and format it as a bullet point
-                cleaned_items = [f"• {item.strip()}" for item in items if item.strip()]
-                # Join them back together with newlines
-                answer = "\n".join(cleaned_items)
-            else:
-                # If no separator, use the answer as is.
-                answer = raw_answer
-
-        except asyncio.TimeoutError:
-            adapter.warning(f"LLM generation timed out after {LLM_TIMEOUT_SECONDS} seconds.")
-            answer = "Sorry, the request took too long to process. Please try again with a simpler question."
-        except Exception as e:
-            adapter.error(f"An unexpected error occurred during LLM generation: {e}", exc_info=True)
-            answer = "Sorry, an unexpected error occurred while generating a response."
-
+    # 4. Generate Response
+    answer = "An error occurred while processing your request."
+    try:
+        adapter.info("Sending prompt to LLM for generation...")
+        raw_answer = await asyncio.wait_for(
+            generate_llm_response(prompt, request.state.request_id),
+            timeout=LLM_TIMEOUT_SECONDS
+        )
+        adapter.info(f"LLM generation successful. Raw response: {raw_answer[:250]}...")
+
+        # --- POST-PROCESSING LOGIC ---
+        # Check if the model used the pipe separator, indicating a list.
+        if '|' in raw_answer:
+            adapter.info("Pipe separator found. Formatting response as a bulleted list.")
+            # Split the string into a list of items
+            items = raw_answer.split('|')
+            # Clean up each item and format it as a bullet point
+            cleaned_items = [f"* {item.strip()}" for item in items if item.strip()]
+            # Join them back together with newlines
+            answer = "\n".join(cleaned_items)
+        else:
+            # If no separator, use the answer as is.
+            answer = raw_answer
+
+    except asyncio.TimeoutError:
+        adapter.warning(f"LLM generation timed out after {LLM_TIMEOUT_SECONDS} seconds.")
+        answer = "Sorry, the request took too long to process. Please try again with a simpler question."
     except Exception as e:
-        adapter.error(f"An unexpected error occurred: {e}", exc_info=True)
-        answer = "Sorry, an unexpected error occurred. Please try again."
+        adapter.error(f"An unexpected error occurred during LLM generation: {e}", exc_info=True)
+        answer = "Sorry, an unexpected error occurred while generating a response."

     adapter.info(f"Final answer prepared. Returning to client.")
     return {
@@ -312,9 +251,3 @@ async def collect_feedback(feedback: Feedback, request: Request):
     }
     adapter.info(json.dumps(feedback_log))
     return {"status": "✅ Feedback recorded. Thank you!"}
-
-# Graceful shutdown
-@app.on_event("shutdown")
-async def shutdown_event():
-    thread_executor.shutdown(wait=True)
-    logger.info("Application shutdown complete.")
 
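As an aside on the pattern used above: the rewritten generate_llm_response pushes the blocking llama_cpp call onto asyncio's default executor via run_in_executor(None, ...) and bounds it with asyncio.wait_for, which is what makes the dedicated ThreadPoolExecutor and the shutdown hook removable. A minimal standalone sketch of that pattern (illustrative names, not code from the repository):

```python
import asyncio
import time


def blocking_generate(prompt: str) -> str:
    # Stand-in for the blocking llm(...) call; sleep simulates CPU-bound inference.
    time.sleep(1)
    return f"echo: {prompt}"


async def generate_with_timeout(prompt: str, timeout_s: float = 90.0) -> str:
    loop = asyncio.get_running_loop()
    # Passing None uses asyncio's default ThreadPoolExecutor, as the updated app code does.
    return await asyncio.wait_for(
        loop.run_in_executor(None, blocking_generate, prompt),
        timeout=timeout_s,
    )


if __name__ == "__main__":
    print(asyncio.run(generate_with_timeout("hello", timeout_s=5.0)))
```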
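Assuming the service is run locally (for example with `uvicorn app.app:app --port 8000`; the host, port, and sample question below are illustrative, not taken from the repository), the endpoints touched by this commit can be smoke-tested roughly as follows:

```python
import requests

BASE_URL = "http://localhost:8000"  # assumed local deployment

# /health now reports only database and model readiness (the CPU fields were dropped).
health = requests.get(f"{BASE_URL}/health", timeout=10)
print(health.status_code, health.json())

# /chat takes a JSON body matching the Query model's "question" field.
resp = requests.post(
    f"{BASE_URL}/chat",
    json={"question": "Who can approve works contracts?"},
    timeout=120,  # generation may take up to LLM_TIMEOUT_SECONDS (90 s after this commit)
)
print(resp.json().get("answer"))
```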