Brahma's Personal Blog

Writing about Data Engineering problems and SQL


31 August 2024

Specify Dataproc Serverless version when submitting jobs via Composer

by Brahmanand Singh

Submitting Dataproc Serverless PySpark Jobs using Composer

On Friday, out of the blue, I received multiple pings from a developer: his production serverless jobs had suddenly started failing, with no code changes on his end. I told him this was most likely due to the version upgrades that keep happening on the serverless base images, and my guess was right. He was submitting the job via a DAG and had not specified the runtime config to pin a static image for his job.

In this post, we’ll walk through creating an Apache Airflow DAG in Cloud Composer to submit a Dataproc Serverless PySpark job, specifying the runtime config and pinning the version that matches your workload’s Spark version. We’ll cover important aspects such as specifying a Customer-Managed Encryption Key (CMEK), subnetwork, metastore, and Persistent History Server (PHS), along with other crucial configurations.

Prerequisites

Before we dive in, make sure you have:

  1. A GCP project with Composer and Dataproc APIs enabled
  2. Appropriate IAM permissions to create and run Dataproc jobs
  3. A Cloud Composer environment set up
  4. A service account with necessary permissions
  5. A GCP connection set up in Airflow

The DAG Code

Let’s start by looking at the complete DAG code, and then we’ll break it down section by section.

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
)
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

dag = DAG(
    'dataproc_serverless_pyspark_job',
    default_args=default_args,
    description='A DAG to submit a Dataproc Serverless PySpark job',
    schedule_interval=None,
    start_date=days_ago(1),
    tags=['dataproc', 'serverless', 'pyspark'],
)

# Replace these with your actual GCP project details
PROJECT_ID = 'your-project-id'
REGION = 'your-region'
SUBNET = 'projects/{project_id}/regions/{region}/subnetworks/{subnetwork}'
METASTORE_SERVICE = 'projects/{project_id}/locations/{region}/services/{metastore_service}'
PHS_SERVER = 'projects/{project_id}/regions/{region}/clusters/{phs_cluster_name}'
KMS_KEY = 'projects/{project_id}/locations/{region}/keyRings/{key_ring}/cryptoKeys/{key}'
SERVICE_ACCOUNT = 'your-service-account@your-project-id.iam.gserviceaccount.com'
GCP_CONN_ID = 'your_gcp_connection_id'

create_batch = DataprocCreateBatchOperator(
    task_id='create_dataproc_pyspark_batch',
    project_id=PROJECT_ID,
    region=REGION,
    batch={
        'pyspark_batch': {
            'main_python_file_uri': 'gs://your-bucket/your-pyspark-script.py',
            'args': ['arg1', 'arg2'],  # Optional: Add arguments for your PySpark job
            'jar_file_uris': [
                # Uncomment the next line if you need a specific version of the BigQuery connector
                # 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar',
            ],
        },
        'environment_config': {
            'execution_config': {
                'subnetwork_uri': SUBNET,
                'kms_key': KMS_KEY,
                'service_account': SERVICE_ACCOUNT,
            },
            'peripherals_config': {
                'metastore_service': METASTORE_SERVICE,
                'spark_history_server_config': {
                    'dataproc_cluster': PHS_SERVER,
                },
            },
        },
        'runtime_config': {
            'version': '2.2',
            'properties': {
                # Uncomment and modify these if you need specific BigQuery connector configurations
                # 'spark.datasource.bigquery.project.id': PROJECT_ID,
                # 'spark.datasource.bigquery.temporaryGcsBucket': 'your-temporary-gcs-bucket',
            },
        },
    },
    batch_id='dataproc-serverless-pyspark-job--',
    gcp_conn_id=GCP_CONN_ID,
    dag=dag,
)

create_batch

Now, let’s break down the key components of this DAG.

DAG Configuration

dag = DAG(
    'dataproc_serverless_pyspark_job',
    default_args=default_args,
    description='A DAG to submit a Dataproc Serverless PySpark job',
    schedule_interval=None,
    start_date=days_ago(1),
    tags=['dataproc', 'serverless', 'pyspark'],
)

This section sets up the DAG configuration. We’ve set schedule_interval=None, which means this DAG will only run when triggered manually. Adjust this if you want the job to run on a schedule.
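For example, if you wanted this job to run nightly instead of only on manual triggers, a minimal variation (assuming the same default_args) might look like this:

dag = DAG(
    'dataproc_serverless_pyspark_job',
    default_args=default_args,
    description='A DAG to submit a Dataproc Serverless PySpark job',
    schedule_interval='0 2 * * *',  # run every day at 02:00 UTC
    start_date=days_ago(1),
    catchup=False,  # do not backfill runs for past dates
    tags=['dataproc', 'serverless', 'pyspark'],
)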

Project-Specific Variables

PROJECT_ID = 'your-project-id'
REGION = 'your-region'
SUBNET = 'projects/{project_id}/regions/{region}/subnetworks/{subnetwork}'
METASTORE_SERVICE = 'projects/{project_id}/locations/{region}/services/{metastore_service}'
PHS_SERVER = 'projects/{project_id}/regions/{region}/clusters/{phs_cluster_name}'
KMS_KEY = 'projects/{project_id}/locations/{region}/keyRings/{key_ring}/cryptoKeys/{key}'
SERVICE_ACCOUNT = 'your-service-account@your-project-id.iam.gserviceaccount.com'
GCP_CONN_ID = 'your_gcp_connection_id'

Replace these variables with your actual GCP project details. These include:

  1. PROJECT_ID and REGION: the project and region where the batch runs
  2. SUBNET: the subnetwork the serverless workload attaches to
  3. METASTORE_SERVICE: the Dataproc Metastore service to use
  4. PHS_SERVER: the Persistent History Server (an existing Dataproc cluster)
  5. KMS_KEY: the Customer-Managed Encryption Key (CMEK)
  6. SERVICE_ACCOUNT: the service account the workload runs as
  7. GCP_CONN_ID: the Airflow connection used to authenticate to GCP
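As an illustration, fully resolved values might look like the following sketch; every resource name here is hypothetical and should be replaced with your own:

# Hypothetical example values for illustration only
PROJECT_ID = 'my-data-project'
REGION = 'us-central1'
SUBNET = f'projects/{PROJECT_ID}/regions/{REGION}/subnetworks/default'
METASTORE_SERVICE = f'projects/{PROJECT_ID}/locations/{REGION}/services/my-metastore'
PHS_SERVER = f'projects/{PROJECT_ID}/regions/{REGION}/clusters/my-phs-cluster'
KMS_KEY = f'projects/{PROJECT_ID}/locations/{REGION}/keyRings/my-keyring/cryptoKeys/my-key'
SERVICE_ACCOUNT = f'dataproc-runner@{PROJECT_ID}.iam.gserviceaccount.com'
GCP_CONN_ID = 'google_cloud_default'

Using f-strings keeps the project and region consistent across all of the resource paths.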

Dataproc Job Configuration

The core of our DAG is the DataprocCreateBatchOperator, which submits the Dataproc Serverless job. Let’s break down its configuration:

PySpark Batch Configuration

'pyspark_batch': {
    'main_python_file_uri': 'gs://your-bucket/your-pyspark-script.py',
    'args': ['arg1', 'arg2'],  # Optional: Add arguments for your PySpark job
    'jar_file_uris': [
        # Uncomment the next line if you need a specific version of the BigQuery connector
        # 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar',
    ],
},

This section specifies the PySpark script to run and any arguments it needs. You can also specify additional JARs here if needed.
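For reference, here is a minimal sketch of what the script behind main_python_file_uri could look like; the column name and the GCS paths passed as arguments are purely illustrative:

import sys
from pyspark.sql import SparkSession

# Dataproc Serverless provides the Spark runtime; we only need a SparkSession.
spark = SparkSession.builder.appName('serverless-pyspark-example').getOrCreate()

# The values from the 'args' list in the batch config arrive via sys.argv.
input_path, output_path = sys.argv[1], sys.argv[2]

# Illustrative transformation: read, aggregate, and write back to GCS.
df = spark.read.parquet(input_path)
df.groupBy('some_column').count().write.mode('overwrite').parquet(output_path)

spark.stop()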

Environment Configuration

'environment_config': {
    'execution_config': {
        'subnetwork_uri': SUBNET,
        'kms_key': KMS_KEY,
        'service_account': SERVICE_ACCOUNT,
    },
    'peripherals_config': {
        'metastore_service': METASTORE_SERVICE,
        'spark_history_server_config': {
            'dataproc_cluster': PHS_SERVER,
        },
    },
},

This section configures the environment for the job, including:

  1. subnetwork_uri: the subnetwork the serverless workload runs in
  2. kms_key: the CMEK used to encrypt data at rest
  3. service_account: the identity the workload runs as
  4. metastore_service: the Dataproc Metastore attached through peripherals_config
  5. spark_history_server_config: the Persistent History Server (PHS) cluster that keeps Spark event logs available after the batch finishes
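The execution_config block also accepts a few optional fields worth knowing about, such as a staging bucket and a time-to-live for the workload (availability depends on your google provider and API version). A sketch with placeholder values:

'execution_config': {
    'subnetwork_uri': SUBNET,
    'kms_key': KMS_KEY,
    'service_account': SERVICE_ACCOUNT,
    'staging_bucket': 'your-staging-bucket',  # placeholder: bucket for workload dependencies
    'ttl': {'seconds': 4 * 3600},             # optional: stop the workload after 4 hours
},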

Runtime Configuration

The version key specifies the base runtime image used to run the workload. Check the GCP release notes for the compatibility matrix: https://cloud.google.com/dataproc-serverless/docs/concepts/versions/spark-runtime-versions

'runtime_config': {
    'version': '2.2',
    'properties': {
        # Uncomment and modify these if you need specific BigQuery connector configurations
        # 'spark.datasource.bigquery.project.id': PROJECT_ID,
        # 'spark.datasource.bigquery.temporaryGcsBucket': 'your-temporary-gcs-bucket',
    },
},

Here, we specify the Dataproc runtime version and can set additional Spark properties if needed.
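For example, to pin the workload to an exact patch release instead of floating on the latest 2.2 image, and to tune a couple of Spark settings, a sketch might look like the following; the version number and property values are illustrative, so confirm them against the runtime release notes linked above:

'runtime_config': {
    'version': '2.2.26',  # illustrative pinned patch version; verify it is still supported
    'properties': {
        'spark.executor.memory': '8g',
        'spark.sql.shuffle.partitions': '200',
    },
},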

BigQuery Connector

Dataproc 2.2 comes with the BigQuery connector pre-installed, so you typically don’t need to specify it explicitly. However, if you need a specific version or configuration, you can uncomment and modify the relevant lines in the jar_file_uris and properties sections.
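For context, reading a BigQuery table with the pre-installed connector from inside your PySpark script is straightforward; the project, dataset, and table names below are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('bq-read-example').getOrCreate()

# Read a table using the BigQuery connector bundled with the 2.2 runtime.
df = (
    spark.read.format('bigquery')
    .option('table', 'your-project-id.your_dataset.your_table')
    .load()
)

df.show(5)

If you write back to BigQuery using the indirect method, you will also need to set the temporaryGcsBucket property mentioned in the commented-out configuration above.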

Conclusion

This DAG provides a robust setup for submitting Dataproc Serverless PySpark jobs using Cloud Composer. It includes configurations for custom networking, encryption, service accounts, and more, giving you fine-grained control over your job’s environment and execution.

Remember to replace all placeholder values with your actual GCP resource details before running this DAG. Also, ensure that your service account has the necessary permissions to access all specified resources and perform the required operations.

Happy data processing!

tags: dataproc - serverless - version