chore(vscode): update to 1.53.2
These conflicts will be resolved in the following commits. We do it this way so that PR review is possible.
This commit is contained in:
@ -3,6 +3,7 @@
|
||||
* Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
*--------------------------------------------------------------------------------------------*/
|
||||
|
||||
import { onUnexpectedError } from 'vs/base/common/errors';
|
||||
import { DisposableStore, toDisposable } from 'vs/base/common/lifecycle';
|
||||
|
||||
/**
|
||||
@ -229,7 +230,7 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
|
||||
|
||||
// flowing: directly send the data to listeners
|
||||
if (this.state.flowing) {
|
||||
this.listeners.data.forEach(listener => listener(data));
|
||||
this.emitData(data);
|
||||
}
|
||||
|
||||
// not yet flowing: buffer data until flowing
|
||||
@ -250,7 +251,7 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
|
||||
|
||||
// flowing: directly send the error to listeners
|
||||
if (this.state.flowing) {
|
||||
this.listeners.error.forEach(listener => listener(error));
|
||||
this.emitError(error);
|
||||
}
|
||||
|
||||
// not yet flowing: buffer errors until flowing
|
||||
@ -273,7 +274,7 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
|
||||
|
||||
// flowing: send end event to listeners
|
||||
if (this.state.flowing) {
|
||||
this.listeners.end.forEach(listener => listener());
|
||||
this.emitEnd();
|
||||
|
||||
this.destroy();
|
||||
}
|
||||
@ -284,6 +285,22 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
|
||||
}
|
||||
}
|
||||
|
||||
private emitData(data: T): void {
|
||||
this.listeners.data.slice(0).forEach(listener => listener(data)); // slice to avoid listener mutation from delivering event
|
||||
}
|
||||
|
||||
private emitError(error: Error): void {
|
||||
if (this.listeners.error.length === 0) {
|
||||
onUnexpectedError(error); // nobody listened to this error so we log it as unexpected
|
||||
} else {
|
||||
this.listeners.error.slice(0).forEach(listener => listener(error)); // slice to avoid listener mutation from delivering event
|
||||
}
|
||||
}
|
||||
|
||||
private emitEnd(): void {
|
||||
this.listeners.end.slice(0).forEach(listener => listener()); // slice to avoid listener mutation from delivering event
|
||||
}
|
||||
|
||||
on(event: 'data', callback: (data: T) => void): void;
|
||||
on(event: 'error', callback: (err: Error) => void): void;
|
||||
on(event: 'end', callback: () => void): void;
|
||||
@ -361,7 +378,7 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
|
||||
if (this.buffer.data.length > 0) {
|
||||
const fullDataBuffer = this.reducer(this.buffer.data);
|
||||
|
||||
this.listeners.data.forEach(listener => listener(fullDataBuffer));
|
||||
this.emitData(fullDataBuffer);
|
||||
|
||||
this.buffer.data.length = 0;
|
||||
|
||||
@ -375,7 +392,7 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
|
||||
private flowErrors(): void {
|
||||
if (this.listeners.error.length > 0) {
|
||||
for (const error of this.buffer.error) {
|
||||
this.listeners.error.forEach(listener => listener(error));
|
||||
this.emitError(error);
|
||||
}
|
||||
|
||||
this.buffer.error.length = 0;
|
||||
@ -384,7 +401,7 @@ class WriteableStreamImpl<T> implements WriteableStream<T> {
|
||||
|
||||
private flowEnd(): boolean {
|
||||
if (this.state.ended) {
|
||||
this.listeners.end.forEach(listener => listener());
|
||||
this.emitEnd();
|
||||
|
||||
return this.listeners.end.length > 0;
|
||||
}
|
||||
@ -478,9 +495,13 @@ export function consumeStream<T>(stream: ReadableStreamEvents<T>, reducer: IRedu
|
||||
return new Promise((resolve, reject) => {
|
||||
const chunks: T[] = [];
|
||||
|
||||
stream.on('data', data => chunks.push(data));
|
||||
stream.on('error', error => reject(error));
|
||||
stream.on('end', () => resolve(reducer(chunks)));
|
||||
|
||||
// Adding the `data` listener will turn the stream
|
||||
// into flowing mode. As such it is important to
|
||||
// add this listener last (DO NOT CHANGE!)
|
||||
stream.on('data', data => chunks.push(data));
|
||||
});
|
||||
}
|
||||
|
||||
@ -574,3 +595,40 @@ export function transform<Original, Transformed>(stream: ReadableStreamEvents<Or
|
||||
|
||||
return target;
|
||||
}
|
||||
|
||||
export interface IReadableStreamObservable {
|
||||
|
||||
/**
|
||||
* A promise to await the `end` or `error` event
|
||||
* of a stream.
|
||||
*/
|
||||
errorOrEnd: () => Promise<void>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to observe a stream for certain events through
|
||||
* a promise based API.
|
||||
*/
|
||||
export function observe(stream: ReadableStream<unknown>): IReadableStreamObservable {
|
||||
|
||||
// A stream is closed when it ended or errord
|
||||
// We install this listener right from the
|
||||
// beginning to catch the events early.
|
||||
const errorOrEnd = Promise.race([
|
||||
new Promise<void>(resolve => stream.on('end', () => resolve())),
|
||||
new Promise<void>(resolve => stream.on('error', () => resolve()))
|
||||
]);
|
||||
|
||||
return {
|
||||
errorOrEnd(): Promise<void> {
|
||||
|
||||
// We need to ensure the stream is flowing so that our
|
||||
// listeners are getting triggered. It is possible that
|
||||
// the stream is not flowing because no `data` listener
|
||||
// was attached yet.
|
||||
stream.resume();
|
||||
|
||||
return errorOrEnd;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
Reference in New Issue
Block a user