В прошлую пятницу, ровно в 18:47, когда я уже мысленно открывал великолепный, наполненный витаминами, напиток, мне прилетело сообщение от тимлида: «Бот лежит, пользователи жалуются, Gemini API возвращает 429». Наш корпоративный Telegram-бот, который должен был помогать саппорту отвечать на тикеты, просто встал колом. Причина оказалась до банальности простой: мы не учли rate limiting и думали, что 50 RPM (запросов в минуту) на бесплатном тарифе — это «бесконечно много». С тех пор мы переписали архитектуру, добавили очереди, кэширование и middleware для retry. В этой статье разберу, как с нуля подружить Gemini API с Telegram-ботом на aiogram 3.x, не наступая на те же грабли.

Архитектура: что и зачем

Классическая схема выглядит так:
[Telegram User] → [aiogram Bot] → [Gemini API]

[Response Queue / Cache]

Но в реальном продакшене появляется дополнительная обвязка:


[Telegram] → [aiogram] → [asyncio Queue] → [Rate Limiter] → [Gemini API]
↑ ↓
[Response Cache] ←───────────────── [Streaming Handler]


Почему это важно? Gemini API имеет жёсткие лимиты:
1.Бесплатный тариф: 5–15 RPM в зависимости от модели
2.Ответы могут идти до 10–15 секунд на длинных промптах.
3.Платный: до 60 RPM для Gemini 2.0 Flash
Если просто вызывать await client.models.generate_content() внутри хендлера — вы положите ивент-луп aiogram и получите таймауты от Telegram. Асинхронность aiogram здесь не спасает — блокирующий вызов остаётся блокирующим.

Шаг 1. Установка и настройка Gemini API

Ставим библиотеку Google GenAI SDK (она же google-genai):

pip install google-genai aiogram python-dotenv
Скрытый текст

Важно: библиотека google-generativeai устарела и с мая 2025 года не поддерживается. Используйте именно google-genai

Создаём .env файл:

GEMINI_API_KEY=your-api-key-here
TELEGRAM_BOT_TOKEN=your-bot-token

Получить API-ключ можно в Google AI Studio → API Keys → Create API Key.

Базовый клиент:

# gemini_client.py
import os
from google import genai
from google.genai import types

class GeminiClient:
    def __init__(self, model: str = "gemini-3-flash-preview"):
        self.client = genai.Client()  # ключ берётся из GEMINI_API_KEY
        self.model = model
    
    async def generate(self, prompt: str) -> str:
        # Обратите внимание: это синхронный вызов!
        # await здесь не поможет, нужен asyncio.to_thread
        response = self.client.models.generate_content(
            model=self.model,
            contents=prompt
        )
        return response.text
Скрытый текст

Важное замечание: клиент google-genai синхронный! В асинхронном aiogram его вызовы будут блокировать ивент-луп.

Шаг 2. Асинхронная обёртка через asyncio.to_thread

Чтобы не вешать весь бот на каждый запрос к Gemini, используем asyncio.to_thread:

# async_gemini.py
import asyncio
from google import genai

class AsyncGeminiClient:
    def __init__(self, model: str = "gemini-3-flash-preview"):
        self.client = genai.Client()
        self.model = model
    
    async def generate(self, prompt: str) -> str:
        loop = asyncio.get_event_loop()
        # Выполняем синхронный вызов в отдельном потоке
        response = await loop.run_in_executor(
            None,  # используем дефолтный ThreadPoolExecutor
            self._sync_generate,
            prompt
        )
        return response
    
    def _sync_generate(self, prompt: str) -> str:
        response = self.client.models.generate_content(
            model=self.model,
            contents=prompt
        )
        return response.text
Скрытый текст

Это минимально жизнеспособный вариант. Для продакшена понадобится ещё очередь запросов.

Шаг 3. Интеграция с aiogram

Базовый хендлер для aiogram 3.x:

# bot.py
import asyncio
import logging
from aiogram import Bot, Dispatcher, Router, types
from aiogram.filters import Command
from aiogram.enums import ParseMode
from dotenv import load_dotenv

from async_gemini import AsyncGeminiClient

load_dotenv()
logging.basicConfig(level=logging.INFO)

router = Router()
gemini = AsyncGeminiClient()

@router.message(Command("start"))
async def cmd_start(message: types.Message):
    await message.answer(
        "Привет! Я бот с Gemini API. Просто напиши мне вопрос, и я отвечу."
    )

@router.message()
async def handle_message(message: types.Message):
    # Показываем, что бот "печатает"
    await message.bot.send_chat_action(
        chat_id=message.chat.id,
        action="typing"
    )
    
    try:
        response = await gemini.generate(message.text)
        # Telegram имеет лимит 4096 символов на сообщение
        if len(response) > 4000:
            # Разбиваем на части
            for i in range(0, len(response), 4000):
                await message.answer(response[i:i+4000])
        else:
            await message.answer(response)
    except Exception as e:
        logging.error(f"Gemini error: {e}")
        await message.answer(
            "Что-то пошло не так. Попробуйте позже или сформулируйте запрос иначе."
        )

async def main():
    bot = Bot(token=os.getenv("TELEGRAM_BOT_TOKEN"))
    dp = Dispatcher()
    dp.include_router(router)
    
    await dp.start_polling(bot)

if __name__ == "__main__":
    asyncio.run(main())

Шаг 4. Функциональный вызов (Function Calling) для расширения возможностей

Gemini умеет не только генерировать текст, но и вызывать внешние функции. Это полезно, если бот должен работать с реальными данными: бронировать встречи, проверять статус заказа, искать информацию в базе. Пример для планирования встреч:

# function_calling.py
from google import genai
from google.genai import types

# Описываем функцию, которую Gemini может вызвать
schedule_meeting_function = {
    "name": "schedule_meeting",
    "description": "Создаёт встречу с указанными участниками",
    "parameters": {
        "type": "object",
        "properties": {
            "attendees": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Список email участников",
            },
            "date": {
                "type": "string",
                "description": "Дата встречи (ГГГГ-ММ-ДД)",
            },
            "time": {
                "type": "string",
                "description": "Время встречи (ЧЧ:ММ)",
            },
            "topic": {
                "type": "string",
                "description": "Тема встречи",
            },
        },
        "required": ["attendees", "date", "time", "topic"],
    },
}

# Реальная функция, которую мы будем вызывать
def schedule_meeting(attendees: list, date: str, time: str, topic: str):
    # Здесь может быть вызов Google Calendar API, БД и т.д.
    return f"Встреча '{topic}' запланирована на {date} в {time} с {', '.join(attendees)}"

# Интеграция с Gemini
client = genai.Client()
tools = types.Tool(function_declarations=[schedule_meeting_function])
config = types.GenerateContentConfig(tools=[tools])

response = client.models.generate_content(
    model="gemini-3-flash-preview",
    contents="Запланируй встречу с bob@company.com и alice@company.com на 15.04.2026 в 14:00 по поводу запуска продукта",
    config=config,
)

if response.candidates[0].content.parts[0].function_call:
    fc = response.candidates[0].content.parts[0].function_call
    print(f"Gemini хочет вызвать: {fc.name}")
    print(f"С аргументами: {fc.args}")
    # Вызываем нашу функцию с аргументами от Gemini
    result = schedule_meeting(**fc.args)
    print(result)

Это мощный паттерн, который превращает простого чат-бота в настоящего агента.

Шаг 5. Кэширование ответов (чтобы не платить дважды)

Gemini API тарифицируется по токенам. Если пользователи часто задают одни и те же вопросы (например, «как сбросить пароль»), вы будете платить за каждый запрос. Решение — простой in-memory кэш:

# cache.py
import hashlib
from datetime import datetime, timedelta
from typing import Optional

class SimpleCache:
    def __init__(self, ttl_seconds: int = 3600):
        self._cache = {}
        self._ttl = ttl_seconds
    
    def get(self, key: str) -> Optional[str]:
        if key in self._cache:
            value, timestamp = self._cache[key]
            if datetime.now() - timestamp < timedelta(seconds=self._ttl):
                return value
            else:
                del self._cache[key]
        return None
    
    def set(self, key: str, value: str):
        self._cache[key] = (value, datetime.now())
    
    @staticmethod
    def hash_prompt(prompt: str) -> str:
        return hashlib.md5(prompt.lower().strip().encode()).hexdigest()

В хендлере добавляем:

cache = SimpleCache(ttl_seconds=7200)  # 2 часа

@router.message()
async def handle_message(message: types.Message):
    prompt_hash = cache.hash_prompt(message.text)
    cached = cache.get(prompt_hash)
    
    if cached:
        await message.answer(cached)
        return
    
    response = await gemini.generate(message.text)
    cache.set(prompt_hash, response)
    await message.answer(response)

Грабли (то, о чём не пишут в документации)

Грабли №1: 429 ошибка в пятницу вечером

Самая частая проблема — RESOURCE_EXHAUSTED (429). Причины:

  1. RPM-лимит. Бесплатный тариф даёт 5 RPM, платный — до 60 RPM

  2. TPM-лимит. Ограничение на количество токенов в минуту (1M для бесплатного тарифа).

  3. RPD-лимит. Ограничение на количество запросов в день (25–1500 в зависимости от модели).

Решение: используйте rate limiter на стороне бота:

# rate_limiter.py
import asyncio
import time

class AsyncRateLimiter:
    def __init__(self, max_requests: int, time_window: int = 60):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = []
        self._lock = asyncio.Lock()
    
    async def acquire(self):
        async with self._lock:
            now = time.time()
            # Удаляем старые запросы
            self.requests = [t for t in self.requests if now - t < self.time_window]
            
            if len(self.requests) >= self.max_requests:
                sleep_time = self.time_window - (now - self.requests[0])
                await asyncio.sleep(sleep_time + 0.1)
                return await self.acquire()
            
            self.requests.append(now)

Грабли №2: Таймауты от Telegram

Telegram ждёт ответ от бота 10 секунд. Если Gemini думает дольше, вы получите таймаут и пользователь увидит ошибку. Решение — показывать промежуточные сообщения или использовать streaming:

# streaming_example.py
async def generate_stream(prompt: str):
    for chunk in client.models.generate_content_stream(
        model="gemini-3-flash-preview",
        contents=prompt
    ):
        yield chunk.text

В aiogram можно обновлять одно сообщение:

sent = await message.answer("Думаю...")
full_response = ""
async for chunk in gemini.generate_stream(message.text):
    full_response += chunk
    if len(full_response) % 100 == 0:  # обновляем каждые 100 символов
        try:
            await sent.edit_text(full_response)
        except:
            pass
await sent.edit_text(full_response)

Грабли №3: Модели устаревают быстрее, чем вы читаете документацию

Gemini обновляется каждые несколько месяцев. На момент написания статьи актуальны:
gemini-3-flash-preview — быстрая и дешёвая модель
gemini-3.1-pro-preview — мощная, но дорогая ($2.00 за 1M входных токенов, $12.00 за 1M выходных)
С марта 2026 года Pro-модели недоступны на бесплатном тарифе — только платная подписка

Заключение

Интеграция Gemini API в Telegram-бота — задача на пару часов, если знать все подводные камни. Ключевые выводы:

  1. Используйте asyncio.to_thread или очереди, чтобы не блокировать ивент-луп aiogram.

  2. Внедряйте rate limiter и retry-логику до того, как получите 429 в продакшене.

  3. Кэшируйте частые запросы — экономия на токенах может быть существенной.

  4. Function Calling — ваш друг, если бот должен взаимодействовать с реальными сервисами.

Что бы вы добавили? Сталкивались ли вы с проблемами при интеграции LLM в ботов? Может, у кого-то есть опыт использования Gemini API в высоконагруженных проектах? Давайте обсудим в комментариях — особенно интересно услышать про ваши кейсы с 429 и таймаутами.

Комментарии (4)


  1. janvarev
    19.04.2026 15:41

    особенно интересно услышать про ваши кейсы с 429 и таймаутами.

    Можно использовать СУПЕРСИЛУ ДЕНЯК и оплачивать API-запросы, чтобы почти никогда не сталкиваться с лимитами.

    Тем более что вроде у вас не студенческий проект (там понятен поиск бесплатного), а решение для техподдержки. А API оплатить имхо быстрее, чем тратить время и силы разработчиков для сложной внутренней логики.

    ...впрочем, кеширование в любом случае пригодилось бы, да.


  1. Smartor
    19.04.2026 15:41

    Позвольте покритиковать вас немножко, по-доброму:)
    Во-первых, бесплатные тарифы - это для хобби-проектов и прочих экспериментов. И там специально режут лимиты, чтобы всякие хитрые коммерсанты не использовали спонсируемые мощности в целях зарабатывания денег. Для использования в корпоративных целях естественно не искать халяву, а платить, как положено:)

    "Функциональный вызов (Function Calling) для расширения возможностей"

    Можно также сделать командами, это надёжнее и защищает от пространных запросов и "от дурака".

    Грабли №1: 429 ошибка в пятницу вечером
    Решение: используйте rate limiter на стороне бота:

    Это вообще не решение, что оно решает? Решение проблемы - это использование другой модели (даже "других моделей") при ошибках типа "нет доступа".

    Грабли №2: Таймауты от Telegram

    Telegram ждёт ответ от бота 10 секунд. Если Gemini думает дольше, вы получите таймаут и пользователь увидит ошибку. Решение — показывать промежуточные сообщения или использовать streaming

    Это потому что вы неправильно делаете. Зачем вам потоковое соединение? Получили ВЕСЬ ответ от нейросети, сохранили(!), переслали сохранённое в чат. Так у вас будет контроль на каждом этапе. Если сообщение слишком большое - аккуратно разрезали на части, применили обработку незакрытых тегов, отослали в чат по частям. Ведь у вас бывают большие сообщения, или вы просто зажали длину ответов количеством исходящих от нейросети токенов?:)

    Про стриминг - это тоже не решение "проблемы 10 секунд". Это про красоту (но стриминг нравится не всем) .
    Стриминг работает хорошо при нормальных условиях, а при работе через средства обхода блокировок, он может прерываться, или зависнуть, или идти больше 10 секунд у клиента - и что вы будете делать в таком случае, как это обрабатывать? Вам всё равно нужен будет запасной план из сохранённого сообщения с предобработкой (см. предыдущий абзац). К тому же, для "показывать промежуточные сообщения" в Телеграме тоже есть свои лимиты, при любом сбое в сети или в телеграме вы легко получите каскад проблем. Я не знаю, может, Aiogram как-то сам с этим разберётся, но сомневаюсь.

    Для предотвращения состояний гонки - везде, абсолютно везде используйте очереди выполнения, длинные сообщения не зажимайте лимитами, а режьте на куски с валидацией с аварийным fallback на текст без тегов, всегда имейте варианты на отказ API и отказы любого этапа работы. Относитесь к сетевым ресурсам как к ненадёжным элементам :)

    Удачи в разработке:)


    1. kardanShurup Автор
      19.04.2026 15:41

      1. Про бесплатные тарифы и коммерцию

      Абсолютно согласен. В статье я сознательно сделал акцент на бесплатном тарифе, потому что целевая аудитория туториала — разработчики, которые только начинают щупать Gemini API, пишут pet-проекты или внутренние утилиты для команды. Для них 5 RPM — уже проблема, о которой они узнают постфактум. Платный тариф, конечно, снимает большинство ограничений, но и там есть потолок, и rate limiter на стороне бота не помешает (хотя бы для защиты от случайных флудов). В корпоративной среде, как верно замечено, экономия на токенах через кэш и очереди — это не про халяву, а про архитектурную гигиену и снижение счетов на круглых цифрах.

      2. Function Calling vs Команды

      И снова правда. Команды (/schedule/book) — более надёжный и детерминированный способ. Function Calling я включил как демонстрацию возможностей API и как мостик к более сложным агентным сценариям, где пользователь формулирует запрос свободным текстом. В реальном проекте я бы делал гибрид:

      • Команды для критичных действий с чёткими параметрами.

      • Function Calling как fallback для «поговорить с ботом как с секретарём» с обязательной валидацией аргументов и подтверждением от пользователя перед вызовом реального сервиса.
        Так и «от дурака» защита, и UX приятнее.

      3. Rate Limiter и ошибка 429

      Тут позволю себе уточнить, потому что мы, кажется, немного о разном.

      "Решение проблемы - это использование другой модели при ошибках типа 'нет доступа'"

      Переключение на другую модель — это стратегия fallback, а не rate limiting. Rate limiter на стороне бота нужен, чтобы не доводить до 429 вообще. Он ограничивает поток запросов к API, соблюдая заявленный RPM (например, не чаще 10 запросов в секунду для платной Gemini 3 Flash). Если лимит всё же превысили (скажем, из-за пиковой нагрузки или сбоя самого Google), то fallback-стратегия — следующий эшелон: другая модель, кэшированный ответ, или "извините, повторите позже".

      В статье я описал именно rate limiter для профилактики 429. Fallback-стратегии заслуживают отдельной статьи, и в комментарии отлично подсвечено, что о них нельзя забывать.

      4. Таймауты Telegram и Streaming

      Здесь самый сочный момент. Попробую разложить по полочкам.

      "Получили ВЕСЬ ответ от нейросети, сохранили, переслали сохранённое"

      Да, это идеальный сценарий. Именно так и надо делать в production: сначала дождаться полного ответа от LLM, потом обработать (разрезать, экранировать Markdown/HTML), потом отправлять.

      Проблема, которую я решал стримингом — не техническая, а UX-психологическая. Пользователь жмёт кнопку «Отправить» и ждёт. Если ответ генерируется 15 секунд, а бот молчит, пользователь:

      • Начинает долбить кнопку повторно (создавая каскад запросов).

      • Думает, что бот сломался, и уходит.

      Streaming + промежуточное редактирование сообщения (метод edit_message_text) — это способ сказать пользователю: «Я работаю, видишь? Буквы бегут». Это не решение таймаута, а маскировка задержки под активную работу.

      Что касается обхода блокировок и нестабильного соединения — вы абсолютно правы: в таких условиях стриминг может стать источником проблем. Поэтому я всегда комбинирую:

      1. Стриминг для визуального фидбека.

      2. Параллельное накопление полного ответа в памяти.

      3. По завершении генерации — финальное редактирование сообщения целиком (или отправка частями, если длинное).

      Если стрим оборвался — у меня есть полный ответ, и я могу отправить его обычным методом. Aiogram, кстати, предоставляет удобные декораторы для обработки ошибок редактирования (например, если сообщение было удалено), но основная логика резервирования ложится на плечи разработчика.

      "длинные сообщения не зажимайте лимитами, а режьте на куски с валидацией"

      Здесь +100500. В коде статьи я как раз привёл примитивный пример разрезания строки на куски по 4000 символов. Для MarkdownV2 это, конечно, недостаточно — нужно умное разрезание с учётом открытых/закрытых тегов, иначе сообщение не отправится.

      Если кратко:

      По тарифам — согласен, статья скорее для тех, кто только начинает. В проде — только платные модели и нормальный мониторинг.

      По Function Calling — да, команды надёжнее. В гибридном подходе им самое место.

      По rate limiter'у — вы правы, он именно профилактический. Fallback на другие модели — следующий уровень защиты, о котором стоило упомянуть.

      По стримингу и таймаутам — тут мы говорим про одно и то же, но с разных углов. Я про UX-иллюзию, вы про надёжность инфраструктуры. Истина как всегда в комбинации: стримим для души, а сохраняем и режем — для гарантии доставки.

      Про очереди — святая правда. Без них любое сетевое взаимодействие в асинхронном боте превращается в гонки.

      Cпасибо, утащил несколько идей для доработки.


  1. markoni
    19.04.2026 15:41

    Платить не пробовали? Реально помогает, и цены - не то, чтобы космические.