diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 1a79e61c4c..ab07f9ac76 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -1,4 +1,3 @@ -import { invariant } from '../jsutils/invariant.js'; import type { ObjMap } from '../jsutils/ObjMap.js'; import { pathToArray } from '../jsutils/Path.js'; @@ -7,7 +6,6 @@ import type { GraphQLError } from '../error/GraphQLError.js'; import type { AbortSignalListener } from './AbortSignalListener.js'; import { IncrementalGraph } from './IncrementalGraph.js'; import type { - CancellableStreamRecord, CompletedExecutionGroup, CompletedResult, DeferredFragmentRecord, @@ -21,23 +19,25 @@ import type { InitialIncrementalExecutionResult, PendingResult, StreamItemsResult, + StreamRecord, SubsequentIncrementalExecutionResult, } from './types.js'; -import { - isCancellableStreamRecord, - isCompletedExecutionGroup, - isFailedExecutionGroup, -} from './types.js'; +import { isCompletedExecutionGroup, isFailedExecutionGroup } from './types.js'; import { withCleanup } from './withCleanup.js'; +// eslint-disable-next-line max-params export function buildIncrementalResponse( - context: IncrementalPublisherContext, result: ObjMap, errors: ReadonlyArray, newDeferredFragmentRecords: ReadonlyArray | undefined, incrementalDataRecords: ReadonlyArray, + earlyReturns: Map Promise>, + abortSignalListener: AbortSignalListener | undefined, ): ExperimentalIncrementalExecutionResults { - const incrementalPublisher = new IncrementalPublisher(context); + const incrementalPublisher = new IncrementalPublisher( + earlyReturns, + abortSignalListener, + ); return incrementalPublisher.buildResponse( result, errors, @@ -46,11 +46,6 @@ export function buildIncrementalResponse( ); } -interface IncrementalPublisherContext { - abortSignalListener: AbortSignalListener | undefined; - cancellableStreams: Set | undefined; -} - interface SubsequentIncrementalExecutionResultContext { pending: Array; incremental: Array; @@ -64,12 +59,17 @@ interface SubsequentIncrementalExecutionResultContext { * @internal */ class IncrementalPublisher { - private _context: IncrementalPublisherContext; + private _earlyReturns: Map Promise>; + private _abortSignalListener: AbortSignalListener | undefined; private _nextId: number; private _incrementalGraph: IncrementalGraph; - constructor(context: IncrementalPublisherContext) { - this._context = context; + constructor( + earlyReturns: Map Promise>, + abortSignalListener: AbortSignalListener | undefined, + ) { + this._earlyReturns = earlyReturns; + this._abortSignalListener = abortSignalListener; this._nextId = 0; this._incrementalGraph = new IncrementalGraph(); } @@ -100,7 +100,7 @@ class IncrementalPublisher { return { initialResult, subsequentResults: withCleanup(subsequentResults, async () => { - this._context.abortSignalListener?.disconnect(); + this._abortSignalListener?.disconnect(); await this._returnAsyncIteratorsIgnoringErrors(); }), }; @@ -241,21 +241,18 @@ class IncrementalPublisher { errors: streamItemsResult.errors, }); this._incrementalGraph.removeStream(streamRecord); - if (isCancellableStreamRecord(streamRecord)) { - invariant(this._context.cancellableStreams !== undefined); - this._context.cancellableStreams.delete(streamRecord); - streamRecord.earlyReturn().catch(() => { + const earlyReturn = this._earlyReturns.get(streamRecord); + if (earlyReturn !== undefined) { + earlyReturn().catch(() => { /* c8 ignore next 1 */ // ignore error }); + this._earlyReturns.delete(streamRecord); } } else if (streamItemsResult.result === undefined) { context.completed.push({ id }); this._incrementalGraph.removeStream(streamRecord); - if (isCancellableStreamRecord(streamRecord)) { - invariant(this._context.cancellableStreams !== undefined); - this._context.cancellableStreams.delete(streamRecord); - } + this._earlyReturns.delete(streamRecord); } else { const incrementalEntry: IncrementalStreamResult = { id, @@ -310,14 +307,10 @@ class IncrementalPublisher { } private async _returnAsyncIterators(): Promise { - const cancellableStreams = this._context.cancellableStreams; - if (cancellableStreams === undefined) { - return; - } const promises: Array> = []; - for (const streamRecord of cancellableStreams) { - if (streamRecord.earlyReturn !== undefined) { - promises.push(streamRecord.earlyReturn()); + for (const earlyReturn of this._earlyReturns.values()) { + if (earlyReturn !== undefined) { + promises.push(earlyReturn()); } } await Promise.all(promises); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index ca0ef4c8c3..66a353de10 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -72,7 +72,6 @@ import { getVariableSignature } from './getVariableSignature.js'; import { buildIncrementalResponse } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; import type { - CancellableStreamRecord, CompletedExecutionGroup, ExecutionResult, ExperimentalIncrementalExecutionResults, @@ -172,7 +171,7 @@ export interface ExecutionContext { errors: Array; abortSignalListener: AbortSignalListener | undefined; completed: boolean; - cancellableStreams: Set | undefined; + earlyReturns: Map Promise> | undefined; errorPropagation: boolean; } @@ -344,7 +343,7 @@ export function experimentalExecuteQueryOrMutationOrSubscriptionEvent( ? new AbortSignalListener(abortSignal) : undefined, completed: false, - cancellableStreams: undefined, + earlyReturns: undefined, errorPropagation: errorPropagation(validatedExecutionArgs.operation), }; try { @@ -428,11 +427,12 @@ function buildDataResponse( } return buildIncrementalResponse( - exeContext, data, errors, newDeferredFragmentRecords, incrementalDataRecords, + (exeContext.earlyReturns ??= new Map()), + exeContext.abortSignalListener, ); } @@ -1351,10 +1351,6 @@ async function completeAsyncIteratorValue( fieldDetailsList, path, ); - const earlyReturn = - asyncIterator.return === undefined - ? undefined - : asyncIterator.return.bind(asyncIterator); try { while (true) { if (streamUsage && index >= streamUsage.initialCount) { @@ -1368,22 +1364,17 @@ async function completeAsyncIteratorValue( itemType, ); - let streamRecord: StreamRecord | CancellableStreamRecord; - if (earlyReturn === undefined) { - streamRecord = { - label: streamUsage.label, - path, - streamItemQueue, - }; - } else { - streamRecord = { - label: streamUsage.label, - path, - earlyReturn, - streamItemQueue, - }; - exeContext.cancellableStreams ??= new Set(); - exeContext.cancellableStreams.add(streamRecord); + const streamRecord: StreamRecord = { + label: streamUsage.label, + path, + streamItemQueue, + }; + if (asyncIterator.return !== undefined) { + exeContext.earlyReturns ??= new Map(); + exeContext.earlyReturns.set( + streamRecord, + asyncIterator.return.bind(asyncIterator), + ); } addIncrementalDataRecords(graphqlWrappedResult, [streamRecord]); @@ -1450,12 +1441,10 @@ async function completeAsyncIteratorValue( index++; } } catch (error) { - if (earlyReturn !== undefined) { - earlyReturn().catch(() => { - /* c8 ignore next 1 */ - // ignore error - }); - } + asyncIterator.return?.().catch(() => { + /* c8 ignore next 1 */ + // ignore error + }); throw error; } diff --git a/src/execution/types.ts b/src/execution/types.ts index 21f919e22b..638f1d3725 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -273,16 +273,6 @@ export interface StreamItemsResult { incrementalDataRecords?: ReadonlyArray | undefined; } -export interface CancellableStreamRecord extends StreamRecord { - earlyReturn: () => Promise; -} - -export function isCancellableStreamRecord( - deliveryGroup: DeliveryGroup, -): deliveryGroup is CancellableStreamRecord { - return 'earlyReturn' in deliveryGroup; -} - export type IncrementalDataRecord = PendingExecutionGroup | StreamRecord; export type IncrementalDataRecordResult =