Summer Of Sockets Part 4: Multi-Part Messages With Node.js & ZeroMQ

In previous installments of Summer Of Sockets, we took a look at the basic messaging socket types in ZeroMQ: PUSH/PULL, PUB/SUB and REQ/REP. Each of these types provides a different pattern of messaging between connected peers. Up to now, we have been passing simple messages around to illustrate the patterns. In larger, more complicated applications, messages will need to carry more information.

In JavaScript, that typically means constructing, passing and parsing JSON objects, as well as accounting for missing fields and even embedding message types and usage metadata into the object. This quickly becomes messy and overly complicated.

JSON is a fine data format, but ZeroMQ provides a very handy facility that makes structured messaging even easier: multi-part messages. A multi-part message is exactly what it sounds like: a message that has multiple parts, or frames. ZeroMQ guarantees that all parts of a message will be delivered together, or not at all, so partial delivery is not something you need to worry about. Message framing is used rather heavily in messaging protocols, and it makes building and handling structured messages significantly easier.

To illustrate this, let's expand on our counter example from the pipeline playground so that our producer sends a message instructing the consumer to either increment or decrement. We can use the frames of a multipart message to hold specific pieces of information for the consumer.

Multipart PUSH / PULL

+---+------------+-----------------------+
| 0 |   String   | Math function to call |
+---+------------+-----------------------+
| 1 |   Number   | value to apply        |
+---+------------+-----------------------+

Our message will contain two parts to start. The first frame tells our consumers which math function to apply, and the second frame holds the value to pass to it.
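To make the two-frame contract concrete without a running socket, here is a minimal sketch that models each frame as a Node Buffer. The helper names `encodeCounterMessage` and `decodeCounterMessage` are hypothetical, and the string conversion assumes the binding sends non-Buffer parts as their UTF-8 string form.

```javascript
// Hypothetical helpers modelling the two-frame counter contract:
//   frame 0: name of the math function (string)
//   frame 1: value to apply (number, carried as text)

function encodeCounterMessage( fn, value ){
    // Each frame is an independent Buffer; numbers travel as their string form.
    return [ Buffer.from( String( fn ), 'utf8' ), Buffer.from( String( value ), 'utf8' ) ];
}

function decodeCounterMessage( frames ){
    if( frames.length !== 2 ){
        throw new Error( 'expected 2 frames, got ' + frames.length );
    }
    var value = parseInt( frames[ 1 ].toString( 'utf8' ), 10 );
    if( isNaN( value ) ){
        throw new TypeError( 'frame 1 must be a number' );
    }
    return { fn: frames[ 0 ].toString( 'utf8' ), value: value };
}

var frames = encodeCounterMessage( 'add', 2 );
console.log( decodeCounterMessage( frames ) ); // { fn: 'add', value: 2 }
```

Keeping the frame order in one place like this means the sender and the consumer cannot silently drift apart.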

// twopart-sender.js
var zmq = require( 'zmq' )
  , socket = zmq.socket( 'push' )
  ;

// bind to an address and port
socket.bind( 'tcp://0.0.0.0:9998' );

// every 150ms, send a two-part message: the function name, then the value
setInterval(function(){
    socket.send( "add", zmq.ZMQ_SNDMORE ); // more frames follow
    socket.send( 2 );                      // last frame completes the message
}, 150);

// twopart-consumer.js
var zmq = require( 'zmq' )
  , socket = zmq.socket( 'pull' )
  , counter = 0
  , methods
  ;

// parse a frame's text into an integer, throwing if it is not a number
function toNumber( value ){
    value = parseInt( value, 10 );
    if( isNaN( value ) ){
        var e = new Error( "math functions only accept Numbers" );
        e.name = "InvalidValueType";
        throw e;
    }
    return value;
}

methods = {
    add:function( value ){
        counter += toNumber( value );
        console.log( "counter is %d", counter );
    }
  , subtract:function( value ){
        counter -= toNumber( value );
        console.log( "counter is %d", counter );
    }
};

// Each message part arrives as a separate parameter
socket.on('message', function( fn, value ){
    // message parts arrive as Buffers; convert them before use
    var op = methods[ fn.toString('ascii') ];

    // ignore function names we don't know about
    op && op( value.toString('ascii') );
});

// connect to the sender's address
socket.connect('tcp://127.0.0.1:9998');

Our sender is doing something we haven't encountered yet: it passes a second argument, the ZMQ_SNDMORE flag, to send(). This tells the socket that more parts of this message are still to come; the message is not complete until a part is sent without the flag. You can send as many parts in a single message as you wish. The consumer is mostly the same, except that it receives the parts of the message as additional parameters to the message event handler.
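Because the binding hands each frame to the `message` handler as a separate argument, a handler that does not know the frame count in advance can gather them from `arguments`. This minimal sketch calls a hypothetical `onMessage` function directly with Buffers instead of wiring it to a socket.

```javascript
// Hypothetical 'message' handler that accepts any number of frames.
// Attached with socket.on('message', onMessage), each frame would arrive
// as one Buffer argument; here we call it directly to show the behaviour.
function onMessage(){
    var frames = Array.prototype.slice.call( arguments );
    return frames.map( function( frame ){
        return frame.toString( 'utf8' );
    });
}

var parts = onMessage( Buffer.from( 'add' ), Buffer.from( '2' ), Buffer.from( 'extra' ) );
console.log( parts ); // [ 'add', '2', 'extra' ]
```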


This is where things start to get interesting. Because the message parts come across as parameters, we don't have to embed all of the logical information or control parameters into the message itself. In our example, the first parameter is our control parameter, telling us which method to call. In this case it selects the add function, which is passed the second parameter. Multi-part messages can be used to create internal message protocols or contracts made up of a number of small control frames and a single data frame.
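A sketch of that idea: the leading frames act as a routing key and the final frame is handed to the selected handler untouched. The `route` function and the `handlers` table below are hypothetical names for illustration, not part of the zmq library.

```javascript
// Hypothetical router for a contract made of N control frames plus one data frame.
// The control frames are joined into a key such as "counter:add" and looked up.
function route( handlers, frames ){
    if( frames.length < 2 ){
        throw new Error( 'need at least one control frame and one data frame' );
    }
    var control = frames.slice( 0, -1 ).map( function( f ){ return f.toString( 'utf8' ); } );
    var data = frames[ frames.length - 1 ];
    var handler = handlers[ control.join( ':' ) ];
    if( !handler ){
        return null; // unknown command: ignore it, like `op && op(...)` in the consumer
    }
    return handler( data ); // the data frame is passed along as a raw Buffer
}

var handlers = {
    'counter:add': function( data ){ return 'add ' + data.toString( 'utf8' ); }
};

console.log( route( handlers, [ Buffer.from( 'counter' ), Buffer.from( 'add' ), Buffer.from( '5' ) ] ) ); // add 5
```

Adding a new command is then a matter of adding a key to the table, without touching the shape of the data frame.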

This also improves memory efficiency within each individual consumer process. Because each frame is a Node Buffer object, its memory lives outside of the V8 heap, and it is only turned into a JavaScript string if and when the handler actually reads it. If the responsibility of the process is simply to redirect the message, reading large data payloads may not be necessary at all. In principle, this means we can handle data packages that exceed the roughly 1GB heap limit of the V8 runtime.
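A minimal sketch of a forwarding step that decodes only the small header frame and passes the payload Buffer along without converting it to a string. The `forward` function and the `queues` map are hypothetical; in a real relay the frames would be handed to another socket's `send`.

```javascript
// Hypothetical relay step: read only the small header frame, never the payload.
function forward( queues, header, payload ){
    var destination = header.toString( 'utf8' );
    var queue = queues[ destination ];
    if( !queue ){
        return false;
    }
    // The payload Buffer is pushed by reference; its bytes are never copied
    // or decoded into a JavaScript string.
    queue.push( [ header, payload ] );
    return true;
}

var queues = { images: [] };
var payload = Buffer.alloc( 1024 * 1024 ); // 1 MiB of zeroed bytes
forward( queues, Buffer.from( 'images' ), payload );
console.log( queues.images[ 0 ][ 1 ] === payload ); // true
```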

Multipart Request / Reply

All of the socket types support this multi-part message format. For example, we could use multipart messages with the REQ / REP socket types to implement a lightweight server protocol.

# REQUEST
+---+------------+-------------------------+
| 0 |   STRING   | the request method type |
+---+------------+-------------------------+
| 1 |   NUMBER   | Protocol version number |
+---+------------+-------------------------+
| 2 |   STRING   |       Request URI       |
+---+------------+-------------------------+
| 3 |   STRING   |  encoding of response   |
+---+------------+-------------------------+
# RESPONSE
+---+------------+-------------------------+
| 0 |   NUMBER   |       Status code       |
+---+------------+-------------------------+
| 1 |   STRING   |     Status Message      |
+---+------------+-------------------------+
| 2 |    JSON    |      Data payload       |
+---+------------+-------------------------+

These two tables represent the server / client contract for an overly simplified two-way communication channel. This is all that is required for two different developers to implement two different parts of a distributed system. Let's take a look at what the client code might look like first.

// multipart-request.js
var zmq = require( 'zmq' )
  , socket = zmq.socket( 'req' )
  ;

// bind to an address and port
socket.bind('tcp://0.0.0.0:9998');

// send one request; a REQ socket must receive a reply before it can send again
function request(){
    socket.send([
        "GET"
      , 1
      , "/data"
      , "json"
    ]);
}

socket.on('message', function( status, message, payload ){
    status = parseInt( status.toString(), 10 );
    if( status !== 200 ){
        console.error( "bad request: %s", message.toString() );
    } else {
        console.log( JSON.parse( payload.toString() ) );
    }
    // the reply has arrived, so the socket is ready for the next request
    setTimeout( request, 150 );
});

request();

In this example, our requesting client makes a GET request for /data using protocol version 1, and accepts JSON data back.
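The REQUEST table can be captured in a pair of helpers so that both sides agree on frame order. This is a sketch: `buildRequest` and `parseRequest` are hypothetical names, and the version frame is assumed to travel as text, which is how a number in the send array reaches the wire.

```javascript
// Hypothetical helpers for the 4-frame REQUEST contract:
//   0: method, 1: protocol version, 2: URI, 3: response encoding
function buildRequest( method, version, uri, encoding ){
    return [ method, String( version ), uri, encoding ];
}

function parseRequest( frames ){
    if( frames.length !== 4 ){
        throw new Error( 'expected 4 request frames, got ' + frames.length );
    }
    var text = frames.map( function( f ){ return f.toString( 'utf8' ); } );
    return {
        method: text[ 0 ]
      , version: parseInt( text[ 1 ], 10 )
      , uri: text[ 2 ]
      , accepts: text[ 3 ]
    };
}

// Simulate the wire: every part arrives as a Buffer.
var wire = buildRequest( 'GET', 1, '/data', 'json' ).map( function( p ){ return Buffer.from( p ); } );
console.log( parseRequest( wire ) );
```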

// multipart-reply.js
var zmq = require( 'zmq' )
  , socket = zmq.socket( 'rep' )
  ;

// Each message part will come across as a parameter
socket.on('message', function( method, version, uri, accepts ){
    accepts = accepts.toString();

    if( accepts !== 'json' ){
        // keep the 3-frame contract; the payload frame is left empty
        return socket.send([ 400, 'BAD REQUEST', '' ]);
    }

    var data = [];
    for( var i = 0; i < 9; i++ ){
        data.push( Math.random() );
    }

    socket.send([
        200
      , "OK"
      , JSON.stringify({ data: data })
    ]);
});

// connect to the client's address
socket.connect('tcp://127.0.0.1:9998');

The server checks that the requested encoding is json, because we don't care about anything else. If it is, we generate a data object and send back the expected message frames. The complex data object sits outside of what the protocol itself needs to handle, so the server has no reason to parse it or inspect its contents.
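Separating the reply logic from the socket makes the contract easy to test. This sketch is a hypothetical `handleRequest` that takes the four request frames and returns the three response frames a REP handler could pass straight to `socket.send`. It returns the status code as a string, which is assumed to be equivalent on the wire to sending the number.

```javascript
// Hypothetical pure version of the reply logic: request frames in, response frames out.
function handleRequest( frames ){
    var accepts = frames[ 3 ].toString( 'utf8' );
    if( accepts !== 'json' ){
        return [ '400', 'BAD REQUEST', '' ];
    }
    var data = [];
    for( var i = 0; i < 9; i++ ){
        data.push( Math.random() );
    }
    // The payload frame is an opaque JSON string; the server never re-parses it.
    return [ '200', 'OK', JSON.stringify( { data: data } ) ];
}

var request = [ 'GET', '1', '/data', 'xml' ].map( function( p ){ return Buffer.from( p ); } );
console.log( handleRequest( request ).slice( 0, 2 ) ); // [ '400', 'BAD REQUEST' ]
```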

The ZMQ_SNDMORE flag can be omitted in favor of a single array message

Also notice that we have eliminated the ZMQ_SNDMORE flag. The node zmq library converts an array into a multipart message for you by default. As you can see, it is really quite simple, and it allows developers to create small, efficient message-driven systems with very little code. You can find the code to accompany these examples in my bitbucket repo.
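Conceptually, sending an array is shorthand for sending each element as its own frame with the more-parts flag set on every frame but the last. The sketch below models that with a recording `sendFrame` stand-in instead of a real socket. `ZMQ_SNDMORE = 2` matches the constant in libzmq's `zmq.h`, but `sendMultipart` is a hypothetical helper, not the binding's own code.

```javascript
var ZMQ_SNDMORE = 2; // same value as ZMQ_SNDMORE in libzmq's zmq.h

// Hypothetical: expand an array into one send per frame.
function sendMultipart( sendFrame, parts ){
    parts.forEach( function( part, i ){
        var frame = Buffer.isBuffer( part ) ? part : Buffer.from( String( part ), 'utf8' );
        var flags = i < parts.length - 1 ? ZMQ_SNDMORE : 0; // last frame has no flag
        sendFrame( frame, flags );
    });
}

var sent = [];
sendMultipart( function( frame, flags ){
    sent.push( { text: frame.toString( 'utf8' ), more: ( flags & ZMQ_SNDMORE ) !== 0 } );
}, [ 200, 'OK', '{}' ] );
console.log( sent );
```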
