A Herd Of Rabbits Part 2: RabbitMQ Data Pipelines

RabbitMQ is a powerful message broker that allows engineers to implement complex messaging topologies with relative ease. At the day job we used RabbitMQ as the backbone of our real-time data infrastructure. In the previous post we set up a simple PostgreSQL trigger to send change capture messages to a RabbitMQ exchange. Conceptually, this is where we left off:

At this early stage, we basically have a fire hose that we can selectively tap into, but we have no way to control the flow of data.

To recap before we get too deep: we had a simple, manual way of handling real-time operations. Effectively, we baked all of the logic into specific application code paths. This became complex and difficult to manage, and we wanted to automate as much of it as possible.

We had 3 primary goals:

  1. Continue sending writes to PG without managing "real-time" behavior inline
  2. Capture and funnel changes through a single point
  3. Apply various actions in response to the changes

Staying with the articles theme from the previous post, we can talk in terms of some basic business rules:

  • Anytime someone adds or modifies an article, we want to cache its current state
  • Anytime someone adds or modifies an article, we want to push a websocket message
  • Anytime someone adds a comment to an article, we want to push a websocket message
  • When an article goes live for the first time, we want to send some data to a reporting system
A quick comparison with Kafka helps frame RabbitMQ's approach:

RabbitMQ                        | Kafka
Smart Server + Dumb Consumers   | Dumb Server + Smart Consumers
No long-term persistence        | Messages stored indefinitely

Topic Routing

RabbitMQ takes the Smart Server / Dumb Client approach. Most of your interactions with RabbitMQ are with the server: creating resources, configuring them, stitching them together, and so on. It has two basic constructs, exchanges and queues, and lets you bind them together in interesting ways.

Exchanges allow for complex message routing patterns like pub-sub, pattern matching, and point-to-point, and it is not uncommon to see rather elaborate server setups. RabbitMQ clients read messages from a queue, but a message can arrive at a queue from multiple sources. The RabbitMQ server itself has no other dependencies.

Think of an exchange as a post office and a queue as a mailbox. In the case of a topic exchange, a routing pattern defines how messages are routed to queues based on a routing key. Routing keys are words separated by a period ( . ). Binding patterns can also contain a positional wildcard ( * ), which matches exactly one word, or a pound ( # ), which matches zero or more words.
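To make that concrete, here is a minimal topic-routing sketch using the amqplib client. The exchange name, queue name, and routing keys are made up for illustration and are not part of the RTE:

'use strict'
// Minimal topic-routing sketch using amqplib. The exchange, queue, and
// routing keys are illustrative only.
const amqp = require('amqplib')

async function main() {
  const conn = await amqp.connect('amqp://localhost')
  const ch = await conn.createChannel()

  await ch.assertExchange('demo-topic', 'topic', {durable: false})
  const {queue} = await ch.assertQueue('demo-articles', {durable: false})

  // "*" matches exactly one word, "#" matches zero or more words
  await ch.bindQueue(queue, 'demo-topic', 'cdc.article.*')

  await ch.consume(queue, (msg) => {
    console.log(msg.fields.routingKey, msg.content.toString())
    ch.ack(msg)
  })

  // routed to demo-articles: matches cdc.article.*
  ch.publish('demo-topic', 'cdc.article.update', Buffer.from('{"id": 1}'))
  // not routed anywhere: does not match cdc.article.*
  ch.publish('demo-topic', 'cdc.comment.insert', Buffer.from('{"id": 2}'))
}

main().catch(console.error)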

For our purposes, this means we have to write a lot less code. Most of what we are going to build sits on top of RabbitMQ topic routing. In a nutshell, topic routing allows us to route a single message through multiple exchanges and have it arrive at multiple queues for consumers to handle. In our setup, PostgreSQL publishes everything to the single pg-messaging exchange, and all we need to do is define how things get routed around and what needs to happen when a consumer receives a message.
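The Pipeline class described below declares this routing for us, but under the hood it boils down to a handful of topology calls. Roughly something like this hedged amqplib sketch; the exchange types and durability options are assumptions, not the RTE's actual code:

'use strict'
// Rough sketch of the topology the RTE relies on: messages flow from the
// pg-messaging exchange into a cdc exchange, and from there into queues.
const amqp = require('amqplib')

async function setupRouting() {
  const conn = await amqp.connect('amqp://localhost')
  const ch = await conn.createChannel()

  // The trigger from the previous post publishes to pg-messaging
  await ch.assertExchange('pg-messaging', 'topic', {durable: true})
  await ch.assertExchange('cdc', 'topic', {durable: true})

  // Forward anything with a cdc.<resource>.<action> routing key to the cdc exchange
  await ch.bindExchange('cdc', 'pg-messaging', 'cdc.*.*')

  // One queue per pipeline, bound only to the resources it cares about
  await ch.assertQueue('rte-pipeline-crud', {durable: true})
  for (const pattern of ['cdc.article.*', 'cdc.author.*', 'cdc.comment.*']) {
    await ch.bindQueue('rte-pipeline-crud', 'cdc', pattern)
  }
}

setupRouting().catch(console.error)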

Realtime Engine

Internally, we referred to the system that defined this logic as the Realtime Engine (RTE). It is a collection of "pipeline" instances. Each pipeline is a RabbitMQ connection that holds one or more Node.js streams, housed in a Pipeline instance. The pipeline receives a message from a RabbitMQ queue and sends it through its streams.

[Class diagram: AMQPConnection (Array queues; connect(), consume(), close(), shutdown(), assertExchange(), assertQueue(), publish()), RTEPipeline (String name, TransformStream[] streams; start(), end(), write(), onFinish()), TransformStream (boolean readable, boolean writable; _transform())]

A simplified RTEPipeline class looks like this:

const {pipeline, Writable, PassThrough} = require('stream')
// RabbitMQConnection wraps the AMQP connection (the AMQPConnection above);
// the module path here is illustrative
const RabbitMQConnection = require('./rabbitmq-connection')

class RTEPipeline extends RabbitMQConnection {
  constructor(opts) {
    super(opts)
    if (!Array.isArray(opts.streams)) {
      throw new TypeError(
        'streams is required and must be an array'
      )
    }

    this.opts = opts

    // setup exchanges
    // setup queues

    // dummy writable stream that feeds RabbitMQ messages into the pipeline
    this.writer = new PassThrough()
    this.pipeline = pipeline(
      this.writer
    , ...this.opts.streams
    , new Writable({objectMode: true, write: this._drain.bind(this)})
    , this.onFinish.bind(this)
    )

    for (const queue of this.opts.queues) {
      queue.onMessage(this._onMessage.bind(this))
    }
  }

  write(buffer) {
    if (buffer === undefined) return true
    return this.writer.write(buffer)
  }

  onFinish(err) {
    // called when pipeline finishes or errors
    if (err) {
      console.error(err)
    } else {
      console.log(`${this.name} pipeline shutdown success`)
    }
  }

  _onMessage(buffer, msg) {
    this.ack(msg)
    return this.write(buffer)
  }

  _drain(buffer, enc, cb) {
    // things to do as the last stage of the pipeline
    // metrics
    // logging
    // other clean up
    
    cb()
  }
}
The constructor does the following:

  1. Sets up any declared RabbitMQ exchanges and bindings.
  2. Given an array of Duplex streams, creates a stream pipeline. A dummy PassThrough stream is used as the writable head that messages are pushed into as they come in from RabbitMQ, and a single Writable stream is added to the end to drain the pipeline.
  3. Sets up any declared queues and adds a message handler to each.
  4. Adds all queue bindings from the primary exchange to each of the queues.

In practice, a pipeline instance would look like this:

'use strict'
const Pipeline = require('./pipeline')
const {PassThrough} = require('stream')

new Pipeline({
  name: 'CRUD-Pipeline'
, exchange: 'cdc'
, exchange_routing: { // exchange (pg-messaging) <> exchange (cdc) routing
    from: 'pg-messaging'
  , to: 'cdc'
  , pattern: 'cdc.*.*'
  }  
, queues: [{
    name: 'rte-pipeline-crud'
  , pattern: [ // cdc -> rte-pipeline-crud bindings
      'cdc.article.*'
    , 'cdc.author.*'
    , 'cdc.comment.*'
    ]
  }]
, streams: [
    new PassThrough()
  , new PassThrough()
  , new PassThrough()
  ]
})

This does the following:

  1. Creates two exchanges, cdc and pg-messaging, if they do not already exist.
  2. Routes all messages whose routing keys start with cdc from the pg-messaging exchange to the cdc exchange.
  3. Sets up, and consumes from, a queue named rte-pipeline-crud.
  4. Routes all CDC messages for the article, author, and comment resources to the rte-pipeline-crud queue.
  5. Pushes every message received through each of the defined streams.

In this case, all of the streams are pass-through streams, so nothing actually happens. Visually, this snippet of code looks like this:

[Diagram: a PostgreSQL SQL trigger publishes to the pg-messaging exchange in RabbitMQ; a cdc.*.* binding routes messages to the cdc exchange; cdc.article.*, cdc.author.*, and cdc.comment.* bindings route them to the rte-pipeline-crud queue, which is consumed by the CRUD-Pipeline (stream1 → stream2 → stream3) running in the node-pipeline-1 process of the Real Time Engine.]

Here is a slightly more involved and concrete setup to illustrate some use cases a bit better:

'use strict'

new Pipeline({
  name: 'CRUD-Pipeline'
, queues: [{
    name: 'rte-pipeline-crud'
  , pattern: [
      'cdc.article.*'
    , 'cdc.author.*'
    , 'cdc.comment.*'
    ]
  }]
, streams: [
    new AMQPParseStream()
  , new CacheStream(['article', 'comment'])
  , new ParallelStream([
      new EmailStream(['author'])
    , new GraphQLSubscriptionStream(['article', 'comment'])
    ])
  ]
})
This pipeline does the following:

  1. Parses the incoming message (a sketch of such a stream follows this list).
  2. Caches all articles and comments.
  3. Sends an update message via GraphQL Subscriptions for article and comment updates.
  4. Sends an email notification to the authors.
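Writing these streams is the subject of the next post, but as a rough idea, a parse stage like AMQPParseStream could be a small object-mode Transform. This is a hypothetical sketch, not the actual implementation:

'use strict'
const {Transform} = require('stream')

// Hypothetical parse stage: turns the raw AMQP message body into a plain
// object that the downstream stages can work with.
class AMQPParseStream extends Transform {
  constructor(opts = {}) {
    super({...opts, objectMode: true})
  }

  _transform(buffer, encoding, callback) {
    try {
      callback(null, JSON.parse(buffer))
    } catch (err) {
      callback(err)
    }
  }
}

module.exports = AMQPParseStream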

This is also very easy to scale out: by running more processes with the same pipelines, we get more consumers on the same queues, which gives us higher throughput and more workers to process messages.

[Diagram: PostgreSQL's SQL trigger publishes to the pg-messaging exchange; a cdc.*.* binding routes to the cdc exchange and a cdc.# binding routes to a catchall exchange. The cdc exchange feeds the rte-pipeline-crud queue (cdc.article.*, cdc.author.*, cdc.comment.*) and the rte-pipeline-reporting queue (cdc.login.*, cdc.abuse.*, cdc.article.*); the catchall exchange feeds the rte-pipeline-debug queue (cdc.#). Identical CRUD-Pipelines (stream1 → stream2 → stream3) run in node-pipeline-1 and node-pipeline-2, while node-pipeline-3 runs the REPORT-Pipeline (stream4 → stream5 → stream6) and the DEBUG-Pipeline (stream7).]
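Concretely, "running more processes" can be as simple as forking the same pipeline definitions. Here is a hedged sketch using Node's cluster module (Node 16+); the file layout and elided configuration are assumptions:

'use strict'
// Each forked worker builds its own Pipeline, which means its own consumer
// on the same queues, so RabbitMQ round-robins messages across workers.
const cluster = require('cluster')
const os = require('os')

if (cluster.isPrimary) {
  for (let i = 0; i < os.cpus().length; i++) {
    cluster.fork()
  }
} else {
  const Pipeline = require('./pipeline')
  new Pipeline({
    name: 'CRUD-Pipeline'
    // same exchange, queue, and stream configuration as the examples above
  })
}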

With the Pipeline class we are able to automate real-time data workflows in a rather modular way. More importantly, it frees us from having to juggle business logic, database transactions, and real-time messaging logic throughout the codebase. Once a pipeline is set up, we concern ourselves only with the business logic; anytime a particular database record is (successfully) touched, it automatically triggers all of the associated data pipelines.

In the next part, we'll take a closer look at writing streams to handle data.