from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
import os
import gradio as gr
from datasets import load_dataset, Dataset, concatenate_datasets
import pandas as pd
from PIL import Image
import pytesseract
import cv2
import numpy as np
import tensorflow as tf
from transformers import LayoutLMv2Processor, LayoutLMv2ForSequenceClassification
import torch
from tqdm import tqdm
import logging
import re

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CardPreprocessor:
    def __init__(self):
        # Initialize the LayoutLM processor and OCR settings
        self.processor = LayoutLMv2Processor.from_pretrained("microsoft/layoutlmv2-base-uncased")
        self.ocr_threshold = 0.5

    def extract_text_regions(self, image):
        """Extract text regions from the image using OCR"""
        try:
            # Convert PIL Image to cv2 format
            img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

            # Preprocess image for better OCR
            gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
            blurred = cv2.GaussianBlur(gray, (5, 5), 0)
            thresh = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]

            # Perform OCR
            text = pytesseract.image_to_data(thresh, output_type=pytesseract.Output.DICT)

            # Extract relevant information
            extracted_info = {
                'player_name': None,
                'team': None,
                'year': None,
                'card_number': None,
                'brand': None,
                'stats': []
            }

            # Process OCR results
            for i, word in enumerate(text['text']):
                if word.strip():
                    # Tesseract may report confidence as a string or a float; -1 means no text
                    conf = int(float(text['conf'][i]))
                    if conf > 50:  # Filter low-confidence detections
                        # Try to identify year
                        year_match = re.search(r'19[0-9]{2}|20[0-2][0-9]', word)
                        if year_match:
                            extracted_info['year'] = year_match.group()

                        # Try to identify card number
                        card_num_match = re.search(r'#\d+|\d+/\d+', word)
                        if card_num_match:
                            extracted_info['card_number'] = card_num_match.group()

                        # Look for common card brands
                        brands = ['topps', 'upper deck', 'panini', 'fleer', 'bowman']
                        if word.lower() in brands:
                            extracted_info['brand'] = word.lower()

                        # Look for statistics (numbers with common sports stats patterns)
                        stats_match = re.search(r'\d+\s*(?:HR|RBI|AVG|YDS|TD)', word)
                        if stats_match:
                            extracted_info['stats'].append(stats_match.group())

            return extracted_info
        except Exception as e:
            logger.error(f"Error in OCR processing: {str(e)}")
            return None
    def analyze_card_condition(self, image):
        """Analyze the physical condition of the card"""
        try:
            # Convert PIL Image to cv2 format
            img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

            # Edge detection for corner and edge analysis
            edges = cv2.Canny(img_cv, 100, 200)

            # Analyze corners
            corner_regions = {
                'top_left': edges[0:50, 0:50],
                'top_right': edges[0:50, -50:],
                'bottom_left': edges[-50:, 0:50],
                'bottom_right': edges[-50:, -50:]
            }
            corner_scores = {k: np.mean(v) for k, v in corner_regions.items()}

            # Analyze centering
            height, width = img_cv.shape[:2]
            center_x = width // 2
            center_y = height // 2

            # Calculate centering score
            centering_score = self.calculate_centering(img_cv, center_x, center_y)

            condition_info = {
                'corner_scores': corner_scores,
                'centering_score': centering_score,
                'overall_condition': self.calculate_overall_condition(corner_scores, centering_score)
            }
            return condition_info
        except Exception as e:
            logger.error(f"Error in condition analysis: {str(e)}")
            return None
    def calculate_centering(self, image, center_x, center_y):
        """Calculate the centering score of the card"""
        try:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            edges = cv2.Canny(gray, 50, 150)

            # Find contours
            contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            if contours:
                # Find the largest contour (assumed to be the card)
                main_contour = max(contours, key=cv2.contourArea)
                x, y, w, h = cv2.boundingRect(main_contour)

                # Calculate centering scores
                x_score = abs(0.5 - (x + w / 2) / image.shape[1])
                y_score = abs(0.5 - (y + h / 2) / image.shape[0])
                return 1 - (x_score + y_score) / 2
            return None
        except Exception as e:
            logger.error(f"Error in centering calculation: {str(e)}")
            return None
    def calculate_overall_condition(self, corner_scores, centering_score):
        """Calculate overall condition score"""
        # Compare centering_score against None explicitly so a legitimate 0.0 is not discarded
        if corner_scores and centering_score is not None:
            corner_avg = sum(corner_scores.values()) / len(corner_scores)
            return (corner_avg + centering_score) / 2
        return None

    def detect_orientation(self, image):
        """Detect if the card is portrait or landscape"""
        width, height = image.size
        return 'portrait' if height > width else 'landscape'
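
# A minimal, hypothetical helper (not part of the original app) showing how
# CardPreprocessor can be exercised on a single local image; the filename below
# is a placeholder and the helper is never called by the pipeline itself.
def debug_preprocess_card(image_path="example_card.jpg"):
    """Log the OCR, condition, and orientation results for one card image."""
    preprocessor = CardPreprocessor()
    with Image.open(image_path) as img:
        logger.info("Text info: %s", preprocessor.extract_text_regions(img))
        logger.info("Condition: %s", preprocessor.analyze_card_condition(img))
        logger.info("Orientation: %s", preprocessor.detect_orientation(img))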

class DatasetManager:
    def __init__(self, local_images_dir="downloaded_cards"):
        self.local_images_dir = local_images_dir
        self.drive = None
        self.dataset_name = "GotThatData/sports-cards"
        self.preprocessor = CardPreprocessor()
        # Create local directory if it doesn't exist
        os.makedirs(local_images_dir, exist_ok=True)

    def authenticate_drive(self):
        """Authenticate with Google Drive"""
        try:
            gauth = GoogleAuth()
            gauth.LocalWebserverAuth()
            self.drive = GoogleDrive(gauth)
            return True, "Successfully authenticated with Google Drive"
        except Exception as e:
            return False, f"Authentication failed: {str(e)}"
    def process_image(self, image_path):
        """Process a single image and extract information"""
        try:
            with Image.open(image_path) as img:
                # Extract text information
                text_info = self.preprocessor.extract_text_regions(img)

                # Analyze card condition
                condition_info = self.preprocessor.analyze_card_condition(img)

                # Get orientation
                orientation = self.preprocessor.detect_orientation(img)

                return {
                    'text_info': text_info,
                    'condition_info': condition_info,
                    'orientation': orientation
                }
        except Exception as e:
            logger.error(f"Error processing image {image_path}: {str(e)}")
            return None
    def generate_filename(self, info):
        """Generate filename based on extracted information"""
        # extract_text_regions returns None on failure and stores None for missing
        # fields, so fall back with `or` rather than relying on dict defaults
        text_info = info.get('text_info') or {}
        year = text_info.get('year') or 'unknown_year'
        brand = text_info.get('brand') or 'unknown_brand'
        number = (text_info.get('card_number') or '').replace('#', '').replace('/', '_')
        if not number:
            number = 'unknown_number'
        return f"sports_card_{year}_{brand}_{number}"
    def download_and_rename_files(self, drive_folder_id):
        """Download files from Google Drive and process them"""
        if not self.drive:
            return False, "Google Drive not authenticated", []

        try:
            # List files in the folder
            query = f"'{drive_folder_id}' in parents and trashed=false"
            file_list = self.drive.ListFile({'q': query}).GetList()

            if not file_list:
                # Fall back to treating the ID as a single file rather than a folder
                try:
                    single_file = self.drive.CreateFile({'id': drive_folder_id})
                    single_file.FetchMetadata()
                    file_list = [single_file]
                except Exception:
                    return False, "No files found with the specified ID", []
            processed_files = []
            for i, file in enumerate(tqdm(file_list, desc="Processing files")):
                if file['mimeType'].startswith('image/'):
                    temp_path = os.path.join(self.local_images_dir, f"temp_{i}.jpg")

                    # Download file
                    file.GetContentFile(temp_path)

                    # Process image
                    info = self.process_image(temp_path)
                    if info:
                        # Generate filename based on extracted info
                        base_filename = self.generate_filename(info)
                        new_filename = f"{base_filename}.jpg"
                        final_path = os.path.join(self.local_images_dir, new_filename)

                        # Rename file
                        os.rename(temp_path, final_path)

                        processed_files.append({
                            'file_path': final_path,
                            'original_name': file['title'],
                            'new_name': new_filename,
                            'image': final_path,
                            'extracted_info': info['text_info'],
                            'condition': info['condition_info'],
                            'orientation': info['orientation']
                        })
                    else:
                        os.remove(temp_path)

            return True, f"Successfully processed {len(processed_files)} images", processed_files
        except Exception as e:
            return False, f"Error processing files: {str(e)}", []
    def update_huggingface_dataset(self, processed_files):
        """Update the sports-cards dataset with processed images"""
        try:
            # Create a DataFrame with the file information
            df = pd.DataFrame(processed_files)

            # Create a Hugging Face Dataset from the new files
            new_dataset = Dataset.from_pandas(df)

            try:
                # Try to load existing dataset
                existing_dataset = load_dataset(self.dataset_name)
                # Concatenate with existing dataset if it exists
                if 'train' in existing_dataset:
                    new_dataset = concatenate_datasets([existing_dataset['train'], new_dataset])
            except Exception:
                logger.info("Creating new dataset")

            # Push to Hugging Face Hub
            new_dataset.push_to_hub(self.dataset_name, split="train")

            return True, f"Successfully updated dataset '{self.dataset_name}' with {len(processed_files)} processed images"
        except Exception as e:
            return False, f"Error updating Hugging Face dataset: {str(e)}"

def process_pipeline(folder_id):
    """Main pipeline to process images and update dataset"""
    manager = DatasetManager()

    # Step 1: Authenticate
    auth_success, auth_message = manager.authenticate_drive()
    if not auth_success:
        return auth_message

    # Step 2: Download and process files
    success, message, processed_files = manager.download_and_rename_files(folder_id)
    if not success:
        return message

    # Step 3: Update Hugging Face dataset
    success, hf_message = manager.update_huggingface_dataset(processed_files)

    # Create detailed report
    report = f"{message}\n{hf_message}\n\nDetailed Processing Report:\n"
    for file in processed_files:
        report += f"\nFile: {file['new_name']}\n"
        report += f"Extracted Info: {file['extracted_info']}\n"
        # Condition analysis can fail and return None, so guard the format call
        condition = file.get('condition') or {}
        overall = condition.get('overall_condition')
        if overall is not None:
            report += f"Condition Score: {overall:.2f}\n"
        else:
            report += "Condition Score: n/a\n"
        report += f"Orientation: {file['orientation']}\n"
        report += "-" * 50
    return report

# Gradio interface
demo = gr.Interface(
    fn=process_pipeline,
    inputs=[
        gr.Textbox(
            label="Google Drive File/Folder ID",
            placeholder="Enter the ID from your Google Drive URL",
            value="151VOxPO91mg0C3ORiioGUd4hogzP1ujm"
        )
    ],
    outputs=gr.Textbox(label="Processing Report"),
    title="AI-Powered Sports Cards Processor",
    description="Upload card images to automatically extract information, analyze condition, and add to dataset"
)

if __name__ == "__main__":
    demo.launch()
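
# Deployment note (an assumption, not part of the original code): pytesseract only
# wraps the system Tesseract binary, so a Space running this app would also need the
# OS package alongside the Python libraries imported above. A sketch of the two
# config files such a Space would typically carry:
#
#   packages.txt
#     tesseract-ocr
#
#   requirements.txt
#     gradio
#     datasets
#     pandas
#     pillow
#     pytesseract
#     opencv-python-headless
#     numpy
#     tensorflow
#     transformers
#     torch
#     pydrive2
#     tqdm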