← Назад к курсу

Практическое пособие по работе с базами данных в Python

Оглавление

  1. Введение и подходы к работе
  2. Прямые подключения
  3. SQLAlchemy ORM
  4. Реляционные базы данных
  5. NoSQL базы данных
  6. Асинхронные подключения
  7. Паттерны и лучшие практики
  8. Пример приложения

1. Введение и подходы к работе

Три основных подхода:

  1. Прямые подключения (драйверы БД)
  2. ORM (Object-Relational Mapping)
  3. ODM (Object-Document Mapping)

Установка основных драйверов:

# Реляционные БД
pip install pymysql psycopg2-binary

# NoSQL
pip install pymongo redis

# ORM/ODM
pip install sqlalchemy sqlmodel

# Асинхронные
pip install asyncpg aiomysql aioredis motor

# Утилиты
pip install alembic python-dotenv

2. Прямые подключения

Общий паттерн подключения:

import contextlib
from typing import Optional, Any

class DatabaseConnection:
    def __init__(self, config: dict):
        self.config = config
        self.connection: Optional[Any] = None
    
    @contextlib.contextmanager
    def connect(self):
        """Контекстный менеджер для подключения"""
        try:
            self.connection = self._create_connection()
            yield self.connection
        except Exception as e:
            self._handle_error(e)
            raise
        finally:
            self._close_connection()

3. Реляционные базы данных

3.1 SQLite

Прямое подключение:

import sqlite3
from sqlite3 import Error
import json

class SQLiteManager:
    def __init__(self, db_file: str):
        self.db_file = db_file
        self.conn = None
    
    def connect(self):
        """Создание подключения к базе SQLite"""
        try:
            self.conn = sqlite3.connect(
                self.db_file,
                detect_types=sqlite3.PARSE_DECLTYPES
            )
            # Включение поддержки внешних ключей
            self.conn.execute("PRAGMA foreign_keys = ON")
            # Возврат строк как словарей
            self.conn.row_factory = sqlite3.Row
            return self.conn
        except Error as e:
            print(f"Ошибка подключения к SQLite: {e}")
            return None
    
    def create_table(self):
        """Создание таблицы"""
        sql_create_users_table = """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL UNIQUE,
            email TEXT NOT NULL UNIQUE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            metadata TEXT,
            is_active BOOLEAN DEFAULT 1
        );
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute(sql_create_users_table)
            self.conn.commit()
        except Error as e:
            print(f"Ошибка создания таблицы: {e}")
    
    def insert_user(self, user_data: dict):
        """Добавление пользователя"""
        sql = """
        INSERT INTO users(username, email, metadata)
        VALUES(?, ?, ?)
        """
        try:
            cursor = self.conn.cursor()
            metadata_json = json.dumps(user_data.get('metadata', {}))
            cursor.execute(sql, (
                user_data['username'],
                user_data['email'],
                metadata_json
            ))
            self.conn.commit()
            return cursor.lastrowid
        except Error as e:
            print(f"Ошибка вставки: {e}")
            return None
    
    def get_users(self, active_only: bool = True):
        """Получение пользователей"""
        sql = "SELECT * FROM users"
        if active_only:
            sql += " WHERE is_active = 1"
        
        cursor = self.conn.cursor()
        cursor.execute(sql)
        rows = cursor.fetchall()
        
        # Преобразование в словари
        return [dict(row) for row in rows]
    
    def close(self):
        """Закрытие соединения"""
        if self.conn:
            self.conn.close()

# Использование
if __name__ == "__main__":
    db = SQLiteManager("example.db")
    conn = db.connect()
    
    if conn:
        db.create_table()
        
        # Добавление пользователя
        user_id = db.insert_user({
            'username': 'john_doe',
            'email': 'john@example.com',
            'metadata': {'age': 30, 'city': 'Moscow'}
        })
        
        # Получение пользователей
        users = db.get_users()
        for user in users:
            print(f"User: {user['username']}, Email: {user['email']}")
        
        db.close()

3.2 MySQL

Установка драйвера:

pip install pymysql cryptography  # cryptography для шифрования

Прямое подключение:

import pymysql
from pymysql import MySQLError
from pymysql.cursors import DictCursor
import ssl
from typing import Optional

class MySQLManager:
    def __init__(
        self,
        host: str,
        user: str,
        password: str,
        database: str,
        port: int = 3306,
        ssl_enabled: bool = False
    ):
        self.config = {
            'host': host,
            'user': user,
            'password': password,
            'database': database,
            'port': port,
            'charset': 'utf8mb4',
            'cursorclass': DictCursor,
            'autocommit': False
        }
        
        if ssl_enabled:
            self.config['ssl'] = {
                'ca': '/path/to/ca-cert.pem',
                'ssl_disabled': False
            }
        
        self.conn: Optional[pymysql.Connection] = None
        self.pool = None  # Для пула соединений
    
    def connect(self):
        """Подключение к MySQL"""
        try:
            self.conn = pymysql.connect(**self.config)
            print("Успешное подключение к MySQL")
            return self.conn
        except MySQLError as e:
            print(f"Ошибка подключения к MySQL: {e}")
            return None
    
    def execute_transaction(self, queries: list):
        """Выполнение транзакции"""
        if not self.conn:
            self.connect()
        
        try:
            with self.conn.cursor() as cursor:
                for query, params in queries:
                    cursor.execute(query, params)
            self.conn.commit()
            return True
        except MySQLError as e:
            self.conn.rollback()
            print(f"Ошибка транзакции: {e}")
            return False
        finally:
            self.conn.close()
    
    def create_connection_pool(self, pool_size: int = 5):
        """Создание пула соединений (упрощенный вариант)"""
        import queue
        self.pool = queue.Queue(maxsize=pool_size)
        
        for _ in range(pool_size):
            conn = pymysql.connect(**self.config)
            self.pool.put(conn)
    
    def get_connection_from_pool(self):
        """Получение соединения из пула"""
        if self.pool:
            return self.pool.get()
        return self.connect()
    
    def return_connection_to_pool(self, conn):
        """Возврат соединения в пул"""
        if self.pool:
            self.pool.put(conn)
        else:
            conn.close()
    
    def create_stored_procedure(self):
        """Создание хранимой процедуры"""
        procedure_sql = """
        DELIMITER //
        CREATE PROCEDURE GetActiveUsers()
        BEGIN
            SELECT * FROM users WHERE is_active = 1;
        END //
        DELIMITER ;
        """
        
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(procedure_sql)
            self.conn.commit()
        except MySQLError as e:
            print(f"Ошибка создания процедуры: {e}")
    
    def call_stored_procedure(self):
        """Вызов хранимой процедуры"""
        try:
            with self.conn.cursor() as cursor:
                cursor.callproc('GetActiveUsers')
                results = cursor.fetchall()
                return results
        except MySQLError as e:
            print(f"Ошибка вызова процедуры: {e}")
            return []

# Использование
if __name__ == "__main__":
    mysql = MySQLManager(
        host="localhost",
        user="root",
        password="password",
        database="test_db"
    )
    
    conn = mysql.connect()
    if conn:
        # Создание таблицы
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS products (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            price DECIMAL(10, 2),
            category VARCHAR(100),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            INDEX idx_category (category),
            INDEX idx_created (created_at)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
        """
        
        with conn.cursor() as cursor:
            cursor.execute(create_table_sql)
        
        # Вставка данных с транзакцией
        queries = [
            ("INSERT INTO products (name, price, category) VALUES (%s, %s, %s)", 
             ("Laptop", 999.99, "Electronics")),
            ("INSERT INTO products (name, price, category) VALUES (%s, %s, %s)", 
             ("Mouse", 25.50, "Electronics"))
        ]
        
        mysql.execute_transaction(queries)
        
        conn.close()

3.3 PostgreSQL

Установка драйвера:

pip install psycopg2-binary  # для синхронного использования
pip install asyncpg          # для асинхронного использования

Прямое подключение:

import psycopg2
from psycopg2 import pool, sql
from psycopg2.extras import RealDictCursor, Json
from contextlib import contextmanager
import json

class PostgreSQLManager:
    def __init__(
        self,
        host: str,
        user: str,
        password: str,
        database: str,
        port: int = 5432,
        min_connections: int = 1,
        max_connections: int = 10
    ):
        self.config = {
            'host': host,
            'user': user,
            'password': password,
            'database': database,
            'port': port
        }
        self.connection_pool = None
        self.min_connections = min_connections
        self.max_connections = max_connections
    
    def create_pool(self):
        """Создание пула соединений"""
        try:
            self.connection_pool = psycopg2.pool.SimpleConnectionPool(
                self.min_connections,
                self.max_connections,
                **self.config
            )
            print("Пул соединений создан")
        except psycopg2.Error as e:
            print(f"Ошибка создания пула: {e}")
    
    @contextmanager
    def get_connection(self):
        """Контекстный менеджер для получения соединения"""
        conn = None
        try:
            if self.connection_pool:
                conn = self.connection_pool.getconn()
            else:
                conn = psycopg2.connect(**self.config)
            
            conn.autocommit = False
            yield conn
            conn.commit()
        except psycopg2.Error as e:
            if conn:
                conn.rollback()
            print(f"Ошибка базы данных: {e}")
            raise
        finally:
            if conn and self.connection_pool:
                self.connection_pool.putconn(conn)
            elif conn:
                conn.close()
    
    def execute_query(self, query: str, params: tuple = None):
        """Выполнение запроса с возвратом результатов"""
        with self.get_connection() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(query, params or ())
                if cursor.description:  # Если есть результат
                    return cursor.fetchall()
                return None
    
    def create_json_table(self):
        """Создание таблицы с JSON полем"""
        query = """
        CREATE TABLE IF NOT EXISTS documents (
            id SERIAL PRIMARY KEY,
            title VARCHAR(255) NOT NULL,
            content JSONB,
            metadata JSONB,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            tags TEXT[],
            
            -- Индексы для JSON полей
            CONSTRAINT valid_json CHECK (content IS NULL OR jsonb_typeof(content) = 'object'),
            
            -- GIN индексы для быстрого поиска по JSON
            INDEX idx_gin_content ON documents USING GIN (content),
            INDEX idx_gin_metadata ON documents USING GIN (metadata),
            INDEX idx_gin_tags ON documents USING GIN (tags)
        );
        """
        
        self.execute_query(query)
    
    def insert_json_document(self, title: str, content: dict, metadata: dict, tags: list):
        """Вставка JSON документа"""
        query = """
        INSERT INTO documents (title, content, metadata, tags)
        VALUES (%s, %s, %s, %s)
        RETURNING id, created_at;
        """
        
        result = self.execute_query(
            query,
            (title, Json(content), Json(metadata), tags)
        )
        return result[0] if result else None
    
    def search_json(self, key: str, value: str):
        """Поиск по JSON полю"""
        query = sql.SQL("""
        SELECT * FROM documents 
        WHERE content->>%s = %s
        """)
        
        return self.execute_query(query, (key, value))
    
    def full_text_search(self, search_term: str):
        """Полнотекстовый поиск"""
        query = """
        SELECT * FROM documents
        WHERE to_tsvector('russian', title || ' ' || content::text) 
              @@ plainto_tsquery('russian', %s)
        ORDER BY ts_rank(
            to_tsvector('russian', title || ' ' || content::text),
            plainto_tsquery('russian', %s)
        ) DESC;
        """
        
        return self.execute_query(query, (search_term, search_term))
    
    def create_partitioned_table(self):
        """Создание секционированной таблицы"""
        queries = [
            """
            CREATE TABLE IF NOT EXISTS logs (
                id BIGSERIAL,
                log_time TIMESTAMP NOT NULL,
                level VARCHAR(10),
                message TEXT,
                application VARCHAR(50)
            ) PARTITION BY RANGE (log_time);
            """,
            """
            CREATE TABLE logs_y2024m01 PARTITION OF logs
            FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
            """,
            """
            CREATE TABLE logs_y2024m02 PARTITION OF logs
            FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
            """
        ]
        
        for query in queries:
            self.execute_query(query)
    
    def close_pool(self):
        """Закрытие пула соединений"""
        if self.connection_pool:
            self.connection_pool.closeall()

# Использование
if __name__ == "__main__":
    pg = PostgreSQLManager(
        host="localhost",
        user="postgres",
        password="password",
        database="test_db"
    )
    
    pg.create_pool()
    
    # Создание таблицы с JSON
    pg.create_json_table()
    
    # Вставка JSON документа
    doc_id = pg.insert_json_document(
        title="Пример документа",
        content={"text": "Привет мир", "author": "Иван"},
        metadata={"views": 100, "language": "ru"},
        tags=["python", "postgres", "json"]
    )
    
    # Поиск по JSON
    results = pg.search_json("author", "Иван")
    for row in results:
        print(f"Найден документ: {row['title']}")
    
    # Закрытие пула
    pg.close_pool()

4. SQLAlchemy ORM

4.1 Базовое использование

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, JSON, ForeignKey, Text, Float, UniqueConstraint, Index
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, Session, joinedload
from sqlalchemy.sql import func, text
from sqlalchemy.dialects.postgresql import JSONB, ARRAY
from datetime import datetime
from typing import Optional, List
import json

Base = declarative_base()

# Модели
class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(50), unique=True, nullable=False, index=True)
    email = Column(String(100), unique=True, nullable=False)
    full_name = Column(String(100))
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, onupdate=func.now())
    preferences = Column(JSON, default=dict)
    
    # Отношения
    posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
    
    # Составной индекс
    __table_args__ = (
        Index('idx_user_active_created', 'is_active', 'created_at'),
    )
    
    def __repr__(self):
        return f"<User(id={self.id}, username='{self.username}')>"

class Post(Base):
    __tablename__ = 'posts'
    
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(Text)
    author_id = Column(Integer, ForeignKey('users.id', ondelete="CASCADE"))
    published = Column(Boolean, default=False)
    tags = Column(ARRAY(String))
    metadata = Column(JSONB)
    views = Column(Integer, default=0)
    created_at = Column(DateTime, server_default=func.now())
    
    # Отношения
    author = relationship("User", back_populates="posts")
    comments = relationship("Comment", back_populates="post")
    
    # Полнотекстовый поиск (для PostgreSQL)
    __ts_vector__ = func.to_tsvector('russian', title + ' ' + content)
    
    __table_args__ = (
        Index('idx_post_author', 'author_id'),
        Index('idx_post_published', 'published'),
        Index('idx_post_created', 'created_at'),
        Index('ix_posts_fts', __ts_vector__, postgresql_using='gin'),
    )
    
    def to_dict(self):
        """Преобразование объекта в словарь"""
        return {
            'id': self.id,
            'title': self.title,
            'author': self.author.username if self.author else None,
            'created_at': self.created_at.isoformat() if self.created_at else None
        }

class Comment(Base):
    __tablename__ = 'comments'
    
    id = Column(Integer, primary_key=True)
    content = Column(Text, nullable=False)
    post_id = Column(Integer, ForeignKey('posts.id'))
    user_id = Column(Integer, ForeignKey('users.id'))
    created_at = Column(DateTime, server_default=func.now())
    
    # Отношения
    post = relationship("Post", back_populates="comments")
    user = relationship("User")

class DatabaseManager:
    def __init__(self, connection_string: str, echo: bool = False):
        """Инициализация менеджера базы данных"""
        self.engine = create_engine(
            connection_string,
            echo=echo,
            pool_size=5,
            max_overflow=10,
            pool_pre_ping=True,  # Проверка соединения перед использованием
            pool_recycle=3600    # Пересоздание соединений каждый час
        )
        self.SessionLocal = sessionmaker(
            bind=self.engine,
            autocommit=False,
            autoflush=False,
            expire_on_commit=False
        )
        Base.metadata.create_all(self.engine)
    
    def get_session(self) -> Session:
        """Получение сессии"""
        return self.SessionLocal()
    
    @contextmanager
    def session_scope(self):
        """Контекстный менеджер для сессии"""
        session = self.get_session()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            raise e
        finally:
            session.close()
    
    def add_user(self, username: str, email: str, **kwargs):
        """Добавление пользователя"""
        with self.session_scope() as session:
            user = User(username=username, email=email, **kwargs)
            session.add(user)
            session.flush()  # Получаем ID без коммита
            return user.id
    
    def get_user_with_posts(self, user_id: int):
        """Получение пользователя с его постами (жадная загрузка)"""
        with self.session_scope() as session:
            user = session.query(User).options(
                joinedload(User.posts)
            ).filter(User.id == user_id).first()
            return user
    
    def search_posts(self, search_term: str, limit: int = 10):
        """Поиск постов (полнотекстовый для PostgreSQL)"""
        with self.session_scope() as session:
            # Для PostgreSQL
            if 'postgresql' in str(self.engine.url):
                query = session.query(Post).filter(
                    Post.__ts_vector__.match(search_term)
                ).limit(limit)
            else:
                # Для других БД - обычный LIKE
                query = session.query(Post).filter(
                    Post.title.ilike(f"%{search_term}%") |
                    Post.content.ilike(f"%{search_term}%")
                ).limit(limit)
            
            return query.all()
    
    def bulk_insert_users(self, users_data: List[dict]):
        """Массовая вставка пользователей"""
        with self.session_scope() as session:
            users = [User(**data) for data in users_data]
            session.bulk_save_objects(users)
    
    def execute_raw_sql(self, sql: str, params: dict = None):
        """Выполнение сырого SQL"""
        with self.session_scope() as session:
            result = session.execute(text(sql), params or {})
            if result.returns_rows:
                return [dict(row) for row in result]
            return result.rowcount
    
    def get_engine_stats(self):
        """Получение статистики пула соединений"""
        return {
            'checked_out': self.engine.pool.checkedout(),
            'checked_in': self.engine.pool.checkedin(),
            'size': self.engine.pool.size()
        }

# Использование SQLAlchemy с разными БД
class SQLAlchemyFactory:
    @staticmethod
    def create_mysql_connection():
        """Создание подключения к MySQL"""
        connection_string = "mysql+pymysql://user:password@localhost/dbname?charset=utf8mb4"
        return DatabaseManager(connection_string)
    
    @staticmethod
    def create_postgresql_connection():
        """Создание подключения к PostgreSQL"""
        connection_string = "postgresql+psycopg2://user:password@localhost/dbname"
        return DatabaseManager(connection_string)
    
    @staticmethod
    def create_sqlite_connection():
        """Создание подключения к SQLite"""
        connection_string = "sqlite:///./test.db"
        return DatabaseManager(connection_string)

# Асинхронная версия SQLAlchemy
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base

AsyncBase = declarative_base()

class AsyncDatabaseManager:
    def __init__(self, connection_string: str, echo: bool = False):
        """Асинхронный менеджер базы данных"""
        self.engine = create_async_engine(
            connection_string,
            echo=echo,
            pool_size=5,
            max_overflow=10,
            pool_pre_ping=True
        )
        self.AsyncSessionLocal = async_sessionmaker(
            bind=self.engine,
            class_=AsyncSession,
            expire_on_commit=False
        )
    
    async def init_db(self):
        """Инициализация базы данных"""
        async with self.engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
    
    async def get_session(self) -> AsyncSession:
        """Получение асинхронной сессии"""
        async with self.AsyncSessionLocal() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise
            finally:
                await session.close()
    
    async def add_user_async(self, username: str, email: str):
        """Асинхронное добавление пользователя"""
        async with self.AsyncSessionLocal() as session:
            user = User(username=username, email=email)
            session.add(user)
            await session.flush()
            await session.refresh(user)
            return user

# Использование
if __name__ == "__main__":
    # Подключение к SQLite
    db_manager = SQLAlchemyFactory.create_sqlite_connection()
    
    # Добавление пользователя
    user_id = db_manager.add_user("test_user", "test@example.com")
    print(f"Добавлен пользователь с ID: {user_id}")
    
    # Получение пользователя с постами
    user = db_manager.get_user_with_posts(user_id)
    if user:
        print(f"Пользователь: {user.username}")
        print(f"Количество постов: {len(user.posts)}")
    
    # Выполнение сырого SQL
    results = db_manager.execute_raw_sql("SELECT * FROM users")
    for row in results:
        print(row)
    
    # Статистика пула
    stats = db_manager.get_engine_stats()
    print(f"Статистика пула: {stats}")

4.2 Миграции с Alembic

# alembic.ini конфигурация
"""
[alembic]
script_location = alembic
sqlalchemy.url = postgresql://user:password@localhost/dbname

[post_write_hooks]
hooks = black
black.type = console_scripts
black.entrypoint = black
black.options = -q
"""

# alembic/env.py
"""
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
import sys
import os

sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from models import Base

config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata

def run_migrations_offline():
    context.configure(
        url=config.get_main_option("sqlalchemy.url"),
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection, 
            target_metadata=target_metadata,
            compare_type=True,
            compare_server_default=True
        )
        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
"""

# Создание миграции
"""
alembic revision --autogenerate -m "Добавление таблицы комментариев"
alembic upgrade head
alembic downgrade -1
"""

5. NoSQL базы данных

5.1 MongoDB

from pymongo import MongoClient, ASCENDING, DESCENDING, TEXT, ReturnDocument
from pymongo.errors import ConnectionFailure, DuplicateKeyError, OperationFailure
from pymongo.collection import Collection
from pymongo.database import Database
from typing import List, Dict, Any, Optional
from datetime import datetime
import gridfs
from bson import ObjectId, json_util
import json
from motor.motor_asyncio import AsyncIOMotorClient

class MongoDBManager:
    def __init__(
        self,
        host: str = "localhost",
        port: int = 27017,
        username: str = None,
        password: str = None,
        database: str = "test_db",
        auth_source: str = "admin",
        replica_set: str = None,
        **kwargs
    ):
        """Инициализация MongoDB менеджера"""
        self.connection_string = self._build_connection_string(
            host, port, username, password, auth_source, replica_set
        )
        self.database_name = database
        self.client: Optional[MongoClient] = None
        self.db: Optional[Database] = None
        self.fs = None
        
        # Параметры подключения
        self.connection_params = {
            'maxPoolSize': 100,
            'minPoolSize': 10,
            'maxIdleTimeMS': 30000,
            'socketTimeoutMS': 30000,
            'connectTimeoutMS': 10000,
            'serverSelectionTimeoutMS': 5000,
            **kwargs
        }
    
    def _build_connection_string(
        self,
        host: str,
        port: int,
        username: str,
        password: str,
        auth_source: str,
        replica_set: str
    ) -> str:
        """Построение строки подключения"""
        if username and password:
            auth = f"{username}:{password}@"
        else:
            auth = ""
        
        if replica_set:
            return f"mongodb://{auth}{host}:{port}/?replicaSet={replica_set}&authSource={auth_source}"
        else:
            return f"mongodb://{auth}{host}:{port}/?authSource={auth_source}"
    
    def connect(self):
        """Подключение к MongoDB"""
        try:
            self.client = MongoClient(
                self.connection_string,
                **self.connection_params
            )
            # Проверка подключения
            self.client.admin.command('ping')
            self.db = self.client[self.database_name]
            self.fs = gridfs.GridFS(self.db)
            print("Успешное подключение к MongoDB")
            return True
        except ConnectionFailure as e:
            print(f"Ошибка подключения к MongoDB: {e}")
            return False
    
    def create_collection(self, collection_name: str, **kwargs):
        """Создание коллекции с валидацией"""
        if collection_name not in self.db.list_collection_names():
            self.db.create_collection(collection_name, **kwargs)
            print(f"Коллекция {collection_name} создана")
        return self.db[collection_name]
    
    def create_user_collection(self):
        """Создание коллекции пользователей с валидацией"""
        validator = {
            '$jsonSchema': {
                'bsonType': 'object',
                'required': ['username', 'email', 'created_at'],
                'properties': {
                    'username': {
                        'bsonType': 'string',
                        'description': 'Имя пользователя должно быть строкой'
                    },
                    'email': {
                        'bsonType': 'string',
                        'pattern': '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$',
                        'description': 'Email должен быть валидным'
                    },
                    'age': {
                        'bsonType': 'int',
                        'minimum': 0,
                        'maximum': 150,
                        'description': 'Возраст должен быть числом от 0 до 150'
                    },
                    'created_at': {
                        'bsonType': 'date',
                        'description': 'Дата создания должна быть датой'
                    }
                }
            }
        }
        
        return self.create_collection(
            'users',
            validator=validator,
            validationLevel='strict',
            validationAction='error'
        )
    
    def create_indexes(self, collection: Collection):
        """Создание индексов"""
        # Составной индекс
        collection.create_index([('username', ASCENDING), ('email', ASCENDING)], unique=True)
        
        # TTL индекс для автоматического удаления устаревших документов
        collection.create_index([('created_at', ASCENDING)], expireAfterSeconds=3600)
        
        # Текстовый индекс для поиска
        collection.create_index([('bio', TEXT)], default_language='russian')
        
        # Геопространственный индекс
        collection.create_index([('location', '2dsphere')])
    
    def insert_user(self, user_data: Dict[str, Any]):
        """Вставка пользователя"""
        collection = self.db['users']
        
        # Добавление метаданных
        user_data['created_at'] = datetime.utcnow()
        user_data['updated_at'] = datetime.utcnow()
        
        try:
            result = collection.insert_one(user_data)
            print(f"Добавлен пользователь с ID: {result.inserted_id}")
            return result.inserted_id
        except DuplicateKeyError as e:
            print(f"Дубликат ключа: {e}")
            return None
    
    def find_users(self, query: Dict = None, projection: Dict = None, limit: int = 10):
        """Поиск пользователей"""
        collection = self.db['users']
        query = query or {}
        projection = projection or {}
        
        cursor = collection.find(query, projection).limit(limit)
        return list(cursor)
    
    def aggregate_users(self, pipeline: List[Dict]):
        """Агрегация данных пользователей"""
        collection = self.db['users']
        return list(collection.aggregate(pipeline))
    
    def update_user(self, user_id: str, update_data: Dict, upsert: bool = False):
        """Обновление пользователя"""
        collection = self.db['users']
        
        # Добавление поля updated_at
        update_data['$set']['updated_at'] = datetime.utcnow()
        
        result = collection.update_one(
            {'_id': ObjectId(user_id)},
            update_data,
            upsert=upsert
        )
        
        if result.modified_count > 0:
            print(f"Обновлено документов: {result.modified_count}")
        return result
    
    def transaction_example(self):
        """Пример транзакции (требует replica set)"""
        with self.client.start_session() as session:
            try:
                with session.start_transaction():
                    # Операция 1
                    self.db.accounts.update_one(
                        {'_id': 'account1'},
                        {'$inc': {'balance': -100}},
                        session=session
                    )
                    
                    # Операция 2
                    self.db.accounts.update_one(
                        {'_id': 'account2'},
                        {'$inc': {'balance': 100}},
                        session=session
                    )
                    
                    session.commit_transaction()
                    print("Транзакция завершена успешно")
                    
            except Exception as e:
                session.abort_transaction()
                print(f"Ошибка транзакции: {e}")
                raise
    
    def store_file(self, filename: str, data: bytes, metadata: Dict = None):
        """Хранение файла в GridFS"""
        file_id = self.fs.put(
            data,
            filename=filename,
            metadata=metadata or {}
        )
        return file_id
    
    def get_file(self, file_id):
        """Получение файла из GridFS"""
        return self.fs.get(file_id)
    
    def change_stream_example(self):
        """Пример Change Streams (реактивное программирование)"""
        collection = self.db['users']
        
        with collection.watch(
            [
                {'$match': {'operationType': 'insert'}},
                {'$match': {'operationType': 'update'}}
            ]
        ) as stream:
            for change in stream:
                print(f"Изменение: {change}")
                # Реакция на изменение в реальном времени
    
    def close(self):
        """Закрытие подключения"""
        if self.client:
            self.client.close()
            print("Подключение к MongoDB закрыто")

# Асинхронная версия MongoDB
class AsyncMongoDBManager:
    def __init__(self, connection_string: str, database: str = "test_db"):
        self.client = AsyncIOMotorClient(connection_string)
        self.db = self.client[database]
    
    async def find_users_async(self, query: Dict, limit: int = 10):
        """Асинхронный поиск пользователей"""
        cursor = self.db.users.find(query).limit(limit)
        return await cursor.to_list(length=limit)
    
    async def insert_user_async(self, user_data: Dict):
        """Асинхронная вставка пользователя"""
        result = await self.db.users.insert_one(user_data)
        return result.inserted_id

# Использование
if __name__ == "__main__":
    # Синхронное подключение
    mongo = MongoDBManager(
        host="localhost",
        port=27017,
        username="admin",
        password="password",
        database="myapp"
    )
    
    if mongo.connect():
        # Создание коллекции с валидацией
        users_collection = mongo.create_user_collection()
        
        # Создание индексов
        mongo.create_indexes(users_collection)
        
        # Вставка пользователя
        user_id = mongo.insert_user({
            'username': 'john_doe',
            'email': 'john@example.com',
            'age': 30,
            'bio': 'Python разработчик из Москвы',
            'location': {
                'type': 'Point',
                'coordinates': [37.6173, 55.7558]  # Москва
            }
        })
        
        # Поиск пользователей
        users = mongo.find_users({'age': {'$gte': 25}}, limit=5)
        for user in users:
            print(f"Найден: {user['username']}")
        
        # Агрегация
        pipeline = [
            {'$match': {'age': {'$gte': 25}}},
            {'$group': {
                '_id': None,
                'average_age': {'$avg': '$age'},
                'count': {'$sum': 1}
            }}
        ]
        
        result = mongo.aggregate_users(pipeline)
        print(f"Средний возраст: {result}")
        
        # Хранение файла
        with open('document.pdf', 'rb') as f:
            file_id = mongo.store_file('document.pdf', f.read())
            print(f"Файл сохранен с ID: {file_id}")
        
        mongo.close()

5.2 Redis

import redis
from redis import Redis, ConnectionPool
from redis.exceptions import ConnectionError, RedisError
import json
import pickle
from typing import Any, Optional, Union, List, Dict
import asyncio
import aioredis

class RedisManager:
    def __init__(
        self,
        host: str = 'localhost',
        port: int = 6379,
        db: int = 0,
        password: str = None,
        decode_responses: bool = True,
        max_connections: int = 10,
        **kwargs
    ):
        """Инициализация Redis менеджера"""
        self.config = {
            'host': host,
            'port': port,
            'db': db,
            'password': password,
            'decode_responses': decode_responses,
            'socket_timeout': 5,
            'socket_connect_timeout': 5,
            'retry_on_timeout': True,
            'health_check_interval': 30,
            **kwargs
        }
        
        self.pool = ConnectionPool(max_connections=max_connections, **self.config)
        self.client: Optional[Redis] = None
    
    def connect(self) -> bool:
        """Подключение к Redis"""
        try:
            self.client = redis.Redis(connection_pool=self.pool)
            # Проверка подключения
            self.client.ping()
            print("Успешное подключение к Redis")
            return True
        except ConnectionError as e:
            print(f"Ошибка подключения к Redis: {e}")
            return False
    
    def set_value(self, key: str, value: Any, expire: int = None) -> bool:
        """Установка значения"""
        try:
            # Сериализация значения
            if isinstance(value, (dict, list)):
                serialized_value = json.dumps(value)
            elif isinstance(value, (int, float, str)):
                serialized_value = str(value)
            else:
                serialized_value = pickle.dumps(value)
            
            if expire:
                return bool(self.client.setex(key, expire, serialized_value))
            else:
                return bool(self.client.set(key, serialized_value))
        except RedisError as e:
            print(f"Ошибка записи в Redis: {e}")
            return False
    
    def get_value(self, key: str, default: Any = None) -> Any:
        """Получение значения"""
        try:
            value = self.client.get(key)
            if value is None:
                return default
            
            # Попытка десериализации
            try:
                return json.loads(value)
            except json.JSONDecodeError:
                try:
                    return pickle.loads(value)
                except:
                    return value.decode() if isinstance(value, bytes) else value
        except RedisError as e:
            print(f"Ошибка чтения из Redis: {e}")
            return default
    
    def delete_key(self, key: str) -> bool:
        """Удаление ключа"""
        try:
            return bool(self.client.delete(key))
        except RedisError as e:
            print(f"Ошибка удаления ключа: {e}")
            return False
    
    def set_hash(self, name: str, mapping: Dict[str, Any]) -> bool:
        """Установка хэша"""
        try:
            # Сериализация значений
            serialized_mapping = {
                k: json.dumps(v) if isinstance(v, (dict, list)) else str(v)
                for k, v in mapping.items()
            }
            return bool(self.client.hset(name, mapping=serialized_mapping))
        except RedisError as e:
            print(f"Ошибка записи хэша: {e}")
            return False
    
    def get_hash(self, name: str) -> Dict[str, Any]:
        """Получение хэша"""
        try:
            data = self.client.hgetall(name)
            result = {}
            for k, v in data.items():
                try:
                    result[k] = json.loads(v)
                except json.JSONDecodeError:
                    result[k] = v
            return result
        except RedisError as e:
            print(f"Ошибка чтения хэша: {e}")
            return {}
    
    def push_to_list(self, key: str, value: Any, side: str = 'right', max_length: int = None):
        """Добавление в список"""
        try:
            serialized_value = json.dumps(value) if isinstance(value, (dict, list)) else str(value)
            
            if side == 'right':
                self.client.rpush(key, serialized_value)
            else:
                self.client.lpush(key, serialized_value)
            
            # Ограничение длины списка
            if max_length:
                self.client.ltrim(key, 0, max_length - 1)
        except RedisError as e:
            print(f"Ошибка добавления в список: {e}")
    
    def get_list(self, key: str, start: int = 0, end: int = -1) -> List[Any]:
        """Получение списка"""
        try:
            values = self.client.lrange(key, start, end)
            result = []
            for v in values:
                try:
                    result.append(json.loads(v))
                except json.JSONDecodeError:
                    result.append(v.decode() if isinstance(v, bytes) else v)
            return result
        except RedisError as e:
            print(f"Ошибка чтения списка: {e}")
            return []
    
    def add_to_set(self, key: str, *values: Any) -> int:
        """Добавление в множество"""
        try:
            serialized_values = [
                json.dumps(v) if isinstance(v, (dict, list)) else str(v)
                for v in values
            ]
            return self.client.sadd(key, *serialized_values)
        except RedisError as e:
            print(f"Ошибка добавления в множество: {e}")
            return 0
    
    def publish_message(self, channel: str, message: Any):
        """Публикация сообщения в канал"""
        try:
            serialized_message = json.dumps(message) if isinstance(message, (dict, list)) else str(message)
            self.client.publish(channel, serialized_message)
        except RedisError as e:
            print(f"Ошибка публикации: {e}")
    
    def subscribe_to_channel(self, channel: str):
        """Подписка на канал"""
        pubsub = self.client.pubsub()
        pubsub.subscribe(channel)
        
        for message in pubsub.listen():
            if message['type'] == 'message':
                data = message['data']
                try:
                    yield json.loads(data)
                except json.JSONDecodeError:
                    yield data.decode() if isinstance(data, bytes) else data
    
    def acquire_lock(self, lock_name: str, timeout: int = 10) -> bool:
        """Приобретение распределенной блокировки"""
        import uuid
        identifier = str(uuid.uuid4())
        
        end = time.time() + timeout
        while time.time() < end:
            if self.client.set(lock_name, identifier, nx=True, ex=timeout):
                return identifier
            time.sleep(0.001)
        
        return False
    
    def release_lock(self, lock_name: str, identifier: str) -> bool:
        """Освобождение распределенной блокировки"""
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        
        try:
            result = self.client.eval(script, 1, lock_name, identifier)
            return bool(result)
        except RedisError:
            return False
    
    def pipeline_example(self):
        """Использование pipeline для пакетных операций"""
        try:
            pipe = self.client.pipeline()
            
            # Множественные операции
            pipe.set('key1', 'value1')
            pipe.set('key2', 'value2')
            pipe.get('key1')
            pipe.get('key2')
            
            # Выполнение всех команд за один round-trip
            results = pipe.execute()
            print(f"Результаты pipeline: {results}")
            
        except RedisError as e:
            print(f"Ошибка pipeline: {e}")
    
    def transaction_example(self):
        """Использование транзакций"""
        try:
            with self.client.pipeline() as pipe:
                while True:
                    try:
                        # Начало транзакции
                        pipe.watch('balance')
                        
                        current_balance = int(pipe.get('balance') or 0)
                        
                        # Множественные команды
                        pipe.multi()
                        pipe.set('balance', current_balance - 100)
                        pipe.set('transaction_log', 'withdrawal_100')
                        
                        # Выполнение
                        pipe.execute()
                        break
                    except redis.WatchError:
                        continue
        except RedisError as e:
            print(f"Ошибка транзакции: {e}")
    
    def get_info(self) -> Dict:
        """Получение информации о Redis сервере"""
        try:
            return self.client.info()
        except RedisError as e:
            print(f"Ошибка получения информации: {e}")
            return {}
    
    def scan_keys(self, pattern: str = "*", count: int = 100) -> List[str]:
        """Сканирование ключей (вместо keys)"""
        try:
            keys = []
            cursor = 0
            
            while True:
                cursor, found_keys = self.client.scan(
                    cursor=cursor,
                    match=pattern,
                    count=count
                )
                keys.extend(found_keys)
                if cursor == 0:
                    break
            
            return keys
        except RedisError as e:
            print(f"Ошибка сканирования ключей: {e}")
            return []
    
    def close(self):
        """Закрытие подключения"""
        if self.client:
            self.client.close()
            self.pool.disconnect()
            print("Подключение к Redis закрыто")

# Асинхронная версия Redis
class AsyncRedisManager:
    def __init__(self, host: str = 'localhost', port: int = 6379):
        self.redis = None
        self.host = host
        self.port = port
    
    async def connect(self):
        """Асинхронное подключение"""
        self.redis = await aioredis.create_redis_pool(
            f'redis://{self.host}:{self.port}',
            minsize=5,
            maxsize=10
        )
        return self.redis
    
    async def set_value_async(self, key: str, value: str, expire: int = None):
        """Асинхронная установка значения"""
        if expire:
            await self.redis.setex(key, expire, value)
        else:
            await self.redis.set(key, value)
    
    async def get_value_async(self, key: str) -> Optional[str]:
        """Асинхронное получение значения"""
        return await self.redis.get(key)
    
    async def close(self):
        """Закрытие асинхронного подключения"""
        if self.redis:
            self.redis.close()
            await self.redis.wait_closed()

# Кэширование с Redis
class RedisCache:
    def __init__(self, redis_manager: RedisManager, default_ttl: int = 3600):
        self.redis = redis_manager
        self.default_ttl = default_ttl
    
    def cache_decorator(self, ttl: int = None):
        """Декоратор для кэширования результатов функций"""
        def decorator(func):
            def wrapper(*args, **kwargs):
                # Создание ключа кэша
                cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
                
                # Попытка получить из кэша
                cached_result = self.redis.get_value(cache_key)
                if cached_result is not None:
                    print(f"Кэш попадание для {cache_key}")
                    return cached_result
                
                # Выполнение функции
                result = func(*args, **kwargs)
                
                # Сохранение в кэш
                self.redis.set_value(
                    cache_key,
                    result,
                    expire=ttl or self.default_ttl
                )
                
                print(f"Кэш промах для {cache_key}, результат сохранен")
                return result
            return wrapper
        return decorator

# Использование
if __name__ == "__main__":
    # Синхронное использование
    redis_manager = RedisManager(
        host='localhost',
        port=6379,
        db=0,
        password=None,
        decode_responses=True
    )
    
    if redis_manager.connect():
        # Простые операции
        redis_manager.set_value('user:1:name', 'John Doe', expire=60)
        name = redis_manager.get_value('user:1:name')
        print(f"Имя пользователя: {name}")
        
        # Хэш
        redis_manager.set_hash('user:1', {
            'name': 'John',
            'age': 30,
            'email': 'john@example.com'
        })
        
        user_data = redis_manager.get_hash('user:1')
        print(f"Данные пользователя: {user_data}")
        
        # Список
        redis_manager.push_to_list('recent_users', 'user1', max_length=10)
        redis_manager.push_to_list('recent_users', 'user2', max_length=10)
        
        recent_users = redis_manager.get_list('recent_users')
        print(f"Последние пользователи: {recent_users}")
        
        # Pub/Sub
        # В отдельном потоке можно запустить подписку
        # redis_manager.subscribe_to_channel('notifications')
        
        # Pipeline
        redis_manager.pipeline_example()
        
        # Сканирование ключей
        keys = redis_manager.scan_keys(pattern='user:*')
        print(f"Найдено ключей: {len(keys)}")
        
        redis_manager.close()
    
    # Асинхронное использование
    async def async_example():
        async_redis = AsyncRedisManager()
        await async_redis.connect()
        
        await async_redis.set_value_async('async_key', 'async_value', expire=10)
        value = await async_redis.get_value_async('async_key')
        print(f"Асинхронное значение: {value}")
        
        await async_redis.close()
    
    asyncio.run(async_example())

5.3 Другие популярные NoSQL БД

Cassandra:

"""
pip install cassandra-driver
"""

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory, BatchStatement, ConsistencyLevel
from cassandra.policies import DCAwareRoundRobinPolicy

class CassandraManager:
    def __init__(self, hosts: list, keyspace: str, username: str = None, password: str = None):
        auth_provider = None
        if username and password:
            auth_provider = PlainTextAuthProvider(username=username, password=password)
        
        self.cluster = Cluster(
            hosts,
            auth_provider=auth_provider,
            load_balancing_policy=DCAwareRoundRobinPolicy(),
            protocol_version=4
        )
        self.session = self.cluster.connect(keyspace)
        self.session.row_factory = dict_factory
    
    def execute_query(self, query: str, params: tuple = None):
        return self.session.execute(query, params)
    
    def batch_insert(self, queries: list):
        batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
        for query, params in queries:
            batch.add(query, params)
        self.session.execute(batch)
    
    def close(self):
        self.cluster.shutdown()

Elasticsearch:

"""
pip install elasticsearch
"""

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

class ElasticsearchManager:
    def __init__(self, hosts: list, **kwargs):
        self.client = Elasticsearch(
            hosts,
            http_auth=('user', 'password'),
            **kwargs
        )
    
    def index_document(self, index: str, doc: dict, doc_id: str = None):
        return self.client.index(
            index=index,
            id=doc_id,
            document=doc,
            refresh=True
        )
    
    def search(self, index: str, query: dict):
        return self.client.search(index=index, body=query)
    
    def bulk_index(self, actions: list):
        return bulk(self.client, actions)

6. Асинхронные подключения

6.1 Асинхронный паттерн для разных БД

import asyncio
from typing import Dict, Any
from dataclasses import dataclass

@dataclass
class DatabaseConfig:
    """Конфигурация базы данных"""
    db_type: str
    host: str
    port: int
    database: str
    username: str = None
    password: str = None
    pool_size: int = 5
    ssl: bool = False

class AsyncDatabaseManager:
    """Универсальный асинхронный менеджер баз данных"""
    
    _drivers = {
        'postgresql': 'asyncpg',
        'mysql': 'aiomysql',
        'mongodb': 'motor',
        'redis': 'aioredis',
        'sqlite': 'aiosqlite'
    }
    
    def __init__(self, config: DatabaseConfig):
        self.config = config
        self.connection = None
        self.pool = None
    
    async def connect(self):
        """Подключение к базе данных"""
        db_type = self.config.db_type
        
        if db_type == 'postgresql':
            await self._connect_postgresql()
        elif db_type == 'mysql':
            await self._connect_mysql()
        elif db_type == 'mongodb':
            await self._connect_mongodb()
        elif db_type == 'redis':
            await self._connect_redis()
        elif db_type == 'sqlite':
            await self._connect_sqlite()
        else:
            raise ValueError(f"Неизвестный тип БД: {db_type}")
    
    async def _connect_postgresql(self):
        """Подключение к PostgreSQL"""
        import asyncpg
        
        self.pool = await asyncpg.create_pool(
            host=self.config.host,
            port=self.config.port,
            user=self.config.username,
            password=self.config.password,
            database=self.config.database,
            min_size=1,
            max_size=self.config.pool_size,
            ssl='require' if self.config.ssl else None
        )
        
        # Регистрация типов
        await self._register_postgresql_types()
    
    async def _register_postgresql_types(self):
        """Регистрация пользовательских типов PostgreSQL"""
        import asyncpg
        
        async def encode_jsonb(value):
            return json.dumps(value)
        
        async def decode_jsonb(value):
            return json.loads(value)
        
        await self.pool.set_type_codec(
            'jsonb',
            schema='pg_catalog',
            encoder=encode_jsonb,
            decoder=decode_jsonb,
            format='text'
        )
    
    async def _connect_mysql(self):
        """Подключение к MySQL"""
        import aiomysql
        
        self.pool = await aiomysql.create_pool(
            host=self.config.host,
            port=self.config.port,
            user=self.config.username,
            password=self.config.password,
            db=self.config.database,
            minsize=1,
            maxsize=self.config.pool_size,
            autocommit=False
        )
    
    async def _connect_mongodb(self):
        """Подключение к MongoDB"""
        from motor.motor_asyncio import AsyncIOMotorClient
        
        if self.config.username and self.config.password:
            uri = f"mongodb://{self.config.username}:{self.config.password}@{self.config.host}:{self.config.port}/{self.config.database}"
        else:
            uri = f"mongodb://{self.config.host}:{self.config.port}/{self.config.database}"
        
        self.client = AsyncIOMotorClient(uri)
        self.connection = self.client[self.config.database]
    
    async def execute_query(self, query: str, params: tuple = None):
        """Выполнение SQL запроса"""
        if self.config.db_type in ['postgresql', 'mysql']:
            async with self.pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(query, params or ())
                    if cur.description:  # SELECT запрос
                        result = await cur.fetchall()
                        columns = [desc[0] for desc in cur.description]
                        return [dict(zip(columns, row)) for row in result]
                    return cur.rowcount
        else:
            raise NotImplementedError("Этот метод только для SQL БД")
    
    async def execute_transaction(self, queries: list):
        """Выполнение транзакции"""
        if self.config.db_type in ['postgresql', 'mysql']:
            async with self.pool.acquire() as conn:
                async with conn.cursor() as cur:
                    try:
                        await conn.begin()
                        for query, params in queries:
                            await cur.execute(query, params or ())
                        await conn.commit()
                        return True
                    except Exception as e:
                        await conn.rollback()
                        raise e
    
    async def close(self):
        """Закрытие соединений"""
        if self.pool:
            self.pool.close()
            await self.pool.wait_closed()
        elif self.connection:
            self.connection.close()

7. Паттерны и лучшие практики

7.1 Repository Pattern

from abc import ABC, abstractmethod
from typing import List, Optional, TypeVar, Generic
from contextlib import contextmanager

T = TypeVar('T')

class Repository(ABC, Generic[T]):
    """Абстрактный репозиторий"""
    
    @abstractmethod
    async def get(self, id: int) -> Optional[T]:
        pass
    
    @abstractmethod
    async def get_all(self, skip: int = 0, limit: int = 100) -> List[T]:
        pass
    
    @abstractmethod
    async def create(self, entity: T) -> T:
        pass
    
    @abstractmethod
    async def update(self, id: int, entity: T) -> Optional[T]:
        pass
    
    @abstractmethod
    async def delete(self, id: int) -> bool:
        pass

class UserRepository(Repository):
    """Репозиторий пользователей"""
    
    def __init__(self, db_manager):
        self.db = db_manager
    
    async def get_by_email(self, email: str):
        """Получение пользователя по email"""
        query = "SELECT * FROM users WHERE email = $1"
        return await self.db.execute_query(query, (email,))
    
    async def search(self, search_term: str):
        """Поиск пользователей"""
        query = """
        SELECT * FROM users 
        WHERE username ILIKE $1 
           OR email ILIKE $1 
        LIMIT 50
        """
        return await self.db.execute_query(query, (f"%{search_term}%",))

7.2 Unit of Work Pattern

from abc import ABC, abstractmethod

class UnitOfWork(ABC):
    """Паттерн Unit of Work"""
    
    async def __aenter__(self):
        await self.begin()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            await self.rollback()
        else:
            await self.commit()
    
    @abstractmethod
    async def begin(self):
        pass
    
    @abstractmethod
    async def commit(self):
        pass
    
    @abstractmethod
    async def rollback(self):
        pass
    
    @property
    @abstractmethod
    def users(self):
        pass
    
    @property
    @abstractmethod
    def products(self):
        pass

class SQLAlchemyUnitOfWork(UnitOfWork):
    """Реализация Unit of Work для SQLAlchemy"""
    
    def __init__(self, session_factory):
        self.session_factory = session_factory
    
    async def begin(self):
        self.session = self.session_factory()
    
    async def commit(self):
        await self.session.commit()
    
    async def rollback(self):
        await self.session.rollback()
    
    @property
    def users(self):
        return UserRepository(self.session)
    
    @property
    def products(self):
        return ProductRepository(self.session)

7.3 Connection Pooling

import queue
import threading
import contextlib
from typing import Callable

class ConnectionPool:
    """Пул соединений"""
    
    def __init__(
        self,
        create_connection: Callable,
        max_size: int = 10,
        timeout: int = 5
    ):
        self.create_connection = create_connection
        self.max_size = max_size
        self.timeout = timeout
        self._pool = queue.Queue(maxsize=max_size)
        self._lock = threading.Lock()
        self._current_size = 0
        
        # Предварительное создание соединений
        for _ in range(min(3, max_size)):
            self._create_connection()
    
    def _create_connection(self):
        """Создание нового соединения"""
        with self._lock:
            if self._current_size < self.max_size:
                conn = self.create_connection()
                self._pool.put(conn)
                self._current_size += 1
                return True
        return False
    
    @contextlib.contextmanager
    def get_connection(self):
        """Получение соединения из пула"""
        conn = None
        try:
            # Попытка получить соединение из пула
            try:
                conn = self._pool.get(timeout=self.timeout)
            except queue.Empty:
                # Попытка создать новое соединение
                if not self._create_connection():
                    raise TimeoutError("Достигнут лимит соединений")
                conn = self._pool.get(timeout=self.timeout)
            
            yield conn
        finally:
            if conn:
                self._pool.put(conn)
    
    def close_all(self):
        """Закрытие всех соединений"""
        while not self._pool.empty():
            try:
                conn = self._pool.get_nowait()
                conn.close()
            except queue.Empty:
                break

7.4 Health Check и мониторинг

import time
from typing import Dict, Any
from dataclasses import dataclass
from enum import Enum

class HealthStatus(Enum):
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    DEGRADED = "degraded"

@dataclass
class HealthCheckResult:
    status: HealthStatus
    message: str = ""
    response_time: float = 0.0
    details: Dict[str, Any] = None

class DatabaseHealthChecker:
    """Проверка здоровья базы данных"""
    
    def __init__(self, db_manager, check_interval: int = 30):
        self.db_manager = db_manager
        self.check_interval = check_interval
        self.last_check = 0
        self.last_result = None
    
    async def check_health(self) -> HealthCheckResult:
        """Проверка здоровья"""
        start_time = time.time()
        
        try:
            # Простой запрос для проверки
            if hasattr(self.db_manager, 'execute_query'):
                await self.db_manager.execute_query("SELECT 1")
                status = HealthStatus.HEALTHY
                message = "База данных доступна"
            else:
                # Для NoSQL БД
                await self.db_manager.client.admin.command('ping')
                status = HealthStatus.HEALTHY
                message = "База данных доступна"
        
        except Exception as e:
            status = HealthStatus.UNHEALTHY
            message = f"Ошибка подключения: {str(e)}"
        
        response_time = time.time() - start_time
        
        result = HealthCheckResult(
            status=status,
            message=message,
            response_time=response_time,
            details={
                'database': self.db_manager.__class__.__name__,
                'timestamp': time.time()
            }
        )
        
        self.last_check = time.time()
        self.last_result = result
        
        return result
    
    def should_check(self) -> bool:
        """Нужно ли выполнять проверку"""
        return (time.time() - self.last_check) > self.check_interval

8. Пример приложения

8.1 Конфигурация

# config.py
import os
from typing import Dict, Any
from dataclasses import dataclass
from enum import Enum

class DatabaseType(Enum):
    POSTGRESQL = "postgresql"
    MYSQL = "mysql"
    SQLITE = "sqlite"
    MONGODB = "mongodb"
    REDIS = "redis"

@dataclass
class DatabaseConfig:
    """Конфигурация базы данных"""
    type: DatabaseType
    host: str
    port: int
    database: str
    username: str = None
    password: str = None
    pool_size: int = 5
    ssl: bool = False
    
    @classmethod
    def from_env(cls, prefix: str = "DB_"):
        """Загрузка конфигурации из переменных окружения"""
        return cls(
            type=DatabaseType(os.getenv(f"{prefix}TYPE", "sqlite")),
            host=os.getenv(f"{prefix}HOST", "localhost"),
            port=int(os.getenv(f"{prefix}PORT", "5432")),
            database=os.getenv(f"{prefix}NAME", "app_db"),
            username=os.getenv(f"{prefix}USER"),
            password=os.getenv(f"{prefix}PASSWORD"),
            pool_size=int(os.getenv(f"{prefix}POOL_SIZE", "5")),
            ssl=os.getenv(f"{prefix}SSL", "false").lower() == "true"
        )
    
    def get_connection_string(self) -> str:
        """Получение строки подключения"""
        if self.type == DatabaseType.POSTGRESQL:
            auth = f"{self.username}:{self.password}@" if self.username else ""
            ssl = "?sslmode=require" if self.ssl else ""
            return f"postgresql://{auth}{self.host}:{self.port}/{self.database}{ssl}"
        elif self.type == DatabaseType.MYSQL:
            auth = f"{self.username}:{self.password}@" if self.username else ""
            return f"mysql://{auth}{self.host}:{self.port}/{self.database}"
        elif self.type == DatabaseType.SQLITE:
            return f"sqlite:///{self.database}.db"
        else:
            raise ValueError(f"Неизвестный тип БД: {self.type}")

class AppConfig:
    """Конфигурация приложения"""
    
    def __init__(self):
        self.database = DatabaseConfig.from_env()
        self.redis = RedisConfig.from_env()
        self.debug = os.getenv("DEBUG", "false").lower() == "true"
        self.secret_key = os.getenv("SECRET_KEY", "secret")
    
    @property
    def is_production(self) -> bool:
        return os.getenv("ENVIRONMENT", "development") == "production"

8.2 Фабрика БД

# factories.py
from typing import Union
from config import DatabaseConfig, DatabaseType
from managers import (
    PostgreSQLManager,
    MySQLManager,
    SQLiteManager,
    MongoDBManager,
    RedisManager,
    AsyncDatabaseManager
)

class DatabaseFactory:
    """Фабрика для создания менеджеров БД"""
    
    @staticmethod
    def create_manager(config: DatabaseConfig, async_mode: bool = False):
        """Создание менеджера базы данных"""
        
        if async_mode:
            return AsyncDatabaseManager(config)
        
        if config.type == DatabaseType.POSTGRESQL:
            return PostgreSQLManager(
                host=config.host,
                port=config.port,
                user=config.username,
                password=config.password,
                database=config.database,
                pool_size=config.pool_size,
                ssl=config.ssl
            )
        elif config.type == DatabaseType.MYSQL:
            return MySQLManager(
                host=config.host,
                port=config.port,
                user=config.username,
                password=config.password,
                database=config.database
            )
        elif config.type == DatabaseType.SQLITE:
            return SQLiteManager(config.database)
        elif config.type == DatabaseType.MONGODB:
            return MongoDBManager(
                host=config.host,
                port=config.port,
                username=config.username,
                password=config.password,
                database=config.database
            )
        elif config.type == DatabaseType.REDIS:
            return RedisManager(
                host=config.host,
                port=config.port,
                db=0,
                password=config.password
            )
        else:
            raise ValueError(f"Неизвестный тип БД: {config.type}")

8.3 Основное приложение

# app.py
import asyncio
import logging
from contextlib import asynccontextmanager
from typing import Dict, Any

from config import AppConfig
from factories import DatabaseFactory
from health import DatabaseHealthChecker
from repositories import UserRepository, ProductRepository
from models import User, Product

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class Application:
    """Основной класс приложения"""
    
    def __init__(self):
        self.config = AppConfig()
        self.db_manager = None
        self.redis_manager = None
        self.health_checker = None
        self.repositories = {}
    
    async def startup(self):
        """Запуск приложения"""
        logger.info("Запуск приложения...")
        
        # Инициализация основной БД
        self.db_manager = DatabaseFactory.create_manager(
            self.config.database,
            async_mode=True
        )
        await self.db_manager.connect()
        
        # Инициализация Redis для кэширования
        if self.config.redis:
            self.redis_manager = DatabaseFactory.create_manager(
                self.config.redis,
                async_mode=False
            )
            self.redis_manager.connect()
        
        # Инициализация репозиториев
        self.repositories['users'] = UserRepository(self.db_manager)
        self.repositories['products'] = ProductRepository(self.db_manager)
        
        # Инициализация health checker
        self.health_checker = DatabaseHealthChecker(
            self.db_manager,
            check_interval=30
        )
        
        # Проверка здоровья БД
        health = await self.health_checker.check_health()
        logger.info(f"Состояние БД: {health.status.value} - {health.message}")
        
        logger.info("Приложение запущено")
    
    async def shutdown(self):
        """Завершение работы приложения"""
        logger.info("Завершение работы приложения...")
        
        if self.db_manager:
            await self.db_manager.close()
        
        if self.redis_manager:
            self.redis_manager.close()
        
        logger.info("Приложение завершило работу")
    
    @asynccontextmanager
    async def lifespan(self):
        """Контекстный менеджер жизненного цикла"""
        await self.startup()
        try:
            yield self
        finally:
            await self.shutdown()
    
    async def create_user(self, user_data: Dict[str, Any]) -> User:
        """Создание пользователя"""
        # Проверка существования пользователя
        existing = await self.repositories['users'].get_by_email(user_data['email'])
        if existing:
            raise ValueError("Пользователь с таким email уже существует")
        
        # Создание пользователя
        user = await self.repositories['users'].create(user_data)
        
        # Кэширование в Redis
        if self.redis_manager:
            await self.redis_manager.set_value(
                f"user:{user.id}",
                user.to_dict(),
                expire=3600
            )
        
        logger.info(f"Создан пользователь: {user.username}")
        return user
    
    async def get_user(self, user_id: int) -> User:
        """Получение пользователя"""
        # Попытка получить из кэша
        if self.redis_manager:
            cached = await self.redis_manager.get_value(f"user:{user_id}")
            if cached:
                logger.info(f"Пользователь {user_id} получен из кэша")
                return User(**cached)
        
        # Получение из БД
        user = await self.repositories['users'].get(user_id)
        
        # Сохранение в кэш
        if user and self.redis_manager:
            await self.redis_manager.set_value(
                f"user:{user.id}",
                user.to_dict(),
                expire=3600
            )
        
        return user
    
    async def run_background_tasks(self):
        """Фоновые задачи"""
        while True:
            try:
                # Проверка здоровья
                if self.health_checker.should_check():
                    health = await self.health_checker.check_health()
                    if health.status != HealthStatus.HEALTHY:
                        logger.warning(f"Проблема с БД: {health.message}")
                
                # Очистка устаревших данных
                await self.cleanup_old_data()
                
                await asyncio.sleep(60)  # Пауза 60 секунд
                
            except Exception as e:
                logger.error(f"Ошибка в фоновой задаче: {e}")
                await asyncio.sleep(300)  # Пауза 5 минут при ошибке
    
    async def cleanup_old_data(self):
        """Очистка устаревших данных"""
        # Пример: удаление неактивных пользователей старше 30 дней
        cutoff_date = datetime.utcnow() - timedelta(days=30)
        
        await self.repositories['users'].delete_inactive_before(cutoff_date)
        
        logger.info("Очистка устаревших данных завершена")

# Запуск приложения
async def main():
    """Основная функция"""
    app = Application()
    
    async with app.lifespan():
        # Пример использования
        user = await app.create_user({
            'username': 'test_user',
            'email': 'test@example.com',
            'full_name': 'Test User'
        })
        
        # Получение пользователя
        retrieved_user = await app.get_user(user.id)
        print(f"Получен пользователь: {retrieved_user}")
        
        # Запуск фоновых задач
        background_task = asyncio.create_task(app.run_background_tasks())
        
        try:
            # Основной цикл приложения
            while True:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            print("\nЗавершение работы...")
        finally:
            background_task.cancel()

if __name__ == "__main__":
    asyncio.run(main())

8.4 Docker Compose для тестирования

# docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: app_db
      POSTGRES_USER: app_user
      POSTGRES_PASSWORD: app_password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U app_user"]
      interval: 10s
      timeout: 5s
      retries: 5

  mysql:
    image: mysql:8
    environment:
      MYSQL_DATABASE: app_db
      MYSQL_USER: app_user
      MYSQL_PASSWORD: app_password
      MYSQL_ROOT_PASSWORD: root_password
    ports:
      - "3306:3306"
    volumes:
      - mysql_data:/var/lib/mysql

  mongodb:
    image: mongo:6
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin_password
    ports:
      - "27017:27017"
    volumes:
      - mongo_data:/data/db

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  elasticsearch:
    image: elasticsearch:8.10.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data

volumes:
  postgres_data:
  mysql_data:
  mongo_data:
  redis_data:
  elasticsearch_data:

Заключение

Это пособие охватывает основные аспекты работы с различными базами данных в Python:

  1. Прямые подключения для полного контроля
  2. SQLAlchemy ORM для удобной работы с SQL базами
  3. Асинхронные драйверы для высоконагруженных приложений
  4. Различные типы БД: SQLite, MySQL, PostgreSQL, MongoDB, Redis
  5. Паттерны проектирования для структурирования кода
  6. Лучшие практики по управлению соединениями и ошибками

Рекомендации по выбору подхода:

  1. Для простых проектов: SQLite с прямым подключением
  2. Для веб-приложений: PostgreSQL + SQLAlchemy ORM
  3. Для высоконагруженных систем: асинхронные драйверы + пулы соединений
  4. Для кэширования: Redis
  5. Для документ-ориентированных данных: MongoDB
  6. Для поиска: Elasticsearch

Ключевые принципы:

  1. Всегда используйте контекстные менеджеры для соединений
  2. Реализуйте пулы соединений для production
  3. Добавляйте обработку ошибок и retry логику
  4. Используйте миграции для управления схемой БД
  5. Реализуйте health checks для мониторинга
  6. Кэшируйте часто запрашиваемые данные
  7. Пишите тесты для работы с БД

Это пособие дает прочную основу для работы с любыми базами данных в Python, от простых скриптов до сложных распределенных систем.