Improve retry
Registering returns an instance that lets you retry and recover without needing to keep passing the name everywhere. Also refactored the shared process a little to make better use of the retry and downgraded stderr messages to warnings because they aren't critical.
This commit is contained in:
@ -11,7 +11,7 @@ class WebsocketConnection implements ReadWriteConnection {
|
||||
private activeSocket: WebSocket | undefined;
|
||||
private readonly messageBuffer = <Uint8Array[]>[];
|
||||
private readonly socketTimeoutDelay = 60 * 1000;
|
||||
private readonly retryName = "Socket";
|
||||
private readonly retry = retry.register("Socket", () => this.connect());
|
||||
private isUp: boolean = false;
|
||||
private closed: boolean = false;
|
||||
|
||||
@ -26,11 +26,14 @@ class WebsocketConnection implements ReadWriteConnection {
|
||||
public readonly onMessage = this.messageEmitter.event;
|
||||
|
||||
public constructor() {
|
||||
retry.register(this.retryName, () => this.connect());
|
||||
retry.block(this.retryName);
|
||||
retry.run(this.retryName);
|
||||
this.retry.block();
|
||||
this.retry.run();
|
||||
}
|
||||
|
||||
/**
|
||||
* Send data across the socket. If closed, will error. If connecting, will
|
||||
* queue.
|
||||
*/
|
||||
public send(data: Buffer | Uint8Array): void {
|
||||
if (this.closed) {
|
||||
throw new Error("web socket is closed");
|
||||
@ -42,6 +45,9 @@ class WebsocketConnection implements ReadWriteConnection {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close socket connection.
|
||||
*/
|
||||
public close(): void {
|
||||
this.closed = true;
|
||||
this.dispose();
|
||||
@ -75,8 +81,8 @@ class WebsocketConnection implements ReadWriteConnection {
|
||||
field("wasClean", event.wasClean),
|
||||
);
|
||||
if (!this.closed) {
|
||||
retry.block(this.retryName);
|
||||
retry.run(this.retryName);
|
||||
this.retry.block();
|
||||
this.retry.run();
|
||||
}
|
||||
});
|
||||
|
||||
@ -108,15 +114,19 @@ class WebsocketConnection implements ReadWriteConnection {
|
||||
}, this.socketTimeoutDelay);
|
||||
|
||||
await new Promise((resolve, reject): void => {
|
||||
const onClose = (): void => {
|
||||
const doReject = (): void => {
|
||||
clearTimeout(socketWaitTimeout);
|
||||
socket.removeEventListener("close", onClose);
|
||||
socket.removeEventListener("error", doReject);
|
||||
socket.removeEventListener("close", doReject);
|
||||
reject();
|
||||
};
|
||||
socket.addEventListener("close", onClose);
|
||||
socket.addEventListener("error", doReject);
|
||||
socket.addEventListener("close", doReject);
|
||||
|
||||
socket.addEventListener("open", async () => {
|
||||
socket.addEventListener("open", () => {
|
||||
clearTimeout(socketWaitTimeout);
|
||||
socket.removeEventListener("error", doReject);
|
||||
socket.removeEventListener("close", doReject);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
|
@ -1,14 +1,64 @@
|
||||
import { logger, field } from "@coder/logger";
|
||||
import { NotificationService, INotificationHandle, INotificationService, Severity } from "./fill/notification";
|
||||
|
||||
// tslint:disable no-any can have different return values
|
||||
|
||||
interface IRetryItem {
|
||||
/**
|
||||
* How many times this item has been retried.
|
||||
*/
|
||||
count?: number;
|
||||
delay?: number; // In seconds.
|
||||
end?: number; // In ms.
|
||||
fn(): any | Promise<any>; // tslint:disable-line no-any can have different return values
|
||||
|
||||
/**
|
||||
* In seconds.
|
||||
*/
|
||||
delay?: number;
|
||||
|
||||
/**
|
||||
* In milliseconds.
|
||||
*/
|
||||
end?: number;
|
||||
|
||||
/**
|
||||
* Function to run when retrying.
|
||||
*/
|
||||
fn(): any;
|
||||
|
||||
/**
|
||||
* Timer for running this item.
|
||||
*/
|
||||
timeout?: number | NodeJS.Timer;
|
||||
|
||||
/**
|
||||
* Whether the item is retrying or waiting to retry.
|
||||
*/
|
||||
running?: boolean;
|
||||
showInNotification: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* An retry-able instance.
|
||||
*/
|
||||
export interface RetryInstance {
|
||||
/**
|
||||
* Run this retry.
|
||||
*/
|
||||
run(error?: Error): void;
|
||||
|
||||
/**
|
||||
* Block on this instance.
|
||||
*/
|
||||
block(): void;
|
||||
}
|
||||
|
||||
/**
|
||||
* A retry-able instance that doesn't use a promise so it must be manually
|
||||
* ran again on failure and recovered on success.
|
||||
*/
|
||||
export interface ManualRetryInstance extends RetryInstance {
|
||||
/**
|
||||
* Mark this item as recovered.
|
||||
*/
|
||||
recover(): void;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -21,7 +71,7 @@ interface IRetryItem {
|
||||
* to the user explaining what is happening with an option to immediately retry.
|
||||
*/
|
||||
export class Retry {
|
||||
private items = new Map<string, IRetryItem>();
|
||||
private readonly items = new Map<string, IRetryItem>();
|
||||
|
||||
// Times are in seconds.
|
||||
private readonly retryMinDelay = 1;
|
||||
@ -50,13 +100,54 @@ export class Retry {
|
||||
}
|
||||
|
||||
/**
|
||||
* Block retries when we know they will fail (for example when starting Wush
|
||||
* back up). If a name is passed, that service will still be allowed to retry
|
||||
* Register a function to retry that starts/connects to a service.
|
||||
*
|
||||
* The service is automatically retried or recovered when the promise resolves
|
||||
* or rejects. If the service dies after starting, it must be manually
|
||||
* retried.
|
||||
*/
|
||||
public register(name: string, fn: () => Promise<any>): RetryInstance;
|
||||
/**
|
||||
* Register a function to retry that starts/connects to a service.
|
||||
*
|
||||
* Must manually retry if it fails to start again or dies after restarting and
|
||||
* manually recover if it succeeds in starting again.
|
||||
*/
|
||||
public register(name: string, fn: () => any): ManualRetryInstance;
|
||||
/**
|
||||
* Register a function to retry that starts/connects to a service.
|
||||
*/
|
||||
public register(name: string, fn: () => any): RetryInstance | ManualRetryInstance {
|
||||
if (this.items.has(name)) {
|
||||
throw new Error(`"${name}" is already registered`);
|
||||
}
|
||||
this.items.set(name, { fn });
|
||||
|
||||
return {
|
||||
block: (): void => this.block(name),
|
||||
run: (error?: Error): void => this.run(name, error),
|
||||
recover: (): void => this.recover(name),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Un-register a function to retry.
|
||||
*/
|
||||
public unregister(name: string): void {
|
||||
if (!this.items.has(name)) {
|
||||
throw new Error(`"${name}" is not registered`);
|
||||
}
|
||||
this.items.delete(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Block retries when we know they will fail (for example when the socket is
|
||||
* down ). If a name is passed, that service will still be allowed to retry
|
||||
* (unless we have already blocked).
|
||||
*
|
||||
* Blocking without a name will override a block with a name.
|
||||
*/
|
||||
public block(name?: string): void {
|
||||
private block(name?: string): void {
|
||||
if (!this.blocked || !name) {
|
||||
this.blocked = name || true;
|
||||
this.items.forEach((item) => {
|
||||
@ -68,7 +159,7 @@ export class Retry {
|
||||
/**
|
||||
* Unblock retries and run any that are pending.
|
||||
*/
|
||||
public unblock(): void {
|
||||
private unblock(): void {
|
||||
this.blocked = false;
|
||||
this.items.forEach((item, name) => {
|
||||
if (item.running) {
|
||||
@ -77,35 +168,10 @@ export class Retry {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a function to retry that starts/connects to a service.
|
||||
*
|
||||
* If the function returns a promise, it will automatically be retried,
|
||||
* recover, & unblock after calling `run` once (otherwise they need to be
|
||||
* called manually).
|
||||
*/
|
||||
// tslint:disable-next-line no-any can have different return values
|
||||
public register(name: string, fn: () => any | Promise<any>, showInNotification: boolean = true): void {
|
||||
if (this.items.has(name)) {
|
||||
throw new Error(`"${name}" is already registered`);
|
||||
}
|
||||
this.items.set(name, { fn, showInNotification });
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister a function to retry.
|
||||
*/
|
||||
public unregister(name: string): void {
|
||||
if (!this.items.has(name)) {
|
||||
throw new Error(`"${name}" is not registered`);
|
||||
}
|
||||
this.items.delete(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retry a service.
|
||||
*/
|
||||
public run(name: string, error?: Error): void {
|
||||
private run(name: string, error?: Error): void {
|
||||
if (!this.items.has(name)) {
|
||||
throw new Error(`"${name}" is not registered`);
|
||||
}
|
||||
@ -149,7 +215,7 @@ export class Retry {
|
||||
/**
|
||||
* Reset a service after a successfully recovering.
|
||||
*/
|
||||
public recover(name: string): void {
|
||||
private recover(name: string): void {
|
||||
if (!this.items.has(name)) {
|
||||
throw new Error(`"${name}" is not registered`);
|
||||
}
|
||||
@ -191,9 +257,9 @@ export class Retry {
|
||||
if (this.blocked === name) {
|
||||
this.unblock();
|
||||
}
|
||||
}).catch(() => {
|
||||
}).catch((error) => {
|
||||
endItem();
|
||||
this.run(name);
|
||||
this.run(name, error);
|
||||
});
|
||||
} else {
|
||||
endItem();
|
||||
@ -214,8 +280,7 @@ export class Retry {
|
||||
|
||||
const now = Date.now();
|
||||
const items = Array.from(this.items.entries()).filter(([_, item]) => {
|
||||
return item.showInNotification
|
||||
&& typeof item.end !== "undefined"
|
||||
return typeof item.end !== "undefined"
|
||||
&& item.end > now
|
||||
&& item.delay && item.delay >= this.notificationThreshold;
|
||||
}).sort((a, b) => {
|
||||
|
Reference in New Issue
Block a user