Archivo por meses: noviembre 2022

JS – Semaphore

A veces no interesa evitar que muchas peticiones asíncronas se ejecuten en paralelo, por motivos de rendimiento limite de CPU o bien por lógica del programa que debe esperar a que finalice el bloque completo antes de permitir dejar pasar a la siguiente petición,

Para ello podemos crear un semáforo, que solo permitirá la ejecución del código de 1 e 1 (o de varios si se le configura)

Caso de uso: podemos tener un servidor express que recibe simultáneamente diferentes peticiones, pero una función que necesita ser ejecutado 1 a 1 sin poder ser concurrentes, por ejemplo comprimir un archivo y enviarlo

export class Semaphore {

    currentRequests: any[]
    runningRequests: number
    maxConcurrentRequests: number

    constructor(maxConcurrentRequests = 1) {
        this.currentRequests = [];
        this.runningRequests = 0;
        this.maxConcurrentRequests = maxConcurrentRequests;
    }


    callFunction<T>(fnToCall: () => Promise<T>, ...args: any): Promise<T> {
        return new Promise((resolve, reject) => {
            this.currentRequests.push({
                resolve,
                reject,
                fnToCall,
                args,
            });
            this.tryNext();
        });
    }

    tryNext() {
        if (!this.currentRequests.length) {
            return;
        } else if (this.runningRequests < this.maxConcurrentRequests) {
            let { resolve, reject, fnToCall, args } = this.currentRequests.shift();
            this.runningRequests++;
            let req = fnToCall(...args);
            req.then((res: any) => resolve(res))
                .catch((err: any) => reject(err))
                .finally(() => {
                    this.runningRequests--;
                    this.tryNext();
                });
        }
    }
}

Forma de uso:

// Funcion genérica que devuelve una promesa
const compress = (encoding: string, md5: string): Promise<string> => {
    return new Promise((resolve, reject) => {
        // do stuff
    })
}

...

// Llamada a la función "callFunction"
const throttler = new Semaphore(1);
throttler.callFunction(() => compress('codec', 'xxx'))
throttler.callFunction(() => compress('codec', 'xxx'))
throttler.callFunction(() => compress('codec', 'xxx'))
throttler.callFunction(() => compress('codec', 'xxx'))

Fuente original: https://medium.com/swlh/semaphores-in-javascript-e415b0d684bc

JS – Queue Manager

A continuación expongo una solución para solventar la problemática de ejecutar múltiples peticiones simultaneas pero pidiendo controlar que cantidad de concurrencias hay,

De esta forma podremos crear un pequeño objecto que permite gestionar la cola automáticamente y con eventos que indica cuando se ha finalizado un proceso o todo el lote de el

type typeFinishJobFunc = {
    uuid: number,
    status: string,
    job: any,
    jobPromise: any
}
type typeStartJobFunc = {
    uuid: number,
    job: any,
    jobPromise: any
}
type typeJob = {
    task: () => Promise<unknown>
}
class QueueManager {
    uuid: number
    jobs: typeJob[]
    concurrentJobs: number
    status: string
    executingJobs: any[]
    finishedJobs: any[]
    errorJobs: any[]
    stoppingFunc?: (isAlreadyStopped: boolean) => void
    stopedFunc?: () => void
    finishedFunc?: () => void
    updateFunc?: () => void
    startJobFunc?: (object: typeStartJobFunc) => void
    finishJobFunc?: (object: typeFinishJobFunc) => void
    constructor() {
        this.uuid = 0
        this.jobs = []
        this.concurrentJobs = 8
        this.status = 'stopped'
        this.executingJobs = []
        this.finishedJobs = []
        this.errorJobs = []
    }
    executeJobs(jobs: typeJob[]) {
        if (this.status !== 'stopped') return false;
        this.uuid = 0
        this.status = 'processing'
        this.executingJobs = []
        this.finishedJobs = []
        this.errorJobs = []
        this.jobs = jobs
        this.update();
        for (let i = 0; i < this.concurrentJobs; i++) {
            if (this.jobs.length > 0) {
                this.startQueue()
            }
        }
        return true;
    }
    update() {
        if (this.updateFunc) {
            this.updateFunc()
        }
    }
    startQueue() {
        const job = this.jobs.shift()
        if (job) {
            const uuid = this.uuid
            this.uuid++
            const jobPromise = job.task()
                .then(() => {
                    this.finishedJobs.push(job)
                    if (this.finishJobFunc) {
                        this.finishJobFunc({
                            uuid,
                            status: 'success',
                            job,
                            jobPromise
                        })
                    }
                })
                .catch(() => {
                    this.errorJobs.push(job)
                    if (this.finishJobFunc) {
                        this.finishJobFunc({
                            uuid,
                            status: 'error',
                            job,
                            jobPromise
                        })
                    }
                })
                .finally(() => {
                    const indexUuid = this.executingJobs.findIndex(j => j.uuid === uuid)
                    this.executingJobs.splice(indexUuid, 1)
                    this.update()
                    if (this.status === 'processing') {
                        this.startQueue()
                        return
                    }
                })
            this.executingJobs.push({
                uuid,
                job,
                jobPromise
            })
            this.update()
            if (this.startJobFunc) {
                this.startJobFunc({
                    uuid,
                    job,
                    jobPromise
                })
            }
        } else {
            if (this.executingJobs.length === 0) {
                this.status = 'stopped'
                this.update();
                if (this.finishedFunc) {
                    this.finishedFunc();
                }
            }
        }
    }
    stop() {
        if (this.status === 'procesing-stop') {
            if (this.stoppingFunc) {
                this.stoppingFunc(false);
            }
            return false;
        }
        this.status = 'procesing-stop'
        this.update()
        if (this.stoppingFunc) {
            this.stoppingFunc(true);
        }
        Promise.allSettled(this.executingJobs.map(p => p.jobPromise))
            .then(() => {
                this.status = 'stopped'
                this.update()
                if (this.stopedFunc) {
                    this.stopedFunc();
                }
            })
    }
    getAllJobs() {
        return this.executingJobs.length + this.finishedJobs.length + this.errorJobs.length + this.jobs.length
    }
    getCurrentFinishedJobs() {
        return this.finishedJobs.length + this.errorJobs.length
    }
}
export {
    QueueManager
}

La forma de uso:

const queueManager = new QueueManager();
queueManager.concurrentJobs = 1;
queueManager.startJobFunc = () => console.log('Job started');
queueManager.finishedFunc = () => console.log('job finished');
queueManager.stoppingFunc = () => console.log('Stopping queues')
queueManager.stopedFunc = () => console.log('job stopped');
queueManager.updateFunc = () => console.log('job updated');

const jobs = [];

fileNames.forEach(fileName => jobs.push({
    fileName: fileName,
    task: () => {
        return new Promise((resolve, reject) => {
            console.log('do_stuff')
        })
    }
}));

queueManager.executeJobs(jobs);