Spaces:
Runtime error
Runtime error
| # filename: handlers/links.py | |
| import logging | |
| import re | |
| import uuid | |
| import asyncio | |
| from telethon import events, Button | |
| from core.bot import bot, BATCH_JOBS, PREMIUM_QUEUE, FREE_QUEUE, ACTIVE_USER_TASKS, USER_TURN_ORDER | |
| import config | |
| import templates | |
| from utils import terabox, helpers | |
| from database import manager as db_manager | |
| from datetime import datetime | |
| logger = logging.getLogger(__name__) | |
| # This regular expression finds all URLs in a message | |
| URL_REGEX = r'https?://[^\s<>"\']+' | |
| async def main_link_handler(event): | |
| sender = await event.get_sender() | |
| user_id = sender.id | |
| # --- Step 1: Pre-flight Checks --- | |
| user = await db_manager.get_user(user_id) | |
| if not user: | |
| user = await db_manager.add_or_update_user(user_id, sender.username, sender.first_name) | |
| if user.get('is_banned'): | |
| await event.reply(templates.BotResponses.USER_BANNED_MESSAGE) | |
| return | |
| if event.is_group and not await db_manager.is_group_authorized(event.chat_id): | |
| # Only reply if the group is not authorized, to avoid spam | |
| # You could add a check to only reply once per hour per group | |
| # await event.reply(templates.BotResponses.GROUP_NOT_AUTHORIZED.format(owner_id=config.OWNER_ID)) | |
| return | |
| # Force Subscribe Check | |
| if config.FORCE_SUB_CHANNEL_USERNAME: | |
| try: | |
| await bot.get_permissions(config.FORCE_SUB_CHANNEL_USERNAME, user_id) | |
| except Exception: | |
| await event.reply(templates.BotResponses.FORCE_SUBSCRIBE_MESSAGE.format(channel_username=config.FORCE_SUB_CHANNEL_USERNAME)) | |
| return | |
| # --- Step 2: Parse Links and Create Batch --- | |
| links = list(set(re.findall(URL_REGEX, event.text))) | |
| terabox_links = [link for link in links if "terabox" in link or "terashare" in link] | |
| if not terabox_links: | |
| return | |
| batch_id = str(uuid.uuid4())[:6] | |
| status_msg = await event.reply(templates.BotResponses.BATCH_ACKNOWLEDGEMENT.format( | |
| link_count=len(terabox_links), | |
| batch_id=batch_id | |
| )) | |
| # --- Step 3: Evaluate Each Link --- | |
| valid_tasks = [] | |
| skipped_links = [] | |
| for link in terabox_links: | |
| short_id = await terabox.extract_terabox_short_id(link) | |
| if not short_id: | |
| skipped_links.append({"link": link, "error": "Invalid Link Format"}) | |
| continue | |
| cached = await db_manager.get_cached_file(short_id) | |
| task_data = {"batch_id": batch_id, "link": link, "user_id": user_id, "short_id": short_id} | |
| if cached: | |
| task_data['metadata'] = {"file_name": cached['file_name'], "file_size": cached['file_size']} | |
| task_data['cached'] = True | |
| valid_tasks.append(task_data) | |
| continue | |
| # If not cached, get metadata for size check | |
| metadata = await terabox.get_final_url_and_metadata(link) | |
| if not metadata['success']: | |
| skipped_links.append({"link": link, "error": metadata['error']}) | |
| continue | |
| task_data['metadata'] = metadata | |
| task_data['cached'] = False | |
| # Apply free user limit | |
| is_premium = user.get('is_premium') and user.get('premium_expiry_date', datetime.min) > datetime.utcnow() | |
| if not is_premium and metadata['file_size'] > config.FREE_USER_FILE_SIZE_LIMIT_BYTES: | |
| skipped_links.append({ | |
| "link": link, | |
| "error": templates.BotResponses.PREMIUM_REQUIRED_ERROR.format( | |
| file_name=metadata['file_name'], | |
| file_size=helpers.format_bytes(metadata['file_size']), | |
| free_limit=helpers.format_bytes(config.FREE_USER_FILE_SIZE_LIMIT_BYTES) | |
| ) | |
| }) | |
| continue | |
| valid_tasks.append(task_data) | |
| # --- Step 4: Queue the Valid Tasks --- | |
| if not valid_tasks: | |
| # Handle case where no links were valid | |
| await status_msg.edit("❌ All links provided were invalid or failed the initial check.") | |
| return | |
| # Update the batch job tracker | |
| BATCH_JOBS[batch_id] = { | |
| "user_id": user_id, | |
| "chat_id": event.chat_id, | |
| "status_message_id": status_msg.id, | |
| "total_links": len(valid_tasks), | |
| "processed_links": 0, | |
| "original_total": len(terabox_links), | |
| "failed_links": skipped_links, # pre-flight failures | |
| "lock": asyncio.Lock() | |
| } | |
| is_premium = user.get('is_premium') and user.get('premium_expiry_date', datetime.min) > datetime.utcnow() | |
| if is_premium: | |
| for task in valid_tasks: | |
| await PREMIUM_QUEUE.put(task) | |
| else: | |
| # For free users, add tasks to their personal list and add them to the round-robin deque | |
| if user_id not in ACTIVE_USER_TASKS: | |
| ACTIVE_USER_TASKS[user_id] = [] | |
| ACTIVE_USER_TASKS[user_id].extend(valid_tasks) | |
| if user_id not in USER_TURN_ORDER: | |
| USER_TURN_ORDER.append(user_id) | |
| # Update the status message to show how many links are being processed | |
| await status_msg.edit(templates.BotResponses.BATCH_UPDATE_VALIDATED.format( | |
| batch_id=batch_id, | |
| valid_count=len(valid_tasks), | |
| total_count=len(terabox_links), | |
| skipped_count=len(skipped_links), | |
| progress_bar=helpers.create_progress_bar(0), | |
| processed_count=0 | |
| )) | |