Команда Python for Devs подготовила перевод статьи о том, как найти самый быстрый способ загрузки данных в PostgreSQL с помощью Python. Автор пошагово сравнил разные методы — от построчных вставок до COPY с потоковой генерацией CSV — и показал, как ускорить процесс более чем в 250 раз при нулевом потреблении памяти.


Как разработчики, часто выполняющие роль «сантехников для данных», мы нередко сталкиваемся с задачей загрузки данных, полученных из удалённого источника, в наши системы. Если повезёт, данные будут сериализованы в формате JSON или YAML. Если повезёт меньше — получаем Excel-таблицу или CSV-файл, который всегда почему-то сломан.

Данные от крупных компаний или из старых систем почти всегда закодированы каким-то странным способом, а системные администраторы уверены, что помогают нам, запаковывая файлы в архив (пожалуйста, используйте gzip) или разбивая их на куски с случайными именами.

Современные сервисы иногда предоставляют вменяемый API, но чаще приходится скачивать файл с FTP, SFTP, S3 или из какой-то проприетарной системы, которая работает только под Windows.

В этой статье мы разберём лучший способ импортировать «грязные» данные из удалённого источника в PostgreSQL.

Чтобы предложить рабочее решение для реального кейса, мы определили следующие условия:

  • Данные загружаются из удалённого источника.

  • Данные грязные и требуют преобразования.

  • Данные большие.

Настройка: пивоварня

Я нашёл отличный публичный API связанный c пивом, поэтому будем загружать данные в таблицу beer в базе данных.

Данные

Один объект пива из API выглядит так:

$ curl https://api.punkapi.com/v2/beers/?per\_page=1&page=1
[
    {
        "id": 1,
        "name": "Buzz",
        "tagline": "A Real Bitter Experience.",
        "first_brewed": "09/2007",
        "description": "A light, crisp and bitter IPA ...",
        "image_url": "https://images.punkapi.com/v2/keg.png",
        "abv": 4.5,
        "ibu": 60,
        "target_fg": 1010,
        "target_og": 1044,
        "ebc": 20,
        "srm": 10,
        "ph": 4.4,
        "attenuation_level": 75,
        "volume": {
            "value": 20,
            "unit": "litres"
        },
        "contributed_by": "Sam Mason <samjbmason>"
        "brewers_tips": "The earthy and floral aromas from...",
        "boil_volume": {},
        "method": {},
        "ingredients": {},
        "food_pairing": [],
    }
]

Я немного обрезал вывод для краткости, но информации о пиве здесь очень много. В этой статье мы хотим загрузить в таблицу базы данных все поля до brewers_tips.

Поле volume вложенное. Нам нужно вытащить только значение и сохранить его в поле volume в таблице:

volume = beer['volume']['value']

Поле first_brewed содержит только год и месяц, а иногда только год. Нужно преобразовать это значение в корректную дату.
Например, значение 09/2007 должно стать 2007-09-01.
А значение 2006 — 2006-01-01.

Напишем простую функцию, которая преобразует текстовое значение поля в объект datetime.date:

import datetime

def parse_first_brewed(text: str) -> datetime.date:
    parts = text.split('/')
    if len(parts) == 2:
        return datetime.date(int(parts[1]), int(parts[0]), 1)
    elif len(parts) == 1:
        return datetime.date(int(parts[0]), 1, 1)
    else:
        assert False, 'Unknown date format'

Быстро проверим, что она работает:

>>> parse_first_brewed('09/2007')
datetime.date(2007, 9, 1)

>>> parse_first_brewed('2006')
datetime.date(2006, 1, 1)

В реальных задачах преобразования могут быть гораздо сложнее. Но для нашей цели этого более чем достаточно.

Загрузка данных

API возвращает результаты постранично. Чтобы инкапсулировать пагинацию, создадим генератор, который будет выдавать объекты пива по одному:

from typing import Iterator, Dict, Any
from urllib.parse import urlencode
import requests


def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]:
    session = requests.Session()
    page = 1
    while True:
        response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({
            'page': page,
            'per_page': page_size
        }))
        response.raise_for_status()

        data = response.json()
        if not data:
            break

        yield from data

        page += 1

Чтобы воспользоваться функцией-генератором, просто вызываем её и итерируемся:

>>> beers = iter_beers_from_api()
>>> next(beers)
{'id': 1,
 'name': 'Buzz',
 'tagline': 'A Real Bitter Experience.',
 'first_brewed': '09/2007',
 'description': 'A light, crisp and bitter IPA brewed...',
 'image_url': 'https://images.punkapi.com/v2/keg.png',
 'abv': 4.5,
 'ibu': 60,
 'target_fg': 1010,
...
}
>>> next(beers)
{'id': 2,
 'name': 'Trashy Blonde',
 'tagline': "You Know You Shouldn't",
 'first_brewed': '04/2008',
 'description': 'A titillating, ...',
 'image_url': 'https://images.punkapi.com/v2/2.png',
 'abv': 4.1,
 'ibu': 41.5,

Вы заметите, что первый результат каждой страницы появляется чуть дольше. Это потому, что в этот момент выполняется сетевой запрос для получения страницы.

Создание таблицы в базе данных

Следующий шаг — создать таблицу в базе, в которую будем загружать данные.

Сначала создаём базу данных:

$ createdb -O haki testload

Замените haki на имя вашего локального пользователя.

Чтобы подключиться к базе PostgreSQL из Python, используем библиотеку psycopg:

$ python -m pip install psycopg2

Создадим подключение к базе:

import psycopg2

connection = psycopg2.connect(
    host="localhost",
    database="testload",
    user="haki",
    password=None,
)
connection.autocommit = True

Мы включаем autocommit=True, чтобы каждая выполненная команда сразу применялась. Для целей этой статьи это приемлемо.

Теперь, когда подключение готово, напишем функцию для создания таблицы:

def create_staging_table(cursor) -> None:
    cursor.execute("""
        DROP TABLE IF EXISTS staging_beers;
        CREATE UNLOGGED TABLE staging_beers (
            id                  INTEGER,
            name                TEXT,
            tagline             TEXT,
            first_brewed        DATE,
            description         TEXT,
            image_url           TEXT,
            abv                 DECIMAL,
            ibu                 DECIMAL,
            target_fg           DECIMAL,
            target_og           DECIMAL,
            ebc                 DECIMAL,
            srm                 DECIMAL,
            ph                  DECIMAL,
            attenuation_level   DECIMAL,
            brewers_tips        TEXT,
            contributed_by      TEXT,
            volume              INTEGER
        );
    """)

Эта функция принимает курсор и создаёт unlogged-таблицу с именем staging_beers.

Unlogged table: Данные, записанные в unlogged-таблицу, не попадают в журнал предзаписи (WAL), что делает её удобной для временных промежуточных таблиц. Однако стоит помнить, что unlogged-таблицы не будут восстановлены в случае сбоя и не участвуют в репликации.

Пример вызова функции с использованием ранее созданного подключения:

>>> with connection.cursor() as cursor:
>>>     create_staging_table(cursor)

Теперь можно переходить к следующему шагу.

Метрики

В этой статье нас интересуют две основные метрики: время и память.

Измерение времени

Чтобы измерить время выполнения каждого метода, используем встроенный модуль time:

>>> import time
>>> start = time.perf_counter()
>>> time.sleep(1) # выполняем работу
>>> elapsed = time.perf_counter() - start
>>> print(f'Time {elapsed:0.4}')
Time 1.001

Функция perf_counter предоставляет таймер с наибольшим доступным разрешением, что делает её идеальной для наших целей.

Измерение памяти

Для измерения потребления памяти используем пакет memory-profiler:

$ python -m pip install memory-profiler

Этот пакет показывает использование памяти и приращение памяти на каждой строке кода. Это очень полезно при оптимизации по памяти. Для примера возьмём пример с PyPI:

$ python -m memory_profiler example.py
Line #    Mem usage  Increment   Line Contents
==============================================
     3                           @profile
     4      5.97 MB    0.00 MB   def my_func():
     5     13.61 MB    7.64 MB       a = [1] * (10 ** 6)
     6    166.20 MB  152.59 MB       b = [2] * (2 * 10 ** 7)
     7     13.61 MB -152.59 MB       del b
     8     13.61 MB    0.00 MB       return a

Самая интересная колонка здесь — Increment, показывающая, сколько памяти дополнительно выделяется на каждой строке.

В этой статье нас интересует пиковое использование памяти функцией. Пиковое значение — это разница между стартовым значением колонки Mem usage и её максимальным значением (так называемая «high watermark»).

Чтобы получить список значений Mem usage, используем функцию memory_usage из memory_profiler:

>>> from memory_profiler import memory_usage
>>> mem, retval = memory_usage((fn, args, kwargs), retval=True, interval=1e-7)

При таком вызове memory_usage выполняет функцию fn с переданными аргументами args и kwargs, а также запускает отдельный процесс для отслеживания использования памяти каждые interval секунд.

Для очень быстрых операций функция fn может быть выполнена несколько раз. Чтобы этого избежать, устанавливаем interval меньше 1e-6, тогда функция выполнится только один раз.

Аргумент retval говорит функции вернуть результат выполнения fn.

Декоратор profile

Чтобы объединить всё вместе, создадим следующий декоратор, который будет измерять и выводить время и память:

import time
from functools import wraps
from memory_profiler import memory_usage

def profile(fn):
    @wraps(fn)
    def inner(*args, **kwargs):
        fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
        print(f'\n{fn.__name__}({fn_kwargs_str})')

        # Измеряем время
        t = time.perf_counter()
        retval = fn(*args, **kwargs)
        elapsed = time.perf_counter() - t
        print(f'Time   {elapsed:0.4}')

        # Измеряем память
        mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)

        print(f'Memory {max(mem) - min(mem)}')
        return retval

    return inner

Чтобы исключить взаимное влияние измерения времени на измерение памяти и наоборот, мы выполняем функцию дважды:

  • первый раз — чтобы замерить время,

  • второй раз — чтобы замерить потребление памяти.

Декоратор выводит имя функции, переданные именованные аргументы, а затем сообщает время и память, которые она использовала:

>>> @profile
>>> def work(n):
>>>     for i in range(n):
>>>         2 ** n

>>> work(10)
work()
Time   0.06269
Memory 0.0

>>> work(n=10000)
work(n=10000)
Time   0.3865
Memory 0.0234375

Обратите внимание: выводятся только именованные аргументы. Это сделано специально — мы будем использовать это в параметризованных тестах.

Бенчмарк

На момент написания статьи в API с пивом всего 325 сортов. Чтобы поработать с большим набором данных, мы размножим его 100 раз и будем хранить в памяти. Итоговый датасет будет содержать 32 500 записей:

>>> beers = list(iter_beers_from_api()) * 100
>>> len(beers)
32,500

Чтобы имитировать удалённый API, наши функции будут принимать итераторы, похожие на возвращаемое значение iter_beers_from_api:

def process(beers: Iterator[Dict[str, Any]])) -> None:
    # Process beers...

Для бенчмарка мы будем импортировать данные о пиве в базу данных. Чтобы исключить внешние факторы (например, сеть), мы заранее получим данные из API и раздаём их локально.

Чтобы получить точные замеры времени, «подделаем» удалённый API:

>>> beers = list(iter_beers_from_api()) * 100
>>> process(beers)

В реальной ситуации вы бы вызывали функцию iter_beers_from_api напрямую:

>>> process(iter_beers_from_api())

Мы готовы начать!

Вставка строк по одной

Для базовой линии начнём с самого простого подхода — вставлять строки по одной:

@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        for beer in beers:
            cursor.execute("""
                INSERT INTO staging_beers VALUES (
                    %(id)s,
                    %(name)s,
                    %(tagline)s,
                    %(first_brewed)s,
                    %(description)s,
                    %(image_url)s,
                    %(abv)s,
                    %(ibu)s,
                    %(target_fg)s,
                    %(target_og)s,
                    %(ebc)s,
                    %(srm)s,
                    %(ph)s,
                    %(attenuation_level)s,
                    %(brewers_tips)s,
                    %(contributed_by)s,
                    %(volume)s
                );
            """, {
                **beer,
                'first_brewed': parse_first_brewed(beer['first_brewed']),
                'volume': beer['volume']['value'],
            })

Обратите внимание: по мере итерации по списку мы преобразуем first_brewed в datetime.date и извлекаем volume из вложенного поля volume.

Запуск этой функции даёт следующий вывод:

>>> insert_one_by_one(connection, beers)
insert_one_by_one()
Time   128.8
Memory 0.08203125

Функция потратила 129 секунд на импорт 32 тыс. строк. Профилировщик памяти показывает, что функция потребляла совсем немного памяти.

Интуитивно понятно, что вставка строк по одной — не самый эффективный подход. Постоянные переключения контекста между программой и базой данных, скорее всего, сильно тормозят процесс.

Execute Many

Psycopg2 предоставляет способ вставлять сразу много строк с помощью executemany. Из документации:

Выполнить операцию с базой данных (запрос или команду) для всех кортежей параметров или отображений, найденных в последовательности vars_list.

Звучит многообещающе!

Попробуем импортировать данные с использованием executemany:

@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        all_beers = [{
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers]

        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, all_beers)

Эта функция очень похожа на предыдущую, и преобразования те же самые. Главное отличие в том, что мы сначала преобразуем все данные в памяти, и только потом загружаем их в базу.

Запуск даёт такой вывод:

>>> insert_executemany(connection, beers)
insert_executemany()
Time   124.7
Memory 2.765625

Не слишком радует. Время лишь чуть лучше, зато функция теперь потребляет 2.7 МБ памяти.

Чтобы оценить масштаб, JSON-файл, содержащий только импортируемые данные, весит на диске 25 МБ. Если сохранить пропорцию, импорт 1 ГБ таким методом потребует около 110 МБ памяти.

Вставка с итератором для executemany

Предыдущий подход потреблял много памяти, потому что преобразованные данные сохранялись в памяти перед тем, как их обрабатывал psycopg.

Посмотрим, можно ли использовать итератор, чтобы не хранить данные в памяти:

@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers))

Отличие в том, что преобразованные данные «стримятся» в executemany через итератор.

Эта функция даёт такой результат:

>>> insert_executemany_iterator(connection, beers)
insert_executemany_iterator()
Time   129.3
Memory 0.0

Наше «стриминговое» решение сработало как задумано: нам удалось свести потребление памяти к нулю. Однако по времени всё осталось примерно на уровне метода с построчной вставкой.

Execute Batch

В документации psycopg в разделе «fast execution helpers» есть примечание про executemany:

Текущая реализация executemany() (мягко говоря) не отличается высокой производительностью. Эти функции можно использовать для ускорения повторного выполнения одного и того же выражения с набором параметров. За счёт сокращения количества обращений к серверу производительность может быть на порядки выше, чем при использовании executemany().

То есть мы изначально шли не тем путём!

Сразу под этим разделом описана функция execute_batch:

Выполняет группу выражений с меньшим количеством обращений к серверу.

Реализуем загрузку с использованием execute_batch:

import psycopg2.extras

@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        all_beers = [{
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers]

        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, all_beers)

Выполним функцию:

>>> insert_execute_batch(connection, beers)
insert_execute_batch()
Time   3.917
Memory 2.50390625

Вау! Это огромный скачок. Функция завершилась чуть меньше чем за 4 секунды — примерно в 33 раза быстрее, чем исходные 129 секунд.

Execute Batch с итератором

Функция execute_batch использует меньше памяти, чем executemany для тех же данных. Попробуем полностью убрать хранение данных в памяти и «передавать» их в execute_batch с помощью итератора:

@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        iter_beers = ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers)

        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, iter_beers)

Выполнение функции:

>>> insert_execute_batch_iterator(connection, beers)
insert_execute_batch_iterator()
Time   4.333
Memory 0.2265625

Мы получили примерно то же самое время, но с меньшим использованием памяти.

Execute Batch с итератором и размером страницы

При чтении документации по execute_batch обращает на себя внимание аргумент page_size:

page_size – максимальное количество элементов argslist, которые включаются в одно SQL-выражение. Если элементов больше — функция выполнит несколько выражений.

В документации также указано, что функция работает быстрее за счёт уменьшения количества обращений к серверу. Логично предположить, что больший page_size уменьшит количество таких обращений и ускорит загрузку.

Добавим аргумент page_size в нашу функцию, чтобы поэкспериментировать:

@profile
def insert_execute_batch_iterator(
    connection,
    beers: Iterator[Dict[str, Any]],
    page_size: int = 100,
) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        iter_beers = ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers)

        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, iter_beers, page_size=page_size)

Теперь мы можем варьировать page_size и сравнивать скорость загрузки, чтобы подобрать оптимальное значение.

Размер страницы по умолчанию — 100. Проведём бенчмарк с разными значениями и сравним результаты:

>>> insert_execute_batch_iterator(connection, iter(beers), page_size=1)
insert_execute_batch_iterator(page_size=1)
Time   130.2
Memory 0.0

>>> insert_execute_batch_iterator(connection, iter(beers), page_size=100)
insert_execute_batch_iterator(page_size=100)
Time   4.333
Memory 0.0

>>> insert_execute_batch_iterator(connection, iter(beers), page_size=1000)
insert_execute_batch_iterator(page_size=1000)
Time   2.537
Memory 0.2265625

>>> insert_execute_batch_iterator(connection, iter(beers), page_size=10000)
insert_execute_batch_iterator(page_size=10000)
Time   2.585
Memory 25.4453125

Получились любопытные результаты, разберём по пунктам:

  • 1: Результаты похожи на вставку строк по одной.

  • 100: Это значение по умолчанию для page_size, поэтому результаты близки к предыдущему бенчмарку.

  • 1000: Время примерно на 40% лучше, при этом память используется мало.

  • 10000: По времени почти не быстрее, чем при размере 1000, но потребление памяти значительно выше.

Результаты показывают компромисс между памятью и скоростью. В данном случае «золотая середина» — page_size = 1000.

Execute Values

На этом «сокровища» в документации psycopg не заканчиваются. Пока листал документацию, на глаза попалась ещё одна функция — execute_values:

Выполнить выражение с использованием VALUES и последовательности параметров.

execute_values формирует большой список VALUES прямо в запросе.

Попробуем:

import psycopg2.extras

@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, [(
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers])

Импорт пива с помощью этой функции:

>>> insert_execute_values(connection, beers)
insert_execute_values()
Time   3.666
Memory 4.50390625

Сразу из коробки мы получили небольшое ускорение по сравнению с execute_batch. Однако потребление памяти чуть выше.

Execute Values c итератором

Как и раньше, чтобы снизить потребление памяти, постараемся не хранить данные в памяти и использовать итератор вместо списка:

@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, ((
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers))

Выполнение функции даёт следующие результаты:

>>> insert_execute_values_iterator(connection, beers)
insert_execute_values_iterator()
Time   3.677
Memory 0.0

По времени почти то же самое, а память снова равна нулю.

Execute Values с итератором и размером страницы

Как и execute_batch, функция execute_values тоже принимает аргумент page_size:

@profile
def insert_execute_values_iterator(
    connection,
    beers: Iterator[Dict[str, Any]],
    page_size: int = 100,
) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, ((
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers), page_size=page_size)

Запуск с разными размерами страниц:

>>> insert_execute_values_iterator(connection, iter(beers), page_size=1)
insert_execute_values_iterator(page_size=1)
Time   127.4
Memory 0.0

>>> insert_execute_values_iterator(connection, iter(beers), page_size=100)
insert_execute_values_iterator(page_size=100)
Time   3.677
Memory 0.0

>>> insert_execute_values_iterator(connection, iter(beers), page_size=1000)
insert_execute_values_iterator(page_size=1000)
Time   1.468
Memory 0.0

>>> insert_execute_values_iterator(connection, iter(beers), page_size=10000)
insert_execute_values_iterator(page_size=10000)
Time   1.503
Memory 2.25

Как и в случае с execute_batch, здесь виден компромисс между памятью и скоростью. Оптимум снова примерно на уровне page_size = 1000. Однако с execute_values при том же размере страницы мы получили результат примерно на 20% быстрее, чем с execute_batch.

COPY

В официальной документации PostgreSQL есть целый раздел о заполнении базы данных. Согласно документации, самый быстрый способ загрузить данные в базу — использовать команду COPY.

Чтобы вызывать COPY из Python, в psycopg есть специальная функция copy_from. Команда COPY ожидает CSV-файл. Попробуем преобразовать наши данные в CSV и загрузить их в базу с помощью copy_from:

import io

def clean_csv_value(value: Optional[Any]) -> str:
    if value is None:
        return r'\N'
    return str(value).replace('\n', '\\n')

@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        csv_file_like_object = io.StringIO()
        for beer in beers:
            csv_file_like_object.write('|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['contributed_by'],
                beer['brewers_tips'],
                beer['volume']['value'],
            ))) + '\n')
        csv_file_like_object.seek(0)
        cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')

Разберём по шагам:

  • clean_csv_value: преобразует одиночное значение.

    • Экранируем переводы строк: некоторые текстовые поля содержат переносы, поэтому заменяем \n → \\n.

    • Пустые значения превращаем в \N: строка "\N" — это значение по умолчанию, которым PostgreSQL обозначает NULL в COPY (это можно изменить опцией NULL).

  • csv_file_like_object: создаём «файлоподобный» объект через io.StringIO. Объект StringIO хранит строку, но ведёт себя как файл. В нашем случае — как CSV-файл.

  • csv_file_like_object.write: превращаем один объект пива в строку CSV.

    • Трансформации: здесь же выполняем преобразования first_brewed и volume.

    • Разделитель: в наборе данных есть поля со свободным текстом и запятыми. Чтобы избежать конфликтов, в качестве разделителя берём "|" (альтернатива — использовать QUOTE).

Посмотрим, окупились ли усилия:

>>> copy_stringio(connection, beers)
copy_stringio()
Time   0.6274
Memory 99.109375

COPY — самый быстрый способ из всех, что мы видели! Процесс завершился меньше чем за секунду. Однако этот метод заметно прожорливее по памяти: функция потребляет 99 МБ — это более чем вдвое больше размера нашего JSON-файла на диске.

Копирование данных из итератора строк

Один из главных недостатков использования COPY с StringIO — весь файл создаётся в памяти. А что, если вместо формирования целого файла в памяти мы создадим файловый объект, который будет служить буфером между удалённым источником и командой COPY? Такой буфер будет получать JSON через итератор, очищать и преобразовывать данные и на выходе выдавать «чистый» CSV.

Вдохновившись этим ответом на Stack Overflow, мы создали объект, который «питается» от итератора и предоставляет интерфейс, как у файла:

from typing import Iterator, Optional
import io

class StringIteratorIO(io.TextIOBase):
    def __init__(self, iter: Iterator[str]):
        self._iter = iter
        self._buff = ''

    def readable(self) -> bool:
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break
        ret = self._buff[:n]
        self._buff = self._buff[len(ret):]
        return ret

    def read(self, n: Optional[int] = None) -> str:
        line = []
        if n is None or n < 0:
            while True:
                m = self._read1()
                if not m:
                    break
                line.append(m)
        else:
            while n > 0:
                m = self._read1(n)
                if not m:
                    break
                n -= len(m)
                line.append(m)
        return ''.join(line)

Чтобы показать, как это работает, вот как можно получить «файлоподобный» CSV-объект из списка чисел:

>>> gen = (f'{i},{i**2}\n' for i in range(3))
>>> gen
<generator object <genexpr> at 0x7f58bde7f5e8>
>>> f = StringIteratorIO(gen)
>>> print(f.read())
0,0
1,1
2,4

Обратите внимание, что мы использовали f как файл. Внутри он подтягивал строки из gen только тогда, когда его внутренний буфер строк был пуст.

Функция загрузки с использованием StringIteratorIO выглядит так:

@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        beers_string_iterator = StringIteratorIO((
            '|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']).isoformat(),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n'
            for beer in beers
        ))
        cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|')

Главное отличие в том, что CSV с пивом потребляется по запросу, и данные не хранятся в памяти после использования.

Запустим функцию и посмотрим на результат:

>>> copy_string_iterator(connection, beers)
copy_string_iterator()
Time   0.4596
Memory 0.0

Отлично! Время небольшое, а память снова равна нулю.

Копирование данных из итератора строк с указанием размера буфера

Пытаясь «выжать» ещё каплю производительности, замечаем, что так же, как и page_size, команда COPY принимает схожий параметр size:

size — размер буфера, используемого для чтения из файла.

Добавим аргумент size в функцию:

@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        beers_string_iterator = StringIteratorIO((
            '|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']).isoformat(),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n'
            for beer in beers
        ))
        cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|', size=size)

Значение по умолчанию для size — 8192, то есть 2 ** 13, поэтому будем использовать степени двойки:

>>> copy_string_iterator(connection, iter(beers), size=1024)
copy_string_iterator(size=1024)
Time   0.4536
Memory 0.0

>>> copy_string_iterator(connection, iter(beers), size=8192)
copy_string_iterator(size=8192)
Time   0.4596
Memory 0.0

>>> copy_string_iterator(connection, iter(beers), size=16384)
copy_string_iterator(size=16384)
Time   0.4649
Memory 0.0

>>> copy_string_iterator(connection, iter(beers), size=65536)
copy_string_iterator(size=65536)
Time   0.6171
Memory 0.0

В отличие от предыдущих примеров, здесь, похоже, нет компромисса между скоростью и памятью. Это логично, потому что метод изначально спроектирован так, чтобы не потреблять память. Тем не менее, при изменении размера буфера время меняется. Для нашего набора данных оптимальным оказалось значение по умолчанию — 8192.

Русскоязычное сообщество про Python

Друзья! Эту статью перевела команда Python for Devs — канала, где каждый день выходят самые свежие и полезные материалы о Python и его экосистеме. Подписывайтесь, чтобы ничего не пропустить!

Результаты

ФУНКЦИЯ                                         ВРЕМЯ (СЕК)   ПАМЯТЬ (МБ)
insert_one_by_one()                                128.8        0.08203125
insert_executemany()                                124.7        2.765625
insert_executemany_iterator()                       129.3        0.0
insert_execute_batch()                                3.917       2.50390625
insert_execute_batch_iterator(page_size=1)          130.2        0.0
insert_execute_batch_iterator(page_size=100)          4.333       0.0
insert_execute_batch_iterator(page_size=1000)         2.537       0.2265625
insert_execute_batch_iterator(page_size=10000)        2.585       25.4453125
insert_execute_values()                               3.666       4.50390625
insert_execute_values_iterator(page_size=1)         127.4        0.0
insert_execute_values_iterator(page_size=100)         3.677       0.0
insert_execute_values_iterator(page_size=1000)        1.468       0.0
insert_execute_values_iterator(page_size=10000)       1.503       2.25
copy_stringio()                                       0.6274     99.109375
copy_string_iterator(size=1024)                       0.4536      0.0
copy_string_iterator(size=8192)                       0.4596      0.0
copy_string_iterator(size=16384)                      0.4649      0.0
copy_string_iterator(size=65536)                      0.6171      0.0

Главный вопрос теперь: «Что использовать?» Как обычно, ответ: «Зависит от ситуации». У каждого метода есть свои плюсы и минусы, и каждый подходит под разные условия:

Вывод 1: Предпочитайте встроенные подходы для сложных типов данных.
executemanyexecute_values и execute_batch сами занимаются преобразованием типов из Python в типы базы данных. Варианты с CSV требуют экранирования.

Вывод 2: Предпочитайте встроенные подходы для небольших объемов данных.
Встроенные подходы читаемее и с меньшей вероятностью сломаются в будущем. Если время и память не критичны — оставляйте все простым!

Вывод 3: Предпочитайте подходы с COPY для больших объемов данных.
COPY лучше подходит для больших объемов, где память может стать проблемой.

Исходный код для этого бенчмарка доступен здесь.

Комментарии (0)


  1. Akuma
    24.09.2025 09:16

    Лучше конечно делать тесты на больших данных. Там лучше видно разницу.

    32 тыс строк - это ни о чем, просто засунул в транзакцию и ладно.


  1. TIEugene
    24.09.2025 09:16

    Вылить во временный CSV/in-memory и залить COPY.
    Щито?