← Назад к курсу
Практическое пособие по работе с базами данных в Python
Оглавление
- Введение и подходы к работе
- Прямые подключения
- SQLAlchemy ORM
- Реляционные базы данных
- NoSQL базы данных
- Асинхронные подключения
- Паттерны и лучшие практики
- Пример приложения
1. Введение и подходы к работе
Три основных подхода:
- Прямые подключения (драйверы БД)
- ORM (Object-Relational Mapping)
- ODM (Object-Document Mapping)
Установка основных драйверов:
# Реляционные БД pip install pymysql psycopg2-binary # NoSQL pip install pymongo redis # ORM/ODM pip install sqlalchemy sqlmodel # Асинхронные pip install asyncpg aiomysql aioredis motor # Утилиты pip install alembic python-dotenv
2. Прямые подключения
Общий паттерн подключения:
import contextlib
from typing import Optional, Any
class DatabaseConnection:
def __init__(self, config: dict):
self.config = config
self.connection: Optional[Any] = None
@contextlib.contextmanager
def connect(self):
"""Контекстный менеджер для подключения"""
try:
self.connection = self._create_connection()
yield self.connection
except Exception as e:
self._handle_error(e)
raise
finally:
self._close_connection()
3. Реляционные базы данных
3.1 SQLite
Прямое подключение:
import sqlite3
from sqlite3 import Error
import json
class SQLiteManager:
def __init__(self, db_file: str):
self.db_file = db_file
self.conn = None
def connect(self):
"""Создание подключения к базе SQLite"""
try:
self.conn = sqlite3.connect(
self.db_file,
detect_types=sqlite3.PARSE_DECLTYPES
)
# Включение поддержки внешних ключей
self.conn.execute("PRAGMA foreign_keys = ON")
# Возврат строк как словарей
self.conn.row_factory = sqlite3.Row
return self.conn
except Error as e:
print(f"Ошибка подключения к SQLite: {e}")
return None
def create_table(self):
"""Создание таблицы"""
sql_create_users_table = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata TEXT,
is_active BOOLEAN DEFAULT 1
);
"""
try:
cursor = self.conn.cursor()
cursor.execute(sql_create_users_table)
self.conn.commit()
except Error as e:
print(f"Ошибка создания таблицы: {e}")
def insert_user(self, user_data: dict):
"""Добавление пользователя"""
sql = """
INSERT INTO users(username, email, metadata)
VALUES(?, ?, ?)
"""
try:
cursor = self.conn.cursor()
metadata_json = json.dumps(user_data.get('metadata', {}))
cursor.execute(sql, (
user_data['username'],
user_data['email'],
metadata_json
))
self.conn.commit()
return cursor.lastrowid
except Error as e:
print(f"Ошибка вставки: {e}")
return None
def get_users(self, active_only: bool = True):
"""Получение пользователей"""
sql = "SELECT * FROM users"
if active_only:
sql += " WHERE is_active = 1"
cursor = self.conn.cursor()
cursor.execute(sql)
rows = cursor.fetchall()
# Преобразование в словари
return [dict(row) for row in rows]
def close(self):
"""Закрытие соединения"""
if self.conn:
self.conn.close()
# Использование
if __name__ == "__main__":
db = SQLiteManager("example.db")
conn = db.connect()
if conn:
db.create_table()
# Добавление пользователя
user_id = db.insert_user({
'username': 'john_doe',
'email': 'john@example.com',
'metadata': {'age': 30, 'city': 'Moscow'}
})
# Получение пользователей
users = db.get_users()
for user in users:
print(f"User: {user['username']}, Email: {user['email']}")
db.close()
3.2 MySQL
Установка драйвера:
pip install pymysql cryptography # cryptography для шифрования
Прямое подключение:
import pymysql
from pymysql import MySQLError
from pymysql.cursors import DictCursor
import ssl
from typing import Optional
class MySQLManager:
def __init__(
self,
host: str,
user: str,
password: str,
database: str,
port: int = 3306,
ssl_enabled: bool = False
):
self.config = {
'host': host,
'user': user,
'password': password,
'database': database,
'port': port,
'charset': 'utf8mb4',
'cursorclass': DictCursor,
'autocommit': False
}
if ssl_enabled:
self.config['ssl'] = {
'ca': '/path/to/ca-cert.pem',
'ssl_disabled': False
}
self.conn: Optional[pymysql.Connection] = None
self.pool = None # Для пула соединений
def connect(self):
"""Подключение к MySQL"""
try:
self.conn = pymysql.connect(**self.config)
print("Успешное подключение к MySQL")
return self.conn
except MySQLError as e:
print(f"Ошибка подключения к MySQL: {e}")
return None
def execute_transaction(self, queries: list):
"""Выполнение транзакции"""
if not self.conn:
self.connect()
try:
with self.conn.cursor() as cursor:
for query, params in queries:
cursor.execute(query, params)
self.conn.commit()
return True
except MySQLError as e:
self.conn.rollback()
print(f"Ошибка транзакции: {e}")
return False
finally:
self.conn.close()
def create_connection_pool(self, pool_size: int = 5):
"""Создание пула соединений (упрощенный вариант)"""
import queue
self.pool = queue.Queue(maxsize=pool_size)
for _ in range(pool_size):
conn = pymysql.connect(**self.config)
self.pool.put(conn)
def get_connection_from_pool(self):
"""Получение соединения из пула"""
if self.pool:
return self.pool.get()
return self.connect()
def return_connection_to_pool(self, conn):
"""Возврат соединения в пул"""
if self.pool:
self.pool.put(conn)
else:
conn.close()
def create_stored_procedure(self):
"""Создание хранимой процедуры"""
procedure_sql = """
DELIMITER //
CREATE PROCEDURE GetActiveUsers()
BEGIN
SELECT * FROM users WHERE is_active = 1;
END //
DELIMITER ;
"""
try:
with self.conn.cursor() as cursor:
cursor.execute(procedure_sql)
self.conn.commit()
except MySQLError as e:
print(f"Ошибка создания процедуры: {e}")
def call_stored_procedure(self):
"""Вызов хранимой процедуры"""
try:
with self.conn.cursor() as cursor:
cursor.callproc('GetActiveUsers')
results = cursor.fetchall()
return results
except MySQLError as e:
print(f"Ошибка вызова процедуры: {e}")
return []
# Использование
if __name__ == "__main__":
mysql = MySQLManager(
host="localhost",
user="root",
password="password",
database="test_db"
)
conn = mysql.connect()
if conn:
# Создание таблицы
create_table_sql = """
CREATE TABLE IF NOT EXISTS products (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
price DECIMAL(10, 2),
category VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_category (category),
INDEX idx_created (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
"""
with conn.cursor() as cursor:
cursor.execute(create_table_sql)
# Вставка данных с транзакцией
queries = [
("INSERT INTO products (name, price, category) VALUES (%s, %s, %s)",
("Laptop", 999.99, "Electronics")),
("INSERT INTO products (name, price, category) VALUES (%s, %s, %s)",
("Mouse", 25.50, "Electronics"))
]
mysql.execute_transaction(queries)
conn.close()
3.3 PostgreSQL
Установка драйвера:
pip install psycopg2-binary # для синхронного использования pip install asyncpg # для асинхронного использования
Прямое подключение:
import psycopg2
from psycopg2 import pool, sql
from psycopg2.extras import RealDictCursor, Json
from contextlib import contextmanager
import json
class PostgreSQLManager:
def __init__(
self,
host: str,
user: str,
password: str,
database: str,
port: int = 5432,
min_connections: int = 1,
max_connections: int = 10
):
self.config = {
'host': host,
'user': user,
'password': password,
'database': database,
'port': port
}
self.connection_pool = None
self.min_connections = min_connections
self.max_connections = max_connections
def create_pool(self):
"""Создание пула соединений"""
try:
self.connection_pool = psycopg2.pool.SimpleConnectionPool(
self.min_connections,
self.max_connections,
**self.config
)
print("Пул соединений создан")
except psycopg2.Error as e:
print(f"Ошибка создания пула: {e}")
@contextmanager
def get_connection(self):
"""Контекстный менеджер для получения соединения"""
conn = None
try:
if self.connection_pool:
conn = self.connection_pool.getconn()
else:
conn = psycopg2.connect(**self.config)
conn.autocommit = False
yield conn
conn.commit()
except psycopg2.Error as e:
if conn:
conn.rollback()
print(f"Ошибка базы данных: {e}")
raise
finally:
if conn and self.connection_pool:
self.connection_pool.putconn(conn)
elif conn:
conn.close()
def execute_query(self, query: str, params: tuple = None):
"""Выполнение запроса с возвратом результатов"""
with self.get_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params or ())
if cursor.description: # Если есть результат
return cursor.fetchall()
return None
def create_json_table(self):
"""Создание таблицы с JSON полем"""
query = """
CREATE TABLE IF NOT EXISTS documents (
id SERIAL PRIMARY KEY,
title VARCHAR(255) NOT NULL,
content JSONB,
metadata JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
tags TEXT[],
-- Индексы для JSON полей
CONSTRAINT valid_json CHECK (content IS NULL OR jsonb_typeof(content) = 'object'),
-- GIN индексы для быстрого поиска по JSON
INDEX idx_gin_content ON documents USING GIN (content),
INDEX idx_gin_metadata ON documents USING GIN (metadata),
INDEX idx_gin_tags ON documents USING GIN (tags)
);
"""
self.execute_query(query)
def insert_json_document(self, title: str, content: dict, metadata: dict, tags: list):
"""Вставка JSON документа"""
query = """
INSERT INTO documents (title, content, metadata, tags)
VALUES (%s, %s, %s, %s)
RETURNING id, created_at;
"""
result = self.execute_query(
query,
(title, Json(content), Json(metadata), tags)
)
return result[0] if result else None
def search_json(self, key: str, value: str):
"""Поиск по JSON полю"""
query = sql.SQL("""
SELECT * FROM documents
WHERE content->>%s = %s
""")
return self.execute_query(query, (key, value))
def full_text_search(self, search_term: str):
"""Полнотекстовый поиск"""
query = """
SELECT * FROM documents
WHERE to_tsvector('russian', title || ' ' || content::text)
@@ plainto_tsquery('russian', %s)
ORDER BY ts_rank(
to_tsvector('russian', title || ' ' || content::text),
plainto_tsquery('russian', %s)
) DESC;
"""
return self.execute_query(query, (search_term, search_term))
def create_partitioned_table(self):
"""Создание секционированной таблицы"""
queries = [
"""
CREATE TABLE IF NOT EXISTS logs (
id BIGSERIAL,
log_time TIMESTAMP NOT NULL,
level VARCHAR(10),
message TEXT,
application VARCHAR(50)
) PARTITION BY RANGE (log_time);
""",
"""
CREATE TABLE logs_y2024m01 PARTITION OF logs
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
""",
"""
CREATE TABLE logs_y2024m02 PARTITION OF logs
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
"""
]
for query in queries:
self.execute_query(query)
def close_pool(self):
"""Закрытие пула соединений"""
if self.connection_pool:
self.connection_pool.closeall()
# Использование
if __name__ == "__main__":
pg = PostgreSQLManager(
host="localhost",
user="postgres",
password="password",
database="test_db"
)
pg.create_pool()
# Создание таблицы с JSON
pg.create_json_table()
# Вставка JSON документа
doc_id = pg.insert_json_document(
title="Пример документа",
content={"text": "Привет мир", "author": "Иван"},
metadata={"views": 100, "language": "ru"},
tags=["python", "postgres", "json"]
)
# Поиск по JSON
results = pg.search_json("author", "Иван")
for row in results:
print(f"Найден документ: {row['title']}")
# Закрытие пула
pg.close_pool()
4. SQLAlchemy ORM
4.1 Базовое использование
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, JSON, ForeignKey, Text, Float, UniqueConstraint, Index
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, Session, joinedload
from sqlalchemy.sql import func, text
from sqlalchemy.dialects.postgresql import JSONB, ARRAY
from datetime import datetime
from typing import Optional, List
import json
Base = declarative_base()
# Модели
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(50), unique=True, nullable=False, index=True)
email = Column(String(100), unique=True, nullable=False)
full_name = Column(String(100))
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, onupdate=func.now())
preferences = Column(JSON, default=dict)
# Отношения
posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
# Составной индекс
__table_args__ = (
Index('idx_user_active_created', 'is_active', 'created_at'),
)
def __repr__(self):
return f"<User(id={self.id}, username='{self.username}')>"
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
content = Column(Text)
author_id = Column(Integer, ForeignKey('users.id', ondelete="CASCADE"))
published = Column(Boolean, default=False)
tags = Column(ARRAY(String))
metadata = Column(JSONB)
views = Column(Integer, default=0)
created_at = Column(DateTime, server_default=func.now())
# Отношения
author = relationship("User", back_populates="posts")
comments = relationship("Comment", back_populates="post")
# Полнотекстовый поиск (для PostgreSQL)
__ts_vector__ = func.to_tsvector('russian', title + ' ' + content)
__table_args__ = (
Index('idx_post_author', 'author_id'),
Index('idx_post_published', 'published'),
Index('idx_post_created', 'created_at'),
Index('ix_posts_fts', __ts_vector__, postgresql_using='gin'),
)
def to_dict(self):
"""Преобразование объекта в словарь"""
return {
'id': self.id,
'title': self.title,
'author': self.author.username if self.author else None,
'created_at': self.created_at.isoformat() if self.created_at else None
}
class Comment(Base):
__tablename__ = 'comments'
id = Column(Integer, primary_key=True)
content = Column(Text, nullable=False)
post_id = Column(Integer, ForeignKey('posts.id'))
user_id = Column(Integer, ForeignKey('users.id'))
created_at = Column(DateTime, server_default=func.now())
# Отношения
post = relationship("Post", back_populates="comments")
user = relationship("User")
class DatabaseManager:
def __init__(self, connection_string: str, echo: bool = False):
"""Инициализация менеджера базы данных"""
self.engine = create_engine(
connection_string,
echo=echo,
pool_size=5,
max_overflow=10,
pool_pre_ping=True, # Проверка соединения перед использованием
pool_recycle=3600 # Пересоздание соединений каждый час
)
self.SessionLocal = sessionmaker(
bind=self.engine,
autocommit=False,
autoflush=False,
expire_on_commit=False
)
Base.metadata.create_all(self.engine)
def get_session(self) -> Session:
"""Получение сессии"""
return self.SessionLocal()
@contextmanager
def session_scope(self):
"""Контекстный менеджер для сессии"""
session = self.get_session()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
raise e
finally:
session.close()
def add_user(self, username: str, email: str, **kwargs):
"""Добавление пользователя"""
with self.session_scope() as session:
user = User(username=username, email=email, **kwargs)
session.add(user)
session.flush() # Получаем ID без коммита
return user.id
def get_user_with_posts(self, user_id: int):
"""Получение пользователя с его постами (жадная загрузка)"""
with self.session_scope() as session:
user = session.query(User).options(
joinedload(User.posts)
).filter(User.id == user_id).first()
return user
def search_posts(self, search_term: str, limit: int = 10):
"""Поиск постов (полнотекстовый для PostgreSQL)"""
with self.session_scope() as session:
# Для PostgreSQL
if 'postgresql' in str(self.engine.url):
query = session.query(Post).filter(
Post.__ts_vector__.match(search_term)
).limit(limit)
else:
# Для других БД - обычный LIKE
query = session.query(Post).filter(
Post.title.ilike(f"%{search_term}%") |
Post.content.ilike(f"%{search_term}%")
).limit(limit)
return query.all()
def bulk_insert_users(self, users_data: List[dict]):
"""Массовая вставка пользователей"""
with self.session_scope() as session:
users = [User(**data) for data in users_data]
session.bulk_save_objects(users)
def execute_raw_sql(self, sql: str, params: dict = None):
"""Выполнение сырого SQL"""
with self.session_scope() as session:
result = session.execute(text(sql), params or {})
if result.returns_rows:
return [dict(row) for row in result]
return result.rowcount
def get_engine_stats(self):
"""Получение статистики пула соединений"""
return {
'checked_out': self.engine.pool.checkedout(),
'checked_in': self.engine.pool.checkedin(),
'size': self.engine.pool.size()
}
# Использование SQLAlchemy с разными БД
class SQLAlchemyFactory:
@staticmethod
def create_mysql_connection():
"""Создание подключения к MySQL"""
connection_string = "mysql+pymysql://user:password@localhost/dbname?charset=utf8mb4"
return DatabaseManager(connection_string)
@staticmethod
def create_postgresql_connection():
"""Создание подключения к PostgreSQL"""
connection_string = "postgresql+psycopg2://user:password@localhost/dbname"
return DatabaseManager(connection_string)
@staticmethod
def create_sqlite_connection():
"""Создание подключения к SQLite"""
connection_string = "sqlite:///./test.db"
return DatabaseManager(connection_string)
# Асинхронная версия SQLAlchemy
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
AsyncBase = declarative_base()
class AsyncDatabaseManager:
def __init__(self, connection_string: str, echo: bool = False):
"""Асинхронный менеджер базы данных"""
self.engine = create_async_engine(
connection_string,
echo=echo,
pool_size=5,
max_overflow=10,
pool_pre_ping=True
)
self.AsyncSessionLocal = async_sessionmaker(
bind=self.engine,
class_=AsyncSession,
expire_on_commit=False
)
async def init_db(self):
"""Инициализация базы данных"""
async with self.engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def get_session(self) -> AsyncSession:
"""Получение асинхронной сессии"""
async with self.AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def add_user_async(self, username: str, email: str):
"""Асинхронное добавление пользователя"""
async with self.AsyncSessionLocal() as session:
user = User(username=username, email=email)
session.add(user)
await session.flush()
await session.refresh(user)
return user
# Использование
if __name__ == "__main__":
# Подключение к SQLite
db_manager = SQLAlchemyFactory.create_sqlite_connection()
# Добавление пользователя
user_id = db_manager.add_user("test_user", "test@example.com")
print(f"Добавлен пользователь с ID: {user_id}")
# Получение пользователя с постами
user = db_manager.get_user_with_posts(user_id)
if user:
print(f"Пользователь: {user.username}")
print(f"Количество постов: {len(user.posts)}")
# Выполнение сырого SQL
results = db_manager.execute_raw_sql("SELECT * FROM users")
for row in results:
print(row)
# Статистика пула
stats = db_manager.get_engine_stats()
print(f"Статистика пула: {stats}")
4.2 Миграции с Alembic
# alembic.ini конфигурация
"""
[alembic]
script_location = alembic
sqlalchemy.url = postgresql://user:password@localhost/dbname
[post_write_hooks]
hooks = black
black.type = console_scripts
black.entrypoint = black
black.options = -q
"""
# alembic/env.py
"""
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from models import Base
config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline():
context.configure(
url=config.get_main_option("sqlalchemy.url"),
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
compare_server_default=True
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
"""
# Создание миграции
"""
alembic revision --autogenerate -m "Добавление таблицы комментариев"
alembic upgrade head
alembic downgrade -1
"""
5. NoSQL базы данных
5.1 MongoDB
from pymongo import MongoClient, ASCENDING, DESCENDING, TEXT, ReturnDocument
from pymongo.errors import ConnectionFailure, DuplicateKeyError, OperationFailure
from pymongo.collection import Collection
from pymongo.database import Database
from typing import List, Dict, Any, Optional
from datetime import datetime
import gridfs
from bson import ObjectId, json_util
import json
from motor.motor_asyncio import AsyncIOMotorClient
class MongoDBManager:
def __init__(
self,
host: str = "localhost",
port: int = 27017,
username: str = None,
password: str = None,
database: str = "test_db",
auth_source: str = "admin",
replica_set: str = None,
**kwargs
):
"""Инициализация MongoDB менеджера"""
self.connection_string = self._build_connection_string(
host, port, username, password, auth_source, replica_set
)
self.database_name = database
self.client: Optional[MongoClient] = None
self.db: Optional[Database] = None
self.fs = None
# Параметры подключения
self.connection_params = {
'maxPoolSize': 100,
'minPoolSize': 10,
'maxIdleTimeMS': 30000,
'socketTimeoutMS': 30000,
'connectTimeoutMS': 10000,
'serverSelectionTimeoutMS': 5000,
**kwargs
}
def _build_connection_string(
self,
host: str,
port: int,
username: str,
password: str,
auth_source: str,
replica_set: str
) -> str:
"""Построение строки подключения"""
if username and password:
auth = f"{username}:{password}@"
else:
auth = ""
if replica_set:
return f"mongodb://{auth}{host}:{port}/?replicaSet={replica_set}&authSource={auth_source}"
else:
return f"mongodb://{auth}{host}:{port}/?authSource={auth_source}"
def connect(self):
"""Подключение к MongoDB"""
try:
self.client = MongoClient(
self.connection_string,
**self.connection_params
)
# Проверка подключения
self.client.admin.command('ping')
self.db = self.client[self.database_name]
self.fs = gridfs.GridFS(self.db)
print("Успешное подключение к MongoDB")
return True
except ConnectionFailure as e:
print(f"Ошибка подключения к MongoDB: {e}")
return False
def create_collection(self, collection_name: str, **kwargs):
"""Создание коллекции с валидацией"""
if collection_name not in self.db.list_collection_names():
self.db.create_collection(collection_name, **kwargs)
print(f"Коллекция {collection_name} создана")
return self.db[collection_name]
def create_user_collection(self):
"""Создание коллекции пользователей с валидацией"""
validator = {
'$jsonSchema': {
'bsonType': 'object',
'required': ['username', 'email', 'created_at'],
'properties': {
'username': {
'bsonType': 'string',
'description': 'Имя пользователя должно быть строкой'
},
'email': {
'bsonType': 'string',
'pattern': '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$',
'description': 'Email должен быть валидным'
},
'age': {
'bsonType': 'int',
'minimum': 0,
'maximum': 150,
'description': 'Возраст должен быть числом от 0 до 150'
},
'created_at': {
'bsonType': 'date',
'description': 'Дата создания должна быть датой'
}
}
}
}
return self.create_collection(
'users',
validator=validator,
validationLevel='strict',
validationAction='error'
)
def create_indexes(self, collection: Collection):
"""Создание индексов"""
# Составной индекс
collection.create_index([('username', ASCENDING), ('email', ASCENDING)], unique=True)
# TTL индекс для автоматического удаления устаревших документов
collection.create_index([('created_at', ASCENDING)], expireAfterSeconds=3600)
# Текстовый индекс для поиска
collection.create_index([('bio', TEXT)], default_language='russian')
# Геопространственный индекс
collection.create_index([('location', '2dsphere')])
def insert_user(self, user_data: Dict[str, Any]):
"""Вставка пользователя"""
collection = self.db['users']
# Добавление метаданных
user_data['created_at'] = datetime.utcnow()
user_data['updated_at'] = datetime.utcnow()
try:
result = collection.insert_one(user_data)
print(f"Добавлен пользователь с ID: {result.inserted_id}")
return result.inserted_id
except DuplicateKeyError as e:
print(f"Дубликат ключа: {e}")
return None
def find_users(self, query: Dict = None, projection: Dict = None, limit: int = 10):
"""Поиск пользователей"""
collection = self.db['users']
query = query or {}
projection = projection or {}
cursor = collection.find(query, projection).limit(limit)
return list(cursor)
def aggregate_users(self, pipeline: List[Dict]):
"""Агрегация данных пользователей"""
collection = self.db['users']
return list(collection.aggregate(pipeline))
def update_user(self, user_id: str, update_data: Dict, upsert: bool = False):
"""Обновление пользователя"""
collection = self.db['users']
# Добавление поля updated_at
update_data['$set']['updated_at'] = datetime.utcnow()
result = collection.update_one(
{'_id': ObjectId(user_id)},
update_data,
upsert=upsert
)
if result.modified_count > 0:
print(f"Обновлено документов: {result.modified_count}")
return result
def transaction_example(self):
"""Пример транзакции (требует replica set)"""
with self.client.start_session() as session:
try:
with session.start_transaction():
# Операция 1
self.db.accounts.update_one(
{'_id': 'account1'},
{'$inc': {'balance': -100}},
session=session
)
# Операция 2
self.db.accounts.update_one(
{'_id': 'account2'},
{'$inc': {'balance': 100}},
session=session
)
session.commit_transaction()
print("Транзакция завершена успешно")
except Exception as e:
session.abort_transaction()
print(f"Ошибка транзакции: {e}")
raise
def store_file(self, filename: str, data: bytes, metadata: Dict = None):
"""Хранение файла в GridFS"""
file_id = self.fs.put(
data,
filename=filename,
metadata=metadata or {}
)
return file_id
def get_file(self, file_id):
"""Получение файла из GridFS"""
return self.fs.get(file_id)
def change_stream_example(self):
"""Пример Change Streams (реактивное программирование)"""
collection = self.db['users']
with collection.watch(
[
{'$match': {'operationType': 'insert'}},
{'$match': {'operationType': 'update'}}
]
) as stream:
for change in stream:
print(f"Изменение: {change}")
# Реакция на изменение в реальном времени
def close(self):
"""Закрытие подключения"""
if self.client:
self.client.close()
print("Подключение к MongoDB закрыто")
# Асинхронная версия MongoDB
class AsyncMongoDBManager:
def __init__(self, connection_string: str, database: str = "test_db"):
self.client = AsyncIOMotorClient(connection_string)
self.db = self.client[database]
async def find_users_async(self, query: Dict, limit: int = 10):
"""Асинхронный поиск пользователей"""
cursor = self.db.users.find(query).limit(limit)
return await cursor.to_list(length=limit)
async def insert_user_async(self, user_data: Dict):
"""Асинхронная вставка пользователя"""
result = await self.db.users.insert_one(user_data)
return result.inserted_id
# Использование
if __name__ == "__main__":
# Синхронное подключение
mongo = MongoDBManager(
host="localhost",
port=27017,
username="admin",
password="password",
database="myapp"
)
if mongo.connect():
# Создание коллекции с валидацией
users_collection = mongo.create_user_collection()
# Создание индексов
mongo.create_indexes(users_collection)
# Вставка пользователя
user_id = mongo.insert_user({
'username': 'john_doe',
'email': 'john@example.com',
'age': 30,
'bio': 'Python разработчик из Москвы',
'location': {
'type': 'Point',
'coordinates': [37.6173, 55.7558] # Москва
}
})
# Поиск пользователей
users = mongo.find_users({'age': {'$gte': 25}}, limit=5)
for user in users:
print(f"Найден: {user['username']}")
# Агрегация
pipeline = [
{'$match': {'age': {'$gte': 25}}},
{'$group': {
'_id': None,
'average_age': {'$avg': '$age'},
'count': {'$sum': 1}
}}
]
result = mongo.aggregate_users(pipeline)
print(f"Средний возраст: {result}")
# Хранение файла
with open('document.pdf', 'rb') as f:
file_id = mongo.store_file('document.pdf', f.read())
print(f"Файл сохранен с ID: {file_id}")
mongo.close()
5.2 Redis
import redis
from redis import Redis, ConnectionPool
from redis.exceptions import ConnectionError, RedisError
import json
import pickle
from typing import Any, Optional, Union, List, Dict
import asyncio
import aioredis
class RedisManager:
def __init__(
self,
host: str = 'localhost',
port: int = 6379,
db: int = 0,
password: str = None,
decode_responses: bool = True,
max_connections: int = 10,
**kwargs
):
"""Инициализация Redis менеджера"""
self.config = {
'host': host,
'port': port,
'db': db,
'password': password,
'decode_responses': decode_responses,
'socket_timeout': 5,
'socket_connect_timeout': 5,
'retry_on_timeout': True,
'health_check_interval': 30,
**kwargs
}
self.pool = ConnectionPool(max_connections=max_connections, **self.config)
self.client: Optional[Redis] = None
def connect(self) -> bool:
"""Подключение к Redis"""
try:
self.client = redis.Redis(connection_pool=self.pool)
# Проверка подключения
self.client.ping()
print("Успешное подключение к Redis")
return True
except ConnectionError as e:
print(f"Ошибка подключения к Redis: {e}")
return False
def set_value(self, key: str, value: Any, expire: int = None) -> bool:
"""Установка значения"""
try:
# Сериализация значения
if isinstance(value, (dict, list)):
serialized_value = json.dumps(value)
elif isinstance(value, (int, float, str)):
serialized_value = str(value)
else:
serialized_value = pickle.dumps(value)
if expire:
return bool(self.client.setex(key, expire, serialized_value))
else:
return bool(self.client.set(key, serialized_value))
except RedisError as e:
print(f"Ошибка записи в Redis: {e}")
return False
def get_value(self, key: str, default: Any = None) -> Any:
"""Получение значения"""
try:
value = self.client.get(key)
if value is None:
return default
# Попытка десериализации
try:
return json.loads(value)
except json.JSONDecodeError:
try:
return pickle.loads(value)
except:
return value.decode() if isinstance(value, bytes) else value
except RedisError as e:
print(f"Ошибка чтения из Redis: {e}")
return default
def delete_key(self, key: str) -> bool:
"""Удаление ключа"""
try:
return bool(self.client.delete(key))
except RedisError as e:
print(f"Ошибка удаления ключа: {e}")
return False
def set_hash(self, name: str, mapping: Dict[str, Any]) -> bool:
"""Установка хэша"""
try:
# Сериализация значений
serialized_mapping = {
k: json.dumps(v) if isinstance(v, (dict, list)) else str(v)
for k, v in mapping.items()
}
return bool(self.client.hset(name, mapping=serialized_mapping))
except RedisError as e:
print(f"Ошибка записи хэша: {e}")
return False
def get_hash(self, name: str) -> Dict[str, Any]:
"""Получение хэша"""
try:
data = self.client.hgetall(name)
result = {}
for k, v in data.items():
try:
result[k] = json.loads(v)
except json.JSONDecodeError:
result[k] = v
return result
except RedisError as e:
print(f"Ошибка чтения хэша: {e}")
return {}
def push_to_list(self, key: str, value: Any, side: str = 'right', max_length: int = None):
"""Добавление в список"""
try:
serialized_value = json.dumps(value) if isinstance(value, (dict, list)) else str(value)
if side == 'right':
self.client.rpush(key, serialized_value)
else:
self.client.lpush(key, serialized_value)
# Ограничение длины списка
if max_length:
self.client.ltrim(key, 0, max_length - 1)
except RedisError as e:
print(f"Ошибка добавления в список: {e}")
def get_list(self, key: str, start: int = 0, end: int = -1) -> List[Any]:
"""Получение списка"""
try:
values = self.client.lrange(key, start, end)
result = []
for v in values:
try:
result.append(json.loads(v))
except json.JSONDecodeError:
result.append(v.decode() if isinstance(v, bytes) else v)
return result
except RedisError as e:
print(f"Ошибка чтения списка: {e}")
return []
def add_to_set(self, key: str, *values: Any) -> int:
"""Добавление в множество"""
try:
serialized_values = [
json.dumps(v) if isinstance(v, (dict, list)) else str(v)
for v in values
]
return self.client.sadd(key, *serialized_values)
except RedisError as e:
print(f"Ошибка добавления в множество: {e}")
return 0
def publish_message(self, channel: str, message: Any):
"""Публикация сообщения в канал"""
try:
serialized_message = json.dumps(message) if isinstance(message, (dict, list)) else str(message)
self.client.publish(channel, serialized_message)
except RedisError as e:
print(f"Ошибка публикации: {e}")
def subscribe_to_channel(self, channel: str):
"""Подписка на канал"""
pubsub = self.client.pubsub()
pubsub.subscribe(channel)
for message in pubsub.listen():
if message['type'] == 'message':
data = message['data']
try:
yield json.loads(data)
except json.JSONDecodeError:
yield data.decode() if isinstance(data, bytes) else data
def acquire_lock(self, lock_name: str, timeout: int = 10) -> bool:
"""Приобретение распределенной блокировки"""
import uuid
identifier = str(uuid.uuid4())
end = time.time() + timeout
while time.time() < end:
if self.client.set(lock_name, identifier, nx=True, ex=timeout):
return identifier
time.sleep(0.001)
return False
def release_lock(self, lock_name: str, identifier: str) -> bool:
"""Освобождение распределенной блокировки"""
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
try:
result = self.client.eval(script, 1, lock_name, identifier)
return bool(result)
except RedisError:
return False
def pipeline_example(self):
"""Использование pipeline для пакетных операций"""
try:
pipe = self.client.pipeline()
# Множественные операции
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
pipe.get('key2')
# Выполнение всех команд за один round-trip
results = pipe.execute()
print(f"Результаты pipeline: {results}")
except RedisError as e:
print(f"Ошибка pipeline: {e}")
def transaction_example(self):
"""Использование транзакций"""
try:
with self.client.pipeline() as pipe:
while True:
try:
# Начало транзакции
pipe.watch('balance')
current_balance = int(pipe.get('balance') or 0)
# Множественные команды
pipe.multi()
pipe.set('balance', current_balance - 100)
pipe.set('transaction_log', 'withdrawal_100')
# Выполнение
pipe.execute()
break
except redis.WatchError:
continue
except RedisError as e:
print(f"Ошибка транзакции: {e}")
def get_info(self) -> Dict:
"""Получение информации о Redis сервере"""
try:
return self.client.info()
except RedisError as e:
print(f"Ошибка получения информации: {e}")
return {}
def scan_keys(self, pattern: str = "*", count: int = 100) -> List[str]:
"""Сканирование ключей (вместо keys)"""
try:
keys = []
cursor = 0
while True:
cursor, found_keys = self.client.scan(
cursor=cursor,
match=pattern,
count=count
)
keys.extend(found_keys)
if cursor == 0:
break
return keys
except RedisError as e:
print(f"Ошибка сканирования ключей: {e}")
return []
def close(self):
"""Закрытие подключения"""
if self.client:
self.client.close()
self.pool.disconnect()
print("Подключение к Redis закрыто")
# Асинхронная версия Redis
class AsyncRedisManager:
def __init__(self, host: str = 'localhost', port: int = 6379):
self.redis = None
self.host = host
self.port = port
async def connect(self):
"""Асинхронное подключение"""
self.redis = await aioredis.create_redis_pool(
f'redis://{self.host}:{self.port}',
minsize=5,
maxsize=10
)
return self.redis
async def set_value_async(self, key: str, value: str, expire: int = None):
"""Асинхронная установка значения"""
if expire:
await self.redis.setex(key, expire, value)
else:
await self.redis.set(key, value)
async def get_value_async(self, key: str) -> Optional[str]:
"""Асинхронное получение значения"""
return await self.redis.get(key)
async def close(self):
"""Закрытие асинхронного подключения"""
if self.redis:
self.redis.close()
await self.redis.wait_closed()
# Кэширование с Redis
class RedisCache:
def __init__(self, redis_manager: RedisManager, default_ttl: int = 3600):
self.redis = redis_manager
self.default_ttl = default_ttl
def cache_decorator(self, ttl: int = None):
"""Декоратор для кэширования результатов функций"""
def decorator(func):
def wrapper(*args, **kwargs):
# Создание ключа кэша
cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
# Попытка получить из кэша
cached_result = self.redis.get_value(cache_key)
if cached_result is not None:
print(f"Кэш попадание для {cache_key}")
return cached_result
# Выполнение функции
result = func(*args, **kwargs)
# Сохранение в кэш
self.redis.set_value(
cache_key,
result,
expire=ttl or self.default_ttl
)
print(f"Кэш промах для {cache_key}, результат сохранен")
return result
return wrapper
return decorator
# Использование
if __name__ == "__main__":
# Синхронное использование
redis_manager = RedisManager(
host='localhost',
port=6379,
db=0,
password=None,
decode_responses=True
)
if redis_manager.connect():
# Простые операции
redis_manager.set_value('user:1:name', 'John Doe', expire=60)
name = redis_manager.get_value('user:1:name')
print(f"Имя пользователя: {name}")
# Хэш
redis_manager.set_hash('user:1', {
'name': 'John',
'age': 30,
'email': 'john@example.com'
})
user_data = redis_manager.get_hash('user:1')
print(f"Данные пользователя: {user_data}")
# Список
redis_manager.push_to_list('recent_users', 'user1', max_length=10)
redis_manager.push_to_list('recent_users', 'user2', max_length=10)
recent_users = redis_manager.get_list('recent_users')
print(f"Последние пользователи: {recent_users}")
# Pub/Sub
# В отдельном потоке можно запустить подписку
# redis_manager.subscribe_to_channel('notifications')
# Pipeline
redis_manager.pipeline_example()
# Сканирование ключей
keys = redis_manager.scan_keys(pattern='user:*')
print(f"Найдено ключей: {len(keys)}")
redis_manager.close()
# Асинхронное использование
async def async_example():
async_redis = AsyncRedisManager()
await async_redis.connect()
await async_redis.set_value_async('async_key', 'async_value', expire=10)
value = await async_redis.get_value_async('async_key')
print(f"Асинхронное значение: {value}")
await async_redis.close()
asyncio.run(async_example())
5.3 Другие популярные NoSQL БД
Cassandra:
"""
pip install cassandra-driver
"""
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory, BatchStatement, ConsistencyLevel
from cassandra.policies import DCAwareRoundRobinPolicy
class CassandraManager:
def __init__(self, hosts: list, keyspace: str, username: str = None, password: str = None):
auth_provider = None
if username and password:
auth_provider = PlainTextAuthProvider(username=username, password=password)
self.cluster = Cluster(
hosts,
auth_provider=auth_provider,
load_balancing_policy=DCAwareRoundRobinPolicy(),
protocol_version=4
)
self.session = self.cluster.connect(keyspace)
self.session.row_factory = dict_factory
def execute_query(self, query: str, params: tuple = None):
return self.session.execute(query, params)
def batch_insert(self, queries: list):
batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
for query, params in queries:
batch.add(query, params)
self.session.execute(batch)
def close(self):
self.cluster.shutdown()
Elasticsearch:
"""
pip install elasticsearch
"""
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
class ElasticsearchManager:
def __init__(self, hosts: list, **kwargs):
self.client = Elasticsearch(
hosts,
http_auth=('user', 'password'),
**kwargs
)
def index_document(self, index: str, doc: dict, doc_id: str = None):
return self.client.index(
index=index,
id=doc_id,
document=doc,
refresh=True
)
def search(self, index: str, query: dict):
return self.client.search(index=index, body=query)
def bulk_index(self, actions: list):
return bulk(self.client, actions)
6. Асинхронные подключения
6.1 Асинхронный паттерн для разных БД
import asyncio
from typing import Dict, Any
from dataclasses import dataclass
@dataclass
class DatabaseConfig:
"""Конфигурация базы данных"""
db_type: str
host: str
port: int
database: str
username: str = None
password: str = None
pool_size: int = 5
ssl: bool = False
class AsyncDatabaseManager:
"""Универсальный асинхронный менеджер баз данных"""
_drivers = {
'postgresql': 'asyncpg',
'mysql': 'aiomysql',
'mongodb': 'motor',
'redis': 'aioredis',
'sqlite': 'aiosqlite'
}
def __init__(self, config: DatabaseConfig):
self.config = config
self.connection = None
self.pool = None
async def connect(self):
"""Подключение к базе данных"""
db_type = self.config.db_type
if db_type == 'postgresql':
await self._connect_postgresql()
elif db_type == 'mysql':
await self._connect_mysql()
elif db_type == 'mongodb':
await self._connect_mongodb()
elif db_type == 'redis':
await self._connect_redis()
elif db_type == 'sqlite':
await self._connect_sqlite()
else:
raise ValueError(f"Неизвестный тип БД: {db_type}")
async def _connect_postgresql(self):
"""Подключение к PostgreSQL"""
import asyncpg
self.pool = await asyncpg.create_pool(
host=self.config.host,
port=self.config.port,
user=self.config.username,
password=self.config.password,
database=self.config.database,
min_size=1,
max_size=self.config.pool_size,
ssl='require' if self.config.ssl else None
)
# Регистрация типов
await self._register_postgresql_types()
async def _register_postgresql_types(self):
"""Регистрация пользовательских типов PostgreSQL"""
import asyncpg
async def encode_jsonb(value):
return json.dumps(value)
async def decode_jsonb(value):
return json.loads(value)
await self.pool.set_type_codec(
'jsonb',
schema='pg_catalog',
encoder=encode_jsonb,
decoder=decode_jsonb,
format='text'
)
async def _connect_mysql(self):
"""Подключение к MySQL"""
import aiomysql
self.pool = await aiomysql.create_pool(
host=self.config.host,
port=self.config.port,
user=self.config.username,
password=self.config.password,
db=self.config.database,
minsize=1,
maxsize=self.config.pool_size,
autocommit=False
)
async def _connect_mongodb(self):
"""Подключение к MongoDB"""
from motor.motor_asyncio import AsyncIOMotorClient
if self.config.username and self.config.password:
uri = f"mongodb://{self.config.username}:{self.config.password}@{self.config.host}:{self.config.port}/{self.config.database}"
else:
uri = f"mongodb://{self.config.host}:{self.config.port}/{self.config.database}"
self.client = AsyncIOMotorClient(uri)
self.connection = self.client[self.config.database]
async def execute_query(self, query: str, params: tuple = None):
"""Выполнение SQL запроса"""
if self.config.db_type in ['postgresql', 'mysql']:
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(query, params or ())
if cur.description: # SELECT запрос
result = await cur.fetchall()
columns = [desc[0] for desc in cur.description]
return [dict(zip(columns, row)) for row in result]
return cur.rowcount
else:
raise NotImplementedError("Этот метод только для SQL БД")
async def execute_transaction(self, queries: list):
"""Выполнение транзакции"""
if self.config.db_type in ['postgresql', 'mysql']:
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await conn.begin()
for query, params in queries:
await cur.execute(query, params or ())
await conn.commit()
return True
except Exception as e:
await conn.rollback()
raise e
async def close(self):
"""Закрытие соединений"""
if self.pool:
self.pool.close()
await self.pool.wait_closed()
elif self.connection:
self.connection.close()
7. Паттерны и лучшие практики
7.1 Repository Pattern
from abc import ABC, abstractmethod
from typing import List, Optional, TypeVar, Generic
from contextlib import contextmanager
T = TypeVar('T')
class Repository(ABC, Generic[T]):
"""Абстрактный репозиторий"""
@abstractmethod
async def get(self, id: int) -> Optional[T]:
pass
@abstractmethod
async def get_all(self, skip: int = 0, limit: int = 100) -> List[T]:
pass
@abstractmethod
async def create(self, entity: T) -> T:
pass
@abstractmethod
async def update(self, id: int, entity: T) -> Optional[T]:
pass
@abstractmethod
async def delete(self, id: int) -> bool:
pass
class UserRepository(Repository):
"""Репозиторий пользователей"""
def __init__(self, db_manager):
self.db = db_manager
async def get_by_email(self, email: str):
"""Получение пользователя по email"""
query = "SELECT * FROM users WHERE email = $1"
return await self.db.execute_query(query, (email,))
async def search(self, search_term: str):
"""Поиск пользователей"""
query = """
SELECT * FROM users
WHERE username ILIKE $1
OR email ILIKE $1
LIMIT 50
"""
return await self.db.execute_query(query, (f"%{search_term}%",))
7.2 Unit of Work Pattern
from abc import ABC, abstractmethod
class UnitOfWork(ABC):
"""Паттерн Unit of Work"""
async def __aenter__(self):
await self.begin()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type:
await self.rollback()
else:
await self.commit()
@abstractmethod
async def begin(self):
pass
@abstractmethod
async def commit(self):
pass
@abstractmethod
async def rollback(self):
pass
@property
@abstractmethod
def users(self):
pass
@property
@abstractmethod
def products(self):
pass
class SQLAlchemyUnitOfWork(UnitOfWork):
"""Реализация Unit of Work для SQLAlchemy"""
def __init__(self, session_factory):
self.session_factory = session_factory
async def begin(self):
self.session = self.session_factory()
async def commit(self):
await self.session.commit()
async def rollback(self):
await self.session.rollback()
@property
def users(self):
return UserRepository(self.session)
@property
def products(self):
return ProductRepository(self.session)
7.3 Connection Pooling
import queue
import threading
import contextlib
from typing import Callable
class ConnectionPool:
"""Пул соединений"""
def __init__(
self,
create_connection: Callable,
max_size: int = 10,
timeout: int = 5
):
self.create_connection = create_connection
self.max_size = max_size
self.timeout = timeout
self._pool = queue.Queue(maxsize=max_size)
self._lock = threading.Lock()
self._current_size = 0
# Предварительное создание соединений
for _ in range(min(3, max_size)):
self._create_connection()
def _create_connection(self):
"""Создание нового соединения"""
with self._lock:
if self._current_size < self.max_size:
conn = self.create_connection()
self._pool.put(conn)
self._current_size += 1
return True
return False
@contextlib.contextmanager
def get_connection(self):
"""Получение соединения из пула"""
conn = None
try:
# Попытка получить соединение из пула
try:
conn = self._pool.get(timeout=self.timeout)
except queue.Empty:
# Попытка создать новое соединение
if not self._create_connection():
raise TimeoutError("Достигнут лимит соединений")
conn = self._pool.get(timeout=self.timeout)
yield conn
finally:
if conn:
self._pool.put(conn)
def close_all(self):
"""Закрытие всех соединений"""
while not self._pool.empty():
try:
conn = self._pool.get_nowait()
conn.close()
except queue.Empty:
break
7.4 Health Check и мониторинг
import time
from typing import Dict, Any
from dataclasses import dataclass
from enum import Enum
class HealthStatus(Enum):
HEALTHY = "healthy"
UNHEALTHY = "unhealthy"
DEGRADED = "degraded"
@dataclass
class HealthCheckResult:
status: HealthStatus
message: str = ""
response_time: float = 0.0
details: Dict[str, Any] = None
class DatabaseHealthChecker:
"""Проверка здоровья базы данных"""
def __init__(self, db_manager, check_interval: int = 30):
self.db_manager = db_manager
self.check_interval = check_interval
self.last_check = 0
self.last_result = None
async def check_health(self) -> HealthCheckResult:
"""Проверка здоровья"""
start_time = time.time()
try:
# Простой запрос для проверки
if hasattr(self.db_manager, 'execute_query'):
await self.db_manager.execute_query("SELECT 1")
status = HealthStatus.HEALTHY
message = "База данных доступна"
else:
# Для NoSQL БД
await self.db_manager.client.admin.command('ping')
status = HealthStatus.HEALTHY
message = "База данных доступна"
except Exception as e:
status = HealthStatus.UNHEALTHY
message = f"Ошибка подключения: {str(e)}"
response_time = time.time() - start_time
result = HealthCheckResult(
status=status,
message=message,
response_time=response_time,
details={
'database': self.db_manager.__class__.__name__,
'timestamp': time.time()
}
)
self.last_check = time.time()
self.last_result = result
return result
def should_check(self) -> bool:
"""Нужно ли выполнять проверку"""
return (time.time() - self.last_check) > self.check_interval
8. Пример приложения
8.1 Конфигурация
# config.py
import os
from typing import Dict, Any
from dataclasses import dataclass
from enum import Enum
class DatabaseType(Enum):
POSTGRESQL = "postgresql"
MYSQL = "mysql"
SQLITE = "sqlite"
MONGODB = "mongodb"
REDIS = "redis"
@dataclass
class DatabaseConfig:
"""Конфигурация базы данных"""
type: DatabaseType
host: str
port: int
database: str
username: str = None
password: str = None
pool_size: int = 5
ssl: bool = False
@classmethod
def from_env(cls, prefix: str = "DB_"):
"""Загрузка конфигурации из переменных окружения"""
return cls(
type=DatabaseType(os.getenv(f"{prefix}TYPE", "sqlite")),
host=os.getenv(f"{prefix}HOST", "localhost"),
port=int(os.getenv(f"{prefix}PORT", "5432")),
database=os.getenv(f"{prefix}NAME", "app_db"),
username=os.getenv(f"{prefix}USER"),
password=os.getenv(f"{prefix}PASSWORD"),
pool_size=int(os.getenv(f"{prefix}POOL_SIZE", "5")),
ssl=os.getenv(f"{prefix}SSL", "false").lower() == "true"
)
def get_connection_string(self) -> str:
"""Получение строки подключения"""
if self.type == DatabaseType.POSTGRESQL:
auth = f"{self.username}:{self.password}@" if self.username else ""
ssl = "?sslmode=require" if self.ssl else ""
return f"postgresql://{auth}{self.host}:{self.port}/{self.database}{ssl}"
elif self.type == DatabaseType.MYSQL:
auth = f"{self.username}:{self.password}@" if self.username else ""
return f"mysql://{auth}{self.host}:{self.port}/{self.database}"
elif self.type == DatabaseType.SQLITE:
return f"sqlite:///{self.database}.db"
else:
raise ValueError(f"Неизвестный тип БД: {self.type}")
class AppConfig:
"""Конфигурация приложения"""
def __init__(self):
self.database = DatabaseConfig.from_env()
self.redis = RedisConfig.from_env()
self.debug = os.getenv("DEBUG", "false").lower() == "true"
self.secret_key = os.getenv("SECRET_KEY", "secret")
@property
def is_production(self) -> bool:
return os.getenv("ENVIRONMENT", "development") == "production"
8.2 Фабрика БД
# factories.py
from typing import Union
from config import DatabaseConfig, DatabaseType
from managers import (
PostgreSQLManager,
MySQLManager,
SQLiteManager,
MongoDBManager,
RedisManager,
AsyncDatabaseManager
)
class DatabaseFactory:
"""Фабрика для создания менеджеров БД"""
@staticmethod
def create_manager(config: DatabaseConfig, async_mode: bool = False):
"""Создание менеджера базы данных"""
if async_mode:
return AsyncDatabaseManager(config)
if config.type == DatabaseType.POSTGRESQL:
return PostgreSQLManager(
host=config.host,
port=config.port,
user=config.username,
password=config.password,
database=config.database,
pool_size=config.pool_size,
ssl=config.ssl
)
elif config.type == DatabaseType.MYSQL:
return MySQLManager(
host=config.host,
port=config.port,
user=config.username,
password=config.password,
database=config.database
)
elif config.type == DatabaseType.SQLITE:
return SQLiteManager(config.database)
elif config.type == DatabaseType.MONGODB:
return MongoDBManager(
host=config.host,
port=config.port,
username=config.username,
password=config.password,
database=config.database
)
elif config.type == DatabaseType.REDIS:
return RedisManager(
host=config.host,
port=config.port,
db=0,
password=config.password
)
else:
raise ValueError(f"Неизвестный тип БД: {config.type}")
8.3 Основное приложение
# app.py
import asyncio
import logging
from contextlib import asynccontextmanager
from typing import Dict, Any
from config import AppConfig
from factories import DatabaseFactory
from health import DatabaseHealthChecker
from repositories import UserRepository, ProductRepository
from models import User, Product
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Application:
"""Основной класс приложения"""
def __init__(self):
self.config = AppConfig()
self.db_manager = None
self.redis_manager = None
self.health_checker = None
self.repositories = {}
async def startup(self):
"""Запуск приложения"""
logger.info("Запуск приложения...")
# Инициализация основной БД
self.db_manager = DatabaseFactory.create_manager(
self.config.database,
async_mode=True
)
await self.db_manager.connect()
# Инициализация Redis для кэширования
if self.config.redis:
self.redis_manager = DatabaseFactory.create_manager(
self.config.redis,
async_mode=False
)
self.redis_manager.connect()
# Инициализация репозиториев
self.repositories['users'] = UserRepository(self.db_manager)
self.repositories['products'] = ProductRepository(self.db_manager)
# Инициализация health checker
self.health_checker = DatabaseHealthChecker(
self.db_manager,
check_interval=30
)
# Проверка здоровья БД
health = await self.health_checker.check_health()
logger.info(f"Состояние БД: {health.status.value} - {health.message}")
logger.info("Приложение запущено")
async def shutdown(self):
"""Завершение работы приложения"""
logger.info("Завершение работы приложения...")
if self.db_manager:
await self.db_manager.close()
if self.redis_manager:
self.redis_manager.close()
logger.info("Приложение завершило работу")
@asynccontextmanager
async def lifespan(self):
"""Контекстный менеджер жизненного цикла"""
await self.startup()
try:
yield self
finally:
await self.shutdown()
async def create_user(self, user_data: Dict[str, Any]) -> User:
"""Создание пользователя"""
# Проверка существования пользователя
existing = await self.repositories['users'].get_by_email(user_data['email'])
if existing:
raise ValueError("Пользователь с таким email уже существует")
# Создание пользователя
user = await self.repositories['users'].create(user_data)
# Кэширование в Redis
if self.redis_manager:
await self.redis_manager.set_value(
f"user:{user.id}",
user.to_dict(),
expire=3600
)
logger.info(f"Создан пользователь: {user.username}")
return user
async def get_user(self, user_id: int) -> User:
"""Получение пользователя"""
# Попытка получить из кэша
if self.redis_manager:
cached = await self.redis_manager.get_value(f"user:{user_id}")
if cached:
logger.info(f"Пользователь {user_id} получен из кэша")
return User(**cached)
# Получение из БД
user = await self.repositories['users'].get(user_id)
# Сохранение в кэш
if user and self.redis_manager:
await self.redis_manager.set_value(
f"user:{user.id}",
user.to_dict(),
expire=3600
)
return user
async def run_background_tasks(self):
"""Фоновые задачи"""
while True:
try:
# Проверка здоровья
if self.health_checker.should_check():
health = await self.health_checker.check_health()
if health.status != HealthStatus.HEALTHY:
logger.warning(f"Проблема с БД: {health.message}")
# Очистка устаревших данных
await self.cleanup_old_data()
await asyncio.sleep(60) # Пауза 60 секунд
except Exception as e:
logger.error(f"Ошибка в фоновой задаче: {e}")
await asyncio.sleep(300) # Пауза 5 минут при ошибке
async def cleanup_old_data(self):
"""Очистка устаревших данных"""
# Пример: удаление неактивных пользователей старше 30 дней
cutoff_date = datetime.utcnow() - timedelta(days=30)
await self.repositories['users'].delete_inactive_before(cutoff_date)
logger.info("Очистка устаревших данных завершена")
# Запуск приложения
async def main():
"""Основная функция"""
app = Application()
async with app.lifespan():
# Пример использования
user = await app.create_user({
'username': 'test_user',
'email': 'test@example.com',
'full_name': 'Test User'
})
# Получение пользователя
retrieved_user = await app.get_user(user.id)
print(f"Получен пользователь: {retrieved_user}")
# Запуск фоновых задач
background_task = asyncio.create_task(app.run_background_tasks())
try:
# Основной цикл приложения
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\nЗавершение работы...")
finally:
background_task.cancel()
if __name__ == "__main__":
asyncio.run(main())
8.4 Docker Compose для тестирования
# docker-compose.yml
version: '3.8'
services:
postgres:
image: postgres:15
environment:
POSTGRES_DB: app_db
POSTGRES_USER: app_user
POSTGRES_PASSWORD: app_password
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U app_user"]
interval: 10s
timeout: 5s
retries: 5
mysql:
image: mysql:8
environment:
MYSQL_DATABASE: app_db
MYSQL_USER: app_user
MYSQL_PASSWORD: app_password
MYSQL_ROOT_PASSWORD: root_password
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql
mongodb:
image: mongo:6
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: admin_password
ports:
- "27017:27017"
volumes:
- mongo_data:/data/db
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
elasticsearch:
image: elasticsearch:8.10.0
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ports:
- "9200:9200"
volumes:
- elasticsearch_data:/usr/share/elasticsearch/data
volumes:
postgres_data:
mysql_data:
mongo_data:
redis_data:
elasticsearch_data:
Заключение
Это пособие охватывает основные аспекты работы с различными базами данных в Python:
- Прямые подключения для полного контроля
- SQLAlchemy ORM для удобной работы с SQL базами
- Асинхронные драйверы для высоконагруженных приложений
- Различные типы БД: SQLite, MySQL, PostgreSQL, MongoDB, Redis
- Паттерны проектирования для структурирования кода
- Лучшие практики по управлению соединениями и ошибками
Рекомендации по выбору подхода:
- Для простых проектов: SQLite с прямым подключением
- Для веб-приложений: PostgreSQL + SQLAlchemy ORM
- Для высоконагруженных систем: асинхронные драйверы + пулы соединений
- Для кэширования: Redis
- Для документ-ориентированных данных: MongoDB
- Для поиска: Elasticsearch
Ключевые принципы:
- Всегда используйте контекстные менеджеры для соединений
- Реализуйте пулы соединений для production
- Добавляйте обработку ошибок и retry логику
- Используйте миграции для управления схемой БД
- Реализуйте health checks для мониторинга
- Кэшируйте часто запрашиваемые данные
- Пишите тесты для работы с БД
Это пособие дает прочную основу для работы с любыми базами данных в Python, от простых скриптов до сложных распределенных систем.