Alireza Sadeghi
In my previous blog post I covered different relational database data extraction techniques when using batch or micro-batch data collection frameworks.

Now let’s walk through a more practical example: a data ingestion pattern built on dual data pipelines, one that incrementally loads recent data at a lower latency (such as near-real-time or hourly), and another that fully reloads the dataset using a snapshot mechanism at a higher latency (such as nightly).

As mentioned briefly in the previous post, when working in a data environment that is not fully reliable, there may be situations where you cannot trust the accuracy of source database metadata columns, such as the record-updated timestamp, to guarantee that no changes in the source data are ever missed. In some cases, records are manually fixed, changed, or updated by system administrators and DBAs directly in the backend system or database, and these changes may not be picked up by the next ingestion run if the control or metadata columns are not updated accordingly. Additionally, corrupt or incorrect records may be deleted without leaving any trace. Therefore, when strict SLAs require the ingested data to match the source data 100%, you are left with only a few choices, such as fully reloading the data in the target storage system.

To have the best of both worlds, that is, to serve the most recent data at low latency while ensuring the data is eventually always correct, one technique is to build a dual data pipeline, with one pipeline meeting each objective. This is similar to the Lambda architecture, with two distinct pipelines where the reliable batch pipeline eventually catches up with the real-time streaming pipeline.

In terms of scheduling and latency, the following has to be decided for each pipeline:

  • The frequency and latency of the incremental pipeline, based on business needs. This can range from near-real-time, using a streaming CDC-based pipeline, to hourly or even longer latencies.
  • The frequency of the snapshot pipeline, depending on how long the business can tolerate data drift or changes not captured by the incremental pipeline.

In the example that follows, an incremental pipeline is developed to extract recent data from a source system (e.g. a CRM) object, such as a product table, on an hourly basis, and a snapshot pipeline is implemented to fully reload the data on the target data platform on a nightly basis.

Pipeline Development and Workflow Orchestration

For demonstration purposes, the popular Apache Airflow orchestration engine will be used to implement and manage the snapshot and incremental pipelines.

At the DAG level we would have two ingestion DAGs: one scheduled to run hourly for the incremental pipeline, and another scheduled to run daily for the snapshot pipeline.
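A minimal sketch of the two DAG definitions might look like the following; the DAG IDs, start date, and catchup setting are assumptions for illustration:

# Hypothetical skeleton of the two ingestion DAGs; tasks are attached to them further below
from datetime import datetime

from airflow import DAG

snapshot_dag = DAG(
    dag_id="snapshot_ingestion",
    schedule_interval="@daily",      # nightly full reload
    start_date=datetime(2023, 1, 1),
    catchup=False,
)

incremental_dag = DAG(
    dag_id="incremental_ingestion",
    schedule_interval="@hourly",     # low-latency incremental loads
    start_date=datetime(2023, 1, 1),
    catchup=False,
)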

Snapshot Pipeline

Our daily snapshot workflow for the sample product table would look like the following DAG:

The start_ingestion and end_ingestion tasks are just dummy operators, which come in handy when dealing with multiple parallel tasks and performing operations such as clearing and re-running downstream tasks.

The import_snapshot_product task queries and extracts the table data from the source database and ingests it into a new snapshot partition on the data lake. Once the data is imported, the previous snapshot partition is dropped.

A few pipeline development best practices to consider:

  • To ensure that the pipeline is fully idempotent and deterministic, it is better to bound the extraction query by a maximum datetime, such as the midnight timestamp, so that if the pipeline has to be re-executed it always extracts and loads the same data as the first run attempt.
  • Any existing partition that might have been created by a previous failed run has to be checked for and dropped before loading.

In SQL terms, we would have a Jinja-templated SQL query similar to the following for data extraction, using Airflow’s built-in template variables:

SELECT * FROM products WHERE ts < '{{ data_interval_end }}'

For the snapshot partition operation, if an SQL-based engine such as Hive or Trino (Presto) is used to manage the data on the data lake, we would have a templated SQL statement similar to the following:

ALTER TABLE products
DROP IF EXISTS PARTITION (snapshot_date='{{ prev_start_date_success.to_date_string() }}')
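Similarly, to cover the second best practice above, any partition left behind by a failed attempt of the current run can be dropped before loading; a sketch, assuming the current snapshot partition is keyed by the run’s data_interval_end date:

ALTER TABLE products
DROP IF EXISTS PARTITION (snapshot_date='{{ data_interval_end.to_date_string() }}')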

Depending on the available tools, an appropriate execution engine can be used for data extraction and import, ranging from a simple Python JDBC/database connection, to distributed computation tools such as Trino or Sqoop for Hadoop-based platforms, or managed services such as AWS Glue if running in the cloud.
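For instance, a minimal sketch of the _import_snapshot callable using the simple Python database connection approach might look like the following; the connection details, the ts column, and the Parquet path layout on the data lake are all assumptions:

import pandas as pd
import psycopg2


def _import_snapshot(table, schema, **context):
    """Extract a full snapshot of one source table and land it on the data lake."""
    max_ts = context["data_interval_end"]  # pendulum datetime provided by Airflow

    # Bound the query by the interval end so re-runs load identical data
    conn = psycopg2.connect(host="source-db", dbname="crm", user="etl", password="...")
    try:
        query = f"SELECT * FROM {schema}.{table} WHERE ts < %(max_ts)s"
        df = pd.read_sql(query, conn, params={"max_ts": max_ts})
    finally:
        conn.close()

    # Write a new snapshot partition (hypothetical S3 path layout; writing to S3
    # assumes s3fs/pyarrow are installed). Dropping the previous partition would
    # follow, e.g. via the ALTER TABLE statement shown earlier.
    snapshot_date = max_ts.to_date_string()
    df.to_parquet(
        f"s3://data-lake/{schema}_{table}/snapshot_date={snapshot_date}/data.parquet",
        index=False,
    )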

The truncate_product_incremental Airflow task is simply responsible for truncating the product_incremental dataset, whose records are now superseded by the freshly loaded snapshot.
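Putting it together, a single-table version of the snapshot DAG could be wired up as follows; this is only a sketch, assuming the snapshot_dag object and the _import_snapshot and _reload_tables callables from the surrounding examples (the config-driven, multi-table variant is shown later in the post):

from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

start_ingestion = DummyOperator(task_id="start_ingestion", dag=snapshot_dag)
end_ingestion = DummyOperator(task_id="end_ingestion", dag=snapshot_dag)

import_snapshot_product = PythonOperator(
    task_id="import_snapshot_product",
    python_callable=_import_snapshot,
    op_kwargs={"schema": "public", "table": "product"},
    dag=snapshot_dag,
)

truncate_product_incremental = PythonOperator(
    task_id="truncate_product_incremental",
    python_callable=_reload_tables,
    op_kwargs={"table": "product"},
    dag=snapshot_dag,
)

start_ingestion >> import_snapshot_product >> truncate_product_incremental >> end_ingestion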

Incremental Pipeline

The incremental ingestion pipeline in our example would be scheduled to run hourly on Airflow, and in its simplest form it would consist of the following DAG:

Similar to the snapshot operation, to ensure that the pipeline is fully deterministic, idempotent, and safe to retry, we need to ensure that any existing data or partitions created by previous DAG runs are dropped when the import_incremental_product task is executed.
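For the extraction itself, the templated query would be bounded on both ends of the run’s data interval; a sketch, assuming the same ts metadata column as before:

SELECT * FROM products
WHERE ts >= '{{ data_interval_start }}'
  AND ts < '{{ data_interval_end }}'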

Dynamic DAG Generation

When there are many candidate source tables to be imported using this data ingestion pattern, it would be cumbersome, and bad software engineering practice, to create a separate DAG or set of tasks for each table, with lots of code duplication.

We can take advantage of dynamic DAG generation in Airflow by keeping a JSON or YAML configuration file with the list of tables and dynamically generating the required tasks in our DAG script. This pattern is referred to as declarative or config-driven data pipeline development.

For demonstration purposes, let’s create the following JSON file listing four tables to be ingested by our Airflow snapshot and incremental workflows.

# lib/snapshot_sources.json
[
  {
    "schema": "public",
    "table": "product"
  },
  {
    "schema": "public",
    "table": "category"
  },
  {
    "schema": "public",
    "table": "clients"
  },
  {
    "schema": "public",
    "table": "suppliers"
  }
]

Using the above config file, in our snapshot_ingestion DAG script, the required tasks for each table can be generated as follows:

import json

items = json.load(open('/var/lib/airflow/dags/lib/snapshot_sources.json'))

for item in items:
    import_snapshot = PythonOperator(
        task_id="import_snapshot_{}".format(item['table']),
        python_callable=_import_snapshot,
        provide_context=True,
        op_kwargs={'table': item['table'], 'schema': item['schema']},
        dag=dag,
    )
    truncate_incremental = PythonOperator(
        task_id="truncate_{}_incremental".format(item['table']),
        python_callable=_reload_tables,
        provide_context=True,
        op_kwargs={'table': item['table']},
        dag=dag,
    )
    import_snapshot.set_upstream(start_ingestion)
    import_snapshot.set_downstream(truncate_incremental)
    truncate_incremental.set_downstream(end_ingestion)

Now our new DAG is generated dynamically:

This is also where dummy tasks such as start_ingestion come in handy, in case we need to clear the state of all import_snapshot tasks with one click.

Conclusion

In this article, we discussed a batch data ingestion pattern that combines an incremental pipeline with a snapshot ingestion pipeline. This pattern can be quite useful in use cases that require 100% data completeness and consistency with the source, such as financial data, where the incremental pipeline alone may not provide the required level of consistency and reliability.
