In this series we look at building a Streaming ETL with Azure Data Factory and CDC – Create a Parameter Driven Pipeline. This is Part 7; the rest of the series is below.
- Enabling CDC
- Setting up Audit Tables
- Provisioning Azure Data Factory
- Provisioning Azure Blob Storage
- Create Data Source Connection in ADF
- Create Incremental Pipeline in ADF
- Create a Parameter Driven Pipeline
- Create a Rolling Trigger
This series uses the AdventureWorks database. For more information on how to get it set up, see my YouTube video on downloading and restoring the database.
The previous step pulls all the changes in the CDC table, but we do not want to reprocess the full history on every run. So let's look at creating a rolling window for the CDC ETL.
- Navigate to the pipeline's Parameters section and add two parameters, "triggerStartTime" and "triggerEndTime", setting them to yesterday's and today's dates in the format "2020-01-07 12:00:00:000".
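If you want to double-check that format before wiring anything up, you can cast a sample value in SSMS; this is only an illustration of the expected parameter format, not part of the pipeline:
-- The colon before the milliseconds is accepted by the legacy datetime type,
-- which is what the Lookup query below declares for @begin_time and @end_time.
SELECT CAST('2020-01-07 12:00:00:000' AS datetime) AS parsed_start;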
- On the Lookup activity, update the query in the Settings tab to the following to use the new parameters. SQL Server Agent must be running for this step, and the parameters must be valid dates.
@concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10);
SET @begin_time = ''',pipeline().parameters.triggerStartTime,''';
SET @end_time = ''',pipeline().parameters.triggerEndTime,''';
SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time);
SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time);
SELECT count(1) changecount FROM cdc.fn_cdc_get_all_changes_dbo_DimProduct(@from_lsn, @to_lsn, ''all'')')
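If the Lookup fails here, it can help to test the same logic directly in SSMS. A minimal sketch, assuming the example dates above; if either LSN comes back NULL, the window falls outside what the CDC capture job has processed so far (for example, because SQL Server Agent is stopped), and the change function will error:
-- Verify the window maps to real LSNs; NULL means no mapping exists yet.
DECLARE @begin_time datetime = '2020-01-06 12:00:00:000';
DECLARE @end_time datetime = '2020-01-07 12:00:00:000';
SELECT sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time) AS from_lsn,
       sys.fn_cdc_map_time_to_lsn('largest less than', @end_time) AS to_lsn;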
- Navigate back to the "True" condition and paste the following query into the source of the Copy activity, so that it tracks the changes using the parameters as well.
@concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10);
SET @begin_time = ''',pipeline().parameters.triggerStartTime,''';
SET @end_time = ''',pipeline().parameters.triggerEndTime,''';
SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time);
SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time);
SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_DimProduct(@from_lsn, @to_lsn, ''all'')')
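To preview what the True branch will copy, you can run the same table-valued function in SSMS and group by the __$operation metadata column that CDC adds to every change row (a sketch reusing the same example window):
DECLARE @from_lsn binary(10) = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', '2020-01-06 12:00:00:000');
DECLARE @to_lsn binary(10) = sys.fn_cdc_map_time_to_lsn('largest less than', '2020-01-07 12:00:00:000');
-- With the 'all' row filter, __$operation is 1 for deletes, 2 for inserts,
-- and 4 for updates (the pre-update image, 3, only appears with 'all update old').
SELECT __$operation, count(1) AS changecount
FROM cdc.fn_cdc_get_all_changes_dbo_DimProduct(@from_lsn, @to_lsn, 'all')
GROUP BY __$operation;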
- On the Copy activity inside the True condition, open the sink dataset from the Sink tab and click on Parameters.
- Add a new parameter called triggerStart
- Head back to the Connection tab for the dataset, where we will be adding dynamic content for the directory and file.
- Add the following for the directory and file sections.
Directory
@concat('dimProduct/incremental/',formatDateTime(dataset().triggerStart,'yyyy/MM/dd'))
File
@concat(formatDateTime(dataset().triggerStart,'yyyyMMddHHmmssfff'),'.csv')
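For example, with triggerStart set to 2020-01-07 12:00:00:000, these expressions resolve to the directory dimProduct/incremental/2020/01/07 and the file 20200107120000000.csv, so each run lands in a date-partitioned folder with a millisecond-stamped file name.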
- Navigate back to the Sink tab of the Copy activity and expand the dataset properties. Add the dynamic content for the new triggerStart parameter.
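The dynamic content to supply is the pipeline's start parameter, so the dataset's triggerStart follows the window of whichever run produced the file:
@pipeline().parameters.triggerStartTime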
- You can now trigger a run, supplying values for the two parameters, and see the new files landing in the data lake.
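If no file appears, it may simply mean nothing changed inside the window, since the True branch only runs when the change count is greater than zero. A hypothetical way to generate a test change (ProductKey and Color are assumptions borrowed from AdventureWorksDW's DimProduct; substitute your table's own columns):
-- Hypothetical test update; column names assume an AdventureWorksDW-style DimProduct.
UPDATE dbo.DimProduct SET Color = 'Red' WHERE ProductKey = 1;
Once the CDC capture job has harvested the change, a run whose window covers the change time will produce a file.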