Node.js: Simple Queuing for File Processing

QueueManager (JavaScript)

For the past few months I have been building a custom, enterprise-grade, distributed
media transcoding/encoding system for my employer. Because the system is meant to be
standalone and able to integrate with any of the company’s web products, it needed
to be extremely robust: it has to process any number of media files passed to it at
any given time by any number of clients. That is why I chose Node.js as the
programming platform. Node has the speed a system like this requires, due primarily
to its non-blocking IO implementation.

Because the transcoder has to handle an unspecified number of videos added to it
at any given time, by any number of clients, it needed to asynchronously accept new
media and queue that media for transcoding while actively processing media that was
previously queued. For this I needed a simple, easy-to-understand queuing mechanism.
Below is what I came up with.

The code below was written to deal with the processing of a file at the time it
is added to a directory being watched by the transcoding service. To put it simply:

  1. A Node file system watcher, such as Chokidar, watches a specified directory
  2. Each time a file is added to that directory it is wrapped in a ‘File Processor’
    (described below) that is responsible for processing that file type
  3. The processor is then added to the ‘QueueManager’ (described below). The manager places the processor in the primary queue
  4. If the manager is currently processing files, nothing happens after the processor is added to the queue
  5. If the manager is not processing, it will iterate over the main queue removing processors in the main queue and placing a reference to each processor’s ‘process’ method into a processing queue (see Async File Processor below)
  6. Once the main queue has been emptied, the processing queue is passed to the Async library for processing either in series (one at a time) or in parallel (all processors started together, each acting as its own pseudo-subprocess) – see Async Library for details on implementation
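
The steps above can be sketched end to end in a few lines. This is only an illustration under stated assumptions: an `EventEmitter` stands in for the Chokidar watcher (Chokidar also emits an `add` event per new file), `runSeries` is a hand-written stand-in for `async.series`, and `makeProcessor`, `processed` and `batchSizes` are hypothetical names used to make the flow visible.

```javascript
var EventEmitter = require('events').EventEmitter;

// Stand-in for async.series (assumption: not the real library).
// Runs node-style tasks one at a time and stops at the first error.
function runSeries(tasks, done) {
    var results = [];
    (function next(i) {
        if (i === tasks.length) { return done(null, results); }
        tasks[i](function (err, result) {
            if (err) { return done(err, results); }
            results.push(result);
            next(i + 1);
        });
    })(0);
}

var queue = [];            // main queue of processors (step 3)
var isProcessing = false;
var processed = [];        // files in the order they finished
var batchSizes = [];       // number of processors handed over per batch

function processQueue() {
    if (isProcessing) { return; }                      // step 4
    var batch = [];
    while (queue.length > 0) {                         // step 5
        batch.push(queue.shift().process);
    }
    if (batch.length === 0) { return; }
    batchSizes.push(batch.length);
    isProcessing = true;
    runSeries(batch, function () {                     // step 6
        isProcessing = false;
        processQueue();
    });
}

function pushToQueue(processor) {
    queue.push(processor);
    processQueue();
}

// Hypothetical processor: pretends to do non-blocking work on a file
function makeProcessor(file) {
    return {
        process: function (callback) {
            setImmediate(function () {
                processed.push(file);
                callback(null, file);
            });
        }
    };
}

// Steps 1 and 2: the watcher reports a file, which is wrapped and queued
var watcher = new EventEmitter();
watcher.on('add', function (file) { pushToQueue(makeProcessor(file)); });

watcher.emit('add', 'a.mp4');
watcher.emit('add', 'b.mp4');
watcher.emit('add', 'c.mp4');
```

In this sketch the first file starts a batch of its own, and the two files that arrive while it is running are picked up together in a second batch.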

Possible Issues:

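Question: What happens if the main queue is constantly being added to while the manager is trying to empty it?

Answer: If additions could interleave with the emptying loop and never paused, an infinite loop would occur. In Node, however, that loop runs synchronously on a single thread, so nothing can be pushed until it finishes; later additions simply wait for the next batch.

Likelihood: Extremely low considering the system and its use.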
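
One way to probe this question is to schedule an addition just before emptying a queue and record what happens first. The snippet below is a small experiment rather than part of the manager; `drained` and `order` are hypothetical names used only for the demonstration.

```javascript
var queue = ['a', 'b', 'c'];
var drained = [];
var order = [];

// An addition that is already scheduled when the emptying loop starts
setTimeout(function () {
    order.push('late push');
    queue.push('d');
}, 0);

// The loop runs to completion before any timer callback can fire
while (queue.length > 0) {
    drained.push(queue.shift());
}
order.push('drain finished');
```

When I reason through it, the loop finishes with only the three original items, and the late addition lands in the queue afterwards, ready for the next batch.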

/**
 * Created by Tim on 5/28/15.
 */

var async = require('async'),
    logger = require('./logger'),
    _ = require('underscore');

var component = 'QueueManager: ';
var queue = [];
var isProcessingQueue = false;
var isParallelProcessingActive = false;

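// Event handler called when Async is done processing the current queue
function queueProcessingComplete(err, results){
    isProcessingQueue = false;

    if(err){
        logger.error(component + 'Error Processing queue: ERR-MSG: ' + err.message);
    }

    // Pick up anything that was queued while the last batch was running,
    // even after an error, so waiting processors are not left stranded
    processQueue();
}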

// Processes the queue by moving all current items in the queue
// into a temporary list which is then handed to the Async library
// for processing in either series or parallel depending on need
function processQueue(){
    if(!isProcessingQueue){
        var managerList = [];
        var shifted = undefined;
        var flag = true;

        while(flag) {
            shifted = queue.shift();
            if(_.isUndefined(shifted)){
                flag = false;
            } else {
                managerList.push(shifted.process);
            }
        }

        if(managerList.length > 0) {
            isProcessingQueue = true;
            // Execute the batch in parallel or in series, depending on the flag
            if(isParallelProcessingActive){
                async.parallel(managerList, queueProcessingComplete);
            } else {
                async.series(managerList, queueProcessingComplete);
            }
        }
    }
}

// Public function for adding 'processors' to the queue
exports.pushToQueue = function(processor){
    queue.push(processor);
    processQueue();
};

Async File Processor (JavaScript)

The Async library expects each task it is given to be a function that accepts a
callback. The task uses that callback to notify Async of either its success or its
failure in performing its job. For that reason I have created the ‘Processor’
template below.

By using a JavaScript ‘Constructor Function’ to create a new object that wraps the
Async processing function, one can maintain a reference through ‘that’ to any and
all relevant objects and data necessary not only for processing the file but also
for returning results via the Async callback function.
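
The reason ‘that’ matters is that the QueueManager stores only a reference to each processor’s ‘process’ method, detached from its object, so ‘this’ no longer points at the instance when the method runs. The comparison below is a minimal sketch of that effect; `WithThis` and `WithThat` are hypothetical constructors written only for this illustration.

```javascript
// Relies on `this`, which is lost once the method is detached
function WithThis(name) {
    this.name = name;
    this.process = function (callback) {
        callback(null, this && this.name);
    };
}

// Captures the instance in a closure variable instead
function WithThat(name) {
    var that = this;
    this.name = name;
    this.process = function (callback) {
        callback(null, that.name);
    };
}

// Detach the methods the same way the manager does with shifted.process
var detachedThis = new WithThis('a.mp4').process;
var detachedThat = new WithThat('a.mp4').process;

var results = {};
detachedThis(function (err, value) { results.viaThis = value; });
detachedThat(function (err, value) { results.viaThat = value; });
```

Run under Node, `results.viaThat` holds the file name while `results.viaThis` comes back undefined, which is why the template routes everything through ‘that’.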

/**
 * Created by Tim on 5/27/15.
 */

var fs = require('fs-extra'),
    logger = require('./../utils/logger'),
    _ = require('underscore');

var component = 'Processor: ';

// Constructor Function: creates a new unique processor
function Processor(file){
    // Allow reference to instance in callbacks
    var that = this;

    //'Private' Variables
    this._file = file;
    this._active = true;
    this._completeCallback = undefined;

    //'Private' Methods

    //'Event Handlers' Methods
    this._handleProcessingComplete = function (err, results) {
        if(err){
            var msg = component + 'File processing did not complete successfully' +
                '. ERR-MSG: ' + err.message;

            logger.error(msg, null);

            // Your processing error code here

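            // End processing and hand execution back to queue. The Error is passed
            // as the result, not as the first argument, because Async stops a series
            // at the first task that reports an error in its first argument
            that._completeCallback(null, new Error(msg));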

        } else {

            // Your processing complete handling code here

            // End processing and hand execution back to queue
            that._completeCallback(null, results);
        }
    };

    //'Public' Methods
    this.process = function (callback) {
        // Allow notification of processing completion
        that._completeCallback = callback;

        // Prepare for file processing code here

        // Only execute processing task if initialization was successful
        if(that._active){

            // Non-blocking IO File processing code here that eventually calls the
            // that._handleProcessingComplete function which in turn calls the
            // Async callback function to notify processing is complete

        } else {
            // End processing and hand execution back to queue
            that._completeCallback(null, []);
        }
    };
}

module.exports = Processor;
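
To show how the template might be filled in, here is a small sketch of a concrete processor. It assumes the "processing" is nothing more than reading a file's size with Node's built-in `fs.stat`; `SizeProcessor`, the sample file name and the `outcome` object are hypothetical and exist only for this example.

```javascript
var fs = require('fs'),
    os = require('os'),
    path = require('path');

// Hypothetical processor: reports the size of a file in bytes
function SizeProcessor(file) {
    var that = this;

    this._file = file;
    this._completeCallback = undefined;

    this._handleProcessingComplete = function (err, stats) {
        if (err) {
            // Report the failure as a result so the queue keeps moving
            var msg = 'Could not stat ' + that._file + ': ' + err.message;
            that._completeCallback(null, new Error(msg));
        } else {
            that._completeCallback(null, { file: that._file, bytes: stats.size });
        }
    };

    this.process = function (callback) {
        that._completeCallback = callback;
        // Non-blocking IO that eventually calls the completion handler
        fs.stat(that._file, that._handleProcessingComplete);
    };
}

// Usage: one file that exists and one that does not
var sample = path.join(os.tmpdir(), 'size-processor-sample.txt');
fs.writeFileSync(sample, 'hello');

var outcome = {};
new SizeProcessor(sample).process(function (err, result) {
    outcome.ok = result;
});
new SizeProcessor(sample + '.missing').process(function (err, result) {
    outcome.failed = result;
});
```

Both calls hand control back through the callback with a null first argument; the caller tells success from failure by checking whether the result is an `Error`.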

Conclusion

That about does it: a simple queuing mechanism for Node.js file processing. As always,
any feedback or suggestions are welcome so long as they are constructive.