С FastAPI, Redis, SQLite, Docker Compose и Aiogram
Этот курс посвящен созданию надежных, масштабируемых и поддерживаемых чат-ботов с использованием современного стека технологий: FastAPI для создания API, Redis для управления состоянием и очередями, SQLite для персистентного хранения данных, Docker Compose для оркестрации сервисов и Aiogram для разработки Telegram-ботов.
# Установка Python (если еще не установлен)
# Скачайте с python.org или используйте менеджер пакетов
# Установка Docker и Docker Compose
# Следуйте инструкциям на https://docs.docker.com/get-docker/
# и https://docs.docker.com/compose/install/
# Проверка установки
python --version
docker --version
docker-compose --version
# Создание виртуального окружения
python -m venv chatbot-env
# Активация (Linux/macOS)
source chatbot-env/bin/activate
# Активация (Windows)
chatbot-env\Scripts\activate
# Обновление pip
pip install --upgrade pip
# Запуск сервисов
docker-compose up
# Запуск в фоновом режиме
docker-compose up -d
# Остановка сервисов
docker-compose down
# Просмотр логов
docker-compose logs
# Выполнение команды в контейнере
docker-compose exec service_name command
mkdir chatbot-project
cd chatbot-project
# Создание структуры каталогов
mkdir -p app app/api app/bot app/models app/schemas app/database app/core tests
# Создание файлов
touch app/__init__.py
touch app/main.py
touch app/api/__init__.py
touch app/bot/__init__.py
touch app/bot/bot.py
touch app/models/__init__.py
touch app/schemas/__init__.py
touch app/database/__init__.py
touch app/database/database.py
touch app/core/__init__.py
touch app/core/config.py
touch docker-compose.yml
touch Dockerfile
touch requirements.txt
touch .env
touch .gitignore
pip install fastapi uvicorn pydantic
# app/main.py
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI(title="Chatbot API", version="1.0.0")
class User(BaseModel):
id: int
username: str
first_name: Optional[str] = None
last_name: Optional[str] = None
class Message(BaseModel):
id: int
user_id: int
text: str
timestamp: str
# Пример маршрутов
@app.get("/")
def read_root():
return {"message": "Добро пожаловать в Chatbot API"}
@app.get("/users/", response_model=List[User])
def read_users():
return [
User(id=1, username="user1", first_name="Иван", last_name="Иванов"),
User(id=2, username="user2", first_name="Мария", last_name="Петрова")
]
@app.get("/users/{user_id}", response_model=User)
def read_user(user_id: int):
return User(id=user_id, username="user1", first_name="Иван", last_name="Иванов")
@app.post("/messages/", response_model=Message)
def create_message(message: Message):
return message
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
FastAPI автоматически генерирует интерактивную документацию:
# app/schemas/user.py
from pydantic import BaseModel, Field, validator
from typing import Optional
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
first_name: Optional[str] = Field(None, max_length=50)
last_name: Optional[str] = Field(None, max_length=50)
email: str
@validator('email')
def validate_email(cls, v):
if '@' not in v:
raise ValueError('Некорректный email')
return v
class UserUpdate(BaseModel):
first_name: Optional[str] = Field(None, max_length=50)
last_name: Optional[str] = Field(None, max_length=50)
# app/main.py (дополнение)
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import logging
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="Chatbot API", version="1.0.0")
# Добавление CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Обработчики ошибок
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
logger.error(f"HTTP error: {exc.detail}")
return {"error": exc.detail}
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
logger.error(f"General error: {str(exc)}")
return {"error": "Internal server error"}
pip install sqlalchemy aiosqlite
SQLite - это легковесная встраиваемая СУБД, которая хранит всю базу данных в одном файле. Она идеально подходит для разработки и небольших приложений.
# app/database/database.py
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.sql import func
import os
# Создание базы данных в памяти для разработки или файл для продакшена
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./chatbot.db")
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Получение сессии базы данных
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# app/models/user.py
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.sql import func
from app.database.database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
telegram_id = Column(Integer, unique=True, index=True)
username = Column(String, index=True)
first_name = Column(String)
last_name = Column(String)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# app/models/message.py
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.database.database import Base
class Message(Base):
__tablename__ = "messages"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"))
text = Column(String)
timestamp = Column(DateTime(timezone=True), server_default=func.now())
# Связь с пользователем
user = relationship("User", back_populates="messages")
# app/models/user.py (дополнение)
from sqlalchemy.orm import relationship
class User(Base):
# ... предыдущий код ...
# Связь с сообщениями
messages = relationship("Message", back_populates="user")
# app/api/users.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from app.database.database import get_db
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
router = APIRouter(prefix="/users", tags=["users"])
@router.get("/", response_model=List[User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = db.query(User).offset(skip).limit(limit).all()
return users
@router.get("/{user_id}", response_model=User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = db.query(User).filter(User.id == user_id).first()
if db_user is None:
raise HTTPException(status_code=404, detail="Пользователь не найден")
return db_user
@router.post("/", response_model=User)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
db_user = User(**user.dict())
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@router.put("/{user_id}", response_model=User)
def update_user(user_id: int, user: UserUpdate, db: Session = Depends(get_db)):
db_user = db.query(User).filter(User.id == user_id).first()
if db_user is None:
raise HTTPException(status_code=404, detail="Пользователь не найден")
for key, value in user.dict(exclude_unset=True).items():
setattr(db_user, key, value)
db.commit()
db.refresh(db_user)
return db_user
@router.delete("/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db)):
db_user = db.query(User).filter(User.id == user_id).first()
if db_user is None:
raise HTTPException(status_code=404, detail="Пользователь не найден")
db.delete(db_user)
db.commit()
return {"message": "Пользователь удален"}
pip install redis
Redis - это высокопроизводительное хранилище ключ-значение в памяти, которое поддерживает различные структуры данных, такие как строки, хэши, списки, множества и отсортированные множества.
# app/core/config.py
import os
class Settings:
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./chatbot.db")
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "")
settings = Settings()
# app/core/redis_client.py
import redis
import json
from app.core.config import settings
# Создание клиента Redis
redis_client = redis.from_url(settings.REDIS_URL, decode_responses=True)
def set_user_state(user_id: int, state: str):
"""Сохранение состояния пользователя"""
redis_client.set(f"user:{user_id}:state", state)
def get_user_state(user_id: int) -> str:
"""Получение состояния пользователя"""
return redis_client.get(f"user:{user_id}:state") or "default"
def set_user_data(user_id: int, key: str, value):
"""Сохранение пользовательских данных"""
redis_client.hset(f"user:{user_id}:data", key, json.dumps(value))
def get_user_data(user_id: int, key: str):
"""Получение пользовательских данных"""
value = redis_client.hget(f"user:{user_id}:data", key)
return json.loads(value) if value else None
def delete_user_data(user_id: int, key: str):
"""Удаление пользовательских данных"""
redis_client.hdel(f"user:{user_id}:data", key)
# app/core/fsm.py
from app.core.redis_client import redis_client
import json
class FSMContext:
def __init__(self, user_id: int):
self.user_id = user_id
self.storage_key = f"fsm:{user_id}"
async def set_state(self, state: str):
"""Установка состояния"""
redis_client.hset(self.storage_key, "state", state)
async def get_state(self) -> str:
"""Получение состояния"""
return redis_client.hget(self.storage_key, "state") or "*"
async def set_data(self, data: dict):
"""Установка данных"""
redis_client.hset(self.storage_key, "data", json.dumps(data))
async def get_data(self) -> dict:
"""Получение данных"""
data = redis_client.hget(self.storage_key, "data")
return json.loads(data) if data else {}
async def update_data(self, **kwargs):
"""Обновление данных"""
current_data = await self.get_data()
current_data.update(kwargs)
await self.set_data(current_data)
async def reset_state(self):
"""Сброс состояния"""
redis_client.delete(self.storage_key)
# app/core/cache.py
from app.core.redis_client import redis_client
import json
from typing import Optional
def cache_set(key: str, value, expiration: int = 3600):
"""Сохранение значения в кэш"""
redis_client.setex(key, expiration, json.dumps(value))
def cache_get(key: str):
"""Получение значения из кэша"""
value = redis_client.get(key)
return json.loads(value) if value else None
def cache_delete(key: str):
"""Удаление значения из кэша"""
redis_client.delete(key)
# Пример использования в API
from app.database.database import get_db
from app.models.user import User
def get_user_cached(user_id: int, db):
"""Получение пользователя с кэшированием"""
# Попытка получить из кэша
cached_user = cache_get(f"user:{user_id}")
if cached_user:
return cached_user
# Если нет в кэше, получаем из БД
db_user = db.query(User).filter(User.id == user_id).first()
if db_user:
user_data = {
"id": db_user.id,
"username": db_user.username,
"first_name": db_user.first_name,
"last_name": db_user.last_name
}
# Сохраняем в кэш на 10 минут
cache_set(f"user:{user_id}", user_data, 600)
return user_data
return None
# app/core/queue.py
from app.core.redis_client import redis_client
import json
def enqueue_task(queue_name: str, task_data: dict):
"""Добавление задачи в очередь"""
redis_client.lpush(queue_name, json.dumps(task_data))
def dequeue_task(queue_name: str) -> Optional[dict]:
"""Получение задачи из очереди"""
task = redis_client.brpop(queue_name, timeout=1)
if task:
return json.loads(task[1])
return None
def get_queue_length(queue_name: str) -> int:
"""Получение длины очереди"""
return redis_client.llen(queue_name)
# Пример задачи для обработки сообщений
def enqueue_message_processing(user_id: int, message_text: str):
"""Добавление задачи обработки сообщения в очередь"""
task_data = {
"user_id": user_id,
"message_text": message_text,
"timestamp": str(datetime.now())
}
enqueue_task("message_processing_queue", task_data)
pip install aiogram python-dotenv
# app/bot/bot.py
import asyncio
import logging
from aiogram import Bot, Dispatcher, Router, types
from aiogram.filters import Command
from aiogram.types import Message
from app.core.config import settings
from app.core.fsm import FSMContext
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Создание бота и диспетчера
bot = Bot(token=settings.TELEGRAM_TOKEN)
dp = Dispatcher()
# Создание роутера
router = Router()
@router.message(Command(commands=["start"]))
async def command_start_handler(message: Message) -> None:
"""Обработчик команды /start"""
await message.answer(f"Привет, {message.from_user.full_name}! Я чат-бот.")
@router.message(Command(commands=["help"]))
async def command_help_handler(message: Message) -> None:
"""Обработчик команды /help"""
help_text = """
Доступные команды:
/start - Начать работу
/help - Показать помощь
/status - Показать статус
"""
await message.answer(help_text)
@router.message()
async def echo_handler(message: Message) -> None:
"""Эхо-обработчик для всех сообщений"""
try:
# Отправка сообщения в API для обработки
# Здесь будет интеграция с FastAPI
await message.answer(f"Вы сказали: {message.text}")
except Exception as e:
logger.error(f"Ошибка обработки сообщения: {e}")
await message.answer("Произошла ошибка при обработке вашего сообщения.")
# Регистрация роутера
dp.include_router(router)
async def main() -> None:
"""Главная функция запуска бота"""
logger.info("Запуск бота...")
try:
await dp.start_polling(bot)
except Exception as e:
logger.error(f"Ошибка запуска бота: {e}")
finally:
await bot.session.close()
if __name__ == "__main__":
asyncio.run(main())
# app/bot/handlers/message_handlers.py
from aiogram import Router
from aiogram.types import Message
from aiogram.filters import Command
import aiohttp
from app.core.config import settings
router = Router()
@router.message(Command(commands=["status"]))
async def status_handler(message: Message):
"""Проверка статуса API"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(f"{settings.API_BASE_URL}/health") as response:
if response.status == 200:
await message.answer("✅ API работает нормально")
else:
await message.answer("❌ API недоступно")
except Exception as e:
await message.answer(f"❌ Ошибка подключения к API: {str(e)}")
@router.message()
async def message_handler(message: Message):
"""Обработчик всех текстовых сообщений"""
user_id = message.from_user.id
text = message.text
# Сохранение сообщения в API
try:
async with aiohttp.ClientSession() as session:
user_data = {
"telegram_id": user_id,
"username": message.from_user.username,
"first_name": message.from_user.first_name,
"last_name": message.from_user.last_name
}
# Создание или обновление пользователя
async with session.post(
f"{settings.API_BASE_URL}/users/",
json=user_data
) as response:
if response.status == 200 or response.status == 409: # 409 - уже существует
# Отправка сообщения в API
message_data = {
"user_id": user_id,
"text": text
}
async with session.post(
f"{settings.API_BASE_URL}/messages/",
json=message_data
) as msg_response:
if msg_response.status == 200:
result = await msg_response.json()
await message.answer(f"Сообщение получено: {result.get('id')}")
else:
await message.answer("Ошибка при сохранении сообщения")
else:
await message.answer("Ошибка при регистрации пользователя")
except Exception as e:
await message.answer(f"Ошибка: {str(e)}")
# app/bot/states.py
from enum import Enum
class UserState(Enum):
DEFAULT = "default"
WAITING_FOR_NAME = "waiting_for_name"
WAITING_FOR_AGE = "waiting_for_age"
PROFILE_COMPLETE = "profile_complete"
# app/bot/handlers/fsm_handlers.py
from aiogram import Router
from aiogram.types import Message
from aiogram.filters import Command
from app.core.fsm import FSMContext
from app.bot.states import UserState
router = Router()
@router.message(Command(commands=["register"]))
async def start_registration(message: Message, state: FSMContext):
"""Начало регистрации пользователя"""
await state.set_state(UserState.WAITING_FOR_NAME)
await message.answer("Пожалуйста, введите ваше имя:")
@router.message(lambda message: message.text and not message.text.startswith("/"))
async def process_name(message: Message, state: FSMContext):
"""Обработка имени пользователя"""
current_state = await state.get_state()
if current_state == UserState.WAITING_FOR_NAME.value:
await state.update_data(name=message.text)
await state.set_state(UserState.WAITING_FOR_AGE)
await message.answer("Спасибо! Теперь введите ваш возраст:")
elif current_state == UserState.WAITING_FOR_AGE.value:
try:
age = int(message.text)
await state.update_data(age=age)
await state.set_state(UserState.PROFILE_COMPLETE)
# Получение всех данных
user_data = await state.get_data()
name = user_data.get("name")
await message.answer(f"Регистрация завершена!\nИмя: {name}\nВозраст: {age}")
await state.reset_state()
except ValueError:
await message.answer("Пожалуйста, введите корректный возраст (число):")
else:
await message.answer("Не понимаю. Используйте /register для начала регистрации.")
# app/bot/keyboards.py
from aiogram.types import ReplyKeyboardMarkup, KeyboardButton, InlineKeyboardMarkup, InlineKeyboardButton
# Обычная клавиатура
def get_main_keyboard():
keyboard = ReplyKeyboardMarkup(
keyboard=[
[KeyboardButton(text="Профиль"), KeyboardButton(text="Настройки")],
[KeyboardButton(text="Помощь"), KeyboardButton(text="О боте")]
],
resize_keyboard=True
)
return keyboard
# Inline-клавиатура
def get_profile_keyboard():
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[InlineKeyboardButton(text="Редактировать профиль", callback_data="edit_profile")],
[InlineKeyboardButton(text="Удалить профиль", callback_data="delete_profile")],
[InlineKeyboardButton(text="Назад", callback_data="back_to_main")]
]
)
return keyboard
# app/bot/handlers/callback_handlers.py
from aiogram import Router
from aiogram.types import CallbackQuery
router = Router()
@router.callback_query(lambda c: c.data == "edit_profile")
async def edit_profile_callback(callback: CallbackQuery):
await callback.message.edit_text("Редактирование профиля...")
await callback.answer()
@router.callback_query(lambda c: c.data == "delete_profile")
async def delete_profile_callback(callback: CallbackQuery):
await callback.message.edit_text("Профиль удален.")
await callback.answer()
@router.callback_query(lambda c: c.data == "back_to_main")
async def back_to_main_callback(callback: CallbackQuery):
from app.bot.keyboards import get_main_keyboard
await callback.message.edit_text("Главное меню", reply_markup=get_main_keyboard())
await callback.answer()
# Dockerfile
FROM python:3.11-slim
# Установка зависимостей системы
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
# Установка рабочей директории
WORKDIR /app
# Копирование файлов зависимостей
COPY requirements.txt .
# Установка Python зависимостей
RUN pip install --no-cache-dir -r requirements.txt
# Копирование исходного кода
COPY . .
# Создание директории для базы данных
RUN mkdir -p /app/data
# Открытие порта
EXPOSE 8000
# Команда запуска
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# Dockerfile.bot
FROM python:3.11-slim
# Установка зависимостей системы
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
# Установка рабочей директории
WORKDIR /app
# Копирование файлов зависимостей
COPY requirements.txt .
# Установка Python зависимостей
RUN pip install --no-cache-dir -r requirements.txt
# Копирование исходного кода
COPY . .
# Команда запуска
CMD ["python", "app/bot/bot.py"]
# docker-compose.yml
version: '3.8'
services:
# База данных SQLite (в данном случае файл в volume)
# Для SQLite не нужен отдельный сервис, но для других БД может понадобиться
# Redis
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes
# FastAPI приложение
api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=sqlite:///./data/chatbot.db
- REDIS_URL=redis://redis:6379/0
volumes:
- ./data:/app/data
depends_on:
- redis
restart: unless-stopped
# Telegram бот
bot:
build:
context: .
dockerfile: Dockerfile.bot
environment:
- TELEGRAM_TOKEN=${TELEGRAM_TOKEN}
- API_BASE_URL=http://api:8000
- REDIS_URL=redis://redis:6379/0
depends_on:
- api
- redis
restart: unless-stopped
volumes:
redis_data:
# .env
TELEGRAM_TOKEN=ваш_токен_бота_от_@BotFather
DATABASE_URL=sqlite:///./data/chatbot.db
REDIS_URL=redis://localhost:6379/0
API_BASE_URL=http://localhost:8000
# app/core/config.py (обновленная версия)
import os
from dotenv import load_dotenv
load_dotenv()
class Settings:
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./data/chatbot.db")
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000")
# Проверка обязательных переменных
def validate(self):
if not self.TELEGRAM_TOKEN:
raise ValueError("TELEGRAM_TOKEN не установлен в .env файле")
settings = Settings()
# Запуск всех сервисов
docker-compose up
# Запуск в фоновом режиме
docker-compose up -d
# Просмотр статуса сервисов
docker-compose ps
# Просмотр логов
docker-compose logs
# Просмотр логов конкретного сервиса
docker-compose logs api
# Остановка сервисов
docker-compose down
# Остановка и удаление volumes
docker-compose down -v
# Пересборка образов
docker-compose build
# Перезапуск конкретного сервиса
docker-compose restart bot
# Масштабирование (если нужно несколько экземпляров бота)
docker-compose up -d --scale bot=3
# app/bot/services/api_client.py
import aiohttp
from typing import Optional, Dict, Any
from app.core.config import settings
import logging
logger = logging.getLogger(__name__)
class APIClient:
def __init__(self):
self.base_url = settings.API_BASE_URL
self.session: Optional[aiohttp.ClientSession] = None
async def get_session(self) -> aiohttp.ClientSession:
"""Получение сессии HTTP клиента"""
if self.session is None or self.session.closed:
self.session = aiohttp.ClientSession()
return self.session
async def close(self):
"""Закрытие сессии"""
if self.session and not self.session.closed:
await self.session.close()
async def create_user(self, user_data: Dict[str, Any]) -> Optional[Dict]:
"""Создание пользователя через API"""
try:
session = await self.get_session()
async with session.post(f"{self.base_url}/users/", json=user_data) as response:
if response.status == 200:
return await response.json()
else:
logger.error(f"Ошибка создания пользователя: {response.status}")
return None
except Exception as e:
logger.error(f"Исключение при создании пользователя: {e}")
return None
async def get_user(self, user_id: int) -> Optional[Dict]:
"""Получение пользователя через API"""
try:
session = await self.get_session()
async with session.get(f"{self.base_url}/users/{user_id}") as response:
if response.status == 200:
return await response.json()
else:
return None
except Exception as e:
logger.error(f"Исключение при получении пользователя: {e}")
return None
async def create_message(self, message_data: Dict[str, Any]) -> Optional[Dict]:
"""Создание сообщения через API"""
try:
session = await self.get_session()
async with session.post(f"{self.base_url}/messages/", json=message_data) as response:
if response.status == 200:
return await response.json()
else:
logger.error(f"Ошибка создания сообщения: {response.status}")
return None
except Exception as e:
logger.error(f"Исключение при создании сообщения: {e}")
return None
# Глобальный экземпляр клиента
api_client = APIClient()
# app/bot/bot.py (обновленная версия)
import asyncio
import logging
from aiogram import Bot, Dispatcher, Router
from aiogram.filters import Command
from aiogram.types import Message
from app.core.config import settings
from app.bot.services.api_client import api_client
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Создание бота и диспетчера
bot = Bot(token=settings.TELEGRAM_TOKEN)
dp = Dispatcher()
# Создание роутера
router = Router()
@router.message(Command(commands=["start"]))
async def command_start_handler(message: Message) -> None:
"""Обработчик команды /start"""
user_data = {
"telegram_id": message.from_user.id,
"username": message.from_user.username,
"first_name": message.from_user.first_name,
"last_name": message.from_user.last_name
}
# Создание пользователя через API
result = await api_client.create_user(user_data)
if result:
await message.answer(f"Привет, {message.from_user.full_name}! Вы зарегистрированы.")
else:
await message.answer(f"Привет, {message.from_user.full_name}!")
@router.message()
async def message_handler(message: Message) -> None:
"""Обработчик всех текстовых сообщений"""
try:
# Сохранение сообщения через API
message_data = {
"user_id": message.from_user.id,
"text": message.text
}
result = await api_client.create_message(message_data)
if result:
await message.answer(f"Сообщение получено и сохранено (ID: {result.get('id')})")
else:
await message.answer("Сообщение получено, но не удалось сохранить.")
except Exception as e:
logger.error(f"Ошибка обработки сообщения: {e}")
await message.answer("Произошла ошибка при обработке вашего сообщения.")
# Регистрация роутера
dp.include_router(router)
async def main() -> None:
"""Главная функция запуска бота"""
logger.info("Запуск бота...")
try:
await dp.start_polling(bot)
except Exception as e:
logger.error(f"Ошибка запуска бота: {e}")
finally:
await bot.session.close()
await api_client.close()
if __name__ == "__main__":
asyncio.run(main())
# app/core/redis_client.py (расширенная версия)
import redis
import json
from typing import Optional, Dict, Any
from app.core.config import settings
# Создание клиента Redis
redis_client = redis.from_url(settings.REDIS_URL, decode_responses=True)
def publish_event(channel: str, event_data: Dict[str, Any]):
"""Публикация события в Redis"""
redis_client.publish(channel, json.dumps(event_data))
def subscribe_to_channel(channel: str):
"""Подписка на канал Redis"""
pubsub = redis_client.pubsub()
pubsub.subscribe(channel)
return pubsub
# Функции для работы с состоянием пользователей (уже были)
def set_user_state(user_id: int, state: str):
redis_client.set(f"user:{user_id}:state", state)
def get_user_state(user_id: int) -> str:
return redis_client.get(f"user:{user_id}:state") or "default"
def set_user_data(user_id: int, key: str, value):
redis_client.hset(f"user:{user_id}:data", key, json.dumps(value))
def get_user_data(user_id: int, key: str):
value = redis_client.hget(f"user:{user_id}:data", key)
return json.loads(value) if value else None
def delete_user_data(user_id: int, key: str):
redis_client.hdel(f"user:{user_id}:data", key)
# app/bot/services/notification_service.py
from app.core.redis_client import subscribe_to_channel
import asyncio
import json
import logging
logger = logging.getLogger(__name__)
class NotificationService:
def __init__(self, bot):
self.bot = bot
self.pubsub = subscribe_to_channel("notifications")
async def listen_for_notifications(self):
"""Прослушивание уведомлений из Redis"""
for message in self.pubsub.listen():
if message['type'] == 'message':
try:
data = json.loads(message['data'])
await self.handle_notification(data)
except Exception as e:
logger.error(f"Ошибка обработки уведомления: {e}")
async def handle_notification(self, data: dict):
"""Обработка уведомления"""
notification_type = data.get('type')
user_id = data.get('user_id')
message = data.get('message')
if notification_type == 'reminder':
try:
await self.bot.send_message(chat_id=user_id, text=message)
except Exception as e:
logger.error(f"Ошибка отправки уведомления пользователю {user_id}: {e}")
# Использование в боте
# notification_service = NotificationService(bot)
# asyncio.create_task(notification_service.listen_for_notifications())
# app/services/data_sync.py
from app.core.redis_client import redis_client
from app.database.database import get_db
from app.models.user import User
import json
import logging
logger = logging.getLogger(__name__)
def sync_user_to_cache(user_id: int, db):
"""Синхронизация пользователя из БД в Redis кэш"""
try:
user = db.query(User).filter(User.id == user_id).first()
if user:
user_data = {
"id": user.id,
"telegram_id": user.telegram_id,
"username": user.username,
"first_name": user.first_name,
"last_name": user.last_name
}
# Сохраняем в Redis на 10 минут
redis_client.setex(f"user:{user_id}", 600, json.dumps(user_data))
logger.info(f"Пользователь {user_id} синхронизирован в кэш")
except Exception as e:
logger.error(f"Ошибка синхронизации пользователя {user_id}: {e}")
def get_user_with_cache(user_id: int, db):
"""Получение пользователя с использованием кэша"""
try:
# Попытка получить из кэша
cached_user = redis_client.get(f"user:{user_id}")
if cached_user:
return json.loads(cached_user)
# Если нет в кэше, получаем из БД
user = db.query(User).filter(User.id == user_id).first()
if user:
user_data = {
"id": user.id,
"telegram_id": user.telegram_id,
"username": user.username,
"first_name": user.first_name,
"last_name": user.last_name
}
# Сохраняем в кэш
redis_client.setex(f"user:{user_id}", 600, json.dumps(user_data))
return user_data
except Exception as e:
logger.error(f"Ошибка получения пользователя {user_id}: {e}")
return None
# app/core/exceptions.py
class ChatBotException(Exception):
"""Базовое исключение для чат-бота"""
pass
class UserNotFoundException(ChatBotException):
"""Пользователь не найден"""
pass
class MessageProcessingException(ChatBotException):
"""Ошибка обработки сообщения"""
pass
class ExternalServiceException(ChatBotException):
"""Ошибка внешнего сервиса"""
pass
# app/core/error_handler.py
import logging
from fastapi import HTTPException
from app.core.exceptions import ChatBotException
logger = logging.getLogger(__name__)
def handle_exception(exc: Exception):
"""Универсальный обработчик исключений"""
if isinstance(exc, ChatBotException):
logger.error(f"ChatBot ошибка: {exc}")
# Здесь можно отправить уведомление в мониторинг
raise HTTPException(status_code=400, detail=str(exc))
elif isinstance(exc, HTTPException):
logger.error(f"HTTP ошибка: {exc.detail}")
raise exc
else:
logger.error(f"Неожиданная ошибка: {exc}")
raise HTTPException(status_code=500, detail="Внутренняя ошибка сервера")
pip install langchain langchain-openai
# app/services/llm_service.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import os
from typing import Dict, Any
class LLMService:
def __init__(self):
self.llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.7,
openai_api_key=os.getenv("OPENAI_API_KEY")
)
self.parser = StrOutputParser()
def create_chain(self, system_prompt: str):
"""Создание цепочки для обработки запросов"""
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
("human", "{input}")
])
return prompt | self.llm | self.parser
async def process_message(self, message: str, context: Dict[str, Any] = None) -> str:
"""Обработка сообщения с помощью LLM"""
try:
# Создание контекста для промпта
system_prompt = "Вы полезный ассистент. Отвечайте кратко и по делу."
if context:
system_prompt += f"\nКонтекст пользователя: {context}"
chain = self.create_chain(system_prompt)
response = chain.invoke({"input": message})
return response
except Exception as e:
return f"Извините, произошла ошибка при обработке запроса: {str(e)}"
# Инициализация сервиса
llm_service = LLMService()
# app/api/llm.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Optional, Dict, Any
from app.services.llm_service import llm_service
router = APIRouter(prefix="/llm", tags=["llm"])
class LLMRequest(BaseModel):
message: str
context: Optional[Dict[str, Any]] = None
class LLMResponse(BaseModel):
response: str
@router.post("/process", response_model=LLMResponse)
async def process_message(request: LLMRequest):
"""Обработка сообщения с помощью LLM"""
try:
response = await llm_service.process_message(request.message, request.context)
return LLMResponse(response=response)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# app/tasks/message_processor.py
from fastapi import BackgroundTasks
from app.database.database import get_db
from app.models.message import Message
from app.services.llm_service import llm_service
import logging
logger = logging.getLogger(__name__)
async def process_message_background(message_id: int, message_text: str, user_id: int):
"""Фоновая задача обработки сообщения"""
try:
logger.info(f"Начало обработки сообщения {message_id}")
# Обработка с помощью LLM
response = await llm_service.process_message(message_text)
# Сохранение ответа в базу данных
# Здесь можно добавить логику сохранения ответа
logger.info(f"Сообщение {message_id} обработано успешно")
except Exception as e:
logger.error(f"Ошибка обработки сообщения {message_id}: {e}")
# Использование в API
from fastapi import APIRouter, Depends
router = APIRouter(prefix="/messages", tags=["messages"])
@router.post("/")
async def create_message(
message: Message,
background_tasks: BackgroundTasks,
db = Depends(get_db)
):
# Сохранение сообщения в БД
db_message = Message(**message.dict())
db.add(db_message)
db.commit()
db.refresh(db_message)
# Добавление фоновой задачи
background_tasks.add_task(
process_message_background,
db_message.id,
db_message.text,
db_message.user_id
)
return db_message
# app/core/logging.py
import logging
import logging.handlers
import os
from datetime import datetime
def setup_logging():
"""Настройка логирования"""
# Создание директории для логов
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# Формат логов
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Корневой логгер
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# Файловый обработчик с ротацией
file_handler = logging.handlers.RotatingFileHandler(
f"{log_dir}/app.log",
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
# Консольный обработчик
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
return root_logger
# Использование в приложении
# logger = setup_logging()
# app/core/monitoring.py
import time
import functools
import logging
from typing import Callable, Any
logger = logging.getLogger(__name__)
def monitor_performance(func: Callable) -> Callable:
"""Декоратор для мониторинга производительности"""
@functools.wraps(func)
async def wrapper(*args, **kwargs) -> Any:
start_time = time.time()
try:
result = await func(*args, **kwargs)
execution_time = time.time() - start_time
logger.info(f"{func.__name__} выполнен за {execution_time:.4f} секунд")
return result
except Exception as e:
execution_time = time.time() - start_time
logger.error(f"{func.__name__} завершился с ошибкой за {execution_time:.4f} секунд: {e}")
raise
return wrapper
# Пример использования
@monitor_performance
async def process_user_message(message: str) -> str:
# Логика обработки сообщения
time.sleep(0.1) # Имитация обработки
return f"Обработано: {message}"
# app/core/auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from typing import Optional
from datetime import datetime, timedelta
from app.core.config import settings
security = HTTPBearer()
# Секретный ключ для JWT (в production используйте env переменную)
SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Создание JWT токена"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Проверка JWT токена"""
try:
token = credentials.credentials
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверный токен",
headers={"WWW-Authenticate": "Bearer"},
)
# Пример защищенного маршрута
from fastapi import APIRouter
router = APIRouter(prefix="/admin", tags=["admin"])
@router.get("/protected")
async def protected_route(user: dict = Depends(verify_token)):
return {"message": "Защищенный маршрут", "user": user}
pip install pytest pytest-asyncio httpx aiounittest
# tests/test_llm_service.py
import pytest
from unittest.mock import AsyncMock, patch
from app.services.llm_service import LLMService
@pytest.fixture
def llm_service():
return LLMService()
@pytest.mark.asyncio
async def test_process_message_success(llm_service):
"""Тест успешной обработки сообщения"""
with patch.object(llm_service, 'llm', new_callable=AsyncMock) as mock_llm:
mock_llm.invoke.return_value = "Тестовый ответ"
result = await llm_service.process_message("Привет")
assert result == "Тестовый ответ"
@pytest.mark.asyncio
async def test_process_message_with_context(llm_service):
"""Тест обработки сообщения с контекстом"""
with patch.object(llm_service, 'llm', new_callable=AsyncMock) as mock_llm:
mock_llm.invoke.return_value = "Ответ с контекстом"
context = {"user_name": "Алекс"}
result = await llm_service.process_message("Привет", context)
assert result == "Ответ с контекстом"
# tests/test_fsm.py
import pytest
from app.core.fsm import FSMContext
@pytest.mark.asyncio
async def test_fsm_state_management():
"""Тест управления состоянием FSM"""
fsm = FSMContext(user_id=123)
# Установка состояния
await fsm.set_state("waiting_for_input")
state = await fsm.get_state()
assert state == "waiting_for_input"
# Установка данных
await fsm.set_data({"name": "Алекс", "age": 30})
data = await fsm.get_data()
assert data == {"name": "Алекс", "age": 30}
# Обновление данных
await fsm.update_data(age=31)
data = await fsm.get_data()
assert data == {"name": "Алекс", "age": 31}
# Сброс состояния
await fsm.reset_state()
state = await fsm.get_state()
assert state == "*"
# tests/test_api.py
import pytest
from fastapi.testclient import TestClient
from app.main import app
from app.database.database import Base, engine
from sqlalchemy.orm import sessionmaker
# Создание тестовой базы данных
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@pytest.fixture(scope="session")
def test_db():
Base.metadata.create_all(bind=engine)
yield
Base.metadata.drop_all(bind=engine)
@pytest.fixture
def client(test_db):
with TestClient(app) as c:
yield c
def test_read_root(client):
"""Тест корневого маршрута"""
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"message": "Добро пожаловать в Chatbot API"}
def test_create_user(client):
"""Тест создания пользователя"""
user_data = {
"telegram_id": 123456789,
"username": "testuser",
"first_name": "Тест",
"last_name": "Пользователь"
}
response = client.post("/users/", json=user_data)
assert response.status_code == 200
data = response.json()
assert data["username"] == "testuser"
assert "id" in data
def test_get_users(client):
"""Тест получения списка пользователей"""
response = client.get("/users/")
assert response.status_code == 200
assert isinstance(response.json(), list)
# tests/test_bot_handlers.py
import pytest
from aiogram.types import Message, User, Chat
from aiogram.fsm.state import State, StatesGroup
from unittest.mock import AsyncMock, Mock
from app.bot.handlers.message_handlers import message_handler
class TestUserState(StatesGroup):
state = State()
@pytest.mark.asyncio
async def test_message_handler():
"""Тест обработчика сообщений"""
# Создание мок-объектов
message = Mock(spec=Message)
message.from_user = Mock(spec=User)
message.from_user.id = 123456789
message.from_user.username = "testuser"
message.from_user.first_name = "Тест"
message.from_user.last_name = "Пользователь"
message.text = "Привет, бот!"
message.answer = AsyncMock()
# Вызов обработчика
await message_handler(message)
# Проверка, что был вызван метод answer
message.answer.assert_called_once()
# tests/test_integration.py
import pytest
from fastapi.testclient import TestClient
from aiogram import Bot, Dispatcher
from unittest.mock import AsyncMock, patch
from app.main import app
from app.bot.bot import bot, dp
@pytest.fixture
def api_client():
return TestClient(app)
@pytest.mark.asyncio
async def test_user_registration_flow(api_client):
"""Тест полного цикла регистрации пользователя"""
# 1. Создание пользователя через API
user_data = {
"telegram_id": 987654321,
"username": "newuser",
"first_name": "Новый",
"last_name": "Пользователь"
}
response = api_client.post("/users/", json=user_data)
assert response.status_code == 200
user = response.json()
assert user["username"] == "newuser"
# 2. Получение пользователя через API
response = api_client.get(f"/users/{user['id']}")
assert response.status_code == 200
retrieved_user = response.json()
assert retrieved_user["username"] == "newuser"
@pytest.mark.asyncio
async def test_message_processing_flow():
"""Тест обработки сообщения через бота и API"""
with patch('app.bot.services.api_client.api_client') as mock_api:
# Мокаем API клиент
mock_api.create_user.return_value = {"id": 1, "username": "testuser"}
mock_api.create_message.return_value = {"id": 1, "text": "Тест"}
# Создание тестового сообщения
from aiogram.types import Message, User, Chat
message = Message(
message_id=1,
from_user=User(id=123456789, is_bot=False, first_name="Тест"),
chat=Chat(id=123456789, type="private"),
date=1234567890,
text="Тестовое сообщение"
)
message.answer = AsyncMock()
# Импорт обработчика
from app.bot.handlers.message_handlers import message_handler
# Вызов обработчика
await message_handler(message)
# Проверка вызовов API
mock_api.create_user.assert_called_once()
mock_api.create_message.assert_called_once()
# Проверка ответа пользователю
message.answer.assert_called_once()
# Запуск всех тестов
pytest
# Запуск тестов с подробным выводом
pytest -v
# Запуск тестов с покрытием кода
pytest --cov=app tests/
# Запуск конкретного теста
pytest tests/test_api.py::test_read_root
# Запуск тестов в параллельном режиме
pytest -n auto
# app/core/config.py (расширенная версия)
import os
from dotenv import load_dotenv
load_dotenv()
class Settings:
# Основные настройки
PROJECT_NAME: str = "Chatbot API"
VERSION: str = "1.0.0"
DEBUG: bool = os.getenv("DEBUG", "False").lower() == "true"
# Телеграм
TELEGRAM_TOKEN: str = os.getenv("TELEGRAM_TOKEN")
# База данных
DATABASE_URL: str = os.getenv("DATABASE_URL", "sqlite:///./data/chatbot.db")
# Redis
REDIS_URL: str = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# API
API_BASE_URL: str = os.getenv("API_BASE_URL", "http://localhost:8000")
# OpenAI
OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
# Безопасность
SECRET_KEY: str = os.getenv("SECRET_KEY", "your-secret-key-change-in-production")
ACCESS_TOKEN_EXPIRE_MINUTES: int = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", 30))
# Лимиты
MESSAGE_RATE_LIMIT: int = int(os.getenv("MESSAGE_RATE_LIMIT", 10)) # сообщений в минуту
def validate(self):
"""Проверка обязательных переменных"""
required_vars = []
if not self.TELEGRAM_TOKEN:
required_vars.append("TELEGRAM_TOKEN")
if not self.SECRET_KEY or self.SECRET_KEY == "your-secret-key-change-in-production":
required_vars.append("SECRET_KEY")
if required_vars:
raise ValueError(f"Следующие переменные окружения должны быть установлены: {', '.join(required_vars)}")
settings = Settings()
# .env.production (пример)
DEBUG=False
TELEGRAM_TOKEN=ваш_токен_бота
DATABASE_URL=postgresql://user:password@db:5432/chatbot
REDIS_URL=redis://redis:6379/0
API_BASE_URL=http://api:8000
SECRET_KEY=ваш_секретный_ключ
OPENAI_API_KEY=ваш_openai_ключ
MESSAGE_RATE_LIMIT=5
# docker-compose.prod.yml
version: '3.8'
services:
redis:
image: redis:7-alpine
restart: unless-stopped
volumes:
- redis_data:/data
command: redis-server --appendonly yes
networks:
- chatbot_network
db:
image: postgres:15-alpine
restart: unless-stopped
environment:
POSTGRES_DB: chatbot
POSTGRES_USER: chatbot_user
POSTGRES_PASSWORD: ${DB_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
networks:
- chatbot_network
api:
build: .
restart: unless-stopped
environment:
- DATABASE_URL=postgresql://chatbot_user:${DB_PASSWORD}@db:5432/chatbot
- REDIS_URL=redis://redis:6379/0
- TELEGRAM_TOKEN=${TELEGRAM_TOKEN}
- SECRET_KEY=${SECRET_KEY}
- OPENAI_API_KEY=${OPENAI_API_KEY}
- DEBUG=False
volumes:
- ./logs:/app/logs
depends_on:
- db
- redis
networks:
- chatbot_network
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
bot:
build:
context: .
dockerfile: Dockerfile.bot
restart: unless-stopped
environment:
- TELEGRAM_TOKEN=${TELEGRAM_TOKEN}
- API_BASE_URL=http://api:8000
- REDIS_URL=redis://redis:6379/0
- DEBUG=False
depends_on:
- api
- redis
networks:
- chatbot_network
healthcheck:
test: ["CMD", "python", "-c", "import aiogram; print('OK')"]
interval: 30s
timeout: 10s
retries: 3
nginx:
image: nginx:alpine
restart: unless-stopped
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- api
networks:
- chatbot_network
volumes:
redis_data:
postgres_data:
networks:
chatbot_network:
driver: bridge
# app/api/health.py
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.database.database import get_db
from app.models.user import User
import redis
from app.core.config import settings
router = APIRouter(prefix="/health", tags=["health"])
@router.get("/")
async def health_check(db: Session = Depends(get_db)):
"""Проверка состояния приложения"""
try:
# Проверка базы данных
db.query(User).first()
db_status = "ok"
except Exception as e:
db_status = f"error: {str(e)}"
try:
# Проверка Redis
redis_client = redis.from_url(settings.REDIS_URL)
redis_client.ping()
redis_status = "ok"
except Exception as e:
redis_status = f"error: {str(e)}"
return {
"status": "ok" if db_status == "ok" and redis_status == "ok" else "error",
"database": db_status,
"redis": redis_status,
"version": settings.VERSION
}
# app/main.py (добавление healthcheck)
from app.api import health
app.include_router(health.router)
# backup.sh
#!/bin/bash
# Настройки
BACKUP_DIR="/backups"
DATE=$(date +%Y%m%d_%H%M%S)
RETENTION_DAYS=7
# Создание директории для бэкапов
mkdir -p $BACKUP_DIR
# Бэкап SQLite (если используется)
if [ -f "./data/chatbot.db" ]; then
cp ./data/chatbot.db $BACKUP_DIR/chatbot_backup_$DATE.db
echo "SQLite backup created: $BACKUP_DIR/chatbot_backup_$DATE.db"
fi
# Бэкап Redis
docker-compose exec redis redis-cli BGSAVE
echo "Redis backup initiated"
# Удаление старых бэкапов
find $BACKUP_DIR -name "chatbot_backup_*.db" -mtime +$RETENTION_DAYS -delete
echo "Old backups cleaned up"
echo "Backup completed at $DATE"
# docker-compose.backup.yml
version: '3.8'
services:
backup:
image: alpine
volumes:
- ./data:/app/data
- ./backups:/app/backups
- ./backup.sh:/app/backup.sh
command: >
sh -c "
chmod +x /app/backup.sh &&
crond -f -l 8 &
/app/backup.sh &&
tail -f /dev/null
"
environment:
- CRON_SCHEDULE="0 2 * * *" # Ежедневно в 2:00
depends_on:
- redis
# app/core/logging.py (расширенная версия)
import logging
import logging.handlers
import os
from datetime import datetime
def setup_logging(log_level: str = "INFO"):
"""Настройка логирования для продакшена"""
# Создание директории для логов
log_dir = "/app/logs" if os.path.exists("/app/logs") else "./logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# Формат логов
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Корневой логгер
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, log_level.upper()))
# Файловый обработчик с ротацией
file_handler = logging.handlers.RotatingFileHandler(
f"{log_dir}/app.log",
maxBytes=50*1024*1024, # 50MB
backupCount=10
)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
# Консольный обработчик (для Docker)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
return root_logger
# Использование в приложении
# logger = setup_logging(os.getenv("LOG_LEVEL", "INFO"))
# docker-compose.logging.yml (для ELK стека)
version: '3.8'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
environment:
- discovery.type=single-node
- ES_JAVA_OPTS=-Xms1g -Xmx1g
ports:
- "9200:9200"
volumes:
- es_data:/usr/share/elasticsearch/data
kibana:
image: docker.elastic.co/kibana/kibana:8.11.0
ports:
- "5601:5601"
depends_on:
- elasticsearch
logstash:
image: docker.elastic.co/logstash/logstash:8.11.0
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
- ./logs:/app/logs
depends_on:
- elasticsearch
volumes:
es_data:
# app/bot/handlers/reminder_handlers.py
from aiogram import Router
from aiogram.types import Message
from aiogram.filters import Command
from app.core.fsm import FSMContext
from datetime import datetime, timedelta
import re
router = Router()
@router.message(Command(commands=["remind"]))
async def start_reminder(message: Message, state: FSMContext):
"""Начало создания напоминания"""
await state.set_state("waiting_for_reminder_text")
await message.answer("Что вам напомнить?")
@router.message(lambda message: not message.text.startswith("/"))
async def process_reminder_text(message: Message, state: FSMContext):
"""Обработка текста напоминания"""
current_state = await state.get_state()
if current_state == "waiting_for_reminder_text":
await state.update_data(reminder_text=message.text)
await state.set_state("waiting_for_reminder_time")
await message.answer("Через сколько минут напомнить? (введите число)")
elif current_state == "waiting_for_reminder_time":
try:
minutes = int(message.text)
await state.update_data(reminder_minutes=minutes)
# Получение данных
data = await state.get_data()
text = data.get("reminder_text")
# Здесь можно добавить сохранение в БД и планирование
await message.answer(f"Хорошо, я напомню вам '{text}' через {minutes} минут.")
await state.reset_state()
except ValueError:
await message.answer("Пожалуйста, введите корректное число минут:")
# app/models/reminder.py
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Boolean
from sqlalchemy.sql import func
from app.database.database import Base
class Reminder(Base):
__tablename__ = "reminders"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"))
text = Column(String)
remind_at = Column(DateTime(timezone=True))
created_at = Column(DateTime(timezone=True), server_default=func.now())
is_completed = Column(Boolean, default=False)
# app/bot/handlers/survey_handlers.py
from aiogram import Router
from aiogram.types import Message
from aiogram.filters import Command
from app.core.fsm import FSMContext
router = Router()
# Вопросы опросника
SURVEY_QUESTIONS = [
"Как вас зовут?",
"Сколько вам лет?",
"Какой ваш любимый цвет?",
"Чем вы занимаетесь?",
"Что вам нравится в нашем боте?"
]
@router.message(Command(commands=["survey"]))
async def start_survey(message: Message, state: FSMContext):
"""Начало опроса"""
await state.set_state("survey_question_0")
await state.update_data(current_question=0)
await message.answer(SURVEY_QUESTIONS[0])
@router.message(lambda message: not message.text.startswith("/"))
async def process_survey_answer(message: Message, state: FSMContext):
"""Обработка ответов на опрос"""
current_state = await state.get_state()
if current_state and current_state.startswith("survey_question_"):
# Сохранение ответа
data = await state.get_data()
current_question = data.get("current_question", 0)
answers = data.get("survey_answers", {})
answers[current_question] = message.text
await state.update_data(survey_answers=answers)
# Переход к следующему вопросу
next_question = current_question + 1
if next_question < len(SURVEY_QUESTIONS):
await state.set_state(f"survey_question_{next_question}")
await state.update_data(current_question=next_question)
await message.answer(SURVEY_QUESTIONS[next_question])
else:
# Опрос завершен
await finish_survey(message, state, answers)
async def finish_survey(message: Message, state: FSMContext, answers: dict):
"""Завершение опроса и вывод результатов"""
result = "Спасибо за участие в опросе!\n\nВаши ответы:\n"
for i, (question, answer) in enumerate(zip(SURVEY_QUESTIONS, answers.values())):
result += f"{i+1}. {question}\n Ответ: {answer}\n\n"
await message.answer(result)
await state.reset_state()
# app/tasks/ai_processing.py
from app.core.redis_client import enqueue_task, dequeue_task
from app.services.llm_service import llm_service
import asyncio
import json
import logging
logger = logging.getLogger(__name__)
async def ai_processing_worker():
"""Воркер для обработки задач ИИ"""
logger.info("Запуск AI processing worker")
while True:
try:
# Получение задачи из очереди
task_data = dequeue_task("ai_processing_queue")
if task_data:
await process_ai_task(task_data)
else:
# Если нет задач, ждем немного
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Ошибка в AI worker: {e}")
await asyncio.sleep(5)
async def process_ai_task(task_data: dict):
"""Обработка задачи ИИ"""
try:
user_id = task_data.get("user_id")
message_text = task_data.get("message_text")
logger.info(f"Обработка сообщения от пользователя {user_id}: {message_text}")
# Обработка с помощью LLM
response = await llm_service.process_message(message_text)
# Отправка ответа пользователю (через Redis или напрямую)
# Здесь можно использовать Redis pub/sub для отправки сообщения боту
from app.core.redis_client import publish_event
publish_event("bot_messages", {
"user_id": user_id,
"message": response,
"type": "ai_response"
})
logger.info(f"Ответ отправлен пользователю {user_id}")
except Exception as e:
logger.error(f"Ошибка обработки задачи ИИ: {e}")
# app/bot/handlers/ai_handlers.py
from aiogram import Router
from aiogram.types import Message
from app.core.redis_client import enqueue_task
router = Router()
@router.message()
async def ai_message_handler(message: Message):
"""Обработчик сообщений с использованием ИИ"""
# Добавление задачи в очередь
task_data = {
"user_id": message.from_user.id,
"message_text": message.text,
"timestamp": str(datetime.now())
}
enqueue_task("ai_processing_queue", task_data)
await message.answer("Ваш запрос принят. Обрабатываю с помощью ИИ...")
# app/models/task.py
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Enum
from sqlalchemy.sql import func
from app.database.database import Base
import enum
class TaskStatus(enum.Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
CANCELLED = "cancelled"
class Task(Base):
__tablename__ = "tasks"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"))
title = Column(String)
description = Column(String)
status = Column(Enum(TaskStatus), default=TaskStatus.PENDING)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
due_date = Column(DateTime(timezone=True))
# app/bot/handlers/task_handlers.py
from aiogram import Router
from aiogram.types import Message
from aiogram.filters import Command
from app.core.fsm import FSMContext
import re
from datetime import datetime
router = Router()
@router.message(Command(commands=["addtask"]))
async def start_add_task(message: Message, state: FSMContext):
"""Начало добавления задачи"""
await state.set_state("waiting_for_task_title")
await message.answer("Введите название задачи:")
@router.message(Command(commands=["tasks"]))
async def list_tasks(message: Message):
"""Просмотр списка задач"""
# Здесь должна быть логика получения задач из API
await message.answer("Список ваших задач:\n1. Пример задачи (в работе)")
@router.message(lambda message: not message.text.startswith("/"))
async def process_task_creation(message: Message, state: FSMContext):
"""Обработка создания задачи"""
current_state = await state.get_state()
if current_state == "waiting_for_task_title":
await state.update_data(task_title=message.text)
await state.set_state("waiting_for_task_description")
await message.answer("Введите описание задачи:")
elif current_state == "waiting_for_task_description":
await state.update_data(task_description=message.text)
await state.set_state("waiting_for_task_due_date")
await message.answer("Введите дату выполнения (в формате ДД.ММ.ГГГГ):")
elif current_state == "waiting_for_task_due_date":
try:
due_date = datetime.strptime(message.text, "%d.%m.%Y")
data = await state.get_data()
# Здесь должна быть логика сохранения задачи через API
await message.answer(f"Задача '{data['task_title']}' создана!")
await state.reset_state()
except ValueError:
await message.answer("Неверный формат даты. Пожалуйста, используйте ДД.ММ.ГГГГ:")
В асинхронных функциях используйте try/except блоки и логируйте ошибки:
async def async_function():
try:
# Асинхронная операция
result = await some_async_operation()
return result
except Exception as e:
logger.error(f"Ошибка в async_function: {e}")
# Можно отправить уведомление или повторить попытку
raise
# app/core/rate_limiter.py
import time
from collections import defaultdict
class RateLimiter:
def __init__(self, max_requests: int, time_window: int):
self.max_requests = max_requests
self.time_window = time_window
self.requests = defaultdict(list)
def is_allowed(self, user_id: int) -> bool:
now = time.time()
# Удаление старых запросов
self.requests[user_id] = [
req_time for req_time in self.requests[user_id]
if now - req_time < self.time_window
]
# Проверка лимита
if len(self.requests[user_id]) < self.max_requests:
self.requests[user_id].append(now)
return True
return False
# Использование
rate_limiter = RateLimiter(max_requests=10, time_window=60) # 10 запросов в минуту
async def message_handler(message: Message):
if not rate_limiter.is_allowed(message.from_user.id):
await message.answer("Вы превысили лимит запросов. Пожалуйста, подождите.")
return
# Обработка сообщения...
# app/core/i18n.py
from typing import Dict
translations: Dict[str, Dict[str, str]] = {
"ru": {
"start_message": "Привет! Я чат-бот.",
"help_message": "Доступные команды: /start, /help"
},
"en": {
"start_message": "Hello! I'm a chatbot.",
"help_message": "Available commands: /start, /help"
}
}
def get_text(key: str, language: str = "ru") -> str:
return translations.get(language, {}).get(key, key)
# Использование
await message.answer(get_text("start_message", "ru"))
Для высоконагруженных ботов рекомендуется использовать webhook вместо polling:
# app/bot/webhook.py
from aiogram import Bot, Dispatcher
from fastapi import FastAPI, Request, BackgroundTasks
import hashlib
import hmac
app = FastAPI()
WEBHOOK_SECRET = "your-webhook-secret"
WEBHOOK_PATH = "/webhook/telegram"
@app.post(WEBHOOK_PATH)
async def telegram_webhook(request: Request, background_tasks: BackgroundTasks):
# Проверка подписи (опционально, но рекомендуется)
signature = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
if signature != WEBHOOK_SECRET:
return {"error": "Invalid signature"}
# Получение данных
update_data = await request.json()
# Обработка в фоне
background_tasks.add_task(process_update, update_data)
return {"ok": True}
async def process_update(update_data: dict):
"""Обработка обновления от Telegram"""
# Здесь создается Update объект и передается в диспетчер
from aiogram.types import Update
update = Update(**update_data)
await dp.feed_update(bot, update)
# Установка webhook
async def set_webhook():
webhook_url = f"https://yourdomain.com{WEBHOOK_PATH}"
await bot.set_webhook(webhook_url, secret_token=WEBHOOK_SECRET)