462 lines
17 KiB
Python
462 lines
17 KiB
Python
import pytest
|
|
from unittest.mock import patch, MagicMock, AsyncMock
|
|
import sys
|
|
import os
|
|
import json
|
|
|
|
# Dodanie katalogu nadrzędnego do ścieżki dla importów
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
|
|
|
from src.youtube_utils import (
|
|
extract_youtube_urls,
|
|
extract_video_id,
|
|
get_transcript,
|
|
NoTranscriptFound,
|
|
TranscriptsDisabled,
|
|
APITokenMissing,
|
|
AuthorizationError,
|
|
APIConnectionError,
|
|
APIResponseError,
|
|
NoTranscriptLanguagesAvailable,
|
|
YouTubeUtilsError
|
|
)
|
|
|
|
|
|
# Testy dla funkcji extract_youtube_urls
|
|
def test_extract_youtube_urls():
    """Exercise extract_youtube_urls on several link flavours and on link-free text.

    Covers a text holding a standard and a shortened link, a Shorts link,
    a mobile link, the empty string, and a text without any YouTube link.
    """
    # One text containing both a standard watch link and a youtu.be link.
    found = extract_youtube_urls(
        "Sprawdź to wideo https://www.youtube.com/watch?v=abc123 i to https://youtu.be/xyz789"
    )
    assert len(found) == 2
    assert "https://www.youtube.com/watch?v=abc123" in found
    assert "https://youtu.be/xyz789" in found

    # Texts that carry exactly one link each: Shorts first, then mobile.
    single_link_cases = [
        (
            "Ten shorts jest super https://www.youtube.com/shorts/def456",
            "https://www.youtube.com/shorts/def456",
        ),
        (
            "Link mobilny: https://m.youtube.com/watch?v=mob123",
            "https://m.youtube.com/watch?v=mob123",
        ),
    ]
    for message, expected_url in single_link_cases:
        found = extract_youtube_urls(message)
        assert len(found) == 1
        assert expected_url in found

    # Empty input and plain text must both yield an empty list.
    for message in ("", "To jest zwykły tekst bez linków do YouTube"):
        assert extract_youtube_urls(message) == []
|
|
|
|
|
|
# Testy dla funkcji extract_video_id
|
|
def test_extract_video_id():
    """Check extract_video_id against each supported URL shape and a non-YouTube URL."""
    # URL -> expected video id, in the order: standard, short, Shorts,
    # embed, legacy "/v/" format.
    expected_ids = {
        "https://www.youtube.com/watch?v=abc123": "abc123",
        "https://youtu.be/xyz789": "xyz789",
        "https://www.youtube.com/shorts/def456": "def456",
        "https://www.youtube.com/embed/embed123": "embed123",
        "https://www.youtube.com/v/old456": "old456",
    }
    for link, video_id in expected_ids.items():
        assert extract_video_id(link) == video_id

    # A URL outside YouTube yields None rather than an id.
    assert extract_video_id("https://example.com/not-youtube") is None
|
|
|
|
|
|
# Testy dla funkcji get_transcript
|
|
@pytest.mark.asyncio
async def test_get_transcript_success():
    """Expect the Polish transcript and the title when Polish and English tracks are returned."""
    polish_track = {
        "language": "Polish",
        "transcript": [
            {"text": "To jest", "start": "0.0", "dur": "1.0"},
            {"text": "przykładowa transkrypcja", "start": "1.0", "dur": "2.0"},
        ],
    }
    english_track = {
        "language": "English",
        "transcript": [
            {"text": "This is", "start": "0.0", "dur": "1.0"},
            {"text": "sample transcript", "start": "1.0", "dur": "2.0"},
        ],
    }
    # Sample API payload: a one-element list describing the video.
    api_payload = [
        {
            "id": "abc123",
            "title": "Przykładowy tytuł wideo",
            "tracks": [polish_track, english_track],
            "languages": [
                {"label": "Polish", "languageCode": "pl"},
                {"label": "English", "languageCode": "en"},
            ],
        }
    ]

    # Fake HTTP 200 response whose awaited json() yields the payload.
    response = MagicMock(status=200, json=AsyncMock(return_value=api_payload))

    # post() hands back an async context manager that yields the fake response.
    post_ctx = MagicMock()
    post_ctx.__aenter__.return_value = response

    # Entering the fake session as an async context manager yields the session itself.
    client_session = MagicMock()
    client_session.__aenter__.return_value = client_session
    client_session.post.return_value = post_ctx

    token_patch = patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token")
    session_patch = patch('aiohttp.ClientSession', return_value=client_session)
    with token_patch, session_patch:
        transcript, title = await get_transcript("abc123", ["pl", "en"])

    assert transcript == "To jest przykładowa transkrypcja"
    assert title == "Przykładowy tytuł wideo"
    # Only the number of post() calls is checked here, not the request arguments.
    client_session.post.assert_called_once()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_transcript_english_fallback():
    """Expect the English transcript when Polish is preferred but only English is returned."""
    # Sample API payload that carries a single English track.
    api_payload = [
        {
            "id": "abc123",
            "title": "Sample video title",
            "tracks": [
                {
                    "language": "English",
                    "transcript": [
                        {"text": "This is", "start": "0.0", "dur": "1.0"},
                        {"text": "sample transcript", "start": "1.0", "dur": "2.0"},
                    ],
                }
            ],
            "languages": [
                {"label": "English", "languageCode": "en"},
            ],
        }
    ]

    # Fake HTTP 200 response whose awaited json() yields the payload.
    response = MagicMock(status=200, json=AsyncMock(return_value=api_payload))

    # post() hands back an async context manager that yields the fake response.
    post_ctx = MagicMock()
    post_ctx.__aenter__.return_value = response

    # Entering the fake session as an async context manager yields the session itself.
    client_session = MagicMock()
    client_session.__aenter__.return_value = client_session
    client_session.post.return_value = post_ctx

    token_patch = patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token")
    session_patch = patch('aiohttp.ClientSession', return_value=client_session)
    with token_patch, session_patch:
        # Polish is listed first, yet only an English track is available.
        transcript, title = await get_transcript("abc123", ["pl", "en"])

    # The English transcript is expected as the fallback.
    assert transcript == "This is sample transcript"
    assert title == "Sample video title"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_transcript_no_title():
    """Expect an empty-string title when the API payload has no "title" key."""
    # Sample API payload with a Polish track and no title entry.
    api_payload = [
        {
            "id": "abc123",
            "tracks": [
                {
                    "language": "Polish",
                    "transcript": [
                        {"text": "To jest", "start": "0.0", "dur": "1.0"},
                        {"text": "przykładowa transkrypcja", "start": "1.0", "dur": "2.0"},
                    ],
                }
            ],
            "languages": [
                {"label": "Polish", "languageCode": "pl"},
            ],
        }
    ]

    # Fake HTTP 200 response whose awaited json() yields the payload.
    response = MagicMock(status=200, json=AsyncMock(return_value=api_payload))

    # post() hands back an async context manager that yields the fake response.
    post_ctx = MagicMock()
    post_ctx.__aenter__.return_value = response

    # Entering the fake session as an async context manager yields the session itself.
    client_session = MagicMock()
    client_session.__aenter__.return_value = client_session
    client_session.post.return_value = post_ctx

    token_patch = patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token")
    session_patch = patch('aiohttp.ClientSession', return_value=client_session)
    with token_patch, session_patch:
        transcript, title = await get_transcript("abc123", ["pl", "en"])

    assert transcript == "To jest przykładowa transkrypcja"
    assert title == ""  # missing title comes back as an empty string
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_transcript_no_transcript_found():
    """Expect NoTranscriptFound when the payload reports that no transcript was found."""
    # Sample API payload carrying an error message instead of tracks.
    api_payload = [
        {
            "id": "abc123",
            "error": "No transcript found for this video",
        }
    ]

    # Fake HTTP 200 response whose awaited json() yields the payload.
    response = MagicMock(status=200, json=AsyncMock(return_value=api_payload))

    # post() hands back an async context manager that yields the fake response.
    post_ctx = MagicMock()
    post_ctx.__aenter__.return_value = response

    # Entering the fake session as an async context manager yields the session itself.
    client_session = MagicMock()
    client_session.__aenter__.return_value = client_session
    client_session.post.return_value = post_ctx

    token_patch = patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token")
    session_patch = patch('aiohttp.ClientSession', return_value=client_session)
    with token_patch, session_patch, pytest.raises(NoTranscriptFound):
        await get_transcript("abc123", ["pl", "en"])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_transcript_disabled():
    """Expect TranscriptsDisabled when the payload reports disabled transcriptions."""
    # Sample API payload announcing that transcriptions are disabled.
    api_payload = [
        {
            "id": "abc123",
            "error": "Transcriptions disabled for this video",
        }
    ]

    # Fake HTTP 200 response whose awaited json() yields the payload.
    response = MagicMock(status=200, json=AsyncMock(return_value=api_payload))

    # post() hands back an async context manager that yields the fake response.
    post_ctx = MagicMock()
    post_ctx.__aenter__.return_value = response

    # Entering the fake session as an async context manager yields the session itself.
    client_session = MagicMock()
    client_session.__aenter__.return_value = client_session
    client_session.post.return_value = post_ctx

    token_patch = patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token")
    session_patch = patch('aiohttp.ClientSession', return_value=client_session)
    with token_patch, session_patch, pytest.raises(TranscriptsDisabled):
        await get_transcript("abc123", ["pl", "en"])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_transcript_auth_error():
    """Expect AuthorizationError when the mocked response has HTTP status 401."""
    # Fake 401 response whose awaited text() yields an "unauthorized" body.
    response = MagicMock(
        status=401,
        text=AsyncMock(return_value="Unauthorized access"),
    )

    # post() hands back an async context manager that yields the fake response.
    post_ctx = MagicMock()
    post_ctx.__aenter__.return_value = response

    # Entering the fake session as an async context manager yields the session itself.
    client_session = MagicMock()
    client_session.__aenter__.return_value = client_session
    client_session.post.return_value = post_ctx

    token_patch = patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token")
    session_patch = patch('aiohttp.ClientSession', return_value=client_session)
    with token_patch, session_patch, pytest.raises(AuthorizationError):
        await get_transcript("abc123", ["pl", "en"])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_transcript_no_api_token():
    """Expect APITokenMissing when the module-level API token is patched to None."""
    missing_token = patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', None)
    with missing_token, pytest.raises(APITokenMissing):
        await get_transcript("abc123", ["pl", "en"])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_transcript_connection_error():
    """Expect APIConnectionError when the mocked session's post() raises an Exception."""
    # Fake session: entering it yields itself, and any post() call raises.
    client_session = MagicMock()
    client_session.__aenter__.return_value = client_session
    client_session.post.side_effect = Exception("Connection error")

    token_patch = patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token")
    session_patch = patch('aiohttp.ClientSession', return_value=client_session)
    with token_patch, session_patch, pytest.raises(APIConnectionError):
        await get_transcript("abc123", ["pl", "en"])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_transcript_invalid_json():
    """Expect APIResponseError when decoding the response body as JSON fails.

    The fake response's ``json`` is an ``AsyncMock`` so that, like the other
    tests in this module, it behaves as a coroutine function: the
    ``JSONDecodeError`` surfaces when the call is awaited instead of being
    raised synchronously by a plain ``MagicMock``.
    """
    decode_error = json.JSONDecodeError("Invalid JSON", "{", 0)

    # Fake HTTP 200 response: awaiting json() raises, awaiting text() yields the raw body.
    mock_response = MagicMock()
    mock_response.status = 200
    mock_response.json = AsyncMock(side_effect=decode_error)
    mock_response.text = AsyncMock(return_value="{invalid json")

    # post() hands back an async context manager that yields the fake response.
    mock_session = MagicMock()
    mock_session.__aenter__.return_value = mock_response

    # Entering the fake session as an async context manager yields the session itself.
    mock_client_session = MagicMock()
    mock_client_session.post.return_value = mock_session
    mock_client_session.__aenter__.return_value = mock_client_session

    with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
        with patch('aiohttp.ClientSession', return_value=mock_client_session):
            with pytest.raises(APIResponseError):
                await get_transcript("abc123", ["pl", "en"])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_transcript_empty_response():
    """Expect APIResponseError when the API answers with an empty JSON array."""
    # Fake HTTP 200 response whose awaited json() yields an empty list.
    response = MagicMock(status=200, json=AsyncMock(return_value=[]))

    # post() hands back an async context manager that yields the fake response.
    post_ctx = MagicMock()
    post_ctx.__aenter__.return_value = response

    # Entering the fake session as an async context manager yields the session itself.
    client_session = MagicMock()
    client_session.__aenter__.return_value = client_session
    client_session.post.return_value = post_ctx

    token_patch = patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token")
    session_patch = patch('aiohttp.ClientSession', return_value=client_session)
    with token_patch, session_patch, pytest.raises(APIResponseError):
        await get_transcript("abc123", ["pl", "en"])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_transcript_no_tracks():
    """Expect NoTranscriptFound when the payload has no "tracks" key at all."""
    # Sample API payload with an id and a title but no transcript tracks.
    api_payload = [
        {
            "id": "abc123",
            "title": "Wideo bez transkrypcji",
        }
    ]

    # Fake HTTP 200 response whose awaited json() yields the payload.
    response = MagicMock(status=200, json=AsyncMock(return_value=api_payload))

    # post() hands back an async context manager that yields the fake response.
    post_ctx = MagicMock()
    post_ctx.__aenter__.return_value = response

    # Entering the fake session as an async context manager yields the session itself.
    client_session = MagicMock()
    client_session.__aenter__.return_value = client_session
    client_session.post.return_value = post_ctx

    token_patch = patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token")
    session_patch = patch('aiohttp.ClientSession', return_value=client_session)
    with token_patch, session_patch, pytest.raises(NoTranscriptFound):
        await get_transcript("abc123", ["pl", "en"])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_transcript_empty_tracks():
    """Expect NoTranscriptFound when the payload's "tracks" list is empty."""
    # Sample API payload whose tracks array contains nothing.
    api_payload = [
        {
            "id": "abc123",
            "title": "Wideo bez transkrypcji",
            "tracks": [],
        }
    ]

    # Fake HTTP 200 response whose awaited json() yields the payload.
    response = MagicMock(status=200, json=AsyncMock(return_value=api_payload))

    # post() hands back an async context manager that yields the fake response.
    post_ctx = MagicMock()
    post_ctx.__aenter__.return_value = response

    # Entering the fake session as an async context manager yields the session itself.
    client_session = MagicMock()
    client_session.__aenter__.return_value = client_session
    client_session.post.return_value = post_ctx

    token_patch = patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token")
    session_patch = patch('aiohttp.ClientSession', return_value=client_session)
    with token_patch, session_patch, pytest.raises(NoTranscriptFound):
        await get_transcript("abc123", ["pl", "en"])