"use strict"; var __importDefault = (this && this.__importDefault) || function (mod) { return (mod && mod.__esModule) ? mod : { "default": mod }; }; const stream_1 = require("stream"); const miniget_1 = __importDefault(require("miniget")); const m3u8_parser_1 = __importDefault(require("./m3u8-parser")); const dash_mpd_parser_1 = __importDefault(require("./dash-mpd-parser")); const queue_1 = require("./queue"); const parse_time_1 = require("./parse-time"); const supportedParsers = { m3u8: m3u8_parser_1.default, 'dash-mpd': dash_mpd_parser_1.default, }; let m3u8stream = ((playlistURL, options = {}) => { const stream = new stream_1.PassThrough(); const chunkReadahead = options.chunkReadahead || 3; // 20 seconds. const liveBuffer = options.liveBuffer || 20000; const requestOptions = options.requestOptions; const Parser = supportedParsers[options.parser || (/\.mpd$/.test(playlistURL) ? 'dash-mpd' : 'm3u8')]; if (!Parser) { throw TypeError(`parser '${options.parser}' not supported`); } let begin = 0; if (typeof options.begin !== 'undefined') { begin = typeof options.begin === 'string' ? parse_time_1.humanStr(options.begin) : Math.max(options.begin - liveBuffer, 0); } const forwardEvents = (req) => { for (let event of ['abort', 'request', 'response', 'redirect', 'retry', 'reconnect']) { req.on(event, stream.emit.bind(stream, event)); } }; let currSegment; const streamQueue = new queue_1.Queue((req, callback) => { currSegment = req; // Count the size manually, since the `content-length` header is not // always there. let size = 0; req.on('data', (chunk) => size += chunk.length); req.pipe(stream, { end: false }); req.on('end', () => callback(null, size)); }, { concurrency: 1 }); let segmentNumber = 0; let downloaded = 0; const requestQueue = new queue_1.Queue((segment, callback) => { let reqOptions = Object.assign({}, requestOptions); if (segment.range) { reqOptions.headers = Object.assign({}, reqOptions.headers, { Range: `bytes=${segment.range.start}-${segment.range.end}`, }); } let req = miniget_1.default(new URL(segment.url, playlistURL).toString(), reqOptions); req.on('error', callback); forwardEvents(req); streamQueue.push(req, (_, size) => { downloaded += +size; stream.emit('progress', { num: ++segmentNumber, size: size, duration: segment.duration, url: segment.url, }, requestQueue.total, downloaded); callback(null); }); }, { concurrency: chunkReadahead }); const onError = (err) => { if (ended) { return; } stream.emit('error', err); // Stop on any error. stream.end(); }; // When to look for items again. let refreshThreshold; let minRefreshTime; let refreshTimeout; let fetchingPlaylist = true; let ended = false; let isStatic = false; let lastRefresh; const onQueuedEnd = (err) => { currSegment = null; if (err) { onError(err); } else if (!fetchingPlaylist && !ended && !isStatic && requestQueue.tasks.length + requestQueue.active <= refreshThreshold) { let ms = Math.max(0, minRefreshTime - (Date.now() - lastRefresh)); fetchingPlaylist = true; refreshTimeout = setTimeout(refreshPlaylist, ms); } else if ((ended || isStatic) && !requestQueue.tasks.length && !requestQueue.active) { stream.end(); } }; let currPlaylist; let lastSeq; let starttime = 0; const refreshPlaylist = () => { lastRefresh = Date.now(); currPlaylist = miniget_1.default(playlistURL, requestOptions); currPlaylist.on('error', onError); forwardEvents(currPlaylist); const parser = currPlaylist.pipe(new Parser(options.id)); parser.on('starttime', (a) => { if (starttime) { return; } starttime = a; if (typeof options.begin === 'string' && begin >= 0) { begin += starttime; } }); parser.on('endlist', () => { isStatic = true; }); parser.on('endearly', currPlaylist.unpipe.bind(currPlaylist, parser)); let addedItems = []; const addItem = (item) => { if (!item.init) { if (item.seq <= lastSeq) { return; } lastSeq = item.seq; } begin = item.time; requestQueue.push(item, onQueuedEnd); addedItems.push(item); }; let tailedItems = [], tailedItemsDuration = 0; parser.on('item', (item) => { let timedItem = Object.assign({ time: starttime }, item); if (begin <= timedItem.time) { addItem(timedItem); } else { tailedItems.push(timedItem); tailedItemsDuration += timedItem.duration; // Only keep the last `liveBuffer` of items. while (tailedItems.length > 1 && tailedItemsDuration - tailedItems[0].duration > liveBuffer) { const lastItem = tailedItems.shift(); tailedItemsDuration -= lastItem.duration; } } starttime += timedItem.duration; }); parser.on('end', () => { currPlaylist = null; // If we are too ahead of the stream, make sure to get the // latest available items with a small buffer. if (!addedItems.length && tailedItems.length) { tailedItems.forEach(item => { addItem(item); }); } // Refresh the playlist when remaining segments get low. refreshThreshold = Math.max(1, Math.ceil(addedItems.length * 0.01)); // Throttle refreshing the playlist by looking at the duration // of live items added on this refresh. minRefreshTime = addedItems.reduce((total, item) => item.duration + total, 0); fetchingPlaylist = false; onQueuedEnd(null); }); }; refreshPlaylist(); stream.end = () => { ended = true; streamQueue.die(); requestQueue.die(); clearTimeout(refreshTimeout); currPlaylist === null || currPlaylist === void 0 ? void 0 : currPlaylist.destroy(); currSegment === null || currSegment === void 0 ? void 0 : currSegment.destroy(); stream_1.PassThrough.prototype.end.call(stream, null); }; return stream; }); m3u8stream.parseTimestamp = parse_time_1.humanStr; module.exports = m3u8stream; //# sourceMappingURL=index.js.map