diff --git a/streaming/index.js b/streaming/index.js index 42c6bd8471b..de02b758a7b 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -91,6 +91,33 @@ const parseJSON = (json, req) => { } }; +/** + * Parses the query string from a request object. + * @param {Request?} req + * @returns {Record | undefined} + */ +const parseQueryString = (req) => { + if (!req.url) { + return undefined; + } + const url = new URL(req.url, "http:."); + const params = Array.from(url.searchParams); + return params.reduce((parsed, [key, value]) => { + if (parsed.hasOwnProperty(key)) { + if (Array.isArray(parsed[key])) { + parsed[key].push(value); + } + else { + parsed[key] = [parsed[key], value]; + } + } + else { + parsed[key] = value; + } + return parsed; + }, {}); +} + // Used for priming the counters/gauges for the various metrics that are // per-channel const CHANNEL_NAMES = [ @@ -391,8 +418,8 @@ const startServer = async () => { */ const accountFromRequest = (req) => new Promise((resolve, reject) => { const authorization = req.headers.authorization; - const location = req.url ? url.parse(req.url, true) : undefined; - const accessToken = location?.query.access_token || req.headers['sec-websocket-protocol']; + const query = parseQueryString(req); + const accessToken = query?.access_token || req.headers['sec-websocket-protocol']; if (!authorization && !accessToken) { reject(new AuthenticationError('Missing access token')); @@ -1298,8 +1325,8 @@ const startServer = async () => { * @param {import('pino').Logger} log */ function onConnection(ws, req, log) { - // Note: url.parse could throw, which would terminate the connection, so we - // increment the connected clients metric straight away when we establish + // In case the handler throws, which would terminate the connection, + // increment the connected clients metric straight away when it establishes // the connection, without waiting: metrics.connectedClients.labels({ type: 'websocket' }).inc(); @@ -1381,11 +1408,10 @@ const startServer = async () => { subscribeWebsocketToSystemChannel(session); - // Parse the URL for the connection arguments (if supplied), url.parse can throw: - const location = req.url && url.parse(req.url, true); - - if (location && location.query.stream) { - subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query); + // Parse the URL for the connection arguments (if supplied) + const query = parseQueryString(req); + if (query && query.stream) { + subscribeWebsocketToChannel(session, firstParam(query.stream), query); } }