Writing about Data Engineering problems and SQL
by Brahmanand Singh
Real-time streaming pipelines are the backbone of modern data systems, enabling businesses to process high-velocity data like IoT events or user interactions with minimal latency. In this post, we’ll explore a scalable, fault-tolerant pipeline that streams messages from RabbitMQ to Google Cloud Pub/Sub and BigQuery, orchestrated by Google Cloud Dataproc and Apache Airflow (via Cloud Composer). Designed to run continuously and scale dynamically with RabbitMQ’s message rate (1–1000 msg/s), this pipeline navigates key limitations: BigQuery’s 10 MB streaming insert payload cap, Pub/Sub’s request rate constraints, and Apache Spark’s lack of a RabbitMQ connector. We’ll also cover how flow control optimizes Pub/Sub publishing for speed.
Our objective was to build a robust streaming system that streams messages from RabbitMQ to both Google Cloud Pub/Sub and BigQuery with minimal latency, runs continuously and scales with message rates from 1 to 1000 msg/s, tolerates transient failures with at-least-once delivery, and stays within BigQuery's and Pub/Sub's service limits.
By addressing these requirements, the pipeline delivers a practical solution for enterprise streaming workloads.
The pipeline integrates RabbitMQ as the message source, Google Cloud Pub/Sub and BigQuery as the destination sinks, a Dataproc cluster that hosts the consumer script, and Apache Airflow (via Cloud Composer) for orchestration and scaling.
Messages are batched (up to 5000 messages or 10 MB) and acknowledged only after successful delivery to both Pub/Sub and BigQuery, ensuring at-least-once delivery.
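As a minimal sketch of that acknowledgement contract (the flush_batch name and the writer callables are placeholders, not part of the original script), the consumer acks the last delivery tag of a batch only after both sinks report success, and requeues the whole batch otherwise:

from typing import Callable, List, Tuple

def flush_batch(channel, batch: List[Tuple[object, bytes]],
                write_to_bigquery: Callable[[list], bool],
                write_to_pubsub: Callable[[list], bool]) -> bool:
    """Ack the batch on RabbitMQ only if BOTH sinks succeed; otherwise requeue it."""
    bq_ok = write_to_bigquery(batch)
    ps_ok = write_to_pubsub(batch)
    last_tag = batch[-1][0].delivery_tag  # cumulative ack/nack (multiple=True) covers the whole batch
    if bq_ok and ps_ok:
        channel.basic_ack(delivery_tag=last_tag, multiple=True)
        return True
    # Requeue everything; redelivery means duplicates are possible, hence at-least-once semantics.
    channel.basic_nack(delivery_tag=last_tag, multiple=True, requeue=True)
    return False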
Apache Spark, Dataproc’s core engine, lacks a native RabbitMQ connector, unlike its support for Kafka or Kinesis. This forces us to bypass Spark’s streaming capabilities, as RabbitMQ’s AMQP protocol isn’t directly compatible.
Solution: We deploy a custom Python script using the pika library on Dataproc's master node, treating the cluster as a managed Python environment. This approach simplifies RabbitMQ integration but requires manual scaling logic and forgoes Spark's distributed streaming optimizations.
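A rough sketch of that approach follows (host, credentials, and queue name are placeholders): the consumer is an ordinary pika client that the Dataproc master node happens to host.

import pika

# Hypothetical connection details; substitute your own host, credentials, and queue.
params = pika.ConnectionParameters(
    host="your-rabbitmq-host",
    port=5672,
    credentials=pika.PlainCredentials("your-username", "your-password"),
    heartbeat=600,
)
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue="your-queue-name", durable=True)

# Pull messages with a short inactivity timeout so the loop never blocks forever.
for method, properties, body in channel.consume("your-queue-name", inactivity_timeout=1):
    if method is None:
        continue  # no message arrived within the timeout window
    print(body)  # hand off to the batching/publishing logic shown in the full script below
    channel.basic_ack(delivery_tag=method.delivery_tag)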
BigQuery’s streaming insert API caps individual requests at 10 MB, which can bottleneck high-throughput pipelines if not handled carefully. Exceeding this limit causes request failures, risking data loss or delays.
Solution: The pipeline enforces a 10 MB payload ceiling per batch, calculated by summing message sizes before sending. Batches are also capped at 5000 messages to balance throughput and latency, ensuring compliance with BigQuery’s limit while processing ~200–500 msg/s per instance.
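A minimal sketch of that size guard (the constants mirror the ones used in the full script below; the add_to_batch helper is illustrative only):

MAX_BQ_PAYLOAD_SIZE = 10 * 1024 * 1024  # BigQuery streaming insert limit per request
MAX_BATCH_ROWS = 5000                   # row cap chosen to balance throughput and latency

def add_to_batch(batch, batch_bytes, body):
    """Accept a message only if the batch stays under both caps; returns (accepted, new_size)."""
    if len(batch) >= MAX_BATCH_ROWS or batch_bytes + len(body) > MAX_BQ_PAYLOAD_SIZE:
        return False, batch_bytes  # flush the current batch before accepting more
    batch.append(body)
    return True, batch_bytes + len(body)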
Google Cloud Pub/Sub imposes quotas on publish requests (e.g., 10,000 requests per second per project, depending on configuration), which can throttle performance under high message rates. Sending one request per message is inefficient and risks hitting these limits.
Solution: We batch messages for Pub/Sub, sending up to 500 messages per request, reducing the request rate significantly. For example, a 5000-message batch requires only 10 Pub/Sub requests, staying well within quotas even at 1000 msg/s.
To optimize Pub/Sub publishing, we implement flow control via batch settings in the pubsub_v1.PublisherClient. By configuring max_messages=500, max_bytes=10 MB, and max_latency=1 second, we ensure messages are grouped efficiently: a batch is published as soon as it reaches 500 messages or 10 MB, or after one second has elapsed, whichever comes first.
This flow control minimizes API calls while maximizing throughput, enabling the pipeline to handle peak loads without hitting Pub/Sub’s rate limits.
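In code, the settings described above look roughly like this (the project and topic names are placeholders):

from google.cloud import pubsub_v1

# A batch is flushed at 500 messages or 10 MB, or after 1 second, whichever comes first.
batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=500,
    max_bytes=10 * 1024 * 1024,
    max_latency=1,
)
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)
topic_path = publisher.topic_path("your-project-id", "your-topic-name")

# publish() returns a future; result() blocks until the batch containing this message is sent.
future = publisher.publish(topic_path, b"example payload")
print(future.result())  # the server-assigned message ID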
The Python script runs continuously on Dataproc, using pika to consume RabbitMQ messages. Its key features are dynamic batch sizing driven by queue depth, batches capped by both row count and payload size, parallel writes to BigQuery and Pub/Sub, batch-level acknowledgements back to RabbitMQ, retries for transient BigQuery errors, and periodic health-check logging. The full Python script is shown below:
from google.cloud import bigquery, pubsub_v1
import pika
import logging
import sys
import socket
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
from google.api_core import exceptions, retry
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
PROJECT_ID = 'your-project-id'
DATASET_ID = 'your-dataset-id'
TABLE_ID = 'your-table-id'
TOPIC_NAME = 'your-topic-name'
RABBITMQ_HOST = 'your-rabbitmq-host'
RABBITMQ_QUEUE = 'your-queue-name'
RABBITMQ_USERNAME = 'your-username'
RABBITMQ_PASSWORD = 'your-password'
RABBITMQ_PORT = 5672  # replace with your RabbitMQ port (5672 is the AMQP default)
MAX_BQ_PAYLOAD_SIZE = 10 * 1024 * 1024  # BigQuery streaming insert hard limit: 10 MB per request
MAX_BQ_ROWS = 10000  # upper bound on rows pushed in a single batch
HEALTH_CHECK_INTERVAL = timedelta(minutes=5)
bigquery_client = bigquery.Client(project=PROJECT_ID)
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_NAME)
@retry.Retry(predicate=retry.if_exception_type(exceptions.ServerError, exceptions.GoogleAPIError, socket.error))
def insert_into_bigquery(messages):
    try:
        rows_to_insert = [{"source_payload": msg.decode('utf-8')} for _, msg in messages]
        table = bigquery_client.get_table(f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}")
        errors = bigquery_client.insert_rows_json(table, rows_to_insert)
        if errors:
            raise Exception(f"Errors while inserting rows: {errors}")
        logger.info(f"Inserted {len(messages)} messages into BigQuery.")
        return True
    except (BrokenPipeError, exceptions.GoogleAPIError) as e:
        logger.error(f"BigQuery transient error: {e}")
        raise  # re-raised so the retry decorator can retry transient failures
    except Exception as e:
        logger.error(f"BigQuery non-retryable error: {e}")
        return False
def publish_to_pubsub(messages):
    try:
        # Flow control: group up to 500 messages / 10 MB per request, flushing after at most 1 second
        batch_settings = pubsub_v1.types.BatchSettings(max_messages=500, max_bytes=10 * 1024 * 1024, max_latency=1)
        publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)
        futures = [publisher.publish(topic_path, body) for _, body in messages]
        for future in futures:
            future.result()
        logger.info(f"Published {len(messages)} messages to Pub/Sub.")
        return True
    except Exception as e:
        logger.error(f"Error publishing to Pub/Sub: {e}")
        return False
def get_queue_length(channel):
    try:
        # Passive declare returns queue metadata without modifying the queue
        queue_info = channel.queue_declare(queue=RABBITMQ_QUEUE, passive=True)
        return queue_info.method.message_count
    except Exception as e:
        logger.error(f"Failed to get queue length: {e}")
        return 0
def process_batch(ch, messages, message_count):
    try:
        if not message_count:
            return True
        batch_to_process = messages[:message_count]
        # Write to BigQuery and Pub/Sub in parallel; ack only if both succeed
        with ThreadPoolExecutor(max_workers=2) as executor:
            future_bq = executor.submit(insert_into_bigquery, batch_to_process)
            future_pubsub = executor.submit(publish_to_pubsub, batch_to_process)
            wait([future_bq, future_pubsub], return_when=ALL_COMPLETED)
            bq_success = future_bq.result()
            pubsub_success = future_pubsub.result()
        if bq_success and pubsub_success:
            # Cumulative ack up to the last delivery tag in the batch
            ch.basic_ack(delivery_tag=batch_to_process[-1][0].delivery_tag, multiple=True)
            return True
        else:
            # Requeue the whole batch so nothing is lost (at-least-once delivery)
            ch.basic_nack(delivery_tag=batch_to_process[-1][0].delivery_tag, multiple=True, requeue=True)
            return False
    except Exception as e:
        logger.error(f"Error processing batch: {e}")
        if message_count > 0:
            ch.basic_nack(delivery_tag=messages[message_count - 1][0].delivery_tag, multiple=True, requeue=True)
        return False
def main():
    credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=RABBITMQ_HOST, port=RABBITMQ_PORT, credentials=credentials, heartbeat=600))
    channel = connection.channel()
    channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)

    max_batch_size = 5000
    messages = [None] * max_batch_size
    message_count = 0
    base_batch_size = 100
    batch_size = base_batch_size  # start conservatively; adjusted hourly from queue depth
    batch_timeout = timedelta(seconds=30)
    last_queue_check = datetime.now()
    last_health_check = datetime.now()
    channel.basic_qos(prefetch_count=base_batch_size)

    while True:
        try:
            total_payload_size = 0
            batch_start_time = datetime.now()

            # Re-evaluate batch size (and scaling hints) once an hour based on queue depth
            if (datetime.now() - last_queue_check) >= timedelta(hours=1):
                queue_length = get_queue_length(channel)
                batch_size = min(max(base_batch_size, queue_length // 10), max_batch_size, MAX_BQ_ROWS)
                channel.basic_qos(prefetch_count=batch_size)
                logger.info(f"Adjusted batch size to {batch_size} for queue length {queue_length}")
                # Signal scaling needs (e.g., via Pub/Sub or file)
                if queue_length > 10000:  # ~500 msg/s threshold
                    logger.info("High queue length detected; scale up recommended.")
                elif queue_length < 2000:  # ~100 msg/s
                    logger.info("Low queue length detected; scale down possible.")
                last_queue_check = datetime.now()

            # Accumulate messages until the count, size, or time threshold is reached
            while message_count < batch_size and total_payload_size < MAX_BQ_PAYLOAD_SIZE and (datetime.now() - batch_start_time) < batch_timeout:
                for method, properties, body in channel.consume(RABBITMQ_QUEUE, inactivity_timeout=1):
                    if method is None:
                        break
                    message_size = len(body)
                    if total_payload_size + message_size <= MAX_BQ_PAYLOAD_SIZE and message_count < batch_size:
                        messages[message_count] = (method, body)
                        message_count += 1
                        total_payload_size += message_size
                    if message_count >= batch_size or total_payload_size >= MAX_BQ_PAYLOAD_SIZE or (datetime.now() - batch_start_time) >= batch_timeout:
                        break

            if message_count > 0:
                process_batch(channel, messages, message_count)
                message_count = 0

            if (datetime.now() - last_health_check) >= HEALTH_CHECK_INTERVAL:
                logger.info("Health check: Consumer running.")
                last_health_check = datetime.now()
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            channel.close()
            connection.close()
            sys.exit(1)


if __name__ == "__main__":
    main()
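Orchestration and scaling are handled by Airflow on Cloud Composer. The exact DAG depends on your environment, but a minimal sketch of the scaling idea (cluster name, region, schedule, and worker count are assumptions, not the production values) could periodically resize the Dataproc cluster with a gcloud call, mirroring the queue-length signals logged by the consumer above:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical cluster/region values; the worker count would be chosen from the
# "scale up recommended" / "scale down possible" signals emitted by the consumer.
RESIZE_CMD = (
    "gcloud dataproc clusters update your-cluster-name "
    "--region=your-region --num-secondary-workers={{ params.workers }}"
)

with DAG(
    dag_id="rabbitmq_pipeline_scaling",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@hourly",  # re-evaluate sizing hourly, like the consumer does
    catchup=False,
) as dag:
    scale_cluster = BashOperator(
        task_id="resize_dataproc_cluster",
        bash_command=RESIZE_CMD,
        params={"workers": 2},
    )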
This real-time streaming pipeline from RabbitMQ to Google Cloud Pub/Sub and BigQuery delivers a robust solution for handling dynamic message rates while navigating critical constraints. By capping batches at BigQuery’s 10 MB streaming limit and leveraging Pub/Sub’s flow control with batched publishing, it ensures efficient, high-throughput processing without hitting request rate quotas. The use of a custom Python script on Dataproc sidesteps Spark’s lack of a RabbitMQ connector, while Airflow’s dynamic scaling keeps costs in check. Whether you’re managing IoT data or user events, this pipeline offers a scalable, reliable blueprint. Deploy it, fine-tune the scaling thresholds, and watch it adapt to your workload—then share your results with the community!
tags: GCP - PubSub - Dataproc - Realtimestreaming - BigQuery - Airflow