#!/usr/bin/env python3 """ Database management utility for Stock Alchemist Similar to cache_util.py but for the local database system """ import argparse import sys from pathlib import Path from datetime import datetime, timedelta import json # Add src to path sys.path.append(str(Path(__file__).parent.parent)) from db.local_database import LocalDatabase, DataType def show_stats(db_dir: str): """Show database statistics""" db = LocalDatabase(db_dir=db_dir) stats = db.get_stats() print("\n" + "="*60) print("DATABASE STATISTICS") print("="*60) print(f"\nšŸ“Š Overview:") print(f" Total Entries: {stats.get('total_entries', 0):,}") print(f" Total Size: {stats.get('total_size_mb', 0)} MB ({stats.get('total_size_bytes', 0):,} bytes)") print(f" Compression: {stats.get('compression', 'disabled')}") print(f" Expired Entries: {stats.get('expired_entries', 0)}") date_range = stats.get('date_range') if date_range: print(f" Date Range: {date_range['from']} to {date_range['to']}") by_type = stats.get('by_type', {}) if by_type: print(f"\nšŸ“ By Data Type:") for data_type, count in sorted(by_type.items()): print(f" {data_type:.<30} {count:>6,}") top_tickers = stats.get('top_tickers', {}) if top_tickers: print(f"\nšŸ¢ Top 10 Tickers:") for ticker, count in list(top_tickers.items())[:10]: print(f" {ticker:.<20} {count:>6,}") print("\n" + "="*60 + "\n") def clean_expired(db_dir: str): """Clean expired entries""" db = LocalDatabase(db_dir=db_dir) count = db.clean_expired() print(f"āœ“ Cleaned {count} expired entries") def clear_all(db_dir: str, confirm: bool = False): """Clear all database entries""" if not confirm: response = input("āš ļø This will delete ALL data. Are you sure? (yes/no): ") if response.lower() != 'yes': print("āŒ Cancelled") return db = LocalDatabase(db_dir=db_dir) db.clear_all() def clear_by_type(db_dir: str, data_type: str): """Clear entries of specific type""" db = LocalDatabase(db_dir=db_dir) # Query all entries of this type entries = db.query(data_type=data_type) if not entries: print(f"No entries found for type: {data_type}") return print(f"Found {len(entries)} entries of type '{data_type}'") response = input("Delete these entries? (yes/no): ") if response.lower() != 'yes': print("āŒ Cancelled") return # Delete each entry deleted = 0 for entry in entries: if db.delete(entry.date, entry.data_type, entry.ticker): deleted += 1 print(f"āœ“ Deleted {deleted} entries") def clear_by_ticker(db_dir: str, ticker: str): """Clear entries for specific ticker""" db = LocalDatabase(db_dir=db_dir) # Query all entries for this ticker entries = db.query(ticker=ticker) if not entries: print(f"No entries found for ticker: {ticker}") return print(f"Found {len(entries)} entries for ticker '{ticker}'") response = input("Delete these entries? (yes/no): ") if response.lower() != 'yes': print("āŒ Cancelled") return # Delete each entry deleted = 0 for entry in entries: if db.delete(entry.date, entry.data_type, entry.ticker): deleted += 1 print(f"āœ“ Deleted {deleted} entries") def clear_older_than(db_dir: str, days: int): """Clear entries older than specified days""" db = LocalDatabase(db_dir=db_dir) cutoff_date = (datetime.now() - timedelta(days=days)).date().isoformat() # Query old entries entries = db.query(date_to=cutoff_date) if not entries: print(f"No entries older than {days} days") return print(f"Found {len(entries)} entries older than {days} days") response = input("Delete these entries? (yes/no): ") if response.lower() != 'yes': print("āŒ Cancelled") return # Delete each entry deleted = 0 for entry in entries: if db.delete(entry.date, entry.data_type, entry.ticker): deleted += 1 print(f"āœ“ Deleted {deleted} entries") def search(db_dir: str, date_from: str = None, date_to: str = None, data_type: str = None, ticker: str = None, limit: int = 10): """Search database entries""" db = LocalDatabase(db_dir=db_dir) entries = db.query( date_from=date_from, date_to=date_to, data_type=data_type, ticker=ticker, limit=limit ) if not entries: print("No entries found") return print(f"\nšŸ“‹ Found {len(entries)} entries:\n") print(f"{'Date':<12} {'Type':<20} {'Ticker':<10} {'Created':<20}") print("-" * 70) for entry in entries: created = datetime.fromisoformat(entry.created_at).strftime("%Y-%m-%d %H:%M") print(f"{entry.date:<12} {entry.data_type:<20} {entry.ticker:<10} {created:<20}") print() def export_data(db_dir: str, output_file: str, date_from: str = None, date_to: str = None, data_type: str = None, ticker: str = None): """Export database entries to JSON file""" db = LocalDatabase(db_dir=db_dir) entries = db.query( date_from=date_from, date_to=date_to, data_type=data_type, ticker=ticker ) if not entries: print("No entries to export") return # Convert to dict list export_data = [entry.to_dict() for entry in entries] # Write to file with open(output_file, 'w', encoding='utf-8') as f: json.dump(export_data, f, indent=2, default=str) print(f"āœ“ Exported {len(entries)} entries to {output_file}") def import_data(db_dir: str, input_file: str, expiry_days: int = None): """Import database entries from JSON file""" db = LocalDatabase(db_dir=db_dir) with open(input_file, 'r', encoding='utf-8') as f: data = json.load(f) if not isinstance(data, list): print("āŒ Invalid format: expected list of entries") return from db.local_database import DatabaseEntry entries = [] for item in data: try: entry = DatabaseEntry.from_dict(item) entries.append(entry) except Exception as e: print(f"āš ļø Skipping invalid entry: {e}") if not entries: print("No valid entries to import") return print(f"Found {len(entries)} valid entries") response = input("Import these entries? (yes/no): ") if response.lower() != 'yes': print("āŒ Cancelled") return count = db.save_batch(entries, expiry_days=expiry_days) print(f"āœ“ Imported {count} entries") def list_types(): """List available data types""" print("\nšŸ“ Available Data Types:\n") for dt in DataType: print(f" - {dt.value}") print() def main(): parser = argparse.ArgumentParser( description="Database management utility for Stock Alchemist", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Show statistics python db_util.py --stats # Clean expired entries python db_util.py --clean-expired # Search for entries python db_util.py --search --ticker AAPL --limit 20 # Clear entries by type python db_util.py --clear-type financial_info # Clear entries older than 30 days python db_util.py --clear-older-than 30 # Export data python db_util.py --export backup.json --date-from 2025-01-01 # Import data python db_util.py --import backup.json --expiry-days 30 """ ) parser.add_argument("--db-dir", type=str, default="database", help="Database directory (default: database)") # Actions parser.add_argument("--stats", action="store_true", help="Show database statistics") parser.add_argument("--clean-expired", action="store_true", help="Clean expired entries") parser.add_argument("--clear-all", action="store_true", help="Clear all entries (WARNING: destructive)") parser.add_argument("--clear-type", type=str, help="Clear entries of specific type") parser.add_argument("--clear-ticker", type=str, help="Clear entries for specific ticker") parser.add_argument("--clear-older-than", type=int, help="Clear entries older than N days") parser.add_argument("--search", action="store_true", help="Search database entries") parser.add_argument("--export", type=str, help="Export entries to JSON file") parser.add_argument("--import", type=str, dest="import_file", help="Import entries from JSON file") parser.add_argument("--list-types", action="store_true", help="List available data types") # Query filters parser.add_argument("--date-from", type=str, help="Start date (YYYY-MM-DD)") parser.add_argument("--date-to", type=str, help="End date (YYYY-MM-DD)") parser.add_argument("--data-type", type=str, help="Filter by data type") parser.add_argument("--ticker", type=str, help="Filter by ticker") parser.add_argument("--limit", type=int, default=10, help="Limit number of results (default: 10)") parser.add_argument("--expiry-days", type=int, help="Set expiry days for imported data") parser.add_argument("--yes", action="store_true", help="Skip confirmation prompts") args = parser.parse_args() # Execute actions if args.stats: show_stats(args.db_dir) elif args.clean_expired: clean_expired(args.db_dir) elif args.clear_all: clear_all(args.db_dir, confirm=args.yes) elif args.clear_type: clear_by_type(args.db_dir, args.clear_type) elif args.clear_ticker: clear_by_ticker(args.db_dir, args.clear_ticker) elif args.clear_older_than: clear_older_than(args.db_dir, args.clear_older_than) elif args.search: search(args.db_dir, args.date_from, args.date_to, args.data_type, args.ticker, args.limit) elif args.export: export_data(args.db_dir, args.export, args.date_from, args.date_to, args.data_type, args.ticker) elif args.import_file: import_data(args.db_dir, args.import_file, args.expiry_days) elif args.list_types: list_types() else: parser.print_help() if __name__ == "__main__": main()