telegram.video.summary.bot/tests/test_youtube_utils.py

462 lines
17 KiB
Python

import pytest
from unittest.mock import patch, MagicMock, AsyncMock
import sys
import os
import json
# Dodanie katalogu nadrzędnego do ścieżki dla importów
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from src.youtube_utils import (
extract_youtube_urls,
extract_video_id,
get_transcript,
NoTranscriptFound,
TranscriptsDisabled,
APITokenMissing,
AuthorizationError,
APIConnectionError,
APIResponseError,
NoTranscriptLanguagesAvailable,
YouTubeUtilsError
)
# Testy dla funkcji extract_youtube_urls
def test_extract_youtube_urls():
# Test dla standardowych linków YouTube
text = "Sprawdź to wideo https://www.youtube.com/watch?v=abc123 i to https://youtu.be/xyz789"
result = extract_youtube_urls(text)
assert len(result) == 2
assert "https://www.youtube.com/watch?v=abc123" in result
assert "https://youtu.be/xyz789" in result
# Test dla YouTube Shorts
text = "Ten shorts jest super https://www.youtube.com/shorts/def456"
result = extract_youtube_urls(text)
assert len(result) == 1
assert "https://www.youtube.com/shorts/def456" in result
# Test dla mobilnej wersji YouTube
text = "Link mobilny: https://m.youtube.com/watch?v=mob123"
result = extract_youtube_urls(text)
assert len(result) == 1
assert "https://m.youtube.com/watch?v=mob123" in result
# Test dla pustego tekstu
result = extract_youtube_urls("")
assert result == []
# Test dla tekstu bez linków YouTube
text = "To jest zwykły tekst bez linków do YouTube"
result = extract_youtube_urls(text)
assert result == []
# Testy dla funkcji extract_video_id
def test_extract_video_id():
# Test dla standardowego URL
url = "https://www.youtube.com/watch?v=abc123"
assert extract_video_id(url) == "abc123"
# Test dla skróconego URL
url = "https://youtu.be/xyz789"
assert extract_video_id(url) == "xyz789"
# Test dla YouTube Shorts
url = "https://www.youtube.com/shorts/def456"
assert extract_video_id(url) == "def456"
# Test dla URL embed
url = "https://www.youtube.com/embed/embed123"
assert extract_video_id(url) == "embed123"
# Test dla starego formatu URL
url = "https://www.youtube.com/v/old456"
assert extract_video_id(url) == "old456"
# Test dla nieprawidłowego URL
url = "https://example.com/not-youtube"
assert extract_video_id(url) is None
# Testy dla funkcji get_transcript
@pytest.mark.asyncio
async def test_get_transcript_success():
# Mock dla odpowiedzi z API youtube-transcript.io
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API w nowym formacie
mock_json_data = [
{
"id": "abc123",
"title": "Przykładowy tytuł wideo",
"tracks": [
{
"language": "Polish",
"transcript": [
{"text": "To jest", "start": "0.0", "dur": "1.0"},
{"text": "przykładowa transkrypcja", "start": "1.0", "dur": "2.0"}
]
},
{
"language": "English",
"transcript": [
{"text": "This is", "start": "0.0", "dur": "1.0"},
{"text": "sample transcript", "start": "1.0", "dur": "2.0"}
]
}
],
"languages": [
{"label": "Polish", "languageCode": "pl"},
{"label": "English", "languageCode": "en"}
]
}
]
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test
transcript, title = await get_transcript("abc123", ["pl", "en"])
# Sprawdzenie wyników
assert transcript == "To jest przykładowa transkrypcja"
assert title == "Przykładowy tytuł wideo"
# Upewnij się, że API zostało wywołane z poprawnymi parametrami
mock_client_session.post.assert_called_once()
@pytest.mark.asyncio
async def test_get_transcript_english_fallback():
# Mock dla odpowiedzi z API youtube-transcript.io
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API zawierająca tylko transkrypcję angielską
mock_json_data = [
{
"id": "abc123",
"title": "Sample video title",
"tracks": [
{
"language": "English",
"transcript": [
{"text": "This is", "start": "0.0", "dur": "1.0"},
{"text": "sample transcript", "start": "1.0", "dur": "2.0"}
]
}
],
"languages": [
{"label": "English", "languageCode": "en"}
]
}
]
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - preferujemy polski, ale dostępny jest tylko angielski
transcript, title = await get_transcript("abc123", ["pl", "en"])
# Sprawdzenie wyników - powinniśmy otrzymać angielską transkrypcję jako fallback
assert transcript == "This is sample transcript"
assert title == "Sample video title"
@pytest.mark.asyncio
async def test_get_transcript_no_title():
# Mock dla odpowiedzi z API youtube-transcript.io bez tytułu
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API bez tytułu
mock_json_data = [
{
"id": "abc123",
"tracks": [
{
"language": "Polish",
"transcript": [
{"text": "To jest", "start": "0.0", "dur": "1.0"},
{"text": "przykładowa transkrypcja", "start": "1.0", "dur": "2.0"}
]
}
],
"languages": [
{"label": "Polish", "languageCode": "pl"}
]
}
]
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test z odpowiedzią bez tytułu
transcript, title = await get_transcript("abc123", ["pl", "en"])
# Sprawdzenie wyników
assert transcript == "To jest przykładowa transkrypcja"
assert title == "" # Pusty tytuł
@pytest.mark.asyncio
async def test_get_transcript_no_transcript_found():
# Mock dla odpowiedzi z API youtube-transcript.io
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API zawierająca błąd
mock_json_data = [
{
"id": "abc123",
"error": "No transcript found for this video"
}
]
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić NoTranscriptFound
with pytest.raises(NoTranscriptFound):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_disabled():
# Mock dla odpowiedzi z API youtube-transcript.io
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API informująca o wyłączonych transkrypcjach
mock_json_data = [
{
"id": "abc123",
"error": "Transcriptions disabled for this video"
}
]
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić TranscriptsDisabled
with pytest.raises(TranscriptsDisabled):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_auth_error():
# Mock dla odpowiedzi z API youtube-transcript.io
mock_response = MagicMock()
mock_response.status = 401
mock_response.text = AsyncMock(return_value="Unauthorized access")
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić AuthorizationError
with pytest.raises(AuthorizationError):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_no_api_token():
# Test gdy brak tokenu API
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', None):
with pytest.raises(APITokenMissing):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_connection_error():
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_client_session = MagicMock()
mock_client_session.post.side_effect = Exception("Connection error")
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić APIConnectionError
with pytest.raises(APIConnectionError):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_invalid_json():
# Mock dla odpowiedzi z API youtube-transcript.io z nieprawidłowym JSON
mock_response = MagicMock()
mock_response.status = 200
mock_response.json.side_effect = json.JSONDecodeError("Invalid JSON", "{", 0)
mock_response.text = AsyncMock(return_value="{invalid json")
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić APIResponseError
with pytest.raises(APIResponseError):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_empty_response():
# Mock dla odpowiedzi z API youtube-transcript.io z pustą tablicą
mock_response = MagicMock()
mock_response.status = 200
# Konfiguracja asynchronicznego mocka z pustą odpowiedzią
mock_response.json = AsyncMock(return_value=[])
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić APIResponseError
with pytest.raises(APIResponseError):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_no_tracks():
# Mock dla odpowiedzi z API youtube-transcript.io bez ścieżek transkrypcji
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API bez ścieżek transkrypcji
mock_json_data = [
{
"id": "abc123",
"title": "Wideo bez transkrypcji"
}
]
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić NoTranscriptFound
with pytest.raises(NoTranscriptFound):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_empty_tracks():
# Mock dla odpowiedzi z API youtube-transcript.io z pustą tablicą ścieżek
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API z pustą tablicą ścieżek
mock_json_data = [
{
"id": "abc123",
"title": "Wideo bez transkrypcji",
"tracks": []
}
]
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić NoTranscriptFound
with pytest.raises(NoTranscriptFound):
await get_transcript("abc123", ["pl", "en"])