
Необходимость переноса данных из одной среды в другую — задача, с которой разработчики сталкиваются достаточно часто. Например, для отправки таблиц из прода в среды для тестирования. Вместе с тем, такая «перезаливка» таблиц нередко превращается в настоящий квест, по ходу которого нужно не только гарантировать сохранность данных, но и исключить ошибки, связанные с человеческим фактором. Поэтому лучшей практикой является автоматизация переноса.
Меня зовут Евгений Грибков. Я ведущий программист в центре технологий VK. В этой статье мы рассмотрим одно из возможных решений создания скрипта перезаливки заданных таблиц из одной БД в другую на примере MS SQL.
Типовой алгоритм перезаливки таблиц в базе данных
Обычно в ситуациях, когда нужно перезалить данные в таблицах БД из одной среды в другую, применяют шаблонный алгоритм восстановления базы:
делают резервную копию с прода;
восстанавливают бэкап на отдельной изолированной, промежуточной среде;
производят обезличивание или изменение персональных данных;
создают бэкап с полученной базы данных;
восстанавливают резервную копию в нужные среды (например, для разработки и тестирования).
Обычно весь описанный алгоритм автоматизирован и работает по расписанию (например, раз в сутки ночью или раз в неделю на выходных).
Но у такого подхода есть существенный недостаток: каждый раз восстанавливать всю базу данных во все среды — весьма длительный и очень дорогой в плане занимаемого места процесс. Поэтому в алгоритме нередко предусматривают чистки как исторических данных до определенного момента времени (например, всё, что старее одного календарного года), так и по определённым критериям. Таким образом достигается уменьшение объема БД в десятки, а то и в сотни раз.
Также надо учитывать, что в средах может быть несколько БД, в которые надо периодически догружать данные. Причем желательно, чтобы была возможность делать это в моменты, когда БД не используется активно — например, раз в неделю ночью или каждую ночь.
Соответственно, в таких кейсах важна автоматизация, которая дает возможность перезаливать таблицы БД из одной среды в другую без ручного контроля и глобального вмешательства разработчиков.
Вариант реализации скрипта
Теперь перейдем от теории к практике. Рассмотрим один из вариантов создания скрипта перезаливки заданных таблиц из одной БД в другую на примере MS SQL. При этом сразу оговоримся и примем условие, что обе БД на одном экземпляре СУБД — то есть, БД‑источник уже восстановлен в той же СУБД, в которой находится целевая БД.
Алгоритм работы такого скрипта будет следующим:
Отключить все ограничения.
Отключить все триггеры.
Для заданных таблиц сохранить все внешние ключи, после чего удалить их.
Произвести полную очистку заданных таблиц через команду
TRUNCATEс последующим их заполнением данными.Удалить битые данные.
Обновить статистики перезалитых выше таблиц.
Включить триггеры.
Восстановить внешние ключи.
Включить и перепроверить все ограничения.
Теперь приступим к реализации.
Для начала определим все необходимые переменные и временные таблицы
SET NOCOUNT OFF;
DECLARE @DB VARCHAR(250) = QUOTENAME(DB_NAME()),
@DBMaster VARCHAR(255) = 'БД-мастер',
@ERROR VARCHAR(MAX);
DECLARE @HistoryLimited bit = 1,
@table_name nvarchar(255),
@is_identity int = 0,
@stm nvarchar(max) = '',
@cols nvarchar(max) = '',
@IsNOTInsert bit,
@schema_name nvarchar(255),
@col_name_identity nvarchar(255),
@referencing_object nvarchar(255),
@referenced_object nvarchar(255),
@constraint_name nvarchar(255),
@referencing_columns nvarchar(max),
@referenced_columns nvarchar(max),
@rules nvarchar(max),
@key_cols nvarchar(max),
@StartMoment DATETIME2,
@FinishMoment DATETIME2,
@delete_referential_action INT,
@update_referential_action INT,
@max_row_insert INT = 100000,
@isClearTableFKs BIT = 1,
@RowCount BIGINT = 1,
@WhileDelCount INT = 0;
;
DECLARE @cnt TABLE (cnt BIGINT NOT NULL);
DROP TABLE IF EXISTS #tbl_res;
CREATE TABLE #tbl_res (
SchName NVARCHAR(255) NOT NULL,
TblName NVARCHAR(255) NOT NULL,
StartMoment DATETIME2 NOT NULL,
FinishMoment DATETIME2 NOT NULL,
Cnt BIGINT NOT NULL,
ErrorMsg NVARCHAR(MAX) NULL
);
Здесь определены переменные, которые будут использоваться далее в скрипте, а также табличная переменная, в которой будет записан итог работы перезаливки данных с таймингом.
Далее отключаем все ограничения в БД. Это можно сделать с помощью следующей команды: EXEC sys.sp_msforeachtable "ALTER TABLE ? NOCHECK CONSTRAINT ALL"
Отключаем все триггеры БД
DECLARE r_cursor_trigg_off CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] DISABLE TRIGGER [%s];'
+ CHAR(13)
, SCHEMA_NAME(b.[schema_id])
, OBJECT_NAME(t.parent_id)
, t.[Name]) AS stm
FROM sys.triggers t
LEFT JOIN sys.tables b ON b.object_id = t.parent_id
WHERE t.is_disabled = 0
AND t.type_desc = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NOT NULL
ORDER BY SCHEMA_NAME(b.[schema_id]) ASC,
OBJECT_NAME(t.parent_id) ASC;
OPEN r_cursor_trigg_off;
FETCH NEXT FROM r_cursor_trigg_off
INTO @stm;
WHILE @@FETCH_STATUS = 0
BEGIN
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_trigg_off
INTO @stm;
END
CLOSE r_cursor_trigg_off;
DEALLOCATE r_cursor_trigg_off;
SET @stm = '';
SELECT @stm += FORMATMESSAGE('DISABLE TRIGGER [%s] ON DATABASE;'
+ CHAR(13), t.[Name])
FROM sys.triggers t
LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id
WHERE t.is_disabled = 0
AND t.[type_desc] = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NULL;
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
Далее собираем метаданные по таблицам, с которыми будем работать
DROP TABLE IF EXISTS #tbls;
CREATE TABLE #tbls (
[name] NVARCHAR(255) NOT NULL,
sch_name NVARCHAR(255) NOT NULL,
IsNOTInsert BIT NOT NULL
);
INSERT INTO #tbls (
[name],
sch_name,
IsNOTInsert
)
SELECT t.[name],
SCHEMA_NAME(t.[schema_id]) AS sch_name,
--задается правило, по которому определяем
--нужно ли после очистки наполнять данными таблицу или нет
--по умолчанию нужно (0-да, 1-нет)
0 AS IsNOTInsert
FROM sys.tables AS t
--в фильтре задаем какие таблицы брать в расчет
--(в нашем случае какие не брать в расчёт)
WHERE t.[name] NOT LIKE 'unused%'
AND t.[name] NOT LIKE 'removed%'
AND t.[name] NOT LIKE 'migrated%'
AND t.[name] NOT LIKE 'migration%'
AND t.[name] NOT LIKE 'sysdiag%'
AND t.[name] NOT LIKE 'test%'
AND t.[name] NOT LIKE 'tmp%'
AND t.[name] NOT LIKE '%_cache'
AND t.[name] NOT IN ('FKs');
Теперь соберем все внешние ключи полученных таблиц, сохраним их в таблице dbo.FKs, затем — удалим
IF NOT EXISTS (SELECT 1 FROM sys.tables AS t
WHERE t.[name]= 'FKs'
AND t.[schema_id] = SCHEMA_ID('dbo'))
BEGIN
CREATE TABLE dbo.FKs (
referencing_object NVARCHAR(255) NOT NULL,
constraint_column_id INT NOT NULL,
referencing_column_name NVARCHAR(255) NOT NULL,
referenced_object NVARCHAR(255) NOT NULL,
referenced_column_name NVARCHAR(255) NOT NULL,
constraint_name NVARCHAR(255) NOT NULL,
delete_referential_action INT NOT NULL,
update_referential_action INT NOT NULL
);
END
ELSE IF (@isClearTableFKs = 1)
BEGIN
TRUNCATE TABLE dbo.FKs;
END
INSERT INTO dbo.FKs (
referencing_object,
constraint_column_id,
referencing_column_name,
referenced_object,
referenced_column_name,
constraint_name,
delete_referential_action,
update_referential_action
)
SELECT CONCAT('[', SCHEMA_NAME(P.[schema_id]), '].['
, OBJECT_NAME(FK.parent_object_id), ']') AS referencing_object,
FK.constraint_column_id,
CONCAT('['
, COL_NAME(FK.parent_object_id, FK.parent_column_id)
, ']') AS referencing_column_name,
CONCAT('['
, SCHEMA_NAME(R.[schema_id]), '].['
, OBJECT_NAME(FK.referenced_object_id)
, ']') AS referenced_object,
CONCAT('['
, COL_NAME(FK.referenced_object_id, FK.referenced_column_id)
, ']') AS referenced_column_name,
CONCAT('['
, OBJECT_NAME(FK.constraint_object_id)
, ']') AS constraint_name,
FKK.delete_referential_action,
FKK.update_referential_action
FROM sys.foreign_key_columns AS FK
INNER JOIN sys.foreign_keys AS FKK ON FKK.[object_id]
= FK.constraint_object_id
INNER JOIN sys.tables AS P ON P.[object_id] = FK.parent_object_id
INNER JOIN sys.tables AS R ON R.[object_id] = FK.referenced_object_id
WHERE NOT EXISTS (SELECT 1 FROM dbo.FKs AS t0
WHERE t0.constraint_name = CONCAT('['
, OBJECT_NAME(FK.constraint_object_id), ']'));
DELETE FROM trg
FROM dbo.FKs AS trg
WHERE NOT EXISTS (
SELECT 1
FROM #tbls AS src
WHERE trg.referencing_object = CONCAT('['
, src.sch_name, '].[', src.[name], ']')
OR trg.referenced_object = CONCAT('['
, src.sch_name, '].[', src.[name], ']')
)
DECLARE r_cursor_fk_drop CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.referencing_object,
t.referenced_object,
t.constraint_name
FROM dbo.FKs AS t
WHERE EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK
WHERE t.constraint_name = CONCAT('['
, OBJECT_NAME(FK.constraint_object_id)
, ']'))
GROUP BY t.referencing_object,
t.referenced_object,
t.constraint_name;
OPEN r_cursor_fk_drop;
FETCH NEXT FROM r_cursor_fk_drop
INTO @referencing_object,
@referenced_object,
@constraint_name
WHILE @@FETCH_STATUS = 0
BEGIN
SET @stm = CONCAT('ALTER TABLE ', @referencing_object
, ' DROP CONSTRAINT ', @constraint_name, ';');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_fk_drop
INTO @referencing_object,
@referenced_object,
@constraint_name;
END
CLOSE r_cursor_fk_drop;
DEALLOCATE r_cursor_fk_drop;
Следом перейдем к фрагменту кода, который отвечает за очистку и наполнение данными выбранных ранее таблиц
DECLARE r_cursor CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.[name],
t.sch_name,
t.IsNOTInsert
FROM #tbls AS t
ORDER BY t.[name] ASC;
OPEN r_cursor;
FETCH NEXT FROM r_cursor
INTO @table_name,
@schema_name,
@IsNOTInsert;
WHILE @@FETCH_STATUS = 0
BEGIN
SET @cols = '';
SET @is_identity = 0;
SET @col_name_identity = NULL;
SET @stm = CONCAT('TRUNCATE TABLE ', @DB
, '.[', @schema_name, '].[', @table_name, ']');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
IF (@IsNOTInsert = 0)
BEGIN
SELECT @cols = @cols + CASE WHEN @cols = ''
THEN c.[name] ELSE ',' + c.name END,
@is_identity = @is_identity + c.is_identity,
@col_name_identity = CASE WHEN (c.is_identity = 1)
THEN c.[name]
ELSE @col_name_identity END
FROM sys.tables t,
sys.columns c
WHERE t.[object_id] = c.[object_id]
AND t.[name] = @table_name
AND c.is_computed = 0;
SET @stm = '';
IF @is_identity > 0 SET @stm = CONCAT('SET IDENTITY_INSERT '
, @DB, '.[', @schema_name, '].[', @table_name, '] ON');
SET @stm = CONCAT(@stm, ' INSERT INTO ', @DB
, '.[', @schema_name, '].[', @table_name
, '](', @cols, ') SELECT ', @cols
, ' FROM [',@DBMaster,'].['
, @schema_name, '].['
, @table_name, '] WITH(NOLOCK) OPTION(RECOMPILE)');
--здесь можно задать ограничение на наполнение данными
IF @HistoryLimited = 1
BEGIN
IF @table_name LIKE '%History'
SET @stm = CONCAT(@stm
, ' WHERE ChangeDateTime > DATEADD (month, -1, SYSDATETIME()) ');
END
IF @is_identity > 0 SET @stm = CONCAT(@stm
, ' SET IDENTITY_INSERT ', @DB
, '.[', @schema_name, '].[', @table_name, '] OFF');
IF @is_identity > 0 SET @stm = CONCAT(@stm
, ' DBCC CHECKIDENT ("', @table_name, '")');
SET @StartMoment = SYSDATETIME();
SET @ERROR = NULL;
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
SET @FinishMoment = SYSDATETIME();
SET @stm = CONCAT('SELECT COUNT_BIG(*) FROM '
, '[', @schema_name, '].[', @table_name, '] WITH (NOLOCK);');
DELETE FROM @cnt;
INSERT INTO @cnt (cnt)
EXEC sys.sp_executesql @stmt = @stm;
INSERT INTO #tbl_res (
SchName,
TblName,
StartMoment,
FinishMoment,
Cnt,
ErrorMsg
)
SELECT @schema_name,
@table_name,
@StartMoment,
@FinishMoment,
COALESCE((SELECT SUM(cnt) FROM @cnt), 0) AS Cnt,
@ERROR;
END
FETCH NEXT FROM r_cursor
INTO @table_name,
@schema_name,
@IsNOTInsert;
END
CLOSE r_cursor;
DEALLOCATE r_cursor;
Обычно после этого фрагмента производятся какие‑то еще необходимые манипуляции с данными. Например, добавляются нужные пользователи и роли с правами в соответствующие таблицы БД.
Затем производится удаление битых данных, то есть тех, по которым нет для записи из одной таблицы соответствующей записи в другой таблице по внешнему ключу
WHILE (@RowCount > 0)
BEGIN
SET @RowCount = 0;
SET @WhileDelCount += 1;
DECLARE r_cursor_fk_corr CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.referencing_object,
t.referenced_object,
t.constraint_name,
STRING_AGG (CONCAT('(trg.', t.referencing_column_name
, '=src.', t.referenced_column_name, ')'), ' AND ')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS rules,
STRING_AGG (CONCAT('(trg.', t.referencing_column_name
, ' IS NOT NULL)'), ' AND ')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS key_cols
FROM dbo.FKs AS t
GROUP BY t.referencing_object,
t.referenced_object,
t.constraint_name;
OPEN r_cursor_fk_corr;
FETCH NEXT FROM r_cursor_fk_corr
INTO @referencing_object
, @referenced_object
, @constraint_name
, @rules
, @key_cols;
WHILE @@FETCH_STATUS = 0
BEGIN
SET @stm = CONCAT('DELETE FROM trg FROM ', @referencing_object
,' AS trg WHERE ', @key_cols
, ' AND NOT EXISTS (SELECT 1 FROM ', @referenced_object,
' AS src WITH (NOLOCK) WHERE ', @rules, ');');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
SET @RowCount += @@ROWCOUNT;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_fk_corr
INTO @referencing_object
, @referenced_object
, @constraint_name
, @rules
, @key_cols;
END
CLOSE r_cursor_fk_corr;
DEALLOCATE r_cursor_fk_corr;
END
PRINT CONCAT('WHILE DELETE COUNT: ', @WhileDelCount);
Удаление неконсистентных данных происходит до тех пор, пока такие данные обнаруживаются. Это нужно, чтобы исключить ситуацию, когда была удалена запись из одной таблицы, но при этом была потеряна соответствующая связь в другой уже ранее обработанной таблице.
Далее обновляем статистики для рассматриваемых выше таблиц
DECLARE r_cursor_stat CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT CONCAT('UPDATE STATISTICS ', @DB, '.['
, t.sch_name, '].[', t.[name], '] WITH FULLSCAN;') AS stm
FROM #tbls AS t;
OPEN r_cursor_stat;
FETCH NEXT FROM r_cursor_stat
INTO @stm;
WHILE @@FETCH_STATUS = 0
BEGIN
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_stat
INTO @stm
END
CLOSE r_cursor_stat;
DEALLOCATE r_cursor_stat;
Теперь необходимо включить триггеры в БД
DECLARE r_cursor_trigg_on CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] ENABLE TRIGGER [%s];'
+ CHAR(13), SCHEMA_NAME(b.[schema_id])
, OBJECT_NAME(t.parent_id), t.[Name]) AS stm
FROM sys.triggers t
LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id
WHERE t.is_disabled = 1
AND t.[type_desc] = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NOT NULL
OPEN r_cursor_trigg_on;
FETCH NEXT FROM r_cursor_trigg_on
INTO @stm;
WHILE @@FETCH_STATUS = 0
BEGIN
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_trigg_on
INTO @stm;
END
CLOSE r_cursor_trigg_on;
DEALLOCATE r_cursor_trigg_on;
SET @stm = '';
SELECT @stm += FORMATMESSAGE('ENABLE TRIGGER [%s] ON DATABASE;'
+ CHAR(13), t.[Name])
FROM sys.triggers t
WHERE t.is_disabled = 0
AND t.[type_desc] = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NULL;
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
После этого восстанавливаем все ранее удаленные внешние ключи
DECLARE r_cursor_fk_recover CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.referencing_object,
t.referenced_object,
t.constraint_name,
STRING_AGG (t.referencing_column_name, ',')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC)
AS referencing_columns,
STRING_AGG (t.referenced_column_name, ',')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC)
AS referenced_columns,
t.delete_referential_action,
t.update_referential_action
FROM dbo.FKs AS t
WHERE NOT EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK
WHERE t.constraint_name = CONCAT('[', OBJECT_NAME(FK.constraint_object_id)
, ']'))
GROUP BY t.referencing_object,
t.referenced_object,
t.constraint_name,
t.delete_referential_action,
t.update_referential_action;
OPEN r_cursor_fk_recover;
FETCH NEXT FROM r_cursor_fk_recover
INTO @referencing_object
, @referenced_object
, @constraint_name
, @referencing_columns
, @referenced_columns
, @delete_referential_action
, @update_referential_action;
WHILE @@FETCH_STATUS = 0
BEGIN
SET @stm = CONCAT('ALTER TABLE ', @referencing_object
,' WITH CHECK ADD CONSTRAINT ', @constraint_name,
' FOREIGN KEY(', @referencing_columns, ') REFERENCES '
, @referenced_object, ' (', @referenced_columns, ') '
, CASE
WHEN @delete_referential_action = 1
THEN 'ON DELETE CASCADE '
WHEN @delete_referential_action = 2
THEN 'ON DELETE SET NULL '
ELSE ''
END
, CASE
WHEN @update_referential_action = 1
THEN 'ON UPDATE CASCADE '
WHEN @update_referential_action = 2
THEN 'ON UPDATE SET NULL '
ELSE ''
END
, '; '
, 'ALTER TABLE ', @referencing_object, ' CHECK CONSTRAINT '
, @constraint_name, '; ');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_fk_recover
INTO @referencing_object
, @referenced_object
, @constraint_name
, @referencing_columns
, @referenced_columns
, @delete_referential_action
, @update_referential_action;
END
CLOSE r_cursor_fk_recover;
DEALLOCATE r_cursor_fk_recover;
В конце запускаем проверку всех ограничений, используя следующую команду:
EXEC sys.sp_msforeachtable @commAND1="PRINT '?'", @commAND2="ALTER TABLE ? WITH CHECK CHECK CONSTRAINT ALL";
После делаем вывод статистики работы перезаливки данных в таблицы:
SELECT t.SchName,
t.TblName,
t.Cnt,
DATEDIFF(millisecond, t.StartMoment, t.FinishMoment) AS DiffMSec,
t.ErrorMsg
FROM #tbl_res AS t
ORDER BY t.SchName ASC, t.TblName ASC;
В итоге получаем полный скрипт
SET NOCOUNT OFF;
DECLARE @DB VARCHAR(250) = QUOTENAME(DB_NAME()),
@DBMaster VARCHAR(255) = 'БД-мастер',
@ERROR VARCHAR(MAX);
DECLARE @HistoryLimited bit = 1,
@table_name nvarchar(255),
@is_identity int = 0,
@stm nvarchar(max) = '',
@cols nvarchar(max) = '',
@IsNOTInsert bit,
@schema_name nvarchar(255),
@col_name_identity nvarchar(255),
@referencing_object nvarchar(255),
@referenced_object nvarchar(255),
@constraint_name nvarchar(255),
@referencing_columns nvarchar(max),
@referenced_columns nvarchar(max),
@rules nvarchar(max),
@key_cols nvarchar(max),
@StartMoment DATETIME2,
@FinishMoment DATETIME2,
@delete_referential_action INT,
@update_referential_action INT,
@max_row_insert INT = 100000,
@isClearTableFKs BIT = 1,
@RowCount BIGINT = 1,
@WhileDelCount INT = 0;
;
DECLARE @cnt TABLE (cnt BIGINT NOT NULL);
DROP TABLE IF EXISTS #tbl_res;
CREATE TABLE #tbl_res (
SchName NVARCHAR(255) NOT NULL,
TblName NVARCHAR(255) NOT NULL,
StartMoment DATETIME2 NOT NULL,
FinishMoment DATETIME2 NOT NULL,
Cnt BIGINT NOT NULL,
ErrorMsg NVARCHAR(MAX) NULL
);
EXEC sys.sp_msforeachtable "ALTER TABLE ? NOCHECK CONSTRAINT ALL";
DECLARE r_cursor_trigg_off CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] DISABLE TRIGGER [%s];'
+ CHAR(13), SCHEMA_NAME(b.[schema_id]), OBJECT_NAME(t.parent_id)
, t.[Name]) AS stm
FROM sys.triggers t
LEFT JOIN sys.tables b ON b.object_id = t.parent_id
WHERE t.is_disabled = 0
AND t.type_desc = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NOT NULL
ORDER BY SCHEMA_NAME(b.[schema_id]) ASC,
OBJECT_NAME(t.parent_id) ASC;
OPEN r_cursor_trigg_off;
FETCH NEXT FROM r_cursor_trigg_off
INTO @stm;
WHILE @@FETCH_STATUS = 0
BEGIN
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_trigg_off
INTO @stm;
END
CLOSE r_cursor_trigg_off;
DEALLOCATE r_cursor_trigg_off;
SET @stm = '';
SELECT @stm += FORMATMESSAGE('DISABLE TRIGGER [%s] ON DATABASE;'
+ CHAR(13), t.[Name])
FROM sys.triggers t
LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id
WHERE t.is_disabled = 0
AND t.[type_desc] = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NULL;
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
DROP TABLE IF EXISTS #tbls;
CREATE TABLE #tbls (
[name] NVARCHAR(255) NOT NULL,
sch_name NVARCHAR(255) NOT NULL,
IsNOTInsert BIT NOT NULL
);
INSERT INTO #tbls (
[name],
sch_name,
IsNOTInsert
)
SELECT t.[name],
SCHEMA_NAME(t.[schema_id]) AS sch_name,
--задаётся правило, по которому определяем
--нужно ли после очистки наполнять данными таблицу или нет
--по умолчанию нужно (0-да, 1-нет)
0 AS IsNOTInsert
FROM sys.tables AS t
--в фильтре задаем какие таблицы брать в расчет
--(в нашем случае какие не брать в расчет)
WHERE t.[name] NOT LIKE 'unused%'
AND t.[name] NOT LIKE 'removed%'
AND t.[name] NOT LIKE 'migrated%'
AND t.[name] NOT LIKE 'migration%'
AND t.[name] NOT LIKE 'sysdiag%'
AND t.[name] NOT LIKE 'test%'
AND t.[name] NOT LIKE 'tmp%'
AND t.[name] NOT LIKE '%_cache'
AND t.[name] NOT IN ('FKs');
IF NOT EXISTS (SELECT 1 FROM sys.tables AS t
WHERE t.[name]= 'FKs' AND t.[schema_id] = SCHEMA_ID('dbo'))
BEGIN
CREATE TABLE dbo.FKs (
referencing_object NVARCHAR(255) NOT NULL,
constraint_column_id INT NOT NULL,
referencing_column_name NVARCHAR(255) NOT NULL,
referenced_object NVARCHAR(255) NOT NULL,
referenced_column_name NVARCHAR(255) NOT NULL,
constraint_name NVARCHAR(255) NOT NULL,
delete_referential_action INT NOT NULL,
update_referential_action INT NOT NULL
);
END
ELSE IF (@isClearTableFKs = 1)
BEGIN
TRUNCATE TABLE dbo.FKs;
END
INSERT INTO dbo.FKs (
referencing_object,
constraint_column_id,
referencing_column_name,
referenced_object,
referenced_column_name,
constraint_name,
delete_referential_action,
update_referential_action
)
SELECT CONCAT('[', SCHEMA_NAME(P.[schema_id]), '].['
, OBJECT_NAME(FK.parent_object_id), ']') AS referencing_object,
FK.constraint_column_id,
CONCAT('[', COL_NAME(FK.parent_object_id
, FK.parent_column_id), ']') AS referencing_column_name,
CONCAT('[', SCHEMA_NAME(R.[schema_id]), '].['
, OBJECT_NAME(FK.referenced_object_id), ']') AS referenced_object,
CONCAT('[', COL_NAME(FK.referenced_object_id
, FK.referenced_column_id), ']') AS referenced_column_name,
CONCAT('[', OBJECT_NAME(FK.constraint_object_id)
, ']') AS constraint_name,
FKK.delete_referential_action,
FKK.update_referential_action
FROM sys.foreign_key_columns AS FK
INNER JOIN sys.foreign_keys AS FKK ON FKK.[object_id]
= FK.constraint_object_id
INNER JOIN sys.tables AS P ON P.[object_id] = FK.parent_object_id
INNER JOIN sys.tables AS R ON R.[object_id] = FK.referenced_object_id
WHERE NOT EXISTS (SELECT 1 FROM dbo.FKs AS t0
WHERE t0.constraint_name = CONCAT('['
, OBJECT_NAME(FK.constraint_object_id), ']'));
DELETE FROM trg
FROM dbo.FKs AS trg
WHERE NOT EXISTS (
SELECT 1
FROM #tbls AS src
WHERE trg.referencing_object = CONCAT('[', src.sch_name
, '].[', src.[name], ']')
OR trg.referenced_object = CONCAT('[', src.sch_name
, '].[', src.[name], ']')
)
DECLARE r_cursor_fk_drop CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.referencing_object,
t.referenced_object,
t.constraint_name
FROM dbo.FKs AS t
WHERE EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK
WHERE t.constraint_name = CONCAT('['
, OBJECT_NAME(FK.constraint_object_id), ']'))
GROUP BY t.referencing_object,
t.referenced_object,
t.constraint_name;
OPEN r_cursor_fk_drop;
FETCH NEXT FROM r_cursor_fk_drop
INTO @referencing_object,
@referenced_object,
@constraint_name
WHILE @@FETCH_STATUS = 0
BEGIN
SET @stm = CONCAT('ALTER TABLE ', @referencing_object
, ' DROP CONSTRAINT ', @constraint_name, ';');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_fk_drop
INTO @referencing_object,
@referenced_object,
@constraint_name;
END
CLOSE r_cursor_fk_drop;
DEALLOCATE r_cursor_fk_drop;
DECLARE r_cursor CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.[name],
t.sch_name,
t.IsNOTInsert
FROM #tbls AS t
ORDER BY t.[name] ASC;
OPEN r_cursor;
FETCH NEXT FROM r_cursor
INTO @table_name,
@schema_name,
@IsNOTInsert;
WHILE @@FETCH_STATUS = 0
BEGIN
SET @cols = '';
SET @is_identity = 0;
SET @col_name_identity = NULL;
SET @stm = CONCAT('TRUNCATE TABLE ', @DB
, '.[', @schema_name, '].[', @table_name, ']');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
IF (@IsNOTInsert = 0)
BEGIN
SELECT @cols = @cols + CASE WHEN @cols = ''
THEN c.[name] ELSE ',' + c.name END,
@is_identity = @is_identity + c.is_identity,
@col_name_identity = CASE WHEN (c.is_identity = 1)
THEN c.[name] ELSE @col_name_identity END
FROM sys.tables t,
sys.columns c
WHERE t.[object_id] = c.[object_id]
AND t.[name] = @table_name
AND c.is_computed = 0;
SET @stm = '';
IF @is_identity > 0 SET @stm = CONCAT('SET IDENTITY_INSERT '
, @DB, '.[', @schema_name, '].[', @table_name, '] ON');
SET @stm = CONCAT(@stm, ' INSERT INTO ', @DB, '.['
, @schema_name, '].[', @table_name
, '](', @cols, ') SELECT ', @cols
, ' FROM [',@DBMaster,'].['
, @schema_name, '].['
, @table_name, '] WITH(NOLOCK) OPTION(RECOMPILE)');
--здесь можно задать ограничение на наполнение данными
IF @HistoryLimited = 1
BEGIN
IF @table_name LIKE '%History'
SET @stm = CONCAT(@stm
, ' WHERE ChangeDateTime > DATEADD (month, -1, SYSDATETIME()) ');
END
IF @is_identity > 0 SET @stm = CONCAT(@stm, ' SET IDENTITY_INSERT '
, @DB, '.[', @schema_name, '].[', @table_name, '] OFF');
IF @is_identity > 0 SET @stm = CONCAT(@stm, ' DBCC CHECKIDENT ("'
, @table_name, '")');
SET @StartMoment = SYSDATETIME();
SET @ERROR = NULL;
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
SET @FinishMoment = SYSDATETIME();
SET @stm = CONCAT('SELECT COUNT_BIG(*) FROM ', '[', @schema_name
, '].[', @table_name, '] WITH (NOLOCK);');
DELETE FROM @cnt;
INSERT INTO @cnt (cnt)
EXEC sys.sp_executesql @stmt = @stm;
INSERT INTO #tbl_res (
SchName,
TblName,
StartMoment,
FinishMoment,
Cnt,
ErrorMsg
)
SELECT @schema_name,
@table_name,
@StartMoment,
@FinishMoment,
COALESCE((SELECT SUM(cnt) FROM @cnt), 0) AS Cnt,
@ERROR;
END
FETCH NEXT FROM r_cursor
INTO @table_name,
@schema_name,
@IsNOTInsert;
END
CLOSE r_cursor;
DEALLOCATE r_cursor;
WHILE (@RowCount > 0)
BEGIN
SET @RowCount = 0;
SET @WhileDelCount += 1;
DECLARE r_cursor_fk_corr CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.referencing_object,
t.referenced_object,
t.constraint_name,
STRING_AGG (CONCAT('(trg.', t.referencing_column_name
, '=src.', t.referenced_column_name, ')'), ' AND ')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS rules,
STRING_AGG (CONCAT('(trg.', t.referencing_column_name
, ' IS NOT NULL)'), ' AND ')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS key_cols
FROM dbo.FKs AS t
GROUP BY t.referencing_object,
t.referenced_object,
t.constraint_name;
OPEN r_cursor_fk_corr;
FETCH NEXT FROM r_cursor_fk_corr
INTO @referencing_object
, @referenced_object
, @constraint_name
, @rules
, @key_cols;
WHILE @@FETCH_STATUS = 0
BEGIN
SET @stm = CONCAT('DELETE FROM trg FROM ', @referencing_object
,' AS trg WHERE ', @key_cols, ' AND NOT EXISTS (SELECT 1 FROM '
, @referenced_object,
' AS src WITH (NOLOCK) WHERE ', @rules, ');');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
SET @RowCount += @@ROWCOUNT;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_fk_corr
INTO @referencing_object
, @referenced_object
, @constraint_name
, @rules
, @key_cols;
END
CLOSE r_cursor_fk_corr;
DEALLOCATE r_cursor_fk_corr;
END
PRINT CONCAT('WHILE DELETE COUNT: ', @WhileDelCount);
DECLARE r_cursor_stat CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT CONCAT('UPDATE STATISTICS ', @DB, '.[', t.sch_name, '].['
, t.[name], '] WITH FULLSCAN;') AS stm
FROM #tbls AS t;
OPEN r_cursor_stat;
FETCH NEXT FROM r_cursor_stat
INTO @stm;
WHILE @@FETCH_STATUS = 0
BEGIN
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_stat
INTO @stm
END
CLOSE r_cursor_stat;
DEALLOCATE r_cursor_stat;
DECLARE r_cursor_trigg_on CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] ENABLE TRIGGER [%s];'
+ CHAR(13), SCHEMA_NAME(b.[schema_id]), OBJECT_NAME(t.parent_id)
, t.[Name]) AS stm
FROM sys.triggers t
LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id
WHERE t.is_disabled = 1
AND t.[type_desc] = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NOT NULL
OPEN r_cursor_trigg_on;
FETCH NEXT FROM r_cursor_trigg_on
INTO @stm;
WHILE @@FETCH_STATUS = 0
BEGIN
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_trigg_on
INTO @stm;
END
CLOSE r_cursor_trigg_on;
DEALLOCATE r_cursor_trigg_on;
SET @stm = '';
SELECT @stm += FORMATMESSAGE('ENABLE TRIGGER [%s] ON DATABASE;'
+ CHAR(13), t.[Name])
FROM sys.triggers t
WHERE t.is_disabled = 0
AND t.[type_desc] = 'SQL_TRIGGER'
AND OBJECT_NAME(t.parent_id) IS NULL;
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
DECLARE r_cursor_fk_recover CURSOR LOCAL FAST_FORWARD READ_ONLY FOR
SELECT t.referencing_object,
t.referenced_object,
t.constraint_name,
STRING_AGG (t.referencing_column_name, ',')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC)
AS referencing_columns,
STRING_AGG (t.referenced_column_name, ',')
WITHIN GROUP (ORDER BY t.constraint_column_id ASC)
AS referenced_columns,
t.delete_referential_action,
t.update_referential_action
FROM dbo.FKs AS t
WHERE NOT EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK
WHERE t.constraint_name = CONCAT('['
, OBJECT_NAME(FK.constraint_object_id), ']'))
GROUP BY t.referencing_object,
t.referenced_object,
t.constraint_name,
t.delete_referential_action,
t.update_referential_action;
OPEN r_cursor_fk_recover;
FETCH NEXT FROM r_cursor_fk_recover
INTO @referencing_object
, @referenced_object
, @constraint_name
, @referencing_columns
, @referenced_columns
, @delete_referential_action
, @update_referential_action;
WHILE @@FETCH_STATUS = 0
BEGIN
SET @stm = CONCAT('ALTER TABLE ', @referencing_object
,' WITH CHECK ADD CONSTRAINT ', @constraint_name,
' FOREIGN KEY(', @referencing_columns, ') REFERENCES '
, @referenced_object, ' (', @referenced_columns, ') '
, CASE
WHEN @delete_referential_action = 1
THEN 'ON DELETE CASCADE '
WHEN @delete_referential_action = 2
THEN 'ON DELETE SET NULL '
ELSE ''
END
, CASE
WHEN @update_referential_action = 1
THEN 'ON UPDATE CASCADE '
WHEN @update_referential_action = 2
THEN 'ON UPDATE SET NULL '
ELSE ''
END
, '; '
, 'ALTER TABLE ', @referencing_object, ' CHECK CONSTRAINT '
, @constraint_name, '; ');
PRINT @stm;
BEGIN TRY
BEGIN TRANSACTION;
EXEC sys.sp_executesql @stmt = @stm;
COMMIT;
END TRY
BEGIN CATCH
SET @ERROR = ERROR_MESSAGE();
PRINT @ERROR;
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
END CATCH
FETCH NEXT FROM r_cursor_fk_recover
INTO @referencing_object
, @referenced_object
, @constraint_name
, @referencing_columns
, @referenced_columns
, @delete_referential_action
, @update_referential_action;
END
CLOSE r_cursor_fk_recover;
DEALLOCATE r_cursor_fk_recover;
EXEC sys.sp_msforeachtable @commAND1="PRINT '?'"
, @commAND2="ALTER TABLE ? WITH CHECK CHECK CONSTRAINT ALL";
SELECT t.SchName,
t.TblName,
t.Cnt,
DATEDIFF(millisecond, t.StartMoment, t.FinishMoment) AS DiffMSec,
t.ErrorMsg
FROM #tbl_res AS t
ORDER BY t.SchName ASC, t.TblName ASC;
Для проверки скрипта мы прогнали его на тестовой БД:

Скрипт выполнил перенос данных без ошибок.
Важно и то, что при объеме данных в БД‑источнике около 100 ГБ (размер таблиц в строках до 100 млн записей), время выполнения всего скрипта составило 18 — 19 минут, то есть достигается высокая скорость перезаливки таблиц.
Компоненты скрипта
Теперь немного о подробностях реализации. В приведённом скрипте используется целый стек системных объектов:
sys.sp_msforeachtable — недокументированная хранимая процедура в SQL Server, которая позволяет итеративно применять команду T‑SQL к каждой таблице в текущей базе данных;
sys.triggers — системный объект, который содержит информацию о триггерах в БД;
sys.tables — системный объект, который содержит информацию о таблицах в БД;
sys.sp_executesql — системная хранимая процедура для выполнения инструкции Transact‑SQL или пакета, в том числе, созданных динамически;
sys.foreign_key_columns — системный объект, который содержит информацию о составе внешних ключей;
sys.foreign_keys — системный объект, который содержит информацию о внешних ключах;
sys.columns — системный объект, содержащий информацию о колонках.
Вместо выводов
Реализация перезаливки таблиц в базе данных — довольно скрупулезная задача, в которой без автоматизации легко столкнуться с «подводными камнями» и нерациональным расходованием ресурсов. Поэтому автоматизация — must have для любой системы, где надо перегонять данные между средами. Предложенный скрипт — один из способов такой автоматизации.
Безусловно, описанное решение — не «серебряная пуля». Но наш опыт и проведенные тесты показали, что оно вполне эффективно и надежно справляется с переносом данных между средами.
Комментарии (5)

alan008
17.01.2025 22:26Не ожидал что VK использует MS SQL. Представляю стоимость лицензирования. Также не понятно, что с покупкой новых лицензий после 2022.

jobgemws Автор
17.01.2025 22:26Вы не поверите, но многие используют MS SQL в том числе крупные и международные компании.
Порой стоимость внедрения и поддержки открытых решений стоит дороже лицензии аналогичного платного продукта.
Лицензии можно приобрести вполне законно через 3-и лица.

alan008
17.01.2025 22:26Я то как раз поверю, т.к. мы сами его используем, но мы по сравнению с VK на неск. порядков мельче )
FlyingDutchman2
А зачем удалять внешние ключи? Их же можно временно отключить, а потом снова включить.
jobgemws Автор
Даже выключенные ключи не дадут сделать TRUNCATE, потому и удаляем.