@@ -566,36 +566,33 @@ extension NIOThrowingAsyncSequenceProducer {
566566 @inlinable
567567 internal func next( ) async throws -> Element ? {
568568 try await withTaskCancellationHandler { ( ) async throws -> Element ? in
569- let unsafe = self . _state. unsafe
570- unsafe. lock ( )
571-
572- let action = unsafe. withValueAssumingLockIsAcquired {
573- $0. stateMachine. next ( )
569+ let ( action, delegate) = self . _state. withLockedValue { state -> ( StateMachine . NextAction , Delegate ? ) in
570+ let action = state. stateMachine. next ( continuation: nil )
571+ switch action {
572+ case . returnElement, . returnCancellationError, . returnNil, . suspendTask_withoutContinuationOnly:
573+ return ( action, nil )
574+ case . returnElementAndCallProduceMore:
575+ return ( action, state. delegate)
576+ case . returnFailureAndCallDidTerminate:
577+ let delegate = state. delegate
578+ state. delegate = nil
579+ return ( action, delegate)
580+ case . produceMore_withContinuationOnly, . doNothing_withContinuationOnly:
581+ // Can't be returned when 'next(continuation:)' is called with no
582+ // continuation.
583+ fatalError ( )
584+ }
574585 }
575586
576587 switch action {
577588 case . returnElement( let element) :
578- unsafe. unlock ( )
579589 return element
580590
581591 case . returnElementAndCallProduceMore( let element) :
582- let delegate = unsafe. withValueAssumingLockIsAcquired {
583- $0. delegate
584- }
585- unsafe. unlock ( )
586-
587592 delegate? . produceMore ( )
588-
589593 return element
590594
591595 case . returnFailureAndCallDidTerminate( let failure) :
592- let delegate = unsafe. withValueAssumingLockIsAcquired {
593- let delegate = $0. delegate
594- $0. delegate = nil
595- return delegate
596- }
597- unsafe. unlock ( )
598-
599596 delegate? . didTerminate ( )
600597
601598 switch failure {
@@ -607,7 +604,6 @@ extension NIOThrowingAsyncSequenceProducer {
607604 }
608605
609606 case . returnCancellationError:
610- unsafe. unlock ( )
611607 // We have deprecated the generic Failure type in the public API and Failure should
612608 // now be `Swift.Error`. However, if users have not migrated to the new API they could
613609 // still use a custom generic Error type and this cast might fail.
@@ -622,42 +618,79 @@ extension NIOThrowingAsyncSequenceProducer {
622618 return nil
623619
624620 case . returnNil:
625- unsafe. unlock ( )
626621 return nil
627622
628- case . suspendTask:
629- // It is safe to hold the lock across this method
630- // since the closure is guaranteed to be run straight away
623+ case . produceMore_withContinuationOnly, . doNothing_withContinuationOnly:
624+ // Can't be returned when 'next(continuation:)' is called with no
625+ // continuation.
626+ fatalError ( )
627+
628+ case . suspendTask_withoutContinuationOnly:
629+ // Holding the lock here *should* be safe but because of a bug in the runtime
630+ // it isn't, so drop the lock, create the continuation and then try again.
631+ //
632+ // See https://github.com/swiftlang/swift/issues/85668
631633 return try await withCheckedThrowingContinuation {
632634 ( continuation: CheckedContinuation < Element ? , any Error > ) in
633- let ( action, callDidSuspend ) = unsafe . withValueAssumingLockIsAcquired {
634- let action = $0 . stateMachine. next ( for : continuation)
635- let callDidSuspend = $0 . didSuspend != nil
636- return ( action, callDidSuspend )
635+ let ( action, delegate , didSuspend ) = self . _state . withLockedValue { state in
636+ let action = state . stateMachine. next ( continuation : continuation)
637+ let delegate = state . delegate
638+ return ( action, delegate , state . didSuspend )
637639 }
638640
639641 switch action {
640- case . callProduceMore:
641- let delegate = unsafe. withValueAssumingLockIsAcquired {
642- $0. delegate
642+ case . returnElement( let element) :
643+ continuation. resume ( returning: element)
644+
645+ case . returnElementAndCallProduceMore( let element) :
646+ delegate? . produceMore ( )
647+ continuation. resume ( returning: element)
648+
649+ case . returnFailureAndCallDidTerminate( let failure) :
650+ delegate? . didTerminate ( )
651+ switch failure {
652+ case . some( let error) :
653+ continuation. resume ( throwing: error)
654+ case . none:
655+ continuation. resume ( returning: nil )
643656 }
644- unsafe. unlock ( )
645657
658+ case . returnCancellationError:
659+ // We have deprecated the generic Failure type in the public API and Failure should
660+ // now be `Swift.Error`. However, if users have not migrated to the new API they could
661+ // still use a custom generic Error type and this cast might fail.
662+ // In addition, we use `NIOThrowingAsyncSequenceProducer` in the implementation of the
663+ // non-throwing variant `NIOAsyncSequenceProducer` where `Failure` will be `Never` and
664+ // this cast will fail as well.
665+ // Everything is marked @inlinable and the Failure type is known at compile time,
666+ // therefore this cast should be optimised away in release build.
667+ if let error = CancellationError ( ) as? Failure {
668+ continuation. resume ( throwing: error)
669+ } else {
670+ continuation. resume ( returning: nil )
671+ }
672+
673+ case . returnNil:
674+ continuation. resume ( returning: nil )
675+
676+ case . produceMore_withContinuationOnly:
646677 delegate? . produceMore ( )
647678
648- case . none:
649- unsafe. unlock ( )
650- }
679+ case . doNothing_withContinuationOnly:
680+ ( )
651681
652- if callDidSuspend {
653- let didSuspend = self . _state. withLockedValue { $0. didSuspend }
654- didSuspend ? ( )
682+ case . suspendTask_withoutContinuationOnly:
683+ // Can't be returned when 'next(continuation:)' is called with a
684+ // continuation.
685+ fatalError ( )
655686 }
687+
688+ didSuspend ? ( )
656689 }
657690 }
658691 } onCancel: {
659692 // We must not resume the continuation while holding the lock
660- // because it can deadlock in combination with the underlying ulock
693+ // because it can deadlock in combination with the underlying unlock
661694 // in cases where we race with a cancellation handler
662695 let ( delegate, action) : ( Delegate ? , NIOThrowingAsyncSequenceProducer . StateMachine . CancelledAction ) =
663696 self . _state. withLockedValue {
@@ -1167,14 +1200,25 @@ extension NIOThrowingAsyncSequenceProducer {
11671200 case returnCancellationError
11681201 /// Indicates that the `nil` should be returned to the caller.
11691202 case returnNil
1170- /// Indicates that the `Task` of the caller should be suspended.
1171- case suspendTask
1203+
1204+ /// Indicates that the `Task` of the caller should be suspended. Only returned when
1205+ /// `next(continuation:)` is called with a non-nil continuation.
1206+ case suspendTask_withoutContinuationOnly
1207+
1208+ /// Indicates that caller should produce more values. Only returned when
1209+ /// `next(continuation:)` is called with `nil`.
1210+ case produceMore_withContinuationOnly
1211+ /// Indicates that caller shouldn't do anything. Only returned
1212+ /// when `next(continuation:)` is called with `nil`.
1213+ case doNothing_withContinuationOnly
11721214 }
11731215
11741216 @inlinable
1175- mutating func next( ) -> NextAction {
1217+ mutating func next( continuation : CheckedContinuation < Element ? , Error > ? ) -> NextAction {
11761218 switch self . _state {
11771219 case . initial( let backPressureStrategy, let iteratorInitialized) :
1220+ precondition ( continuation == nil )
1221+
11781222 // We are not interacting with the back-pressure strategy here because
11791223 // we are doing this inside `next(:)`
11801224 self . _state = . streaming(
@@ -1185,7 +1229,7 @@ extension NIOThrowingAsyncSequenceProducer {
11851229 iteratorInitialized: iteratorInitialized
11861230 )
11871231
1188- return . suspendTask
1232+ return . suspendTask_withoutContinuationOnly
11891233
11901234 case . streaming( _, _, . some, _, _) :
11911235 // We have multiple AsyncIterators iterating the sequence
@@ -1202,7 +1246,6 @@ extension NIOThrowingAsyncSequenceProducer {
12021246
12031247 if let element = buffer. popFirst ( ) {
12041248 // We have an element to fulfil the demand right away.
1205-
12061249 let shouldProduceMore = backPressureStrategy. didConsume ( bufferDepth: buffer. count)
12071250
12081251 self . _state = . streaming(
@@ -1220,10 +1263,27 @@ extension NIOThrowingAsyncSequenceProducer {
12201263 // We don't have any new demand, so we can just return the element.
12211264 return . returnElement( element)
12221265 }
1266+ } else if let continuation = continuation {
1267+ let shouldProduceMore = backPressureStrategy. didConsume ( bufferDepth: buffer. count)
1268+ self . _state = . streaming(
1269+ backPressureStrategy: backPressureStrategy,
1270+ buffer: buffer,
1271+ continuation: continuation,
1272+ hasOutstandingDemand: shouldProduceMore,
1273+ iteratorInitialized: iteratorInitialized
1274+ )
1275+
1276+ if shouldProduceMore && !hasOutstandingDemand {
1277+ return . produceMore_withContinuationOnly
1278+ } else {
1279+ return . doNothing_withContinuationOnly
1280+ }
12231281 } else {
1224- // There is nothing in the buffer to fulfil the demand so we need to suspend.
1225- // We are not interacting with the back-pressure strategy here because
1226- // we are doing this inside `next(:)`
1282+ // This function is first called without a continuation (i.e. this branch), if
1283+ // the buffer is empty the caller needs to re-call this function with a
1284+ // continuation (the branch above) so defer checking the backpressure strategy
1285+ // until then to minimise unnecessary delegate calls.
1286+
12271287 self . _state = . streaming(
12281288 backPressureStrategy: backPressureStrategy,
12291289 buffer: buffer,
@@ -1232,7 +1292,7 @@ extension NIOThrowingAsyncSequenceProducer {
12321292 iteratorInitialized: iteratorInitialized
12331293 )
12341294
1235- return . suspendTask
1295+ return . suspendTask_withoutContinuationOnly
12361296 }
12371297
12381298 case . sourceFinished( var buffer, let iteratorInitialized, let failure) :
@@ -1265,56 +1325,6 @@ extension NIOThrowingAsyncSequenceProducer {
12651325 preconditionFailure ( " Invalid state " )
12661326 }
12671327 }
1268-
1269- /// Actions returned by `next(for:)`.
1270- @usableFromInline
1271- enum NextForContinuationAction : Sendable {
1272- /// Indicates that ``NIOAsyncSequenceProducerDelegate/produceMore()`` should be called.
1273- case callProduceMore
1274- /// Indicates that nothing should be done.
1275- case none
1276- }
1277-
1278- @inlinable
1279- mutating func next( for continuation: CheckedContinuation < Element ? , Error > ) -> NextForContinuationAction {
1280- switch self . _state {
1281- case . initial:
1282- // We are transitioning away from the initial state in `next()`
1283- preconditionFailure ( " Invalid state " )
1284-
1285- case . streaming(
1286- var backPressureStrategy,
1287- let buffer,
1288- . none,
1289- let hasOutstandingDemand,
1290- let iteratorInitialized
1291- ) :
1292- precondition ( buffer. isEmpty, " Expected an empty buffer " )
1293-
1294- self . _state = . modifying
1295- let shouldProduceMore = backPressureStrategy. didConsume ( bufferDepth: buffer. count)
1296-
1297- self . _state = . streaming(
1298- backPressureStrategy: backPressureStrategy,
1299- buffer: buffer,
1300- continuation: continuation,
1301- hasOutstandingDemand: shouldProduceMore,
1302- iteratorInitialized: iteratorInitialized
1303- )
1304-
1305- if shouldProduceMore && !hasOutstandingDemand {
1306- return . callProduceMore
1307- } else {
1308- return . none
1309- }
1310-
1311- case . streaming( _, _, . some( _) , _, _) , . sourceFinished, . finished, . cancelled:
1312- preconditionFailure ( " This should have already been handled by `next()` " )
1313-
1314- case . modifying:
1315- preconditionFailure ( " Invalid state " )
1316- }
1317- }
13181328 }
13191329}
13201330
0 commit comments