System Design для собеседований
System Design для собеседований
Язык: Python
Тематика: Искусственный интеллект, веб‑приложения, CRM
1. Обычные уточняющие вопросы
| Категория | Пример вопроса | Зачем |
|---|---|---|
| Трафик | Какой объём запросов (QPS/RPS) ожидается в пиковый момент? | Планировать размер пула, кэширование, масштабирование |
| Латентность | Допустимый тайм‑аут для пользовательского взаимодействия? | Выбирать синхрон/асинхронный путь, CDN, кэш |
| Данные | Объём хранимых данных (TB/GB) и их рост? | Выбор базы, шардинг, архитектуры хранения |
| Состояние | Какой уровень согласованности нужен (strong / eventual)? | Выбирать транзакционные DB vs eventual‑consistency |
| Авторизация | Какой тип аутентификации (OAuth2, JWT, SAML) используется? | Выбирать middleware, инфраструктуру |
| Безопасность | Как реализуется DDoS‑защита, CSRF, XSS? | Nginx, WAF, CSP, rate‑limit |
| Нагрузка | Количество пользователей одновременно (активные/пассивные)? | Выбор кластеров, репликации |
| Надежность | Допустимое время простоя, SLA? | Redundancy, backup, circuit‑breaker |
| Распределение | Географическое покрытие (локации, Z‑lat) | CDN, edge‑nodes, regional DB |
| Интеграции | Требуется внешняя интеграция (API, CRM, payment)? | Очереди, сервис‑медиаторы, SDKs |
2. Кейсы — примеры систем и стек технологий
Каждый кейс включает: описание задачи, типичные уточняющие вопросы, рекомендованный стек (с акцентом на ключевые компоненты), небольшой рабочий пример кода (Python).
1️⃣ AI‑powered рекомендательный движок (e‑commerce)
Типичный запрос: “Система, которая показывает персонализированные товары в rеaltime‑рекомендации”.
| Уточняющие вопросы |
|---|
| Ожидаемый QPS (рекомендации) – 10k‑100k / сек? |
| Как часто обновлять данные (rеaltime vs batch)? |
| Какова чувствительность к latency (≤ 50 ms)? |
| Каков объём данных (миллионы пользователей, млрд товаров)? |
| Требуется explainable‑relevance? |
Стек
- Backend: FastAPI + asyncio, Docker/K8s
- AI: PyTorch / HuggingFace Transformers, FAISS для векторного поиска
- Хранилище: PostgreSQL (текстовые метаданные), Redis (кэш промежуточных результатов)
- Очередь: Kafka (events “new item”, “user action”) → Celery (offline batch)
- Микросервисы: Recommendation‑service, Vector‑indexer, Metadata‑service
Ключевые технологии
FAISS (index‑embedding), ChromaDB (Vector DB), Redis LRU cache, FastAPI async, Kafka streams.
# FastAPI‑endpoint для получения top‑k рекомендаций
from fastapi import FastAPI
import chromadb
import uvicorn
import os
app = FastAPI()
db_path = os.getenv("CHROMA_PATH", "/data/chroma")
client = chromadb.PersistentClient(path=db_path)
@app.get("/recommend")
async def recommend(user_id: str, top_k: int = 5):
# retrieve user embedding (could be stored in Redis)
user_vec = await get_user_vector(user_id) # async wrapper
results = client.query(query_embeddings=[user_vec],
collection="items",
n_results=top_k,
include=["documents", "distances"])
return [{"id": doc, "score": score} for doc, score in zip(results["documents"], results["distances"])]
2️⃣ LLM‑чатабот (real‑time)
Типичный запрос: “Бот, который отвечает на пользовательские запросы с помощью LLM”.
Уточняющие вопросы
- Требуемый latency ≤ 200 ms?
- Как обрабатывать N‑параллельный inference?
- Политика безопасности (фильтрация, jailbreak‑protection)?
- Сколько одновременных сессий?
Стек
- API: FastAPI (ASGI) + uvicorn (workers)
- LLM: HuggingFace Inference API / vLLM (GPU‑inferencing)
- Сессия: Redis (TTL = session‑timeout)
- Сообщение: Kafka (streaming) → Consumer → LLM
- Безопасность: Moderation pipeline (OpenAI moderation endpoint)
Ключевые
vLLM (GPU‑throughput), Redis (state), async FastAPI, Kafka.
import redis
import requests
import uvicorn
from fastapi import FastAPI
app = FastAPI()
r = redis.StrictRedis(host="redis", port=6379, decode_responses=True)
@app.post("/chat")
async def chat(user_id: str, message: str):
history = r.get(f"chat:{user_id}") or ""
response = await async_infra_call(
model="openai/gpt-4o-mini",
prompt=message + "\nAssistant:" + history
)
# optional moderation
if response.get("content") and any(w for w in response["content"].lower() if w in {"danger", "illegal"} ):
raise ValueError("Sensitive content detected")
# store history
r.setex(f"chat:{user_id}", timeout=300, value=history + "\nUser:" + message)
return {"reply": response["content"]}
3️⃣ Электронная почта‑система (SMTP)
Типичный запрос: “Отправка и хранение писем, с поддержкой SMTP/IMAP”.
Уточняющие вопросы
- Максимальный объём писем (TB/сек) → требуется sharding?
- Требуется поддержка категорий (spam‑filter, priority)?
- SLA доставки (как быстро “delivered‑to‑inbox”)?
Стек
- API: Celery (asynchronous) + Django/Flask (REST)
- SMTP: smtplib + aio‑smtplib (async)
- Message Queue: RabbitMQ / Kafka (для задач отправки)
- Хранение: PostgreSQL + file storage (S3) for attachments
- Спам‑фильтр: SpamAssassin API, ML‑модель (scikit‑learn)
Ключевые
Celery (worker pool), async SMTP, RabbitMQ/Kafka, PostgreSQL.
# Celery task for sending email
from celery import shared_task
from django.core.mail import EmailMessage
import smtplib
import aio_smtplib
import asyncio
import json
@shared_task
def send_email_async(to, subject, body, attachment_path=None):
async def _send():
async with aio_smtplib.SMTP("smtp.yandex.ru") as client:
await client.login("service@yandex.ru", "password")
await client.send_message(EmailMessage(
from_addr="service@yandex.ru",
to=to,
subject=subject,
body=body,
attachments=[attachment_path]
))
asyncio.run(_send())
4️⃣ Много‑канальная система уведомлений (push / SMS / email)
Типичный запрос: “Уведомлять пользователей сразу после события (order, comment)”.
Уточняющие вопросы
- Какие каналы поддерживать (Firebase, Twilio, Email)?
- Rate‑limit на каждый канал (max msg/min).
- Какова SLA по deliverability (≤ 30 s)?
Стек
- API: FastAPI + async workers (Celery + Redis)
- Push: Firebase Cloud Messaging SDK
- SMS: Twilio REST API
- Email: Django Email backend (asynchronous)
- Rate‑limit: Redis LRU + Lua script
- Отказоустойчивость: fallback‑queue, dead‑letter (DLQ)
Ключевые
Twilio (SMS), Firebase (push), Celery (workers), Redis (rate‑limit).
# Example: unified notification dispatcher
import redis
import requests
from celery import shared_task
notifications = redis.StrictRedis()
push_endpoint = "https://fcm.googleapis.com/fcm/send"
twilio_sid = "..."
twilio_token = "..."
@shared_task
def dispatch_notification(user_id, payload):
# choose channel based on preferences stored in Redis
pref = notifications.hgetall(f"pref:{user_id}")
if "push" in pref:
send_push(payload)
if "sms" in pref:
send_sms(payload)
if "email" in pref:
send_email(payload)
def send_push(data):
r = requests.post(push_endpoint, json=data, auth=(twilio_sid, twilio_token))
# handle response
def send_sms(data):
r = requests.post("https://api.twilio.com/...", data=data, auth=(twilio_sid, twilio_token))
def send_email(data):
EmailMessage.from_email(...).send()
5️⃣ Асинхронный конвейер обработки изображений с AI‑моделью
Типичный запрос: “Система, где пользователи загружают фото, а сервис автоматически распознаёт их (фейс, OCR)”.
Уточняющие вопросы
- Ожидаемый throughput (images / sec).
- GPU‑ресурсы (GPU‑workers, autoscaling).
- Требуется предобработка (resizing) vs inference.
Стек
- Данные: MinIO/S3 (blob storage)
- Очереди: Kafka (raw‑image → processing)
- Worker: Docker + GPU (Docker‑GPU‑runtime) + Dask for parallel jobs
- ML‑pipeline: PyTorch + torchvision + OpenCV (pre‑processing)
- Хранилище результатов: PostgreSQL (metadata) + S3 (raw outputs)
Ключевые
Docker‑GPU, Dask, PyTorch, Kafka.
# Dask worker task (simplified)
import dask
import torch
import cv2
def process_image(image_bytes, model_path):
img = cv2.imdecode(image_bytes, cv2.IMREAD_COLOR)
model = torch.load(model_path).eval()
with torch.no_grad():
out = model(img) # e.g., face‑detection
return {"bbox": out.tolist(), "confidence": out.max().item()}
6️⃣ CRM‑платформа для управления продажами и автоматизаций
Типичный запрос: “CRM с задачами, leads, отчётами, интеграцией с внешними сервисами (Mailchimp, Stripe)”.
Уточняющие вопросы
- Сколько пользователей одновременно? (< 10 k)
- Какие типы лидов (web, телефон, событие) требуют богатый UI?
- Требуется поддержка webhook‑integration?
Стек
- Backend: Django + Django‑Channels (WebSocket) / FastAPI
- Data: PostgreSQL (relational) + Redis (session cache)
- Scheduler: Celery + Beat (periodic tasks)
- Integration: Stripe SDK, Mailchimp API, Zapier
- Search: ElasticSearch (full‑text, filters)
- UI: React + TailwindCSS (SPAs)
Ключевые
Celery (scheduling), Django ORM, Stripe, ElasticSearch.
# Django model for Lead
class Lead(models.Model):
name = models.CharField(max_length=255)
email = models.EmailField()
source = models.CharField(max_length=50, choices=[('web','Web'), ('phone','Phone'), ('event','Event')])
created_at = models.DateTimeField(auto_now_add=True)
# Periodic task to sync leads with external system
@shared_task
def sync_leads_to_external():
leads = Lead.objects.filter(created_at__gte=now() - timezone.timedelta(hours=24))
for lead in leads:
# pseudo‑code for Mailchimp import
external_api.update(lead.email, lead.name, lead.source)
7️⃣ Система логирования и аналитики (ELK‑подобная)
Типичный запрос: “Сбор, парсинг, поиск и визуализация огромных объёмов логов”.
Уточняющие вопросы
- QPS логов (операций / сек) – 1M / min?
- Насколько длительное хранение (30 дн, 1 год)?
- Требуется агрегировать в реальном времени?
Стек
- Концентратор: Kafka (high‑throughput)
- Парсер: Python (Apache Beam, Flink) → Elasticsearch
- Хранилище: Elasticsearch (index) + Kibana (dashboards)
- Длительность: S3 (cold storage) + Glacier for archival
- Monitoring: Prometheus + Grafana
Ключевые
Kafka, Apache Beam, Elasticsearch.
# Beam pipeline example (Google Cloud Dataflow / Apache Beam)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class LogParserFn(beam.DoFn):
def process(self, element):
# element = raw JSON string
parsed = json.loads(element)
yield {
"timestamp": parsed["ts"],
"host": parsed["host"],
"level": parsed["level"],
"msg": parsed["msg"]
}
with beam.Pipeline(options=PipelineOptions(...)) as p:
(p
| "ReadKafka" >> beam.io.ReadFromKafka(topic='logs')
| "Parse" >> beam.ParDo(LogParserFn())
| "WriteES" >> beam.io.WriteToElasticsearch(
elasticsearch_config="cluster:localhost",
doc_type="log",
index="logs"
))
8️⃣ Поиск‑движок для каталога товаров (текст + векторный)
Типичный запрос: “Пользователи ищут товары по тексту и по изображениям”.
Уточняющие вопросы
- Какой уровень relevance? (top‑10 % vs top‑3 %).
- Насколько быстро индексация новых товаров (real‑time vs batch).
- Потребность в faceted search (price, brand).
Стек
- Текстовый поиск: Elasticsearch (BM25)
- Векторный: ChromaDB / FAISS (embedding‑based)
- База данных: PostgreSQL (товары)
- API: FastAPI (combined results)
- Индексация: async jobs (Celery) + S3 (storing media)
Ключевые
Elasticsearch (text), ChromaDB (vectors), FAISS (fast search), FastAPI.
# Combined query function
def search_products(query, top_k=10):
# 1. BM25 from ES
es_hits = es.search(index="products", body={"query": {"multi_match": {"query": query, "fields": ["name", "desc"]}}})
# 2. Vector search from Chroma
chroma_hits = chromadb.query(query_embeddings=[embedding], n_results=top_k)
# merge scores (simple sum)
result = []
for doc_id in es_hits['hits']['hits']:
es_score = doc_id['_score']
if doc_id['_id'] in chroma_hits['documents']:
vec_score = chroma_hits['distances'][chroma_hits['documents'].index(doc_id['_id'])]
else:
vec_score = 0
result.append({'id': doc_id['_id'], 'score': es_score + vec_score})
# sort and return
return sorted(result, key=lambda x: x['score'], reverse=True)[:top_k]
9️⃣ Очередь задач для обучения AI‑моделей (GPU‑планирование)
Типичный запрос: “Система, где пользователи отправляют задачи обучения (data‑prep, model‑train)”.
Уточняющие вопросы
- Требуемый SLA на обучение (minutes‑hours).
- Как планировать GPU‑ресурсы (fair‑share, priority).
- Какие типы задач (batch, streaming).
Стек
- Queue: Celery + Redis (or RabbitMQ)
- Executor: Kubernetes (GPU‑node pools)
- Job‑tracker: Prefect / Airflow (DAG‑based)
- Модель: PyTorch Lightning, TensorFlow + Horovod for distributed
- Мониторинг: TensorBoard, Prometheus (GPU metrics)
Ключевые
Celery, Kubernetes GPU, Prefect, PyTorch Lightning.
# Celery task that runs on a GPU‑enabled worker
@shared_task(bind=True, max_retries=3)
def train_model(self, task_id, dataset_path):
try:
import torch
model = torch.load("best.pth").to("cuda")
train_loader = torch.utils.data.DataLoader(
torch.load(dataset_path),
batch_size=32, shuffle=True
)
# dummy training loop
for epoch in range(1):
for batch in train_loader:
x, y = batch
x, y = x.to("cuda"), y.to("cuda")
loss = model(x, y)
loss.backward()
# update optimizer...
# ...
# upload checkpoint
torch.save(model.state_dict(), f"/data/models/{task_id}.pth")
except Exception as exc:
self.retry(exc=exc)
🔟 API‑шлюз для микросервисной архитектуры
Типичный запрос: “Сегментировать доступ к сервисам, добавить аутентификацию, rate‑limit”.
Уточняющие вопросы
- Требуется JWT‑validation?
- Какие уровни шлюзов (edge, internal)?
- Как реализовать fallback‑routing (circuit‑breaker).
Стек
- Шлюз: FastAPI + ASGI (Uvicorn)
- Auth: OAuth2‑server (Authlib) + JWT (PyJWT)
- Rate‑limit: Redis (token‑bucket) + fastapi‑limiter
- Routing: Proxy‑router, HTTPS termination (NGINX)
- Observability: OpenTelemetry + Prometheus
Ключевые
FastAPI (gateway), Authlib, PyJWT, Redis token‑bucket, NGINX.
# FastAPI gateway example
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer
from fastapi_limiter import Limiter
import redis
app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
limiter = Limiter(key_func=lambda r: r.client.host)
@app.post("/auth/token")
async def get_token(username: str, password: str):
# dummy check
if username != "admin" or password != "secret":
raise HTTPException(401, "Invalid credentials")
return {"access_token": "jwt-token"}
@app.get("/protected")
@limiter.limit("10/minute")
async def protected_api(token: str = Depends(oauth2_scheme)):
# verify JWT
if not verify_jwt(token):
raise HTTPException(401)
# forward request to downstream service (example: httpbin.org)
async with httpx.AsyncClient() as client:
r = await client.get("http://internal-service/api/v1/data")
return r.json()
3. Как использовать это пособие на собеседовании
- Задайте все уточняющие вопросы – это покажет системное мышление.
- Выберите стек – адаптируйте к задаче: для Python‑AI‑проектов лучшими являются FastAPI + async + Vector DB + Docker/K8s.
- Подготовьте мини‑прототипы – в ответе покажите 2‑3 кода (по 3‑5 строк) и объясните их.
- Обсудите trade‑offs – например, Redis vs. DB для кэша, FAISS vs. Elasticsearch для векторного поиска, synchronous vs. async processing.
- Говорите о мониторинге – Prometheus + Grafana, logging → Loki, alerting (Alertmanager).
4. Краткое резюме
| Технология | Почему важна для Python‑AI‑Web‑CRM | Пример её место |
|---|---|---|
| FastAPI | async, OpenAPI docs, легко для ML‑API | любой кейс с API‑endpoint |
| FAISS / ChromaDB | быстрый векторный поиск | рекомендации, поиск изображений |
| Celery | распределённые фоновые задачи (email, sync, model‑train) | почти все кейсы |
| Redis | кэш, сессии, rate‑limit, pub/sub | почти все |
| Kafka | потоковая обработка событий | рекомендации, уведомления, логи |
| Docker / K8s | repeatable environments, GPU‑workers | AI‑конвейер, модель‑train |
| Elasticsearch | full‑text + facets | поиск каталога, CRM‑поиск |
| PostgreSQL | реляционные данные (логины, CRM) | основной DB |
| Twilio / Firebase | внешние каналы уведомлений | мульти‑канальные уведомления |
| OAuth2 / JWT | аутентификация/авторизация | любой шлюз, CRM |
| Prometheus / Grafana | мониторинг, SLA, alerts | любые сервисы |
📚 Как дальше учиться
- Бесплатные курсы: “Designing Data‑Intensive Applications” (Martin Kleppmann) – примените к Python‑примеру.
- Статьи: “High‑Performance FAISS with Python” (HuggingFace Blog), “FastAPI vs Flask in Production”.
- Практика: создайте 5‑минутный микросервис‑прототип (FastAPI + Docker + Redis) и разверните его на k3d/kind.