"use strict";
const StoppedError = require("./StoppedError");
/**
* Tag attached to every log entry written by this module. The same string is passed to
* logger.isLevelEnabled() to decide whether logging is active at all.
* @ignore
* @private
*/
const logTag = 'taskQueue';
/**
 * Tells whether a value is a plain "record-like" object.
 * @param {*} value The value to inspect
 * @return {boolean} true for any non-null object that is not an array
 * @private
 * @ignore
 */
function isRecord(value) {
    // typeof reports 'object' for null and for arrays, so rule those out first
    if (value === null || Array.isArray(value)) {
        return false;
    }
    return typeof value === 'object';
}
/**
 * Tells whether a value should be treated as a promise.
 *
 * Native promises are recognised through their '[object Promise]' tag. Any other object or function
 * exposing a callable `then` (a "thenable", e.g. a promise from a third-party library) is accepted as well,
 * so that a task returning one is tracked until it settles instead of being treated as a plain value.
 * @param {*} value The value to inspect
 * @return {boolean} true if the value is a native promise or a thenable
 * @private
 * @ignore
 */
function isPromise(value) {
    if (Object.prototype.toString.call(value) === '[object Promise]') {
        return true;
    }
    const type = typeof value;
    // Only objects and functions can be thenables; null must be excluded before reading `.then`
    if (value === null || (type !== 'object' && type !== 'function')) {
        return false;
    }
    return typeof value.then === 'function';
}
/**
 * Validates an optional option that, when present, must be an integer of at least 1.
 * @param {*} value The value to check; undefined means "not supplied" and is accepted
 * @param {string} propertyName Option name used in the error message
 * @return {number|undefined} The value unchanged
 * @throws {Error} If the value is defined but is not a positive integer
 * @private
 * @ignore
 */
function validatePositiveInteger(value, propertyName) {
    if (value !== undefined) {
        // Number.isInteger() is false for every non-number, so it doubles as the type check
        const isPositiveInteger = Number.isInteger(value) && value >= 1;
        if (!isPositiveInteger) {
            throw new Error(`"${propertyName}" must be a positive integer`);
        }
    }
    return value;
}
/**
 * Validates the options object accepted by the TaskQueue constructor.
 * @param {object} [options] Raw options; defaults to an empty object
 * @return {{name: (string|undefined), size: (number|undefined), workers: (number|undefined), logger: (object|undefined)}}
 * A new object holding the four recognised options
 * @throws {Error} If options is not an object, name is not a string, logger is not an object, or
 * size / workers is not a positive integer
 * @private
 * @ignore
 */
function validateOptions(options = {}) {
    if (!isRecord(options)) {
        throw new Error('"options" must be an object');
    }
    const name = options.name;
    const size = options.size;
    const workers = options.workers;
    const logger = options.logger;

    const nameIsValid = name === undefined || typeof name === 'string';
    if (!nameIsValid) {
        throw new Error('"name" must be a string');
    }
    const loggerIsValid = logger === undefined || isRecord(logger);
    if (!loggerIsValid) {
        throw new Error('"logger" must be an object');
    }

    // The numeric options are checked last, size before workers
    const validated = { name };
    validated.size = validatePositiveInteger(size, 'size');
    validated.workers = validatePositiveInteger(workers, 'workers');
    validated.logger = logger;
    return validated;
}
/**
 * A queue that enforces a maximum number of simultaneously executing tasks.
 *
 * At most `workers` tasks execute at a time. While every worker is busy, additional tasks are held back
 * until the number of running plus waiting entries reaches `size`; from then on push() itself waits
 * for a task to finish before accepting more work.
 */
class TaskQueue {
    /**
     * Constructor. There is no need to call start() after creating a new object.
     * @param {object} [options]
     * @param {string} [options.name] Name included in log entries
     * @param {number} [options.size] Positive integer. Number of running plus waiting entries at which the
     * queue reports itself as full. Defaults to options.workers, or 1 if neither is given.
     * @param {number} [options.workers] Positive integer. The maximum number of functions that can execute
     * at a time. Defaults to options.size, or 1 if neither is given.
     * @param {object} [options.logger] Object providing isLevelEnabled(tag) and log(tags, data). It is
     * discarded when isLevelEnabled('taskQueue') returns a falsy value.
     * @throws {Error} If an option fails validation
     */
    constructor(options = {}) {
        this.stopped = false;
        this.taskCount = 0;
        this.doneListeners = [];
        this.waitListeners = [];
        const { name, size, workers, logger } = validateOptions(options);
        this.name = name;
        // size and workers default to each other, then to 1
        this.size = size != null ? size : (workers != null ? workers : 1);
        this.workers = workers != null ? workers : (size != null ? size : 1);
        this.logger = logger;
        if (this.logger && !this.logger.isLevelEnabled(logTag)) {
            // Logging is disabled for our tag: drop the logger so later checks are a simple truthiness test
            this.logger = undefined;
        }
    }
    /**
     * Called when a task finishes. Releases the task's slot, then either hands the slot to the oldest
     * entry waiting in doneListeners or, when nothing is waiting and no task is running, resolves
     * every pending wait() Promise.
     * @private
     * @ignore
     */
    taskFinished() {
        const remaining = this.taskCount - 1;
        if (this.logger) {
            this.logger.log(logTag, {
                message: `Task finished for '${this.name}'. Tasks: ${remaining}`,
                name: this.name,
                taskCount: remaining,
            });
        }
        if (remaining < 0) {
            // Should never happen; report it and leave the counter untouched rather than storing a negative value
            if (this.logger) {
                this.logger.log(['error', logTag], {
                    message: `Task counter is negative for '${this.name}'`,
                    name: this.name,
                });
            }
        }
        else {
            this.taskCount = remaining;
        }
        const nextListener = this.doneListeners.shift();
        if (nextListener) {
            nextListener();
            return;
        }
        if (remaining === 0 && this.waitListeners.length) {
            // Swap the list out before notifying so listeners registered during the callbacks are kept
            const listeners = this.waitListeners;
            this.waitListeners = [];
            listeners.forEach((resolve) => resolve());
        }
    }
    /**
     * Same as push() but without the check for this.stopped
     * @param {function} task
     * @return {Promise} Resolves to an object with the property 'promise' (see push())
     * @private
     * @ignore
     */
    async pushInternal(task) {
        while (this.taskCount >= this.workers) {
            if (!this.full) {
                // Every worker is busy but the queue still has room: hold the task back until a running
                // task finishes and hand the caller a Promise for its eventual outcome right away.
                const promise = new Promise((resolve, reject) => {
                    this.doneListeners.push(() => {
                        // The second handler keeps the caller's Promise from hanging if starting the task fails
                        void this.pushInternal(task).then((ret) => {
                            ret.promise.then(resolve, reject);
                        }, reject);
                    });
                });
                return { promise };
            }
            // The queue is full: wait for a task to finish, then re-check since another caller may take the slot
            await new Promise((resolve) => this.doneListeners.push(resolve));
        }
        // Reserve the slot BEFORE invoking the task. Otherwise a task that synchronously calls push()
        // would still see a free worker and its nested task would start immediately, exceeding the limit.
        const taskCount = ++this.taskCount;
        let taskResult;
        let taskError;
        let taskErrored = false;
        try {
            taskResult = task();
        }
        catch (error) {
            taskErrored = true;
            taskError = error;
        }
        if (this.logger) {
            this.logger.log(logTag, {
                message: `Task started for '${this.name}'. Tasks: ${taskCount}`,
                name: this.name,
                taskCount,
            });
        }
        if (isPromise(taskResult)) {
            // Asynchronous task: release the slot when it settles, just before passing the outcome on
            const promise = new Promise((resolve, reject) => {
                taskResult.then((value) => {
                    this.taskFinished();
                    resolve(value);
                }, (error) => {
                    this.taskFinished();
                    reject(error);
                });
            });
            return { promise };
        }
        // Synchronous task: it has already completed (or thrown), so release the slot immediately
        const promise = taskErrored ? Promise.reject(taskError) : Promise.resolve(taskResult);
        this.taskFinished();
        return { promise };
    }
    /**
     * Starts a task. If the queue's maximum size has been reached, this method waits for a task to finish
     * before invoking task().
     * @param {function} task A function to call. It can return a Promise, throw an exception, or return a value.
     * @throws {StoppedError} If stop() has been called
     * @return {Promise} Resolves to an object with the property 'promise' containing a Promise that settles
     * the same way as the Promise returned by task, resolves to the value returned by task, or rejects using
     * the exception thrown by it. Therefore, it is not only possible to wait for the task to start, it is
     * also possible to wait for it to finish.
     *
     * For example:
     * // Wait for an open slot in the queue
     * const ret = await queue.push(()=>new Promise(resolve=>setTimeout(()=>resolve('Hello'), 5000)));
     * // Wait for 5 seconds and output Hello
     * console.log(await ret.promise);
     */
    push(task) {
        if (this.stopped) {
            throw new StoppedError('Stopped');
        }
        return this.pushInternal(task);
    }
    /**
     * Is the queue full?
     * @return {boolean} true if the number of running tasks plus the number of entries waiting for a task
     * to finish has reached the configured size
     */
    get full() {
        return this.taskCount + this.doneListeners.length >= this.size;
    }
    /**
     * Waits for running tasks to complete. Callers are not prevented from calling push(); thus
     * there is no guarantee that when the returned Promise resolves, the queue will have an available slot.
     * @return {Promise|undefined} A Promise that resolves once no task is running and nothing is waiting
     * for a slot, or undefined if no task is running at the time of the call
     */
    wait() {
        if (this.taskCount) {
            return new Promise((resolve) => this.waitListeners.push(resolve));
        }
        return undefined;
    }
    /**
     * Waits for running tasks to complete. Prevents additional calls to push().
     * @return {Promise|undefined} Same as wait()
     */
    stop() {
        this.stopped = true;
        return this.wait();
    }
    /**
     * Undo method for stop(). There is no need to invoke start() after creating a new object.
     */
    start() {
        this.stopped = false;
    }
}
// CommonJS export: the value of this module is the TaskQueue class itself
module.exports = TaskQueue;