Akka.Streams: add cancellation-aware Source.Queue offers#8248
Merged
Conversation
…WithComplete directly - Remove ICancellableSourceQueueWithComplete<T> interface entirely - Add OfferAsync(T, CancellationToken) directly to ISourceQueueWithComplete<T> - Remove SourceQueueWithCompleteExtensions helper class - Update API verified snapshots - Remove extension method tests, simplify LegacySourceQueue test stub
Cancellation is purely an offer concern with no dependency on the Complete/Fail surface, so OfferAsync(T, CancellationToken) belongs next to OfferAsync(T) on the base ISourceQueue<T> interface rather than on ISourceQueueWithComplete<T>. ISourceQueueWithComplete<T> inherits it. - Queue.cs: relocate the overload to ISourceQueue<T> - Source.cs: disambiguate OfferAsync crefs (the move made the bare cref ambiguous under warnings-as-errors) - QueueSourceSpec.cs: drop the unused LegacySourceQueue test double (custom queue implementations are not a supported scenario) - API approvals: move the approved member to the base interface - Restore UTF-8 BOMs stripped from Sources.cs/QueueSourceSpec.cs/Queue.cs
# Conflicts: # RELEASE_NOTES.md
Aaronontheweb
commented
Jun 12, 2026
Aaronontheweb
left a comment
Member
Author
There was a problem hiding this comment.
LGTM - nice work @orange-dot
| public interface ISourceQueue<in T> | ||
| { | ||
| System.Threading.Tasks.Task<Akka.Streams.IQueueOfferResult> OfferAsync(T element); | ||
| System.Threading.Tasks.Task<Akka.Streams.IQueueOfferResult> OfferAsync(T element, System.Threading.CancellationToken cancellationToken); |
Member
Author
There was a problem hiding this comment.
Moved this method to the ISourceQueue<T> interface, which might technically be a breaking change but Akka.Streams has never supported custom queue implementations, so this nets out to zero.
| if (cancellationToken.CanBeCanceled) | ||
| { | ||
| var registration = cancellationToken.Register(() => _invokeLogic(new CancelOffer(offer, cancellationToken))); | ||
| promise.Task.ContinueWith(_ => registration.Dispose(), TaskContinuationOptions.ExecuteSynchronously); |
Member
Author
There was a problem hiding this comment.
better to keep it this way so we can avoid any await silliness
| FailStage(failure.Ex); | ||
| } | ||
|
|
||
| if (input is CancelOffer cancelOffer) |
Contributor
@Aaronontheweb thanks. Was not sure about breaking changes. |
Member
Author
|
@orange-dot I appreciated you being cautious, but because we don't really allow custom |
This was referenced Jun 12, 2026
Aaronontheweb
added a commit
that referenced
this pull request
Jun 12, 2026
…ckport to v1.5 (#8258) Problem: ISourceQueue<T>.OfferAsync(T element) can block indefinitely under backpressure and does not accept a CancellationToken. Solution: Added OfferAsync(T element, CancellationToken cancellationToken) to the ISourceQueue<T> interface so backpressured pending offers can be canceled without later emitting the canceled element. Co-authored-by: Bojan Janjatović <bojan.janjatovic@mamut-studio.com>
This was referenced Jun 12, 2026
Open
Open
Open
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
ISourceQueue<T>.OfferAsync(T element)can block indefinitely under backpressure and does not accept aCancellationToken.When awaited inside actor handlers (for example, channel adapters built with
ReceiveAsync), the actor can become effectively wedged waiting for queue demand that may never return.Solution
Added
OfferAsync(T element, CancellationToken cancellationToken)to theISourceQueue<T>interface, alongside the existingOfferAsync(T)— cancellation is purely an offer concern. Cancelling the token while an offer is pending underOverflowStrategy.Backpressureremoves the pending offer from the stage and completes the returned task as canceled, without later emitting the canceled element.Changes
OfferAsync(T, CancellationToken)is declared directly onISourceQueue<T>— no extension method, no separate interface, no runtime type checks. It sits next toOfferAsync(T)on the base offer interface rather than onISourceQueueWithComplete<T>, because cancellation has no dependency on the completion surface (Complete/Fail).ISourceQueueWithComplete<T>inherits it.QueueSource.Materializedregisters on theCancellationTokenand sends an internalCancelOffercommand to the stage callback to remove the pending offer.CancelOfferregistration is created strictly after theOfferis submitted, so the stage always observes the offer before any cancel, regardless of which thread fires the token. The handler guards with!_terminating && ReferenceEquals(_pendingOffer, cancelOffer.Target)so an already-resolved or draining offer is a no-op.Task.FromCanceledwithout submitting the offer;OfferAsync(T)delegates withCancellationToken.None, whoseCanBeCanceled == falseskips all registration overhead, so the existing hot path is unchanged.OperationCanceledExceptioncarrying the supplied token (TrySetCanceled(token)).API Impact
ISourceQueue<T>gainsOfferAsync(T, CancellationToken)(inherited byISourceQueueWithComplete<T>).ISourceQueue<T>/ISourceQueueWithComplete<T>are only ever produced bySource.Queue<T>materialization and consumed by callers — there is no supported path for users to implement them — so there are no real-world implementers to break.Tests
9 new
QueueSourceSpectests covering: offer withCancellationToken.None; cancellation of a pending backpressured offer (buffered + unbuffered); accepting the next offer after a cancel; pre-canceled token (element never enqueued); cancellation-after-enqueue is a no-op; cancellation racingComplete()while draining; new offers dropped during drain; and the token firing before the stage has processed the offer. All 36QueueSourceSpectests pass; API approval snapshots updated.Related