new_recommender_system_nlp2 / cache_manager.py
bharatverse11's picture
Create cache_manager.py
c2a9907 verified
import time
from collections import OrderedDict
class CacheManager:
"""
A simple LRU (Least Recently Used) Cache with Time-to-Live (TTL) support.
"""
def __init__(self, capacity: int = 100, ttl_seconds: int = 3600):
"""
Initialize the cache.
:param capacity: Maximum number of items to store.
:param ttl_seconds: Time in seconds before an item expires.
"""
self.cache = OrderedDict()
self.capacity = capacity
self.ttl_seconds = ttl_seconds
print(f"πŸš€ CacheManager initialized with capacity={capacity} and TTL={ttl_seconds}s")
def get(self, key: str):
"""
Retrieve an item from the cache.
:param key: The key to lookup.
:return: The cached value if found and valid, else None.
"""
if key in self.cache:
result, timestamp = self.cache[key]
# Check if expired
if time.time() - timestamp < self.ttl_seconds:
# Move to end to mark as recently used
self.cache.move_to_end(key)
print(f"⚑ Cache HIT for query: '{key}'")
return result
else:
# Remove expired item
print(f"βŒ› Cache EXPIRED for query: '{key}'")
del self.cache[key]
return None
def set(self, key: str, value):
"""
Add an item to the cache.
:param key: The key to store.
:param value: The value to store.
"""
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = (value, time.time())
print(f"πŸ’Ύ Cache SET for query: '{key}'")
# Enforce capacity
if len(self.cache) > self.capacity:
removed = self.cache.popitem(last=False) # Remove first (least recently used)
print(f"πŸ—‘οΈ Cache full. Removed old entry: '{removed[0]}'")
def clear(self):
"""Clear the entire cache."""
self.cache.clear()
print("🧹 Cache cleared.")