Spaces:
Runtime error
Runtime error
| # filename: core/bot.py | |
| import asyncio | |
| import logging | |
| import os | |
| import time | |
| from collections import deque | |
| from telethon import TelegramClient | |
| import config | |
| import templates | |
| from database import manager as db_manager | |
| from utils import terabox, ffmpeg, helpers | |
| logger = logging.getLogger(__name__) | |
# --- Core Bot Client ---
# Telethon client; the session file 'terabox_bot_session' is created/reused
# in the working directory. Credentials come from the project config module.
bot = TelegramClient('terabox_bot_session', config.API_ID, config.API_HASH)
# --- In-Memory State Management ---
# Queues for the workers.
# NOTE(review): both queues are created with the default maxsize=0, i.e.
# unbounded — asyncio.Queue.full() is then always False. Confirm whether
# FREE_QUEUE was meant to have a maxsize (scheduler_loop checks .full()).
PREMIUM_QUEUE = asyncio.Queue()
FREE_QUEUE = asyncio.Queue()
# Dictionary to track the state of all active batch jobs
# Structure: { "batch_id": { ...job_details... } }
BATCH_JOBS = {}
# Dictionary to hold pending tasks for each free user before they hit the queue
# This is essential for the round-robin scheduler
# Structure: { user_id: [task1, task2, ...] }
ACTIVE_USER_TASKS = {}
# Deque to manage the turn order for the fair-share scheduler
# (head of the deque = the user whose turn is next)
USER_TURN_ORDER = deque()
# --- The Fair-Share Scheduler for Free Users ---
async def scheduler_loop():
    """
    The heart of the fair-share system.

    Round-robin over ``USER_TURN_ORDER``: each pass takes at most one task
    from the user at the head of the deque, feeds it into ``FREE_QUEUE``,
    and sends that user to the back of the line (or drops them entirely
    once their task list is drained).

    Runs forever; intended to be launched with ``asyncio.create_task``.
    """
    logger.info("Fair-Share Scheduler started.")
    while True:
        await asyncio.sleep(0.2)  # Prevent busy-looping
        if not USER_TURN_ORDER or FREE_QUEUE.full():
            continue
        # Claim the head of the line *before* any await. Handlers may
        # append/remove users while we are suspended in FREE_QUEUE.put(),
        # so we must not assume the deque is unchanged afterwards (the old
        # rotate-then-pop approach could evict the wrong user in that case).
        user_id = USER_TURN_ORDER.popleft()
        pending = ACTIVE_USER_TASKS.get(user_id)
        if not pending:
            # Stale entry: user is in the turn order but has no tasks.
            ACTIVE_USER_TASKS.pop(user_id, None)
            continue
        task = pending.pop(0)
        if pending:
            # User still has work queued: back of the line for fairness.
            USER_TURN_ORDER.append(user_id)
        else:
            # Drained: forget the user until they submit a new batch.
            del ACTIVE_USER_TASKS[user_id]
        await FREE_QUEUE.put(task)
# --- The Worker Logic ---
def _cleanup_local_files(download_path, final_file_path, thumbnail_path):
    """Best-effort removal of the temp files a task may have left on disk."""
    if download_path and os.path.exists(download_path):
        os.remove(download_path)
    # The remuxed file (if any) is a different path from the raw download.
    if final_file_path and final_file_path != download_path and os.path.exists(final_file_path):
        os.remove(final_file_path)
    if thumbnail_path and os.path.exists(thumbnail_path):
        os.remove(thumbnail_path)


async def _report_batch_progress(batch_id, batch_info, original_link, error_reason, worker_name):
    """Record one finished link on the shared batch job and refresh the
    user's status message. Must be called exactly once per task, success
    or failure; the batch lock serializes concurrent workers."""
    async with batch_info['lock']:
        batch_info['processed_links'] += 1
        if error_reason:
            batch_info['failed_links'].append({"link": original_link, "error": error_reason})
        # Update progress message
        processed = batch_info['processed_links']
        total = batch_info['total_links']
        progress = processed / total
        try:
            await bot.edit_message(
                batch_info['chat_id'],
                batch_info['status_message_id'],
                text=templates.BotResponses.BATCH_UPDATE_VALIDATED.format(
                    batch_id=batch_id[:6],
                    valid_count=total,
                    total_count=batch_info['original_total'],
                    skipped_count=batch_info['original_total'] - total,
                    progress_bar=helpers.create_progress_bar(progress),
                    processed_count=processed
                )
            )
        except Exception:
            pass  # Status message may be deleted or rate-limited; not fatal.
        # If batch is fully processed, send final summary
        if processed == total:
            # Here you would call a final summary function
            logger.info(f"[{worker_name}] Batch {batch_id[:6]} complete.")
            # ... logic to send final summary and delete job from BATCH_JOBS ...


async def _process_task(task: dict, worker_name: str):
    """The main logic for processing a single link. Executed by all workers.

    Pipeline: download the file -> optional ffmpeg remux/thumbnail ->
    upload to the backup channel and cache the result -> deliver to the
    user. Local temp files are always cleaned up, and the batch progress
    counter is always advanced (with the error recorded on failure).

    Args:
        task: dict with 'batch_id', 'link', 'user_id', 'short_id' and a
            pre-fetched 'metadata' dict ('url', 'file_name', 'file_size').
        worker_name: label used only for log messages.
    """
    batch_id = task['batch_id']
    original_link = task['link']
    user_id = task['user_id']
    metadata = task['metadata']  # Metadata is pre-fetched by the handler
    batch_info = BATCH_JOBS.get(batch_id)
    if not batch_info:
        logger.warning(f"[{worker_name}] Batch {batch_id} not found. Task for {original_link} skipped.")
        return
    logger.info(f"[{worker_name}] Starting processing for link: {original_link}")
    download_path = None
    final_file_path = None
    # Initialized up front so the cleanup path never sees an unbound name
    # (previously guarded with a fragile `'thumbnail_path' in locals()` check).
    thumbnail_path = None
    error_reason = None
    try:
        # Step 1: Download the file from the direct link
        download_path = await terabox.download_file_from_url(
            url=metadata['url'],
            dir_path="downloads",
            filename=metadata['file_name']
        )
        if not download_path:
            raise ValueError("File download failed.")
        final_file_path = download_path
        # Step 2: FFMPEG processing (if enabled and it's a video)
        if config.ENABLE_FFMPEG and final_file_path.endswith(('.mp4', '.mkv', '.webm')):
            # Remux to MP4 if needed (fast operation)
            if not final_file_path.endswith('.mp4'):
                remuxed_path = f"{os.path.splitext(final_file_path)[0]}.mp4"
                remuxed_path = await ffmpeg.remux_to_mp4(final_file_path, remuxed_path)
                if remuxed_path:
                    os.remove(final_file_path)  # remove original file
                    final_file_path = remuxed_path
            # Generate thumbnail (fast operation)
            thumb_path = f"{os.path.splitext(final_file_path)[0]}.jpg"
            thumbnail_path = await ffmpeg.generate_thumbnail(final_file_path, thumb_path)
        # Step 3: Upload to backup channel & cache
        caption = templates.BotResponses.FILE_CAPTION.format(
            file_name=metadata['file_name'],
            file_size=helpers.format_bytes(metadata['file_size']),
            source_url=original_link
        )
        backup_message = await bot.send_file(
            config.BACKUP_CHANNEL_ID,
            file=final_file_path,
            thumb=thumbnail_path,
            caption=caption
        )
        await db_manager.add_to_cache(
            short_id=task['short_id'],
            file_id=backup_message.id,
            file_name=metadata['file_name'],
            file_size=metadata['file_size']
        )
        # Step 4: Deliver to user and schedule deletion
        dm_message = await bot.send_file(
            user_id,
            file=backup_message,  # Send using file_id from backup channel for speed
            caption=caption
        )
        # Here you would add logic to call the APScheduler to delete dm_message in 30 mins
    except Exception as e:
        logger.error(f"[{worker_name}] Error processing {original_link}: {e}", exc_info=True)
        error_reason = str(e)
    finally:
        # Clean up local files
        _cleanup_local_files(download_path, final_file_path, thumbnail_path)
        # --- Final Step: Update Batch Status Safely ---
        await _report_batch_progress(batch_id, batch_info, original_link, error_reason, worker_name)
async def worker(name: str, queue: asyncio.Queue):
    """The generic worker function: pulls tasks off `queue` forever.

    A crashing task must not kill the worker — these coroutines are started
    with asyncio.create_task and nothing awaits them, so an escaping
    exception would be swallowed and the worker pool would silently shrink.

    Args:
        name: worker label used in log messages.
        queue: the asyncio.Queue (premium or free) this worker drains.
    """
    while True:
        task = await queue.get()
        try:
            await _process_task(task, name)
        except Exception:
            # _process_task handles its own expected failures; anything that
            # escapes here is a bug, but the worker must survive it.
            logger.exception(f"[{name}] Unhandled error while processing a task.")
        finally:
            queue.task_done()
# --- Startup Function ---
def start_bot_runtime():
    """Creates all the background tasks for the bot's engine.

    Must be called from within a running event loop. Strong references to
    the created tasks are kept (the event loop holds only weak references,
    so otherwise they could be garbage-collected mid-flight) and the list
    is also returned for callers that want to await or cancel them.
    """
    tasks = []
    # Create Premium Workers
    for i in range(config.PREMIUM_WORKERS):
        tasks.append(asyncio.create_task(worker(f"PremiumWorker-{i+1}", PREMIUM_QUEUE)))
    # Create Free Workers
    for i in range(config.FREE_WORKERS):
        tasks.append(asyncio.create_task(worker(f"FreeWorker-{i+1}", FREE_QUEUE)))
    # Start the Fair-Share Scheduler
    tasks.append(asyncio.create_task(scheduler_loop()))
    # Keep strong references so the tasks are never garbage-collected.
    start_bot_runtime.background_tasks = tasks
    logger.info(f"Bot runtime started with {config.PREMIUM_WORKERS} premium and {config.FREE_WORKERS} free workers.")
    return tasks