Node.js: Simple Queuing for File Processing

QueueManager (JavaScript)

For the past few months I have been building a custom, enterprise-grade, distributed
media transcoding/encoding system for my employer. Because the system is meant to
stand alone and be integrated into any of the company’s web products, it needed to be
extremely robust: it has to handle any number of media files passed to it at any given
time, by any number of clients. It is for this reason that I chose Node.js as the
programming platform. Node has the speed required for a system like this, due
primarily to its non-blocking I/O model.

Because the transcoder has to handle an unspecified number of videos added to it
at any given time, by any number of clients, it needed the ability to asynchronously
accept new media and queue it for transcoding while actively processing media that
was queued earlier. For this I needed a simple, easy-to-understand queuing mechanism.
Below is what I came up with.

The code below was written to deal with the processing of a file at the time it
is added to a directory being watched by the transcoding service. To put it simply:

  1. A Node file system watcher, such as Chokidar, watches a specified directory
  2. Each time a file is added to that directory it is wrapped in a ‘File Processor’
    (described below) that is responsible for processing that file type
  3. The processor is then added to the ‘QueueManager’ (described below). The manager places the processor in the primary queue
  4. If the manager is currently processing files, nothing further happens after the processor is added to the queue
  5. If the manager is not processing, it will iterate over the main queue, removing each processor and placing a reference to its ‘process’ method into a processing queue (see Async File Processor below)
  6. Once the main queue has been emptied, the processing queue is passed to the Async library for processing either in series (one at a time) or in parallel (each processor started concurrently, without waiting for the others) – see the Async library documentation for implementation details. A sketch of this whole flow follows the list.
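
To make the flow above concrete, here is a minimal sketch of how the pieces might be wired together. The module paths and the watch folder are illustrative only; the Processor and QueueManager modules are shown later in this post.

var chokidar = require('chokidar');
var Processor = require('./processor');        // the 'File Processor' template below
var queueManager = require('./queueManager');  // the QueueManager shown below

// Watch an (illustrative) incoming folder and queue every new file for processing
chokidar.watch('/media/incoming', { persistent: true })
    .on('add', function (path) {
        // Wrap the new file in a processor and hand it to the queue manager;
        // the manager decides when the file is actually processed.
        queueManager.pushToQueue(new Processor(path));
    });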

Possible Issues:

Question: What happens if the main queue is constantly being added to while the manager is trying to empty it?

Answer: If additions to the main queue never paused, the loop that drains it would never terminate.

Likelihood: Extremely low considering the system and its use.
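
Should this ever become a real concern, one mitigation would be to bound the drain to whatever is queued at the moment processing starts, rather than shifting until the queue is empty. A rough sketch of that idea, as a drop-in replacement for the while loop inside processQueue below:

// Snapshot the items queued right now; anything added afterwards waits for the
// next pass of processQueue rather than extending the current drain.
var snapshot = queue.splice(0, queue.length);
snapshot.forEach(function (processor) {
    managerList.push(processor.process);
});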

/**
 * Created by Tim on 5/28/15.
 */

var async = require('async'),
    logger = require('./logger'),
    _ = require('underscore');

var component = 'QueueManager: ';
var queue = [];
var isProcessingQueue = false;
var isParallelProcessingActive = false;

// Event handler called when Async is done processing the current queue
function queueProcessingComplete(err, results){
    isProcessingQueue = false;

    if(err){
        logger.error(component + 'Error Processing queue: ERR-MSG: ' + err.message);
    }else{
        processQueue();
    }

}

// Processes the queue by moving all current items in the queue
// into a temporary list which is then handed to the Async library
// for processing in either series or parallel depending on need
function processQueue(){
    if(!isProcessingQueue){
        var managerList = [];
        var shifted = undefined;
        var flag = true;

        while(flag) {
            shifted = queue.shift();
            if(_.isUndefined(shifted)){
                flag = false;
            } else {
                managerList.push(shifted.process);
            }
        }

        if(managerList.length > 0) {
            isProcessingQueue = true;
            // Execute the processors either in parallel or one at a time in series
            if(isParallelProcessingActive){
                async.parallel(managerList, queueProcessingComplete);
            } else {
                async.series(managerList, queueProcessingComplete);
            }
        }
    }
}

// Public function for adding 'processors' to the queue
exports.pushToQueue = function(processor){
    queue.push(processor);
    processQueue();
};

Async File Processor (JavaScript)

The Async library requires that each processing function passed to it accept a callback
function. The processing function uses that callback to notify Async of its success
or failure in performing its job. For that reason I have created
the ‘Processor’ template below.
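
For reference, the shape Async expects looks roughly like this. The tasks here are throwaway examples, not part of the transcoder; each one must call its callback exactly once, with an error or with a result:

var async = require('async');

async.series([
    function (callback) { callback(null, 'first result'); },
    function (callback) { callback(null, 'second result'); }
], function (err, results) {
    // results is ['first result', 'second result'] when no task passed an error
    console.log(err, results);
});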

By using a JavaScript ‘constructor function’ to create a new object that wraps the
Async processing function, one can maintain a reference, through ‘that’, to all of
the objects and data needed not only to process the file but also to return results
via the Async callback function.

/**
 * Created by Tim on 5/27/15.
 */

var fs = require('fs-extra'),
    logger = require('./../utils/logger'),
    _ = require('underscore');

var component = 'Processor: ';

// Constructor Function: creates a new unique processor
function Processor(file){
    // Allow reference to instance in callbacks
    var that = this;

    //'Private' Variables
    this._file = file;
    this._active = true;
    this._completeCallback = undefined;

    //'Private' Methods

    //'Event Handlers' Methods
    this._handleProcessingComplete = function (err, results) {
        if(err){
            var msg = component + 'File processing did not complete successfully' +
                '. ERR-MSG: ' + err.message;

            logger.error(msg, null);

            // Your processing error code here

            // End processing and hand execution back to the queue. The error is
            // passed as the result (err stays null) so that a single failed file
            // does not stop Async from working through the rest of the queue.
            that._completeCallback(null, new Error(msg));

        } else {

            // Your processing complete handling code here

            // End processing and hand execution back to queue
            that._completeCallback(null, results);
        }
    };

    //'Public' Methods
    this.process = function (callback) {
        // Allow notification of processing completion
        that._completeCallback = callback;

        // Prepare for file processing code here

        // Only execute processing task if initialization was successful
        if(that._active){

            // Non-blocking IO File processing code here that eventually calls the
            // that._handleProcessingComplete function which in turn calls the
            // Async callback function to notify processing is complete

        } else {
            // End processing and hand execution back to queue
            that._completeCallback(null, []);
        }
    };
}

module.exports = Processor;
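
To make the template a little more concrete, here is one way the process placeholder might be filled in inside the constructor. The fs-extra copy call is only a stand-in for real transcoding work, and the destination path is made up for the example:

// Hypothetical process() body: copies the file as a stand-in for transcoding
this.process = function (callback) {
    // Allow notification of processing completion
    that._completeCallback = callback;

    if (that._active) {
        var destination = '/media/processed/' + Date.now() + '.mp4'; // illustrative only
        fs.copy(that._file, destination, function (err) {
            // Hand the outcome to the shared completion handler, which in turn
            // notifies Async through the stored callback
            that._handleProcessingComplete(err, [destination]);
        });
    } else {
        // Initialization failed; return control to the queue immediately
        that._completeCallback(null, []);
    }
};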

Conclusion

That about does it: a simple queuing mechanism for Node.js file processing. As always,
any feedback or suggestions are welcome, so long as they are constructive.

Node.js + Chokidar: Wait for file copy to complete before modifying

If you have ever spent any time dealing with folder watchers in pretty much any language, you will probably have noticed that the watcher usually notifies your application of a new file the instant it is added to the folder’s index. That, however, does not mean the file is complete; it may still be in the process of being copied or saved to disk. This creates a problem when you are watching for new files so that they can be processed in some manner. To get around it, check the API for the language/platform you are using to find out whether the IO library has a way to verify that a file is whole before processing it.

I ran into this same issue while building a bulk video file transcoding system on top of the Node.js JavaScript platform. Unfortunately, I could not find a built-in method for checking whether a file had been completely saved before acting on it, so I had to handle this myself. The following assumes you know how to use Chokidar or some other Node file watcher package from NPM. The function of interest is titled ‘checkFileCopyComplete’; hopefully this will help speed things up if you are looking for this solution.

// Setup video source folder observer for notifications of new files
var chokidar = require('chokidar'),
    fs = require('fs'),
    logger = require('./logger'),      // same logger module used in the code above
    config = require('./config');      // assumed to provide videoRawFolder

var component = 'Watcher: ';           // log prefix, assumed to match the other modules
var fileCopyDelaySeconds = 2;          // seconds to wait between file stat comparisons

var watcher = chokidar.watch(config.videoRawFolder, {
    persistent: true,
    followSymlinks: false,
    usePolling: true,
    depth: undefined,
    interval: 100,
    ignorePermissionErrors: false
});

watcher
    .on('ready', function() { logger.info('Initial scan complete. Ready for changes.'); })
    .on('unlink', function(path) { logger.info('File: ' + path + ', has been REMOVED'); })
    .on('error', function(err) {
        logger.error(component + 'Chokidar file watcher failed. ERR: ' + err.message);
    })
    .on('add', function(path) {
        logger.info('File: ' + path + ', has been ADDED');

        fs.stat(path, function (err, stat) {

            if (err){
                logger.error(component + 'Error watching file for copy completion. ERR: ' + err.message);
                logger.error(component + 'Error file not processed. PATH: ' + path);
            } else {
                logger.info(component + 'File copy started...');
                setTimeout(checkFileCopyComplete, fileCopyDelaySeconds*1000, path, stat);
            }
        });
    });

// Makes sure that a file added to the directory, which may not have been completely
// copied yet by the operating system, has finished copying before anything is done
// with it. The file is treated as complete once its modified time stops changing.
function checkFileCopyComplete(path, prev) {
    fs.stat(path, function (err, stat) {

        if (err) {
            logger.error(component + 'Error re-checking file for copy completion. ERR: ' + err.message);
            return;
        }
        if (stat.mtime.getTime() === prev.mtime.getTime()) {
            logger.info(component + 'File copy complete => beginning processing');
            //------------------------------------- 
            // CALL A FUNCTION TO PROCESS FILE HERE
            //-------------------------------------
        }
        else {
            setTimeout(checkFileCopyComplete, fileCopyDelaySeconds*1000, path, stat);
        }
    });
}