File size: 7,332 Bytes
e7251ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import os
import re

import pandas as pd
from googleapiclient.discovery import build

# YouTube Data API key, read from the GOOGLE_API_KEY environment variable
# (os.getenv yields None when the variable is not set).
API_KEY = os.getenv("GOOGLE_API_KEY")

def get_youtube_comments(video_id, limit=30, order='time'):
    """Fetch top-level comments of a YouTube video as a DataFrame.

    Args:
        video_id: ID of the video whose comment threads are listed.
        limit: Maximum number of comments to return.
        order: Sort order forwarded to the API ('time' or 'relevance').

    Returns:
        A DataFrame with the columns ``comment`` (the comment text followed
        by "<n> likes"), ``likes`` and ``published``.  The columns are
        present even when no comment was collected, so column access on an
        empty result does not raise.
    """
    columns = ['comment', 'likes', 'published']
    if limit <= 0:
        # Nothing was requested: skip building a client and calling the API.
        return pd.DataFrame(columns=columns)

    youtube = build("youtube", "v3", developerKey=API_KEY)
    comments = []
    next_page_token = None
    while len(comments) < limit:
        response = youtube.commentThreads().list(
            part="snippet",
            videoId=video_id,
            # Never ask for more than is still missing (API page cap is 100).
            maxResults=min(100, limit - len(comments)),
            order=order,
            pageToken=next_page_token,
        ).execute()
        for item in response.get('items', []):
            comment = item['snippet']['topLevelComment']['snippet']
            # Drop very long comments (more than 300 whitespace-separated words).
            if len(comment['textDisplay'].split()) > 300:
                continue
            comments.append({
                'comment': f"{comment['textDisplay']} {comment['likeCount']} likes",
                'likes': comment['likeCount'],
                'published': comment['publishedAt'],
            })
        next_page_token = response.get('nextPageToken')
        if not next_page_token:
            break
    return pd.DataFrame(comments[:limit], columns=columns)

def get_youtube_video_info(video_id):
    """Return a text summary and a metadata dict for one video.

    Args:
        video_id: ID of the video to look up.

    Returns:
        ``(None, None)`` when the lookup finds no video; otherwise a
        two-element list ``[summary, info]`` where ``info`` is the dict
        produced by ``get_youtube_video_info_dict`` and ``summary`` is a
        multi-line string rendering its main fields.
    """
    # Reuse the dict builder so the API request and the field mapping live
    # in exactly one place.
    info = get_youtube_video_info_dict(video_id)
    if info is None:
        return None, None

    # NOTE(review): the label text below looks mis-encoded in this copy of
    # the file; it is reproduced unchanged -- confirm against the original.
    summary = (
        f"์ œ๋ชฉ: {info['title']}\n"
        f"์„ค๋ช…: {info['description']}\n"
        f"์ฑ„๋„: {info['channel_title']}\n"
        f"๊ฒŒ์‹œ์ผ: {info['published_at']}\n"
        f"์กฐํšŒ์ˆ˜: {info['view_count']:,}\n"
        f"์ข‹์•„์š”: {info['like_count']:,}\n"
        f"๋Œ“๊ธ€์ˆ˜: {info['comment_count']:,}\n"
        f"๊ธธ์ด: {info['duration']}\n"
        f"ํƒœ๊ทธ: {info['tags']}"
    )
    return [summary, info]


def get_youtube_video_info_dict(video_id):
    """Look up one video and return its metadata as a flat dict.

    Returns None when the API response contains no items for ``video_id``.
    Counters (views, likes, comments) are converted to int and default to 0
    when the statistics part omits them.
    """
    client = build("youtube", "v3", developerKey=API_KEY)
    request = client.videos().list(part="snippet,statistics,contentDetails", id=video_id)
    payload = request.execute()
    if not payload['items']:
        return None

    video = payload['items'][0]
    snippet = video['snippet']
    statistics = video['statistics']
    details = video['contentDetails']

    info = {}
    info['title'] = snippet['title']
    info['description'] = snippet['description']
    info['channel_title'] = snippet['channelTitle']
    info['channel_id'] = snippet['channelId']
    info['published_at'] = snippet['publishedAt']
    info['tags'] = snippet.get('tags', [])
    info['category_id'] = snippet['categoryId']
    info['default_language'] = snippet.get('defaultLanguage')
    info['view_count'] = int(statistics.get('viewCount', 0))
    info['like_count'] = int(statistics.get('likeCount', 0))
    info['comment_count'] = int(statistics.get('commentCount', 0))
    info['duration'] = details['duration']
    return info

def get_channel_id_by_name(channel_name):
    """Resolve a channel name to a channel ID via a channel search.

    Returns the ID of the first search hit, or None when the search
    yields no items.
    """
    client = build("youtube", "v3", developerKey=API_KEY)
    request = client.search().list(
        q=channel_name,
        type='channel',
        part='id,snippet',
        maxResults=1,
    )
    hits = request.execute()['items']
    if not hits:
        return None
    return hits[0]['id']['channelId']

# Column layout shared by get_channel_videos and get_channel_shorts.
_CHANNEL_UPLOAD_COLUMNS = ['video_id', 'title', 'published', 'duration', 'views']
# Durations below this many seconds count as Shorts, the rest as regular videos.
_SHORTS_CUTOFF_SECONDS = 60
# ISO 8601 duration as found in contentDetails.duration, e.g. "PT4M13S" or "P1DT2H".
_ISO8601_DURATION = re.compile(
    r'^P(?:(\d+)W)?(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?$'
)


def _duration_seconds(duration):
    """Convert an ISO 8601 duration string to seconds (None if unparseable)."""
    match = _ISO8601_DURATION.match(duration)
    if match is None:
        return None
    weeks, days, hours, minutes, seconds = (int(part or 0) for part in match.groups())
    return (((weeks * 7 + days) * 24 + hours) * 60 + minutes) * 60 + seconds


def _latest_channel_uploads(channel_id, limit, keep):
    """List a channel's newest uploads whose length in seconds satisfies ``keep``."""
    if limit <= 0:
        # Nothing was requested: skip building a client and calling the API.
        return pd.DataFrame(columns=_CHANNEL_UPLOAD_COLUMNS)

    youtube = build("youtube", "v3", developerKey=API_KEY)
    response = youtube.search().list(
        part="snippet", channelId=channel_id, maxResults=50, order="date", type="video"
    ).execute()
    items = response.get('items', [])

    # Fetch duration and statistics for every hit in ONE videos.list call
    # (the id parameter takes a comma-separated list) instead of one call
    # per video.
    details_by_id = {}
    video_ids = [item['id']['videoId'] for item in items]
    if video_ids:
        detail_response = youtube.videos().list(
            part="contentDetails,statistics", id=",".join(video_ids)
        ).execute()
        details_by_id = {
            detail['id']: detail for detail in detail_response.get('items', [])
        }

    rows = []
    # Walk the search hits (not the detail response) to keep newest-first order.
    for item in items:
        video_id = item['id']['videoId']
        detail = details_by_id.get(video_id)
        if detail is None:
            continue
        duration = detail['contentDetails']['duration']
        seconds = _duration_seconds(duration)
        if seconds is None or not keep(seconds):
            continue
        rows.append({
            'video_id': video_id,
            'title': item['snippet']['title'],
            'published': item['snippet']['publishedAt'],
            'duration': duration,
            'views': int(detail['statistics'].get('viewCount', 0)),
        })
        if len(rows) >= limit:
            break
    return pd.DataFrame(rows, columns=_CHANNEL_UPLOAD_COLUMNS)


def get_channel_videos(channel_id, limit=10):
    """Return up to ``limit`` of a channel's newest regular (non-Shorts) videos.

    Only the 50 most recent uploads returned by the search are considered.
    A video is regular when it lasts at least one minute; the length is
    computed from the parsed duration, so day-long values such as "P1D"
    are classified correctly.

    Returns:
        A DataFrame with the columns ``video_id``, ``title``, ``published``,
        ``duration`` and ``views`` (present even when the result is empty).
    """
    return _latest_channel_uploads(
        channel_id, limit, lambda seconds: seconds >= _SHORTS_CUTOFF_SECONDS
    )


def get_channel_shorts(channel_id, limit=10):
    """Return up to ``limit`` of a channel's newest Shorts (under one minute).

    Only the 50 most recent uploads returned by the search are considered.
    Zero-length durations are skipped (presumably live or upcoming
    broadcasts -- TODO confirm).

    Returns:
        A DataFrame with the columns ``video_id``, ``title``, ``published``,
        ``duration`` and ``views`` (present even when the result is empty).
    """
    return _latest_channel_uploads(
        channel_id, limit, lambda seconds: 0 < seconds < _SHORTS_CUTOFF_SECONDS
    )


# Usage example
if __name__ == "__main__":
    # Alternative demo (disabled): video info plus the newest 100 comments.
    # video_id = "9P6H2QywDjM"
    # video_info = get_youtube_video_info(video_id)
    # latest_comments = get_youtube_comments(video_id, limit=100, order='time')  # or order='relevance'
    # print(f"\nTotal comments: {len(latest_comments)}")
    # print(f"Average likes: {latest_comments['likes'].mean():.1f}")
    # by_likes = latest_comments.sort_values('likes', ascending=False)
    # by_date = latest_comments.sort_values('published', ascending=False)
    # comments_text = '\n'.join(f"{i+1}. {comment}" for i, comment in enumerate(by_likes['comment'].tolist()))
    # print(f"\nComments:\n{comments_text}")

    channel_id = "UCX6OQ3DkcsbYNE6H8uQQuVA"  # example: the MrBeast channel

    # Fetch both listings up front, then print them one after the other.
    latest_videos = get_channel_videos(channel_id, limit=10)
    latest_shorts = get_channel_shorts(channel_id, limit=10)

    sections = [
        (f"์ตœ์‹  ์ผ๋ฐ˜ ์˜์ƒ {len(latest_videos)}๊ฐœ:", latest_videos),
        (f"\n์ตœ์‹  ์‡ผ์ธ  {len(latest_shorts)}๊ฐœ:", latest_shorts),
    ]
    for heading, frame in sections:
        print(heading)
        # One numbered line per entry: title, raw duration, view count.
        for position, entry in frame.iterrows():
            print(f"{position+1}. {entry['title']} ({entry['duration']}) - ์กฐํšŒ์ˆ˜: {entry['views']:,}")