[data] Fix HTTP streaming file download by using open_input_stream#58542
Conversation
There was a problem hiding this comment.
Code Review
This pull request addresses an issue with downloading streaming HTTP files by attempting both open_input_file and open_input_stream. The approach is correct and improves the robustness of the download operation. My review includes a critical fix to the exception handling logic to prevent silent failures on certain errors, and a minor improvement to a log message for better clarity. Overall, this is a good change that makes the download functionality more reliable.
b87d746 to
e883909
Compare
e883909 to
bd09c14
Compare
robertnishihara
left a comment
There was a problem hiding this comment.
Left some comments. Please also fix linting.
9389657 to
e34c0b8
Compare
open_input_stream
Signed-off-by: xyuzh <xinyzng@gmail.com>
e34c0b8 to
d023707
Compare
Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
Signed-off-by: xyuzh <xinyzng@gmail.com>
Prevents double-yield issue if file stream cleanup fails after successful read. Signed-off-by: xyuzh <xinyzng@gmail.com>
Signed-off-by: xyuzh <xinyzng@gmail.com>
Since these errors are gracefully handled (return None), debug level is more appropriate. Signed-off-by: xyuzh <xinyzng@gmail.com>
Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
Test that invalid URI schemes like foo://bar are caught and return None. Signed-off-by: xyuzh <xinyzng@gmail.com>
Use ray.data.from_items and single test case for better clarity. Signed-off-by: xyuzh <xinyzng@gmail.com>
Test invalid scheme, missing scheme, empty URI, and relative path. Signed-off-by: xyuzh <xinyzng@gmail.com>
Include random strings, absolute/relative paths, injection attempts, null bytes, and line breaks. Signed-off-by: xyuzh <xinyzng@gmail.com>
Remove redundant path variations and consolidate to key edge cases. Signed-off-by: xyuzh <xinyzng@gmail.com>
Merge OSError test cases (file not found) into malformed URI test for better organization. Signed-off-by: xyuzh <xinyzng@gmail.com>
| if len(uris) == 0: | ||
| continue | ||
|
|
||
| paths, fs = _resolve_paths_and_filesystem(uris) |
There was a problem hiding this comment.
@robertnishihara @bveeramani @richardliaw I found the _resolve_paths_and_filesystem raises exception for unresolved path and the exception is not handled here
To move forward I want to clarify:
- Are we supporting multiple columns of uri to be downloaded at the same time
The above matters to how we want to do exception handling when one uri fails._resolve_paths_and_filesystem considers the multiple uris in the same column to share the same file system and I don't think this is a good practice
As @robertnishihara also mentioned in comment
bveeramani
left a comment
There was a problem hiding this comment.
@xyuzh Could you run the read_from_uris_fixed_size release test and ensure changing to open_input_stream doesn't introduce a performance regression? https://github.com/kyuds/ray/blob/0eb4ee2087b85497a58099568802fb59e5a8af7b/release/release_data_tests.yaml#L77-L78
| malformed_uris = [ | ||
| f"local://{tmp_path}/nonexistent.txt", # File doesn't exist | ||
| f"local:///this/path/does/not/exist/file.txt", # Invalid path | ||
| "foo://bar", # Invalid scheme | ||
| "://no-scheme", # Missing scheme | ||
| "", # Empty URI | ||
| "foobar", # Random string | ||
| "http://host/path?query=<script>", # Injection attempts | ||
| "file:///\x00/null/byte", # Null byte | ||
| "http://host/path\n\r", # Line breaks | ||
| ] |
There was a problem hiding this comment.
I think it might be better to use pytest.mark.parametrize, so that if this test fails, it's clear which case caused the failure. The tradeoff is that it'll increase the runtime of this test more, but I think that tradeoff might be worth it.
There was a problem hiding this comment.
Unless you feel strongly, I would prefer to not add extra runtime to our tests (for productivity reasons). I don't expect this test to be flaky?
Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
| def load_uri_bytes(uri_path_iterator): | ||
| """Function that takes an iterator of URI paths and yields downloaded bytes for each.""" | ||
| for uri_path in uri_path_iterator: | ||
| read_bytes = None |
There was a problem hiding this comment.
Bug: Malformed URIs Crash Download Operation
The call to _resolve_paths_and_filesystem(uris) isn't wrapped in exception handling, so malformed URIs (like "foo://bar" or "://no-scheme") will raise exceptions and crash the entire download operation instead of gracefully returning None for those URIs. The error handling in load_uri_bytes only catches exceptions during file reading, not during path resolution. This contradicts the test expectations in test_download_expression_with_malformed_uris which expects all malformed URIs to return None.
Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
bveeramani
left a comment
There was a problem hiding this comment.
LGTM other than elaborating on the comment.
If open_input_stream introduces a performance regression, we can fix-forward
| "local:///this/path/does/not/exist/file.txt", # Invalid path | ||
| "", # Empty URI | ||
| "foobar", # Random string | ||
| # TODO(xyuzh): Add the tests below back once the issue is fixed. |
There was a problem hiding this comment.
Could you either link to an Issue/comment on GitHub, or describe the issue? I don't think it'd be clear to me what the issue is from reading this comment in isolation.
There was a problem hiding this comment.
The issue mentioned is #58542 (comment)
In this case the invalid uri would crash _resolve_paths_and_filesystem mentioned in the comment and raises exception which is not handled
There was a problem hiding this comment.
I just elaborated in the comment.
Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
FindingsSample size: 100 images
|
That's good to hear. Thanks for checking! |
…ay-project#58542) ## What does this PR do? Fixes HTTP streaming file downloads in Ray Data's download operation. Some URIs (especially HTTP streams) require `open_input_stream` instead of `open_input_file`. ## Changes - Modified `download_bytes_threaded` in `plan_download_op.py` to try both `open_input_file` and `open_input_stream` for each URI - Improved error handling to distinguish between different error types - Failed downloads now return `None` gracefully instead of crashing ## Testing ``` import pyarrow as pa from ray.data.context import DataContext from ray.data._internal.planner.plan_download_op import download_bytes_threaded # Test URLs: one valid, one 404 urls = [ "https://static-assets.tesla.com/configurator/compositor?context=design_studio_2?&bkba_opt=1&view=STUD_3QTR&size=600&model=my&options=$APBS,$IPB7,$PPSW,$SC04,$MDLY,$WY19P,$MTY46,$STY5S,$CPF0,$DRRH&crop=1150,647,390,180&", ] # Create PyArrow table and call download function table = pa.table({"url": urls}) ctx = DataContext.get_current() results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx)) # Check results result_table = results[0] for i in range(result_table.num_rows): url = result_table['url'][i].as_py() bytes_data = result_table['bytes'][i].as_py() if bytes_data is None: print(f"Row {i}: FAILED (None) - try-catch worked ✓") else: print(f"Row {i}: SUCCESS ({len(bytes_data)} bytes)") print(f" URL: {url[:60]}...") print("\n✅ Test passed: Failed downloads return None instead of crashing.") ``` Before the fix: ``` TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/ray/default/test_streaming_fallback.py", line 110, in <module> test_download_expression_with_streaming_fallback() File "/home/ray/default/test_streaming_fallback.py", line 67, in test_download_expression_with_streaming_fallback with patch.object(pafs.FileSystem, "open_input_file", mock_open_input_file): ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1594, in __enter__ if not self.__exit__(*sys.exc_info()): ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1603, in __exit__ setattr(self.target, self.attribute, self.temp_original) TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem' (base) ray@ip-10-0-39-21:~/default$ python test.py 2025-11-11 18:32:23,510 WARNING util.py:1059 -- Caught exception in transforming worker! Traceback (most recent call last): File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker for result in fn(input_queue_iter): ^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes yield f.read() ^^^^^^^^ File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek raise ValueError("Cannot seek streaming HTTP file") ValueError: Cannot seek streaming HTTP file Traceback (most recent call last): File "/home/ray/default/test.py", line 16, in <module> results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 207, in download_bytes_threaded uri_bytes = list( ^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1113, in make_async_gen raise item File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker for result in fn(input_queue_iter): ^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes yield f.read() ^^^^^^^^ File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek raise ValueError("Cannot seek streaming HTTP file") ValueError: Cannot seek streaming HTTP file ``` After the fix: ``` Row 0: SUCCESS (189370 bytes) URL: https://static-assets.tesla.com/configurator/compositor?cont... ``` Tested with HTTP streaming URLs (e.g., Tesla configurator images) that previously failed: - ✅ Successfully downloads HTTP stream files - ✅ Gracefully handles failed downloads (returns None) - ✅ Maintains backward compatibility with existing file downloads --------- Signed-off-by: xyuzh <xinyzng@gmail.com> Signed-off-by: Robert Nishihara <robertnishihara@gmail.com> Co-authored-by: Robert Nishihara <robertnishihara@gmail.com>
…ay-project#58542) ## What does this PR do? Fixes HTTP streaming file downloads in Ray Data's download operation. Some URIs (especially HTTP streams) require `open_input_stream` instead of `open_input_file`. ## Changes - Modified `download_bytes_threaded` in `plan_download_op.py` to try both `open_input_file` and `open_input_stream` for each URI - Improved error handling to distinguish between different error types - Failed downloads now return `None` gracefully instead of crashing ## Testing ``` import pyarrow as pa from ray.data.context import DataContext from ray.data._internal.planner.plan_download_op import download_bytes_threaded # Test URLs: one valid, one 404 urls = [ "https://static-assets.tesla.com/configurator/compositor?context=design_studio_2?&bkba_opt=1&view=STUD_3QTR&size=600&model=my&options=$APBS,$IPB7,$PPSW,$SC04,$MDLY,$WY19P,$MTY46,$STY5S,$CPF0,$DRRH&crop=1150,647,390,180&", ] # Create PyArrow table and call download function table = pa.table({"url": urls}) ctx = DataContext.get_current() results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx)) # Check results result_table = results[0] for i in range(result_table.num_rows): url = result_table['url'][i].as_py() bytes_data = result_table['bytes'][i].as_py() if bytes_data is None: print(f"Row {i}: FAILED (None) - try-catch worked ✓") else: print(f"Row {i}: SUCCESS ({len(bytes_data)} bytes)") print(f" URL: {url[:60]}...") print("\n✅ Test passed: Failed downloads return None instead of crashing.") ``` Before the fix: ``` TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/ray/default/test_streaming_fallback.py", line 110, in <module> test_download_expression_with_streaming_fallback() File "/home/ray/default/test_streaming_fallback.py", line 67, in test_download_expression_with_streaming_fallback with patch.object(pafs.FileSystem, "open_input_file", mock_open_input_file): ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1594, in __enter__ if not self.__exit__(*sys.exc_info()): ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1603, in __exit__ setattr(self.target, self.attribute, self.temp_original) TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem' (base) ray@ip-10-0-39-21:~/default$ python test.py 2025-11-11 18:32:23,510 WARNING util.py:1059 -- Caught exception in transforming worker! Traceback (most recent call last): File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker for result in fn(input_queue_iter): ^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes yield f.read() ^^^^^^^^ File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek raise ValueError("Cannot seek streaming HTTP file") ValueError: Cannot seek streaming HTTP file Traceback (most recent call last): File "/home/ray/default/test.py", line 16, in <module> results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 207, in download_bytes_threaded uri_bytes = list( ^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1113, in make_async_gen raise item File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker for result in fn(input_queue_iter): ^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes yield f.read() ^^^^^^^^ File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek raise ValueError("Cannot seek streaming HTTP file") ValueError: Cannot seek streaming HTTP file ``` After the fix: ``` Row 0: SUCCESS (189370 bytes) URL: https://static-assets.tesla.com/configurator/compositor?cont... ``` Tested with HTTP streaming URLs (e.g., Tesla configurator images) that previously failed: - ✅ Successfully downloads HTTP stream files - ✅ Gracefully handles failed downloads (returns None) - ✅ Maintains backward compatibility with existing file downloads --------- Signed-off-by: xyuzh <xinyzng@gmail.com> Signed-off-by: Robert Nishihara <robertnishihara@gmail.com> Co-authored-by: Robert Nishihara <robertnishihara@gmail.com>
…ay-project#58542) ## What does this PR do? Fixes HTTP streaming file downloads in Ray Data's download operation. Some URIs (especially HTTP streams) require `open_input_stream` instead of `open_input_file`. ## Changes - Modified `download_bytes_threaded` in `plan_download_op.py` to try both `open_input_file` and `open_input_stream` for each URI - Improved error handling to distinguish between different error types - Failed downloads now return `None` gracefully instead of crashing ## Testing ``` import pyarrow as pa from ray.data.context import DataContext from ray.data._internal.planner.plan_download_op import download_bytes_threaded # Test URLs: one valid, one 404 urls = [ "https://static-assets.tesla.com/configurator/compositor?context=design_studio_2?&bkba_opt=1&view=STUD_3QTR&size=600&model=my&options=$APBS,$IPB7,$PPSW,$SC04,$MDLY,$WY19P,$MTY46,$STY5S,$CPF0,$DRRH&crop=1150,647,390,180&", ] # Create PyArrow table and call download function table = pa.table({"url": urls}) ctx = DataContext.get_current() results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx)) # Check results result_table = results[0] for i in range(result_table.num_rows): url = result_table['url'][i].as_py() bytes_data = result_table['bytes'][i].as_py() if bytes_data is None: print(f"Row {i}: FAILED (None) - try-catch worked ✓") else: print(f"Row {i}: SUCCESS ({len(bytes_data)} bytes)") print(f" URL: {url[:60]}...") print("\n✅ Test passed: Failed downloads return None instead of crashing.") ``` Before the fix: ``` TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/ray/default/test_streaming_fallback.py", line 110, in <module> test_download_expression_with_streaming_fallback() File "/home/ray/default/test_streaming_fallback.py", line 67, in test_download_expression_with_streaming_fallback with patch.object(pafs.FileSystem, "open_input_file", mock_open_input_file): ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1594, in __enter__ if not self.__exit__(*sys.exc_info()): ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1603, in __exit__ setattr(self.target, self.attribute, self.temp_original) TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem' (base) ray@ip-10-0-39-21:~/default$ python test.py 2025-11-11 18:32:23,510 WARNING util.py:1059 -- Caught exception in transforming worker! Traceback (most recent call last): File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker for result in fn(input_queue_iter): ^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes yield f.read() ^^^^^^^^ File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek raise ValueError("Cannot seek streaming HTTP file") ValueError: Cannot seek streaming HTTP file Traceback (most recent call last): File "/home/ray/default/test.py", line 16, in <module> results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 207, in download_bytes_threaded uri_bytes = list( ^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1113, in make_async_gen raise item File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker for result in fn(input_queue_iter): ^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes yield f.read() ^^^^^^^^ File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek raise ValueError("Cannot seek streaming HTTP file") ValueError: Cannot seek streaming HTTP file ``` After the fix: ``` Row 0: SUCCESS (189370 bytes) URL: https://static-assets.tesla.com/configurator/compositor?cont... ``` Tested with HTTP streaming URLs (e.g., Tesla configurator images) that previously failed: - ✅ Successfully downloads HTTP stream files - ✅ Gracefully handles failed downloads (returns None) - ✅ Maintains backward compatibility with existing file downloads --------- Signed-off-by: xyuzh <xinyzng@gmail.com> Signed-off-by: Robert Nishihara <robertnishihara@gmail.com> Co-authored-by: Robert Nishihara <robertnishihara@gmail.com> Signed-off-by: Future-Outlier <eric901201@gmail.com>
What does this PR do?
Fixes HTTP streaming file downloads in Ray Data's download operation. Some URIs (especially HTTP streams) require
open_input_streaminstead ofopen_input_file.Changes
download_bytes_threadedinplan_download_op.pyto try bothopen_input_fileandopen_input_streamfor each URINonegracefully instead of crashingTesting
Before the fix:
After the fix:
Tested with HTTP streaming URLs (e.g., Tesla configurator images) that previously failed: