@@ -540,36 +540,66 @@ extension NIOAsyncWriter {
540540 let yieldID = yieldID ?? self . _yieldIDGenerator. generateUniqueYieldID ( )
541541
542542 return try await withTaskCancellationHandler {
543- // We are manually locking here to hold the lock across the withCheckedContinuation call
544- let unsafe = self . _state. unsafe
545- unsafe. lock ( )
546-
547- let action = unsafe. withValueAssumingLockIsAcquired {
543+ let action = self . _state. withLockedValue {
548544 $0. stateMachine. yield ( yieldID: yieldID)
549545 }
550546
551547 switch action {
552548 case . callDidYield( let delegate) :
553549 // We are allocating a new Deque for every write here
554- unsafe. unlock ( )
555550 delegate. didYield ( contentsOf: Deque ( sequence) )
556551 self . unbufferQueuedEvents ( )
557552 return . yielded
558553
559554 case . throwError( let error) :
560- unsafe. unlock ( )
561555 throw error
562556
563557 case . suspendTask:
558+ // Holding the lock here *should* be safe but because of a bug in the runtime
559+ // it isn't, so drop the lock, create the continuation and then try again.
560+ //
561+ // See https://github.com/swiftlang/swift/issues/85668
562+ //
563+ // Dropping and reacquiring the lock may result in yields being reordered but
564+ // only from the perspective of when this function was entered. For example:
565+ //
566+ // - T1 calls _yield
567+ // - T2 calls _yield
568+ // - T2 returns from _yield
569+ // - T1 returns from _yield
570+ //
571+ // This is fine: the async writer doesn't offer any ordering guarantees for
572+ // calls made from different threads.
573+ //
574+ // Within a thread there is no possibility of re-ordering as the call only
575+ // returns once the write has been yielded.
564576 return try await withCheckedThrowingContinuation {
565577 ( continuation: CheckedContinuation < StateMachine . YieldResult , Error > ) in
566- let didSuspend = unsafe. withValueAssumingLockIsAcquired {
567- $0. stateMachine. yield ( continuation: continuation, yieldID: yieldID)
568- return $0. didSuspend
578+ let ( action, didSuspend) = self . _state. withLockedValue {
579+ state -> ( NIOAsyncWriter . StateMachine . YieldAction , ( @Sendable ( ) -> Void ) ? ) in
580+ let yieldAction = state. stateMachine. yield ( yieldID: yieldID)
581+ switch yieldAction {
582+ case . callDidYield, . throwError:
583+ return ( yieldAction, nil )
584+ case . suspendTask:
585+ state. stateMachine. yield ( continuation: continuation, yieldID: yieldID)
586+ let didSuspend = state. didSuspend
587+ return ( yieldAction, didSuspend)
588+ }
569589 }
570590
571- unsafe. unlock ( )
572- didSuspend ? ( )
591+ switch action {
592+ case . callDidYield( let delegate) :
593+ delegate. didYield ( contentsOf: Deque ( sequence) )
594+ self . unbufferQueuedEvents ( )
595+ continuation. resume ( returning: . yielded)
596+
597+ case . throwError( let error) :
598+ continuation. resume ( throwing: error)
599+
600+ case . suspendTask:
601+ didSuspend ? ( )
602+ }
573603 }
574604 }
575605 } onCancel: {
@@ -611,35 +641,65 @@ extension NIOAsyncWriter {
611641 let yieldID = yieldID ?? self . _yieldIDGenerator. generateUniqueYieldID ( )
612642
613643 return try await withTaskCancellationHandler {
614- // We are manually locking here to hold the lock across the withCheckedContinuation call
615- let unsafe = self . _state. unsafe
616- unsafe. lock ( )
617-
618- let action = unsafe. withValueAssumingLockIsAcquired {
644+ let action = self . _state. withLockedValue {
619645 $0. stateMachine. yield ( yieldID: yieldID)
620646 }
621647
622648 switch action {
623649 case . callDidYield( let delegate) :
624- // We are allocating a new Deque for every write here
625- unsafe. unlock ( )
626650 delegate. didYield ( element)
627651 self . unbufferQueuedEvents ( )
628652 return . yielded
629653
630654 case . throwError( let error) :
631- unsafe. unlock ( )
632655 throw error
633656
634657 case . suspendTask:
658+ // Holding the lock here *should* be safe but because of a bug in the runtime
659+ // it isn't, so drop the lock, create the continuation and then try again.
660+ //
661+ // See https://github.com/swiftlang/swift/issues/85668
662+ //
663+ // Dropping and reacquiring the lock may result in yields being reordered but
664+ // only from the perspective of when this function was entered. For example:
665+ //
666+ // - T1 calls _yield
667+ // - T2 calls _yield
668+ // - T2 returns from _yield
669+ // - T1 returns from _yield
670+ //
671+ // This is fine: the async writer doesn't offer any ordering guarantees for
672+ // calls made from different threads.
673+ //
674+ // Within a thread there is no possibility of re-ordering as the call only
675+ // returns once the write has been yielded.
635676 return try await withCheckedThrowingContinuation {
636677 ( continuation: CheckedContinuation < StateMachine . YieldResult , Error > ) in
637- let didSuspend = unsafe. withValueAssumingLockIsAcquired {
638- $0. stateMachine. yield ( continuation: continuation, yieldID: yieldID)
639- return $0. didSuspend
678+ let ( action, didSuspend) = self . _state. withLockedValue {
679+ state -> ( NIOAsyncWriter . StateMachine . YieldAction , ( @Sendable ( ) -> Void ) ? ) in
680+ let yieldAction = state. stateMachine. yield ( yieldID: yieldID)
681+ switch yieldAction {
682+ case . callDidYield, . throwError:
683+ return ( yieldAction, nil )
684+ case . suspendTask:
685+ state. stateMachine. yield ( continuation: continuation, yieldID: yieldID)
686+ let didSuspend = state. didSuspend
687+ return ( yieldAction, didSuspend)
688+ }
689+ }
690+
691+ switch action {
692+ case . callDidYield( let delegate) :
693+ delegate. didYield ( element)
694+ self . unbufferQueuedEvents ( )
695+ continuation. resume ( returning: . yielded)
696+
697+ case . throwError( let error) :
698+ continuation. resume ( throwing: error)
699+
700+ case . suspendTask:
701+ didSuspend ? ( )
640702 }
641- unsafe. unlock ( )
642- didSuspend ? ( )
643703 }
644704 }
645705 } onCancel: {
0 commit comments