A Herd Of Rabbits Part 1: Postgres Change Capture

Postgres is no longer "just a database." It has become a data integration and distribution platform. It has hooks for integrating custom data types, data formats, and remote data stores, support for remote indexes, a rich extension ecosystem, and cascading logical replication facilities. It is practically an application server. A proverbial swiss army knife, to say the least.

At the day job, we use Postgres as the primary database. As a communication platform (chat), we do a good deal of real-time whiz-bangery. Being that we are an early stage start-up, we try to follow the keep it simple, stupid (k.i.s.s.) approach: do the simplest thing possible until it isn't simple anymore. The simple approach to real-time was very manual, and you might see something like this quite a bit:

async function doMessage(user, opts, db) {
  const {message, resource_id} = opts
  const txn = await db.transaction()
  try {
    // persist the message
    const resource = await txn.query({
      text: 'INSERT INTO table ...'
    , values: [resource_id, message]
    })
    await txn.commit()
    // tell connected clients about it
    realtime.publish(user, resource)
    return resource
  } catch (err) {
    await txn.rollback()
    log(err)
    // handle errors
  }
  // Other assorted logics
}

This is simple - I can clearly see what it is trying to do: it adds a message by inserting it into a table, and publishes a message over some real-time channel. There are a couple of problems here, the biggest being that the act of adding a message and the act of dispatching a real-time event are tied together. This innocent little function cannot really be reused. For example:

const doMessage = require('./do-message')

async function doThingAndAddMessage(user, opts, db) {
  const txn = await db.transaction()
  try {
    const result = await txn.query(...)
    // doMessage commits and publishes a real-time message on its own
    await doMessage(user, {
      resource_id: result.id
    , message: opts.message
    }, txn)
    await someOtherThing(user, ...)
  } catch (err) {
    await txn.rollback()
    log(err)
    // handle errors
  }
}

If someOtherThing fails, there is a pretty good chance the real-time message is still sent even though there is an error. This generally leads to people writing different but very similar functions and queries to get around the problem. When that happens, there is more than one function that can add messages, and they don't always do the real-time bits, which is another problem. At the end of the day, the basic pattern is pretty simple:

  1. Modify something in the database.
  2. Push some message to end users in real time telling them what happened so they don't have to reload pages.

Generally speaking, these real-time messages tell end users what changed and how it changed. This is usually pretty application specific, so it made sense in the simple phase to implement this logic in the code paths that handled user interactions. At some point we needed to understand how things changed: the previous value and the current value. That usually requires a read of the data before the write. You can also do it in a single query if you are using a database that implements MVCC, but it gets complicated and cumbersome.
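
For what it's worth, the single-query version looks something like the sketch below. This is an illustration only, assuming a hypothetical messages table with id and body columns: the subquery reads ( and locks ) the current row, and RETURNING exposes both the previous and the new value.

UPDATE messages AS m
   SET body = 'edited text'
  FROM (SELECT id, body FROM messages WHERE id = 42 FOR UPDATE) AS prev
 WHERE m.id = prev.id
RETURNING prev.body AS previous_body, m.body AS current_body;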

We came to realize that the best way to de-couple and simplify these problems would be the event sourcing pattern. At the time, our applications were interacting with PostgreSQL so heavily that inverting them to send writes to Kafka would have been too much of an undertaking. What we could do, however, is use PostgreSQL as an event source - set up Debezium, funnel data into Kafka, layer on KSQL to pull data back together and sync it back into a database or some other process. That would be a pretty sustainable way to "do real-time".

So, why not Kafka Connect? The idea is sound, but as a small team at a small start-up we had a number of issues with the approach:

  1. We aren't really comfortable with the Java stack / ecosystem, nor do we want to be.
  2. We only had one devops guy, and running Kafka in production requires people.
  3. We were already using RabbitMQ and had people with operational experience.
  4. The number of moving parts and the complexity that Kafka + Kafka Connect adds didn't seem worth it (roughly 8 additional servers for a small cluster).
  5. We had a number of partitioned tables that made things complicated with Debezium; a topic per table partition is hard to reason about down-stream.

Postgres has facilities for sending messages to remote connections with LISTEN / NOTIFY. However, there are some scalability concerns there. Namely, it requires a dedicated connection ( no pooling ). It is also not supported by background workers, which can be a bit problematic, and it isn't really intended for high-scale messaging. Additionally, we were already using RabbitMQ and we wanted to keep our messaging there. As luck would have it, there is a PostgreSQL extension for sending messages via RabbitMQ. With this single extension we can implement basic change capture and use PostgreSQL and RabbitMQ as a real-time engine.
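
For reference, the built-in approach would look something like the following sketch ( cdc_events is an arbitrary channel name ). Every consumer needs its own long-lived LISTEN connection, which is exactly the pooling problem mentioned above.

-- a dedicated, long-lived connection subscribes to a channel
LISTEN cdc_events;

-- elsewhere ( e.g. inside a trigger function ), send a payload to that channel
SELECT PG_NOTIFY('cdc_events', '{"operation": "update", "table": "blog_articles"}');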

A change capture message is fairly simple: it tells you where the change is coming from ( a table name or resource name ) and how it changed. The HOW can be thought of as a diff of the data. All we want to do is get this diff out of PostgreSQL and into some remote service automatically so we don't have to do it by hand all the time. Here is how we did it.

PostgreSQL Setup

To start, we need to install the amqp extension and the hstore extension, which ships with most distributions of PostgreSQL ( more on hstore later - it will make more sense in a moment ). The amqp extension can be installed from a source build or from PGXN.

CREATE EXTENSION IF NOT EXISTS "amqp";
CREATE EXTENSION IF NOT EXISTS "hstore";
GRANT USAGE ON SCHEMA amqp to <USER>;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA amqp to <USER>;

The amqp extension sets up a table where the config for your RabbitMQ cluster goes. The easiest thing to do is add these settings to your PostgreSQL config and read them in during the DB setup migrations.

## postgresql.conf

amqp.host = rabbitmq # rabbit host name
amqp.port = 5672
amqp.vhost = '/'
amqp.username = guest
amqp.password = guest
amqp.exchange = 'pg-messaging'

You can use the CURRENT_SETTING sql function to read arbitrary values out of the PostgreSQL config.

INSERT INTO amqp.broker (host, port, vhost, username, password)(
  SELECT
    COALESCE(current_setting('amqp.host'), NULL)
  , COALESCE(current_setting('amqp.port')::INT, NULL)
  , COALESCE(current_setting('amqp.vhost'), NULL)
  , COALESCE(current_setting('amqp.username'), NULL)
  , COALESCE(current_setting('amqp.password'), NULL)
);
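
Before wiring up any triggers, it is worth sanity checking the broker row and connectivity by hand. Something like the following should do it, assuming the broker row got id 1, the exchange from postgresql.conf already exists on the RabbitMQ side, and test.ping is just a throwaway routing key.

-- confirm the broker configuration landed
SELECT * FROM amqp.broker;

-- hand-publish a test message to the configured exchange
SELECT amqp.publish(1, CURRENT_SETTING('amqp.exchange'), 'test.ping', 'hello from postgres');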

If you have ever written a ROW-level trigger for PostgreSQL, you are probably familiar with the special variables OLD and NEW, which represent the previous state and the current state of the row being modified. We convert this data to JSON and send it to a RabbitMQ exchange. Our change capture message looks like this:

{
  "previous": Object // JSON representation of the previous row
, "current": Object // JSON representation of the current row
, "targets": Array //  A list of columns that actually changed
, "timestamp": Date // Date / time of the operation
, "operation": String // WHAT happened (insert, update, delete)
, "resource": String // An application specific name of the thing being chagned
, "tablename": String // The name of the table
, "routing_key": String // The rabbitmq routing key that was used
}

This gives us everything we need to know:

  1. WHAT changed ( the operation, table + resource )
  2. WHEN it changed ( a timestamp )
  3. HOW it changed ( previous / current + targets )

Change Data Trigger

That is enough for any application consuming these messages to make intelligent decisions. All that is left is to write the trigger function to send the data out into the world. I'm going to focus on the update operation case, because it's the most interesting.

CREATE OR REPLACE FUNCTION AMQP_CHANGE_DATA_CAPTURE()
  RETURNS TRIGGER AS $$
    DECLARE
      routing_key_prefix TEXT;
      routing_key TEXT;
      resource TEXT;
    BEGIN
      resource := COALESCE(TG_ARGV[0], TG_TABLE_NAME);
      routing_key_prefix := COALESCE(TG_ARGV[1], 'cdc');
      routing_key := LOWER(FORMAT('%s.%s.%s', routing_key_prefix, resource, TG_OP));

      IF TG_OP = 'INSERT' THEN
      -- SNIP
      RETURN NEW;
      END IF;

      IF TG_OP = 'UPDATE' THEN
        PERFORM amqp.publish(
          1 -- id of broker in amqp.broker table
        , CURRENT_SETTING('amqp.exchange') -- from postgresql.conf
        , routing_key
        , JSON_BUILD_OBJECT(
            'previous', ROW_TO_JSON(OLD)
          , 'current',  ROW_TO_JSON(NEW)
          , 'targets',  AKEYS(HSTORE(NEW) - HSTORE(OLD))
          , 'timestamp', CURRENT_TIMESTAMP
          , 'operation', LOWER(TG_OP)
          , 'resource', resource
          , 'table', TG_TABLE_NAME
          , 'routing_key', routing_key
          )::TEXT
        );
        RETURN NEW;
      END IF;

      IF TG_OP = 'DELETE' THEN
      -- SNIP
      RETURN NULL;
      END IF;

      RETURN NULL;
    END;
  $$ LANGUAGE plpgsql;

There are a couple of things to note here.

, 'targets',  AKEYS(HSTORE(NEW) - HSTORE(OLD))
  • HSTORE is used because it implements subtraction ( hstore - hstore ), which removes matching K/V pairs. The AKEYS function just returns an array of the keys of the hstore value. In other words, this gives us the columns that were actually changed ( see the short example after these notes ).
resource := COALESCE(TG_ARGV[0], TG_TABLE_NAME);
  • TG_ARGV is an array of arguments passed in the trigger definition. In this case you can optionally pass a resource name, and a routing key prefix if you want to route messages to a different destination. This solves our partitioned table problem: we can give the partitions a common name. For example, instead of _table_1 and _table_2 we can name and handle them in code as just table.
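
To make the hstore arithmetic concrete, here is a standalone example you can run in psql ( the column names are made up ). Subtraction removes the pairs whose key and value both match, and AKEYS returns whatever is left - the changed columns.

SELECT AKEYS(
  HSTORE(ARRAY['id', '1', 'title', 'new title', 'body', 'same'])
  - HSTORE(ARRAY['id', '1', 'title', 'old title', 'body', 'same'])
);
-- => {title}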

trigger [trig'er], noun

1. a device, as a lever, the pulling or pressing of which releases a detent or spring

2. procedural code that is automatically executed in response to certain events on a particular table

3. special stored procedure that is run when specific actions occur within a database.

With the trigger function defined, we can selectively add change capture to any table in our database. We can specify which events trigger the messages ( or all of them ), what the resources are called, and how the messages get routed around.

CREATE TRIGGER article_change_data_capture_trig
  AFTER INSERT OR DELETE OR UPDATE
  ON blog_articles
  FOR EACH ROW
    EXECUTE PROCEDURE AMQP_CHANGE_DATA_CAPTURE('article');

NOTE: Always use AFTER events for these triggers. This ensures that the message is only sent when the transaction succeeds.

Our RabbitMQ routing keys will look like this:

  • cdc.article.insert
  • cdc.article.delete
  • cdc.article.update
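
For the partitioned tables mentioned earlier, the same trigger can be attached to each partition with a shared resource name, so messages from every partition land on the same cdc.article.* keys. A sketch, assuming hypothetical partitions named blog_articles_2019 and blog_articles_2020:

CREATE TRIGGER article_2019_change_data_capture_trig
  AFTER INSERT OR DELETE OR UPDATE
  ON blog_articles_2019
  FOR EACH ROW
    EXECUTE PROCEDURE AMQP_CHANGE_DATA_CAPTURE('article');

CREATE TRIGGER article_2020_change_data_capture_trig
  AFTER INSERT OR DELETE OR UPDATE
  ON blog_articles_2020
  FOR EACH ROW
    EXECUTE PROCEDURE AMQP_CHANGE_DATA_CAPTURE('article');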

Now, for every insert, update, and delete on the blog_articles table, a CDC message will be sent to the pg-messaging exchange on our RabbitMQ server. This falls in line with the Keep It Simple, Stupid mentality. Not only did we successfully avoid any additional servers, complicated infrastructure, or expensive hosted services, it only took about ~80 lines of SQL. There are certainly some trade-offs you'll need to consider with a setup like this:

Pros

  • Low complexity
  • No additional servers
  • Uses SQL
  • Low DB overhead
  • Granular and selective
  • Trigger based + replication friendly

Cons

  • Non-standard extension
  • Config requires a table
  • Additional networking to connect to the AMQP server

The biggest downside here is that it makes use of a non-standard ( albeit very stable ) extension. In most cases, managed services ( AWS Aurora, Google Cloud SQL, etc. ) do not support these types of extensions, nor do they give you access to install them. If you are currently using a managed service, or do not want to run your own database servers, this is likely not the most appropriate option.

With one trigger and one extension, we've made a data firehose. In Part 2, we'll take a look at how to make the data work for us with a little bit of help from Node.js.

rabbitmq postgres change-data-capture database herd-of-rabbits