Writing about Data Engineering problems and SQL
by Brahmanand Singh
Orchestrating data transformations is a key part of modern data engineering. In GCP, the Cloud Composer (managed Airflow) service provides a robust platform for managing such workflows. In this post, we'll walk through creating a simple Cloud Composer DAG that executes a multi-statement BigQuery SQL script from a file, using Jinja templating to pass dynamic parameters like project IDs and dataset names. This approach is well suited to data engineers looking to build flexible, reusable data pipelines.
This setup is ideal for teams managing data transformations in Google Cloud, especially in multi-project environments.
The DAG uses the BigQueryExecuteQueryOperator to run the SQL script, with start and end dummy tasks for clear workflow visualization, a single active run to avoid conflicts, and email notifications on failure.
The DAG is designed to:
- Execute a multi-statement BigQuery SQL script stored as a file in the Composer bucket.
- Pass dynamic parameters such as project_id and dataset_name using Jinja templating.
- Allow only one active run at a time (max_active_runs=1) to prevent conflicts.
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['your_email@example.com'],  # Replace with actual email
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='transform_load_dag',
    default_args=default_args,
    description='DAG to run BigQuery SQL script with Jinja templating',
    schedule_interval=None,
    start_date=datetime(2025, 7, 20),
    catchup=False,
    max_active_runs=1,
) as dag:

    start_task = DummyOperator(task_id='start')

    bq_task = BigQueryExecuteQueryOperator(
        task_id='run_sql_script',
        gcp_conn_id='your_gcp_connection_id',  # Replace with actual GCP connection ID
        sql='transform_load.sql',              # Looked up relative to the DAG folder (and template_searchpath)
        use_legacy_sql=False,
        params={
            'project_id': 'your_project_id',   # Replace with actual project ID
            'dataset_name': 'your_target_dataset',
            'source_dataset': 'your_source_dataset',
        },
    )

    end_task = DummyOperator(task_id='end')

    start_task >> bq_task >> end_task
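Note that BigQueryExecuteQueryOperator is deprecated in recent Google provider releases in favor of BigQueryInsertJobOperator. A roughly equivalent task might look like the sketch below; the {% include %} pattern for pulling in the SQL file and the exact configuration keys are assumptions based on common usage, so verify them against your provider version:

# Possible alternative using BigQueryInsertJobOperator (sketch, not tested here).
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

bq_task_v2 = BigQueryInsertJobOperator(
    task_id='run_sql_script_insert_job',
    gcp_conn_id='your_gcp_connection_id',  # Replace with actual GCP connection ID
    configuration={
        "query": {
            # The included SQL file is rendered with the same Jinja context,
            # so the {{ params.* }} placeholders are resolved before submission.
            "query": "{% include 'transform_load.sql' %}",
            "useLegacySql": False,
        }
    },
    params={
        'project_id': 'your_project_id',
        'dataset_name': 'your_target_dataset',
        'source_dataset': 'your_source_dataset',
    },
)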
The SQL file (transform_load.sql) is stored in gs://<composer-bucket>/dags/ and uses Jinja templating for dynamic parameters:
CREATE OR REPLACE TABLE `{{ params.project_id }}.{{ params.dataset_name }}.transformed_table` AS
SELECT * FROM `{{ params.project_id }}.{{ params.source_dataset }}.source_table` WHERE some_condition;
-- Additional statements as needed
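To sanity-check the substitution outside Airflow, you can render the file with plain Jinja2. This is only an approximation of Airflow's own template environment (which adds extra macros and variables), and the values below are placeholders:

# Quick local preview of the parameter substitution (approximation of what
# Airflow does when it renders the templated 'sql' field).
from jinja2 import Template

with open("transform_load.sql") as f:
    sql_text = f.read()

rendered = Template(sql_text).render(
    params={
        "project_id": "your_project_id",
        "dataset_name": "your_target_dataset",
        "source_dataset": "your_source_dataset",
    }
)
print(rendered)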
Replace gs://<composer-bucket> with your Composer bucket name.
Update gcp_conn_id, project_id, dataset_name, and source_dataset in the DAG with actual values.
Set a valid email address for email in default_args.
Upload the DAG to gs://<composer-bucket>/dags/my_sample_dag.py.
Upload transform_load.sql to gs://<composer-bucket>/dags/transform_load.sql.
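If you prefer to script these uploads rather than use the console or gsutil, a minimal sketch with the google-cloud-storage client could look like this; the bucket name and local file paths are placeholders for your environment, and it assumes Application Default Credentials are configured:

# Minimal upload sketch using the google-cloud-storage client.
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("your-composer-bucket")  # the bucket behind gs://<composer-bucket>

# Copy the DAG file and the SQL script into the dags/ folder of the bucket.
bucket.blob("dags/my_sample_dag.py").upload_from_filename("my_sample_dag.py")
bucket.blob("dags/transform_load.sql").upload_from_filename("transform_load.sql")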
Verify in the Airflow UI that the DAG parses correctly and appears; if the SQL file is not at the expected path, the task will fail with a jinja2.exceptions.TemplateNotFound error.
Trigger the DAG manually and validate that the SQL script executes correctly and the parameters are applied.
Check the task logs for any errors during execution.
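For a quicker feedback loop than triggering from the UI, Airflow 2.5+ lets you smoke-test a DAG locally with dag.test(). The sketch below assumes Airflow and the Google provider are installed locally, the DAG file is importable, and a working your_gcp_connection_id connection exists in your local environment:

# Local smoke test (Airflow 2.5+); runs all tasks of the DAG in-process.
from my_sample_dag import dag  # my_sample_dag.py from the dags/ folder

if __name__ == "__main__":
    dag.test()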
This Cloud Composer DAG provides a simple and effective way to run BigQuery SQL scripts with Jinja templating, enabling dynamic data transformations across projects or datasets.
tags: GCP - CloudComposer - Airflow - BigQuery