← Назад к курсу

Apache Kafka: подробный обзор и пример проекта на Python

Что такое Apache Kafka?

Apache Kafka — это распределенная потоковая платформа с открытым исходным кодом, разработанная LinkedIn и переданная Apache Foundation. Она предназначена для обработки потоковых данных в реальном времени, построения конвейеров данных и создания событийно-ориентированных архитектур.

Ключевые концепции Kafka:

  1. Топики (Topics) — категории или потоки сообщений
  2. Партиции (Partitions) — топики разделяются на партиции для параллельной обработки
  3. Производители (Producers) — приложения, отправляющие сообщения в топики
  4. Потребители (Consumers) — приложения, читающие сообщения из топиков
  5. Брокеры (Brokers) — серверы Kafka, хранящие данные
  6. Кластер (Cluster) — группа брокеров, работающих вместе
  7. Zookeeper (или KRaft в новых версиях) — координация кластера

Установка и настройка Kafka

1. Установка Kafka локально

# Скачивание Kafka
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0

# Запуск Zookeeper (в отдельном терминале)
bin/zookeeper-server-start.sh config/zookeeper.properties

# Запуск Kafka брокера (в другом терминале)
bin/kafka-server-start.sh config/server.properties

2. Docker Compose для Kafka

Создайте файл docker-compose.yml:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Пример проекта: Система мониторинга веб-сайтов

Давайте создадим систему, которая:

  1. Отправляет события о доступности веб-сайтов в Kafka
  2. Обрабатывает эти события для вычисления статистики
  3. Сохраняет результаты в базу данных
  4. Отображает дашборд с метриками

Структура проекта

kafka-monitoring-system/
│
├── docker-compose.yml
├── requirements.txt
├── config.py
├── producer.py
├── consumer.py
├── dashboard.py
├── database.py
└── websites.json

1. requirements.txt

confluent-kafka==2.2.0
requests==2.31.0
psycopg2-binary==2.9.9
fastapi==0.104.1
uvicorn==0.24.0
sqlalchemy==2.0.23
python-json-logger==2.0.7

2. Конфигурация (config.py)

import json
import logging

class Config:
    # Настройки Kafka
    KAFKA_BROKER = "localhost:29092"
    TOPIC_WEBSITE_CHECKS = "website_checks"
    TOPIC_ALERTS = "website_alerts"
    TOPIC_STATISTICS = "website_statistics"
    
    # Настройки базы данных
    DB_HOST = "localhost"
    DB_PORT = 5432
    DB_NAME = "website_monitoring"
    DB_USER = "postgres"
    DB_PASSWORD = "password"
    
    # Интервал проверки сайтов (в секундах)
    CHECK_INTERVAL = 60
    
    # Порог для алертов (время ответа в секундах)
    RESPONSE_TIME_THRESHOLD = 3
    UPTIME_THRESHOLD = 95  # процент

def setup_logging():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    return logging.getLogger(__name__)

logger = setup_logging()

3. Производитель (producer.py)

import json
import time
import requests
from datetime import datetime
from confluent_kafka import Producer
from config import Config, logger

class WebsiteMonitorProducer:
    def __init__(self, websites_file="websites.json"):
        self.config = Config()
        self.producer = Producer({
            'bootstrap.servers': self.config.KAFKA_BROKER,
            'client.id': 'website-monitor-producer'
        })
        self.load_websites(websites_file)
        
    def load_websites(self, filepath):
        with open(filepath, 'r') as f:
            self.websites = json.load(f)
        logger.info(f"Загружено {len(self.websites)} сайтов для мониторинга")
    
    def check_website(self, website):
        """Проверяет доступность сайта и возвращает метрики"""
        start_time = time.time()
        try:
            response = requests.get(
                website['url'],
                timeout=10,
                headers={'User-Agent': 'WebsiteMonitor/1.0'}
            )
            response_time = time.time() - start_time
            status_code = response.status_code
            is_up = 200 <= status_code < 400
            content_length = len(response.content)
            
            return {
                'url': website['url'],
                'name': website['name'],
                'timestamp': datetime.utcnow().isoformat(),
                'response_time': response_time,
                'status_code': status_code,
                'is_up': is_up,
                'content_length': content_length,
                'checked_at': datetime.utcnow().isoformat()
            }
        except Exception as e:
            return {
                'url': website['url'],
                'name': website['name'],
                'timestamp': datetime.utcnow().isoformat(),
                'response_time': None,
                'status_code': None,
                'is_up': False,
                'error': str(e),
                'checked_at': datetime.utcnow().isoformat()
            }
    
    def delivery_report(self, err, msg):
        """Отчет о доставке сообщения в Kafka"""
        if err is not None:
            logger.error(f"Ошибка доставки: {err}")
        else:
            logger.debug(f"Сообщение доставлено в {msg.topic()} [{msg.partition()}]")
    
    def produce_website_check(self, website_data):
        """Отправляет данные проверки сайта в Kafka"""
        message = json.dumps(website_data).encode('utf-8')
        self.producer.produce(
            self.config.TOPIC_WEBSITE_CHECKS,
            value=message,
            callback=self.delivery_report
        )
        self.producer.poll(0)
    
    def run(self):
        """Основной цикл проверки сайтов"""
        logger.info("Запуск производителя мониторинга сайтов")
        
        while True:
            for website in self.websites:
                check_result = self.check_website(website)
                self.produce_website_check(check_result)
                
                # Логируем результат проверки
                status = "UP" if check_result['is_up'] else "DOWN"
                logger.info(
                    f"{website['name']} ({website['url']}) - {status} - "
                    f"{check_result.get('response_time', 0):.2f} сек"
                )
            
            # Ожидание перед следующей проверкой
            time.sleep(self.config.CHECK_INTERVAL)

if __name__ == "__main__":
    producer = WebsiteMonitorProducer()
    try:
        producer.run()
    except KeyboardInterrupt:
        logger.info("Остановка производителя")

4. Потребитель для обработки данных (consumer.py)

import json
import statistics
from datetime import datetime, timedelta
from collections import defaultdict
from confluent_kafka import Consumer, Producer
from config import Config, logger
from database import DatabaseManager

class WebsiteMetricsConsumer:
    def __init__(self):
        self.config = Config()
        self.db = DatabaseManager()
        
        # Инициализация Kafka Consumer
        self.consumer = Consumer({
            'bootstrap.servers': self.config.KAFKA_BROKER,
            'group.id': 'website-metrics-group',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': True
        })
        
        # Инициализация Kafka Producer для алертов
        self.producer = Producer({
            'bootstrap.servers': self.config.KAFKA_BROKER,
            'client.id': 'metrics-processor'
        })
        
        # Подписываемся на топики
        self.consumer.subscribe([self.config.TOPIC_WEBSITE_CHECKS])
        
        # Структуры для хранения метрик
        self.website_metrics = defaultdict(list)
        self.alerts = []
        
    def calculate_statistics(self, website_data):
        """Вычисляет статистику для сайта"""
        url = website_data['url']
        
        # Добавляем новую метрику
        self.website_metrics[url].append(website_data)
        
        # Ограничиваем историю последними 100 точками
        if len(self.website_metrics[url]) > 100:
            self.website_metrics[url] = self.website_metrics[url][-100:]
        
        metrics = self.website_metrics[url]
        
        # Вычисляем статистику
        response_times = [m['response_time'] for m in metrics if m.get('response_time')]
        is_up_count = sum(1 for m in metrics if m['is_up'])
        
        if response_times:
            stats = {
                'url': url,
                'name': website_data['name'],
                'timestamp': datetime.utcnow().isoformat(),
                'avg_response_time': statistics.mean(response_times),
                'min_response_time': min(response_times),
                'max_response_time': max(response_times),
                'uptime_percentage': (is_up_count / len(metrics)) * 100,
                'total_checks': len(metrics),
                'successful_checks': is_up_count,
                'last_check': website_data['checked_at']
            }
            
            # Проверяем пороги для алертов
            self.check_alerts(stats, website_data)
            
            # Сохраняем в базу данных
            self.db.save_website_metrics(stats)
            
            # Отправляем статистику в Kafka
            self.publish_statistics(stats)
            
            return stats
        
        return None
    
    def check_alerts(self, stats, last_check):
        """Проверяет условия для алертов"""
        alerts = []
        
        # Алерт по времени ответа
        if stats['avg_response_time'] > self.config.RESPONSE_TIME_THRESHOLD:
            alert = {
                'type': 'HIGH_RESPONSE_TIME',
                'severity': 'WARNING',
                'website': stats['name'],
                'url': stats['url'],
                'value': stats['avg_response_time'],
                'threshold': self.config.RESPONSE_TIME_THRESHOLD,
                'timestamp': datetime.utcnow().isoformat(),
                'message': f"Высокое время ответа: {stats['avg_response_time']:.2f} сек"
            }
            alerts.append(alert)
        
        # Алерт по uptime
        if stats['uptime_percentage'] < self.config.UPTIME_THRESHOLD:
            alert = {
                'type': 'LOW_UPTIME',
                'severity': 'CRITICAL',
                'website': stats['name'],
                'url': stats['url'],
                'value': stats['uptime_percentage'],
                'threshold': self.config.UPTIME_THRESHOLD,
                'timestamp': datetime.utcnow().isoformat(),
                'message': f"Низкий uptime: {stats['uptime_percentage']:.1f}%"
            }
            alerts.append(alert)
        
        # Алерт если сайт недоступен
        if not last_check['is_up']:
            alert = {
                'type': 'WEBSITE_DOWN',
                'severity': 'CRITICAL',
                'website': stats['name'],
                'url': stats['url'],
                'timestamp': datetime.utcnow().isoformat(),
                'message': f"Сайт недоступен: {last_check.get('error', 'Unknown error')}"
            }
            alerts.append(alert)
        
        # Отправляем алерты
        for alert in alerts:
            self.publish_alert(alert)
            logger.warning(f"Алерт: {alert['message']}")
    
    def publish_statistics(self, stats):
        """Отправляет статистику в Kafka"""
        message = json.dumps(stats).encode('utf-8')
        self.producer.produce(self.config.TOPIC_STATISTICS, value=message)
        self.producer.poll(0)
    
    def publish_alert(self, alert):
        """Отправляет алерт в Kafka"""
        message = json.dumps(alert).encode('utf-8')
        self.producer.produce(self.config.TOPIC_ALERTS, value=message)
        self.producer.poll(0)
    
    def run(self):
        """Основной цикл обработки сообщений"""
        logger.info("Запуск потребителя метрик сайтов")
        
        try:
            while True:
                msg = self.consumer.poll(1.0)
                
                if msg is None:
                    continue
                if msg.error():
                    logger.error(f"Ошибка потребителя: {msg.error()}")
                    continue
                
                # Обработка сообщения
                try:
                    website_data = json.loads(msg.value().decode('utf-8'))
                    stats = self.calculate_statistics(website_data)
                    
                    if stats:
                        logger.info(
                            f"Обработано: {stats['name']} - "
                            f"Uptime: {stats['uptime_percentage']:.1f}% - "
                            f"Avg response: {stats['avg_response_time']:.2f} сек"
                        )
                
                except json.JSONDecodeError as e:
                    logger.error(f"Ошибка декодирования JSON: {e}")
                except Exception as e:
                    logger.error(f"Ошибка обработки сообщения: {e}")
        
        except KeyboardInterrupt:
            logger.info("Остановка потребителя")
        finally:
            self.consumer.close()
            self.db.close()

if __name__ == "__main__":
    consumer = WebsiteMetricsConsumer()
    consumer.run()

5. База данных (database.py)

import psycopg2
from psycopg2.extras import RealDictCursor
from contextlib import contextmanager
from config import Config, logger

class DatabaseManager:
    def __init__(self):
        self.config = Config()
        self.init_database()
    
    @contextmanager
    def get_connection(self):
        conn = psycopg2.connect(
            host=self.config.DB_HOST,
            port=self.config.DB_PORT,
            database=self.config.DB_NAME,
            user=self.config.DB_USER,
            password=self.config.DB_PASSWORD
        )
        try:
            yield conn
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()
    
    def init_database(self):
        """Инициализирует базу данных и таблицы"""
        try:
            with self.get_connection() as conn:
                with conn.cursor() as cur:
                    # Таблица для метрик сайтов
                    cur.execute("""
                        CREATE TABLE IF NOT EXISTS website_metrics (
                            id SERIAL PRIMARY KEY,
                            url VARCHAR(500) NOT NULL,
                            name VARCHAR(200) NOT NULL,
                            timestamp TIMESTAMP NOT NULL,
                            avg_response_time FLOAT,
                            min_response_time FLOAT,
                            max_response_time FLOAT,
                            uptime_percentage FLOAT,
                            total_checks INTEGER,
                            successful_checks INTEGER,
                            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                        )
                    """)
                    
                    # Таблица для алертов
                    cur.execute("""
                        CREATE TABLE IF NOT EXISTS alerts (
                            id SERIAL PRIMARY KEY,
                            type VARCHAR(50) NOT NULL,
                            severity VARCHAR(20) NOT NULL,
                            website VARCHAR(200) NOT NULL,
                            url VARCHAR(500) NOT NULL,
                            value FLOAT,
                            threshold FLOAT,
                            message TEXT,
                            timestamp TIMESTAMP NOT NULL,
                            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                            resolved_at TIMESTAMP,
                            is_resolved BOOLEAN DEFAULT FALSE
                        )
                    """)
                    
                    # Создаем индексы для быстрого поиска
                    cur.execute("CREATE INDEX IF NOT EXISTS idx_website_metrics_url ON website_metrics(url)")
                    cur.execute("CREATE INDEX IF NOT EXISTS idx_website_metrics_timestamp ON website_metrics(timestamp)")
                    cur.execute("CREATE INDEX IF NOT EXISTS idx_alerts_timestamp ON alerts(timestamp)")
                    cur.execute("CREATE INDEX IF NOT EXISTS idx_alerts_is_resolved ON alerts(is_resolved)")
                    
            logger.info("База данных инициализирована")
        
        except Exception as e:
            logger.error(f"Ошибка инициализации БД: {e}")
    
    def save_website_metrics(self, metrics):
        """Сохраняет метрики сайта в базу данных"""
        try:
            with self.get_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute("""
                        INSERT INTO website_metrics 
                        (url, name, timestamp, avg_response_time, min_response_time, 
                         max_response_time, uptime_percentage, total_checks, successful_checks)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """, (
                        metrics['url'],
                        metrics['name'],
                        metrics['timestamp'],
                        metrics['avg_response_time'],
                        metrics['min_response_time'],
                        metrics['max_response_time'],
                        metrics['uptime_percentage'],
                        metrics['total_checks'],
                        metrics['successful_checks']
                    ))
        
        except Exception as e:
            logger.error(f"Ошибка сохранения метрик: {e}")
    
    def save_alert(self, alert):
        """Сохраняет алерт в базу данных"""
        try:
            with self.get_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute("""
                        INSERT INTO alerts 
                        (type, severity, website, url, value, threshold, message, timestamp)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    """, (
                        alert['type'],
                        alert['severity'],
                        alert['website'],
                        alert['url'],
                        alert.get('value'),
                        alert.get('threshold'),
                        alert['message'],
                        alert['timestamp']
                    ))
        
        except Exception as e:
            logger.error(f"Ошибка сохранения алерта: {e}")
    
    def get_website_stats(self, website_url, hours=24):
        """Получает статистику сайта за указанный период"""
        try:
            with self.get_connection() as conn:
                with conn.cursor(cursor_factory=RealDictCursor) as cur:
                    cur.execute("""
                        SELECT 
                            url,
                            name,
                            AVG(avg_response_time) as avg_response_time,
                            MIN(min_response_time) as min_response_time,
                            MAX(max_response_time) as max_response_time,
                            AVG(uptime_percentage) as uptime_percentage,
                            SUM(total_checks) as total_checks,
                            SUM(successful_checks) as successful_checks
                        FROM website_metrics
                        WHERE url = %s AND timestamp >= NOW() - INTERVAL '%s hours'
                        GROUP BY url, name
                    """, (website_url, hours))
                    
                    return cur.fetchone()
        
        except Exception as e:
            logger.error(f"Ошибка получения статистики: {e}")
            return None
    
    def get_recent_alerts(self, limit=50, unresolved_only=False):
        """Получает последние алерты"""
        try:
            with self.get_connection() as conn:
                with conn.cursor(cursor_factory=RealDictCursor) as cur:
                    query = """
                        SELECT * FROM alerts
                        WHERE 1=1
                    """
                    params = []
                    
                    if unresolved_only:
                        query += " AND is_resolved = FALSE"
                    
                    query += " ORDER BY timestamp DESC LIMIT %s"
                    params.append(limit)
                    
                    cur.execute(query, params)
                    return cur.fetchall()
        
        except Exception as e:
            logger.error(f"Ошибка получения алертов: {e}")
            return []
    
    def close(self):
        """Закрывает соединения с БД"""
        pass  # Соединения закрываются автоматически в контекстном менеджере

6. Дашборд (dashboard.py)

from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from datetime import datetime, timedelta
import psycopg2
from psycopg2.extras import RealDictCursor
from config import Config, logger

app = FastAPI(title="Website Monitoring Dashboard")
templates = Jinja2Templates(directory="templates")

# Подключение к БД
def get_db_connection():
    config = Config()
    return psycopg2.connect(
        host=config.DB_HOST,
        port=config.DB_PORT,
        database=config.DB_NAME,
        user=config.DB_USER,
        password=config.DB_PASSWORD
    )

@app.get("/", response_class=HTMLResponse)
async def dashboard():
    """Главная страница дашборда"""
    html_content = """
    <html>
        <head>
            <title>Website Monitoring Dashboard</title>
            <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
            <style>
                body { font-family: Arial, sans-serif; margin: 20px; }
                .website-card { 
                    border: 1px solid #ddd; 
                    padding: 15px; 
                    margin: 10px 0; 
                    border-radius: 5px;
                    background-color: #f9f9f9;
                }
                .status-up { color: green; }
                .status-down { color: red; }
                .alert { background-color: #ffebee; border-color: #ffcdd2; }
            </style>
        </head>
        <body>
            <h1>Website Monitoring Dashboard</h1>
            
            <div id="websites">
                <h2>Мониторинг сайтов</h2>
                <div id="websites-list"></div>
            </div>
            
            <div id="alerts">
                <h2>Последние алерты</h2>
                <div id="alerts-list"></div>
            </div>
            
            <div id="charts">
                <h2>Статистика ответов</h2>
                <canvas id="responseTimeChart" width="800" height="400"></canvas>
            </div>
            
            <script>
                async function loadData() {
                    // Загрузка данных о сайтах
                    const websitesResponse = await fetch('/api/websites');
                    const websites = await websitesResponse.json();
                    
                    // Отображение сайтов
                    const websitesList = document.getElementById('websites-list');
                    websitesList.innerHTML = websites.map(website => `
                        <div class="website-card ${website.has_alerts ? 'alert' : ''}">
                            <h3>${website.name}</h3>
                            <p>URL: ${website.url}</p>
                            <p class="${website.uptime_percentage > 95 ? 'status-up' : 'status-down'}">
                                Uptime: ${website.uptime_percentage}%
                            </p>
                            <p>Среднее время ответа: ${website.avg_response_time} сек</p>
                            <p>Проверок: ${website.total_checks} (Успешно: ${website.successful_checks})</p>
                        </div>
                    `).join('');
                    
                    // Загрузка алертов
                    const alertsResponse = await fetch('/api/alerts');
                    const alerts = await alertsResponse.json();
                    
                    // Отображение алертов
                    const alertsList = document.getElementById('alerts-list');
                    alertsList.innerHTML = alerts.map(alert => `
                        <div class="website-card alert">
                            <h3>${alert.severity}: ${alert.type}</h3>
                            <p>Сайт: ${alert.website}</p>
                            <p>${alert.message}</p>
                            <small>${new Date(alert.timestamp).toLocaleString()}</small>
                        </div>
                    `).join('');
                    
                    // Построение графика
                    const ctx = document.getElementById('responseTimeChart').getContext('2d');
                    new Chart(ctx, {
                        type: 'line',
                        data: {
                            labels: websites.map(w => w.name),
                            datasets: [{
                                label: 'Среднее время ответа (сек)',
                                data: websites.map(w => w.avg_response_time),
                                borderColor: 'rgb(75, 192, 192)',
                                tension: 0.1
                            }]
                        },
                        options: {
                            responsive: true,
                            scales: {
                                y: {
                                    beginAtZero: true
                                }
                            }
                        }
                    });
                }
                
                // Загрузка данных при загрузке страницы
                document.addEventListener('DOMContentLoaded', loadData);
                
                // Обновление каждые 30 секунд
                setInterval(loadData, 30000);
            </script>
        </body>
    </html>
    """
    return HTMLResponse(content=html_content)

@app.get("/api/websites")
async def get_websites(hours: int = 24):
    """API для получения статистики по всем сайтам"""
    try:
        conn = get_db_connection()
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            # Получаем список уникальных сайтов
            cur.execute("""
                SELECT DISTINCT url, name 
                FROM website_metrics 
                WHERE timestamp >= NOW() - INTERVAL '%s hours'
            """, (hours,))
            websites = cur.fetchall()
            
            # Получаем статистику для каждого сайта
            result = []
            for website in websites:
                cur.execute("""
                    SELECT 
                        AVG(avg_response_time) as avg_response_time,
                        MIN(min_response_time) as min_response_time,
                        MAX(max_response_time) as max_response_time,
                        AVG(uptime_percentage) as uptime_percentage,
                        SUM(total_checks) as total_checks,
                        SUM(successful_checks) as successful_checks
                    FROM website_metrics
                    WHERE url = %s AND timestamp >= NOW() - INTERVAL '%s hours'
                """, (website['url'], hours))
                
                stats = cur.fetchone()
                if stats:
                    website.update(stats)
                    
                    # Проверяем есть ли неразрешенные алерты
                    cur.execute("""
                        SELECT COUNT(*) as alert_count 
                        FROM alerts 
                        WHERE url = %s AND is_resolved = FALSE
                    """, (website['url'],))
                    
                    alert_count = cur.fetchone()['alert_count']
                    website['has_alerts'] = alert_count > 0
                    
                    result.append(website)
        
        conn.close()
        return result
    
    except Exception as e:
        logger.error(f"Ошибка получения данных сайтов: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/alerts")
async def get_alerts(limit: int = 10, unresolved: bool = False):
    """API для получения алертов"""
    try:
        conn = get_db_connection()
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            query = "SELECT * FROM alerts WHERE 1=1"
            params = []
            
            if unresolved:
                query += " AND is_resolved = FALSE"
            
            query += " ORDER BY timestamp DESC LIMIT %s"
            params.append(limit)
            
            cur.execute(query, params)
            alerts = cur.fetchall()
        
        conn.close()
        return alerts
    
    except Exception as e:
        logger.error(f"Ошибка получения алертов: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/website/{url}")
async def get_website_details(url: str, hours: int = 24):
    """API для получения детальной статистики по сайту"""
    try:
        conn = get_db_connection()
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            # Основная статистика
            cur.execute("""
                SELECT 
                    url,
                    name,
                    AVG(avg_response_time) as avg_response_time,
                    MIN(min_response_time) as min_response_time,
                    MAX(max_response_time) as max_response_time,
                    AVG(uptime_percentage) as uptime_percentage,
                    SUM(total_checks) as total_checks,
                    SUM(successful_checks) as successful_checks
                FROM website_metrics
                WHERE url = %s AND timestamp >= NOW() - INTERVAL '%s hours'
                GROUP BY url, name
            """, (url, hours))
            
            stats = cur.fetchone()
            
            if not stats:
                raise HTTPException(status_code=404, detail="Website not found")
            
            # История метрик по часам
            cur.execute("""
                SELECT 
                    DATE_TRUNC('hour', timestamp) as hour,
                    AVG(avg_response_time) as avg_response_time,
                    AVG(uptime_percentage) as uptime_percentage
                FROM website_metrics
                WHERE url = %s AND timestamp >= NOW() - INTERVAL '%s hours'
                GROUP BY DATE_TRUNC('hour', timestamp)
                ORDER BY hour
            """, (url, hours))
            
            history = cur.fetchall()
            
            # Алерты для этого сайта
            cur.execute("""
                SELECT * FROM alerts
                WHERE url = %s AND timestamp >= NOW() - INTERVAL '%s hours'
                ORDER BY timestamp DESC
            """, (url, hours))
            
            alerts = cur.fetchall()
            
            result = {
                "stats": stats,
                "history": history,
                "alerts": alerts
            }
        
        conn.close()
        return result
    
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Ошибка получения деталей сайта: {e}")
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

7. Файл с сайтами для мониторинга (websites.json)

[
    {
        "name": "Google",
        "url": "https://www.google.com",
        "category": "search"
    },
    {
        "name": "GitHub",
        "url": "https://github.com",
        "category": "development"
    },
    {
        "name": "Stack Overflow",
        "url": "https://stackoverflow.com",
        "category": "development"
    },
    {
        "name": "Python.org",
        "url": "https://www.python.org",
        "category": "development"
    },
    {
        "name": "Apache Kafka",
        "url": "https://kafka.apache.org",
        "category": "data"
    },
    {
        "name": "Docker Hub",
        "url": "https://hub.docker.com",
        "category": "devops"
    }
]

Запуск проекта

1. Настройка окружения

# Создание виртуального окружения
python -m venv venv
source venv/bin/activate  # На Windows: venv\Scripts\activate

# Установка зависимостей
pip install -r requirements.txt

# Установка и запуск PostgreSQL
# или используйте Docker для PostgreSQL:
docker run --name postgres -e POSTGRES_PASSWORD=password -p 5432:5432 -d postgres

# Создание базы данных
psql -h localhost -U postgres -c "CREATE DATABASE website_monitoring;"

2. Запуск инфраструктуры Kafka

# Запуск Kafka через Docker Compose
docker-compose up -d

# Создание топиков
docker exec -it kafka_kafka_1 kafka-topics --create --topic website_checks --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
docker exec -it kafka_kafka_1 kafka-topics --create --topic website_alerts --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1
docker exec -it kafka_kafka_1 kafka-topics --create --topic website_statistics --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1

3. Запуск компонентов

# В первом терминале: Производитель
python producer.py

# Во втором терминале: Потребитель
python consumer.py

# В третьем терминале: Дашборд
python dashboard.py

4. Проверка работы

  1. Откройте браузер и перейдите по адресу: http://localhost:8000
  2. Вы увидите дашборд с метриками сайтов
  3. Данные будут обновляться каждые 30 секунд
  4. Проверьте топики Kafka:
# Просмотр сообщений в топиках
docker exec -it kafka_kafka_1 kafka-console-consumer --topic website_checks --bootstrap-server localhost:9092 --from-beginning

Продвинутые возможности Kafka в проекте

1. Обработка ошибок и повторная отправка

class RetryProducer:
    def __init__(self, max_retries=3):
        self.max_retries = max_retries
        
    def send_with_retry(self, topic, message):
        for attempt in range(self.max_retries):
            try:
                self.producer.produce(topic, message)
                self.producer.flush()
                return True
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed: {e}")
                time.sleep(2 ** attempt)  # Exponential backoff
        return False

2. Консьюмер группы и балансировка нагрузки

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'website-monitoring-group',
    'partition.assignment.strategy': 'roundrobin',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False  # Ручное подтверждение
})

3. Сериализация Avro

from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

avro_serializer = AvroSerializer(
    schema_str=website_schema,
    schema_registry_client=schema_registry_client,
    to_dict=lambda obj, ctx: obj.__dict__
)

producer.produce(
    topic='website_checks',
    value=avro_serializer(website_data, SerializationContext('website_checks', MessageField.VALUE))
)

4. Kafka Streams для обработки в реальном времени

from confluent_kafka import Producer, Consumer
import json

class KafkaStreamsProcessor:
    def __init__(self):
        self.consumer = Consumer({
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'streams-processor',
            'auto.offset.reset': 'earliest'
        })
        
        self.producer = Producer({
            'bootstrap.servers': 'localhost:9092'
        })
        
        self.state_store = {}  # In-memory состояние
        
    def process_windowed_metrics(self):
        """Обработка оконных метрик"""
        window_size = 300  # 5 минут
        self.consumer.subscribe(['website_checks'])
        
        while True:
            msg = self.consumer.poll(1.0)
            if msg is None:
                continue
                
            data = json.loads(msg.value())
            url = data['url']
            
            # Добавление в окно
            if url not in self.state_store:
                self.state_store[url] = []
            
            self.state_store[url].append(data)
            
            # Удаление старых данных
            current_time = datetime.utcnow().timestamp()
            self.state_store[url] = [
                d for d in self.state_store[url]
                if current_time - datetime.fromisoformat(d['timestamp']).timestamp() < window_size
            ]
            
            # Расчет метрик окна
            if len(self.state_store[url]) > 0:
                window_metrics = self.calculate_window_metrics(self.state_store[url])
                self.produce_window_metrics(window_metrics)

Мониторинг и администрирование Kafka

1. Команды для администрирования

# Просмотр топиков
kafka-topics --list --bootstrap-server localhost:9092

# Информация о топике
kafka-topics --describe --topic website_checks --bootstrap-server localhost:9092

# Просмотр смещений консьюмер группы
kafka-consumer-groups --bootstrap-server localhost:9092 --group website-metrics-group --describe

# Удаление топика
kafka-topics --delete --topic test_topic --bootstrap-server localhost:9092

# Мониторинг трафика
kafka-run-class kafka.tools.JmxTool --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi

2. Интеграция с Prometheus и Grafana

# Конфигурация JMX Exporter для Kafka
---
startDelaySeconds: 0
hostPort: localhost:9999
username: 
password: 
jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi
ssl: false
lowercaseOutputName: false
lowercaseOutputLabelNames: false
rules:
  - pattern: "kafka.server<type=(.+), name=(.+)><>Count"
    name: "kafka_server_$1_$2"
  - pattern: "kafka.server<type=(.+), name=(.+)PerSec, (.+)=(.+)><>Count"
    name: "kafka_server_$1_$2_per_sec"
    labels:
      "$3": "$4"

Заключение

Этот проект демонстрирует ключевые возможности Apache Kafka:

  1. Надежная доставка сообщений через производителей и потребителей
  2. Масштабируемость через партиционирование топиков
  3. Обработка в реальном времени потоковых данных
  4. Отказоустойчивость через репликацию и потребительские группы
  5. Интеграция с внешними системами (БД, веб-интерфейсы)

Для продакшен-использования рекомендуется добавить:

  • Аутентификацию и авторизацию SASL/SSL
  • Мониторинг через JMX и Prometheus
  • Резервное копирование и восстановление
  • Автоматическое масштабирование
  • Тщательное тестирование и нагрузочное тестирование

Kafka является мощным инструментом для построения современных распределенных систем и обработки потоковых данных в реальном времени.