Brahma's Personal Blog

Writing about Data Engineering problems and SQL

14 September 2024

Efficiently Load XML Files from GCS to BigQuery using Dataproc

by Brahmanand Singh

Efficiently Load XML Files from GCS to BigQuery Using PySpark on Dataproc

XML files are still very much around. In the telecom domain, most network configurations are still generated as XML and consumed by downstream systems, so processing large volumes of XML data and loading it into a data warehouse like BigQuery is a common challenge. This blog post will guide you through creating a Spark job (PySpark) that efficiently reads XML files from Google Cloud Storage (GCS), parses them, and loads the data directly into a date-partitioned BigQuery table. We’ll use Google Cloud Dataproc to run our PySpark job so the work can scale across a cluster.

The Challenge

An upstream system drops a batch of XML files into a GCS bucket, each containing records with fields like ID, name, and date. Your task is to parse these files and load them into a date-partitioned BigQuery table. The catch? You need to handle high volumes of data efficiently and load into BigQuery without using temporary GCS storage.

The Solution

We’ll use PySpark running on Google Cloud Dataproc to distribute the workload across a cluster. Our script reads directly from GCS and writes to BigQuery with the connector’s direct write method, which uses the BigQuery Storage Write API and avoids any intermediate storage.

Here’s our PySpark script that accomplishes this task:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_date
from pyspark.sql.types import StructType, StructField, StringType, DateType
import xml.etree.ElementTree as ET

# Create Spark session
spark = SparkSession.builder \
    .appName("XML to BigQuery Loader") \
    .config("spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.0,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.36.1") \
    .getOrCreate()

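# Set up GCS authentication. On Dataproc the cluster's service account is
# normally picked up automatically, so a key file is only needed elsewhere.
spark.conf.set("google.cloud.auth.service.account.enable", "true")
spark.conf.set("google.cloud.auth.service.account.json.keyfile", "/path/to/your/service-account-key.json")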

# Configuration
project_id = "your-project-id"
bucket_name = "your-bucket-name"
dataset_id = "your-dataset-id"
table_id = "your-table-id"

# Define schema for the data
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("date", StringType(), True),  # the UDF returns text; to_date casts it below
    # Add more fields as needed
])

# UDF to parse XML content
@udf(returnType=schema)
def parse_xml(xml_content):
    root = ET.fromstring(xml_content)
    # findtext returns None when the element is missing
    record_id = root.findtext(".//id")
    name = root.findtext(".//name")
    date = root.findtext(".//date")
    # Add more fields as needed
    return (record_id, name, date)

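# Read XML files from GCS, one row per file (wholetext=True), so the UDF
# receives a complete XML document rather than a single line
xml_files = f"gs://{bucket_name}/*.xml"
raw_data = spark.read.text(xml_files, wholetext=True)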

# Parse XML and create DataFrame
parsed_data = raw_data.select(parse_xml(col("value")).alias("parsed"))
df = parsed_data.select("parsed.*")

# Convert date string to date type
df = df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

# Write to BigQuery using direct write method
df.write \
    .format("bigquery") \
    .option("table", f"{project_id}:{dataset_id}.{table_id}") \
    .option("writeMethod", "direct") \
    .option("partitionField", "date") \
    .option("partitionType", "DAY") \
    .mode("append") \
    .save()

print("Data loaded to BigQuery successfully using direct write method.")

# Stop Spark session
spark.stop()
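
A malformed file makes ET.fromstring raise a ParseError, and an exception raised inside a Python UDF fails the task that hit it. Below is a minimal sketch of a more defensive parser, assuming it is acceptable to emit an all-null row for bad input and deal with it later. The name safe_parse_xml is hypothetical and not part of the script above.

```python
import xml.etree.ElementTree as ET


def safe_parse_xml(xml_content):
    """Return (id, name, date) as strings, or all None if the XML is unusable."""
    if xml_content is None:
        return (None, None, None)
    try:
        root = ET.fromstring(xml_content)
    except ET.ParseError:
        # Malformed document: return an all-null row instead of raising.
        return (None, None, None)
    # findtext returns None when the element is absent.
    return (
        root.findtext(".//id"),
        root.findtext(".//name"),
        root.findtext(".//date"),
    )
```

Wrapped as a UDF with an all-string schema, this would let the job continue past a bad file; rows where every field is null could then be filtered out or counted before the write.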

Breaking Down the Script

Let’s walk through the key components of this script:

  1. Spark Session Setup: We create a Spark session with the necessary configurations for GCS and BigQuery connectors. Note that we’re using the latest version of the Spark BigQuery connector (0.36.1 as of this writing, but always check for the most recent version).

  2. Schema Definition: We define a schema that matches our XML data structure. This helps Spark understand the data types we’re working with.

  3. XML Parsing UDF: We create a User Defined Function (UDF) to parse each XML file. This function extracts the relevant fields from the XML structure.

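  4. Reading from GCS: We use Spark’s text reader to read all XML files from our specified GCS bucket. Each file has to arrive as a single row (the reader’s wholetext option) so that the parser sees a complete document.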

  5. Data Transformation: We apply our XML parsing UDF to create a structured DataFrame, then convert the date string to a proper date type.

  6. Writing to BigQuery: We use the BigQuery connector for Spark to write our data directly to BigQuery. We specify the direct write method and set up date partitioning.
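
The parsing step returns one row per file. Since the challenge mentions files that contain several records, a parser that returns every record may be closer to what is needed. The sketch below assumes each record sits in its own record element, which is a hypothetical layout because the real XML is not shown in this post; parse_records and SAMPLE are likewise illustrative names.

```python
import xml.etree.ElementTree as ET

# Hypothetical sample; the real element names may differ.
SAMPLE = """<records>
  <record><id>1</id><name>alpha</name><date>2024-09-14</date></record>
  <record><id>2</id><name>beta</name><date>2024-09-15</date></record>
</records>"""


def parse_records(xml_content, record_tag="record"):
    """Return a list of (id, name, date) string tuples, one per record element."""
    root = ET.fromstring(xml_content)
    return [
        (rec.findtext("id"), rec.findtext("name"), rec.findtext("date"))
        for rec in root.iter(record_tag)
    ]


rows = parse_records(SAMPLE)
```

In Spark, a function like this could be registered as a UDF with an ArrayType return type wrapping the row schema, and the resulting array column expanded into rows with explode.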

Running the Script on Dataproc

To run this script on Google Cloud Dataproc:

  1. Create a Dataproc cluster with the necessary configurations.
  2. Upload the script to GCS.
  3. Submit the job to Dataproc using the following command:
gcloud dataproc jobs submit pyspark \
    --cluster=your-cluster-name \
    --region=your-region \
    --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.36.1.jar \
    gs://path-to-your-script/xml_to_bigquery_direct.py

Note: Make sure to use the correct version of the Spark BigQuery connector in the --jars option. The version should match the one specified in your PySpark script.
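
Because the connector version appears both in the script and in the submit command, one way to keep them aligned is to derive both strings from a single constant. This is only a sketch: the helper names are hypothetical, and the cluster, region, and script path are whatever you would pass to the command above.

```python
CONNECTOR_VERSION = "0.36.1"
SCALA_VERSION = "2.12"


def connector_package(version=CONNECTOR_VERSION, scala=SCALA_VERSION):
    """Maven coordinate in the form used by spark.jars.packages."""
    return f"com.google.cloud.spark:spark-bigquery-with-dependencies_{scala}:{version}"


def connector_jar(version=CONNECTOR_VERSION, scala=SCALA_VERSION):
    """GCS path of the connector jar in the form used by the --jars option."""
    return f"gs://spark-lib/bigquery/spark-bigquery-with-dependencies_{scala}-{version}.jar"


def submit_command(cluster, region, script_uri):
    """Build the gcloud argument list; it can be passed to subprocess.run."""
    return [
        "gcloud", "dataproc", "jobs", "submit", "pyspark",
        f"--cluster={cluster}",
        f"--region={region}",
        f"--jars={connector_jar()}",
        script_uri,
    ]
```

Upgrading the connector then means changing CONNECTOR_VERSION in one place instead of editing two strings by hand.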

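Benefits of This Approach

Dataproc distributes the parsing across a cluster, so the job can scale with the volume of XML files. The direct write method sends data to BigQuery without staging it in temporary GCS storage, and the rows land in a date-partitioned table.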

Conclusion

Processing large volumes of XML data and loading it into BigQuery doesn’t have to be a daunting task. With PySpark and Google Cloud Dataproc, we can create an efficient, scalable solution that directly reads from GCS and writes to BigQuery. This approach allows us to handle high volumes of data while maintaining performance and cost-efficiency.

Remember to always test your script with a small dataset before running it on your entire data lake. Also, make sure to periodically check for updates to the Spark BigQuery connector, as newer versions may offer improved performance or additional features.
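
For that kind of small-scale test, a handful of synthetic files is often enough. The sketch below writes a few sample XML files to a local directory using only the standard library. The record layout and the write_sample_files name are assumptions for illustration, so adjust them to match the real feed before uploading the files to a test bucket.

```python
import tempfile
import xml.etree.ElementTree as ET
from datetime import date, timedelta
from pathlib import Path


def write_sample_files(out_dir, n_files=3, start=date(2024, 9, 14)):
    """Write n_files small XML files, one record each, and return their paths."""
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    paths = []
    for i in range(n_files):
        record = ET.Element("record")
        ET.SubElement(record, "id").text = str(i + 1)
        ET.SubElement(record, "name").text = f"sample-{i + 1}"
        # Consecutive days, formatted as yyyy-MM-dd to match the to_date pattern.
        ET.SubElement(record, "date").text = (start + timedelta(days=i)).isoformat()
        path = out_dir / f"sample_{i + 1}.xml"
        ET.ElementTree(record).write(str(path), encoding="utf-8", xml_declaration=True)
        paths.append(path)
    return paths


sample_paths = write_sample_files(tempfile.mkdtemp())
```

Consecutive dates also make it easy to check that the rows end up in the expected daily partitions.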

Happy coding!

tags: dataproc - pyspark - bigquery - xml