Skip to content

[Data] Fix Union operator to avoid blocking when preserve order#59922

Merged
alexeykudinkin merged 14 commits into
ray-project:masterfrom
owenowenisme:data/make_union_not_block_when_preserve_order_true
Jan 21, 2026
Merged

[Data] Fix Union operator to avoid blocking when preserve order#59922
alexeykudinkin merged 14 commits into
ray-project:masterfrom
owenowenisme:data/make_union_not_block_when_preserve_order_true

Conversation

@owenowenisme

@owenowenisme owenowenisme commented Jan 7, 2026

Copy link
Copy Markdown
Member

Description

Make the Union operator not blocking when preserve_order is enabled if _add_input_inner is called with the input in the front.

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@owenowenisme owenowenisme added alpha Alpha release features data Ray Data-related issues go add ONLY when ready to merge, run all tests and removed alpha Alpha release features labels Jan 7, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request fixes a blocking issue in the Union operator when preserve_order is enabled. The change introduces a streaming approach, flushing data from input operators as they complete, which is a good improvement. My review identifies a critical potential for deadlock because the flushing logic isn't triggered when an input stream finishes without sending a final block. I've also included a couple of suggestions to improve code readability and maintainability by reducing duplication and simplifying expressions. Addressing the critical issue is necessary to prevent hangs in the data processing pipeline.

Comment thread python/ray/data/_internal/execution/operators/union_operator.py Outdated
Comment thread python/ray/data/_internal/execution/operators/union_operator.py Outdated
Comment thread python/ray/data/_internal/execution/operators/union_operator.py Outdated
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@owenowenisme owenowenisme marked this pull request as ready for review January 7, 2026 16:05
@owenowenisme owenowenisme requested a review from a team as a code owner January 7, 2026 16:05

@alexeykudinkin alexeykudinkin left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@owenowenisme the problem is that we still we're exhausting 1 input before we move on to the next one, which means we're gonna be accumulating remaining outputs before producing.

Instead, please take a look how we achieve determinism in make_async_gen and apply the same technique here:

  • Iterate over inputs in the same order
  • Always deque 1 block and never skip an input!
  • Once op completes you can start skipping it

Comment thread python/ray/data/_internal/execution/operators/union_operator.py Outdated
Comment thread python/ray/data/_internal/execution/operators/union_operator.py Outdated
@owenowenisme

Copy link
Copy Markdown
Member Author

@alexeykudinkin
Wait, are you saying that we round-robin the input ops of Union? I think Union should not behave like make_async_gen, because Union's documented behavior is concatenation (ds1.union(ds2) → all of ds1, then all of ds2), while round-robin would produce interleaved output. These are different orderings.

owenowenisme and others added 2 commits January 14, 2026 15:59
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Comment thread python/ray/data/_internal/execution/operators/union_operator.py
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Comment thread python/ray/data/dataset.py
Comment thread python/ray/data/_internal/execution/operators/union_operator.py
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Comment thread python/ray/data/_internal/execution/operators/union_operator.py Outdated
Comment thread python/ray/data/_internal/execution/operators/union_operator.py Outdated
Comment thread python/ray/data/_internal/execution/operators/union_operator.py
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Comment thread python/ray/data/_internal/execution/operators/union_operator.py
owenowenisme and others added 2 commits January 21, 2026 08:22
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@owenowenisme owenowenisme force-pushed the data/make_union_not_block_when_preserve_order_true branch from e4e4007 to da8d1b5 Compare January 21, 2026 02:30
@alexeykudinkin alexeykudinkin merged commit e722863 into ray-project:master Jan 21, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants