Extension host (#20)
* Implement net.Server * Move Socket class into Client This way we don't need to expose anything. * Remove some unused imports * Pass environment variables to bootstrap fork * Add debug log for when socket disconnects from server * Use VSCODE_ALLOW_IO for shared process only * Extension host can send messages now * Support callback for logging This lets us do potentially expensive operations which will only be performed if the log level is sufficiently low. * Stop extension host from committing suicide * Blank line * Add static serve (#21) * Add extension URLs * how did i remove this * Fix writing an empty string * Implement dialogs on window service
This commit is contained in:
@ -1,13 +1,16 @@
|
||||
import { ReadWriteConnection, InitData, OperatingSystem, ISharedProcessData } from "../common/connection";
|
||||
import { NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, TypedValue, ClientMessage, NewSessionMessage, TTYDimensions, SessionOutputMessage, CloseSessionInputMessage, WorkingInitMessage, NewConnectionMessage, NewServerMessage } from "../proto";
|
||||
import { NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, TypedValue, ClientMessage, NewSessionMessage, TTYDimensions, SessionOutputMessage, CloseSessionInputMessage, WorkingInitMessage } from "../proto";
|
||||
import { Emitter, Event } from "@coder/events";
|
||||
import { logger, field } from "@coder/logger";
|
||||
import { ChildProcess, SpawnOptions, ServerProcess, ServerSocket, Socket, ServerListener, Server } from "./command";
|
||||
import { ChildProcess, SpawnOptions, ForkOptions, ServerProcess, ServerSocket, Socket, ServerListener, Server } from "./command";
|
||||
|
||||
/**
|
||||
* Client accepts an arbitrary connection intended to communicate with the Server.
|
||||
*/
|
||||
export class Client {
|
||||
|
||||
public Socket: typeof ServerSocket;
|
||||
|
||||
private evalId: number = 0;
|
||||
private evalDoneEmitter: Emitter<EvalDoneMessage> = new Emitter();
|
||||
private evalFailedEmitter: Emitter<EvalFailedMessage> = new Emitter();
|
||||
@ -41,6 +44,15 @@ export class Client {
|
||||
}
|
||||
});
|
||||
|
||||
const that = this;
|
||||
this.Socket = class extends ServerSocket {
|
||||
|
||||
public constructor() {
|
||||
super(that.connection, that.connectionId++, that.registerConnection);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
this.initDataPromise = new Promise((resolve): void => {
|
||||
this.initDataEmitter.event(resolve);
|
||||
});
|
||||
@ -77,7 +89,7 @@ export class Client {
|
||||
const newEval = new NewEvalMessage();
|
||||
const id = this.evalId++;
|
||||
newEval.setId(id);
|
||||
newEval.setArgsList([a1, a2, a3, a4, a5, a6].filter(a => a).map(a => JSON.stringify(a)));
|
||||
newEval.setArgsList([a1, a2, a3, a4, a5, a6].filter(a => typeof a !== "undefined").map(a => JSON.stringify(a)));
|
||||
newEval.setFunction(func.toString());
|
||||
|
||||
const clientMsg = new ClientMessage();
|
||||
@ -158,7 +170,7 @@ export class Client {
|
||||
* @param args Args to add for the module
|
||||
* @param options Options to execute
|
||||
*/
|
||||
public fork(modulePath: string, args: string[] = [], options?: SpawnOptions): ChildProcess {
|
||||
public fork(modulePath: string, args: string[] = [], options?: ForkOptions): ChildProcess {
|
||||
return this.doSpawn(modulePath, args, options, true);
|
||||
}
|
||||
|
||||
@ -167,27 +179,17 @@ export class Client {
|
||||
* Forks a module from bootstrap-fork
|
||||
* @param modulePath Path of the module
|
||||
*/
|
||||
public bootstrapFork(modulePath: string): ChildProcess {
|
||||
return this.doSpawn(modulePath, [], undefined, true, true);
|
||||
public bootstrapFork(modulePath: string, args: string[] = [], options?: ForkOptions): ChildProcess {
|
||||
return this.doSpawn(modulePath, args, options, true, true);
|
||||
}
|
||||
|
||||
public createConnection(path: string, callback?: () => void): Socket;
|
||||
public createConnection(port: number, callback?: () => void): Socket;
|
||||
public createConnection(target: string | number, callback?: () => void): Socket {
|
||||
public createConnection(path: string, callback?: Function): Socket;
|
||||
public createConnection(port: number, callback?: Function): Socket;
|
||||
public createConnection(target: string | number, callback?: Function): Socket;
|
||||
public createConnection(target: string | number, callback?: Function): Socket {
|
||||
const id = this.connectionId++;
|
||||
const newCon = new NewConnectionMessage();
|
||||
newCon.setId(id);
|
||||
if (typeof target === "string") {
|
||||
newCon.setPath(target);
|
||||
} else {
|
||||
newCon.setPort(target);
|
||||
}
|
||||
const clientMsg = new ClientMessage();
|
||||
clientMsg.setNewConnection(newCon);
|
||||
this.connection.send(clientMsg.serializeBinary());
|
||||
|
||||
const socket = new ServerSocket(this.connection, id, callback);
|
||||
this.connections.set(id, socket);
|
||||
const socket = new ServerSocket(this.connection, id, this.registerConnection);
|
||||
socket.connect(target, callback);
|
||||
|
||||
return socket;
|
||||
}
|
||||
@ -214,7 +216,9 @@ export class Client {
|
||||
}
|
||||
if (options.env) {
|
||||
Object.keys(options.env).forEach((envKey) => {
|
||||
newSess.getEnvMap().set(envKey, options.env![envKey]);
|
||||
if (options.env![envKey]) {
|
||||
newSess.getEnvMap().set(envKey, options.env![envKey].toString());
|
||||
}
|
||||
});
|
||||
}
|
||||
if (options.tty) {
|
||||
@ -356,9 +360,9 @@ export class Client {
|
||||
return;
|
||||
}
|
||||
const conId = message.getServerConnectionEstablished()!.getConnectionId();
|
||||
const serverSocket = new ServerSocket(this.connection, conId);
|
||||
const serverSocket = new ServerSocket(this.connection, conId, this.registerConnection);
|
||||
this.registerConnection(conId, serverSocket);
|
||||
serverSocket.emit("connect");
|
||||
this.connections.set(conId, serverSocket);
|
||||
s.emit("connection", serverSocket);
|
||||
} else if (message.getServerFailure()) {
|
||||
const s = this.servers.get(message.getServerFailure()!.getId());
|
||||
@ -376,4 +380,12 @@ export class Client {
|
||||
this.servers.delete(message.getServerClose()!.getId());
|
||||
}
|
||||
}
|
||||
|
||||
private registerConnection = (id: number, socket: ServerSocket): void => {
|
||||
if (this.connections.has(id)) {
|
||||
throw new Error(`${id} is already registered`);
|
||||
}
|
||||
this.connections.set(id, socket);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
import * as events from "events";
|
||||
import * as stream from "stream";
|
||||
import { ReadWriteConnection } from "../common/connection";
|
||||
import { ShutdownSessionMessage, ClientMessage, WriteToSessionMessage, ResizeSessionTTYMessage, TTYDimensions as ProtoTTYDimensions, ConnectionOutputMessage, ConnectionCloseMessage, ServerCloseMessage, NewServerMessage } from "../proto";
|
||||
import { NewConnectionMessage, ShutdownSessionMessage, ClientMessage, WriteToSessionMessage, ResizeSessionTTYMessage, TTYDimensions as ProtoTTYDimensions, ConnectionOutputMessage, ConnectionCloseMessage, ServerCloseMessage, NewServerMessage } from "../proto";
|
||||
|
||||
export interface TTYDimensions {
|
||||
readonly columns: number;
|
||||
@ -10,10 +10,15 @@ export interface TTYDimensions {
|
||||
|
||||
export interface SpawnOptions {
|
||||
cwd?: string;
|
||||
env?: { readonly [key: string]: string };
|
||||
env?: { [key: string]: string };
|
||||
tty?: TTYDimensions;
|
||||
}
|
||||
|
||||
export interface ForkOptions {
|
||||
cwd?: string;
|
||||
env?: { [key: string]: string };
|
||||
}
|
||||
|
||||
export interface ChildProcess {
|
||||
readonly stdin: stream.Writable;
|
||||
readonly stdout: stream.Readable;
|
||||
@ -119,6 +124,9 @@ export interface Socket {
|
||||
write(buffer: Buffer): void;
|
||||
end(): void;
|
||||
|
||||
connect(path: string, callback?: () => void): void;
|
||||
connect(port: number, callback?: () => void): void;
|
||||
|
||||
addListener(event: "data", listener: (data: Buffer) => void): this;
|
||||
addListener(event: "close", listener: (hasError: boolean) => void): this;
|
||||
addListener(event: "connect", listener: () => void): this;
|
||||
@ -151,21 +159,37 @@ export class ServerSocket extends events.EventEmitter implements Socket {
|
||||
public readable: boolean = true;
|
||||
|
||||
private _destroyed: boolean = false;
|
||||
private _connecting: boolean = true;
|
||||
private _connecting: boolean = false;
|
||||
|
||||
public constructor(
|
||||
private readonly connection: ReadWriteConnection,
|
||||
private readonly id: number,
|
||||
connectCallback?: () => void,
|
||||
private readonly beforeConnect: (id: number, socket: ServerSocket) => void,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
if (connectCallback) {
|
||||
this.once("connect", () => {
|
||||
this._connecting = false;
|
||||
connectCallback();
|
||||
});
|
||||
public connect(target: string | number, callback?: Function): void {
|
||||
this._connecting = true;
|
||||
this.beforeConnect(this.id, this);
|
||||
|
||||
this.once("connect", () => {
|
||||
this._connecting = false;
|
||||
if (callback) {
|
||||
callback();
|
||||
}
|
||||
});
|
||||
|
||||
const newCon = new NewConnectionMessage();
|
||||
newCon.setId(this.id);
|
||||
if (typeof target === "string") {
|
||||
newCon.setPath(target);
|
||||
} else {
|
||||
newCon.setPort(target);
|
||||
}
|
||||
const clientMsg = new ClientMessage();
|
||||
clientMsg.setNewConnection(newCon);
|
||||
this.connection.send(clientMsg.serializeBinary());
|
||||
}
|
||||
|
||||
public get destroyed(): boolean {
|
||||
@ -236,6 +260,7 @@ export class ServerSocket extends events.EventEmitter implements Socket {
|
||||
public setDefaultEncoding(encoding: string): this {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
export interface Server {
|
||||
@ -266,6 +291,7 @@ export interface Server {
|
||||
}
|
||||
|
||||
export class ServerListener extends events.EventEmitter implements Server {
|
||||
|
||||
private _listening: boolean = false;
|
||||
|
||||
public constructor(
|
||||
@ -309,11 +335,12 @@ export class ServerListener extends events.EventEmitter implements Server {
|
||||
const clientMsg = new ClientMessage();
|
||||
clientMsg.setServerClose(closeMsg);
|
||||
this.connection.send(clientMsg.serializeBinary());
|
||||
|
||||
|
||||
if (callback) {
|
||||
callback();
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -40,19 +40,38 @@ export class CP {
|
||||
);
|
||||
});
|
||||
|
||||
// @ts-ignore
|
||||
// @ts-ignore TODO: not fully implemented
|
||||
return childProcess;
|
||||
}
|
||||
|
||||
public fork = (modulePath: string, args?: ReadonlyArray<string> | cp.ForkOptions, options?: cp.ForkOptions): cp.ChildProcess => {
|
||||
//@ts-ignore
|
||||
return this.client.bootstrapFork(options && options.env && options.env.AMD_ENTRYPOINT || modulePath);
|
||||
public fork = (modulePath: string, args?: string[] | cp.ForkOptions, options?: cp.ForkOptions): cp.ChildProcess => {
|
||||
if (options && options.env && options.env.AMD_ENTRYPOINT) {
|
||||
// @ts-ignore TODO: not fully implemented
|
||||
return this.client.bootstrapFork(
|
||||
options.env.AMD_ENTRYPOINT,
|
||||
Array.isArray(args) ? args : [],
|
||||
// @ts-ignore TODO: env is a different type
|
||||
Array.isArray(args) || !args ? options : args,
|
||||
);
|
||||
}
|
||||
|
||||
// @ts-ignore TODO: not fully implemented
|
||||
return this.client.fork(
|
||||
modulePath,
|
||||
Array.isArray(args) ? args : [],
|
||||
// @ts-ignore TODO: env is a different type
|
||||
Array.isArray(args) || !args ? options : args,
|
||||
);
|
||||
}
|
||||
|
||||
public spawn = (command: string, args?: ReadonlyArray<string> | cp.SpawnOptions, options?: cp.SpawnOptions): cp.ChildProcess => {
|
||||
// TODO: fix this ignore. Should check for args or options here
|
||||
//@ts-ignore
|
||||
return this.client.spawn(command, args, options);
|
||||
public spawn = (command: string, args?: string[] | cp.SpawnOptions, options?: cp.SpawnOptions): cp.ChildProcess => {
|
||||
// @ts-ignore TODO: not fully implemented
|
||||
return this.client.spawn(
|
||||
command,
|
||||
Array.isArray(args) ? args : [],
|
||||
// @ts-ignore TODO: env is a different type
|
||||
Array.isArray(args) || !args ? options : args,
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -358,9 +358,9 @@ export class FS {
|
||||
return util.promisify(fs.read)(fd, buffer, 0, length, position).then((resp) => {
|
||||
return {
|
||||
bytesRead: resp.bytesRead,
|
||||
content: buffer.toString("utf8"),
|
||||
content: (resp.bytesRead < buffer.length ? buffer.slice(0, resp.bytesRead) : buffer).toString("utf8"),
|
||||
};
|
||||
}):
|
||||
});
|
||||
}, fd, length, position).then((resp) => {
|
||||
const newBuf = Buffer.from(resp.content, "utf8");
|
||||
buffer.set(newBuf, offset);
|
||||
|
@ -13,7 +13,8 @@ export class Net implements NodeNet {
|
||||
) {}
|
||||
|
||||
public get Socket(): typeof net.Socket {
|
||||
throw new Error("not implemented");
|
||||
// @ts-ignore
|
||||
return this.client.Socket;
|
||||
}
|
||||
|
||||
public get Server(): typeof net.Server {
|
||||
@ -24,10 +25,12 @@ export class Net implements NodeNet {
|
||||
throw new Error("not implemented");
|
||||
}
|
||||
|
||||
// tslint:disable-next-line no-any
|
||||
public createConnection(...args: any[]): net.Socket {
|
||||
//@ts-ignore
|
||||
return this.client.createConnection(...args) as net.Socket;
|
||||
public createConnection(target: string | number | net.NetConnectOpts, host?: string | Function, callback?: Function): net.Socket {
|
||||
if (typeof target === "object") {
|
||||
throw new Error("not implemented");
|
||||
}
|
||||
|
||||
return this.client.createConnection(target, typeof host === "function" ? host : callback) as net.Socket;
|
||||
}
|
||||
|
||||
public isIP(_input: string): number {
|
||||
|
@ -3,6 +3,7 @@ import * as net from "net";
|
||||
import * as nodePty from "node-pty";
|
||||
import * as stream from "stream";
|
||||
import { TextEncoder } from "text-encoding";
|
||||
import { Logger, logger, field } from "@coder/logger";
|
||||
import { NewSessionMessage, ServerMessage, SessionDoneMessage, SessionOutputMessage, IdentifySessionMessage, NewConnectionMessage, ConnectionEstablishedMessage, NewConnectionFailureMessage, ConnectionCloseMessage, ConnectionOutputMessage, NewServerMessage, ServerEstablishedMessage, NewServerFailureMessage, ServerCloseMessage, ServerConnectionEstablishedMessage } from "../proto";
|
||||
import { SendableConnection } from "../common/connection";
|
||||
import { ServerOptions } from "./server";
|
||||
@ -25,10 +26,18 @@ export interface Process {
|
||||
}
|
||||
|
||||
export const handleNewSession = (connection: SendableConnection, newSession: NewSessionMessage, serverOptions: ServerOptions | undefined, onExit: () => void): Process => {
|
||||
const childLogger = getChildLogger(newSession.getCommand());
|
||||
childLogger.debug(() => [
|
||||
newSession.getIsFork() ? "Forking" : "Spawning",
|
||||
field("command", newSession.getCommand()),
|
||||
field("args", newSession.getArgsList()),
|
||||
field("env", newSession.getEnvMap().toObject()),
|
||||
]);
|
||||
|
||||
let process: Process;
|
||||
|
||||
const env = {} as any;
|
||||
newSession.getEnvMap().forEach((value: any, key: any) => {
|
||||
const env: { [key: string]: string } = {};
|
||||
newSession.getEnvMap().forEach((value, key) => {
|
||||
env[key] = value;
|
||||
});
|
||||
if (newSession.getTtyDimensions()) {
|
||||
@ -64,14 +73,29 @@ export const handleNewSession = (connection: SendableConnection, newSession: New
|
||||
stderr: proc.stderr,
|
||||
stdout: proc.stdout,
|
||||
stdio: proc.stdio,
|
||||
on: (...args: any[]) => (<any>proc.on)(...args),
|
||||
write: (d) => proc.stdin.write(d),
|
||||
kill: (s) => proc.kill(s || "SIGTERM"),
|
||||
on: (...args: any[]): void => ((proc as any).on)(...args), // tslint:disable-line no-any
|
||||
write: (d): boolean => proc.stdin.write(d),
|
||||
kill: (s): void => proc.kill(s || "SIGTERM"),
|
||||
pid: proc.pid,
|
||||
};
|
||||
}
|
||||
|
||||
const sendOutput = (_source: SessionOutputMessage.Source, msg: string | Uint8Array): void => {
|
||||
childLogger.debug(() => {
|
||||
|
||||
let data = msg.toString();
|
||||
if (_source === SessionOutputMessage.Source.IPC) {
|
||||
data = Buffer.from(msg.toString(), "base64").toString();
|
||||
}
|
||||
|
||||
return [
|
||||
_source === SessionOutputMessage.Source.STDOUT
|
||||
? "stdout"
|
||||
: (_source === SessionOutputMessage.Source.STDERR ? "stderr" : "ipc"),
|
||||
field("id", newSession.getId()),
|
||||
field("data", data),
|
||||
];
|
||||
});
|
||||
const serverMsg = new ServerMessage();
|
||||
const d = new SessionOutputMessage();
|
||||
d.setId(newSession.getId());
|
||||
@ -110,6 +134,7 @@ export const handleNewSession = (connection: SendableConnection, newSession: New
|
||||
connection.send(sm.serializeBinary());
|
||||
|
||||
process.on("exit", (code) => {
|
||||
childLogger.debug("Exited", field("id", newSession.getId()));
|
||||
const serverMsg = new ServerMessage();
|
||||
const exit = new SessionDoneMessage();
|
||||
exit.setId(newSession.getId());
|
||||
@ -124,10 +149,14 @@ export const handleNewSession = (connection: SendableConnection, newSession: New
|
||||
};
|
||||
|
||||
export const handleNewConnection = (connection: SendableConnection, newConnection: NewConnectionMessage, onExit: () => void): net.Socket => {
|
||||
const target = newConnection.getPath() || `${newConnection.getPort()}`;
|
||||
const childLogger = getChildLogger(target, ">");
|
||||
|
||||
const id = newConnection.getId();
|
||||
let socket: net.Socket;
|
||||
let didConnect = false;
|
||||
const connectCallback = () => {
|
||||
const connectCallback = (): void => {
|
||||
childLogger.debug("Connected", field("id", newConnection.getId()), field("target", target));
|
||||
didConnect = true;
|
||||
const estab = new ConnectionEstablishedMessage();
|
||||
estab.setId(id);
|
||||
@ -145,6 +174,7 @@ export const handleNewConnection = (connection: SendableConnection, newConnectio
|
||||
}
|
||||
|
||||
socket.addListener("error", (err) => {
|
||||
childLogger.debug("Error", field("id", newConnection.getId()), field("error", err));
|
||||
if (!didConnect) {
|
||||
const errMsg = new NewConnectionFailureMessage();
|
||||
errMsg.setId(id);
|
||||
@ -158,6 +188,7 @@ export const handleNewConnection = (connection: SendableConnection, newConnectio
|
||||
});
|
||||
|
||||
socket.addListener("close", () => {
|
||||
childLogger.debug("Closed", field("id", newConnection.getId()));
|
||||
if (didConnect) {
|
||||
const closed = new ConnectionCloseMessage();
|
||||
closed.setId(id);
|
||||
@ -170,6 +201,11 @@ export const handleNewConnection = (connection: SendableConnection, newConnectio
|
||||
});
|
||||
|
||||
socket.addListener("data", (data) => {
|
||||
childLogger.debug(() => [
|
||||
"ipc",
|
||||
field("id", newConnection.getId()),
|
||||
field("data", data),
|
||||
]);
|
||||
const dataMsg = new ConnectionOutputMessage();
|
||||
dataMsg.setId(id);
|
||||
dataMsg.setData(data);
|
||||
@ -181,11 +217,15 @@ export const handleNewConnection = (connection: SendableConnection, newConnectio
|
||||
return socket;
|
||||
};
|
||||
|
||||
export const handleNewServer = (connection: SendableConnection, newServer: NewServerMessage, addSocket: (socket: net.Socket) => number, onExit: () => void): net.Server => {
|
||||
export const handleNewServer = (connection: SendableConnection, newServer: NewServerMessage, addSocket: (socket: net.Socket) => number, onExit: () => void, onSocketExit: (id: number) => void): net.Server => {
|
||||
const target = newServer.getPath() || `${newServer.getPort()}`;
|
||||
const childLogger = getChildLogger(target, "|");
|
||||
|
||||
const s = net.createServer();
|
||||
|
||||
try {
|
||||
s.listen(newServer.getPath() ? newServer.getPath() : newServer.getPort(), () => {
|
||||
childLogger.debug("Listening", field("id", newServer.getId()), field("target", target));
|
||||
const se = new ServerEstablishedMessage();
|
||||
se.setId(newServer.getId());
|
||||
const sm = new ServerMessage();
|
||||
@ -193,6 +233,7 @@ export const handleNewServer = (connection: SendableConnection, newServer: NewSe
|
||||
connection.send(sm.serializeBinary());
|
||||
});
|
||||
} catch (ex) {
|
||||
childLogger.debug("Failed to listen", field("id", newServer.getId()), field("target", target));
|
||||
const sf = new NewServerFailureMessage();
|
||||
sf.setId(newServer.getId());
|
||||
const sm = new ServerMessage();
|
||||
@ -203,6 +244,7 @@ export const handleNewServer = (connection: SendableConnection, newServer: NewSe
|
||||
}
|
||||
|
||||
s.on("close", () => {
|
||||
childLogger.debug("Stopped listening", field("id", newServer.getId()), field("target", target));
|
||||
const sc = new ServerCloseMessage();
|
||||
sc.setId(newServer.getId());
|
||||
const sm = new ServerMessage();
|
||||
@ -214,6 +256,7 @@ export const handleNewServer = (connection: SendableConnection, newServer: NewSe
|
||||
|
||||
s.on("connection", (socket) => {
|
||||
const socketId = addSocket(socket);
|
||||
childLogger.debug("Got connection", field("id", newServer.getId()), field("socketId", socketId));
|
||||
|
||||
const sock = new ServerConnectionEstablishedMessage();
|
||||
sock.setServerId(newServer.getId());
|
||||
@ -221,7 +264,54 @@ export const handleNewServer = (connection: SendableConnection, newServer: NewSe
|
||||
const sm = new ServerMessage();
|
||||
sm.setServerConnectionEstablished(sock);
|
||||
connection.send(sm.serializeBinary());
|
||||
|
||||
socket.addListener("data", (data) => {
|
||||
childLogger.debug(() => [
|
||||
"ipc",
|
||||
field("id", newServer.getId()),
|
||||
field("socketId", socketId),
|
||||
field("data", data),
|
||||
]);
|
||||
const dataMsg = new ConnectionOutputMessage();
|
||||
dataMsg.setId(socketId);
|
||||
dataMsg.setData(data);
|
||||
const servMsg = new ServerMessage();
|
||||
servMsg.setConnectionOutput(dataMsg);
|
||||
connection.send(servMsg.serializeBinary());
|
||||
});
|
||||
|
||||
socket.on("error", (error) => {
|
||||
childLogger.debug("Error", field("id", newServer.getId()), field("socketId", socketId), field("error", error));
|
||||
onSocketExit(socketId);
|
||||
});
|
||||
|
||||
socket.on("close", () => {
|
||||
childLogger.debug("Closed", field("id", newServer.getId()), field("socketId", socketId));
|
||||
onSocketExit(socketId);
|
||||
});
|
||||
});
|
||||
|
||||
return s;
|
||||
};
|
||||
|
||||
const getChildLogger = (command: string, prefix: string = ""): Logger => {
|
||||
// TODO: Temporary, for debugging. Should probably ask for a name?
|
||||
let name: string;
|
||||
if (command.includes("vscode-ipc") || command.includes("extensionHost")) {
|
||||
name = "exthost";
|
||||
} else if (command.includes("vscode-online")) {
|
||||
name = "shared";
|
||||
} else {
|
||||
const basename = command.split("/").pop()!;
|
||||
let i = 0;
|
||||
for (; i < basename.length; i++) {
|
||||
const character = basename.charAt(i);
|
||||
if (isNaN(+character) && character === character.toUpperCase()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
name = basename.substring(0, i);
|
||||
}
|
||||
|
||||
return logger.named(prefix + name);
|
||||
};
|
||||
|
@ -4,7 +4,7 @@ import * as path from "path";
|
||||
import { mkdir, WriteStream } from "fs";
|
||||
import { promisify } from "util";
|
||||
import { TextDecoder } from "text-encoding";
|
||||
import { Logger, logger, field } from "@coder/logger";
|
||||
import { logger, field } from "@coder/logger";
|
||||
import { ClientMessage, WorkingInitMessage, ServerMessage, NewSessionMessage, WriteToSessionMessage } from "../proto";
|
||||
import { evaluate } from "./evaluate";
|
||||
import { ReadWriteConnection } from "../common/connection";
|
||||
@ -93,42 +93,50 @@ export class Server {
|
||||
|
||||
private handleMessage(message: ClientMessage): void {
|
||||
if (message.hasNewEval()) {
|
||||
evaluate(this.connection, message.getNewEval()!);
|
||||
const evalMessage = message.getNewEval()!;
|
||||
logger.debug("EvalMessage", field("id", evalMessage.getId()));
|
||||
evaluate(this.connection, evalMessage);
|
||||
} else if (message.hasNewSession()) {
|
||||
const sessionMessage = message.getNewSession()!;
|
||||
const childLogger = this.getChildLogger(sessionMessage.getCommand());
|
||||
childLogger.debug(sessionMessage.getIsFork() ? "Forking" : "Spawning", field("args", sessionMessage.getArgsList()));
|
||||
logger.debug("NewSession", field("id", sessionMessage.getId()));
|
||||
const session = handleNewSession(this.connection, sessionMessage, this.options, () => {
|
||||
childLogger.debug("Exited");
|
||||
this.sessions.delete(sessionMessage.getId());
|
||||
});
|
||||
this.sessions.set(message.getNewSession()!.getId(), session);
|
||||
this.sessions.set(sessionMessage.getId(), session);
|
||||
} else if (message.hasCloseSessionInput()) {
|
||||
const s = this.getSession(message.getCloseSessionInput()!.getId());
|
||||
const closeSessionMessage = message.getCloseSessionInput()!;
|
||||
logger.debug("CloseSessionInput", field("id", closeSessionMessage.getId()));
|
||||
const s = this.getSession(closeSessionMessage.getId());
|
||||
if (!s || !s.stdin) {
|
||||
return;
|
||||
}
|
||||
s.stdin.end();
|
||||
} else if (message.hasResizeSessionTty()) {
|
||||
const s = this.getSession(message.getResizeSessionTty()!.getId());
|
||||
const resizeSessionTtyMessage = message.getResizeSessionTty()!;
|
||||
logger.debug("ResizeSessionTty", field("id", resizeSessionTtyMessage.getId()));
|
||||
const s = this.getSession(resizeSessionTtyMessage.getId());
|
||||
if (!s || !s.resize) {
|
||||
return;
|
||||
}
|
||||
const tty = message.getResizeSessionTty()!.getTtyDimensions()!;
|
||||
const tty = resizeSessionTtyMessage.getTtyDimensions()!;
|
||||
s.resize(tty.getWidth(), tty.getHeight());
|
||||
} else if (message.hasShutdownSession()) {
|
||||
const s = this.getSession(message.getShutdownSession()!.getId());
|
||||
const shutdownSessionMessage = message.getShutdownSession()!;
|
||||
logger.debug("ShutdownSession", field("id", shutdownSessionMessage.getId()));
|
||||
const s = this.getSession(shutdownSessionMessage.getId());
|
||||
if (!s) {
|
||||
return;
|
||||
}
|
||||
s.kill(message.getShutdownSession()!.getSignal());
|
||||
s.kill(shutdownSessionMessage.getSignal());
|
||||
} else if (message.hasWriteToSession()) {
|
||||
const s = this.getSession(message.getWriteToSession()!.getId());
|
||||
const writeToSessionMessage = message.getWriteToSession()!;
|
||||
logger.debug("WriteToSession", field("id", writeToSessionMessage.getId()));
|
||||
const s = this.getSession(writeToSessionMessage.getId());
|
||||
if (!s) {
|
||||
return;
|
||||
}
|
||||
const data = new TextDecoder().decode(message.getWriteToSession()!.getData_asU8());
|
||||
const source = message.getWriteToSession()!.getSource();
|
||||
const data = new TextDecoder().decode(writeToSessionMessage.getData_asU8());
|
||||
const source = writeToSessionMessage.getSource();
|
||||
if (source === WriteToSessionMessage.Source.IPC) {
|
||||
if (!s.stdio || !s.stdio[3]) {
|
||||
throw new Error("Cannot send message via IPC to process without IPC");
|
||||
@ -139,48 +147,57 @@ export class Server {
|
||||
}
|
||||
} else if (message.hasNewConnection()) {
|
||||
const connectionMessage = message.getNewConnection()!;
|
||||
const name = connectionMessage.getPath() || `${connectionMessage.getPort()}`;
|
||||
const childLogger = this.getChildLogger(name, ">");
|
||||
childLogger.debug("Connecting", field("path", connectionMessage.getPath()), field("port", connectionMessage.getPort()));
|
||||
logger.debug("NewConnection", field("id", connectionMessage.getId()));
|
||||
if (this.connections.has(connectionMessage.getId())) {
|
||||
throw new Error(`connect EISCONN ${connectionMessage.getPath() || connectionMessage.getPort()}`);
|
||||
}
|
||||
const socket = handleNewConnection(this.connection, connectionMessage, () => {
|
||||
childLogger.debug("Disconnected");
|
||||
this.connections.delete(connectionMessage.getId());
|
||||
});
|
||||
this.connections.set(connectionMessage.getId(), socket);
|
||||
} else if (message.hasConnectionOutput()) {
|
||||
const c = this.getConnection(message.getConnectionOutput()!.getId());
|
||||
const connectionOutputMessage = message.getConnectionOutput()!;
|
||||
logger.debug("ConnectionOuput", field("id", connectionOutputMessage.getId()));
|
||||
const c = this.getConnection(connectionOutputMessage.getId());
|
||||
if (!c) {
|
||||
return;
|
||||
}
|
||||
c.write(Buffer.from(message.getConnectionOutput()!.getData_asU8()));
|
||||
c.write(Buffer.from(connectionOutputMessage.getData_asU8()));
|
||||
} else if (message.hasConnectionClose()) {
|
||||
const c = this.getConnection(message.getConnectionClose()!.getId());
|
||||
const connectionCloseMessage = message.getConnectionClose()!;
|
||||
logger.debug("ConnectionClose", field("id", connectionCloseMessage.getId()));
|
||||
const c = this.getConnection(connectionCloseMessage.getId());
|
||||
if (!c) {
|
||||
return;
|
||||
}
|
||||
c.end();
|
||||
} else if (message.hasNewServer()) {
|
||||
const serverMessage = message.getNewServer()!;
|
||||
const name = serverMessage.getPath() || `${serverMessage.getPort()}`;
|
||||
const childLogger = this.getChildLogger(name);
|
||||
childLogger.debug("Listening", field("path", serverMessage.getPath()), field("port", serverMessage.getPort()));
|
||||
logger.debug("NewServer", field("id", serverMessage.getId()));
|
||||
if (this.servers.has(serverMessage.getId())) {
|
||||
throw new Error("multiple listeners not supported");
|
||||
}
|
||||
const s = handleNewServer(this.connection, serverMessage, (socket) => {
|
||||
const id = this.connectionId--;
|
||||
this.connections.set(id, socket);
|
||||
childLogger.debug("Got connection", field("id", id));
|
||||
|
||||
return id;
|
||||
}, () => {
|
||||
childLogger.debug("Stopped");
|
||||
this.connections.delete(serverMessage.getId());
|
||||
}, (id) => {
|
||||
this.connections.delete(id);
|
||||
});
|
||||
this.servers.set(serverMessage.getId(), s);
|
||||
} else if (message.hasServerClose()) {
|
||||
const s = this.getServer(message.getServerClose()!.getId());
|
||||
const serverCloseMessage = message.getServerClose()!;
|
||||
logger.debug("ServerClose", field("id", serverCloseMessage.getId()));
|
||||
const s = this.getServer(serverCloseMessage.getId());
|
||||
if (!s) {
|
||||
return;
|
||||
}
|
||||
s.close();
|
||||
} else {
|
||||
logger.debug("Received unknown message type");
|
||||
}
|
||||
}
|
||||
|
||||
@ -196,26 +213,4 @@ export class Server {
|
||||
return this.sessions.get(id);
|
||||
}
|
||||
|
||||
private getChildLogger(command: string, prefix: string = ""): Logger {
|
||||
// TODO: Temporary, for debugging. Should probably ask for a name?
|
||||
let name: string;
|
||||
if (command.includes("vscode-ipc")) {
|
||||
name = "exthost";
|
||||
} else if (command.includes("vscode-online")) {
|
||||
name = "shared";
|
||||
} else {
|
||||
const basename = command.split("/").pop()!;
|
||||
let i = 0;
|
||||
for (; i < basename.length; i++) {
|
||||
const character = basename.charAt(i);
|
||||
if (character === character.toUpperCase()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
name = basename.substring(0, i);
|
||||
}
|
||||
|
||||
return logger.named(prefix + name);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -4,15 +4,16 @@ import * as os from "os";
|
||||
import * as path from "path";
|
||||
import { TextEncoder, TextDecoder } from "text-encoding";
|
||||
import { createClient } from "./helpers";
|
||||
import { Net } from "../src/browser/modules/net";
|
||||
|
||||
(<any>global).TextDecoder = TextDecoder;
|
||||
(<any>global).TextEncoder = TextEncoder;
|
||||
(global as any).TextDecoder = TextDecoder; // tslint:disable-line no-any
|
||||
(global as any).TextEncoder = TextEncoder; // tslint:disable-line no-any
|
||||
|
||||
describe("spawn", () => {
|
||||
const client = createClient({
|
||||
dataDirectory: "",
|
||||
workingDirectory: "",
|
||||
forkProvider: (msg) => {
|
||||
forkProvider: (msg): cp.ChildProcess => {
|
||||
return cp.spawn(msg.getCommand(), msg.getArgsList(), {
|
||||
stdio: [null, null, null, "pipe"],
|
||||
});
|
||||
@ -24,7 +25,7 @@ describe("spawn", () => {
|
||||
proc.stdout.on("data", (data) => {
|
||||
expect(data).toEqual("test\n");
|
||||
});
|
||||
proc.on("exit", (code) => {
|
||||
proc.on("exit", (): void => {
|
||||
done();
|
||||
});
|
||||
});
|
||||
@ -41,6 +42,7 @@ describe("spawn", () => {
|
||||
if (first) {
|
||||
// First piece of data is a welcome msg. Second is the prompt
|
||||
first = false;
|
||||
|
||||
return;
|
||||
}
|
||||
expect(data.toString().endsWith("$ ")).toBeTruthy();
|
||||
@ -92,6 +94,7 @@ describe("spawn", () => {
|
||||
|
||||
if (output === 2) {
|
||||
proc.send("tput lines\n");
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@ -106,6 +109,7 @@ describe("spawn", () => {
|
||||
columns: 10,
|
||||
rows: 50,
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@ -116,6 +120,7 @@ describe("spawn", () => {
|
||||
|
||||
if (output === 6) {
|
||||
proc.send("tput lines\n");
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@ -123,7 +128,7 @@ describe("spawn", () => {
|
||||
// Echo of tput lines
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
if (output === 8) {
|
||||
expect(data.toString().trim()).toEqual("50");
|
||||
proc.kill();
|
||||
@ -132,7 +137,7 @@ describe("spawn", () => {
|
||||
});
|
||||
proc.on("exit", () => done());
|
||||
});
|
||||
|
||||
|
||||
it("should fork and echo messages", (done) => {
|
||||
const proc = client.fork(path.join(__dirname, "forker.js"));
|
||||
proc.on("message", (msg) => {
|
||||
@ -146,10 +151,10 @@ describe("spawn", () => {
|
||||
|
||||
describe("createConnection", () => {
|
||||
const client = createClient();
|
||||
const tmpPath = path.join(os.tmpdir(), Math.random().toString());
|
||||
const tmpPath = path.join(os.tmpdir(), Math.random().toString());
|
||||
let server: net.Server;
|
||||
beforeAll(async () => {
|
||||
await new Promise((r) => {
|
||||
await new Promise((r): void => {
|
||||
server = net.createServer().listen(tmpPath, () => {
|
||||
r();
|
||||
});
|
||||
@ -160,11 +165,23 @@ describe("createConnection", () => {
|
||||
server.close();
|
||||
});
|
||||
|
||||
it("should connect to socket", (done) => {
|
||||
const socket = client.createConnection(tmpPath, () => {
|
||||
socket.end();
|
||||
socket.addListener("close", () => {
|
||||
done();
|
||||
it("should connect to socket", async () => {
|
||||
await new Promise((resolve): void => {
|
||||
const socket = client.createConnection(tmpPath, () => {
|
||||
socket.end();
|
||||
socket.addListener("close", () => {
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
await new Promise((resolve): void => {
|
||||
const socket = new (new Net(client)).Socket();
|
||||
socket.connect(tmpPath, () => {
|
||||
socket.end();
|
||||
socket.addListener("close", () => {
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -202,7 +219,7 @@ describe("createConnection", () => {
|
||||
|
||||
describe("createServer", () => {
|
||||
const client = createClient();
|
||||
const tmpPath = path.join(os.tmpdir(), Math.random().toString());
|
||||
const tmpPath = path.join(os.tmpdir(), Math.random().toString());
|
||||
|
||||
it("should connect to server", (done) => {
|
||||
const s = client.createServer(() => {
|
||||
@ -233,4 +250,4 @@ describe("createServer", () => {
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
Reference in New Issue
Block a user