← Назад к курсу
Пособие по оптимизации работы с LLM / LangGraph / OpenAI на Python (Кэширование, батчинг и параллельные запросы)
1️⃣ Почему оптимизировать?
| Проблема | Что получаем без оптимизации | Как оптимизация меняет ситуацию |
|---|---|---|
| Ожидание ответа | Средняя задержка ≈ 3‑5 сек/запрос | Кэш, батчинг и параллелизм могут уменьшить её до ≤ 0,5 сек |
| API‑лимиты | 3‑4 тыс. запросов / час (ChatGPT) | Повторные запросы можно свести к 10‑20 % от общего числа |
| Кошты | Платите за каждый токен | Кэш → меньше токенов, батчинг → меньше общих запросов |
| Шкала | Работает только с одним запросом | Параллельные запросы → многократный рост throughput |
2️⃣ Подготовка окружения
# В Python ≥ 3.9
python -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
pip install \
openai==1.50.0 \
langchain==0.2.4 \
langgraph==0.2.1 \
cachetools \
redis \
tqdm \
aiohttp
- OpenAI – официальный клиент.
- LangChain / LangGraph – удобные “batteries‑included”‑слои для кэша, сценариев и графов.
- cachetools – простая LRU‑кеш в памяти.
- Redis – быстрый распределённый кэш (опционально).
- aiohttp – асинхронный клиент, если хотите работать без блокирующих threads.
3️⃣ Кэширование
3.1️⃣ Внутренний LRU‑кеш (без внешнего сервиса)
from cachetools import TTLCache, cached
from typing import Any
# 5 минут жизни каждой записи
cache = TTLCache(maxsize=10_000, ttl=300)
@cached(cache)
def _cached_chat_completion(
prompt: str,
model: str = "gpt-4o-mini",
temperature: float = 0.0,
max_tokens: int = 1024,
) -> str:
"""Функция‑обёртка, сохраняющая результат в LRU‑кеш."""
import openai
resp = openai.ChatCompletion.create(
model=model,
messages=[{"role": "user", "content": prompt}],
temperature=temperature,
max_tokens=max_tokens,
)
return resp.choices[0].message["content"]["text"]
- Ключ может включать prompt + model + temperature + max_tokens.
- При одинаковом ключе кэш возвращает уже‑полученный ответ без обращения к API.
3.2️⃣ Redis‑кеш (для распределённых сервисов)
import redis
import json
import hashlib
from functools import lru_cache
redis_client = redis.Redis(host="localhost", port=6379, db=0)
def _hash_prompt(prompt: str) -> str:
return hashlib.sha256(prompt.encode()).hexdigest()
def _redis_get(prompt: str, model, temperature, max_tokens) -> str | None:
key = f"{_hash_prompt(prompt)}:{model}:{temperature}:{max_tokens}"
cached_json = redis_client.get(key)
return json.loads(cached_json)["text"] if cached_json else None
def _redis_set(prompt: str, model, temperature, max_tokens, text: str):
key = f"{_hash_prompt(prompt)}:{model}:{temperature}:{max_tokens}"
redis_client.setex(key, 300, json.dumps({"text": text}))
- Добавьте это в ваш @cached‑wrapper, либо используйте готовый langchain.cache.Cache‑объект:
from langchain.cache import Cache
cache = Cache.from_cacheir("./my_cache_dir", ttl_seconds=300)
# При создании LLM‑чаина
llm = OpenAI(model_name="gpt-4o-mini", cache=cache)
3.3️⃣ Кэш от OpenAI (встроенный)
OpenAI уже добавил «API cache», но для более гибкой политики лучше пользоваться собственными кэшами (Redis/LRUCache).
4️⃣ Батчинг (batch‑request)
4.1️⃣ Основные ограничения
- GPT‑4‑Turbo и gpt‑4o‑mini поддерживают до 32 запросов в одном ChatCompletion (для gpt-4o-mini — до 40).
- Суммарный объём входных токенов ≤ 128 к (= ≈ 32 т. × 4 т/запрос).
4.2️⃣ Батчинг вручную
import openai
from tqdm import tqdm
import asyncio
def batch_chat_completion(
prompts: list[str],
model: str = "gpt-4o-mini",
temperature: float = 0.0,
max_tokens: int = 1024,
) -> list[str]:
# Вычисляем, сколько запросов помещается в один batch
batch_size = 32 # можно изменить, в зависимости от токенов
batches = [
prompts[i : i + batch_size]
for i in range(0, len(prompts), batch_size)
]
async def _async_batch(prompts_batch: list[str]) -> list[str]:
client = openai.AsyncClient(api_key=openai.api_key)
resp = await client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": p} for p in prompts_batch],
temperature=temperature,
max_tokens=max_tokens,
)
# Ответы собираем в одном порядке
return [r.choices[0].message.content.text for r in resp.choices]
results = []
for batch in tqdm(batches, desc="Batching"):
results.extend(await _async_batch(batch))
return results
- await позволяет отправить несколько запросов без блокировки.
- Токенизация: используйте openai.ChatCompletion.create(..., stream=False) или stream=True + async → можно читать ответ по мере его появления.
4.3️⃣ Батчинг через LangChain
LangChain умеет объединять ChatModels в LLMChain с batch=True:
from langchain.chat_models import ChatOpenAI
from langchain.schema import ChatPromptValue
# Пример: один запросов‑с‑другими‑за‑batch
def batch_llmchain(
prompt_template: str,
batch_prompts: list[str],
model: str = "gpt-4o-mini",
):
llm = ChatOpenAI(model=model, temperature=0.0, cache=cache)
chain = LLMChain(
llm=llm,
prompt=ChatPromptValue.from_template(prompt_template),
verbose=False,
output_key="text",
)
# LangChain умеет принимать список `ChatPromptValue`s → автоматический batch
responses = chain.run_batch(batch_prompts)
return [r["text"] for r in responses]
- Плюс: если у вас уже есть prompt_template с переменными, batch‑построение работает «out‑of‑the‑box».
5️⃣ Параллельные запросы
5.1️⃣ ThreadPoolExecutor (CPU‑/IO‑bound, но блокирующий)
from concurrent.futures import ThreadPoolExecutor, as_completed
def parallel_chat(prompts: list[str], workers: int = 8):
results = []
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(_cached_chat_completion, p): p for p in prompts}
for fut in as_completed(futures):
results.append(fut.result())
return results
- Подходит, если вы используете blocking‑клиент (openai‑client без async).
5.2️⃣ Asyncio + aiohttp (полный async)
import aiohttp
import asyncio
async def async_chat_completion(prompt: str):
client = aiohttp.ClientSession()
payload = {
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0,
"max_tokens": 1024,
}
async with client.post("https://api.openai.com/v1/chat/completions",
json=payload,
headers={"Authorization": f"Bearer {openai.api_key}"}) as resp:
data = await resp.json()
return data["choices"][0]["message"]["content"]["text"]
async def batch_async(prompts: list[str], batch_size: int = 32):
batches = [prompts[i:i+batch_size] for i in range(0, len(prompts), batch_size)]
async with aiohttp.ClientSession() as session:
tasks = []
for batch in batches:
if not batch: continue
payload = {
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": p} for p in batch],
"temperature": 0,
"max_tokens": 1024,
}
task = asyncio.create_task(
_async_batch(session, payload)
)
tasks.append(task)
# gather results in order
results = await asyncio.gather(*tasks, return_exceptions=False)
return [r["text"] for r in results]
def _async_batch(session: aiohttp.ClientSession, payload: dict) -> dict:
async with session.post(
"https://api.openai.com/v1/chat/completions",
json=payload,
headers={"Authorization": f"Bearer {openai.api_key}"},
) as resp:
data = await resp.json()
return data
- Эффективность: асинхронный запрос не блокирует GIL и позволяет «запускать» десятки запросов одновременно.
5.3️⃣ Параллелизм в LangGraph
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import cache_node
# Определяем простейший граф
async def get_answer(state):
if "cached" in state:
return {"response": state["cached"]}
# TODO: добавить тут реальный LLM‑вызов
return {"response": "Ответ от LLM"}
# Кеш‑нод
cache = cache_node() # LangGraph‑поддержка кэша
graph = StateGraph({
"prompt": None,
"response": None,
})
graph.add_node("llm", get_answer) # LLM‑нод
graph.add_node("cache", cache) # Кеш‑нод
graph.add_edge("start", "llm")
graph.add_edge("llm", END) # если кэш пропустил
# Параллельная часть: можно разветвить несколько входов
def parallel_branch(prompt):
# Создаём отдельные ветки, которые используют один кэш
return {"branch": f"branch_{hash(prompt)"}, "prompt": prompt}
- LangGraph умеет одновременно запускать несколько путей графа, каждый с собственным кэшем.
- Для более сложных сценариев используйте graph.add_parallel_node("parallel", parallel_branch).
6️⃣ Полный «паттерн» – пример «caching + batching + parallelism»
# 📂 Пример: оптимизированный сервис, принимающий список запросов
from typing import List
import time, asyncio, threading
# ---------- 1. Кеш ----------
cache = TTLCache(maxsize=5_000, ttl=300)
# ---------- 2. Обертка над API ----------
def _run_one(prompt: str):
key = f"{hash(prompt)}:gpt-4o-mini:0.0:1024"
cached_resp = cache.get(key)
if cached_resp:
return cached_resp
# Если кэш пуст → запрос
resp = openai.ChatCompletion.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0.0,
max_tokens=1024,
)
text = resp.choices[0].message["content"]["text"]
cache[key] = text
return text
# ---------- 3. Батчинг ----------
def batch(prompts: List[str], batch_size: int = 32):
async def _batch(prompts_batch):
async with openai.AsyncClient(api_key=openai.api_key) as client:
payload = [
{"role": "user", "content": p}
for p in prompts_batch
]
resp = await client.chat.completions.create(
model="gpt-4o-mini",
messages=payload,
temperature=0,
max_tokens=1024,
n=prompts_batch[-1]["max_tokens"], # аппроксимировано, но можно указать n≤32
)
return [r["choices"][0]["message"]["content"]["text"]
for r in resp.choices]
# Делим список на batches и запускаем их в отдельных asyncio‑loops
batches = [prompts[i:i+batch_size] for i in range(0, len(prompts), batch_size)]
# Каждый batch – отдельный thread, чтобы не блокировать GIL
threads = [
threading.Thread(target=lambda: list(_batch(b)), daemon=True)
for b in batches
]
for t in threads:
t.start()
for t in threads:
t.join()
return sum(threads, [])
# ---------- 4. Параллельный вызов ----------
def parallel_optimize(
user_inputs: List[str],
batch_size: int = 32,
) -> List[str]:
return batch(user_inputs, batch_size)
# ---------- 5. Пример вызова ----------
if __name__ == "__main__":
prompts = [
f"Prompt #{i}" for i in range(200)
]
start = time.time()
answers = parallel_optimize(prompts, batch_size=32)
print(f"Done in {time.time() - start:.2f}s")
# Пример кэшированного ответа:
assert answers[0] == _run_one(prompts[0])
- Ключевые моменты:
- LRU‑кеш в памяти уменьшает количество реальных запросов до ≈ 10‑20 % при редких‑неповторяющихся запросах.
- Батчинг собирает до 32 запросов в один ChatCompletion → лишь ≈ 1‑3 запроса к API вместо 32.
- Тред‑параллелизм + async‑client позволяет выполнить несколько батчей одновременно, полностью используя один процесс без блокировок.
7️⃣ Как измерить эффективность?
import time, statistics
def benchmark(prompts, batch_size=32, workers=8):
# Без оптимизации (по одному запросу)
t0 = time.time()
[openai.ChatCompletion.create(...) for p in prompts]
t1 = time.time()
latency_no_opt = t1 - t0
# Оптимизированная версия
t0 = time.time()
parallel_optimize(prompts, batch_size, workers)
t1 = time.time()
latency_opt = t1 - t0
print(f"Норм.: {latency_no_opt:.2f}s | Оптим.: {latency_opt:.2f}s")
print(f"Ускорение: {latency_no_opt / latency_opt:.2f}x")
- Для более тонкого анализа используйте cProfile или pyinstrument.
- По сравнению с baseline (одно‑запрос‑за‑один пользователь) вы обычно получаете 5‑10× ускорение.
8️⃣ Быстрый чек‑лист
| ✅ | Что проверить | Как |
|---|---|---|
| 1 | Кеш‑ключ охватывает модель, temperature, max_tokens | Добавьте в hash или в TTLCache ключ эти параметры |
| 2 | Токенизация запросов – подсчитайте prompt_tokens + response_tokens | openai.Engine.list() → max_context_length |
| 3 | Batch‑size ≤ 32 (или ≤ 40 для gpt-4o-mini) | Перед отправкой считайте len(prompts) × avg_tokens_per_prompt |
| 4 | Parallelism – используете ThreadPoolExecutor или asyncio | Для CPU‑bound подготовки → ProcessPoolExecutor |
| 5 | Обработка ошибок – rate‑limit, 500‑error, token‑exhaustion | В кэш‑функции try/except и добавьте retry‑механизм |
| 6 | Тайм‑аут – задайте timeout=30 в ChatCompletion | Защита от “зависания” в случае сетевых проблем |
| 7 | Контроль логирования – logging | Установите log_level=INFO для трассировки |
9️⃣ Что дальше?
- Перенос кэша в Redis – если сервис запускается на нескольких инстансах, персональный LRU‑кеш недостаточен.
- Оценка «cold‑start» – помните, что первый запрос всегда будет к API, поэтому планируйте warm‑up‑скрипт при старте.
- Мониторинг – используйте Prometheus + Grafana для слежения за requests_per_second, cache_hit_rate, latency.
- CI/CD – в автоматизированном тесте проверяйте кеш‑rate и latency — это гарантирует, что после апдейта оптимизация не упала.
10️⃣ Полный пример проекта (скелет)
my_llm_opt/ ├─ src/ │ ├─ __init__.py │ ├─ cache.py # LRU/Redis‑кеш │ ├─ batching.py # batch‑функции │ ├─ parallelism.py # thread/async‑обертки │ └─ graph/ │ ├─ __init__.py │ ├─ pipeline.py # LangGraph‑граф │ └─ flow.py # Построение графа ├─ requirements.txt ├─ Dockerfile # ← build‑env + redis └─ README.md # ← краткое описание оптимизации
В Dockerfile можно добавить:
FROM python:3.12-slim RUN pip install --no-cache-dir -r requirements.txt COPY src/ ./src/ CMD ["python", "-m", "my_llm_opt.src.main"]
🎉 Итоги
| Техника | Когда использовать | Пример в коде |
|---|---|---|
| LRU‑кеш | Редкие/повторяющиеся запросы | cachetools.TTLCache |
| Redis‑кеш | Распределённые сервисы | redis-py + langchain.cache.Cache |
| Batch‑request | Массовый набор коротких запросов | openai.AsyncClient + n= параметр |
| Parallelism (ThreadPool/Async) | Высокий throughput, ограниченный API‑rate | ThreadPoolExecutor / asyncio |
| LangGraph + CacheNode | Сложные схемы выполнения, поток‑обработка | graph.add_node('cache', cache_node()) |
Эти принципы позволяют сократить количество обращений к OpenAI до ≈ 10 %, уменьшить latency в 5‑10 раз, а также упростить логику (кэш/батчинг/параллелизм) в одном‑единственном месте. Применяйте их в ваших проектах и следите за тем, чтобы ключ кэширования действительно отражал уникальность запроса (модель + параметры + контекст).