Привет, Хабр.
Сегодня разбираем COPY в PostgreSQL. Это рабочая лошадка для массовой загрузки и выгрузки данных.
Что делает COPY и чем он отличается от INSERT
COPY
переносит данные между таблицей и файлом или потоками STDIN/STDOUT. Вариант COPY FROM
загружает, COPY TO
выгружает. Умеет в форматы text
, csv
, binary
.
Поддерживает параметры ON_ERROR
, FREEZE
, HEADER
и HEADER MATCH
, FORCE_*
, ENCODING
, WHERE
, а также запуск внешних программ через PROGRAM
. Это раза в два быстрее любого батчевого INSERT
при равных условиях и заметно проще в эксплуатации.
Минимальные конструкции, которыми пользуемся:
-- выгрузка таблицы в CSV в поток
COPY public.events TO STDOUT WITH (FORMAT csv, HEADER true);
-- загрузка из CSV из потока
COPY public.events FROM STDIN WITH (FORMAT csv, HEADER true);
-- загрузка из файла на сервере (нужны права)
COPY public.events FROM '/var/lib/postgresql/import.csv' WITH (FORMAT csv, HEADER true);
-- через внешнюю программу (распаковка на лету)
COPY public.events FROM PROGRAM 'gzip -dc /data/import.csv.gz' WITH (FORMAT csv, HEADER true);
-- выгрузка результата запроса, не всей таблицы
COPY (
SELECT id, payload, created_at
FROM public.events
WHERE created_at >= DATE '2025-01-01'
) TO STDOUT WITH (FORMAT csv, HEADER true);
COPY против \copy в psql
Важно разделять два мира. COPY ... FROM/TO 'filename'
и COPY ... PROGRAM
работают на стороне сервера и требуют специальных ролей. Файлы должны быть доступны именно серверу. Метакоманда \copy
из psql
это обёртка вокруг COPY ... FROM STDIN/TO STDOUT
, и файлы читаются/пишутся на стороне клиента. Если нет серверных прав или файл лежит у вас локально, то используем уже \copy
. Если нужно максимальное быстродействие и файлы уже на сервере,то тут уже нужен серверный COPY
.
Пример для psql
:
-- клиентская выгрузка в сжатый файл
\copy (SELECT * FROM public.events) TO PROGRAM 'gzip > /tmp/events.csv.gz' WITH (FORMAT csv, HEADER true)
-- клиентская загрузка
\copy public.events FROM '/home/user/import.csv' WITH (FORMAT csv, HEADER true)
Обработка ошибок при загрузке
По дефолту COPY FROM
завершится на первой проблемной строке. В ситуациях «загружаем всё, что валидно, остальное в карантин» помогает ON_ERROR ignore
:
COPY public.events FROM STDIN
WITH (
FORMAT csv,
HEADER true,
ON_ERROR ignore,
LOG_VERBOSITY verbose
);
При ignore
все некорректные строки будут пропущены, в конце придёт NOTICE
с количеством пропусков. С LOG_VERBOSITY verbose
сервер дополнительно пишет, какая строка и какая колонка упали на конверсии. Это удобно, но не превращаем это в норму на постоянной основе: для чистых пайплайнов лучше staging-таблицы с текстовыми колонками, грузить туда, а затем явной валидацией и приведение типов вносить в целевую структуру.
Пример staging-потока:
-- сырой слой
CREATE UNLOGGED TABLE staging.events_raw (
id_text text,
payload_text text,
created_at_text text
);
-- без ограничений, чтобы грузить максимально быстро
COPY staging.events_raw FROM STDIN WITH (FORMAT csv, HEADER true);
-- чистовой слой
INSERT INTO public.events (id, payload, created_at)
SELECT
id_text::bigint,
payload_text::jsonb,
created_at_text::timestamptz
FROM staging.events_raw
WHERE id_text ~ '^\d+$'
AND created_at_text IS NOT NULL
ON CONFLICT (id) DO UPDATE
SET payload = EXCLUDED.payload,
created_at = EXCLUDED.created_at;
Где смотреть прогресс
Во время работы COPY
сервер публикует состояние в pg_stat_progress_copy
.
SELECT pid, datname, relid::regclass AS relation, command, type, bytes_processed, bytes_total,
tuples_processed, tuples_excluded, error_count
FROM pg_stat_progress_copy;
tuples_excluded
растёт, если используется WHERE
и строки отбрасываются. error_count
полезен при ON_ERROR ignore
.
Форматы: когда какой
Три режима покрывают почти все случаи.
Текстовый формат даёт табуляцию как разделитель и \N
как NULL. Он есть, но редко нужен: CSV удобнее и предсказуемее, а бинарный быстрее.
CSV — рабочий стандарт для интеграций. Пара нюансов:
HEADER true
добавляет заголовок на выгрузке и пропускает первую строку на загрузке. Вход можно усилитьHEADER MATCH
, тогда заголовок обязан совпасть с реальными именами колонок и порядком.Пустая строка и NULL различаются. Пустая строка это
""
, NULL это пусто без кавычек, либо строка из параметраNULL '...'
.Для явного управления кавычками и экранированием используем
QUOTE
,ESCAPE
, а для принудительного заключения некоторых колонокFORCE_QUOTE (col1, col2)
.
Бинарный формат выгоден для потока Postgres → Postgres одной версии, например при переносе больших объёмов между кластерами в одной инфраструктуре. Версии должны совпадать, типы колонок совместимы, данные не перемещаем между архитектурами, где отличается порядок байтов.
Примеры CSV-выгрузки и загрузки с тонкостями:
-- выгружаем с принудительными кавычками для текстовых колонок
COPY public.users (id, email, country)
TO STDOUT WITH (FORMAT csv, HEADER true, FORCE_QUOTE (email, country));
-- грузим, требуя строгого совпадения заголовка
COPY public.users (id, email, country)
FROM STDIN WITH (FORMAT csv, HEADER match);
FREEZE: когда да, когда нет
COPY FROM ... WITH (FREEZE true)
замораживает строки сразу, как после VACUUM FREEZE
. Это ускоряет первичную загрузку новой или только что очищенной таблицы и уменьшает давление на автovacuum. Условия там строгие: таблица должна быть создана или опустошена в текущей транзакции, без открытых курсоров и конкурирующих снимков. На секционированных таблицах FREEZE
сейчас не применяется.
Сценарий:
BEGIN;
CREATE TABLE public.import_users (LIKE public.users INCLUDING ALL);
COPY public.import_users FROM PROGRAM 'gzip -dc /data/users.csv.gz'
WITH (FORMAT csv, HEADER true, FREEZE true);
ANALYZE public.import_users;
COMMIT;
WHERE прямо в COPY
На загрузке можно отбрасывать строки по условию.
COPY public.events (id, payload, created_at) FROM STDIN
WITH (FORMAT csv, HEADER true)
WHERE id IS NOT NULL AND created_at >= DATE '2025-01-01';
Количество исключённых строк будет видно в pg_stat_progress_copy
.
Безопасность
Если используете COPY ... FROM/TO 'filename'
или PROGRAM
, нужны привилегии суперпользователя или членство в ролях pg_read_server_files
, pg_write_server_files
, pg_execute_server_program
.
Это осознанное ограничение: сервер получает доступ к файловой системе и shell. В PROGRAM
команда запускается через оболочку, поэтому недоверенный ввод встраивать нельзя. Для таких сценариев держите фиксированные строки, белые списки и внимательное экранирование.
Ещё два момента. Для COPY TO
путь обязан быть абсолютным. Для COPY FROM
это рекомендация, но лучше не полагаться на рабочую директорию кластера. И не забываем про права на таблицы: для COPY TO
нужно право SELECT
на колонки, для COPY FROM
право INSERT
.
Кодировки и форматы дат
Если файл не в кодировке клиента, задаем ENCODING '...'
в COPY
. Для переносимой выгрузки я перед выгрузкой ставим ISO-формат дат:
SET DateStyle TO ISO, YMD;
COPY (SELECT * FROM public.events WHERE created_at >= DATE '2025-01-01')
TO STDOUT WITH (FORMAT csv, HEADER true, ENCODING 'UTF8');
На CSV все символы значимы, включая пробелы. Если источник дополняет строки пробелами до фиксированной ширины, чистим файл до загрузки. В CSV значение может включать переносы строк, если оно в кавычках.
COPY из кода: Python, Go, Rust
Python, psycopg3
Нормальная потоковая работа делается через cursor.copy(...)
. COPY не поддерживает параметры, поэтому SQL-строку формируем сами из белого списка идентификаторов.
import psycopg
from psycopg.rows import dict_row
def copy_csv_to_table(dsn: str, table: str, csv_path: str):
if table not in {"public_events", "public_users"}:
raise ValueError("table not allowed")
copy_sql = f"COPY {table} FROM STDIN WITH (FORMAT csv, HEADER true)"
with psycopg.connect(dsn, autocommit=False) as conn:
with conn.cursor(row_factory=dict_row) as cur, open(csv_path, "rb") as f:
with cur.copy(copy_sql) as cp:
while chunk := f.read(256 * 1024):
cp.write(chunk)
conn.commit()
def copy_table_to_csv(dsn: str, table: str, out_path: str):
if table not in {"public_events", "public_users"}:
raise ValueError("table not allowed")
copy_sql = f"COPY {table} TO STDOUT WITH (FORMAT csv, HEADER true)"
with psycopg.connect(dsn, autocommit=True) as conn:
with conn.cursor() as cur, open(out_path, "wb") as f:
with cur.copy(copy_sql) as cp:
for data in cp:
f.write(data)
def copy_between(dsn_src: str, dsn_dst: str, src_query: str, dst_table: str):
# перенос Postgres → Postgres в бинарном формате при совпадающих версиях
sql_out = f"COPY ({src_query}) TO STDOUT WITH (FORMAT binary)"
sql_in = f"COPY {dst_table} FROM STDIN WITH (FORMAT binary)"
with psycopg.connect(dsn_src) as s, psycopg.connect(dsn_dst) as d:
with s.cursor() as cs, d.cursor() as cd:
with cs.copy(sql_out) as cp_out, cd.copy(sql_in) as cp_in:
for data in cp_out:
cp_in.write(data)
d.commit()
Замечания по безопасности: никаких пользовательских фрагментов в строке COPY
, только whitelisting таблиц и колонок. Пакуем данные крупными блоками, на маленьких кусках будет лишняя системная возня.
Go, pgx
В pgx
есть CopyFrom
, который забирает источник строк и шлёт их по протоколу COPY. Это удобнее, чем самому форматировать CSV, и быстрее, чем INSERT
.
package bulk
import (
"context"
"encoding/csv"
"io"
"strconv"
"github.com/jackc/pgx/v5"
)
func CopyRows(ctx context.Context, conn *pgx.Conn, table string, columns []string, rows [][]any) (int64, error) {
// columns вроде []string{"id", "payload", "created_at"}
return conn.CopyFrom(ctx, pgx.Identifier{table}, columns, pgx.CopyFromRows(rows))
}
func CopyCSV(ctx context.Context, conn *pgx.Conn, table string, columns []string, r io.Reader) (int64, error) {
dec := csv.NewReader(r)
// пропускаем заголовок
if _, err := dec.Read(); err != nil {
return 0, err
}
const batch = 5000
buf := make([][]any, 0, batch)
var total int64
flush := func() error {
if len(buf) == 0 {
return nil
}
n, err := conn.CopyFrom(ctx, pgx.Identifier{table}, columns, pgx.CopyFromRows(buf))
total += n
buf = buf[:0]
return err
}
for {
rec, err := dec.Read()
if err == io.EOF {
break
}
if err != nil {
return total, err
}
id, _ := strconv.ParseInt(rec[0], 10, 64)
row := []any{id, rec[1], rec[2]} // пример
buf = append(buf, row)
if len(buf) >= batch {
if err := flush(); err != nil {
return total, err
}
}
}
if err := flush(); err != nil {
return total, err
}
return total, nil
}
Здесь не генерируем CSV сами. pgx
упакует всё в протокол COPY
.
Rust, tokio-postgres
У клиента есть copy_out
и copy_in
. Работает потоками, без промежуточных файлов.
use tokio_postgres::{NoTls, Error};
use futures_util::{StreamExt, SinkExt};
use bytes::Bytes;
pub async fn copy_out_csv(conn_str: &str, table: &str) -> Result<Vec<u8>, Error> {
let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await?;
tokio::spawn(async move { let _ = connection.await; });
let sql = format!("COPY {} TO STDOUT WITH (FORMAT csv, HEADER true)", table);
let mut stream = client.copy_out(&*sql).await?;
let mut out = Vec::new();
while let Some(chunk) = stream.next().await {
out.extend_from_slice(&chunk?);
}
Ok(out)
}
pub async fn copy_in_csv(conn_str: &str, table: &str, data: &[u8]) -> Result<(), Error> {
let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await?;
tokio::spawn(async move { let _ = connection.await; });
let sql = format!("COPY {} FROM STDIN WITH (FORMAT csv, HEADER true)", table);
let mut sink = client.copy_in(&*sql).await?;
sink.send(Bytes::from(data.to_vec())).await?;
sink.close().await?;
Ok(())
}
COPY
закрывает большинство задач массовых загрузок и выгрузок в Postgres. Если использовать его с оглядкой на права, форматы, кодировки и план обслуживания таблицы, получаем стабильные и быстрые пайплайны. В спорных местах проще выдержать staging, в горячих работать потоками без диска, в инфраструктурных переносах просто использовать бинарный формат при совпадающих версиях.
Приглашаем вас пройти вступительное тестирование по теме «Базы данных». Тестирование поможет объективно оценить текущий уровень знаний и навыков, необходимых для эффективного усвоения материала курса.
Курс «Базы данных» подробно рассматривает современные подходы к работе с системами управления базами данных, включая эффективные методы загрузки и выгрузки данных, управление таблицами и транзакциями, а также практическое применение SQL. Приглашаем вас присоединиться к обучению, чтобы получить системные знания и навыки в этой области.
Akina
Ещё достаточно важным моментом является то, что с помощью этой команды нельзя выполнять загрузку в таблицу, на которой определены триггеры BEFORE/AFTER INSERT. А вот INSTEAD OF INSERT допустим.
И сразу возникает вопрос, сохраняются ли в таблице уже скопированные строки или нет. Или, если смотреть шире: транзакционна ли COPY? И вообще - является ли она DML?
bolk
Откуда в Постгресе нетранзакционная работа с данными?
Akina
Я не зря задал вопрос о судьбе записей, скопированных в таблицу до возникновения ошибки. Ваш встречный вопрос никак не помогает получить ответ.
kmatveev
Да, COPY транзакционна.
Akina
То есть при возникновении ошибки исполнения все ранее импортированные этой командой записи удаляются. Жаль, что это не описано явно - некоторые СУБД в аналогичных случаях ведут себя иначе.
Спасибо.
astentx
copy
поддерживает commit/rollback, так что судьбу импортированных записей можно решить самому, в том числе в случае удачного завершения (в отличие, к примеру, от ораклового sqlldr). В статье бы, конечно, стоило добавить описание работы "под капотом", чтобы было понятно, за счёт чего это достигается (что это не просто fast=true).