Change Data Capture - Part 2

How To Process Change Data Capture (CDC) in SQL Server Integration Services (SSIS) 2008

Assume that we have a customer table and we want to store all changes to it in a customer_audit table. We'll write an SSIS package that queries the CDC data and copies it to the customer_audit table. We would like to be able to run the package on demand and/or on a schedule (e.g. from a SQL Agent job). Each time the package runs, it should pick up whatever changed since the previous run.

Customer Tables

We will use the following customer table:

create table dbo.customer (
  customer_id int identity primary key not null
, name nvarchar(50) not null
, sales_rep nvarchar(50) not null
, region nvarchar(50) not null
, credit_limit int not null
)

We will use the following customer_audit table to store changes to the customer table:

create table dbo.customer_audit (
  customer_audit_id int identity primary key not null
, customer_id int not null
, name nvarchar(50) not null
, sales_rep nvarchar(50) not null
, region nvarchar(50) not null
, credit_limit int not null
, effective_date datetime not null
, __$start_lsn binary(10) not null
, __$seqval binary(10) not null
, __$operation int not null
, __$update_mask varbinary(128) not null
)

Enabling CDC

CDC is enabled on the database and on the customer table as shown in Part 1.
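For reference, here is a minimal sketch of that setup. It assumes the SqlHero database used by the test script later in this post, and a capture instance named customer_all, which is inferred from the generated function name cdc.fn_cdc_get_all_changes_customer_all used in the extract procedure. SQL Server Agent must be running, because the CDC capture job harvests changes from the transaction log.

```sql
use SqlHero;
go

-- enable CDC at the database level (creates the cdc schema and
-- metadata tables such as cdc.lsn_time_mapping); requires sysadmin
exec sys.sp_cdc_enable_db;
go

-- enable CDC for dbo.customer (requires db_owner); the capture instance
-- name determines the generated function
-- cdc.fn_cdc_get_all_changes_customer_all
exec sys.sp_cdc_enable_table
    @source_schema    = N'dbo'
  , @source_name      = N'customer'
  , @role_name        = null          -- no gating role (assumption)
  , @capture_instance = N'customer_all';
go
```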

Logging

To allow our SSIS package to pick up just the changes since the last time it ran, we'll populate a log table with the data we need. We'll use the following log table:

create table dbo.cdc_capture_log (
  cdc_capture_log_id int identity not null
, capture_instance nvarchar(50) not null
, start_time datetime not null
, min_lsn binary(10) not null
, max_lsn binary(10) not null
, end_time datetime null
, insert_count int not null default 0
, update_count int not null default 0
, delete_count int not null default 0
, status_code int not null default 0
)

We’ll create two stored procedures to maintain the log:

  • init_cdc_capture_log will create a new row.
  • end_cdc_capture_log will update the row.

The init_cdc_capture_log stored procedure is called at the beginning of our SSIS package. It is shown below:

create procedure dbo.init_cdc_capture_log
  @capture_instance nvarchar(50)
as
begin
  set nocount on;

  declare
    @begin_lsn binary(10)
  , @end_lsn binary(10)
  , @prev_max_lsn binary(10)

  -- get the max LSN for the capture instance from
  -- the last extract
  select @prev_max_lsn = max(max_lsn)
  from dbo.cdc_capture_log
  where capture_instance = @capture_instance

  -- if no row found in cdc_capture_log get the min lsn
  -- for the capture instance
  if @prev_max_lsn is null
    set @begin_lsn = sys.fn_cdc_get_min_lsn(@capture_instance)
  else
    set @begin_lsn = sys.fn_cdc_increment_lsn(@prev_max_lsn)

  -- get the max lsn
  set @end_lsn = sys.fn_cdc_get_max_lsn()

  insert into dbo.cdc_capture_log
    (capture_instance,start_time,min_lsn,max_lsn)
  values
    (@capture_instance,getdate(),@begin_lsn,@end_lsn)

  select cast(scope_identity() as int) cdc_capture_log_id
end
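To try the procedure by hand, a minimal sketch: call it for the customer_all capture instance (name inferred from the generated function used later in this post) and inspect the window it recorded. max_lsn comes from sys.fn_cdc_get_max_lsn(), which reads cdc.lsn_time_mapping, so sys.fn_cdc_map_lsn_to_time can turn it into a commit time. Keep in mind that a manual call records a real window, so the package's next run starts after it.

```sql
-- record a new extract window; the procedure returns the new
-- cdc_capture_log_id as a one-row result set
exec dbo.init_cdc_capture_log @capture_instance = N'customer_all';

-- inspect the most recent window for this capture instance
select top (1)
    cdc_capture_log_id
  , min_lsn
  , max_lsn
  , sys.fn_cdc_map_lsn_to_time(max_lsn) as max_lsn_commit_time
from dbo.cdc_capture_log
where capture_instance = N'customer_all'
order by cdc_capture_log_id desc;
```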

The end_cdc_capture_log stored procedure updates the row created by the init_cdc_capture_log stored procedure, recording the end time and the change counts and setting status_code to 1. It is shown below:

create procedure dbo.end_cdc_capture_log
  @cdc_capture_log_id int
, @insert_count int
, @update_count int
, @delete_count int
as
begin
  set nocount on;

  update dbo.cdc_capture_log set
    end_time = getdate()
  , insert_count = @insert_count
  , update_count = @update_count
  , delete_count = @delete_count
  , status_code = 1
  where cdc_capture_log_id = @cdc_capture_log_id
end
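As written, init_cdc_capture_log computes the next window from max(max_lsn) over every row for the capture instance, including rows whose status_code is still 0 because End Log never ran. A failed run's window is therefore not retried automatically. A query along these lines, offered as a sketch, lists such incomplete runs so they can be investigated; a run that is still in progress will also show up.

```sql
-- runs that started but never reached end_cdc_capture_log:
-- status_code keeps its default of 0 until that procedure sets it to 1
select
    cdc_capture_log_id
  , capture_instance
  , start_time
  , min_lsn
  , max_lsn
from dbo.cdc_capture_log
where status_code = 0
order by start_time;
```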

Creating the SSIS Package

We will create an SSIS package that has the following control flow:

[Figure: SSIS control flow with the Init Log, Process Changes and End Log tasks]

The main points about the above control flow are:

  • Init Log is an Execute SQL task; it calls the init_cdc_capture_log stored procedure (described above) and saves the identity value of the created cdc_capture_log row in a package variable.
  • Process Changes is a Data Flow task that retrieves the latest changes from the CDC table and copies them to our audit table.
  • End Log is an Execute SQL task that calls the end_cdc_capture_log stored procedure (described above) to update the cdc_capture_log row.
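With an OLE DB connection manager, the Execute SQL tasks and the OLE DB Source pass parameters as ? placeholders that are mapped by position to package variables. The statements below are a sketch of how the three steps might be configured; the variable names User::CaptureLogId, User::InsertCount, User::UpdateCount and User::DeleteCount are hypothetical, and the ? placeholders mean this is a configuration fragment rather than a script to run in Management Studio.

```sql
-- Init Log (Execute SQL task), ResultSet = Single row;
-- on the Result Set page map cdc_capture_log_id to User::CaptureLogId
exec dbo.init_cdc_capture_log N'customer_all';

-- Extract Customer Changes (OLE DB Source, data access mode SQL command);
-- parameter 0 = User::CaptureLogId
exec dbo.extract_customer_capture_log ?;

-- End Log (Execute SQL task); parameters 0..3 map to
-- User::CaptureLogId, User::InsertCount, User::UpdateCount, User::DeleteCount
exec dbo.end_cdc_capture_log ?, ?, ?, ?;
```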

The Process Changes Data Flow task is implemented as shown below:

[Figure: Process Changes data flow]

The main points about the above data flow are:

  • Extract Customer Changes is an OLE DB Source that executes the stored procedure extract_customer_capture_log to retrieve the customer changes since the last run.
  • Count Inserts Updates and Deletes is a Script Component configured as a transformation; it counts the inserts, updates and deletes in the change data so that End Log can record them.
  • Save Customer Changes to Custom Audit Table is an OLE DB Destination used to insert each change row into the customer_audit table.
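To check results outside SSIS, the data flow can be approximated in T-SQL. This is a sketch, not the package itself: it takes the latest open cdc_capture_log row for customer_all (so run dbo.init_cdc_capture_log first), and it assumes all columns of dbo.customer are captured, so that the extract procedure returns modified_ts, then __$start_lsn, __$seqval, __$operation and __$update_mask, then the customer columns in table order. Running it against a window the package will also process would duplicate audit rows.

```sql
declare
    @cdc_capture_log_id int
  , @insert_count int
  , @update_count int
  , @delete_count int;

select @cdc_capture_log_id = max(cdc_capture_log_id)
from dbo.cdc_capture_log
where capture_instance = N'customer_all'
  and status_code = 0;  -- only a window that has not been closed yet

if @cdc_capture_log_id is null
begin
    raiserror(N'No open cdc_capture_log row; run dbo.init_cdc_capture_log first.', 16, 1);
    return;
end

-- column order must match the extract procedure's result set
create table #changes (
    modified_ts datetime not null
  , __$start_lsn binary(10) not null
  , __$seqval binary(10) not null
  , __$operation int not null
  , __$update_mask varbinary(128) null
  , customer_id int not null
  , name nvarchar(50) not null
  , sales_rep nvarchar(50) not null
  , region nvarchar(50) not null
  , credit_limit int not null
);

-- Extract Customer Changes
insert into #changes
exec dbo.extract_customer_capture_log @cdc_capture_log_id = @cdc_capture_log_id;

-- Count Inserts Updates and Deletes
-- (__$operation: 1 = delete, 2 = insert, 4 = update, after image);
-- count() returns 0 on an empty set, which the not null log columns need
select
    @insert_count = count(case when __$operation = 2 then 1 end)
  , @update_count = count(case when __$operation = 4 then 1 end)
  , @delete_count = count(case when __$operation = 1 then 1 end)
from #changes;

-- Save Customer Changes: modified_ts maps to effective_date
insert into dbo.customer_audit (
    customer_id, name, sales_rep, region, credit_limit
  , effective_date, __$start_lsn, __$seqval, __$operation, __$update_mask
)
select
    customer_id, name, sales_rep, region, credit_limit
  , modified_ts, __$start_lsn, __$seqval, __$operation, __$update_mask
from #changes;

-- End Log
exec dbo.end_cdc_capture_log
    @cdc_capture_log_id = @cdc_capture_log_id
  , @insert_count = @insert_count
  , @update_count = @update_count
  , @delete_count = @delete_count;

drop table #changes;
```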

The extract_customer_capture_log stored procedure is shown below:

create procedure dbo.extract_customer_capture_log
  @cdc_capture_log_id int
as
begin
  set nocount on;

  declare
    @begin_lsn binary(10)
  , @end_lsn binary(10)

  -- get the lsn range to process
  select
    @begin_lsn = min_lsn
  , @end_lsn = max_lsn
  from dbo.cdc_capture_log
  where cdc_capture_log_id = @cdc_capture_log_id

  -- extract and return the changes
  select m.tran_end_time modified_ts, x.*
  from cdc.fn_cdc_get_all_changes_customer_all(
    @begin_lsn, @end_lsn, 'all'
  ) x
  join cdc.lsn_time_mapping m
    on m.start_lsn = x.__$start_lsn;
end

The main points about the above stored procedure are:

  • The cdc_capture_log_id parameter value is the value returned from the call to the init_cdc_capture_log stored procedure (described above in the Logging section).
  • The procedure retrieves the LSN range from the cdc_capture_log row. The range covers all changes committed since the previous run of the SSIS package.
  • The cdc.fn_cdc_get_all_changes_customer_all function is generated when you enable CDC on the table; its name ends with the capture instance name (customer_all). It returns the changes that occurred in the LSN range. With the 'all' row filter option, each update returns a single row holding the values after the update.
  • The cdc.lsn_time_mapping table is populated by CDC with the commit time of each transaction, keyed by LSN. The join retrieves that transaction time, so there is no need to track modification times manually in the source table.
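The join with cdc.lsn_time_mapping can also be written with the system function sys.fn_cdc_map_lsn_to_time, which returns the tran_end_time for an LSN. The sketch below does that and also checks the window first: when no CDC-tracked changes were committed since the previous run, the incremented begin LSN can be greater than the end LSN, and the generated function raises an error instead of returning an empty result. The log id value is hypothetical, and this is an ad hoc query rather than a drop-in replacement for the procedure the OLE DB Source calls.

```sql
declare @cdc_capture_log_id int = 1;  -- hypothetical: id returned by init_cdc_capture_log
declare @begin_lsn binary(10), @end_lsn binary(10);

select @begin_lsn = min_lsn, @end_lsn = max_lsn
from dbo.cdc_capture_log
where cdc_capture_log_id = @cdc_capture_log_id;

-- LSNs compare correctly as binary(10) values
if @begin_lsn is null or @begin_lsn > @end_lsn
    print N'No changes to process for this window.';
else
    select
        sys.fn_cdc_map_lsn_to_time(x.__$start_lsn) as modified_ts
      , x.*
    from cdc.fn_cdc_get_all_changes_customer_all(@begin_lsn, @end_lsn, N'all') as x;
```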

Testing the SSIS Package

Before running the SSIS package, we need to execute a script that performs some inserts, updates and deletes.  We’ll use the following script:

use SqlHero
go

insert into dbo.customer
(name,sales_rep,region,credit_limit)
values
(N'BGM Systems', N'Cane', N'South', 2500)

update dbo.customer
set sales_rep = N'Smith'
where [name] = N'BGM Systems'

update dbo.customer
set credit_limit = 3000
where [name] = N'BGM Systems'

delete dbo.customer
where [name] = N'BGM Systems'

After running the above script, give the CDC capture job a few seconds to harvest the changes from the transaction log, then execute the SSIS package and check that the customer_audit table contains four rows, one for each change made in the script. The partial contents of the customer_audit table are shown below:

[Figure: partial contents of the customer_audit table]

The main points about the above table are:

  • effective_date is the date and time of the transaction, as retrieved from the cdc.lsn_time_mapping table.
  • Row 1 shows the insert; __$operation=2 for an insert.
  • Row 2 shows the update of sales_rep; __$operation=4 marks an update, and the row holds the values after the update.
  • Row 3 shows the update of the credit_limit.
  • Row 4 shows the delete; __$operation=1 for a delete.
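The operation codes and the stored __$update_mask can be decoded in a query. In this sketch, sys.fn_cdc_has_column_changed checks the mask against the customer_all capture instance (name inferred from the generated function used earlier). For the test script you would expect row 2 to flag sales_rep and row 3 to flag credit_limit. Code 3 (update, before image) is listed for completeness; it does not appear with the 'all' row filter option.

```sql
select
    customer_audit_id
  , effective_date
  , case __$operation
        when 1 then N'delete'
        when 2 then N'insert'
        when 3 then N'update (before)'
        when 4 then N'update (after)'
    end as operation
    -- only meaningful for updates; null for other operations
  , case when __$operation = 4
         then sys.fn_cdc_has_column_changed(N'customer_all', N'sales_rep', __$update_mask)
    end as sales_rep_changed
  , case when __$operation = 4
         then sys.fn_cdc_has_column_changed(N'customer_all', N'credit_limit', __$update_mask)
    end as credit_limit_changed
from dbo.customer_audit
order by __$start_lsn, __$seqval;
```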

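The effective_date column makes it possible to query the customer_audit table for the customer values at any point in time: for each customer_id, take the row with the maximum effective_date that is less than or equal to that point in time. If that row is a delete (__$operation=1), the customer did not exist at that time.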
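A minimal sketch of such a point-in-time query follows. @as_of defaults to the current time here and stands in for whatever moment you are interested in. Ordering by __$start_lsn and __$seqval follows commit order exactly, which also separates changes made in the same transaction that share one effective_date.

```sql
declare @as_of datetime = getdate();  -- replace with the point in time of interest

with ranked as (
    select
        a.*
      , row_number() over (
            partition by a.customer_id
            order by a.__$start_lsn desc, a.__$seqval desc
        ) as rn
    from dbo.customer_audit as a
    where a.effective_date <= @as_of
)
select customer_id, name, sales_rep, region, credit_limit, effective_date
from ranked
where rn = 1
  and __$operation <> 1;  -- latest change was a delete: customer did not exist at @as_of
```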
