import pandas as pd
from googleapiclient.discovery import build
import os
API_KEY = os.getenv("GOOGLE_API_KEY")
def get_youtube_comments(video_id, limit=30, order='time'):
    # Fetch YouTube comments; order: sort mode ('time': newest first, 'relevance': top comments)
    youtube = build("youtube", "v3", developerKey=API_KEY)
    comments = []
    next_page_token = None
    while len(comments) < limit:
        response = youtube.commentThreads().list(part="snippet", videoId=video_id, maxResults=min(100, limit - len(comments)), order=order, pageToken=next_page_token).execute()
        for item in response['items']:
            comment = item['snippet']['topLevelComment']['snippet']
            if len(comment['textDisplay'].split()) <= 300:  # skip very long comments (over 300 words)
                comments.append({'comment': f"{comment['textDisplay']} {comment['likeCount']} likes", 'likes': comment['likeCount'], 'published': comment['publishedAt']})
        next_page_token = response.get('nextPageToken')
        if not next_page_token:
            break
    return pd.DataFrame(comments[:limit])
def get_youtube_video_info(video_id):
    youtube = build("youtube", "v3", developerKey=API_KEY)
    response = youtube.videos().list(part="snippet,statistics,contentDetails", id=video_id).execute()
    if not response['items']:
        return None, None
    s, st, d = response['items'][0]['snippet'], response['items'][0]['statistics'], response['items'][0]['contentDetails']  # s=snippet, st=statistics, d=contentDetails
return [f"์ ๋ชฉ: {s['title']}\n์ค๋ช
: {s['description']}\n์ฑ๋: {s['channelTitle']}\n๊ฒ์์ผ: {s['publishedAt']}\n์กฐํ์: {int(st.get('viewCount', 0)):,}\n์ข์์: {int(st.get('likeCount', 0)):,}\n๋๊ธ์: {int(st.get('commentCount', 0)):,}\n๊ธธ์ด: {d['duration']}\nํ๊ทธ: {s.get('tags', [])}",
{'title': s['title'], 'description': s['description'], 'channel_title': s['channelTitle'], 'channel_id': s['channelId'],
'published_at': s['publishedAt'], 'tags': s.get('tags', []), 'category_id': s['categoryId'], 'default_language': s.get('defaultLanguage'),
'view_count': int(st.get('viewCount', 0)),'like_count': int(st.get('likeCount', 0)), 'comment_count': int(st.get('commentCount', 0)), 'duration': d['duration'],}]
def get_youtube_video_info_dict(video_id):
    youtube = build("youtube", "v3", developerKey=API_KEY)
    response = youtube.videos().list(part="snippet,statistics,contentDetails", id=video_id).execute()
    if not response['items']:
        return None
    s, st, d = response['items'][0]['snippet'], response['items'][0]['statistics'], response['items'][0]['contentDetails']  # s=snippet, st=statistics, d=contentDetails
    return {
        'title': s['title'], 'description': s['description'], 'channel_title': s['channelTitle'], 'channel_id': s['channelId'],
        'published_at': s['publishedAt'], 'tags': s.get('tags', []), 'category_id': s['categoryId'], 'default_language': s.get('defaultLanguage'),
        'view_count': int(st.get('viewCount', 0)), 'like_count': int(st.get('likeCount', 0)), 'comment_count': int(st.get('commentCount', 0)), 'duration': d['duration'],
    }
def get_channel_id_by_name(channel_name):
    """Convert channel name to channel ID"""
    youtube = build("youtube", "v3", developerKey=API_KEY)
    search_response = youtube.search().list(q=channel_name, type='channel', part='id,snippet', maxResults=1).execute()
    if search_response['items']:
        return search_response['items'][0]['id']['channelId']
    return None
def get_channel_videos(channel_id, limit=10):
    youtube = build("youtube", "v3", developerKey=API_KEY)
    response = youtube.search().list(part="snippet", channelId=channel_id, maxResults=50, order="date", type="video").execute()
    videos = []
    for item in response['items']:
        video_id = item['id']['videoId']
        # Fetch video details to check the duration
        video_detail = youtube.videos().list(part="contentDetails,statistics", id=video_id).execute()
        if video_detail['items']:
            duration = video_detail['items'][0]['contentDetails']['duration']
            # Only keep videos of at least PT1M (regular videos, not Shorts)
            if 'M' in duration or 'H' in duration:
                videos.append({'video_id': video_id, 'title': item['snippet']['title'], 'published': item['snippet']['publishedAt'], 'duration': duration, 'views': int(video_detail['items'][0]['statistics'].get('viewCount', 0))})
        if len(videos) >= limit:
            break
    return pd.DataFrame(videos)
def get_channel_shorts(channel_id, limit=10):
    youtube = build("youtube", "v3", developerKey=API_KEY)
    response = youtube.search().list(part="snippet", channelId=channel_id, maxResults=50, order="date", type="video").execute()
    shorts = []
    for item in response['items']:
        video_id = item['id']['videoId']
        # Fetch video details to check the duration
        video_detail = youtube.videos().list(part="contentDetails,statistics", id=video_id).execute()
        if video_detail['items']:
            duration = video_detail['items'][0]['contentDetails']['duration']
            # 60 seconds or less: no 'M' or 'H' in the ISO 8601 duration, only seconds (Shorts)
            if 'H' not in duration and 'M' not in duration and 'S' in duration:
                shorts.append({'video_id': video_id, 'title': item['snippet']['title'], 'published': item['snippet']['publishedAt'], 'duration': duration, 'views': int(video_detail['items'][0]['statistics'].get('viewCount', 0))})
        if len(shorts) >= limit:
            break
    return pd.DataFrame(shorts)
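
# Optional sketch (assumption, not used by the functions above): the Shorts/regular split
# relies on substring checks against the ISO 8601 duration string ("PT59S", "PT1M30S", ...).
# If exact second counts are ever needed, a small parser like this could replace that
# heuristic; the helper name and regex are illustrative, not part of the YouTube API.
import re  # would normally sit with the other imports at the top of the file

def parse_iso8601_duration_seconds(duration):
    """Best-effort conversion of an ISO 8601 duration such as 'PT1H2M3S' to total seconds."""
    match = re.fullmatch(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", duration)
    if not match:
        return 0
    hours, minutes, seconds = (int(g) if g else 0 for g in match.groups())
    return hours * 3600 + minutes * 60 + seconds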
# Usage example
if __name__ == "__main__":
    # video_id = "9P6H2QywDjM"
    # video_info = get_youtube_video_info(video_id)
    # # Latest 100 comments, newest first
    # latest_comments = get_youtube_comments(video_id, limit=100, order='time')  # order = 'time' or 'relevance'
    # print(f"\nTotal comments: {len(latest_comments)}")
    # print(f"Average likes: {latest_comments['likes'].mean():.1f}")
    # by_likes = latest_comments.sort_values('likes', ascending=False)
    # by_date = latest_comments.sort_values('published', ascending=False)
    # comments_text = '\n'.join([f"{i+1}. {comment}" for i, comment in enumerate(by_likes['comment'].tolist())])
    # print(f"\nComments:\n{comments_text}")
    channel_id = "UCX6OQ3DkcsbYNE6H8uQQuVA"  # MrBeast channel example
    latest_videos = get_channel_videos(channel_id, limit=10)
    latest_shorts = get_channel_shorts(channel_id, limit=10)
    print(f"Latest {len(latest_videos)} regular videos:")
    for i, row in latest_videos.iterrows():
        print(f"{i+1}. {row['title']} ({row['duration']}) - views: {row['views']:,}")
    print(f"\nLatest {len(latest_shorts)} Shorts:")
    for i, row in latest_shorts.iterrows():
        print(f"{i+1}. {row['title']} ({row['duration']}) - views: {row['views']:,}")