Simplify WebSocket keepalive logic

This commit is contained in:
Fedor Indutny 2023-06-06 17:36:38 -07:00 committed by GitHub
parent adc8513f41
commit 7abd2280bc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 88 additions and 81 deletions

View file

@ -68,6 +68,7 @@ describe('WebSocket-Resource', () => {
// actual test
new WebSocketResource(socket as WebSocket, {
name: 'test',
handleRequest(request: any) {
assert.strictEqual(request.verb, 'PUT');
assert.strictEqual(request.path, '/some/path');
@ -106,7 +107,9 @@ describe('WebSocket-Resource', () => {
});
// actual test
const resource = new WebSocketResource(socket as WebSocket);
const resource = new WebSocketResource(socket as WebSocket, {
name: 'test',
});
const promise = resource.sendRequest({
verb: 'PUT',
path: '/some/path',
@ -134,14 +137,18 @@ describe('WebSocket-Resource', () => {
sinon.stub(socket, 'close').callsFake(() => done());
const resource = new WebSocketResource(socket as WebSocket);
const resource = new WebSocketResource(socket as WebSocket, {
name: 'test',
});
resource.close();
});
it('force closes the connection', function test(done) {
const socket = new FakeSocket();
const resource = new WebSocketResource(socket as WebSocket);
const resource = new WebSocketResource(socket as WebSocket, {
name: 'test',
});
resource.close();
resource.addEventListener('close', () => done());
@ -164,37 +171,21 @@ describe('WebSocket-Resource', () => {
});
new WebSocketResource(socket as WebSocket, {
name: 'test',
keepalive: { path: '/v1/keepalive' },
});
this.clock.next();
});
it('uses / as a default path', function test(done) {
const socket = new FakeSocket();
sinon.stub(socket, 'sendBytes').callsFake(data => {
const message = Proto.WebSocketMessage.decode(data);
assert.strictEqual(message.type, Proto.WebSocketMessage.Type.REQUEST);
assert.strictEqual(message.request?.verb, 'GET');
assert.strictEqual(message.request?.path, '/');
done();
});
new WebSocketResource(socket as WebSocket, {
keepalive: true,
});
this.clock.next();
});
it('optionally disconnects if no response', function thisNeeded1(done) {
const socket = new FakeSocket();
sinon.stub(socket, 'close').callsFake(() => done());
new WebSocketResource(socket as WebSocket, {
keepalive: true,
name: 'test',
keepalive: { path: '/' },
});
// One to trigger send
@ -210,7 +201,8 @@ describe('WebSocket-Resource', () => {
sinon.stub(socket, 'close').callsFake(() => done());
new WebSocketResource(socket as WebSocket, {
keepalive: true,
name: 'test',
keepalive: { path: '/' },
});
// Just skip one hour immediately
@ -237,7 +229,8 @@ describe('WebSocket-Resource', () => {
});
const resource = new WebSocketResource(socket as WebSocket, {
keepalive: true,
name: 'test',
keepalive: { path: '/' },
});
setTimeout(() => {

View file

@ -142,6 +142,7 @@ export class SocketManager extends EventListener {
path: '/v1/websocket/',
query: { login: username, password },
resourceOptions: {
name: 'authenticated',
keepalive: { path: '/v1/keepalive' },
handleRequest: (req: IncomingWebSocketRequest): void => {
this.queueOrHandleRequest(req);
@ -269,6 +270,7 @@ export class SocketManager extends EventListener {
name: 'provisioning',
path: '/v1/websocket/provisioning/',
resourceOptions: {
name: 'provisioning',
handleRequest: (req: IncomingWebSocketRequest): void => {
handler.handleRequest(req);
},
@ -483,6 +485,7 @@ export class SocketManager extends EventListener {
name: 'unauthenticated',
path: '/v1/websocket/',
resourceOptions: {
name: 'unauthenticated',
keepalive: { path: '/v1/keepalive' },
},
});

View file

@ -25,6 +25,7 @@
import type { connection as WebSocket, IMessage } from 'websocket';
import Long from 'long';
import pTimeout from 'p-timeout';
import type { EventHandler } from './EventTarget';
import EventTarget from './EventTarget';
@ -94,8 +95,9 @@ export type SendRequestResult = Readonly<{
}>;
export type WebSocketResourceOptions = {
name: string;
handleRequest?: (request: IncomingWebSocketRequest) => void;
keepalive?: KeepAliveOptionsType | true;
keepalive?: KeepAliveOptionsType;
};
export class CloseEvent extends Event {
@ -122,15 +124,19 @@ export default class WebSocketResource extends EventTarget {
private shutdownTimer?: Timers.Timeout;
private readonly logId: string;
// Public for tests
public readonly keepalive?: KeepAlive;
constructor(
private readonly socket: WebSocket,
private readonly options: WebSocketResourceOptions = {}
private readonly options: WebSocketResourceOptions
) {
super();
this.logId = `WebSocketResource(${options.name})`;
this.boundOnMessage = this.onMessage.bind(this);
socket.on('message', this.boundOnMessage);
@ -138,25 +144,22 @@ export default class WebSocketResource extends EventTarget {
if (options.keepalive) {
const keepalive = new KeepAlive(
this,
options.keepalive === true ? {} : options.keepalive
options.name,
options.keepalive ?? {}
);
this.keepalive = keepalive;
keepalive.reset();
socket.on('message', () => keepalive.reset());
socket.on('close', () => keepalive.stop());
socket.on('close', () => this.keepalive?.stop());
socket.on('error', (error: Error) => {
log.warn(
'WebSocketResource: WebSocket error',
Errors.toLogFormat(error)
);
log.warn(`${this.logId}: WebSocket error`, Errors.toLogFormat(error));
});
}
socket.on('close', (code, reason) => {
this.closed = true;
log.warn('WebSocketResource: Socket closed');
log.warn(`${this.logId}: Socket closed`);
this.dispatchEvent(new CloseEvent(code, reason || 'normal'));
});
@ -218,10 +221,13 @@ export default class WebSocketResource extends EventTarget {
}
if (timedOut) {
log.warn(
`Response received after timeout; id: [${idString}], path: [${
options.path
}], response time: [${Date.now() - sentAt}]`
`${this.logId}: Response received after timeout; ` +
`id: [${idString}], path: [${options.path}], ` +
`response time: ${Date.now() - sentAt}ms`
);
} else {
// Reset keepalive when an on-time response arrives
this.keepalive?.reset();
}
this.removeActive(idString);
resolve(result);
@ -242,11 +248,11 @@ export default class WebSocketResource extends EventTarget {
public close(code = 3000, reason?: string): void {
if (this.closed) {
log.info(`WebSocketResource.close: Already closed! ${code}/${reason}`);
log.info(`${this.logId}.close: Already closed! ${code}/${reason}`);
return;
}
log.info('WebSocketResource.close()');
log.info(`${this.logId}.close(${code})`);
if (this.keepalive) {
this.keepalive.stop();
}
@ -263,9 +269,9 @@ export default class WebSocketResource extends EventTarget {
return;
}
log.warn('WebSocketResource: Dispatching our own socket close event');
log.warn(`${this.logId}.close: Dispatching our own socket close event`);
this.dispatchEvent(new CloseEvent(code, reason || 'normal'));
}, 5000);
}, 5 * durations.SECOND);
}
public shutdown(): void {
@ -274,20 +280,20 @@ export default class WebSocketResource extends EventTarget {
}
if (this.activeRequests.size === 0) {
log.info('WebSocketResource: no active requests, closing');
log.info(`${this.logId}.shutdown: no active requests, closing`);
this.close(3000, 'Shutdown');
return;
}
this.shuttingDown = true;
log.info('WebSocketResource: shutting down');
log.info(`${this.logId}.shutdown: shutting down`);
this.shutdownTimer = Timers.setTimeout(() => {
if (this.closed) {
return;
}
log.warn('WebSocketResource: Failed to shutdown gracefully');
log.warn(`${this.logId}.shutdown: Failed to shutdown gracefully`);
this.close(3000, 'Shutdown');
}, THIRTY_SECONDS);
}
@ -370,7 +376,7 @@ export default class WebSocketResource extends EventTarget {
private removeActive(request: IncomingWebSocketRequest | string): void {
if (!this.activeRequests.has(request)) {
log.warn('WebSocketResource: removing unknown request');
log.warn(`${this.logId}.removeActive: removing unknown request`);
return;
}
@ -387,41 +393,47 @@ export default class WebSocketResource extends EventTarget {
this.shutdownTimer = undefined;
}
log.info('WebSocketResource: shutdown complete');
log.info(`${this.logId}.removeActive: shutdown complete`);
this.close(3000, 'Shutdown');
}
}
export type KeepAliveOptionsType = {
path?: string;
disconnect?: boolean;
};
// 30 seconds + 5 seconds for closing the socket above.
const KEEPALIVE_INTERVAL_MS = 30 * durations.SECOND;
const MAX_KEEPALIVE_INTERVAL_MS = 5 * durations.MINUTE;
// If the machine was in suspended mode for more than 5 minutes - trigger
// immediate disconnect.
const STALE_THRESHOLD_MS = 5 * durations.MINUTE;
// If we don't receive a response to keepalive request within 10 seconds -
// close the socket.
const KEEPALIVE_TIMEOUT_MS = 10 * durations.SECOND;
const LOG_KEEPALIVE_AFTER_MS = 500;
class KeepAlive {
private keepAliveTimer: Timers.Timeout | undefined;
private disconnectTimer: Timers.Timeout | undefined;
private path: string;
private disconnect: boolean;
private wsr: WebSocketResource;
private lastAliveAt: number = Date.now();
private logId: string;
constructor(
websocketResource: WebSocketResource,
name: string,
opts: KeepAliveOptionsType = {}
) {
this.logId = `WebSocketResources.KeepAlive(${name})`;
if (websocketResource instanceof WebSocketResource) {
this.path = opts.path !== undefined ? opts.path : '/';
this.disconnect = opts.disconnect !== undefined ? opts.disconnect : true;
this.path = opts.path ?? '/';
this.wsr = websocketResource;
} else {
throw new TypeError('KeepAlive expected a WebSocketResource');
@ -435,8 +447,9 @@ class KeepAlive {
public async send(): Promise<void> {
this.clearTimers();
if (isOlderThan(this.lastAliveAt, MAX_KEEPALIVE_INTERVAL_MS)) {
log.info('WebSocketResources: disconnecting due to stale state');
const isStale = isOlderThan(this.lastAliveAt, STALE_THRESHOLD_MS);
if (isStale) {
log.info(`${this.logId}.send: disconnecting due to stale state`);
this.wsr.close(
3001,
`Last keepalive request was too far in the past: ${this.lastAliveAt}`
@ -444,36 +457,38 @@ class KeepAlive {
return;
}
if (this.disconnect) {
// automatically disconnect if server doesn't ack
this.disconnectTimer = Timers.setTimeout(() => {
log.info('WebSocketResources: disconnecting due to no response');
this.clearTimers();
this.wsr.close(3001, 'No response to keepalive request');
}, 10000);
} else {
this.reset();
}
log.info('WebSocketResources: Sending a keepalive message');
log.info(`${this.logId}.send: Sending a keepalive message`);
const sentAt = Date.now();
const { status } = await this.wsr.sendRequest({
verb: 'GET',
path: this.path,
});
try {
const { status } = await pTimeout(
this.wsr.sendRequest({
verb: 'GET',
path: this.path,
}),
KEEPALIVE_TIMEOUT_MS
);
if (status < 200 || status >= 300) {
log.warn(`${this.logId}.send: keepalive response status ${status}`);
this.wsr.close(3001, `keepalive response with ${status} code`);
return;
}
} catch (error) {
this.wsr.close(3001, 'No response to keepalive request');
return;
}
const responseTime = Date.now() - sentAt;
if (responseTime > LOG_KEEPALIVE_AFTER_MS) {
log.warn(
`Delayed response to keepalive request, response time: [${responseTime}]`
`${this.logId}.send: delayed response to keepalive request, ` +
`response time: ${responseTime}ms`
);
}
if (status >= 200 || status < 300) {
this.reset();
}
// Successful response on time
this.reset();
}
public reset(): void {
@ -492,9 +507,5 @@ class KeepAlive {
Timers.clearTimeout(this.keepAliveTimer);
this.keepAliveTimer = undefined;
}
if (this.disconnectTimer) {
Timers.clearTimeout(this.disconnectTimer);
this.disconnectTimer = undefined;
}
}
}