Ko-TTS-Arena / models.py
Ko-TTS-Arena Contributors
feat: Add category filters (한영혼합, 숫자혼합, 긴문장) to leaderboard
e1cb33d
from flask_sqlalchemy import SQLAlchemy
from flask_login import UserMixin
from datetime import datetime, timedelta
import math
from sqlalchemy import func, text
import logging
import hashlib
db = SQLAlchemy()
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(100), unique=True, nullable=False)
hf_id = db.Column(db.String(100), unique=True, nullable=False)
join_date = db.Column(db.DateTime, default=datetime.utcnow)
hf_account_created = db.Column(db.DateTime, nullable=True) # HF account creation date
votes = db.relationship("Vote", backref="user", lazy=True)
show_in_leaderboard = db.Column(db.Boolean, default=True)
def __repr__(self):
return f"<User {self.username}>"
class ModelType:
TTS = "tts"
CONVERSATIONAL = "conversational"
class Model(db.Model):
id = db.Column(db.String(100), primary_key=True)
name = db.Column(db.String(100), nullable=False)
model_type = db.Column(db.String(20), nullable=False) # 'tts' or 'conversational'
# Fix ambiguous foreign keys by specifying which foreign key to use
votes = db.relationship(
"Vote",
primaryjoin="or_(Model.id==Vote.model_chosen, Model.id==Vote.model_rejected)",
viewonly=True,
)
current_elo = db.Column(db.Float, default=1500.0)
win_count = db.Column(db.Integer, default=0)
match_count = db.Column(db.Integer, default=0)
is_open = db.Column(db.Boolean, default=False)
is_active = db.Column(
db.Boolean, default=True
) # Whether the model is active and can be voted on
model_url = db.Column(db.String(255), nullable=True)
@property
def win_rate(self):
if self.match_count == 0:
return 0
return (self.win_count / self.match_count) * 100
def __repr__(self):
return f"<Model {self.name} ({self.model_type})>"
class Vote(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True)
text = db.Column(db.String(1000), nullable=False)
vote_date = db.Column(db.DateTime, default=datetime.utcnow)
model_chosen = db.Column(db.String(100), db.ForeignKey("model.id"), nullable=False)
model_rejected = db.Column(
db.String(100), db.ForeignKey("model.id"), nullable=False
)
model_type = db.Column(db.String(20), nullable=False) # 'tts' or 'conversational'
# New analytics columns - added with temporary checks for migration
session_duration_seconds = db.Column(db.Float, nullable=True) # Time from generation to vote
ip_address_partial = db.Column(db.String(20), nullable=True) # IP with last digits removed
user_agent = db.Column(db.String(500), nullable=True) # Browser/device info
generation_date = db.Column(db.DateTime, nullable=True) # When audio was generated
cache_hit = db.Column(db.Boolean, nullable=True) # Whether generation was from cache
# Sentence origin tracking
sentence_hash = db.Column(db.String(64), nullable=True, index=True) # SHA-256 hash of the sentence
sentence_origin = db.Column(db.String(20), nullable=True) # 'dataset', 'custom', 'unknown'
counts_for_public_leaderboard = db.Column(db.Boolean, default=True) # Whether this vote counts for public leaderboard
chosen = db.relationship(
"Model",
foreign_keys=[model_chosen],
backref=db.backref("chosen_votes", lazy=True),
)
rejected = db.relationship(
"Model",
foreign_keys=[model_rejected],
backref=db.backref("rejected_votes", lazy=True),
)
def __repr__(self):
return f"<Vote {self.id}: {self.model_chosen} over {self.model_rejected} ({self.model_type})>"
class EloHistory(db.Model):
id = db.Column(db.Integer, primary_key=True)
model_id = db.Column(db.String(100), db.ForeignKey("model.id"), nullable=False)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
elo_score = db.Column(db.Float, nullable=False)
vote_id = db.Column(db.Integer, db.ForeignKey("vote.id"), nullable=True)
model_type = db.Column(db.String(20), nullable=False) # 'tts' or 'conversational'
model = db.relationship("Model", backref=db.backref("elo_history", lazy=True))
vote = db.relationship("Vote", backref=db.backref("elo_changes", lazy=True))
def __repr__(self):
return f"<EloHistory {self.model_id}: {self.elo_score} at {self.timestamp} ({self.model_type})>"
class CoordinatedVotingCampaign(db.Model):
"""Log detected coordinated voting campaigns"""
id = db.Column(db.Integer, primary_key=True)
model_id = db.Column(db.String(100), db.ForeignKey("model.id"), nullable=False)
model_type = db.Column(db.String(20), nullable=False)
detected_at = db.Column(db.DateTime, default=datetime.utcnow)
time_window_hours = db.Column(db.Integer, nullable=False) # Detection window (e.g., 6 hours)
vote_count = db.Column(db.Integer, nullable=False) # Total votes in the campaign
user_count = db.Column(db.Integer, nullable=False) # Number of users involved
confidence_score = db.Column(db.Float, nullable=False) # 0-1 confidence level
status = db.Column(db.String(20), default='active') # active, resolved, false_positive
admin_notes = db.Column(db.Text, nullable=True)
resolved_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True)
resolved_at = db.Column(db.DateTime, nullable=True)
model = db.relationship("Model", backref=db.backref("coordinated_campaigns", lazy=True))
resolver = db.relationship("User", backref=db.backref("resolved_campaigns", lazy=True))
def __repr__(self):
return f"<CoordinatedVotingCampaign {self.id}: {self.model_id} ({self.vote_count} votes, {self.user_count} users)>"
class CampaignParticipant(db.Model):
"""Track users involved in coordinated voting campaigns"""
id = db.Column(db.Integer, primary_key=True)
campaign_id = db.Column(db.Integer, db.ForeignKey("coordinated_voting_campaign.id"), nullable=False)
user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False)
votes_in_campaign = db.Column(db.Integer, nullable=False)
first_vote_at = db.Column(db.DateTime, nullable=False)
last_vote_at = db.Column(db.DateTime, nullable=False)
suspicion_level = db.Column(db.String(20), nullable=False) # low, medium, high
campaign = db.relationship("CoordinatedVotingCampaign", backref=db.backref("participants", lazy=True))
user = db.relationship("User", backref=db.backref("campaign_participations", lazy=True))
def __repr__(self):
return f"<CampaignParticipant {self.user_id} in campaign {self.campaign_id} ({self.votes_in_campaign} votes)>"
class UserTimeout(db.Model):
"""Track user timeouts/bans for suspicious activity"""
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False)
reason = db.Column(db.String(500), nullable=False) # Reason for timeout
timeout_type = db.Column(db.String(50), nullable=False) # coordinated_voting, rapid_voting, manual, etc.
created_at = db.Column(db.DateTime, default=datetime.utcnow)
expires_at = db.Column(db.DateTime, nullable=False)
created_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True) # Admin who created timeout
is_active = db.Column(db.Boolean, default=True)
cancelled_at = db.Column(db.DateTime, nullable=True)
cancelled_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True)
cancel_reason = db.Column(db.String(500), nullable=True)
# Related campaign if timeout was due to coordinated voting
related_campaign_id = db.Column(db.Integer, db.ForeignKey("coordinated_voting_campaign.id"), nullable=True)
user = db.relationship("User", foreign_keys=[user_id], backref=db.backref("timeouts", lazy=True))
creator = db.relationship("User", foreign_keys=[created_by], backref=db.backref("created_timeouts", lazy=True))
canceller = db.relationship("User", foreign_keys=[cancelled_by], backref=db.backref("cancelled_timeouts", lazy=True))
related_campaign = db.relationship("CoordinatedVotingCampaign", backref=db.backref("resulting_timeouts", lazy=True))
def is_currently_active(self):
"""Check if timeout is currently active"""
if not self.is_active:
return False
return datetime.utcnow() < self.expires_at
def __repr__(self):
return f"<UserTimeout {self.user_id}: {self.timeout_type} until {self.expires_at}>"
class ConsumedSentence(db.Model):
"""Track sentences that have been used to ensure each sentence is only used once"""
id = db.Column(db.Integer, primary_key=True)
sentence_hash = db.Column(db.String(64), unique=True, nullable=False, index=True) # SHA-256 hash
sentence_text = db.Column(db.Text, nullable=False) # Store original text for debugging/admin purposes
consumed_at = db.Column(db.DateTime, default=datetime.utcnow)
session_id = db.Column(db.String(100), nullable=True) # Track which session consumed it
usage_type = db.Column(db.String(20), nullable=False) # 'cache', 'direct', 'random'
def __repr__(self):
return f"<ConsumedSentence {self.sentence_hash[:8]}...({self.usage_type})>"
def calculate_elo_change(winner_elo, loser_elo, k_factor=32):
"""Calculate Elo rating changes for a match."""
expected_winner = 1 / (1 + math.pow(10, (loser_elo - winner_elo) / 400))
expected_loser = 1 / (1 + math.pow(10, (winner_elo - loser_elo) / 400))
winner_new_elo = winner_elo + k_factor * (1 - expected_winner)
loser_new_elo = loser_elo + k_factor * (0 - expected_loser)
return winner_new_elo, loser_new_elo
def anonymize_ip_address(ip_address):
"""
Remove the last 1-2 octets from an IP address for privacy compliance.
Examples:
- 192.168.1.100 -> 192.168.0.0
- 2001:db8::1 -> 2001:db8::
"""
if not ip_address:
return None
try:
if ':' in ip_address: # IPv6
# Keep first 4 groups, zero out the rest
parts = ip_address.split(':')
if len(parts) >= 4:
return ':'.join(parts[:4]) + '::'
return ip_address
else: # IPv4
# Keep first 2 octets, zero out last 2
parts = ip_address.split('.')
if len(parts) == 4:
return f"{parts[0]}.{parts[1]}.0.0"
return ip_address
except Exception:
return None
def record_vote(user_id, text, chosen_model_id, rejected_model_id, model_type,
session_duration=None, ip_address=None, user_agent=None,
generation_date=None, cache_hit=None, all_dataset_sentences=None):
"""Record a vote and update Elo ratings."""
# Determine sentence origin and whether it should count for public leaderboard
sentence_hash = hash_sentence(text)
sentence_origin = 'unknown'
counts_for_public = True # 한국어 TTS Arena: 모든 투표가 public leaderboard에 반영됨
if all_dataset_sentences and text in all_dataset_sentences:
sentence_origin = 'dataset'
else:
sentence_origin = 'custom'
# 한국어 TTS Arena는 새로 시작하므로 모든 투표를 public leaderboard에 반영
# Create the vote
vote = Vote(
user_id=user_id, # Required - user must be logged in to vote
text=text,
model_chosen=chosen_model_id,
model_rejected=rejected_model_id,
model_type=model_type,
session_duration_seconds=session_duration,
ip_address_partial=anonymize_ip_address(ip_address),
user_agent=user_agent[:500] if user_agent else None, # Truncate if too long
generation_date=generation_date,
cache_hit=cache_hit,
sentence_hash=sentence_hash,
sentence_origin=sentence_origin,
counts_for_public_leaderboard=counts_for_public,
)
db.session.add(vote)
db.session.flush() # Get the vote ID without committing
# Get the models
chosen_model = Model.query.filter_by(
id=chosen_model_id, model_type=model_type
).first()
rejected_model = Model.query.filter_by(
id=rejected_model_id, model_type=model_type
).first()
if not chosen_model or not rejected_model:
db.session.rollback()
return None, "One or both models not found for the specified model type"
# Only update Elo ratings and public stats if this vote counts for public leaderboard
if counts_for_public:
# Calculate new Elo ratings
new_chosen_elo, new_rejected_elo = calculate_elo_change(
chosen_model.current_elo, rejected_model.current_elo
)
# Update model stats
chosen_model.current_elo = new_chosen_elo
chosen_model.win_count += 1
chosen_model.match_count += 1
rejected_model.current_elo = new_rejected_elo
rejected_model.match_count += 1
else:
# For votes that don't count for public leaderboard, keep current Elo
new_chosen_elo = chosen_model.current_elo
new_rejected_elo = rejected_model.current_elo
# Record Elo history
chosen_history = EloHistory(
model_id=chosen_model_id,
elo_score=new_chosen_elo,
vote_id=vote.id,
model_type=model_type,
)
rejected_history = EloHistory(
model_id=rejected_model_id,
elo_score=new_rejected_elo,
vote_id=vote.id,
model_type=model_type,
)
db.session.add_all([chosen_history, rejected_history])
db.session.commit()
return vote, None
import re
def categorize_text(text):
"""
텍스트를 카테고리로 분류
Returns: list of categories the text belongs to
"""
categories = ['all'] # 모든 텍스트는 'all'에 포함
# 한영혼합: 알파벳 5글자 이상
alphabet_count = len(re.findall(r'[a-zA-Z]', text))
if alphabet_count >= 5:
categories.append('mixed_lang')
# 숫자혼합: 숫자 2개 이상
digit_count = len(re.findall(r'\d', text))
if digit_count >= 2:
categories.append('with_numbers')
# 긴문장: 50글자 이상
if len(text) >= 50:
categories.append('long_text')
return categories
def get_leaderboard_data(model_type, category='all'):
"""
Get leaderboard data for the specified model type and category.
Only includes votes that count for the public leaderboard.
Only shows active models.
Args:
model_type (str): The model type ('tts' or 'conversational')
category (str): Category filter ('all', 'mixed_lang', 'with_numbers', 'long_text')
Returns:
list: List of dictionaries containing model data for the leaderboard
"""
# 활성 모델 가져오기
active_models = Model.query.filter_by(model_type=model_type, is_active=True).all()
if category == 'all':
# 기존 로직: Model 테이블의 집계 데이터 사용
models = [m for m in active_models if m.match_count > 0]
models.sort(key=lambda x: x.current_elo, reverse=True)
result = []
for rank, model in enumerate(models, 1):
tier = "tier-s" if rank <= 2 else "tier-a" if rank <= 4 else "tier-b" if rank <= 7 else ""
result.append({
"rank": rank,
"id": model.id,
"name": model.name,
"model_url": model.model_url,
"win_rate": f"{model.win_rate:.0f}%",
"total_votes": model.match_count,
"elo": int(model.current_elo),
"tier": tier,
"is_open": model.is_open,
})
return result
# 카테고리별 필터링: Vote 테이블에서 직접 계산
active_model_ids = {m.id for m in active_models}
# 해당 카테고리에 맞는 투표 가져오기
all_votes = Vote.query.filter(
Vote.model_type == model_type,
Vote.counts_for_public_leaderboard == True
).all()
# 카테고리에 맞는 투표만 필터링
filtered_votes = []
for vote in all_votes:
text_categories = categorize_text(vote.text)
if category in text_categories:
filtered_votes.append(vote)
# 모델별 승/패 계산
model_stats = {m.id: {"wins": 0, "matches": 0, "model": m} for m in active_models}
for vote in filtered_votes:
if vote.model_chosen in active_model_ids:
model_stats[vote.model_chosen]["wins"] += 1
model_stats[vote.model_chosen]["matches"] += 1
if vote.model_rejected in active_model_ids:
model_stats[vote.model_rejected]["matches"] += 1
# 승률 계산 및 정렬
result = []
for model_id, stats in model_stats.items():
if stats["matches"] > 0:
win_rate = (stats["wins"] / stats["matches"]) * 100
result.append({
"id": model_id,
"name": stats["model"].name,
"model_url": stats["model"].model_url,
"win_rate": f"{win_rate:.0f}%",
"total_votes": stats["matches"],
"elo": int(1500 + (win_rate - 50) * 10), # 간단한 ELO 추정
"is_open": stats["model"].is_open,
"_win_rate_num": win_rate,
})
# 승률로 정렬
result.sort(key=lambda x: x["_win_rate_num"], reverse=True)
# 랭크와 티어 추가
for rank, item in enumerate(result, 1):
item["rank"] = rank
item["tier"] = "tier-s" if rank <= 2 else "tier-a" if rank <= 4 else "tier-b" if rank <= 7 else ""
del item["_win_rate_num"]
return result
def get_user_leaderboard(user_id, model_type):
"""
Get personalized leaderboard data for a specific user.
Includes ALL votes (both dataset and custom sentences).
Args:
user_id (int): The user ID
model_type (str): The model type ('tts' or 'conversational')
Returns:
list: List of dictionaries containing model data for the user's personal leaderboard
"""
# Get all models of the specified type
models = Model.query.filter_by(model_type=model_type).all()
# Get user's votes (includes both public and custom sentence votes)
user_votes = Vote.query.filter_by(user_id=user_id, model_type=model_type).all()
# Calculate win counts and match counts for each model based on user's votes
model_stats = {model.id: {"wins": 0, "matches": 0} for model in models}
for vote in user_votes:
model_stats[vote.model_chosen]["wins"] += 1
model_stats[vote.model_chosen]["matches"] += 1
model_stats[vote.model_rejected]["matches"] += 1
# Calculate win rates and prepare result
result = []
for model in models:
stats = model_stats[model.id]
win_rate = (
(stats["wins"] / stats["matches"] * 100) if stats["matches"] > 0 else 0
)
# Only include models the user has voted on
if stats["matches"] > 0:
result.append(
{
"id": model.id,
"name": model.name,
"model_url": model.model_url,
"win_rate": f"{win_rate:.0f}%",
"total_votes": stats["matches"],
"wins": stats["wins"],
"is_open": model.is_open,
}
)
# Sort by win rate descending
result.sort(key=lambda x: float(x["win_rate"].rstrip("%")), reverse=True)
# Add rank
for i, item in enumerate(result, 1):
item["rank"] = i
return result
def get_historical_leaderboard_data(model_type, target_date=None):
"""
Get leaderboard data at a specific date in history.
Args:
model_type (str): The model type ('tts' or 'conversational')
target_date (datetime): The target date for historical data, defaults to current time
Returns:
list: List of dictionaries containing model data for the historical leaderboard
"""
if not target_date:
target_date = datetime.utcnow()
# Get all models of the specified type
models = Model.query.filter_by(model_type=model_type).all()
# Create a result list for the models
result = []
for model in models:
# Get the most recent EloHistory entry for each model before the target date
elo_entry = (
EloHistory.query.filter(
EloHistory.model_id == model.id,
EloHistory.model_type == model_type,
EloHistory.timestamp <= target_date,
)
.order_by(EloHistory.timestamp.desc())
.first()
)
# Skip models that have no history before the target date
if not elo_entry:
continue
# Count wins and matches up to the target date (only public leaderboard votes)
match_count = Vote.query.filter(
db.or_(Vote.model_chosen == model.id, Vote.model_rejected == model.id),
Vote.model_type == model_type,
Vote.vote_date <= target_date,
Vote.counts_for_public_leaderboard == True,
).count()
win_count = Vote.query.filter(
Vote.model_chosen == model.id,
Vote.model_type == model_type,
Vote.vote_date <= target_date,
Vote.counts_for_public_leaderboard == True,
).count()
# Calculate win rate
win_rate = (win_count / match_count * 100) if match_count > 0 else 0
# Add to result
result.append(
{
"id": model.id,
"name": model.name,
"model_url": model.model_url,
"win_rate": f"{win_rate:.0f}%",
"total_votes": match_count,
"elo": int(elo_entry.elo_score),
"is_open": model.is_open,
}
)
# Sort by ELO score descending
result.sort(key=lambda x: x["elo"], reverse=True)
# Add rank and tier
for i, item in enumerate(result, 1):
item["rank"] = i
# Determine tier based on rank
if i <= 2:
item["tier"] = "tier-s"
elif i <= 4:
item["tier"] = "tier-a"
elif i <= 7:
item["tier"] = "tier-b"
else:
item["tier"] = ""
return result
def get_key_historical_dates(model_type):
"""
Get a list of key dates in the leaderboard history.
Args:
model_type (str): The model type ('tts' or 'conversational')
Returns:
list: List of datetime objects representing key dates
"""
# Get first and most recent vote dates
first_vote = (
Vote.query.filter_by(model_type=model_type)
.order_by(Vote.vote_date.asc())
.first()
)
last_vote = (
Vote.query.filter_by(model_type=model_type)
.order_by(Vote.vote_date.desc())
.first()
)
if not first_vote or not last_vote:
return []
# Generate a list of key dates - first day of each month between the first and last vote
dates = []
current_date = first_vote.vote_date.replace(day=1)
end_date = last_vote.vote_date
while current_date <= end_date:
dates.append(current_date)
# Move to next month
if current_date.month == 12:
current_date = current_date.replace(year=current_date.year + 1, month=1)
else:
current_date = current_date.replace(month=current_date.month + 1)
# Add latest date
if dates and dates[-1].month != end_date.month or dates[-1].year != end_date.year:
dates.append(end_date)
return dates
def insert_initial_models():
"""Insert initial models into the database (한국어 TTS 전용)."""
import os
# 환경 변수로 API 키 확인하여 활성화 여부 결정
has_openai = bool(os.getenv("OPENAI_API_KEY"))
has_elevenlabs = bool(os.getenv("ELEVENLABS_API_KEY"))
has_google = bool(os.getenv("GOOGLE_API_KEY"))
has_supertone = bool(os.getenv("SUPERTONE_API_KEY"))
has_clova = bool(os.getenv("CLOVA_CLIENT_ID") and os.getenv("CLOVA_API_KEY"))
has_humelo = bool(os.getenv("HUMELO_API_KEY"))
has_typecast = bool(os.getenv("TYPECAST_API_KEY"))
# Gemini TTS는 서비스 계정 JSON이 필요 (API Key 미지원)
has_gemini_tts = bool(os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON"))
tts_models = [
# 채널톡 TTS (한국어 특화) - 항상 활성화
Model(
id="channel-hana",
name="채널톡 하나",
model_type=ModelType.TTS,
is_open=False,
is_active=True,
model_url="https://channel.io/",
),
# ElevenLabs (다국어 지원) - API 키 있을 때만 활성화
Model(
id="eleven-multilingual-v2",
name="ElevenLabs Multilingual v2",
model_type=ModelType.TTS,
is_open=False,
is_active=has_elevenlabs,
model_url="https://elevenlabs.io/",
),
# OpenAI TTS (gpt-4o-mini-tts) - API 키 있을 때만 활성화
Model(
id="openai-gpt-4o-mini-tts",
name="OpenAI GPT-4o Mini TTS",
model_type=ModelType.TTS,
is_open=False,
is_active=has_openai,
model_url="https://platform.openai.com/docs/guides/text-to-speech",
),
# Google Cloud TTS - 비활성화 (Gemini TTS 사용)
Model(
id="google-wavenet",
name="Google Wavenet (ko-KR)",
model_type=ModelType.TTS,
is_open=False,
is_active=False, # Gemini TTS로 대체
model_url="https://cloud.google.com/text-to-speech",
),
Model(
id="google-neural2",
name="Google Neural2 (ko-KR)",
model_type=ModelType.TTS,
is_open=False,
is_active=False, # Gemini TTS로 대체
model_url="https://cloud.google.com/text-to-speech",
),
# CLOVA TTS (네이버 클라우드 - 한국어 특화) - API 키 있을 때만 활성화
Model(
id="clova-nara",
name="CLOVA Voice (나라)",
model_type=ModelType.TTS,
is_open=False,
is_active=has_clova,
model_url="https://clova.ai/",
),
# Supertone TTS (한국어 특화) - API 키 있을 때만 활성화
Model(
id="supertone-sona",
name="Supertone Sona",
model_type=ModelType.TTS,
is_open=False,
is_active=has_supertone,
model_url="https://supertone.ai/",
),
# Humelo DIVE TTS (한국어 특화) - API 키 있을 때만 활성화
Model(
id="humelo-sia",
name="Humelo DIVE (리아)",
model_type=ModelType.TTS,
is_open=False,
is_active=has_humelo,
model_url="https://humelo.com/",
),
# Typecast TTS v3.0 (한국어 특화) - 자인만 활성화
Model(
id="typecast-jaesun",
name="Typecast v3 (재선)",
model_type=ModelType.TTS,
is_open=False,
is_active=False, # 비활성화
model_url="https://typecast.ai/",
),
Model(
id="typecast-jain",
name="Typecast v3 (자인)",
model_type=ModelType.TTS,
is_open=False,
is_active=has_typecast, # 자인만 활성화
model_url="https://typecast.ai/",
),
# Legacy Typecast - 비활성화 (v3.0으로 대체)
Model(
id="typecast-geumhee",
name="Typecast (GeumHee) [Legacy]",
model_type=ModelType.TTS,
is_open=False,
is_active=False, # 비활성화
model_url="https://typecast.ai/",
),
# Gemini TTS (Google Cloud - 다국어 지원) - 서비스 계정 JSON 필요
Model(
id="gemini-tts-aoede",
name="Gemini TTS (Aoede)",
model_type=ModelType.TTS,
is_open=False,
is_active=has_gemini_tts,
model_url="https://cloud.google.com/text-to-speech/docs/gemini-tts",
),
]
for model in tts_models:
existing = Model.query.filter_by(
id=model.id, model_type=model.model_type
).first()
if not existing:
db.session.add(model)
else:
# Update model attributes if they've changed, but preserve other data
existing.name = model.name
existing.is_open = model.is_open
existing.model_url = model.model_url
if model.is_active is not None:
existing.is_active = model.is_active
db.session.commit()
# Deactivate legacy Typecast model (JaeYi) if it still exists
legacy_typecast = Model.query.filter_by(id="typecast-jaeyi").first()
if legacy_typecast and legacy_typecast.is_active:
legacy_typecast.is_active = False
db.session.commit()
def get_top_voters(limit=10):
"""
Get the top voters by number of votes.
Args:
limit (int): Number of users to return
Returns:
list: List of dictionaries containing user data and vote counts
"""
# Query users who have opted in to the leaderboard and have at least one vote
top_users = db.session.query(
User, func.count(Vote.id).label('vote_count')
).join(Vote).filter(
User.show_in_leaderboard == True
).group_by(User.id).order_by(
func.count(Vote.id).desc()
).limit(limit).all()
result = []
for i, (user, vote_count) in enumerate(top_users, 1):
result.append({
"rank": i,
"username": user.username,
"vote_count": vote_count,
"join_date": user.join_date.strftime("%b %d, %Y")
})
return result
def get_voting_statistics():
"""
Get voting statistics for the arena.
Returns:
dict: Dictionary containing voting statistics
"""
# Total number of unique voters
total_voters = db.session.query(func.count(func.distinct(Vote.user_id))).scalar() or 0
# Total number of votes
total_votes = Vote.query.count()
# Total evaluation time in seconds (sum of session durations)
total_eval_seconds = db.session.query(func.sum(Vote.session_duration_seconds)).scalar() or 0
# Convert to hours, minutes format
total_eval_hours = int(total_eval_seconds // 3600)
total_eval_minutes = int((total_eval_seconds % 3600) // 60)
# Total characters evaluated (sum of text lengths)
total_characters = db.session.query(func.sum(func.length(Vote.text))).scalar() or 0
# Votes in the last 24 hours
yesterday = datetime.utcnow() - timedelta(days=1)
votes_last_24h = Vote.query.filter(Vote.vote_date >= yesterday).count()
# Votes in the last 7 days
last_week = datetime.utcnow() - timedelta(days=7)
votes_last_7d = Vote.query.filter(Vote.vote_date >= last_week).count()
# Total number of active models
active_models = Model.query.filter_by(model_type=ModelType.TTS, is_active=True).count()
return {
"total_voters": total_voters,
"total_votes": total_votes,
"total_eval_hours": total_eval_hours,
"total_eval_minutes": total_eval_minutes,
"total_characters": total_characters,
"votes_last_24h": votes_last_24h,
"votes_last_7d": votes_last_7d,
"active_models": active_models,
}
def toggle_user_leaderboard_visibility(user_id):
"""Toggle user's leaderboard visibility setting"""
user = User.query.get(user_id)
if not user:
return None
user.show_in_leaderboard = not user.show_in_leaderboard
db.session.commit()
return user.show_in_leaderboard
def check_user_timeout(user_id):
"""Check if a user is currently timed out"""
if not user_id:
return False, None
active_timeout = UserTimeout.query.filter_by(
user_id=user_id,
is_active=True
).filter(
UserTimeout.expires_at > datetime.utcnow()
).order_by(UserTimeout.expires_at.desc()).first()
return active_timeout is not None, active_timeout
def create_user_timeout(user_id, reason, timeout_type, duration_days, created_by=None, related_campaign_id=None):
"""Create a new user timeout"""
expires_at = datetime.utcnow() + timedelta(days=duration_days)
timeout = UserTimeout(
user_id=user_id,
reason=reason,
timeout_type=timeout_type,
expires_at=expires_at,
created_by=created_by,
related_campaign_id=related_campaign_id
)
db.session.add(timeout)
db.session.commit()
return timeout
def cancel_user_timeout(timeout_id, cancelled_by, cancel_reason):
"""Cancel an active timeout"""
timeout = UserTimeout.query.get(timeout_id)
if not timeout:
return False, "Timeout not found"
timeout.is_active = False
timeout.cancelled_at = datetime.utcnow()
timeout.cancelled_by = cancelled_by
timeout.cancel_reason = cancel_reason
db.session.commit()
return True, "Timeout cancelled successfully"
def log_coordinated_campaign(model_id, model_type, vote_count, user_count,
time_window_hours, confidence_score, participants_data):
"""Log a detected coordinated voting campaign"""
campaign = CoordinatedVotingCampaign(
model_id=model_id,
model_type=model_type,
time_window_hours=time_window_hours,
vote_count=vote_count,
user_count=user_count,
confidence_score=confidence_score
)
db.session.add(campaign)
db.session.flush() # Get campaign ID
# Add participants
for participant_data in participants_data:
participant = CampaignParticipant(
campaign_id=campaign.id,
user_id=participant_data['user_id'],
votes_in_campaign=participant_data['votes_in_campaign'],
first_vote_at=participant_data['first_vote_at'],
last_vote_at=participant_data['last_vote_at'],
suspicion_level=participant_data['suspicion_level']
)
db.session.add(participant)
db.session.commit()
return campaign
def get_user_timeouts(user_id=None, active_only=True, limit=50):
"""Get user timeouts with optional filtering"""
query = UserTimeout.query
if user_id:
query = query.filter_by(user_id=user_id)
if active_only:
query = query.filter_by(is_active=True).filter(
UserTimeout.expires_at > datetime.utcnow()
)
return query.order_by(UserTimeout.created_at.desc()).limit(limit).all()
def get_coordinated_campaigns(status=None, limit=50):
"""Get coordinated voting campaigns with optional status filtering"""
query = CoordinatedVotingCampaign.query
if status:
query = query.filter_by(status=status)
return query.order_by(CoordinatedVotingCampaign.detected_at.desc()).limit(limit).all()
def resolve_campaign(campaign_id, resolved_by, status, admin_notes=None):
"""Mark a campaign as resolved"""
campaign = CoordinatedVotingCampaign.query.get(campaign_id)
if not campaign:
return False, "Campaign not found"
campaign.status = status
campaign.resolved_by = resolved_by
campaign.resolved_at = datetime.utcnow()
if admin_notes:
campaign.admin_notes = admin_notes
db.session.commit()
return True, "Campaign resolved successfully"
def hash_sentence(sentence_text):
"""Generate a SHA-256 hash for a sentence"""
return hashlib.sha256(sentence_text.strip().encode('utf-8')).hexdigest()
def is_sentence_consumed(sentence_text):
"""Check if a sentence has already been consumed"""
sentence_hash = hash_sentence(sentence_text)
return ConsumedSentence.query.filter_by(sentence_hash=sentence_hash).first() is not None
def mark_sentence_consumed(sentence_text, session_id=None, usage_type='direct'):
"""Mark a sentence as consumed"""
sentence_hash = hash_sentence(sentence_text)
# Check if already consumed
existing = ConsumedSentence.query.filter_by(sentence_hash=sentence_hash).first()
if existing:
return existing # Already consumed
consumed_sentence = ConsumedSentence(
sentence_hash=sentence_hash,
sentence_text=sentence_text,
session_id=session_id,
usage_type=usage_type
)
db.session.add(consumed_sentence)
db.session.commit()
return consumed_sentence
def get_unconsumed_sentences(sentence_pool):
"""Filter a list of sentences to only include unconsumed ones"""
if not sentence_pool:
return []
# Get all consumed sentence hashes
consumed_hashes = set(
row[0] for row in db.session.query(ConsumedSentence.sentence_hash).all()
)
# Filter out consumed sentences
unconsumed = []
for sentence in sentence_pool:
if hash_sentence(sentence) not in consumed_hashes:
unconsumed.append(sentence)
return unconsumed
def get_consumed_sentences_count():
"""Get the total count of consumed sentences"""
return ConsumedSentence.query.count()
def get_random_unconsumed_sentence(sentence_pool):
"""Get a random unconsumed sentence from the pool"""
unconsumed = get_unconsumed_sentences(sentence_pool)
if not unconsumed:
return None
import random
return random.choice(unconsumed)