Spaces:
Running
Running
| """ | |
| Task Manager with SQLite backend for CRUD operations. | |
| """ | |
| import sqlite3 | |
| import json | |
| from typing import List, Dict, Optional | |
| from datetime import datetime | |
| import os | |
| class TaskManager: | |
| """Manages tasks with SQLite persistence.""" | |
| # Strict status enum | |
| VALID_STATUSES = {"Todo", "In Progress", "Done"} | |
| def __init__(self, db_path: str = "focusflow.db", use_memory: bool = False): | |
| """ | |
| Initialize the task manager. | |
| Args: | |
| db_path: Path to SQLite database file | |
| use_memory: If True, use in-memory list instead of SQLite (for Demo/HF Spaces) | |
| """ | |
| self.db_path = db_path | |
| self.use_memory = use_memory | |
| self.memory_tasks = [] # List of dicts for in-memory storage | |
| self.memory_counter = 0 # Auto-increment ID for in-memory | |
| if not self.use_memory: | |
| self._init_db() | |
| else: | |
| print("ℹ️ TaskManager initialized in IN-MEMORY mode (non-persistent)") | |
| def _init_db(self): | |
| """Create the tasks table if it doesn't exist.""" | |
| try: | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS tasks ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| title TEXT NOT NULL, | |
| description TEXT, | |
| status TEXT DEFAULT 'Todo', | |
| estimated_duration TEXT, | |
| position INTEGER, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """) | |
| conn.commit() | |
| conn.close() | |
| except Exception as e: | |
| print(f"⚠️ Database initialization failed: {e}. Falling back to in-memory mode.") | |
| self.use_memory = True | |
| self.memory_tasks = [] | |
| self.memory_counter = 0 | |
| def add_task(self, title: str, description: str = "", | |
| estimated_duration: str = "", status: str = "Todo") -> int: | |
| """Add a new task and return its ID.""" | |
| # Validate status | |
| if status not in self.VALID_STATUSES: | |
| status = "Todo" | |
| if self.use_memory: | |
| self.memory_counter += 1 | |
| # Calculate position | |
| max_pos = 0 | |
| if self.memory_tasks: | |
| max_pos = max(t.get('position', 0) for t in self.memory_tasks) | |
| position = max_pos + 1 | |
| new_task = { | |
| "id": self.memory_counter, | |
| "title": title, | |
| "description": description, | |
| "status": status, | |
| "estimated_duration": estimated_duration, | |
| "position": position, | |
| "created_at": datetime.now().isoformat(), | |
| "updated_at": datetime.now().isoformat() | |
| } | |
| self.memory_tasks.append(new_task) | |
| return self.memory_counter | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| # Get max position | |
| cursor.execute("SELECT MAX(position) FROM tasks") | |
| result = cursor.fetchone() | |
| max_pos = result[0] if result and result[0] is not None else 0 | |
| position = max_pos + 1 | |
| cursor.execute(""" | |
| INSERT INTO tasks (title, description, status, estimated_duration, position) | |
| VALUES (?, ?, ?, ?, ?) | |
| """, (title, description, status, estimated_duration, position)) | |
| task_id = cursor.lastrowid or 0 | |
| conn.commit() | |
| conn.close() | |
| return task_id | |
| def get_all_tasks(self) -> List[Dict]: | |
| """Get all tasks ordered by position.""" | |
| if self.use_memory: | |
| return sorted(self.memory_tasks, key=lambda x: x.get('position', 0)) | |
| conn = sqlite3.connect(self.db_path) | |
| conn.row_factory = sqlite3.Row | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| SELECT id, title, description, status, estimated_duration, position | |
| FROM tasks ORDER BY position | |
| """) | |
| tasks = [dict(row) for row in cursor.fetchall()] | |
| conn.close() | |
| return tasks | |
| def get_task(self, task_id: int) -> Optional[Dict]: | |
| """Get a specific task by ID.""" | |
| if self.use_memory: | |
| for task in self.memory_tasks: | |
| if task['id'] == task_id: | |
| return task.copy() | |
| return None | |
| conn = sqlite3.connect(self.db_path) | |
| conn.row_factory = sqlite3.Row | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| SELECT id, title, description, status, estimated_duration, position | |
| FROM tasks WHERE id = ? | |
| """, (task_id,)) | |
| row = cursor.fetchone() | |
| conn.close() | |
| return dict(row) if row else None | |
| def update_task(self, task_id: int, **kwargs): | |
| """Update a task's fields with validation.""" | |
| # Validate status if provided | |
| if 'status' in kwargs and kwargs['status'] not in self.VALID_STATUSES: | |
| raise ValueError(f"Invalid status. Must be one of: {', '.join(self.VALID_STATUSES)}") | |
| allowed_fields = ['title', 'description', 'status', 'estimated_duration', 'position'] | |
| if self.use_memory: | |
| for task in self.memory_tasks: | |
| if task['id'] == task_id: | |
| for key, value in kwargs.items(): | |
| if key in allowed_fields: | |
| task[key] = value | |
| task['updated_at'] = datetime.now().isoformat() | |
| return | |
| return | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| updates = [] | |
| values = [] | |
| for key, value in kwargs.items(): | |
| if key in allowed_fields: | |
| updates.append(f"{key} = ?") | |
| values.append(value) | |
| if updates: | |
| values.append(task_id) | |
| query = f"UPDATE tasks SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP WHERE id = ?" | |
| cursor.execute(query, values) | |
| conn.commit() | |
| conn.close() | |
| def delete_task(self, task_id: int): | |
| """Delete a task by ID.""" | |
| if self.use_memory: | |
| self.memory_tasks = [t for t in self.memory_tasks if t['id'] != task_id] | |
| return | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,)) | |
| conn.commit() | |
| conn.close() | |
| def reorder_tasks(self, task_ids: List[int]): | |
| """Reorder tasks based on new order.""" | |
| if self.use_memory: | |
| # Create a map for O(1) lookup | |
| task_map = {t['id']: t for t in self.memory_tasks} | |
| # Update positions based on the input list order | |
| for position, task_id in enumerate(task_ids, start=1): | |
| if task_id in task_map: | |
| task_map[task_id]['position'] = position | |
| return | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| for position, task_id in enumerate(task_ids, start=1): | |
| cursor.execute("UPDATE tasks SET position = ? WHERE id = ?", (position, task_id)) | |
| conn.commit() | |
| conn.close() | |
| def get_active_task(self) -> Optional[Dict]: | |
| """Get the task marked as 'In Progress'.""" | |
| if self.use_memory: | |
| # Filter for In Progress tasks and sort by position | |
| active_tasks = [t for t in self.memory_tasks if t['status'] == 'In Progress'] | |
| if not active_tasks: | |
| return None | |
| # Return the first one (lowest position) | |
| return sorted(active_tasks, key=lambda x: x.get('position', 0))[0].copy() | |
| conn = sqlite3.connect(self.db_path) | |
| conn.row_factory = sqlite3.Row | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| SELECT id, title, description, status, estimated_duration | |
| FROM tasks WHERE status = 'In Progress' | |
| ORDER BY position LIMIT 1 | |
| """) | |
| row = cursor.fetchone() | |
| conn.close() | |
| return dict(row) if row else None | |
| def set_active_task(self, task_id: int) -> bool: | |
| """Set a task as 'In Progress' and ensure only one task has this status. | |
| Returns True if successful, False otherwise.""" | |
| if self.use_memory: | |
| # Check if task exists | |
| target_task = None | |
| for t in self.memory_tasks: | |
| if t['id'] == task_id: | |
| target_task = t | |
| break | |
| if not target_task: | |
| return False | |
| if target_task['status'] == 'Done': | |
| return False | |
| # Reset other In Progress tasks | |
| for t in self.memory_tasks: | |
| if t['status'] == 'In Progress': | |
| t['status'] = 'Todo' | |
| t['updated_at'] = datetime.now().isoformat() | |
| # Set target task | |
| target_task['status'] = 'In Progress' | |
| target_task['updated_at'] = datetime.now().isoformat() | |
| return True | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| # Check if task exists and is not already Done | |
| cursor.execute("SELECT status FROM tasks WHERE id = ?", (task_id,)) | |
| result = cursor.fetchone() | |
| if not result: | |
| conn.close() | |
| return False | |
| current_status = result[0] | |
| if current_status == "Done": | |
| conn.close() | |
| return False | |
| # Enforce single "In Progress" rule: set all current "In Progress" tasks back to "Todo" | |
| cursor.execute(""" | |
| UPDATE tasks SET status = 'Todo', updated_at = CURRENT_TIMESTAMP | |
| WHERE status = 'In Progress' | |
| """) | |
| # Set the selected task as 'In Progress' | |
| cursor.execute(""" | |
| UPDATE tasks SET status = 'In Progress', updated_at = CURRENT_TIMESTAMP | |
| WHERE id = ? | |
| """, (task_id,)) | |
| conn.commit() | |
| conn.close() | |
| return True | |
| def clear_all_tasks(self): | |
| """Clear all tasks from the database.""" | |
| if self.use_memory: | |
| self.memory_tasks = [] | |
| return | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| cursor.execute("DELETE FROM tasks") | |
| conn.commit() | |
| conn.close() | |