Motivation
The Throttle stage's costCalculation function is called without any exception handling. If the user-provided cost function throws, the exception propagates directly and fails the stage unconditionally. Unlike other stages (e.g., MapAsync, Log, Filter), Throttle does not consult the SupervisionStrategy decider, preventing users from configuring Resume or Restart behavior for bad elements.
Current behavior
In Throttle.scala:71-84, costCalculation(elem) is called without try-catch:
override def onPush(): Unit = {
val elem = grab(in)
val cost = costCalculation(elem) // No error handling
val delayNanos = tokenBucket.offer(cost)
...
}
Proposed fix
- Add
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider in createLogic
- Mix in
StageLogging trait for error logging
- Wrap
costCalculation(elem) in try-catch, consult decider on exception:
Stop → failStage(ex) (current behavior)
Resume/Restart → log error and pull(in) to skip the element
- Add tests verifying supervision directives work with Throttle
References
- Inspired by Akka.NET #6406
- Similar pattern exists in Pekko's
MapAsync (Ops.scala:1301) and Log (Ops.scala:1542)
Motivation
The
Throttlestage'scostCalculationfunction is called without any exception handling. If the user-provided cost function throws, the exception propagates directly and fails the stage unconditionally. Unlike other stages (e.g.,MapAsync,Log,Filter), Throttle does not consult theSupervisionStrategydecider, preventing users from configuring Resume or Restart behavior for bad elements.Current behavior
In
Throttle.scala:71-84,costCalculation(elem)is called without try-catch:Proposed fix
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].deciderincreateLogicStageLoggingtrait for error loggingcostCalculation(elem)in try-catch, consult decider on exception:Stop→failStage(ex)(current behavior)Resume/Restart→ log error andpull(in)to skip the elementReferences
MapAsync(Ops.scala:1301) andLog(Ops.scala:1542)