feat(init): prepare full working yt-transcript-api integration with tests

-
pull/1/head
TBS093A 2025-05-14 13:45:27 +02:00
commit f882a7828e
24 changed files with 2835 additions and 0 deletions

4
.env.example 100644
View File

@ -0,0 +1,4 @@
export TELEGRAM_BOT_TOKEN=""
export OPENAI_API_KEY=""
export DATABASE_URL=""
export YOUTUBE_TRANSCRIPT_API_TOKEN=""

46
.gitignore vendored 100644
View File

@ -0,0 +1,46 @@
# Środowiska wirtualne
venv/
.env
.venv/
env/
ENV/
# Pliki Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Tox
.tox/
.coverage
htmlcov/
.pytest_cache/
# Logi i pliki danych
*.log
*.sqlite
*.db
# Pliki edytorów i IDE
.idea/
.vscode/
*.swp
*.swo
.DS_Store

431
Jenkinsfile vendored 100644
View File

@ -0,0 +1,431 @@
def execute_commands(commands, os_name, recommended_os) {
if("${os_name}" == "linux") {
if("${recommended_os}" == "only_linux" || "${recommended_os}" == "all_distros") {
sh "${commands}"
}
}
if("${os_name}" == "windows") {
if("${recommended_os}" == "only_windows" || "${recommended_os}" == "all_distros") {
powershell "${commands}"
}
}
}
def delete_directory(directory, os_name) {
execute_commands(
"""
if [ -d "./${directory}" ]
then
rm -r ./${directory};
fi
""",
"${os_name}",
"only_linux"
)
execute_commands(
"""
if(Test-Path -Path .\\${directory}\\) {
Remove-Item -Force -Recurse -Path .\\${directory}\\
}
""",
"${os_name}",
"only_windows"
)
}
def fetch_git_repository(APPLICATION_NAME, JENKINS_REPO_DIR, DATA_SOURCE_BRANCH, DATA_SOURCE_URL) {
echo """
Fetch ${APPLICATION_NAME}
"""
try {
sh """
git config --global --add safe.directory '*';
"""
dir("${JENKINS_REPO_DIR}") {
git credentialsId: 'git-gitea-tbs093a',
branch: "${DATA_SOURCE_BRANCH}",
url: "${DATA_SOURCE_URL}"
sh """
ls -la
"""
}
} catch(error) {
throw(error)
}
}
def set_env_vars(JENKINS_REPO_DIR, GIT_REPO_URL, DATABASE_HOST, DATABASE_PORT, DATABASE_NAME, VARS) {
echo """
Set Env Vars
"""
try {
dir("${JENKINS_REPO_DIR}") {
withCredentials(
[
usernamePassword(
credentialsId: 'git-gitea-tbs093a',
passwordVariable: 'GIT_TOKEN',
usernameVariable: 'GIT_USERNAME'
),
usernamePassword(
credentialsId: 'telegram-video-summary-bot-credentials',
passwordVariable: 'TELETHON_BOT_TOKEN',
usernameVariable: 'TELETHON_BOT_NAME'
),
usernamePassword(
credentialsId: 'openai-video-summary-bot-credentials',
passwordVariable: 'OPENAI_API_KEY',
usernameVariable: 'OPENAI_USERNAME'
),
usernamePassword(
credentialsId: 'youtube-transcript-api-credentials',
passwordVariable: 'TRANSCRIPT_API_TOKEN',
usernameVariable: 'TRANSCRIPT_API_USERNAME'
),
usernamePassword(
credentialsId: 'postgresql-video-summary-bot-credentials',
passwordVariable: 'DATABASE_PASSWORD',
usernameVariable: 'DATABASE_USERNAME'
),
]
) {
GIT_REPO_URL = GIT_REPO_URL.replace('https://', '')
sh "./set.envs.sh ${VARS} --set repo.url='https://${GIT_USERNAME}:${GIT_TOKEN}@${GIT_REPO_URL}' --set telegram.bot.token='${TELETHON_BOT_TOKEN}' --set OPENAI_API_KEY='${OPENAI_API_KEY}' --set DATABASE_URL='postgresql://${DATABASE_USERNAME}:${DATABASE_PASSWORD}@${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_NAME}' --dir ./ --exclude 'Jenkinsfile' --exclude '*.md' --exclude '*.sh' --exclude '*.py' --exclude '*.ini' --exclude '*.example' --exclude '*.session'"
// if you have $ inside password - just use \$ (dolar escape) inside this var at editing credentials in jenkins!
}
}
} catch(error) {
throw(error)
}
}
def test(JENKINS_REPO_DIR) {
echo """
Test Pump Bot On K8S
"""
try {
dir("${JENKINS_REPO_DIR}") {
sh """
export KUBECONFIG="/home/jenkins/.kube/config";
kubectl apply -f ./k8s.manifests/config.yml;
kubectl apply -f ./k8s.manifests/job.test.yml;
"""
// Wait until the pod in the deployment is running the tests
waitUntil {
def podName = sh(script: "export KUBECONFIG=\"/home/jenkins/.kube/config\"; kubectl get pods -l app=pump-bot-api-tests -o jsonpath='{.items[0].metadata.name}'", returnStdout: true).trim()
def status = sh(script: "export KUBECONFIG=\"/home/jenkins/.kube/config\"; kubectl get pod ${podName} -o jsonpath='{.status.phase}'", returnStdout: true).trim()
return status == 'Running' || status == 'Succeeded' || status == 'Failed'
}
// Capture the pod name after its up and running
def podName = sh(script: "export KUBECONFIG=\"/home/jenkins/.kube/config\"; kubectl get pods -l app=pump-bot-api-tests -o jsonpath='{.items[0].metadata.name}'", returnStdout: true).trim()
// Wait until the tests are finished by checking container logs
waitUntil {
// Sleep for 1 minute before the next check
sleep time: 1, unit: 'MINUTES'
def testLogs = sh(script: "export KUBECONFIG=\"/home/jenkins/.kube/config\"; kubectl logs ${podName} -c pump-bot-api-tests", returnStdout: true).trim()
echo "Test Logs:\n${testLogs}"
return testLogs.contains("summary") || testLogs.contains("tox") // assuming `tox` will indicate completion
}
// After tests, check if logs contain failure keywords
def finalLogs = sh(script: "export KUBECONFIG=\"/home/jenkins/.kube/config\"; kubectl logs ${podName} -c pump-bot-api-tests", returnStdout: true).trim()
if (finalLogs.contains("FAILED")) {
error("TESTS FAILED.")
}
sh """
export KUBECONFIG="/home/jenkins/.kube/config";
kubectl delete -f ./k8s.manifests/job.test.yml;
"""
}
} catch(error) {
throw(error)
}
}
def deploy(JENKINS_REPO_DIR) {
echo """
Deploy Bot On K8S
"""
try {
dir("${JENKINS_REPO_DIR}") {
sh """
export KUBECONFIG="/home/jenkins/.kube/config";
kubectl apply -f ./k8s.manifests/config.yml;
kubectl apply -f ./k8s.manifests/deployment.yml;
"""
}
} catch(error) {
throw(error)
}
}
pipeline {
agent {
node {
label "docker-builder && linux"
}
}
environment {
OS_NAME=""
JENKINS_REPO_BOT_DIR = "${JENKINS_AGENT_WORKDIR}/telegram.video.summary.bot"
GIT_BOT_REPO_URL="https://git.00x097.com/tbs093a/telegram.video.summary.bot.git"
JENKINS_USER_ID = 1000
JENKINS_GROUP_ID = 1000
}
triggers {
GenericTrigger(
genericVariables: [
[
key: 'DATA_SOURCE_BRANCH',
value: '$.DATA_SOURCE_BRANCH'
],
[
key: 'DATABASE_HOST',
value: '$.DATABASE_HOST'
],
[
key: 'DATABASE_PORT',
value: '$.DATABASE_PORT'
],
[
key: 'TESTS',
value: '$.TESTS'
],
[
key: 'DEPLOY',
value: '$.DEPLOY'
],
],
token: '077ff7e0-8460-4a63-9a33-71503bbad374'
)
}
stages {
stage('Setup Parameters') {
steps {
script {
if ("${NODE_NAME}".contains("windows")) {
OS_NAME = "windows"
} else {
OS_NAME = "linux"
}
properties([
parameters([
string(
defaultValue: 'master',
description: 'Select <b>Pump Bot</b> data source branch for the build & deploy',
name: 'DATA_SOURCE_BRANCH'
),
string(
defaultValue: 'localhost',
description: 'Select <b>Pump Bot</b> data source branch for the build & deploy',
name: 'DATABASE_HOST'
),
string(
defaultValue: '5432',
description: 'Select <b>Pump Bot</b> data source branch for the build & deploy',
name: 'DATABASE_PORT'
),
string(
defaultValue: 'video_summary_bot',
description: 'Select <b>Pump Bot</b> data source branch for the build & deploy',
name: 'DATABASE_NAME'
),
booleanParam(
defaultValue: false,
description: 'Enable if you want <b>run Pump Bot tests only</b>.',
name: 'TESTS'
),
booleanParam(
defaultValue: true,
description: 'Enable if you want <b>run Pump Bot</b> for capture pump singnals from telegram and invest automatically',
name: 'DEPLOY'
)
])
])
}
}
}
stage('Clean Workspace') {
steps {
script {
catchError(
buildresult: 'SUCCESS',
stageresult: 'UNSTABLE'
) {
delete_directory(
"${JENKINS_REPO_PUMP_SCRIPT_DIR}",
OS_NAME
)
}
}
}
}
stage('Fetch Script Repositories') {
steps {
script {
parallel(
pump_bot_script: {
fetch_git_repository(
"Pump Bot (on VM - ${NODE_NAME})",
"${JENKINS_REPO_PUMP_SCRIPT_DIR}",
"${DATA_SOURCE_BRANCH}",
"${GIT_REPO_PUMP_SCRIPT_URL}"
)
}
)
}
}
}
stage('Set Env Vars') {
steps {
script {
catchError(
buildresult: 'FAILURE',
stageresult: 'FAILURE'
) {
set_env_vars(
"${JENKINS_REPO_PUMP_SCRIPT_DIR}",
"${GIT_REPO_PUMP_SCRIPT_URL}",
""
)
}
}
}
}
stage('Test Pump Bot On K8S') {
when {
expression {
"${TESTS}" == "true"
}
}
steps {
echo """
Test Pump Bot On K8S
"""
catchError(
buildResult: 'SUCCESS',
stageResult: 'UNSTABLE'
) {
test(
"${JENKINS_REPO_PUMP_SCRIPT_DIR}"
)
}
}
}
stage('Deploy Pump Bot On K8S') {
when {
expression {
"${DEPLOY}" == "true"
}
}
steps {
echo """
Deploy Pump Bot On K8S
"""
catchError(
buildResult: 'FAILURE',
stageResult: 'FAILURE'
) {
deploy(
"${JENKINS_REPO_PUMP_SCRIPT_DIR}"
)
}
}
}
}
}

121
README.md 100644
View File

@ -0,0 +1,121 @@
# Video Summary Bot
Bot Telegram który nasłuchuje wiadomości z linkami do YouTube, pobiera transkrypcje filmów, streszcza je i zapisuje w bazie danych PostgreSQL.
## Funkcjonalności
- Automatyczne wykrywanie linków YouTube w wiadomościach
- Pobieranie transkrypcji filmów (obsługuje filmy w różnych językach, priorytetyzując polski i angielski)
- Generowanie streszczeń transkrypcji za pomocą OpenAI API
- Przechowywanie streszczeń, transkrypcji i informacji o filmach w bazie danych PostgreSQL
- Wysyłanie streszczeń z powrotem do czatu
## Wymagania
- Python 3.9+
- Serwer PostgreSQL
- Token bota Telegram (uzyskany przez BotFather)
- Klucz API OpenAI
- Token API youtube-transcript.io (do pobierania transkrypcji)
## Instalacja
1. Sklonuj repozytorium:
```
git clone <url-repozytorium>
cd video.summary.bot
```
2. Utwórz plik `.env` na podstawie `.env.example` i dodaj swoje klucze API i dane dostępowe:
```
TELEGRAM_BOT_TOKEN=twój_token_bota_telegram
OPENAI_API_KEY=twój_klucz_api_openai
DATABASE_URL=postgresql://użytkownik:hasło@host:port/nazwa_bazy
YOUTUBE_TRANSCRIPT_API_TOKEN=twój_token_api_youtube_transcript
```
3. Utwórz i aktywuj wirtualne środowisko (opcjonalnie, ale zalecane):
```
python -m venv venv
source venv/bin/activate # Linux/macOS
venv\Scripts\activate # Windows
```
4. Zainstaluj projekt w trybie deweloperskim:
```
pip install -e .
```
## Uruchomienie za pomocą tox
Najłatwiejszy sposób uruchomienia bota to użycie tox:
```
pip install tox
tox -e run
```
## Uruchomienie ręczne
Alternatywnie, możesz uruchomić bota bezpośrednio:
```
python -m src.bot.main
```
Lub po instalacji pakietu:
```
video-summary-bot
```
## Migracja z youtube-transcript-api na youtube-transcript.io API
W wersji 0.2.0 dokonaliśmy migracji z biblioteki `youtube-transcript-api` na oficjalne API `youtube-transcript.io`. Zmiana ta daje następujące korzyści:
- **Większa stabilność**: youtube-transcript.io to oficjalne API, które nie jest zależne od zmian w interfejsie YouTube
- **Wyższe limity**: Możliwość pobierania transkrypcji dla większej liczby filmów
- **Lepsze wsparcie językowe**: Dokładniejsze informacje o dostępnych językach
- **Możliwość zapytań zbiorczych**: API pozwala pobierać transkrypcje dla wielu filmów w jednym żądaniu
### Zmiany w konfiguracji
Migracja wymaga dodania nowego tokenu API w pliku `.env`:
```
YOUTUBE_TRANSCRIPT_API_TOKEN=twój_token_api_youtube_transcript
```
Token można uzyskać rejestrując się na stronie [youtube-transcript.io](https://www.youtube-transcript.io/).
## Struktura projektu
```
video.summary.bot/
├── src/
│ ├── bot/
│ │ ├── __init__.py
│ │ ├── config.py # Konfiguracja (API keys, DB)
│ │ ├── db.py # Interakcja z bazą danych PostgreSQL
│ │ ├── handlers.py # Logika obsługi wiadomości
│ │ ├── main.py # Punkt startowy bota
│ │ ├── openai_utils.py # Funkcje związane z OpenAI API
│ │ └── youtube_utils.py # Funkcje do pracy z YouTube
│ └── __init__.py
├── .env # Plik z sekretami (ignorowany przez Git)
├── .env.example # Przykładowy plik konfiguracyjny
├── pyproject.toml # Definicja projektu i zależności
├── tox.ini # Konfiguracja tox
└── README.md # Ten plik
```
## Uwagi dotyczące użycia
- **Koszty OpenAI API**: API OpenAI jest płatne. Monitoruj swoje zużycie, aby uniknąć niespodziewanych kosztów.
- **Koszty youtube-transcript.io API**: API youtube-transcript.io może być płatne w zależności od wybranego planu. Sprawdź aktualne ceny na stronie usługi.
- **Dostępność transkrypcji**: Nie wszystkie filmy na YouTube mają dostępne transkrypcje.
- **Limity API**: Upewnij się, że uwzględniasz limity API w przypadku intensywnego użytkowania.
## Licencja
Ten projekt jest dostępny na licencji MIT.

0
__init__.py 100644
View File

View File

@ -0,0 +1,9 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: bot-config
data:
.env: |
export TELETHON_BOT_NAME="<<telethon.bot.name>>"
export TELETHON_BOT_TOKEN="<<telethon.bot.token>>"
export TELETHON_BOT_ID=<<telethon.bot.id>>

View File

@ -0,0 +1,79 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: pump-bot
spec:
replicas: 1
selector:
matchLabels:
app: pump-bot
template:
metadata:
labels:
app: pump-bot
spec:
initContainers:
- name: remove-old-files
image: alpine/git
command:
- "sh"
- "-c"
- >
find /bot/ -mindepth 1 -delete;
volumeMounts:
- name: pump-bot-storage
mountPath: /pump.bot
- name: clone-repo
image: alpine/git
command:
- "sh"
- "-c"
- >
git clone <<repo.url>> /bot/;
volumeMounts:
- name: pump-bot-storage
mountPath: /pump.bot
- name: setup-environment
image: python:3.11.0
command:
- /bin/bash
- -c
- |
apt update -y;
apt upgrade -y;
apt install python3-pip -y;
pip3 install --upgrade pip setuptools;
pip3 install --target=/app/python-packages tox;
volumeMounts:
- name: pump-bot-storage
mountPath: /app
containers:
- name: pump-bot
image: python:3.11.0
env:
- name: PYTHONPATH
value: "/app/python-packages"
command:
- /bin/bash
- -c
- |
cd /app;
. ./.env;
python -m tox;
volumeMounts:
- name: pump-bot-storage
mountPath: /app
- name: config-volume
mountPath: /app/.env
subPath: .env
volumes:
- name: pump-bot-storage
persistentVolumeClaim:
claimName: pvc-pump-bot
- name: config-volume
configMap:
name: pump-bot-config
items:
- key: .env
path: .env

View File

@ -0,0 +1,76 @@
---
apiVersion: batch/v1
kind: Job
metadata:
name: pump-bot-api-tests
spec:
template:
metadata:
labels:
app: pump-bot-api-tests
spec:
restartPolicy: Never
initContainers:
- name: remove-old-files
image: alpine/git
command:
- "sh"
- "-c"
- >
find /pump.bot/ -mindepth 1 -delete;
volumeMounts:
- name: pump-bot-storage
mountPath: /pump.bot
- name: clone-repo
image: alpine/git
command:
- "sh"
- "-c"
- >
git clone <<pump.bot.repo.url>> /pump.bot/;
volumeMounts:
- name: pump-bot-storage
mountPath: /pump.bot
- name: setup-environment
image: python:3.11.0
command:
- /bin/bash
- -c
- |
apt update -y;
apt upgrade -y;
apt install python3-pip -y;
pip3 install --upgrade pip setuptools;
pip3 install --target=/app/python-packages tox;
volumeMounts:
- name: pump-bot-storage
mountPath: /app
containers:
- name: pump-bot
image: python:3.11.0
env:
- name: PYTHONPATH
value: "/app/python-packages"
command:
- /bin/bash
- -c
- |
cd /app;
. ./.env;
python -m tox -e api-tests;
volumeMounts:
- name: pump-bot-storage
mountPath: /app
- name: config-volume
mountPath: /app/.env
subPath: .env
volumes:
- name: pump-bot-storage
persistentVolumeClaim:
claimName: pvc-pump-bot
- name: config-volume
configMap:
name: pump-bot-config
items:
- key: .env
path: .env

52
main.py 100644
View File

@ -0,0 +1,52 @@
import asyncio
import logging
from telegram import Update
from telegram.ext import ApplicationBuilder, MessageHandler, filters, CommandHandler
from .src.config import TELEGRAM_BOT_TOKEN, logger # Import logger z config
from .src.handlers import handle_message, error_handler
from .src.db import init_db, close_db
async def post_init(application):
"""Funkcja wykonywana po inicjalizacji aplikacji bota."""
await init_db()
logger.info("Baza danych zainicjalizowana.")
async def post_shutdown(application):
"""Funkcja wykonywana przed zamknięciem bota."""
await close_db()
logger.info("Pula połączeń z bazą danych zamknięta.")
def run():
"""Uruchamia bota."""
if not TELEGRAM_BOT_TOKEN:
logger.critical("TELEGRAM_BOT_TOKEN nie jest ustawiony. Kończę działanie.")
return
logger.info("Uruchamiam bota...")
# Budowanie aplikacji bota
application = (
ApplicationBuilder()
.token(TELEGRAM_BOT_TOKEN)
.post_init(post_init) # Wywołaj init_db po starcie
.post_shutdown(post_shutdown) # Wywołaj close_db przed zamknięciem
.build()
)
# Rejestracja handlerów
# Nasłuchuj tylko wiadomości tekstowych w czatach prywatnych i grupowych
message_handler = MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)
application.add_handler(message_handler)
# Rejestracja globalnego error handlera
application.add_error_handler(error_handler)
# Uruchomienie bota (polling)
logger.info("Bot rozpoczął nasłuchiwanie...")
application.run_polling(allowed_updates=Update.ALL_TYPES) # Pobieraj wszystkie typy aktualizacji
if __name__ == '__main__':
# To pozwala uruchomić bota bezpośrednio przez `python src/bot/main.py`
# Jeśli chcesz używać skryptu zdefiniowanego w pyproject.toml,
# zainstaluj pakiet (np. pip install .) i uruchom `youtube-summarizer-bot`
run()

40
pyproject.toml 100644
View File

@ -0,0 +1,40 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "video-summary-bot"
version = "0.1.0"
description = "Bot Telegram do streszczania filmów YouTube"
readme = "README.md"
requires-python = ">=3.9" # python-telegram-bot v20+ wymaga Pythona 3.8+
authors = [
{ name = "Video Summary Bot Author", email = "example@example.com" },
]
dependencies = [
"python-telegram-bot[job-queue] >= 20.0", # Używamy najnowszej stabilnej wersji >= 20
"pytube >= 15.0.0",
"openai >= 1.0.0", # Nowa wersja API OpenAI
"asyncpg >= 0.27.0", # Async PostgreSQL driver
"python-dotenv >= 1.0.0", # Do wczytywania .env
"httpx >= 0.24.0", # Potrzebne dla python-telegram-bot v20+ i openai v1+
"aiohttp >= 3.9.0" # Do asynchronicznych zapytań HTTP do API youtube-transcript.io
]
[project.optional-dependencies]
dev = [
"pytest",
"pytest-asyncio",
"flake8",
"mypy",
"tox",
]
[project.scripts]
video-summary-bot = "bot.main:run" # Pozwala uruchomić bota komendą po instalacji
[tool.pytest.ini_options]
# Konfiguracja pytest
testpaths = ["tests"]
# Konfiguracja pytest-asyncio
asyncio_mode = "auto" # Automatycznie wykrywa testy asynchroniczne

90
set.envs.sh 100755
View File

@ -0,0 +1,90 @@
#!/bin/bash
# Initialize an empty associative array to store placeholders and their respective replacement values
declare -A replacements
# Array to store files or patterns to exclude
excludes=()
# Function to print usage
usage() {
echo "Usage: $0 [--set placeholder=value] [--dir directory] [--exclude file_or_pattern]"
echo "Example: $0 --set setup.repo=https://github.com/TSD/lol.git --set another.placeholder=some_other_value --dir ./ --exclude Jenkinsfile --exclude *.md"
exit 1
}
# Default directory is the current directory
ROOT_DIR="./"
# Parse command-line arguments
while [[ "$#" -gt 0 ]]; do
case $1 in
--set)
# Extract placeholder and value from --set argument
if [[ "$2" =~ ^([^=]+)=(.*)$ ]]; then
placeholder="${BASH_REMATCH[1]}"
value="${BASH_REMATCH[2]}"
replacements["$placeholder"]="$value"
else
echo "Error: Invalid format for --set. Use --set placeholder=value."
usage
fi
shift 2
;;
--dir)
# Set the root directory for the search
ROOT_DIR="$2"
shift 2
;;
--exclude)
# Add files or patterns to exclude
excludes+=("$2")
shift 2
;;
*)
echo "Error: Unknown option $1"
usage
;;
esac
done
# Check if at least one --set argument is provided
if [ ${#replacements[@]} -eq 0 ]; then
echo "Error: No placeholders provided. Use --set placeholder=value."
usage
fi
# Check if the directory exists
if [ ! -d "$ROOT_DIR" ]; then
echo "Error: Directory $ROOT_DIR does not exist."
exit 1
fi
# Prepare the find command with exclusion
exclude_cmd=""
for exclude in "${excludes[@]}"; do
exclude_cmd+="! -name '$exclude' "
done
# Find all files recursively in the specified directory, excluding specified patterns
eval "find \"$ROOT_DIR\" -type f $exclude_cmd" | while read -r file; do
echo "Processing file: $file"
# Loop through each placeholder in the associative array
for placeholder in "${!replacements[@]}"; do
value="${replacements[$placeholder]}"
# Use sed to replace the placeholder with the corresponding value
sed -i "s|<<$placeholder>>|$value|g" "$file"
# Confirm replacement (optional logging)
if grep -q "$value" "$file"; then
echo " Successfully replaced $placeholder with $value in $file"
else
echo " No occurrences of $placeholder found in $file"
fi
done
done
echo "All files processed."

1
src/__init__.py 100644
View File

@ -0,0 +1 @@
# Plik inicjalizacyjny pakietu bot

46
src/config.py 100644
View File

@ -0,0 +1,46 @@
import os
from dotenv import load_dotenv
import logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Załaduj zmienne środowiskowe z pliku .env
dotenv_path = os.path.join(os.path.dirname(__file__), '..', '.env')
if os.path.exists(dotenv_path):
load_dotenv(dotenv_path)
logger.info(f"Załadowano zmienne środowiskowe z: {dotenv_path}")
else:
logger.warning(f"Nie znaleziono pliku .env w {dotenv_path}, korzystanie ze zmiennych systemowych.")
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
DATABASE_URL = os.getenv("DATABASE_URL")
YOUTUBE_TRANSCRIPT_API_TOKEN = os.getenv("YOUTUBE_TRANSCRIPT_API_TOKEN")
if not TELEGRAM_BOT_TOKEN:
logger.error("Nie znaleziono TELEGRAM_BOT_TOKEN w zmiennych środowiskowych.")
raise ValueError("Brak TELEGRAM_BOT_TOKEN")
if not OPENAI_API_KEY:
logger.error("Nie znaleziono OPENAI_API_KEY w zmiennych środowiskowych.")
raise ValueError("Brak OPENAI_API_KEY")
if not DATABASE_URL:
logger.error("Nie znaleziono DATABASE_URL w zmiennych środowiskowych.")
raise ValueError("Brak DATABASE_URL")
if not YOUTUBE_TRANSCRIPT_API_TOKEN:
logger.warning("Nie znaleziono YOUTUBE_TRANSCRIPT_API_TOKEN w zmiennych środowiskowych.")
logger.warning("Funkcje transkrypcji mogą nie działać poprawnie.")
# Inne ustawienia
TRANSCRIPT_LANGUAGES = ['pl', 'en'] # Priorytet języków transkrypcji
SUMMARY_PROMPT = """Streść poniższy transkrypt filmu z YouTube w zwięzły sposób w języku polskim.
Skup się na głównych tematach i wnioskach.
Transkrypt:
{transcript}"""

75
src/db.py 100644
View File

@ -0,0 +1,75 @@
import asyncpg
import logging
from typing import Optional
from .config import DATABASE_URL
logger = logging.getLogger(__name__)
pool = None
async def init_db():
"""Inicjalizuje pulę połączeń i tworzy tabelę, jeśli nie istnieje."""
global pool
if pool is None:
try:
pool = await asyncpg.create_pool(DATABASE_URL)
logger.info("Utworzono pulę połączeń z bazą danych.")
async with pool.acquire() as connection:
await connection.execute("""
CREATE TABLE IF NOT EXISTS videos (
id SERIAL PRIMARY KEY,
url TEXT UNIQUE NOT NULL,
title TEXT,
transcript TEXT,
summary TEXT,
added_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
""")
logger.info("Sprawdzono/utworzono tabelę 'videos'.")
except Exception as e:
logger.error(f"Błąd podczas inicjalizacji puli połączeń z bazą danych: {e}", exc_info=True)
pool = None # Resetuj pulę w przypadku błędu
raise # Rzuć wyjątek, aby zapobiec uruchomieniu bota
async def get_db_pool():
"""Zwraca istniejącą pulę połączeń lub inicjalizuje ją."""
if pool is None:
await init_db()
if pool is None: # Sprawdź ponownie po próbie init_db
raise ConnectionError("Pula połączeń z bazą danych jest niedostępna.")
return pool
async def save_video_summary(url: str, title: str, transcript: str, summary: str):
"""Zapisuje dane filmu do bazy danych."""
db_pool = await get_db_pool()
async with db_pool.acquire() as connection:
try:
# Użyj INSERT ... ON CONFLICT DO UPDATE, aby zaktualizować, jeśli URL już istnieje
await connection.execute("""
INSERT INTO videos (url, title, transcript, summary)
VALUES ($1, $2, $3, $4)
ON CONFLICT (url) DO UPDATE
SET title = EXCLUDED.title,
transcript = EXCLUDED.transcript,
summary = EXCLUDED.summary,
added_at = CURRENT_TIMESTAMP;
""", url, title, transcript, summary)
logger.info(f"Zapisano/zaktualizowano streszczenie dla URL: {url}")
return True
except Exception as e:
logger.error(f"Błąd podczas zapisywania streszczenia dla {url}: {e}", exc_info=True)
return False
async def check_if_url_exists(url: str) -> bool:
"""Sprawdza, czy dany URL już istnieje w bazie."""
db_pool = await get_db_pool()
async with db_pool.acquire() as connection:
exists = await connection.fetchval("SELECT EXISTS(SELECT 1 FROM videos WHERE url=$1)", url)
return exists
async def close_db():
"""Zamyka pulę połączeń."""
global pool
if pool:
await pool.close()
pool = None
logger.info("Zamknięto pulę połączeń z bazą danych.")

134
src/handlers.py 100644
View File

@ -0,0 +1,134 @@
import logging
import re
from telegram import Update
from telegram.ext import ContextTypes
from telegram.constants import ParseMode
from .youtube_utils import extract_youtube_urls, extract_video_id, get_transcript, get_video_title
from .openai_utils import summarize_text
from .db import save_video_summary, check_if_url_exists
from .config import TRANSCRIPT_LANGUAGES
logger = logging.getLogger(__name__)
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Obsługuje przychodzące wiadomości tekstowe."""
message = update.message
if not message or not message.text:
return # Ignoruj wiadomości bez tekstu
chat_id = message.chat_id
text = message.text
user = message.from_user.username or message.from_user.first_name
logger.info(f"Otrzymano wiadomość od {user} w czacie {chat_id}: {text[:50]}...")
youtube_urls = extract_youtube_urls(text)
if not youtube_urls:
# logger.debug("Nie znaleziono URL-i YouTube w wiadomości.")
return # Nic do roboty, jeśli nie ma linków YT
processed_urls_in_message = set()
for url in youtube_urls:
if url in processed_urls_in_message:
continue # Już przetworzono ten URL z tej wiadomości
logger.info(f"Znaleziono URL YouTube: {url}")
# Opcjonalnie: sprawdź, czy już istnieje w bazie, zanim zaczniesz przetwarzać
# if await check_if_url_exists(url):
# logger.info(f"URL {url} już istnieje w bazie danych. Pomijam.")
# await context.bot.send_message(
# chat_id=chat_id,
# text=f"Informacje o filmie {url} są już w bazie.",
# disable_web_page_preview=True
# )
# processed_urls_in_message.add(url)
# continue
video_id = extract_video_id(url)
if not video_id:
logger.warning(f"Nie udało się wydobyć ID filmu z URL: {url}")
continue # Przejdź do następnego URL
await context.bot.send_chat_action(chat_id=chat_id, action='typing')
# 1. Pobierz tytuł
title = await get_video_title(url)
if not title:
logger.warning(f"Nie udało się pobrać tytułu dla ID filmu: {video_id}")
await context.bot.send_message(
chat_id=chat_id,
text=f"Nie udało się pobrać tytułu dla filmu: {url}",
disable_web_page_preview=True
)
processed_urls_in_message.add(url)
continue # Potrzebujemy tytułu
await context.bot.send_chat_action(chat_id=chat_id, action='typing')
# 2. Pobierz transkrypcję
transcript = await get_transcript(video_id, TRANSCRIPT_LANGUAGES)
if not transcript:
logger.warning(f"Nie udało się pobrać transkrypcji dla ID filmu: {video_id}")
await context.bot.send_message(
chat_id=chat_id,
text=f"Nie udało się pobrać transkrypcji dla filmu: {title} ({url})",
disable_web_page_preview=True
)
processed_urls_in_message.add(url)
continue # Potrzebujemy transkrypcji do streszczenia
await context.bot.send_chat_action(chat_id=chat_id, action='typing')
# 3. Wygeneruj streszczenie
summary = await summarize_text(transcript)
if not summary:
logger.error(f"Nie udało się wygenerować streszczenia dla ID filmu: {video_id}")
await context.bot.send_message(
chat_id=chat_id,
text=f"Nie udało się wygenerować streszczenia dla filmu: {title} ({url})",
disable_web_page_preview=True
)
processed_urls_in_message.add(url)
continue
# 4. Zapisz do bazy danych
saved = await save_video_summary(url, title, transcript, summary)
if saved:
logger.info(f"Pomyślnie przetworzono i zapisano film: {title} ({url})")
response_text = (
f"*Przetworzono film:* {escape_markdown_v2(title)}\n\n"
f"*Link:* {escape_markdown_v2(url)}\n\n"
f"*Streszczenie:*\n{escape_markdown_v2(summary)}"
)
await context.bot.send_message(
chat_id=chat_id,
text=response_text,
parse_mode=ParseMode.MARKDOWN_V2,
disable_web_page_preview=True # Wyłącz podgląd linku w odpowiedzi bota
)
else:
logger.error(f"Nie udało się zapisać danych do bazy dla filmu: {title} ({url})")
await context.bot.send_message(
chat_id=chat_id,
text=f"Wystąpił błąd podczas zapisywania danych dla filmu: {title} ({url})",
disable_web_page_preview=True
)
processed_urls_in_message.add(url) # Oznacz URL jako przetworzony w tej wiadomości
# Funkcja pomocnicza do escape'owania znaków specjalnych MarkdownV2
def escape_markdown_v2(text: str) -> str:
"""Ucieka znaki specjalne dla parsowania Telegram MarkdownV2."""
escape_chars = r'_*[]()~`>#+-=|{}.!'
return re.sub(f'([{re.escape(escape_chars)}])', r'\\\1', text)
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Loguje błędy zgłoszone przez `python-telegram-bot`."""
logger.error(f"Wyjątek podczas obsługi aktualizacji: {context.error}", exc_info=context.error)
# Opcjonalnie: powiadom administratora o błędzie
# if isinstance(update, Update) and update.effective_chat:
# await context.bot.send_message(
# chat_id=ADMIN_CHAT_ID, # Zdefiniuj ADMIN_CHAT_ID w config.py
# text=f"Wystąpił błąd: {context.error}"
# )

View File

@ -0,0 +1,79 @@
import logging
from typing import Optional
from openai import AsyncOpenAI # Używamy AsyncOpenAI dla kompatybilności z asyncio
from .config import OPENAI_API_KEY, SUMMARY_PROMPT
logger = logging.getLogger(__name__)
class OpenAIUtilsError(Exception):
"""Bazowa klasa wyjątków dla modułu openai_utils."""
pass
class EmptyTextError(OpenAIUtilsError):
"""Wyjątek rzucany przy próbie streszczenia pustego tekstu."""
pass
class APIKeyMissingError(OpenAIUtilsError):
"""Wyjątek rzucany gdy brak klucza API OpenAI."""
pass
class SummarizationError(OpenAIUtilsError):
"""Wyjątek rzucany przy błędach streszczania tekstu."""
pass
# Inicjalizuj klienta OpenAI asynchronicznie
client = None
try:
if not OPENAI_API_KEY:
logger.error("Brak klucza API OpenAI. Ustaw zmienną środowiskową OPENAI_API_KEY.")
raise APIKeyMissingError("Brak klucza API OpenAI. Ustaw zmienną środowiskową OPENAI_API_KEY.")
client = AsyncOpenAI(api_key=OPENAI_API_KEY)
except Exception as e:
logger.error(f"Błąd inicjalizacji klienta OpenAI: {e}", exc_info=True)
# Nie rzucamy tu wyjątku, bo to moment inicjalizacji modułu
async def summarize_text(text: str) -> str:
"""
Wysyła tekst do API OpenAI w celu streszczenia.
Args:
text: Tekst do streszczenia
Returns:
Streszczenie tekstu
Raises:
EmptyTextError: Gdy tekst jest pusty
APIKeyMissingError: Gdy brak klucza API OpenAI
SummarizationError: Przy błędach API OpenAI
"""
if not text:
logger.warning("Próba streszczenia pustego tekstu.")
raise EmptyTextError("Próba streszczenia pustego tekstu.")
if not client:
logger.error("Klient OpenAI nie został zainicjalizowany.")
raise APIKeyMissingError("Klient OpenAI nie został zainicjalizowany. Sprawdź klucz API.")
prompt = SUMMARY_PROMPT.format(transcript=text)
logger.debug(f"Długość tekstu do streszczenia: {len(text)} znaków")
try:
logger.info("Wysyłanie zapytania do OpenAI API")
response = await client.chat.completions.create(
model="gpt-4o-mini", # Możesz zmienić na gpt-4, jeśli masz dostęp i budżet
messages=[
{"role": "system", "content": "Jesteś pomocnym asystentem specjalizującym się w streszczaniu transkryptów wideo."},
{"role": "user", "content": prompt}
],
temperature=0.5, # Niższa temperatura dla bardziej spójnych streszczeń
max_tokens=150, # Ogranicz długość odpowiedzi
)
summary = response.choices[0].message.content.strip()
logger.info(f"Pomyślnie wygenerowano streszczenie za pomocą OpenAI. Długość: {len(summary)} znaków")
return summary
except Exception as e:
logger.error(f"Błąd API OpenAI podczas streszczania: {e}", exc_info=True)
raise SummarizationError(f"Błąd API OpenAI: {str(e)}")

View File

@ -0,0 +1,250 @@
import re
import logging
import aiohttp
import json
from typing import Tuple, List, Optional, Dict, Any
from .config import YOUTUBE_TRANSCRIPT_API_TOKEN
logger = logging.getLogger(__name__)
# Rozbudowany regex do wykrywania linków YouTube
YOUTUBE_URL_PATTERNS = [
r'(https?://(?:www\.)?youtube\.com/watch\?v=[\w-]+)',
r'(https?://youtu\.be/[\w-]+)',
r'(https?://m\.youtube\.com/watch\?v=[\w-]+)',
r'(https?://(?:www\.)?youtube\.com/shorts/[\w-]+)',
]
COMPILED_YOUTUBE_REGEX = re.compile('|'.join(YOUTUBE_URL_PATTERNS), re.IGNORECASE)
# Stała dla URL API
YOUTUBE_TRANSCRIPT_API_URL = "https://www.youtube-transcript.io/api/transcripts"
class YouTubeUtilsError(Exception):
"""Bazowa klasa wyjątków dla modułu youtube_utils."""
pass
class TranscriptsDisabled(YouTubeUtilsError):
"""Wyjątek rzucany gdy transkrypcje są wyłączone dla wideo."""
pass
class NoTranscriptFound(YouTubeUtilsError):
"""Wyjątek rzucany gdy nie znaleziono transkrypcji dla wideo."""
pass
class APITokenMissing(YouTubeUtilsError):
"""Wyjątek rzucany gdy brak tokenu API."""
pass
class AuthorizationError(YouTubeUtilsError):
"""Wyjątek rzucany przy błędach autoryzacji API."""
pass
class APIConnectionError(YouTubeUtilsError):
"""Wyjątek rzucany przy problemach z połączeniem do API."""
pass
class APIResponseError(YouTubeUtilsError):
"""Wyjątek rzucany przy nieprawidłowych odpowiedziach z API."""
pass
class NoTranscriptLanguagesAvailable(YouTubeUtilsError):
"""Wyjątek rzucany gdy nie ma dostępnych transkrypcji w żadnym języku."""
pass
def extract_youtube_urls(text: str) -> List[str]:
"""Ekstrahuje unikalne linki YouTube z tekstu."""
if not text:
return []
# Używamy finditer, aby znaleźć wszystkie pasujące linki
matches = COMPILED_YOUTUBE_REGEX.finditer(text)
# Bierzemy pierwszą grupę z każdego dopasowania, która zawiera czysty URL
urls = {match.group(0) for match in matches if match.group(0)}
return list(urls)
def extract_video_id(url: str) -> Optional[str]:
"""Ekstrahuje ID filmu z różnych formatów URL YouTube."""
patterns = [
r'watch\?v=([\w-]+)', # Standardowy URL
r'youtu\.be/([\w-]+)', # Skrócony URL
r'embed/([\w-]+)', # URL do osadzania
r'v/([\w-]+)', # Starszy format
r'shorts/([\w-]+)' # YouTube Shorts
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
logger.warning(f"Nie udało się wydobyć ID filmu z URL: {url}")
return None
async def get_transcript(video_id: str, languages: List[str], country_code: str = "us") -> Tuple[str, str]:
"""
Pobiera transkrypcję i tytuł dla danego ID filmu z API youtube-transcript.io.
Args:
video_id: ID filmu YouTube
languages: Lista kodów języków do preferowania (np. ['pl', 'en'])
country_code: Kod kraju do wykorzystania przy pobieraniu transkrypcji (domyślnie "us")
Returns:
Krotka (transkrypcja, tytuł)
Raises:
APITokenMissing: Gdy brak tokenu API
TranscriptsDisabled: Gdy transkrypcje wyłączone dla wideo
NoTranscriptFound: Gdy nie znaleziono transkrypcji dla wideo
AuthorizationError: Przy błędach autoryzacji API (401, 403)
APIConnectionError: Przy problemach z połączeniem do API
APIResponseError: Przy nieprawidłowych odpowiedziach z API
NoTranscriptLanguagesAvailable: Gdy nie ma dostępnych transkrypcji w żadnym języku
Exception: Przy innych nieoczekiwanych błędach
"""
if not YOUTUBE_TRANSCRIPT_API_TOKEN:
logger.error("Brak tokenu API. Ustaw zmienną środowiskową YOUTUBE_TRANSCRIPT_API_TOKEN.")
raise APITokenMissing("Brak tokenu API youtube-transcript.io. Ustaw zmienną środowiskową YOUTUBE_TRANSCRIPT_API_TOKEN.")
try:
logger.debug(f"Rozpoczynam pobieranie transkrypcji dla ID: {video_id}, preferowane języki: {languages}")
async with aiohttp.ClientSession() as session:
headers = {
"Authorization": f"Basic {YOUTUBE_TRANSCRIPT_API_TOKEN}",
"Content-Type": "application/json"
}
payload = {
"ids": [video_id]
}
if country_code:
payload["countryCode"] = country_code
logger.info(f"Wysyłanie zapytania do API youtube-transcript.io dla ID: {video_id}")
logger.debug(f"Parametry zapytania: {payload}")
try:
async with session.post(
YOUTUBE_TRANSCRIPT_API_URL,
headers=headers,
json=payload
) as response:
status_code = response.status
logger.debug(f"Otrzymano odpowiedź z API, status: {status_code}")
if status_code == 200:
try:
data = await response.json()
# Sprawdź, czy odpowiedź zawiera listę wyników
if not data or not isinstance(data, list) or len(data) == 0:
logger.error(f"Nieprawidłowa struktura odpowiedzi API: {data}")
raise APIResponseError(f"Nieprawidłowa struktura odpowiedzi API dla {video_id}")
# Pobierz pierwszy element z listy (odpowiedź dla pierwszego ID)
video_data = data[0]
# Sprawdź, czy mamy informację o błędzie
if "error" in video_data:
error_msg = video_data["error"]
logger.warning(f"API zwróciło błąd dla {video_id}: {error_msg}")
if "disabled for this video" in error_msg.lower():
raise TranscriptsDisabled(f"Transkrypcje wyłączone dla {video_id}: {error_msg}")
else:
raise NoTranscriptFound(f"Nie znaleziono transkrypcji dla {video_id}: {error_msg}")
# Pobierz tytuł wideo
video_title = ""
if "title" in video_data:
video_title = video_data["title"]
logger.info(f"Pobrano tytuł dla ID filmu: {video_id}: {video_title}")
else:
logger.warning(f"Brak tytułu w odpowiedzi API dla ID filmu: {video_id}")
# Sprawdź czy istnieją ścieżki transkrypcji
if "tracks" not in video_data or not video_data["tracks"]:
logger.error(f"Brak ścieżek transkrypcji dla ID filmu: {video_id}")
raise NoTranscriptFound(f"Brak ścieżek transkrypcji dla {video_id}")
# Mapuj dostępne języki
available_tracks = {track["language"].lower(): track for track in video_data["tracks"]}
available_languages = list(available_tracks.keys())
if not available_languages:
raise NoTranscriptLanguagesAvailable(f"Brak dostępnych języków transkrypcji dla {video_id}")
logger.info(f"Dostępne języki dla {video_id}: {available_languages}")
# Mapa kodów językowych (pomocne, jeśli API zwraca pełne nazwy)
language_code_map = {}
if "languages" in video_data:
for lang_info in video_data["languages"]:
if "label" in lang_info and "languageCode" in lang_info:
language_code_map[lang_info["label"].lower()] = lang_info["languageCode"]
# Wybierz preferowany język
selected_language = None
# Najpierw próbuj znaleźć bezpośrednie dopasowanie z preferowanych języków
for lang in languages:
lang_lower = lang.lower()
# Sprawdź bezpośrednie dopasowanie
if lang_lower in available_languages:
selected_language = lang_lower
logger.info(f"Wybrany preferowany język: {lang} dla {video_id}")
break
# Sprawdź, czy kod języka pasuje do pełnej nazwy
for available_lang in available_languages:
if available_lang.lower() in language_code_map and language_code_map[available_lang.lower()] == lang_lower:
selected_language = available_lang
logger.info(f"Wybrany preferowany język (przez kod): {lang} dla {video_id}")
break
# Jeśli nie znaleziono preferowanego języka, użyj pierwszego dostępnego
if not selected_language:
selected_language = available_languages[0]
logger.warning(f"Nie znaleziono preferowanych języków {languages} dla {video_id}. "
f"Używam dostępnego: {selected_language}")
# Pobierz transkrypcję w wybranym języku
selected_track = available_tracks[selected_language]
if "transcript" not in selected_track:
logger.error(f"Brak transkrypcji w ścieżce {selected_language} dla {video_id}")
raise NoTranscriptFound(f"Brak transkrypcji w ścieżce {selected_language} dla {video_id}")
transcript_parts = selected_track["transcript"]
full_transcript = " ".join([part["text"] for part in transcript_parts])
logger.info(f"Pomyślnie pobrano transkrypcję dla ID filmu: {video_id} "
f"(Język: {selected_language}, długość: {len(full_transcript)} znaków)")
return full_transcript, video_title
except json.JSONDecodeError as e:
logger.error(f"Błąd parsowania JSON dla {video_id}: {e}", exc_info=True)
response_text = await response.text()
logger.debug(f"Zawartość odpowiedzi: {response_text[:500]}...")
raise APIResponseError(f"Nieprawidłowy format odpowiedzi JSON: {e}")
elif status_code == 404:
logger.warning(f"Nie znaleziono transkrypcji dla ID filmu: {video_id} (HTTP 404)")
raise NoTranscriptFound(f"API zwróciło status 404 dla {video_id}")
elif status_code == 401 or status_code == 403:
response_text = await response.text()
logger.error(f"Błąd autoryzacji dla API youtube-transcript.io: {status_code}")
logger.debug(f"Treść odpowiedzi: {response_text[:500]}...")
raise AuthorizationError(f"Błąd autoryzacji API (HTTP {status_code}): sprawdź token API")
else:
response_text = await response.text()
logger.error(f"Nieoczekiwany status odpowiedzi API: {status_code}")
logger.debug(f"Treść odpowiedzi: {response_text[:500]}...")
raise APIResponseError(f"Nieoczekiwany status odpowiedzi API: {status_code}")
except aiohttp.ClientError as e:
logger.error(f"Błąd połączenia HTTP dla {video_id}: {str(e)}", exc_info=True)
raise APIConnectionError(f"Błąd połączenia z API: {str(e)}")
except (TranscriptsDisabled, NoTranscriptFound, APITokenMissing, AuthorizationError,
APIConnectionError, APIResponseError, NoTranscriptLanguagesAvailable):
# Propaguj znane wyjątki bezpośrednio
raise
except Exception as e:
logger.error(f"Nieoczekiwany błąd podczas pobierania transkrypcji dla {video_id}: {e}", exc_info=True)
raise YouTubeUtilsError(f"Nieoczekiwany błąd: {str(e)}")

92
tests/README.md 100644
View File

@ -0,0 +1,92 @@
# Testy jednostkowe i integracyjne dla Telegram Video Summary Bot
Ten katalog zawiera testy jednostkowe i integracyjne dla głównych komponentów bota do streszczania filmów YouTube.
## Struktura testów
- **test_youtube_utils.py** - testy jednostkowe dla modułu `youtube_utils.py` (ekstrakcja URL, ID, pobieranie transkrypcji)
- **test_openai_utils.py** - testy jednostkowe dla modułu `openai_utils.py` (streszczanie tekstu)
- **test_integration.py** - testy integracyjne łączące obie funkcjonalności (od URL do streszczenia)
- **test_real_integration.py** - testy wykorzystujące rzeczywiste API (YouTube, OpenAI) bez mockowania
- **conftest.py** - konfiguracja pytest dla testów asynchronicznych
## Typy testów
### Testy jednostkowe (mocki)
Testy jednostkowe używają mocków do symulowania zewnętrznych zależności i skupiają się na testowaniu indywidualnych funkcji w izolacji. Są szybkie i niezależne od zewnętrznych serwisów.
### Testy integracyjne rzeczywiste
Testy w pliku `test_real_integration.py` używają rzeczywistych usług zewnętrznych:
- Komunikują się z rzeczywistym API YouTube, aby pobierać transkrypcje i metadane filmów
- Wywołują rzeczywiste API OpenAI (wymaga klucza API)
- Testują pełny przepływ od URL do streszczenia
Te testy są oznaczone jako "slow" i "integration", więc można je łatwo pominąć podczas zwykłego uruchamiania testów.
## Uruchamianie testów
### Za pomocą pytest
Aby uruchomić wszystkie testy, użyj pytest:
```bash
python -m pytest tests/
```
Aby uruchomić tylko testy jednostkowe (szybkie, bez zewnętrznych zależności):
```bash
python -m pytest tests/ -k "not integration and not slow"
```
Aby uruchomić tylko testy integracyjne z rzeczywistymi API:
```bash
python -m pytest tests/test_real_integration.py -v
```
Testy asynchroniczne wykorzystują plugin pytest-asyncio, który jest skonfigurowany w pliku `conftest.py`.
### Za pomocą tox
Testy można również uruchomić za pomocą narzędzia tox, które tworzy izolowane środowisko wirtualne:
```bash
# Uruchomienie tylko testów jednostkowych (mocki)
tox -e unit-tests
# Uruchomienie testów integracyjnych z rzeczywistymi API
tox -e integration-tests
# Uruchomienie wszystkich testów
tox -e all-tests
# Uruchomienie testów z raportem pokrycia kodu
tox -e cov
```
## Wymagania
Testy integracyjne z rzeczywistymi API wymagają:
1. Połączenia z internetem
2. Klucza API OpenAI (dla testów związanych z OpenAI)
Aby testy OpenAI działały, ustaw zmienną środowiskową:
```bash
export OPENAI_API_KEY=twój_klucz_api
```
lub użyj pliku `.env` w katalogu głównym projektu:
```
OPENAI_API_KEY=twój_klucz_api
```
## Uwagi
- Testy używające rzeczywistych API mogą czasami kończyć się niepowodzeniem ze względu na ograniczenia API, problemy z siecią, itp.
- Testy OpenAI zużywają tokeny, co może wiązać się z kosztami
- Używamy publicznego i popularnego TED Talk jako przykładowego filmu, który ma transkrypcje w wielu językach i jest mało prawdopodobne, że zostanie usunięty

68
tests/conftest.py 100644
View File

@ -0,0 +1,68 @@
import pytest
import os
import sys
import logging
# Dodanie katalogu nadrzędnego do ścieżki dla importów
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# Konfiguracja pytest-asyncio
# Ta opcja spowoduje, że wszystkie funkcje testowe z 'async def'
# będą automatycznie traktowane jako testy asynchroniczne
pytest_plugins = ['pytest_asyncio']
# Konfiguracja logowania dla testów
def pytest_configure(config):
"""Konfiguracja pytest przed uruchomieniem testów."""
# Ustawienie domyślnego zakresu pętli zdarzeń
config.option.asyncio_default_fixture_loop_scope = "function"
# Ustawienie trybu asyncio na AUTO, aby nie było ostrzeżeń o brakującym dekoratorze
# dla testów asynchronicznych
config.option.asyncio_mode = "auto"
# Rejestracja niestandardowych markerów testów
config.addinivalue_line(
"markers", "integration: testy wymagające rzeczywistych zewnętrznych usług"
)
config.addinivalue_line(
"markers", "slow: testy, które trwają dłużej niż standardowe testy jednostkowe"
)
# Konfiguracja logowania
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(), # Log do konsoli
logging.FileHandler('pytest.log') # Log do pliku
]
)
# Zmniejsz poziom logowania dla bibliotek zewnętrznych
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('aiohttp').setLevel(logging.WARNING)
# Zwiększ poziom logowania dla naszych modułów
logging.getLogger('src').setLevel(logging.DEBUG)
# Hook uruchamiany przed każdym testem
def pytest_runtest_setup(item):
"""Dodatkowa konfiguracja przed każdym testem."""
# Dodaj linię oddzielającą w logach dla lepszej czytelności
logging.info("=" * 80)
logging.info(f"Rozpoczęcie testu: {item.name}")
# Hook uruchamiany po każdym teście
def pytest_runtest_teardown(item, nextitem):
"""Dodatkowa konfiguracja po każdym teście."""
logging.info(f"Zakończenie testu: {item.name}")
logging.info("-" * 80)
# Hook do obsługi niepowodzeń testów
def pytest_runtest_logreport(report):
"""Logowanie dodatkowych informacji o niepowodzeniach testów."""
if report.when == "call" and report.failed:
logging.error(f"Test zakończony niepowodzeniem: {report.nodeid}")
if hasattr(report, "longrepr"):
logging.error(f"Szczegóły błędu: {report.longrepr}")

View File

@ -0,0 +1,159 @@
import pytest
from unittest.mock import patch, MagicMock, AsyncMock
import sys
import os
# Dodanie katalogu nadrzędnego do ścieżki dla importów
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from src.youtube_utils import extract_youtube_urls, extract_video_id, get_transcript, NoTranscriptFound, TranscriptsDisabled
from src.openai_utils import summarize_text
@pytest.mark.asyncio
async def test_youtube_to_openai_integration():
# 1. Przygotowanie danych testowych
text_with_youtube_url = "Sprawdź to wideo https://www.youtube.com/watch?v=abc123"
# 2. Przygotowanie mocka dla API youtube-transcript.io
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API zawierająca transkrypcję w języku polskim i tytuł
mock_json_data = {
"abc123": {
"title": "Tytuł przykładowego wideo",
"pl": [
{"text": "To jest", "start": 0.0, "duration": 1.0},
{"text": "przykładowa transkrypcja", "start": 1.0, "duration": 2.0},
{"text": "z YouTube", "start": 3.0, "duration": 1.0}
]
}
}
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# 3. Przygotowanie mocka dla odpowiedzi OpenAI
mock_choice = MagicMock()
mock_message = MagicMock()
mock_message.content = "Film zawiera przykładową transkrypcję z YouTube."
mock_choice.message = mock_message
mock_openai_response = MagicMock()
mock_openai_response.choices = [mock_choice]
# 4. Proces integracji z wykorzystaniem kontekstowych managerów dla mocków
# Najpierw sprawdzamy podstawowe funkcje bez mocków
urls = extract_youtube_urls(text_with_youtube_url)
assert len(urls) == 1
video_id = extract_video_id(urls[0])
assert video_id == "abc123"
# Teraz testujemy z mockami dla operacji asynchronicznych
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
result = await get_transcript(video_id, ["pl", "en"])
assert result is not None
transcript, title = result
assert transcript == "To jest przykładowa transkrypcja z YouTube"
assert title == "Tytuł przykładowego wideo"
with patch('src.openai_utils.client.chat.completions.create',
new=AsyncMock(return_value=mock_openai_response)):
summary = await summarize_text(transcript)
assert summary == "Film zawiera przykładową transkrypcję z YouTube."
@pytest.mark.asyncio
async def test_integration_with_english_fallback():
# 1. Symulacja braku polskiej transkrypcji, ale z dostępną angielską
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API zawierająca tylko transkrypcję angielską i tytuł
mock_json_data = {
"xyz789": {
"title": "English Sample Video",
"en": [
{"text": "This is", "start": 0.0, "duration": 1.0},
{"text": "a sample", "start": 1.0, "duration": 2.0},
{"text": "transcript", "start": 3.0, "duration": 1.0}
]
}
}
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# 2. Przygotowanie mocka dla odpowiedzi OpenAI
mock_choice = MagicMock()
mock_message = MagicMock()
mock_message.content = "Film zawiera angielską transkrypcję."
mock_choice.message = mock_message
mock_openai_response = MagicMock()
mock_openai_response.choices = [mock_choice]
# 3. Proces integracji z transkrypcją angielską jako fallback
video_id = "xyz789"
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Pobieranie transkrypcji - preferujemy polski, ale dostępny jest tylko angielski
result = await get_transcript(video_id, ["pl", "en"])
assert result is not None
transcript, title = result
assert transcript == "This is a sample transcript"
assert title == "English Sample Video"
# Generowanie streszczenia
with patch('src.openai_utils.client.chat.completions.create',
new=AsyncMock(return_value=mock_openai_response)):
summary = await summarize_text(transcript)
assert summary == "Film zawiera angielską transkrypcję."
@pytest.mark.asyncio
async def test_integration_no_transcript_available():
# 1. Symulacja braku jakiejkolwiek transkrypcji
mock_response = MagicMock()
mock_response.status = 404 # Brak transkrypcji
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# 2. Proces integracji - powinien zakończyć się na etapie transkrypcji
video_id = "no_transcript_123"
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Próba pobrania transkrypcji powinna rzucić NoTranscriptFound
with pytest.raises(NoTranscriptFound):
transcript = await get_transcript(video_id, ["pl", "en"])
# 3. Ponieważ nie udało się pobrać transkrypcji, nie testujemy dalej

View File

@ -0,0 +1,135 @@
import pytest
from unittest.mock import patch, MagicMock, AsyncMock
import sys
import os
# Dodanie katalogu nadrzędnego do ścieżki dla importów
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from src.openai_utils import summarize_text
@pytest.mark.asyncio
async def test_summarize_text_success():
# Przygotowanie mocka dla odpowiedzi OpenAI
mock_choice = MagicMock()
mock_message = MagicMock()
mock_message.content = "To jest przykładowe streszczenie filmu."
mock_choice.message = mock_message
mock_response = MagicMock()
mock_response.choices = [mock_choice]
# Testowy tekst transkrypcji
test_transcript = "To jest przykładowa transkrypcja filmu YouTube, " \
"która zostanie wysłana do API OpenAI w celu streszczenia. " \
"Zawiera kilka zdań o różnej tematyce."
# Test funkcji z mockiem
with patch('src.openai_utils.client.chat.completions.create',
new=AsyncMock(return_value=mock_response)) as mock_create:
result = await summarize_text(test_transcript)
# Sprawdzenie wyników
assert result == "To jest przykładowe streszczenie filmu."
assert mock_create.called
# Sprawdzenie czy parametry zostały przekazane poprawnie
call_args = mock_create.call_args[1]
assert call_args['model'] == "gpt-3.5-turbo"
assert call_args['temperature'] == 0.5
assert call_args['max_tokens'] == 150
# Sprawdzenie wiadomości
messages = call_args['messages']
assert len(messages) == 2
assert messages[0]['role'] == "system"
assert messages[1]['role'] == "user"
assert test_transcript in messages[1]['content'] # Sprawdzenie czy transkrypcja jest w prompcie
@pytest.mark.asyncio
async def test_summarize_text_with_transcript_from_youtube():
# Przygotowanie mocka dla odpowiedzi OpenAI
mock_choice = MagicMock()
mock_message = MagicMock()
mock_message.content = "Film przedstawia dyskusję na temat sztucznej inteligencji i jej zastosowań w życiu codziennym."
mock_choice.message = mock_message
mock_response = MagicMock()
mock_response.choices = [mock_choice]
# Przykładowa transkrypcja z YouTube
youtube_transcript = "To jest przykładowa transkrypcja z YouTube. " \
"Zawiera dyskusję o sztucznej inteligencji. " \
"Przedstawia różne zastosowania AI w codziennym życiu."
# Test funkcji
with patch('src.openai_utils.client.chat.completions.create',
new=AsyncMock(return_value=mock_response)) as mock_create:
result = await summarize_text(youtube_transcript)
# Sprawdzenie wyników
assert result == "Film przedstawia dyskusję na temat sztucznej inteligencji i jej zastosowań w życiu codziennym."
assert mock_create.called
@pytest.mark.asyncio
async def test_summarize_empty_text():
# Test funkcji z pustym tekstem
with patch('src.openai_utils.client.chat.completions.create') as mock_create:
result = await summarize_text("")
# Sprawdzenie wyników
assert result is None
assert not mock_create.called
@pytest.mark.asyncio
async def test_summarize_text_api_error():
# Symulacja błędu API
with patch('src.openai_utils.client.chat.completions.create',
new=AsyncMock(side_effect=Exception("API Error"))) as mock_create:
# Test funkcji
result = await summarize_text("Jakiś tekst transkrypcji")
# Sprawdzenie wyników
assert result is None
assert mock_create.called
@pytest.mark.asyncio
async def test_summarize_text_prompt_format():
# Przygotowanie mocka
mock_choice = MagicMock()
mock_message = MagicMock()
mock_message.content = "Streszczenie"
mock_choice.message = mock_message
mock_response = MagicMock()
mock_response.choices = [mock_choice]
# Testowy tekst
test_transcript = "Testowa transkrypcja"
# Przygotowanie spy do przechwycenia argumentów
with patch('src.openai_utils.client.chat.completions.create',
new=AsyncMock(return_value=mock_response)) as mock_create:
# Test funkcji
await summarize_text(test_transcript)
# Sprawdzenie czy został wywołany
assert mock_create.called
# Pobranie argumentów wywołania
call_args = mock_create.call_args[1]
messages = call_args['messages']
# Sprawdzenie czy prompt zawiera odpowiedni format
user_prompt = messages[1]['content']
assert "Streść poniższy transkrypt filmu z YouTube" in user_prompt
assert "Transkrypt:" in user_prompt
assert test_transcript in user_prompt

View File

@ -0,0 +1,253 @@
import pytest
import os
import sys
import asyncio
import logging
from dotenv import load_dotenv
# Dodanie katalogu nadrzędnego do ścieżki dla importów
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from src.youtube_utils import (
extract_youtube_urls,
extract_video_id,
get_transcript,
NoTranscriptFound,
TranscriptsDisabled,
APITokenMissing,
AuthorizationError,
APIConnectionError,
APIResponseError,
NoTranscriptLanguagesAvailable,
YouTubeUtilsError
)
from src.openai_utils import summarize_text
# Konfiguracja logowania dla testów
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Ładowanie zmiennych środowiskowych z pliku .env (jeśli istnieje)
load_dotenv()
# Zmiana na bardziej popularne wideo, które powinno być bardziej stabilne
# "What makes a good life? Lessons from the longest study on happiness | Robert Waldinger"
# TED Talk - bardzo popularne wystąpienie z napisami w wielu językach (>42M wyświetleń)
TEST_VIDEO_URL = "https://www.youtube.com/watch?v=8KkKuTCFvzI"
TEST_VIDEO_ID = extract_video_id(TEST_VIDEO_URL)
TEST_LANGUAGES = ["pl", "en"] # Preferujemy polską transkrypcję, potem angielską
# Drugi film zapasowy (na wypadek problemów z pierwszym)
BACKUP_VIDEO_URL = "https://www.youtube.com/watch?v=jNQXAC9IVRw" # Me at the zoo - pierwszy film na YouTube
BACKUP_VIDEO_ID = extract_video_id(BACKUP_VIDEO_URL)
# Oznaczamy testy jako "slow", żeby można było je pominąć za pomocą --skip-slow
# lub --skip-integration podczas uruchamiania testów
pytestmark = [pytest.mark.integration, pytest.mark.slow]
@pytest.mark.asyncio
async def test_real_extract_youtube_urls():
# Test na rzeczywistym tekście z linkami YouTube
text = """Sprawdź te filmy:
- https://www.youtube.com/watch?v=8KkKuTCFvzI
- https://youtu.be/iCvmsMzlF7o
- Również shorts: https://www.youtube.com/shorts/pFaEGmxQFnM"""
urls = extract_youtube_urls(text)
assert len(urls) == 3
assert "https://www.youtube.com/watch?v=8KkKuTCFvzI" in urls
assert "https://youtu.be/iCvmsMzlF7o" in urls
assert "https://www.youtube.com/shorts/pFaEGmxQFnM" in urls
@pytest.mark.asyncio
async def test_real_extract_video_id():
# Test na rzeczywistych URL-ach
assert extract_video_id("https://www.youtube.com/watch?v=8KkKuTCFvzI") == "8KkKuTCFvzI"
assert extract_video_id("https://youtu.be/iCvmsMzlF7o") == "iCvmsMzlF7o"
assert extract_video_id("https://www.youtube.com/shorts/pFaEGmxQFnM") == "pFaEGmxQFnM"
@pytest.mark.asyncio
async def test_real_get_transcript_and_title():
"""Test pobierania rzeczywistej transkrypcji i tytułu"""
# Ten test wymaga dostępu do internetu i tokenu API youtube-transcript.io
# Sprawdź czy mamy token API
from src.config import YOUTUBE_TRANSCRIPT_API_TOKEN
if not YOUTUBE_TRANSCRIPT_API_TOKEN:
pytest.skip("Brak tokenu API youtube-transcript.io")
# Wypróbuj główne wideo
logger.info(f"Próbuję pobrać transkrypcję dla głównego wideo: {TEST_VIDEO_ID}")
try:
# Spróbuj z pierwszym filmem
transcript, title = await get_transcript(TEST_VIDEO_ID, TEST_LANGUAGES)
assert len(transcript) > 100 # Upewniamy się, że transkrypcja ma sensowną długość
assert len(title) > 0 # Tytuł powinien być niepusty
# Wypiszmy tytuł i fragment transkrypcji do weryfikacji
logger.info(f"Tytuł filmu {TEST_VIDEO_ID}: {title}")
logger.info(f"Fragment transkrypcji: {transcript[:50]}...")
except (NoTranscriptFound, TranscriptsDisabled) as e:
logger.warning(f"Nie udało się pobrać transkrypcji dla {TEST_VIDEO_ID}: {e}")
logger.info(f"Próbuję film zapasowy: {BACKUP_VIDEO_ID}")
# Spróbuj z zapasowym filmem
try:
transcript, title = await get_transcript(BACKUP_VIDEO_ID, TEST_LANGUAGES)
assert len(transcript) > 10 # Film zapasowy "Me at the zoo" ma bardzo krótką transkrypcję
assert len(title) > 0
# Wypiszmy tytuł i fragment transkrypcji do weryfikacji
logger.info(f"Tytuł filmu zapasowego {BACKUP_VIDEO_ID}: {title}")
logger.info(f"Fragment transkrypcji zapasowej: {transcript[:50]}...")
except (NoTranscriptFound, TranscriptsDisabled) as e:
logger.error(f"Nie można pobrać transkrypcji dla żadnego z testowych filmów: {e}")
pytest.skip(f"Nie można pobrać transkrypcji dla żadnego z testowych filmów: {e}")
except (APITokenMissing, AuthorizationError) as e:
logger.error(f"Błąd autoryzacji: {e}")
pytest.skip(f"Błąd autoryzacji API: {e}")
except (APIConnectionError, APIResponseError) as e:
logger.error(f"Błąd API: {e}")
pytest.skip(f"Błąd API: {e}")
except Exception as e:
logger.error(f"Nieoczekiwany błąd podczas pobierania transkrypcji: {e}", exc_info=True)
pytest.skip(f"Nieoczekiwany błąd podczas pobierania transkrypcji: {e}")
@pytest.mark.asyncio
@pytest.mark.skipif(not os.environ.get("OPENAI_API_KEY"), reason="Wymaga klucza API OpenAI")
async def test_real_summarize_text():
# Test rzeczywistego streszczania tekstu z OpenAI API
# Ten test wymaga klucza API OpenAI
# Użyjmy krótkiej transkrypcji dla oszczędności tokenów
short_transcript = """
Chciałbym porozmawiać o tym, co sprawia, że życie jest dobre i wartościowe.
Przeprowadziliśmy jedno z najdłuższych badań nad szczęściem - trwające ponad 75 lat.
Badaliśmy życie tych samych osób od czasu, gdy byli nastolatkami, do starości.
Wniosek? Dobre relacje z innymi ludźmi kluczem do szczęścia i zdrowia.
Nie pieniądze, nie sława, nie ciężka praca, ale jakość naszych relacji z bliskimi.
Osoby, które miały dobre relacje z rodziną, przyjaciółmi i społecznością,
były szczęśliwsze, zdrowsze i żyły dłużej.
"""
logger.info("Próbuję streszczenie transkrypcji za pomocą OpenAI API")
try:
summary = await summarize_text(short_transcript)
assert summary is not None
assert len(summary) > 0
logger.info(f"Wygenerowane streszczenie: {summary}")
except Exception as e:
logger.error(f"Problem z API OpenAI: {e}", exc_info=True)
pytest.skip(f"Problem z API OpenAI: {e}")
@pytest.mark.asyncio
@pytest.mark.skipif(not os.environ.get("OPENAI_API_KEY") or not os.environ.get("YOUTUBE_TRANSCRIPT_API_TOKEN"),
reason="Wymaga klucza API OpenAI i tokenu API youtube-transcript.io")
async def test_end_to_end_integration():
"""
Ten test wykonuje pełną integrację od linku YouTube do streszczenia.
Wymaga dostępu do internetu, tokenu API youtube-transcript.io oraz klucza API OpenAI.
"""
logger.info("Rozpoczynam test integracyjny end-to-end")
# Testujemy na klasycznym filmie "Me at the zoo", który jest krótki i ma dostępne transkrypcje
text_with_url = f"Sprawdź ten film: {BACKUP_VIDEO_URL}"
# 1. Wydobycie URL
urls = extract_youtube_urls(text_with_url)
assert len(urls) == 1
url = urls[0]
logger.info(f"Znaleziono URL: {url}")
# 2. Wydobycie ID filmu
video_id = extract_video_id(url)
assert video_id == BACKUP_VIDEO_ID
logger.info(f"ID filmu: {video_id}")
# 3. Pobranie transkrypcji i tytułu
try:
logger.info(f"Próbuję pobrać transkrypcję dla {video_id}")
transcript, title = await get_transcript(video_id, TEST_LANGUAGES)
logger.info(f"Pobrano tytuł filmu: {title}")
logger.info(f"Długość transkrypcji: {len(transcript)} znaków")
logger.info(f"Fragment transkrypcji: {transcript[:150]}...")
# Weryfikacja tytułu dla "Me at the zoo"
assert "Me at the zoo" in title
except (NoTranscriptFound, TranscriptsDisabled, APIConnectionError, APIResponseError) as e:
logger.warning(f"Nie udało się pobrać transkrypcji dla {video_id}: {e}")
# Jeśli wystąpił wyjątek, spróbuj z głównym filmem
logger.info(f"Próbuję główny film: {TEST_VIDEO_ID}")
try:
transcript, title = await get_transcript(TEST_VIDEO_ID, TEST_LANGUAGES)
logger.info(f"Pobrano tytuł głównego filmu: {title}")
logger.info(f"Długość transkrypcji: {len(transcript)} znaków")
logger.info(f"Fragment transkrypcji: {transcript[:150]}...")
except (NoTranscriptFound, TranscriptsDisabled, APIConnectionError, APIResponseError) as e:
logger.error(f"Nie można pobrać transkrypcji dla żadnego z testowych filmów: {e}")
pytest.skip(f"Nie można pobrać transkrypcji dla żadnego z testowych filmów: {e}")
# 4. Skrócenie transkrypcji dla oszczędności tokenów (tylko dla testu)
# "Me at the zoo" ma wystarczająco krótką transkrypcję, więc możemy użyć całości
logger.info(f"Długość oryginalnej transkrypcji: {len(transcript)} znaków")
# 5. Wygenerowanie streszczenia
try:
logger.info("Próbuję wygenerować streszczenie")
summary = await summarize_text(transcript)
assert summary is not None
assert len(summary) > 10
logger.info(f"Wygenerowane streszczenie: {summary}")
except Exception as e:
logger.error(f"Błąd podczas generowania streszczenia: {e}", exc_info=True)
pytest.skip(f"Błąd podczas generowania streszczenia: {e}")
if __name__ == "__main__":
# Możliwość uruchomienia testów bezpośrednio z tego pliku
asyncio.run(test_real_extract_youtube_urls())
asyncio.run(test_real_extract_video_id())
# Testy wymagające połączenia z YouTube
try:
asyncio.run(test_real_get_transcript_and_title())
except Exception as e:
logger.error(f"Problemy z połączeniem z YouTube API: {e}", exc_info=True)
# Testy wymagające klucza API OpenAI
if os.environ.get("OPENAI_API_KEY"):
try:
asyncio.run(test_real_summarize_text())
if os.environ.get("YOUTUBE_TRANSCRIPT_API_TOKEN"):
asyncio.run(test_end_to_end_integration())
else:
logger.warning("Pominięto test end-to-end (brak tokenu API youtube-transcript.io)")
except Exception as e:
logger.error(f"Problemy z OpenAI API: {e}", exc_info=True)
else:
logger.warning("Pominięto testy OpenAI (brak klucza API)")

View File

@ -0,0 +1,462 @@
import pytest
from unittest.mock import patch, MagicMock, AsyncMock
import sys
import os
import json
# Dodanie katalogu nadrzędnego do ścieżki dla importów
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from src.youtube_utils import (
extract_youtube_urls,
extract_video_id,
get_transcript,
NoTranscriptFound,
TranscriptsDisabled,
APITokenMissing,
AuthorizationError,
APIConnectionError,
APIResponseError,
NoTranscriptLanguagesAvailable,
YouTubeUtilsError
)
# Testy dla funkcji extract_youtube_urls
def test_extract_youtube_urls():
# Test dla standardowych linków YouTube
text = "Sprawdź to wideo https://www.youtube.com/watch?v=abc123 i to https://youtu.be/xyz789"
result = extract_youtube_urls(text)
assert len(result) == 2
assert "https://www.youtube.com/watch?v=abc123" in result
assert "https://youtu.be/xyz789" in result
# Test dla YouTube Shorts
text = "Ten shorts jest super https://www.youtube.com/shorts/def456"
result = extract_youtube_urls(text)
assert len(result) == 1
assert "https://www.youtube.com/shorts/def456" in result
# Test dla mobilnej wersji YouTube
text = "Link mobilny: https://m.youtube.com/watch?v=mob123"
result = extract_youtube_urls(text)
assert len(result) == 1
assert "https://m.youtube.com/watch?v=mob123" in result
# Test dla pustego tekstu
result = extract_youtube_urls("")
assert result == []
# Test dla tekstu bez linków YouTube
text = "To jest zwykły tekst bez linków do YouTube"
result = extract_youtube_urls(text)
assert result == []
# Testy dla funkcji extract_video_id
def test_extract_video_id():
# Test dla standardowego URL
url = "https://www.youtube.com/watch?v=abc123"
assert extract_video_id(url) == "abc123"
# Test dla skróconego URL
url = "https://youtu.be/xyz789"
assert extract_video_id(url) == "xyz789"
# Test dla YouTube Shorts
url = "https://www.youtube.com/shorts/def456"
assert extract_video_id(url) == "def456"
# Test dla URL embed
url = "https://www.youtube.com/embed/embed123"
assert extract_video_id(url) == "embed123"
# Test dla starego formatu URL
url = "https://www.youtube.com/v/old456"
assert extract_video_id(url) == "old456"
# Test dla nieprawidłowego URL
url = "https://example.com/not-youtube"
assert extract_video_id(url) is None
# Testy dla funkcji get_transcript
@pytest.mark.asyncio
async def test_get_transcript_success():
# Mock dla odpowiedzi z API youtube-transcript.io
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API w nowym formacie
mock_json_data = [
{
"id": "abc123",
"title": "Przykładowy tytuł wideo",
"tracks": [
{
"language": "Polish",
"transcript": [
{"text": "To jest", "start": "0.0", "dur": "1.0"},
{"text": "przykładowa transkrypcja", "start": "1.0", "dur": "2.0"}
]
},
{
"language": "English",
"transcript": [
{"text": "This is", "start": "0.0", "dur": "1.0"},
{"text": "sample transcript", "start": "1.0", "dur": "2.0"}
]
}
],
"languages": [
{"label": "Polish", "languageCode": "pl"},
{"label": "English", "languageCode": "en"}
]
}
]
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test
transcript, title = await get_transcript("abc123", ["pl", "en"])
# Sprawdzenie wyników
assert transcript == "To jest przykładowa transkrypcja"
assert title == "Przykładowy tytuł wideo"
# Upewnij się, że API zostało wywołane z poprawnymi parametrami
mock_client_session.post.assert_called_once()
@pytest.mark.asyncio
async def test_get_transcript_english_fallback():
# Mock dla odpowiedzi z API youtube-transcript.io
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API zawierająca tylko transkrypcję angielską
mock_json_data = [
{
"id": "abc123",
"title": "Sample video title",
"tracks": [
{
"language": "English",
"transcript": [
{"text": "This is", "start": "0.0", "dur": "1.0"},
{"text": "sample transcript", "start": "1.0", "dur": "2.0"}
]
}
],
"languages": [
{"label": "English", "languageCode": "en"}
]
}
]
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - preferujemy polski, ale dostępny jest tylko angielski
transcript, title = await get_transcript("abc123", ["pl", "en"])
# Sprawdzenie wyników - powinniśmy otrzymać angielską transkrypcję jako fallback
assert transcript == "This is sample transcript"
assert title == "Sample video title"
@pytest.mark.asyncio
async def test_get_transcript_no_title():
# Mock dla odpowiedzi z API youtube-transcript.io bez tytułu
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API bez tytułu
mock_json_data = [
{
"id": "abc123",
"tracks": [
{
"language": "Polish",
"transcript": [
{"text": "To jest", "start": "0.0", "dur": "1.0"},
{"text": "przykładowa transkrypcja", "start": "1.0", "dur": "2.0"}
]
}
],
"languages": [
{"label": "Polish", "languageCode": "pl"}
]
}
]
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test z odpowiedzią bez tytułu
transcript, title = await get_transcript("abc123", ["pl", "en"])
# Sprawdzenie wyników
assert transcript == "To jest przykładowa transkrypcja"
assert title == "" # Pusty tytuł
@pytest.mark.asyncio
async def test_get_transcript_no_transcript_found():
# Mock dla odpowiedzi z API youtube-transcript.io
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API zawierająca błąd
mock_json_data = [
{
"id": "abc123",
"error": "No transcript found for this video"
}
]
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić NoTranscriptFound
with pytest.raises(NoTranscriptFound):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_disabled():
# Mock dla odpowiedzi z API youtube-transcript.io
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API informująca o wyłączonych transkrypcjach
mock_json_data = [
{
"id": "abc123",
"error": "Transcriptions disabled for this video"
}
]
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić TranscriptsDisabled
with pytest.raises(TranscriptsDisabled):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_auth_error():
# Mock dla odpowiedzi z API youtube-transcript.io
mock_response = MagicMock()
mock_response.status = 401
mock_response.text = AsyncMock(return_value="Unauthorized access")
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić AuthorizationError
with pytest.raises(AuthorizationError):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_no_api_token():
# Test gdy brak tokenu API
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', None):
with pytest.raises(APITokenMissing):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_connection_error():
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_client_session = MagicMock()
mock_client_session.post.side_effect = Exception("Connection error")
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić APIConnectionError
with pytest.raises(APIConnectionError):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_invalid_json():
# Mock dla odpowiedzi z API youtube-transcript.io z nieprawidłowym JSON
mock_response = MagicMock()
mock_response.status = 200
mock_response.json.side_effect = json.JSONDecodeError("Invalid JSON", "{", 0)
mock_response.text = AsyncMock(return_value="{invalid json")
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić APIResponseError
with pytest.raises(APIResponseError):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_empty_response():
# Mock dla odpowiedzi z API youtube-transcript.io z pustą tablicą
mock_response = MagicMock()
mock_response.status = 200
# Konfiguracja asynchronicznego mocka z pustą odpowiedzią
mock_response.json = AsyncMock(return_value=[])
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić APIResponseError
with pytest.raises(APIResponseError):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_no_tracks():
# Mock dla odpowiedzi z API youtube-transcript.io bez ścieżek transkrypcji
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API bez ścieżek transkrypcji
mock_json_data = [
{
"id": "abc123",
"title": "Wideo bez transkrypcji"
}
]
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić NoTranscriptFound
with pytest.raises(NoTranscriptFound):
await get_transcript("abc123", ["pl", "en"])
@pytest.mark.asyncio
async def test_get_transcript_empty_tracks():
# Mock dla odpowiedzi z API youtube-transcript.io z pustą tablicą ścieżek
mock_response = MagicMock()
mock_response.status = 200
# Przykładowa odpowiedź z API z pustą tablicą ścieżek
mock_json_data = [
{
"id": "abc123",
"title": "Wideo bez transkrypcji",
"tracks": []
}
]
# Konfiguracja asynchronicznego mocka
mock_response.json = AsyncMock(return_value=mock_json_data)
# Mock dla kontekstowego managera aiohttp.ClientSession().post()
mock_session = MagicMock()
mock_session.__aenter__.return_value = mock_response
mock_client_session = MagicMock()
mock_client_session.post.return_value = mock_session
mock_client_session.__aenter__.return_value = mock_client_session
# Używamy kontekstowego managera dla patcha
with patch('src.youtube_utils.YOUTUBE_TRANSCRIPT_API_TOKEN', "fake_token"):
with patch('aiohttp.ClientSession', return_value=mock_client_session):
# Test - powinien rzucić NoTranscriptFound
with pytest.raises(NoTranscriptFound):
await get_transcript("abc123", ["pl", "en"])

133
tox.ini 100644
View File

@ -0,0 +1,133 @@
# Tox (https://tox.readthedocs.io/) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# tests suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
[tox]
envlist = py311
toxworkdir={toxinidir}/.tox
[testenv]
deps =
# Zależności z pyproject.toml
python-telegram-bot[job-queue] >= 20.0
pytube >= 15.0.0
openai >= 1.0.0
asyncpg >= 0.27.0
python-dotenv >= 1.0.0
httpx >= 0.24.0
aiohttp >= 3.9.0
# Zależności testowe
pytest
pytest-asyncio
pytest-cov
pass_env =
OPENAI_API_KEY
TELEGRAM_BOT_TOKEN
DATABASE_URL
YOUTUBE_TRANSCRIPT_API_TOKEN
commands =
python ./main.py
[testenv:unit-tests]
deps =
# Zależności z pyproject.toml
python-telegram-bot[job-queue] >= 20.0
pytube >= 15.0.0
openai >= 1.0.0
asyncpg >= 0.27.0
python-dotenv >= 1.0.0
httpx >= 0.24.0
aiohttp >= 3.9.0
# Zależności testowe
pytest
pytest-asyncio
pytest-cov
pass_env =
OPENAI_API_KEY
TELEGRAM_BOT_TOKEN
DATABASE_URL
YOUTUBE_TRANSCRIPT_API_TOKEN
commands =
# Uruchamianie testów jednostkowych używając pytest-asyncio zamiast unittest
# aby poprawnie obsługiwać testy asynchroniczne
python -m pytest tests/ -k "not integration and not slow"
[testenv:integration-tests]
deps =
# Zależności z pyproject.toml
python-telegram-bot[job-queue] >= 20.0
pytube >= 15.0.0
openai >= 1.0.0
asyncpg >= 0.27.0
python-dotenv >= 1.0.0
httpx >= 0.24.0
aiohttp >= 3.9.0
# Zależności testowe
pytest
pytest-asyncio
pytest-cov
setenv =
# Konfiguracja logowania dla testów
PYTHONASYNCIODEBUG = 1 # Pomaga w wykrywaniu problemów z asynchronicznym kodem
PYTHONFAULTHANDLER = 1 # Pomaga w diagnozowaniu segmentation faults
pass_env =
OPENAI_API_KEY
TELEGRAM_BOT_TOKEN
DATABASE_URL
YOUTUBE_TRANSCRIPT_API_TOKEN
commands =
# Uruchamianie testów integracyjnych z rozszerzonym logowaniem
python -m pytest tests/test_real_integration.py -v --log-cli-level=DEBUG
[testenv:all-tests]
deps =
# Zależności z pyproject.toml
python-telegram-bot[job-queue] >= 20.0
pytube >= 15.0.0
openai >= 1.0.0
asyncpg >= 0.27.0
python-dotenv >= 1.0.0
httpx >= 0.24.0
aiohttp >= 3.9.0
# Zależności testowe
pytest
pytest-asyncio
pytest-cov
setenv =
# Konfiguracja logowania dla testów
PYTHONASYNCIODEBUG = 1
PYTHONFAULTHANDLER = 1
pass_env =
OPENAI_API_KEY
TELEGRAM_BOT_TOKEN
DATABASE_URL
YOUTUBE_TRANSCRIPT_API_TOKEN
commands =
# Uruchamianie wszystkich testów
python -m pytest tests/ --log-cli-level=INFO
[testenv:cov]
deps =
# Zależności z pyproject.toml
python-telegram-bot[job-queue] >= 20.0
pytube >= 15.0.0
openai >= 1.0.0
asyncpg >= 0.27.0
python-dotenv >= 1.0.0
httpx >= 0.24.0
aiohttp >= 3.9.0
# Zależności testowe
pytest
pytest-asyncio
pytest-cov
pass_env =
OPENAI_API_KEY
TELEGRAM_BOT_TOKEN
DATABASE_URL
YOUTUBE_TRANSCRIPT_API_TOKEN
commands =
# Uruchamianie testów z pokryciem kodu
python -m pytest --cov=src tests/ --log-cli-level=INFO