Throttle stage lacks supervisor strategy support for costCalculation exceptions #3101

@He-Pin

Motivation

The Throttle stage's costCalculation function is called without any exception handling. If the user-provided cost function throws, the exception propagates directly and fails the stage unconditionally. Unlike other stages (e.g., MapAsync, Log, Filter), Throttle does not consult the SupervisionStrategy decider, preventing users from configuring Resume or Restart behavior for bad elements.

Current behavior

In Throttle.scala:71-84, costCalculation(elem) is called without try-catch:

override def onPush(): Unit = {
  val elem = grab(in)
  val cost = costCalculation(elem)  // No error handling
  val delayNanos = tokenBucket.offer(cost)
  ...
}

Proposed fix

  1. Add lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider in createLogic
  2. Mix in StageLogging trait for error logging
  3. Wrap costCalculation(elem) in try-catch, consult decider on exception:
    • Stop → failStage(ex) (current behavior)
    • Resume/Restart → log error and pull(in) to skip the element
  4. Add tests verifying supervision directives work with Throttle
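The control flow in steps 3 and 4 can be sketched without the Pekko dependency. This is a minimal model, not the actual Throttle change: `Directive`, `Decider`, `Outcome`, `onPush`, and `run` are local stand-ins written for this illustration (the real types live in Pekko's `Supervision`), the cost function is assumed to return `Int`, and logging is left out.

```scala
import scala.util.control.NonFatal

// Local stand-ins for Pekko's Supervision.Directive and Supervision.Decider,
// defined here only so the sketch runs without the Pekko dependency.
object ThrottleSupervisionSketch {
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive
  case object Restart extends Directive

  type Decider = Throwable => Directive

  // Hypothetical outcome of handling one pushed element.
  sealed trait Outcome
  final case class Emitted(cost: Int) extends Outcome        // cost would go to the token bucket
  final case class Skipped(cause: Throwable) extends Outcome // stage would log and pull(in)
  final case class Failed(cause: Throwable) extends Outcome  // stage would call failStage(cause)

  // Models the proposed onPush body: guard costCalculation and consult the decider.
  def onPush[T](elem: T, costCalculation: T => Int, decider: Decider): Outcome =
    try Emitted(costCalculation(elem))
    catch {
      case NonFatal(ex) =>
        decider(ex) match {
          case Stop             => Failed(ex)
          case Resume | Restart => Skipped(ex)
        }
    }

  // Feeds elements through onPush and stops after the first Failed outcome,
  // mirroring a stage that processes nothing further once it has failed.
  def run[T](elems: List[T], costCalculation: T => Int, decider: Decider): List[Outcome] = {
    val outcomes = List.newBuilder[Outcome]
    val it = elems.iterator
    var failed = false
    while (it.hasNext && !failed) {
      val outcome = onPush(it.next(), costCalculation, decider)
      failed = outcome.isInstanceOf[Failed]
      outcomes += outcome
    }
    outcomes.result()
  }
}
```

With a cost function that throws on one element, a decider returning `Resume` or `Restart` skips only that element and keeps going, while a decider returning `Stop` ends the run at the failure. Tests for the real stage would presumably assert the same two behaviors against a stream with a supervision strategy attribute attached.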

References

  • Inspired by Akka.NET #6406
  • Similar pattern exists in Pekko's MapAsync (Ops.scala:1301) and Log (Ops.scala:1542)

Labels: akkadotnet (Issue/PR inspired by Akka.net), enhancement (New feature or request), t:stream (Pekko Streams)
