GH-49669: [Python] Expose the experimental async device API through Python #49670
CurtHagenlocher wants to merge 3 commits into apache:main
Conversation
Pull request overview
This PR adds initial Python bindings and tests for consuming Arrow’s experimental async device stream interface via an AsyncRecordBatchReader async-iterator.
Changes:
- Introduces a new `AsyncRecordBatchReader` Cython type with `async for` support and schema access.
- Adds a C++ test-only roundtrip helper to wire a producer and consumer via `ArrowAsyncDeviceStreamHandler`.
- Adds Python tests validating async iteration, empty streams, schema, context manager usage, and backpressure via small queue sizing.
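The `async for` consumption model these changes enable can be sketched with a self-contained asyncio analogue. The class and names below are illustrative stand-ins, not pyarrow's actual implementation: a bounded queue backs the async iterator, which also illustrates the backpressure-via-small-queue behavior the tests exercise.

```python
import asyncio

_END = object()  # sentinel marking end-of-stream


class QueueBackedAsyncReader:
    """Illustrative stand-in for an async record-batch reader:
    a producer pushes items into a bounded queue and a consumer
    drains them with `async for`."""

    def __init__(self, maxsize=5):
        # A small maxsize gives natural backpressure, mirroring the
        # queue_size knob in the C++ roundtrip helper.
        self._queue = asyncio.Queue(maxsize=maxsize)

    async def put(self, item):
        await self._queue.put(item)

    async def close(self):
        await self._queue.put(_END)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is _END:
            raise StopAsyncIteration
        return item


async def main():
    reader = QueueBackedAsyncReader(maxsize=2)

    async def produce():
        # The producer blocks on put() whenever the bounded queue is full.
        for i in range(5):
            await reader.put(f"batch-{i}")
        await reader.close()

    producer = asyncio.ensure_future(produce())
    batches = [b async for b in reader]
    await producer
    return batches


print(asyncio.run(main()))  # → ['batch-0', 'batch-1', 'batch-2', 'batch-3', 'batch-4']
```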
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| python/pyarrow/tests/test_ipc_async.py | Adds unit tests for the new async reader behavior. |
| python/pyarrow/src/arrow/python/async_stream.h | Adds C++ helper functions to bridge async generators and implement a roundtrip producer/consumer for tests. |
| python/pyarrow/lib.pyx | Includes the new ipc_async.pxi bindings in the extension build. |
| python/pyarrow/lib.pxd | Declares the new AsyncRecordBatchReader Cython type. |
| python/pyarrow/ipc_async.pxi | Implements AsyncRecordBatchReader and a private async roundtrip test helper. |
| python/pyarrow/includes/libarrow.pxd | Adds Cython declarations for async stream ABI / bridge entrypoints. |
| python/pyarrow/__init__.py | Exports AsyncRecordBatchReader at the top-level pyarrow namespace. |
```cpp
inline Future<AsyncRecordBatchGenerator> RoundtripAsyncBatches(
    std::shared_ptr<Schema> schema, std::vector<std::shared_ptr<RecordBatch>> batches,
    ::arrow::internal::Executor* executor, uint64_t queue_size = 5) {
  // Heap-allocate the handler so it outlives this function.
  auto* handler = new ArrowAsyncDeviceStreamHandler;
  std::memset(handler, 0, sizeof(ArrowAsyncDeviceStreamHandler));

  auto fut_gen = CreateAsyncDeviceStreamHandler(handler, executor, queue_size);

  // Submit the export to the executor so it runs concurrently with the consumer.
  auto submit_result = executor->Submit(
      [schema = std::move(schema), batches = std::move(batches), handler]() mutable {
        auto generator = MakeVectorGenerator(std::move(batches));
        return ExportAsyncRecordBatchReader(std::move(schema), std::move(generator),
                                            DeviceAllocationType::kCPU, handler);
      });

  if (!submit_result.ok()) {
    return Future<AsyncRecordBatchGenerator>::MakeFinished(submit_result.status());
  }

  return fut_gen;
}
```
RoundtripAsyncBatches heap-allocates ArrowAsyncDeviceStreamHandler but never deletes it. CreateAsyncDeviceStreamHandler sets handler->release to only delete private_data (see cpp/src/arrow/c/bridge.cc:2696-2698), and ExportAsyncRecordBatchReader calls handler->release(handler) but does not free the handler itself, so this leaks per roundtrip and can accumulate in test runs. Manage handler ownership explicitly (e.g., capture the Future returned by executor->Submit, attach a continuation that deletes the handler after ExportAsyncRecordBatchReader completes, and ensure the submit-failure path also cleans up and marks the consumer future finished).
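The ownership fix suggested here can be illustrated with a self-contained Python analogue. The `Handler` class and `export` coroutine are hypothetical stand-ins for the C++ handler and `ExportAsyncRecordBatchReader`; the point is the pattern of attaching a continuation to the submitted future so the resource is freed only after the export completes.

```python
import asyncio


class Handler:
    """Hypothetical stand-in for the heap-allocated ArrowAsyncDeviceStreamHandler."""
    deleted = 0

    def delete(self):
        # Analogue of `delete handler` in the C++ continuation.
        Handler.deleted += 1


async def export(handler):
    # Stand-in for ExportAsyncRecordBatchReader: uses the handler, then finishes.
    await asyncio.sleep(0)
    return "ok"


async def roundtrip():
    handler = Handler()
    task = asyncio.ensure_future(export(handler))
    # Continuation: release the handler only once the export has completed,
    # whether it succeeded or failed.
    task.add_done_callback(lambda _t: handler.delete())
    return await task


assert asyncio.run(roundtrip()) == "ok"
print("handler deletions:", Handler.deleted)
```

The same shape applies in C++: capture the `Future` returned by `executor->Submit`, add a continuation that deletes the handler, and make sure the submit-failure path also cleans up.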
| f"Do not call {self.__class__.__name__}'s constructor directly, " | ||
| "use factory methods instead.") |
The constructor error message instructs users to "use factory methods instead", but no public factory is introduced for AsyncRecordBatchReader in this PR (only the private test helper _test_roundtrip_async). Either provide/mention the actual public factory in the message, or adjust the message to avoid pointing to APIs that don't exist.
| f"Do not call {self.__class__.__name__}'s constructor directly, " | |
| "use factory methods instead.") | |
| f"Do not call {self.__class__.__name__}'s constructor directly; " | |
| f"{self.__class__.__name__} instances are created by pyarrow APIs.") |
```diff
 from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table,
                          concat_arrays, concat_tables, TableGroupBy,
-                         RecordBatchReader, concat_batches)
+                         RecordBatchReader, AsyncRecordBatchReader,
+                         concat_batches)
```
The PR description mentions a "new API to produce an async device stream", but the Python surface added here only exports the AsyncRecordBatchReader type; there is no public factory/function that returns an instance (search shows no other AsyncRecordBatchReader references besides ipc_async.pxi and this import). If the user-facing entrypoint is intended to be part of this PR, it appears to be missing.
Heh... clearly I understood the code even less than I'd thought...
Rationale for this change
Allow an async device stream to be consumed from within Python.
What changes are included in this PR?
Code and tests to allow an async device stream to be consumed from within Python.
Are these changes tested?
Yes.
Are there any user-facing changes?
There's a new API to produce an async device stream.
DISCLOSURE: this change was heavily supported by AI. I understand the code and made some changes to it, but would likely not have put in the effort to understand how to make the change without AI help.