Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 22 additions & 38 deletions src/execution/__tests__/mapAsyncIterable-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,10 @@ describe('mapAsyncIterable', () => {

it('allows returning early from mapped async generator', async () => {
async function* source() {
try {
yield 1;
/* c8 ignore next 3 */
yield 2;
yield 3; // Shouldn't be reached.
} finally {
// eslint-disable-next-line no-unsafe-finally
return 'The End';
}
yield 1;
/* c8 ignore next 3 */
yield 2;
yield 3; // Shouldn't be reached.
}

const doubles = mapAsyncIterable(source(), (x) => x + x);
Expand All @@ -108,8 +103,8 @@ describe('mapAsyncIterable', () => {
expect(await doubles.next()).to.deep.equal({ value: 4, done: false });

// Early return
expect(await doubles.return('')).to.deep.equal({
value: 'The End',
expect(await doubles.return()).to.deep.equal({
value: undefined,
done: true,
});

Expand Down Expand Up @@ -147,23 +142,18 @@ describe('mapAsyncIterable', () => {
expect(await doubles.next()).to.deep.equal({ value: 4, done: false });

// Early return
expect(await doubles.return(0)).to.deep.equal({
expect(await doubles.return()).to.deep.equal({
value: undefined,
done: true,
});
});

it('passes through early return from async values', async () => {
async function* source() {
try {
yield 'a';
/* c8 ignore next 3 */
yield 'b';
yield 'c'; // Shouldn't be reached.
} finally {
yield 'Done';
yield 'Last';
}
yield 'a';
/* c8 ignore next 3 */
yield 'b';
yield 'c'; // Shouldn't be reached.
}

const doubles = mapAsyncIterable(source(), (x) => x + x);
Expand All @@ -173,14 +163,14 @@ describe('mapAsyncIterable', () => {

// Early return
expect(await doubles.return()).to.deep.equal({
value: 'DoneDone',
done: false,
value: undefined,
done: true,
});

// Subsequent next calls may yield from finally block
// Subsequent next calls
expect(await doubles.next()).to.deep.equal({
value: 'LastLast',
done: false,
value: undefined,
done: true,
});
expect(await doubles.next()).to.deep.equal({
value: undefined,
Expand Down Expand Up @@ -260,14 +250,10 @@ describe('mapAsyncIterable', () => {

it('passes through caught errors through async generators', async () => {
async function* source() {
try {
yield 1;
/* c8 ignore next 2 */
yield 2;
yield 3; // Shouldn't be reached.
} catch (e) {
yield e;
}
yield 1;
/* c8 ignore next 2 */
yield 2;
yield 3; // Shouldn't be reached.
}

const doubles = mapAsyncIterable(source(), (x) => x + x);
Expand All @@ -276,11 +262,9 @@ describe('mapAsyncIterable', () => {
expect(await doubles.next()).to.deep.equal({ value: 4, done: false });

// Throw error
expect(await doubles.throw('Ouch')).to.deep.equal({
value: 'OuchOuch',
done: false,
});
await expectPromise(doubles.throw(new Error('Ouch'))).toRejectWith('Ouch');

// Subsequent next calls
expect(await doubles.next()).to.deep.equal({
value: undefined,
done: true,
Expand Down
84 changes: 22 additions & 62 deletions src/execution/mapAsyncIterable.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,18 @@
import { isPromise } from '../jsutils/isPromise.js';
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';

import { withCleanup } from './withCleanup.js';

/**
* Given an AsyncIterable and a callback function, return an AsyncIterator
* which produces values mapped via calling the callback function.
*/
export function mapAsyncIterable<T, U, R = undefined>(
iterable: AsyncGenerator<T, R, void> | AsyncIterable<T>,
export function mapAsyncIterable<T, U>(
iterable: AsyncGenerator<T> | AsyncIterable<T>,
callback: (value: T) => PromiseOrValue<U>,
): AsyncGenerator<U, R, void> {
const iterator = iterable[Symbol.asyncIterator]();

async function mapResult(
promise: Promise<IteratorResult<T, R>>,
): Promise<IteratorResult<U, R>> {
const result = await promise;
if (result.done) {
return result;
}

const value = result.value;
try {
return { value: await callback(value), done: false };
} catch (error) {
await returnIgnoringErrors();
throw error;
}
}

async function returnIgnoringErrors(): Promise<void> {
): AsyncGenerator<U, void, void> {
return withCleanup(mapAsyncIterableImpl(iterable, callback), async () => {
const iterator = iterable[Symbol.asyncIterator]();
if (typeof iterator.return === 'function') {
try {
await iterator.return(); /* c8 ignore start */
Expand All @@ -36,44 +21,19 @@ export function mapAsyncIterable<T, U, R = undefined>(
/* ignore error */
} /* c8 ignore stop */
}
}

const asyncDispose: typeof Symbol.asyncDispose =
Symbol.asyncDispose /* c8 ignore start */ ??
Symbol.for('Symbol.asyncDispose'); /* c8 ignore stop */

return {
async next() {
return mapResult(iterator.next());
},
async return(): Promise<IteratorResult<U, R>> {
// If iterator.return() does not exist, then type R must be undefined.
return typeof iterator.return === 'function'
? mapResult(iterator.return())
: { value: undefined as any, done: true };
},
async throw(error?: unknown) {
if (typeof iterator.throw === 'function') {
return mapResult(iterator.throw(error));
}

if (typeof iterator.return === 'function') {
await returnIgnoringErrors();
}
});
}

throw error;
},
[Symbol.asyncIterator]() {
return this;
},
async [asyncDispose]() {
await this.return(undefined as R);
if (
typeof (iterable as AsyncGenerator<T, R, void>)[asyncDispose] ===
'function'
) {
await (iterable as AsyncGenerator<T, R, void>)[asyncDispose]();
}
},
};
async function* mapAsyncIterableImpl<T, U, R = undefined>(
iterable: AsyncGenerator<T, R, void> | AsyncIterable<T>,
mapFn: (value: T) => PromiseOrValue<U>,
): AsyncGenerator<U, void, void> {
for await (const value of iterable) {
const result = mapFn(value);
if (isPromise(result)) {
yield await result;
continue;
}
yield result;
}
}
Loading