Skip to content

Akka.Streams: add cancellation-aware Source.Queue offers#8248

Merged
Aaronontheweb merged 15 commits into
akkadotnet:devfrom
Aaronontheweb:pr8199-refactored
Jun 12, 2026
Merged

Akka.Streams: add cancellation-aware Source.Queue offers#8248
Aaronontheweb merged 15 commits into
akkadotnet:devfrom
Aaronontheweb:pr8199-refactored

Conversation

@Aaronontheweb

@Aaronontheweb Aaronontheweb commented Jun 11, 2026

Copy link
Copy Markdown
Member

Problem

ISourceQueue<T>.OfferAsync(T element) can block indefinitely under backpressure and does not accept a CancellationToken.

When awaited inside actor handlers (for example, channel adapters built with ReceiveAsync), the actor can become effectively wedged waiting for queue demand that may never return.

Solution

Added OfferAsync(T element, CancellationToken cancellationToken) to the ISourceQueue<T> interface, alongside the existing OfferAsync(T) — cancellation is purely an offer concern. Cancelling the token while an offer is pending under OverflowStrategy.Backpressure removes the pending offer from the stage and completes the returned task as canceled, without later emitting the canceled element.

Changes

  • Interface: OfferAsync(T, CancellationToken) is declared directly on ISourceQueue<T> — no extension method, no separate interface, no runtime type checks. It sits next to OfferAsync(T) on the base offer interface rather than on ISourceQueueWithComplete<T>, because cancellation has no dependency on the completion surface (Complete/Fail). ISourceQueueWithComplete<T> inherits it.
  • Implementation: QueueSource.Materialized registers on the CancellationToken and sends an internal CancelOffer command to the stage callback to remove the pending offer.
  • Ordering safety: the CancelOffer registration is created strictly after the Offer is submitted, so the stage always observes the offer before any cancel, regardless of which thread fires the token. The handler guards with !_terminating && ReferenceEquals(_pendingOffer, cancelOffer.Target) so an already-resolved or draining offer is a no-op.
  • Fast paths: a pre-canceled token short-circuits to Task.FromCanceled without submitting the offer; OfferAsync(T) delegates with CancellationToken.None, whose CanBeCanceled == false skips all registration overhead, so the existing hot path is unchanged.
  • Awaiter contract: cancellation surfaces as an OperationCanceledException carrying the supplied token (TrySetCanceled(token)).

API Impact

  • ISourceQueue<T> gains OfferAsync(T, CancellationToken) (inherited by ISourceQueueWithComplete<T>).
  • This adds a member to a public interface, but ISourceQueue<T> / ISourceQueueWithComplete<T> are only ever produced by Source.Queue<T> materialization and consumed by callers — there is no supported path for users to implement them — so there are no real-world implementers to break.

Tests

9 new QueueSourceSpec tests covering: offer with CancellationToken.None; cancellation of a pending backpressured offer (buffered + unbuffered); accepting the next offer after a cancel; pre-canceled token (element never enqueued); cancellation-after-enqueue is a no-op; cancellation racing Complete() while draining; new offers dropped during drain; and the token firing before the stage has processed the offer. All 36 QueueSourceSpec tests pass; API approval snapshots updated.

Related

orange-dot and others added 13 commits May 6, 2026 10:59
…WithComplete directly

- Remove ICancellableSourceQueueWithComplete<T> interface entirely
- Add OfferAsync(T, CancellationToken) directly to ISourceQueueWithComplete<T>
- Remove SourceQueueWithCompleteExtensions helper class
- Update API verified snapshots
- Remove extension method tests, simplify LegacySourceQueue test stub
Cancellation is purely an offer concern with no dependency on the
Complete/Fail surface, so OfferAsync(T, CancellationToken) belongs next
to OfferAsync(T) on the base ISourceQueue<T> interface rather than on
ISourceQueueWithComplete<T>. ISourceQueueWithComplete<T> inherits it.

- Queue.cs: relocate the overload to ISourceQueue<T>
- Source.cs: disambiguate OfferAsync crefs (the move made the bare
  cref ambiguous under warnings-as-errors)
- QueueSourceSpec.cs: drop the unused LegacySourceQueue test double
  (custom queue implementations are not a supported scenario)
- API approvals: move the approved member to the base interface
- Restore UTF-8 BOMs stripped from Sources.cs/QueueSourceSpec.cs/Queue.cs
@Aaronontheweb Aaronontheweb marked this pull request as ready for review June 12, 2026 17:20

@Aaronontheweb Aaronontheweb left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - nice work @orange-dot

public interface ISourceQueue<in T>
{
System.Threading.Tasks.Task<Akka.Streams.IQueueOfferResult> OfferAsync(T element);
System.Threading.Tasks.Task<Akka.Streams.IQueueOfferResult> OfferAsync(T element, System.Threading.CancellationToken cancellationToken);

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this method to the ISourceQueue<T> interface, which might technically be a breaking change but Akka.Streams has never supported custom queue implementations, so this nets out to zero.

if (cancellationToken.CanBeCanceled)
{
var registration = cancellationToken.Register(() => _invokeLogic(new CancelOffer(offer, cancellationToken)));
promise.Task.ContinueWith(_ => registration.Dispose(), TaskContinuationOptions.ExecuteSynchronously);

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to keep it this way so we can avoid any await silliness

FailStage(failure.Ex);
}

if (input is CancelOffer cancelOffer)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@Aaronontheweb Aaronontheweb enabled auto-merge (squash) June 12, 2026 17:47
@Aaronontheweb Aaronontheweb disabled auto-merge June 12, 2026 18:45
@Aaronontheweb Aaronontheweb merged commit 8e2fbd4 into akkadotnet:dev Jun 12, 2026
9 of 11 checks passed
@orange-dot

Copy link
Copy Markdown
Contributor

LGTM - nice work @orange-dot

@Aaronontheweb thanks. Was not sure about breaking changes.

@Aaronontheweb Aaronontheweb deleted the pr8199-refactored branch June 12, 2026 18:45
@Aaronontheweb

Copy link
Copy Markdown
Member Author

@orange-dot I appreciated you being cautious, but because we don't really allow custom ISourceQueue definitions we were able to get away with it in this case.

Aaronontheweb added a commit that referenced this pull request Jun 12, 2026
…ckport to v1.5 (#8258)

Problem: ISourceQueue<T>.OfferAsync(T element) can block indefinitely
under backpressure and does not accept a CancellationToken.

Solution: Added OfferAsync(T element, CancellationToken cancellationToken)
to the ISourceQueue<T> interface so backpressured pending offers can be
canceled without later emitting the canceled element.

Co-authored-by: Bojan Janjatović <bojan.janjatovic@mamut-studio.com>
This was referenced Jun 12, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Akka.Streams: Source.Queue OfferAsync needs CancellationToken/timeout support

2 participants