
20 July 2025

Running BigQuery SQL Scripts in Cloud Composer with Jinja Templating

by Brahmanand Singh

Orchestrating data transformations is a key part of modern data engineering. In GCP, Cloud Composer (managed Airflow) provides a robust platform for managing such workflows. In this post, we'll walk through creating a simple Cloud Composer DAG that executes a multi-statement BigQuery SQL script from a file, using Jinja templating to pass dynamic parameters like project IDs and dataset names. This approach is perfect for data engineers looking to build flexible and reusable data pipelines.

Why This Approach?

This pipeline is designed to execute a multi-statement BigQuery SQL script stored as a separate file, with project IDs and dataset names injected at runtime through Jinja templating, so the same script can be reused across environments without code changes.

This setup is ideal for teams managing data transformations in Google Cloud, especially in multi-project environments.

Architecture Overview

The pipeline includes:

  1. Google Cloud Composer (Airflow): Orchestrates the workflow, running a DAG that triggers the SQL script.
  2. BigQuery: Executes the multi-statement SQL script for data transformation and loading.
  3. Google Cloud Storage (GCS): Stores the DAG and SQL file in the Composer environment’s bucket.
  4. Jinja Templating: Injects parameters (e.g., project ID, dataset names) into the SQL script dynamically.

The DAG uses the BigQueryExecuteQueryOperator to run the SQL script, with start and end dummy tasks for clear workflow visualization, a single active run to avoid conflicts, and email notifications on failure.

Implementation Highlights

DAG Structure

The DAG wraps a single BigQuery task between start and end dummy tasks, passes the template parameters through the operator's params argument, and limits itself to one active run at a time.

Code:

python
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['your_email@example.com'],  # Replace with actual email
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='transform_load_dag',
    default_args=default_args,
    description='DAG to run BigQuery SQL script with Jinja templating',
    schedule_interval=None,
    start_date=datetime(2025, 7, 20),
    catchup=False,
    max_active_runs=1,
) as dag:
    start_task = DummyOperator(task_id='start')
    bq_task = BigQueryExecuteQueryOperator(
        task_id='run_sql_script',
        gcp_conn_id='your_gcp_connection_id',  # Replace with actual GCP connection ID
        sql="transform_load.sql",
        use_legacy_sql=False,
        params={
            'project_id': 'your_project_id',  # Replace with actual project ID
            'dataset_name': 'your_target_dataset',
            'source_dataset': 'your_source_dataset',
        },
    )
    end_task = DummyOperator(task_id='end')
    start_task >> bq_task >> end_task
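
Note that BigQueryExecuteQueryOperator is deprecated in recent releases of the Google provider in favor of BigQueryInsertJobOperator. Below is a minimal sketch of the same task expressed with BigQueryInsertJobOperator, assuming the SQL file stays in the dags/ folder so the Jinja include can resolve it; it is an alternative, not part of the DAG above.

python
# Sketch: the same task using BigQueryInsertJobOperator (the non-deprecated operator).
# This would replace bq_task inside the same `with DAG(...)` block shown above.
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

bq_task = BigQueryInsertJobOperator(
    task_id='run_sql_script',
    gcp_conn_id='your_gcp_connection_id',  # Replace with actual GCP connection ID
    configuration={
        'query': {
            # Pull the templated SQL file in via a Jinja include; params resolve as before.
            'query': "{% include 'transform_load.sql' %}",
            'useLegacySql': False,
        }
    },
    params={
        'project_id': 'your_project_id',  # Replace with actual project ID
        'dataset_name': 'your_target_dataset',
        'source_dataset': 'your_source_dataset',
    },
)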

SQL File

The SQL file (transform_load.sql) is stored in gs://<composer-bucket>/dags/ and uses Jinja templating for dynamic parameters:

sql
CREATE OR REPLACE TABLE `{{ params.project_id }}.{{ params.dataset_name }}.transformed_table` AS
SELECT * FROM `{{ params.project_id }}.{{ params.source_dataset }}.source_table` WHERE some_condition;
-- Additional statements as needed
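
To see what the operator actually sends to BigQuery, you can render the template locally with jinja2 (the engine Airflow uses). This is a minimal sketch with hypothetical parameter values, run outside the DAG purely as a sanity check:

python
# Local sanity check (sketch): render transform_load.sql with example values.
# The values below are placeholders; substitute your own project and datasets.
from jinja2 import Template

params = {
    "project_id": "my-project",
    "dataset_name": "analytics",
    "source_dataset": "raw",
}

with open("transform_load.sql") as f:
    print(Template(f.read()).render(params=params))
# Prints the SQL with `my-project.analytics.transformed_table` etc. filled in.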

Setup Instructions

Replace gs://<composer-bucket> with your Composer bucket. Update gcp_conn_id, project_id, dataset_name, and source_dataset in the DAG with actual values. Set a valid email address for email.

File Placement:

Upload the DAG to gs://<composer-bucket>/dags/my_sample_dag.py and upload transform_load.sql to gs://<composer-bucket>/dags/transform_load.sql.
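
If you prefer to upload the files programmatically rather than through the console or gsutil, here is a minimal sketch using the google-cloud-storage client; the bucket name and local file paths are placeholders:

python
# Upload the DAG and SQL file to the Composer bucket (sketch; names are placeholders).
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("your-composer-bucket")  # the bucket backing your Composer environment

for local_path, blob_name in [
    ("my_sample_dag.py", "dags/my_sample_dag.py"),
    ("transform_load.sql", "dags/transform_load.sql"),
]:
    bucket.blob(blob_name).upload_from_filename(local_path)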

Test: Deploy the DAG to the Composer environment.

Verify in the Airflow UI that the DAG parses correctly and appears; if the SQL file is not placed at the correct path, you will get a jinja2.exceptions.TemplateNotFound error. Trigger the DAG manually and confirm that the SQL script executes and the parameters are applied. Check the task logs for any errors during execution.
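
Before uploading, a quick local parse check can catch import errors early. A minimal sketch, assuming apache-airflow is installed locally and the DAG file sits in a local dags/ folder:

python
# Local parse check (sketch): load the DAG file and fail fast on import errors.
from airflow.models import DagBag

dag_bag = DagBag(dag_folder="dags/", include_examples=False)
assert not dag_bag.import_errors, dag_bag.import_errors
assert "transform_load_dag" in dag_bag.dags  # the dag_id defined above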

Conclusion

This Cloud Composer DAG provides a simple and effective way to run BigQuery SQL scripts with Jinja templating, enabling dynamic data transformations across projects or datasets.

tags: GCP - CloudComposer - Airflow - BigQuery