/**
 * SQL Server Utility Objects Script for Databricks Lakeflow Connect
 * Version 1.5
 *
 * This script creates versioned utility stored procedures that can be used
 * to automatically remediate common SQL Server setup issues for ingestion.
 *
 * FEATURES:
 * - Automatic table discovery with @Tables = 'ALL', 'SCHEMAS:Sales,HR', wildcards
 * - Smart CT/CDC selection based on primary key presence
 * - Table-level CT/CDC enablement and DDL support objects
 * - Multi-platform detection and optimization
 * - Idempotent
 *
 * Platform Support:
 * - On-premises SQL Server
 * - Azure SQL Database
 * - Azure SQL Managed Instance
 * - Amazon RDS for SQL Server
 *
 * CHANGE HISTORY:
 * Version 1.5
 * - Fixed DDL audit trigger failure on Azure SQL Database by removing three-part name (db.dbo.table) from INSERT; Azure SQL does not support cross-database references inside triggers
 * - Fixed Azure SQL Database compatibility issue using dynamic SQL for RDS-specific operations
 * - Fixed backward compatibility with older DDL support objects that used different capture instance naming
 * - Fixed data loss during schema changes when a pre-existing (non-Lakeflow) capture instance is present: unread change data is now correctly merged into the new Lakeflow capture instance
 *
 * Version 1.4
 * - Fixed issue with back-to-back schema changes causing replication loops
 * - Fixed Amazon RDS SQL Server platform support for CDC enablement
 * - Improved handling of capture instance management during schema evolution
 * - Capture instances are now created with lakeflow naming (lakeflow_schema_table_1)
 *   for better identification and management
 * - Fixed variable case mismatch in ALTER TABLE trigger
 *
 * Version 1.3
 * - DDL support objects for inline schema evolution
 * - Pre-existing capture instance preservation logic
 * - Lakeflow capture instance management
 *
 * Version 1.2
 * - Multi-platform support enhancements
 *
 * Version 1.1
 * - Initial versioned release
 *
 */

SET QUOTED_IDENTIFIER ON;
SET ANSI_NULLS ON;

-- ---------------------------------------------------------------------------
-- Installation prelude: banner, platform detection, privilege check and
-- removal of utility objects left behind by this or earlier versions.
-- A failed privilege check raises an error and RETURNs, which ends the batch
-- before anything is dropped or created.
-- ---------------------------------------------------------------------------
BEGIN
    PRINT N'Starting Lakeflow Connect Utility Objects installation...';
    PRINT N'Version: 1.5';
    PRINT N'Catalog: ' + DB_NAME();
    PRINT N'Executed by: ' + SUSER_NAME();
    -- Style 120 = ODBC canonical (yyyy-mm-dd hh:mi:ss); explicit length instead of the implicit default.
    PRINT N'Date/Time: ' + CONVERT(VARCHAR(30), GETDATE(), 120);
    PRINT N'';

    -- Detect platform. The decision order mirrors dbo.lakeflowDetectPlatform (created below),
    -- including the msdb fallback for Amazon RDS instances whose @@SERVERNAME does not carry
    -- the rds.amazonaws.com suffix.
    DECLARE @engineEdition INT = CAST(SERVERPROPERTY('EngineEdition') AS INT);
    DECLARE @serverName NVARCHAR(255) = @@SERVERNAME;
    DECLARE @currentPlatform NVARCHAR(50);

    IF @engineEdition = 5
        SET @currentPlatform = 'AZURE_SQL_DATABASE';
    ELSE IF @engineEdition = 8
        SET @currentPlatform = 'AZURE_SQL_MANAGED_INSTANCE';
    ELSE IF @serverName LIKE '%.rds.amazonaws.com'
        SET @currentPlatform = 'AMAZON_RDS';
    ELSE IF DB_ID('msdb') IS NOT NULL AND OBJECT_ID('msdb.dbo.rds_cdc_enable_db', 'P') IS NOT NULL
        SET @currentPlatform = 'AMAZON_RDS';
    ELSE IF @engineEdition IN (1, 2, 3, 4)
        SET @currentPlatform = 'ON_PREMISES';
    ELSE
        SET @currentPlatform = 'UNKNOWN';

    PRINT N'Detected platform: ' + @currentPlatform;

    -- Validate that current user has sufficient privileges
    IF (IS_ROLEMEMBER('db_owner') = 0)
    BEGIN
        RAISERROR ('User executing this script is not a ''db_owner'' role member. To execute this script, please use a user that is a member of the db_owner role.', 16, 1);
        RETURN;
    END

    -- Cleanup existing objects.
    -- Every catalog lookup below is limited to the dbo schema (the only schema this script
    -- creates objects in) and quotes names with QUOTENAME, so the generated DROP always targets
    -- an object that exists. In the LIKE patterns the underscore is written as [_] because a
    -- bare underscore is a single-character wildcard and would also match unrelated names
    -- (for example a user table called lakeflowDdlAuditTrail).
    PRINT N'Cleaning up existing utility objects (all versions)...';

    DECLARE @dropSql NVARCHAR(MAX) = '';

    -- Drop lakeflowFixPermissions procedures
    SELECT @dropSql = @dropSql + N'DROP PROCEDURE [dbo].' + QUOTENAME(name) + N';' + CHAR(13)
    FROM sys.procedures
    WHERE name = N'lakeflowFixPermissions'
      AND schema_id = SCHEMA_ID(N'dbo');

    IF LEN(@dropSql) > 0
    BEGIN
        EXEC sp_executesql @dropSql;
        PRINT N'Dropped existing lakeflowFixPermissions procedures';
    END

    -- Drop lakeflowSetupChangeTracking procedures
    SET @dropSql = '';
    SELECT @dropSql = @dropSql + N'DROP PROCEDURE [dbo].' + QUOTENAME(name) + N';' + CHAR(13)
    FROM sys.procedures
    WHERE name = N'lakeflowSetupChangeTracking'
      AND schema_id = SCHEMA_ID(N'dbo');

    IF LEN(@dropSql) > 0
    BEGIN
        EXEC sp_executesql @dropSql;
        PRINT N'Dropped existing lakeflowSetupChangeTracking procedures';
    END

    -- Drop lakeflowSetupChangeDataCapture procedures
    SET @dropSql = '';
    SELECT @dropSql = @dropSql + N'DROP PROCEDURE [dbo].' + QUOTENAME(name) + N';' + CHAR(13)
    FROM sys.procedures
    WHERE name = N'lakeflowSetupChangeDataCapture'
      AND schema_id = SCHEMA_ID(N'dbo');

    IF LEN(@dropSql) > 0
    BEGIN
        EXEC sp_executesql @dropSql;
        PRINT N'Dropped existing lakeflowSetupChangeDataCapture procedures';
    END

    -- Drop lakeflowDetectPlatform functions
    SET @dropSql = '';
    SELECT @dropSql = @dropSql + N'DROP FUNCTION [dbo].' + QUOTENAME(name) + N';' + CHAR(13)
    FROM sys.objects
    WHERE name = N'lakeflowDetectPlatform'
      AND type = 'FN'
      AND schema_id = SCHEMA_ID(N'dbo');

    IF LEN(@dropSql) > 0
    BEGIN
        EXEC sp_executesql @dropSql;
        PRINT N'Dropped existing lakeflowDetectPlatform functions';
    END

    -- Drop lakeflowUtilityVersion functions
    SET @dropSql = '';
    SELECT @dropSql = @dropSql + N'DROP FUNCTION [dbo].' + QUOTENAME(name) + N';' + CHAR(13)
    FROM sys.objects
    WHERE name LIKE N'lakeflowUtilityVersion[_]%[_]%'
      AND type = 'FN'
      AND schema_id = SCHEMA_ID(N'dbo');

    IF LEN(@dropSql) > 0
    BEGIN
        EXEC sp_executesql @dropSql;
        PRINT N'Dropped existing lakeflowUtilityVersion functions';
    END

    -- Drop older versions of change tracking DDL objects
    SET @dropSql = '';
    SELECT @dropSql = @dropSql + N'DROP TABLE [dbo].' + QUOTENAME(name) + N';' + CHAR(13)
    FROM sys.tables
    WHERE name LIKE N'lakeflowDdlAudit[_]%[_]%'
      AND schema_id = SCHEMA_ID(N'dbo');

    IF LEN(@dropSql) > 0
    BEGIN
        EXEC sp_executesql @dropSql;
        PRINT N'Dropped existing DDL audit tables from all versions';
    END

    -- parent_class = 0 selects database-scoped DDL triggers, which have no schema.
    SET @dropSql = '';
    SELECT @dropSql = @dropSql + N'DROP TRIGGER ' + QUOTENAME(name) + N' ON DATABASE;' + CHAR(13)
    FROM sys.triggers
    WHERE name LIKE N'lakeflowDdlAuditTrigger[_]%[_]%'
      AND parent_class = 0;

    IF LEN(@dropSql) > 0
    BEGIN
        EXEC sp_executesql @dropSql;
        PRINT N'Dropped existing DDL audit triggers from all versions';
    END

    -- Drop older versions of CDC objects
    SET @dropSql = '';
    SELECT @dropSql = @dropSql + N'DROP PROCEDURE [dbo].' + QUOTENAME(name) + N';' + CHAR(13)
    FROM sys.procedures
    WHERE schema_id = SCHEMA_ID(N'dbo')
      AND (   name LIKE N'lakeflowDisableOldCaptureInstance[_]%[_]%'
           OR name LIKE N'lakeflowMergeCaptureInstances[_]%[_]%'
           OR name LIKE N'lakeflowRefreshCaptureInstance[_]%[_]%');

    IF LEN(@dropSql) > 0
    BEGIN
        EXEC sp_executesql @dropSql;
        PRINT N'Dropped existing CDC procedures from all versions';
    END

    SET @dropSql = '';
    SELECT @dropSql = @dropSql + N'DROP TABLE [dbo].' + QUOTENAME(name) + N';' + CHAR(13)
    FROM sys.tables
    WHERE name LIKE N'lakeflowCaptureInstanceInfo[_]%[_]%'
      AND schema_id = SCHEMA_ID(N'dbo');

    IF LEN(@dropSql) > 0
    BEGIN
        EXEC sp_executesql @dropSql;
        PRINT N'Dropped existing capture instance tables from all versions';
    END

    SET @dropSql = '';
    SELECT @dropSql = @dropSql + N'DROP TRIGGER ' + QUOTENAME(name) + N' ON DATABASE;' + CHAR(13)
    FROM sys.triggers
    WHERE name LIKE N'lakeflowAlterTableTrigger[_]%[_]%'
      AND parent_class = 0;

    IF LEN(@dropSql) > 0
    BEGIN
        EXEC sp_executesql @dropSql;
        PRINT N'Dropped existing ALTER TABLE triggers from all versions';
    END

    PRINT N'Cleanup completed.';
    PRINT N'';
END

-- Create versioned functions first (dependencies)

-- dbo.lakeflowDetectPlatform(): returns one of AZURE_SQL_DATABASE, AZURE_SQL_MANAGED_INSTANCE,
-- AMAZON_RDS, ON_PREMISES or UNKNOWN, derived from SERVERPROPERTY('EngineEdition'), @@SERVERNAME
-- and the presence of msdb.dbo.rds_cdc_enable_db. Created through sp_executesql because
-- CREATE FUNCTION must be the first statement of its own batch.
PRINT N'Creating lakeflowDetectPlatform function...';
EXEC sp_executesql N'
CREATE FUNCTION dbo.lakeflowDetectPlatform()
RETURNS NVARCHAR(50)
AS
BEGIN
    DECLARE @engineEdition INT = CAST(SERVERPROPERTY(''EngineEdition'') AS INT);
    DECLARE @serverName NVARCHAR(255) = @@SERVERNAME;
    DECLARE @platform NVARCHAR(50);

    IF @engineEdition = 5
        SET @platform = ''AZURE_SQL_DATABASE'';
    ELSE IF @engineEdition = 8
        SET @platform = ''AZURE_SQL_MANAGED_INSTANCE'';
    ELSE IF @serverName LIKE ''%.rds.amazonaws.com''
        SET @platform = ''AMAZON_RDS'';
    ELSE IF DB_ID(''msdb'') IS NOT NULL AND OBJECT_ID(''msdb.dbo.rds_cdc_enable_db'', ''P'') IS NOT NULL
        SET @platform = ''AMAZON_RDS'';
    ELSE IF @engineEdition IN (1, 2, 3, 4)
        SET @platform = ''ON_PREMISES'';
    ELSE
        SET @platform = ''UNKNOWN'';

    RETURN @platform;
END';
PRINT N'Created lakeflowDetectPlatform function';

-- dbo.lakeflowUtilityVersion_1_5(): returns the literal version string of this script.
PRINT N'Creating lakeflowUtilityVersion_1_5 function...';
EXEC sp_executesql N'
CREATE FUNCTION dbo.lakeflowUtilityVersion_1_5()
RETURNS NVARCHAR(10)
AS
BEGIN
    RETURN ''1.5'';
END';
PRINT N'Created lakeflowUtilityVersion_1_5 function';

-- Create lakeflowFixPermissions
--   @User   : database user, or a server login that is resolved to its mapped database user.
--   @Tables : optional. NULL skips table-level grants; otherwise 'ALL', 'SCHEMAS:<schema,...>',
--             or a comma-separated list of schema.table / schema.* / bare table names
--             (bare names are looked up in dbo).
-- NOTE: the procedure text continues beyond this point of the file; its opening statements are
--       reproduced here unchanged.
PRINT N'Creating lakeflowFixPermissions procedure...';
EXEC sp_executesql N'
CREATE PROCEDURE dbo.lakeflowFixPermissions
    @User NVARCHAR(128),
    @Tables NVARCHAR(MAX) = NULL
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @DatabaseUser NVARCHAR(128) = @User;
    DECLARE @Platform
NVARCHAR(50) = dbo.lakeflowDetectPlatform();
    DECLARE @CatalogName NVARCHAR(128) = DB_NAME();
    DECLARE @ErrorMessage NVARCHAR(4000);
    DECLARE @SQL NVARCHAR(MAX);
    DECLARE @CurrentObject NVARCHAR(255);

    -- Error codes and messages
    DECLARE @insufficientUserPrivilegesCode INT = 100400;
    DECLARE @insufficientUserPrivilegesErrorMessage NVARCHAR(200);
    SET @insufficientUserPrivilegesErrorMessage = ''User executing this script is not a ''''db_owner'''' role member. To execute this script, please use a user that is a member of the db_owner role.'';

    PRINT N''Starting permission fixes for: '' + @CatalogName;
    PRINT N''Platform: '' + @Platform;
    PRINT N''User: '' + @User;
    IF @Tables IS NOT NULL
        PRINT N''Tables parameter: '' + @Tables;

    BEGIN TRY
        -- Validate that current user is db_owner
        IF (IS_ROLEMEMBER(''db_owner'') = 0)
        BEGIN
            THROW @insufficientUserPrivilegesCode, @insufficientUserPrivilegesErrorMessage, 1;
        END

        -- User resolution: when @User is not a database principal, treat it as a server
        -- login and look up the database user that shares its SID.
        IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = @User)
        BEGIN
            SELECT @DatabaseUser = dp.name
            FROM sys.database_principals dp
            INNER JOIN sys.server_principals sp
                ON dp.sid = sp.sid
            WHERE sp.name = @User
              AND dp.type IN (''S'', ''U'', ''G'')
              AND dp.name NOT IN (''guest'');

            -- @DatabaseUser still equals @User when the lookup returned no row.
            IF @DatabaseUser IS NULL OR @DatabaseUser = @User
            BEGIN
                -- NOTE: the literal below contains a line break that is part of the emitted
                -- message; it is reproduced unchanged.
                PRINT N''⚠ Warning: User/Login ['' + @User + ''] not found as database user. 
Skipping permission grants.'';
                PRINT N'' To fix: CREATE USER ['' + @User + ''] FOR LOGIN ['' + @User + ''];'';
                RETURN;
            END
            ELSE
            BEGIN
                PRINT N''Server login ['' + @User + ''] maps to database user ['' + @DatabaseUser + ''].'';
            END
        END

        IF @DatabaseUser = ''dbo''
        BEGIN
            PRINT N''Skipping permission grants (dbo already has all permissions).'';
            PRINT N''Permission setup completed for user: '' + @User;
            RETURN;
        END

        -- Grant SELECT permissions on required system views and tables.
        -- IsServerScoped = 1 marks objects that are skipped on Azure SQL Database.
        DECLARE @SystemObjects TABLE (ObjectName NVARCHAR(255), IsServerScoped BIT);
        INSERT INTO @SystemObjects (ObjectName, IsServerScoped)
        VALUES
            (''sys.objects'', 0),
            (''sys.schemas'', 0),
            (''sys.tables'', 0),
            (''sys.columns'', 0),
            (''sys.key_constraints'', 0),
            (''sys.foreign_keys'', 0),
            (''sys.check_constraints'', 0),
            (''sys.default_constraints'', 0),
            (''sys.triggers'', 0),
            (''sys.indexes'', 0),
            (''sys.index_columns'', 0),
            (''sys.fulltext_index_columns'', 0),
            (''sys.fulltext_indexes'', 0),
            (''sys.change_tracking_databases'', 1),
            (''sys.change_tracking_tables'', 0),
            (''cdc.change_tables'', 0),
            (''cdc.captured_columns'', 0),
            (''cdc.index_columns'', 0);

        -- Grant EXECUTE permissions on required system stored procedures (all server-scoped)
        DECLARE @SystemProcedures TABLE (ProcedureName NVARCHAR(255));
        INSERT INTO @SystemProcedures (ProcedureName)
        VALUES
            (''sp_tables''),
            (''sp_columns_100''),
            (''sp_pkeys''),
            (''sp_statistics_100'');

        PRINT N'''';
        PRINT N''=== System Object Permissions ==='';

        -- LOCAL cursors are released automatically when the procedure exits, so an aborted
        -- run cannot leave a cursor name allocated on the connection.
        DECLARE sys_cursor CURSOR LOCAL FAST_FORWARD FOR
            SELECT ObjectName
            FROM @SystemObjects
            WHERE IsServerScoped = 0
               OR @Platform NOT IN (''AZURE_SQL_DATABASE'');

        OPEN sys_cursor;
        FETCH NEXT FROM sys_cursor INTO @CurrentObject;
        WHILE @@FETCH_STATUS = 0
        BEGIN
            BEGIN TRY
                -- CDC catalog objects only exist once the cdc schema has been created.
                IF @CurrentObject LIKE ''cdc.%''
                   AND NOT EXISTS (SELECT 1 FROM sys.schemas WHERE name = ''cdc'')
                BEGIN
                    PRINT N''ℹ Skipping '' + @CurrentObject + '' (CDC not enabled)'';
                END
                ELSE
                BEGIN
                    SET @SQL = N''GRANT SELECT ON '' + @CurrentObject + N'' TO '' + QUOTENAME(@DatabaseUser);
                    EXEC sp_executesql @SQL;
                    PRINT N''✓ Granted SELECT on '' + @CurrentObject;
                END
            END TRY
            BEGIN CATCH
                -- Best effort: report the failure and continue with the next object.
                PRINT N''⚠ Could not grant SELECT on '' + @CurrentObject + '': '' + ERROR_MESSAGE();
            END CATCH

            FETCH NEXT FROM sys_cursor INTO @CurrentObject;
        END
        CLOSE sys_cursor;
        DEALLOCATE sys_cursor;

        -- Grant EXECUTE permissions on system procedures (skip for Azure SQL Database - implicit access)
        IF @Platform NOT IN (''AZURE_SQL_DATABASE'')
        BEGIN
            DECLARE proc_cursor CURSOR LOCAL FAST_FORWARD FOR
                SELECT ProcedureName
                FROM @SystemProcedures;

            OPEN proc_cursor;
            FETCH NEXT FROM proc_cursor INTO @CurrentObject;
            WHILE @@FETCH_STATUS = 0
            BEGIN
                BEGIN TRY
                    SET @SQL = N''GRANT EXECUTE ON '' + @CurrentObject + N'' TO '' + QUOTENAME(@DatabaseUser);
                    EXEC sp_executesql @SQL;
                    PRINT N''✓ Granted EXECUTE on '' + @CurrentObject;
                END TRY
                BEGIN CATCH
                    PRINT N''⚠ Could not grant EXECUTE on '' + @CurrentObject + '': '' + ERROR_MESSAGE();
                END CATCH

                FETCH NEXT FROM proc_cursor INTO @CurrentObject;
            END
            CLOSE proc_cursor;
            DEALLOCATE proc_cursor;
        END
        ELSE
        BEGIN
            PRINT N''ℹ Skipping system stored procedure permissions on Azure SQL Database'';
            PRINT N'' Database users have implicit EXECUTE access to system stored procedures'';
        END

        -- Handle table-specific permissions if @Tables parameter is provided
        IF @Tables IS NOT NULL
        BEGIN
            PRINT N'''';
            PRINT N''=== Table-Level SELECT Permissions ==='';

            DECLARE @TargetTables TABLE (
                SchemaName NVARCHAR(128),
                TableName NVARCHAR(128),
                FullName NVARCHAR(261),
                ObjectId INT
            );

            -- Table discovery logic
            IF @Tables = ''ALL''
            BEGIN
                PRINT N''Discovering all user tables in database...'';

                INSERT INTO @TargetTables (SchemaName, TableName, FullName, ObjectId)
                SELECT
                    s.name,
                    t.name,
                    QUOTENAME(s.name) + ''.'' + QUOTENAME(t.name),
                    t.object_id
                FROM sys.tables t
                INNER JOIN sys.schemas s
                    ON t.schema_id = s.schema_id
                WHERE t.type = ''U''
                  AND s.name NOT IN (''sys'', ''information_schema'', ''cdc'', ''INFORMATION_SCHEMA'', ''guest'');
            END
            ELSE IF @Tables LIKE ''SCHEMAS:%''
            BEGIN
                -- Everything after the 8-character SCHEMAS: prefix is the comma-separated schema list.
                DECLARE @SchemaList NVARCHAR(MAX) = SUBSTRING(@Tables, 9, LEN(@Tables));
                PRINT N''Discovering tables in schemas: '' + @SchemaList;

                -- Split the list by turning it into one <schema> element per entry.
                -- XML metacharacters are escaped first so that unusual names cannot break the CAST.
                DECLARE @SchemaXML XML;
                SET @SchemaXML = CAST(
                    ''<schema>''
                    + REPLACE(
                          REPLACE(REPLACE(REPLACE(@SchemaList, ''&'', ''&amp;''), ''<'', ''&lt;''), ''>'', ''&gt;''),
                          '','',
                          ''</schema><schema>'')
                    + ''</schema>'' AS XML);

                INSERT INTO @TargetTables (SchemaName, TableName, FullName, ObjectId)
                SELECT
                    s.name,
                    t.name,
                    QUOTENAME(s.name) + ''.'' + QUOTENAME(t.name),
                    t.object_id
                FROM sys.tables t
                INNER JOIN sys.schemas s
                    ON t.schema_id = s.schema_id
                WHERE t.type = ''U''
                  AND s.name IN (
                        SELECT LTRIM(RTRIM(x.value(''(./text())[1]'', ''NVARCHAR(MAX)'')))
                        FROM @SchemaXML.nodes(''/schema'') AS T(x)
                        WHERE LTRIM(RTRIM(x.value(''(./text())[1]'', ''NVARCHAR(MAX)''))) != ''''
                  );
            END
            ELSE
            BEGIN
                PRINT N''Processing specified tables: '' + @Tables;

                -- Split the comma-separated list into one <M> element per entry (same technique
                -- and escaping as for the schema list above).
                DECLARE @TableList TABLE (FullTableName NVARCHAR(261));
                INSERT INTO @TableList (FullTableName)
                SELECT LTRIM(RTRIM(Split.a.value(''.'', ''NVARCHAR(MAX)''))) AS value
                FROM (
                    SELECT CAST(
                        ''<M>''
                        + REPLACE(
                              REPLACE(REPLACE(REPLACE(@Tables, ''&'', ''&amp;''), ''<'', ''&lt;''), ''>'', ''&gt;''),
                              '','',
                              ''</M><M>'')
                        + ''</M>'' AS XML) AS Data
                ) AS A
                CROSS APPLY Data.nodes(''/M'') AS Split(a)
                WHERE LTRIM(RTRIM(Split.a.value(''.'', ''NVARCHAR(MAX)''))) != '''';

                -- An entry matches as schema.*, as schema.table, or as a bare table name in dbo.
                INSERT INTO @TargetTables (SchemaName, TableName, FullName, ObjectId)
                SELECT
                    s.name,
                    t.name,
                    QUOTENAME(s.name) + ''.'' + QUOTENAME(t.name),
                    t.object_id
                FROM sys.tables t
                INNER JOIN sys.schemas s
                    ON t.schema_id = s.schema_id
                INNER JOIN @TableList tl
                    ON (tl.FullTableName = s.name + ''.*''
                        OR tl.FullTableName = s.name + ''.'' + t.name
                        OR (CHARINDEX(''.'', tl.FullTableName) = 0 AND tl.FullTableName = t.name AND s.name = ''dbo''))
                WHERE t.type = ''U'';
            END

            -- Grant SELECT permissions on discovered tables
            DECLARE @ProcessedCount INT = 0, @ErrorCount INT = 0;
            DECLARE @CurrentFullName NVARCHAR(261);

            DECLARE table_cursor CURSOR LOCAL FAST_FORWARD FOR
                SELECT FullName
                FROM @TargetTables
                ORDER BY SchemaName, TableName;

            OPEN table_cursor;
            FETCH NEXT FROM table_cursor INTO @CurrentFullName;
            WHILE @@FETCH_STATUS = 0
            BEGIN
                BEGIN TRY
                    -- @CurrentFullName is already quoted (built with QUOTENAME during discovery).
                    SET @SQL = N''GRANT SELECT ON '' + @CurrentFullName + N'' TO '' + QUOTENAME(@DatabaseUser);
                    EXEC sp_executesql @SQL;
                    PRINT N''✓ Granted SELECT on '' + @CurrentFullName;
                    SET @ProcessedCount = @ProcessedCount + 1;
                END TRY
                BEGIN CATCH
                    PRINT N''✗ Error granting SELECT on '' + @CurrentFullName + '': '' + ERROR_MESSAGE();
                    SET @ErrorCount = @ErrorCount + 1;
                END CATCH

                FETCH NEXT FROM table_cursor INTO @CurrentFullName;
            END
            CLOSE table_cursor;
            DEALLOCATE table_cursor;

            -- Summary for table permissions
            PRINT N'''';
            PRINT N''Table permission summary:'';
            PRINT N'' - Tables processed: '' + CAST(@ProcessedCount AS NVARCHAR(10));
            PRINT N'' - Tables with errors: '' + CAST(@ErrorCount AS NVARCHAR(10));
        END

        PRINT N'''';
        PRINT N''Permission fixes completed for user: '' + @User;

        -- Platform-specific guidance
        IF @Platform = ''AZURE_SQL_DATABASE''
        BEGIN
            PRINT N'''';
            PRINT N''=== Azure SQL Database Platform Notes ==='';
            PRINT N''• System stored procedures: Accessible by default to database users (no grants needed)'';
            PRINT N''• Server-scoped catalog views: Limited access in Azure SQL Database'';
            PRINT N''• Consider granting db_datareader role for broader access'';
            PRINT N''• CDC objects are only available when CDC is enabled on the database'';
            PRINT N'''';
            PRINT N''=== Recommended Additional Access ==='';
            PRINT N''-- Grant broader database-level access for comprehensive permissions:'';
            PRINT N''USE ['' + @CatalogName + ''];'';
            PRINT N''ALTER ROLE db_datareader ADD MEMBER ['' + @DatabaseUser + ''];'';
            PRINT N'''';
            PRINT N''=== Server-Scoped Limitations ==='';
            PRINT N''• sys.change_tracking_databases: Requires server-level access (typically not available)'';
            PRINT N''• Most Azure SQL Database deployments cannot grant server-level permissions'';
            PRINT N''• Contact your Azure administrator if server-level access is specifically required'';
        END
        ELSE IF @Platform = ''AZURE_SQL_MANAGED_INSTANCE''
        BEGIN
            PRINT N'''';
            PRINT N''=== Azure SQL Managed Instance Platform Notes ==='';
            PRINT N''• Most permissions granted successfully at database level'';
            PRINT N''• If server-scoped permissions are needed, connect to master:'';
            PRINT N''USE master;'';
            PRINT N''GRANT SELECT ON sys.change_tracking_databases TO ['' + @DatabaseUser + ''];'';
        END
        ELSE
        BEGIN
            PRINT N'''';
            PRINT N''=== Platform Notes ==='';
            PRINT N''• All permissions granted successfully at database level'';
            PRINT N''• No additional server-level configuration required'';
        END
    END TRY
    BEGIN CATCH
        -- Report with procedure context, then rethrow the original error to the caller.
        SET @ErrorMessage = ''Error in lakeflowFixPermissions: '' + ERROR_MESSAGE();
        PRINT @ErrorMessage;
        THROW;
    END CATCH
END';
PRINT N'Created lakeflowFixPermissions procedure';

-- Create lakeflowSetupChangeTracking
-- NOTE: the procedure text continues beyond this point of the file; its opening statements are
--       reproduced here unchanged.
PRINT N'Creating lakeflowSetupChangeTracking procedure...';
EXEC sp_executesql N'
CREATE PROCEDURE dbo.lakeflowSetupChangeTracking
    @Tables NVARCHAR(MAX) = NULL,
    @User NVARCHAR(128) = NULL,
    @Retention NVARCHAR(50) = ''2 DAYS'',
    @Mode NVARCHAR(10) = ''INSTALL''
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @DatabaseUser NVARCHAR(128) = @User;
    DECLARE @Platform NVARCHAR(50) = dbo.lakeflowDetectPlatform();
    DECLARE @CatalogName NVARCHAR(128) = DB_NAME();
    DECLARE @ErrorMessage NVARCHAR(4000);
    DECLARE @SQL NVARCHAR(MAX);
    DECLARE @versionSuffix NVARCHAR(10) = ''_1_5'';
    DECLARE @ddlAuditTableName NVARCHAR(100) = ''lakeflowDdlAudit'' + @versionSuffix;
    DECLARE @ddlAuditTriggerName NVARCHAR(100) = ''lakeflowDdlAuditTrigger'' + @versionSuffix;

    -- Error codes and messages
    DECLARE @invalidModeErrorCode INT = 100000;
    DECLARE @invalidModeErrorMessage NVARCHAR(200);
    DECLARE @insufficientUserPrivilegesCode INT = 100400;
    DECLARE @insufficientUserPrivilegesErrorMessage NVARCHAR(200);
    SET @invalidModeErrorMessage = CONCAT(''Provided execution mode: '', @Mode, '', is not recognized. 
Allowed values are: INSTALL, CLEANUP''); SET @insufficientUserPrivilegesErrorMessage = ''User executing this script is not a ''''db_owner'''' role member. To execute this script, please use a user that is.''; PRINT N''Starting change tracking setup for: '' + @CatalogName; PRINT N''Platform: '' + @Platform; PRINT N''Mode: '' + @Mode; IF @Tables IS NOT NULL PRINT N''Tables: '' + @Tables; BEGIN TRY -- Validate execution mode IF (@Mode != ''INSTALL'' AND @Mode != ''CLEANUP'') BEGIN THROW @invalidModeErrorCode, @invalidModeErrorMessage, 1; END -- Validate that current user is db_owner IF (IS_ROLEMEMBER(''db_owner'') = 0) BEGIN THROW @insufficientUserPrivilegesCode, @insufficientUserPrivilegesErrorMessage, 1; END -- Cleanup legacy DDL support objects IF EXISTS (SELECT 1 FROM sys.triggers WHERE name = ''replicate_io_audit_ddl_trigger_1'' AND parent_class = 0) OR OBJECT_ID(''dbo.replicate_io_audit_ddl_1'', ''U'') IS NOT NULL OR OBJECT_ID(''dbo.replicate_io_audit_tbl_cons_1'', ''U'') IS NOT NULL OR OBJECT_ID(''dbo.replicate_io_audit_tbl_schema_1'', ''U'') IS NOT NULL OR EXISTS (SELECT 1 FROM sys.triggers WHERE name = ''alterTableTrigger_1'' AND parent_class = 0) OR OBJECT_ID(''dbo.disableOldCaptureInstance_1'', ''P'') IS NOT NULL OR OBJECT_ID(''dbo.refreshCaptureInstance_1'', ''P'') IS NOT NULL OR OBJECT_ID(''dbo.mergeCaptureInstance_1'', ''P'') IS NOT NULL OR OBJECT_ID(''dbo.captureInstanceTracker_1'', ''U'') IS NOT NULL BEGIN PRINT N''Cleaning up legacy DDL support objects...''; IF EXISTS (SELECT 1 FROM sys.triggers WHERE name = ''replicate_io_audit_ddl_trigger_1'' AND parent_class = 0) BEGIN EXEC(''DROP TRIGGER replicate_io_audit_ddl_trigger_1 ON DATABASE''); PRINT N''✓ Dropped legacy trigger: replicate_io_audit_ddl_trigger_1''; END IF OBJECT_ID(''dbo.replicate_io_audit_ddl_1'', ''U'') IS NOT NULL BEGIN EXEC(''DROP TABLE dbo.replicate_io_audit_ddl_1''); PRINT N''✓ Dropped legacy table: replicate_io_audit_ddl_1''; END IF OBJECT_ID(''dbo.replicate_io_audit_tbl_cons_1'', 
''U'') IS NOT NULL BEGIN EXEC(''DROP TABLE dbo.replicate_io_audit_tbl_cons_1''); PRINT N''✓ Dropped legacy table: replicate_io_audit_tbl_cons_1''; END IF OBJECT_ID(''dbo.replicate_io_audit_tbl_schema_1'', ''U'') IS NOT NULL BEGIN EXEC(''DROP TABLE dbo.replicate_io_audit_tbl_schema_1''); PRINT N''✓ Dropped legacy table: replicate_io_audit_tbl_schema_1''; END IF EXISTS (SELECT name FROM sys.triggers WHERE name = ''alterTableTrigger_1'' AND type = ''TR'') BEGIN EXEC(''DROP TRIGGER alterTableTrigger_1 ON DATABASE''); PRINT N''✓ Dropped legacy trigger: alterTableTrigger_1''; END IF OBJECT_ID(''dbo.disableOldCaptureInstance_1'', ''P'') IS NOT NULL BEGIN EXEC(''DROP PROCEDURE dbo.disableOldCaptureInstance_1''); PRINT N''✓ Dropped legacy procedure: disableOldCaptureInstance_1''; END IF OBJECT_ID(''dbo.refreshCaptureInstance_1'', ''P'') IS NOT NULL BEGIN EXEC(''DROP PROCEDURE dbo.refreshCaptureInstance_1''); PRINT N''✓ Dropped legacy procedure: refreshCaptureInstance_1''; END IF OBJECT_ID(''dbo.mergeCaptureInstance_1'', ''P'') IS NOT NULL BEGIN EXEC(''DROP PROCEDURE dbo.mergeCaptureInstance_1''); PRINT N''✓ Dropped legacy procedure: mergeCaptureInstance_1''; END IF OBJECT_ID(''dbo.captureInstanceTracker_1'', ''U'') IS NOT NULL BEGIN EXEC(''DROP TABLE dbo.captureInstanceTracker_1''); PRINT N''✓ Dropped legacy table: captureInstanceTracker_1''; END PRINT N''Legacy DDL support objects cleanup completed''; END -- Cleanup mode: Remove DDL support objects IF @Mode = ''CLEANUP'' BEGIN PRINT N''Cleaning up CT DDL support objects...''; -- Drop DDL audit trigger IF EXISTS (SELECT 1 FROM sys.triggers WHERE name = @ddlAuditTriggerName AND parent_class = 0) BEGIN SET @SQL = N''DROP TRIGGER ['' + @ddlAuditTriggerName + ''] ON DATABASE''; EXEC sp_executesql @SQL; PRINT N''✓ Dropped trigger: '' + @ddlAuditTriggerName; END -- Drop DDL audit table IF OBJECT_ID(''dbo.'' + @ddlAuditTableName, ''U'') IS NOT NULL BEGIN SET @SQL = N''DROP TABLE [dbo].['' + @ddlAuditTableName + '']''; EXEC 
sp_executesql @SQL; PRINT N''✓ Dropped table: '' + @ddlAuditTableName; END -- Pattern-based cleanup for any remaining CT objects across versions DECLARE @ctCleanupSql NVARCHAR(MAX) = ''''; -- Clean up any remaining DDL audit tables across versions (excluding current and legacy) SELECT @ctCleanupSql = @ctCleanupSql + ''DROP TABLE [dbo].['' + name + ''];'' + CHAR(13) FROM sys.tables WHERE name LIKE ''lakeflowDdlAudit_%_%'' AND name != @ddlAuditTableName; IF LEN(@ctCleanupSql) > 0 BEGIN EXEC sp_executesql @ctCleanupSql; PRINT N''✓ Cleaned up remaining DDL audit tables across versions''; END -- Clean up any remaining DDL audit triggers across versions (excluding current and legacy) SET @ctCleanupSql = ''''; SELECT @ctCleanupSql = @ctCleanupSql + ''DROP TRIGGER ['' + name + ''] ON DATABASE;'' + CHAR(13) FROM sys.triggers WHERE name LIKE ''lakeflowDdlAuditTrigger_%_%'' AND name != @ddlAuditTriggerName AND parent_class = 0; IF LEN(@ctCleanupSql) > 0 BEGIN EXEC sp_executesql @ctCleanupSql; PRINT N''✓ Cleaned up remaining DDL audit triggers across versions''; END PRINT N''CT DDL support objects cleanup completed''; RETURN; END -- Install mode continues here PRINT N''Setting up change tracking infrastructure...''; -- Check if change tracking is enabled at database level IF NOT EXISTS (SELECT 1 FROM sys.change_tracking_databases ctd INNER JOIN sys.databases d ON ctd.database_id = d.database_id WHERE d.name = DB_NAME()) BEGIN PRINT N''Enabling change tracking at database level...''; SET @SQL = N''ALTER DATABASE '' + QUOTENAME(@CatalogName) + '' SET CHANGE_TRACKING = ON (CHANGE_RETENTION = '' + @Retention + '', AUTO_CLEANUP = ON)''; EXEC sp_executesql @SQL; PRINT N''✓ Change tracking enabled at database level''; END ELSE BEGIN PRINT N''ℹ Change tracking already enabled at database level''; END -- Create DDL audit table if it does not exist IF OBJECT_ID(''dbo.'' + @ddlAuditTableName, ''U'') IS NULL BEGIN SET @SQL = N''CREATE TABLE [dbo].['' + @ddlAuditTableName + '']( 
[SERIAL_NUMBER] INT IDENTITY NOT NULL, [CURRENT_USER] NVARCHAR(128) NULL, [SCHEMA_NAME] NVARCHAR(128) NULL, [TABLE_NAME] NVARCHAR(128) NULL, [TYPE] NVARCHAR(30) NULL, [OPERATION_TYPE] NVARCHAR(30) NULL, [SQL_TXT] NVARCHAR(2000) NULL, [LOGICAL_POSITION] BIGINT NOT NULL, CONSTRAINT [replicantDdlAuditPrimaryKey_'' + @versionSuffix + ''] PRIMARY KEY ([SERIAL_NUMBER], [LOGICAL_POSITION]))''; EXEC sp_executesql @SQL; PRINT N''✓ Created DDL audit table: '' + @ddlAuditTableName; -- Enable change tracking on DDL audit table SET @SQL = N''ALTER TABLE [dbo].['' + @ddlAuditTableName + ''] ENABLE CHANGE_TRACKING''; EXEC sp_executesql @SQL; PRINT N''✓ Enabled change tracking on DDL audit table''; END -- Create DDL audit trigger IF NOT EXISTS (SELECT 1 FROM sys.triggers WHERE name = @ddlAuditTriggerName) BEGIN DECLARE @QuotedDbName NVARCHAR(255) = QUOTENAME(DB_NAME()); SET @SQL = N''CREATE TRIGGER ['' + @ddlAuditTriggerName + ''] ON DATABASE FOR ALTER_TABLE AS SET NOCOUNT ON; DECLARE @DbName NVARCHAR(255), @SchemaName NVARCHAR(max), @TableName NVARCHAR(255), @QuotedFullName NVARCHAR(max), @objectType NVARCHAR(30), @data XML, @changeVersion NVARCHAR(30), @operation NVARCHAR(30), @capturedSql NVARCHAR(2000), @isCTEnabledDBLevel bit, @isCTEnabledTableLevel bit, @isColumnAdd nvarchar(255), @isAlterColumn nvarchar(255), @isDropColumn nvarchar(255); SET @data = EVENTDATA(); SET @changeVersion = CHANGE_TRACKING_CURRENT_VERSION(); SET @DbName = DB_NAME(); SET @SchemaName = @data.value(''''(/EVENT_INSTANCE/SchemaName)[1]'''', ''''NVARCHAR(MAX)''''); SET @TableName = @data.value(''''(/EVENT_INSTANCE/ObjectName)[1]'''', ''''NVARCHAR(255)''''); SET @objectType = @data.value(''''(/EVENT_INSTANCE/ObjectType)[1]'''', ''''NVARCHAR(30)''''); SET @QuotedFullName = QUOTENAME(@SchemaName) + ''''.'''' + QUOTENAME(@TableName); SET @operation = @data.value(''''(/EVENT_INSTANCE/EventType)[1]'''', ''''NVARCHAR(30)''''); SET @capturedSql = @data.value(''''(/EVENT_INSTANCE/TSQLCommand/CommandText)[1]'''', 
''''NVARCHAR(2000)''''); SET @isCTEnabledDBLevel = (SELECT COUNT(*) FROM sys.change_tracking_databases ctd INNER JOIN sys.databases d ON ctd.database_id = d.database_id WHERE d.name = @DbName); SET @isCTEnabledTableLevel = (SELECT COUNT(*) FROM sys.change_tracking_tables WHERE object_id = object_id(@QuotedFullName)); SET @isColumnAdd = @data.value(''''(/EVENT_INSTANCE/AlterTableActionList/Create)[1]'''', ''''NVARCHAR(255)''''); SET @isAlterColumn = @data.value(''''(/EVENT_INSTANCE/AlterTableActionList/Alter)[1]'''', ''''NVARCHAR(255)''''); SET @isDropColumn = @data.value(''''(/EVENT_INSTANCE/AlterTableActionList/Drop)[1]'''', ''''NVARCHAR(255)''''); IF ((@isCTEnabledDBLevel = 1 AND @isCTEnabledTableLevel = 1) AND ((@isColumnAdd IS NOT NULL) OR (@isAlterColumn IS NOT NULL) OR (@isDropColumn IS NOT NULL))) BEGIN INSERT INTO dbo.['' + @ddlAuditTableName + ''] ( [CURRENT_USER], [SCHEMA_NAME], [TABLE_NAME], [TYPE], [OPERATION_TYPE], [SQL_TXT], [LOGICAL_POSITION] ) VALUES ( SUSER_NAME(), @SchemaName, @TableName, @objectType, @operation, @capturedSql, @changeVersion ); END''; EXEC sp_executesql @SQL; PRINT N''✓ Created DDL audit trigger: '' + @ddlAuditTriggerName; END -- User resolution IF @User IS NOT NULL AND @User != '''' BEGIN -- Check if user exists as database user IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = @User) BEGIN -- Check if it is a server login and find its mapped database user SELECT @DatabaseUser = dp.name FROM sys.database_principals dp INNER JOIN sys.server_principals sp ON dp.sid = sp.sid WHERE sp.name = @User AND dp.type IN (''S'', ''U'', ''G'') AND dp.name NOT IN (''guest''); -- If still no database user found, warn IF @DatabaseUser IS NULL OR @DatabaseUser = @User BEGIN PRINT N''⚠ Warning: User/Login ['' + @User + ''] not found as database user. 
Skipping permission grants.''; PRINT N'' To fix: CREATE USER ['' + @User + ''] FOR LOGIN ['' + @User + ''];''; SET @DatabaseUser = NULL; END ELSE BEGIN PRINT N''Server login ['' + @User + ''] maps to database user ['' + @DatabaseUser + ''].''; END END IF @DatabaseUser = ''dbo'' BEGIN PRINT N''Skipping permission grants (dbo already has all permissions).''; SET @DatabaseUser = NULL; END END -- Grant permissions to user if specified IF @DatabaseUser IS NOT NULL BEGIN PRINT N''Granting permissions to user: '' + @DatabaseUser; -- Grant SELECT on DDL audit table SET @SQL = N''GRANT SELECT ON [dbo].['' + @ddlAuditTableName + ''] TO '' + QUOTENAME(@DatabaseUser); EXEC sp_executesql @SQL; PRINT N''✓ Granted SELECT on '' + @ddlAuditTableName + '' to '' + @DatabaseUser; -- Grant VIEW CHANGE TRACKING on DDL audit table SET @SQL = N''GRANT VIEW CHANGE TRACKING ON [dbo].['' + @ddlAuditTableName + ''] TO '' + QUOTENAME(@DatabaseUser); EXEC sp_executesql @SQL; PRINT N''✓ Granted VIEW CHANGE TRACKING on '' + @ddlAuditTableName + '' to '' + @DatabaseUser; -- Grant VIEW DEFINITION to see database-level triggers SET @SQL = N''GRANT VIEW DEFINITION TO '' + QUOTENAME(@DatabaseUser); EXEC sp_executesql @SQL; PRINT N''✓ Granted VIEW DEFINITION to '' + @DatabaseUser; END -- Process tables if specified IF @Tables IS NOT NULL BEGIN PRINT N''Processing tables for change tracking enablement...''; -- Declare variables for table processing DECLARE @TargetTables TABLE ( SchemaName NVARCHAR(128), TableName NVARCHAR(128), HasPrimaryKey BIT ); DECLARE @SkippedTables NVARCHAR(MAX) = ''''; DECLARE @SkippedTablesCount INT = 0; DECLARE @CurrentSchema NVARCHAR(128); DECLARE @CurrentTable NVARCHAR(128); DECLARE @ProcessedCount INT = 0; DECLARE @SkippedCount INT = 0; DECLARE @ErrorCount INT = 0; -- Parse table list and populate target tables IF @Tables = ''ALL'' BEGIN INSERT INTO @TargetTables (SchemaName, TableName, HasPrimaryKey) SELECT s.name, t.name, CASE WHEN EXISTS ( SELECT 1 FROM 
sys.key_constraints kc WHERE kc.parent_object_id = t.object_id AND kc.type = ''PK'' ) THEN 1 ELSE 0 END FROM sys.tables t INNER JOIN sys.schemas s ON t.schema_id = s.schema_id WHERE t.is_ms_shipped = 0; END ELSE IF @Tables LIKE ''SCHEMAS:%'' BEGIN DECLARE @SchemaList NVARCHAR(MAX) = SUBSTRING(@Tables, 9, LEN(@Tables)); INSERT INTO @TargetTables (SchemaName, TableName, HasPrimaryKey) SELECT s.name, t.name, CASE WHEN pk.CONSTRAINT_NAME IS NOT NULL THEN 1 ELSE 0 END FROM sys.tables t INNER JOIN sys.schemas s ON t.schema_id = s.schema_id LEFT JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS pk ON pk.TABLE_SCHEMA = s.name AND pk.TABLE_NAME = t.name AND pk.CONSTRAINT_TYPE = ''PRIMARY KEY'' WHERE t.type = ''U'' AND s.name IN (SELECT LTRIM(RTRIM(Split.a.value(''.'', ''NVARCHAR(MAX)''))) AS value FROM ( SELECT CAST('''' + REPLACE(@SchemaList, '','', '''') + '''' AS XML) AS Data ) AS A CROSS APPLY Data.nodes(''/M'') AS Split(a) WHERE LTRIM(RTRIM(Split.a.value(''.'', ''NVARCHAR(MAX)''))) != ''''); END ELSE BEGIN PRINT N''Processing specified tables: '' + @Tables; DECLARE @TableList TABLE (FullTableName NVARCHAR(261)); INSERT INTO @TableList (FullTableName) SELECT LTRIM(RTRIM(Split.a.value(''.'', ''NVARCHAR(MAX)''))) AS value FROM ( SELECT CAST('''' + REPLACE(@Tables, '','', '''') + '''' AS XML) AS Data ) AS A CROSS APPLY Data.nodes(''/M'') AS Split(a) WHERE LTRIM(RTRIM(Split.a.value(''.'', ''NVARCHAR(MAX)''))) != ''''; INSERT INTO @TargetTables (SchemaName, TableName, HasPrimaryKey) SELECT s.name, t.name, CASE WHEN pk.CONSTRAINT_NAME IS NOT NULL THEN 1 ELSE 0 END FROM sys.tables t INNER JOIN sys.schemas s ON t.schema_id = s.schema_id INNER JOIN @TableList tl ON (tl.FullTableName = s.name + ''.*'' OR tl.FullTableName = s.name + ''.'' + t.name OR (CHARINDEX(''.'', tl.FullTableName) = 0 AND tl.FullTableName = t.name AND s.name = ''dbo'')) LEFT JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS pk ON pk.TABLE_SCHEMA = s.name AND pk.TABLE_NAME = t.name AND pk.CONSTRAINT_TYPE = ''PRIMARY KEY'' 
WHERE t.type = ''U''; END -- Check for tables without primary keys SELECT @SkippedTables = COALESCE(@SkippedTables + '','', '''') + QUOTENAME(SchemaName) + ''.'' + QUOTENAME(TableName) FROM @TargetTables WHERE HasPrimaryKey = 0; SELECT @SkippedTablesCount = COUNT(*) FROM @TargetTables WHERE HasPrimaryKey = 0; IF @SkippedTablesCount > 0 BEGIN DECLARE @SkippedTableWord NVARCHAR(10) = CASE WHEN @SkippedTablesCount = 1 THEN ''table'' ELSE ''tables'' END; PRINT N''⚠ WARNING: Skipping '' + CAST(@SkippedTablesCount AS NVARCHAR(10)) + '' '' + @SkippedTableWord + '' without primary keys:''; PRINT N'' '' + @SkippedTables; PRINT N'' Consider using lakeflowSetupChangeDataCapture for these tables.''; DELETE FROM @TargetTables WHERE HasPrimaryKey = 0; END -- Process each table for change tracking enablement DECLARE table_cursor CURSOR FOR SELECT SchemaName, TableName FROM @TargetTables ORDER BY SchemaName, TableName; OPEN table_cursor; FETCH NEXT FROM table_cursor INTO @CurrentSchema, @CurrentTable; WHILE @@FETCH_STATUS = 0 BEGIN BEGIN TRY IF NOT EXISTS ( SELECT 1 FROM sys.change_tracking_tables ct INNER JOIN sys.tables t ON ct.object_id = t.object_id INNER JOIN sys.schemas s ON t.schema_id = s.schema_id WHERE s.name = @CurrentSchema AND t.name = @CurrentTable ) BEGIN SET @SQL = N''ALTER TABLE '' + QUOTENAME(@CurrentSchema) + ''.'' + QUOTENAME(@CurrentTable) + '' ENABLE CHANGE_TRACKING''; EXEC sp_executesql @SQL; PRINT N''✓ Enabled change tracking on ['' + @CurrentSchema + ''].['' + @CurrentTable + '']''; SET @ProcessedCount = @ProcessedCount + 1; END ELSE BEGIN PRINT N''ℹ Change tracking already enabled on ['' + @CurrentSchema + ''].['' + @CurrentTable + '']''; SET @SkippedCount = @SkippedCount + 1; END END TRY BEGIN CATCH PRINT N''✗ Error enabling change tracking on ['' + @CurrentSchema + ''].['' + @CurrentTable + '']: '' + ERROR_MESSAGE(); SET @ErrorCount = @ErrorCount + 1; END CATCH FETCH NEXT FROM table_cursor INTO @CurrentSchema, @CurrentTable; END CLOSE table_cursor; 
DEALLOCATE table_cursor; -- Grant VIEW CHANGE TRACKING permissions to user (if @User is specified) IF @DatabaseUser IS NOT NULL BEGIN PRINT N''''; PRINT N''=== Granting VIEW CHANGE TRACKING Permissions ===''; DECLARE @PermissionGrantCount INT = 0, @PermissionErrorCount INT = 0; -- Strategy based on @Tables parameter IF @Tables = ''ALL'' BEGIN -- Grant on all user tables with change tracking enabled PRINT N''Granting VIEW CHANGE TRACKING on all change tracking enabled tables...''; DECLARE @CTSchema NVARCHAR(128), @CTTable NVARCHAR(128); DECLARE ct_cursor CURSOR FOR SELECT s.name, t.name FROM sys.change_tracking_tables ct INNER JOIN sys.tables t ON ct.object_id = t.object_id INNER JOIN sys.schemas s ON t.schema_id = s.schema_id WHERE s.name NOT IN (''sys'', ''information_schema'', ''cdc'', ''INFORMATION_SCHEMA'', ''guest'') ORDER BY s.name, t.name; OPEN ct_cursor; FETCH NEXT FROM ct_cursor INTO @CTSchema, @CTTable; WHILE @@FETCH_STATUS = 0 BEGIN BEGIN TRY SET @SQL = N''GRANT VIEW CHANGE TRACKING ON ['' + @CTSchema + ''].['' + @CTTable + ''] TO '' + QUOTENAME(@DatabaseUser); EXEC sp_executesql @SQL; PRINT N'' ✓ Granted VIEW CHANGE TRACKING on ['' + @CTSchema + ''].['' + @CTTable + '']''; SET @PermissionGrantCount = @PermissionGrantCount + 1; END TRY BEGIN CATCH PRINT N'' ⚠ Could not grant VIEW CHANGE TRACKING on ['' + @CTSchema + ''].['' + @CTTable + '']: '' + ERROR_MESSAGE(); SET @PermissionErrorCount = @PermissionErrorCount + 1; END CATCH FETCH NEXT FROM ct_cursor INTO @CTSchema, @CTTable; END CLOSE ct_cursor; DEALLOCATE ct_cursor; END ELSE IF @Tables LIKE ''SCHEMAS:%'' BEGIN -- Grant on schema level for specified schemas DECLARE @SchemaListForPerms NVARCHAR(MAX) = SUBSTRING(@Tables, 9, LEN(@Tables)); PRINT N''Granting VIEW CHANGE TRACKING on schemas: '' + @SchemaListForPerms; -- Parse schema list and grant on each schema''''s CT-enabled tables DECLARE @Schema NVARCHAR(128); DECLARE @TempSchemasForPerms NVARCHAR(MAX) = @SchemaListForPerms; WHILE 
LEN(@TempSchemasForPerms) > 0 BEGIN DECLARE @SchemaPosPerm INT = CHARINDEX('','', @TempSchemasForPerms); IF @SchemaPosPerm = 0 BEGIN SET @Schema = LTRIM(RTRIM(@TempSchemasForPerms)); SET @TempSchemasForPerms = N''''; END ELSE BEGIN SET @Schema = LTRIM(RTRIM(LEFT(@TempSchemasForPerms, @SchemaPosPerm - 1))); SET @TempSchemasForPerms = SUBSTRING(@TempSchemasForPerms, @SchemaPosPerm + 1, LEN(@TempSchemasForPerms)); END IF LEN(@Schema) > 0 BEGIN -- Grant on all CT-enabled tables in this schema DECLARE schema_ct_cursor CURSOR FOR SELECT t.name FROM sys.change_tracking_tables ct INNER JOIN sys.tables t ON ct.object_id = t.object_id INNER JOIN sys.schemas s ON t.schema_id = s.schema_id WHERE s.name = @Schema ORDER BY t.name; OPEN schema_ct_cursor; FETCH NEXT FROM schema_ct_cursor INTO @CTTable; WHILE @@FETCH_STATUS = 0 BEGIN BEGIN TRY SET @SQL = N''GRANT VIEW CHANGE TRACKING ON ['' + @Schema + ''].['' + @CTTable + ''] TO '' + QUOTENAME(@DatabaseUser); EXEC sp_executesql @SQL; PRINT N'' ✓ Granted VIEW CHANGE TRACKING on ['' + @Schema + ''].['' + @CTTable + '']''; SET @PermissionGrantCount = @PermissionGrantCount + 1; END TRY BEGIN CATCH PRINT N'' ⚠ Could not grant VIEW CHANGE TRACKING on ['' + @Schema + ''].['' + @CTTable + '']: '' + ERROR_MESSAGE(); SET @PermissionErrorCount = @PermissionErrorCount + 1; END CATCH FETCH NEXT FROM schema_ct_cursor INTO @CTTable; END CLOSE schema_ct_cursor; DEALLOCATE schema_ct_cursor; END END END ELSE BEGIN -- Grant on specific tables listed in @Tables PRINT N''Granting VIEW CHANGE TRACKING on specified tables...''; -- Use the same @TargetTables that were processed for CT enablement DECLARE specific_ct_cursor CURSOR FOR SELECT SchemaName, TableName FROM @TargetTables WHERE EXISTS ( SELECT 1 FROM sys.change_tracking_tables ct INNER JOIN sys.tables t ON ct.object_id = t.object_id INNER JOIN sys.schemas s ON t.schema_id = s.schema_id WHERE s.name = SchemaName AND t.name = TableName ) ORDER BY SchemaName, TableName; OPEN specific_ct_cursor; 
FETCH NEXT FROM specific_ct_cursor INTO @CTSchema, @CTTable; WHILE @@FETCH_STATUS = 0 BEGIN BEGIN TRY SET @SQL = N''GRANT VIEW CHANGE TRACKING ON ['' + @CTSchema + ''].['' + @CTTable + ''] TO '' + QUOTENAME(@DatabaseUser); EXEC sp_executesql @SQL; PRINT N'' ✓ Granted VIEW CHANGE TRACKING on ['' + @CTSchema + ''].['' + @CTTable + '']''; SET @PermissionGrantCount = @PermissionGrantCount + 1; END TRY BEGIN CATCH PRINT N'' ⚠ Could not grant VIEW CHANGE TRACKING on ['' + @CTSchema + ''].['' + @CTTable + '']: '' + ERROR_MESSAGE(); SET @PermissionErrorCount = @PermissionErrorCount + 1; END CATCH FETCH NEXT FROM specific_ct_cursor INTO @CTSchema, @CTTable; END CLOSE specific_ct_cursor; DEALLOCATE specific_ct_cursor; END -- Permission grant summary report PRINT N''''; PRINT N''VIEW CHANGE TRACKING permission summary:''; PRINT N'' - Tables granted: '' + CAST(@PermissionGrantCount AS NVARCHAR(10)); PRINT N'' - Tables with permission errors: '' + CAST(@PermissionErrorCount AS NVARCHAR(10)); IF @PermissionGrantCount > 0 PRINT N''✓ VIEW CHANGE TRACKING permissions granted to user: '' + @DatabaseUser; END -- Final summary report PRINT N''''; PRINT N''CT setup summary:''; PRINT N'' - Tables processed: '' + CAST(@ProcessedCount AS NVARCHAR(10)); PRINT N'' - Tables already enabled: '' + CAST(@SkippedCount AS NVARCHAR(10)); PRINT N'' - Tables with processing errors: '' + CAST(@ErrorCount AS NVARCHAR(10)); PRINT N'' - Tables skipped (no PK): '' + CAST(@SkippedTablesCount AS NVARCHAR(10)); END PRINT N''Change tracking setup completed successfully''; END TRY BEGIN CATCH SET @ErrorMessage = ''Error in lakeflowSetupChangeTracking: '' + ERROR_MESSAGE(); PRINT @ErrorMessage; THROW; END CATCH END'; PRINT N'Created lakeflowSetupChangeTracking procedure'; -- Create lakeflowSetupChangeDataCapture PRINT N'Creating lakeflowSetupChangeDataCapture procedure...'; EXEC sp_executesql N' CREATE PROCEDURE dbo.lakeflowSetupChangeDataCapture @Tables NVARCHAR(MAX) = NULL, @User NVARCHAR(128) = NULL, @Mode 
NVARCHAR(10) = ''INSTALL''
AS
BEGIN
    SET NOCOUNT ON;

    -- An explicit NULL would slip past the mode validation below (comparisons with
    -- NULL are never true) and then run as an install anyway. Make that explicit so
    -- the mode is validated and reported correctly.
    IF @Mode IS NULL
        SET @Mode = ''INSTALL'';

    -- @DatabaseUser starts as @User and is refined by the user resolution step later on.
    DECLARE @DatabaseUser NVARCHAR(128) = @User;
    DECLARE @Platform NVARCHAR(50) = dbo.lakeflowDetectPlatform();
    DECLARE @CatalogName NVARCHAR(128) = DB_NAME();
    DECLARE @ErrorMessage NVARCHAR(4000);
    DECLARE @SQL NVARCHAR(MAX);

    -- Names of the versioned DDL support objects managed by this procedure.
    DECLARE @versionSuffix NVARCHAR(10) = ''_1_5'';
    DECLARE @captureInstanceTableName NVARCHAR(100) = ''lakeflowCaptureInstanceInfo'' + @versionSuffix;
    DECLARE @alterTableTriggerName NVARCHAR(100) = ''lakeflowAlterTableTrigger'' + @versionSuffix;
    DECLARE @disableOldCaptureInstanceProcName NVARCHAR(100) = ''lakeflowDisableOldCaptureInstance'' + @versionSuffix;
    DECLARE @mergeCaptureInstancesProcName NVARCHAR(100) = ''lakeflowMergeCaptureInstances'' + @versionSuffix;
    DECLARE @refreshCaptureInstanceProcName NVARCHAR(100) = ''lakeflowRefreshCaptureInstance'' + @versionSuffix;

    -- Error codes and messages
    DECLARE @invalidModeErrorCode INT = 100000;
    DECLARE @invalidModeErrorMessage NVARCHAR(200);
    DECLARE @insufficientUserPrivilegesCode INT = 100400;
    DECLARE @insufficientUserPrivilegesErrorMessage NVARCHAR(200);

    SET @invalidModeErrorMessage = CONCAT(''Provided execution mode: '', @Mode, '', is not recognized. Allowed values are: INSTALL, CLEANUP'');
    -- Same wording as the privilege check performed by the installer script itself.
    SET @insufficientUserPrivilegesErrorMessage = ''User executing this script is not a ''''db_owner'''' role member. To execute this script, please use a user that is a member of the db_owner role.'';

    PRINT N''Starting Change Data Capture setup for: '' + @CatalogName;
    PRINT N''Platform: '' + @Platform;
    PRINT N''Mode: '' + @Mode;
    IF @Tables IS NOT NULL
        PRINT N''Tables: '' + @Tables;

    BEGIN TRY
        -- Validate execution mode
        IF (@Mode != ''INSTALL'' AND @Mode != ''CLEANUP'')
        BEGIN
            THROW @invalidModeErrorCode, @invalidModeErrorMessage, 1;
        END

        -- Validate that current user is db_owner
        IF (IS_ROLEMEMBER(''db_owner'') = 0)
        BEGIN
            THROW @insufficientUserPrivilegesCode, @insufficientUserPrivilegesErrorMessage, 1;
        END

        -- Cleanup legacy DDL support objects.
        -- The block runs only when at least one legacy object is still present.
        IF EXISTS (SELECT 1 FROM sys.triggers WHERE name = ''replicate_io_audit_ddl_trigger_1'' AND parent_class = 0)
           OR OBJECT_ID(''dbo.replicate_io_audit_ddl_1'', ''U'') IS NOT NULL
           OR OBJECT_ID(''dbo.replicate_io_audit_tbl_cons_1'', ''U'') IS NOT NULL
           OR OBJECT_ID(''dbo.replicate_io_audit_tbl_schema_1'', ''U'') IS NOT NULL
           OR EXISTS (SELECT 1 FROM sys.triggers WHERE name = ''alterTableTrigger_1'' AND parent_class = 0)
           OR OBJECT_ID(''dbo.disableOldCaptureInstance_1'', ''P'') IS NOT NULL
           OR OBJECT_ID(''dbo.refreshCaptureInstance_1'', ''P'') IS NOT NULL
           OR OBJECT_ID(''dbo.mergeCaptureInstance_1'', ''P'') IS NOT NULL
           OR OBJECT_ID(''dbo.captureInstanceTracker_1'', ''U'') IS NOT NULL
        BEGIN
            PRINT N''Cleaning up legacy DDL support objects...'';

            IF EXISTS (SELECT 1 FROM sys.triggers WHERE name = ''replicate_io_audit_ddl_trigger_1'' AND parent_class = 0)
            BEGIN
                EXEC(''DROP TRIGGER replicate_io_audit_ddl_trigger_1 ON DATABASE'');
                PRINT N''✓ Dropped legacy trigger: replicate_io_audit_ddl_trigger_1'';
            END

            IF OBJECT_ID(''dbo.replicate_io_audit_ddl_1'', ''U'') IS NOT NULL
            BEGIN
                EXEC(''DROP TABLE dbo.replicate_io_audit_ddl_1'');
                PRINT N''✓ Dropped legacy table: replicate_io_audit_ddl_1'';
            END

            IF OBJECT_ID(''dbo.replicate_io_audit_tbl_cons_1'', ''U'') IS NOT NULL
            BEGIN
                EXEC(''DROP TABLE dbo.replicate_io_audit_tbl_cons_1'');
                PRINT N''✓ Dropped legacy table: replicate_io_audit_tbl_cons_1'';
            END

            IF
OBJECT_ID(''dbo.replicate_io_audit_tbl_schema_1'', ''U'') IS NOT NULL
            BEGIN
                EXEC(''DROP TABLE dbo.replicate_io_audit_tbl_schema_1'');
                PRINT N''✓ Dropped legacy table: replicate_io_audit_tbl_schema_1'';
            END

            -- Only a database-level trigger (parent_class = 0) can be dropped with
            -- DROP TRIGGER ... ON DATABASE. This matches the check used by the guard
            -- that opens the legacy cleanup; filtering on type alone would also match a
            -- table-level trigger of the same name and make the DROP fail.
            IF EXISTS (SELECT 1 FROM sys.triggers WHERE name = ''alterTableTrigger_1'' AND parent_class = 0)
            BEGIN
                EXEC(''DROP TRIGGER alterTableTrigger_1 ON DATABASE'');
                PRINT N''✓ Dropped legacy trigger: alterTableTrigger_1'';
            END

            IF OBJECT_ID(''dbo.disableOldCaptureInstance_1'', ''P'') IS NOT NULL
            BEGIN
                EXEC(''DROP PROCEDURE dbo.disableOldCaptureInstance_1'');
                PRINT N''✓ Dropped legacy procedure: disableOldCaptureInstance_1'';
            END

            IF OBJECT_ID(''dbo.refreshCaptureInstance_1'', ''P'') IS NOT NULL
            BEGIN
                EXEC(''DROP PROCEDURE dbo.refreshCaptureInstance_1'');
                PRINT N''✓ Dropped legacy procedure: refreshCaptureInstance_1'';
            END

            IF OBJECT_ID(''dbo.mergeCaptureInstance_1'', ''P'') IS NOT NULL
            BEGIN
                EXEC(''DROP PROCEDURE dbo.mergeCaptureInstance_1'');
                PRINT N''✓ Dropped legacy procedure: mergeCaptureInstance_1'';
            END

            IF OBJECT_ID(''dbo.captureInstanceTracker_1'', ''U'') IS NOT NULL
            BEGIN
                EXEC(''DROP TABLE dbo.captureInstanceTracker_1'');
                PRINT N''✓ Dropped legacy table: captureInstanceTracker_1'';
            END

            PRINT N''Legacy DDL support objects cleanup completed'';
        END

        -- Cleanup mode: Remove DDL support objects
        IF @Mode = ''CLEANUP''
        BEGIN
            PRINT N''Cleaning up CDC DDL support objects...'';

            -- Drop procedures (current version)
            IF OBJECT_ID(''dbo.'' + @refreshCaptureInstanceProcName, ''P'') IS NOT NULL
            BEGIN
                SET @SQL = N''DROP PROCEDURE [dbo].['' + @refreshCaptureInstanceProcName + '']'';
                EXEC sp_executesql @SQL;
                PRINT N''✓ Dropped procedure: '' + @refreshCaptureInstanceProcName;
            END

            IF OBJECT_ID(''dbo.'' + @mergeCaptureInstancesProcName, ''P'') IS NOT NULL
            BEGIN
                SET @SQL = N''DROP PROCEDURE [dbo].['' + @mergeCaptureInstancesProcName + '']'';
                EXEC sp_executesql @SQL;
                PRINT N''✓ Dropped procedure: '' + @mergeCaptureInstancesProcName;
            END

            IF OBJECT_ID(''dbo.'' + @disableOldCaptureInstanceProcName, ''P'') IS
NOT NULL
            BEGIN
                SET @SQL = N''DROP PROCEDURE [dbo].['' + @disableOldCaptureInstanceProcName + '']'';
                EXEC sp_executesql @SQL;
                PRINT N''✓ Dropped procedure: '' + @disableOldCaptureInstanceProcName;
            END

            -- Drop ALTER TABLE trigger
            IF EXISTS (SELECT 1 FROM sys.triggers WHERE name = @alterTableTriggerName AND parent_class = 0)
            BEGIN
                SET @SQL = N''DROP TRIGGER ['' + @alterTableTriggerName + ''] ON DATABASE'';
                EXEC sp_executesql @SQL;
                PRINT N''✓ Dropped ALTER TABLE trigger: '' + @alterTableTriggerName;
            END

            -- Drop capture instance table
            IF OBJECT_ID(''dbo.'' + @captureInstanceTableName, ''U'') IS NOT NULL
            BEGIN
                SET @SQL = N''DROP TABLE [dbo].['' + @captureInstanceTableName + '']'';
                EXEC sp_executesql @SQL;
                PRINT N''✓ Dropped capture instance table: '' + @captureInstanceTableName;
            END

            -- Pattern-based cleanup for any remaining CDC objects across versions.
            -- The underscore is a single-character wildcard in LIKE, so it is written as
            -- [_] to match only names of the form <prefix>_<major>_<minor>. Unescaped, the
            -- patterns would also match unrelated objects that merely share the prefix.
            -- Tables and procedures are restricted to the dbo schema because the generated
            -- DROP statements target dbo, and names are escaped with QUOTENAME.
            DECLARE @cdcCleanupSql NVARCHAR(MAX) = '''';

            -- Clean up any remaining capture instance tables across versions
            SELECT @cdcCleanupSql = @cdcCleanupSql + ''DROP TABLE [dbo].'' + QUOTENAME(name) + '';'' + CHAR(13)
            FROM sys.tables
            WHERE schema_id = SCHEMA_ID(''dbo'')
              AND name LIKE ''lakeflowCaptureInstanceInfo[_]%[_]%''
              AND name != @captureInstanceTableName;

            IF LEN(@cdcCleanupSql) > 0
            BEGIN
                EXEC sp_executesql @cdcCleanupSql;
                PRINT N''✓ Cleaned up remaining capture instance tables across versions'';
            END

            -- Clean up any remaining CDC procedures across versions
            SET @cdcCleanupSql = '''';
            SELECT @cdcCleanupSql = @cdcCleanupSql + ''DROP PROCEDURE [dbo].'' + QUOTENAME(name) + '';'' + CHAR(13)
            FROM sys.procedures
            WHERE schema_id = SCHEMA_ID(''dbo'')
              AND (
                    (name LIKE ''lakeflowDisableOldCaptureInstance[_]%[_]%'' AND name != @disableOldCaptureInstanceProcName)
                 OR (name LIKE ''lakeflowMergeCaptureInstances[_]%[_]%'' AND name != @mergeCaptureInstancesProcName)
                 OR (name LIKE ''lakeflowRefreshCaptureInstance[_]%[_]%'' AND name != @refreshCaptureInstanceProcName)
              );

            IF LEN(@cdcCleanupSql) > 0
            BEGIN
                EXEC sp_executesql @cdcCleanupSql;
                PRINT N''✓ Cleaned up remaining CDC procedures across versions'';
            END

            -- Clean up any remaining ALTER TABLE triggers across versions
            SET @cdcCleanupSql = '''';
            SELECT @cdcCleanupSql = @cdcCleanupSql + ''DROP TRIGGER '' + QUOTENAME(name) + '' ON DATABASE;'' + CHAR(13)
            FROM sys.triggers
            WHERE name LIKE ''lakeflowAlterTableTrigger[_]%[_]%''
              AND name != @alterTableTriggerName
              AND parent_class = 0;

            IF LEN(@cdcCleanupSql) > 0
            BEGIN
                EXEC sp_executesql @cdcCleanupSql;
                PRINT N''✓ Cleaned up remaining ALTER TABLE triggers across versions'';
            END

            PRINT N''CDC DDL support objects cleanup completed'';
            RETURN;
        END

        -- Install mode: Create/upgrade DDL support objects
        PRINT N''Installing/upgrading CDC DDL support objects...'';

        -- Enable CDC at database level if not already enabled
        IF NOT EXISTS (SELECT 1 FROM sys.databases WHERE name = DB_NAME() AND is_cdc_enabled = 1)
        BEGIN
            PRINT N''Enabling Change Data Capture at database level...'';

            -- Use platform-specific CDC enablement procedure
            IF @Platform = ''AMAZON_RDS''
            BEGIN
                DECLARE @currentDbName NVARCHAR(128) = DB_NAME();
                DECLARE @rdsSql NVARCHAR(MAX);
                DECLARE @rdsCheckSql NVARCHAR(MAX);
                DECLARE @rdsExists INT;

                -- The msdb reference is kept in dynamic SQL so this procedure never names
                -- the RDS-only object directly.
                SET @rdsCheckSql = N''SELECT @rdsExists = CASE WHEN OBJECT_ID(''''msdb.dbo.rds_cdc_enable_db'''', ''''P'''') IS NOT NULL THEN 1 ELSE 0 END'';
                EXEC sp_executesql @rdsCheckSql, N''@rdsExists INT OUTPUT'', @rdsExists OUTPUT;

                IF @rdsExists = 0
                BEGIN
                    -- Severity 16 inside the TRY block transfers control to the CATCH block.
                    -- No transaction is opened by this procedure at this point, so there is
                    -- nothing to roll back; RETURN is kept only as a defensive guard.
                    RAISERROR(''Platform detected as Amazon RDS but required procedure msdb.dbo.rds_cdc_enable_db does not exist'', 16, 1);
                    RETURN;
                END

                SET @rdsSql = N''EXEC msdb.dbo.rds_cdc_enable_db @db_name = @dbName'';
                EXEC sp_executesql @rdsSql, N''@dbName NVARCHAR(128)'', @dbName = @currentDbName;
            END
            ELSE
            BEGIN
                EXEC sys.sp_cdc_enable_db;
            END

            PRINT N''✓ Change Data Capture enabled at database level'';
        END
        ELSE
        BEGIN
            PRINT N''ℹ Change Data Capture already enabled at database level'';
        END

        -- Create capture instance table
        IF OBJECT_ID(''dbo.'' + @captureInstanceTableName, ''U'') IS NULL
        BEGIN
            SET @SQL = N''CREATE TABLE [dbo].['' + @captureInstanceTableName + '']( [oldCaptureInstance] VARCHAR(MAX) NULL,
[newCaptureInstance] VARCHAR(MAX) NULL, [schemaName] VARCHAR(100) NOT NULL, [tableName] VARCHAR(255) NOT NULL, [committedCursor] VARCHAR(MAX) NULL, [triggerReinit] BIT NULL, CONSTRAINT replicantCaptureInstanceInfoPrimaryKey PRIMARY KEY (schemaName, tableName) )'';
            EXEC sp_executesql @SQL;
            PRINT N''✓ Created capture instance table: '' + @captureInstanceTableName;
            -- NOTE(review): the primary key constraint name above is not versioned.
            -- Confirm that tracker tables from older versions (which may use the same
            -- constraint name) are removed before this CREATE TABLE runs.
        END

        -- Create lakeflowDisableOldCaptureInstance procedure
        -- Generated procedure: looks up oldCaptureInstance for the given table in the
        -- tracker table, disables that capture instance when its name matches one of
        -- the lakeflow naming patterns, and then clears oldCaptureInstance in the
        -- tracker row. Lookup, disable and update run inside one transaction.
        IF OBJECT_ID(''dbo.'' + @disableOldCaptureInstanceProcName, ''P'') IS NULL
        BEGIN
            SET @SQL = N''CREATE PROCEDURE [dbo].['' + @disableOldCaptureInstanceProcName + '']
                @schemaName VARCHAR(MAX),
                @tableName VARCHAR(MAX)
            WITH EXECUTE AS OWNER
            AS
            SET NOCOUNT ON
            DECLARE @oldCaptureInstance NVARCHAR(MAX);
            BEGIN TRAN
                SET @oldCaptureInstance = (SELECT oldCaptureInstance FROM dbo.['' + @captureInstanceTableName + ''] WHERE schemaName=@schemaName AND tableName=@tableName);
                -- Only disable capture instances that we own (lakeflow naming or old New_ naming)
                IF @oldCaptureInstance IS NOT NULL AND (@oldCaptureInstance LIKE ''''lakeflow[_]%[_][1-2]'''' OR @oldCaptureInstance LIKE ''''New[_]%[_]%'''')
                BEGIN
                    EXEC sys.sp_cdc_disable_table @source_schema = @schemaName, @source_name = @tableName, @capture_instance = @oldCaptureInstance;
                END
                -- The tracker row is cleared even when the instance was not one of ours
                -- and therefore was not disabled.
                IF @oldCaptureInstance IS NOT NULL
                    UPDATE dbo.['' + @captureInstanceTableName + ''] SET oldCaptureInstance=NULL WHERE schemaName=@schemaName AND tableName=@tableName;
            COMMIT TRAN'';
            EXEC sp_executesql @SQL;
            PRINT N''✓ Created procedure: '' + @disableOldCaptureInstanceProcName;
        END

        -- Create lakeflowMergeCaptureInstances procedure
        -- Generated procedure: when the table has exactly two capture instances, copies
        -- rows from the old change table (<oldCaptureInstance>_CT) into the new one
        -- (<newCaptureInstance>_CT). Only rows whose start LSN is greater than the
        -- committedCursor stored in the tracker table are copied, and only when no row
        -- with the same start LSN, seqval and operation already exists in the target.
        -- Only columns present in both change tables with the same data type are copied.
        IF OBJECT_ID(''dbo.'' + @mergeCaptureInstancesProcName, ''P'') IS NULL
        BEGIN
            SET @SQL = N''CREATE PROCEDURE [dbo].['' + @mergeCaptureInstancesProcName + '']
                @schemaName VARCHAR(MAX),
                @tableName VARCHAR(MAX)
            AS
            SET NOCOUNT ON
            BEGIN TRAN
                DECLARE @newCaptureInstanceFullPath NVARCHAR(MAX), @oldCaptureInstanceFullPath NVARCHAR(MAX), @columnList NVARCHAR(MAX), @columnListValues NVARCHAR(MAX), @oldCaptureInstanceName
 NVARCHAR(MAX), @newCaptureInstanceName NVARCHAR(MAX), @captureInstanceCount INT, @minLSN VARCHAR(MAX), @quotedFullTableName nvarchar(max), @mergeSQL NVARCHAR(MAX);
                SET @quotedFullTableName = QUOTENAME(@schemaName) + ''''.'''' + QUOTENAME(@tableName);
                SET @captureInstanceCount = (SELECT COUNT(*) FROM cdc.change_tables WHERE source_object_id = OBJECT_ID(@quotedFullTableName));
                IF (@captureInstanceCount = 2)
                BEGIN
                    -- Change table names are the capture instance names with a _CT suffix.
                    SET @oldCaptureInstanceName = (SELECT oldCaptureInstance FROM dbo.['' + @captureInstanceTableName + ''] WHERE schemaName = @schemaName and tableName = @tableName) + ''''_CT'''';
                    SET @newCaptureInstanceName = (SELECT newCaptureInstance FROM dbo.['' + @captureInstanceTableName + ''] WHERE schemaName = @schemaName and tableName = @tableName) + ''''_CT'''';
                    SET @newCaptureInstanceFullPath = ''''[cdc].'''' + QUOTENAME(@newCaptureInstanceName);
                    SET @oldCaptureInstanceFullPath = ''''[cdc].'''' + QUOTENAME(@oldCaptureInstanceName);
                    -- Lower bound for the copy; falls back to an all-zero LSN when no cursor is stored.
                    SET @minLSN = (SELECT committedCursor FROM dbo.['' + @captureInstanceTableName + ''] WHERE schemaName=@schemaName and tableName=@tableName);
                    IF @minLSN is NULL OR @minLSN = '''''''' BEGIN SET @minLSN = ''''0x00000000000000000000'''' END
                    -- Comma-separated list of the columns shared by both change tables
                    -- (same name and data type), once plain and once prefixed with source.
                    SET @columnList = (SELECT STUFF((SELECT '''','''' + QUOTENAME(A.COLUMN_NAME) FROM INFORMATION_SCHEMA.COLUMNS A JOIN INFORMATION_SCHEMA.COLUMNS B ON A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA=''''cdc'''' AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA=''''cdc'''' FOR XML PATH(''''''''), TYPE).value(''''.'''', ''''nvarchar(max)''''), 1, 1, ''''''''));
                    SET @columnListValues = (SELECT STUFF((SELECT '''',source.'''' + QUOTENAME(A.COLUMN_NAME) FROM INFORMATION_SCHEMA.COLUMNS A JOIN INFORMATION_SCHEMA.COLUMNS B ON A.COLUMN_NAME=B.COLUMN_NAME AND A.DATA_TYPE=B.DATA_TYPE WHERE A.TABLE_NAME=@newCaptureInstanceName AND A.TABLE_SCHEMA=''''cdc'''' AND B.TABLE_NAME=@oldCaptureInstanceName AND B.TABLE_SCHEMA=''''cdc'''' FOR XML
 PATH(''''''''), TYPE).value(''''.'''', ''''nvarchar(max)''''), 1, 1, ''''''''));
                    -- NOTE(review): @minLSN is concatenated into the statement text as-is;
                    -- this presumes committedCursor always holds a hex LSN literal - confirm.
                    SET @mergeSQL = ''''MERGE '''' + @newCaptureInstanceFullPath + '''' AS target USING '''' + @oldCaptureInstanceFullPath + '''' AS source ON source.__$start_lsn = target.__$start_lsn AND source.__$seqval = target.__$seqval AND source.__$operation = target.__$operation WHEN NOT MATCHED AND source.__$start_lsn > '''' + @minLSN + '''' THEN INSERT ('''' + @columnList + '''') VALUES ('''' + @columnListValues + '''');'''';
                    EXEC sp_executesql @mergeSQL;
                END
            COMMIT TRAN'';
            EXEC sp_executesql @SQL;
            PRINT N''✓ Created procedure: '' + @mergeCaptureInstancesProcName;
        END

        -- Create lakeflowRefreshCaptureInstance procedure
        -- Generated procedure: decides, from the capture instances that currently exist
        -- for the table, whether a new lakeflow capture instance can be created and
        -- whether a reinit must be signalled, then records the outcome in the tracker
        -- table. @reinit = 1 selects the reinit code paths; the default 0 is the DDL
        -- refresh path.
        IF OBJECT_ID(''dbo.'' + @refreshCaptureInstanceProcName, ''P'') IS NULL
        BEGIN
            SET @SQL = N''CREATE PROCEDURE [dbo].['' + @refreshCaptureInstanceProcName + '']
                @schemaName NVARCHAR(MAX),
                @tableName NVARCHAR(MAX),
                @reinit INT = 0
            WITH EXECUTE AS OWNER
            AS
            SET NOCOUNT ON
            BEGIN TRAN
                DECLARE @OldCaptureInstance NVARCHAR(MAX), @NewCaptureInstance NVARCHAR(MAX), @FileGroupName NVARCHAR(255), @SupportNetChanges BIT, @RoleName VARCHAR(255), @CaptureInstanceCount INT, @TriggerReinit INT, @SkipCaptureInstanceCreation INT, @LakeflowInstanceCount INT, @OldestLakeflowInstanceForReinit NVARCHAR(MAX), @BothLakeflowErrorMsg NVARCHAR(500), @LakeflowInstanceToDrop NVARCHAR(MAX), @CommittedCursor VARCHAR(MAX), @OldInstanceToTrack nvarchar(max), @QuotedFullName nvarchar(max);
                SET @QuotedFullName = QUOTENAME(@schemaName) + ''''.'''' + QUOTENAME(@tableName);
                SET @SkipCaptureInstanceCreation = 0;
                SET @TriggerReinit = 0;
                SET @CaptureInstanceCount = (SELECT COUNT(capture_instance) FROM cdc.change_tables WHERE source_object_id = object_id(@QuotedFullName));
                IF (@CaptureInstanceCount = 2)
                BEGIN
                    -- Genuine new DDL with 2 instances already: signal reinit
                    SET @TriggerReinit = 1;
                    -- Check if we have a lakeflow instance to drop
                    SET @LakeflowInstanceCount = (SELECT COUNT(capture_instance) FROM cdc.change_tables WHERE
source_object_id = object_id(@QuotedFullName) AND (capture_instance LIKE ''''lakeflow[_]%[_][1-2]'''' OR capture_instance LIKE ''''New[_]%[_]%''''));
                    IF (@LakeflowInstanceCount = 2)
                    BEGIN
                        IF (@reinit = 1)
                        BEGIN
                            SET @TriggerReinit = 0;
                            -- During reinit, if we have 2 lakeflow instances, drop the oldest one
                            SET @OldestLakeflowInstanceForReinit = ( SELECT TOP 1 capture_instance FROM cdc.change_tables WHERE source_object_id = object_id(@QuotedFullName) AND (capture_instance LIKE ''''lakeflow[_]%[_][1-2]'''' OR capture_instance LIKE ''''New[_]%[_]%'''') ORDER BY create_date ASC );
                            IF @OldestLakeflowInstanceForReinit IS NOT NULL
                            BEGIN
                                PRINT ''''Reinit recovery: Dropping oldest lakeflow instance '''''''''''' + @OldestLakeflowInstanceForReinit + '''''''''''' to free up a slot for table '''' + @QuotedFullName;
                                EXEC sys.sp_cdc_disable_table @source_schema = @schemaName, @source_name = @tableName, @capture_instance = @OldestLakeflowInstanceForReinit;
                            END
                        END
                        ELSE
                        BEGIN
                            -- During DDL refresh (not reinit), both instances are lakeflow instances
                            -- Trigger reinit instead of dropping instances here to avoid race conditions
                            -- Cleanup will be handled by Replicant code after reinit completes
                            SET @TriggerReinit = 1;
                            SET @SkipCaptureInstanceCreation = 1;
                            PRINT ''''Both capture instance slots occupied by lakeflow instances. 
Triggering reinit for safe schema migration on table '''' + @QuotedFullName;
                        END
                    END
                    ELSE IF (@LakeflowInstanceCount = 1)
                    BEGIN
                        -- One lakeflow instance and one pre-existing instance
                        -- Drop lakeflow instance without merging to preserve pre-existing instance
                        -- The extractor will handle reinitialization after the new instance is created
                        -- Get the lakeflow instance to drop (oldest one)
                        SET @LakeflowInstanceToDrop = ( SELECT TOP 1 capture_instance FROM cdc.change_tables WHERE source_object_id = object_id(@QuotedFullName) AND (capture_instance LIKE ''''lakeflow[_]%[_][1-2]'''' OR capture_instance LIKE ''''New[_]%[_]%'''') ORDER BY create_date ASC );
                        -- Drop it immediately to free up a slot
                        IF @LakeflowInstanceToDrop IS NOT NULL
                            EXEC sys.sp_cdc_disable_table @source_schema = @schemaName, @source_name = @tableName, @capture_instance = @LakeflowInstanceToDrop;
                        -- Skip creating a new instance immediately - wait for reinit
                        -- (@TriggerReinit is still 1 here, set when two instances were found)
                        SET @SkipCaptureInstanceCreation = 1;
                    END
                    ELSE
                    BEGIN
                        -- Both slots are taken by non-lakeflow instances
                        IF (@reinit = 1)
                        BEGIN
                            -- During reinit, we cannot proceed - raise clear error
                            -- The ROLLBACK below undoes the transaction opened at the top of this procedure.
                            DECLARE @BothNonLakeflowErrorMsg NVARCHAR(500);
                            SET @BothNonLakeflowErrorMsg = ''''Cannot create lakeflow capture instance for table '''' + @QuotedFullName + '''': both CDC slots are occupied by non-lakeflow instances. '''' + ''''Lakeflow requires at least one available slot. '''' + ''''Please drop one of the existing capture instances and retry the pipeline.'''';
                            RAISERROR(@BothNonLakeflowErrorMsg, 16, 1);
                            ROLLBACK TRAN;
                            RETURN;
                        END
                        ELSE
                        BEGIN
                            -- During DDL refresh, trigger reinit to attempt recovery
                            SET @TriggerReinit = 1;
                            SET @SkipCaptureInstanceCreation = 1;
                            PRINT ''''Both CDC slots occupied by non-lakeflow instances. 
Triggering reinit for table '''' + @QuotedFullName;
                        END
                    END
                END
                -- Get existing capture instance, preferring lakeflow instances that we own
                SET @OldCaptureInstance = ( select top 1 capture_instance from cdc.change_tables where source_object_id=OBJECT_ID(@QuotedFullName) AND (capture_instance LIKE ''''lakeflow[_]%[_][1-2]'''' OR capture_instance LIKE ''''New[_]%[_]%'''') order by create_date ASC );
                -- If no lakeflow instance exists, get the oldest instance to use its settings
                -- (but we will not drop it since it does not have lakeflow prefix)
                IF @OldCaptureInstance IS NULL
                BEGIN
                    SET @OldCaptureInstance = ( select top 1 capture_instance from cdc.change_tables where source_object_id=OBJECT_ID(@QuotedFullName) order by create_date ASC );
                    -- Warn about pre-existing non-lakeflow instance
                    IF @OldCaptureInstance IS NOT NULL
                    BEGIN
                        DECLARE @PreExistingWarningMsg NVARCHAR(500);
                        SET @PreExistingWarningMsg = ''''WARNING: Table '''' + @QuotedFullName + '''' has a pre-existing capture instance named '''''''''''' + @OldCaptureInstance + '''''''''''' that was not created by lakeflow. Lakeflow will preserve this instance and create its own instance alongside it. 
Settings (filegroup, role, supports_net_changes) will be copied from the pre-existing instance.'''';
                        PRINT @PreExistingWarningMsg;
                    END
                END
                -- Settings for the new instance are taken from the oldest capture instance of the table.
                SET @SupportNetChanges = (select top 1 supports_net_changes from cdc.change_tables where source_object_id=OBJECT_ID(@QuotedFullName) order by create_date ASC);
                SET @FileGroupName = (select top 1 filegroup_name from cdc.change_tables where source_object_id=OBJECT_ID(@QuotedFullName) order by create_date ASC);
                SET @RoleName = (select top 1 role_name from cdc.change_tables where source_object_id=OBJECT_ID(@QuotedFullName) order by create_date ASC);
                IF @OldCaptureInstance LIKE ''''lakeflow[_]%''''
                BEGIN
                    -- Toggle between _1 and _2 suffixes
                    IF @OldCaptureInstance LIKE ''''%[_]1''''
                    BEGIN
                        SET @NewCaptureInstance = ''''lakeflow_'''' + @schemaName + ''''_'''' + @tableName + ''''_2''''
                    END
                    ELSE
                    BEGIN
                        SET @NewCaptureInstance = ''''lakeflow_'''' + @schemaName + ''''_'''' + @tableName + ''''_1''''
                    END
                END
                ELSE
                BEGIN
                    -- First time or non-lakeflow instance: use lakeflow_schemaName_tableName_1
                    SET @NewCaptureInstance = ''''lakeflow_'''' + @schemaName + ''''_'''' + @tableName + ''''_1''''
                END
                -- Skip capture instance creation ONLY if we cannot create one (e.g., 2 non-lakeflow instances)
                IF @SkipCaptureInstanceCreation = 0
                BEGIN
                    -- Nested transaction: create the new instance and replace the tracker row together.
                    BEGIN TRAN
                        EXEC sys.sp_cdc_enable_table @source_schema = @schemaName, @source_name = @tableName, @role_name = @RoleName, @capture_instance = @NewCaptureInstance, @filegroup_name = @FileGroupName, @supports_net_changes = @SupportNetChanges
                        -- Carry the committed cursor over from the tracker row that is being replaced.
                        SET @CommittedCursor = (SELECT committedCursor FROM dbo.['' + @captureInstanceTableName + ''] WHERE schemaName=@schemaName AND tableName=@tableName);
                        DELETE FROM dbo.['' + @captureInstanceTableName + ''] WHERE schemaName=@schemaName AND tableName=@tableName;
                        SET @OldInstanceToTrack = @OldCaptureInstance;
                        INSERT INTO dbo.['' + @captureInstanceTableName + ''] VALUES (@OldInstanceToTrack, @NewCaptureInstance, @schemaName, @tableName, @CommittedCursor, @TriggerReinit);
                        -- Outside of reinit, copy change rows from the old instance into the new one.
                        IF (@reinit = 0 AND
 @OldInstanceToTrack IS NOT NULL) EXEC dbo.'' + @mergeCaptureInstancesProcName + '' @schemaName, @tableName;
                    COMMIT TRAN
                END
                ELSE
                BEGIN
                    -- Cannot create capture instance (both slots taken by non-lakeflow instances)
                    -- Just insert tracking row with NULL values to signal reinit
                    BEGIN TRAN
                        SET @CommittedCursor = (SELECT committedCursor FROM dbo.['' + @captureInstanceTableName + ''] WHERE schemaName=@schemaName AND tableName=@tableName);
                        DELETE FROM dbo.['' + @captureInstanceTableName + ''] WHERE schemaName=@schemaName AND tableName=@tableName;
                        INSERT INTO dbo.['' + @captureInstanceTableName + ''] VALUES (NULL, NULL, @schemaName, @tableName, @CommittedCursor, @TriggerReinit);
                    COMMIT TRAN
                END
            -- Closes the transaction opened at the top of the procedure.
            COMMIT TRAN'';
            EXEC sp_executesql @SQL;
            PRINT N''✓ Created procedure: '' + @refreshCaptureInstanceProcName;
        END

        -- Create ALTER TABLE trigger
        -- Generated database-level DDL trigger: on ALTER TABLE it reads the schema and
        -- table name from EVENTDATA and calls the refresh procedure when CDC is enabled
        -- for both the database and the table and the event contains a Create entry in
        -- its AlterTableActionList.
        IF NOT EXISTS (SELECT 1 FROM sys.triggers WHERE name = @alterTableTriggerName AND parent_class = 0)
        BEGIN
            SET @SQL = N''CREATE TRIGGER ['' + @alterTableTriggerName + '']
            ON DATABASE
            FOR ALTER_TABLE
            AS
            BEGIN
                SET NOCOUNT ON;
                DECLARE @data XML = EVENTDATA();
                DECLARE @DbName NVARCHAR(255) = DB_NAME();
                DECLARE @schemaName NVARCHAR(MAX) = @data.value(''''(/EVENT_INSTANCE/SchemaName)[1]'''', ''''NVARCHAR(MAX)'''');
                DECLARE @tableName NVARCHAR(255) = @data.value(''''(/EVENT_INSTANCE/ObjectName)[1]'''', ''''NVARCHAR(255)'''');
                DECLARE @isColumnAdd NVARCHAR(255) = @data.value(''''(/EVENT_INSTANCE/AlterTableActionList/Create)[1]'''', ''''NVARCHAR(255)'''');
                DECLARE @IsCdcEnabledDBLevel BIT = (SELECT is_cdc_enabled FROM sys.databases WHERE name=@DbName);
                DECLARE @IsCdcEnabledTableLevel BIT = (SELECT is_tracked_by_cdc FROM sys.tables WHERE schema_id = SCHEMA_ID(@schemaName) AND name = @tableName);
                -- Only trigger refresh if CDC is enabled at both levels AND column add detected
                IF (@IsCdcEnabledDBLevel = 1 AND @IsCdcEnabledTableLevel = 1 AND @isColumnAdd IS NOT NULL)
                BEGIN
                    -- Refresh the capture instance for this table
                    EXEC [dbo].['' +
@refreshCaptureInstanceProcName + ''] @schemaName, @tableName; END END''; EXEC sp_executesql @SQL; PRINT N''✓ Created ALTER TABLE trigger: '' + @alterTableTriggerName; END -- User resolution IF @User IS NOT NULL AND @User != '''' BEGIN -- Check if user exists as database user IF NOT EXISTS (SELECT 1 FROM sys.database_principals WHERE name = @User) BEGIN -- Check if it is a server login and find its mapped database user SELECT @DatabaseUser = dp.name FROM sys.database_principals dp INNER JOIN sys.server_principals sp ON dp.sid = sp.sid WHERE sp.name = @User AND dp.type IN (''S'', ''U'', ''G'') AND dp.name NOT IN (''guest''); -- If still no database user found, warn and exit IF @DatabaseUser IS NULL OR @DatabaseUser = @User BEGIN PRINT N''⚠ Warning: User/Login ['' + @User + ''] not found as database user. Skipping permission grants.''; PRINT N'' To fix: CREATE USER ['' + @User + ''] FOR LOGIN ['' + @User + ''];''; SET @DatabaseUser = NULL; END ELSE BEGIN PRINT N''Server login ['' + @User + ''] maps to database user ['' + @DatabaseUser + ''].''; END END -- Special handling for dbo user - cannot grant permissions to dbo IF @DatabaseUser = ''dbo'' BEGIN PRINT N''Skipping permission grants (dbo already has all permissions).''; SET @DatabaseUser = NULL; END END -- Grant permissions to user if specified IF @DatabaseUser IS NOT NULL BEGIN PRINT N''Granting CDC DDL support object permissions to user: '' + @DatabaseUser; BEGIN TRY SET @SQL = N''GRANT SELECT, UPDATE ON [dbo].['' + @captureInstanceTableName + ''] TO ['' + @DatabaseUser + '']''; EXEC sp_executesql @SQL; SET @SQL = N''GRANT VIEW DEFINITION TO ['' + @DatabaseUser + '']''; EXEC sp_executesql @SQL; SET @SQL = N''GRANT VIEW DATABASE STATE TO ['' + @DatabaseUser + '']''; EXEC sp_executesql @SQL; SET @SQL = N''GRANT SELECT ON SCHEMA::dbo TO ['' + @DatabaseUser + '']''; EXEC sp_executesql @SQL; SET @SQL = N''GRANT SELECT, INSERT ON SCHEMA::cdc TO ['' + @DatabaseUser + '']''; EXEC sp_executesql @SQL; SET @SQL = N''GRANT 
EXECUTE ON [dbo].['' + @disableOldCaptureInstanceProcName + ''] TO ['' + @DatabaseUser + '']''; EXEC sp_executesql @SQL; SET @SQL = N''GRANT EXECUTE ON [dbo].['' + @mergeCaptureInstancesProcName + ''] TO ['' + @DatabaseUser + '']''; EXEC sp_executesql @SQL; SET @SQL = N''GRANT EXECUTE ON [dbo].['' + @refreshCaptureInstanceProcName + ''] TO ['' + @DatabaseUser + '']''; EXEC sp_executesql @SQL; PRINT N''✓ Granted CDC permissions to '' + @DatabaseUser; END TRY BEGIN CATCH PRINT N''⚠ Could not grant CDC permissions to '' + @DatabaseUser + '': '' + ERROR_MESSAGE(); END CATCH END -- Process tables if specified IF @Tables IS NOT NULL BEGIN PRINT N''Processing tables for Change Data Capture enablement...''; DECLARE @TargetTables TABLE ( SchemaName NVARCHAR(128), TableName NVARCHAR(128), HasPrimaryKey BIT ); -- Parse table list and populate target tables IF @Tables = ''ALL'' BEGIN INSERT INTO @TargetTables (SchemaName, TableName, HasPrimaryKey) SELECT s.name, t.name, CASE WHEN EXISTS ( SELECT 1 FROM sys.key_constraints kc WHERE kc.parent_object_id = t.object_id AND kc.type = ''PK'' ) THEN 1 ELSE 0 END FROM sys.tables t INNER JOIN sys.schemas s ON t.schema_id = s.schema_id WHERE t.is_ms_shipped = 0; END ELSE IF @Tables LIKE ''SCHEMAS:%'' BEGIN DECLARE @SchemaList NVARCHAR(MAX) = SUBSTRING(@Tables, 9, LEN(@Tables)); INSERT INTO @TargetTables (SchemaName, TableName, HasPrimaryKey) SELECT s.name, t.name, CASE WHEN pk.CONSTRAINT_NAME IS NOT NULL THEN 1 ELSE 0 END FROM sys.tables t INNER JOIN sys.schemas s ON t.schema_id = s.schema_id LEFT JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS pk ON pk.TABLE_SCHEMA = s.name AND pk.TABLE_NAME = t.name AND pk.CONSTRAINT_TYPE = ''PRIMARY KEY'' WHERE t.type = ''U'' AND s.name IN (SELECT LTRIM(RTRIM(Split.a.value(''.'', ''NVARCHAR(MAX)''))) AS value FROM ( SELECT CAST('''' + REPLACE(@SchemaList, '','', '''') + '''' AS XML) AS Data ) AS A CROSS APPLY Data.nodes(''/M'') AS Split(a) WHERE LTRIM(RTRIM(Split.a.value(''.'', ''NVARCHAR(MAX)''))) != 
''''); END ELSE BEGIN DECLARE @TableList TABLE (FullTableName NVARCHAR(261)); INSERT INTO @TableList (FullTableName) SELECT LTRIM(RTRIM(Split.a.value(''.'', ''NVARCHAR(MAX)''))) AS value FROM ( SELECT CAST('''' + REPLACE(@Tables, '','', '''') + '''' AS XML) AS Data ) AS A CROSS APPLY Data.nodes(''/M'') AS Split(a) WHERE LTRIM(RTRIM(Split.a.value(''.'', ''NVARCHAR(MAX)''))) != ''''; INSERT INTO @TargetTables (SchemaName, TableName, HasPrimaryKey) SELECT s.name, t.name, CASE WHEN pk.CONSTRAINT_NAME IS NOT NULL THEN 1 ELSE 0 END FROM sys.tables t INNER JOIN sys.schemas s ON t.schema_id = s.schema_id INNER JOIN @TableList tl ON (tl.FullTableName = s.name + ''.*'' OR tl.FullTableName = s.name + ''.'' + t.name OR (CHARINDEX(''.'', tl.FullTableName) = 0 AND tl.FullTableName = t.name AND s.name = ''dbo'')) LEFT JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS pk ON pk.TABLE_SCHEMA = s.name AND pk.TABLE_NAME = t.name AND pk.CONSTRAINT_TYPE = ''PRIMARY KEY'' WHERE t.type = ''U''; END -- Process each table for CDC enablement DECLARE @CurrentSchema NVARCHAR(128), @CurrentTable NVARCHAR(128); DECLARE @ProcessedCount INT = 0, @SkippedCount INT = 0, @ErrorCount INT = 0; DECLARE table_cursor CURSOR FOR SELECT SchemaName, TableName FROM @TargetTables ORDER BY SchemaName, TableName; OPEN table_cursor; FETCH NEXT FROM table_cursor INTO @CurrentSchema, @CurrentTable; WHILE @@FETCH_STATUS = 0 BEGIN BEGIN TRY IF NOT EXISTS ( SELECT 1 FROM cdc.change_tables ct INNER JOIN sys.tables t ON ct.source_object_id = t.object_id INNER JOIN sys.schemas s ON t.schema_id = s.schema_id WHERE s.name = @CurrentSchema AND t.name = @CurrentTable ) BEGIN DECLARE @LakeflowCaptureInstance NVARCHAR(255) = ''lakeflow_'' + @CurrentSchema + ''_'' + @CurrentTable + ''_1''; EXEC sys.sp_cdc_enable_table @source_schema = @CurrentSchema, @source_name = @CurrentTable, @role_name = NULL, @capture_instance = @LakeflowCaptureInstance; PRINT N''✓ Enabled CDC on ['' + @CurrentSchema + ''].['' + @CurrentTable + ''] with capture 
instance '' + @LakeflowCaptureInstance; SET @ProcessedCount = @ProcessedCount + 1; END ELSE BEGIN PRINT N''ℹ CDC already enabled on ['' + @CurrentSchema + ''].['' + @CurrentTable + '']''; SET @SkippedCount = @SkippedCount + 1; END END TRY BEGIN CATCH PRINT N''✗ Error enabling CDC on ['' + @CurrentSchema + ''].['' + @CurrentTable + '']: '' + ERROR_MESSAGE(); SET @ErrorCount = @ErrorCount + 1; END CATCH FETCH NEXT FROM table_cursor INTO @CurrentSchema, @CurrentTable; END CLOSE table_cursor; DEALLOCATE table_cursor; -- Summary PRINT N''''; PRINT N''CDC setup summary:''; PRINT N'' - Tables processed: '' + CAST(@ProcessedCount AS NVARCHAR(10)); PRINT N'' - Tables already enabled: '' + CAST(@SkippedCount AS NVARCHAR(10)); PRINT N'' - Tables with errors: '' + CAST(@ErrorCount AS NVARCHAR(10)); END PRINT N''Change Data Capture setup completed successfully''; END TRY BEGIN CATCH SET @ErrorMessage = ''Error in lakeflowSetupChangeDataCapture: '' + ERROR_MESSAGE(); PRINT @ErrorMessage; THROW; END CATCH END'; PRINT N'Created lakeflowSetupChangeDataCapture procedure';

-- Final validation and summary.
-- Prints the platform and version reported by the utility functions, checks
-- via OBJECT_ID that each utility object exists, prints usage examples, and
-- finishes with an installation status that reflects the verification result.
BEGIN
    PRINT N'';
    PRINT N'=== Installation Summary ===';

    DECLARE @Platform NVARCHAR(50) = dbo.lakeflowDetectPlatform();
    PRINT N'Platform: ' + @Platform;
    PRINT N'Version: ' + dbo.lakeflowUtilityVersion_1_5();

    -- Verify created objects.
    -- Each failed check is counted so the closing status message cannot claim
    -- success after a "creation failed" line has been printed.
    DECLARE @InstallMissingObjectCount INT = 0;

    IF OBJECT_ID('dbo.lakeflowDetectPlatform', 'FN') IS NOT NULL
    BEGIN
        PRINT N'✓ lakeflowDetectPlatform function created successfully';
    END
    ELSE
    BEGIN
        PRINT N'✗ lakeflowDetectPlatform function creation failed';
        SET @InstallMissingObjectCount = @InstallMissingObjectCount + 1;
    END

    IF OBJECT_ID('dbo.lakeflowUtilityVersion_1_5', 'FN') IS NOT NULL
    BEGIN
        PRINT N'✓ lakeflowUtilityVersion_1_5 function created successfully';
    END
    ELSE
    BEGIN
        PRINT N'✗ lakeflowUtilityVersion_1_5 function creation failed';
        SET @InstallMissingObjectCount = @InstallMissingObjectCount + 1;
    END

    IF OBJECT_ID('dbo.lakeflowFixPermissions', 'P') IS NOT NULL
    BEGIN
        PRINT N'✓ lakeflowFixPermissions procedure created successfully';
    END
    ELSE
    BEGIN
        PRINT N'✗ lakeflowFixPermissions procedure creation failed';
        SET @InstallMissingObjectCount = @InstallMissingObjectCount + 1;
    END

    IF OBJECT_ID('dbo.lakeflowSetupChangeTracking', 'P') IS NOT NULL
    BEGIN
        PRINT N'✓ lakeflowSetupChangeTracking procedure created successfully';
    END
    ELSE
    BEGIN
        PRINT N'✗ lakeflowSetupChangeTracking procedure creation failed';
        SET @InstallMissingObjectCount = @InstallMissingObjectCount + 1;
    END

    IF OBJECT_ID('dbo.lakeflowSetupChangeDataCapture', 'P') IS NOT NULL
    BEGIN
        PRINT N'✓ lakeflowSetupChangeDataCapture procedure created successfully';
    END
    ELSE
    BEGIN
        PRINT N'✗ lakeflowSetupChangeDataCapture procedure creation failed';
        SET @InstallMissingObjectCount = @InstallMissingObjectCount + 1;
    END

    -- Usage examples (printed verbatim for the operator to copy).
    PRINT N'';
    PRINT N'=== Usage Examples ===';
    PRINT N'-- Table-specific setup:';
    PRINT N'EXEC dbo.lakeflowSetupChangeTracking @Tables = ''dbo.Table1,Sales.Orders'', @User = ''YourUsername'';';
    PRINT N'';
    PRINT N'-- Enable change tracking on all user tables (auto-discovers, skips tables without PKs):';
    PRINT N'EXEC dbo.lakeflowSetupChangeTracking @Tables = ''ALL'', @User = ''YourUsername'';';
    PRINT N'';
    PRINT N'-- Enable change tracking on all tables in specific schemas:';
    PRINT N'EXEC dbo.lakeflowSetupChangeTracking @Tables = ''SCHEMAS:Sales,HR,Production'', @User = ''YourUsername'';';
    PRINT N'';
    PRINT N'-- Enable change tracking with wildcard support:';
    PRINT N'EXEC dbo.lakeflowSetupChangeTracking @Tables = ''Sales.*,HR.Employees,dbo.SpecialTable'', @User = ''YourUsername'';';
    PRINT N'';
    PRINT N'-- Enable CDC on all user tables (processes tables with and without PKs):';
    PRINT N'EXEC dbo.lakeflowSetupChangeDataCapture @Tables = ''ALL'', @User = ''YourUsername'';';
    PRINT N'';
    PRINT N'-- Smart two-step approach for complete coverage:';
    PRINT N'-- Step 1: Enable CT on tables with primary keys';
    PRINT N'EXEC dbo.lakeflowSetupChangeTracking @Tables = ''ALL'', @User = ''YourUsername'';';
    PRINT N'-- Step 2: Enable CDC on tables without primary keys';
    PRINT N'EXEC dbo.lakeflowSetupChangeDataCapture @Tables = ''ALL'', @User = ''YourUsername'';';
    PRINT N'';
    PRINT N'-- Fix permissions for a user:';
    PRINT N'EXEC dbo.lakeflowFixPermissions @User = ''YourUsername'';';
    PRINT N'';
    PRINT N'-- Grant table permissions for specific tables:';
    PRINT N'EXEC dbo.lakeflowFixPermissions @User = ''YourUsername'', @Tables = ''ALL'';';
    PRINT N'EXEC dbo.lakeflowFixPermissions @User = ''YourUsername'', @Tables = ''Sales.*,HR.Employees'';';
    PRINT N'';
    PRINT N'-- Setup change tracking at database level only (no table processing):';
    PRINT N'EXEC dbo.lakeflowSetupChangeTracking @Tables = NULL, @User = ''YourUsername'';';
    PRINT N'';
    PRINT N'-- NOTE: The @User parameter is optional. If provided, the procedures will grant';
    PRINT N'-- the specified user permissions to access DDL support objects (audit tables, etc.)';
    PRINT N'-- This is useful for granting read access to change tracking metadata.';
    PRINT N'';
    PRINT N'-- Cleanup DDL support objects:';
    PRINT N'EXEC dbo.lakeflowSetupChangeTracking @Mode = ''CLEANUP'';';
    PRINT N'EXEC dbo.lakeflowSetupChangeDataCapture @Mode = ''CLEANUP'';';
    PRINT N'';

    -- Final status: only claim success when every verification above passed.
    IF @InstallMissingObjectCount = 0
    BEGIN
        PRINT N'=== Installation Complete ===';
        PRINT N'All utility objects have been installed successfully.';
    END
    ELSE
    BEGIN
        PRINT N'=== Installation Incomplete ===';
        PRINT N'⚠ ' + CAST(@InstallMissingObjectCount AS NVARCHAR(10))
            + N' utility object(s) failed verification. Review the messages above and re-run this script.';
    END

    PRINT N'';
    PRINT N'=== Available Procedures ===';
    PRINT N'1. lakeflowFixPermissions - Fix user permissions for ingestion';
    PRINT N'2. lakeflowSetupChangeTracking - Setup change tracking and DDL audit objects';
    PRINT N'3. lakeflowSetupChangeDataCapture - Setup CDC and capture instance objects';
    PRINT N'';
    PRINT N'For more information, visit: https://docs.databricks.com/aws/en/ingestion/lakeflow-connect/sql-server-source-setup';
END