Implement file uploads
This commit is contained in:
443
src/upload.ts
443
src/upload.ts
@ -1,23 +1,64 @@
|
||||
import { exec } from "child_process";
|
||||
import { appendFile } from "fs";
|
||||
import { promisify } from "util";
|
||||
import { logger } from "@coder/logger";
|
||||
import { escapePath } from "@coder/protocol";
|
||||
import { NotificationService, INotificationService, ProgressService, IProgressService, IProgress, Severity } from "./fill/notification";
|
||||
import { generateUuid } from "vs/base/common/uuid";
|
||||
import { DesktopDragAndDropData } from "vs/base/browser/ui/list/listView";
|
||||
import { VSBuffer, VSBufferReadable } from "vs/base/common/buffer";
|
||||
import { Emitter, Event } from "vs/base/common/event";
|
||||
import { Disposable } from "vs/base/common/lifecycle";
|
||||
import * as path from "vs/base/common/path";
|
||||
import { URI } from "vs/base/common/uri";
|
||||
import { IFileService } from "vs/platform/files/common/files";
|
||||
import { createDecorator, ServiceIdentifier, IInstantiationService } from 'vs/platform/instantiation/common/instantiation';
|
||||
import { INotificationService, Severity } from "vs/platform/notification/common/notification";
|
||||
import { IProgress, IProgressStep, IProgressService, ProgressLocation } from "vs/platform/progress/common/progress";
|
||||
import { ExplorerItem } from "vs/workbench/contrib/files/common/explorerModel";
|
||||
import { IEditorGroup } from "vs/workbench/services/editor/common/editorGroupsService";
|
||||
import { IWorkspaceContextService } from "vs/platform/workspace/common/workspace";
|
||||
import { IWindowsService } from "vs/platform/windows/common/windows";
|
||||
import { IEditorService } from "vs/workbench/services/editor/common/editorService";
|
||||
|
||||
export interface IURI {
|
||||
readonly path: string;
|
||||
readonly fsPath: string;
|
||||
readonly scheme: string;
|
||||
export const IUploadService = createDecorator<IUploadService>("uploadService");
|
||||
|
||||
export interface IUploadService {
|
||||
_serviceBrand: ServiceIdentifier<any>;
|
||||
handleDrop(event: DragEvent, resolveTargetGroup: () => IEditorGroup | undefined, afterDrop: (targetGroup: IEditorGroup | undefined) => void, targetIndex?: number): Promise<void>;
|
||||
handleExternalDrop(data: DesktopDragAndDropData, target: ExplorerItem, originalEvent: DragEvent): Promise<void>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents an uploadable directory, so we can query for existing files once.
|
||||
*/
|
||||
interface IUploadableDirectory {
|
||||
existingFiles: string[];
|
||||
filesToUpload: Map<string, File>;
|
||||
preparePromise?: Promise<void>;
|
||||
export class UploadService extends Disposable implements IUploadService {
|
||||
public _serviceBrand: any;
|
||||
public upload: Upload;
|
||||
|
||||
public constructor(
|
||||
@IInstantiationService instantiationService: IInstantiationService,
|
||||
@IWorkspaceContextService private readonly contextService: IWorkspaceContextService,
|
||||
@IWindowsService private readonly windowsService: IWindowsService,
|
||||
@IEditorService private readonly editorService: IEditorService,
|
||||
) {
|
||||
super();
|
||||
this.upload = instantiationService.createInstance(Upload);
|
||||
}
|
||||
|
||||
public async handleDrop(event: DragEvent, resolveTargetGroup: () => IEditorGroup | undefined, afterDrop: (targetGroup: IEditorGroup | undefined) => void, targetIndex?: number): Promise<void> {
|
||||
// TODO: should use the workspace for the editor it was dropped on?
|
||||
const target =this.contextService.getWorkspace().folders[0].uri;
|
||||
const uris = (await this.upload.uploadDropped(event, target)).map((u) => URI.file(u));
|
||||
if (uris.length > 0) {
|
||||
await this.windowsService.addRecentlyOpened(uris.map((u) => ({ fileUri: u })));
|
||||
}
|
||||
const editors = uris.map((uri) => ({
|
||||
resource: uri,
|
||||
options: {
|
||||
pinned: true,
|
||||
index: targetIndex,
|
||||
},
|
||||
}));
|
||||
const targetGroup = resolveTargetGroup();
|
||||
this.editorService.openEditors(editors, targetGroup);
|
||||
afterDrop(targetGroup);
|
||||
}
|
||||
|
||||
public async handleExternalDrop(_data: DesktopDragAndDropData, target: ExplorerItem, originalEvent: DragEvent): Promise<void> {
|
||||
await this.upload.uploadDropped(originalEvent, target.resource);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -36,55 +77,37 @@ interface IEntry {
|
||||
/**
|
||||
* Handles file uploads.
|
||||
*/
|
||||
export class Upload {
|
||||
class Upload {
|
||||
private readonly maxParallelUploads = 100;
|
||||
private readonly readSize = 32000; // ~32kb max while reading in the file.
|
||||
private readonly packetSize = 32000; // ~32kb max when writing.
|
||||
private readonly logger = logger.named("Upload");
|
||||
private readonly currentlyUploadingFiles = new Map<string, File>();
|
||||
private readonly queueByDirectory = new Map<string, IUploadableDirectory>();
|
||||
private progress: IProgress | undefined;
|
||||
private readonly uploadingFiles = new Map<string, Reader | undefined>();
|
||||
private readonly fileQueue = new Map<string, File>();
|
||||
private progress: IProgress<IProgressStep> | undefined;
|
||||
private uploadPromise: Promise<string[]> | undefined;
|
||||
private resolveUploadPromise: (() => void) | undefined;
|
||||
private finished = 0;
|
||||
private uploadedFilePaths = <string[]>[];
|
||||
private total = 0;
|
||||
private _total = 0;
|
||||
private _uploaded = 0;
|
||||
private lastPercent = 0;
|
||||
|
||||
public constructor(
|
||||
private _notificationService: INotificationService,
|
||||
private _progressService: IProgressService,
|
||||
@INotificationService private notificationService: INotificationService,
|
||||
@IProgressService private progressService: IProgressService,
|
||||
@IFileService private fileService: IFileService,
|
||||
) {}
|
||||
|
||||
public set notificationService(service: INotificationService) {
|
||||
this._notificationService = service;
|
||||
}
|
||||
|
||||
public get notificationService(): INotificationService {
|
||||
return this._notificationService;
|
||||
}
|
||||
|
||||
public set progressService(service: IProgressService) {
|
||||
this._progressService = service;
|
||||
}
|
||||
|
||||
public get progressService(): IProgressService {
|
||||
return this._progressService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload dropped files. This will try to upload everything it can. Errors
|
||||
* will show via notifications. If an upload operation is ongoing, the files
|
||||
* will be added to that operation.
|
||||
*/
|
||||
public async uploadDropped(event: DragEvent, uploadDir: IURI): Promise<string[]> {
|
||||
this.addDirectory(uploadDir.path);
|
||||
public async uploadDropped(event: DragEvent, uploadDir: URI): Promise<string[]> {
|
||||
await this.queueFiles(event, uploadDir);
|
||||
this.logger.debug( // -1 so we don't include the uploadDir itself.
|
||||
`Uploading ${this.queueByDirectory.size - 1} directories and ${this.total} files`,
|
||||
);
|
||||
await this.prepareDirectories();
|
||||
if (!this.uploadPromise) {
|
||||
this.uploadPromise = this.progressService.start("Uploading files...", (progress) => {
|
||||
this.uploadPromise = this.progressService.withProgress({
|
||||
cancellable: true,
|
||||
location: ProgressLocation.Notification,
|
||||
title: "Uploading files...",
|
||||
}, (progress) => {
|
||||
return new Promise((resolve): void => {
|
||||
this.progress = progress;
|
||||
this.resolveUploadPromise = (): void => {
|
||||
@ -92,17 +115,15 @@ export class Upload {
|
||||
this.uploadPromise = undefined;
|
||||
this.resolveUploadPromise = undefined;
|
||||
this.uploadedFilePaths = [];
|
||||
this.finished = 0;
|
||||
this.total = 0;
|
||||
this.lastPercent = 0;
|
||||
this._uploaded = 0;
|
||||
this._total = 0;
|
||||
resolve(uploaded);
|
||||
};
|
||||
});
|
||||
}, () => {
|
||||
this.cancel();
|
||||
});
|
||||
}, () => this.cancel());
|
||||
}
|
||||
this.uploadFiles();
|
||||
|
||||
return this.uploadPromise;
|
||||
}
|
||||
|
||||
@ -110,180 +131,118 @@ export class Upload {
|
||||
* Cancel all file uploads.
|
||||
*/
|
||||
public async cancel(): Promise<void> {
|
||||
this.currentlyUploadingFiles.clear();
|
||||
this.queueByDirectory.clear();
|
||||
this.fileQueue.clear();
|
||||
this.uploadingFiles.forEach((r) => r && r.abort());
|
||||
}
|
||||
|
||||
private get total(): number { return this._total; }
|
||||
private set total(total: number) {
|
||||
this._total = total;
|
||||
this.updateProgress();
|
||||
}
|
||||
|
||||
private get uploaded(): number { return this._uploaded; }
|
||||
private set uploaded(uploaded: number) {
|
||||
this._uploaded = uploaded;
|
||||
this.updateProgress();
|
||||
}
|
||||
|
||||
private updateProgress(): void {
|
||||
if (this.progress && this.total > 0) {
|
||||
const percent = Math.floor((this.uploaded / this.total) * 100);
|
||||
this.progress.report({ increment: percent - this.lastPercent });
|
||||
this.lastPercent = percent;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create directories and get existing files.
|
||||
* On failure, show the error and remove the failed directory from the queue.
|
||||
*/
|
||||
private async prepareDirectories(): Promise<void> {
|
||||
await Promise.all(Array.from(this.queueByDirectory).map(([path, dir]) => {
|
||||
if (!dir.preparePromise) {
|
||||
dir.preparePromise = this.prepareDirectory(path, dir);
|
||||
}
|
||||
|
||||
return dir.preparePromise;
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a directory and get existing files.
|
||||
* On failure, show the error and remove the directory from the queue.
|
||||
*/
|
||||
private async prepareDirectory(path: string, dir: IUploadableDirectory): Promise<void> {
|
||||
await Promise.all([
|
||||
promisify(exec)(`mkdir -p ${escapePath(path)}`).catch((error) => {
|
||||
const message = error.message.toLowerCase();
|
||||
if (message.includes("file exists")) {
|
||||
throw new Error(`Unable to create directory at ${path} because a file exists there`);
|
||||
}
|
||||
throw new Error(error.message || `Unable to upload ${path}`);
|
||||
}),
|
||||
// Only get files, so we don't show an override option that will just
|
||||
// fail anyway.
|
||||
promisify(exec)(`find ${escapePath(path)} -maxdepth 1 -not -type d`).then((stdio) => {
|
||||
dir.existingFiles = stdio.stdout.split("\n");
|
||||
}),
|
||||
]).catch((error) => {
|
||||
this.queueByDirectory.delete(path);
|
||||
this.notificationService.error(error);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload as many files as possible. When finished, resolve the upload promise.
|
||||
* Upload as many files as possible. When finished, resolve the upload
|
||||
* promise.
|
||||
*/
|
||||
private uploadFiles(): void {
|
||||
const finishFileUpload = (path: string): void => {
|
||||
++this.finished;
|
||||
this.currentlyUploadingFiles.delete(path);
|
||||
this.progress!.report(Math.floor((this.finished / this.total) * 100));
|
||||
this.uploadFiles();
|
||||
};
|
||||
while (this.queueByDirectory.size > 0 && this.currentlyUploadingFiles.size < this.maxParallelUploads) {
|
||||
const [dirPath, dir] = this.queueByDirectory.entries().next().value;
|
||||
if (dir.filesToUpload.size === 0) {
|
||||
this.queueByDirectory.delete(dirPath);
|
||||
continue;
|
||||
while (this.fileQueue.size > 0 && this.uploadingFiles.size < this.maxParallelUploads) {
|
||||
const [path, file] = this.fileQueue.entries().next().value;
|
||||
this.fileQueue.delete(path);
|
||||
if (this.uploadingFiles.has(path)) {
|
||||
this.notificationService.error(new Error(`Already uploading ${path}`));
|
||||
} else {
|
||||
this.uploadingFiles.set(path, undefined);
|
||||
this.uploadFile(path, file).catch((error) => {
|
||||
this.notificationService.error(error);
|
||||
}).finally(() => {
|
||||
this.uploadingFiles.delete(path);
|
||||
this.uploadFiles();
|
||||
});
|
||||
}
|
||||
const [filePath, item] = dir.filesToUpload.entries().next().value;
|
||||
this.currentlyUploadingFiles.set(filePath, item);
|
||||
dir.filesToUpload.delete(filePath);
|
||||
this.uploadFile(filePath, item, dir.existingFiles).then(() => {
|
||||
finishFileUpload(filePath);
|
||||
}).catch((error) => {
|
||||
this.notificationService.error(error);
|
||||
finishFileUpload(filePath);
|
||||
});
|
||||
}
|
||||
if (this.queueByDirectory.size === 0 && this.currentlyUploadingFiles.size === 0) {
|
||||
if (this.fileQueue.size === 0 && this.uploadingFiles.size === 0) {
|
||||
this.resolveUploadPromise!();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload a file.
|
||||
* Upload a file, asking to override if necessary.
|
||||
*/
|
||||
private async uploadFile(path: string, file: File, existingFiles: string[]): Promise<void> {
|
||||
if (existingFiles.includes(path)) {
|
||||
const shouldOverwrite = await new Promise((resolve): void => {
|
||||
private async uploadFile(filePath: string, file: File): Promise<void> {
|
||||
const uri = URI.file(filePath);
|
||||
if (await this.fileService.exists(uri)) {
|
||||
const overwrite = await new Promise<boolean>((resolve): void => {
|
||||
this.notificationService.prompt(
|
||||
Severity.Error,
|
||||
`${path} already exists. Overwrite?`,
|
||||
[{
|
||||
label: "Yes",
|
||||
run: (): void => resolve(true),
|
||||
}, {
|
||||
label: "No",
|
||||
run: (): void => resolve(false),
|
||||
}],
|
||||
() => resolve(false),
|
||||
`${filePath} already exists. Overwrite?`,
|
||||
[
|
||||
{ label: "Yes", run: (): void => resolve(true) },
|
||||
{ label: "No", run: (): void => resolve(false) },
|
||||
],
|
||||
{ onCancel: () => resolve(false) },
|
||||
);
|
||||
});
|
||||
if (!shouldOverwrite) {
|
||||
if (!overwrite) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
await new Promise(async (resolve, reject): Promise<void> => {
|
||||
let readOffset = 0;
|
||||
const reader = new FileReader();
|
||||
const seek = (): void => {
|
||||
const slice = file.slice(readOffset, readOffset + this.readSize);
|
||||
readOffset += this.readSize;
|
||||
reader.readAsArrayBuffer(slice);
|
||||
};
|
||||
|
||||
const rm = async (): Promise<void> => {
|
||||
await promisify(exec)(`rm -f ${escapePath(path)}`);
|
||||
};
|
||||
|
||||
await rm();
|
||||
|
||||
const load = async (): Promise<void> => {
|
||||
const buffer = new Uint8Array(reader.result as ArrayBuffer);
|
||||
let bufferOffset = 0;
|
||||
|
||||
while (bufferOffset <= buffer.length) {
|
||||
// Got canceled while sending data.
|
||||
if (!this.currentlyUploadingFiles.has(path)) {
|
||||
await rm();
|
||||
|
||||
return resolve();
|
||||
}
|
||||
const data = buffer.slice(bufferOffset, bufferOffset + this.packetSize);
|
||||
|
||||
try {
|
||||
await promisify(appendFile)(path, data);
|
||||
} catch (error) {
|
||||
await rm();
|
||||
|
||||
const message = error.message.toLowerCase();
|
||||
if (message.includes("no space")) {
|
||||
return reject(new Error("You are out of disk space"));
|
||||
} else if (message.includes("is a directory")) {
|
||||
return reject(new Error(`Unable to upload ${path} because there is a directory there`));
|
||||
}
|
||||
|
||||
return reject(new Error(error.message || `Unable to upload ${path}`));
|
||||
}
|
||||
|
||||
bufferOffset += this.packetSize;
|
||||
}
|
||||
|
||||
if (readOffset >= file.size) {
|
||||
this.uploadedFilePaths.push(path);
|
||||
|
||||
return resolve();
|
||||
}
|
||||
|
||||
seek();
|
||||
};
|
||||
|
||||
reader.addEventListener("load", load);
|
||||
|
||||
seek();
|
||||
const tempUri = uri.with({
|
||||
path: path.join(
|
||||
path.dirname(uri.path),
|
||||
`.code-server-partial-upload-${path.basename(uri.path)}-${generateUuid()}`,
|
||||
),
|
||||
});
|
||||
const reader = new Reader(file);
|
||||
reader.onData((data) => {
|
||||
if (data && data.length > 0) {
|
||||
this.uploaded += data.byteLength;
|
||||
}
|
||||
});
|
||||
reader.onAbort(() => {
|
||||
const remaining = file.size - reader.offset;
|
||||
if (remaining > 0) {
|
||||
this.uploaded += remaining;
|
||||
}
|
||||
});
|
||||
this.uploadingFiles.set(filePath, reader);
|
||||
await this.fileService.writeFile(tempUri, reader);
|
||||
if (reader.aborted) {
|
||||
await this.fileService.del(tempUri);
|
||||
} else {
|
||||
await this.fileService.move(tempUri, uri, true);
|
||||
this.uploadedFilePaths.push(filePath);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue files from a drop event. We have to get the files first; we can't do
|
||||
* it in tandem with uploading or the entries will disappear.
|
||||
*/
|
||||
private async queueFiles(event: DragEvent, uploadDir: IURI): Promise<void> {
|
||||
if (!event.dataTransfer || !event.dataTransfer.items) {
|
||||
return;
|
||||
}
|
||||
private async queueFiles(event: DragEvent, uploadDir: URI): Promise<void> {
|
||||
const promises: Array<Promise<void>> = [];
|
||||
for (let i = 0; i < event.dataTransfer.items.length; i++) {
|
||||
for (let i = 0; event.dataTransfer && event.dataTransfer.items && i < event.dataTransfer.items.length; ++i) {
|
||||
const item = event.dataTransfer.items[i];
|
||||
if (typeof item.webkitGetAsEntry === "function") {
|
||||
promises.push(this.traverseItem(item.webkitGetAsEntry(), uploadDir.fsPath).catch(this.notificationService.error));
|
||||
promises.push(this.traverseItem(item.webkitGetAsEntry(), uploadDir.fsPath));
|
||||
} else {
|
||||
const file = item.getAsFile();
|
||||
if (file) {
|
||||
this.addFile(uploadDir.fsPath, uploadDir.fsPath + "/" + file.name, file);
|
||||
this.addFile(uploadDir.fsPath + "/" + file.name, file);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -293,23 +252,15 @@ export class Upload {
|
||||
/**
|
||||
* Traverses an entry and add files to the queue.
|
||||
*/
|
||||
private async traverseItem(entry: IEntry, parentPath: string): Promise<void> {
|
||||
private async traverseItem(entry: IEntry, path: string): Promise<void> {
|
||||
if (entry.isFile) {
|
||||
return new Promise<void>((resolve): void => {
|
||||
entry.file((file) => {
|
||||
this.addFile(
|
||||
parentPath,
|
||||
parentPath + "/" + file.name,
|
||||
file,
|
||||
);
|
||||
resolve();
|
||||
resolve(this.addFile(path + "/" + file.name, file));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
parentPath += "/" + entry.name;
|
||||
this.addDirectory(parentPath);
|
||||
|
||||
path += "/" + entry.name;
|
||||
await new Promise((resolve): void => {
|
||||
const promises: Array<Promise<void>> = [];
|
||||
const dirReader = entry.createReader();
|
||||
@ -323,7 +274,7 @@ export class Upload {
|
||||
resolve();
|
||||
});
|
||||
} else {
|
||||
promises.push(...entries.map((child) => this.traverseItem(child, parentPath)));
|
||||
promises.push(...entries.map((c) => this.traverseItem(c, path)));
|
||||
readEntries();
|
||||
}
|
||||
});
|
||||
@ -335,24 +286,60 @@ export class Upload {
|
||||
/**
|
||||
* Add a file to the queue.
|
||||
*/
|
||||
private addFile(parentPath: string, path: string, file: File): void {
|
||||
++this.total;
|
||||
this.addDirectory(parentPath);
|
||||
this.queueByDirectory.get(parentPath)!.filesToUpload.set(path, file);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a directory to the queue.
|
||||
*/
|
||||
private addDirectory(path: string): void {
|
||||
if (!this.queueByDirectory.has(path)) {
|
||||
this.queueByDirectory.set(path, {
|
||||
existingFiles: [],
|
||||
filesToUpload: new Map(),
|
||||
});
|
||||
}
|
||||
private addFile(path: string, file: File): void {
|
||||
this.total += file.size;
|
||||
this.fileQueue.set(path, file);
|
||||
}
|
||||
}
|
||||
|
||||
// Global instance.
|
||||
export const upload = new Upload(new NotificationService(), new ProgressService());
|
||||
class Reader implements VSBufferReadable {
|
||||
private _offset = 0;
|
||||
private readonly size = 32000; // ~32kb max while reading in the file.
|
||||
private readonly _onData = new Emitter<Uint8Array | null>();
|
||||
public readonly onData: Event<Uint8Array | null> = this._onData.event;
|
||||
|
||||
private _aborted = false;
|
||||
private readonly _onAbort = new Emitter<void>();
|
||||
public readonly onAbort: Event<void> = this._onAbort.event;
|
||||
|
||||
private readonly reader = new FileReader();
|
||||
|
||||
public constructor(private readonly file: File) {
|
||||
this.reader.addEventListener("load", this.onLoad);
|
||||
}
|
||||
|
||||
public get offset(): number { return this._offset; }
|
||||
public get aborted(): boolean { return this._aborted; }
|
||||
|
||||
public abort = (): void => {
|
||||
this._aborted = true;
|
||||
this.reader.abort();
|
||||
this.reader.removeEventListener("load", this.onLoad);
|
||||
this._onAbort.fire();
|
||||
}
|
||||
|
||||
public read = async (): Promise<VSBuffer | null> => {
|
||||
return new Promise<VSBuffer | null>((resolve) => {
|
||||
const disposables = [
|
||||
this.onAbort(() => {
|
||||
disposables.forEach((d) => d.dispose());
|
||||
resolve(null);
|
||||
}),
|
||||
this.onData((data) => {
|
||||
disposables.forEach((d) => d.dispose());
|
||||
resolve(data && VSBuffer.wrap(data));
|
||||
}),
|
||||
];
|
||||
if (this.aborted || this.offset >= this.file.size) {
|
||||
return this._onData.fire(null);
|
||||
}
|
||||
const slice = this.file.slice(this.offset, this.offset + this.size);
|
||||
this._offset += this.size;
|
||||
this.reader.readAsArrayBuffer(slice);
|
||||
});
|
||||
}
|
||||
|
||||
private onLoad = () => {
|
||||
this._onData.fire(new Uint8Array(this.reader.result as ArrayBuffer));
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user