| """ | |
| Advanced Reddit Scraper with Exponential Backoff, Hierarchy Tracking, and User History | |
| """ | |
| import praw | |
| import pandas as pd | |
| import time | |
| import json | |
| import sqlite3 | |
| import hashlib | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Any, Optional, Tuple, Set | |
| import concurrent.futures | |
| from functools import lru_cache | |
| import pytz | |
| import os | |
| import pickle | |
| from pathlib import Path | |
class ExponentialBackoff:
    """Implements exponential backoff with jitter for rate limiting"""

    def __init__(self, base_delay: float = 1.0, max_delay: float = 60.0, factor: float = 2.0):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.factor = factor
        self.attempt = 0

    def reset(self):
        """Reset backoff counter"""
        self.attempt = 0

    def wait(self):
        """Calculate and execute wait with exponential backoff"""
        if self.attempt == 0:
            delay = self.base_delay
        else:
            # Exponential backoff capped at max_delay
            delay = min(self.base_delay * (self.factor ** self.attempt), self.max_delay)
            # Add jitter (±25% randomization)
            jitter = delay * 0.25 * (2 * random.random() - 1)
            delay = delay + jitter
        time.sleep(delay)
        self.attempt += 1
        return delay

    def success(self):
        """Call on successful request to reset or reduce backoff"""
        self.attempt = max(0, self.attempt - 1)
class CommentHierarchyTracker:
    """Tracks and reconstructs comment hierarchies with parent relationships"""

    def __init__(self):
        self.comments = {}
        self.submissions = {}
        self.orphaned_comments = set()

    def add_submission(self, submission_id: str, submission_data: Dict):
        """Add a submission with t3_ prefix"""
        if not submission_id.startswith('t3_'):
            submission_id = f't3_{submission_id}'
        self.submissions[submission_id] = submission_data

    def add_comment(self, comment_id: str, parent_id: str, comment_data: Dict):
        """Add a comment with proper t1_/t3_ prefixes and parent tracking"""
        # Ensure proper prefixes
        if not comment_id.startswith('t1_'):
            comment_id = f't1_{comment_id}'
        if parent_id:
            if not parent_id.startswith(('t1_', 't3_')):
                # Determine whether the parent is a submission or a comment
                if parent_id in [s.replace('t3_', '') for s in self.submissions.keys()]:
                    parent_id = f't3_{parent_id}'
                else:
                    parent_id = f't1_{parent_id}'
            comment_data['parent_id'] = parent_id
        self.comments[comment_id] = comment_data
        # Track orphans
        if parent_id and parent_id not in self.submissions and parent_id not in self.comments:
            self.orphaned_comments.add(comment_id)

    def reconstruct_thread(self, submission_id: str) -> Dict:
        """Reconstruct complete thread hierarchy"""
        if not submission_id.startswith('t3_'):
            submission_id = f't3_{submission_id}'
        thread = {
            'submission': self.submissions.get(submission_id, {}),
            'comments': {},
            'hierarchy': {}
        }
        # Build hierarchy
        for comment_id, comment in self.comments.items():
            if comment.get('submission_id') == submission_id:
                parent_id = comment.get('parent_id')
                if parent_id == submission_id:
                    # Top-level comment
                    thread['hierarchy'][comment_id] = comment
                    thread['hierarchy'][comment_id]['replies'] = {}
                else:
                    # Reply to another comment
                    self._add_to_hierarchy(thread['hierarchy'], comment_id, parent_id, comment)
        thread['comments'] = {cid: c for cid, c in self.comments.items()
                              if c.get('submission_id') == submission_id}
        return thread

    def _add_to_hierarchy(self, hierarchy: Dict, comment_id: str, parent_id: str, comment: Dict) -> bool:
        """Recursively add a comment under its parent node"""
        for cid, node in hierarchy.items():
            if cid == parent_id:
                if 'replies' not in node:
                    node['replies'] = {}
                node['replies'][comment_id] = comment
                node['replies'][comment_id]['replies'] = {}
                return True
            elif 'replies' in node and node['replies']:
                if self._add_to_hierarchy(node['replies'], comment_id, parent_id, comment):
                    return True
        return False

    def get_orphan_statistics(self) -> Dict:
        """Get statistics about orphaned comments"""
        return {
            'total_comments': len(self.comments),
            'orphaned_count': len(self.orphaned_comments),
            'orphan_rate': len(self.orphaned_comments) / max(len(self.comments), 1),
            'orphaned_ids': list(self.orphaned_comments)[:10]  # Sample
        }
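# --- Illustrative example (not part of the scraping pipeline) ---
# A minimal offline sketch of how CommentHierarchyTracker assembles a thread:
# the submission and comment IDs ('abc123', 'c1', 'c2') are made-up values
# chosen for this demo, and no Reddit credentials or network access are needed.
def _demo_hierarchy_tracking():
    tracker = CommentHierarchyTracker()
    tracker.add_submission('abc123', {'id': 't3_abc123', 'title': 'Example post'})
    # Top-level comment whose parent is the submission (gets a t3_ parent prefix)
    tracker.add_comment('c1', 'abc123',
                        {'id': 't1_c1', 'submission_id': 't3_abc123', 'body': 'top level'})
    # Nested reply whose parent is the first comment (gets a t1_ parent prefix)
    tracker.add_comment('c2', 'c1',
                        {'id': 't1_c2', 'submission_id': 't3_abc123', 'body': 'nested reply'})
    thread = tracker.reconstruct_thread('abc123')
    # 't1_c2' ends up under thread['hierarchy']['t1_c1']['replies']
    print(json.dumps(thread['hierarchy'], indent=2, default=str))
    print(tracker.get_orphan_statistics())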
class UserHistoryCollector:
    """Collects complete user histories across subreddits"""

    def __init__(self, reddit_instance: praw.Reddit, backoff: ExponentialBackoff):
        self.reddit = reddit_instance
        self.backoff = backoff
        self.user_data = {}
        self.processed_users = set()

    def collect_user_history(self, username: str, limit: int = 1000,
                             include_comments: bool = True,
                             include_submissions: bool = True) -> Dict:
        """Collect complete history for a user"""
        if username in self.processed_users:
            return self.user_data.get(username, {})
        user_history = {
            'username': username,
            'submissions': [],
            'comments': [],
            'subreddits': set(),
            'first_activity': None,
            'last_activity': None,
            'total_karma': 0,
            'metadata': {}
        }
        try:
            user = self.reddit.redditor(username)
            # Get user metadata
            try:
                user_history['metadata'] = {
                    'created_utc': datetime.fromtimestamp(user.created_utc, tz=pytz.UTC),
                    'comment_karma': user.comment_karma,
                    'link_karma': user.link_karma,
                    'is_gold': getattr(user, 'is_gold', False),
                    'is_mod': getattr(user, 'is_mod', False),
                    'verified': getattr(user, 'verified', False)
                }
                user_history['total_karma'] = user.comment_karma + user.link_karma
            except Exception:
                pass  # User metadata not accessible
            # Collect submissions
            if include_submissions:
                for submission in user.submissions.new(limit=limit):
                    self.backoff.success()  # Reduce backoff on success
                    sub_data = {
                        'id': f't3_{submission.id}',
                        'title': submission.title,
                        'subreddit': str(submission.subreddit),
                        'created_utc': datetime.fromtimestamp(submission.created_utc, tz=pytz.UTC),
                        'score': submission.score,
                        'num_comments': submission.num_comments,
                        'selftext': submission.selftext[:1000] if submission.selftext else '',
                        'url': submission.url,
                        'permalink': f"https://reddit.com{submission.permalink}"
                    }
                    user_history['submissions'].append(sub_data)
                    user_history['subreddits'].add(str(submission.subreddit))
                    # Track activity timeline
                    if not user_history['first_activity'] or sub_data['created_utc'] < user_history['first_activity']:
                        user_history['first_activity'] = sub_data['created_utc']
                    if not user_history['last_activity'] or sub_data['created_utc'] > user_history['last_activity']:
                        user_history['last_activity'] = sub_data['created_utc']
            # Collect comments
            if include_comments:
                for comment in user.comments.new(limit=limit):
                    self.backoff.success()  # Reduce backoff on success
                    com_data = {
                        'id': f't1_{comment.id}',
                        'body': comment.body[:1000],
                        'subreddit': str(comment.subreddit),
                        'submission_id': f't3_{comment.submission.id}' if comment.submission else None,
                        'parent_id': comment.parent_id,
                        'created_utc': datetime.fromtimestamp(comment.created_utc, tz=pytz.UTC),
                        'score': comment.score,
                        'permalink': f"https://reddit.com{comment.permalink}"
                    }
                    user_history['comments'].append(com_data)
                    user_history['subreddits'].add(str(comment.subreddit))
                    # Track activity timeline
                    if not user_history['first_activity'] or com_data['created_utc'] < user_history['first_activity']:
                        user_history['first_activity'] = com_data['created_utc']
                    if not user_history['last_activity'] or com_data['created_utc'] > user_history['last_activity']:
                        user_history['last_activity'] = com_data['created_utc']
            # Convert subreddits set to list for JSON serialization
            user_history['subreddits'] = list(user_history['subreddits'])
            # Mark as processed
            self.processed_users.add(username)
            self.user_data[username] = user_history
            return user_history
        except praw.exceptions.APIException as e:
            if e.error_type == "USER_DOESNT_EXIST":
                self.processed_users.add(username)
                return {'username': username, 'error': 'User does not exist'}
            else:
                # Rate limited - retry with exponential backoff
                delay = self.backoff.wait()
                print(f"Rate limited. Waiting {delay:.2f} seconds...")
                return self.collect_user_history(username, limit, include_comments, include_submissions)
        except Exception as e:
            print(f"Error collecting history for {username}: {e}")
            return {'username': username, 'error': str(e)}
    def collect_users_from_subreddit(self, subreddit_name: str,
                                     post_limit: int = 100,
                                     user_limit: int = 50) -> List[str]:
        """Discover users from a subreddit"""
        users = set()
        try:
            subreddit = self.reddit.subreddit(subreddit_name)
            # Get users from hot posts
            for submission in subreddit.hot(limit=post_limit):
                if submission.author:
                    users.add(str(submission.author))
                # Get users from comments
                submission.comments.replace_more(limit=0)
                for comment in submission.comments.list()[:10]:  # Limit comments per post
                    if comment.author:
                        users.add(str(comment.author))
                if len(users) >= user_limit:
                    break
            return list(users)[:user_limit]
        except Exception as e:
            print(f"Error discovering users from r/{subreddit_name}: {e}")
            return []

    def get_user_network(self, users: List[str]) -> Dict:
        """Build interaction network from user histories"""
        network = {
            'nodes': [],
            'edges': [],
            'subreddit_overlap': {},
            'temporal_overlap': {}
        }
        # Create nodes
        for username in users:
            if username in self.user_data:
                user = self.user_data[username]
                network['nodes'].append({
                    'id': username,
                    'karma': user.get('total_karma', 0),
                    'subreddits': len(user.get('subreddits', [])),
                    'submissions': len(user.get('submissions', [])),
                    'comments': len(user.get('comments', []))
                })
        # Calculate edges based on subreddit overlap
        for i, user1 in enumerate(users):
            if user1 not in self.user_data:
                continue
            for user2 in users[i + 1:]:
                if user2 not in self.user_data:
                    continue
                subs1 = set(self.user_data[user1].get('subreddits', []))
                subs2 = set(self.user_data[user2].get('subreddits', []))
                overlap = subs1.intersection(subs2)
                if overlap:
                    network['edges'].append({
                        'source': user1,
                        'target': user2,
                        'weight': len(overlap),
                        'subreddits': list(overlap)
                    })
        return network
class CheckpointManager:
    """Manages checkpoint saving and restoration for long-running operations"""

    def __init__(self, checkpoint_dir: str = None):
        # Default to /tmp for HuggingFace Spaces, where the app directory may not be writable
        if checkpoint_dir is None:
            checkpoint_dir = os.environ.get('CHECKPOINT_DIR', '/tmp/checkpoints')
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True, parents=True)
    def save_checkpoint(self, state: Dict, checkpoint_name: str):
        """Save current state to checkpoint file"""
        checkpoint_file = self.checkpoint_dir / f"{checkpoint_name}.pkl"
        with open(checkpoint_file, 'wb') as f:
            pickle.dump(state, f)
        # Also save a JSON version for debugging
        json_file = self.checkpoint_dir / f"{checkpoint_name}.json"
        json_state = self._make_json_serializable(state)
        with open(json_file, 'w') as f:
            json.dump(json_state, f, indent=2, default=str)

    def load_checkpoint(self, checkpoint_name: str) -> Optional[Dict]:
        """Load state from checkpoint file"""
        checkpoint_file = self.checkpoint_dir / f"{checkpoint_name}.pkl"
        if checkpoint_file.exists():
            with open(checkpoint_file, 'rb') as f:
                return pickle.load(f)
        return None

    def checkpoint_exists(self, checkpoint_name: str) -> bool:
        """Check if a checkpoint exists"""
        checkpoint_file = self.checkpoint_dir / f"{checkpoint_name}.pkl"
        return checkpoint_file.exists()

    def delete_checkpoint(self, checkpoint_name: str):
        """Delete checkpoint files"""
        for ext in ['.pkl', '.json']:
            checkpoint_file = self.checkpoint_dir / f"{checkpoint_name}{ext}"
            if checkpoint_file.exists():
                checkpoint_file.unlink()

    def list_checkpoints(self) -> List[str]:
        """List all available checkpoints"""
        return [f.stem for f in self.checkpoint_dir.glob("*.pkl")]

    def _make_json_serializable(self, obj):
        """Convert objects to a JSON-serializable form"""
        if isinstance(obj, dict):
            return {k: self._make_json_serializable(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self._make_json_serializable(item) for item in obj]
        elif isinstance(obj, set):
            return list(obj)
        elif isinstance(obj, (datetime, pd.Timestamp)):
            return obj.isoformat()
        elif hasattr(obj, '__dict__'):
            return str(obj)
        else:
            return obj
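# --- Illustrative example (not part of the scraping pipeline) ---
# A minimal sketch of the pickle + JSON dual save and a load/delete round trip.
# The checkpoint name 'demo_state' and the payload below are made-up values;
# the default directory resolves to $CHECKPOINT_DIR or /tmp/checkpoints.
def _demo_checkpoint_roundtrip():
    manager = CheckpointManager()
    state = {
        'processed_submissions': {'t3_abc123'},   # sets survive pickling
        'timestamp': datetime.now(pytz.UTC)       # datetimes become ISO strings in the JSON copy
    }
    manager.save_checkpoint(state, 'demo_state')
    restored = manager.load_checkpoint('demo_state')
    print(restored)
    manager.delete_checkpoint('demo_state')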
class AdvancedRedditScraper:
    """
    Advanced Reddit scraper with research-grade features:
    - Exponential backoff for rate limiting
    - Comment hierarchy tracking with parent relationships
    - Complete user history collection
    - Checkpoint/resume capability
    - Database persistence
    """

    def __init__(self, client_id: str, client_secret: str, user_agent: str,
                 db_path: str = None):
        """Initialize the scraper and all of its components"""
        # Reddit instance
        self.reddit = praw.Reddit(
            client_id=client_id,
            client_secret=client_secret,
            user_agent=user_agent,
            check_for_async=False
        )
        # Components
        self.backoff = ExponentialBackoff(base_delay=1.0, max_delay=60.0)
        self.hierarchy_tracker = CommentHierarchyTracker()
        self.user_collector = UserHistoryCollector(self.reddit, self.backoff)
        self.checkpoint_manager = CheckpointManager()
        # Database setup - default to /tmp for HuggingFace Spaces
        if db_path is None:
            db_path = os.environ.get('DB_PATH', '/tmp/reddit_data.db')
        self.db_path = db_path
        self._init_database()
        # State tracking
        self.state = {
            'processed_submissions': set(),
            'processed_users': set(),
            'failed_items': [],
            'statistics': {}
        }

    def _init_database(self):
        """Initialize the SQLite database schema"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Create tables
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS submissions (
                id TEXT PRIMARY KEY,
                title TEXT,
                author TEXT,
                subreddit TEXT,
                created_utc TIMESTAMP,
                score INTEGER,
                num_comments INTEGER,
                selftext TEXT,
                url TEXT,
                permalink TEXT,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS comments (
                id TEXT PRIMARY KEY,
                submission_id TEXT,
                parent_id TEXT,
                author TEXT,
                body TEXT,
                created_utc TIMESTAMP,
                score INTEGER,
                subreddit TEXT,
                permalink TEXT,
                depth INTEGER,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (submission_id) REFERENCES submissions(id)
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                username TEXT PRIMARY KEY,
                created_utc TIMESTAMP,
                comment_karma INTEGER,
                link_karma INTEGER,
                is_gold BOOLEAN,
                is_mod BOOLEAN,
                verified BOOLEAN,
                first_activity TIMESTAMP,
                last_activity TIMESTAMP,
                total_submissions INTEGER,
                total_comments INTEGER,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS user_activity (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT,
                item_type TEXT,
                item_id TEXT,
                subreddit TEXT,
                created_utc TIMESTAMP,
                score INTEGER,
                FOREIGN KEY (username) REFERENCES users(username)
            )
        """)
        conn.commit()
        conn.close()
    def scrape_with_hierarchy(self, subreddit_name: str, limit: int = 100,
                              checkpoint_name: str = None) -> Dict:
        """
        Scrape a subreddit with full comment hierarchy tracking and checkpointing
        """
        # Load checkpoint if one exists
        if checkpoint_name and self.checkpoint_manager.checkpoint_exists(checkpoint_name):
            state = self.checkpoint_manager.load_checkpoint(checkpoint_name)
            self.state = state['scraper_state']
            self.hierarchy_tracker = state['hierarchy_tracker']
            start_after = state.get('last_submission_id')
            print(f"Resuming from checkpoint: {checkpoint_name}")
        else:
            start_after = None
        results = {
            'submissions': [],
            'comments': [],
            'hierarchies': {},
            'statistics': {}
        }
        try:
            subreddit = self.reddit.subreddit(subreddit_name)
            submission_count = 0
            for submission in subreddit.hot(limit=limit):
                submission_fullname = f't3_{submission.id}'
                # When resuming, skip forward until we pass the checkpointed submission
                if start_after:
                    if submission_fullname == start_after:
                        start_after = None  # Reached the checkpoint; resume after it
                    continue
                # Skip if already processed
                if submission_fullname in self.state['processed_submissions']:
                    continue
                try:
                    # Process submission
                    sub_data = self._process_submission_with_comments(submission)
                    results['submissions'].append(sub_data['submission'])
                    results['comments'].extend(sub_data['comments'])
                    # Track in hierarchy
                    self.hierarchy_tracker.add_submission(
                        submission.id,
                        sub_data['submission']
                    )
                    for comment in sub_data['comments']:
                        self.hierarchy_tracker.add_comment(
                            comment['id'].replace('t1_', ''),
                            comment['parent_id'],
                            comment
                        )
                    # Save to database
                    self._save_to_database(sub_data)
                    # Update state
                    self.state['processed_submissions'].add(submission_fullname)
                    submission_count += 1
                    # Checkpoint every 10 submissions
                    if checkpoint_name and submission_count % 10 == 0:
                        self._save_checkpoint(checkpoint_name, submission_fullname)
                    # Success - reduce backoff
                    self.backoff.success()
                except praw.exceptions.APIException:
                    # Rate limited - use exponential backoff
                    delay = self.backoff.wait()
                    print(f"Rate limited. Waiting {delay:.2f} seconds...")
                except Exception as e:
                    print(f"Error processing submission {submission.id}: {e}")
                    self.state['failed_items'].append({
                        'id': submission_fullname,
                        'error': str(e)
                    })
        except Exception as e:
            print(f"Error accessing subreddit {subreddit_name}: {e}")
        # Build hierarchies for all processed submissions
        for sub_id in self.state['processed_submissions']:
            hierarchy = self.hierarchy_tracker.reconstruct_thread(sub_id)
            if hierarchy['submission']:
                results['hierarchies'][sub_id] = hierarchy
        # Calculate statistics
        results['statistics'] = {
            'total_submissions': len(results['submissions']),
            'total_comments': len(results['comments']),
            'orphan_stats': self.hierarchy_tracker.get_orphan_statistics(),
            'failed_items': len(self.state['failed_items'])
        }
        # Final checkpoint
        if checkpoint_name:
            self._save_checkpoint(checkpoint_name, None)
        return results
    def _process_submission_with_comments(self, submission) -> Dict:
        """Process a submission together with all of its comments"""
        # Process submission
        sub_data = {
            'id': f't3_{submission.id}',
            'title': submission.title,
            'author': str(submission.author) if submission.author else '[deleted]',
            'subreddit': str(submission.subreddit),
            'created_utc': datetime.fromtimestamp(submission.created_utc, tz=pytz.UTC),
            'score': submission.score,
            'num_comments': submission.num_comments,
            'selftext': submission.selftext,
            'url': submission.url,
            'permalink': f"https://reddit.com{submission.permalink}"
        }
        # Process all comments
        comments = []
        submission.comments.replace_more(limit=None)  # Expand ALL comments
        for comment in submission.comments.list():
            com_data = {
                'id': f't1_{comment.id}',
                'submission_id': f't3_{submission.id}',
                'parent_id': comment.parent_id,
                'author': str(comment.author) if comment.author else '[deleted]',
                'body': comment.body,
                'created_utc': datetime.fromtimestamp(comment.created_utc, tz=pytz.UTC),
                'score': comment.score,
                'subreddit': str(submission.subreddit),
                'permalink': f"https://reddit.com{comment.permalink}",
                'depth': comment.depth
            }
            comments.append(com_data)
        return {
            'submission': sub_data,
            'comments': comments
        }
    def collect_user_histories(self, users: List[str],
                               checkpoint_name: str = None) -> Dict:
        """Collect complete histories for a list of users"""
        histories = {}
        for i, username in enumerate(users):
            print(f"Collecting history for {username} ({i + 1}/{len(users)})")
            history = self.user_collector.collect_user_history(username)
            histories[username] = history
            # Save to database
            if history and 'error' not in history:
                self._save_user_to_database(history)
            # Checkpoint every 5 users
            if checkpoint_name and (i + 1) % 5 == 0:
                self.checkpoint_manager.save_checkpoint({
                    'users_processed': list(histories.keys()),
                    'current_index': i
                }, checkpoint_name)
        return histories
    def _save_to_database(self, data: Dict):
        """Save a submission and its comments to the database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Save submission
        sub = data['submission']
        cursor.execute("""
            INSERT OR REPLACE INTO submissions
            (id, title, author, subreddit, created_utc, score,
             num_comments, selftext, url, permalink)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            sub['id'], sub['title'], sub['author'], sub['subreddit'],
            sub['created_utc'], sub['score'], sub['num_comments'],
            sub['selftext'], sub['url'], sub['permalink']
        ))
        # Save comments
        for comment in data['comments']:
            cursor.execute("""
                INSERT OR REPLACE INTO comments
                (id, submission_id, parent_id, author, body, created_utc,
                 score, subreddit, permalink, depth)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                comment['id'], comment['submission_id'], comment['parent_id'],
                comment['author'], comment['body'], comment['created_utc'],
                comment['score'], comment['subreddit'], comment['permalink'],
                comment.get('depth', 0)
            ))
        conn.commit()
        conn.close()
    def _save_user_to_database(self, user_history: Dict):
        """Save a user history to the database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Save user metadata
        metadata = user_history.get('metadata', {})
        cursor.execute("""
            INSERT OR REPLACE INTO users
            (username, created_utc, comment_karma, link_karma, is_gold,
             is_mod, verified, first_activity, last_activity,
             total_submissions, total_comments)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            user_history['username'],
            metadata.get('created_utc'),
            metadata.get('comment_karma', 0),
            metadata.get('link_karma', 0),
            metadata.get('is_gold', False),
            metadata.get('is_mod', False),
            metadata.get('verified', False),
            user_history.get('first_activity'),
            user_history.get('last_activity'),
            len(user_history.get('submissions', [])),
            len(user_history.get('comments', []))
        ))
        # Save user activity
        for sub in user_history.get('submissions', []):
            cursor.execute("""
                INSERT INTO user_activity
                (username, item_type, item_id, subreddit, created_utc, score)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                user_history['username'], 'submission', sub['id'],
                sub['subreddit'], sub['created_utc'], sub['score']
            ))
        for com in user_history.get('comments', []):
            cursor.execute("""
                INSERT INTO user_activity
                (username, item_type, item_id, subreddit, created_utc, score)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                user_history['username'], 'comment', com['id'],
                com['subreddit'], com['created_utc'], com['score']
            ))
        conn.commit()
        conn.close()
    def _save_checkpoint(self, checkpoint_name: str, last_submission_id: Optional[str]):
        """Save the current scraper state to a checkpoint"""
        checkpoint_data = {
            'scraper_state': self.state,
            'hierarchy_tracker': self.hierarchy_tracker,
            'last_submission_id': last_submission_id,
            'timestamp': datetime.now(pytz.UTC)
        }
        self.checkpoint_manager.save_checkpoint(checkpoint_data, checkpoint_name)
        print(f"Checkpoint saved: {checkpoint_name}")
    def get_statistics(self) -> Dict:
        """Get comprehensive statistics from the database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        stats = {}
        # Submission stats
        cursor.execute("SELECT COUNT(*) FROM submissions")
        stats['total_submissions'] = cursor.fetchone()[0]
        # Comment stats
        cursor.execute("SELECT COUNT(*) FROM comments")
        stats['total_comments'] = cursor.fetchone()[0]
        # User stats
        cursor.execute("SELECT COUNT(*) FROM users")
        stats['total_users'] = cursor.fetchone()[0]
        # Orphan analysis: comment replies whose parent comment was never scraped
        cursor.execute("""
            SELECT COUNT(*) FROM comments
            WHERE parent_id NOT LIKE 't3_%'
            AND parent_id NOT IN (SELECT id FROM comments)
        """)
        stats['orphaned_comments'] = cursor.fetchone()[0]
        # Subreddit distribution
        cursor.execute("""
            SELECT subreddit, COUNT(*) as count
            FROM submissions
            GROUP BY subreddit
            ORDER BY count DESC
            LIMIT 10
        """)
        stats['top_subreddits'] = cursor.fetchall()
        # Temporal range
        cursor.execute("""
            SELECT MIN(created_utc), MAX(created_utc)
            FROM submissions
        """)
        time_range = cursor.fetchone()
        if time_range[0]:
            stats['date_range'] = {
                'first': time_range[0],
                'last': time_range[1]
            }
        conn.close()
        return stats
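# --- Example usage (illustrative sketch only) ---
# A minimal sketch of how the pieces fit together. The environment variable
# names REDDIT_CLIENT_ID / REDDIT_CLIENT_SECRET / REDDIT_USER_AGENT and the
# subreddit "python" are placeholder choices for this example, not something
# the scraper itself requires; supply credentials however you prefer.
if __name__ == "__main__":
    # Offline demos (no Reddit credentials or network access needed)
    _demo_hierarchy_tracking()
    _demo_checkpoint_roundtrip()

    client_id = os.environ.get("REDDIT_CLIENT_ID")
    client_secret = os.environ.get("REDDIT_CLIENT_SECRET")
    user_agent = os.environ.get("REDDIT_USER_AGENT", "advanced-reddit-scraper/0.1")

    if client_id and client_secret:
        scraper = AdvancedRedditScraper(client_id, client_secret, user_agent)
        # Scrape a subreddit with full comment hierarchies, checkpointing as we go
        results = scraper.scrape_with_hierarchy("python", limit=10, checkpoint_name="python_run")
        print(results['statistics'])
        # Discover a handful of users and collect their histories
        users = scraper.user_collector.collect_users_from_subreddit("python", post_limit=5, user_limit=5)
        scraper.collect_user_histories(users, checkpoint_name="python_users")
        print(scraper.get_statistics())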