|
|
import os
import re

import pandas as pd
from googleapiclient.discovery import build
|
|
|
|
|
# Developer key passed to every googleapiclient `build(...)` call in this
# module; it is None when the GOOGLE_API_KEY environment variable is unset.
API_KEY = os.environ.get("GOOGLE_API_KEY")
|
|
|
|
|
def get_youtube_comments(video_id, limit=30, order='time'):
    """Collect top-level comments of a video into a DataFrame.

    Pages through ``commentThreads().list`` until ``limit`` rows have been
    gathered or the API stops returning a ``nextPageToken``.

    Args:
        video_id: ID of the video whose comment threads are listed.
        limit: Maximum number of rows to return.
        order: Passed through as the ``order`` argument of the list request.

    Returns:
        A ``pandas.DataFrame`` with one row per kept comment and the columns
        ``comment`` (display text followed by "<n> likes"), ``likes`` and
        ``published``. Comments with more than 300 whitespace-separated
        words are skipped.
    """
    client = build("youtube", "v3", developerKey=API_KEY)
    rows = []
    page_token = None

    while True:
        remaining = limit - len(rows)
        if remaining <= 0:
            break

        page = client.commentThreads().list(
            part="snippet",
            videoId=video_id,
            # Never ask for more than is still needed (API page cap is 100).
            maxResults=min(100, remaining),
            order=order,
            pageToken=page_token,
        ).execute()

        for thread in page['items']:
            top_level = thread['snippet']['topLevelComment']['snippet']
            text = top_level['textDisplay']
            # Drop very long comments (more than 300 words).
            if len(text.split()) > 300:
                continue
            rows.append({
                'comment': f"{text} {top_level['likeCount']} likes",
                'likes': top_level['likeCount'],
                'published': top_level['publishedAt'],
            })

        page_token = page.get('nextPageToken')
        if not page_token:
            break

    return pd.DataFrame(rows[:limit])
|
|
|
|
|
def get_youtube_video_info(video_id):
    """Fetch a video's metadata as a readable summary plus a structured dict.

    Args:
        video_id: ID of the video to look up via ``videos().list``.

    Returns:
        ``(None, None)`` when the API returns no item for ``video_id``.
        Otherwise a two-element list ``[summary, info]`` where ``summary`` is
        a newline-separated, Korean-labelled text block (title, description,
        channel, publish date, view/like/comment counts, duration, tags) and
        ``info`` is a dict with the keys ``title``, ``description``,
        ``channel_title``, ``channel_id``, ``published_at``, ``tags``,
        ``category_id``, ``default_language``, ``view_count``,
        ``like_count``, ``comment_count`` and ``duration``.
    """
    youtube = build("youtube", "v3", developerKey=API_KEY)
    response = youtube.videos().list(
        part="snippet,statistics,contentDetails", id=video_id
    ).execute()

    if not response['items']:
        return None, None

    video = response['items'][0]
    s, st, d = video['snippet'], video['statistics'], video['contentDetails']

    # Build the structured record first so the numeric conversions happen
    # once and the summary below is guaranteed to agree with it.
    info = {
        'title': s['title'],
        'description': s['description'],
        'channel_title': s['channelTitle'],
        'channel_id': s['channelId'],
        'published_at': s['publishedAt'],
        'tags': s.get('tags', []),
        'category_id': s['categoryId'],
        'default_language': s.get('defaultLanguage'),
        # Statistics may be absent from the response; fall back to 0.
        'view_count': int(st.get('viewCount', 0)),
        'like_count': int(st.get('likeCount', 0)),
        'comment_count': int(st.get('commentCount', 0)),
        'duration': d['duration'],
    }

    # The labels are user-facing Korean text (title, description, channel,
    # published date, views, likes, comments, length, tags).
    summary = "\n".join([
        f"제목: {info['title']}",
        f"설명: {info['description']}",
        f"채널: {info['channel_title']}",
        f"게시일: {info['published_at']}",
        f"조회수: {info['view_count']:,}",
        f"좋아요: {info['like_count']:,}",
        f"댓글수: {info['comment_count']:,}",
        f"길이: {info['duration']}",
        f"태그: {info['tags']}",
    ])

    return [summary, info]
|
|
|
|
|
|
|
|
def get_youtube_video_info_dict(video_id):
    """Return a flat metadata dict for one video, or None if it is not found.

    Args:
        video_id: ID of the video to look up via ``videos().list``.

    Returns:
        ``None`` when the response contains no items; otherwise a dict with
        the keys ``title``, ``description``, ``channel_title``,
        ``channel_id``, ``published_at``, ``tags``, ``category_id``,
        ``default_language``, ``view_count``, ``like_count``,
        ``comment_count`` and ``duration``.
    """
    client = build("youtube", "v3", developerKey=API_KEY)
    request = client.videos().list(
        part="snippet,statistics,contentDetails", id=video_id
    )
    result = request.execute()

    if not result['items']:
        return None

    snippet = result['items'][0]['snippet']
    stats = result['items'][0]['statistics']
    details = result['items'][0]['contentDetails']

    return dict(
        title=snippet['title'],
        description=snippet['description'],
        channel_title=snippet['channelTitle'],
        channel_id=snippet['channelId'],
        published_at=snippet['publishedAt'],
        tags=snippet.get('tags', []),
        category_id=snippet['categoryId'],
        default_language=snippet.get('defaultLanguage'),
        # Counters default to 0 when the statistic is missing.
        view_count=int(stats.get('viewCount', 0)),
        like_count=int(stats.get('likeCount', 0)),
        comment_count=int(stats.get('commentCount', 0)),
        duration=details['duration'],
    )
|
|
|
|
|
def get_channel_id_by_name(channel_name):
    """Look up a channel ID from a channel name.

    Runs a channel-type search for ``channel_name`` limited to one result.

    Args:
        channel_name: Text used as the search query.

    Returns:
        The ``channelId`` of the first search hit, or ``None`` when the
        search returns no items.
    """
    client = build("youtube", "v3", developerKey=API_KEY)
    request = client.search().list(
        q=channel_name,
        type='channel',
        part='id,snippet',
        maxResults=1,
    )
    hits = request.execute()['items']

    if not hits:
        return None
    return hits[0]['id']['channelId']
|
|
|
|
|
# Uploads at least this many seconds long are listed as regular videos;
# shorter ones that carry an explicit seconds component are listed as shorts.
_LONG_FORM_MIN_SECONDS = 60

# ISO 8601 duration as reported in contentDetails.duration, e.g. "PT45S",
# "PT1H2M3S" or "P1DT5S". Every component is optional.
_ISO8601_DURATION = re.compile(
    r"^P(?:(?P<weeks>\d+)W)?(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def _duration_to_seconds(duration):
    """Convert an ISO 8601 duration string to seconds, or None if unparsable."""
    match = _ISO8601_DURATION.match(duration)
    if match is None:
        return None
    parts = {unit: int(value) for unit, value in match.groupdict(default="0").items()}
    days = parts["weeks"] * 7 + parts["days"]
    return ((days * 24 + parts["hours"]) * 60 + parts["minutes"]) * 60 + parts["seconds"]


def _is_short_duration(duration):
    """Return True for a short, False for a regular video, None for neither."""
    seconds = _duration_to_seconds(duration)
    if seconds is None:
        return None
    if seconds >= _LONG_FORM_MIN_SECONDS:
        return False
    # Durations without a seconds component (e.g. "P0D") belong to neither list.
    return True if "S" in duration else None


def _recent_channel_uploads(channel_id, limit, want_shorts):
    """List a channel's newest uploads, keeping only shorts or only regular videos."""
    if limit <= 0:
        return pd.DataFrame([])

    youtube = build("youtube", "v3", developerKey=API_KEY)
    search = youtube.search().list(
        part="snippet", channelId=channel_id, maxResults=50, order="date", type="video"
    ).execute()
    items = search.get('items', [])

    # Fetch details for every hit in one request instead of one call per video;
    # videos().list accepts a comma-separated list of IDs.
    detail_by_id = {}
    video_ids = [item['id']['videoId'] for item in items]
    if video_ids:
        details = youtube.videos().list(
            part="contentDetails,statistics", id=",".join(video_ids)
        ).execute()
        detail_by_id = {detail['id']: detail for detail in details.get('items', [])}

    rows = []
    # Walk the search hits (not the detail response) so the date ordering
    # requested from the search is what the caller gets back.
    for item in items:
        video_id = item['id']['videoId']
        detail = detail_by_id.get(video_id)
        if detail is None:
            continue
        duration = detail['contentDetails']['duration']
        if _is_short_duration(duration) is not want_shorts:
            continue
        rows.append({
            'video_id': video_id,
            'title': item['snippet']['title'],
            'published': item['snippet']['publishedAt'],
            'duration': duration,
            'views': int(detail['statistics'].get('viewCount', 0)),
        })
        if len(rows) >= limit:
            break

    return pd.DataFrame(rows)


def get_channel_videos(channel_id, limit=10):
    """Return the channel's most recent regular (non-short) videos.

    Only the first page of search results (up to 50 uploads, ordered by
    date) is inspected, so fewer than ``limit`` rows may be returned.

    Args:
        channel_id: ID of the channel to search.
        limit: Maximum number of rows; values of 0 or less yield an empty frame.

    Returns:
        A ``pandas.DataFrame`` with the columns ``video_id``, ``title``,
        ``published``, ``duration`` and ``views`` for uploads lasting at
        least one minute.
    """
    return _recent_channel_uploads(channel_id, limit, want_shorts=False)


def get_channel_shorts(channel_id, limit=10):
    """Return the channel's most recent shorts (uploads under one minute).

    Only the first page of search results (up to 50 uploads, ordered by
    date) is inspected, so fewer than ``limit`` rows may be returned.

    Args:
        channel_id: ID of the channel to search.
        limit: Maximum number of rows; values of 0 or less yield an empty frame.

    Returns:
        A ``pandas.DataFrame`` with the columns ``video_id``, ``title``,
        ``published``, ``duration`` and ``views`` for uploads whose duration
        is below one minute and expressed in seconds.
    """
    return _recent_channel_uploads(channel_id, limit, want_shorts=True)
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Demo run: list the newest regular videos and shorts of one channel.
    channel_id = "UCX6OQ3DkcsbYNE6H8uQQuVA"

    latest_videos = get_channel_videos(channel_id, limit=10)
    latest_shorts = get_channel_shorts(channel_id, limit=10)

    # Output is user-facing Korean: "Latest regular videos: N" / "views".
    print(f"최신 일반 영상 {len(latest_videos)}개:")
    for i, row in latest_videos.iterrows():
        print(f"{i+1}. {row['title']} ({row['duration']}) - 조회수: {row['views']:,}")

    # "Latest shorts: N"
    print(f"\n최신 쇼츠 {len(latest_shorts)}개:")
    for i, row in latest_shorts.iterrows():
        print(f"{i+1}. {row['title']} ({row['duration']}) - 조회수: {row['views']:,}")
|
|
|