← Назад к курсу

Пособие по оптимизации работы с LLM / LangGraph / OpenAI на Python (Кэширование, батчинг и параллельные запросы)

1️⃣ Почему оптимизировать?

Проблема Что получаем без оптимизации Как оптимизация меняет ситуацию
Ожидание ответа Средняя задержка ≈ 3‑5 сек/запрос Кэш, батчинг и параллелизм могут уменьшить её до ≤ 0,5 сек
API‑лимиты 3‑4 тыс. запросов / час (ChatGPT) Повторные запросы можно свести к 10‑20 % от общего числа
Кошты Платите за каждый токен Кэш → меньше токенов, батчинг → меньше общих запросов
Шкала Работает только с одним запросом Параллельные запросы → многократный рост throughput

2️⃣ Подготовка окружения

# В Python ≥ 3.9
python -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
pip install \
    openai==1.50.0 \
    langchain==0.2.4 \
    langgraph==0.2.1 \
    cachetools \
    redis \
    tqdm \
    aiohttp
  • OpenAI – официальный клиент.
  • LangChain / LangGraph – удобные “batteries‑included”‑слои для кэша, сценариев и графов.
  • cachetools – простая LRU‑кеш в памяти.
  • Redis – быстрый распределённый кэш (опционально).
  • aiohttp – асинхронный клиент, если хотите работать без блокирующих threads.

3️⃣ Кэширование

3.1️⃣ Внутренний LRU‑кеш (без внешнего сервиса)

from cachetools import TTLCache, cached
from typing import Any

# 5 минут жизни каждой записи
cache = TTLCache(maxsize=10_000, ttl=300)

@cached(cache)
def _cached_chat_completion(
    prompt: str,
    model: str = "gpt-4o-mini",
    temperature: float = 0.0,
    max_tokens: int = 1024,
) -> str:
    """Функция‑обёртка, сохраняющая результат в LRU‑кеш."""
    import openai
    resp = openai.ChatCompletion.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=temperature,
        max_tokens=max_tokens,
    )
    return resp.choices[0].message["content"]["text"]
  • Ключ может включать prompt + model + temperature + max_tokens.
  • При одинаковом ключе кэш возвращает уже‑полученный ответ без обращения к API.

3.2️⃣ Redis‑кеш (для распределённых сервисов)

import redis
import json
import hashlib
from functools import lru_cache

redis_client = redis.Redis(host="localhost", port=6379, db=0)

def _hash_prompt(prompt: str) -> str:
    return hashlib.sha256(prompt.encode()).hexdigest()

def _redis_get(prompt: str, model, temperature, max_tokens) -> str | None:
    key = f"{_hash_prompt(prompt)}:{model}:{temperature}:{max_tokens}"
    cached_json = redis_client.get(key)
    return json.loads(cached_json)["text"] if cached_json else None

def _redis_set(prompt: str, model, temperature, max_tokens, text: str):
    key = f"{_hash_prompt(prompt)}:{model}:{temperature}:{max_tokens}"
    redis_client.setex(key, 300, json.dumps({"text": text}))
  • Добавьте это в ваш @cached‑wrapper, либо используйте готовый langchain.cache.Cache‑объект:
from langchain.cache import Cache
cache = Cache.from_cacheir("./my_cache_dir", ttl_seconds=300)

# При создании LLM‑чаина
llm = OpenAI(model_name="gpt-4o-mini", cache=cache)

3.3️⃣ Кэш от OpenAI (встроенный)

OpenAI уже добавил «API cache», но для более гибкой политики лучше пользоваться собственными кэшами (Redis/LRUCache).


4️⃣ Батчинг (batch‑request)

4.1️⃣ Основные ограничения

  • GPT‑4‑Turbo и gpt‑4o‑mini поддерживают до 32 запросов в одном ChatCompletion (для gpt-4o-mini — до 40).
  • Суммарный объём входных токенов ≤ 128 к (= ≈ 32 т. × 4 т/запрос).

4.2️⃣ Батчинг вручную

import openai
from tqdm import tqdm
import asyncio

def batch_chat_completion(
    prompts: list[str],
    model: str = "gpt-4o-mini",
    temperature: float = 0.0,
    max_tokens: int = 1024,
) -> list[str]:
    # Вычисляем, сколько запросов помещается в один batch
    batch_size = 32  # можно изменить, в зависимости от токенов
    batches = [
        prompts[i : i + batch_size]
        for i in range(0, len(prompts), batch_size)
    ]

    async def _async_batch(prompts_batch: list[str]) -> list[str]:
        client = openai.AsyncClient(api_key=openai.api_key)
        resp = await client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": p} for p in prompts_batch],
            temperature=temperature,
            max_tokens=max_tokens,
        )
        # Ответы собираем в одном порядке
        return [r.choices[0].message.content.text for r in resp.choices]

    results = []
    for batch in tqdm(batches, desc="Batching"):
        results.extend(await _async_batch(batch))

    return results
  • await позволяет отправить несколько запросов без блокировки.
  • Токенизация: используйте openai.ChatCompletion.create(..., stream=False) или stream=True + async → можно читать ответ по мере его появления.

4.3️⃣ Батчинг через LangChain

LangChain умеет объединять ChatModels в LLMChain с batch=True:

from langchain.chat_models import ChatOpenAI
from langchain.schema import ChatPromptValue

# Пример: один запросов‑с‑другими‑за‑batch
def batch_llmchain(
    prompt_template: str,
    batch_prompts: list[str],
    model: str = "gpt-4o-mini",
):
    llm = ChatOpenAI(model=model, temperature=0.0, cache=cache)
    chain = LLMChain(
        llm=llm,
        prompt=ChatPromptValue.from_template(prompt_template),
        verbose=False,
        output_key="text",
    )
    # LangChain умеет принимать список `ChatPromptValue`s → автоматический batch
    responses = chain.run_batch(batch_prompts)
    return [r["text"] for r in responses]
  • Плюс: если у вас уже есть prompt_template с переменными, batch‑построение работает «out‑of‑the‑box».

5️⃣ Параллельные запросы

5.1️⃣ ThreadPoolExecutor (CPU‑/IO‑bound, но блокирующий)

from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_chat(prompts: list[str], workers: int = 8):
    results = []
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(_cached_chat_completion, p): p for p in prompts}
        for fut in as_completed(futures):
            results.append(fut.result())
    return results
  • Подходит, если вы используете blocking‑клиент (openai‑client без async).

5.2️⃣ Asyncio + aiohttp (полный async)

import aiohttp
import asyncio

async def async_chat_completion(prompt: str):
    client = aiohttp.ClientSession()
    payload = {
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0,
        "max_tokens": 1024,
    }
    async with client.post("https://api.openai.com/v1/chat/completions",
                           json=payload,
                           headers={"Authorization": f"Bearer {openai.api_key}"}) as resp:
        data = await resp.json()
    return data["choices"][0]["message"]["content"]["text"]

async def batch_async(prompts: list[str], batch_size: int = 32):
    batches = [prompts[i:i+batch_size] for i in range(0, len(prompts), batch_size)]
    async with aiohttp.ClientSession() as session:
        tasks = []
        for batch in batches:
            if not batch: continue
            payload = {
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": p} for p in batch],
                "temperature": 0,
                "max_tokens": 1024,
            }
            task = asyncio.create_task(
                _async_batch(session, payload)
            )
            tasks.append(task)
        # gather results in order
        results = await asyncio.gather(*tasks, return_exceptions=False)
    return [r["text"] for r in results]

def _async_batch(session: aiohttp.ClientSession, payload: dict) -> dict:
    async with session.post(
        "https://api.openai.com/v1/chat/completions",
        json=payload,
        headers={"Authorization": f"Bearer {openai.api_key}"},
    ) as resp:
        data = await resp.json()
    return data
  • Эффективность: асинхронный запрос не блокирует GIL и позволяет «запускать» десятки запросов одновременно.

5.3️⃣ Параллелизм в LangGraph

from langgraph.graph import StateGraph, END
from langgraph.prebuilt import cache_node

# Определяем простейший граф
async def get_answer(state):
    if "cached" in state:
        return {"response": state["cached"]}
    # TODO: добавить тут реальный LLM‑вызов
    return {"response": "Ответ от LLM"}

# Кеш‑нод
cache = cache_node()  # LangGraph‑поддержка кэша

graph = StateGraph({
    "prompt": None,
    "response": None,
})
graph.add_node("llm", get_answer)               # LLM‑нод
graph.add_node("cache", cache)                  # Кеш‑нод
graph.add_edge("start", "llm")
graph.add_edge("llm", END)                      # если кэш пропустил

# Параллельная часть: можно разветвить несколько входов
def parallel_branch(prompt):
    # Создаём отдельные ветки, которые используют один кэш
    return {"branch": f"branch_{hash(prompt)"}, "prompt": prompt}
  • LangGraph умеет одновременно запускать несколько путей графа, каждый с собственным кэшем.
  • Для более сложных сценариев используйте graph.add_parallel_node("parallel", parallel_branch).

6️⃣ Полный «паттерн» – пример «caching + batching + parallelism»

# 📂 Пример: оптимизированный сервис, принимающий список запросов
from typing import List
import time, asyncio, threading

# ---------- 1. Кеш ----------
cache = TTLCache(maxsize=5_000, ttl=300)

# ---------- 2. Обертка над API ----------
def _run_one(prompt: str):
    key = f"{hash(prompt)}:gpt-4o-mini:0.0:1024"
    cached_resp = cache.get(key)
    if cached_resp:
        return cached_resp

    # Если кэш пуст → запрос
    resp = openai.ChatCompletion.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.0,
        max_tokens=1024,
    )
    text = resp.choices[0].message["content"]["text"]
    cache[key] = text
    return text

# ---------- 3. Батчинг ----------
def batch(prompts: List[str], batch_size: int = 32):
    async def _batch(prompts_batch):
        async with openai.AsyncClient(api_key=openai.api_key) as client:
            payload = [
                {"role": "user", "content": p}
                for p in prompts_batch
            ]
            resp = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=payload,
                temperature=0,
                max_tokens=1024,
                n=prompts_batch[-1]["max_tokens"],   # аппроксимировано, но можно указать n≤32
            )
            return [r["choices"][0]["message"]["content"]["text"]
                    for r in resp.choices]
    # Делим список на batches и запускаем их в отдельных asyncio‑loops
    batches = [prompts[i:i+batch_size] for i in range(0, len(prompts), batch_size)]
    # Каждый batch – отдельный thread, чтобы не блокировать GIL
    threads = [
        threading.Thread(target=lambda: list(_batch(b)), daemon=True)
        for b in batches
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(threads, [])

# ---------- 4. Параллельный вызов ----------
def parallel_optimize(
    user_inputs: List[str],
    batch_size: int = 32,
) -> List[str]:
    return batch(user_inputs, batch_size)

# ---------- 5. Пример вызова ----------
if __name__ == "__main__":
    prompts = [
        f"Prompt #{i}" for i in range(200)
    ]
    start = time.time()
    answers = parallel_optimize(prompts, batch_size=32)
    print(f"Done in {time.time() - start:.2f}s")
    # Пример кэшированного ответа:
    assert answers[0] == _run_one(prompts[0])
  • Ключевые моменты:
    1. LRU‑кеш в памяти уменьшает количество реальных запросов до ≈ 10‑20 % при редких‑неповторяющихся запросах.
    2. Батчинг собирает до 32 запросов в один ChatCompletion → лишь ≈ 1‑3 запроса к API вместо 32.
    3. Тред‑параллелизм + async‑client позволяет выполнить несколько батчей одновременно, полностью используя один процесс без блокировок.

7️⃣ Как измерить эффективность?

import time, statistics

def benchmark(prompts, batch_size=32, workers=8):
    # Без оптимизации (по одному запросу)
    t0 = time.time()
    [openai.ChatCompletion.create(...) for p in prompts]
    t1 = time.time()
    latency_no_opt = t1 - t0

    # Оптимизированная версия
    t0 = time.time()
    parallel_optimize(prompts, batch_size, workers)
    t1 = time.time()
    latency_opt = t1 - t0

    print(f"Норм.: {latency_no_opt:.2f}s  |  Оптим.: {latency_opt:.2f}s")
    print(f"Ускорение: {latency_no_opt / latency_opt:.2f}x")
  • Для более тонкого анализа используйте cProfile или pyinstrument.
  • По сравнению с baseline (одно‑запрос‑за‑один пользователь) вы обычно получаете 5‑10× ускорение.

8️⃣ Быстрый чек‑лист

Что проверить Как
1 Кеш‑ключ охватывает модель, temperature, max_tokens Добавьте в hash или в TTLCache ключ эти параметры
2 Токенизация запросов – подсчитайте prompt_tokens + response_tokens openai.Engine.list() → max_context_length
3 Batch‑size ≤ 32 (или ≤ 40 для gpt-4o-mini) Перед отправкой считайте len(prompts) × avg_tokens_per_prompt
4 Parallelism – используете ThreadPoolExecutor или asyncio Для CPU‑bound подготовки → ProcessPoolExecutor
5 Обработка ошибок – rate‑limit, 500‑error, token‑exhaustion В кэш‑функции try/except и добавьте retry‑механизм
6 Тайм‑аут – задайте timeout=30 в ChatCompletion Защита от “зависания” в случае сетевых проблем
7 Контроль логирования – logging Установите log_level=INFO для трассировки

9️⃣ Что дальше?

  1. Перенос кэша в Redis – если сервис запускается на нескольких инстансах, персональный LRU‑кеш недостаточен.
  2. Оценка «cold‑start» – помните, что первый запрос всегда будет к API, поэтому планируйте warm‑up‑скрипт при старте.
  3. Мониторинг – используйте Prometheus + Grafana для слежения за requests_per_second, cache_hit_rate, latency.
  4. CI/CD – в автоматизированном тесте проверяйте кеш‑rate и latency — это гарантирует, что после апдейта оптимизация не упала.

10️⃣ Полный пример проекта (скелет)

my_llm_opt/
├─ src/
│   ├─ __init__.py
│   ├─ cache.py            # LRU/Redis‑кеш
│   ├─ batching.py         # batch‑функции
│   ├─ parallelism.py      # thread/async‑обертки
│   └─ graph/
│       ├─ __init__.py
│       ├─ pipeline.py     # LangGraph‑граф
│       └─ flow.py         # Построение графа
├─ requirements.txt
├─ Dockerfile               # ← build‑env + redis
└─ README.md                # ← краткое описание оптимизации

В Dockerfile можно добавить:

FROM python:3.12-slim
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ ./src/
CMD ["python", "-m", "my_llm_opt.src.main"]

🎉 Итоги

Техника Когда использовать Пример в коде
LRU‑кеш Редкие/повторяющиеся запросы cachetools.TTLCache
Redis‑кеш Распределённые сервисы redis-py + langchain.cache.Cache
Batch‑request Массовый набор коротких запросов openai.AsyncClient + n= параметр
Parallelism (ThreadPool/Async) Высокий throughput, ограниченный API‑rate ThreadPoolExecutor / asyncio
LangGraph + CacheNode Сложные схемы выполнения, поток‑обработка graph.add_node('cache', cache_node())

Эти принципы позволяют сократить количество обращений к OpenAI до ≈ 10 %, уменьшить latency в 5‑10 раз, а также упростить логику (кэш/батчинг/параллелизм) в одном‑единственном месте. Применяйте их в ваших проектах и следите за тем, чтобы ключ кэширования действительно отражал уникальность запроса (модель + параметры + контекст).