feat/telegram_client #2

Merged
tbs093a merged 5 commits from feat/telegram_client into master 2025-05-14 15:09:59 +00:00
6 changed files with 390 additions and 56 deletions

20
docker.compose.sh 100755
View File

@ -0,0 +1,20 @@
#!/bin/bash
# Zmienne środowiskowe
DB_NAME="telegram_bot"
DB_USER="telegram_user"
DB_PASSWORD="telegram_pass"
DB_PORT="5432"
VOLUME_NAME="telegram_bot_pgdata"
export DATABASE_URL="postgresql://${DB_USER}:${DB_PASSWORD}@localhost:${DB_PORT}/${DB_NAME}"
docker run \
--name "${CONTAINER_NAME}" \
-e POSTGRES_DB="${DB_NAME}" \
-e POSTGRES_USER="${DB_USER}" \
-e POSTGRES_PASSWORD="${DB_PASSWORD}" \
-p "${DB_PORT}:5432" \
--restart unless-stopped \
-d \
postgres:14-alpine

View File

@ -2,9 +2,9 @@ import asyncio
import logging import logging
from telegram import Update from telegram import Update
from telegram.ext import ApplicationBuilder, MessageHandler, filters, CommandHandler from telegram.ext import ApplicationBuilder, MessageHandler, filters, CommandHandler
from .src.config import TELEGRAM_BOT_TOKEN, logger # Import logger z config from src.config import TELEGRAM_BOT_TOKEN, logger # Import logger z config
from .src.handlers import handle_message, error_handler from src.handlers import handle_message, error_handler
from .src.db import init_db, close_db from src.db import init_db, close_db
async def post_init(application): async def post_init(application):
"""Funkcja wykonywana po inicjalizacji aplikacji bota.""" """Funkcja wykonywana po inicjalizacji aplikacji bota."""

View File

@ -40,7 +40,8 @@ if not YOUTUBE_TRANSCRIPT_API_TOKEN:
# Inne ustawienia # Inne ustawienia
TRANSCRIPT_LANGUAGES = ['pl', 'en'] # Priorytet języków transkrypcji TRANSCRIPT_LANGUAGES = ['pl', 'en'] # Priorytet języków transkrypcji
SUMMARY_PROMPT = """Streść poniższy transkrypt filmu z YouTube w zwięzły sposób w języku polskim. SUMMARY_PROMPT = """Streść poniższy transkrypt filmu z YouTube w zwięzły sposób w języku polskim.
Skup się na głównych tematach i wnioskach. Skup się na głównych tematach i wnioskach.
prosiłbym o dokonanie tego streszczenia w formacie Markdown.
Transkrypt: Transkrypt:
{transcript}""" {transcript}"""

View File

@ -3,11 +3,14 @@ import re
from telegram import Update from telegram import Update
from telegram.ext import ContextTypes from telegram.ext import ContextTypes
from telegram.constants import ParseMode from telegram.constants import ParseMode
from .youtube_utils import extract_youtube_urls, extract_video_id, get_transcript, get_video_title from .youtube_utils import extract_youtube_urls, extract_video_id, get_transcript
from .openai_utils import summarize_text from .openai_utils import summarize_long_text, chunk_text, summarize_text
from .db import save_video_summary, check_if_url_exists from .db import save_video_summary, check_if_url_exists
from .config import TRANSCRIPT_LANGUAGES from .config import TRANSCRIPT_LANGUAGES
# Stała dla maksymalnej długości wiadomości Telegram
MAX_MESSAGE_LENGTH = 4096
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
@ -37,7 +40,8 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Opcjonalnie: sprawdź, czy już istnieje w bazie, zanim zaczniesz przetwarzać # Opcjonalnie: sprawdź, czy już istnieje w bazie, zanim zaczniesz przetwarzać
# if await check_if_url_exists(url): # if await check_if_url_exists(url):
# logger.info(f"URL {url} już istnieje w bazie danych. Pomijam.") # logger.info(f"URL {url} już istnieje w bazie danych. Pomijam.")
# await context.bot.send_message( # await safe_send_message(
# context.bot,
# chat_id=chat_id, # chat_id=chat_id,
# text=f"Informacje o filmie {url} są już w bazie.", # text=f"Informacje o filmie {url} są już w bazie.",
# disable_web_page_preview=True # disable_web_page_preview=True
@ -52,64 +56,142 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
await context.bot.send_chat_action(chat_id=chat_id, action='typing') await context.bot.send_chat_action(chat_id=chat_id, action='typing')
# 1. Pobierz tytuł # Pobierz transkrypcję i tytuł
title = await get_video_title(url) try:
if not title: transcript, title = await get_transcript(video_id, TRANSCRIPT_LANGUAGES)
logger.warning(f"Nie udało się pobrać tytułu dla ID filmu: {video_id}") if not title:
await context.bot.send_message( logger.warning(f"Nie udało się pobrać tytułu dla ID filmu: {video_id}")
title = f"Film YouTube {video_id}" # Użyj zastępczego tytułu
except Exception as e:
logger.warning(f"Nie udało się pobrać transkrypcji dla ID filmu: {video_id}: {str(e)}")
await safe_send_message(
context.bot,
chat_id=chat_id, chat_id=chat_id,
text=f"Nie udało się pobrać tytułu dla filmu: {url}", text=f"Nie udało się pobrać transkrypcji dla filmu: {url}",
disable_web_page_preview=True
)
processed_urls_in_message.add(url)
continue # Potrzebujemy tytułu
await context.bot.send_chat_action(chat_id=chat_id, action='typing')
# 2. Pobierz transkrypcję
transcript = await get_transcript(video_id, TRANSCRIPT_LANGUAGES)
if not transcript:
logger.warning(f"Nie udało się pobrać transkrypcji dla ID filmu: {video_id}")
await context.bot.send_message(
chat_id=chat_id,
text=f"Nie udało się pobrać transkrypcji dla filmu: {title} ({url})",
disable_web_page_preview=True disable_web_page_preview=True
) )
processed_urls_in_message.add(url) processed_urls_in_message.add(url)
continue # Potrzebujemy transkrypcji do streszczenia continue # Potrzebujemy transkrypcji do streszczenia
await context.bot.send_chat_action(chat_id=chat_id, action='typing') # Informuj o rozpoczęciu przetwarzania
await safe_send_message(
context.bot,
chat_id=chat_id,
text=f"*Rozpoczynam przetwarzanie filmu:*\n{title}\n\n*Link:* {url}",
parse_mode="Markdown",
disable_web_page_preview=True
)
# 3. Wygeneruj streszczenie # Podziel tekst na fragmenty do przetworzenia
summary = await summarize_text(transcript) chunks = await chunk_text(transcript)
if not summary: all_summaries = []
logger.error(f"Nie udało się wygenerować streszczenia dla ID filmu: {video_id}") combined_summary = ""
await context.bot.send_message(
# Dla każdego fragmentu, wygeneruj i wyślij streszczenie
for i, chunk in enumerate(chunks):
await context.bot.send_chat_action(chat_id=chat_id, action='typing')
progress_msg = f"Przetwarzanie fragmentu {i+1}/{len(chunks)}..."
logger.info(progress_msg)
# Jeśli jest więcej niż jeden fragment, powiadom użytkownika o postępie
if len(chunks) > 1 and i == 0:
await safe_send_message(
context.bot,
chat_id=chat_id,
text=f"Film jest długi, podzielono transkrypcję na {len(chunks)} części. Przetwarzam każdą z nich...",
disable_web_page_preview=True
)
try:
# Generuj streszczenie fragmentu
partial_summary = await summarize_text(
chunk,
is_partial=len(chunks) > 1,
part_num=i+1,
total_parts=len(chunks)
)
all_summaries.append(partial_summary)
# Jeśli mamy więcej niż jeden fragment, wysyłaj postęp na bieżąco
if len(chunks) > 1:
part_header = f"*Streszczenie - część {i+1}/{len(chunks)}:*\n\n"
response_text = f"{part_header}{partial_summary}"
await send_long_message(
context.bot,
chat_id=chat_id,
text=response_text,
parse_mode="Markdown",
disable_web_page_preview=True
)
except Exception as e:
logger.error(f"Błąd podczas streszczania fragmentu {i+1}: {str(e)}")
await safe_send_message(
context.bot,
chat_id=chat_id,
text=f"Wystąpił błąd podczas przetwarzania fragmentu {i+1}/{len(chunks)}: {str(e)}",
disable_web_page_preview=True
)
# Jeśli mamy więcej niż 3 fragmenty, generuj końcowe streszczenie
if len(chunks) > 3:
await context.bot.send_chat_action(chat_id=chat_id, action='typing')
await safe_send_message(
context.bot,
chat_id=chat_id, chat_id=chat_id,
text=f"Nie udało się wygenerować streszczenia dla filmu: {title} ({url})", text="Tworzę ostateczne streszczenie łączące wszystkie fragmenty...",
disable_web_page_preview=True disable_web_page_preview=True
) )
processed_urls_in_message.add(url)
continue try:
# Połącz wszystkie częściowe streszczenia
combined_text = "\n\n".join(all_summaries)
combined_summary = await summarize_text(
combined_text,
is_final_summary=True
)
except Exception as e:
logger.error(f"Błąd podczas generowania końcowego streszczenia: {str(e)}")
# Jeśli końcowe streszczenie się nie powiedzie, użyj połączonych częściowych
combined_summary = "\n\n".join(all_summaries)
await safe_send_message(
context.bot,
chat_id=chat_id,
text="Nie udało się wygenerować końcowego streszczenia. Wyświetlam połączone częściowe streszczenia.",
disable_web_page_preview=True
)
else:
# Dla mniejszej liczby fragmentów po prostu połącz streszczenia
combined_summary = "\n\n".join(all_summaries)
# 4. Zapisz do bazy danych # Zapisz pełne streszczenie do bazy danych
saved = await save_video_summary(url, title, transcript, summary) saved = await save_video_summary(url, title, transcript, combined_summary)
# Wyślij końcowe streszczenie, jeśli były więcej niż 3 fragmenty
if len(chunks) > 3:
final_text = f"*Ostateczne streszczenie filmu:*\n*{title}*\n\n{combined_summary}"
await send_long_message(
context.bot,
chat_id=chat_id,
text=final_text,
parse_mode="Markdown",
disable_web_page_preview=True
)
# Podsumowanie procesu
if saved: if saved:
logger.info(f"Pomyślnie przetworzono i zapisano film: {title} ({url})") logger.info(f"Pomyślnie przetworzono i zapisano film: {title} ({url})")
response_text = ( if len(chunks) <= 3: # Nie wysyłaj podsumowania ponownie dla dłuższych filmów
f"*Przetworzono film:* {escape_markdown_v2(title)}\n\n" await safe_send_message(
f"*Link:* {escape_markdown_v2(url)}\n\n" context.bot,
f"*Streszczenie:*\n{escape_markdown_v2(summary)}" chat_id=chat_id,
) text=f"Pomyślnie zapisano streszczenie filmu w bazie danych: {title}",
await context.bot.send_message( disable_web_page_preview=True
chat_id=chat_id, )
text=response_text,
parse_mode=ParseMode.MARKDOWN_V2,
disable_web_page_preview=True # Wyłącz podgląd linku w odpowiedzi bota
)
else: else:
logger.error(f"Nie udało się zapisać danych do bazy dla filmu: {title} ({url})") logger.error(f"Nie udało się zapisać danych do bazy dla filmu: {title} ({url})")
await context.bot.send_message( await safe_send_message(
context.bot,
chat_id=chat_id, chat_id=chat_id,
text=f"Wystąpił błąd podczas zapisywania danych dla filmu: {title} ({url})", text=f"Wystąpił błąd podczas zapisywania danych dla filmu: {title} ({url})",
disable_web_page_preview=True disable_web_page_preview=True
@ -120,9 +202,110 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Funkcja pomocnicza do escape'owania znaków specjalnych MarkdownV2 # Funkcja pomocnicza do escape'owania znaków specjalnych MarkdownV2
def escape_markdown_v2(text: str) -> str: def escape_markdown_v2(text: str) -> str:
"""Ucieka znaki specjalne dla parsowania Telegram MarkdownV2.""" """Ucieka znaki specjalne dla parsowania Telegram MarkdownV2."""
if not text:
return ""
# Pełna lista znaków specjalnych w Telegram MarkdownV2
escape_chars = r'_*[]()~`>#+-=|{}.!' escape_chars = r'_*[]()~`>#+-=|{}.!'
# Zamieniamy każdy znak specjalny na jego wersję z dwoma backslashami przed nim
return re.sub(f'([{re.escape(escape_chars)}])', r'\\\1', text) return re.sub(f'([{re.escape(escape_chars)}])', r'\\\1', text)
# Ulepszona funkcja do bezpiecznego wysyłania wiadomości
async def safe_send_message(bot, chat_id, text, parse_mode=None, disable_web_page_preview=False, max_retries=3):
"""
Bezpiecznie wysyła wiadomość, obsługując błędy związane z formatowaniem.
Args:
bot: Instancja bota Telegram
chat_id: ID czatu
text: Tekst do wysłania
parse_mode: Tryb parsowania (None, HTML, Markdown, MarkdownV2)
disable_web_page_preview: Czy wyłączyć podgląd linków
max_retries: Maksymalna liczba prób
Returns:
Obiekt wysłanej wiadomości lub None w przypadku błędu
"""
# Próbuj wysłać z formatowaniem
for attempt in range(max_retries):
try:
return await bot.send_message(
chat_id=chat_id,
text=text,
parse_mode=parse_mode,
disable_web_page_preview=disable_web_page_preview
)
except Exception as e:
logger.warning(f"Błąd wysyłania wiadomości (próba {attempt+1}/{max_retries}): {str(e)}")
error_msg = str(e).lower()
# Jeśli jest to błąd parsowania Markdown
if "parse" in error_msg and "entities" in error_msg:
# Spróbuj ponownie bez formatowania
if attempt == max_retries - 1:
logger.info("Wysyłam wiadomość bez formatowania")
try:
return await bot.send_message(
chat_id=chat_id,
text=text,
parse_mode=None, # Bez formatowania
disable_web_page_preview=disable_web_page_preview
)
except Exception as e2:
logger.error(f"Nie udało się wysłać wiadomości nawet bez formatowania: {str(e2)}")
return None
return None
# Funkcja do dzielenia długich wiadomości
async def send_long_message(bot, chat_id, text, parse_mode=None, disable_web_page_preview=False):
"""
Wysyła długą wiadomość, dzieląc na części, jeśli przekracza limit Telegram.
Args:
bot: Instancja bota Telegram
chat_id: ID czatu
text: Tekst do wysłania
parse_mode: Tryb parsowania (None, HTML, Markdown, MarkdownV2)
disable_web_page_preview: Czy wyłączyć podgląd linków
"""
# Maksymalna długość wiadomości w Telegram to 4096 znaków
max_length = MAX_MESSAGE_LENGTH # 4096 znaków
if len(text) <= max_length:
# Jeśli wiadomość nie przekracza limitu, wyślij ją normalnie
return await safe_send_message(
bot,
chat_id=chat_id,
text=text,
parse_mode=parse_mode,
disable_web_page_preview=disable_web_page_preview
)
# Jeśli wiadomość jest za długa, podziel ją
parts = []
for i in range(0, len(text), max_length):
parts.append(text[i:i+max_length])
# Wyślij części wiadomości
for i, part in enumerate(parts):
# Użyj odpowiedniego formatu nagłówka w zależności od parse_mode
if parse_mode == ParseMode.MARKDOWN_V2 or parse_mode == "Markdown":
part_header = f"*Część {i+1}/{len(parts)}*\n\n"
else:
part_header = f"Część {i+1}/{len(parts)}\n\n"
await safe_send_message(
bot,
chat_id=chat_id,
text=part_header + part,
parse_mode=parse_mode,
disable_web_page_preview=disable_web_page_preview
)
return None # Nie ma sensu zwracać ostatniej wiadomości, bo wysłaliśmy kilka
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None: async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Loguje błędy zgłoszone przez `python-telegram-bot`.""" """Loguje błędy zgłoszone przez `python-telegram-bot`."""
logger.error(f"Wyjątek podczas obsługi aktualizacji: {context.error}", exc_info=context.error) logger.error(f"Wyjątek podczas obsługi aktualizacji: {context.error}", exc_info=context.error)

View File

@ -1,10 +1,13 @@
import logging import logging
from typing import Optional, Tuple, Dict, Any from typing import Optional, Tuple, Dict, Any, List
from openai import AsyncOpenAI # Używamy AsyncOpenAI dla kompatybilności z asyncio from openai import AsyncOpenAI # Używamy AsyncOpenAI dla kompatybilności z asyncio
from .config import OPENAI_API_KEY, SUMMARY_PROMPT from .config import OPENAI_API_KEY, SUMMARY_PROMPT
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Stała dla maksymalnego rozmiaru pojedynczego zapytania
MAX_CHUNK_SIZE = 20000
class OpenAIUtilsError(Exception): class OpenAIUtilsError(Exception):
"""Bazowa klasa wyjątków dla modułu openai_utils.""" """Bazowa klasa wyjątków dla modułu openai_utils."""
pass pass
@ -92,12 +95,121 @@ async def check_openai_api_status() -> Tuple[bool, Dict[str, Any]]:
logger.error(f"Błąd podczas sprawdzania statusu API OpenAI: {e}", exc_info=True) logger.error(f"Błąd podczas sprawdzania statusu API OpenAI: {e}", exc_info=True)
return False, status_info return False, status_info
async def summarize_text(text: str) -> str: async def chunk_text(text: str, max_size: int = MAX_CHUNK_SIZE) -> List[str]:
"""
Dzieli długi tekst na mniejsze fragmenty, starając się zachować granice zdań.
Args:
text: Tekst do podzielenia
max_size: Maksymalny rozmiar pojedynczego fragmentu
Returns:
Lista fragmentów tekstu
"""
if len(text) <= max_size:
return [text]
chunks = []
start = 0
while start < len(text):
# Określ koniec fragmentu
end = start + max_size
if end >= len(text):
# Jeśli to ostatni fragment, po prostu użyj reszty tekstu
chunks.append(text[start:])
break
# Znajdź ostatnią kropkę, wykrzyknik lub pytajnik przed lub w miejscu końca
last_period = max(
text.rfind('. ', start, end),
text.rfind('! ', start, end),
text.rfind('? ', start, end)
)
if last_period != -1:
# Jeśli znaleziono znak końca zdania, podziel w tym miejscu
end = last_period + 2 # +2 aby uwzględnić znak i spację
else:
# Jeśli nie ma znaku końca zdania, znajdź ostatnią spację
last_space = text.rfind(' ', start, end)
if last_space != -1:
end = last_space + 1
chunks.append(text[start:end])
start = end
logger.info(f"Podzielono tekst o długości {len(text)} znaków na {len(chunks)} fragmentów")
return chunks
async def summarize_long_text(text: str) -> str:
"""
Streszcza długi tekst, dzieląc go na mniejsze części, jeśli jest to konieczne.
Args:
text: Tekst do streszczenia
Returns:
Streszczenie tekstu
Raises:
EmptyTextError: Gdy tekst jest pusty
APIKeyMissingError: Gdy brak klucza API OpenAI
QuotaExceededError: Gdy przekroczono limit zapytań API
SummarizationError: Przy innych błędach API OpenAI
"""
if not text:
logger.warning("Próba streszczenia pustego tekstu.")
raise EmptyTextError("Próba streszczenia pustego tekstu.")
# Podziel tekst na fragmenty, jeśli jest zbyt długi
chunks = await chunk_text(text)
if len(chunks) == 1:
# Jeśli tekst nie wymaga podziału, użyj normalnej funkcji streszczania
return await summarize_text(chunks[0])
# Dla wielu fragmentów, streszczaj każdy osobno, a następnie połącz streszczenia
partial_summaries = []
for i, chunk in enumerate(chunks):
logger.info(f"Streszczanie fragmentu {i+1}/{len(chunks)} (długość: {len(chunk)} znaków)")
# Użyj zmodyfikowanego promptu dla fragmentów
partial_summary = await summarize_text(
chunk,
is_partial=True,
part_num=i+1,
total_parts=len(chunks)
)
partial_summaries.append(partial_summary)
# Połącz wszystkie częściowe streszczenia
combined_summary = "\n\n".join(partial_summaries)
# Jeśli mamy więcej niż 3 fragmenty, może być potrzebne dodatkowe streszczenie
if len(chunks) > 3:
logger.info("Tworzenie końcowego streszczenia z połączonych fragmentów")
final_summary = await summarize_text(
combined_summary,
is_final_summary=True
)
return final_summary
return combined_summary
async def summarize_text(text: str, is_partial: bool = False, part_num: int = 0,
total_parts: int = 0, is_final_summary: bool = False) -> str:
""" """
Wysyła tekst do API OpenAI w celu streszczenia. Wysyła tekst do API OpenAI w celu streszczenia.
Args: Args:
text: Tekst do streszczenia text: Tekst do streszczenia
is_partial: Czy to część większego tekstu
part_num: Numer bieżącej części
total_parts: Całkowita liczba części
is_final_summary: Czy to końcowe streszczenie z połączonych części
Returns: Returns:
Streszczenie tekstu Streszczenie tekstu
@ -116,7 +228,23 @@ async def summarize_text(text: str) -> str:
logger.error("Klient OpenAI nie został zainicjalizowany.") logger.error("Klient OpenAI nie został zainicjalizowany.")
raise APIKeyMissingError("Klient OpenAI nie został zainicjalizowany. Sprawdź klucz API.") raise APIKeyMissingError("Klient OpenAI nie został zainicjalizowany. Sprawdź klucz API.")
prompt = SUMMARY_PROMPT.format(transcript=text) # Wybierz odpowiedni prompt
if is_final_summary:
prompt = f"""Poniżej znajduje się połączone streszczenie długiego filmu, podzielone na części.
Stwórz z tego spójne, ostateczne streszczenie w języku polskim w formacie markdown.
Połączone streszczenia:
{text}"""
elif is_partial:
prompt = f"""Streść poniższy fragment transkryptu filmu z YouTube (część {part_num} z {total_parts})
w zwięzły sposób w języku polskim. Skup się na głównych tematach i wnioskach.
Użyj formatu markdown.
Fragment transkryptu (część {part_num}/{total_parts}):
{text}"""
else:
prompt = SUMMARY_PROMPT.format(transcript=text)
logger.debug(f"Długość tekstu do streszczenia: {len(text)} znaków") logger.debug(f"Długość tekstu do streszczenia: {len(text)} znaków")
try: try:
@ -127,8 +255,8 @@ async def summarize_text(text: str) -> str:
{"role": "system", "content": "Jesteś pomocnym asystentem specjalizującym się w streszczaniu transkryptów wideo."}, {"role": "system", "content": "Jesteś pomocnym asystentem specjalizującym się w streszczaniu transkryptów wideo."},
{"role": "user", "content": prompt} {"role": "user", "content": prompt}
], ],
temperature=0.5, # Niższa temperatura dla bardziej spójnych streszczeń temperature=0.2, # Niższa temperatura dla bardziej spójnych streszczeń
max_tokens=150, # Ogranicz długość odpowiedzi max_tokens=5000, # Ogranicz długość odpowiedzi
) )
summary = response.choices[0].message.content.strip() summary = response.choices[0].message.content.strip()

View File

@ -14,6 +14,7 @@ YOUTUBE_URL_PATTERNS = [
r'(https?://youtu\.be/[\w-]+)', r'(https?://youtu\.be/[\w-]+)',
r'(https?://m\.youtube\.com/watch\?v=[\w-]+)', r'(https?://m\.youtube\.com/watch\?v=[\w-]+)',
r'(https?://(?:www\.)?youtube\.com/shorts/[\w-]+)', r'(https?://(?:www\.)?youtube\.com/shorts/[\w-]+)',
r'(https?://(?:www\.)?youtube\.com/live/[\w-]+(?:\?[^&]*)?)',
] ]
COMPILED_YOUTUBE_REGEX = re.compile('|'.join(YOUTUBE_URL_PATTERNS), re.IGNORECASE) COMPILED_YOUTUBE_REGEX = re.compile('|'.join(YOUTUBE_URL_PATTERNS), re.IGNORECASE)
@ -69,7 +70,8 @@ def extract_video_id(url: str) -> Optional[str]:
r'youtu\.be/([\w-]+)', # Skrócony URL r'youtu\.be/([\w-]+)', # Skrócony URL
r'embed/([\w-]+)', # URL do osadzania r'embed/([\w-]+)', # URL do osadzania
r'v/([\w-]+)', # Starszy format r'v/([\w-]+)', # Starszy format
r'shorts/([\w-]+)' # YouTube Shorts r'shorts/([\w-]+)', # YouTube Shorts
r'live/([\w-]+)', # Transmisje na żywo
] ]
for pattern in patterns: for pattern in patterns: