Привет, Хабр.

Сегодня разбираем COPY в PostgreSQL. Это рабочая лошадка для массовой загрузки и выгрузки данных.

Что делает COPY и чем он отличается от INSERT

COPY переносит данные между таблицей и файлом или потоками STDIN/STDOUT. Вариант COPY FROM загружает, COPY TO выгружает. Умеет в форматы text, csv, binary.

Поддерживает параметры ON_ERROR, FREEZE, HEADER и HEADER MATCH, FORCE_*, ENCODING, WHERE, а также запуск внешних программ через PROGRAM. Это раза в два быстрее любого батчевого INSERT при равных условиях и заметно проще в эксплуатации.

Минимальные конструкции, которыми пользуемся:

-- выгрузка таблицы в CSV в поток
COPY public.events TO STDOUT WITH (FORMAT csv, HEADER true);

-- загрузка из CSV из потока
COPY public.events FROM STDIN WITH (FORMAT csv, HEADER true);

-- загрузка из файла на сервере (нужны права)
COPY public.events FROM '/var/lib/postgresql/import.csv' WITH (FORMAT csv, HEADER true);

-- через внешнюю программу (распаковка на лету)
COPY public.events FROM PROGRAM 'gzip -dc /data/import.csv.gz' WITH (FORMAT csv, HEADER true);

-- выгрузка результата запроса, не всей таблицы
COPY (
  SELECT id, payload, created_at
  FROM public.events
  WHERE created_at >= DATE '2025-01-01'
) TO STDOUT WITH (FORMAT csv, HEADER true);

COPY против \copy в psql

Важно разделять два мира. COPY ... FROM/TO 'filename' и COPY ... PROGRAM работают на стороне сервера и требуют специальных ролей. Файлы должны быть доступны именно серверу. Метакоманда \copy из psql это обёртка вокруг COPY ... FROM STDIN/TO STDOUT, и файлы читаются/пишутся на стороне клиента. Если нет серверных прав или файл лежит у вас локально, то используем уже \copy. Если нужно максимальное быстродействие и файлы уже на сервере,то тут уже нужен серверный COPY.

Пример для psql:

-- клиентская выгрузка в сжатый файл
\copy (SELECT * FROM public.events) TO PROGRAM 'gzip > /tmp/events.csv.gz' WITH (FORMAT csv, HEADER true)

-- клиентская загрузка
\copy public.events FROM '/home/user/import.csv' WITH (FORMAT csv, HEADER true)

Обработка ошибок при загрузке

По дефолту COPY FROM завершится на первой проблемной строке. В ситуациях «загружаем всё, что валидно, остальное в карантин» помогает ON_ERROR ignore:

COPY public.events FROM STDIN
  WITH (
    FORMAT csv,
    HEADER true,
    ON_ERROR ignore,
    LOG_VERBOSITY verbose
  );

При ignore все некорректные строки будут пропущены, в конце придёт NOTICE с количеством пропусков. С LOG_VERBOSITY verbose сервер дополнительно пишет, какая строка и какая колонка упали на конверсии. Это удобно, но не превращаем это в норму на постоянной основе: для чистых пайплайнов лучше staging-таблицы с текстовыми колонками, грузить туда, а затем явной валидацией и приведение типов вносить в целевую структуру.

Пример staging-потока:

-- сырой слой
CREATE UNLOGGED TABLE staging.events_raw (
  id_text text,
  payload_text text,
  created_at_text text
);

-- без ограничений, чтобы грузить максимально быстро
COPY staging.events_raw FROM STDIN WITH (FORMAT csv, HEADER true);

-- чистовой слой
INSERT INTO public.events (id, payload, created_at)
SELECT
  id_text::bigint,
  payload_text::jsonb,
  created_at_text::timestamptz
FROM staging.events_raw
WHERE id_text ~ '^\d+$'
  AND created_at_text IS NOT NULL
ON CONFLICT (id) DO UPDATE
SET payload = EXCLUDED.payload,
    created_at = EXCLUDED.created_at;

Где смотреть прогресс

Во время работы COPY сервер публикует состояние в pg_stat_progress_copy.

SELECT pid, datname, relid::regclass AS relation, command, type, bytes_processed, bytes_total,
       tuples_processed, tuples_excluded, error_count
FROM pg_stat_progress_copy;

tuples_excluded растёт, если используется WHERE и строки отбрасываются. error_count полезен при ON_ERROR ignore.

Форматы: когда какой

Три режима покрывают почти все случаи.

Текстовый формат даёт табуляцию как разделитель и \N как NULL. Он есть, но редко нужен: CSV удобнее и предсказуемее, а бинарный быстрее.

CSV — рабочий стандарт для интеграций. Пара нюансов:

  • HEADER true добавляет заголовок на выгрузке и пропускает первую строку на загрузке. Вход можно усилить HEADER MATCH, тогда заголовок обязан совпасть с реальными именами колонок и порядком.

  • Пустая строка и NULL различаются. Пустая строка это "", NULL это пусто без кавычек, либо строка из параметра NULL '...'.

  • Для явного управления кавычками и экранированием используем QUOTE, ESCAPE, а для принудительного заключения некоторых колонок FORCE_QUOTE (col1, col2).

Бинарный формат выгоден для потока Postgres → Postgres одной версии, например при переносе больших объёмов между кластерами в одной инфраструктуре. Версии должны совпадать, типы колонок совместимы, данные не перемещаем между архитектурами, где отличается порядок байтов.

Примеры CSV-выгрузки и загрузки с тонкостями:

-- выгружаем с принудительными кавычками для текстовых колонок
COPY public.users (id, email, country)
TO STDOUT WITH (FORMAT csv, HEADER true, FORCE_QUOTE (email, country));

-- грузим, требуя строгого совпадения заголовка
COPY public.users (id, email, country)
FROM STDIN WITH (FORMAT csv, HEADER match);

FREEZE: когда да, когда нет

COPY FROM ... WITH (FREEZE true) замораживает строки сразу, как после VACUUM FREEZE. Это ускоряет первичную загрузку новой или только что очищенной таблицы и уменьшает давление на автovacuum. Условия там строгие: таблица должна быть создана или опустошена в текущей транзакции, без открытых курсоров и конкурирующих снимков. На секционированных таблицах FREEZE сейчас не применяется.

Сценарий:

BEGIN;
CREATE TABLE public.import_users (LIKE public.users INCLUDING ALL);
COPY public.import_users FROM PROGRAM 'gzip -dc /data/users.csv.gz'
  WITH (FORMAT csv, HEADER true, FREEZE true);
ANALYZE public.import_users;
COMMIT;

WHERE прямо в COPY

На загрузке можно отбрасывать строки по условию.

COPY public.events (id, payload, created_at) FROM STDIN
  WITH (FORMAT csv, HEADER true)
  WHERE id IS NOT NULL AND created_at >= DATE '2025-01-01';

Количество исключённых строк будет видно в pg_stat_progress_copy.

Безопасность

Если используете COPY ... FROM/TO 'filename' или PROGRAM, нужны привилегии суперпользователя или членство в ролях pg_read_server_files, pg_write_server_files, pg_execute_server_program.

Это осознанное ограничение: сервер получает доступ к файловой системе и shell. В PROGRAM команда запускается через оболочку, поэтому недоверенный ввод встраивать нельзя. Для таких сценариев держите фиксированные строки, белые списки и внимательное экранирование.

Ещё два момента. Для COPY TO путь обязан быть абсолютным. Для COPY FROM это рекомендация, но лучше не полагаться на рабочую директорию кластера. И не забываем про права на таблицы: для COPY TO нужно право SELECT на колонки, для COPY FROM право INSERT.

Кодировки и форматы дат

Если файл не в кодировке клиента, задаем ENCODING '...' в COPY. Для переносимой выгрузки я перед выгрузкой ставим ISO-формат дат:

SET DateStyle TO ISO, YMD;
COPY (SELECT * FROM public.events WHERE created_at >= DATE '2025-01-01')
TO STDOUT WITH (FORMAT csv, HEADER true, ENCODING 'UTF8');

На CSV все символы значимы, включая пробелы. Если источник дополняет строки пробелами до фиксированной ширины, чистим файл до загрузки. В CSV значение может включать переносы строк, если оно в кавычках.

COPY из кода: Python, Go, Rust

Python, psycopg3

Нормальная потоковая работа делается через cursor.copy(...). COPY не поддерживает параметры, поэтому SQL-строку формируем сами из белого списка идентификаторов.

import psycopg
from psycopg.rows import dict_row

def copy_csv_to_table(dsn: str, table: str, csv_path: str):
    if table not in {"public_events", "public_users"}:
        raise ValueError("table not allowed")
    copy_sql = f"COPY {table} FROM STDIN WITH (FORMAT csv, HEADER true)"
    with psycopg.connect(dsn, autocommit=False) as conn:
        with conn.cursor(row_factory=dict_row) as cur, open(csv_path, "rb") as f:
            with cur.copy(copy_sql) as cp:
                while chunk := f.read(256 * 1024):
                    cp.write(chunk)
        conn.commit()

def copy_table_to_csv(dsn: str, table: str, out_path: str):
    if table not in {"public_events", "public_users"}:
        raise ValueError("table not allowed")
    copy_sql = f"COPY {table} TO STDOUT WITH (FORMAT csv, HEADER true)"
    with psycopg.connect(dsn, autocommit=True) as conn:
        with conn.cursor() as cur, open(out_path, "wb") as f:
            with cur.copy(copy_sql) as cp:
                for data in cp:
                    f.write(data)

def copy_between(dsn_src: str, dsn_dst: str, src_query: str, dst_table: str):
    # перенос Postgres → Postgres в бинарном формате при совпадающих версиях
    sql_out = f"COPY ({src_query}) TO STDOUT WITH (FORMAT binary)"
    sql_in  = f"COPY {dst_table} FROM STDIN WITH (FORMAT binary)"
    with psycopg.connect(dsn_src) as s, psycopg.connect(dsn_dst) as d:
        with s.cursor() as cs, d.cursor() as cd:
            with cs.copy(sql_out) as cp_out, cd.copy(sql_in) as cp_in:
                for data in cp_out:
                    cp_in.write(data)
        d.commit()

Замечания по безопасности: никаких пользовательских фрагментов в строке COPY, только whitelisting таблиц и колонок. Пакуем данные крупными блоками, на маленьких кусках будет лишняя системная возня.

Go, pgx

В pgx есть CopyFrom, который забирает источник строк и шлёт их по протоколу COPY. Это удобнее, чем самому форматировать CSV, и быстрее, чем INSERT.

package bulk

import (
    "context"
    "encoding/csv"
    "io"
    "strconv"

    "github.com/jackc/pgx/v5"
)

func CopyRows(ctx context.Context, conn *pgx.Conn, table string, columns []string, rows [][]any) (int64, error) {
    // columns вроде []string{"id", "payload", "created_at"}
    return conn.CopyFrom(ctx, pgx.Identifier{table}, columns, pgx.CopyFromRows(rows))
}

func CopyCSV(ctx context.Context, conn *pgx.Conn, table string, columns []string, r io.Reader) (int64, error) {
    dec := csv.NewReader(r)
    // пропускаем заголовок
    if _, err := dec.Read(); err != nil {
        return 0, err
    }
    const batch = 5000
    buf := make([][]any, 0, batch)
    var total int64

    flush := func() error {
        if len(buf) == 0 {
            return nil
        }
        n, err := conn.CopyFrom(ctx, pgx.Identifier{table}, columns, pgx.CopyFromRows(buf))
        total += n
        buf = buf[:0]
        return err
    }

    for {
        rec, err := dec.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            return total, err
        }
        id, _ := strconv.ParseInt(rec[0], 10, 64)
        row := []any{id, rec[1], rec[2]} // пример
        buf = append(buf, row)
        if len(buf) >= batch {
            if err := flush(); err != nil {
                return total, err
            }
        }
    }
    if err := flush(); err != nil {
        return total, err
    }
    return total, nil
}

Здесь не генерируем CSV сами. pgx упакует всё в протокол COPY.

Rust, tokio-postgres

У клиента есть copy_out и copy_in. Работает потоками, без промежуточных файлов.

use tokio_postgres::{NoTls, Error};
use futures_util::{StreamExt, SinkExt};
use bytes::Bytes;

pub async fn copy_out_csv(conn_str: &str, table: &str) -> Result<Vec<u8>, Error> {
    let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await?;
    tokio::spawn(async move { let _ = connection.await; });

    let sql = format!("COPY {} TO STDOUT WITH (FORMAT csv, HEADER true)", table);
    let mut stream = client.copy_out(&*sql).await?;
    let mut out = Vec::new();

    while let Some(chunk) = stream.next().await {
        out.extend_from_slice(&chunk?);
    }
    Ok(out)
}

pub async fn copy_in_csv(conn_str: &str, table: &str, data: &[u8]) -> Result<(), Error> {
    let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await?;
    tokio::spawn(async move { let _ = connection.await; });

    let sql = format!("COPY {} FROM STDIN WITH (FORMAT csv, HEADER true)", table);
    let mut sink = client.copy_in(&*sql).await?;
    sink.send(Bytes::from(data.to_vec())).await?;
    sink.close().await?;
    Ok(())
}

COPY закрывает большинство задач массовых загрузок и выгрузок в Postgres. Если использовать его с оглядкой на права, форматы, кодировки и план обслуживания таблицы, получаем стабильные и быстрые пайплайны. В спорных местах проще выдержать staging, в горячих работать потоками без диска, в инфраструктурных переносах просто использовать бинарный формат при совпадающих версиях.

Приглашаем вас пройти вступительное тестирование по теме «Базы данных». Тестирование поможет объективно оценить текущий уровень знаний и навыков, необходимых для эффективного усвоения материала курса.

Курс «Базы данных» подробно рассматривает современные подходы к работе с системами управления базами данных, включая эффективные методы загрузки и выгрузки данных, управление таблицами и транзакциями, а также практическое применение SQL. Приглашаем вас присоединиться к обучению, чтобы получить системные знания и навыки в этой области.

Комментарии (6)


  1. Akina
    11.08.2025 15:54

    Ещё достаточно важным моментом является то, что с помощью этой команды нельзя выполнять загрузку в таблицу, на которой определены триггеры BEFORE/AFTER INSERT. А вот INSTEAD OF INSERT допустим.

    По дефолту COPY FROM завершится на первой проблемной строке.

    И сразу возникает вопрос, сохраняются ли в таблице уже скопированные строки или нет. Или, если смотреть шире: транзакционна ли COPY? И вообще - является ли она DML?


    1. bolk
      11.08.2025 15:54

      Откуда в Постгресе нетранзакционная работа с данными?


      1. Akina
        11.08.2025 15:54

        Я не зря задал вопрос о судьбе записей, скопированных в таблицу до возникновения ошибки. Ваш встречный вопрос никак не помогает получить ответ.


    1. kmatveev
      11.08.2025 15:54

      Да, COPY транзакционна.


      1. Akina
        11.08.2025 15:54

        То есть при возникновении ошибки исполнения все ранее импортированные этой командой записи удаляются. Жаль, что это не описано явно - некоторые СУБД в аналогичных случаях ведут себя иначе.

        Спасибо.


        1. astentx
          11.08.2025 15:54

          copy поддерживает commit/rollback, так что судьбу импортированных записей можно решить самому, в том числе в случае удачного завершения (в отличие, к примеру, от ораклового sqlldr). В статье бы, конечно, стоило добавить описание работы "под капотом", чтобы было понятно, за счёт чего это достигается (что это не просто fast=true).