Custom Transports For Skyring

S

kyring is a distributed system for managing timers. When a timer lapses, a message is delivered to destination that you have defined. *How* that message is delivered is configurable. Out of the box, Skyring comes with an `HTTP` transport, and there is an official package enabling tcp delivery of messages with connection pooling. They are pretty easy to write, and you can use any of the tools you are currently used to using.

STDOUT Transport

To illustrate the process, we're going to make a simple transport handler to write the data to stdout. Basically, speaking a transport is just a node.js module that exports a named function

Module [ˈmäjo͞ol] -n., --noun

any of a number of distinct but interrelated units from which a program may be built up or into which a complex activity may be analyzed.

'use strict'

const os = require('os')

module.exports = function stdout(method, url, payload, id, storage) {
  // deliver the message
  process.stdout.write(payload);
  process.stdout(os.EOL);

  // clear the timer
  storage.remove(id);
};

Pretty simple. We just write the data to the stdout out stream attached to the process, of course, be sure to remove the timer reference from the skyring internal storage. To load your transport into a skyring server, you can pass an array of transports when instantiating as server instance. The array can contain references to the transport itself, or as string that can be passed to require

'use strict'

const Skyring = require('skyring')
const stdout = require('./transports/stdout')
const server = new Skyring({
  transports: [require('./transports/stdout')]
, seeds: ['localhost:3455']
})

server.load().listen(3000)

Done. Just be sure that every skyring node in the cluster has all of the same transports loaded so they have the capability to execute all of the timers. Other than that, we can start using our new transport by referencing it by name in the transport field of the request to create a new timer.

curl -XPOST http://localhost:3000/timer -H 'Content-Type: application/json' -d '{
  "timeout":3000
, "data":"hello world!"
, "callback": {
    "transport": "stdout"
  , "method":"unused"
  , "uri": "unused"
  }
}'

ZMQ Transport

We've got our feet wet with this simple example. Let's expand a little bit. If you've read through the summer of sockets, you may have noticed that I am fond of ZeroMQ. So instead of just writing to stdout, let's write a transport for zmq. I would want to be able to have it send timer notifications to 1 or more servers, and specify a message distribution patterns (pub, sub, push, pull, etc).

ZeroMQ [ˈzirō emkyo͞o] -n., --noun

A messaging library, which allows you to design a complex communication system without much effort.

For starters, lets start a new npm package.

$ mkdir -p skyring-zmq-transport/lib
$ cd skyring-zmq-transport
$ touch index.js lib/zmq.js README.md
$ npm init
$ npm install zmq debug --save
$ npm install skyring --save-dev

You should have something this

|-- lib
|   `-- zmq.js
|-- package.json
|-- README.md
`-- index.js

Lets start easy and fill in the primary transport handler

'use strict'

const ZMQ = require('zmq')
const debug = require('debug')('skyring:transports:zmq')

module.exports = function zmq(method, url, payload, id, storage) {
  debug('execute zmq timer', 'timeout', payload)

  const conn = ZMQ.socket('push')
  conn.connect(url)
  conn.send('timeout', ZMQ.ZMQ_SNDMORE)
  conn.send(payload)
  storage.remove(id);
}

A simplistic start, but it does complete the basic task. When a timer lapses and was configured to use the zmq transport ( the name of our function ), we'll make a new zmq socket, connect to the requested url, and send the payload. The first problem with this is it will make a new connection for every timer execution. We really need to keep track of previously created connections. We can do that pretty easily with a Map.

'use strict'

const ZMQ = require('zmq')
const debug = require('debug')('skyring:transports:zmq')
const connections = new Map()

module.exports = function zmq(method, url, payload, id, storage) {
  debug('execute zmq timer', 'timeout', payload);

  const conn = getConnection(url);
  conn.send('timeout', ZMQ.ZMQ_SNDMORE);
  conn.send(payload);
  storage.remove(id);
}

function getConnection(addr) {
  // if we've have an existing connection, return it
  if (connections.has(addr)) return connections.get(addr);

  // otherwise make a new one
  debug(`creating socket to ${addr}`);
  const socket = ZMQ.socket('push');
  socket.on('error', (err) => {
    console.error('destroying socket %s', addr, err.message);
    socket.removeAllListeners();
    socket.disconnect();
    socket.close();
    connections.delete(addr);
  });
  connections.set(addr, socket);
  socket.connect(addr);
  return socket;
}

Now, we've added a little helper function getConnection. It takes a connection url, makes a new ZMQ push socket connected to the url, and returns it. Now we can reuse our connections, and all timers configured with the same url will use the same connection. This does the job, but we could expand a bit to allow for different socket types. ZMQ has comes with PUB / SUB, REQ / REP, and a few others. But in the context of skyring, it doesn't make sense to use a SUB socket as it's read-only. And there is no real need to wait for a response, so both req and rep are out. We should restrict our types to just push and pub. Additionally, we could make it configurable if the socket should connect or bind to the address.

'use strict'

const ZMQ = require('zmq');
const debug = require('debug')('skyring:transports:zmq');
const METHODS = new Set(['push', 'pub']);
const ZMQ_BIND = !!process.env.ZMQ_BIND;
const connections = new Map();

module.exports = function zmq(method, url, payload, id, storage) {
  debug('execute zmq timer', 'timeout', payload);

  const conn = getConnection(url, method);
  conn.send('timeout', ZMQ.ZMQ_SNDMORE);
  conn.send(payload);
  storage.remove(id);
}

function getConnection(addr, type) {
  const key = `${type}:${addr}`;
  if (connections.has(key)) return connections.get(key);

  if (!METHODS.has(type)) {
    const error = new Error(`unsupported transport method ${type}`);
    error.code = 'EZMQMETHOD';
    throw error;
  }

  debug(`creating ${type} socket to ${addr}`);
  const socket = ZMQ.socket(type);
  socket.on('error', (err) => {
    console.error('destroying socket %s', key, err.message);
    socket.removeAllListeners();
    socket.disconnect();
    socket.close();
    connections.delete(key);
  });

  connections.set(key, socket);

  if (ZMQ_BIND) {
    socket.bindSync(addr);
  } else {
    socket.connect(addr);
  }
  return socket;
}

Just a couple of small adjustments. Before creating the socket we check to see of the the socket type is allowed and throw if it is not. On line 18, we use a composite key for storing connections, so it is possible to have multiple socket types for the same url.

We can plug this into a single node skyring cluster to see it in action.

'use strict'

const Skyring = require('skyring');
const server = new Skyring({
  seeds: ['localhost:3455']
, transports: [require('./lib/zmq')]
});

server.load().listen(3000);

process.once('SIGINT', onSignal);
process.once('SIGTERM', onSignal);

function onSignal() {
  server.close(() => {
    console.log('skyring closed');
  });
}

That's it! We can start setting timers with using our zmq transport

curl -XPOST http://localhost:3000/timer -H 'Content-Type: application/json' -d '{
  "timeout":3000
, "data":"hello world!"
, "callback": {
    "transport": "zmq"
  , "method":"push"
  , "uri": "tcp://0.0.0.0:5555"
  }
}'

It is pretty simple and straight forward to build your own transport layer for Skyring. It can be a simple function, or for more complex use cases, you can build out entire packages using whatever tools you want. This makes the transport system in skyring very flexible, powerful, and easy to use.