Brahma's Personal Blog

Writing about Data Engineering problems and SQL

13 April 2025

Real-Time Streaming Pipeline: RabbitMQ to Pub/Sub and BigQuery with Dynamic Scaling

by Brahmanand Singh

Real-time streaming pipelines are the backbone of modern data systems, enabling businesses to process high-velocity data like IoT events or user interactions with minimal latency. In this post, we’ll explore a scalable, fault-tolerant pipeline that streams messages from RabbitMQ to Google Cloud Pub/Sub and BigQuery, orchestrated by Google Cloud Dataproc and Apache Airflow (via Cloud Composer). Designed to run continuously and scale dynamically with RabbitMQ’s message rate (1–1000 msg/s), this pipeline navigates key limitations: BigQuery’s 10 MB streaming insert payload cap, Pub/Sub’s request rate constraints, and Apache Spark’s lack of a RabbitMQ connector. We’ll also cover how flow control optimizes Pub/Sub publishing for speed.

Why This Pipeline?

Our objective was to build a robust streaming system that:

  1. Runs continuously, consuming messages from RabbitMQ as they arrive.
  2. Scales dynamically with the message rate (1–1000 msg/s) so cost stays proportional to load.
  3. Respects BigQuery’s 10 MB streaming insert limit and Pub/Sub’s publish request quotas.
  4. Guarantees at-least-once delivery by acknowledging messages only after they land in both Pub/Sub and BigQuery.

By addressing these requirements, the pipeline delivers a practical solution for enterprise streaming workloads.

Architecture Overview

The pipeline integrates:

  1. RabbitMQ: Queues incoming messages at varying rates.
  2. Google Cloud Dataproc: Executes a Python script on a cluster master node to consume messages, batch them, and write to Pub/Sub and BigQuery.
  3. Google Cloud Pub/Sub: Buffers messages for downstream systems.
  4. BigQuery: Stores messages for querying.
  5. Cloud Composer (Airflow): Manages the continuous job, scaling the cluster based on queue length.

Messages are batched (up to 5000 messages or 10 MB) and acknowledged only after successful delivery to both Pub/Sub and BigQuery, ensuring at-least-once delivery.
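
To make the Cloud Composer side concrete, here is a minimal sketch of a DAG that submits the consumer script to the Dataproc cluster using DataprocSubmitJobOperator from Airflow’s Google provider. The project, region, cluster name, bucket path, and DAG id are placeholder assumptions, and the queue-length-driven resize logic is omitted:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

PROJECT_ID = "your-project-id"            # placeholder
REGION = "your-region"                    # placeholder
CLUSTER_NAME = "your-streaming-cluster"   # placeholder

# The consumer is a plain Python script; a pyspark_job is used only as the
# vehicle for running it on the cluster.
CONSUMER_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": "gs://your-bucket/rabbitmq_consumer.py"},
}

with DAG(
    dag_id="rabbitmq_to_pubsub_bigquery",
    start_date=datetime(2025, 1, 1),
    schedule_interval=None,  # the job runs continuously once submitted
    catchup=False,
) as dag:
    submit_consumer = DataprocSubmitJobOperator(
        task_id="submit_consumer_job",
        project_id=PROJECT_ID,
        region=REGION,
        job=CONSUMER_JOB,
    )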

Key Limitations and Solutions

Dataproc and Spark’s RabbitMQ Gap

Apache Spark, Dataproc’s core engine, lacks a native RabbitMQ connector, unlike its support for Kafka or Kinesis. Because there is no built-in streaming source for RabbitMQ’s AMQP protocol, we have to bypass Spark’s streaming capabilities altogether.

Solution: We deploy a custom Python script using the pika library on the Dataproc master node, treating the cluster as a managed Python environment. This simplifies RabbitMQ integration, but it requires manual scaling logic and forgoes Spark’s distributed streaming optimizations.
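
For reference, this is the bare pika consumption pattern the full script below builds on; a minimal sketch with placeholder connection details:

import pika

# Placeholder connection details; mirror your real broker configuration here.
credentials = pika.PlainCredentials("your-username", "your-password")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="your-rabbitmq-host", credentials=credentials)
)
channel = connection.channel()
channel.queue_declare(queue="your-queue-name", durable=True)

# consume() yields (method, properties, body); inactivity_timeout lets the loop
# wake up periodically even when the queue is empty.
for method, properties, body in channel.consume("your-queue-name", inactivity_timeout=1):
    if method is None:
        continue  # nothing arrived within the timeout window
    print(body)
    channel.basic_ack(delivery_tag=method.delivery_tag)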

BigQuery Streaming Insert: 10 MB Payload Limit

BigQuery’s streaming insert API caps individual requests at 10 MB, which can bottleneck high-throughput pipelines if not handled carefully. Exceeding this limit causes request failures, risking data loss or delays.

Solution: The pipeline enforces a 10 MB payload ceiling per batch, calculated by summing message sizes before sending. Batches are also capped at 5000 messages to balance throughput and latency, ensuring compliance with BigQuery’s limit while processing ~200–500 msg/s per instance.
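
As a minimal illustration of that size check (the full script later applies the same logic inline while consuming), a hypothetical fits_in_batch helper accepts a message only while both caps hold:

MAX_BQ_PAYLOAD_SIZE = 10 * 1024 * 1024  # BigQuery's 10 MB cap per streaming insert request
MAX_BATCH_ROWS = 5000                   # row cap chosen for this pipeline

def fits_in_batch(current_bytes: int, current_rows: int, message: bytes) -> bool:
    """Return True if the message can join the current batch without breaching either cap."""
    return (
        current_rows < MAX_BATCH_ROWS
        and current_bytes + len(message) <= MAX_BQ_PAYLOAD_SIZE
    )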

Pub/Sub Request Rate Limits

Google Cloud Pub/Sub imposes quotas on publish requests (e.g., 10,000 requests per second per project, depending on configuration), which can throttle performance under high message rates. Sending one request per message is inefficient and risks hitting these limits.

Solution: We batch messages for Pub/Sub, sending up to 500 messages per request, reducing the request rate significantly. For example, a 5000-message batch requires only 10 Pub/Sub requests, staying well within quotas even at 1000 msg/s.

Flow Control for Pub/Sub Speedup

To optimize Pub/Sub publishing, we implement flow control via batch settings on the pubsub_v1.PublisherClient. By configuring max_messages=500, max_bytes=10 MB, and max_latency=1 second, we ensure messages are grouped efficiently:

  1. max_messages=500: a publish request is flushed once 500 messages are buffered.
  2. max_bytes=10 MB: a batch is flushed early if its combined payload reaches 10 MB.
  3. max_latency=1 second: a partially filled batch is still sent after one second, keeping latency low at trickle rates.

This flow control minimizes API calls while maximizing throughput, enabling the pipeline to handle peak loads without hitting Pub/Sub’s rate limits.
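
The corresponding configuration is only a few lines and mirrors what the processing script below does when it creates its publisher:

from google.cloud import pubsub_v1

# Flow control: the client buffers published messages and sends a request as
# soon as any of these thresholds is crossed.
batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=500,            # flush after 500 buffered messages
    max_bytes=10 * 1024 * 1024,  # or once the batch reaches 10 MB
    max_latency=1,               # or after 1 second, whichever comes first
)
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)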

Implementation Highlights

Processing Script

The Python script runs continuously on Dataproc, using pika to consume RabbitMQ messages. Key features include:

  1. Size- and count-aware batching: batches close at 5000 messages, 10 MB of payload, or a 30-second timeout, whichever comes first.
  2. Dynamic batch sizing: the prefetch count is periodically adjusted based on the current RabbitMQ queue length.
  3. Parallel writes: each batch is sent to BigQuery and Pub/Sub concurrently via a ThreadPoolExecutor.
  4. At-least-once delivery: the batch is acknowledged (multiple=True) only if both writes succeed, and negatively acknowledged with requeue otherwise.
  5. Resilience: transient BigQuery errors are retried, and a periodic health-check log confirms the consumer is alive.

Code:

python
from google.cloud import bigquery, pubsub_v1
import pika
import logging
import sys
import socket
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
from google.api_core import exceptions, retry

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

PROJECT_ID = 'your-project-id'
DATASET_ID = 'your-dataset-id'
TABLE_ID = 'your-table-id'
TOPIC_NAME = 'your-topic-name'
RABBITMQ_HOST = 'your-rabbitmq-host'
RABBITMQ_QUEUE = 'your-queue-name'
RABBITMQ_USERNAME = 'your-username'
RABBITMQ_PASSWORD = 'your-password'
RABBITMQ_PORT = 5672  # default AMQP port; replace with your broker's port
MAX_BQ_PAYLOAD_SIZE = 10 * 1024 * 1024  # BigQuery hard limit of 10 MB per streaming insert request
MAX_BQ_ROWS = 10000  # upper bound on rows per insert request
HEALTH_CHECK_INTERVAL = timedelta(minutes=5)

bigquery_client = bigquery.Client(project=PROJECT_ID)
# Apply flow control at client creation: batches of up to 500 messages / 10 MB / 1 s.
batch_settings = pubsub_v1.types.BatchSettings(max_messages=500, max_bytes=10 * 1024 * 1024, max_latency=1)
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_NAME)

@retry.Retry(predicate=retry.if_exception_type(exceptions.ServerError, exceptions.GoogleAPIError, socket.error))
def insert_into_bigquery(messages):
    try:
        rows_to_insert = [{"source_payload": msg.decode('utf-8')} for _, msg in messages]
        table = bigquery_client.get_table(f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}")
        errors = bigquery_client.insert_rows_json(table, rows_to_insert)
        if errors:
            raise Exception(f"Errors while inserting rows: {errors}")
        logger.info(f"Inserted {len(messages)} messages into BigQuery.")
        return True
    except (BrokenPipeError, exceptions.GoogleAPIError) as e:
        logger.error(f"BigQuery transient error: {e}")
        raise
    except Exception as e:
        logger.error(f"BigQuery non-retryable error: {e}")
        return False

def publish_to_pubsub(messages):
    try:
        # Reuse the module-level publisher (configured with flow-control batch
        # settings above) instead of creating a new client for every batch.
        futures = [publisher.publish(topic_path, body) for _, body in messages]
        for future in futures:
            future.result()
        logger.info(f"Published {len(messages)} messages to Pub/Sub.")
        return True
    except Exception as e:
        logger.error(f"Error publishing to Pub/Sub: {e}")
        return False

def get_queue_length(channel):
    try:
        queue_info = channel.queue_declare(queue=RABBITMQ_QUEUE, passive=True)
        return queue_info.method.message_count
    except Exception as e:
        logger.error(f"Failed to get queue length: {e}")
        return 0

def process_batch(ch, messages, message_count):
    try:
        if not message_count:
            return True
        batch_to_process = messages[:message_count]
        with ThreadPoolExecutor(max_workers=2) as executor:
            future_bq = executor.submit(insert_into_bigquery, batch_to_process)
            future_pubsub = executor.submit(publish_to_pubsub, batch_to_process)
            wait([future_bq, future_pubsub], return_when=ALL_COMPLETED)
            bq_success = future_bq.result()
            pubsub_success = future_pubsub.result()
            if bq_success and pubsub_success:
                ch.basic_ack(delivery_tag=batch_to_process[-1][0].delivery_tag, multiple=True)
                return True
            else:
                ch.basic_nack(delivery_tag=batch_to_process[-1][0].delivery_tag, multiple=True, requeue=True)
                return False
    except Exception as e:
        logger.error(f"Error processing batch: {e}")
        if message_count > 0:
            ch.basic_nack(delivery_tag=messages[message_count - 1][0].delivery_tag, multiple=True, requeue=True)
        return False

def main():
    credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=RABBITMQ_HOST, port=RABBITMQ_PORT, credentials=credentials, heartbeat=600))
    channel = connection.channel()
    channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)

    max_batch_size = 5000
    messages = [None] * max_batch_size
    message_count = 0
    base_batch_size = 100
    batch_size = base_batch_size  # default until the first queue-length check adjusts it
    batch_timeout = timedelta(seconds=30)
    last_queue_check = datetime.now()
    last_health_check = datetime.now()

    channel.basic_qos(prefetch_count=base_batch_size)
    while True:
        try:
            total_payload_size = 0
            batch_start_time = datetime.now()
            if (datetime.now() - last_queue_check) >= timedelta(hours=1):
                queue_length = get_queue_length(channel)
                batch_size = min(max(base_batch_size, queue_length // 10), max_batch_size, MAX_BQ_ROWS)
                channel.basic_qos(prefetch_count=batch_size)
                logger.info(f"Adjusted batch size to {batch_size} for queue length {queue_length}")
                # Signal scaling needs (e.g., via Pub/Sub or file)
                if queue_length > 10000:  # ~500 msg/s threshold
                    logger.info("High queue length detected; scale up recommended.")
                elif queue_length < 2000:  # ~100 msg/s
                    logger.info("Low queue length detected; scale down possible.")
                last_queue_check = datetime.now()

            while message_count < batch_size and total_payload_size < MAX_BQ_PAYLOAD_SIZE and (datetime.now() - batch_start_time) < batch_timeout:
                for method, properties, body in channel.consume(RABBITMQ_QUEUE, inactivity_timeout=1):
                    if method is None:
                        break
                    message_size = len(body)
                    if total_payload_size + message_size <= MAX_BQ_PAYLOAD_SIZE and message_count < batch_size:
                        messages[message_count] = (method, body)
                        message_count += 1
                        total_payload_size += message_size
                    else:
                        # Message would overflow the batch: requeue it and force a flush
                        # so it is not left hanging unacknowledged on the channel.
                        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
                        total_payload_size = MAX_BQ_PAYLOAD_SIZE
                        break
                    if message_count >= batch_size or total_payload_size >= MAX_BQ_PAYLOAD_SIZE or (datetime.now() - batch_start_time) >= batch_timeout:
                        break

            if message_count > 0:
                process_batch(channel, messages, message_count)
                message_count = 0

            if (datetime.now() - last_health_check) >= HEALTH_CHECK_INTERVAL:
                logger.info("Health check: Consumer running.")
                last_health_check = datetime.now()

        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            channel.close()
            connection.close()
            sys.exit(1)

if __name__ == "__main__":
    main()

Conclusion

This real-time streaming pipeline from RabbitMQ to Google Cloud Pub/Sub and BigQuery delivers a robust solution for handling dynamic message rates while navigating critical constraints. By capping batches at BigQuery’s 10 MB streaming limit and leveraging Pub/Sub’s flow control with batched publishing, it ensures efficient, high-throughput processing without hitting request rate quotas. The use of a custom Python script on Dataproc sidesteps Spark’s lack of a RabbitMQ connector, while Airflow’s dynamic scaling keeps costs in check. Whether you’re managing IoT data or user events, this pipeline offers a scalable, reliable blueprint. Deploy it, fine-tune the scaling thresholds, and watch it adapt to your workload—then share your results with the community!

tags: GCP - PubSub - Dataproc - Realtimestreaming - BigQuery - Airflow