Add shared process active message (#16)
* Add shared process active message * Add client function for calling bootstrap fork
This commit is contained in:
@ -1,12 +1,13 @@
|
||||
import { SharedProcessInitMessage } from "@coder/protocol/src/proto";
|
||||
import { field, logger } from "@coder/logger";
|
||||
import { ServerMessage, SharedProcessActiveMessage } from "@coder/protocol/src/proto";
|
||||
import { Command, flags } from "@oclif/command";
|
||||
import { logger, field } from "@coder/logger";
|
||||
import * as fs from "fs";
|
||||
import * as os from "os";
|
||||
import * as path from "path";
|
||||
import { requireModule } from "./vscode/bootstrapFork";
|
||||
import * as WebSocket from "ws";
|
||||
import { createApp } from "./server";
|
||||
import { SharedProcess } from './vscode/sharedProcess';
|
||||
import { requireModule } from "./vscode/bootstrapFork";
|
||||
import { SharedProcess, SharedProcessState } from './vscode/sharedProcess';
|
||||
|
||||
export class Entry extends Command {
|
||||
|
||||
@ -69,15 +70,22 @@ export class Entry extends Command {
|
||||
logger.info("Initializing", field("data-dir", dataDir), field("working-dir", workingDir));
|
||||
const sharedProcess = new SharedProcess(dataDir);
|
||||
logger.info("Starting shared process...", field("socket", sharedProcess.socketPath));
|
||||
sharedProcess.onWillRestart(() => {
|
||||
logger.info("Restarting shared process...");
|
||||
|
||||
sharedProcess.ready.then(() => {
|
||||
logger.info("Shared process has restarted!");
|
||||
});
|
||||
});
|
||||
sharedProcess.ready.then(() => {
|
||||
logger.info("Shared process has started up!");
|
||||
const sendSharedProcessReady = (socket: WebSocket) => {
|
||||
const active = new SharedProcessActiveMessage();
|
||||
active.setSocketPath(sharedProcess.socketPath);
|
||||
const serverMessage = new ServerMessage();
|
||||
serverMessage.setSharedProcessActive(active);
|
||||
socket.send(serverMessage.serializeBinary());
|
||||
};
|
||||
sharedProcess.onState((event) => {
|
||||
if (event.state === SharedProcessState.Stopped) {
|
||||
logger.error("Shared process stopped. Restarting...", field("error", event.error));
|
||||
} else if (event.state === SharedProcessState.Starting) {
|
||||
logger.info("Starting shared process...");
|
||||
} else if (event.state === SharedProcessState.Ready) {
|
||||
logger.info("Shared process is ready!");
|
||||
app.wss.clients.forEach((c) => sendSharedProcessReady(c));
|
||||
}
|
||||
});
|
||||
|
||||
const app = createApp((app) => {
|
||||
@ -108,13 +116,12 @@ export class Entry extends Command {
|
||||
let clientId = 1;
|
||||
app.wss.on("connection", (ws, req) => {
|
||||
const id = clientId++;
|
||||
const spm = (<any>req).sharedProcessInit as SharedProcessInitMessage;
|
||||
if (!spm) {
|
||||
logger.warn("Received a socket without init data. Not sure how this happened.");
|
||||
|
||||
return;
|
||||
if (sharedProcess.state === SharedProcessState.Ready) {
|
||||
sendSharedProcessReady(ws);
|
||||
}
|
||||
logger.info(`WebSocket opened \u001B[0m${req.url}`, field("client", id), field("ip", req.socket.remoteAddress), field("window_id", spm.getWindowId()), field("log_directory", spm.getLogDirectory()));
|
||||
|
||||
logger.info(`WebSocket opened \u001B[0m${req.url}`, field("client", id), field("ip", req.socket.remoteAddress));
|
||||
|
||||
ws.on("close", (code) => {
|
||||
logger.info(`WebSocket closed \u001B[0m${req.url}`, field("client", id), field("code", code));
|
||||
|
@ -1,10 +1,11 @@
|
||||
import { ReadWriteConnection } from "@coder/protocol";
|
||||
import { Server, ServerOptions } from "@coder/protocol/src/node/server";
|
||||
import { NewSessionMessage } from '@coder/protocol/src/proto';
|
||||
import { ChildProcess } from "child_process";
|
||||
import * as express from "express";
|
||||
import * as http from "http";
|
||||
import * as ws from "ws";
|
||||
import * as url from "url";
|
||||
import { ClientMessage, SharedProcessInitMessage } from '@coder/protocol/src/proto';
|
||||
import { forkModule } from "./vscode/bootstrapFork";
|
||||
|
||||
export const createApp = (registerMiddleware?: (app: express.Application) => void, options?: ServerOptions): {
|
||||
readonly express: express.Application;
|
||||
@ -19,27 +20,7 @@ export const createApp = (registerMiddleware?: (app: express.Application) => voi
|
||||
const wss = new ws.Server({ server });
|
||||
|
||||
wss.shouldHandle = (req): boolean => {
|
||||
if (typeof req.url === "undefined") {
|
||||
return false;
|
||||
}
|
||||
|
||||
const parsedUrl = url.parse(req.url, true);
|
||||
const sharedProcessInit = parsedUrl.query["shared_process_init"];
|
||||
if (typeof sharedProcessInit === "undefined" || Array.isArray(sharedProcessInit)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
const msg = ClientMessage.deserializeBinary(Buffer.from(sharedProcessInit, "base64"));
|
||||
if (!msg.hasSharedProcessInit()) {
|
||||
return false;
|
||||
}
|
||||
const spm = msg.getSharedProcessInit()!;
|
||||
(<any>req).sharedProcessInit = spm;
|
||||
} catch (ex) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Should handle auth here
|
||||
return true;
|
||||
};
|
||||
|
||||
@ -59,7 +40,20 @@ export const createApp = (registerMiddleware?: (app: express.Application) => voi
|
||||
onClose: (cb): void => ws.addEventListener("close", () => cb()),
|
||||
};
|
||||
|
||||
const server = new Server(connection, options);
|
||||
const server = new Server(connection, options ? {
|
||||
...options,
|
||||
forkProvider: (message: NewSessionMessage): ChildProcess => {
|
||||
let proc: ChildProcess;
|
||||
|
||||
if (message.getIsBootstrapFork()) {
|
||||
proc = forkModule(message.getCommand());
|
||||
} else {
|
||||
throw new Error("No support for non bootstrap-forking yet");
|
||||
}
|
||||
|
||||
return proc;
|
||||
},
|
||||
} : undefined);
|
||||
});
|
||||
|
||||
/**
|
||||
|
@ -9,41 +9,47 @@ import { ParsedArgs } from "vs/platform/environment/common/environment";
|
||||
import { LogLevel } from "vs/platform/log/common/log";
|
||||
import { Emitter, Event } from '@coder/events/src';
|
||||
|
||||
export enum SharedProcessState {
|
||||
Stopped,
|
||||
Starting,
|
||||
Ready,
|
||||
}
|
||||
|
||||
export type SharedProcessEvent = {
|
||||
readonly state: SharedProcessState.Ready | SharedProcessState.Starting;
|
||||
} | {
|
||||
readonly state: SharedProcessState.Stopped;
|
||||
readonly error: string;
|
||||
}
|
||||
|
||||
export class SharedProcess {
|
||||
public readonly socketPath: string = path.join(os.tmpdir(), `.vscode-online${Math.random().toString()}`);
|
||||
private _ready: Promise<void> | undefined;
|
||||
private _state: SharedProcessState = SharedProcessState.Stopped;
|
||||
private activeProcess: ChildProcess | undefined;
|
||||
private ipcHandler: StdioIpcHandler | undefined;
|
||||
private readonly willRestartEmitter: Emitter<void>;
|
||||
private readonly onStateEmitter: Emitter<SharedProcessEvent>;
|
||||
|
||||
public constructor(
|
||||
private readonly userDataDir: string,
|
||||
) {
|
||||
this.willRestartEmitter = new Emitter();
|
||||
this.onStateEmitter = new Emitter();
|
||||
|
||||
this.restart();
|
||||
}
|
||||
|
||||
public get onWillRestart(): Event<void> {
|
||||
return this.willRestartEmitter.event;
|
||||
public get onState(): Event<SharedProcessEvent> {
|
||||
return this.onStateEmitter.event;
|
||||
}
|
||||
|
||||
public get ready(): Promise<void> {
|
||||
return this._ready!;
|
||||
public get state(): SharedProcessState {
|
||||
return this._state;
|
||||
}
|
||||
|
||||
public restart(): void {
|
||||
if (this.activeProcess) {
|
||||
this.willRestartEmitter.emit();
|
||||
}
|
||||
|
||||
if (this.activeProcess && !this.activeProcess.killed) {
|
||||
this.activeProcess.kill();
|
||||
}
|
||||
|
||||
let resolve: () => void;
|
||||
let reject: (err: Error) => void;
|
||||
|
||||
const extensionsDir = path.join(this.userDataDir, "extensions");
|
||||
const mkdir = (dir: string): void => {
|
||||
try {
|
||||
@ -57,14 +63,18 @@ export class SharedProcess {
|
||||
mkdir(this.userDataDir);
|
||||
mkdir(extensionsDir);
|
||||
|
||||
this._ready = new Promise<void>((res, rej) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
this.setState({
|
||||
state: SharedProcessState.Starting,
|
||||
});
|
||||
|
||||
let resolved: boolean = false;
|
||||
this.activeProcess = forkModule("vs/code/electron-browser/sharedProcess/sharedProcessMain");
|
||||
this.activeProcess.on("exit", () => {
|
||||
this.activeProcess.on("exit", (err) => {
|
||||
if (this._state !== SharedProcessState.Stopped) {
|
||||
this.setState({
|
||||
error: `Exited with ${err}`,
|
||||
state: SharedProcessState.Stopped,
|
||||
});
|
||||
}
|
||||
this.restart();
|
||||
});
|
||||
this.ipcHandler = new StdioIpcHandler(this.activeProcess);
|
||||
@ -86,11 +96,20 @@ export class SharedProcess {
|
||||
});
|
||||
this.ipcHandler.once("handshake:im ready", () => {
|
||||
resolved = true;
|
||||
resolve();
|
||||
this.setState({
|
||||
state: SharedProcessState.Ready,
|
||||
});
|
||||
});
|
||||
this.activeProcess.stderr.on("data", (data) => {
|
||||
if (!resolved) {
|
||||
reject(data.toString());
|
||||
this.setState({
|
||||
error: data.toString(),
|
||||
state: SharedProcessState.Stopped,
|
||||
});
|
||||
if (!this.activeProcess) {
|
||||
return;
|
||||
}
|
||||
this.activeProcess.kill();
|
||||
} else {
|
||||
logger.named("SHRD PROC").debug("stderr", field("message", data.toString()));
|
||||
}
|
||||
@ -103,4 +122,9 @@ export class SharedProcess {
|
||||
}
|
||||
this.ipcHandler = undefined;
|
||||
}
|
||||
|
||||
private setState(event: SharedProcessEvent): void {
|
||||
this._state = event.state;
|
||||
this.onStateEmitter.emit(event);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user