2024-11-27 14:09:33 +00:00
import logging
2024-11-20 18:01:58 +00:00
from typing import Any , Dict , Generator , List , Optional , Sequence , Union
from urllib . parse import parse_qs , urlparse
from langchain_core . documents import Document
2024-11-27 14:09:33 +00:00
from open_webui . env import SRC_LOG_LEVELS
2024-11-20 18:01:58 +00:00
2024-11-27 14:09:33 +00:00
# Module-level logger named after this module; its level is looked up in the
# SRC_LOG_LEVELS mapping imported from open_webui.env.
# NOTE(review): the lookup key is written with surrounding spaces here --
# confirm it matches an actual key of SRC_LOG_LEVELS.
log = logging . getLogger ( __name__ )
log . setLevel ( SRC_LOG_LEVELS [ " RAG " ] )
2024-11-20 18:01:58 +00:00
ALLOWED_SCHEMES = { " http " , " https " }
ALLOWED_NETLOCS = {
" youtu.be " ,
" m.youtube.com " ,
" youtube.com " ,
" www.youtube.com " ,
" www.youtube-nocookie.com " ,
" vid.plus " ,
}
def _parse_video_id ( url : str ) - > Optional [ str ] :
""" Parse a YouTube URL and return the video ID if valid, otherwise None. """
parsed_url = urlparse ( url )
if parsed_url . scheme not in ALLOWED_SCHEMES :
return None
if parsed_url . netloc not in ALLOWED_NETLOCS :
return None
path = parsed_url . path
if path . endswith ( " /watch " ) :
query = parsed_url . query
parsed_query = parse_qs ( query )
if " v " in parsed_query :
ids = parsed_query [ " v " ]
video_id = ids if isinstance ( ids , str ) else ids [ 0 ]
else :
return None
else :
path = parsed_url . path . lstrip ( " / " )
video_id = path . split ( " / " ) [ - 1 ]
if len ( video_id ) != 11 : # Video IDs are 11 characters long
return None
return video_id
class YoutubeLoader:
    """Load `YouTube` video transcripts.

    The loader resolves a video ID, lists the transcripts available for it
    and returns the first one matching the preferred languages (English is
    always tried as a last resort) as a single `Document`.
    """

    def __init__(
        self,
        video_id: str,
        language: Union[str, Sequence[str]] = "en",
        proxy_url: Optional[str] = None,
    ):
        """Initialize with YouTube video ID.

        Args:
            video_id: A video ID or a YouTube URL. If it parses as an
                allow-listed URL the extracted ID is used; otherwise the
                value is used unchanged.
            language: One language code, or a sequence of codes in order of
                preference.
            proxy_url: Optional proxy URL used for both HTTP and HTTPS
                requests to YouTube.
        """
        _video_id = _parse_video_id(video_id)
        self.video_id = _video_id if _video_id is not None else video_id
        # The metadata keeps the value exactly as the caller supplied it.
        self._metadata = {"source": video_id}
        self.proxy_url = proxy_url

        # Ensure language is a list
        self.language = [language] if isinstance(language, str) else language

    def load(self) -> List[Document]:
        """Load YouTube transcripts into `Document` objects.

        Returns:
            A one-element list holding the transcript text, or an empty list
            when the transcript listing for the video could not be retrieved.

        Raises:
            ImportError: If ``youtube_transcript_api`` is not installed.
            NoTranscriptFound: If none of the requested languages (nor the
                English fallback) has a transcript.
            Exception: Any other error raised while finding or fetching a
                transcript is logged and re-raised unchanged.
        """
        try:
            from youtube_transcript_api import (
                NoTranscriptFound,
                YouTubeTranscriptApi,
            )
        except ImportError as err:
            raise ImportError(
                'Could not import "youtube_transcript_api" Python package. '
                "Please install it with `pip install youtube-transcript-api`."
            ) from err

        if self.proxy_url:
            youtube_proxies = {
                "http": self.proxy_url,
                "https": self.proxy_url,
            }
            # Don't log complete URL because it might contain secrets
            log.debug("Using proxy URL: %s...", self.proxy_url[:14])
        else:
            youtube_proxies = None

        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(
                self.video_id, proxies=youtube_proxies
            )
        except Exception:
            # Deliberate best-effort: a failed listing yields no documents.
            log.exception("Loading YouTube transcript failed")
            return []

        # Make a copy of the language list to avoid modifying the original
        languages_to_try = list(self.language)

        # Add English as fallback if not already in the list
        if "en" not in languages_to_try:
            log.debug("Adding English as fallback language")
            languages_to_try.append("en")

        # Try each language in order of priority
        for lang in languages_to_try:
            try:
                transcript = transcript_list.find_transcript([lang])
                log.debug("Found transcript for language '%s'", lang)
                transcript_pieces = transcript.fetch()
                transcript_text = " ".join(
                    piece.text.strip(" ") for piece in transcript_pieces
                )
                return [
                    Document(page_content=transcript_text, metadata=self._metadata)
                ]
            except NoTranscriptFound:
                log.debug("No transcript found for language '%s'", lang)
                continue
            except Exception:
                log.info("Error finding transcript for language '%s'", lang)
                raise

        # If we get here, all languages failed
        languages_tried = ", ".join(languages_to_try)
        log.warning(
            "No transcript found for any of the specified languages: %s. "
            "Verify if the video has transcripts, add more languages if needed.",
            languages_tried,
        )
        # NoTranscriptFound requires (video_id, requested_language_codes,
        # transcript_data); constructing it from a lone message string raises
        # TypeError instead of the intended exception.
        raise NoTranscriptFound(self.video_id, languages_to_try, transcript_list)