Подробный обзор протоколов интеграции: TCP, UDP, HTTP, gRPC
Подробный обзор протоколов интеграции: TCP, UDP, HTTP, gRPC
Содержание
- Сетевая модель OSI/TCP-IP
- TCP (Transmission Control Protocol)
- UDP (User Datagram Protocol)
- HTTP (HyperText Transfer Protocol)
- gRPC (gRPC Remote Procedure Calls)
- Сравнительная таблица
Сетевая модель OSI/TCP-IP {#сетевая-модель}
Прикладной уровень (HTTP, gRPC, FTP, DNS)
└── Транспортный уровень (TCP, UDP)
└── Сетевой уровень (IP)
└── Канальный уровень (Ethernet)
TCP (Transmission Control Protocol) {#tcp}
Теория
TCP — надежный потоковый протокол транспортного уровня с установлением соединения.
Основные характеристики:
- Гарантированная доставка данных
- Упорядоченность пакетов
- Контроль перегрузки
- Управление потоком
- Установление соединения (3-way handshake)
- Поддержка дуплексной передачи
Заголовок TCP (20-60 байт):
0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Source Port | Destination Port | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Sequence Number | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Acknowledgment Number | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Data | |U|A|P|R|S|F| | | Offset| Reserved |R|C|S|S|Y|I| Window | | | |G|K|H|T|N|N| | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Checksum | Urgent Pointer | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Options | Padding | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Пример TCP-сервера на Python
import socket
import threading
import time
class TCPServer:
def __init__(self, host='127.0.0.1', port=8888):
self.host = host
self.port = port
self.server_socket = None
self.clients = {}
def start(self):
"""Запуск TCP-сервера"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
print(f"TCP Server started on {self.host}:{self.port}")
try:
while True:
client_socket, client_address = self.server_socket.accept()
print(f"New connection from {client_address}")
# Создаем отдельный поток для клиента
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, client_address)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("\nServer shutting down...")
finally:
self.stop()
def handle_client(self, client_socket, client_address):
"""Обработка клиентского соединения"""
try:
# Отправляем приветственное сообщение
welcome_msg = f"Welcome to TCP Server! Your address: {client_address}"
client_socket.send(welcome_msg.encode('utf-8'))
while True:
# Получаем данные от клиента (максимум 1024 байта)
data = client_socket.recv(1024)
if not data:
print(f"Connection closed by {client_address}")
break
message = data.decode('utf-8').strip()
print(f"Received from {client_address}: {message}")
# Эхо-ответ
response = f"ECHO: {message}"
client_socket.send(response.encode('utf-8'))
# Пример обработки команды
if message.lower() == 'time':
current_time = time.strftime('%Y-%m-%d %H:%M:%S')
client_socket.send(f"Server time: {current_time}".encode('utf-8'))
elif message.lower() == 'exit':
client_socket.send("Goodbye!".encode('utf-8'))
break
except ConnectionResetError:
print(f"Connection lost with {client_address}")
except Exception as e:
print(f"Error with {client_address}: {e}")
finally:
client_socket.close()
print(f"Connection with {client_address} closed")
def stop(self):
"""Остановка сервера"""
if self.server_socket:
self.server_socket.close()
print("Server stopped")
# Дополнительный пример: TCP-клиент
class TCPClient:
def __init__(self, host='127.0.0.1', port=8888):
self.host = host
self.port = port
self.client_socket = None
def connect(self):
"""Подключение к TCP-серверу"""
try:
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Устанавливаем таймаут
self.client_socket.settimeout(10)
self.client_socket.connect((self.host, self.port))
# Получаем приветственное сообщение
welcome = self.client_socket.recv(1024).decode('utf-8')
print(f"Server says: {welcome}")
return True
except Exception as e:
print(f"Connection error: {e}")
return False
def send_message(self, message):
"""Отправка сообщения на сервер"""
try:
self.client_socket.send(message.encode('utf-8'))
# Получаем ответ
response = self.client_socket.recv(1024).decode('utf-8')
print(f"Server response: {response}")
return response
except Exception as e:
print(f"Communication error: {e}")
return None
def disconnect(self):
"""Отключение от сервера"""
if self.client_socket:
self.client_socket.close()
print("Disconnected from server")
# Пример использования
if __name__ == "__main__":
# Запуск сервера в отдельном потоке
server = TCPServer()
server_thread = threading.Thread(target=server.start, daemon=True)
server_thread.start()
# Даем серверу время на запуск
time.sleep(1)
# Тестируем клиент
client = TCPClient()
if client.connect():
# Тестовая сессия
test_messages = ["Hello, Server!", "time", "How are you?", "exit"]
for msg in test_messages:
print(f"\nSending: {msg}")
client.send_message(msg)
time.sleep(1)
client.disconnect()
# Даем время для завершения работы
time.sleep(2)
UDP (User Datagram Protocol) {#udp}
Теория
UDP — ненадежный дейтаграммный протокол транспортного уровня без установления соединения.
Основные характеристики:
- Минимальная задержка
- Нет гарантии доставки
- Нет упорядочивания пакетов
- Нет контроля перегрузки
- Заголовок фиксированного размера (8 байт)
- Подходит для streaming, DNS, VoIP
Заголовок UDP (8 байт):
0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Source Port | Destination Port | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Length | Checksum | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Пример UDP-сервера и клиента на Python
import socket
import threading
import time
import struct
class UDPServer:
def __init__(self, host='127.0.0.1', port=8889):
self.host = host
self.port = port
self.server_socket = None
self.running = False
def start(self):
"""Запуск UDP-сервера"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.server_socket.bind((self.host, self.port))
self.running = True
print(f"UDP Server started on {self.host}:{self.port}")
# Обработка пакетов
while self.running:
try:
# Получаем данные и адрес клиента
data, client_address = self.server_socket.recvfrom(1024)
# Обрабатываем в отдельном потоке
client_thread = threading.Thread(
target=self.handle_client,
args=(data, client_address)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("\nServer shutting down...")
self.stop()
except Exception as e:
print(f"Server error: {e}")
def handle_client(self, data, client_address):
"""Обработка UDP-пакета"""
try:
message = data.decode('utf-8')
print(f"Received from {client_address}: {message}")
# Эхо-ответ
response = f"UDP-ECHO: {message}"
self.server_socket.sendto(response.encode('utf-8'), client_address)
# Пример работы с бинарными данными
if message.startswith("BINARY"):
# Отправляем бинарные данные (int + float)
binary_data = struct.pack('If', 42, 3.14) # Integer + Float
self.server_socket.sendto(binary_data, client_address)
except Exception as e:
print(f"Error processing data from {client_address}: {e}")
def stop(self):
"""Остановка сервера"""
self.running = False
if self.server_socket:
self.server_socket.close()
print("UDP Server stopped")
class UDPClient:
def __init__(self, server_host='127.0.0.1', server_port=8889):
self.server_host = server_host
self.server_port = server_port
self.client_socket = None
self.timeout = 5 # секунд
def create_socket(self):
"""Создание UDP-сокета"""
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.client_socket.settimeout(self.timeout)
def send_message(self, message, expect_binary=False):
"""Отправка сообщения на UDP-сервер"""
try:
if not self.client_socket:
self.create_socket()
# Отправка данных
print(f"Sending: {message}")
self.client_socket.sendto(
message.encode('utf-8'),
(self.server_host, self.server_port)
)
# Ожидание ответа
response, server_address = self.client_socket.recvfrom(1024)
if expect_binary:
# Декодируем бинарные данные
unpacked = struct.unpack('If', response)
print(f"Binary response from {server_address}: {unpacked}")
return unpacked
else:
decoded_response = response.decode('utf-8')
print(f"Response from {server_address}: {decoded_response}")
return decoded_response
except socket.timeout:
print(f"No response from server within {self.timeout} seconds")
return None
except Exception as e:
print(f"Communication error: {e}")
return None
def close(self):
"""Закрытие сокета"""
if self.client_socket:
self.client_socket.close()
print("UDP socket closed")
# Пример использования UDP с мультикастом
class UDPMulticastReceiver:
"""Приемник мультикаст-трафика"""
def __init__(self, multicast_group='224.0.0.100', port=9999):
self.multicast_group = multicast_group
self.port = port
self.socket = None
def start(self):
"""Запуск приемника мультикаст-сообщений"""
# Создаем UDP-сокет
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Привязываем к порту
self.socket.bind(('', self.port))
# Присоединяемся к мультикаст-группе
group = socket.inet_aton(self.multicast_group)
mreq = struct.pack('4sL', group, socket.INADDR_ANY)
self.socket.setsockopt(
socket.IPPROTO_IP,
socket.IP_ADD_MEMBERSHIP,
mreq
)
print(f"Multicast receiver started on {self.multicast_group}:{self.port}")
try:
while True:
data, address = self.socket.recvfrom(1024)
print(f"Multicast from {address}: {data.decode('utf-8')}")
except KeyboardInterrupt:
print("\nMulticast receiver stopped")
finally:
self.stop()
def stop(self):
if self.socket:
self.socket.close()
class UDPMulticastSender:
"""Отправитель мультикаст-трафика"""
def __init__(self, multicast_group='224.0.0.100', port=9999, ttl=1):
self.multicast_group = multicast_group
self.port = port
self.ttl = ttl
self.socket = None
def send(self, message):
"""Отправка мультикаст-сообщения"""
if not self.socket:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Устанавливаем TTL (Time To Live)
self.socket.setsockopt(
socket.IPPROTO_IP,
socket.IP_MULTICAST_TTL,
struct.pack('b', self.ttl)
)
self.socket.sendto(
message.encode('utf-8'),
(self.multicast_group, self.port)
)
print(f"Multicast sent to {self.multicast_group}:{self.port}")
# Пример использования
if __name__ == "__main__":
# Запуск UDP-сервера
udp_server = UDPServer()
server_thread = threading.Thread(target=udp_server.start, daemon=True)
server_thread.start()
time.sleep(1)
# Тестируем UDP-клиент
udp_client = UDPClient()
# Отправляем несколько сообщений
messages = ["Hello UDP!", "BINARY_TEST", "Another message"]
for msg in messages:
udp_client.send_message(msg, expect_binary=("BINARY" in msg))
time.sleep(1)
udp_client.close()
# Пример мультикаста
print("\n--- Multicast Example ---")
# Запускаем приемник в отдельном потоке
multicast_receiver = UDPMulticastReceiver()
receiver_thread = threading.Thread(target=multicast_receiver.start, daemon=True)
receiver_thread.start()
time.sleep(1)
# Отправляем мультикаст-сообщения
multicast_sender = UDPMulticastSender()
for i in range(3):
multicast_sender.send(f"Multicast message {i}")
time.sleep(1)
time.sleep(2)
HTTP (HyperText Transfer Protocol) {#http}
Теория
HTTP — протокол прикладного уровня для передачи гипертекстовых документов.
Версии:
- HTTP/1.0 (одно соединение на запрос)
- HTTP/1.1 (persistent connections, pipelining)
- HTTP/2 (бинарный, мультиплексирование, server push)
- HTTP/3 (на базе QUIC поверх UDP)
Основные методы:
- GET - получение ресурса
- POST - создание ресурса
- PUT - обновление ресурса
- DELETE - удаление ресурса
- PATCH - частичное обновление
- HEAD - получение заголовков
Коды состояния:
- 1xx - информационные
- 2xx - успех (200 OK)
- 3xx - перенаправление
- 4xx - ошибка клиента (404 Not Found)
- 5xx - ошибка сервера (500 Internal Server Error)
Пример HTTP-сервера и клиента на Python
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import urllib.parse
from typing import Dict, Any
import threading
import time
import requests
from dataclasses import dataclass, asdict
@dataclass
class User:
"""Модель пользователя"""
id: int
name: str
email: str
age: int
class RESTAPIHandler(BaseHTTPRequestHandler):
"""Обработчик HTTP-запросов для REST API"""
# "База данных" в памяти
users: Dict[int, User] = {
1: User(1, "Alice", "alice@example.com", 30),
2: User(2, "Bob", "bob@example.com", 25),
3: User(3, "Charlie", "charlie@example.com", 35)
}
next_id = 4
def _set_headers(self, status_code=200, content_type='application/json'):
"""Установка заголовков ответа"""
self.send_response(status_code)
self.send_header('Content-Type', content_type)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
def do_OPTIONS(self):
"""Обработка CORS preflight запросов"""
self._set_headers(200)
def do_GET(self):
"""Обработка GET-запросов"""
parsed_path = urllib.parse.urlparse(self.path)
# API endpoints
if parsed_path.path == '/api/users':
# Получение всех пользователей
users_list = [asdict(user) for user in self.users.values()]
response = json.dumps({
"success": True,
"data": users_list,
"count": len(users_list)
})
self._set_headers(200)
self.wfile.write(response.encode('utf-8'))
elif parsed_path.path.startswith('/api/users/'):
# Получение конкретного пользователя
try:
user_id = int(parsed_path.path.split('/')[-1])
if user_id in self.users:
response = json.dumps({
"success": True,
"data": asdict(self.users[user_id])
})
self._set_headers(200)
else:
response = json.dumps({
"success": False,
"error": f"User {user_id} not found"
})
self._set_headers(404)
except ValueError:
response = json.dumps({
"success": False,
"error": "Invalid user ID"
})
self._set_headers(400)
self.wfile.write(response.encode('utf-8'))
elif parsed_path.path == '/api/health':
# Health check endpoint
response = json.dumps({
"status": "healthy",
"timestamp": time.time(),
"users_count": len(self.users)
})
self._set_headers(200)
self.wfile.write(response.encode('utf-8'))
else:
# 404 для неизвестных путей
response = json.dumps({
"success": False,
"error": "Endpoint not found"
})
self._set_headers(404)
self.wfile.write(response.encode('utf-8'))
def do_POST(self):
"""Обработка POST-запросов"""
if self.path == '/api/users':
# Создание нового пользователя
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
try:
user_data = json.loads(post_data.decode('utf-8'))
# Валидация данных
if not all(k in user_data for k in ['name', 'email', 'age']):
raise ValueError("Missing required fields")
# Создание нового пользователя
new_user = User(
id=self.next_id,
name=user_data['name'],
email=user_data['email'],
age=user_data['age']
)
self.users[self.next_id] = new_user
self.next_id += 1
response = json.dumps({
"success": True,
"data": asdict(new_user),
"message": "User created successfully"
})
self._set_headers(201)
except json.JSONDecodeError:
response = json.dumps({
"success": False,
"error": "Invalid JSON"
})
self._set_headers(400)
except ValueError as e:
response = json.dumps({
"success": False,
"error": str(e)
})
self._set_headers(400)
self.wfile.write(response.encode('utf-8'))
else:
self._set_headers(404)
self.wfile.write(json.dumps({
"success": False,
"error": "Endpoint not found"
}).encode('utf-8'))
def do_PUT(self):
"""Обработка PUT-запросов"""
if self.path.startswith('/api/users/'):
try:
user_id = int(self.path.split('/')[-1])
if user_id not in self.users:
self._set_headers(404)
self.wfile.write(json.dumps({
"success": False,
"error": f"User {user_id} not found"
}).encode('utf-8'))
return
content_length = int(self.headers['Content-Length'])
put_data = self.rfile.read(content_length)
user_data = json.loads(put_data.decode('utf-8'))
# Обновление пользователя
user = self.users[user_id]
if 'name' in user_data:
user.name = user_data['name']
if 'email' in user_data:
user.email = user_data['email']
if 'age' in user_data:
user.age = user_data['age']
response = json.dumps({
"success": True,
"data": asdict(user),
"message": "User updated successfully"
})
self._set_headers(200)
except (ValueError, json.JSONDecodeError) as e:
response = json.dumps({
"success": False,
"error": str(e)
})
self._set_headers(400)
self.wfile.write(response.encode('utf-8'))
def do_DELETE(self):
"""Обработка DELETE-запросов"""
if self.path.startswith('/api/users/'):
try:
user_id = int(self.path.split('/')[-1])
if user_id in self.users:
deleted_user = self.users.pop(user_id)
response = json.dumps({
"success": True,
"message": f"User {deleted_user.name} deleted",
"data": asdict(deleted_user)
})
self._set_headers(200)
else:
response = json.dumps({
"success": False,
"error": f"User {user_id} not found"
})
self._set_headers(404)
except ValueError:
response = json.dumps({
"success": False,
"error": "Invalid user ID"
})
self._set_headers(400)
self.wfile.write(response.encode('utf-8'))
def log_message(self, format: str, *args: Any) -> None:
"""Кастомное логирование"""
print(f"{self.log_date_time_string()} - {self.address_string()} - {format % args}")
class ThreadedHTTPServer:
"""Многопоточный HTTP-сервер"""
def __init__(self, host='localhost', port=8080):
self.host = host
self.port = port
self.server = HTTPServer((host, port), RESTAPIHandler)
def start(self):
"""Запуск сервера"""
print(f"HTTP Server started on http://{self.host}:{self.port}")
print("Available endpoints:")
print(" GET /api/users - Get all users")
print(" GET /api/users/{id} - Get user by ID")
print(" POST /api/users - Create new user")
print(" PUT /api/users/{id} - Update user")
print(" DELETE /api/users/{id} - Delete user")
print(" GET /api/health - Health check")
try:
self.server.serve_forever()
except KeyboardInterrupt:
print("\nHTTP Server shutting down...")
finally:
self.stop()
def stop(self):
"""Остановка сервера"""
self.server.server_close()
print("HTTP Server stopped")
class HTTPClientExample:
"""Примеры использования HTTP-клиента"""
def __init__(self, base_url='http://localhost:8080'):
self.base_url = base_url
self.session = requests.Session()
def test_all_endpoints(self):
"""Тестирование всех endpoints API"""
print("=== Testing REST API ===")
# 1. Health check
print("\n1. Testing health check:")
response = self.session.get(f"{self.base_url}/api/health")
print(f" Status: {response.status_code}")
print(f" Response: {response.json()}")
# 2. Получение всех пользователей
print("\n2. Getting all users:")
response = self.session.get(f"{self.base_url}/api/users")
print(f" Status: {response.status_code}")
users = response.json()
print(f" Users count: {users.get('count')}")
# 3. Получение конкретного пользователя
print("\n3. Getting user with ID 1:")
response = self.session.get(f"{self.base_url}/api/users/1")
print(f" Status: {response.status_code}")
print(f" User: {response.json()}")
# 4. Создание нового пользователя
print("\n4. Creating new user:")
new_user = {
"name": "David",
"email": "david@example.com",
"age": 28
}
response = self.session.post(
f"{self.base_url}/api/users",
json=new_user
)
print(f" Status: {response.status_code}")
print(f" Created user: {response.json()}")
# 5. Обновление пользователя
print("\n5. Updating user with ID 1:")
update_data = {
"name": "Alice Updated",
"age": 31
}
response = self.session.put(
f"{self.base_url}/api/users/1",
json=update_data
)
print(f" Status: {response.status_code}")
print(f" Updated user: {response.json()}")
# 6. Удаление пользователя
print("\n6. Deleting user with ID 2:")
response = self.session.delete(f"{self.base_url}/api/users/2")
print(f" Status: {response.status_code}")
print(f" Response: {response.json()}")
# 7. Проверка ошибок
print("\n7. Testing error cases:")
# Несуществующий пользователь
response = self.session.get(f"{self.base_url}/api/users/999")
print(f" Non-existent user: Status {response.status_code}")
# Неверный JSON
response = self.session.post(
f"{self.base_url}/api/users",
data="invalid json",
headers={'Content-Type': 'application/json'}
)
print(f" Invalid JSON: Status {response.status_code}")
print("\n=== Testing completed ===")
# Асинхронный HTTP-клиент (используя aiohttp)
async def async_http_example():
"""Пример асинхронного HTTP-клиента"""
try:
import aiohttp
import asyncio
async with aiohttp.ClientSession() as session:
# Параллельные запросы
urls = [
'http://localhost:8080/api/users/1',
'http://localhost:8080/api/users/2',
'http://localhost:8080/api/health'
]
tasks = []
for url in urls:
task = asyncio.create_task(session.get(url))
tasks.append(task)
responses = await asyncio.gather(*tasks)
for i, response in enumerate(responses):
data = await response.json()
print(f"Response from {urls[i]}: {data}")
except ImportError:
print("aiohttp not installed. Install with: pip install aiohttp")
# Пример использования
if __name__ == "__main__":
# Запуск HTTP-сервера в отдельном потоке
http_server = ThreadedHTTPServer(port=8080)
server_thread = threading.Thread(target=http_server.start, daemon=True)
server_thread.start()
# Даем серверу время на запуск
time.sleep(2)
# Тестируем клиент
client = HTTPClientExample()
client.test_all_endpoints()
# Дополнительные примеры с requests
print("\n=== Advanced requests examples ===")
# Работа с заголовками
headers = {
'User-Agent': 'MyCustomClient/1.0',
'Accept': 'application/json',
'Authorization': 'Bearer test_token'
}
# Query parameters
params = {'page': 1, 'limit': 10}
# Timeout and error handling
try:
response = requests.get(
'http://localhost:8080/api/users',
headers=headers,
params=params,
timeout=5
)
response.raise_for_status() # Выбрасывает исключение для 4xx/5xx
print("Request successful")
except requests.exceptions.Timeout:
print("Request timed out")
except requests.exceptions.HTTPError as e:
print(f"HTTP error: {e}")
except requests.exceptions.RequestException as e:
print(f"Request error: {e}")
# Keep-alive connection
with requests.Session() as session:
# Первый запрос устанавливает соединение
session.get('http://localhost:8080/api/health')
# Последующие запросы используют существующее соединение
for i in range(3):
response = session.get(f'http://localhost:8080/api/users/{i+1}')
print(f"Request {i+1}: {response.status_code}")
# Завершаем программу
print("\nAll tests completed")
time.sleep(1)
gRPC (gRPC Remote Procedure Calls) {#grpc}
Теория
gRPC — современный высокопроизводительный RPC-фреймворк с открытым исходным кодом.
Основные характеристики:
- Использует Protocol Buffers (protobuf) как IDL
- Работает поверх HTTP/2
- Поддерживает потоковую передачу
- Кроссплатформенность
- Автоматическая генерация кода
- Поддержка аутентификации, балансировки нагрузки
Типы RPC:
- Unary RPC — один запрос, один ответ
- Server streaming RPC — один запрос, поток ответов
- Client streaming RPC — поток запросов, один ответ
- Bidirectional streaming RPC — двунаправленный поток
Установка зависимостей
pip install grpcio grpcio-tools protobuf
Пример gRPC сервиса
1. Создаем файл protobuf (user_service.proto):
syntax = "proto3";
package user_service;
// Определение службы
service UserService {
// Unary RPC
rpc GetUser (GetUserRequest) returns (UserResponse);
// Server streaming RPC
rpc GetUsers (GetUsersRequest) returns (stream UserResponse);
// Client streaming RPC
rpc CreateUsers (stream CreateUserRequest) returns (CreateUsersResponse);
// Bidirectional streaming RPC
rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}
// Запросы и ответы
message GetUserRequest {
int32 user_id = 1;
}
message GetUsersRequest {
int32 limit = 1;
}
message CreateUserRequest {
string name = 1;
string email = 2;
int32 age = 3;
}
message CreateUsersResponse {
int32 created_count = 1;
repeated UserResponse users = 2;
}
message UserResponse {
int32 id = 1;
string name = 2;
string email = 3;
int32 age = 4;
string status = 5;
}
message ChatMessage {
string user_id = 1;
string message = 2;
int64 timestamp = 3;
}
2. Генерация Python кода из .proto:
# generate_proto.py
import subprocess
import os
def generate_proto_code():
"""Генерация Python кода из .proto файла"""
proto_file = "user_service.proto"
output_dir = "generated"
# Создаем директорию для сгенерированного кода
os.makedirs(output_dir, exist_ok=True)
# Команда для генерации кода
cmd = [
"python", "-m", "grpc_tools.protoc",
f"-I.",
f"--python_out={output_dir}",
f"--grpc_python_out={output_dir}",
proto_file
]
# Выполняем команду
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f"Successfully generated code in {output_dir}/")
print("Generated files:")
for file in os.listdir(output_dir):
print(f" - {file}")
else:
print("Error generating code:")
print(result.stderr)
return result.returncode
if __name__ == "__main__":
generate_proto_code()
3. Реализация gRPC сервера:
# grpc_server.py
import grpc
from concurrent import futures
import time
import threading
from typing import Iterator, List
import uuid
from datetime import datetime
# Импортируем сгенерированный код
from generated import user_service_pb2 as pb2
from generated import user_service_pb2_grpc as pb2_grpc
class User:
"""Модель пользователя"""
def __init__(self, id: int, name: str, email: str, age: int):
self.id = id
self.name = name
self.email = email
self.age = age
def to_proto(self) -> pb2.UserResponse:
"""Преобразование в protobuf сообщение"""
return pb2.UserResponse(
id=self.id,
name=self.name,
email=self.email,
age=self.age,
status="ACTIVE"
)
class UserService(pb2_grpc.UserServiceServicer):
"""Реализация gRPC сервиса"""
def __init__(self):
# Имитация базы данных
self.users = {
1: User(1, "Alice", "alice@example.com", 30),
2: User(2, "Bob", "bob@example.com", 25),
3: User(3, "Charlie", "charlie@example.com", 35)
}
self.next_id = 4
self.chat_clients = {}
self.lock = threading.Lock()
def GetUser(self, request: pb2.GetUserRequest, context) -> pb2.UserResponse:
"""Unary RPC: Получение пользователя по ID"""
print(f"[GetUser] Request for user_id: {request.user_id}")
user = self.users.get(request.user_id)
if user:
return user.to_proto()
else:
# Устанавливаем статус ошибки
context.set_code(grpc.StatusCode.NOT_FOUND)
context.set_details(f"User {request.user_id} not found")
return pb2.UserResponse()
def GetUsers(self, request: pb2.GetUsersRequest, context) -> Iterator[pb2.UserResponse]:
"""Server streaming RPC: Получение списка пользователей с потоковой передачей"""
print(f"[GetUsers] Streaming users, limit: {request.limit}")
limit = request.limit or len(self.users)
users_sent = 0
for user in self.users.values():
if users_sent >= limit:
break
# Имитация задержки для демонстрации потока
time.sleep(0.5)
print(f"[GetUsers] Sending user: {user.name}")
yield user.to_proto()
users_sent += 1
print(f"[GetUsers] Stream completed. Sent {users_sent} users")
def CreateUsers(self, request_iterator: Iterator[pb2.CreateUserRequest], context) -> pb2.CreateUsersResponse:
"""Client streaming RPC: Создание нескольких пользователей"""
print("[CreateUsers] Receiving user stream...")
created_users = []
for i, user_request in enumerate(request_iterator):
print(f"[CreateUsers] Received user #{i+1}: {user_request.name}")
# Создаем нового пользователя
with self.lock:
user_id = self.next_id
new_user = User(
id=user_id,
name=user_request.name,
email=user_request.email,
age=user_request.age
)
self.users[user_id] = new_user
self.next_id += 1
created_users.append(new_user)
time.sleep(0.2) # Имитация обработки
print(f"[CreateUsers] Created {len(created_users)} users")
# Возвращаем ответ
return pb2.CreateUsersResponse(
created_count=len(created_users),
users=[user.to_proto() for user in created_users]
)
def Chat(self, request_iterator: Iterator[pb2.ChatMessage], context) -> Iterator[pb2.ChatMessage]:
"""Bidirectional streaming RPC: Чат в реальном времени"""
client_id = str(uuid.uuid4())
print(f"[Chat] Client connected: {client_id}")
# Регистрируем клиента
with self.lock:
self.chat_clients[client_id] = context
try:
# Обрабатываем входящие сообщения
for message in request_iterator:
print(f"[Chat] Message from {message.user_id}: {message.message}")
# Рассылаем сообщение всем клиентам
with self.lock:
for cid, client_context in self.chat_clients.items():
if cid != client_id: # Не отправляем себе
try:
# Создаем ответное сообщение
response = pb2.ChatMessage(
user_id="Server",
message=f"Echo: {message.message}",
timestamp=int(datetime.now().timestamp())
)
yield response
except Exception as e:
print(f"[Chat] Error sending to client {cid}: {e}")
except grpc.RpcError as e:
print(f"[Chat] Client {client_id} disconnected: {e}")
finally:
# Удаляем клиента при отключении
with self.lock:
if client_id in self.chat_clients:
del self.chat_clients[client_id]
print(f"[Chat] Client {client_id} removed")
class GRPCServer:
"""Управляемый gRPC сервер"""
def __init__(self, host='[::]', port=50051, max_workers=10):
self.host = host
self.port = port
self.max_workers = max_workers
self.server = None
def start(self):
"""Запуск gRPC сервера"""
# Создаем сервер с пулом потоков
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=self.max_workers))
# Регистрируем сервис
pb2_grpc.add_UserServiceServicer_to_server(UserService(), self.server)
# Добавляем порт
server_address = f"{self.host}:{self.port}"
self.server.add_insecure_port(server_address)
# Запускаем сервер
self.server.start()
print(f"gRPC Server started on {server_address}")
print("Available methods:")
print(" - GetUser (Unary)")
print(" - GetUsers (Server streaming)")
print(" - CreateUsers (Client streaming)")
print(" - Chat (Bidirectional streaming)")
# Блокируем основной поток
try:
self.server.wait_for_termination()
except KeyboardInterrupt:
print("\ngRPC Server shutting down...")
finally:
self.stop()
def stop(self):
"""Остановка сервера"""
if self.server:
self.server.stop(0)
print("gRPC Server stopped")
# Пример middleware для логирования
class LoggingInterceptor(grpc.ServerInterceptor):
"""Interceptor для логирования запросов"""
def intercept_service(self, continuation, handler_call_details):
print(f"[Interceptor] Method: {handler_call_details.method}")
print(f"[Interceptor] Time: {datetime.now().isoformat()}")
return continuation(handler_call_details)
4. Реализация gRPC клиента:
# grpc_client.py
import grpc
import time
import threading
from typing import List
from datetime import datetime
# Импортируем сгенерированный код
from generated import user_service_pb2 as pb2
from generated import user_service_pb2_grpc as pb2_grpc
class GRPCClient:
"""gRPC клиент с примерами всех типов RPC"""
def __init__(self, host='localhost', port=50051):
self.channel = None
self.stub = None
self.server_address = f"{host}:{port}"
def connect(self):
"""Подключение к серверу"""
try:
# Создаем канал с опциями
options = [
('grpc.max_send_message_length', 100 * 1024 * 1024), # 100MB
('grpc.max_receive_message_length', 100 * 1024 * 1024), # 100MB
('grpc.keepalive_time_ms', 10000), # Keepalive каждые 10 секунд
]
self.channel = grpc.insecure_channel(self.server_address, options=options)
self.stub = pb2_grpc.UserServiceStub(self.channel)
# Тестовый запрос для проверки соединения
response = self.stub.GetUser(pb2.GetUserRequest(user_id=1), timeout=5)
print(f"Connected to gRPC server at {self.server_address}")
print(f"Test response: {response.name} ({response.email})")
return True
except grpc.RpcError as e:
print(f"Connection error: {e.code()}: {e.details()}")
return False
def test_unary_rpc(self):
"""Тестирование Unary RPC"""
print("\n=== Testing Unary RPC ===")
try:
# Получение существующего пользователя
response = self.stub.GetUser(pb2.GetUserRequest(user_id=1))
print(f"User 1: {response.name}, Age: {response.age}, Status: {response.status}")
# Попытка получить несуществующего пользователя
try:
response = self.stub.GetUser(pb2.GetUserRequest(user_id=999))
except grpc.RpcError as e:
print(f"Error as expected: {e.code()}: {e.details()}")
except grpc.RpcError as e:
print(f"RPC failed: {e.code()}: {e.details()}")
def test_server_streaming(self):
"""Тестирование Server Streaming RPC"""
print("\n=== Testing Server Streaming RPC ===")
try:
# Запрос потока пользователей
request = pb2.GetUsersRequest(limit=3)
responses = self.stub.GetUsers(request)
print("Receiving user stream...")
for i, user in enumerate(responses, 1):
print(f"User {i}: {user.name} ({user.email})")
time.sleep(0.1) # Имитация обработки
print("Stream completed")
except grpc.RpcError as e:
print(f"RPC failed: {e.code()}: {e.details()}")
def test_client_streaming(self):
"""Тестирование Client Streaming RPC"""
print("\n=== Testing Client Streaming RPC ===")
def generate_requests():
"""Генератор запросов для stream"""
users_data = [
{"name": "David", "email": "david@example.com", "age": 28},
{"name": "Eve", "email": "eve@example.com", "age": 22},
{"name": "Frank", "email": "frank@example.com", "age": 40},
]
for user_data in users_data:
yield pb2.CreateUserRequest(**user_data)
time.sleep(0.3) # Имитация задержки между запросами
try:
# Отправка потока запросов
response = self.stub.CreateUsers(generate_requests())
print(f"Created {response.created_count} users")
for user in response.users:
print(f" - {user.name} (ID: {user.id})")
except grpc.RpcError as e:
print(f"RPC failed: {e.code()}: {e.details()}")
def test_bidirectional_streaming(self):
"""Тестирование Bidirectional Streaming RPC"""
print("\n=== Testing Bidirectional Streaming RPC ===")
def send_messages():
"""Поток для отправки сообщений"""
messages = [
"Hello from client!",
"How are you?",
"This is bidirectional streaming",
"Goodbye!"
]
for i, message in enumerate(messages):
time.sleep(1)
msg = pb2.ChatMessage(
user_id="Client",
message=message,
timestamp=int(datetime.now().timestamp())
)
print(f"Sending: {message}")
yield msg
def receive_messages():
"""Получение сообщений от сервера"""
try:
# Отправляем запрос и получаем поток ответов
responses = self.stub.Chat(send_messages())
for response in responses:
print(f"Received from {response.user_id}: {response.message}")
print(f" Timestamp: {datetime.fromtimestamp(response.timestamp)}")
except grpc.RpcError as e:
print(f"Chat error: {e.code()}: {e.details()}")
# Запускаем в отдельном потоке, чтобы не блокировать
chat_thread = threading.Thread(target=receive_messages)
chat_thread.daemon = True
chat_thread.start()
# Ждем завершения чата
time.sleep(6)
print("Chat test completed")
def test_concurrent_requests(self):
"""Тестирование конкурентных запросов"""
print("\n=== Testing Concurrent Requests ===")
def make_request(user_id):
try:
response = self.stub.GetUser(pb2.GetUserRequest(user_id=user_id))
print(f"Thread {user_id}: Got {response.name}")
except grpc.RpcError as e:
print(f"Thread {user_id}: Error {e.code()}")
# Создаем несколько потоков
threads = []
user_ids = [1, 2, 3, 999] # Последний вызовет ошибку
for user_id in user_ids:
thread = threading.Thread(target=make_request, args=(user_id,))
threads.append(thread)
thread.start()
time.sleep(0.1)
# Ждем завершения всех потоков
for thread in threads:
thread.join()
print("Concurrent requests completed")
def disconnect(self):
"""Отключение от сервера"""
if self.channel:
self.channel.close()
print("Disconnected from gRPC server")
# Пример с метаданными и дедлайнами
class AdvancedGRPCClient:
"""Продвинутый клиент с дополнительными возможностями"""
def __init__(self, stub):
self.stub = stub
def call_with_metadata(self):
"""Вызов с метаданными"""
print("\n=== Testing with Metadata ===")
# Создаем метаданные
metadata = [
('authorization', 'Bearer my_token'),
('client-id', 'python-client-1.0'),
('request-id', '12345'),
]
try:
# Вызов с метаданными и дедлайном
response = self.stub.GetUser(
pb2.GetUserRequest(user_id=1),
metadata=metadata,
timeout=3, # 3 секунды дедлайн
)
print(f"Response with metadata: {response.name}")
except grpc.RpcError as e:
print(f"Error: {e.code()}: {e.details()}")
def call_with_compression(self):
"""Вызов со сжатием"""
print("\n=== Testing with Compression ===")
try:
# Включаем сжатие для запроса
response = self.stub.GetUser(
pb2.GetUserRequest(user_id=1),
compression=grpc.Compression.Gzip
)
print(f"Response with compression: {response.name}")
except grpc.RpcError as e:
print(f"Error: {e.code()}: {e.details()}")
# Пример использования
if __name__ == "__main__":
# Запуск сервера в отдельном потоке
import subprocess
import sys
# Запускаем сервер в отдельном процессе
server_process = subprocess.Popen(
[sys.executable, "grpc_server.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# Даем серверу время на запуск
time.sleep(3)
# Тестируем клиент
client = GRPCClient()
if client.connect():
# Запускаем все тесты
client.test_unary_rpc()
time.sleep(1)
client.test_server_streaming()
time.sleep(1)
client.test_client_streaming()
time.sleep(1)
client.test_bidirectional_streaming()
time.sleep(2)
client.test_concurrent_requests()
# Тестируем продвинутые возможности
advanced_client = AdvancedGRPCClient(client.stub)
advanced_client.call_with_metadata()
advanced_client.call_with_compression()
client.disconnect()
# Останавливаем сервер
server_process.terminate()
server_process.wait()
print("\nAll tests completed")
5. Дополнительно: мониторинг и health checks для gRPC:
# grpc_health.py
from generated import user_service_pb2_grpc as pb2_grpc
from generated import user_service_pb2 as pb2
import grpc
from concurrent import futures
import time
class HealthService(pb2_grpc.HealthServicer):
"""gRPC Health Checking Service"""
def Check(self, request, context):
# Здесь можно добавить логику проверки здоровья
return pb2.HealthCheckResponse(status=pb2.HealthCheckResponse.SERVING)
def Watch(self, request, context):
"""Streaming health check"""
try:
while True:
yield pb2.HealthCheckResponse(status=pb2.HealthCheckResponse.SERVING)
time.sleep(5) # Отправляем статус каждые 5 секунд
except:
pass
# Использование в сервере
def create_server_with_health_check():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
# Основной сервис
pb2_grpc.add_UserServiceServicer_to_server(UserService(), server)
# Health check сервис
from grpc_health.v1 import health_pb2_grpc, health_pb2
health_servicer = HealthService()
health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
return server
Сравнительная таблица {#сравнение}
| Параметр | TCP | UDP | HTTP/1.1 | gRPC (HTTP/2) |
|---|---|---|---|---|
| Тип протокола | Транспортный | Транспортный | Прикладной | Прикладной |
| Надежность | Гарантированная | Не гарантирована | Гарантированная | Гарантированная |
| Установка соединения | Требуется (handshake) | Не требуется | Требуется (TCP) | Требуется (HTTP/2) |
| Порядок доставки | Гарантирован | Не гарантирован | Гарантирован | Гарантирован |
| Контроль перегрузки | Есть | Нет | Есть (через TCP) | Есть |
| Управление потоком | Есть | Нет | Ограниченное | Продвинутое |
| Заголовки | 20-60 байт | 8 байт | Текстовые, большие | Бинарные, HPACK сжатие |
| Мультиплексирование | Нет | Нет | Ограниченное (pipelining) | Есть |
| Push-уведомления | Нет | Нет | Нет | Есть (server push) |
| Тип данных | Бинарный поток | Дейтаграммы | Текстовый (обычно) | Бинарный (protobuf) |
| Производительность | Высокая | Очень высокая | Средняя | Очень высокая |
| Сложность | Средняя | Низкая | Низкая | Высокая |
| Использование | Файлы, веб, SSH | Видео, VoIP, DNS | Веб-API, REST | Микросервисы, стриминг |
Критерии выбора протокола:
-
TCP:
- Когда нужна гарантированная доставка
- Файловые трансферы, SSH, базы данных
- Веб-сокеты, онлайн-игры (где важна надежность)
-
UDP:
- Приложения реального времени (VoIP, видеостриминг)
- DNS запросы
- Мультимедиа, онлайн-игры (где важна скорость)
- Broadcast/multicast трафик
-
HTTP:
- Веб-приложения и API
- Когда нужна простота и стандартизация
- Совместимость с существующей инфраструктурой
- Когда клиенты — это браузеры или мобильные приложения
-
gRPC:
- Микросервисная архитектура
- Высокопроизводительные внутренние API
- Стриминг данных в реальном времени
- Полиглотные среды (разные языки программирования)
- Когда нужны strict API contracts
Заключение
Каждый протокол имеет свои сильные стороны и оптимальные сферы применения:
- TCP/UDP — фундаментальные транспортные протоколы
- HTTP — универсальный протокол для веб
- gRPC — современное решение для микросервисов
Выбор зависит от требований к надежности, производительности, сложности реализации и существующей инфраструктуры. В современных системах часто используется комбинация этих протоколов для разных задач внутри одной архитектуры.