A Herd Of Rabbits Part 2: RabbitMQ Data Pipelines
RabbitMQ is a powerful message broker that allows engineers to implement complex messaging topologies with relative ease. At the day job we used RabbitMQ as the backbone of our real time data infrastructure. In the previous post we set up a simple PostgreSQL trigger to send change capture messages to a RabbitMQ exchange. Conceptually, this is where we left off:
In this early stage, we basically have a fire-hose that we can selectively tap into. But we have no way to control the flow of data.
To recap a bit before we get too deep: we had a simple, manual way of handling real time operations. Effectively, we baked all of the logic into the specific application code path. This became complex and difficult to manage, and we wanted to automate as much of it as possible.
We had 3 primary goals:
- Continue sending writes to PG + not manage "real-time" inline
- Capture and funnel changes through a single point
- Apply various actions in response to the changes
Staying with the articles theme of the previous post, we can talk in terms of some basic business rules:
- Anytime someone adds or modifies an article we want to cache its current state
- Anytime someone adds or modifies an article we want to push a websocket message
- Anytime someone adds a comment to an article, we want to push a websocket message
- When an article goes live the first time, we want to send some data to a reporting system.
For context, RabbitMQ and Kafka take very different approaches:

| RabbitMQ | Kafka |
| --- | --- |
| Smart Server + Dumb Consumers | Dumb Server + Smart Consumers |
| No long term persistence | Messages stored indefinitely |
Topic Routing
RabbitMQ takes the Smart Server / Dumb Client approach. Most of your interactions with Rabbit are with the server: creating resources, configuring them, stitching them together, etc. It has two basic constructs, the `exchange` and the `queue`, and it provides ways to bind them together in interesting ways.
Exchanges allow for complex message routing patterns like pub-sub, pattern matching, and point-to-point, and it is not uncommon to see rather elaborate server setups. RabbitMQ clients read messages from a queue, but a message can arrive at a queue from multiple sources. The RabbitMQ server itself has no other dependencies.
Think of an exchange as a post office, and a queue as a mailbox. In the case of a topic exchange, a routing pattern defines how messages are routed to queues based on a routing key. Routing keys are words separated by a period (`.`). Binding patterns can also contain a positional wildcard (`*`), which matches exactly one word, or a pound (`#`), which matches zero or more words.
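To make the wildcard behavior concrete, here is a minimal, standalone sketch using the amqplib client. The queue name and connection URL are placeholders for this example and not part of the RTE:

```js
'use strict'

const amqp = require('amqplib')

async function main () {
  const conn = await amqp.connect('amqp://localhost')
  const ch = await conn.createChannel()

  // a topic exchange and a queue, bound with a wildcard pattern
  await ch.assertExchange('pg-messaging', 'topic', {durable: true})
  const {queue} = await ch.assertQueue('article-changes')
  await ch.bindQueue(queue, 'pg-messaging', 'cdc.article.*')

  // 'cdc.article.update' matches 'cdc.article.*' and is delivered;
  // 'cdc.author.update' would not be
  ch.publish('pg-messaging', 'cdc.article.update', Buffer.from('{"id": 1}'))

  await ch.consume(queue, (msg) => {
    console.log(msg.fields.routingKey, msg.content.toString())
    ch.ack(msg)
  })
}

main().catch(console.error)
```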
For our purposes, this means writing a lot less code. Most of what we are going to build sits on top of RabbitMQ topic routing. In a nutshell, topic routing allows us to route a single message through multiple exchanges and have it arrive at multiple queues for consumers to handle. In our setup, PostgreSQL publishes everything to the single `pg-messaging` exchange, and all we need to do is define how things get routed around and what needs to happen when a consumer receives a message.
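Exchanges can also be bound to other exchanges using the same patterns, which is how a single message can hop from `pg-messaging` through other exchanges before landing in one or more queues. Continuing the sketch above (the names are again illustrative, borrowed from the setup described later):

```js
// route everything with a 'cdc.' prefix from pg-messaging into a second exchange
await ch.assertExchange('cdc', 'topic', {durable: true})
await ch.bindExchange('cdc', 'pg-messaging', 'cdc.*.*')

// queues bound to 'cdc' now receive those messages as well
await ch.assertQueue('rte-pipeline-crud')
await ch.bindQueue('rte-pipeline-crud', 'cdc', 'cdc.article.*')
```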
Realtime Engine
Internally, we referred to the system that defined this logic as the Realtime Engine (RTE). It is a collection of "pipeline" instances. Each pipeline is a RabbitMQ connection that holds one or more Node.js streams housed in a `Pipeline` instance. The pipeline receives a message from a RabbitMQ queue and sends it through its streams.
A Simplified RTE pipeline class looks like this:
const {pipeline, Writable, PassThrough} = require('stream')
class RTEPipeline extends RabbitMQConnection {
constructor(opts) {
super(opts)
if (!Array.isArray(opts.streams)) {
throw new TypeError(
'streams is required and must be an array'
)
}
// setup exchanges
// setup queues
this.writer = new PassThrough()
this.pipeline = pipeline(
this.writer
, ...this.opts.streams
, new Writable({objectMode: true, write: this._drain.bind(this)})
, this.onFinish.bind(this)
)
for(const queue of this.opts.queues) {
queue.onMessage(this._onMessage.bind(this))
}
}
write(buffer) {
if (buffer === undefined) return true
return this.writer.write(buffer)
}
onFinish(err) {
// called when pipeline finishes or errors
if (err) {
console.error(err)
} else {
console.log(`${this.name} pipeline shutdown success`)
}
}
_onMessage(buffer, msg) {
this.ack(msg)
return this.write(buffer)
}
_drain(buffer, enc, cb) {
// things to do as the last stage of the pipeline
// metrics
// logging
// other clean up
cb()
}
}
- Sets up any declared RabbitMQ Exchanges + bindings
- Creates a stream pipeline from the given array of Duplex streams. A dummy PassThrough stream is used as the writable head to push messages into as they come in from RabbitMQ, and a single Writable stream is added to the end to drain the pipeline
- Sets up any declared queues, and adds a handler
- Adds all queue bindings from the primary exchange to each of the queues.
In practice, a pipeline instance would look like this:
'use strict'
const Pipeline = require('./pipeline')
const {PassThrough} = require('stream')
new Pipeline({
name: 'CRUD-Pipeline'
, exchange: 'cdc'
, exchange_routing: { // exchange (pg-messaging) <> exchange (cdc) routing
from: 'pg-messaging'
, to: 'cdc'
, pattern: 'cdc.*.*'
}
, queues: [{
name: 'rte-pipeline-crud'
, pattern: [ // cdc -> rte-pipeline-crud bindings
'cdc.article.*'
, 'cdc.author.*'
, 'cdc.comment.*'
]
}]
, streams: [
new PassThrough()
, new PassThrough()
, new PassThrough()
]
})
This does the following:
- Create 2 exchanges if they do not exist: `cdc` and `pg-messaging`.
- Route all messages with a `cdc` routing key prefix to the `cdc` exchange.
- Set up and consume from a queue named `rte-pipeline-crud`.
- Route all CDC messages for the `article`, `author`, and `comment` resources to the `rte-pipeline-crud` queue.
- Push every message received through each of the defined streams.
In this case all of the streams are pass-through streams, so nothing actually happens. Visually, this snippet of code looks like this.
Here is a slightly more involved and concrete setup to illustrate some use cases a bit better:
'use strict'

const Pipeline = require('./pipeline')
// AMQPParseStream, CacheStream, ParallelStream, EmailStream and
// GraphQLSubscriptionStream are custom application streams
new Pipeline({
name: 'crud-Pipeline'
, queues: [{
name: 'rte-pipeline-crud'
, pattern: [
'cdc.article.*'
, 'cdc.author.*'
, 'cdc.comment.*'
]
}]
, streams: [
new AMQPParseStream()
, new CacheStream(['article', 'comment'])
, new ParallelStream([
new EmailStream(['author'])
, new GraphQLSubscriptionStream(['article', 'comment'])
])
]
})
- Parse the incoming message
- Cache all articles + comments
- Send an update message via GraphQL Subscriptions for article + comment updates
- Send an email notification to the authors.
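The individual streams are the subject of the next post, but as a rough idea, a stream like CacheStream could be a Transform stream in objectMode that acts only on the resources it was configured for and passes every message along. This is a hypothetical sketch, assuming an upstream stream (like AMQPParseStream) has already parsed the raw buffer into an object with resource and data fields:

```js
'use strict'

const {Transform} = require('stream')

class CacheStream extends Transform {
  // `resources` mirrors the usage above, e.g. new CacheStream(['article', 'comment'])
  // a Map stands in for whatever shared cache client the real stream would use
  constructor (resources, cache = new Map()) {
    super({objectMode: true})
    this.resources = new Set(resources)
    this.cache = cache
  }

  _transform (msg, enc, callback) {
    // ignore resource types this stream was not configured for
    if (!this.resources.has(msg.resource)) return callback(null, msg)

    // cache the latest state, then pass the message downstream untouched
    Promise.resolve(this.cache.set(`${msg.resource}:${msg.data.id}`, msg.data))
      .then(() => callback(null, msg))
      .catch(callback)
  }
}

module.exports = CacheStream
```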
This is also very easy to scale out. By running more processes with the same pipelines, we get more consumers on the same queues, which gives us higher throughput and more workers to process messages.
With the `Pipeline` class we are able to automate real time data workflows in a rather modular way. More importantly, it frees us from having to juggle business logic, database transactions, and real time messaging logic throughout the codebase. Once a pipeline is set up, we concern ourselves with the business logic; anytime a particular database record is touched (successfully), it automatically triggers all of the associated data pipelines.
In the next part we'll take a closer look at writing streams to handle data.