← Назад к курсу
Подробное пособие по проектированию сервисов API
1. Введение в API
1.1 Что такое API?
API (Application Programming Interface) - это набор правил и спецификаций, позволяющий программам взаимодействовать друг с другом.
Типы API:
- REST (Representational State Transfer)
- GraphQL
- SOAP (Simple Object Access Protocol)
- gRPC (Google Remote Procedure Call)
1.2 REST API - основные принципы
- Stateless - каждый запрос содержит всю необходимую информацию
- Кэширование - ответы должны определять возможность кэширования
- Единообразие интерфейса
- Слоистая система
- Клиент-серверная архитектура
- Код по требованию (опционально)
2. Принципы проектирования RESTful API
2.1 Ресурсы и URI
# Примеры хороших URI
/goods # Коллекция товаров
/goods/123 # Конкретный товар
/goods/123/reviews # Отзывы о товаре
/users/{id}/orders # Заказы пользователя
# Примеры плохих URI
/getGoods
/getGoodById?id=123
2.2 HTTP методы
# Соответствие HTTP методов операциям CRUD GET /goods # Получить список товаров POST /goods # Создать новый товар GET /goods/123 # Получить товар с ID 123 PUT /goods/123 # Обновить товар с ID 123 PATCH /goods/123 # Частично обновить товар DELETE /goods/123 # Удалить товар
2.3 Статус-коды HTTP
# Группы статус-кодов 2xx - Успех 200 OK 201 Created 204 No Content 3xx - Перенаправление 301 Moved Permanently 304 Not Modified 4xx - Ошибка клиента 400 Bad Request 401 Unauthorized 403 Forbidden 404 Not Found 409 Conflict 422 Unprocessable Entity 5xx - Ошибка сервера 500 Internal Server Error 503 Service Unavailable
3. Проектирование API: практический пример
3.1 Определение требований
Спроектируем API для сервиса управления задачами (Task Manager).
Требования:
- Управление пользователями
- Создание/редактирование/удаление задач
- Категории задач
- Поиск и фильтрация задач
3.2 Диаграмма ресурсов
/users
/users/{id}
/users/{id}/tasks
/tasks
/tasks/{id}
/tasks/{id}/comments
/categories
/categories/{id}
/comments
/comments/{id}
3.3 Спецификация API в формате OpenAPI
openapi: 3.0.0
info:
title: Task Manager API
version: 1.0.0
paths:
/tasks:
get:
summary: Получить список задач
parameters:
- name: status
in: query
schema:
type: string
enum: [pending, in_progress, completed]
post:
summary: Создать новую задачу
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/Task'
4. Реализация на Python
4.1 Базовая структура проекта
task_manager_api/ ├── app/ │ ├── __init__.py │ ├── main.py │ ├── api/ │ │ ├── __init__.py │ │ ├── v1/ │ │ │ ├── __init__.py │ │ │ ├── endpoints/ │ │ │ │ ├── tasks.py │ │ │ │ ├── users.py │ │ │ │ └── categories.py │ │ │ └── dependencies.py │ ├── core/ │ │ ├── config.py │ │ └── security.py │ ├── models/ │ │ ├── task.py │ │ └── user.py │ ├── schemas/ │ │ ├── task.py │ │ └── user.py │ └── crud/ │ ├── task.py │ └── user.py ├── tests/ │ ├── test_tasks.py │ └── test_users.py ├── requirements.txt └── Dockerfile
4.2 Реализация с использованием FastAPI
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.v1.api import api_router
from app.core.config import settings
app = FastAPI(
title=settings.PROJECT_NAME,
version=settings.VERSION,
openapi_url=f"{settings.API_V1_STR}/openapi.json"
)
# Настройка CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.BACKEND_CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(api_router, prefix=settings.API_V1_STR)
@app.get("/health")
async def health_check():
return {"status": "healthy"}
# app/core/config.py
from pydantic import BaseSettings
from typing import List
class Settings(BaseSettings):
PROJECT_NAME: str = "Task Manager API"
VERSION: str = "1.0.0"
API_V1_STR: str = "/api/v1"
# Database
DATABASE_URL: str = "postgresql://user:password@localhost/taskdb"
# Security
SECRET_KEY: str = "your-secret-key-here"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# CORS
BACKEND_CORS_ORIGINS: List[str] = ["http://localhost:3000"]
class Config:
env_file = ".env"
settings = Settings()
# app/models/task.py
from sqlalchemy import Column, Integer, String, DateTime, Text, Enum, ForeignKey
from sqlalchemy.orm import relationship
from app.db.base import Base
import enum
class TaskStatus(str, enum.Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
class Task(Base):
__tablename__ = "tasks"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(200), nullable=False)
description = Column(Text)
status = Column(Enum(TaskStatus), default=TaskStatus.PENDING)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
user_id = Column(Integer, ForeignKey("users.id"))
# Relationships
user = relationship("User", back_populates="tasks")
categories = relationship("Category", secondary="task_categories")
comments = relationship("Comment", back_populates="task")
# app/schemas/task.py
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
from enum import Enum
class TaskStatus(str, Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
class TaskBase(BaseModel):
title: str = Field(..., max_length=200)
description: Optional[str] = None
status: TaskStatus = TaskStatus.PENDING
class TaskCreate(TaskBase):
category_ids: Optional[List[int]] = []
class TaskUpdate(BaseModel):
title: Optional[str] = Field(None, max_length=200)
description: Optional[str] = None
status: Optional[TaskStatus] = None
class TaskInDB(TaskBase):
id: int
user_id: int
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
class TaskPublic(TaskInDB):
categories: List["CategoryPublic"] = []
comments: List["CommentPublic"] = []
# app/api/v1/endpoints/tasks.py
from fastapi import APIRouter, Depends, HTTPException, status, Query
from typing import List, Optional
from sqlalchemy.orm import Session
from app.crud import task as crud_task
from app.schemas import task as schemas_task
from app.api.v1.dependencies import get_db, get_current_user
from app.models.user import User
router = APIRouter(prefix="/tasks", tags=["tasks"])
@router.get("/", response_model=List[schemas_task.TaskPublic])
async def read_tasks(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
status: Optional[schemas_task.TaskStatus] = None,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Получить список задач с пагинацией и фильтрацией.
"""
tasks = crud_task.get_tasks(
db,
user_id=current_user.id,
skip=skip,
limit=limit,
status=status
)
return tasks
@router.post("/",
response_model=schemas_task.TaskPublic,
status_code=status.HTTP_201_CREATED)
async def create_task(
task_in: schemas_task.TaskCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Создать новую задачу.
"""
task = crud_task.create_task(db, task_in, user_id=current_user.id)
return task
@router.get("/{task_id}", response_model=schemas_task.TaskPublic)
async def read_task(
task_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Получить задачу по ID.
"""
task = crud_task.get_task(db, task_id=task_id, user_id=current_user.id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
return task
@router.put("/{task_id}", response_model=schemas_task.TaskPublic)
async def update_task(
task_id: int,
task_in: schemas_task.TaskUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Обновить задачу.
"""
task = crud_task.get_task(db, task_id=task_id, user_id=current_user.id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
task = crud_task.update_task(db, task=task, task_update=task_in)
return task
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_task(
task_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
Удалить задачу.
"""
task = crud_task.get_task(db, task_id=task_id, user_id=current_user.id)
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
crud_task.delete_task(db, task_id=task_id)
return None
# app/crud/task.py
from sqlalchemy.orm import Session
from typing import List, Optional
from app.models import Task
from app.schemas import task as schemas
def get_task(db: Session, task_id: int, user_id: int) -> Optional[Task]:
return db.query(Task).filter(
Task.id == task_id,
Task.user_id == user_id
).first()
def get_tasks(
db: Session,
user_id: int,
skip: int = 0,
limit: int = 100,
status: Optional[schemas.TaskStatus] = None
) -> List[Task]:
query = db.query(Task).filter(Task.user_id == user_id)
if status:
query = query.filter(Task.status == status)
return query.offset(skip).limit(limit).all()
def create_task(db: Session, task: schemas.TaskCreate, user_id: int) -> Task:
db_task = Task(
title=task.title,
description=task.description,
status=task.status,
user_id=user_id
)
db.add(db_task)
db.commit()
db.refresh(db_task)
return db_task
def update_task(db: Session, task: Task, task_update: schemas.TaskUpdate) -> Task:
update_data = task_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(task, field, value)
db.commit()
db.refresh(task)
return task
def delete_task(db: Session, task_id: int) -> None:
task = db.query(Task).filter(Task.id == task_id).first()
if task:
db.delete(task)
db.commit()
4.3 Аутентификация и авторизация
# app/core/security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM
)
return encoded_jwt
# app/api/v1/endpoints/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from datetime import timedelta
from app.core import security
from app.core.config import settings
from app.crud import user as crud_user
from app.schemas import token as schemas_token
from app.api.v1.dependencies import get_db
router = APIRouter(tags=["authentication"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
@router.post("/login", response_model=schemas_token.Token)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
user = crud_user.authenticate_user(
db,
email=form_data.username,
password=form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = security.create_access_token(
data={"sub": str(user.id)},
expires_delta=access_token_expires
)
return {
"access_token": access_token,
"token_type": "bearer"
}
5. Паттерны проектирования API
5.1 Пагинация
# app/schemas/common.py
from pydantic import BaseModel
from typing import List, TypeVar, Generic, Optional
from pydantic.generics import GenericModel
T = TypeVar('T')
class PaginatedResponse(GenericModel, Generic[T]):
items: List[T]
total: int
page: int
size: int
pages: int
# В эндпоинте
@router.get("/paginated", response_model=PaginatedResponse[schemas_task.TaskPublic])
async def read_tasks_paginated(
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
total = crud_task.count_tasks(db, user_id=current_user.id)
tasks = crud_task.get_tasks(
db,
user_id=current_user.id,
skip=(page - 1) * size,
limit=size
)
return PaginatedResponse(
items=tasks,
total=total,
page=page,
size=size,
pages=(total + size - 1) // size
)
5.2 Фильтрация и сортировка
from typing import Optional, List
from enum import Enum
class SortField(str, Enum):
CREATED_AT = "created_at"
UPDATED_AT = "updated_at"
TITLE = "title"
class SortOrder(str, Enum):
ASC = "asc"
DESC = "desc"
@router.get("/filtered")
async def get_filtered_tasks(
status: Optional[TaskStatus] = None,
created_after: Optional[datetime] = None,
created_before: Optional[datetime] = None,
sort_by: SortField = SortField.CREATED_AT,
sort_order: SortOrder = SortOrder.DESC,
db: Session = Depends(get_db)
):
query = db.query(Task)
# Фильтрация
if status:
query = query.filter(Task.status == status)
if created_after:
query = query.filter(Task.created_at >= created_after)
if created_before:
query = query.filter(Task.created_at <= created_before)
# Сортировка
order_column = getattr(Task, sort_by)
if sort_order == SortOrder.DESC:
order_column = order_column.desc()
query = query.order_by(order_column)
return query.all()
5.3 Валидация запросов
from pydantic import validator, root_validator
class TaskCreate(BaseModel):
title: str = Field(..., max_length=200)
description: Optional[str] = None
due_date: Optional[datetime] = None
@validator('title')
def title_must_not_be_empty(cls, v):
if not v.strip():
raise ValueError('Title must not be empty')
return v.strip()
@root_validator
def validate_due_date(cls, values):
due_date = values.get('due_date')
if due_date and due_date < datetime.utcnow():
raise ValueError('Due date must be in the future')
return values
6. Обработка ошибок
6.1 Кастомные исключения
# app/core/exceptions.py
from fastapi import HTTPException, status
class TaskManagerException(HTTPException):
pass
class NotFoundException(TaskManagerException):
def __init__(self, detail: str = "Resource not found"):
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
detail=detail
)
class UnauthorizedException(TaskManagerException):
def __init__(self, detail: str = "Not authenticated"):
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=detail,
headers={"WWW-Authenticate": "Bearer"}
)
class ForbiddenException(TaskManagerException):
def __init__(self, detail: str = "Not enough permissions"):
super().__init__(
status_code=status.HTTP_403_FORBIDDEN,
detail=detail
)
class ValidationException(TaskManagerException):
def __init__(self, detail: str = "Validation error"):
super().__init__(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=detail
)
6.2 Глобальный обработчик ошибок
# app/main.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from app.core.exceptions import TaskManagerException
@app.exception_handler(TaskManagerException)
async def task_manager_exception_handler(
request: Request,
exc: TaskManagerException
):
return JSONResponse(
status_code=exc.status_code,
content={
"detail": exc.detail,
"error_code": exc.__class__.__name__
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
# Логирование ошибки
logger.error(f"Unhandled exception: {exc}")
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"detail": "Internal server error",
"error_code": "INTERNAL_SERVER_ERROR"
}
)
7. Тестирование API
7.1 Тесты с использованием pytest
# tests/test_tasks.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.db.base import Base
from app.core.config import settings
from app.api.v1.dependencies import get_db
# Тестовая база данных
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def override_get_db():
try:
db = TestingSessionLocal()
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
@pytest.fixture(scope="module")
def test_user():
# Создание тестового пользователя
response = client.post(
"/api/v1/auth/register",
json={
"email": "test@example.com",
"password": "testpassword123"
}
)
return response.json()
@pytest.fixture(scope="module")
def auth_headers(test_user):
# Получение токена
response = client.post(
"/api/v1/auth/login",
data={
"username": "test@example.com",
"password": "testpassword123"
}
)
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
def test_create_task(auth_headers):
response = client.post(
"/api/v1/tasks/",
headers=auth_headers,
json={
"title": "Test Task",
"description": "Test Description",
"status": "pending"
}
)
assert response.status_code == 201
data = response.json()
assert data["title"] == "Test Task"
assert data["status"] == "pending"
assert "id" in data
def test_get_tasks(auth_headers):
response = client.get(
"/api/v1/tasks/",
headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
def test_get_nonexistent_task(auth_headers):
response = client.get(
"/api/v1/tasks/999999",
headers=auth_headers
)
assert response.status_code == 404
7.2 Интеграционные тесты
# tests/integration/test_task_workflow.py
def test_complete_task_workflow(auth_headers):
# 1. Создание задачи
create_response = client.post(
"/api/v1/tasks/",
headers=auth_headers,
json={
"title": "Complete this task",
"description": "Test workflow"
}
)
task_id = create_response.json()["id"]
# 2. Проверка создания
get_response = client.get(
f"/api/v1/tasks/{task_id}",
headers=auth_headers
)
assert get_response.status_code == 200
assert get_response.json()["status"] == "pending"
# 3. Обновление статуса
update_response = client.put(
f"/api/v1/tasks/{task_id}",
headers=auth_headers,
json={
"status": "completed"
}
)
assert update_response.status_code == 200
assert update_response.json()["status"] == "completed"
# 4. Удаление задачи
delete_response = client.delete(
f"/api/v1/tasks/{task_id}",
headers=auth_headers
)
assert delete_response.status_code == 204
# 5. Проверка удаления
get_deleted_response = client.get(
f"/api/v1/tasks/{task_id}",
headers=auth_headers
)
assert get_deleted_response.status_code == 404
8. Документирование API
8.1 OpenAPI документация (автогенерация FastAPI)
# app/main.py с улучшенной документацией
app = FastAPI(
title="Task Manager API",
description="## API для управления задачами\n\n"
"Этот API позволяет:\n"
"- Создавать, читать, обновлять и удалять задачи\n"
"- Управлять категориями задач\n"
"- Добавлять комментарии к задачам\n"
"- Фильтровать и сортировать задачи",
version="1.0.0",
contact={
"name": "Support Team",
"email": "support@example.com",
},
license_info={
"name": "MIT",
"url": "https://opensource.org/licenses/MIT",
},
openapi_tags=[
{
"name": "tasks",
"description": "Операции с задачами",
},
{
"name": "users",
"description": "Управление пользователями",
},
{
"name": "auth",
"description": "Аутентификация и авторизация",
},
]
)
# Пример с подробным описанием эндпоинта
@router.post(
"/",
response_model=schemas_task.TaskPublic,
status_code=status.HTTP_201_CREATED,
summary="Создать новую задачу",
description="""
Создает новую задачу для текущего пользователя.
**Требования:**
- Пользователь должен быть аутентифицирован
- Заголовок задачи обязателен
**Возвращает:**
- Созданную задачу со всеми полями
- ID созданной задачи
- Даты создания и обновления
""",
responses={
201: {
"description": "Задача успешно создана",
"content": {
"application/json": {
"example": {
"id": 1,
"title": "Новая задача",
"description": "Описание задачи",
"status": "pending",
"created_at": "2024-01-01T12:00:00Z",
"updated_at": "2024-01-01T12:00:00Z"
}
}
}
},
400: {
"description": "Некорректные данные",
"content": {
"application/json": {
"example": {
"detail": "Title must not be empty"
}
}
}
},
401: {
"description": "Не авторизован"
}
}
)
8.2 Ручная документация с примерами
# Пример схемы с документацией
class TaskCreate(BaseModel):
"""
Модель для создания задачи
Attributes:
title: Заголовок задачи (макс. 200 символов)
description: Описание задачи (опционально)
status: Статус задачи
category_ids: Список ID категорий
"""
title: str = Field(
...,
max_length=200,
example="Купить продукты",
description="Заголовок задачи, обязательное поле"
)
description: Optional[str] = Field(
None,
example="Молоко, хлеб, яйца",
description="Подробное описание задачи"
)
status: TaskStatus = Field(
TaskStatus.PENDING,
example="pending",
description="Статус задачи: pending, in_progress, completed"
)
category_ids: Optional[List[int]] = Field(
[],
example=[1, 2],
description="Список ID категорий для задачи"
)
9. Оптимизация производительности
9.1 Кэширование
# app/core/cache.py
from redis import Redis
from functools import wraps
import pickle
import hashlib
redis_client = Redis(host='localhost', port=6379, db=0)
def cache_response(ttl: int = 300):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Создание ключа кэша
key_parts = [func.__name__, str(args), str(kwargs)]
key = hashlib.md5(str(key_parts).encode()).hexdigest()
# Попытка получить из кэша
cached = redis_client.get(key)
if cached:
return pickle.loads(cached)
# Выполнение функции
result = await func(*args, **kwargs)
# Сохранение в кэш
redis_client.setex(key, ttl, pickle.dumps(result))
return result
return wrapper
return decorator
# Использование
@router.get("/cached")
@cache_response(ttl=60) # Кэшировать на 60 секунд
async def get_cached_tasks(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
# Дорогостоящая операция
return complex_query(db, current_user.id)
9.2 Пагинация с курсором
# Для больших наборов данных
@router.get("/cursor")
async def get_tasks_cursor(
cursor: Optional[str] = Query(None),
limit: int = Query(50, ge=1, le=1000),
db: Session = Depends(get_db)
):
query = db.query(Task)
if cursor:
# cursor = last_id из предыдущего запроса
query = query.filter(Task.id > int(cursor))
tasks = query.order_by(Task.id).limit(limit).all()
next_cursor = None
if tasks and len(tasks) == limit:
next_cursor = str(tasks[-1].id)
return {
"tasks": tasks,
"next_cursor": next_cursor,
"has_more": next_cursor is not None
}
10. Мониторинг и логирование
10.1 Структурированное логирование
# app/core/logging.py
import logging
import json
from pythonjsonlogger import jsonlogger
def setup_logging():
logger = logging.getLogger()
# Обработчик для JSON логов
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
'%(asctime)s %(levelname)s %(name)s %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
logger = setup_logging()
# Использование
@router.post("/")
async def create_task_with_logging(
task_in: schemas_task.TaskCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
logger.info("Creating task", extra={
"user_id": current_user.id,
"task_title": task_in.title,
"task_status": task_in.status
})
try:
task = crud_task.create_task(db, task_in, current_user.id)
logger.info("Task created successfully", extra={
"task_id": task.id
})
return task
except Exception as e:
logger.error("Failed to create task", extra={
"error": str(e),
"user_id": current_user.id
})
raise
10.2 Метрики и мониторинг
# app/core/metrics.py
from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Response
# Метрики
REQUEST_COUNT = Counter(
'api_requests_total',
'Total API requests',
['method', 'endpoint', 'status']
)
REQUEST_LATENCY = Histogram(
'api_request_duration_seconds',
'API request latency',
['method', 'endpoint']
)
# Middleware для сбора метрик
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
method = request.method
endpoint = request.url.path
with REQUEST_LATENCY.labels(method=method, endpoint=endpoint).time():
response = await call_next(request)
REQUEST_COUNT.labels(
method=method,
endpoint=endpoint,
status=response.status_code
).inc()
return response
@router.get("/metrics")
async def get_metrics():
return Response(
content=generate_latest(),
media_type="text/plain"
)
11. Безопасность API
11.1 Защита от распространенных атак
# app/main.py с middleware безопасности
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.gzip import GZipMiddleware
# Принудительное использование HTTPS в production
if settings.ENVIRONMENT == "production":
app.add_middleware(HTTPSRedirectMiddleware)
# Защита от Host header атак
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["example.com", "api.example.com"]
)
# Сжатие ответов
app.add_middleware(GZipMiddleware, minimum_size=1000)
# Rate limiting
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@router.get("/limited")
@limiter.limit("5/minute")
async def limited_endpoint(request: Request):
return {"message": "This is rate limited"}
# Защита от SQL инъекций (используется SQLAlchemy с параметризованными запросами)
def get_user_safe(db: Session, user_id: int):
# Безопасно - параметризованный запрос
return db.query(User).filter(User.id == user_id).first()
# НЕ ДЕЛАЙТЕ ТАК (уязвимо к SQL инъекциям):
def get_user_unsafe(db: Session, user_id: int):
# Опасный код!
return db.execute(f"SELECT * FROM users WHERE id = {user_id}")
11.2 Валидация входных данных
from pydantic import validator, constr, EmailStr
import re
class UserCreate(BaseModel):
email: EmailStr
password: constr(min_length=8, max_length=128)
username: constr(min_length=3, max_length=50, regex=r'^[a-zA-Z0-9_]+$')
@validator('password')
def validate_password_strength(cls, v):
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain at least one uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain at least one lowercase letter')
if not re.search(r'[0-9]', v):
raise ValueError('Password must contain at least one digit')
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
raise ValueError('Password must contain at least one special character')
return v
12. Версионирование API
12.1 Стратегии версионирования
# Версионирование через URL (рекомендуется)
# app/api/v1/endpoints/tasks.py
# app/api/v2/endpoints/tasks.py
# Версионирование через заголовки
@router.get("/tasks", include_in_schema=False)
async def get_tasks_v1():
return {"version": "v1", "tasks": []}
@router.get("/tasks")
async def get_tasks_v2(
request: Request,
db: Session = Depends(get_db)
):
accept_header = request.headers.get("Accept", "")
if "application/vnd.api.v2+json" in accept_header:
return {"version": "v2", "tasks": [], "metadata": {}}
else:
return {"version": "v1", "tasks": []}
# Миграция с v1 на v2
# app/api/v1/endpoints/tasks.py
@router.get("/tasks", deprecated=True)
async def get_tasks_v1():
return {"message": "This endpoint is deprecated. Use /api/v2/tasks"}
# app/api/v2/endpoints/tasks.py
@router.get("/tasks")
async def get_tasks_v2():
return {"tasks": [], "pagination": {}}
Заключение
Лучшие практики проектирования API:
- Используйте правильные HTTP методы и статус-коды
- Версионируйте API с самого начала
- Используйте пагинацию для списков
- Обрабатывайте ошибки единообразно
- Документируйте API
- Тестируйте API
- Обеспечьте безопасность
- Мониторьте производительность
- Используйте кэширование где это возможно
- Поддерживайте обратную совместимость
Полезные инструменты:
- FastAPI - современный фреймворк для Python
- Swagger/OpenAPI - документация
- Postman/Insomnia - тестирование API
- Pytest - тестирование
- Docker - контейнеризация
- Kubernetes - оркестрация
- Prometheus+Grafana - мониторинг
- ELK Stack - логирование
Это пособие покрывает основные аспекты проектирования и реализации API. Помните, что хороший API - это тот, который понятен, предсказуем и удобен для использования как для разработчиков, так и для клиентов.