/* eslint-disable no-promise-executor-return */
/* eslint-disable no-await-in-loop */
/* eslint-disable no-plusplus */
const Joi = require('joi');
const StoppedError = require('./StoppedError');
/**
 * @description Joi schema that validates the options object handed to TaskQueue's constructor.
 * No key is marked as required.
 */
const optionsSchema = Joi.object().keys({
  // Label for the queue; it only shows up in log entries.
  name: Joi.string().description('The name of the queue. It is logged.'),
  // Positive integer: queue length at which push() starts making its caller wait.
  size: Joi.number().integer().min(1).description("push() resolves when the task queue's length is less than this value"),
  // Positive integer: upper bound on tasks running at once.
  workers: Joi.number().integer().min(1).description('The maximum number of tasks that can execute simultaneously. Defaults to size.'),
  // Logger object; TaskQueue calls isLevelEnabled() and log() on it.
  logger: Joi.object(),
});
/**
 * Tag attached to every log entry that TaskQueue writes
 * @ignore
 * @private
 */
const logTag = 'taskQueue';
/**
 * A queue that enforces a maximum number of simultaneously executing tasks.
 *
 * Instance properties:
 *   {string}     [name]         Copied from the options; only used in log entries
 *   {number}     size           Queue length (running + queued) at which push() makes its caller wait
 *   {number}     workers        Maximum number of tasks that execute at the same time
 *   {object}     [logger]       Removed by the constructor when logger.isLevelEnabled(logTag) is falsy
 *   {boolean}    [stopped]      Set by stop(), cleared by start(); undefined until either is called
 *   {number}     taskCount      The number of currently executing tasks
 *   {function[]} doneListeners  Callbacks waiting for a running task to finish (one is called per finished task)
 *   {function[]} waitListeners  Resolvers of pending wait() Promises (all are called when the queue drains)
 */
class TaskQueue {
  /**
   * Constructor. There is no need to call start() after creating a new object.
   * @param {object} [options] Validated against optionsSchema
   * @param {string} [options.name] The name of the queue. It is logged.
   * @param {number} [options.size] push() resolves when the queue's length is less than this value.
   * Defaults to options.workers, or 1 when neither is given.
   * @param {number} [options.workers] The maximum number of tasks that can execute simultaneously.
   * Defaults to options.size, or 1 when neither is given.
   * @param {object} [options.logger] Must provide isLevelEnabled(tag) and log(tags, entry)
   * @throws {Error} If options fails schema validation
   */
  constructor(options) {
    const validation = optionsSchema.validate(options);
    if (validation.error) throw new Error(validation.error.message);
    Object.assign(this, validation.value);
    // Whichever of workers/size is missing takes the other's value; both default to 1
    // eslint-disable-next-line no-multi-assign
    if (!this.workers && !this.size) this.workers = this.size = 1;
    else if (!this.workers) this.workers = this.size;
    else if (!this.size) this.size = this.workers;
    // Dropping the logger up front lets the rest of the class use a plain `if (this.logger)` check
    if (this.logger && !this.logger.isLevelEnabled(logTag)) delete this.logger;
    this.taskCount = 0;
    this.doneListeners = [];
    this.waitListeners = [];
  }

  /**
   * Called when a task finishes. Decrements taskCount, then either hands the freed slot to the oldest
   * doneListener or, when nothing is running and nothing is queued, releases every caller of wait().
   * @private
   * @ignore
   */
  taskFinished() {
    const newTasks = this.taskCount - 1;
    if (this.logger) {
      this.logger.log(logTag, {
        message: `Task finished for '${this.name}'. Tasks: ${newTasks}`,
        name: this.name,
        taskCount: newTasks,
      });
    }
    if (newTasks < 0) {
      // Invariant violation: report it and leave taskCount untouched rather than storing a negative value
      const message = `Task counter is negative for '${this.name}'`;
      if (this.logger) {
        this.logger.log(['error', logTag], {
          message,
          name: this.name,
        });
      }
    } else {
      this.taskCount = newTasks;
    }
    const { length: doneListenerCount } = this.doneListeners;
    // Exactly one listener is woken per finished task, in FIFO order
    if (doneListenerCount) {
      this.doneListeners.shift()();
    } else if (!newTasks) {
      const { waitListeners } = this;
      // =========================
      // Release callers to wait()
      if (waitListeners.length) {
        // Swap the array first so a listener that calls wait() again registers on the fresh list
        this.waitListeners = [];
        waitListeners.forEach((resolve) => resolve());
      }
    }
  }

  /**
   * Same as push() but without the check for this.stopped
   * @param {function} task A function to call. It can return a Promise/thenable, throw, or return a value.
   * @return {Promise<{promise: Promise}>} Resolves to an object whose 'promise' property settles with the
   * outcome of task
   * @private
   * @ignore
   */
  async pushInternal(task) {
    // Wait for an available slot in the queue
    for (;;) {
      if (this.taskCount < this.workers) break;
      if (this.full) {
        // ============
        // Size reached: hold the caller until a running task finishes, then re-check
        await new Promise((resolve) => this.doneListeners.push(resolve));
      } else {
        // ===============
        // Workers reached: queue the task and return right away. The listener starts the task once a slot
        // frees up and forwards the task's outcome to the Promise handed back to the caller.
        const promise = new Promise((resolve, reject) =>
          this.doneListeners.push(() =>
            // ret.promise is from the task. Forward to the Promise returned by this method. A failure of the
            // nested pushInternal() call itself is forwarded as well so `promise` can never hang.
            this.pushInternal(task).then((ret) => ret.promise.then(resolve, reject), reject)
          )
        );
        return { promise };
      }
    }
    let fret;
    let err;
    // A separate flag is needed because a task may throw a falsy value (throw 0, throw undefined)
    let threw = false;
    try {
      // A task that is not an async function can throw synchronously; capture that as a rejection
      fret = task();
    } catch (error) {
      threw = true;
      err = error;
    }
    // Increment taskCount here, not earlier
    const taskCount = ++this.taskCount;
    if (this.logger) {
      this.logger.log(logTag, {
        message: `Task started for '${this.name}'. Tasks: ${taskCount}`,
        name: this.name,
        taskCount,
      });
    }
    let promise;
    // Any thenable counts as asynchronous work, not only native Promises. Otherwise a task returning e.g. a
    // third-party promise would release its slot immediately and the workers limit would not be enforced.
    if (fret != null && typeof fret.then === 'function') {
      // Promise.resolve() returns a native Promise unchanged and wraps foreign thenables, which also guarantees
      // that taskFinished() runs at most once per task
      promise = Promise.resolve(fret).then(
        (value) => {
          this.taskFinished();
          return value;
        },
        (error) => {
          this.taskFinished();
          throw error;
        }
      );
    } else {
      // Synchronous task: it has already completed (or thrown), so free the slot immediately
      promise = threw ? Promise.reject(err) : Promise.resolve(fret);
      this.taskFinished();
    }
    // If bare 'promise' was returned, the caller would wait for fret to resolve. Instead, callers should only
    // wait for task to be called. Therefore, return an object that contains a 'promise' key. Awaiting on this
    // method therefore waits for an empty slot in the queue and returns a Promise that immediately resolves to
    // an object.
    return { promise };
  }

  /**
   * Starts a task. If the queue's maximum size has been reached, this method waits for a task to finish
   * before invoking task().
   * @param {function} task A function to call. It can return a Promise, throw an exception, or return a value.
   * @throws {StoppedError} Thrown synchronously if stop() has been called
   * @return {Promise} Does not reject. Resolves to an object with the property 'promise'
   * containing a Promise that settles like the Promise returned by task, or that resolves to the value returned
   * by task, or that rejects using the exception thrown by it. Therefore, it is not only possible to wait for
   * the task to start, it is also possible to wait for it to finish.
   *
   * For example:
   * // Wait for an open slot in the queue
   * const ret = await queue.push(()=>new Promise(resolve=>setTimeout(()=>resolve('Hello'), 5000)));
   * // Wait for 5 seconds and output Hello
   * console.log(await ret.promise);
   */
  push(task) {
    if (this.stopped) throw new StoppedError('Stopped');
    return this.pushInternal(task);
  }

  /**
   * Is the queue full?
   * @return {boolean} true if the number of running tasks plus the number of pending doneListeners has
   * reached size
   */
  get full() {
    return this.taskCount + this.doneListeners.length >= this.size;
  }

  /**
   * Waits for running tasks to complete. Callers are not prevented from calling push(); thus
   * there is no guarantee that when the returned Promise resolves, the queue will have an available slot.
   * @return {Promise} Resolves (with no value) once no task is running; already resolved when the queue is idle
   */
  wait() {
    if (this.taskCount) return new Promise((resolve) => this.waitListeners.push(resolve));
    // Always hand back a Promise so that callers can chain .then() even when nothing is running
    return Promise.resolve();
  }

  /**
   * Waits for running tasks to complete. Prevents additional calls to push().
   * @return {Promise} The Promise returned by wait()
   */
  stop() {
    this.stopped = true;
    return this.wait();
  }

  /**
   * Undo method for stop(). There is no need to invoke start() after creating a new object.
   */
  start() {
    this.stopped = false;
  }
}
module.exports = TaskQueue;