Skip to content

Akka.Streams: add cancellation-aware Source.Queue offers#8199

Closed
orange-dot wants to merge 8 commits into
akkadotnet:devfrom
orange-dot:fix/8081-source-queue-offer-cancellation
Closed

Akka.Streams: add cancellation-aware Source.Queue offers#8199
orange-dot wants to merge 8 commits into
akkadotnet:devfrom
orange-dot:fix/8081-source-queue-offer-cancellation

Conversation

@orange-dot

Copy link
Copy Markdown
Contributor

Fixes #8081

Changes

Adds cancellation-aware offer support for Source.Queue while preserving compatibility for existing ISourceQueueWithComplete<T> implementers.

  • Adds ICancellableSourceQueueWithComplete<T> with OfferAsync(T element, CancellationToken cancellationToken).
  • Adds an ISourceQueueWithComplete<T> extension overload that delegates to cancellable queues and rejects unsupported cancellable calls explicitly.
  • Updates QueueSource so pending backpressured offers can be canceled and removed without later emitting the canceled element.
  • Adds queue source tests for pending cancellation, already-canceled tokens, CancellationToken.None, completion ordering, no-op cancellation after enqueue, and defensive extension behavior.

API / compatibility

This keeps ISourceQueueWithComplete<T> source-compatible for existing custom implementers. Cancellable support is exposed through the new opt-in ICancellableSourceQueueWithComplete<T> interface and the extension overload requested by #8081.

Validation

  • dotnet test src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj --filter FullyQualifiedName~QueueSourceSpec -c Release --framework net10.0 -v minimal
    • Passed: 37/37
  • dotnet test src/core/Akka.API.Tests/Akka.API.Tests.csproj -c Release --framework net10.0 -v minimal
    • Passed: 18/18

@orange-dot orange-dot changed the title Akka.Streams: add cancellation-aware Source.Queue offers WIP: Akka.Streams: add cancellation-aware Source.Queue offers May 6, 2026
@orange-dot orange-dot changed the title WIP: Akka.Streams: add cancellation-aware Source.Queue offers Akka.Streams: add cancellation-aware Source.Queue offers May 6, 2026
@orange-dot orange-dot marked this pull request as ready for review May 6, 2026 12:57
@orange-dot orange-dot marked this pull request as draft May 6, 2026 14:13
@orange-dot orange-dot marked this pull request as ready for review May 6, 2026 14:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Akka.Streams: Source.Queue OfferAsync needs CancellationToken/timeout support

2 participants