// Source: taskManager.js

/**
 * Welcome to taskManager.js
 * @module taskManager
 * @author Kier Lindsay sgo.life
 * This is meant to work with child processes and simplify working with them;
 * it was designed for ffmpeg video processing tasks.
 *
 * @example
 var taskManager = require('./path/to/taskmanager.js')

 let task = taskManager.spawnTask('echo World', {
        parseProgress: (task, data) => {
            return parseInt(data); //todo actually figure out the progress made
        },
        args: ['42']
    },
 (tid, data, done) => {
        console.log('Hello ', data);
        if (done) {
            console.log('Woo Done');
            //todo start the next step or notify something
        }
    });

 app.get('/demoStatus', (req, res) => {
    res.json(taskManager.getStatus(task.tid));
})



 *
 */

var {spawn, exec} = require('child_process');

// var spawn = ChildProcess.spawn;


/**
 * Placeholder type for a single managed task.
 * It currently carries no state and no behaviour of its own.
 */
class Task {}


/**
 * @property {Object} tasks - key = task id, value = the task object
 let task = {
            tid, // incrementing task id
            sh, // the command executed
            options, // task options
            cb, // callback(task_id, data, isDone, isError)
            proc, // the instance of child_process
            time, // start time (unix ms)
            promise, // resolve(task): a promise which resolves when the process 'close' event is fired
            status, // {name, val, msg, bg} for a progress bar
            data, // array of stdout/stderr chunks received so far (strings, whereas proc.stdout is a stream)
            setStatus() // function to set this task's status

            todo read registerTask for anomalies
        }
 *
 */
class TaskManager {

    /**
     * Creat a new task manager normally you can use the default instance
     * let taskManager = require('path/to/taskManager');
     *
     * @param options - none yet or todo document
     */
    constructor(options) {
        //just so that someone could interface with the underlying classes from the exported instance if desired
        //rather then the default export Object deconstruction Var {TaskManager} = require('./TaskManager')
        this.TaskManager = TaskManager;

        //OPTIONS
        options = options || {};

        //SETUP
        this.tasks = {};
        this.tid = 0;// tasks id;

        this.q = [];
        this.qid = 1;

        //todo
        this.limits = {
            potree: 2,
            img: 10,
            vid: 4,
            gopro360: 1,
        }
        // g.show()


        this.status = {};
        this.status['qTask'] = {
            bg: 'secondary',
            name: "TaskQueue",
            val: 100 / (this.q.length + 1),
            msg: `There are ${this.q.length} items in the task queue!`
        };


    }


    /**
     * Schedule a task to be completed in a friendly way aka one at a time in a nice orderly Q
     * @param opts - options
     * @param opts.eta - an estimated time for the task to complete
     * @param opts.priority -
     * @param opts.cb - called when the fn's promise resolves cb(qEntry, ...resolveArgs)
     * @param fn - this function wll be called with your provided args when this task is started.  "this" is set to the task object
     *  it MUST return a Promise or be an Object with property promise;
     * @param args - args to forward to function
     * @return Object task object
     */
    qTask(opts, fn, ...args) {


        let t = {
            qid: this.qid++,
            opts,
            fn,
            args
        }
        this.q.push(t);
        console.log("Task Manager: Scheduled Task ", opts.name, '(', t.qid, ') in Queue Position ', this.q.length);
        this.status['qTask'] = {
            bg: 'secondary',
            name: "TaskQueue",
            val: 100 / (this.q.length + 1),
            msg: `There are ${this.q.length} items in the task queue!`
        };


        // todo digital ocean auto spin up scaling
        let standbyWorkers = 2;
        let spinUpThresh = 50;
        let maxWorkers = 10;
        let minWorkers = 2; // always keap 2 ready to work
        let destroyTimeout = 300000; // 5 mins
        if (this.q.length > spinUpThresh) {
            //doctrl spin up  worker/ 10 pending tasks
            //
        }

        if (!this.qRunning) {
            this.qStart()
        }

        return t;

    }

    /**
     * Start the task q (sould be called when task is added to empty q)
     */
    qStart() {

        console.log("Startin Q")
        if (this.qRunning) {
            console.error("Error Q is already started?")
            return;
        }

        this.status['qTask'] = {
            bg: 'secondary',
            name: "TaskQueue",
            val: 100 / (this.q.length + 1),
            msg: `There are ${this.q.length} items in the task queue!`
        };


        if (this.q.length == 0) {
            console.warn("Task Manager: Queue Started by nothing is waiting. Stopping!")
            this.qRunning = false;
            return;
        }
        this.qRunning = true;

        let t = this.q[0]; // current task

        let opts = t.opts;
        let task = t.fn(...t.args);


        function isPromise(p) {
            return p && typeof p.then == 'function'
        }

        if (isPromise(task)) {
            // this.registerTask(task,)
            task = {
                promise: task
            }
        }

        if (!isPromise(task.promise)) {
            console.error("A Queued task did not return a task or promise for use to know when it finished!", task);
            return;
        }
        let self = this;
        task.promise.then((...args) => {
            self.q = self.q.filter(qt => qt.qid != t.qid);
            console.log("Task Manager: Task Finished Removing from Queue ", t.qid)
            if (opts.cb) {
                opts.cb(t, ...args)
            }

            self.qRunning = false;
            self.qStart();

        }).catch(e => {
            console.error("Task Manager Task Failed: ", e);
        });


    }

    // lets try and parse the progress out of the stdout/err
    _taskProgress(tid, data, done, err) {

        let t = this.tasks[tid];

        if (!t.data) t.data = [];
        if (!done) {
            t.data.push(data);
        }
        let now = Date.now();


        let statusSet = false;
        let ret
        if (typeof t.options.parseProgress == 'function') {
            ret = t.options.parseProgress(t, data, done, err);
            if (typeof ret == 'object') {
                t.progress = ret.val;
                t.status = ret;
                statusSet = true;
            } else {
                t.progress = parseInt(ret);
            }
            // t.progress = t.options.parseProgress(t, data);
        } else {

            if (t.done) {
                t.progress = 100;
            }
            //todo try and parse it allllllllllllllll =\<o.o>/=

            if (t.progress && t.progress < 100) {
                t.progress += 3;
            } else if( t.progress < 100){
                t.progress = 42;
            }
            console.warn('TaskManager: progress: ',t.progress,'Unknown progress so just incrementing by one :)', t.sh);
        }

        if (!statusSet) {
            t.status = {
                name: 'task' + t.tid,
                bg: 'info',
                val: t.progress,
                msg: data
            }
        }
        // t.setStatus(t.status);


        this.status[tid] = t.status;//dont need cus js passes by reference.. jk we do :)
        // t.delta = Date.now() - time
        t.elapsed = now - t.time;
        console.log(`TaskManager[${tid}|${t.elapsed / 1000}]`, t.status);
    }

    /**
     * Start a task process
     * @param sh - the command like in a shell
     * @param options - optional options
     * @param options.args - Array of args to append to args from the sh line or if just the command is passed in to sh
     * @param options.options - options to pass to spawn https://nodejs.org/api/child_process.html#child_processspawncommand-args-options
     * @param  options.parseProgress - (task, data, done, err) a function that should return the progress out of 1  from a stdout/err data string must return number or object containing val = number for progress bars.
     *              Alternatively set task.status directly in the cb to override or do you own thing independently
     * @param options.parseProgress - (task, data, done, err) a function that should return the progress out of 1  from a stdout/err data string
     * @param options. - (task, data) a function that should return the progress out of 1  from a stdout/err data string
     * then args like in child_process.spawn
     * @param cb - (tid ,data, isDone, isError) called when something happens data is stdout or stderro text
     */
    spawnTask(sh, options, cb) {

        //options and defaults
        options = options || {};

        let time = Date.now();
        //Var setup
        let tid = options.tid || this.tid++;
        let args = sh.split(' ');
        let cmd = args.shift();
        if (Array.isArray(options.args)) {
            args = [...args, options.args];
        }

        let proc = spawn(cmd, args)

        options.sh = sh;
        options.time = time;
        let t = this.registerTask(proc, options, cb)

        return t
    }

    updateStatus(tid, status) {

    }

    /**
     * Register an exising process to the task manager
     * @param proc
     * @param options
     * @param [options.tid] - a custom task id
     * @param [options.sh] - the shell command being executed
     * @param [options.parseProgress] - a function to parse the progress (or set taskManager.status[tid]);
     * @param cb
     */
    registerTask(proc, options, cb, fn, args) {

        // array = chain of tasks
        // fn = function to call
        //

        options = options || {};
        let tid = options.tid || this.tid++;
        let time = options.time || Date.now();
        let sh = options.sh || "registered_process";
        cb = typeof cb == 'function' ? cb : () => {
        }//noop

        let promise;

        if (typeof proc == 'function') {


        }
        if (proc) {
            proc.stdout.setEncoding("utf8")
            proc.stdout.on('data', (data) => {
                // if(data )
                console.log('TASK MANAGER got data: ', !!data);

                this._taskProgress(tid, data, false);
                cb(tid, data, false);
            });

            proc.stderr.setEncoding("utf8")
            proc.stderr.on('data', (data) => {
                this._taskProgress(tid, data, false, true);
                cb(tid, data, false, true);
            });

            promise = new Promise(resolve => {
                proc.on('close', () => {
                    this._taskProgress(tid, 'TASK MANAGER: Done Executing ' + sh, true);
                    cb(tid, 'TASK MANAGER: Done Executing ' + sh, true)
                    let t = this.tasks[tid];
                    resolve(t);
                });
            });
        } else {
            console.warn("No process defend are you managing the task internally?")
        }
        if (options.promise) {
            promise = options.promise;
        }


        let task = {
            tid, // incrementing task id
            sh, // the command executed
            options, // task options
            cb, // callback(task_id, data, isDone, isError)
            proc, // the instance of child_proccess
            time, // start time unix
            promise, // resolve(task) a promise which resolves when finished process 'close' event is fired
            prog: 0,// proggress
            data: [], // buffer of data each time event is fired data is pushed
            // status: ,
            setStatus: (s) => { // set the status for progress reporting
                task.status = s;
                this.status[tid] = task.status
            }
        }

        if (options.status) task.setStatus(options.status);
        else task.setStatus({bg: 'info', name: sh.split(' ').shift(), val: 0})


        this.tasks[tid] = task;

        // this.status[tid] = task.status;//references are cool i think i only need to define this here and
        //even if the task status changes when someone gets the status it will return accurately
        return task;
    }

    /**
     * The official way of using
     * taskManager.status
     * @param {String} [name] - full status object if undefined otherwise just get the status of some particular thing/task.
     * @returns {*|{}}
     */
    getStatus(name) {
        //ensure its initalized
        //todo finish
        if (!this.status) this.status = this.tasks.map();
        //if param get that key
        if (name) return this.status[name];
        else return this.status; // otherwise return it all
    }

    /**
     * Return the promise for task completion
     * @param tid
     * @returns {*}
     */
    promise(tid) {
        let p = this.tasks[tid].promise;
        if (!p) p = new Promise(resolve => resolve(this.tasks[tid]));
        return p;
    }

    /**
     * get task object by task id
     * @param tid - task id
     * @returns {*}
     */
    getTask(tid) {
        return this.tasks[tid];
    }

    /**
     * search for a string in the sh command
     * @param str
     * @returns {*}
     */
    findTask(str) {
        return this.tasks.find(t => t.sh.includes(str));
    }


    /**
     * NOT TRACKED BY TASK MANAGER
     * spawn a child process easily
     * @param sh - the shell command as you would type it
     * @param cb - cb(data, done) whenever stdout is received
     */
    simpleExec = (sh, cb) => {


        console.log("Spawning: ", sh);

        let args = sh.split(' ');
        let cmd = args.shift();
        let proc = spawn(cmd, args);

        proc.stdout.on('data', function (data) {
            console.log(data);
            // this.status(data, false);
            cb(data, false);
        });

        proc.stderr.setEncoding("utf8")
        proc.stderr.on('data', function (data) {
            cb(data, false, true);
        });

        proc.on('close', function () {
            cb('Done Executing ' + cmd, true)
        });

        return proc;

    }
}

// this is exported in this way because I want the interface in use to be
// var taskManager = require('taskmanager.js');
// task = taskManager.spawnTask('ffmpeg -ss 00:00:01.000 -i input.mp4 -vframes 1 output.png', {}, (tid, data, done, err) => {
//     console.log(done, data, '<-is done | from stderr-> ', err)
// })

//alternatively
// task.promise.then(t=>{
//     console.log(t.data)
// })
// The module's export is a ready-to-use shared TaskManager instance.
const taskManager = new TaskManager({});
module.exports = taskManager;