JS – Semaphore

A veces no interesa evitar que muchas peticiones asíncronas se ejecuten en paralelo, por motivos de rendimiento limite de CPU o bien por lógica del programa que debe esperar a que finalice el bloque completo antes de permitir dejar pasar a la siguiente petición,

Para ello podemos crear un semáforo, que solo permitirá la ejecución del código de 1 e 1 (o de varios si se le configura)

Caso de uso: podemos tener un servidor express que recibe simultáneamente diferentes peticiones, pero una función que necesita ser ejecutado 1 a 1 sin poder ser concurrentes, por ejemplo comprimir un archivo y enviarlo

type typeFinishJobFunc = {
    uuid: number,
    status: string,
    job: any,
    jobPromise: any
}

type typeStartJobFunc = {
    uuid: number,
    job: any,
    jobPromise: any
}

type typeJob = {
    task: () => Promise<unknown>
}

class QueueManager {
    uuid: number
    jobs: typeJob[]
    concurrentJobs: number
    status: string

    executingJobs: any[]
    finishedJobs: any[]
    errorJobs: any[]

    stoppingFunc: (isAlreadyStopped: boolean) => void
    stopedFunc: () => void
    finishedFunc: () => void
    updateFunc: () => void
    startJobFunc: (object: typeStartJobFunc) => void
    finishJobFunc: (object: typeFinishJobFunc) => void

    constructor() {
        this.uuid = 0
        this.jobs = []
        this.concurrentJobs = 8
        this.status = 'stopped'
        this.executingJobs = []
        this.finishedJobs = []
        this.errorJobs = []
    }

    executeJobs(jobs: typeJob[]) {
        if (this.status !== 'stopped') return false;

        this.uuid = 0
        this.status = 'processing'
        this.executingJobs = []
        this.finishedJobs = []
        this.errorJobs = []
        this.jobs = jobs
        this.update();
        for (let i = 0; i < this.concurrentJobs; i++) {
            if (this.jobs.length > 0) {
                this.startQueue()
            }
        }
        return true;
    }

    update() {
        if (this.updateFunc) {
            this.updateFunc()
        }
    }

    startQueue() {
        const job = this.jobs.shift()
        if (job) {
            const uuid = this.uuid
            this.uuid++

            const jobPromise = job.task()
                .then(() => {
                    this.finishedJobs.push(job)
                    if (this.finishJobFunc) {
                        this.finishJobFunc({
                            uuid,
                            status: 'success',
                            job,
                            jobPromise
                        })
                    }
                })
                .catch(() => {
                    this.errorJobs.push(job)
                    this.finishJobFunc({
                        uuid,
                        status: 'error',
                        job,
                        jobPromise
                    })
                })
                .finally(() => {
                    const indexUuid = this.executingJobs.findIndex(j => j.uuid === uuid)
                    this.executingJobs.splice(indexUuid, 1)
                    this.update()
                    if (this.status === 'processing') {
                        this.startQueue()
                        return
                    }
                })

            this.executingJobs.push({
                uuid,
                job,
                jobPromise
            })
            this.update()

            if (this.startJobFunc) {
                this.startJobFunc({
                    uuid,
                    job,
                    jobPromise
                })
            }

        } else {
            if (this.executingJobs.length === 0) {
                this.status = 'stopped'
                this.update();
                if (this.finishedFunc) {
                    this.finishedFunc();
                }
            }
        }
    }



    stop() {
        if (this.status === 'procesing-stop') {
            if (this.stoppingFunc) {
                this.stoppingFunc(false);
            }
            return false;
        }

        this.status = 'procesing-stop'
        this.update()
        if (this.stoppingFunc) {
            this.stoppingFunc(true);
        }

        Promise.allSettled(this.executingJobs.map(p => p.jobPromise))
            .then(() => {
                this.status = 'stopped'
                this.update()
                if (this.stopedFunc) {
                    this.stopedFunc();
                }
            })
    }

    getAllJobs() {
        return this.executingJobs.length + this.finishedJobs.length + this.errorJobs.length + this.jobs.length
    }

    getCurrentFinishedJobs() {
        return this.finishedJobs.length + this.errorJobs.length
    }
}

export {
    QueueManager
}

Forma de uso:

// Funcion genérica que devuelve una promesa
const compress = (encoding: string, md5: string): Promise<string> => {
    return new Promise((resolve, reject) => {
        // do stuff
    })
}

...

// Llamada a la función "callFunction"
const throttler = new Semaphore(1);
throttler.callFunction(() => compress('codec', 'xxx'))
throttler.callFunction(() => compress('codec', 'xxx'))
throttler.callFunction(() => compress('codec', 'xxx'))
throttler.callFunction(() => compress('codec', 'xxx'))

Fuente original: https://medium.com/swlh/semaphores-in-javascript-e415b0d684bc

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Este sitio usa Akismet para reducir el spam. Aprende cómo se procesan los datos de tus comentarios.