WebSocket API for CDS

This commit is contained in:
Fedor Indutny 2021-11-09 00:32:31 +01:00 committed by GitHub
parent 4cb1ea9e5d
commit 409bf1fc82
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 436 additions and 109 deletions

View file

@ -5,21 +5,18 @@ import URL from 'url';
import ProxyAgent from 'proxy-agent';
import type { RequestInit } from 'node-fetch';
import { Response, Headers } from 'node-fetch';
import { client as WebSocketClient } from 'websocket';
import type { connection as WebSocket } from 'websocket';
import qs from 'querystring';
import EventListener from 'events';
import { AbortableProcess } from '../util/AbortableProcess';
import type { AbortableProcess } from '../util/AbortableProcess';
import { strictAssert } from '../util/assert';
import { explodePromise } from '../util/explodePromise';
import { BackOff, FIBONACCI_TIMEOUTS } from '../util/BackOff';
import { getUserAgent } from '../util/getUserAgent';
import * as durations from '../util/durations';
import { sleep } from '../util/sleep';
import { SocketStatus } from '../types/SocketStatus';
import * as Errors from '../types/errors';
import * as Bytes from '../Bytes';
import * as Timers from '../Timers';
import * as log from '../logging/log';
import type {
@ -27,11 +24,9 @@ import type {
IncomingWebSocketRequest,
} from './WebsocketResources';
import WebSocketResource from './WebsocketResources';
import { ConnectTimeoutError, HTTPError } from './Errors';
import { handleStatusCode, translateError } from './Utils';
import { HTTPError } from './Errors';
import type { WebAPICredentials, IRequestHandler } from './Types.d';
const TEN_SECONDS = 10 * durations.SECOND;
import { connect as connectWebSocket } from './WebSocket';
const FIVE_MINUTES = 5 * durations.MINUTE;
@ -472,112 +467,29 @@ export class SocketManager extends EventListener {
path,
resourceOptions,
query = {},
timeout = TEN_SECONDS,
}: {
path: string;
resourceOptions: WebSocketResourceOptions;
query?: Record<string, string>;
timeout?: number;
}): AbortableProcess<WebSocketResource> {
const fixedScheme = this.options.url
.replace('https://', 'wss://')
.replace('http://', 'ws://');
const headers = {
'User-Agent': getUserAgent(this.options.version),
};
const client = new WebSocketClient({
tlsOptions: {
ca: this.options.certificateAuthority,
agent: this.proxyAgent,
},
maxReceivedFrameSize: 0x210000,
});
const queryWithDefaults = {
agent: 'OWD',
version: this.options.version,
...query,
};
client.connect(
`${fixedScheme}${path}?${qs.encode(queryWithDefaults)}`,
undefined,
undefined,
headers
);
const url = `${this.options.url}${path}?${qs.encode(queryWithDefaults)}`;
const { stack } = new Error();
return connectWebSocket({
url,
certificateAuthority: this.options.certificateAuthority,
version: this.options.version,
proxyAgent: this.proxyAgent,
const { promise, resolve, reject } = explodePromise<WebSocketResource>();
const timer = Timers.setTimeout(() => {
reject(new ConnectTimeoutError('Connection timed out'));
client.abort();
}, timeout);
let resource: WebSocketResource | undefined;
client.on('connect', socket => {
Timers.clearTimeout(timer);
resource = new WebSocketResource(socket, resourceOptions);
resolve(resource);
});
client.on('httpResponse', async response => {
Timers.clearTimeout(timer);
const statusCode = response.statusCode || -1;
await handleStatusCode(statusCode);
const error = new HTTPError(
'connectResource: invalid websocket response',
{
code: statusCode || -1,
headers: {},
stack,
}
);
const translatedError = translateError(error);
strictAssert(
translatedError,
'`httpResponse` event cannot be emitted with 200 status code'
);
reject(translatedError);
});
client.on('connectFailed', e => {
Timers.clearTimeout(timer);
reject(
new HTTPError('connectResource: connectFailed', {
code: -1,
headers: {},
response: e.toString(),
stack,
})
);
});
return new AbortableProcess<WebSocketResource>(
`SocketManager.connectResource(${path})`,
{
abort() {
if (resource) {
log.warn(`SocketManager closing socket ${path}`);
resource.close(3000, 'aborted');
} else {
log.warn(`SocketManager aborting connection ${path}`);
Timers.clearTimeout(timer);
client.abort();
}
},
createResource(socket: WebSocket): WebSocketResource {
return new WebSocketResource(socket, resourceOptions);
},
promise
);
});
}
private static async checkResource(