import pandas as pd from googleapiclient.discovery import build import os API_KEY = os.getenv("GOOGLE_API_KEY") def get_youtube_comments(video_id, limit=30, order='time'): # YouTube 댓글 가져오기; order: 정렬 방식 ('time': 최신순, 'relevance': 좋아요순) youtube = build("youtube", "v3", developerKey=API_KEY); comments = []; next_page_token = None while len(comments) < limit: response = youtube.commentThreads().list(part="snippet", videoId=video_id, maxResults=min(100, limit - len(comments)), order=order, pageToken=next_page_token).execute() for item in response['items']: comment = item['snippet']['topLevelComment']['snippet'] if len(comment['textDisplay'].split()) <= 300: comments.append({'comment': f"{comment['textDisplay']} {comment['likeCount']} likes", 'likes': comment['likeCount'], 'published': comment['publishedAt']}) next_page_token = response.get('nextPageToken') if not next_page_token: break return pd.DataFrame(comments[:limit]) def get_youtube_video_info(video_id): youtube = build("youtube", "v3", developerKey=API_KEY) response = youtube.videos().list(part="snippet,statistics,contentDetails", id=video_id).execute() if not response['items']: return None, None s, st, d = response['items'][0]['snippet'], response['items'][0]['statistics'], response['items'][0]['contentDetails'] # s=snipet, st=status, d=details # return { # 'title': s['title'], 'description': s['description'], 'channel_title': s['channelTitle'], 'channel_id': s['channelId'], # 'published_at': s['publishedAt'], 'tags': s.get('tags', []), 'category_id': s['categoryId'], 'default_language': s.get('defaultLanguage'), # 'view_count': int(st.get('viewCount', 0)),'like_count': int(st.get('likeCount', 0)), 'comment_count': int(st.get('commentCount', 0)), 'duration': d['duration'], # } return [f"제목: {s['title']}\n설명: {s['description']}\n채널: {s['channelTitle']}\n게시일: {s['publishedAt']}\n조회수: {int(st.get('viewCount', 0)):,}\n좋아요: {int(st.get('likeCount', 0)):,}\n댓글수: {int(st.get('commentCount', 0)):,}\n길이: {d['duration']}\n태그: {s.get('tags', [])}", {'title': s['title'], 'description': s['description'], 'channel_title': s['channelTitle'], 'channel_id': s['channelId'], 'published_at': s['publishedAt'], 'tags': s.get('tags', []), 'category_id': s['categoryId'], 'default_language': s.get('defaultLanguage'), 'view_count': int(st.get('viewCount', 0)),'like_count': int(st.get('likeCount', 0)), 'comment_count': int(st.get('commentCount', 0)), 'duration': d['duration'],}] def get_youtube_video_info_dict(video_id): youtube = build("youtube", "v3", developerKey=API_KEY) response = youtube.videos().list(part="snippet,statistics,contentDetails", id=video_id).execute() if not response['items']: return None s, st, d = response['items'][0]['snippet'], response['items'][0]['statistics'], response['items'][0]['contentDetails'] # s=snipet, st=status, d=details return { 'title': s['title'], 'description': s['description'], 'channel_title': s['channelTitle'], 'channel_id': s['channelId'], 'published_at': s['publishedAt'], 'tags': s.get('tags', []), 'category_id': s['categoryId'], 'default_language': s.get('defaultLanguage'), 'view_count': int(st.get('viewCount', 0)),'like_count': int(st.get('likeCount', 0)), 'comment_count': int(st.get('commentCount', 0)), 'duration': d['duration'], } def get_channel_id_by_name(channel_name): """Convert channel name to channel ID""" youtube = build("youtube", "v3", developerKey=API_KEY) search_response = youtube.search().list(q=channel_name,type='channel',part='id,snippet',maxResults=1).execute() if search_response['items']: channel_id = search_response['items'][0]['id']['channelId'];return channel_id return None def get_channel_videos(channel_id, limit=10): youtube = build("youtube", "v3", developerKey=API_KEY) response = youtube.search().list(part="snippet", channelId=channel_id, maxResults=50, order="date", type="video").execute() videos = [] for item in response['items']: video_id = item['id']['videoId'] # 영상 세부정보 가져와서 길이 확인 video_detail = youtube.videos().list(part="contentDetails,statistics", id=video_id).execute() if video_detail['items']: duration = video_detail['items'][0]['contentDetails']['duration'] # PT1M 이상인 경우만 (쇼츠가 아닌 일반 영상) if 'M' in duration or 'H' in duration: videos.append({'video_id': video_id, 'title': item['snippet']['title'], 'published': item['snippet']['publishedAt'], 'duration': duration, 'views': int(video_detail['items'][0]['statistics'].get('viewCount', 0))}) if len(videos) >= limit: break return pd.DataFrame(videos) def get_channel_shorts(channel_id, limit=10): youtube = build("youtube", "v3", developerKey=API_KEY) response = youtube.search().list(part="snippet", channelId=channel_id, maxResults=50, order="date", type="video").execute() shorts = [] for item in response['items']: video_id = item['id']['videoId'] # 영상 세부정보 가져와서 길이 확인 video_detail = youtube.videos().list(part="contentDetails,statistics", id=video_id).execute() if video_detail['items']: duration = video_detail['items'][0]['contentDetails']['duration'] # PT60S 이하이고 M이나 H가 없는 경우 (쇼츠) if 'H' not in duration and 'M' not in duration and 'S' in duration: shorts.append({'video_id': video_id, 'title': item['snippet']['title'], 'published': item['snippet']['publishedAt'], 'duration': duration, 'views': int(video_detail['items'][0]['statistics'].get('viewCount', 0))}) if len(shorts) >= limit: break return pd.DataFrame(shorts) # 사용 예시 if __name__ == "__main__": # video_id = "9P6H2QywDjM" # video_info = get_youtube_video_info(video_id) # # 최신순 100개 # latest_comments = get_youtube_comments(video_id, limit=100, order='time') # order = 'time' or 'relevance' # print(f"\n총 댓글 수: {len(latest_comments)}") # print(f"평균 좋아요: {latest_comments['likes'].mean():.1f}") # by_likes = latest_comments.sort_values('likes', ascending=False) # by_date = latest_comments.sort_values('published', ascending=False) # comments_text = '\n'.join([f"{i+1}. {comment}" for i, comment in enumerate(by_likes['comment'].tolist())]) # print(f"\n댓글:\n{comments_text}") channel_id = "UCX6OQ3DkcsbYNE6H8uQQuVA" # MrBeast 채널 예시 latest_videos = get_channel_videos(channel_id, limit=10) latest_shorts = get_channel_shorts(channel_id, limit=10) print(f"최신 일반 영상 {len(latest_videos)}개:") for i, row in latest_videos.iterrows(): print(f"{i+1}. {row['title']} ({row['duration']}) - 조회수: {row['views']:,}") print(f"\n최신 쇼츠 {len(latest_shorts)}개:") for i, row in latest_shorts.iterrows(): print(f"{i+1}. {row['title']} ({row['duration']}) - 조회수: {row['views']:,}")