← Назад к курсу

System Design для собеседований

System Design для собеседований

Язык: Python
Тематика: Искусственный интеллект, веб‑приложения, CRM


1. Обычные уточняющие вопросы

Категория Пример вопроса Зачем
Трафик Какой объём запросов (QPS/RPS) ожидается в пиковый момент? Планировать размер пула, кэширование, масштабирование
Латентность Допустимый тайм‑аут для пользовательского взаимодействия? Выбирать синхрон/асинхронный путь, CDN, кэш
Данные Объём хранимых данных (TB/GB) и их рост? Выбор базы, шардинг, архитектуры хранения
Состояние Какой уровень согласованности нужен (strong / eventual)? Выбирать транзакционные DB vs eventual‑consistency
Авторизация Какой тип аутентификации (OAuth2, JWT, SAML) используется? Выбирать middleware, инфраструктуру
Безопасность Как реализуется DDoS‑защита, CSRF, XSS? Nginx, WAF, CSP, rate‑limit
Нагрузка Количество пользователей одновременно (активные/пассивные)? Выбор кластеров, репликации
Надежность Допустимое время простоя, SLA? Redundancy, backup, circuit‑breaker
Распределение Географическое покрытие (локации, Z‑lat) CDN, edge‑nodes, regional DB
Интеграции Требуется внешняя интеграция (API, CRM, payment)? Очереди, сервис‑медиаторы, SDKs

2. Кейсы — примеры систем и стек технологий

Каждый кейс включает: описание задачи, типичные уточняющие вопросы, рекомендованный стек (с акцентом на ключевые компоненты), небольшой рабочий пример кода (Python).

1️⃣ AI‑powered рекомендательный движок (e‑commerce)

Типичный запрос: “Система, которая показывает персонализированные товары в rеaltime‑рекомендации”.

Уточняющие вопросы
Ожидаемый QPS (рекомендации) – 10k‑100k / сек?
Как часто обновлять данные (rеaltime vs batch)?
Какова чувствительность к latency (≤ 50 ms)?
Каков объём данных (миллионы пользователей, млрд товаров)?
Требуется explainable‑relevance?

Стек

  • Backend: FastAPI + asyncio, Docker/K8s
  • AI: PyTorch / HuggingFace Transformers, FAISS для векторного поиска
  • Хранилище: PostgreSQL (текстовые метаданные), Redis (кэш промежуточных результатов)
  • Очередь: Kafka (events “new item”, “user action”) → Celery (offline batch)
  • Микросервисы: Recommendation‑service, Vector‑indexer, Metadata‑service

Ключевые технологии
FAISS (index‑embedding), ChromaDB (Vector DB), Redis LRU cache, FastAPI async, Kafka streams.

# FastAPI‑endpoint для получения top‑k рекомендаций
from fastapi import FastAPI
import chromadb
import uvicorn
import os

app = FastAPI()

db_path = os.getenv("CHROMA_PATH", "/data/chroma")
client = chromadb.PersistentClient(path=db_path)

@app.get("/recommend")
async def recommend(user_id: str, top_k: int = 5):
    # retrieve user embedding (could be stored in Redis)
    user_vec = await get_user_vector(user_id)          # async wrapper
    results = client.query(query_embeddings=[user_vec], 
                          collection="items", 
                          n_results=top_k,
                          include=["documents", "distances"])
    return [{"id": doc, "score": score} for doc, score in zip(results["documents"], results["distances"])]

2️⃣ LLM‑чатабот (real‑time)

Типичный запрос: “Бот, который отвечает на пользовательские запросы с помощью LLM”.

Уточняющие вопросы

  • Требуемый latency ≤ 200 ms?
  • Как обрабатывать N‑параллельный inference?
  • Политика безопасности (фильтрация, jailbreak‑protection)?
  • Сколько одновременных сессий?

Стек

  • API: FastAPI (ASGI) + uvicorn (workers)
  • LLM: HuggingFace Inference API / vLLM (GPU‑inferencing)
  • Сессия: Redis (TTL = session‑timeout)
  • Сообщение: Kafka (streaming) → Consumer → LLM
  • Безопасность: Moderation pipeline (OpenAI moderation endpoint)

Ключевые
vLLM (GPU‑throughput), Redis (state), async FastAPI, Kafka.

import redis
import requests
import uvicorn
from fastapi import FastAPI

app = FastAPI()
r = redis.StrictRedis(host="redis", port=6379, decode_responses=True)

@app.post("/chat")
async def chat(user_id: str, message: str):
    history = r.get(f"chat:{user_id}") or ""
    response = await async_infra_call(
        model="openai/gpt-4o-mini",
        prompt=message + "\nAssistant:" + history
    )
    # optional moderation
    if response.get("content") and any(w for w in response["content"].lower() if w in {"danger", "illegal"} ):
        raise ValueError("Sensitive content detected")
    # store history
    r.setex(f"chat:{user_id}", timeout=300, value=history + "\nUser:" + message)
    return {"reply": response["content"]}

3️⃣ Электронная почта‑система (SMTP)

Типичный запрос: “Отправка и хранение писем, с поддержкой SMTP/IMAP”.

Уточняющие вопросы

  • Максимальный объём писем (TB/сек) → требуется sharding?
  • Требуется поддержка категорий (spam‑filter, priority)?
  • SLA доставки (как быстро “delivered‑to‑inbox”)?

Стек

  • API: Celery (asynchronous) + Django/Flask (REST)
  • SMTP: smtplib + aio‑smtplib (async)
  • Message Queue: RabbitMQ / Kafka (для задач отправки)
  • Хранение: PostgreSQL + file storage (S3) for attachments
  • Спам‑фильтр: SpamAssassin API, ML‑модель (scikit‑learn)

Ключевые
Celery (worker pool), async SMTP, RabbitMQ/Kafka, PostgreSQL.

# Celery task for sending email
from celery import shared_task
from django.core.mail import EmailMessage
import smtplib
import aio_smtplib
import asyncio
import json

@shared_task
def send_email_async(to, subject, body, attachment_path=None):
    async def _send():
        async with aio_smtplib.SMTP("smtp.yandex.ru") as client:
            await client.login("service@yandex.ru", "password")
            await client.send_message(EmailMessage(
                from_addr="service@yandex.ru",
                to=to,
                subject=subject,
                body=body,
                attachments=[attachment_path]
            ))
    asyncio.run(_send())

4️⃣ Много‑канальная система уведомлений (push / SMS / email)

Типичный запрос: “Уведомлять пользователей сразу после события (order, comment)”.

Уточняющие вопросы

  • Какие каналы поддерживать (Firebase, Twilio, Email)?
  • Rate‑limit на каждый канал (max msg/min).
  • Какова SLA по deliverability (≤ 30 s)?

Стек

  • API: FastAPI + async workers (Celery + Redis)
  • Push: Firebase Cloud Messaging SDK
  • SMS: Twilio REST API
  • Email: Django Email backend (asynchronous)
  • Rate‑limit: Redis LRU + Lua script
  • Отказоустойчивость: fallback‑queue, dead‑letter (DLQ)

Ключевые
Twilio (SMS), Firebase (push), Celery (workers), Redis (rate‑limit).

# Example: unified notification dispatcher
import redis
import requests
from celery import shared_task

notifications = redis.StrictRedis()
push_endpoint = "https://fcm.googleapis.com/fcm/send"
twilio_sid = "..."
twilio_token = "..."

@shared_task
def dispatch_notification(user_id, payload):
    # choose channel based on preferences stored in Redis
    pref = notifications.hgetall(f"pref:{user_id}")
    if "push" in pref:
        send_push(payload)
    if "sms" in pref:
        send_sms(payload)
    if "email" in pref:
        send_email(payload)

def send_push(data):
    r = requests.post(push_endpoint, json=data, auth=(twilio_sid, twilio_token))
    # handle response

def send_sms(data):
    r = requests.post("https://api.twilio.com/...", data=data, auth=(twilio_sid, twilio_token))

def send_email(data):
    EmailMessage.from_email(...).send()

5️⃣ Асинхронный конвейер обработки изображений с AI‑моделью

Типичный запрос: “Система, где пользователи загружают фото, а сервис автоматически распознаёт их (фейс, OCR)”.

Уточняющие вопросы

  • Ожидаемый throughput (images / sec).
  • GPU‑ресурсы (GPU‑workers, autoscaling).
  • Требуется предобработка (resizing) vs inference.

Стек

  • Данные: MinIO/S3 (blob storage)
  • Очереди: Kafka (raw‑image → processing)
  • Worker: Docker + GPU (Docker‑GPU‑runtime) + Dask for parallel jobs
  • ML‑pipeline: PyTorch + torchvision + OpenCV (pre‑processing)
  • Хранилище результатов: PostgreSQL (metadata) + S3 (raw outputs)

Ключевые
Docker‑GPU, Dask, PyTorch, Kafka.

# Dask worker task (simplified)
import dask
import torch
import cv2

def process_image(image_bytes, model_path):
    img = cv2.imdecode(image_bytes, cv2.IMREAD_COLOR)
    model = torch.load(model_path).eval()
    with torch.no_grad():
        out = model(img)                 # e.g., face‑detection
    return {"bbox": out.tolist(), "confidence": out.max().item()}

6️⃣ CRM‑платформа для управления продажами и автоматизаций

Типичный запрос: “CRM с задачами, leads, отчётами, интеграцией с внешними сервисами (Mailchimp, Stripe)”.

Уточняющие вопросы

  • Сколько пользователей одновременно? (< 10 k)
  • Какие типы лидов (web, телефон, событие) требуют богатый UI?
  • Требуется поддержка webhook‑integration?

Стек

  • Backend: Django + Django‑Channels (WebSocket) / FastAPI
  • Data: PostgreSQL (relational) + Redis (session cache)
  • Scheduler: Celery + Beat (periodic tasks)
  • Integration: Stripe SDK, Mailchimp API, Zapier
  • Search: ElasticSearch (full‑text, filters)
  • UI: React + TailwindCSS (SPAs)

Ключевые
Celery (scheduling), Django ORM, Stripe, ElasticSearch.

# Django model for Lead
class Lead(models.Model):
    name = models.CharField(max_length=255)
    email = models.EmailField()
    source = models.CharField(max_length=50, choices=[('web','Web'), ('phone','Phone'), ('event','Event')])
    created_at = models.DateTimeField(auto_now_add=True)

# Periodic task to sync leads with external system
@shared_task
def sync_leads_to_external():
    leads = Lead.objects.filter(created_at__gte=now() - timezone.timedelta(hours=24))
    for lead in leads:
        # pseudo‑code for Mailchimp import
        external_api.update(lead.email, lead.name, lead.source)

7️⃣ Система логирования и аналитики (ELK‑подобная)

Типичный запрос: “Сбор, парсинг, поиск и визуализация огромных объёмов логов”.

Уточняющие вопросы

  • QPS логов (операций / сек) – 1M / min?
  • Насколько длительное хранение (30 дн, 1 год)?
  • Требуется агрегировать в реальном времени?

Стек

  • Концентратор: Kafka (high‑throughput)
  • Парсер: Python (Apache Beam, Flink) → Elasticsearch
  • Хранилище: Elasticsearch (index) + Kibana (dashboards)
  • Длительность: S3 (cold storage) + Glacier for archival
  • Monitoring: Prometheus + Grafana

Ключевые
Kafka, Apache Beam, Elasticsearch.

# Beam pipeline example (Google Cloud Dataflow / Apache Beam)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class LogParserFn(beam.DoFn):
    def process(self, element):
        # element = raw JSON string
        parsed = json.loads(element)
        yield {
            "timestamp": parsed["ts"],
            "host": parsed["host"],
            "level": parsed["level"],
            "msg": parsed["msg"]
        }

with beam.Pipeline(options=PipelineOptions(...)) as p:
    (p
     | "ReadKafka" >> beam.io.ReadFromKafka(topic='logs')
     | "Parse" >> beam.ParDo(LogParserFn())
     | "WriteES" >> beam.io.WriteToElasticsearch(
          elasticsearch_config="cluster:localhost",
          doc_type="log",
          index="logs"
      ))

8️⃣ Поиск‑движок для каталога товаров (текст + векторный)

Типичный запрос: “Пользователи ищут товары по тексту и по изображениям”.

Уточняющие вопросы

  • Какой уровень relevance? (top‑10 % vs top‑3 %).
  • Насколько быстро индексация новых товаров (real‑time vs batch).
  • Потребность в faceted search (price, brand).

Стек

  • Текстовый поиск: Elasticsearch (BM25)
  • Векторный: ChromaDB / FAISS (embedding‑based)
  • База данных: PostgreSQL (товары)
  • API: FastAPI (combined results)
  • Индексация: async jobs (Celery) + S3 (storing media)

Ключевые
Elasticsearch (text), ChromaDB (vectors), FAISS (fast search), FastAPI.

# Combined query function
def search_products(query, top_k=10):
    # 1. BM25 from ES
    es_hits = es.search(index="products", body={"query": {"multi_match": {"query": query, "fields": ["name", "desc"]}}})
    # 2. Vector search from Chroma
    chroma_hits = chromadb.query(query_embeddings=[embedding], n_results=top_k)
    # merge scores (simple sum)
    result = []
    for doc_id in es_hits['hits']['hits']:
        es_score = doc_id['_score']
        if doc_id['_id'] in chroma_hits['documents']:
            vec_score = chroma_hits['distances'][chroma_hits['documents'].index(doc_id['_id'])]
        else:
            vec_score = 0
        result.append({'id': doc_id['_id'], 'score': es_score + vec_score})
    # sort and return
    return sorted(result, key=lambda x: x['score'], reverse=True)[:top_k]

9️⃣ Очередь задач для обучения AI‑моделей (GPU‑планирование)

Типичный запрос: “Система, где пользователи отправляют задачи обучения (data‑prep, model‑train)”.

Уточняющие вопросы

  • Требуемый SLA на обучение (minutes‑hours).
  • Как планировать GPU‑ресурсы (fair‑share, priority).
  • Какие типы задач (batch, streaming).

Стек

  • Queue: Celery + Redis (or RabbitMQ)
  • Executor: Kubernetes (GPU‑node pools)
  • Job‑tracker: Prefect / Airflow (DAG‑based)
  • Модель: PyTorch Lightning, TensorFlow + Horovod for distributed
  • Мониторинг: TensorBoard, Prometheus (GPU metrics)

Ключевые
Celery, Kubernetes GPU, Prefect, PyTorch Lightning.

# Celery task that runs on a GPU‑enabled worker
@shared_task(bind=True, max_retries=3)
def train_model(self, task_id, dataset_path):
    try:
        import torch
        model = torch.load("best.pth").to("cuda")
        train_loader = torch.utils.data.DataLoader(
            torch.load(dataset_path),
            batch_size=32, shuffle=True
        )
        # dummy training loop
        for epoch in range(1):
            for batch in train_loader:
                x, y = batch
                x, y = x.to("cuda"), y.to("cuda")
                loss = model(x, y)
                loss.backward()
                # update optimizer...
                # ...
        # upload checkpoint
        torch.save(model.state_dict(), f"/data/models/{task_id}.pth")
    except Exception as exc:
        self.retry(exc=exc)

🔟 API‑шлюз для микросервисной архитектуры

Типичный запрос: “Сегментировать доступ к сервисам, добавить аутентификацию, rate‑limit”.

Уточняющие вопросы

  • Требуется JWT‑validation?
  • Какие уровни шлюзов (edge, internal)?
  • Как реализовать fallback‑routing (circuit‑breaker).

Стек

  • Шлюз: FastAPI + ASGI (Uvicorn)
  • Auth: OAuth2‑server (Authlib) + JWT (PyJWT)
  • Rate‑limit: Redis (token‑bucket) + fastapi‑limiter
  • Routing: Proxy‑router, HTTPS termination (NGINX)
  • Observability: OpenTelemetry + Prometheus

Ключевые
FastAPI (gateway), Authlib, PyJWT, Redis token‑bucket, NGINX.

# FastAPI gateway example
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer
from fastapi_limiter import Limiter
import redis

app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"])

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
limiter = Limiter(key_func=lambda r: r.client.host)

@app.post("/auth/token")
async def get_token(username: str, password: str):
    # dummy check
    if username != "admin" or password != "secret":
        raise HTTPException(401, "Invalid credentials")
    return {"access_token": "jwt-token"}

@app.get("/protected")
@limiter.limit("10/minute")
async def protected_api(token: str = Depends(oauth2_scheme)):
    # verify JWT
    if not verify_jwt(token):
        raise HTTPException(401)
    # forward request to downstream service (example: httpbin.org)
    async with httpx.AsyncClient() as client:
        r = await client.get("http://internal-service/api/v1/data")
    return r.json()

3. Как использовать это пособие на собеседовании

  1. Задайте все уточняющие вопросы – это покажет системное мышление.
  2. Выберите стек – адаптируйте к задаче: для Python‑AI‑проектов лучшими являются FastAPI + async + Vector DB + Docker/K8s.
  3. Подготовьте мини‑прототипы – в ответе покажите 2‑3 кода (по 3‑5 строк) и объясните их.
  4. Обсудите trade‑offs – например, Redis vs. DB для кэша, FAISS vs. Elasticsearch для векторного поиска, synchronous vs. async processing.
  5. Говорите о мониторинге – Prometheus + Grafana, logging → Loki, alerting (Alertmanager).

4. Краткое резюме

Технология Почему важна для Python‑AI‑Web‑CRM Пример её место
FastAPI async, OpenAPI docs, легко для ML‑API любой кейс с API‑endpoint
FAISS / ChromaDB быстрый векторный поиск рекомендации, поиск изображений
Celery распределённые фоновые задачи (email, sync, model‑train) почти все кейсы
Redis кэш, сессии, rate‑limit, pub/sub почти все
Kafka потоковая обработка событий рекомендации, уведомления, логи
Docker / K8s repeatable environments, GPU‑workers AI‑конвейер, модель‑train
Elasticsearch full‑text + facets поиск каталога, CRM‑поиск
PostgreSQL реляционные данные (логины, CRM) основной DB
Twilio / Firebase внешние каналы уведомлений мульти‑канальные уведомления
OAuth2 / JWT аутентификация/авторизация любой шлюз, CRM
Prometheus / Grafana мониторинг, SLA, alerts любые сервисы

📚 Как дальше учиться

  • Бесплатные курсы: “Designing Data‑Intensive Applications” (Martin Kleppmann) – примените к Python‑примеру.
  • Статьи: “High‑Performance FAISS with Python” (HuggingFace Blog), “FastAPI vs Flask in Production”.
  • Практика: создайте 5‑минутный микросервис‑прототип (FastAPI + Docker + Redis) и разверните его на k3d/kind.