delivery.js

const { Asset } = require('./asset')
const { EventEmitter } = require('events')
const { Source } = require('./source')
const { demux } = require('./demux')
const { Pool } = require('nanoresource-pool')
const assert = require('assert')
const Batch = require('batch')
const uuid = require('uuid/v4')
const os = require('os')

/**
 * Default concurrency of source probing (`ffprobe`).
 * @private
 */
const DEFAULT_PROBE_CONCURRENCY = 2 * os.cpus().length

/**
 * Default concurrency of source probing (`ffmpeg`).
 * @private
 */
const DEFAULT_DEMUX_CONCURRENCY = os.cpus().length

/**
 * The `Delivery` class represents a container of multiple
 * sources.
 * @public
 * @class
 * @extends nanoresource-pool
 */
class Delivery extends Pool {

  /**
   * `Delivery` class constructor.
   * @param {(Object)} opts
   * @param {(String)} opts.id
   */
  constructor(opts) {
    super(Source, opts)

    if (!opts || 'object' !== typeof opts) {
      opts = {}
    }

    this.id = opts.id || uuid()

    this.date = Date.now()
    this.dateIso = new Date(this.date).toISOString()
  }

  /**
   * All assets in the delivery pool, including
   * child delivery pools.
   * @accessor
   * @type {Array<Source>}
   */
  get assets() {
    return this.sources.filter(s => s instanceof Asset)
  }

  /**
   * All sources in the delivery pool, including
   * child delivery pools.
   * @accessor
   * @type {Array<Source>}
   */
  get sources() {
    return this.query()
  }

  /**
   * Creates and adds a new Asset from a URI. Associates it with the Delivery.
   * @param {String} uri
   * @param {Object} opts
   * @return {Asset}
   */
  associate(uri, opts={}) {
    // Do not allow a new resource to be created if one already exists in the
    // pool with the given `uri`
    if (this.sources.some(s => s.uri.includes(uri))) {
      return this.sources.filter(s => s.uri.includes(uri))[0]
    }
    const asset = new Asset(uri, { associations: opts.associations || [this.id]})
    return this.add(asset)
  }

  /**
   * Creates and adds a new source from a URI.
   * @param {String} uri
   * @param {Object} opts
   * @return {Source}
   */
  source(uri, opts) {
    // Do not allow a new resource to be created if one already exists in the
    // pool with the given `uri`
    if (this.sources.some(s => s.uri.includes(uri))) {
      return this.sources.filter(s => s.uri.includes(uri))[0]
    }
    return this.resource(uri, opts)
  }

  /**
   * Probes all sources in delivery pool.
   * @param {Object} opts
   * @param {Function} callback
   */
  probe(opts, callback) {
    if ('function' === typeof opts) {
      callback = opts
    }

    if (!opts || 'object' !== typeof opts) {
      opts = {}
    }

    assert('function' === typeof callback, 'callback is not a function')

    const { concurrency = DEFAULT_PROBE_CONCURRENCY } = opts
    const { sources } = this
    const probes = {}
    const batch = new Batch().concurrency(concurrency)

    for (const source of sources) {
      batch.push((next) => {
        source.probe((err, info) => {
          if (err) { return next(err) }
          probes[source.uri] = info
          next(null)
        })
      })
    }

    batch.end((err) => {
      if (err) { return callback(err) }
      callback(null, probes)
    })
  }

  /**
   * An alias for `probe()`.
   * @param {Function} callback
   */
  stat(callback) {
    this.probe(callback)
  }

  /**
   * Demux sources into output streams. Outputs
   * @param {(Object)} opts
   * @param {Function} callback
   * @return {EventEmitter}
   */
  demux(opts, callback) {
    if ('function' === typeof opts) {
      callback = opts
    }

    if (!opts || 'object' !== typeof opts) {
      opts = {}
    }

    assert('function' === typeof callback, 'callback is not a function')

    const { concurrency = DEFAULT_PROBE_CONCURRENCY } = opts
    const { sources } = this
    const demuxes = {}
    const emitter = new EventEmitter()
    const batch = new Batch()

    batch.concurrency(concurrency)
    emitter.setMaxListeners(0)

    for (const source of sources) {
      batch.push((next) => {
        const demuxer = demux(source, opts, (err, outputs) => {
          if (err) { return next(err) }
          demuxes[source.uri] = outputs
          next(null)
        })

        const proxy = (event) => {
          demuxer.on(event, (...args) => {
            emitter.emit(event, ...args.concat(source))
          })
        }

        proxy('codecData')
        proxy('end')
        proxy('error')
        proxy('progress')
        proxy('start')
        proxy('stderr')
      })
    }

    batch.end((err) => {
      if (err) { return callback(err) }
      callback(null, demuxes)
    })

    return emitter
  }
}

/**
 * Module exports.
 */
module.exports = {
  Delivery
}