Async Queue.ts

| comments

type Callback<A> = (a: A) => void;

/**
 * Delays stuff for ensuring fairness.
 */
export function yieldRunLoop(): Promise<void> {
  const fn: (cb: (() => void)) => void = typeof setImmediate !== 'undefined'
    ? setImmediate
    : cb => setTimeout(cb, 0)
  return new Promise(fn)
}

/**
 * Async queue implementation
 */
export class Queue<A> {
  private readonly elements: A[] = []
  private readonly callbacks: ([Callback<A>, Callback<Error>])[] = []

  enqueue = async (a: A) => {
    const cbs = this.callbacks.shift()
    if (cbs) {
      // fairness + guards against stack overflows
      await yieldRunLoop()
      const [resolve, _] = cbs
      resolve(a)
    } else {
      this.elements.push(a)
    }
  }

  dequeue = async () => {
    if (this.elements.length > 0) {
      return this.elements.shift() as A
    } else {
      let cb: [Callback<A>, Callback<any>] | undefined
      const p = new Promise<A>((resolve, reject) => { cb = [resolve, reject] })
      if (!cb) throw new Error("Promise constructor")
      this.callbacks.push(cb)
      return await p
    }
  }

  rejectAllActive = (e: Error) => {
    while (this.callbacks.length > 0) {
      const cbs = this.callbacks.pop()
      if (!cbs) continue
      const [_, reject] = cbs
      reject(e)
    }
  }
}

/**
 * Consumer implementation.
 *
 * @param workers specifies the number of workers to start in parallel
 * @param blockProcessFromExiting if `true` then blocks the Nodejs process from exiting
 *
 * @returns a `[promise, cancel]` tuple, where `cancel` is a function that can be used
 *          to stop all processing and `promise` can be awaited for the completion of
 *          all workers, workers that complete on cancellation
 */
export function consumeQueue<A>(queue: Queue<A>, workers: number, blockProcessFromExiting: boolean = false) {
  const Cancel = new Error("queue-cancel-all")
  const startWorker =
    async (isActive: boolean[], process: (a: A) => Promise<void>) => {
      await yieldRunLoop()
      try {
        while (isActive.length > 0 && isActive[0]) {
          const a = await queue.dequeue()
          try {
            await process(a)
          } catch (e) {
            console.error("Error while processing queue message:", a, e)
          }
          // Fairness + protection against stack-overflow
          await yieldRunLoop()
        }
      } catch (e) {
        if (e != Cancel) throw e
      }
    }

  // For keeping the process alive, for as long as there is a run-loop active
  function startTick() {
    let tickID: Object
    function tick() { tickID = setTimeout(tick, 1000) }
    tick()
    return () => clearTimeout(tickID as any)
  }

  return (process: (a: A) => Promise<void>) => {
    const isActive = [true]
    const cancelTick = blockProcessFromExiting ? startTick() : () => {}
    const cancel = () => {
      isActive[0] = false
      queue.rejectAllActive(Cancel)
      cancelTick()
    }

    const tasks: Promise<void>[] = []
    for (let i=0; i<workers; i++) {
      tasks.push(startWorker(isActive, process))
    }

    const all = Promise.all(tasks).then(_ => undefined as void)
    return [all, cancel] as [Promise<void>, () => void]
  }
}

And usage:

async function main() {
  console.log("Starting...")
  const queue = new Queue<string>()

  const [promise, cancel] = consumeQueue(queue, 3, true)(
    async msg => {
      await new Promise(r => setTimeout(r, 1000))
      console.log(msg)
    })

  process.on('SIGINT', async () => {
    console.log("\nCancelling...\n")
    cancel()
  })

  await queue.enqueue("Hello")
  await queue.enqueue("World!")

  // Requires `blockProcessFromExiting` to be `true`
  await promise
  console.log("Done!")
}

main().catch(console.error)
| Written by