Archived
1
0

Get search working and clean up disconnected client (#23)

* Use ipc instead of pipe

* Run callback passed to child process's send method

* It also returns true

* Correct send signature

* Kill processes when client disconnects
This commit is contained in:
Asher 2019-01-29 18:23:30 -06:00 committed by Kyle Carberry
parent b4798d1a48
commit 3a88ae5fb2
No known key found for this signature in database
GPG Key ID: A0409BDB6B0B3EDB
6 changed files with 49 additions and 31 deletions

View File

@ -30,8 +30,8 @@ export interface ChildProcess {
kill(signal?: string): void; kill(signal?: string): void;
send(message: string | Uint8Array, ipc?: false): void; send(message: string | Uint8Array, callback?: () => void, ipc?: false): void;
send(message: any, ipc: true): void; send(message: any, callback: undefined | (() => void), ipc: true): void;
on(event: "message", listener: (data: any) => void): void; on(event: "message", listener: (data: any) => void): void;
on(event: "error", listener: (err: Error) => void): void; on(event: "error", listener: (err: Error) => void): void;
@ -101,7 +101,7 @@ export class ServerProcess extends events.EventEmitter implements ChildProcess {
this._connected = false; this._connected = false;
} }
public send(message: string | Uint8Array | any, ipc: boolean = this.ipc): void { public send(message: string | Uint8Array | any, callback?: (error: Error | null) => void, ipc: boolean = this.ipc): boolean {
const send = new WriteToSessionMessage(); const send = new WriteToSessionMessage();
send.setId(this.id); send.setId(this.id);
send.setSource(ipc ? WriteToSessionMessage.Source.IPC : WriteToSessionMessage.Source.STDIN); send.setSource(ipc ? WriteToSessionMessage.Source.IPC : WriteToSessionMessage.Source.STDIN);
@ -113,6 +113,12 @@ export class ServerProcess extends events.EventEmitter implements ChildProcess {
const client = new ClientMessage(); const client = new ClientMessage();
client.setWriteToSession(send); client.setWriteToSession(send);
this.connection.send(client.serializeBinary()); this.connection.send(client.serializeBinary());
// TODO: properly implement?
if (callback) {
callback(null);
}
return true;
} }
public resize(dimensions: TTYDimensions): void { public resize(dimensions: TTYDimensions): void {

View File

@ -13,11 +13,12 @@ export interface Process {
stdin?: stream.Writable; stdin?: stream.Writable;
stdout?: stream.Readable; stdout?: stream.Readable;
stderr?: stream.Readable; stderr?: stream.Readable;
send?: (message: string) => void;
pid: number; pid: number;
killed?: boolean; killed?: boolean;
on(event: "data", cb: (data: string) => void): void; on(event: "data" | "message", cb: (data: string) => void): void;
on(event: "exit", listener: (exitCode: number, signal?: number) => void): void; on(event: "exit", listener: (exitCode: number, signal?: number) => void): void;
write(data: string | Uint8Array): void; write(data: string | Uint8Array): void;
resize?(cols: number, rows: number): void; resize?(cols: number, rows: number): void;
@ -61,7 +62,7 @@ export const handleNewSession = (connection: SendableConnection, newSession: New
connection.send(sm.serializeBinary()); connection.send(sm.serializeBinary());
} }
}, 200); }, 200);
ptyProc.on("exit", () => { ptyProc.on("exit", () => {
clearTimeout(timer); clearTimeout(timer);
}); });
@ -93,6 +94,9 @@ export const handleNewSession = (connection: SendableConnection, newSession: New
stderr: proc.stderr, stderr: proc.stderr,
stdout: proc.stdout, stdout: proc.stdout,
stdio: proc.stdio, stdio: proc.stdio,
send: (message): void => {
proc.send(message);
},
on: (...args: any[]): void => ((proc as any).on)(...args), // tslint:disable-line no-any on: (...args: any[]): void => ((proc as any).on)(...args), // tslint:disable-line no-any
write: (d): boolean => proc.stdin.write(d), write: (d): boolean => proc.stdin.write(d),
kill: (s): void => proc.kill(s || "SIGTERM"), kill: (s): void => proc.kill(s || "SIGTERM"),
@ -105,7 +109,7 @@ export const handleNewSession = (connection: SendableConnection, newSession: New
let data = msg.toString(); let data = msg.toString();
if (_source === SessionOutputMessage.Source.IPC) { if (_source === SessionOutputMessage.Source.IPC) {
data = Buffer.from(msg.toString(), "base64").toString(); // data = Buffer.from(msg.toString(), "base64").toString();
} }
return [ return [
@ -139,10 +143,10 @@ export const handleNewSession = (connection: SendableConnection, newSession: New
}); });
} }
if (process.stdio && process.stdio[3]) { // IPC.
// We have ipc fd if (process.send) {
process.stdio[3].on("data", (data) => { process.on("message", (data) => {
sendOutput(SessionOutputMessage.Source.IPC, data); sendOutput(SessionOutputMessage.Source.IPC, JSON.stringify(data));
}); });
} }

View File

@ -1,7 +1,7 @@
import * as os from "os"; import * as os from "os";
import * as cp from "child_process"; import * as cp from "child_process";
import * as path from "path"; import * as path from "path";
import { mkdir, WriteStream } from "fs"; import { mkdir } from "fs";
import { promisify } from "util"; import { promisify } from "util";
import { TextDecoder } from "text-encoding"; import { TextDecoder } from "text-encoding";
import { logger, field } from "@coder/logger"; import { logger, field } from "@coder/logger";
@ -37,6 +37,17 @@ export class Server {
logger.error("Failed to handle client message", field("length", data.byteLength), field("exception", ex)); logger.error("Failed to handle client message", field("length", data.byteLength), field("exception", ex));
} }
}); });
connection.onClose(() => {
this.sessions.forEach((s) => {
s.kill();
});
this.connections.forEach((c) => {
c.destroy();
});
this.servers.forEach((s) => {
s.close();
});
});
if (!options) { if (!options) {
logger.warn("No server options provided. InitMessage will not be sent."); logger.warn("No server options provided. InitMessage will not be sent.");
@ -97,7 +108,12 @@ export class Server {
private handleMessage(message: ClientMessage): void { private handleMessage(message: ClientMessage): void {
if (message.hasNewEval()) { if (message.hasNewEval()) {
const evalMessage = message.getNewEval()!; const evalMessage = message.getNewEval()!;
logger.debug("EvalMessage", field("id", evalMessage.getId())); logger.debug(() => [
"EvalMessage",
field("id", evalMessage.getId()),
field("args", evalMessage.getArgsList()),
field("function", evalMessage.getFunction()),
]);
evaluate(this.connection, evalMessage); evaluate(this.connection, evalMessage);
} else if (message.hasNewSession()) { } else if (message.hasNewSession()) {
const sessionMessage = message.getNewSession()!; const sessionMessage = message.getNewSession()!;
@ -141,10 +157,10 @@ export class Server {
const data = new TextDecoder().decode(writeToSessionMessage.getData_asU8()); const data = new TextDecoder().decode(writeToSessionMessage.getData_asU8());
const source = writeToSessionMessage.getSource(); const source = writeToSessionMessage.getSource();
if (source === WriteToSessionMessage.Source.IPC) { if (source === WriteToSessionMessage.Source.IPC) {
if (!s.stdio || !s.stdio[3]) { if (!s.send) {
throw new Error("Cannot send message via IPC to process without IPC"); throw new Error("Cannot send message via IPC to process without IPC");
} }
(s.stdio[3] as WriteStream).write(data); s.send(JSON.parse(data));
} else { } else {
s.write(data); s.write(data);
} }

View File

@ -15,7 +15,7 @@ describe("spawn", () => {
workingDirectory: "", workingDirectory: "",
forkProvider: (msg): cp.ChildProcess => { forkProvider: (msg): cp.ChildProcess => {
return cp.spawn(msg.getCommand(), msg.getArgsList(), { return cp.spawn(msg.getCommand(), msg.getArgsList(), {
stdio: [null, null, null, "pipe"], stdio: [null, null, null, "ipc"],
}); });
}, },
}); });
@ -144,7 +144,7 @@ describe("spawn", () => {
expect(msg.bananas).toBeTruthy(); expect(msg.bananas).toBeTruthy();
proc.kill(); proc.kill();
}); });
proc.send({ bananas: true }, true); proc.send({ bananas: true }, undefined, true);
proc.on("exit", () => done()); proc.on("exit", () => done());
}); });
}); });

View File

@ -1,6 +1,5 @@
import * as cp from "child_process"; import * as cp from "child_process";
import * as fs from "fs"; import * as fs from "fs";
import * as net from "net";
import * as path from "path"; import * as path from "path";
export const requireModule = (modulePath: string): void => { export const requireModule = (modulePath: string): void => {
@ -8,23 +7,12 @@ export const requireModule = (modulePath: string): void => {
const xml = require("xhr2"); const xml = require("xhr2");
(<any>global).XMLHttpRequest = xml.XMLHttpRequest; // tslint:disable-next-line no-any this makes installing extensions work.
(global as any).XMLHttpRequest = xml.XMLHttpRequest;
// Always do this so we can see console.logs. // Always do this so we can see console.logs.
// process.env.VSCODE_ALLOW_IO = "true"; // process.env.VSCODE_ALLOW_IO = "true";
if (!process.send) {
const socket = new net.Socket({ fd: 3 });
socket.on("data", (data) => {
process.emit("message", JSON.parse(data.toString()), undefined);
});
// tslint:disable-next-line no-any
process.send = (message: any): void => {
socket.write(JSON.stringify(message));
};
}
const content = fs.readFileSync(path.join(process.env.BUILD_DIR as string || path.join(__dirname, "../.."), "./build/bootstrap-fork.js")); const content = fs.readFileSync(path.join(process.env.BUILD_DIR as string || path.join(__dirname, "../.."), "./build/bootstrap-fork.js"));
eval(content.toString()); eval(content.toString());
}; };
@ -45,7 +33,7 @@ export const forkModule = (modulePath: string, env?: NodeJS.ProcessEnv): cp.Chil
args.push("--env", JSON.stringify(env)); args.push("--env", JSON.stringify(env));
} }
const options: cp.SpawnOptions = { const options: cp.SpawnOptions = {
stdio: [null, null, null, "pipe"], stdio: [null, null, null, "ipc"],
}; };
if (process.env.CLI === "true") { if (process.env.CLI === "true") {
proc = cp.spawn(process.execPath, args, options); proc = cp.spawn(process.execPath, args, options);

4
packages/web/yarn.lock Normal file
View File

@ -0,0 +1,4 @@
# THIS IS AN AUTOGENERATED FILE. DO NOT EDIT THIS FILE DIRECTLY.
# yarn lockfile v1