Timeseries APIs on a dime with Node, Tastypie and MySQL

Time series data is quickly becoming all the rage in big data circles. The primary use case for large amounts of time series data tends to be visualization of collected metrics. This could be the temperature of your house, the CPU usage of a remote server, the oil level of your car, and so on. In a nutshell, time series data is:

  • Data over a continuous time interval
  • Successive measurements across that interval
  • Equal spacing between every two consecutive measurements
  • At most one data point per unit of time within the interval

It might look something like this:

[
    {
        time: '2016-01-01 00:00:00', // minute 0
        value: 1
    },{
        time: '2016-01-01 00:01:00', // minute 1
        value: 2
    }
]

The data is sequential and continuous, and each entry contains a single data point along with the time at which it was recorded. Just by looking at the data you can see how well it maps to graphing: time is usually the x axis and the value is the y axis. Let's say, for our example, we are running a small e-commerce store, we have been collecting sales data for the past few years, and we would like to visualize profits against certain metrics.

For example, do we make more money selling Hanes or Fruit of the Loom? Do we make more money from things that are white or black? And so on. Of course we didn't plan for this, and we have all of this data normalized across a bunch of tables in our database. We have over 10 million records, with a good number of foreign keys, and we need to make it performant. We could set up a non-relational time series database like InfluxDB or OpenTSDB, figure out an ETL layer using some or all of the Hadoop ecosystem, and slurp data into that new database. But that is a lot of work that I just don't want to do, nor do I have time for!

Current Table Setup

  • user - The user selling something
  • order - The incoming order for a user
  • order_item - The individual items on an order
  • variant - A product variation ( product + size + color )
  • product - A single sellable item
  • color - Color names and hex codes
  • size - Sizes and the extra costs associated with a size ( L, XL, etc. )

ETL [ee 'tee ehl] -n, --noun.

Extract Transform Load
a process in database usage that:

  • Extracts data from homogeneous or heterogeneous data sources.
  • Transforms the data into the proper format for storage
  • Loads the transformed data into the final storage destination.

The primary killer of performance in our situation is all of the joins we have to perform on a table with 10 million records. What we really want to do is make a new table to hold a de-normalized subset of that data and query that instead. Recently I was turned onto a shiny gem in MySQL called scheduled events, which are basically cron jobs at the database level that run a query. That is exactly what I want. We can write a simple event to do the heavy query every hour. We are going to need a couple of tables to pull this off.

Database

  1. Table to keep track of the last time each job was executed
  2. Table to store the results of each job we create

Our job tracking table is simple - id, job name and last run time. We are using it to keep track of when our jobs run so we only sample data that hasn't yet been sampled.

Table Layout

-- create ETL job table
CREATE TABLE `timeseries_etl_job` (
  `timeseries_etl_job_id` int(11) NOT NULL AUTO_INCREMENT,
  `job_name` varchar(100) NOT NULL,
  `job_last_run` datetime NOT NULL DEFAULT '1970-01-01 00:00:00',
  PRIMARY KEY (`timeseries_etl_job_id`),
  UNIQUE KEY `job_name_UNIQUE` (`job_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;    
-- table for product specific data
CREATE TABLE IF NOT EXISTS `timeseries_trend_product` (
  `timeseries_trend_product_id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `time` date NOT NULL,
  `value` int(11) NOT NULL DEFAULT '0',
  `order_item_id` int(11) NOT NULL,
  `size` varchar(7) NOT NULL,
  `color` varchar(45) NOT NULL,
  `color_hex` varchar(7) NOT NULL,
  `tax` int(11) NOT NULL DEFAULT '0',
  `product` varchar(255) NOT NULL,
  `product_id` int(11) NOT NULL,
  `user_id` int(11) NOT NULL,
  `quantity_sold` int(11) NOT NULL DEFAULT '0',
  `total_price` int(11) NOT NULL DEFAULT '0',
  PRIMARY KEY (`timeseries_trend_product_id`),
  KEY `idx_user_time` (`user_id`,`time`) USING BTREE
) ENGINE=InnoDB  DEFAULT CHARSET=utf8;

As you have probably noticed, we have a lot more data in our table than just time and value. We are going to use each data field as a separate metric that can be aggregated and graphed. It will make more sense in a bit, I promise.

ETL Event

Now for the tricky bit. First you need to enable the event scheduler in MySQL. You should set event_scheduler=on under the [mysqld] section of the configuration file so it survives a restart. You can also turn it on for the running server with a single query.

SET GLOBAL event_scheduler = ON;

After that is done, the event itself is very similar to writing a transaction with one or more queries, but with a schedule header. Our event needs to do 3 things:

  1. Get the last run time of the same job
  2. Set the last run time of the same job to NOW()
  3. Execute a single INSERT INTO statement that rolls up all of the data as we need it, starting from the last time the job ran

DELIMITER $$
CREATE EVENT IF NOT EXISTS ts_aggregate_trend_product

ON SCHEDULE EVERY 1 HOUR
STARTS NOW()
ON COMPLETION PRESERVE

DO BEGIN
  SELECT GET_LOCK('ts_etl_product', 0) INTO @got_lock;
  IF @got_lock = 1 THEN
    START TRANSACTION;

      -- Get the last run time
      SET @last_run := ( SELECT job_last_run from timeseries_etl_job where job_name = 'timeseries_trend_product');

      -- set the last run time to now
      UPDATE timeseries_etl_job SET job_last_run = NOW() WHERE job_name = 'timeseries_trend_product';
      
      -- Rollup the data and insert into our time series table
      INSERT INTO timeseries_trend_product(
        `time`
         , `value`
         , order_item_id
         , size
         , color
         , color_hex
         , product
         , product_id
         , user_id
         , quantity_sold
         , total_price     
      ) SELECT
         DATE(order_item.created_at) as `time`
        , ( ( order_item.price_items / order_item.quantity ) - order_item.discount_amount ) as value
        , order_item.id
        , size.name
        , color.name
        , color.hex_prim
        , product.name
        , product.id as product_id
        , product.user_id
        , order_item.quantity
        , order_item.price
            
        FROM order_item

        -- `order` is a reserved word in MySQL, so it needs backticks
        INNER JOIN `order` ON `order`.id = order_item.order_id
        INNER JOIN variant ON variant.variant_id = order_item.variant_id
        INNER JOIN color ON color.color_id = variant.color_id
        INNER JOIN size ON size.size_id = variant.size_id
        INNER JOIN product ON product.product_id = variant.product_id
        INNER JOIN user ON user.user_id = product.user_id

        WHERE `order`.status = 'paid'
        AND `order`.paid_at > @last_run
        ORDER BY order_item.created_at ASC;
    COMMIT;
    SELECT RELEASE_LOCK('ts_etl_product') INTO @discard;   
  END IF;
END $$
DELIMITER ;

We are only doing a partial downsample here. We are not calculating the total profit of all items up until now; we are calculating the profit on each item for a given order and storing a little metadata about the item. We'll do the math at query time. Relational databases are still really good at adding! This single operation takes our multi-million row query that spanned several tables down to a query against one table with a few hundred thousand records.

There are a few important things to note about this event:

Change The Default Delimiter

We need to change the statement delimiter to something other than a semicolon so we can use semicolons inside the event body. Just remember to change it back at the end!

DELIMITER $$

Preserve Events

The default behavior for events is to be dropped after they complete, even if the schedule suggests that they will run more than once, as we have done here with EVERY 1 HOUR. Set ON COMPLETION to PRESERVE and everything is fine.

ON COMPLETION PRESERVE

Last Execution Time

We need to fetch the last execution time of the same job and store it in a variable so we can constrain our query to only the records we haven't yet processed.

SET @last_run := ( SELECT job_last_run from timeseries_etl_job where job_name = 'timeseries_trend_product');

Acquire A Lock

This is how you obtain a mutex lock in MySQL. This isn't a table or row lock, it is just a named mutex. We use it here so events don't start stacking up, and if we have a bad query that doesn't close a transaction or something like that, we only do the bad thing once. Optional, but recommended.

SELECT GET_LOCK('ts_etl_product', 0) INTO @got_lock;

Cast Timestamps

The time interval is only granular to the day, mainly because it makes grouping and aggregation easier and faster. You could store these as DATETIME values for higher precision, but performance would suffer a bit. There are pretty simple workarounds, but I'm going with a DATE.

 DATE(order_item.created_at) as `time`
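
If you do want finer granularity, one of those simple workarounds is to keep a DATETIME column and truncate to the day at query time. Here is a minimal sketch using the knex query builder that shows up later in the article (illustrative only; it assumes a knex instance connected to MySQL):

// store DATETIME in the table, but bucket by day when aggregating --
// a sketch only; assumes `knex` is connected to the MySQL database
function dailyRollup( knex ){
    return knex('timeseries_trend_product')
        .select( knex.raw('DATE(`time`) as `time`') )
        .sum('value as value')
        .groupByRaw('DATE(`time`)');
}

The trade-off is that MySQL has to run DATE() over every matching row, which is part of why I'm sticking with a plain DATE column.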

Time Series Tastypie Resource

This is actually rather simple to set up with Tastypie. Using the default resource class, you can simply override the get_objects method, which is used, as the name would imply, to return an array of data any way you can get it out of the database.

var Resource = require('tastypie').Resource;

module.exports = Resource.extend({
    options:{
        fields:{
            time     : { type:'date' }
          , value    : { type:'int' }
          , metric   : { type:'field' }
        }
    }
    , constructor: function( options ){
        this.parent('constructor', options );
    }

    , get_objects: function( bundle, callback ){
        // get_some_data() is a stand-in for however you fetch rows
        var array_of_data = get_some_data();
        callback(null, array_of_data);
    }

});

Bookshelf & Knex

I personally have implemented this using the popular bookshelf and knex libraries (knex more so than bookshelf), and it is just as easy to work with dynamic metrics.
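
The resource below leans on a BookshelfModel for the ETL table. A minimal sketch of what that model might look like (the connection settings are placeholders; adjust them for your environment):

// sketch of the BookshelfModel the resource below expects --
// connection settings are placeholders
var knex = require('knex')({
    client: 'mysql'
  , connection: {
        host     : '127.0.0.1'   // placeholder
      , user     : 'shop_user'   // placeholder
      , password : 'secret'      // placeholder
      , database : 'shop'        // placeholder
    }
});

var bookshelf = require('bookshelf')( knex );

module.exports = bookshelf.Model.extend({
    tableName   : 'timeseries_trend_product'
  , idAttribute : 'timeseries_trend_product_id'
});

With a model like that in place, the resource itself looks like this: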

var tastypie = require('tastypie')
  , Resource = tastypie.Resource;

module.exports = Resource.extend({
  options:{
      pk: 'timeseries_trend_product_id'
     , filtering:{
         time:[ 'range', 'period' ]
        ,user:[ 'exact' ]
        ,metric:[ 'exact' ]
     }
     ,fields:{
        time     : { type:'date', help:'time of event' }
      , value    : { type:'int', help:'the sum the metric at given time' }
      , metric   : { type:'field', help:'the metric we are measuring' }

      // user field here exclusively for filtering
      , user     : { type:'int', attribute:'user_id', exclude: true }
      }
  }

  ,constructor: function( options ){
      this.parent( 'constructor', options );
  }
  
  ,get_objects: function( bundle, callback ){
    let queryset = BookshelfModel.collection()
      , that     = this
      ;

    queryset.query( this.aggregate.bind(this, bundle) )
            .fetch()
            .then(function( objects ){
                // tell bookshelf to not be slow
                callback(null, objects.toJSON({shallow:true}) );
            })
            .catch( console.error )
  }

  , aggregate: function( bundle, qb ){
      let metric = bundle.req.query.metric;

     // validate the metric...

      qb.select(
          'time'
        , `${metric} as metric`
      )
      .sum( 'value as value' )
      .groupBy( 'time' )
      .groupBy( 'metric' );
      
      delete bundle.req.query.metric;
      // apply other filtering logic

      // apply result limits

      // apply sorting

      return qb;      
  }
})

This resource allows the user to shape the series data with a custom metric query parameter. Notice that we apply a group by on both time and our custom metric. We do that so the data comes back with each metric in series by date, with the profit data summed up by that metric on a specific day.
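
One thing the `// validate the metric...` comment glosses over: the metric value ends up in the select list, so it should be restricted to a known set of columns before it ever reaches the query builder. A minimal sketch of that check (the helper name and the allowed list are assumptions; the columns mirror the ETL table):

// hypothetical whitelist for the metric query parameter --
// anything not in this list never reaches the query builder
var ALLOWED_METRICS = [ 'color', 'size', 'product', 'product_id' ];

function validMetric( metric ){
    return ALLOWED_METRICS.indexOf( metric ) !== -1;
}

// inside aggregate(), before building the select:
// if( !validMetric( metric ) ){ metric = 'color'; } // or respond with a 400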

Time Windows

By default, these queries will return all data since the beginning of time. Albeit interesting, that will be very slow. We need an easy way to specify a limited window of time to view: the last 6 months, year to date, etc.

We can use a simple filter to dynamically apply a time period constraint on our time field using time intervals, which most popular databases support. A filter function for tastypie to do that might look something like this:

//filters/period.js
module.exports = function period(qb, field, term){
    // Result will be an array of [NUMBER, TIME FRAME]
    // [ 6, "MONTH" ]
    let result = parse_term( term );
    return qb.whereRaw(`${field} > CURDATE() - INTERVAL ${result[0]} ${result[1]}`);
};

This little function will apply a window of the current date less the passed-in time interval, 1 year for example. It generates SQL like this:

SELECT * FROM TABLE where time > CURDATE() - INTERVAL 1 YEAR
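
The parse_term helper is elided above. A minimal sketch of it, assuming period terms shaped like 6m or 1y (the helper name, file path and supported suffixes are all assumptions), might be:

// filters/parse_term.js - hypothetical helper for the period filter;
// turns a term like "6m" into [ 6, 'MONTH' ] for the INTERVAL expression
var UNITS = { d: 'DAY', w: 'WEEK', m: 'MONTH', y: 'YEAR' };

module.exports = function parse_term( term ){
    var match = /^(\d+)\s*([dwmy])$/i.exec( String( term ) );

    if( !match ){
        throw new Error( 'invalid period: ' + term );
    }

    return [ parseInt( match[1], 10 ), UNITS[ match[2].toLowerCase() ] ];
};

Because the number and the unit are validated here, the raw interpolation in the whereRaw call above stays safe.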

And with that, you can query a single endpoint for a specific metric over a specific window of time.

// ?format=json&user=100&metric=color&time__period=6m
[
    {
        time: '2016-01-01',  // Day 1
        value: 1,
        metric:'white'
    },{
        time: '2016-01-01',
        value: 2,
        metric:'black'
    },{
        time: '2016-01-02',  // Day 2
        value: 10,
        metric:'white'
    },{
        time: '2016-01-02',
        value: 30,
        metric:'black'
    }

    // and so on
]

Change the metric and time filter and we get a different view of the data:

// ?format=json&user=100&metric=size&time__period=1y
[
    {
        time: '2015-01-01',  // Day 1
        value: 10,
        metric:'m'
    },{
        time: '2015-01-01',
        value: 30,
        metric:'xl'
    },{
        time: '2015-01-02',  // Day 2
        value: 5,
        metric:'m'
    },{
        time: '2015-01-02',
        value: 100,
        metric:'xl'
    }

    // and so on
]

This keeps the data format consistent and our response footprint small for fast response times. I have been able to return over 8,000 records in under 200ms and less than 70KB rather consistently.

Hook Into Hapi

All that is left to make it real is to plug the resource into a Hapi server instance:


var hapi = require('hapi')
  , tastypie = require('tastypie')
  , TimeseriesResource = require("./timeseries")
  , v1
  , server;

server = new hapi.Server();
server.connection({port:3000});

v1 = new tastypie.Api('api/v1');
v1.use( new TimeseriesResource() );

server.register([v1], function(){
  server.start( console.log );
});
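
Once the server is listening, a quick request confirms everything is wired up. The "timeseries" path segment below is an assumption; use whatever name the resource is actually registered under:

// quick smoke test - the resource name in the URL is an assumption
var http = require('http');

http.get('http://localhost:3000/api/v1/timeseries?user=100&metric=color&time__period=6m', function( res ){
    var body = '';
    res.on('data', function( chunk ){ body += chunk; });
    res.on('end', function(){ console.log( JSON.parse( body ) ); });
});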

Time series data on a budget, without any over-complication, using tools you are already familiar with. And for most use cases, it is fast enough.

  • 2 tables
  • 1 Event Query
  • 1 Resource
  • Fast Enough

Ship it.

-- If you are interested in the actual filter functionality for tastypie & bookshelf, I have started a repo on GitHub with some of the functionality here.

tastypie mysql timeseries analytics node.js