/**
* Welcome to taskManager.js
* @module taskManager
* @author Kier Lindsay sgo.life
* This module is meant to wrap child processes and make them simpler to work with;
* it was designed for ffmpeg video processing tasks.
*
* @example
var taskManager = require('./path/to/taskmanager.js')
let task = taskManager.spawnTask('echo World', {
parseProgress: (task, data) => {
return parseInt(data); //todo actually figure out the progress made
},
args: ['42']
},
(tid, data, done) => {
console.log('Hello ', data);
if (done) {
console.log('Woo Done');
//todo start the next step or notify something
}
});
app.get('/demoStatus', (req, res) => {
res.json(taskManager.getStatus(task.tid));
})
*
*/
var {spawn, exec} = require('child_process');
// var spawn = ChildProcess.spawn;
/**
 * Empty class: it declares no fields and no methods of its own, and nothing
 * else in the visible part of this file constructs or references it.
 */
class Task {}
/**
* @property {Object} tasks - keyed by task id; each value is a task object shaped like:
let task = {
tid, // incrementing task id
sh, // the command executed
options, // task options
cb, // callback(task_id, data, isDone, isError)
proc, // the child_process instance
time, // start time (Date.now(), i.e. unix milliseconds)
promise, // resolves with the task once the process 'close' event has fired
status, // {name, val, msg, bg} for the progress bar
data, // array of the stdout/stderr chunks received so far (strings), whereas proc.stdout is a stream
setStatus() // function to set this task's status
todo: read registerTask for anomalies
}
*
*/
class TaskManager {
    /**
     * Create a new task manager. Normally the default exported instance is all you need:
     * let taskManager = require('path/to/taskManager');
     *
     * @param {Object} [options] - accepted for forward compatibility; nothing is read from it yet
     */
    constructor(options) {
        // Exposes the class on every instance so it can be reached from the exported
        // default instance, e.g. `new taskManager.TaskManager()`.
        this.TaskManager = TaskManager;

        //SETUP
        this.tasks = {};      // tid -> task object (see registerTask)
        this.tid = 0;         // next auto-assigned task id
        this.q = [];          // pending queue entries; q[0] is the one currently running
        this.qid = 1;         // next queue entry id
        this.qRunning = false;

        //todo: these limits are not enforced anywhere yet
        this.limits = {
            potree: 2,
            img: 10,
            vid: 4,
            gopro360: 1,
        };

        this.status = {};     // status objects keyed by tid, plus 'qTask' for the queue
        this._updateQueueStatus();
    }

    /**
     * Refresh the 'qTask' status entry from the current queue length.
     * @private
     */
    _updateQueueStatus() {
        this.status['qTask'] = {
            bg: 'secondary',
            name: "TaskQueue",
            val: 100 / (this.q.length + 1),
            msg: `There are ${this.q.length} items in the task queue!`
        };
    }

    /**
     * Drop a queue entry (matched by qid) from the queue.
     * @private
     */
    _qRemove(entry) {
        this.q = this.q.filter((queued) => queued.qid !== entry.qid);
    }

    /**
     * Mark the queue idle and start whatever entry is now at the front.
     * @private
     */
    _qNext() {
        this.qRunning = false;
        this.qStart();
    }

    /**
     * Schedule a task to be completed in a friendly way, aka one at a time in a nice orderly queue.
     * @param {Object} [opts] - options (a missing value is treated as {})
     * @param [opts.name] - only used in the log line
     * @param [opts.eta] - an estimated time for the task to complete (not read by this class)
     * @param [opts.priority] - not read by this class
     * @param {Function} [opts.cb] - called when the fn's promise resolves: cb(qEntry, ...resolveArgs)
     * @param {Function} fn - called with the provided args when this entry is started; "this" is the
     * queue entry. It MUST return a Promise or an object with a `promise` property. An entry whose fn
     * throws, rejects or returns neither is logged and skipped so the queue keeps moving.
     * @param args - args to forward to fn
     * @return {Object} the queue entry {qid, opts, fn, args}
     */
    qTask(opts, fn, ...args) {
        const entry = {
            qid: this.qid++,
            opts: opts || {},
            fn,
            args
        };
        this.q.push(entry);
        console.log("Task Manager: Scheduled Task ", entry.opts.name, '(', entry.qid, ') in Queue Position ', this.q.length);
        this._updateQueueStatus();

        // todo digital ocean auto spin up scaling. Planned knobs: standbyWorkers = 2,
        // spinUpThresh = 50 queued entries, maxWorkers = 10, minWorkers = 2 (always keep 2
        // ready to work), destroyTimeout = 300000 ms (5 mins).

        if (!this.qRunning) {
            this.qStart();
        }
        return entry;
    }

    /**
     * Start the task queue (called when an entry is added while the queue is idle, and
     * again after each entry finishes). Does nothing if an entry is already running.
     */
    qStart() {
        console.log("Startin Q")
        if (this.qRunning) {
            console.error("Error Q is already started?")
            return;
        }
        this._updateQueueStatus();
        if (this.q.length === 0) {
            console.warn("Task Manager: Queue Started by nothing is waiting. Stopping!")
            this.qRunning = false;
            return;
        }
        this.qRunning = true;

        const entry = this.q[0]; // current entry
        const isThenable = (p) => Boolean(p) && typeof p.then === 'function';

        let completion;
        try {
            // Called as a method of the entry so that "this" inside fn is the entry.
            const started = entry.fn(...entry.args);
            completion = isThenable(started) ? started : started?.promise;
            if (!isThenable(completion)) {
                console.error("A Queued task did not return a task or promise for use to know when it finished!", started);
            }
        } catch (e) {
            console.error("Task Manager Task Failed: ", e);
        }

        if (!isThenable(completion)) {
            // Nothing to wait on: skip this entry, otherwise qRunning would stay true
            // forever and no later entry would ever run.
            this._qRemove(entry);
            this._qNext();
            return;
        }

        completion.then(
            (...resolved) => {
                this._qRemove(entry);
                console.log("Task Manager: Task Finished Removing from Queue ", entry.qid)
                if (typeof entry.opts.cb === 'function') {
                    try {
                        entry.opts.cb(entry, ...resolved);
                    } catch (e) {
                        // A throwing callback must not stall the queue.
                        console.error("Task Manager: Queue callback threw: ", e);
                    }
                }
            },
            (e) => {
                console.error("Task Manager Task Failed: ", e);
                this._qRemove(entry);
            }
        ).then(() => this._qNext()).catch((e) => {
            console.error("Task Manager Task Failed: ", e);
        });
    }

    /**
     * Record a chunk of output (or completion) for a task and refresh its progress/status.
     * Uses options.parseProgress when provided; otherwise progress is a rough estimate that
     * starts at 42, grows by 3 per chunk (capped at 99) and becomes 100 when done.
     * @param tid - task id
     * @param data - stdout/stderr text, or the completion message
     * @param {boolean} done - true when the process has closed
     * @param {boolean} [err] - true when data came from stderr or an 'error' event
     * @private
     */
    _taskProgress(tid, data, done, err) {
        const t = this.tasks[tid];
        if (!t) {
            console.warn('TaskManager: progress reported for unknown task', tid);
            return;
        }
        if (!t.data) t.data = [];
        if (!done) {
            t.data.push(data);
        }
        const now = Date.now();
        let statusSet = false;

        if (typeof t.options.parseProgress === 'function') {
            const ret = t.options.parseProgress(t, data, done, err);
            if (ret !== null && typeof ret === 'object') {
                t.progress = ret.val;
                t.status = ret;
                statusSet = true;
            } else {
                // Keep the previous value when the parser found nothing usable in this chunk.
                const parsed = Number.parseInt(ret, 10);
                if (!Number.isNaN(parsed)) {
                    t.progress = parsed;
                }
            }
        } else {
            const known = typeof t.progress === 'number' && !Number.isNaN(t.progress) && t.progress !== 0;
            if (done) {
                t.progress = 100;
            } else if (!known) {
                t.progress = 42;
            } else if (t.progress < 100) {
                // Never claim completion before the process has actually closed.
                t.progress = Math.min(t.progress + 3, 99);
            }
            console.warn('TaskManager: progress: ', t.progress, 'Unknown progress so just incrementing by three :)', t.sh);
        }

        if (!statusSet) {
            t.status = {
                name: 'task' + t.tid,
                bg: 'info',
                val: t.progress,
                msg: data
            };
        }
        // t.status was replaced by a new object above, so the shared map must be re-pointed.
        this.status[tid] = t.status;
        t.elapsed = now - t.time;
        console.log(`TaskManager[${tid}|${t.elapsed / 1000}]`, t.status);
    }

    /**
     * Start a task process.
     * NOTE: sh is split on spaces only, so a quoted argument containing spaces is not
     * supported in sh; pass such arguments through options.args instead.
     * @param {string} sh - the command like in a shell
     * @param {Object|Function} [options] - optional options; a function here is treated as cb
     * @param {Array} [options.args] - args appended after the args parsed from sh
     * @param {Object} [options.options] - options passed to spawn https://nodejs.org/api/child_process.html#child_processspawncommand-args-options
     * @param {Function} [options.parseProgress] - (task, data, done, err) returns the progress parsed from a
     * stdout/stderr chunk: either a number or a status object whose `val` is the number (the built-in
     * fallback uses a 0-100 scale). Alternatively set task.status directly in the cb.
     * @param [options.tid] - a custom task id
     * @param {Function} [cb] - (tid, data, isDone, isError) called whenever something happens; data is stdout or stderr text
     * @return {Object} the registered task (see registerTask)
     */
    spawnTask(sh, options, cb) {
        if (typeof options === 'function') {
            // Supports the short form spawnTask(sh, cb).
            cb = options;
            options = {};
        }
        options = options || {};
        const time = Date.now();

        // Filtering drops the empty strings produced by repeated or leading spaces.
        const parts = sh.split(' ').filter((part) => part !== '');
        const cmd = parts.shift();
        const args = Array.isArray(options.args) ? [...parts, ...options.args] : parts;

        const proc = spawn(cmd, args, options.options);

        // Copy instead of mutating the caller's object; the id is assigned by registerTask.
        return this.registerTask(proc, { ...options, sh, time }, cb);
    }

    /**
     * Not implemented yet: currently does nothing and returns undefined.
     * todo: implement or remove. Use task.setStatus(status) in the meantime.
     */
    updateStatus(tid, status) {
    }

    /**
     * Register an existing process with the task manager.
     * @param proc - a child process (or compatible emitter). When falsy the task is only tracked and
     * the caller is expected to manage it, e.g. through options.promise.
     * @param {Object} [options]
     * @param [options.tid] - a custom task id (0 is a valid id)
     * @param [options.sh] - the shell command being executed
     * @param [options.time] - start time in ms; defaults to now
     * @param [options.status] - initial status object
     * @param [options.promise] - replaces the promise derived from the process 'close' event
     * @param [options.parseProgress] - a function to parse the progress (or set taskManager.status[tid])
     * @param {Function} [cb] - (tid, data, isDone, isError)
     * @param [fn] - unused, reserved
     * @param [args] - unused, reserved
     * @return {Object} the task {tid, sh, options, cb, proc, time, promise, prog, progress, data, status, setStatus}
     */
    registerTask(proc, options, cb, fn, args) {
        options = options || {};
        const tid = options.tid ?? this.tid++;
        const time = options.time ?? Date.now();
        const sh = options.sh || "registered_process";
        cb = typeof cb === 'function' ? cb : () => {
        };//noop
        let promise;

        if (proc) {
            // stdout/stderr are null when the matching stdio is not a pipe.
            if (proc.stdout) {
                proc.stdout.setEncoding("utf8")
                proc.stdout.on('data', (data) => {
                    console.log('TASK MANAGER got data: ', !!data);
                    this._taskProgress(tid, data, false);
                    cb(tid, data, false);
                });
            }
            if (proc.stderr) {
                proc.stderr.setEncoding("utf8")
                proc.stderr.on('data', (data) => {
                    this._taskProgress(tid, data, false, true);
                    cb(tid, data, false, true);
                });
            }
            // Without a listener an 'error' event (e.g. the command does not exist) is thrown
            // as an uncaught exception. It is reported as error output, not as completion.
            proc.on('error', (e) => {
                const msg = 'TASK MANAGER: Error Executing ' + sh + ': ' + (e && e.message ? e.message : e);
                this._taskProgress(tid, msg, false, true);
                cb(tid, msg, false, true);
            });
            promise = new Promise(resolve => {
                proc.on('close', () => {
                    this._taskProgress(tid, 'TASK MANAGER: Done Executing ' + sh, true);
                    cb(tid, 'TASK MANAGER: Done Executing ' + sh, true)
                    resolve(this.tasks[tid]);
                });
            });
        } else {
            console.warn("No process defend are you managing the task internally?")
        }

        if (options.promise) {
            promise = options.promise;
        }

        const task = {
            tid, // incrementing task id
            sh, // the command executed
            options, // task options
            cb, // callback(task_id, data, isDone, isError)
            proc, // the instance of child_process
            time, // start time unix ms
            promise, // resolves with the task when the process 'close' event is fired
            prog: 0, // kept for compatibility; `progress` is the field that gets updated
            progress: 0, // latest progress value
            data: [], // every chunk received so far is pushed here
            setStatus: (s) => { // set the status for progress reporting
                task.status = s;
                this.status[tid] = task.status
            }
        };
        if (options.status) task.setStatus(options.status);
        else task.setStatus({bg: 'info', name: sh.split(' ').shift(), val: 0})
        this.tasks[tid] = task;
        return task;
    }

    /**
     * The official way of reading taskManager.status
     * @param {String|Number} [name] - a status key (task id or 'qTask'); omit it to get the full status object
     * @returns {*|{}} the status for that key (undefined if there is none), or the whole status map
     */
    getStatus(name) {
        //ensure its initialized
        if (!this.status) this.status = {};
        // Compare against null/undefined explicitly: task id 0 is a valid key.
        if (name !== undefined && name !== null) return this.status[name];
        return this.status; // otherwise return it all
    }

    /**
     * Return the promise for task completion. A task registered without a process or promise
     * gets an already-resolved promise of the task.
     * @param tid - id of a registered task; an unknown id throws a TypeError
     * @returns {Promise}
     */
    promise(tid) {
        const task = this.tasks[tid];
        return task.promise || Promise.resolve(task);
    }

    /**
     * get task object by task id
     * @param tid - task id
     * @returns {*} the task, or undefined when there is none
     */
    getTask(tid) {
        return this.tasks[tid];
    }

    /**
     * search for a string in the sh command
     * @param {string} str
     * @returns {*} the first task whose command contains str, or undefined
     */
    findTask(str) {
        // this.tasks is a plain object keyed by tid, so search over its values.
        return Object.values(this.tasks).find(t => t.sh.includes(str));
    }

    /**
     * NOT TRACKED BY TASK MANAGER
     * spawn a child process easily
     * stdout chunks are forwarded as received (no encoding is set on stdout); stderr is decoded as utf8.
     * @param {string} sh - the shell command as you would type it (split on single spaces)
     * @param {Function} [cb] - cb(data, done, isError) whenever output is received, on a process error, and on close
     * @returns the spawned child process
     */
    simpleExec = (sh, cb) => {
        console.log("Spawning: ", sh);
        const notify = typeof cb === 'function' ? cb : () => {
        };
        const args = sh.split(' ');
        const cmd = args.shift();
        const proc = spawn(cmd, args);
        proc.stdout.on('data', (data) => {
            console.log(data);
            notify(data, false);
        });
        proc.stderr.setEncoding("utf8")
        proc.stderr.on('data', (data) => {
            notify(data, false, true);
        });
        // Without a listener a failed spawn would be thrown as an uncaught exception.
        proc.on('error', (e) => {
            notify('Error Executing ' + cmd + ': ' + (e && e.message ? e.message : e), false, true);
        });
        proc.on('close', () => {
            notify('Done Executing ' + cmd, true)
        });
        return proc;
    }
}
// The instance (rather than the class) is exported because the intended usage is:
// var taskManager = require('taskmanager.js');
// task = taskManager.spawnTask('ffmpeg -ss 00:00:01.000 -i input.mp4 -vframes 1 output.png', {}, (tid, data, done, err) => {
// console.log(done, data, '<-is done | from stderr-> ', err)
// })
// alternatively
// task.promise.then(t=>{
// console.log(t.data)
// })
// Build the default manager and export the instance itself; the class stays
// reachable through the instance's `TaskManager` property.
module.exports = new TaskManager({});