In this series we look at building a Streaming ETL with Azure Data Factory and CDC. This is Part 6, Creating an Incremental Pipeline in Azure Data Factory. The rest of the series is listed below.
- Enabling CDC
- Setting up Audit Tables
- Provisioning Azure Data Factory
- Provisioning Azure Blob Storage
- Create Data Source Connection in ADF
- Create Incremental Pipeline in ADF
- Create a Parameter Driven Pipeline
- Create a Rolling Trigger
This series uses the AdventureWorks database. For more information on setting it up, see my YouTube video on downloading and restoring the database.
Creating an Incremental Pipeline in Azure Data Factory
- Create a new SQL Server dataset and name it “Incremental_Load”.
- From the Activities pane, drag a Lookup activity onto the pipeline canvas and name it “GetChangeCount”. This activity counts the changes recorded in the CDC change table between the oldest change still retained and the current time.
- In the properties, choose the query option and enter the custom query below, replacing dbo_DimProduct with the correct capture instance name. Running SELECT capture_instance FROM cdc.change_tables in SQL Server Management Studio lists the capture instance names of your CDC tables.
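DECLARE @from_lsn binary(10), @to_lsn binary(10);
-- Lowest LSN still available for the capture instance
SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_DimProduct');
-- Highest LSN committed at or before the current time
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
-- The changecount alias must match the name used in the If Condition expression
SELECT COUNT(1) AS changecount FROM cdc.fn_cdc_get_all_changes_dbo_DimProduct(@from_lsn, @to_lsn, 'all');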
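The capture instance name appears twice in the query above: once in sys.fn_cdc_get_min_lsn and once in the name of the table-valued function. Forgetting to change one of them is an easy mistake, so here is a minimal sketch of a hypothetical helper, build_cdc_query (not part of ADF or SQL Server), that generates the text for any capture instance. It can also produce the SELECT * variant that Copy Incremental Data uses later in this walkthrough. The identifier check is a deliberately conservative assumption that only accepts plain names such as dbo_DimProduct, because the name is spliced into the SQL text.

```python
import re

# Conservative assumption: accept only plain identifiers like dbo_DimProduct,
# because the name is pasted directly into the SQL text.
_CAPTURE_INSTANCE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_cdc_query(capture_instance: str, count_only: bool = True) -> str:
    """Hypothetical helper: build the GetChangeCount query (count_only=True)
    or the Copy Incremental Data query (count_only=False) for one capture instance."""
    if not _CAPTURE_INSTANCE.match(capture_instance):
        raise ValueError(f"unexpected capture instance name: {capture_instance!r}")
    select_list = "COUNT(1) AS changecount" if count_only else "*"
    return "\n".join([
        "DECLARE @from_lsn binary(10), @to_lsn binary(10);",
        # First use of the name: the lower LSN bound for this capture instance.
        f"SET @from_lsn = sys.fn_cdc_get_min_lsn('{capture_instance}');",
        "SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());",
        # Second use of the name: the generated CDC table-valued function.
        f"SELECT {select_list} FROM cdc.fn_cdc_get_all_changes_{capture_instance}"
        "(@from_lsn, @to_lsn, 'all');",
    ])


if __name__ == "__main__":
    print(build_cdc_query("dbo_DimProduct"))
    print(build_cdc_query("dbo_DimProduct", count_only=False))
```

The output can be pasted into the Lookup or Copy source query box. Generating both queries from one name keeps the two places in sync.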
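- Preview data shows the result of the GetChangeCount query, which is the number of changes recorded in the change table between the two LSNs. Leave First row only checked, because the If Condition added next reads the count from the Lookup's first row.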
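- In the Activities pane, expand Iteration & conditionals and drag an If Condition activity onto the canvas, then connect the success output of GetChangeCount to it.
- Name it “HasChangedRow” and, in the Properties window, enter the expression @greater(int(activity('GetChangeCount').output.firstRow.changecount),0). Then select the pencil icon next to the True case to edit the activities that run when changes exist.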
- Inside the True case, add a Copy data activity from the Move & transform group and name it “Copy Incremental Data”.
- On the Source tab, set the source to your SQL Server dataset, choose the query option, and enter the following query, replacing dbo_DimProduct with your capture instance name if needed. Select Preview data to see the results.
DECLARE @from_lsn binary(10), @to_lsn binary(10);
-- Same LSN window as the GetChangeCount query
SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_DimProduct');
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
-- Return every changed row in the window, including the __$ metadata columns
SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_DimProduct(@from_lsn, @to_lsn, 'all');
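The HasChangedRow If Condition runs Copy Incremental Data only when GetChangeCount reports at least one change. To make that logic concrete, here is a minimal Python sketch. The function has_changed_rows is hypothetical and mirrors @greater(int(activity('GetChangeCount').output.firstRow.changecount),0). The dictionaries stand in for the Lookup's output and are trimmed to the one field the expression reads. They assume First row only is enabled, which is what places the row under firstRow.

```python
from typing import Any, Mapping


def has_changed_rows(lookup_output: Mapping[str, Any]) -> bool:
    """Hypothetical Python mirror of the HasChangedRow expression.
    `lookup_output` stands in for activity('GetChangeCount').output."""
    # With "First row only" enabled, the Lookup returns its single row under firstRow.
    first_row = lookup_output["firstRow"]
    # int() mirrors the expression's int() cast, so 3 and "3" are both accepted.
    return int(first_row["changecount"]) > 0


# Made-up Lookup outputs, trimmed to the field the expression reads.
with_changes = {"firstRow": {"changecount": 12}}
no_changes = {"firstRow": {"changecount": 0}}

print(has_changed_rows(with_changes))  # True: the True case (Copy Incremental Data) runs
print(has_changed_rows(no_changes))    # False: the True case is skipped
```

Checking the count first means the Copy activity is skipped when there is nothing new, instead of writing an empty file.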
- On the Sink tab, select your CSV dataset in Blob Storage.
- You can now debug the main pipeline and check your storage account to confirm that the rows captured by CDC were copied to the CSV file.
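After a debug run, you can download the CSV from the storage account and check what kind of changes it holds. The __$operation column returned by cdc.fn_cdc_get_all_changes_dbo_DimProduct uses 1 for delete, 2 for insert, 3 for the update before image, and 4 for the update after image. With the 'all' row filter, updates appear only as 4. This minimal sketch uses a hypothetical summarize_changes function to count rows per operation. It assumes the sink dataset writes column names as the first row, and the sample rows are made up for illustration.

```python
import csv
import io
from collections import Counter

# __$operation codes from cdc.fn_cdc_get_all_changes_<capture_instance>.
# With the 'all' row filter only code 4 is returned for updates.
OPERATIONS = {"1": "delete", "2": "insert", "3": "update (before)", "4": "update (after)"}


def summarize_changes(csv_text: str) -> Counter:
    """Hypothetical helper: count rows per CDC operation in the copied CSV.
    Assumes the sink dataset writes column names as the first row."""
    counts = Counter()
    for row in csv.DictReader(io.StringIO(csv_text)):
        code = row["__$operation"].strip()
        counts[OPERATIONS.get(code, f"unknown ({code})")] += 1
    return counts


# Made-up sample rows with a few DimProduct columns, for illustration only.
sample = """__$start_lsn,__$seqval,__$operation,__$update_mask,ProductKey,EnglishProductName
0x0000002A000001F00003,0x0000002A000001F00002,2,0x7F,607,Mountain-200 Black
0x0000002A000002100004,0x0000002A000002100002,4,0x04,607,Mountain-200 Black
0x0000002A000002300003,0x0000002A000002300002,1,0x7F,210,HL Road Frame
"""

print(summarize_changes(sample))
```

If the counts add up to the changecount shown in the Lookup preview for the same window, the copy captured every change.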