← Назад к курсу
Apache Kafka: подробный обзор и пример проекта на Python
Что такое Apache Kafka?
Apache Kafka — это распределенная потоковая платформа с открытым исходным кодом, разработанная LinkedIn и переданная Apache Foundation. Она предназначена для обработки потоковых данных в реальном времени, построения конвейеров данных и создания событийно-ориентированных архитектур.
Ключевые концепции Kafka:
- Топики (Topics) — категории или потоки сообщений
- Партиции (Partitions) — топики разделяются на партиции для параллельной обработки
- Производители (Producers) — приложения, отправляющие сообщения в топики
- Потребители (Consumers) — приложения, читающие сообщения из топиков
- Брокеры (Brokers) — серверы Kafka, хранящие данные
- Кластер (Cluster) — группа брокеров, работающих вместе
- Zookeeper (или KRaft в новых версиях) — координация кластера
Установка и настройка Kafka
1. Установка Kafka локально
# Скачивание Kafka wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz tar -xzf kafka_2.13-3.6.0.tgz cd kafka_2.13-3.6.0 # Запуск Zookeeper (в отдельном терминале) bin/zookeeper-server-start.sh config/zookeeper.properties # Запуск Kafka брокера (в другом терминале) bin/kafka-server-start.sh config/server.properties
2. Docker Compose для Kafka
Создайте файл docker-compose.yml:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Пример проекта: Система мониторинга веб-сайтов
Давайте создадим систему, которая:
- Отправляет события о доступности веб-сайтов в Kafka
- Обрабатывает эти события для вычисления статистики
- Сохраняет результаты в базу данных
- Отображает дашборд с метриками
Структура проекта
kafka-monitoring-system/ │ ├── docker-compose.yml ├── requirements.txt ├── config.py ├── producer.py ├── consumer.py ├── dashboard.py ├── database.py └── websites.json
1. requirements.txt
confluent-kafka==2.2.0 requests==2.31.0 psycopg2-binary==2.9.9 fastapi==0.104.1 uvicorn==0.24.0 sqlalchemy==2.0.23 python-json-logger==2.0.7
2. Конфигурация (config.py)
import json
import logging
class Config:
# Настройки Kafka
KAFKA_BROKER = "localhost:29092"
TOPIC_WEBSITE_CHECKS = "website_checks"
TOPIC_ALERTS = "website_alerts"
TOPIC_STATISTICS = "website_statistics"
# Настройки базы данных
DB_HOST = "localhost"
DB_PORT = 5432
DB_NAME = "website_monitoring"
DB_USER = "postgres"
DB_PASSWORD = "password"
# Интервал проверки сайтов (в секундах)
CHECK_INTERVAL = 60
# Порог для алертов (время ответа в секундах)
RESPONSE_TIME_THRESHOLD = 3
UPTIME_THRESHOLD = 95 # процент
def setup_logging():
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
return logging.getLogger(__name__)
logger = setup_logging()
3. Производитель (producer.py)
import json
import time
import requests
from datetime import datetime
from confluent_kafka import Producer
from config import Config, logger
class WebsiteMonitorProducer:
def __init__(self, websites_file="websites.json"):
self.config = Config()
self.producer = Producer({
'bootstrap.servers': self.config.KAFKA_BROKER,
'client.id': 'website-monitor-producer'
})
self.load_websites(websites_file)
def load_websites(self, filepath):
with open(filepath, 'r') as f:
self.websites = json.load(f)
logger.info(f"Загружено {len(self.websites)} сайтов для мониторинга")
def check_website(self, website):
"""Проверяет доступность сайта и возвращает метрики"""
start_time = time.time()
try:
response = requests.get(
website['url'],
timeout=10,
headers={'User-Agent': 'WebsiteMonitor/1.0'}
)
response_time = time.time() - start_time
status_code = response.status_code
is_up = 200 <= status_code < 400
content_length = len(response.content)
return {
'url': website['url'],
'name': website['name'],
'timestamp': datetime.utcnow().isoformat(),
'response_time': response_time,
'status_code': status_code,
'is_up': is_up,
'content_length': content_length,
'checked_at': datetime.utcnow().isoformat()
}
except Exception as e:
return {
'url': website['url'],
'name': website['name'],
'timestamp': datetime.utcnow().isoformat(),
'response_time': None,
'status_code': None,
'is_up': False,
'error': str(e),
'checked_at': datetime.utcnow().isoformat()
}
def delivery_report(self, err, msg):
"""Отчет о доставке сообщения в Kafka"""
if err is not None:
logger.error(f"Ошибка доставки: {err}")
else:
logger.debug(f"Сообщение доставлено в {msg.topic()} [{msg.partition()}]")
def produce_website_check(self, website_data):
"""Отправляет данные проверки сайта в Kafka"""
message = json.dumps(website_data).encode('utf-8')
self.producer.produce(
self.config.TOPIC_WEBSITE_CHECKS,
value=message,
callback=self.delivery_report
)
self.producer.poll(0)
def run(self):
"""Основной цикл проверки сайтов"""
logger.info("Запуск производителя мониторинга сайтов")
while True:
for website in self.websites:
check_result = self.check_website(website)
self.produce_website_check(check_result)
# Логируем результат проверки
status = "UP" if check_result['is_up'] else "DOWN"
logger.info(
f"{website['name']} ({website['url']}) - {status} - "
f"{check_result.get('response_time', 0):.2f} сек"
)
# Ожидание перед следующей проверкой
time.sleep(self.config.CHECK_INTERVAL)
if __name__ == "__main__":
producer = WebsiteMonitorProducer()
try:
producer.run()
except KeyboardInterrupt:
logger.info("Остановка производителя")
4. Потребитель для обработки данных (consumer.py)
import json
import statistics
from datetime import datetime, timedelta
from collections import defaultdict
from confluent_kafka import Consumer, Producer
from config import Config, logger
from database import DatabaseManager
class WebsiteMetricsConsumer:
def __init__(self):
self.config = Config()
self.db = DatabaseManager()
# Инициализация Kafka Consumer
self.consumer = Consumer({
'bootstrap.servers': self.config.KAFKA_BROKER,
'group.id': 'website-metrics-group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True
})
# Инициализация Kafka Producer для алертов
self.producer = Producer({
'bootstrap.servers': self.config.KAFKA_BROKER,
'client.id': 'metrics-processor'
})
# Подписываемся на топики
self.consumer.subscribe([self.config.TOPIC_WEBSITE_CHECKS])
# Структуры для хранения метрик
self.website_metrics = defaultdict(list)
self.alerts = []
def calculate_statistics(self, website_data):
"""Вычисляет статистику для сайта"""
url = website_data['url']
# Добавляем новую метрику
self.website_metrics[url].append(website_data)
# Ограничиваем историю последними 100 точками
if len(self.website_metrics[url]) > 100:
self.website_metrics[url] = self.website_metrics[url][-100:]
metrics = self.website_metrics[url]
# Вычисляем статистику
response_times = [m['response_time'] for m in metrics if m.get('response_time')]
is_up_count = sum(1 for m in metrics if m['is_up'])
if response_times:
stats = {
'url': url,
'name': website_data['name'],
'timestamp': datetime.utcnow().isoformat(),
'avg_response_time': statistics.mean(response_times),
'min_response_time': min(response_times),
'max_response_time': max(response_times),
'uptime_percentage': (is_up_count / len(metrics)) * 100,
'total_checks': len(metrics),
'successful_checks': is_up_count,
'last_check': website_data['checked_at']
}
# Проверяем пороги для алертов
self.check_alerts(stats, website_data)
# Сохраняем в базу данных
self.db.save_website_metrics(stats)
# Отправляем статистику в Kafka
self.publish_statistics(stats)
return stats
return None
def check_alerts(self, stats, last_check):
"""Проверяет условия для алертов"""
alerts = []
# Алерт по времени ответа
if stats['avg_response_time'] > self.config.RESPONSE_TIME_THRESHOLD:
alert = {
'type': 'HIGH_RESPONSE_TIME',
'severity': 'WARNING',
'website': stats['name'],
'url': stats['url'],
'value': stats['avg_response_time'],
'threshold': self.config.RESPONSE_TIME_THRESHOLD,
'timestamp': datetime.utcnow().isoformat(),
'message': f"Высокое время ответа: {stats['avg_response_time']:.2f} сек"
}
alerts.append(alert)
# Алерт по uptime
if stats['uptime_percentage'] < self.config.UPTIME_THRESHOLD:
alert = {
'type': 'LOW_UPTIME',
'severity': 'CRITICAL',
'website': stats['name'],
'url': stats['url'],
'value': stats['uptime_percentage'],
'threshold': self.config.UPTIME_THRESHOLD,
'timestamp': datetime.utcnow().isoformat(),
'message': f"Низкий uptime: {stats['uptime_percentage']:.1f}%"
}
alerts.append(alert)
# Алерт если сайт недоступен
if not last_check['is_up']:
alert = {
'type': 'WEBSITE_DOWN',
'severity': 'CRITICAL',
'website': stats['name'],
'url': stats['url'],
'timestamp': datetime.utcnow().isoformat(),
'message': f"Сайт недоступен: {last_check.get('error', 'Unknown error')}"
}
alerts.append(alert)
# Отправляем алерты
for alert in alerts:
self.publish_alert(alert)
logger.warning(f"Алерт: {alert['message']}")
def publish_statistics(self, stats):
"""Отправляет статистику в Kafka"""
message = json.dumps(stats).encode('utf-8')
self.producer.produce(self.config.TOPIC_STATISTICS, value=message)
self.producer.poll(0)
def publish_alert(self, alert):
"""Отправляет алерт в Kafka"""
message = json.dumps(alert).encode('utf-8')
self.producer.produce(self.config.TOPIC_ALERTS, value=message)
self.producer.poll(0)
def run(self):
"""Основной цикл обработки сообщений"""
logger.info("Запуск потребителя метрик сайтов")
try:
while True:
msg = self.consumer.poll(1.0)
if msg is None:
continue
if msg.error():
logger.error(f"Ошибка потребителя: {msg.error()}")
continue
# Обработка сообщения
try:
website_data = json.loads(msg.value().decode('utf-8'))
stats = self.calculate_statistics(website_data)
if stats:
logger.info(
f"Обработано: {stats['name']} - "
f"Uptime: {stats['uptime_percentage']:.1f}% - "
f"Avg response: {stats['avg_response_time']:.2f} сек"
)
except json.JSONDecodeError as e:
logger.error(f"Ошибка декодирования JSON: {e}")
except Exception as e:
logger.error(f"Ошибка обработки сообщения: {e}")
except KeyboardInterrupt:
logger.info("Остановка потребителя")
finally:
self.consumer.close()
self.db.close()
if __name__ == "__main__":
consumer = WebsiteMetricsConsumer()
consumer.run()
5. База данных (database.py)
import psycopg2
from psycopg2.extras import RealDictCursor
from contextlib import contextmanager
from config import Config, logger
class DatabaseManager:
def __init__(self):
self.config = Config()
self.init_database()
@contextmanager
def get_connection(self):
conn = psycopg2.connect(
host=self.config.DB_HOST,
port=self.config.DB_PORT,
database=self.config.DB_NAME,
user=self.config.DB_USER,
password=self.config.DB_PASSWORD
)
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
def init_database(self):
"""Инициализирует базу данных и таблицы"""
try:
with self.get_connection() as conn:
with conn.cursor() as cur:
# Таблица для метрик сайтов
cur.execute("""
CREATE TABLE IF NOT EXISTS website_metrics (
id SERIAL PRIMARY KEY,
url VARCHAR(500) NOT NULL,
name VARCHAR(200) NOT NULL,
timestamp TIMESTAMP NOT NULL,
avg_response_time FLOAT,
min_response_time FLOAT,
max_response_time FLOAT,
uptime_percentage FLOAT,
total_checks INTEGER,
successful_checks INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Таблица для алертов
cur.execute("""
CREATE TABLE IF NOT EXISTS alerts (
id SERIAL PRIMARY KEY,
type VARCHAR(50) NOT NULL,
severity VARCHAR(20) NOT NULL,
website VARCHAR(200) NOT NULL,
url VARCHAR(500) NOT NULL,
value FLOAT,
threshold FLOAT,
message TEXT,
timestamp TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
resolved_at TIMESTAMP,
is_resolved BOOLEAN DEFAULT FALSE
)
""")
# Создаем индексы для быстрого поиска
cur.execute("CREATE INDEX IF NOT EXISTS idx_website_metrics_url ON website_metrics(url)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_website_metrics_timestamp ON website_metrics(timestamp)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_alerts_timestamp ON alerts(timestamp)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_alerts_is_resolved ON alerts(is_resolved)")
logger.info("База данных инициализирована")
except Exception as e:
logger.error(f"Ошибка инициализации БД: {e}")
def save_website_metrics(self, metrics):
"""Сохраняет метрики сайта в базу данных"""
try:
with self.get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO website_metrics
(url, name, timestamp, avg_response_time, min_response_time,
max_response_time, uptime_percentage, total_checks, successful_checks)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
metrics['url'],
metrics['name'],
metrics['timestamp'],
metrics['avg_response_time'],
metrics['min_response_time'],
metrics['max_response_time'],
metrics['uptime_percentage'],
metrics['total_checks'],
metrics['successful_checks']
))
except Exception as e:
logger.error(f"Ошибка сохранения метрик: {e}")
def save_alert(self, alert):
"""Сохраняет алерт в базу данных"""
try:
with self.get_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO alerts
(type, severity, website, url, value, threshold, message, timestamp)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""", (
alert['type'],
alert['severity'],
alert['website'],
alert['url'],
alert.get('value'),
alert.get('threshold'),
alert['message'],
alert['timestamp']
))
except Exception as e:
logger.error(f"Ошибка сохранения алерта: {e}")
def get_website_stats(self, website_url, hours=24):
"""Получает статистику сайта за указанный период"""
try:
with self.get_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT
url,
name,
AVG(avg_response_time) as avg_response_time,
MIN(min_response_time) as min_response_time,
MAX(max_response_time) as max_response_time,
AVG(uptime_percentage) as uptime_percentage,
SUM(total_checks) as total_checks,
SUM(successful_checks) as successful_checks
FROM website_metrics
WHERE url = %s AND timestamp >= NOW() - INTERVAL '%s hours'
GROUP BY url, name
""", (website_url, hours))
return cur.fetchone()
except Exception as e:
logger.error(f"Ошибка получения статистики: {e}")
return None
def get_recent_alerts(self, limit=50, unresolved_only=False):
"""Получает последние алерты"""
try:
with self.get_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
query = """
SELECT * FROM alerts
WHERE 1=1
"""
params = []
if unresolved_only:
query += " AND is_resolved = FALSE"
query += " ORDER BY timestamp DESC LIMIT %s"
params.append(limit)
cur.execute(query, params)
return cur.fetchall()
except Exception as e:
logger.error(f"Ошибка получения алертов: {e}")
return []
def close(self):
"""Закрывает соединения с БД"""
pass # Соединения закрываются автоматически в контекстном менеджере
6. Дашборд (dashboard.py)
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from datetime import datetime, timedelta
import psycopg2
from psycopg2.extras import RealDictCursor
from config import Config, logger
app = FastAPI(title="Website Monitoring Dashboard")
templates = Jinja2Templates(directory="templates")
# Подключение к БД
def get_db_connection():
config = Config()
return psycopg2.connect(
host=config.DB_HOST,
port=config.DB_PORT,
database=config.DB_NAME,
user=config.DB_USER,
password=config.DB_PASSWORD
)
@app.get("/", response_class=HTMLResponse)
async def dashboard():
"""Главная страница дашборда"""
html_content = """
<html>
<head>
<title>Website Monitoring Dashboard</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.website-card {
border: 1px solid #ddd;
padding: 15px;
margin: 10px 0;
border-radius: 5px;
background-color: #f9f9f9;
}
.status-up { color: green; }
.status-down { color: red; }
.alert { background-color: #ffebee; border-color: #ffcdd2; }
</style>
</head>
<body>
<h1>Website Monitoring Dashboard</h1>
<div id="websites">
<h2>Мониторинг сайтов</h2>
<div id="websites-list"></div>
</div>
<div id="alerts">
<h2>Последние алерты</h2>
<div id="alerts-list"></div>
</div>
<div id="charts">
<h2>Статистика ответов</h2>
<canvas id="responseTimeChart" width="800" height="400"></canvas>
</div>
<script>
async function loadData() {
// Загрузка данных о сайтах
const websitesResponse = await fetch('/api/websites');
const websites = await websitesResponse.json();
// Отображение сайтов
const websitesList = document.getElementById('websites-list');
websitesList.innerHTML = websites.map(website => `
<div class="website-card ${website.has_alerts ? 'alert' : ''}">
<h3>${website.name}</h3>
<p>URL: ${website.url}</p>
<p class="${website.uptime_percentage > 95 ? 'status-up' : 'status-down'}">
Uptime: ${website.uptime_percentage}%
</p>
<p>Среднее время ответа: ${website.avg_response_time} сек</p>
<p>Проверок: ${website.total_checks} (Успешно: ${website.successful_checks})</p>
</div>
`).join('');
// Загрузка алертов
const alertsResponse = await fetch('/api/alerts');
const alerts = await alertsResponse.json();
// Отображение алертов
const alertsList = document.getElementById('alerts-list');
alertsList.innerHTML = alerts.map(alert => `
<div class="website-card alert">
<h3>${alert.severity}: ${alert.type}</h3>
<p>Сайт: ${alert.website}</p>
<p>${alert.message}</p>
<small>${new Date(alert.timestamp).toLocaleString()}</small>
</div>
`).join('');
// Построение графика
const ctx = document.getElementById('responseTimeChart').getContext('2d');
new Chart(ctx, {
type: 'line',
data: {
labels: websites.map(w => w.name),
datasets: [{
label: 'Среднее время ответа (сек)',
data: websites.map(w => w.avg_response_time),
borderColor: 'rgb(75, 192, 192)',
tension: 0.1
}]
},
options: {
responsive: true,
scales: {
y: {
beginAtZero: true
}
}
}
});
}
// Загрузка данных при загрузке страницы
document.addEventListener('DOMContentLoaded', loadData);
// Обновление каждые 30 секунд
setInterval(loadData, 30000);
</script>
</body>
</html>
"""
return HTMLResponse(content=html_content)
@app.get("/api/websites")
async def get_websites(hours: int = 24):
"""API для получения статистики по всем сайтам"""
try:
conn = get_db_connection()
with conn.cursor(cursor_factory=RealDictCursor) as cur:
# Получаем список уникальных сайтов
cur.execute("""
SELECT DISTINCT url, name
FROM website_metrics
WHERE timestamp >= NOW() - INTERVAL '%s hours'
""", (hours,))
websites = cur.fetchall()
# Получаем статистику для каждого сайта
result = []
for website in websites:
cur.execute("""
SELECT
AVG(avg_response_time) as avg_response_time,
MIN(min_response_time) as min_response_time,
MAX(max_response_time) as max_response_time,
AVG(uptime_percentage) as uptime_percentage,
SUM(total_checks) as total_checks,
SUM(successful_checks) as successful_checks
FROM website_metrics
WHERE url = %s AND timestamp >= NOW() - INTERVAL '%s hours'
""", (website['url'], hours))
stats = cur.fetchone()
if stats:
website.update(stats)
# Проверяем есть ли неразрешенные алерты
cur.execute("""
SELECT COUNT(*) as alert_count
FROM alerts
WHERE url = %s AND is_resolved = FALSE
""", (website['url'],))
alert_count = cur.fetchone()['alert_count']
website['has_alerts'] = alert_count > 0
result.append(website)
conn.close()
return result
except Exception as e:
logger.error(f"Ошибка получения данных сайтов: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/alerts")
async def get_alerts(limit: int = 10, unresolved: bool = False):
"""API для получения алертов"""
try:
conn = get_db_connection()
with conn.cursor(cursor_factory=RealDictCursor) as cur:
query = "SELECT * FROM alerts WHERE 1=1"
params = []
if unresolved:
query += " AND is_resolved = FALSE"
query += " ORDER BY timestamp DESC LIMIT %s"
params.append(limit)
cur.execute(query, params)
alerts = cur.fetchall()
conn.close()
return alerts
except Exception as e:
logger.error(f"Ошибка получения алертов: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/website/{url}")
async def get_website_details(url: str, hours: int = 24):
"""API для получения детальной статистики по сайту"""
try:
conn = get_db_connection()
with conn.cursor(cursor_factory=RealDictCursor) as cur:
# Основная статистика
cur.execute("""
SELECT
url,
name,
AVG(avg_response_time) as avg_response_time,
MIN(min_response_time) as min_response_time,
MAX(max_response_time) as max_response_time,
AVG(uptime_percentage) as uptime_percentage,
SUM(total_checks) as total_checks,
SUM(successful_checks) as successful_checks
FROM website_metrics
WHERE url = %s AND timestamp >= NOW() - INTERVAL '%s hours'
GROUP BY url, name
""", (url, hours))
stats = cur.fetchone()
if not stats:
raise HTTPException(status_code=404, detail="Website not found")
# История метрик по часам
cur.execute("""
SELECT
DATE_TRUNC('hour', timestamp) as hour,
AVG(avg_response_time) as avg_response_time,
AVG(uptime_percentage) as uptime_percentage
FROM website_metrics
WHERE url = %s AND timestamp >= NOW() - INTERVAL '%s hours'
GROUP BY DATE_TRUNC('hour', timestamp)
ORDER BY hour
""", (url, hours))
history = cur.fetchall()
# Алерты для этого сайта
cur.execute("""
SELECT * FROM alerts
WHERE url = %s AND timestamp >= NOW() - INTERVAL '%s hours'
ORDER BY timestamp DESC
""", (url, hours))
alerts = cur.fetchall()
result = {
"stats": stats,
"history": history,
"alerts": alerts
}
conn.close()
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Ошибка получения деталей сайта: {e}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
7. Файл с сайтами для мониторинга (websites.json)
[
{
"name": "Google",
"url": "https://www.google.com",
"category": "search"
},
{
"name": "GitHub",
"url": "https://github.com",
"category": "development"
},
{
"name": "Stack Overflow",
"url": "https://stackoverflow.com",
"category": "development"
},
{
"name": "Python.org",
"url": "https://www.python.org",
"category": "development"
},
{
"name": "Apache Kafka",
"url": "https://kafka.apache.org",
"category": "data"
},
{
"name": "Docker Hub",
"url": "https://hub.docker.com",
"category": "devops"
}
]
Запуск проекта
1. Настройка окружения
# Создание виртуального окружения python -m venv venv source venv/bin/activate # На Windows: venv\Scripts\activate # Установка зависимостей pip install -r requirements.txt # Установка и запуск PostgreSQL # или используйте Docker для PostgreSQL: docker run --name postgres -e POSTGRES_PASSWORD=password -p 5432:5432 -d postgres # Создание базы данных psql -h localhost -U postgres -c "CREATE DATABASE website_monitoring;"
2. Запуск инфраструктуры Kafka
# Запуск Kafka через Docker Compose docker-compose up -d # Создание топиков docker exec -it kafka_kafka_1 kafka-topics --create --topic website_checks --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 docker exec -it kafka_kafka_1 kafka-topics --create --topic website_alerts --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1 docker exec -it kafka_kafka_1 kafka-topics --create --topic website_statistics --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1
3. Запуск компонентов
# В первом терминале: Производитель python producer.py # Во втором терминале: Потребитель python consumer.py # В третьем терминале: Дашборд python dashboard.py
4. Проверка работы
- Откройте браузер и перейдите по адресу: http://localhost:8000
- Вы увидите дашборд с метриками сайтов
- Данные будут обновляться каждые 30 секунд
- Проверьте топики Kafka:
# Просмотр сообщений в топиках docker exec -it kafka_kafka_1 kafka-console-consumer --topic website_checks --bootstrap-server localhost:9092 --from-beginning
Продвинутые возможности Kafka в проекте
1. Обработка ошибок и повторная отправка
class RetryProducer:
def __init__(self, max_retries=3):
self.max_retries = max_retries
def send_with_retry(self, topic, message):
for attempt in range(self.max_retries):
try:
self.producer.produce(topic, message)
self.producer.flush()
return True
except Exception as e:
logger.error(f"Attempt {attempt + 1} failed: {e}")
time.sleep(2 ** attempt) # Exponential backoff
return False
2. Консьюмер группы и балансировка нагрузки
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'website-monitoring-group',
'partition.assignment.strategy': 'roundrobin',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Ручное подтверждение
})
3. Сериализация Avro
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
avro_serializer = AvroSerializer(
schema_str=website_schema,
schema_registry_client=schema_registry_client,
to_dict=lambda obj, ctx: obj.__dict__
)
producer.produce(
topic='website_checks',
value=avro_serializer(website_data, SerializationContext('website_checks', MessageField.VALUE))
)
4. Kafka Streams для обработки в реальном времени
from confluent_kafka import Producer, Consumer
import json
class KafkaStreamsProcessor:
def __init__(self):
self.consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'streams-processor',
'auto.offset.reset': 'earliest'
})
self.producer = Producer({
'bootstrap.servers': 'localhost:9092'
})
self.state_store = {} # In-memory состояние
def process_windowed_metrics(self):
"""Обработка оконных метрик"""
window_size = 300 # 5 минут
self.consumer.subscribe(['website_checks'])
while True:
msg = self.consumer.poll(1.0)
if msg is None:
continue
data = json.loads(msg.value())
url = data['url']
# Добавление в окно
if url not in self.state_store:
self.state_store[url] = []
self.state_store[url].append(data)
# Удаление старых данных
current_time = datetime.utcnow().timestamp()
self.state_store[url] = [
d for d in self.state_store[url]
if current_time - datetime.fromisoformat(d['timestamp']).timestamp() < window_size
]
# Расчет метрик окна
if len(self.state_store[url]) > 0:
window_metrics = self.calculate_window_metrics(self.state_store[url])
self.produce_window_metrics(window_metrics)
Мониторинг и администрирование Kafka
1. Команды для администрирования
# Просмотр топиков kafka-topics --list --bootstrap-server localhost:9092 # Информация о топике kafka-topics --describe --topic website_checks --bootstrap-server localhost:9092 # Просмотр смещений консьюмер группы kafka-consumer-groups --bootstrap-server localhost:9092 --group website-metrics-group --describe # Удаление топика kafka-topics --delete --topic test_topic --bootstrap-server localhost:9092 # Мониторинг трафика kafka-run-class kafka.tools.JmxTool --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi
2. Интеграция с Prometheus и Grafana
# Конфигурация JMX Exporter для Kafka
---
startDelaySeconds: 0
hostPort: localhost:9999
username:
password:
jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi
ssl: false
lowercaseOutputName: false
lowercaseOutputLabelNames: false
rules:
- pattern: "kafka.server<type=(.+), name=(.+)><>Count"
name: "kafka_server_$1_$2"
- pattern: "kafka.server<type=(.+), name=(.+)PerSec, (.+)=(.+)><>Count"
name: "kafka_server_$1_$2_per_sec"
labels:
"$3": "$4"
Заключение
Этот проект демонстрирует ключевые возможности Apache Kafka:
- Надежная доставка сообщений через производителей и потребителей
- Масштабируемость через партиционирование топиков
- Обработка в реальном времени потоковых данных
- Отказоустойчивость через репликацию и потребительские группы
- Интеграция с внешними системами (БД, веб-интерфейсы)
Для продакшен-использования рекомендуется добавить:
- Аутентификацию и авторизацию SASL/SSL
- Мониторинг через JMX и Prometheus
- Резервное копирование и восстановление
- Автоматическое масштабирование
- Тщательное тестирование и нагрузочное тестирование
Kafka является мощным инструментом для построения современных распределенных систем и обработки потоковых данных в реальном времени.