← Назад к курсу

Подробный обзор протоколов интеграции: TCP, UDP, HTTP, gRPC

Подробный обзор протоколов интеграции: TCP, UDP, HTTP, gRPC

Содержание

  1. Сетевая модель OSI/TCP-IP
  2. TCP (Transmission Control Protocol)
  3. UDP (User Datagram Protocol)
  4. HTTP (HyperText Transfer Protocol)
  5. gRPC (gRPC Remote Procedure Calls)
  6. Сравнительная таблица

Сетевая модель OSI/TCP-IP {#сетевая-модель}

Прикладной уровень (HTTP, gRPC, FTP, DNS)
└── Транспортный уровень (TCP, UDP)
    └── Сетевой уровень (IP)
        └── Канальный уровень (Ethernet)

TCP (Transmission Control Protocol) {#tcp}

Теория

TCP — надежный потоковый протокол транспортного уровня с установлением соединения.

Основные характеристики:

  • Гарантированная доставка данных
  • Упорядоченность пакетов
  • Контроль перегрузки
  • Управление потоком
  • Установление соединения (3-way handshake)
  • Поддержка дуплексной передачи

Заголовок TCP (20-60 байт):

0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|          Source Port          |       Destination Port        |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                        Sequence Number                        |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                    Acknowledgment Number                      |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|  Data |           |U|A|P|R|S|F|                               |
| Offset| Reserved  |R|C|S|S|Y|I|            Window             |
|       |           |G|K|H|T|N|N|                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|           Checksum            |         Urgent Pointer        |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                    Options                    |    Padding    |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Пример TCP-сервера на Python

import socket
import threading
import time

class TCPServer:
    def __init__(self, host='127.0.0.1', port=8888):
        self.host = host
        self.port = port
        self.server_socket = None
        self.clients = {}
        
    def start(self):
        """Запуск TCP-сервера"""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        
        print(f"TCP Server started on {self.host}:{self.port}")
        
        try:
            while True:
                client_socket, client_address = self.server_socket.accept()
                print(f"New connection from {client_address}")
                
                # Создаем отдельный поток для клиента
                client_thread = threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, client_address)
                )
                client_thread.daemon = True
                client_thread.start()
                
        except KeyboardInterrupt:
            print("\nServer shutting down...")
        finally:
            self.stop()
    
    def handle_client(self, client_socket, client_address):
        """Обработка клиентского соединения"""
        try:
            # Отправляем приветственное сообщение
            welcome_msg = f"Welcome to TCP Server! Your address: {client_address}"
            client_socket.send(welcome_msg.encode('utf-8'))
            
            while True:
                # Получаем данные от клиента (максимум 1024 байта)
                data = client_socket.recv(1024)
                
                if not data:
                    print(f"Connection closed by {client_address}")
                    break
                
                message = data.decode('utf-8').strip()
                print(f"Received from {client_address}: {message}")
                
                # Эхо-ответ
                response = f"ECHO: {message}"
                client_socket.send(response.encode('utf-8'))
                
                # Пример обработки команды
                if message.lower() == 'time':
                    current_time = time.strftime('%Y-%m-%d %H:%M:%S')
                    client_socket.send(f"Server time: {current_time}".encode('utf-8'))
                elif message.lower() == 'exit':
                    client_socket.send("Goodbye!".encode('utf-8'))
                    break
                    
        except ConnectionResetError:
            print(f"Connection lost with {client_address}")
        except Exception as e:
            print(f"Error with {client_address}: {e}")
        finally:
            client_socket.close()
            print(f"Connection with {client_address} closed")
    
    def stop(self):
        """Остановка сервера"""
        if self.server_socket:
            self.server_socket.close()
            print("Server stopped")

# Дополнительный пример: TCP-клиент
class TCPClient:
    def __init__(self, host='127.0.0.1', port=8888):
        self.host = host
        self.port = port
        self.client_socket = None
        
    def connect(self):
        """Подключение к TCP-серверу"""
        try:
            self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # Устанавливаем таймаут
            self.client_socket.settimeout(10)
            self.client_socket.connect((self.host, self.port))
            
            # Получаем приветственное сообщение
            welcome = self.client_socket.recv(1024).decode('utf-8')
            print(f"Server says: {welcome}")
            
            return True
            
        except Exception as e:
            print(f"Connection error: {e}")
            return False
    
    def send_message(self, message):
        """Отправка сообщения на сервер"""
        try:
            self.client_socket.send(message.encode('utf-8'))
            
            # Получаем ответ
            response = self.client_socket.recv(1024).decode('utf-8')
            print(f"Server response: {response}")
            
            return response
            
        except Exception as e:
            print(f"Communication error: {e}")
            return None
    
    def disconnect(self):
        """Отключение от сервера"""
        if self.client_socket:
            self.client_socket.close()
            print("Disconnected from server")

# Пример использования
if __name__ == "__main__":
    # Запуск сервера в отдельном потоке
    server = TCPServer()
    server_thread = threading.Thread(target=server.start, daemon=True)
    server_thread.start()
    
    # Даем серверу время на запуск
    time.sleep(1)
    
    # Тестируем клиент
    client = TCPClient()
    
    if client.connect():
        # Тестовая сессия
        test_messages = ["Hello, Server!", "time", "How are you?", "exit"]
        
        for msg in test_messages:
            print(f"\nSending: {msg}")
            client.send_message(msg)
            time.sleep(1)
        
        client.disconnect()
    
    # Даем время для завершения работы
    time.sleep(2)

UDP (User Datagram Protocol) {#udp}

Теория

UDP — ненадежный дейтаграммный протокол транспортного уровня без установления соединения.

Основные характеристики:

  • Минимальная задержка
  • Нет гарантии доставки
  • Нет упорядочивания пакетов
  • Нет контроля перегрузки
  • Заголовок фиксированного размера (8 байт)
  • Подходит для streaming, DNS, VoIP

Заголовок UDP (8 байт):

0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|          Source Port          |       Destination Port        |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|            Length             |           Checksum            |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Пример UDP-сервера и клиента на Python

import socket
import threading
import time
import struct

class UDPServer:
    def __init__(self, host='127.0.0.1', port=8889):
        self.host = host
        self.port = port
        self.server_socket = None
        self.running = False
        
    def start(self):
        """Запуск UDP-сервера"""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.server_socket.bind((self.host, self.port))
        self.running = True
        
        print(f"UDP Server started on {self.host}:{self.port}")
        
        # Обработка пакетов
        while self.running:
            try:
                # Получаем данные и адрес клиента
                data, client_address = self.server_socket.recvfrom(1024)
                
                # Обрабатываем в отдельном потоке
                client_thread = threading.Thread(
                    target=self.handle_client,
                    args=(data, client_address)
                )
                client_thread.daemon = True
                client_thread.start()
                
            except KeyboardInterrupt:
                print("\nServer shutting down...")
                self.stop()
            except Exception as e:
                print(f"Server error: {e}")
    
    def handle_client(self, data, client_address):
        """Обработка UDP-пакета"""
        try:
            message = data.decode('utf-8')
            print(f"Received from {client_address}: {message}")
            
            # Эхо-ответ
            response = f"UDP-ECHO: {message}"
            self.server_socket.sendto(response.encode('utf-8'), client_address)
            
            # Пример работы с бинарными данными
            if message.startswith("BINARY"):
                # Отправляем бинарные данные (int + float)
                binary_data = struct.pack('If', 42, 3.14)  # Integer + Float
                self.server_socket.sendto(binary_data, client_address)
                
        except Exception as e:
            print(f"Error processing data from {client_address}: {e}")
    
    def stop(self):
        """Остановка сервера"""
        self.running = False
        if self.server_socket:
            self.server_socket.close()
        print("UDP Server stopped")

class UDPClient:
    def __init__(self, server_host='127.0.0.1', server_port=8889):
        self.server_host = server_host
        self.server_port = server_port
        self.client_socket = None
        self.timeout = 5  # секунд
        
    def create_socket(self):
        """Создание UDP-сокета"""
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.client_socket.settimeout(self.timeout)
        
    def send_message(self, message, expect_binary=False):
        """Отправка сообщения на UDP-сервер"""
        try:
            if not self.client_socket:
                self.create_socket()
            
            # Отправка данных
            print(f"Sending: {message}")
            self.client_socket.sendto(
                message.encode('utf-8'),
                (self.server_host, self.server_port)
            )
            
            # Ожидание ответа
            response, server_address = self.client_socket.recvfrom(1024)
            
            if expect_binary:
                # Декодируем бинарные данные
                unpacked = struct.unpack('If', response)
                print(f"Binary response from {server_address}: {unpacked}")
                return unpacked
            else:
                decoded_response = response.decode('utf-8')
                print(f"Response from {server_address}: {decoded_response}")
                return decoded_response
                
        except socket.timeout:
            print(f"No response from server within {self.timeout} seconds")
            return None
        except Exception as e:
            print(f"Communication error: {e}")
            return None
    
    def close(self):
        """Закрытие сокета"""
        if self.client_socket:
            self.client_socket.close()
            print("UDP socket closed")

# Пример использования UDP с мультикастом
class UDPMulticastReceiver:
    """Приемник мультикаст-трафика"""
    
    def __init__(self, multicast_group='224.0.0.100', port=9999):
        self.multicast_group = multicast_group
        self.port = port
        self.socket = None
        
    def start(self):
        """Запуск приемника мультикаст-сообщений"""
        # Создаем UDP-сокет
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        
        # Привязываем к порту
        self.socket.bind(('', self.port))
        
        # Присоединяемся к мультикаст-группе
        group = socket.inet_aton(self.multicast_group)
        mreq = struct.pack('4sL', group, socket.INADDR_ANY)
        self.socket.setsockopt(
            socket.IPPROTO_IP, 
            socket.IP_ADD_MEMBERSHIP, 
            mreq
        )
        
        print(f"Multicast receiver started on {self.multicast_group}:{self.port}")
        
        try:
            while True:
                data, address = self.socket.recvfrom(1024)
                print(f"Multicast from {address}: {data.decode('utf-8')}")
        except KeyboardInterrupt:
            print("\nMulticast receiver stopped")
        finally:
            self.stop()
    
    def stop(self):
        if self.socket:
            self.socket.close()

class UDPMulticastSender:
    """Отправитель мультикаст-трафика"""
    
    def __init__(self, multicast_group='224.0.0.100', port=9999, ttl=1):
        self.multicast_group = multicast_group
        self.port = port
        self.ttl = ttl
        self.socket = None
        
    def send(self, message):
        """Отправка мультикаст-сообщения"""
        if not self.socket:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            
            # Устанавливаем TTL (Time To Live)
            self.socket.setsockopt(
                socket.IPPROTO_IP, 
                socket.IP_MULTICAST_TTL, 
                struct.pack('b', self.ttl)
            )
        
        self.socket.sendto(
            message.encode('utf-8'),
            (self.multicast_group, self.port)
        )
        print(f"Multicast sent to {self.multicast_group}:{self.port}")

# Пример использования
if __name__ == "__main__":
    # Запуск UDP-сервера
    udp_server = UDPServer()
    server_thread = threading.Thread(target=udp_server.start, daemon=True)
    server_thread.start()
    
    time.sleep(1)
    
    # Тестируем UDP-клиент
    udp_client = UDPClient()
    
    # Отправляем несколько сообщений
    messages = ["Hello UDP!", "BINARY_TEST", "Another message"]
    
    for msg in messages:
        udp_client.send_message(msg, expect_binary=("BINARY" in msg))
        time.sleep(1)
    
    udp_client.close()
    
    # Пример мультикаста
    print("\n--- Multicast Example ---")
    
    # Запускаем приемник в отдельном потоке
    multicast_receiver = UDPMulticastReceiver()
    receiver_thread = threading.Thread(target=multicast_receiver.start, daemon=True)
    receiver_thread.start()
    
    time.sleep(1)
    
    # Отправляем мультикаст-сообщения
    multicast_sender = UDPMulticastSender()
    
    for i in range(3):
        multicast_sender.send(f"Multicast message {i}")
        time.sleep(1)
    
    time.sleep(2)

HTTP (HyperText Transfer Protocol) {#http}

Теория

HTTP — протокол прикладного уровня для передачи гипертекстовых документов.

Версии:

  • HTTP/1.0 (одно соединение на запрос)
  • HTTP/1.1 (persistent connections, pipelining)
  • HTTP/2 (бинарный, мультиплексирование, server push)
  • HTTP/3 (на базе QUIC поверх UDP)

Основные методы:

  • GET - получение ресурса
  • POST - создание ресурса
  • PUT - обновление ресурса
  • DELETE - удаление ресурса
  • PATCH - частичное обновление
  • HEAD - получение заголовков

Коды состояния:

  • 1xx - информационные
  • 2xx - успех (200 OK)
  • 3xx - перенаправление
  • 4xx - ошибка клиента (404 Not Found)
  • 5xx - ошибка сервера (500 Internal Server Error)

Пример HTTP-сервера и клиента на Python

from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import urllib.parse
from typing import Dict, Any
import threading
import time
import requests
from dataclasses import dataclass, asdict

@dataclass
class User:
    """Модель пользователя"""
    id: int
    name: str
    email: str
    age: int

class RESTAPIHandler(BaseHTTPRequestHandler):
    """Обработчик HTTP-запросов для REST API"""
    
    # "База данных" в памяти
    users: Dict[int, User] = {
        1: User(1, "Alice", "alice@example.com", 30),
        2: User(2, "Bob", "bob@example.com", 25),
        3: User(3, "Charlie", "charlie@example.com", 35)
    }
    next_id = 4
    
    def _set_headers(self, status_code=200, content_type='application/json'):
        """Установка заголовков ответа"""
        self.send_response(status_code)
        self.send_header('Content-Type', content_type)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.end_headers()
    
    def do_OPTIONS(self):
        """Обработка CORS preflight запросов"""
        self._set_headers(200)
    
    def do_GET(self):
        """Обработка GET-запросов"""
        parsed_path = urllib.parse.urlparse(self.path)
        
        # API endpoints
        if parsed_path.path == '/api/users':
            # Получение всех пользователей
            users_list = [asdict(user) for user in self.users.values()]
            response = json.dumps({
                "success": True,
                "data": users_list,
                "count": len(users_list)
            })
            self._set_headers(200)
            self.wfile.write(response.encode('utf-8'))
            
        elif parsed_path.path.startswith('/api/users/'):
            # Получение конкретного пользователя
            try:
                user_id = int(parsed_path.path.split('/')[-1])
                if user_id in self.users:
                    response = json.dumps({
                        "success": True,
                        "data": asdict(self.users[user_id])
                    })
                    self._set_headers(200)
                else:
                    response = json.dumps({
                        "success": False,
                        "error": f"User {user_id} not found"
                    })
                    self._set_headers(404)
            except ValueError:
                response = json.dumps({
                    "success": False,
                    "error": "Invalid user ID"
                })
                self._set_headers(400)
                
            self.wfile.write(response.encode('utf-8'))
            
        elif parsed_path.path == '/api/health':
            # Health check endpoint
            response = json.dumps({
                "status": "healthy",
                "timestamp": time.time(),
                "users_count": len(self.users)
            })
            self._set_headers(200)
            self.wfile.write(response.encode('utf-8'))
            
        else:
            # 404 для неизвестных путей
            response = json.dumps({
                "success": False,
                "error": "Endpoint not found"
            })
            self._set_headers(404)
            self.wfile.write(response.encode('utf-8'))
    
    def do_POST(self):
        """Обработка POST-запросов"""
        if self.path == '/api/users':
            # Создание нового пользователя
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)
            
            try:
                user_data = json.loads(post_data.decode('utf-8'))
                
                # Валидация данных
                if not all(k in user_data for k in ['name', 'email', 'age']):
                    raise ValueError("Missing required fields")
                
                # Создание нового пользователя
                new_user = User(
                    id=self.next_id,
                    name=user_data['name'],
                    email=user_data['email'],
                    age=user_data['age']
                )
                
                self.users[self.next_id] = new_user
                self.next_id += 1
                
                response = json.dumps({
                    "success": True,
                    "data": asdict(new_user),
                    "message": "User created successfully"
                })
                self._set_headers(201)
                
            except json.JSONDecodeError:
                response = json.dumps({
                    "success": False,
                    "error": "Invalid JSON"
                })
                self._set_headers(400)
            except ValueError as e:
                response = json.dumps({
                    "success": False,
                    "error": str(e)
                })
                self._set_headers(400)
            
            self.wfile.write(response.encode('utf-8'))
        else:
            self._set_headers(404)
            self.wfile.write(json.dumps({
                "success": False,
                "error": "Endpoint not found"
            }).encode('utf-8'))
    
    def do_PUT(self):
        """Обработка PUT-запросов"""
        if self.path.startswith('/api/users/'):
            try:
                user_id = int(self.path.split('/')[-1])
                
                if user_id not in self.users:
                    self._set_headers(404)
                    self.wfile.write(json.dumps({
                        "success": False,
                        "error": f"User {user_id} not found"
                    }).encode('utf-8'))
                    return
                
                content_length = int(self.headers['Content-Length'])
                put_data = self.rfile.read(content_length)
                user_data = json.loads(put_data.decode('utf-8'))
                
                # Обновление пользователя
                user = self.users[user_id]
                if 'name' in user_data:
                    user.name = user_data['name']
                if 'email' in user_data:
                    user.email = user_data['email']
                if 'age' in user_data:
                    user.age = user_data['age']
                
                response = json.dumps({
                    "success": True,
                    "data": asdict(user),
                    "message": "User updated successfully"
                })
                self._set_headers(200)
                
            except (ValueError, json.JSONDecodeError) as e:
                response = json.dumps({
                    "success": False,
                    "error": str(e)
                })
                self._set_headers(400)
            
            self.wfile.write(response.encode('utf-8'))
    
    def do_DELETE(self):
        """Обработка DELETE-запросов"""
        if self.path.startswith('/api/users/'):
            try:
                user_id = int(self.path.split('/')[-1])
                
                if user_id in self.users:
                    deleted_user = self.users.pop(user_id)
                    response = json.dumps({
                        "success": True,
                        "message": f"User {deleted_user.name} deleted",
                        "data": asdict(deleted_user)
                    })
                    self._set_headers(200)
                else:
                    response = json.dumps({
                        "success": False,
                        "error": f"User {user_id} not found"
                    })
                    self._set_headers(404)
                    
            except ValueError:
                response = json.dumps({
                    "success": False,
                    "error": "Invalid user ID"
                })
                self._set_headers(400)
            
            self.wfile.write(response.encode('utf-8'))
    
    def log_message(self, format: str, *args: Any) -> None:
        """Кастомное логирование"""
        print(f"{self.log_date_time_string()} - {self.address_string()} - {format % args}")

class ThreadedHTTPServer:
    """Многопоточный HTTP-сервер"""
    
    def __init__(self, host='localhost', port=8080):
        self.host = host
        self.port = port
        self.server = HTTPServer((host, port), RESTAPIHandler)
        
    def start(self):
        """Запуск сервера"""
        print(f"HTTP Server started on http://{self.host}:{self.port}")
        print("Available endpoints:")
        print("  GET    /api/users     - Get all users")
        print("  GET    /api/users/{id} - Get user by ID")
        print("  POST   /api/users     - Create new user")
        print("  PUT    /api/users/{id} - Update user")
        print("  DELETE /api/users/{id} - Delete user")
        print("  GET    /api/health    - Health check")
        
        try:
            self.server.serve_forever()
        except KeyboardInterrupt:
            print("\nHTTP Server shutting down...")
        finally:
            self.stop()
    
    def stop(self):
        """Остановка сервера"""
        self.server.server_close()
        print("HTTP Server stopped")

class HTTPClientExample:
    """Примеры использования HTTP-клиента"""
    
    def __init__(self, base_url='http://localhost:8080'):
        self.base_url = base_url
        self.session = requests.Session()
        
    def test_all_endpoints(self):
        """Тестирование всех endpoints API"""
        
        print("=== Testing REST API ===")
        
        # 1. Health check
        print("\n1. Testing health check:")
        response = self.session.get(f"{self.base_url}/api/health")
        print(f"   Status: {response.status_code}")
        print(f"   Response: {response.json()}")
        
        # 2. Получение всех пользователей
        print("\n2. Getting all users:")
        response = self.session.get(f"{self.base_url}/api/users")
        print(f"   Status: {response.status_code}")
        users = response.json()
        print(f"   Users count: {users.get('count')}")
        
        # 3. Получение конкретного пользователя
        print("\n3. Getting user with ID 1:")
        response = self.session.get(f"{self.base_url}/api/users/1")
        print(f"   Status: {response.status_code}")
        print(f"   User: {response.json()}")
        
        # 4. Создание нового пользователя
        print("\n4. Creating new user:")
        new_user = {
            "name": "David",
            "email": "david@example.com",
            "age": 28
        }
        response = self.session.post(
            f"{self.base_url}/api/users",
            json=new_user
        )
        print(f"   Status: {response.status_code}")
        print(f"   Created user: {response.json()}")
        
        # 5. Обновление пользователя
        print("\n5. Updating user with ID 1:")
        update_data = {
            "name": "Alice Updated",
            "age": 31
        }
        response = self.session.put(
            f"{self.base_url}/api/users/1",
            json=update_data
        )
        print(f"   Status: {response.status_code}")
        print(f"   Updated user: {response.json()}")
        
        # 6. Удаление пользователя
        print("\n6. Deleting user with ID 2:")
        response = self.session.delete(f"{self.base_url}/api/users/2")
        print(f"   Status: {response.status_code}")
        print(f"   Response: {response.json()}")
        
        # 7. Проверка ошибок
        print("\n7. Testing error cases:")
        
        # Несуществующий пользователь
        response = self.session.get(f"{self.base_url}/api/users/999")
        print(f"   Non-existent user: Status {response.status_code}")
        
        # Неверный JSON
        response = self.session.post(
            f"{self.base_url}/api/users",
            data="invalid json",
            headers={'Content-Type': 'application/json'}
        )
        print(f"   Invalid JSON: Status {response.status_code}")
        
        print("\n=== Testing completed ===")

# Асинхронный HTTP-клиент (используя aiohttp)
async def async_http_example():
    """Пример асинхронного HTTP-клиента"""
    try:
        import aiohttp
        import asyncio
        
        async with aiohttp.ClientSession() as session:
            # Параллельные запросы
            urls = [
                'http://localhost:8080/api/users/1',
                'http://localhost:8080/api/users/2',
                'http://localhost:8080/api/health'
            ]
            
            tasks = []
            for url in urls:
                task = asyncio.create_task(session.get(url))
                tasks.append(task)
            
            responses = await asyncio.gather(*tasks)
            
            for i, response in enumerate(responses):
                data = await response.json()
                print(f"Response from {urls[i]}: {data}")
                
    except ImportError:
        print("aiohttp not installed. Install with: pip install aiohttp")

# Пример использования
if __name__ == "__main__":
    # Запуск HTTP-сервера в отдельном потоке
    http_server = ThreadedHTTPServer(port=8080)
    server_thread = threading.Thread(target=http_server.start, daemon=True)
    server_thread.start()
    
    # Даем серверу время на запуск
    time.sleep(2)
    
    # Тестируем клиент
    client = HTTPClientExample()
    client.test_all_endpoints()
    
    # Дополнительные примеры с requests
    print("\n=== Advanced requests examples ===")
    
    # Работа с заголовками
    headers = {
        'User-Agent': 'MyCustomClient/1.0',
        'Accept': 'application/json',
        'Authorization': 'Bearer test_token'
    }
    
    # Query parameters
    params = {'page': 1, 'limit': 10}
    
    # Timeout and error handling
    try:
        response = requests.get(
            'http://localhost:8080/api/users',
            headers=headers,
            params=params,
            timeout=5
        )
        response.raise_for_status()  # Выбрасывает исключение для 4xx/5xx
        print("Request successful")
    except requests.exceptions.Timeout:
        print("Request timed out")
    except requests.exceptions.HTTPError as e:
        print(f"HTTP error: {e}")
    except requests.exceptions.RequestException as e:
        print(f"Request error: {e}")
    
    # Keep-alive connection
    with requests.Session() as session:
        # Первый запрос устанавливает соединение
        session.get('http://localhost:8080/api/health')
        
        # Последующие запросы используют существующее соединение
        for i in range(3):
            response = session.get(f'http://localhost:8080/api/users/{i+1}')
            print(f"Request {i+1}: {response.status_code}")
    
    # Завершаем программу
    print("\nAll tests completed")
    time.sleep(1)

gRPC (gRPC Remote Procedure Calls) {#grpc}

Теория

gRPC — современный высокопроизводительный RPC-фреймворк с открытым исходным кодом.

Основные характеристики:

  • Использует Protocol Buffers (protobuf) как IDL
  • Работает поверх HTTP/2
  • Поддерживает потоковую передачу
  • Кроссплатформенность
  • Автоматическая генерация кода
  • Поддержка аутентификации, балансировки нагрузки

Типы RPC:

  1. Unary RPC — один запрос, один ответ
  2. Server streaming RPC — один запрос, поток ответов
  3. Client streaming RPC — поток запросов, один ответ
  4. Bidirectional streaming RPC — двунаправленный поток

Установка зависимостей

pip install grpcio grpcio-tools protobuf

Пример gRPC сервиса

1. Создаем файл protobuf (user_service.proto):

syntax = "proto3";

package user_service;

// Определение службы
service UserService {
  // Unary RPC
  rpc GetUser (GetUserRequest) returns (UserResponse);
  
  // Server streaming RPC
  rpc GetUsers (GetUsersRequest) returns (stream UserResponse);
  
  // Client streaming RPC
  rpc CreateUsers (stream CreateUserRequest) returns (CreateUsersResponse);
  
  // Bidirectional streaming RPC
  rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}

// Запросы и ответы
message GetUserRequest {
  int32 user_id = 1;
}

message GetUsersRequest {
  int32 limit = 1;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
  int32 age = 3;
}

message CreateUsersResponse {
  int32 created_count = 1;
  repeated UserResponse users = 2;
}

message UserResponse {
  int32 id = 1;
  string name = 2;
  string email = 3;
  int32 age = 4;
  string status = 5;
}

message ChatMessage {
  string user_id = 1;
  string message = 2;
  int64 timestamp = 3;
}

2. Генерация Python кода из .proto:

# generate_proto.py
import subprocess
import os

def generate_proto_code():
    """Генерация Python кода из .proto файла"""
    proto_file = "user_service.proto"
    output_dir = "generated"
    
    # Создаем директорию для сгенерированного кода
    os.makedirs(output_dir, exist_ok=True)
    
    # Команда для генерации кода
    cmd = [
        "python", "-m", "grpc_tools.protoc",
        f"-I.",
        f"--python_out={output_dir}",
        f"--grpc_python_out={output_dir}",
        proto_file
    ]
    
    # Выполняем команду
    result = subprocess.run(cmd, capture_output=True, text=True)
    
    if result.returncode == 0:
        print(f"Successfully generated code in {output_dir}/")
        print("Generated files:")
        for file in os.listdir(output_dir):
            print(f"  - {file}")
    else:
        print("Error generating code:")
        print(result.stderr)
    
    return result.returncode

if __name__ == "__main__":
    generate_proto_code()

3. Реализация gRPC сервера:

# grpc_server.py
import grpc
from concurrent import futures
import time
import threading
from typing import Iterator, List
import uuid
from datetime import datetime

# Импортируем сгенерированный код
from generated import user_service_pb2 as pb2
from generated import user_service_pb2_grpc as pb2_grpc

class User:
    """Модель пользователя"""
    def __init__(self, id: int, name: str, email: str, age: int):
        self.id = id
        self.name = name
        self.email = email
        self.age = age
    
    def to_proto(self) -> pb2.UserResponse:
        """Преобразование в protobuf сообщение"""
        return pb2.UserResponse(
            id=self.id,
            name=self.name,
            email=self.email,
            age=self.age,
            status="ACTIVE"
        )

class UserService(pb2_grpc.UserServiceServicer):
    """Реализация gRPC сервиса"""
    
    def __init__(self):
        # Имитация базы данных
        self.users = {
            1: User(1, "Alice", "alice@example.com", 30),
            2: User(2, "Bob", "bob@example.com", 25),
            3: User(3, "Charlie", "charlie@example.com", 35)
        }
        self.next_id = 4
        self.chat_clients = {}
        self.lock = threading.Lock()
    
    def GetUser(self, request: pb2.GetUserRequest, context) -> pb2.UserResponse:
        """Unary RPC: Получение пользователя по ID"""
        print(f"[GetUser] Request for user_id: {request.user_id}")
        
        user = self.users.get(request.user_id)
        if user:
            return user.to_proto()
        else:
            # Устанавливаем статус ошибки
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"User {request.user_id} not found")
            return pb2.UserResponse()
    
    def GetUsers(self, request: pb2.GetUsersRequest, context) -> Iterator[pb2.UserResponse]:
        """Server streaming RPC: Получение списка пользователей с потоковой передачей"""
        print(f"[GetUsers] Streaming users, limit: {request.limit}")
        
        limit = request.limit or len(self.users)
        users_sent = 0
        
        for user in self.users.values():
            if users_sent >= limit:
                break
            
            # Имитация задержки для демонстрации потока
            time.sleep(0.5)
            
            print(f"[GetUsers] Sending user: {user.name}")
            yield user.to_proto()
            users_sent += 1
        
        print(f"[GetUsers] Stream completed. Sent {users_sent} users")
    
    def CreateUsers(self, request_iterator: Iterator[pb2.CreateUserRequest], context) -> pb2.CreateUsersResponse:
        """Client streaming RPC: Создание нескольких пользователей"""
        print("[CreateUsers] Receiving user stream...")
        
        created_users = []
        
        for i, user_request in enumerate(request_iterator):
            print(f"[CreateUsers] Received user #{i+1}: {user_request.name}")
            
            # Создаем нового пользователя
            with self.lock:
                user_id = self.next_id
                new_user = User(
                    id=user_id,
                    name=user_request.name,
                    email=user_request.email,
                    age=user_request.age
                )
                self.users[user_id] = new_user
                self.next_id += 1
            
            created_users.append(new_user)
            time.sleep(0.2)  # Имитация обработки
        
        print(f"[CreateUsers] Created {len(created_users)} users")
        
        # Возвращаем ответ
        return pb2.CreateUsersResponse(
            created_count=len(created_users),
            users=[user.to_proto() for user in created_users]
        )
    
    def Chat(self, request_iterator: Iterator[pb2.ChatMessage], context) -> Iterator[pb2.ChatMessage]:
        """Bidirectional streaming RPC: Чат в реальном времени"""
        client_id = str(uuid.uuid4())
        print(f"[Chat] Client connected: {client_id}")
        
        # Регистрируем клиента
        with self.lock:
            self.chat_clients[client_id] = context
        
        try:
            # Обрабатываем входящие сообщения
            for message in request_iterator:
                print(f"[Chat] Message from {message.user_id}: {message.message}")
                
                # Рассылаем сообщение всем клиентам
                with self.lock:
                    for cid, client_context in self.chat_clients.items():
                        if cid != client_id:  # Не отправляем себе
                            try:
                                # Создаем ответное сообщение
                                response = pb2.ChatMessage(
                                    user_id="Server",
                                    message=f"Echo: {message.message}",
                                    timestamp=int(datetime.now().timestamp())
                                )
                                yield response
                            except Exception as e:
                                print(f"[Chat] Error sending to client {cid}: {e}")
                
        except grpc.RpcError as e:
            print(f"[Chat] Client {client_id} disconnected: {e}")
        finally:
            # Удаляем клиента при отключении
            with self.lock:
                if client_id in self.chat_clients:
                    del self.chat_clients[client_id]
            print(f"[Chat] Client {client_id} removed")

class GRPCServer:
    """Управляемый gRPC сервер"""
    
    def __init__(self, host='[::]', port=50051, max_workers=10):
        self.host = host
        self.port = port
        self.max_workers = max_workers
        self.server = None
    
    def start(self):
        """Запуск gRPC сервера"""
        # Создаем сервер с пулом потоков
        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=self.max_workers))
        
        # Регистрируем сервис
        pb2_grpc.add_UserServiceServicer_to_server(UserService(), self.server)
        
        # Добавляем порт
        server_address = f"{self.host}:{self.port}"
        self.server.add_insecure_port(server_address)
        
        # Запускаем сервер
        self.server.start()
        print(f"gRPC Server started on {server_address}")
        print("Available methods:")
        print("  - GetUser (Unary)")
        print("  - GetUsers (Server streaming)")
        print("  - CreateUsers (Client streaming)")
        print("  - Chat (Bidirectional streaming)")
        
        # Блокируем основной поток
        try:
            self.server.wait_for_termination()
        except KeyboardInterrupt:
            print("\ngRPC Server shutting down...")
        finally:
            self.stop()
    
    def stop(self):
        """Остановка сервера"""
        if self.server:
            self.server.stop(0)
            print("gRPC Server stopped")

# Пример middleware для логирования
class LoggingInterceptor(grpc.ServerInterceptor):
    """Interceptor для логирования запросов"""
    
    def intercept_service(self, continuation, handler_call_details):
        print(f"[Interceptor] Method: {handler_call_details.method}")
        print(f"[Interceptor] Time: {datetime.now().isoformat()}")
        return continuation(handler_call_details)

4. Реализация gRPC клиента:

# grpc_client.py
import grpc
import time
import threading
from typing import List
from datetime import datetime

# Импортируем сгенерированный код
from generated import user_service_pb2 as pb2
from generated import user_service_pb2_grpc as pb2_grpc

class GRPCClient:
    """gRPC клиент с примерами всех типов RPC"""
    
    def __init__(self, host='localhost', port=50051):
        self.channel = None
        self.stub = None
        self.server_address = f"{host}:{port}"
    
    def connect(self):
        """Подключение к серверу"""
        try:
            # Создаем канал с опциями
            options = [
                ('grpc.max_send_message_length', 100 * 1024 * 1024),  # 100MB
                ('grpc.max_receive_message_length', 100 * 1024 * 1024),  # 100MB
                ('grpc.keepalive_time_ms', 10000),  # Keepalive каждые 10 секунд
            ]
            
            self.channel = grpc.insecure_channel(self.server_address, options=options)
            self.stub = pb2_grpc.UserServiceStub(self.channel)
            
            # Тестовый запрос для проверки соединения
            response = self.stub.GetUser(pb2.GetUserRequest(user_id=1), timeout=5)
            print(f"Connected to gRPC server at {self.server_address}")
            print(f"Test response: {response.name} ({response.email})")
            
            return True
            
        except grpc.RpcError as e:
            print(f"Connection error: {e.code()}: {e.details()}")
            return False
    
    def test_unary_rpc(self):
        """Тестирование Unary RPC"""
        print("\n=== Testing Unary RPC ===")
        
        try:
            # Получение существующего пользователя
            response = self.stub.GetUser(pb2.GetUserRequest(user_id=1))
            print(f"User 1: {response.name}, Age: {response.age}, Status: {response.status}")
            
            # Попытка получить несуществующего пользователя
            try:
                response = self.stub.GetUser(pb2.GetUserRequest(user_id=999))
            except grpc.RpcError as e:
                print(f"Error as expected: {e.code()}: {e.details()}")
                
        except grpc.RpcError as e:
            print(f"RPC failed: {e.code()}: {e.details()}")
    
    def test_server_streaming(self):
        """Тестирование Server Streaming RPC"""
        print("\n=== Testing Server Streaming RPC ===")
        
        try:
            # Запрос потока пользователей
            request = pb2.GetUsersRequest(limit=3)
            responses = self.stub.GetUsers(request)
            
            print("Receiving user stream...")
            for i, user in enumerate(responses, 1):
                print(f"User {i}: {user.name} ({user.email})")
                time.sleep(0.1)  # Имитация обработки
                
            print("Stream completed")
            
        except grpc.RpcError as e:
            print(f"RPC failed: {e.code()}: {e.details()}")
    
    def test_client_streaming(self):
        """Тестирование Client Streaming RPC"""
        print("\n=== Testing Client Streaming RPC ===")
        
        def generate_requests():
            """Генератор запросов для stream"""
            users_data = [
                {"name": "David", "email": "david@example.com", "age": 28},
                {"name": "Eve", "email": "eve@example.com", "age": 22},
                {"name": "Frank", "email": "frank@example.com", "age": 40},
            ]
            
            for user_data in users_data:
                yield pb2.CreateUserRequest(**user_data)
                time.sleep(0.3)  # Имитация задержки между запросами
        
        try:
            # Отправка потока запросов
            response = self.stub.CreateUsers(generate_requests())
            print(f"Created {response.created_count} users")
            for user in response.users:
                print(f"  - {user.name} (ID: {user.id})")
                
        except grpc.RpcError as e:
            print(f"RPC failed: {e.code()}: {e.details()}")
    
    def test_bidirectional_streaming(self):
        """Тестирование Bidirectional Streaming RPC"""
        print("\n=== Testing Bidirectional Streaming RPC ===")
        
        def send_messages():
            """Поток для отправки сообщений"""
            messages = [
                "Hello from client!",
                "How are you?",
                "This is bidirectional streaming",
                "Goodbye!"
            ]
            
            for i, message in enumerate(messages):
                time.sleep(1)
                msg = pb2.ChatMessage(
                    user_id="Client",
                    message=message,
                    timestamp=int(datetime.now().timestamp())
                )
                print(f"Sending: {message}")
                yield msg
        
        def receive_messages():
            """Получение сообщений от сервера"""
            try:
                # Отправляем запрос и получаем поток ответов
                responses = self.stub.Chat(send_messages())
                
                for response in responses:
                    print(f"Received from {response.user_id}: {response.message}")
                    print(f"  Timestamp: {datetime.fromtimestamp(response.timestamp)}")
                    
            except grpc.RpcError as e:
                print(f"Chat error: {e.code()}: {e.details()}")
        
        # Запускаем в отдельном потоке, чтобы не блокировать
        chat_thread = threading.Thread(target=receive_messages)
        chat_thread.daemon = True
        chat_thread.start()
        
        # Ждем завершения чата
        time.sleep(6)
        print("Chat test completed")
    
    def test_concurrent_requests(self):
        """Тестирование конкурентных запросов"""
        print("\n=== Testing Concurrent Requests ===")
        
        def make_request(user_id):
            try:
                response = self.stub.GetUser(pb2.GetUserRequest(user_id=user_id))
                print(f"Thread {user_id}: Got {response.name}")
            except grpc.RpcError as e:
                print(f"Thread {user_id}: Error {e.code()}")
        
        # Создаем несколько потоков
        threads = []
        user_ids = [1, 2, 3, 999]  # Последний вызовет ошибку
        
        for user_id in user_ids:
            thread = threading.Thread(target=make_request, args=(user_id,))
            threads.append(thread)
            thread.start()
            time.sleep(0.1)
        
        # Ждем завершения всех потоков
        for thread in threads:
            thread.join()
        
        print("Concurrent requests completed")
    
    def disconnect(self):
        """Отключение от сервера"""
        if self.channel:
            self.channel.close()
            print("Disconnected from gRPC server")

# Пример с метаданными и дедлайнами
class AdvancedGRPCClient:
    """Продвинутый клиент с дополнительными возможностями"""
    
    def __init__(self, stub):
        self.stub = stub
    
    def call_with_metadata(self):
        """Вызов с метаданными"""
        print("\n=== Testing with Metadata ===")
        
        # Создаем метаданные
        metadata = [
            ('authorization', 'Bearer my_token'),
            ('client-id', 'python-client-1.0'),
            ('request-id', '12345'),
        ]
        
        try:
            # Вызов с метаданными и дедлайном
            response = self.stub.GetUser(
                pb2.GetUserRequest(user_id=1),
                metadata=metadata,
                timeout=3,  # 3 секунды дедлайн
            )
            print(f"Response with metadata: {response.name}")
            
        except grpc.RpcError as e:
            print(f"Error: {e.code()}: {e.details()}")
    
    def call_with_compression(self):
        """Вызов со сжатием"""
        print("\n=== Testing with Compression ===")
        
        try:
            # Включаем сжатие для запроса
            response = self.stub.GetUser(
                pb2.GetUserRequest(user_id=1),
                compression=grpc.Compression.Gzip
            )
            print(f"Response with compression: {response.name}")
            
        except grpc.RpcError as e:
            print(f"Error: {e.code()}: {e.details()}")

# Пример использования
if __name__ == "__main__":
    # Запуск сервера в отдельном потоке
    import subprocess
    import sys
    
    # Запускаем сервер в отдельном процессе
    server_process = subprocess.Popen(
        [sys.executable, "grpc_server.py"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    
    # Даем серверу время на запуск
    time.sleep(3)
    
    # Тестируем клиент
    client = GRPCClient()
    
    if client.connect():
        # Запускаем все тесты
        client.test_unary_rpc()
        time.sleep(1)
        
        client.test_server_streaming()
        time.sleep(1)
        
        client.test_client_streaming()
        time.sleep(1)
        
        client.test_bidirectional_streaming()
        time.sleep(2)
        
        client.test_concurrent_requests()
        
        # Тестируем продвинутые возможности
        advanced_client = AdvancedGRPCClient(client.stub)
        advanced_client.call_with_metadata()
        advanced_client.call_with_compression()
        
        client.disconnect()
    
    # Останавливаем сервер
    server_process.terminate()
    server_process.wait()
    print("\nAll tests completed")

5. Дополнительно: мониторинг и health checks для gRPC:

# grpc_health.py
from generated import user_service_pb2_grpc as pb2_grpc
from generated import user_service_pb2 as pb2
import grpc
from concurrent import futures
import time

class HealthService(pb2_grpc.HealthServicer):
    """gRPC Health Checking Service"""
    
    def Check(self, request, context):
        # Здесь можно добавить логику проверки здоровья
        return pb2.HealthCheckResponse(status=pb2.HealthCheckResponse.SERVING)
    
    def Watch(self, request, context):
        """Streaming health check"""
        try:
            while True:
                yield pb2.HealthCheckResponse(status=pb2.HealthCheckResponse.SERVING)
                time.sleep(5)  # Отправляем статус каждые 5 секунд
        except:
            pass

# Использование в сервере
def create_server_with_health_check():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    
    # Основной сервис
    pb2_grpc.add_UserServiceServicer_to_server(UserService(), server)
    
    # Health check сервис
    from grpc_health.v1 import health_pb2_grpc, health_pb2
    health_servicer = HealthService()
    health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
    
    return server

Сравнительная таблица {#сравнение}

Параметр TCP UDP HTTP/1.1 gRPC (HTTP/2)
Тип протокола Транспортный Транспортный Прикладной Прикладной
Надежность Гарантированная Не гарантирована Гарантированная Гарантированная
Установка соединения Требуется (handshake) Не требуется Требуется (TCP) Требуется (HTTP/2)
Порядок доставки Гарантирован Не гарантирован Гарантирован Гарантирован
Контроль перегрузки Есть Нет Есть (через TCP) Есть
Управление потоком Есть Нет Ограниченное Продвинутое
Заголовки 20-60 байт 8 байт Текстовые, большие Бинарные, HPACK сжатие
Мультиплексирование Нет Нет Ограниченное (pipelining) Есть
Push-уведомления Нет Нет Нет Есть (server push)
Тип данных Бинарный поток Дейтаграммы Текстовый (обычно) Бинарный (protobuf)
Производительность Высокая Очень высокая Средняя Очень высокая
Сложность Средняя Низкая Низкая Высокая
Использование Файлы, веб, SSH Видео, VoIP, DNS Веб-API, REST Микросервисы, стриминг

Критерии выбора протокола:

  1. TCP:

    • Когда нужна гарантированная доставка
    • Файловые трансферы, SSH, базы данных
    • Веб-сокеты, онлайн-игры (где важна надежность)
  2. UDP:

    • Приложения реального времени (VoIP, видеостриминг)
    • DNS запросы
    • Мультимедиа, онлайн-игры (где важна скорость)
    • Broadcast/multicast трафик
  3. HTTP:

    • Веб-приложения и API
    • Когда нужна простота и стандартизация
    • Совместимость с существующей инфраструктурой
    • Когда клиенты — это браузеры или мобильные приложения
  4. gRPC:

    • Микросервисная архитектура
    • Высокопроизводительные внутренние API
    • Стриминг данных в реальном времени
    • Полиглотные среды (разные языки программирования)
    • Когда нужны strict API contracts

Заключение

Каждый протокол имеет свои сильные стороны и оптимальные сферы применения:

  • TCP/UDP — фундаментальные транспортные протоколы
  • HTTP — универсальный протокол для веб
  • gRPC — современное решение для микросервисов

Выбор зависит от требований к надежности, производительности, сложности реализации и существующей инфраструктуры. В современных системах часто используется комбинация этих протоколов для разных задач внутри одной архитектуры.