Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Database management utility for Stock Alchemist | |
| Similar to cache_util.py but for the local database system | |
| """ | |
| import argparse | |
| import sys | |
| from pathlib import Path | |
| from datetime import datetime, timedelta | |
| import json | |
| # Add src to path | |
| sys.path.append(str(Path(__file__).parent.parent)) | |
| from db.local_database import LocalDatabase, DataType | |
def show_stats(db_dir: str):
    """Print a formatted summary of database size, types, and top tickers.

    Args:
        db_dir: Directory holding the local database files.
    """
    database = LocalDatabase(db_dir=db_dir)
    stats = database.get_stats()

    banner = "=" * 60
    print("\n" + banner)
    print("DATABASE STATISTICS")
    print(banner)

    print("\n📊 Overview:")
    print(f"   Total Entries: {stats.get('total_entries', 0):,}")
    print(f"   Total Size: {stats.get('total_size_mb', 0)} MB ({stats.get('total_size_bytes', 0):,} bytes)")
    print(f"   Compression: {stats.get('compression', 'disabled')}")
    print(f"   Expired Entries: {stats.get('expired_entries', 0)}")

    date_range = stats.get('date_range')
    if date_range:
        print(f"   Date Range: {date_range['from']} to {date_range['to']}")

    by_type = stats.get('by_type', {})
    if by_type:
        print("\n📁 By Data Type:")
        for data_type, count in sorted(by_type.items()):
            print(f"   {data_type:.<30} {count:>6,}")

    top_tickers = stats.get('top_tickers', {})
    if top_tickers:
        print("\n🏢 Top 10 Tickers:")
        # Dict preserves insertion order; take the first ten entries as given.
        for ticker, count in list(top_tickers.items())[:10]:
            print(f"   {ticker:.<20} {count:>6,}")

    print("\n" + banner + "\n")
def clean_expired(db_dir: str):
    """Delete expired entries from the database and report the count removed."""
    removed = LocalDatabase(db_dir=db_dir).clean_expired()
    print(f"✓ Cleaned {removed} expired entries")
def clear_all(db_dir: str, confirm: bool = False):
    """Delete ALL database entries after an interactive confirmation.

    Args:
        db_dir: Directory holding the local database files.
        confirm: When True (the --yes flag), skip the confirmation prompt.
    """
    if not confirm:
        response = input("⚠️ This will delete ALL data. Are you sure? (yes/no): ")
        if response.lower() != 'yes':
            print("❌ Cancelled")
            return
    db = LocalDatabase(db_dir=db_dir)
    db.clear_all()
    # Report success like every other destructive command in this utility
    # (the original printed nothing, leaving the user unsure anything happened).
    print("✓ Cleared all entries")
def clear_by_type(db_dir: str, data_type: str):
    """Interactively delete every entry of one data type.

    Args:
        db_dir: Directory holding the local database files.
        data_type: Type string to match against stored entries.
    """
    db = LocalDatabase(db_dir=db_dir)

    matches = db.query(data_type=data_type)
    if not matches:
        print(f"No entries found for type: {data_type}")
        return

    print(f"Found {len(matches)} entries of type '{data_type}'")
    if input("Delete these entries? (yes/no): ").lower() != 'yes':
        print("❌ Cancelled")
        return

    # Count only the deletions the database actually confirms.
    deleted = sum(
        1 for e in matches if db.delete(e.date, e.data_type, e.ticker)
    )
    print(f"✓ Deleted {deleted} entries")
def clear_by_ticker(db_dir: str, ticker: str):
    """Interactively delete every entry stored for one ticker symbol.

    Args:
        db_dir: Directory holding the local database files.
        ticker: Ticker symbol to match against stored entries.
    """
    db = LocalDatabase(db_dir=db_dir)
    matches = db.query(ticker=ticker)

    if not matches:
        print(f"No entries found for ticker: {ticker}")
        return

    print(f"Found {len(matches)} entries for ticker '{ticker}'")
    answer = input("Delete these entries? (yes/no): ")
    if answer.lower() != 'yes':
        print("❌ Cancelled")
        return

    deleted = 0
    for row in matches:
        # Only count deletions the database confirms.
        if db.delete(row.date, row.data_type, row.ticker):
            deleted += 1
    print(f"✓ Deleted {deleted} entries")
def clear_older_than(db_dir: str, days: int):
    """Interactively delete entries dated at or before today minus *days*.

    Args:
        db_dir: Directory holding the local database files.
        days: Age threshold in days; the cutoff is computed from the wall clock.
    """
    db = LocalDatabase(db_dir=db_dir)
    cutoff = (datetime.now() - timedelta(days=days)).date()
    cutoff_date = cutoff.isoformat()

    old_entries = db.query(date_to=cutoff_date)
    if not old_entries:
        print(f"No entries older than {days} days")
        return

    print(f"Found {len(old_entries)} entries older than {days} days")
    if input("Delete these entries? (yes/no): ").lower() != 'yes':
        print("❌ Cancelled")
        return

    # Count only confirmed deletions.
    deleted = sum(
        1 for entry in old_entries
        if db.delete(entry.date, entry.data_type, entry.ticker)
    )
    print(f"✓ Deleted {deleted} entries")
def search(db_dir: str, date_from: str = None, date_to: str = None,
           data_type: str = None, ticker: str = None, limit: int = 10):
    """Query the database with optional filters and print a fixed-width table.

    Args:
        db_dir: Directory holding the local database files.
        date_from: Inclusive start date (YYYY-MM-DD) or None.
        date_to: Inclusive end date (YYYY-MM-DD) or None.
        data_type: Restrict to one data type, or None for all.
        ticker: Restrict to one ticker, or None for all.
        limit: Maximum number of rows to show.
    """
    db = LocalDatabase(db_dir=db_dir)
    results = db.query(date_from=date_from, date_to=date_to,
                       data_type=data_type, ticker=ticker, limit=limit)

    if not results:
        print("No entries found")
        return

    print(f"\n📋 Found {len(results)} entries:\n")
    print(f"{'Date':<12} {'Type':<20} {'Ticker':<10} {'Created':<20}")
    print("-" * 70)
    for row in results:
        # created_at is stored as an ISO-8601 timestamp string.
        stamp = datetime.fromisoformat(row.created_at).strftime("%Y-%m-%d %H:%M")
        print(f"{row.date:<12} {row.data_type:<20} {row.ticker:<10} {stamp:<20}")
    print()
def export_data(db_dir: str, output_file: str, date_from: str = None,
                date_to: str = None, data_type: str = None, ticker: str = None):
    """Export matching database entries to a JSON file.

    Args:
        db_dir: Directory holding the local database files.
        output_file: Path of the JSON file to write.
        date_from: Inclusive start date (YYYY-MM-DD) or None.
        date_to: Inclusive end date (YYYY-MM-DD) or None.
        data_type: Restrict to one data type, or None for all.
        ticker: Restrict to one ticker, or None for all.
    """
    db = LocalDatabase(db_dir=db_dir)
    entries = db.query(
        date_from=date_from,
        date_to=date_to,
        data_type=data_type,
        ticker=ticker
    )
    if not entries:
        print("No entries to export")
        return
    # Convert to a plain list of dicts. NOTE: the original bound this to a
    # local named `export_data`, shadowing the function itself — renamed.
    records = [entry.to_dict() for entry in entries]
    # default=str stringifies any non-JSON-native values (e.g. dates).
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(records, f, indent=2, default=str)
    print(f"✓ Exported {len(entries)} entries to {output_file}")
def import_data(db_dir: str, input_file: str, expiry_days: int = None):
    """Load entries from a JSON file and save them into the database.

    Args:
        db_dir: Directory holding the local database files.
        input_file: Path of the JSON file to read (list of entry dicts).
        expiry_days: Optional expiry to apply to the imported entries.
    """
    db = LocalDatabase(db_dir=db_dir)
    with open(input_file, 'r', encoding='utf-8') as f:
        raw = json.load(f)

    if not isinstance(raw, list):
        print("❌ Invalid format: expected list of entries")
        return

    from db.local_database import DatabaseEntry

    parsed = []
    for record in raw:
        try:
            parsed.append(DatabaseEntry.from_dict(record))
        except Exception as e:
            # Best-effort import: report and skip malformed records.
            print(f"⚠️ Skipping invalid entry: {e}")

    if not parsed:
        print("No valid entries to import")
        return

    print(f"Found {len(parsed)} valid entries")
    if input("Import these entries? (yes/no): ").lower() != 'yes':
        print("❌ Cancelled")
        return

    saved = db.save_batch(parsed, expiry_days=expiry_days)
    print(f"✓ Imported {saved} entries")
def list_types():
    """Print every value of the DataType enum, one per line."""
    print("\n📁 Available Data Types:\n")
    for member in DataType:
        print(f" - {member.value}")
    print()
def main():
    """Parse command-line arguments and dispatch to the selected action."""
    parser = argparse.ArgumentParser(
        description="Database management utility for Stock Alchemist",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Show statistics
  python db_util.py --stats
  # Clean expired entries
  python db_util.py --clean-expired
  # Search for entries
  python db_util.py --search --ticker AAPL --limit 20
  # Clear entries by type
  python db_util.py --clear-type financial_info
  # Clear entries older than 30 days
  python db_util.py --clear-older-than 30
  # Export data
  python db_util.py --export backup.json --date-from 2025-01-01
  # Import data
  python db_util.py --import backup.json --expiry-days 30
"""
    )
    parser.add_argument("--db-dir", type=str, default="database",
                        help="Database directory (default: database)")
    # Actions
    parser.add_argument("--stats", action="store_true",
                        help="Show database statistics")
    parser.add_argument("--clean-expired", action="store_true",
                        help="Clean expired entries")
    parser.add_argument("--clear-all", action="store_true",
                        help="Clear all entries (WARNING: destructive)")
    parser.add_argument("--clear-type", type=str,
                        help="Clear entries of specific type")
    parser.add_argument("--clear-ticker", type=str,
                        help="Clear entries for specific ticker")
    parser.add_argument("--clear-older-than", type=int,
                        help="Clear entries older than N days")
    parser.add_argument("--search", action="store_true",
                        help="Search database entries")
    parser.add_argument("--export", type=str,
                        help="Export entries to JSON file")
    # "import" is a keyword, so store under a different attribute name.
    parser.add_argument("--import", type=str, dest="import_file",
                        help="Import entries from JSON file")
    parser.add_argument("--list-types", action="store_true",
                        help="List available data types")
    # Query filters
    parser.add_argument("--date-from", type=str,
                        help="Start date (YYYY-MM-DD)")
    parser.add_argument("--date-to", type=str,
                        help="End date (YYYY-MM-DD)")
    parser.add_argument("--data-type", type=str,
                        help="Filter by data type")
    parser.add_argument("--ticker", type=str,
                        help="Filter by ticker")
    parser.add_argument("--limit", type=int, default=10,
                        help="Limit number of results (default: 10)")
    parser.add_argument("--expiry-days", type=int,
                        help="Set expiry days for imported data")
    parser.add_argument("--yes", action="store_true",
                        help="Skip confirmation prompts")
    args = parser.parse_args()
    # Execute the first matching action; fall through to help if none given.
    if args.stats:
        show_stats(args.db_dir)
    elif args.clean_expired:
        clean_expired(args.db_dir)
    elif args.clear_all:
        clear_all(args.db_dir, confirm=args.yes)
    elif args.clear_type:
        clear_by_type(args.db_dir, args.clear_type)
    elif args.clear_ticker:
        clear_by_ticker(args.db_dir, args.clear_ticker)
    # BUGFIX: compare against None — a plain truthiness test made
    # `--clear-older-than 0` (delete everything up to today) a silent no-op.
    elif args.clear_older_than is not None:
        clear_older_than(args.db_dir, args.clear_older_than)
    elif args.search:
        search(args.db_dir, args.date_from, args.date_to,
               args.data_type, args.ticker, args.limit)
    elif args.export:
        export_data(args.db_dir, args.export, args.date_from,
                    args.date_to, args.data_type, args.ticker)
    elif args.import_file:
        import_data(args.db_dir, args.import_file, args.expiry_days)
    elif args.list_types:
        list_types()
    else:
        parser.print_help()


if __name__ == "__main__":
    main()