-
Notifications
You must be signed in to change notification settings - Fork 4.1k
GH-49669: [Python] Expose the experimental async device API through Python #49670
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,156 @@ | ||||||||||
| # Licensed to the Apache Software Foundation (ASF) under one | ||||||||||
| # or more contributor license agreements. See the NOTICE file | ||||||||||
| # distributed with this work for additional information | ||||||||||
| # regarding copyright ownership. The ASF licenses this file | ||||||||||
| # to you under the Apache License, Version 2.0 (the | ||||||||||
| # "License"); you may not use this file except in compliance | ||||||||||
| # with the License. You may obtain a copy of the License at | ||||||||||
| # | ||||||||||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||
| # | ||||||||||
| # Unless required by applicable law or agreed to in writing, | ||||||||||
| # software distributed under the License is distributed on an | ||||||||||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||||||
| # KIND, either express or implied. See the License for the | ||||||||||
| # specific language governing permissions and limitations | ||||||||||
| # under the License. | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class _AsyncioCall: | ||||||||||
| """State for an async operation using asyncio: a future created on the running loop and completed via call_soon_threadsafe.""" | ||||||||||
|
|
||||||||||
| def __init__(self): | ||||||||||
| import asyncio | ||||||||||
| self._future = asyncio.get_running_loop().create_future() | ||||||||||
|
|
||||||||||
| def as_awaitable(self): | ||||||||||
| return self._future | ||||||||||
|
|
||||||||||
| def wakeup(self, result_or_exception): | ||||||||||
| loop = self._future.get_loop() | ||||||||||
| if isinstance(result_or_exception, BaseException): | ||||||||||
| loop.call_soon_threadsafe( | ||||||||||
| self._future.set_exception, result_or_exception) | ||||||||||
| else: | ||||||||||
| loop.call_soon_threadsafe( | ||||||||||
| self._future.set_result, result_or_exception) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| cdef object _wrap_record_batch_or_none(CRecordBatchWithMetadata batch_with_md): | ||||||||||
| """Wrap the batch held by a CRecordBatchWithMetadata as a RecordBatch, or return None when its batch pointer is NULL (end-of-stream).""" | ||||||||||
| if batch_with_md.batch.get() == NULL: | ||||||||||
| return None | ||||||||||
| return pyarrow_wrap_batch(batch_with_md.batch) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| cdef object _wrap_async_generator(CAsyncRecordBatchGenerator gen): | ||||||||||
| """Wrap a CAsyncRecordBatchGenerator into an AsyncRecordBatchReader: schema and device_type are copied, the generator is moved out of gen, and _schema starts as None.""" | ||||||||||
| cdef AsyncRecordBatchReader reader = AsyncRecordBatchReader.__new__( | ||||||||||
| AsyncRecordBatchReader) | ||||||||||
| cdef CAsyncRecordBatchGenerator* p = new CAsyncRecordBatchGenerator() | ||||||||||
| p.schema = gen.schema | ||||||||||
| p.device_type = gen.device_type | ||||||||||
| p.generator = move(gen.generator) | ||||||||||
| reader.generator.reset(p) | ||||||||||
| reader._schema = None | ||||||||||
| return reader | ||||||||||
|
|
||||||||||
|
|
||||||||||
| cdef class AsyncRecordBatchReader(_Weakrefable): | ||||||||||
| """Asynchronous reader for a stream of record batches. | ||||||||||
|
|
||||||||||
| This class provides an async iterator interface for consuming record | ||||||||||
| batches from an asynchronous device stream. | ||||||||||
|
|
||||||||||
| This interface is EXPERIMENTAL. | ||||||||||
|
|
||||||||||
| Examples | ||||||||||
| -------- | ||||||||||
| >>> async for batch in reader: # doctest: +SKIP | ||||||||||
| ... process(batch) | ||||||||||
| """ | ||||||||||
|
|
||||||||||
| def __init__(self): | ||||||||||
| raise TypeError( | ||||||||||
| f"Do not call {self.__class__.__name__}'s constructor directly, " | ||||||||||
| "use factory methods instead.") | ||||||||||
|
Comment on lines
+75
to
+76
|
||||||||||
| f"Do not call {self.__class__.__name__}'s constructor directly, " | |
| "use factory methods instead.") | |
| f"Do not call {self.__class__.__name__}'s constructor directly; " | |
| f"{self.__class__.__name__} instances are created by pyarrow APIs.") |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| #pragma once | ||
|
|
||
| #include <cstring> | ||
| #include <memory> | ||
| #include <utility> | ||
| #include <vector> | ||
|
|
||
| #include "arrow/c/bridge.h" | ||
| #include "arrow/record_batch.h" | ||
| #include "arrow/util/async_generator.h" | ||
| #include "arrow/util/future.h" | ||
| #include "arrow/util/thread_pool.h" | ||
|
|
||
| namespace arrow::py { | ||
|
|
||
| /// \brief Invoke an AsyncGenerator<RecordBatchWithMetadata> once and return the Future it yields. | ||
| /// | ||
| /// This is needed because Cython cannot invoke std::function objects directly. | ||
| inline Future<RecordBatchWithMetadata> CallAsyncGenerator( | ||
| AsyncGenerator<RecordBatchWithMetadata>& generator) { | ||
| return generator(); | ||
| } | ||
|
|
||
| /// \brief Create a roundtrip async producer+consumer pair for testing. | ||
| /// | ||
| /// Allocates an ArrowAsyncDeviceStreamHandler on the heap, calls | ||
| /// CreateAsyncDeviceStreamHandler (consumer side), then submits | ||
| /// ExportAsyncRecordBatchReader (producer side) on the given executor. | ||
| /// Returns a Future that resolves to the AsyncRecordBatchGenerator once | ||
| /// the schema is available. | ||
| inline Future<AsyncRecordBatchGenerator> RoundtripAsyncBatches( | ||
| std::shared_ptr<Schema> schema, std::vector<std::shared_ptr<RecordBatch>> batches, | ||
| ::arrow::internal::Executor* executor, uint64_t queue_size = 5) { | ||
| // Heap-allocate the handler so it outlives this function. NOTE(review): no delete for it is visible in this function (including the Submit-failure return) -- confirm who frees it. | ||
| auto* handler = new ArrowAsyncDeviceStreamHandler; | ||
| std::memset(handler, 0, sizeof(ArrowAsyncDeviceStreamHandler)); | ||
|
|
||
| auto fut_gen = CreateAsyncDeviceStreamHandler(handler, executor, queue_size); | ||
|
|
||
| // Submit the export to the executor so it runs concurrently with the consumer. | ||
| auto submit_result = executor->Submit( | ||
| [schema = std::move(schema), batches = std::move(batches), handler]() mutable { | ||
| auto generator = MakeVectorGenerator(std::move(batches)); | ||
| return ExportAsyncRecordBatchReader(std::move(schema), std::move(generator), | ||
| DeviceAllocationType::kCPU, handler); | ||
| }); | ||
|
|
||
| if (!submit_result.ok()) { | ||
| return Future<AsyncRecordBatchGenerator>::MakeFinished(submit_result.status()); | ||
| } | ||
|
|
||
| return fut_gen; | ||
|
Comment on lines
+48
to
+69
|
||
| } | ||
|
|
||
| } // namespace arrow::py | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR description mentions a "new API to produce an async device stream", but the Python surface added here only exports the
`AsyncRecordBatchReader` type; there is no public factory/function that returns an instance (search shows no other `AsyncRecordBatchReader` references besides `ipc_async.pxi` and this import). If the user-facing entrypoint is intended to be part of this PR, it appears to be missing.