Skip to content

[data] Fix HTTP streaming file download by using open_input_stream#58542

Merged
bveeramani merged 18 commits into
ray-project:masterfrom
xyuzh:fix-http-streaming-file-download
Nov 13, 2025
Merged

[data] Fix HTTP streaming file download by using open_input_stream#58542
bveeramani merged 18 commits into
ray-project:masterfrom
xyuzh:fix-http-streaming-file-download

Conversation

@xyuzh

@xyuzh xyuzh commented Nov 11, 2025

Copy link
Copy Markdown
Member

What does this PR do?

Fixes HTTP streaming file downloads in Ray Data's download operation. Some URIs (especially HTTP streams) require open_input_stream instead of open_input_file.

Changes

  • Modified download_bytes_threaded in plan_download_op.py to try both open_input_file and open_input_stream for each URI
  • Improved error handling to distinguish between different error types
  • Failed downloads now return None gracefully instead of crashing

Testing

import pyarrow as pa
from ray.data.context import DataContext
from ray.data._internal.planner.plan_download_op import download_bytes_threaded

# Test URLs: one valid, one 404
urls = [    
    "https://static-assets.tesla.com/configurator/compositor?context=design_studio_2?&bkba_opt=1&view=STUD_3QTR&size=600&model=my&options=$APBS,$IPB7,$PPSW,$SC04,$MDLY,$WY19P,$MTY46,$STY5S,$CPF0,$DRRH&crop=1150,647,390,180&",
]

# Create PyArrow table and call download function
table = pa.table({"url": urls})
ctx = DataContext.get_current()
results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx))

# Check results
result_table = results[0]
for i in range(result_table.num_rows):
    url = result_table['url'][i].as_py()
    bytes_data = result_table['bytes'][i].as_py()
    
    if bytes_data is None:
        print(f"Row {i}: FAILED (None) - try-catch worked ✓")
    else:
        print(f"Row {i}: SUCCESS ({len(bytes_data)} bytes)")
    print(f"  URL: {url[:60]}...")

print("\n✅ Test passed: Failed downloads return None instead of crashing.")

Before the fix:

TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ray/default/test_streaming_fallback.py", line 110, in <module>
    test_download_expression_with_streaming_fallback()
  File "/home/ray/default/test_streaming_fallback.py", line 67, in test_download_expression_with_streaming_fallback
    with patch.object(pafs.FileSystem, "open_input_file", mock_open_input_file):
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1594, in __enter__
    if not self.__exit__(*sys.exc_info()):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1603, in __exit__
    setattr(self.target, self.attribute, self.temp_original)
TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem'
(base) ray@ip-10-0-39-21:~/default$ python test.py
2025-11-11 18:32:23,510 WARNING util.py:1059 -- Caught exception in transforming worker!
Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker
    for result in fn(input_queue_iter):
                  ^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes
    yield f.read()
          ^^^^^^^^
  File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read
  File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
  File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek
    raise ValueError("Cannot seek streaming HTTP file")
ValueError: Cannot seek streaming HTTP file
Traceback (most recent call last):
  File "/home/ray/default/test.py", line 16, in <module>
    results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 207, in download_bytes_threaded
    uri_bytes = list(
                ^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1113, in make_async_gen
    raise item
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker
    for result in fn(input_queue_iter):
                  ^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes
    yield f.read()
          ^^^^^^^^
  File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read
  File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
  File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek
    raise ValueError("Cannot seek streaming HTTP file")
ValueError: Cannot seek streaming HTTP file

After the fix:

Row 0: SUCCESS (189370 bytes)
  URL: https://static-assets.tesla.com/configurator/compositor?cont...

Tested with HTTP streaming URLs (e.g., Tesla configurator images) that previously failed:

  • ✅ Successfully downloads HTTP stream files
  • ✅ Gracefully handles failed downloads (returns None)
  • ✅ Maintains backward compatibility with existing file downloads

@xyuzh xyuzh requested a review from a team as a code owner November 11, 2025 18:30

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request addresses an issue with downloading streaming HTTP files by attempting both open_input_file and open_input_stream. The approach is correct and improves the robustness of the download operation. My review includes a critical fix to the exception handling logic to prevent silent failures on certain errors, and a minor improvement to a log message for better clarity. Overall, this is a good change that makes the download functionality more reliable.

Comment thread python/ray/data/_internal/planner/plan_download_op.py Outdated
Comment thread python/ray/data/_internal/planner/plan_download_op.py
Comment thread python/ray/data/_internal/planner/plan_download_op.py Outdated
Comment thread python/ray/data/_internal/planner/plan_download_op.py Outdated
Comment thread python/ray/data/_internal/planner/plan_download_op.py Outdated
@xyuzh xyuzh force-pushed the fix-http-streaming-file-download branch 3 times, most recently from b87d746 to e883909 Compare November 11, 2025 18:40
Comment thread python/ray/data/_internal/planner/plan_download_op.py Outdated
@ray-gardener ray-gardener Bot added the data Ray Data-related issues label Nov 11, 2025
@xyuzh xyuzh force-pushed the fix-http-streaming-file-download branch from e883909 to bd09c14 Compare November 11, 2025 19:40
Comment thread python/ray/data/_internal/planner/plan_download_op.py Outdated
Comment thread python/ray/data/_internal/planner/plan_download_op.py Outdated
Comment thread python/ray/data/_internal/planner/plan_download_op.py Outdated
Comment thread python/ray/data/_internal/planner/plan_download_op.py Outdated

@robertnishihara robertnishihara left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments. Please also fix linting.

@xyuzh xyuzh force-pushed the fix-http-streaming-file-download branch from 9389657 to e34c0b8 Compare November 12, 2025 03:53
@xyuzh xyuzh changed the title [data] Fix HTTP streaming file download by trying both open_input_file and open_input_stream [data] Fix HTTP streaming file download by using open_input_stream Nov 12, 2025
@xyuzh xyuzh force-pushed the fix-http-streaming-file-download branch from e34c0b8 to d023707 Compare November 12, 2025 03:55
Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
Comment thread python/ray/data/_internal/planner/plan_download_op.py Outdated
Comment thread python/ray/data/_internal/planner/plan_download_op.py Outdated
xyuzh and others added 8 commits November 11, 2025 20:18
Signed-off-by: xyuzh <xinyzng@gmail.com>
Prevents double-yield issue if file stream cleanup fails after successful read.

Signed-off-by: xyuzh <xinyzng@gmail.com>
Signed-off-by: xyuzh <xinyzng@gmail.com>
Since these errors are gracefully handled (return None), debug level is more appropriate.

Signed-off-by: xyuzh <xinyzng@gmail.com>
Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
Test that invalid URI schemes like foo://bar are caught and return None.

Signed-off-by: xyuzh <xinyzng@gmail.com>
Use ray.data.from_items and single test case for better clarity.

Signed-off-by: xyuzh <xinyzng@gmail.com>
Test invalid scheme, missing scheme, empty URI, and relative path.

Signed-off-by: xyuzh <xinyzng@gmail.com>
Include random strings, absolute/relative paths, injection attempts, null bytes, and line breaks.

Signed-off-by: xyuzh <xinyzng@gmail.com>
Remove redundant path variations and consolidate to key edge cases.

Signed-off-by: xyuzh <xinyzng@gmail.com>
Merge OSError test cases (file not found) into malformed URI test for better organization.

Signed-off-by: xyuzh <xinyzng@gmail.com>
if len(uris) == 0:
continue

paths, fs = _resolve_paths_and_filesystem(uris)

@xyuzh xyuzh Nov 12, 2025

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robertnishihara @bveeramani @richardliaw I found the _resolve_paths_and_filesystem raises exception for unresolved path and the exception is not handled here

To move forward I want to clarify:

  • Are we supporting multiple columns of uri to be downloaded at the same time

The above matters to how we want to do exception handling when one uri fails.
_resolve_paths_and_filesystem considers the multiple uris in the same column to share the same file system and I don't think this is a good practice
As @robertnishihara also mentioned in comment

@bveeramani bveeramani left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xyuzh Could you run the read_from_uris_fixed_size release test and ensure changing to open_input_stream doesn't introduce a performance regression? https://github.com/kyuds/ray/blob/0eb4ee2087b85497a58099568802fb59e5a8af7b/release/release_data_tests.yaml#L77-L78

Comment on lines +303 to +313
malformed_uris = [
f"local://{tmp_path}/nonexistent.txt", # File doesn't exist
f"local:///this/path/does/not/exist/file.txt", # Invalid path
"foo://bar", # Invalid scheme
"://no-scheme", # Missing scheme
"", # Empty URI
"foobar", # Random string
"http://host/path?query=<script>", # Injection attempts
"file:///\x00/null/byte", # Null byte
"http://host/path\n\r", # Line breaks
]

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be better to use pytest.mark.parametrize, so that if this test fails, it's clear which case caused the failure. The tradeoff is that it'll increase the runtime of this test more, but I think that tradeoff might be worth it.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless you feel strongly, I would prefer to not add extra runtime to our tests (for productivity reasons). I don't expect this test to be flaky?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't feel strongly

Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
def load_uri_bytes(uri_path_iterator):
"""Function that takes an iterator of URI paths and yields downloaded bytes for each."""
for uri_path in uri_path_iterator:
read_bytes = None

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Malformed URIs Crash Download Operation

The call to _resolve_paths_and_filesystem(uris) isn't wrapped in exception handling, so malformed URIs (like "foo://bar" or "://no-scheme") will raise exceptions and crash the entire download operation instead of gracefully returning None for those URIs. The error handling in load_uri_bytes only catches exceptions during file reading, not during path resolution. This contradicts the test expectations in test_download_expression_with_malformed_uris which expects all malformed URIs to return None.

Fix in Cursor Fix in Web

Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
@robertnishihara robertnishihara added the go add ONLY when ready to merge, run all tests label Nov 13, 2025
Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>

@bveeramani bveeramani left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM other than elaborating on the comment.

If open_input_stream introduces a performance regression, we can fix-forward

"local:///this/path/does/not/exist/file.txt", # Invalid path
"", # Empty URI
"foobar", # Random string
# TODO(xyuzh): Add the tests below back once the issue is fixed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you either link to an Issue/comment on GitHub, or describe the issue? I don't think it'd be clear to me what the issue is from reading this comment in isolation.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue mentioned is #58542 (comment)
In this case the invalid uri would crash _resolve_paths_and_filesystem mentioned in the comment and raises exception which is not handled

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just elaborated in the comment.

Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
@bveeramani bveeramani enabled auto-merge (squash) November 13, 2025 06:32
@bveeramani bveeramani merged commit dde70e7 into ray-project:master Nov 13, 2025
7 checks passed
@xyuzh

xyuzh commented Nov 13, 2025

Copy link
Copy Markdown
Member Author

Findings

Sample size: 100 images


open_input_file

  • Succeeded: 63 / 100 images
  • Speed: 10.83 images/s

open_input_stream

  • Succeeded: 70 / 100 images
  • Speed: 1.99 images/s

Fallback strategy

Try open_input_file first, and on ValueError, fall back to open_input_stream:

  • Succeeded: 70 / 100 images
  • Speed: 2.1 images/s

Conclusion

  • Downloading non-seekable HTTP-streamed images is significantly slower than downloading seekable images.
  • open_input_stream itself does NOT slow down seekable images.
  • The slowdown comes entirely from the cases where open_input_file fails, forcing downloads of non-seekable, streaming-only HTTP content, which costs extra time.

@robertnishihara

Copy link
Copy Markdown
Collaborator
  • open_input_stream itself does NOT slow down seekable images.

That's good to hear. Thanks for checking!

landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
…ay-project#58542)

## What does this PR do?
   
Fixes HTTP streaming file downloads in Ray Data's download operation.
Some URIs (especially HTTP streams) require `open_input_stream` instead
of `open_input_file`.
   
   ## Changes
   
- Modified `download_bytes_threaded` in `plan_download_op.py` to try
both `open_input_file` and `open_input_stream` for each URI
- Improved error handling to distinguish between different error types
   - Failed downloads now return `None` gracefully instead of crashing
   
   ## Testing
```
import pyarrow as pa
from ray.data.context import DataContext
from ray.data._internal.planner.plan_download_op import download_bytes_threaded

# Test URLs: one valid, one 404
urls = [    
    "https://static-assets.tesla.com/configurator/compositor?context=design_studio_2?&bkba_opt=1&view=STUD_3QTR&size=600&model=my&options=$APBS,$IPB7,$PPSW,$SC04,$MDLY,$WY19P,$MTY46,$STY5S,$CPF0,$DRRH&crop=1150,647,390,180&",
]

# Create PyArrow table and call download function
table = pa.table({"url": urls})
ctx = DataContext.get_current()
results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx))

# Check results
result_table = results[0]
for i in range(result_table.num_rows):
    url = result_table['url'][i].as_py()
    bytes_data = result_table['bytes'][i].as_py()
    
    if bytes_data is None:
        print(f"Row {i}: FAILED (None) - try-catch worked ✓")
    else:
        print(f"Row {i}: SUCCESS ({len(bytes_data)} bytes)")
    print(f"  URL: {url[:60]}...")

print("\n✅ Test passed: Failed downloads return None instead of crashing.")
```

Before the fix:
```
TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ray/default/test_streaming_fallback.py", line 110, in <module>
    test_download_expression_with_streaming_fallback()
  File "/home/ray/default/test_streaming_fallback.py", line 67, in test_download_expression_with_streaming_fallback
    with patch.object(pafs.FileSystem, "open_input_file", mock_open_input_file):
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1594, in __enter__
    if not self.__exit__(*sys.exc_info()):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1603, in __exit__
    setattr(self.target, self.attribute, self.temp_original)
TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem'
(base) ray@ip-10-0-39-21:~/default$ python test.py
2025-11-11 18:32:23,510 WARNING util.py:1059 -- Caught exception in transforming worker!
Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker
    for result in fn(input_queue_iter):
                  ^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes
    yield f.read()
          ^^^^^^^^
  File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read
  File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
  File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek
    raise ValueError("Cannot seek streaming HTTP file")
ValueError: Cannot seek streaming HTTP file
Traceback (most recent call last):
  File "/home/ray/default/test.py", line 16, in <module>
    results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 207, in download_bytes_threaded
    uri_bytes = list(
                ^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1113, in make_async_gen
    raise item
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker
    for result in fn(input_queue_iter):
                  ^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes
    yield f.read()
          ^^^^^^^^
  File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read
  File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
  File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek
    raise ValueError("Cannot seek streaming HTTP file")
ValueError: Cannot seek streaming HTTP file
```
After the fix:
```
Row 0: SUCCESS (189370 bytes)
  URL: https://static-assets.tesla.com/configurator/compositor?cont...
```
   
Tested with HTTP streaming URLs (e.g., Tesla configurator images) that
previously failed:
   - ✅ Successfully downloads HTTP stream files
   - ✅ Gracefully handles failed downloads (returns None)
   - ✅ Maintains backward compatibility with existing file downloads

---------

Signed-off-by: xyuzh <xinyzng@gmail.com>
Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
Co-authored-by: Robert Nishihara <robertnishihara@gmail.com>
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
…ay-project#58542)

## What does this PR do?
   
Fixes HTTP streaming file downloads in Ray Data's download operation.
Some URIs (especially HTTP streams) require `open_input_stream` instead
of `open_input_file`.
   
   ## Changes
   
- Modified `download_bytes_threaded` in `plan_download_op.py` to try
both `open_input_file` and `open_input_stream` for each URI
- Improved error handling to distinguish between different error types
   - Failed downloads now return `None` gracefully instead of crashing
   
   ## Testing
```
import pyarrow as pa
from ray.data.context import DataContext
from ray.data._internal.planner.plan_download_op import download_bytes_threaded

# Test URLs: one valid, one 404
urls = [    
    "https://static-assets.tesla.com/configurator/compositor?context=design_studio_2?&bkba_opt=1&view=STUD_3QTR&size=600&model=my&options=$APBS,$IPB7,$PPSW,$SC04,$MDLY,$WY19P,$MTY46,$STY5S,$CPF0,$DRRH&crop=1150,647,390,180&",
]

# Create PyArrow table and call download function
table = pa.table({"url": urls})
ctx = DataContext.get_current()
results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx))

# Check results
result_table = results[0]
for i in range(result_table.num_rows):
    url = result_table['url'][i].as_py()
    bytes_data = result_table['bytes'][i].as_py()
    
    if bytes_data is None:
        print(f"Row {i}: FAILED (None) - try-catch worked ✓")
    else:
        print(f"Row {i}: SUCCESS ({len(bytes_data)} bytes)")
    print(f"  URL: {url[:60]}...")

print("\n✅ Test passed: Failed downloads return None instead of crashing.")
```

Before the fix:
```
TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ray/default/test_streaming_fallback.py", line 110, in <module>
    test_download_expression_with_streaming_fallback()
  File "/home/ray/default/test_streaming_fallback.py", line 67, in test_download_expression_with_streaming_fallback
    with patch.object(pafs.FileSystem, "open_input_file", mock_open_input_file):
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1594, in __enter__
    if not self.__exit__(*sys.exc_info()):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1603, in __exit__
    setattr(self.target, self.attribute, self.temp_original)
TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem'
(base) ray@ip-10-0-39-21:~/default$ python test.py
2025-11-11 18:32:23,510 WARNING util.py:1059 -- Caught exception in transforming worker!
Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker
    for result in fn(input_queue_iter):
                  ^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes
    yield f.read()
          ^^^^^^^^
  File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read
  File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
  File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek
    raise ValueError("Cannot seek streaming HTTP file")
ValueError: Cannot seek streaming HTTP file
Traceback (most recent call last):
  File "/home/ray/default/test.py", line 16, in <module>
    results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 207, in download_bytes_threaded
    uri_bytes = list(
                ^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1113, in make_async_gen
    raise item
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker
    for result in fn(input_queue_iter):
                  ^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes
    yield f.read()
          ^^^^^^^^
  File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read
  File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
  File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek
    raise ValueError("Cannot seek streaming HTTP file")
ValueError: Cannot seek streaming HTTP file
```
After the fix:
```
Row 0: SUCCESS (189370 bytes)
  URL: https://static-assets.tesla.com/configurator/compositor?cont...
```
   
Tested with HTTP streaming URLs (e.g., Tesla configurator images) that
previously failed:
   - ✅ Successfully downloads HTTP stream files
   - ✅ Gracefully handles failed downloads (returns None)
   - ✅ Maintains backward compatibility with existing file downloads

---------

Signed-off-by: xyuzh <xinyzng@gmail.com>
Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
Co-authored-by: Robert Nishihara <robertnishihara@gmail.com>
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
…ay-project#58542)

## What does this PR do?

Fixes HTTP streaming file downloads in Ray Data's download operation.
Some URIs (especially HTTP streams) require `open_input_stream` instead
of `open_input_file`.

   ## Changes

- Modified `download_bytes_threaded` in `plan_download_op.py` to try
both `open_input_file` and `open_input_stream` for each URI
- Improved error handling to distinguish between different error types
   - Failed downloads now return `None` gracefully instead of crashing

   ## Testing
```
import pyarrow as pa
from ray.data.context import DataContext
from ray.data._internal.planner.plan_download_op import download_bytes_threaded

# Test URLs: one valid, one 404
urls = [
    "https://static-assets.tesla.com/configurator/compositor?context=design_studio_2?&bkba_opt=1&view=STUD_3QTR&size=600&model=my&options=$APBS,$IPB7,$PPSW,$SC04,$MDLY,$WY19P,$MTY46,$STY5S,$CPF0,$DRRH&crop=1150,647,390,180&",
]

# Create PyArrow table and call download function
table = pa.table({"url": urls})
ctx = DataContext.get_current()
results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx))

# Check results
result_table = results[0]
for i in range(result_table.num_rows):
    url = result_table['url'][i].as_py()
    bytes_data = result_table['bytes'][i].as_py()

    if bytes_data is None:
        print(f"Row {i}: FAILED (None) - try-catch worked ✓")
    else:
        print(f"Row {i}: SUCCESS ({len(bytes_data)} bytes)")
    print(f"  URL: {url[:60]}...")

print("\n✅ Test passed: Failed downloads return None instead of crashing.")
```

Before the fix:
```
TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ray/default/test_streaming_fallback.py", line 110, in <module>
    test_download_expression_with_streaming_fallback()
  File "/home/ray/default/test_streaming_fallback.py", line 67, in test_download_expression_with_streaming_fallback
    with patch.object(pafs.FileSystem, "open_input_file", mock_open_input_file):
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1594, in __enter__
    if not self.__exit__(*sys.exc_info()):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1603, in __exit__
    setattr(self.target, self.attribute, self.temp_original)
TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem'
(base) ray@ip-10-0-39-21:~/default$ python test.py
2025-11-11 18:32:23,510 WARNING util.py:1059 -- Caught exception in transforming worker!
Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker
    for result in fn(input_queue_iter):
                  ^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes
    yield f.read()
          ^^^^^^^^
  File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read
  File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
  File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek
    raise ValueError("Cannot seek streaming HTTP file")
ValueError: Cannot seek streaming HTTP file
Traceback (most recent call last):
  File "/home/ray/default/test.py", line 16, in <module>
    results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 207, in download_bytes_threaded
    uri_bytes = list(
                ^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1113, in make_async_gen
    raise item
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker
    for result in fn(input_queue_iter):
                  ^^^^^^^^^^^^^^^^^^^^
  File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes
    yield f.read()
          ^^^^^^^^
  File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read
  File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
  File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek
    raise ValueError("Cannot seek streaming HTTP file")
ValueError: Cannot seek streaming HTTP file
```
After the fix:
```
Row 0: SUCCESS (189370 bytes)
  URL: https://static-assets.tesla.com/configurator/compositor?cont...
```

Tested with HTTP streaming URLs (e.g., Tesla configurator images) that
previously failed:
   - ✅ Successfully downloads HTTP stream files
   - ✅ Gracefully handles failed downloads (returns None)
   - ✅ Maintains backward compatibility with existing file downloads

---------

Signed-off-by: xyuzh <xinyzng@gmail.com>
Signed-off-by: Robert Nishihara <robertnishihara@gmail.com>
Co-authored-by: Robert Nishihara <robertnishihara@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants