Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| from flask_sqlalchemy import SQLAlchemy | |
| from flask_login import UserMixin | |
| from datetime import datetime, timedelta | |
| import math | |
| from sqlalchemy import func, text | |
| import logging | |
| import hashlib | |
| db = SQLAlchemy() | |
class User(db.Model, UserMixin):
    """Arena account record; also the flask-login user object (via UserMixin)."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(100), unique=True, nullable=False)
    # Unique external account identifier (presumably the Hugging Face id -- confirm with the login flow).
    hf_id = db.Column(db.String(100), unique=True, nullable=False)
    # Filled with the current UTC time when the row is created.
    join_date = db.Column(db.DateTime, default=datetime.utcnow)
    hf_account_created = db.Column(db.DateTime, nullable=True)  # HF account creation date
    # Votes cast by this user; each Vote gains a `user` back-reference.
    votes = db.relationship("Vote", backref="user", lazy=True)
    # Visibility flag filtered on by get_top_voters and flipped by toggle_user_leaderboard_visibility.
    show_in_leaderboard = db.Column(db.Boolean, default=True)

    def __repr__(self):
        return f"<User {self.username}>"
class ModelType:
    """String constants stored in the `model_type` columns of the tables in this module."""

    TTS = "tts"
    CONVERSATIONAL = "conversational"
class Model(db.Model):
    """A votable model together with its running Elo rating and win/match counters."""

    id = db.Column(db.String(100), primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    model_type = db.Column(db.String(20), nullable=False)  # 'tts' or 'conversational'
    # Vote has two foreign keys to this table, so the join is spelled out
    # explicitly; the relationship is read-only and returns every vote in
    # which this model was either chosen or rejected.
    votes = db.relationship(
        "Vote",
        primaryjoin="or_(Model.id==Vote.model_chosen, Model.id==Vote.model_rejected)",
        viewonly=True,
    )
    current_elo = db.Column(db.Float, default=1500.0)
    win_count = db.Column(db.Integer, default=0)
    match_count = db.Column(db.Integer, default=0)
    is_open = db.Column(db.Boolean, default=False)
    is_active = db.Column(
        db.Boolean, default=True
    )  # Whether the model is active and can be voted on
    model_url = db.Column(db.String(255), nullable=True)

    @property
    def win_rate(self):
        """Percentage of matches won, or 0 when no match has been recorded.

        Exposed as a property because get_leaderboard_data reads it as a plain
        attribute (``f"{model.win_rate:.0f}%"``); formatting a bound method
        with ``.0f`` raises TypeError.
        """
        # Column defaults are only applied on insert, so the counters can
        # still be None on an instance that has not been flushed yet.
        if not self.match_count:
            return 0
        return ((self.win_count or 0) / self.match_count) * 100

    def __repr__(self):
        return f"<Model {self.name} ({self.model_type})>"
class Vote(db.Model):
    """One pairwise preference: for a given text, one model was chosen over another."""

    id = db.Column(db.Integer, primary_key=True)
    # Nullable in the schema, although record_vote documents the user as required.
    user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True)
    text = db.Column(db.String(1000), nullable=False)
    # Filled with the current UTC time when the row is created.
    vote_date = db.Column(db.DateTime, default=datetime.utcnow)
    model_chosen = db.Column(db.String(100), db.ForeignKey("model.id"), nullable=False)
    model_rejected = db.Column(
        db.String(100), db.ForeignKey("model.id"), nullable=False
    )
    model_type = db.Column(db.String(20), nullable=False)  # 'tts' or 'conversational'
    # New analytics columns - added with temporary checks for migration
    session_duration_seconds = db.Column(db.Float, nullable=True)  # Time from generation to vote
    ip_address_partial = db.Column(db.String(20), nullable=True)  # IP with last digits removed
    user_agent = db.Column(db.String(500), nullable=True)  # Browser/device info
    generation_date = db.Column(db.DateTime, nullable=True)  # When audio was generated
    cache_hit = db.Column(db.Boolean, nullable=True)  # Whether generation was from cache
    # Sentence origin tracking
    sentence_hash = db.Column(db.String(64), nullable=True, index=True)  # SHA-256 hash of the sentence
    sentence_origin = db.Column(db.String(20), nullable=True)  # 'dataset', 'custom', 'unknown'
    counts_for_public_leaderboard = db.Column(db.Boolean, default=True)  # Whether this vote counts for public leaderboard
    # Two relationships to Model, one per foreign key; each also installs a
    # reverse collection on Model (`chosen_votes` / `rejected_votes`).
    chosen = db.relationship(
        "Model",
        foreign_keys=[model_chosen],
        backref=db.backref("chosen_votes", lazy=True),
    )
    rejected = db.relationship(
        "Model",
        foreign_keys=[model_rejected],
        backref=db.backref("rejected_votes", lazy=True),
    )

    def __repr__(self):
        return f"<Vote {self.id}: {self.model_chosen} over {self.model_rejected} ({self.model_type})>"
class EloHistory(db.Model):
    """Snapshot of a model's Elo score; record_vote writes one row per model per vote."""

    id = db.Column(db.Integer, primary_key=True)
    model_id = db.Column(db.String(100), db.ForeignKey("model.id"), nullable=False)
    # Filled with the current UTC time when the row is created.
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    elo_score = db.Column(db.Float, nullable=False)
    # Vote that produced this snapshot; nullable, so a snapshot may exist without one.
    vote_id = db.Column(db.Integer, db.ForeignKey("vote.id"), nullable=True)
    model_type = db.Column(db.String(20), nullable=False)  # 'tts' or 'conversational'
    # Reverse collections: Model.elo_history and Vote.elo_changes.
    model = db.relationship("Model", backref=db.backref("elo_history", lazy=True))
    vote = db.relationship("Vote", backref=db.backref("elo_changes", lazy=True))

    def __repr__(self):
        return f"<EloHistory {self.model_id}: {self.elo_score} at {self.timestamp} ({self.model_type})>"
class CoordinatedVotingCampaign(db.Model):
    """Log detected coordinated voting campaigns"""

    id = db.Column(db.Integer, primary_key=True)
    # Model the campaign was detected for.
    model_id = db.Column(db.String(100), db.ForeignKey("model.id"), nullable=False)
    model_type = db.Column(db.String(20), nullable=False)
    # Filled with the current UTC time when the row is created.
    detected_at = db.Column(db.DateTime, default=datetime.utcnow)
    time_window_hours = db.Column(db.Integer, nullable=False)  # Detection window (e.g., 6 hours)
    vote_count = db.Column(db.Integer, nullable=False)  # Total votes in the campaign
    user_count = db.Column(db.Integer, nullable=False)  # Number of users involved
    confidence_score = db.Column(db.Float, nullable=False)  # 0-1 confidence level
    status = db.Column(db.String(20), default='active')  # active, resolved, false_positive
    admin_notes = db.Column(db.Text, nullable=True)
    # Resolution bookkeeping; both stay NULL until resolve_campaign sets them.
    resolved_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True)
    resolved_at = db.Column(db.DateTime, nullable=True)
    # Reverse collections: Model.coordinated_campaigns and User.resolved_campaigns.
    model = db.relationship("Model", backref=db.backref("coordinated_campaigns", lazy=True))
    resolver = db.relationship("User", backref=db.backref("resolved_campaigns", lazy=True))

    def __repr__(self):
        return f"<CoordinatedVotingCampaign {self.id}: {self.model_id} ({self.vote_count} votes, {self.user_count} users)>"
class CampaignParticipant(db.Model):
    """Track users involved in coordinated voting campaigns"""

    id = db.Column(db.Integer, primary_key=True)
    campaign_id = db.Column(db.Integer, db.ForeignKey("coordinated_voting_campaign.id"), nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False)
    # Number of this user's votes attributed to the campaign, and the time span they cover.
    votes_in_campaign = db.Column(db.Integer, nullable=False)
    first_vote_at = db.Column(db.DateTime, nullable=False)
    last_vote_at = db.Column(db.DateTime, nullable=False)
    suspicion_level = db.Column(db.String(20), nullable=False)  # low, medium, high
    # Reverse collections: CoordinatedVotingCampaign.participants and User.campaign_participations.
    campaign = db.relationship("CoordinatedVotingCampaign", backref=db.backref("participants", lazy=True))
    user = db.relationship("User", backref=db.backref("campaign_participations", lazy=True))

    def __repr__(self):
        return f"<CampaignParticipant {self.user_id} in campaign {self.campaign_id} ({self.votes_in_campaign} votes)>"
class UserTimeout(db.Model):
    """Track user timeouts/bans for suspicious activity"""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False)
    reason = db.Column(db.String(500), nullable=False)  # Reason for timeout
    timeout_type = db.Column(db.String(50), nullable=False)  # coordinated_voting, rapid_voting, manual, etc.
    # Filled with the current UTC time when the row is created.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    expires_at = db.Column(db.DateTime, nullable=False)
    created_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True)  # Admin who created timeout
    # Cleared by cancel_user_timeout; an expired timeout keeps is_active=True,
    # so expiry must be checked separately (see is_currently_active).
    is_active = db.Column(db.Boolean, default=True)
    cancelled_at = db.Column(db.DateTime, nullable=True)
    cancelled_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True)
    cancel_reason = db.Column(db.String(500), nullable=True)
    # Related campaign if timeout was due to coordinated voting
    related_campaign_id = db.Column(db.Integer, db.ForeignKey("coordinated_voting_campaign.id"), nullable=True)
    # Three foreign keys point at User, so each relationship names the column it follows.
    user = db.relationship("User", foreign_keys=[user_id], backref=db.backref("timeouts", lazy=True))
    creator = db.relationship("User", foreign_keys=[created_by], backref=db.backref("created_timeouts", lazy=True))
    canceller = db.relationship("User", foreign_keys=[cancelled_by], backref=db.backref("cancelled_timeouts", lazy=True))
    related_campaign = db.relationship("CoordinatedVotingCampaign", backref=db.backref("resulting_timeouts", lazy=True))

    def is_currently_active(self):
        """Check if timeout is currently active

        Returns True only when the timeout has not been deactivated and the
        current UTC time is still before `expires_at`.
        """
        if not self.is_active:
            return False
        return datetime.utcnow() < self.expires_at

    def __repr__(self):
        return f"<UserTimeout {self.user_id}: {self.timeout_type} until {self.expires_at}>"
class ConsumedSentence(db.Model):
    """Track sentences that have been used to ensure each sentence is only used once"""

    id = db.Column(db.Integer, primary_key=True)
    # The unique constraint on the hash is what prevents a sentence from being recorded twice.
    sentence_hash = db.Column(db.String(64), unique=True, nullable=False, index=True)  # SHA-256 hash
    sentence_text = db.Column(db.Text, nullable=False)  # Store original text for debugging/admin purposes
    # Filled with the current UTC time when the row is created.
    consumed_at = db.Column(db.DateTime, default=datetime.utcnow)
    session_id = db.Column(db.String(100), nullable=True)  # Track which session consumed it
    usage_type = db.Column(db.String(20), nullable=False)  # 'cache', 'direct', 'random'

    def __repr__(self):
        # Only the first 8 characters of the hash are shown to keep the repr short.
        return f"<ConsumedSentence {self.sentence_hash[:8]}...({self.usage_type})>"
def calculate_elo_change(winner_elo, loser_elo, k_factor=32):
    """Return the post-match Elo ratings as a ``(winner, loser)`` tuple.

    Each side's expected score comes from the logistic Elo curve on a
    400-point scale. The winner's actual score is 1 and the loser's is 0;
    each rating moves by ``k_factor * (actual - expected)``.
    """

    def expected_score(own_rating, opponent_rating):
        # Probability-like expectation that `own_rating` beats `opponent_rating`.
        return 1 / (1 + math.pow(10, (opponent_rating - own_rating) / 400))

    winner_delta = k_factor * (1 - expected_score(winner_elo, loser_elo))
    loser_delta = k_factor * (0 - expected_score(loser_elo, winner_elo))
    return winner_elo + winner_delta, loser_elo + loser_delta
def anonymize_ip_address(ip_address):
    """
    Zero out the host part of an IP address for privacy compliance.

    IPv4 keeps the first two octets (a /16 prefix); IPv6 keeps the first four
    groups (a /64 prefix). An IPv4-mapped IPv6 address (``::ffff:a.b.c.d``) is
    treated as the IPv4 address it wraps.

    Examples:
    - 192.168.1.100 -> 192.168.0.0
    - 2001:db8::1 -> 2001:db8::

    Args:
        ip_address: Textual IPv4 or IPv6 address, or a falsy value.

    Returns:
        The anonymized address as a string, or None when the input is empty
        or cannot be parsed as an IP address. Unparseable input is never
        echoed back, so a malformed value cannot end up stored in full.
    """
    if not ip_address:
        return None

    # Function-scope import keeps this block self-contained.
    import ipaddress

    try:
        parsed = ipaddress.ip_address(str(ip_address).strip())
    except ValueError:
        return None

    # Splitting on ':' cannot handle '::' compression (the old code turned
    # '2001:db8::1' into '2001:db8::1::'), so mask the integer value instead.
    if parsed.version == 6 and parsed.ipv4_mapped is not None:
        parsed = parsed.ipv4_mapped

    if parsed.version == 4:
        kept_bits, total_bits = 16, 32
    else:
        kept_bits, total_bits = 64, 128
    prefix_mask = ((1 << kept_bits) - 1) << (total_bits - kept_bits)

    # NOTE(review): a fully populated /64 prefix renders as up to 21
    # characters ('ffff:ffff:ffff:ffff::'), one more than the String(20)
    # declared for Vote.ip_address_partial -- confirm the column width.
    return str(type(parsed)(int(parsed) & prefix_mask))
def record_vote(user_id, text, chosen_model_id, rejected_model_id, model_type,
                session_duration=None, ip_address=None, user_agent=None,
                generation_date=None, cache_hit=None, all_dataset_sentences=None):
    """Record a vote and update Elo ratings.

    Args:
        user_id: Id of the voting user.
        text: Sentence that was synthesized and voted on.
        chosen_model_id: Id of the preferred model.
        rejected_model_id: Id of the other model.
        model_type: Model type both models must have ('tts' or 'conversational').
        session_duration: Seconds between generation and vote, if known.
        ip_address: Raw client address; only its anonymized form is stored.
        user_agent: Client user-agent string; truncated to 500 characters.
        generation_date: When the audio was generated, if known.
        cache_hit: Whether the audio came from cache, if known.
        all_dataset_sentences: Optional container of dataset sentences used to
            label the vote's sentence origin.

    Returns:
        ``(vote, None)`` on success, or ``(None, error_message)`` when either
        model does not exist for ``model_type``.
    """
    # Validate both models BEFORE inserting anything. The vote row carries
    # foreign keys to model.id, so flushing a vote that references an unknown
    # model can fail with an integrity error on databases that enforce foreign
    # keys, and the friendly error below would never be returned.
    chosen_model = Model.query.filter_by(
        id=chosen_model_id, model_type=model_type
    ).first()
    rejected_model = Model.query.filter_by(
        id=rejected_model_id, model_type=model_type
    ).first()
    if not chosen_model or not rejected_model:
        # Kept from the original error path: the session is rolled back
        # before reporting the failure.
        db.session.rollback()
        return None, "One or both models not found for the specified model type"

    # Determine sentence origin and whether it should count for public leaderboard.
    # hash_sentence is defined outside the visible part of this module.
    sentence_hash = hash_sentence(text)
    if all_dataset_sentences and text in all_dataset_sentences:
        sentence_origin = 'dataset'
    else:
        sentence_origin = 'custom'
    # Korean TTS Arena: the arena is starting fresh, so every vote is
    # reflected in the public leaderboard regardless of sentence origin.
    counts_for_public = True

    # Create the vote
    vote = Vote(
        user_id=user_id,  # Required - user must be logged in to vote
        text=text,
        model_chosen=chosen_model_id,
        model_rejected=rejected_model_id,
        model_type=model_type,
        session_duration_seconds=session_duration,
        ip_address_partial=anonymize_ip_address(ip_address),
        user_agent=user_agent[:500] if user_agent else None,  # Truncate if too long
        generation_date=generation_date,
        cache_hit=cache_hit,
        sentence_hash=sentence_hash,
        sentence_origin=sentence_origin,
        counts_for_public_leaderboard=counts_for_public,
    )
    db.session.add(vote)
    db.session.flush()  # Get the vote ID without committing

    # Only update Elo ratings and public stats if this vote counts for public leaderboard
    if counts_for_public:
        # Calculate new Elo ratings
        new_chosen_elo, new_rejected_elo = calculate_elo_change(
            chosen_model.current_elo, rejected_model.current_elo
        )
        # Update model stats
        chosen_model.current_elo = new_chosen_elo
        chosen_model.win_count += 1
        chosen_model.match_count += 1
        rejected_model.current_elo = new_rejected_elo
        rejected_model.match_count += 1
    else:
        # For votes that don't count for public leaderboard, keep current Elo
        new_chosen_elo = chosen_model.current_elo
        new_rejected_elo = rejected_model.current_elo

    # Record Elo history: one snapshot per model, both linked to this vote.
    chosen_history = EloHistory(
        model_id=chosen_model_id,
        elo_score=new_chosen_elo,
        vote_id=vote.id,
        model_type=model_type,
    )
    rejected_history = EloHistory(
        model_id=rejected_model_id,
        elo_score=new_rejected_elo,
        vote_id=vote.id,
        model_type=model_type,
    )
    db.session.add_all([chosen_history, rejected_history])
    db.session.commit()
    return vote, None
| import re | |
def categorize_text(text):
    """
    Classify a text into leaderboard categories.

    Every text belongs to 'all'. Further labels are appended in a fixed order
    when their rule matches:
    - 'mixed_lang': at least 5 ASCII letters (Korean/English mixed text)
    - 'with_numbers': at least 2 digits
    - 'long_text': at least 50 characters

    Returns: list of categories the text belongs to
    """
    letters = re.findall(r'[a-zA-Z]', text)
    digits = re.findall(r'\d', text)
    rules = (
        ('mixed_lang', len(letters) >= 5),
        ('with_numbers', len(digits) >= 2),
        ('long_text', len(text) >= 50),
    )
    return ['all'] + [label for label, matched in rules if matched]
def _tier_for_rank(rank):
    """Map a 1-based rank to its tier label: ranks 1-2, 3-4, 5-7, then no tier."""
    if rank <= 2:
        return "tier-s"
    if rank <= 4:
        return "tier-a"
    if rank <= 7:
        return "tier-b"
    return ""


def get_leaderboard_data(model_type, category='all'):
    """
    Get leaderboard data for the specified model type and category.
    Only includes votes that count for the public leaderboard.
    Only shows active models.

    For ``category == 'all'`` the aggregate counters stored on each Model are
    used and models are ordered by current Elo. For any other category the
    statistics are recomputed from the matching votes, models are ordered by
    win rate, and the reported Elo is a rough estimate derived from that rate.

    Args:
        model_type (str): The model type ('tts' or 'conversational')
        category (str): Category filter ('all', 'mixed_lang', 'with_numbers', 'long_text')
    Returns:
        list: List of dictionaries containing model data for the leaderboard
    """
    # Fetch the active models of this type.
    active_models = Model.query.filter_by(model_type=model_type, is_active=True).all()

    if category == 'all':
        # Overall board: use the aggregate counters on the Model rows.
        ranked = sorted(
            (m for m in active_models if m.match_count > 0),
            key=lambda m: m.current_elo,
            reverse=True,
        )
        result = []
        for rank, model in enumerate(ranked, 1):
            # Computed from the counters directly so this works whether
            # Model.win_rate is a method or a property; formatting the bound
            # method with ':.0f' raised TypeError. match_count > 0 is
            # guaranteed by the filter above.
            win_rate = (model.win_count / model.match_count) * 100
            result.append({
                "rank": rank,
                "id": model.id,
                "name": model.name,
                "model_url": model.model_url,
                "win_rate": f"{win_rate:.0f}%",
                "total_votes": model.match_count,
                "elo": int(model.current_elo),
                "tier": _tier_for_rank(rank),
                "is_open": model.is_open,
            })
        return result

    # Per-category board: compute directly from the Vote table.
    active_model_ids = {m.id for m in active_models}
    all_votes = Vote.query.filter(
        Vote.model_type == model_type,
        Vote.counts_for_public_leaderboard == True
    ).all()
    # Keep only the votes whose text falls into the requested category.
    filtered_votes = [
        vote for vote in all_votes if category in categorize_text(vote.text)
    ]

    # Tally wins and matches per active model; votes touching inactive models
    # still count for the active side of the pairing.
    model_stats = {m.id: {"wins": 0, "matches": 0, "model": m} for m in active_models}
    for vote in filtered_votes:
        if vote.model_chosen in active_model_ids:
            model_stats[vote.model_chosen]["wins"] += 1
            model_stats[vote.model_chosen]["matches"] += 1
        if vote.model_rejected in active_model_ids:
            model_stats[vote.model_rejected]["matches"] += 1

    # Build one entry per model that has at least one match, keeping the
    # exact win rate next to it for sorting.
    scored = []
    for model_id, stats in model_stats.items():
        if stats["matches"] > 0:
            win_rate = (stats["wins"] / stats["matches"]) * 100
            scored.append((win_rate, {
                "id": model_id,
                "name": stats["model"].name,
                "model_url": stats["model"].model_url,
                "win_rate": f"{win_rate:.0f}%",
                "total_votes": stats["matches"],
                "elo": int(1500 + (win_rate - 50) * 10),  # simple Elo estimate
                "is_open": stats["model"].is_open,
            }))

    # Order by win rate (stable for ties), then attach rank and tier.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    result = []
    for rank, (_, item) in enumerate(scored, 1):
        item["rank"] = rank
        item["tier"] = _tier_for_rank(rank)
        result.append(item)
    return result
def get_user_leaderboard(user_id, model_type):
    """
    Get personalized leaderboard data for a specific user.
    Includes ALL votes (both dataset and custom sentences).

    Models are ranked by the user's own win rate for them; models the user
    never voted on are omitted.

    Args:
        user_id (int): The user ID
        model_type (str): The model type ('tts' or 'conversational')
    Returns:
        list: List of dictionaries containing model data for the user's personal leaderboard
    """
    # Get all models of the specified type
    models = Model.query.filter_by(model_type=model_type).all()
    # Get user's votes (includes both public and custom sentence votes)
    user_votes = Vote.query.filter_by(user_id=user_id, model_type=model_type).all()

    # Calculate win counts and match counts for each model based on user's votes
    model_stats = {model.id: {"wins": 0, "matches": 0} for model in models}
    for vote in user_votes:
        # A vote may reference a model that is not in `models` (for example a
        # model whose type was changed later); skip that side instead of
        # failing the whole leaderboard with KeyError.
        chosen_stats = model_stats.get(vote.model_chosen)
        if chosen_stats is not None:
            chosen_stats["wins"] += 1
            chosen_stats["matches"] += 1
        rejected_stats = model_stats.get(vote.model_rejected)
        if rejected_stats is not None:
            rejected_stats["matches"] += 1

    # Only include models the user has voted on; keep the exact win rate next
    # to each entry so sorting does not depend on the rounded display string.
    scored = []
    for model in models:
        stats = model_stats[model.id]
        if stats["matches"] > 0:
            win_rate = stats["wins"] / stats["matches"] * 100
            scored.append((win_rate, {
                "id": model.id,
                "name": model.name,
                "model_url": model.model_url,
                "win_rate": f"{win_rate:.0f}%",
                "total_votes": stats["matches"],
                "wins": stats["wins"],
                "is_open": model.is_open,
            }))

    # Sort by win rate descending (stable, so ties keep query order)
    scored.sort(key=lambda pair: pair[0], reverse=True)

    # Add rank
    result = []
    for rank, (_, item) in enumerate(scored, 1):
        item["rank"] = rank
        result.append(item)
    return result
def get_historical_leaderboard_data(model_type, target_date=None):
    """
    Rebuild the leaderboard as it stood at a given moment.

    For every model of ``model_type`` the latest EloHistory entry at or before
    the cutoff supplies the Elo; models without such an entry are left out.
    Win and match counts use only public-leaderboard votes up to the cutoff.

    Args:
        model_type (str): The model type ('tts' or 'conversational')
        target_date (datetime): The target date for historical data, defaults to current time
    Returns:
        list: List of dictionaries containing model data for the historical leaderboard
    """
    cutoff = target_date or datetime.utcnow()

    # Conditions shared by both vote counts below.
    public_votes_until_cutoff = (
        Vote.model_type == model_type,
        Vote.vote_date <= cutoff,
        Vote.counts_for_public_leaderboard == True,
    )

    rows = []
    for model in Model.query.filter_by(model_type=model_type).all():
        # Most recent Elo snapshot for this model at or before the cutoff.
        latest_snapshot = (
            EloHistory.query.filter(
                EloHistory.model_id == model.id,
                EloHistory.model_type == model_type,
                EloHistory.timestamp <= cutoff,
            )
            .order_by(EloHistory.timestamp.desc())
            .first()
        )
        if latest_snapshot is None:
            # No history before the cutoff: the model is not listed.
            continue

        took_part = db.or_(
            Vote.model_chosen == model.id, Vote.model_rejected == model.id
        )
        matches_played = Vote.query.filter(
            took_part, *public_votes_until_cutoff
        ).count()
        matches_won = Vote.query.filter(
            Vote.model_chosen == model.id, *public_votes_until_cutoff
        ).count()
        win_pct = (matches_won / matches_played * 100) if matches_played > 0 else 0

        rows.append(
            {
                "id": model.id,
                "name": model.name,
                "model_url": model.model_url,
                "win_rate": f"{win_pct:.0f}%",
                "total_votes": matches_played,
                "elo": int(latest_snapshot.elo_score),
                "is_open": model.is_open,
            }
        )

    # Highest Elo first, then number the rows and derive the tier from the rank.
    rows.sort(key=lambda row: row["elo"], reverse=True)
    for position, row in enumerate(rows, 1):
        row["rank"] = position
        row["tier"] = (
            "tier-s" if position <= 2
            else "tier-a" if position <= 4
            else "tier-b" if position <= 7
            else ""
        )
    return rows
def get_key_historical_dates(model_type):
    """
    Get a list of key dates in the leaderboard history.

    The list holds one entry per month between the first and the last vote
    (day set to 1, time of day inherited from the first vote). The last vote's
    own timestamp is appended when the final monthly entry falls in a
    different month or year.

    Args:
        model_type (str): The model type ('tts' or 'conversational')
    Returns:
        list: List of datetime objects representing key dates; empty when
        there are no votes for ``model_type``
    """
    # Get first and most recent vote dates
    first_vote = (
        Vote.query.filter_by(model_type=model_type)
        .order_by(Vote.vote_date.asc())
        .first()
    )
    last_vote = (
        Vote.query.filter_by(model_type=model_type)
        .order_by(Vote.vote_date.desc())
        .first()
    )
    if not first_vote or not last_vote:
        return []

    # Generate a list of key dates - first day of each month between the first and last vote
    dates = []
    current_date = first_vote.vote_date.replace(day=1)
    end_date = last_vote.vote_date
    while current_date <= end_date:
        dates.append(current_date)
        # Move to next month (day is always 1, so replace() cannot overflow)
        if current_date.month == 12:
            current_date = current_date.replace(year=current_date.year + 1, month=1)
        else:
            current_date = current_date.replace(month=current_date.month + 1)

    # Add latest date. The parentheses matter: without them `a and b or c`
    # evaluates `dates[-1].year` even when `dates` is empty.
    if dates and (
        dates[-1].month != end_date.month or dates[-1].year != end_date.year
    ):
        dates.append(end_date)
    return dates
def insert_initial_models():
    """Insert initial models into the database (Korean TTS only).

    Each model is inserted when missing. For an existing row the name,
    open flag, URL and active flag are refreshed while all other data is
    preserved. Most providers are active only when their credentials are
    present in the environment.
    """
    import os

    def configured(*env_names):
        # True only when every listed environment variable is set and non-empty.
        return all(os.getenv(env_name) for env_name in env_names)

    # (id, display name, is_active, model_url); every entry is a closed TTS model.
    model_specs = [
        # Channel Talk TTS (Korean-focused) - always active
        ("channel-hana", "채널톡 하나", True, "https://channel.io/"),
        # ElevenLabs (multilingual) - active only with an API key
        ("eleven-multilingual-v2", "ElevenLabs Multilingual v2",
         configured("ELEVENLABS_API_KEY"), "https://elevenlabs.io/"),
        # OpenAI TTS (gpt-4o-mini-tts) - active only with an API key
        ("openai-gpt-4o-mini-tts", "OpenAI GPT-4o Mini TTS",
         configured("OPENAI_API_KEY"),
         "https://platform.openai.com/docs/guides/text-to-speech"),
        # Google Cloud TTS - disabled (replaced by Gemini TTS)
        ("google-wavenet", "Google Wavenet (ko-KR)", False,
         "https://cloud.google.com/text-to-speech"),
        ("google-neural2", "Google Neural2 (ko-KR)", False,
         "https://cloud.google.com/text-to-speech"),
        # CLOVA TTS (Naver Cloud, Korean-focused) - needs client id and API key
        ("clova-nara", "CLOVA Voice (나라)",
         configured("CLOVA_CLIENT_ID", "CLOVA_API_KEY"), "https://clova.ai/"),
        # Supertone TTS (Korean-focused) - active only with an API key
        ("supertone-sona", "Supertone Sona",
         configured("SUPERTONE_API_KEY"), "https://supertone.ai/"),
        # Humelo DIVE TTS (Korean-focused) - active only with an API key
        ("humelo-sia", "Humelo DIVE (리아)",
         configured("HUMELO_API_KEY"), "https://humelo.com/"),
        # Typecast TTS v3.0 (Korean-focused) - only the Jain voice is enabled
        ("typecast-jaesun", "Typecast v3 (재선)", False, "https://typecast.ai/"),
        ("typecast-jain", "Typecast v3 (자인)",
         configured("TYPECAST_API_KEY"), "https://typecast.ai/"),
        # Legacy Typecast - disabled (replaced by v3.0)
        ("typecast-geumhee", "Typecast (GeumHee) [Legacy]", False,
         "https://typecast.ai/"),
        # Gemini TTS (Google Cloud, multilingual) - needs the service-account
        # JSON; an API key is not supported
        ("gemini-tts-aoede", "Gemini TTS (Aoede)",
         configured("GOOGLE_APPLICATION_CREDENTIALS_JSON"),
         "https://cloud.google.com/text-to-speech/docs/gemini-tts"),
    ]

    for model_id, display_name, active, url in model_specs:
        existing = Model.query.filter_by(
            id=model_id, model_type=ModelType.TTS
        ).first()
        if existing:
            # Update model attributes if they've changed, but preserve other data
            existing.name = display_name
            existing.is_open = False
            existing.model_url = url
            if active is not None:
                existing.is_active = active
        else:
            db.session.add(
                Model(
                    id=model_id,
                    name=display_name,
                    model_type=ModelType.TTS,
                    is_open=False,
                    is_active=active,
                    model_url=url,
                )
            )
    db.session.commit()

    # Deactivate legacy Typecast model (JaeYi) if it still exists
    legacy_typecast = Model.query.filter_by(id="typecast-jaeyi").first()
    if legacy_typecast and legacy_typecast.is_active:
        legacy_typecast.is_active = False
        db.session.commit()
def get_top_voters(limit=10):
    """
    Rank users by how many votes they have cast.

    Only users with ``show_in_leaderboard`` set are listed. The inner join
    with Vote means users without any vote never appear.

    Args:
        limit (int): Number of users to return
    Returns:
        list: List of dictionaries containing user data and vote counts
    """
    votes_cast = func.count(Vote.id)
    ranked_rows = (
        db.session.query(User, votes_cast.label('vote_count'))
        .join(Vote)
        .filter(User.show_in_leaderboard == True)
        .group_by(User.id)
        .order_by(votes_cast.desc())
        .limit(limit)
        .all()
    )
    return [
        {
            "rank": position,
            "username": voter.username,
            "vote_count": total,
            "join_date": voter.join_date.strftime("%b %d, %Y"),
        }
        for position, (voter, total) in enumerate(ranked_rows, 1)
    ]
def get_voting_statistics():
    """
    Collect aggregate voting figures for the arena.

    Returns:
        dict: Dictionary containing voting statistics (unique voters, total
        votes, summed evaluation time split into hours and minutes, total
        characters voted on, votes in the last 24 hours and 7 days, and the
        number of active TTS models)
    """

    def scalar_or_zero(expression):
        # Aggregates over an empty table come back as None; report 0 instead.
        return db.session.query(expression).scalar() or 0

    def votes_since(age):
        # Number of votes cast within `age` of the current UTC time.
        threshold = datetime.utcnow() - age
        return Vote.query.filter(Vote.vote_date >= threshold).count()

    unique_voters = scalar_or_zero(func.count(func.distinct(Vote.user_id)))
    vote_total = Vote.query.count()

    # Summed session durations, split into whole hours and leftover minutes.
    eval_seconds = scalar_or_zero(func.sum(Vote.session_duration_seconds))
    eval_hours = int(eval_seconds // 3600)
    eval_minutes = int((eval_seconds % 3600) // 60)

    character_total = scalar_or_zero(func.sum(func.length(Vote.text)))
    recent_day = votes_since(timedelta(days=1))
    recent_week = votes_since(timedelta(days=7))
    active_tts_models = Model.query.filter_by(
        model_type=ModelType.TTS, is_active=True
    ).count()

    return {
        "total_voters": unique_voters,
        "total_votes": vote_total,
        "total_eval_hours": eval_hours,
        "total_eval_minutes": eval_minutes,
        "total_characters": character_total,
        "votes_last_24h": recent_day,
        "votes_last_7d": recent_week,
        "active_models": active_tts_models,
    }
def toggle_user_leaderboard_visibility(user_id):
    """Flip a user's ``show_in_leaderboard`` flag and persist the change.

    Returns:
        The flag's value after the commit, or None when no user has this id.
    """
    account = User.query.get(user_id)
    if account is None:
        return None
    currently_visible = account.show_in_leaderboard
    account.show_in_leaderboard = not currently_visible
    db.session.commit()
    return account.show_in_leaderboard
def check_user_timeout(user_id):
    """Report whether a user is timed out right now.

    Returns:
        ``(is_timed_out, timeout)``. ``timeout`` is the still-active,
        unexpired UserTimeout that expires last, or None. A falsy ``user_id``
        yields ``(False, None)`` without querying.
    """
    if not user_id:
        return False, None
    longest_running = (
        UserTimeout.query
        .filter_by(user_id=user_id, is_active=True)
        .filter(UserTimeout.expires_at > datetime.utcnow())
        .order_by(UserTimeout.expires_at.desc())
        .first()
    )
    is_timed_out = longest_running is not None
    return is_timed_out, longest_running
def create_user_timeout(user_id, reason, timeout_type, duration_days, created_by=None, related_campaign_id=None):
    """Create and commit a timeout that expires ``duration_days`` from now (UTC).

    Returns:
        The persisted UserTimeout instance.
    """
    new_timeout = UserTimeout(
        user_id=user_id,
        reason=reason,
        timeout_type=timeout_type,
        expires_at=datetime.utcnow() + timedelta(days=duration_days),
        created_by=created_by,
        related_campaign_id=related_campaign_id,
    )
    db.session.add(new_timeout)
    db.session.commit()
    return new_timeout
def cancel_user_timeout(timeout_id, cancelled_by, cancel_reason):
    """Deactivate a timeout and record who cancelled it, when (UTC) and why.

    Returns:
        ``(True, message)`` on success, ``(False, message)`` when the id is unknown.
    """
    record = UserTimeout.query.get(timeout_id)
    if record is None:
        return False, "Timeout not found"

    cancellation = {
        "is_active": False,
        "cancelled_at": datetime.utcnow(),
        "cancelled_by": cancelled_by,
        "cancel_reason": cancel_reason,
    }
    for attribute, value in cancellation.items():
        setattr(record, attribute, value)
    db.session.commit()
    return True, "Timeout cancelled successfully"
def log_coordinated_campaign(model_id, model_type, vote_count, user_count,
                             time_window_hours, confidence_score, participants_data):
    """Persist a detected coordinated voting campaign and its participants.

    Each item of ``participants_data`` must be a mapping with the keys
    'user_id', 'votes_in_campaign', 'first_vote_at', 'last_vote_at' and
    'suspicion_level'.

    Returns:
        The committed CoordinatedVotingCampaign.
    """
    campaign = CoordinatedVotingCampaign(
        model_id=model_id,
        model_type=model_type,
        time_window_hours=time_window_hours,
        vote_count=vote_count,
        user_count=user_count,
        confidence_score=confidence_score,
    )
    db.session.add(campaign)
    # Flush so the campaign has its primary key before participants reference it.
    db.session.flush()

    participant_fields = (
        'user_id',
        'votes_in_campaign',
        'first_vote_at',
        'last_vote_at',
        'suspicion_level',
    )
    for entry in participants_data:
        details = {field: entry[field] for field in participant_fields}
        db.session.add(CampaignParticipant(campaign_id=campaign.id, **details))

    db.session.commit()
    return campaign
def get_user_timeouts(user_id=None, active_only=True, limit=50):
    """List timeouts, newest first, with optional filters.

    Args:
        user_id: When truthy, restrict results to this user.
        active_only: When truthy, keep only timeouts flagged active whose
            ``expires_at`` is later than the current UTC time.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``UserTimeout`` rows ordered by ``created_at`` descending.
    """
    timeouts = UserTimeout.query

    if user_id:
        timeouts = timeouts.filter_by(user_id=user_id)

    if active_only:
        timeouts = timeouts.filter_by(is_active=True)
        timeouts = timeouts.filter(UserTimeout.expires_at > datetime.utcnow())

    newest_first = timeouts.order_by(UserTimeout.created_at.desc())
    return newest_first.limit(limit).all()
def get_coordinated_campaigns(status=None, limit=50):
    """List coordinated voting campaigns, most recently detected first.

    Args:
        status: When truthy, restrict results to campaigns with this status.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``CoordinatedVotingCampaign`` rows ordered by
        ``detected_at`` descending.
    """
    campaigns = CoordinatedVotingCampaign.query
    if status:
        campaigns = campaigns.filter_by(status=status)

    latest_first = campaigns.order_by(CoordinatedVotingCampaign.detected_at.desc())
    return latest_first.limit(limit).all()
def resolve_campaign(campaign_id, resolved_by, status, admin_notes=None):
    """Record the resolution of a coordinated voting campaign.

    Args:
        campaign_id: Primary key of the campaign to update.
        resolved_by: Stored on the campaign as ``resolved_by``.
        status: New value for the campaign's ``status``.
        admin_notes: When truthy, replaces the campaign's ``admin_notes``;
            otherwise the existing notes are left untouched.

    Returns:
        A ``(success, message)`` tuple; ``success`` is ``False`` when the
        campaign does not exist.
    """
    record = CoordinatedVotingCampaign.query.get(campaign_id)
    if not record:
        return False, "Campaign not found"

    resolved_on = datetime.utcnow()
    record.status = status
    record.resolved_by = resolved_by
    record.resolved_at = resolved_on
    if admin_notes:
        record.admin_notes = admin_notes
    db.session.commit()

    return True, "Campaign resolved successfully"
def hash_sentence(sentence_text):
    """Return the hex SHA-256 digest of a sentence.

    Leading and trailing whitespace is stripped and the result is encoded
    as UTF-8 before hashing, so sentences that differ only in surrounding
    whitespace share a digest.
    """
    trimmed = sentence_text.strip()
    hasher = hashlib.sha256()
    hasher.update(trimmed.encode('utf-8'))
    return hasher.hexdigest()
def is_sentence_consumed(sentence_text):
    """Return ``True`` when a ``ConsumedSentence`` row exists for this sentence.

    The lookup is by the sentence's hash (see ``hash_sentence``), not by
    its raw text.
    """
    digest = hash_sentence(sentence_text)
    match = ConsumedSentence.query.filter_by(sentence_hash=digest).first()
    return match is not None
def mark_sentence_consumed(sentence_text, session_id=None, usage_type='direct'):
    """Record a sentence as consumed, reusing an existing record if present.

    Args:
        sentence_text: The sentence to record; stored verbatim alongside
            its hash.
        session_id: Optional value stored as ``session_id``.
        usage_type: Stored on the record as ``usage_type``.

    Returns:
        The existing ``ConsumedSentence`` for this sentence's hash when
        there is one (it is left unmodified), otherwise the newly committed
        record.
    """
    digest = hash_sentence(sentence_text)

    record = ConsumedSentence.query.filter_by(sentence_hash=digest).first()
    if not record:
        # First time this sentence is seen: create and persist a record.
        record = ConsumedSentence(
            sentence_hash=digest,
            sentence_text=sentence_text,
            session_id=session_id,
            usage_type=usage_type,
        )
        db.session.add(record)
        db.session.commit()
    return record
def get_unconsumed_sentences(sentence_pool):
    """Return the sentences from ``sentence_pool`` with no consumed record.

    Args:
        sentence_pool: Iterable of sentence strings; a falsy pool yields
            an empty list without touching the database.

    Returns:
        A list preserving the pool's order, containing only sentences
        whose hash is absent from the ``ConsumedSentence`` table.
    """
    if not sentence_pool:
        return []

    # Load every stored hash once so each membership test is a set lookup.
    stored_hashes = db.session.query(ConsumedSentence.sentence_hash).all()
    already_used = {row[0] for row in stored_hashes}

    return [
        candidate
        for candidate in sentence_pool
        if hash_sentence(candidate) not in already_used
    ]
def get_consumed_sentences_count():
    """Return how many ``ConsumedSentence`` rows are stored."""
    total_consumed = ConsumedSentence.query.count()
    return total_consumed
def get_random_unconsumed_sentence(sentence_pool):
    """Pick one not-yet-consumed sentence from the pool at random.

    Args:
        sentence_pool: Candidate sentences, filtered through
            ``get_unconsumed_sentences``.

    Returns:
        A randomly chosen unconsumed sentence, or ``None`` when none remain.
    """
    import random

    candidates = get_unconsumed_sentences(sentence_pool)
    return random.choice(candidates) if candidates else None