// File info: JavaScript, 182 lines, 6.9 KiB, no trailing newline (No EOL).
"use strict";
|
|
/**
 * Interop helper for default imports of CommonJS modules.
 *
 * Reuses an `__importDefault` already present on `this` when there is one;
 * otherwise falls back to the local implementation below. `var` is kept so
 * the declaration tolerates being repeated in a shared scope.
 *
 * @param {*} mod - The value returned by `require()`.
 * @returns {*} `mod` itself when it is flagged with a truthy `__esModule`,
 *   otherwise a wrapper object `{ default: mod }`.
 */
var __importDefault = (this && this.__importDefault) || function (mod) {
    if (mod && mod.__esModule) {
        // Already shaped like an ES module namespace: use it as is.
        return mod;
    }
    // Plain CommonJS export: expose it under the `default` key.
    return { "default": mod };
};
|
|
const stream_1 = require("stream");
|
|
const miniget_1 = __importDefault(require("miniget"));
|
|
const m3u8_parser_1 = __importDefault(require("./m3u8-parser"));
|
|
const dash_mpd_parser_1 = __importDefault(require("./dash-mpd-parser"));
|
|
const queue_1 = require("./queue");
|
|
const parse_time_1 = require("./parse-time");
|
|
// Registry mapping a parser name (as accepted by `options.parser`) to the
// parser class used to read the playlist.
//
// The registry is created without a prototype so that looking up an
// arbitrary, caller-supplied name such as 'constructor' or '__proto__'
// yields `undefined` instead of an inherited `Object.prototype` member.
const supportedParsers = Object.assign(Object.create(null), {
    m3u8: m3u8_parser_1.default,
    'dash-mpd': dash_mpd_parser_1.default,
});
|
|
/**
 * Creates a stream that downloads the segments listed in a playlist and
 * writes their bytes, in order, into a single `PassThrough` stream.
 *
 * The playlist is fetched with `miniget` and piped into a parser chosen from
 * `supportedParsers`. Every `item` the parser emits is queued for download;
 * downloads run up to `chunkReadahead` at a time, but are piped into the
 * output one at a time so the byte order is preserved. Unless the parser
 * reports `endlist`, the playlist is fetched again once the number of
 * pending segments drops to the refresh threshold.
 *
 * Events emitted on the returned stream:
 *  - `progress` with `({ num, size, duration, url }, totalSegments, downloadedBytes)`
 *    after each segment has been fully piped.
 *  - `abort`, `request`, `response`, `redirect`, `retry`, `reconnect`,
 *    forwarded from every underlying request.
 *  - `error` for request errors and parser stream errors; the stream is
 *    ended right after.
 *
 * @param {string} playlistURL - URL of the playlist. Also used as the base
 *   URL when resolving segment URLs.
 * @param {Object} [options]
 * @param {number} [options.chunkReadahead=3] - Concurrency of the segment
 *   request queue.
 * @param {number} [options.liveBuffer=20000] - Buffer kept behind the live
 *   edge (20 seconds by default).
 * @param {Object} [options.requestOptions] - Passed to `miniget` for both the
 *   playlist and the segment requests.
 * @param {string} [options.parser] - Key of `supportedParsers`. When omitted,
 *   'dash-mpd' is used if the URL path ends in `.mpd`, otherwise 'm3u8'.
 * @param {string|number} [options.begin] - Where to start. A string is parsed
 *   with `humanStr` and offset by the playlist start time; any other value is
 *   treated as a number, reduced by `liveBuffer` and clamped at 0.
 * @param {*} [options.id] - Passed as-is to the parser constructor.
 * @returns {PassThrough} The output stream. Its `end()` is overridden to
 *   also stop the queues, the refresh timer and any in-flight requests.
 * @throws {TypeError} When the selected parser is not supported.
 */
const m3u8stream = ((playlistURL, options = {}) => {
    const stream = new stream_1.PassThrough();
    const chunkReadahead = options.chunkReadahead || 3;
    // 20 seconds.
    const liveBuffer = options.liveBuffer || 20000;
    const requestOptions = options.requestOptions;

    // Sniff the extension on the path only, so a query string or fragment
    // neither hides a `.mpd` path nor fakes one.
    const playlistPath = String(playlistURL).split(/[?#]/, 1)[0];
    const parserName = options.parser ||
        (/\.mpd$/.test(playlistPath) ? 'dash-mpd' : 'm3u8');
    // Own-property lookup: inherited keys such as 'constructor' must not be
    // mistaken for a registered parser.
    const Parser = Object.prototype.hasOwnProperty.call(supportedParsers, parserName) ?
        supportedParsers[parserName] :
        undefined;
    if (!Parser) {
        throw new TypeError(`parser '${options.parser}' not supported`);
    }

    let begin = 0;
    if (typeof options.begin !== 'undefined') {
        begin = typeof options.begin === 'string' ?
            parse_time_1.humanStr(options.begin) :
            Math.max(options.begin - liveBuffer, 0);
    }

    // Re-emits the request lifecycle events of `req` on the output stream.
    const forwardEvents = (req) => {
        for (const event of ['abort', 'request', 'response', 'redirect', 'retry', 'reconnect']) {
            req.on(event, stream.emit.bind(stream, event));
        }
    };

    // Request currently being piped into the output, kept so `end()` can
    // destroy it.
    let currSegment;
    // Pipes one segment response at a time into the output stream.
    const streamQueue = new queue_1.Queue((req, callback) => {
        currSegment = req;
        // Count the size manually, since the `content-length` header is not
        // always there.
        let size = 0;
        req.on('data', (chunk) => size += chunk.length);
        // Keep the output open: more segments will follow.
        req.pipe(stream, { end: false });
        req.on('end', () => callback(null, size));
    }, { concurrency: 1 });

    let segmentNumber = 0;
    let downloaded = 0;
    // Starts segment requests ahead of time, then hands them to `streamQueue`.
    const requestQueue = new queue_1.Queue((segment, callback) => {
        const reqOptions = Object.assign({}, requestOptions);
        if (segment.range) {
            // Copy the headers so the caller's `requestOptions` is not mutated.
            reqOptions.headers = Object.assign({}, reqOptions.headers, {
                Range: `bytes=${segment.range.start}-${segment.range.end}`,
            });
        }
        const req = miniget_1.default(new URL(segment.url, playlistURL).toString(), reqOptions);
        req.on('error', callback);
        forwardEvents(req);
        streamQueue.push(req, (_, size) => {
            downloaded += +size;
            stream.emit('progress', {
                num: ++segmentNumber,
                size: size,
                duration: segment.duration,
                url: segment.url,
            }, requestQueue.total, downloaded);
            callback(null);
        });
    }, { concurrency: chunkReadahead });

    // When to look for items again.
    let refreshThreshold;
    let minRefreshTime;
    let refreshTimeout;
    let fetchingPlaylist = true;
    let ended = false;
    let isStatic = false;
    let lastRefresh;

    // Reports an error on the output stream and stops everything. Errors
    // arriving after the stream has ended are ignored.
    const onError = (err) => {
        if (ended) {
            return;
        }
        stream.emit('error', err);
        // Stop on any error.
        stream.end();
    };

    // Runs after each queued segment finishes (and once per playlist parse):
    // decides whether to fail, schedule a playlist refresh, or end the stream.
    const onQueuedEnd = (err) => {
        currSegment = null;
        if (err) {
            onError(err);
        }
        else if (!fetchingPlaylist && !ended && !isStatic &&
            requestQueue.tasks.length + requestQueue.active <= refreshThreshold) {
            // Wait out whatever is left of the minimum refresh interval.
            const ms = Math.max(0, minRefreshTime - (Date.now() - lastRefresh));
            fetchingPlaylist = true;
            refreshTimeout = setTimeout(refreshPlaylist, ms);
        }
        else if ((ended || isStatic) &&
            !requestQueue.tasks.length && !requestQueue.active) {
            stream.end();
        }
    };

    let currPlaylist;
    let lastSeq;
    let starttime = 0;
    // Fetches and parses the playlist, queueing any segments not seen before.
    const refreshPlaylist = () => {
        lastRefresh = Date.now();
        currPlaylist = miniget_1.default(playlistURL, requestOptions);
        currPlaylist.on('error', onError);
        forwardEvents(currPlaylist);
        const parser = currPlaylist.pipe(new Parser(options.id));
        // An 'error' event without a listener is thrown by the emitter, so
        // route parser stream errors through the same path as request errors.
        parser.on('error', onError);
        parser.on('starttime', (a) => {
            // Only the first reported start time is used.
            if (starttime) {
                return;
            }
            starttime = a;
            if (typeof options.begin === 'string' && begin >= 0) {
                begin += starttime;
            }
        });
        parser.on('endlist', () => { isStatic = true; });
        parser.on('endearly', currPlaylist.unpipe.bind(currPlaylist, parser));

        const addedItems = [];
        // Queues `item` for download unless its sequence number was already
        // queued; items flagged `init` skip the sequence check.
        const addItem = (item) => {
            if (!item.init) {
                if (item.seq <= lastSeq) {
                    return;
                }
                lastSeq = item.seq;
            }
            begin = item.time;
            requestQueue.push(item, onQueuedEnd);
            addedItems.push(item);
        };

        // Items located before `begin`, kept as a fallback.
        const tailedItems = [];
        let tailedItemsDuration = 0;
        parser.on('item', (item) => {
            const timedItem = Object.assign({ time: starttime }, item);
            if (begin <= timedItem.time) {
                addItem(timedItem);
            }
            else {
                tailedItems.push(timedItem);
                tailedItemsDuration += timedItem.duration;
                // Only keep the last `liveBuffer` of items.
                while (tailedItems.length > 1 &&
                    tailedItemsDuration - tailedItems[0].duration > liveBuffer) {
                    const lastItem = tailedItems.shift();
                    tailedItemsDuration -= lastItem.duration;
                }
            }
            starttime += timedItem.duration;
        });

        parser.on('end', () => {
            currPlaylist = null;
            // If we are too ahead of the stream, make sure to get the
            // latest available items with a small buffer.
            if (!addedItems.length && tailedItems.length) {
                tailedItems.forEach((item) => { addItem(item); });
            }
            // Refresh the playlist when remaining segments get low.
            refreshThreshold = Math.max(1, Math.ceil(addedItems.length * 0.01));
            // Throttle refreshing the playlist by looking at the duration
            // of live items added on this refresh.
            minRefreshTime =
                addedItems.reduce((total, item) => item.duration + total, 0);
            fetchingPlaylist = false;
            onQueuedEnd(null);
        });
    };
    refreshPlaylist();

    // Ending the output also tears down the queues, the pending refresh and
    // any request still in flight. Arguments passed to `end()` are ignored.
    stream.end = () => {
        ended = true;
        streamQueue.die();
        requestQueue.die();
        clearTimeout(refreshTimeout);
        if (currPlaylist !== null && currPlaylist !== undefined) {
            currPlaylist.destroy();
        }
        if (currSegment !== null && currSegment !== undefined) {
            currSegment.destroy();
        }
        stream_1.PassThrough.prototype.end.call(stream, null);
    };
    return stream;
});
|
|
// Expose the timestamp parser as a static helper on the exported function.
m3u8stream.parseTimestamp = parse_time_1.humanStr;
// CommonJS export: the module's value is the `m3u8stream` function itself.
module.exports = m3u8stream;
|
|
//# sourceMappingURL=index.js.map
|