Курс 2025

Разработка Чат-ботов: От Прототипа до Продакшена

С FastAPI, Redis, SQLite, Docker Compose и Aiogram

📚 Оглавление

1) Введение и архитектура

Этот курс посвящен созданию надежных, масштабируемых и поддерживаемых чат-ботов с использованием современного стека технологий: FastAPI для создания API, Redis для управления состоянием и очередями, SQLite для персистентного хранения данных, Docker Compose для оркестрации сервисов и Aiogram для разработки Telegram-ботов.

Почему именно этот стек?

  • FastAPI: Современный, быстрый веб-фреймворк с автоматической генерацией документации.
  • Redis: Высокопроизводительное хранилище ключ-значение для состояния, кэширования и очередей.
  • SQLite: Легковесная встраиваемая СУБД для персистентности данных.
  • Docker Compose: Упрощает развертывание и управление многоконтейнерными приложениями.
  • Aiogram: Асинхронная библиотека для создания Telegram-ботов.

Когда использовать этот подход

  • Нужен масштабируемый и поддерживаемый чат-бот.
  • Требуется разделение бизнес-логики и интерфейса бота.
  • Необходимо хранить состояние пользователей и историю взаимодействий.
  • Планируется интеграция с внешними API и сервисами.
  • Важна возможность тестирования и деплоя.

Архитектура системы

graph TD A[Пользователь Telegram] --> B[Aiogram Бот] B --> C[FastAPI API] C --> D[Бизнес-логика] D --> E[Redis] D --> F[SQLite] D --> G[LangChain/LMM]

Дорожная карта обучения

Настройка окружения
FastAPI API
SQLite данные
Redis кэш/состояние
Aiogram бот
Docker Compose
Интеграция
Тестирование
Деплой

2) Настройка окружения и основы

Установка необходимых инструментов

# Установка Python (если еще не установлен)
# Скачайте с python.org или используйте менеджер пакетов

# Установка Docker и Docker Compose
# Следуйте инструкциям на https://docs.docker.com/get-docker/
# и https://docs.docker.com/compose/install/

# Проверка установки
python --version
docker --version
docker-compose --version

Создание виртуального окружения Python

# Создание виртуального окружения
python -m venv chatbot-env

# Активация (Linux/macOS)
source chatbot-env/bin/activate

# Активация (Windows)
chatbot-env\Scripts\activate

# Обновление pip
pip install --upgrade pip

Базовые команды Docker/Docker Compose

# Запуск сервисов
docker-compose up

# Запуск в фоновом режиме
docker-compose up -d

# Остановка сервисов
docker-compose down

# Просмотр логов
docker-compose logs

# Выполнение команды в контейнере
docker-compose exec service_name command

Создание структуры проекта

mkdir chatbot-project
cd chatbot-project

# Создание структуры каталогов
mkdir -p app app/api app/bot app/models app/schemas app/database app/core tests

# Создание файлов
touch app/__init__.py
touch app/main.py
touch app/api/__init__.py
touch app/bot/__init__.py
touch app/bot/bot.py
touch app/models/__init__.py
touch app/schemas/__init__.py
touch app/database/__init__.py
touch app/database/database.py
touch app/core/__init__.py
touch app/core/config.py
touch docker-compose.yml
touch Dockerfile
touch requirements.txt
touch .env
touch .gitignore

3) FastAPI: Создание RESTful API

Установка зависимостей

pip install fastapi uvicorn pydantic

Основы FastAPI: маршруты и модели Pydantic

# app/main.py
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Optional

app = FastAPI(title="Chatbot API", version="1.0.0")

class User(BaseModel):
    id: int
    username: str
    first_name: Optional[str] = None
    last_name: Optional[str] = None

class Message(BaseModel):
    id: int
    user_id: int
    text: str
    timestamp: str

# Пример маршрутов
@app.get("/")
def read_root():
    return {"message": "Добро пожаловать в Chatbot API"}

@app.get("/users/", response_model=List[User])
def read_users():
    return [
        User(id=1, username="user1", first_name="Иван", last_name="Иванов"),
        User(id=2, username="user2", first_name="Мария", last_name="Петрова")
    ]

@app.get("/users/{user_id}", response_model=User)
def read_user(user_id: int):
    return User(id=user_id, username="user1", first_name="Иван", last_name="Иванов")

@app.post("/messages/", response_model=Message)
def create_message(message: Message):
    return message

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Автоматическая документация (Swagger)

FastAPI автоматически генерирует интерактивную документацию:

Валидация входных данных

# app/schemas/user.py
from pydantic import BaseModel, Field, validator
from typing import Optional

class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    first_name: Optional[str] = Field(None, max_length=50)
    last_name: Optional[str] = Field(None, max_length=50)
    email: str
    
    @validator('email')
    def validate_email(cls, v):
        if '@' not in v:
            raise ValueError('Некорректный email')
        return v

class UserUpdate(BaseModel):
    first_name: Optional[str] = Field(None, max_length=50)
    last_name: Optional[str] = Field(None, max_length=50)

Middleware и обработка ошибок

# app/main.py (дополнение)
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import logging

# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Chatbot API", version="1.0.0")

# Добавление CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Обработчики ошибок
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    logger.error(f"HTTP error: {exc.detail}")
    return {"error": exc.detail}

@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
    logger.error(f"General error: {str(exc)}")
    return {"error": "Internal server error"}

4) SQLite: Локальное хранение данных

Установка зависимостей

pip install sqlalchemy aiosqlite

Основы SQL и SQLite

SQLite - это легковесная встраиваемая СУБД, которая хранит всю базу данных в одном файле. Она идеально подходит для разработки и небольших приложений.

Использование SQLAlchemy (ORM) с FastAPI

# app/database/database.py
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.sql import func
import os

# Создание базы данных в памяти для разработки или файл для продакшена
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./chatbot.db")

engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# Получение сессии базы данных
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
# app/models/user.py
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.sql import func
from app.database.database import Base

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    telegram_id = Column(Integer, unique=True, index=True)
    username = Column(String, index=True)
    first_name = Column(String)
    last_name = Column(String)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# app/models/message.py
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.database.database import Base

class Message(Base):
    __tablename__ = "messages"
    
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    text = Column(String)
    timestamp = Column(DateTime(timezone=True), server_default=func.now())
    
    # Связь с пользователем
    user = relationship("User", back_populates="messages")
# app/models/user.py (дополнение)
from sqlalchemy.orm import relationship

class User(Base):
    # ... предыдущий код ...
    
    # Связь с сообщениями
    messages = relationship("Message", back_populates="user")

CRUD операции через API

# app/api/users.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from app.database.database import get_db
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate

router = APIRouter(prefix="/users", tags=["users"])

@router.get("/", response_model=List[User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = db.query(User).offset(skip).limit(limit).all()
    return users

@router.get("/{user_id}", response_model=User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="Пользователь не найден")
    return db_user

@router.post("/", response_model=User)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    db_user = User(**user.dict())
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

@router.put("/{user_id}", response_model=User)
def update_user(user_id: int, user: UserUpdate, db: Session = Depends(get_db)):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="Пользователь не найден")
    
    for key, value in user.dict(exclude_unset=True).items():
        setattr(db_user, key, value)
    
    db.commit()
    db.refresh(db_user)
    return db_user

@router.delete("/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db)):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="Пользователь не найден")
    
    db.delete(db_user)
    db.commit()
    return {"message": "Пользователь удален"}

5) Redis: Состояние, кэш и очереди

Установка зависимостей

pip install redis

Основы Redis: ключ-значение, структуры данных

Redis - это высокопроизводительное хранилище ключ-значение в памяти, которое поддерживает различные структуры данных, такие как строки, хэши, списки, множества и отсортированные множества.

Подключение к Redis

# app/core/config.py
import os

class Settings:
    DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./chatbot.db")
    REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "")

settings = Settings()
# app/core/redis_client.py
import redis
import json
from app.core.config import settings

# Создание клиента Redis
redis_client = redis.from_url(settings.REDIS_URL, decode_responses=True)

def set_user_state(user_id: int, state: str):
    """Сохранение состояния пользователя"""
    redis_client.set(f"user:{user_id}:state", state)

def get_user_state(user_id: int) -> str:
    """Получение состояния пользователя"""
    return redis_client.get(f"user:{user_id}:state") or "default"

def set_user_data(user_id: int, key: str, value):
    """Сохранение пользовательских данных"""
    redis_client.hset(f"user:{user_id}:data", key, json.dumps(value))

def get_user_data(user_id: int, key: str):
    """Получение пользовательских данных"""
    value = redis_client.hget(f"user:{user_id}:data", key)
    return json.loads(value) if value else None

def delete_user_data(user_id: int, key: str):
    """Удаление пользовательских данных"""
    redis_client.hdel(f"user:{user_id}:data", key)

Использование Redis для хранения состояния FSM

# app/core/fsm.py
from app.core.redis_client import redis_client
import json

class FSMContext:
    def __init__(self, user_id: int):
        self.user_id = user_id
        self.storage_key = f"fsm:{user_id}"
    
    async def set_state(self, state: str):
        """Установка состояния"""
        redis_client.hset(self.storage_key, "state", state)
    
    async def get_state(self) -> str:
        """Получение состояния"""
        return redis_client.hget(self.storage_key, "state") or "*"
    
    async def set_data(self, data: dict):
        """Установка данных"""
        redis_client.hset(self.storage_key, "data", json.dumps(data))
    
    async def get_data(self) -> dict:
        """Получение данных"""
        data = redis_client.hget(self.storage_key, "data")
        return json.loads(data) if data else {}
    
    async def update_data(self, **kwargs):
        """Обновление данных"""
        current_data = await self.get_data()
        current_data.update(kwargs)
        await self.set_data(current_data)
    
    async def reset_state(self):
        """Сброс состояния"""
        redis_client.delete(self.storage_key)

Кэширование часто запрашиваемых данных

# app/core/cache.py
from app.core.redis_client import redis_client
import json
from typing import Optional

def cache_set(key: str, value, expiration: int = 3600):
    """Сохранение значения в кэш"""
    redis_client.setex(key, expiration, json.dumps(value))

def cache_get(key: str):
    """Получение значения из кэша"""
    value = redis_client.get(key)
    return json.loads(value) if value else None

def cache_delete(key: str):
    """Удаление значения из кэша"""
    redis_client.delete(key)

# Пример использования в API
from app.database.database import get_db
from app.models.user import User

def get_user_cached(user_id: int, db):
    """Получение пользователя с кэшированием"""
    # Попытка получить из кэша
    cached_user = cache_get(f"user:{user_id}")
    if cached_user:
        return cached_user
    
    # Если нет в кэше, получаем из БД
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user:
        user_data = {
            "id": db_user.id,
            "username": db_user.username,
            "first_name": db_user.first_name,
            "last_name": db_user.last_name
        }
        # Сохраняем в кэш на 10 минут
        cache_set(f"user:{user_id}", user_data, 600)
        return user_data
    
    return None

Очереди задач

# app/core/queue.py
from app.core.redis_client import redis_client
import json

def enqueue_task(queue_name: str, task_data: dict):
    """Добавление задачи в очередь"""
    redis_client.lpush(queue_name, json.dumps(task_data))

def dequeue_task(queue_name: str) -> Optional[dict]:
    """Получение задачи из очереди"""
    task = redis_client.brpop(queue_name, timeout=1)
    if task:
        return json.loads(task[1])
    return None

def get_queue_length(queue_name: str) -> int:
    """Получение длины очереди"""
    return redis_client.llen(queue_name)

# Пример задачи для обработки сообщений
def enqueue_message_processing(user_id: int, message_text: str):
    """Добавление задачи обработки сообщения в очередь"""
    task_data = {
        "user_id": user_id,
        "message_text": message_text,
        "timestamp": str(datetime.now())
    }
    enqueue_task("message_processing_queue", task_data)

6) Aiogram: Создание Telegram бота

Установка зависимостей

pip install aiogram python-dotenv

Основы Aiogram 3.x: диспетчер, роутеры, хэндлеры

# app/bot/bot.py
import asyncio
import logging
from aiogram import Bot, Dispatcher, Router, types
from aiogram.filters import Command
from aiogram.types import Message
from app.core.config import settings
from app.core.fsm import FSMContext

# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Создание бота и диспетчера
bot = Bot(token=settings.TELEGRAM_TOKEN)
dp = Dispatcher()

# Создание роутера
router = Router()

@router.message(Command(commands=["start"]))
async def command_start_handler(message: Message) -> None:
    """Обработчик команды /start"""
    await message.answer(f"Привет, {message.from_user.full_name}! Я чат-бот.")

@router.message(Command(commands=["help"]))
async def command_help_handler(message: Message) -> None:
    """Обработчик команды /help"""
    help_text = """
Доступные команды:
/start - Начать работу
/help - Показать помощь
/status - Показать статус
"""
    await message.answer(help_text)

@router.message()
async def echo_handler(message: Message) -> None:
    """Эхо-обработчик для всех сообщений"""
    try:
        # Отправка сообщения в API для обработки
        # Здесь будет интеграция с FastAPI
        await message.answer(f"Вы сказали: {message.text}")
    except Exception as e:
        logger.error(f"Ошибка обработки сообщения: {e}")
        await message.answer("Произошла ошибка при обработке вашего сообщения.")

# Регистрация роутера
dp.include_router(router)

async def main() -> None:
    """Главная функция запуска бота"""
    logger.info("Запуск бота...")
    try:
        await dp.start_polling(bot)
    except Exception as e:
        logger.error(f"Ошибка запуска бота: {e}")
    finally:
        await bot.session.close()

if __name__ == "__main__":
    asyncio.run(main())

Получение и отправка сообщений

# app/bot/handlers/message_handlers.py
from aiogram import Router
from aiogram.types import Message
from aiogram.filters import Command
import aiohttp
from app.core.config import settings

router = Router()

@router.message(Command(commands=["status"]))
async def status_handler(message: Message):
    """Проверка статуса API"""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{settings.API_BASE_URL}/health") as response:
                if response.status == 200:
                    await message.answer("✅ API работает нормально")
                else:
                    await message.answer("❌ API недоступно")
    except Exception as e:
        await message.answer(f"❌ Ошибка подключения к API: {str(e)}")

@router.message()
async def message_handler(message: Message):
    """Обработчик всех текстовых сообщений"""
    user_id = message.from_user.id
    text = message.text
    
    # Сохранение сообщения в API
    try:
        async with aiohttp.ClientSession() as session:
            user_data = {
                "telegram_id": user_id,
                "username": message.from_user.username,
                "first_name": message.from_user.first_name,
                "last_name": message.from_user.last_name
            }
            
            # Создание или обновление пользователя
            async with session.post(
                f"{settings.API_BASE_URL}/users/", 
                json=user_data
            ) as response:
                if response.status == 200 or response.status == 409:  # 409 - уже существует
                    # Отправка сообщения в API
                    message_data = {
                        "user_id": user_id,
                        "text": text
                    }
                    async with session.post(
                        f"{settings.API_BASE_URL}/messages/", 
                        json=message_data
                    ) as msg_response:
                        if msg_response.status == 200:
                            result = await msg_response.json()
                            await message.answer(f"Сообщение получено: {result.get('id')}")
                        else:
                            await message.answer("Ошибка при сохранении сообщения")
                else:
                    await message.answer("Ошибка при регистрации пользователя")
    except Exception as e:
        await message.answer(f"Ошибка: {str(e)}")

Машина состояний (FSM) с использованием Redis

# app/bot/states.py
from enum import Enum

class UserState(Enum):
    DEFAULT = "default"
    WAITING_FOR_NAME = "waiting_for_name"
    WAITING_FOR_AGE = "waiting_for_age"
    PROFILE_COMPLETE = "profile_complete"

# app/bot/handlers/fsm_handlers.py
from aiogram import Router
from aiogram.types import Message
from aiogram.filters import Command
from app.core.fsm import FSMContext
from app.bot.states import UserState

router = Router()

@router.message(Command(commands=["register"]))
async def start_registration(message: Message, state: FSMContext):
    """Начало регистрации пользователя"""
    await state.set_state(UserState.WAITING_FOR_NAME)
    await message.answer("Пожалуйста, введите ваше имя:")

@router.message(lambda message: message.text and not message.text.startswith("/"))
async def process_name(message: Message, state: FSMContext):
    """Обработка имени пользователя"""
    current_state = await state.get_state()
    
    if current_state == UserState.WAITING_FOR_NAME.value:
        await state.update_data(name=message.text)
        await state.set_state(UserState.WAITING_FOR_AGE)
        await message.answer("Спасибо! Теперь введите ваш возраст:")
    
    elif current_state == UserState.WAITING_FOR_AGE.value:
        try:
            age = int(message.text)
            await state.update_data(age=age)
            await state.set_state(UserState.PROFILE_COMPLETE)
            
            # Получение всех данных
            user_data = await state.get_data()
            name = user_data.get("name")
            
            await message.answer(f"Регистрация завершена!\nИмя: {name}\nВозраст: {age}")
            await state.reset_state()
        except ValueError:
            await message.answer("Пожалуйста, введите корректный возраст (число):")
    
    else:
        await message.answer("Не понимаю. Используйте /register для начала регистрации.")

Inline-кнопки, клавиатуры

# app/bot/keyboards.py
from aiogram.types import ReplyKeyboardMarkup, KeyboardButton, InlineKeyboardMarkup, InlineKeyboardButton

# Обычная клавиатура
def get_main_keyboard():
    keyboard = ReplyKeyboardMarkup(
        keyboard=[
            [KeyboardButton(text="Профиль"), KeyboardButton(text="Настройки")],
            [KeyboardButton(text="Помощь"), KeyboardButton(text="О боте")]
        ],
        resize_keyboard=True
    )
    return keyboard

# Inline-клавиатура
def get_profile_keyboard():
    keyboard = InlineKeyboardMarkup(
        inline_keyboard=[
            [InlineKeyboardButton(text="Редактировать профиль", callback_data="edit_profile")],
            [InlineKeyboardButton(text="Удалить профиль", callback_data="delete_profile")],
            [InlineKeyboardButton(text="Назад", callback_data="back_to_main")]
        ]
    )
    return keyboard

# app/bot/handlers/callback_handlers.py
from aiogram import Router
from aiogram.types import CallbackQuery

router = Router()

@router.callback_query(lambda c: c.data == "edit_profile")
async def edit_profile_callback(callback: CallbackQuery):
    await callback.message.edit_text("Редактирование профиля...")
    await callback.answer()

@router.callback_query(lambda c: c.data == "delete_profile")
async def delete_profile_callback(callback: CallbackQuery):
    await callback.message.edit_text("Профиль удален.")
    await callback.answer()

@router.callback_query(lambda c: c.data == "back_to_main")
async def back_to_main_callback(callback: CallbackQuery):
    from app.bot.keyboards import get_main_keyboard
    await callback.message.edit_text("Главное меню", reply_markup=get_main_keyboard())
    await callback.answer()

7) Docker Compose: Оркестрация сервисов

Создание Dockerfile для приложения FastAPI

# Dockerfile
FROM python:3.11-slim

# Установка зависимостей системы
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Установка рабочей директории
WORKDIR /app

# Копирование файлов зависимостей
COPY requirements.txt .

# Установка Python зависимостей
RUN pip install --no-cache-dir -r requirements.txt

# Копирование исходного кода
COPY . .

# Создание директории для базы данных
RUN mkdir -p /app/data

# Открытие порта
EXPOSE 8000

# Команда запуска
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

Создание Dockerfile для бота

# Dockerfile.bot
FROM python:3.11-slim

# Установка зависимостей системы
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Установка рабочей директории
WORKDIR /app

# Копирование файлов зависимостей
COPY requirements.txt .

# Установка Python зависимостей
RUN pip install --no-cache-dir -r requirements.txt

# Копирование исходного кода
COPY . .

# Команда запуска
CMD ["python", "app/bot/bot.py"]

Конфигурация docker-compose.yml

# docker-compose.yml
version: '3.8'

services:
  # База данных SQLite (в данном случае файл в volume)
  # Для SQLite не нужен отдельный сервис, но для других БД может понадобиться
  
  # Redis
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

  # FastAPI приложение
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=sqlite:///./data/chatbot.db
      - REDIS_URL=redis://redis:6379/0
    volumes:
      - ./data:/app/data
    depends_on:
      - redis
    restart: unless-stopped

  # Telegram бот
  bot:
    build:
      context: .
      dockerfile: Dockerfile.bot
    environment:
      - TELEGRAM_TOKEN=${TELEGRAM_TOKEN}
      - API_BASE_URL=http://api:8000
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - api
      - redis
    restart: unless-stopped

volumes:
  redis_data:

Управление переменными окружения (.env)

# .env
TELEGRAM_TOKEN=ваш_токен_бота_от_@BotFather
DATABASE_URL=sqlite:///./data/chatbot.db
REDIS_URL=redis://localhost:6379/0
API_BASE_URL=http://localhost:8000
# app/core/config.py (обновленная версия)
import os
from dotenv import load_dotenv

load_dotenv()

class Settings:
    TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
    DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./data/chatbot.db")
    REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000")

    # Проверка обязательных переменных
    def validate(self):
        if not self.TELEGRAM_TOKEN:
            raise ValueError("TELEGRAM_TOKEN не установлен в .env файле")

settings = Settings()

Запуск, остановка, масштабирование

# Запуск всех сервисов
docker-compose up

# Запуск в фоновом режиме
docker-compose up -d

# Просмотр статуса сервисов
docker-compose ps

# Просмотр логов
docker-compose logs

# Просмотр логов конкретного сервиса
docker-compose logs api

# Остановка сервисов
docker-compose down

# Остановка и удаление volumes
docker-compose down -v

# Пересборка образов
docker-compose build

# Перезапуск конкретного сервиса
docker-compose restart bot

# Масштабирование (если нужно несколько экземпляров бота)
docker-compose up -d --scale bot=3

8) Интеграция компонентов

Подключение Aiogram бота к FastAPI API

# app/bot/services/api_client.py
import aiohttp
from typing import Optional, Dict, Any
from app.core.config import settings
import logging

logger = logging.getLogger(__name__)

class APIClient:
    def __init__(self):
        self.base_url = settings.API_BASE_URL
        self.session: Optional[aiohttp.ClientSession] = None
    
    async def get_session(self) -> aiohttp.ClientSession:
        """Получение сессии HTTP клиента"""
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        return self.session
    
    async def close(self):
        """Закрытие сессии"""
        if self.session and not self.session.closed:
            await self.session.close()
    
    async def create_user(self, user_data: Dict[str, Any]) -> Optional[Dict]:
        """Создание пользователя через API"""
        try:
            session = await self.get_session()
            async with session.post(f"{self.base_url}/users/", json=user_data) as response:
                if response.status == 200:
                    return await response.json()
                else:
                    logger.error(f"Ошибка создания пользователя: {response.status}")
                    return None
        except Exception as e:
            logger.error(f"Исключение при создании пользователя: {e}")
            return None
    
    async def get_user(self, user_id: int) -> Optional[Dict]:
        """Получение пользователя через API"""
        try:
            session = await self.get_session()
            async with session.get(f"{self.base_url}/users/{user_id}") as response:
                if response.status == 200:
                    return await response.json()
                else:
                    return None
        except Exception as e:
            logger.error(f"Исключение при получении пользователя: {e}")
            return None
    
    async def create_message(self, message_data: Dict[str, Any]) -> Optional[Dict]:
        """Создание сообщения через API"""
        try:
            session = await self.get_session()
            async with session.post(f"{self.base_url}/messages/", json=message_data) as response:
                if response.status == 200:
                    return await response.json()
                else:
                    logger.error(f"Ошибка создания сообщения: {response.status}")
                    return None
        except Exception as e:
            logger.error(f"Исключение при создании сообщения: {e}")
            return None

# Глобальный экземпляр клиента
api_client = APIClient()
# app/bot/bot.py (обновленная версия)
import asyncio
import logging
from aiogram import Bot, Dispatcher, Router
from aiogram.filters import Command
from aiogram.types import Message
from app.core.config import settings
from app.bot.services.api_client import api_client

# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Создание бота и диспетчера
bot = Bot(token=settings.TELEGRAM_TOKEN)
dp = Dispatcher()

# Создание роутера
router = Router()

@router.message(Command(commands=["start"]))
async def command_start_handler(message: Message) -> None:
    """Обработчик команды /start"""
    user_data = {
        "telegram_id": message.from_user.id,
        "username": message.from_user.username,
        "first_name": message.from_user.first_name,
        "last_name": message.from_user.last_name
    }
    
    # Создание пользователя через API
    result = await api_client.create_user(user_data)
    if result:
        await message.answer(f"Привет, {message.from_user.full_name}! Вы зарегистрированы.")
    else:
        await message.answer(f"Привет, {message.from_user.full_name}!")

@router.message()
async def message_handler(message: Message) -> None:
    """Обработчик всех текстовых сообщений"""
    try:
        # Сохранение сообщения через API
        message_data = {
            "user_id": message.from_user.id,
            "text": message.text
        }
        
        result = await api_client.create_message(message_data)
        if result:
            await message.answer(f"Сообщение получено и сохранено (ID: {result.get('id')})")
        else:
            await message.answer("Сообщение получено, но не удалось сохранить.")
    except Exception as e:
        logger.error(f"Ошибка обработки сообщения: {e}")
        await message.answer("Произошла ошибка при обработке вашего сообщения.")

# Регистрация роутера
dp.include_router(router)

async def main() -> None:
    """Главная функция запуска бота"""
    logger.info("Запуск бота...")
    try:
        await dp.start_polling(bot)
    except Exception as e:
        logger.error(f"Ошибка запуска бота: {e}")
    finally:
        await bot.session.close()
        await api_client.close()

if __name__ == "__main__":
    asyncio.run(main())

Использование Redis для обмена данными между ботом и API

# app/core/redis_client.py (расширенная версия)
import redis
import json
from typing import Optional, Dict, Any
from app.core.config import settings

# Создание клиента Redis
redis_client = redis.from_url(settings.REDIS_URL, decode_responses=True)

def publish_event(channel: str, event_data: Dict[str, Any]):
    """Публикация события в Redis"""
    redis_client.publish(channel, json.dumps(event_data))

def subscribe_to_channel(channel: str):
    """Подписка на канал Redis"""
    pubsub = redis_client.pubsub()
    pubsub.subscribe(channel)
    return pubsub

# Функции для работы с состоянием пользователей (уже были)
def set_user_state(user_id: int, state: str):
    redis_client.set(f"user:{user_id}:state", state)

def get_user_state(user_id: int) -> str:
    return redis_client.get(f"user:{user_id}:state") or "default"

def set_user_data(user_id: int, key: str, value):
    redis_client.hset(f"user:{user_id}:data", key, json.dumps(value))

def get_user_data(user_id: int, key: str):
    value = redis_client.hget(f"user:{user_id}:data", key)
    return json.loads(value) if value else None

def delete_user_data(user_id: int, key: str):
    redis_client.hdel(f"user:{user_id}:data", key)
# app/bot/services/notification_service.py
from app.core.redis_client import subscribe_to_channel
import asyncio
import json
import logging

logger = logging.getLogger(__name__)

class NotificationService:
    def __init__(self, bot):
        self.bot = bot
        self.pubsub = subscribe_to_channel("notifications")
    
    async def listen_for_notifications(self):
        """Прослушивание уведомлений из Redis"""
        for message in self.pubsub.listen():
            if message['type'] == 'message':
                try:
                    data = json.loads(message['data'])
                    await self.handle_notification(data)
                except Exception as e:
                    logger.error(f"Ошибка обработки уведомления: {e}")
    
    async def handle_notification(self, data: dict):
        """Обработка уведомления"""
        notification_type = data.get('type')
        user_id = data.get('user_id')
        message = data.get('message')
        
        if notification_type == 'reminder':
            try:
                await self.bot.send_message(chat_id=user_id, text=message)
            except Exception as e:
                logger.error(f"Ошибка отправки уведомления пользователю {user_id}: {e}")

# Использование в боте
# notification_service = NotificationService(bot)
# asyncio.create_task(notification_service.listen_for_notifications())

Обеспечение согласованности данных между SQLite и Redis

# app/services/data_sync.py
from app.core.redis_client import redis_client
from app.database.database import get_db
from app.models.user import User
import json
import logging

logger = logging.getLogger(__name__)

def sync_user_to_cache(user_id: int, db):
    """Синхронизация пользователя из БД в Redis кэш"""
    try:
        user = db.query(User).filter(User.id == user_id).first()
        if user:
            user_data = {
                "id": user.id,
                "telegram_id": user.telegram_id,
                "username": user.username,
                "first_name": user.first_name,
                "last_name": user.last_name
            }
            # Сохраняем в Redis на 10 минут
            redis_client.setex(f"user:{user_id}", 600, json.dumps(user_data))
            logger.info(f"Пользователь {user_id} синхронизирован в кэш")
    except Exception as e:
        logger.error(f"Ошибка синхронизации пользователя {user_id}: {e}")

def get_user_with_cache(user_id: int, db):
    """Получение пользователя с использованием кэша"""
    try:
        # Попытка получить из кэша
        cached_user = redis_client.get(f"user:{user_id}")
        if cached_user:
            return json.loads(cached_user)
        
        # Если нет в кэше, получаем из БД
        user = db.query(User).filter(User.id == user_id).first()
        if user:
            user_data = {
                "id": user.id,
                "telegram_id": user.telegram_id,
                "username": user.username,
                "first_name": user.first_name,
                "last_name": user.last_name
            }
            # Сохраняем в кэш
            redis_client.setex(f"user:{user_id}", 600, json.dumps(user_data))
            return user_data
    except Exception as e:
        logger.error(f"Ошибка получения пользователя {user_id}: {e}")
    
    return None

Обработка ошибок во всей системе

# app/core/exceptions.py
class ChatBotException(Exception):
    """Базовое исключение для чат-бота"""
    pass

class UserNotFoundException(ChatBotException):
    """Пользователь не найден"""
    pass

class MessageProcessingException(ChatBotException):
    """Ошибка обработки сообщения"""
    pass

class ExternalServiceException(ChatBotException):
    """Ошибка внешнего сервиса"""
    pass

# app/core/error_handler.py
import logging
from fastapi import HTTPException
from app.core.exceptions import ChatBotException

logger = logging.getLogger(__name__)

def handle_exception(exc: Exception):
    """Универсальный обработчик исключений"""
    if isinstance(exc, ChatBotException):
        logger.error(f"ChatBot ошибка: {exc}")
        # Здесь можно отправить уведомление в мониторинг
        raise HTTPException(status_code=400, detail=str(exc))
    elif isinstance(exc, HTTPException):
        logger.error(f"HTTP ошибка: {exc.detail}")
        raise exc
    else:
        logger.error(f"Неожиданная ошибка: {exc}")
        raise HTTPException(status_code=500, detail="Внутренняя ошибка сервера")

9) Продвинутые возможности

Интеграция с LangChain

pip install langchain langchain-openai
# app/services/llm_service.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import os
from typing import Dict, Any

class LLMService:
    def __init__(self):
        self.llm = ChatOpenAI(
            model="gpt-4o-mini",
            temperature=0.7,
            openai_api_key=os.getenv("OPENAI_API_KEY")
        )
        self.parser = StrOutputParser()
    
    def create_chain(self, system_prompt: str):
        """Создание цепочки для обработки запросов"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", "{input}")
        ])
        return prompt | self.llm | self.parser
    
    async def process_message(self, message: str, context: Dict[str, Any] = None) -> str:
        """Обработка сообщения с помощью LLM"""
        try:
            # Создание контекста для промпта
            system_prompt = "Вы полезный ассистент. Отвечайте кратко и по делу."
            if context:
                system_prompt += f"\nКонтекст пользователя: {context}"
            
            chain = self.create_chain(system_prompt)
            response = chain.invoke({"input": message})
            return response
        except Exception as e:
            return f"Извините, произошла ошибка при обработке запроса: {str(e)}"

# Инициализация сервиса
llm_service = LLMService()
# app/api/llm.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Optional, Dict, Any
from app.services.llm_service import llm_service

router = APIRouter(prefix="/llm", tags=["llm"])

class LLMRequest(BaseModel):
    message: str
    context: Optional[Dict[str, Any]] = None

class LLMResponse(BaseModel):
    response: str

@router.post("/process", response_model=LLMResponse)
async def process_message(request: LLMRequest):
    """Обработка сообщения с помощью LLM"""
    try:
        response = await llm_service.process_message(request.message, request.context)
        return LLMResponse(response=response)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Background tasks в FastAPI

# app/tasks/message_processor.py
from fastapi import BackgroundTasks
from app.database.database import get_db
from app.models.message import Message
from app.services.llm_service import llm_service
import logging

logger = logging.getLogger(__name__)

async def process_message_background(message_id: int, message_text: str, user_id: int):
    """Фоновая задача обработки сообщения"""
    try:
        logger.info(f"Начало обработки сообщения {message_id}")
        
        # Обработка с помощью LLM
        response = await llm_service.process_message(message_text)
        
        # Сохранение ответа в базу данных
        # Здесь можно добавить логику сохранения ответа
        
        logger.info(f"Сообщение {message_id} обработано успешно")
    except Exception as e:
        logger.error(f"Ошибка обработки сообщения {message_id}: {e}")

# Использование в API
from fastapi import APIRouter, Depends

router = APIRouter(prefix="/messages", tags=["messages"])

@router.post("/")
async def create_message(
    message: Message, 
    background_tasks: BackgroundTasks,
    db = Depends(get_db)
):
    # Сохранение сообщения в БД
    db_message = Message(**message.dict())
    db.add(db_message)
    db.commit()
    db.refresh(db_message)
    
    # Добавление фоновой задачи
    background_tasks.add_task(
        process_message_background, 
        db_message.id, 
        db_message.text, 
        db_message.user_id
    )
    
    return db_message

Логирование и мониторинг

# app/core/logging.py
import logging
import logging.handlers
import os
from datetime import datetime

def setup_logging():
    """Настройка логирования"""
    # Создание директории для логов
    log_dir = "logs"
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    
    # Формат логов
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # Корневой логгер
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    
    # Файловый обработчик с ротацией
    file_handler = logging.handlers.RotatingFileHandler(
        f"{log_dir}/app.log",
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5
    )
    file_handler.setFormatter(formatter)
    root_logger.addHandler(file_handler)
    
    # Консольный обработчик
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    root_logger.addHandler(console_handler)
    
    return root_logger

# Использование в приложении
# logger = setup_logging()
# app/core/monitoring.py
import time
import functools
import logging
from typing import Callable, Any

logger = logging.getLogger(__name__)

def monitor_performance(func: Callable) -> Callable:
    """Декоратор для мониторинга производительности"""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs) -> Any:
        start_time = time.time()
        try:
            result = await func(*args, **kwargs)
            execution_time = time.time() - start_time
            logger.info(f"{func.__name__} выполнен за {execution_time:.4f} секунд")
            return result
        except Exception as e:
            execution_time = time.time() - start_time
            logger.error(f"{func.__name__} завершился с ошибкой за {execution_time:.4f} секунд: {e}")
            raise
    return wrapper

# Пример использования
@monitor_performance
async def process_user_message(message: str) -> str:
    # Логика обработки сообщения
    time.sleep(0.1)  # Имитация обработки
    return f"Обработано: {message}"

Аутентификация и авторизация

# app/core/auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from typing import Optional
from datetime import datetime, timedelta
from app.core.config import settings

security = HTTPBearer()

# Секретный ключ для JWT (в production используйте env переменную)
SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Создание JWT токена"""
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Проверка JWT токена"""
    try:
        token = credentials.credentials
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except jwt.PyJWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Неверный токен",
            headers={"WWW-Authenticate": "Bearer"},
        )

# Пример защищенного маршрута
from fastapi import APIRouter

router = APIRouter(prefix="/admin", tags=["admin"])

@router.get("/protected")
async def protected_route(user: dict = Depends(verify_token)):
    return {"message": "Защищенный маршрут", "user": user}

10) Тестирование

Установка зависимостей для тестирования

pip install pytest pytest-asyncio httpx aiounittest

Модульное тестирование (pytest) для бизнес-логики

# tests/test_llm_service.py
import pytest
from unittest.mock import AsyncMock, patch
from app.services.llm_service import LLMService

@pytest.fixture
def llm_service():
    return LLMService()

@pytest.mark.asyncio
async def test_process_message_success(llm_service):
    """Тест успешной обработки сообщения"""
    with patch.object(llm_service, 'llm', new_callable=AsyncMock) as mock_llm:
        mock_llm.invoke.return_value = "Тестовый ответ"
        
        result = await llm_service.process_message("Привет")
        assert result == "Тестовый ответ"

@pytest.mark.asyncio
async def test_process_message_with_context(llm_service):
    """Тест обработки сообщения с контекстом"""
    with patch.object(llm_service, 'llm', new_callable=AsyncMock) as mock_llm:
        mock_llm.invoke.return_value = "Ответ с контекстом"
        
        context = {"user_name": "Алекс"}
        result = await llm_service.process_message("Привет", context)
        assert result == "Ответ с контекстом"
# tests/test_fsm.py
import pytest
from app.core.fsm import FSMContext

@pytest.mark.asyncio
async def test_fsm_state_management():
    """Тест управления состоянием FSM"""
    fsm = FSMContext(user_id=123)
    
    # Установка состояния
    await fsm.set_state("waiting_for_input")
    state = await fsm.get_state()
    assert state == "waiting_for_input"
    
    # Установка данных
    await fsm.set_data({"name": "Алекс", "age": 30})
    data = await fsm.get_data()
    assert data == {"name": "Алекс", "age": 30}
    
    # Обновление данных
    await fsm.update_data(age=31)
    data = await fsm.get_data()
    assert data == {"name": "Алекс", "age": 31}
    
    # Сброс состояния
    await fsm.reset_state()
    state = await fsm.get_state()
    assert state == "*"

Тестирование API (FastAPI TestClient)

# tests/test_api.py
import pytest
from fastapi.testclient import TestClient
from app.main import app
from app.database.database import Base, engine
from sqlalchemy.orm import sessionmaker

# Создание тестовой базы данных
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

@pytest.fixture(scope="session")
def test_db():
    Base.metadata.create_all(bind=engine)
    yield
    Base.metadata.drop_all(bind=engine)

@pytest.fixture
def client(test_db):
    with TestClient(app) as c:
        yield c

def test_read_root(client):
    """Тест корневого маршрута"""
    response = client.get("/")
    assert response.status_code == 200
    assert response.json() == {"message": "Добро пожаловать в Chatbot API"}

def test_create_user(client):
    """Тест создания пользователя"""
    user_data = {
        "telegram_id": 123456789,
        "username": "testuser",
        "first_name": "Тест",
        "last_name": "Пользователь"
    }
    
    response = client.post("/users/", json=user_data)
    assert response.status_code == 200
    data = response.json()
    assert data["username"] == "testuser"
    assert "id" in data

def test_get_users(client):
    """Тест получения списка пользователей"""
    response = client.get("/users/")
    assert response.status_code == 200
    assert isinstance(response.json(), list)

Тестирование Aiogram хэндлеров

# tests/test_bot_handlers.py
import pytest
from aiogram.types import Message, User, Chat
from aiogram.fsm.state import State, StatesGroup
from unittest.mock import AsyncMock, Mock
from app.bot.handlers.message_handlers import message_handler

class TestUserState(StatesGroup):
    state = State()

@pytest.mark.asyncio
async def test_message_handler():
    """Тест обработчика сообщений"""
    # Создание мок-объектов
    message = Mock(spec=Message)
    message.from_user = Mock(spec=User)
    message.from_user.id = 123456789
    message.from_user.username = "testuser"
    message.from_user.first_name = "Тест"
    message.from_user.last_name = "Пользователь"
    message.text = "Привет, бот!"
    message.answer = AsyncMock()
    
    # Вызов обработчика
    await message_handler(message)
    
    # Проверка, что был вызван метод answer
    message.answer.assert_called_once()

Интеграционные тесты

# tests/test_integration.py
import pytest
from fastapi.testclient import TestClient
from aiogram import Bot, Dispatcher
from unittest.mock import AsyncMock, patch
from app.main import app
from app.bot.bot import bot, dp

@pytest.fixture
def api_client():
    return TestClient(app)

@pytest.mark.asyncio
async def test_user_registration_flow(api_client):
    """Тест полного цикла регистрации пользователя"""
    # 1. Создание пользователя через API
    user_data = {
        "telegram_id": 987654321,
        "username": "newuser",
        "first_name": "Новый",
        "last_name": "Пользователь"
    }
    
    response = api_client.post("/users/", json=user_data)
    assert response.status_code == 200
    user = response.json()
    assert user["username"] == "newuser"
    
    # 2. Получение пользователя через API
    response = api_client.get(f"/users/{user['id']}")
    assert response.status_code == 200
    retrieved_user = response.json()
    assert retrieved_user["username"] == "newuser"

@pytest.mark.asyncio
async def test_message_processing_flow():
    """Тест обработки сообщения через бота и API"""
    with patch('app.bot.services.api_client.api_client') as mock_api:
        # Мокаем API клиент
        mock_api.create_user.return_value = {"id": 1, "username": "testuser"}
        mock_api.create_message.return_value = {"id": 1, "text": "Тест"}
        
        # Создание тестового сообщения
        from aiogram.types import Message, User, Chat
        
        message = Message(
            message_id=1,
            from_user=User(id=123456789, is_bot=False, first_name="Тест"),
            chat=Chat(id=123456789, type="private"),
            date=1234567890,
            text="Тестовое сообщение"
        )
        
        message.answer = AsyncMock()
        
        # Импорт обработчика
        from app.bot.handlers.message_handlers import message_handler
        
        # Вызов обработчика
        await message_handler(message)
        
        # Проверка вызовов API
        mock_api.create_user.assert_called_once()
        mock_api.create_message.assert_called_once()
        
        # Проверка ответа пользователю
        message.answer.assert_called_once()

Запуск тестов

# Запуск всех тестов
pytest

# Запуск тестов с подробным выводом
pytest -v

# Запуск тестов с покрытием кода
pytest --cov=app tests/

# Запуск конкретного теста
pytest tests/test_api.py::test_read_root

# Запуск тестов в параллельном режиме
pytest -n auto

11) Деплой и Best Practices

Подготовка к продакшену: конфигурации, секреты

# app/core/config.py (расширенная версия)
import os
from dotenv import load_dotenv

load_dotenv()

class Settings:
    # Основные настройки
    PROJECT_NAME: str = "Chatbot API"
    VERSION: str = "1.0.0"
    DEBUG: bool = os.getenv("DEBUG", "False").lower() == "true"
    
    # Телеграм
    TELEGRAM_TOKEN: str = os.getenv("TELEGRAM_TOKEN")
    
    # База данных
    DATABASE_URL: str = os.getenv("DATABASE_URL", "sqlite:///./data/chatbot.db")
    
    # Redis
    REDIS_URL: str = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    
    # API
    API_BASE_URL: str = os.getenv("API_BASE_URL", "http://localhost:8000")
    
    # OpenAI
    OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
    
    # Безопасность
    SECRET_KEY: str = os.getenv("SECRET_KEY", "your-secret-key-change-in-production")
    ACCESS_TOKEN_EXPIRE_MINUTES: int = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", 30))
    
    # Лимиты
    MESSAGE_RATE_LIMIT: int = int(os.getenv("MESSAGE_RATE_LIMIT", 10))  # сообщений в минуту
    
    def validate(self):
        """Проверка обязательных переменных"""
        required_vars = []
        if not self.TELEGRAM_TOKEN:
            required_vars.append("TELEGRAM_TOKEN")
        if not self.SECRET_KEY or self.SECRET_KEY == "your-secret-key-change-in-production":
            required_vars.append("SECRET_KEY")
            
        if required_vars:
            raise ValueError(f"Следующие переменные окружения должны быть установлены: {', '.join(required_vars)}")

settings = Settings()
# .env.production (пример)
DEBUG=False
TELEGRAM_TOKEN=ваш_токен_бота
DATABASE_URL=postgresql://user:password@db:5432/chatbot
REDIS_URL=redis://redis:6379/0
API_BASE_URL=http://api:8000
SECRET_KEY=ваш_секретный_ключ
OPENAI_API_KEY=ваш_openai_ключ
MESSAGE_RATE_LIMIT=5

Использование docker-compose.prod.yml

# docker-compose.prod.yml
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    restart: unless-stopped
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    networks:
      - chatbot_network

  db:
    image: postgres:15-alpine
    restart: unless-stopped
    environment:
      POSTGRES_DB: chatbot
      POSTGRES_USER: chatbot_user
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - chatbot_network

  api:
    build: .
    restart: unless-stopped
    environment:
      - DATABASE_URL=postgresql://chatbot_user:${DB_PASSWORD}@db:5432/chatbot
      - REDIS_URL=redis://redis:6379/0
      - TELEGRAM_TOKEN=${TELEGRAM_TOKEN}
      - SECRET_KEY=${SECRET_KEY}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DEBUG=False
    volumes:
      - ./logs:/app/logs
    depends_on:
      - db
      - redis
    networks:
      - chatbot_network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  bot:
    build:
      context: .
      dockerfile: Dockerfile.bot
    restart: unless-stopped
    environment:
      - TELEGRAM_TOKEN=${TELEGRAM_TOKEN}
      - API_BASE_URL=http://api:8000
      - REDIS_URL=redis://redis:6379/0
      - DEBUG=False
    depends_on:
      - api
      - redis
    networks:
      - chatbot_network
    healthcheck:
      test: ["CMD", "python", "-c", "import aiogram; print('OK')"]
      interval: 30s
      timeout: 10s
      retries: 3

  nginx:
    image: nginx:alpine
    restart: unless-stopped
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - api
    networks:
      - chatbot_network

volumes:
  redis_data:
  postgres_data:

networks:
  chatbot_network:
    driver: bridge

Healthchecks

# app/api/health.py
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.database.database import get_db
from app.models.user import User
import redis
from app.core.config import settings

router = APIRouter(prefix="/health", tags=["health"])

@router.get("/")
async def health_check(db: Session = Depends(get_db)):
    """Проверка состояния приложения"""
    try:
        # Проверка базы данных
        db.query(User).first()
        db_status = "ok"
    except Exception as e:
        db_status = f"error: {str(e)}"
    
    try:
        # Проверка Redis
        redis_client = redis.from_url(settings.REDIS_URL)
        redis_client.ping()
        redis_status = "ok"
    except Exception as e:
        redis_status = f"error: {str(e)}"
    
    return {
        "status": "ok" if db_status == "ok" and redis_status == "ok" else "error",
        "database": db_status,
        "redis": redis_status,
        "version": settings.VERSION
    }
# app/main.py (добавление healthcheck)
from app.api import health

app.include_router(health.router)

Резервное копирование данных (SQLite, Redis)

# backup.sh
#!/bin/bash

# Настройки
BACKUP_DIR="/backups"
DATE=$(date +%Y%m%d_%H%M%S)
RETENTION_DAYS=7

# Создание директории для бэкапов
mkdir -p $BACKUP_DIR

# Бэкап SQLite (если используется)
if [ -f "./data/chatbot.db" ]; then
    cp ./data/chatbot.db $BACKUP_DIR/chatbot_backup_$DATE.db
    echo "SQLite backup created: $BACKUP_DIR/chatbot_backup_$DATE.db"
fi

# Бэкап Redis
docker-compose exec redis redis-cli BGSAVE
echo "Redis backup initiated"

# Удаление старых бэкапов
find $BACKUP_DIR -name "chatbot_backup_*.db" -mtime +$RETENTION_DAYS -delete
echo "Old backups cleaned up"

echo "Backup completed at $DATE"
# docker-compose.backup.yml
version: '3.8'

services:
  backup:
    image: alpine
    volumes:
      - ./data:/app/data
      - ./backups:/app/backups
      - ./backup.sh:/app/backup.sh
    command: >
      sh -c "
        chmod +x /app/backup.sh &&
        crond -f -l 8 &
        /app/backup.sh &&
        tail -f /dev/null
      "
    environment:
      - CRON_SCHEDULE="0 2 * * *"  # Ежедневно в 2:00
    depends_on:
      - redis

Мониторинг и логирование в контейнерах

# app/core/logging.py (расширенная версия)
import logging
import logging.handlers
import os
from datetime import datetime

def setup_logging(log_level: str = "INFO"):
    """Настройка логирования для продакшена"""
    # Создание директории для логов
    log_dir = "/app/logs" if os.path.exists("/app/logs") else "./logs"
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    
    # Формат логов
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # Корневой логгер
    root_logger = logging.getLogger()
    root_logger.setLevel(getattr(logging, log_level.upper()))
    
    # Файловый обработчик с ротацией
    file_handler = logging.handlers.RotatingFileHandler(
        f"{log_dir}/app.log",
        maxBytes=50*1024*1024,  # 50MB
        backupCount=10
    )
    file_handler.setFormatter(formatter)
    root_logger.addHandler(file_handler)
    
    # Консольный обработчик (для Docker)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    root_logger.addHandler(console_handler)
    
    return root_logger

# Использование в приложении
# logger = setup_logging(os.getenv("LOG_LEVEL", "INFO"))
# docker-compose.logging.yml (для ELK стека)
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms1g -Xmx1g
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      - ./logs:/app/logs
    depends_on:
      - elasticsearch

volumes:
  es_data:

Best Practices

Советы по деплою:
  • Используйте переменные окружения для конфигурации
  • Разделяйте конфигурации для разработки и продакшена
  • Настройте healthchecks для всех сервисов
  • Используйте репликацию и балансировку нагрузки при необходимости
  • Регулярно создавайте резервные копии данных
  • Мониторьте логи и метрики производительности
  • Используйте HTTPS в продакшене

12) Практические проекты

Проект 1: Простой бот-напоминатель с персистентными данными

# app/bot/handlers/reminder_handlers.py
from aiogram import Router
from aiogram.types import Message
from aiogram.filters import Command
from app.core.fsm import FSMContext
from datetime import datetime, timedelta
import re

router = Router()

@router.message(Command(commands=["remind"]))
async def start_reminder(message: Message, state: FSMContext):
    """Начало создания напоминания"""
    await state.set_state("waiting_for_reminder_text")
    await message.answer("Что вам напомнить?")

@router.message(lambda message: not message.text.startswith("/"))
async def process_reminder_text(message: Message, state: FSMContext):
    """Обработка текста напоминания"""
    current_state = await state.get_state()
    
    if current_state == "waiting_for_reminder_text":
        await state.update_data(reminder_text=message.text)
        await state.set_state("waiting_for_reminder_time")
        await message.answer("Через сколько минут напомнить? (введите число)")
    
    elif current_state == "waiting_for_reminder_time":
        try:
            minutes = int(message.text)
            await state.update_data(reminder_minutes=minutes)
            
            # Получение данных
            data = await state.get_data()
            text = data.get("reminder_text")
            
            # Здесь можно добавить сохранение в БД и планирование
            
            await message.answer(f"Хорошо, я напомню вам '{text}' через {minutes} минут.")
            await state.reset_state()
        except ValueError:
            await message.answer("Пожалуйста, введите корректное число минут:")
# app/models/reminder.py
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Boolean
from sqlalchemy.sql import func
from app.database.database import Base

class Reminder(Base):
    __tablename__ = "reminders"
    
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    text = Column(String)
    remind_at = Column(DateTime(timezone=True))
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    is_completed = Column(Boolean, default=False)

Проект 2: Бот с многошаговым опросом (используя FSM)

# app/bot/handlers/survey_handlers.py
from aiogram import Router
from aiogram.types import Message
from aiogram.filters import Command
from app.core.fsm import FSMContext

router = Router()

# Вопросы опросника
SURVEY_QUESTIONS = [
    "Как вас зовут?",
    "Сколько вам лет?",
    "Какой ваш любимый цвет?",
    "Чем вы занимаетесь?",
    "Что вам нравится в нашем боте?"
]

@router.message(Command(commands=["survey"]))
async def start_survey(message: Message, state: FSMContext):
    """Начало опроса"""
    await state.set_state("survey_question_0")
    await state.update_data(current_question=0)
    await message.answer(SURVEY_QUESTIONS[0])

@router.message(lambda message: not message.text.startswith("/"))
async def process_survey_answer(message: Message, state: FSMContext):
    """Обработка ответов на опрос"""
    current_state = await state.get_state()
    
    if current_state and current_state.startswith("survey_question_"):
        # Сохранение ответа
        data = await state.get_data()
        current_question = data.get("current_question", 0)
        answers = data.get("survey_answers", {})
        answers[current_question] = message.text
        await state.update_data(survey_answers=answers)
        
        # Переход к следующему вопросу
        next_question = current_question + 1
        if next_question < len(SURVEY_QUESTIONS):
            await state.set_state(f"survey_question_{next_question}")
            await state.update_data(current_question=next_question)
            await message.answer(SURVEY_QUESTIONS[next_question])
        else:
            # Опрос завершен
            await finish_survey(message, state, answers)

async def finish_survey(message: Message, state: FSMContext, answers: dict):
    """Завершение опроса и вывод результатов"""
    result = "Спасибо за участие в опросе!\n\nВаши ответы:\n"
    for i, (question, answer) in enumerate(zip(SURVEY_QUESTIONS, answers.values())):
        result += f"{i+1}. {question}\n   Ответ: {answer}\n\n"
    
    await message.answer(result)
    await state.reset_state()

Проект 3: Бот с интеграцией ИИ и очередями задач

# app/tasks/ai_processing.py
from app.core.redis_client import enqueue_task, dequeue_task
from app.services.llm_service import llm_service
import asyncio
import json
import logging

logger = logging.getLogger(__name__)

async def ai_processing_worker():
    """Воркер для обработки задач ИИ"""
    logger.info("Запуск AI processing worker")
    
    while True:
        try:
            # Получение задачи из очереди
            task_data = dequeue_task("ai_processing_queue")
            if task_data:
                await process_ai_task(task_data)
            else:
                # Если нет задач, ждем немного
                await asyncio.sleep(1)
        except Exception as e:
            logger.error(f"Ошибка в AI worker: {e}")
            await asyncio.sleep(5)

async def process_ai_task(task_data: dict):
    """Обработка задачи ИИ"""
    try:
        user_id = task_data.get("user_id")
        message_text = task_data.get("message_text")
        
        logger.info(f"Обработка сообщения от пользователя {user_id}: {message_text}")
        
        # Обработка с помощью LLM
        response = await llm_service.process_message(message_text)
        
        # Отправка ответа пользователю (через Redis или напрямую)
        # Здесь можно использовать Redis pub/sub для отправки сообщения боту
        from app.core.redis_client import publish_event
        publish_event("bot_messages", {
            "user_id": user_id,
            "message": response,
            "type": "ai_response"
        })
        
        logger.info(f"Ответ отправлен пользователю {user_id}")
    except Exception as e:
        logger.error(f"Ошибка обработки задачи ИИ: {e}")

# app/bot/handlers/ai_handlers.py
from aiogram import Router
from aiogram.types import Message
from app.core.redis_client import enqueue_task

router = Router()

@router.message()
async def ai_message_handler(message: Message):
    """Обработчик сообщений с использованием ИИ"""
    # Добавление задачи в очередь
    task_data = {
        "user_id": message.from_user.id,
        "message_text": message.text,
        "timestamp": str(datetime.now())
    }
    enqueue_task("ai_processing_queue", task_data)
    
    await message.answer("Ваш запрос принят. Обрабатываю с помощью ИИ...")

Проект 4: Бот для управления задачами

# app/models/task.py
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Enum
from sqlalchemy.sql import func
from app.database.database import Base
import enum

class TaskStatus(enum.Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    CANCELLED = "cancelled"

class Task(Base):
    __tablename__ = "tasks"
    
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    title = Column(String)
    description = Column(String)
    status = Column(Enum(TaskStatus), default=TaskStatus.PENDING)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    due_date = Column(DateTime(timezone=True))
# app/bot/handlers/task_handlers.py
from aiogram import Router
from aiogram.types import Message
from aiogram.filters import Command
from app.core.fsm import FSMContext
import re
from datetime import datetime

router = Router()

@router.message(Command(commands=["addtask"]))
async def start_add_task(message: Message, state: FSMContext):
    """Начало добавления задачи"""
    await state.set_state("waiting_for_task_title")
    await message.answer("Введите название задачи:")

@router.message(Command(commands=["tasks"]))
async def list_tasks(message: Message):
    """Просмотр списка задач"""
    # Здесь должна быть логика получения задач из API
    await message.answer("Список ваших задач:\n1. Пример задачи (в работе)")

@router.message(lambda message: not message.text.startswith("/"))
async def process_task_creation(message: Message, state: FSMContext):
    """Обработка создания задачи"""
    current_state = await state.get_state()
    
    if current_state == "waiting_for_task_title":
        await state.update_data(task_title=message.text)
        await state.set_state("waiting_for_task_description")
        await message.answer("Введите описание задачи:")
    
    elif current_state == "waiting_for_task_description":
        await state.update_data(task_description=message.text)
        await state.set_state("waiting_for_task_due_date")
        await message.answer("Введите дату выполнения (в формате ДД.ММ.ГГГГ):")
    
    elif current_state == "waiting_for_task_due_date":
        try:
            due_date = datetime.strptime(message.text, "%d.%m.%Y")
            data = await state.get_data()
            
            # Здесь должна быть логика сохранения задачи через API
            
            await message.answer(f"Задача '{data['task_title']}' создана!")
            await state.reset_state()
        except ValueError:
            await message.answer("Неверный формат даты. Пожалуйста, используйте ДД.ММ.ГГГГ:")

13) FAQ и ресурсы

Как обрабатывать ошибки в асинхронных функциях?

В асинхронных функциях используйте try/except блоки и логируйте ошибки:

async def async_function():
    try:
        # Асинхронная операция
        result = await some_async_operation()
        return result
    except Exception as e:
        logger.error(f"Ошибка в async_function: {e}")
        # Можно отправить уведомление или повторить попытку
        raise
Как оптимизировать производительность бота?
  • Используйте кэширование для часто запрашиваемых данных
  • Выносите тяжелые операции в фоновые задачи
  • Ограничивайте количество одновременных запросов к внешним API
  • Используйте connection pooling для базы данных
  • Мониторьте использование памяти и CPU
Как реализовать rate limiting?
# app/core/rate_limiter.py
import time
from collections import defaultdict

class RateLimiter:
    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = defaultdict(list)
    
    def is_allowed(self, user_id: int) -> bool:
        now = time.time()
        # Удаление старых запросов
        self.requests[user_id] = [
            req_time for req_time in self.requests[user_id] 
            if now - req_time < self.time_window
        ]
        
        # Проверка лимита
        if len(self.requests[user_id]) < self.max_requests:
            self.requests[user_id].append(now)
            return True
        return False

# Использование
rate_limiter = RateLimiter(max_requests=10, time_window=60)  # 10 запросов в минуту

async def message_handler(message: Message):
    if not rate_limiter.is_allowed(message.from_user.id):
        await message.answer("Вы превысили лимит запросов. Пожалуйста, подождите.")
        return
    # Обработка сообщения...
Как добавить поддержку нескольких языков?
# app/core/i18n.py
from typing import Dict

translations: Dict[str, Dict[str, str]] = {
    "ru": {
        "start_message": "Привет! Я чат-бот.",
        "help_message": "Доступные команды: /start, /help"
    },
    "en": {
        "start_message": "Hello! I'm a chatbot.",
        "help_message": "Available commands: /start, /help"
    }
}

def get_text(key: str, language: str = "ru") -> str:
    return translations.get(language, {}).get(key, key)

# Использование
await message.answer(get_text("start_message", "ru"))
Как реализовать webhook для Telegram бота?

Для высоконагруженных ботов рекомендуется использовать webhook вместо polling:

# app/bot/webhook.py
from aiogram import Bot, Dispatcher
from fastapi import FastAPI, Request, BackgroundTasks
import hashlib
import hmac

app = FastAPI()

WEBHOOK_SECRET = "your-webhook-secret"
WEBHOOK_PATH = "/webhook/telegram"

@app.post(WEBHOOK_PATH)
async def telegram_webhook(request: Request, background_tasks: BackgroundTasks):
    # Проверка подписи (опционально, но рекомендуется)
    signature = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
    if signature != WEBHOOK_SECRET:
        return {"error": "Invalid signature"}
    
    # Получение данных
    update_data = await request.json()
    
    # Обработка в фоне
    background_tasks.add_task(process_update, update_data)
    
    return {"ok": True}

async def process_update(update_data: dict):
    """Обработка обновления от Telegram"""
    # Здесь создается Update объект и передается в диспетчер
    from aiogram.types import Update
    update = Update(**update_data)
    await dp.feed_update(bot, update)

# Установка webhook
async def set_webhook():
    webhook_url = f"https://yourdomain.com{WEBHOOK_PATH}"
    await bot.set_webhook(webhook_url, secret_token=WEBHOOK_SECRET)
Как масштабировать приложение?
  • Используйте горизонтальное масштабирование для ботов (несколько экземпляров)
  • Выносите тяжелые операции в отдельные микросервисы
  • Используйте Redis для синхронизации состояния между экземплярами
  • Реализуйте load balancing для API
  • Используйте message queues для обработки фоновых задач

Полезные ресурсы

Сообщества и форумы

Советы по дальнейшему изучению:
  • Изучите advanced features FastAPI (dependencies, middleware, events)
  • Познакомьтесь с другими базами данных (PostgreSQL, MongoDB)
  • Изучите message brokers (RabbitMQ, Apache Kafka)
  • Попробуйте облачные платформы для деплоя (Heroku, AWS, Google Cloud)
  • Изучите паттерны проектирования для микросервисов