Skip to content

Commit 26422bd

Browse files
timsaucerclaude
andcommitted
refactor: tighten PR1 scope to codec plumbing only
Review feedback pass. PR1 is now strictly the composable codec layer + session routing + class-method serialization API. Anything that touches actual Python UDF inline encoding or Python expression wrapping moves to PR2 alongside the pickle work. Dropped: * `encode_python_scalar_udf` / `decode_python_scalar_udf` helpers from `crates/core/src/codec.rs`, along with cloudpickle and pyarrow imports. The wrapper codecs now delegate every method to `inner`. `DFPYUDF1` magic constant is kept (marked `dead_code` for now) as a reservation so PR2 has a single definition site. * `udf.rs` reverted to pre-PR1 shape. The codec no longer needs `func()` / `input_fields()` / `volatility()` / `from_parts()` accessors. Re-added by PR2 when scalar-UDF inlining lands. * `PyPhysicalExpr` class + Python wrapper + `__init__` export + `MyPhysicalExpr` FFI fixture + `_test_physical_expr.py`. No consumer in PR1 or PR2 plan documents; symmetry with `PyExecutionPlan` is not enough to justify the surface area. * Rust-side `PyLogicalPlan::to_proto` / `from_proto` and `PyExecutionPlan::to_proto` / `from_proto` deprecated wrappers. The deprecation lives entirely in the Python wrapper layer, which emits `DeprecationWarning` and forwards to `to_bytes` / `from_bytes`. Less Rust duplication. * `PythonLogicalCodec::with_default_inner` / `PythonPhysicalCodec::with_default_inner` — redundant with `impl Default`. Logic moved into `Default::default`. * `PySessionContext::default_logical_codec` / `default_physical_codec` helpers. Inlined as `Arc::new(PythonLogicalCodec::default())` at the three call sites. Tests (root: 1076, FFI example: 36) all green after the cuts. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d6f4cc7 commit 26422bd

12 files changed

Lines changed: 55 additions & 593 deletions

File tree

crates/core/src/codec.rs

Lines changed: 44 additions & 191 deletions
Original file line numberDiff line numberDiff line change
@@ -18,68 +18,60 @@
1818
//! Python-aware extension codecs.
1919
//!
2020
//! [`PythonLogicalCodec`] wraps a user-supplied (or default)
21-
//! [`LogicalExtensionCodec`] and adds in-band encoding of Python-defined
22-
//! scalar UDFs via cloudpickle. [`PythonPhysicalCodec`] is the symmetric
23-
//! wrapper around [`PhysicalExtensionCodec`]; both reuse the same payload
24-
//! framing so a Python `ScalarUDF` round-trips identically through either
25-
//! codec layer.
21+
//! [`LogicalExtensionCodec`] and is the codec datafusion-python parks
22+
//! on every `SessionContext`. [`PythonPhysicalCodec`] is the symmetric
23+
//! wrapper around [`PhysicalExtensionCodec`].
2624
//!
27-
//! ## Wire-format magic prefix registry
25+
//! In PR1 both codecs delegate every call to their `inner` codec. The
26+
//! types exist so that follow-up work (pickle support, Python scalar
27+
//! UDF inline encoding) can add in-band Python payloads without
28+
//! re-plumbing the session field.
2829
//!
29-
//! Each Python-inline payload begins with an 8-byte magic prefix so the
30-
//! decoder can distinguish it from arbitrary `fun_definition` bytes (e.g.
31-
//! produced by a user FFI codec or left empty by the default codec).
30+
//! ## Wire-format magic prefix registry
3231
//!
33-
//! | Layer + kind | Magic prefix | Owner |
34-
//! | ----------------------------- | ------------ | -------------- |
35-
//! | `PythonLogicalCodec` scalar | `DFPYUDF1` | in use |
36-
//! | `PythonLogicalCodec` agg | `DFPYUDA1` | reserved |
37-
//! | `PythonLogicalCodec` window | `DFPYUDW1` | reserved |
38-
//! | `PythonPhysicalCodec` scalar | `DFPYUDF1` | in use (shared with logical) |
39-
//! | `PythonPhysicalCodec` agg | `DFPYUDA1` | reserved |
40-
//! | `PythonPhysicalCodec` window | `DFPYUDW1` | reserved |
41-
//! | `PythonPhysicalCodec` expr | `DFPYPE1` | reserved |
42-
//! | User FFI extension codec | user-chosen | downstream |
43-
//! | Default codec | (none) | upstream |
32+
//! Future in-band Python payloads will be prefixed with an 8-byte
33+
//! magic so the decoder can distinguish them from arbitrary
34+
//! `fun_definition` bytes produced by the default codec or a user FFI
35+
//! codec.
4436
//!
45-
//! Dispatch precedence inside the Python codecs: **Python-inline payload
46-
//! (magic prefix match) → `inner` codec → caller's registry fallback.**
47-
//! When a payload does not match a Python magic prefix the call delegates
48-
//! to `inner`, which is typically `DefaultLogicalExtensionCodec` /
49-
//! `DefaultPhysicalExtensionCodec` but may be a user-supplied FFI codec
50-
//! installed via `SessionContext.with_logical_extension_codec(...)` /
51-
//! `with_physical_extension_codec(...)`.
37+
//! | Layer + kind | Magic prefix | Status |
38+
//! | ----------------------------- | ------------ | ------------- |
39+
//! | `PythonLogicalCodec` scalar | `DFPYUDF1` | reserved (PR2)|
40+
//! | `PythonLogicalCodec` agg | `DFPYUDA1` | reserved |
41+
//! | `PythonLogicalCodec` window | `DFPYUDW1` | reserved |
42+
//! | `PythonPhysicalCodec` scalar | `DFPYUDF1` | reserved (PR2)|
43+
//! | `PythonPhysicalCodec` agg | `DFPYUDA1` | reserved |
44+
//! | `PythonPhysicalCodec` window | `DFPYUDW1` | reserved |
45+
//! | `PythonPhysicalCodec` expr | `DFPYPE1` | reserved |
46+
//! | User FFI extension codec | user-chosen | downstream |
47+
//! | Default codec | (none) | upstream |
5248
//!
53-
//! User FFI codecs should pick non-colliding prefixes (recommend a `DF`
54-
//! namespace plus a crate-specific suffix).
49+
//! Dispatch precedence once in-band payloads land: **Python-inline
50+
//! payload (magic prefix match) → `inner` codec → caller's registry
51+
//! fallback.** User FFI codecs should pick non-colliding prefixes
52+
//! (recommend a `DF` namespace plus a crate-specific suffix).
5553
5654
use std::sync::Arc;
5755

58-
use arrow::datatypes::{Field, Schema};
59-
use arrow::pyarrow::ToPyArrow;
60-
use datafusion::arrow::datatypes::SchemaRef;
61-
use datafusion::arrow::pyarrow::FromPyArrow;
56+
use arrow::datatypes::SchemaRef;
6257
use datafusion::common::{Result, TableReference};
6358
use datafusion::datasource::TableProvider;
6459
use datafusion::execution::TaskContext;
65-
use datafusion::logical_expr::{Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl};
60+
use datafusion::logical_expr::{Extension, LogicalPlan, ScalarUDF};
6661
use datafusion::physical_plan::ExecutionPlan;
6762
use datafusion_proto::logical_plan::{DefaultLogicalExtensionCodec, LogicalExtensionCodec};
6863
use datafusion_proto::physical_plan::{DefaultPhysicalExtensionCodec, PhysicalExtensionCodec};
69-
use pyo3::BoundObject;
70-
use pyo3::prelude::*;
71-
use pyo3::types::{PyBytes, PyTuple};
72-
73-
use crate::udf::PythonFunctionScalarUDF;
7464

75-
/// Magic prefix for an inlined Python scalar UDF payload. Shared between
76-
/// logical and physical codec layers — the cloudpickled tuple shape is
77-
/// identical, so a `ScalarUDF` reaching either codec encodes the same way.
65+
/// Reserved magic prefix for an inlined Python scalar UDF payload.
66+
/// Not produced or consumed by PR1; the constant is reserved here so
67+
/// follow-up work has a single definition site.
68+
#[allow(dead_code)]
7869
pub(crate) const PY_SCALAR_UDF_MAGIC: &[u8] = b"DFPYUDF1";
7970

80-
/// `LogicalExtensionCodec` that serializes Python scalar UDFs inline and
81-
/// delegates every other call to a composable `inner` codec. See module
82-
/// docs for the dispatch precedence.
71+
/// `LogicalExtensionCodec` parked on every `SessionContext`. Wraps a
72+
/// composable `inner` codec; PR1 delegates every method straight
73+
/// through. The wrapper exists so follow-up patches can add Python
74+
/// in-band encoding without changing every serializer.
8375
#[derive(Debug)]
8476
pub struct PythonLogicalCodec {
8577
inner: Arc<dyn LogicalExtensionCodec>,
@@ -90,19 +82,14 @@ impl PythonLogicalCodec {
9082
Self { inner }
9183
}
9284

93-
/// Convenience constructor wrapping `DefaultLogicalExtensionCodec`.
94-
pub fn with_default_inner() -> Self {
95-
Self::new(Arc::new(DefaultLogicalExtensionCodec {}))
96-
}
97-
9885
pub fn inner(&self) -> &Arc<dyn LogicalExtensionCodec> {
9986
&self.inner
10087
}
10188
}
10289

10390
impl Default for PythonLogicalCodec {
10491
fn default() -> Self {
105-
Self::with_default_inner()
92+
Self::new(Arc::new(DefaultLogicalExtensionCodec {}))
10693
}
10794
}
10895

@@ -141,29 +128,17 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
141128
}
142129

143130
fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
144-
if try_encode_python_scalar_udf(node, buf)? {
145-
return Ok(());
146-
}
147131
self.inner.try_encode_udf(node, buf)
148132
}
149133

150134
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
151-
match try_decode_python_scalar_udf(buf)? {
152-
Some(udf) => Ok(udf),
153-
None => self.inner.try_decode_udf(name, buf),
154-
}
135+
self.inner.try_decode_udf(name, buf)
155136
}
156137
}
157138

158-
/// `PhysicalExtensionCodec` mirror of [`PythonLogicalCodec`]. Scalar-UDF
159-
/// payloads use the shared `DFPYUDF1` framing so an `ExecutionPlan` or
160-
/// `PhysicalExpr` that references a Python `ScalarUDF` round-trips
161-
/// regardless of which codec layer encoded it.
162-
///
163-
/// Encoding for Python-defined `ExecutionPlan` impls and `PhysicalExpr`
164-
/// impls (the `DFPYPE*` namespace) is reserved for a future change — no
165-
/// concrete Python-side physical extension type exists today, so all
166-
/// non-UDF calls delegate to `inner`.
139+
/// `PhysicalExtensionCodec` mirror of [`PythonLogicalCodec`]. Same
140+
/// motivation: a stable session field that follow-up patches can layer
141+
/// Python in-band encoding onto.
167142
#[derive(Debug)]
168143
pub struct PythonPhysicalCodec {
169144
inner: Arc<dyn PhysicalExtensionCodec>,
@@ -174,18 +149,14 @@ impl PythonPhysicalCodec {
174149
Self { inner }
175150
}
176151

177-
pub fn with_default_inner() -> Self {
178-
Self::new(Arc::new(DefaultPhysicalExtensionCodec {}))
179-
}
180-
181152
pub fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> {
182153
&self.inner
183154
}
184155
}
185156

186157
impl Default for PythonPhysicalCodec {
187158
fn default() -> Self {
188-
Self::with_default_inner()
159+
Self::new(Arc::new(DefaultPhysicalExtensionCodec {}))
189160
}
190161
}
191162

@@ -204,128 +175,10 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
204175
}
205176

206177
fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
207-
if try_encode_python_scalar_udf(node, buf)? {
208-
return Ok(());
209-
}
210178
self.inner.try_encode_udf(node, buf)
211179
}
212180

213181
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
214-
match try_decode_python_scalar_udf(buf)? {
215-
Some(udf) => Ok(udf),
216-
None => self.inner.try_decode_udf(name, buf),
217-
}
218-
}
219-
}
220-
221-
/// Encode a Python scalar UDF inline if `node` is one. Returns `Ok(true)`
222-
/// when the payload (magic prefix + cloudpickle tuple) was written, or
223-
/// `Ok(false)` to signal the caller should delegate to its `inner` codec
224-
/// (non-Python UDF, e.g. built-in or FFI capsule).
225-
pub(crate) fn try_encode_python_scalar_udf(node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<bool> {
226-
let Some(py_udf) = node
227-
.inner()
228-
.as_any()
229-
.downcast_ref::<PythonFunctionScalarUDF>()
230-
else {
231-
return Ok(false);
232-
};
233-
234-
Python::attach(|py| -> Result<bool> {
235-
let bytes = encode_python_scalar_udf(py, py_udf)
236-
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
237-
buf.extend_from_slice(PY_SCALAR_UDF_MAGIC);
238-
buf.extend_from_slice(&bytes);
239-
Ok(true)
240-
})
241-
}
242-
243-
/// Decode an inline Python scalar UDF payload. Returns `Ok(None)` when
244-
/// `buf` does not carry the magic prefix, signalling the caller should
245-
/// delegate to its `inner` codec (which will typically defer to the
246-
/// `FunctionRegistry`).
247-
pub(crate) fn try_decode_python_scalar_udf(buf: &[u8]) -> Result<Option<Arc<ScalarUDF>>> {
248-
if buf.is_empty() || !buf.starts_with(PY_SCALAR_UDF_MAGIC) {
249-
return Ok(None);
182+
self.inner.try_decode_udf(name, buf)
250183
}
251-
let payload = &buf[PY_SCALAR_UDF_MAGIC.len()..];
252-
253-
Python::attach(|py| -> Result<Option<Arc<ScalarUDF>>> {
254-
let udf = decode_python_scalar_udf(py, payload)
255-
.map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
256-
Ok(Some(Arc::new(ScalarUDF::new_from_impl(udf))))
257-
})
258-
}
259-
260-
/// Build the cloudpickle payload for a `PythonFunctionScalarUDF`.
261-
///
262-
/// Layout: `cloudpickle.dumps((name, func, input_schema_bytes,
263-
/// return_field, volatility_str))`. Input fields ride along as an
264-
/// IPC-encoded pyarrow Schema so they round-trip without extra plumbing.
265-
fn encode_python_scalar_udf(py: Python<'_>, udf: &PythonFunctionScalarUDF) -> PyResult<Vec<u8>> {
266-
let cloudpickle = py.import("cloudpickle")?;
267-
268-
let input_schema = Schema::new(udf.input_fields().to_vec());
269-
let pa_schema_obj = input_schema.to_pyarrow(py)?;
270-
let pa_schema = pa_schema_obj.into_bound();
271-
let schema_bytes: Vec<u8> = pa_schema
272-
.call_method0("serialize")?
273-
.call_method0("to_pybytes")?
274-
.extract()?;
275-
276-
let return_field_obj = udf.return_field().as_ref().to_pyarrow(py)?;
277-
let volatility = format!("{:?}", udf.volatility()).to_lowercase();
278-
279-
let payload = PyTuple::new(
280-
py,
281-
[
282-
udf.name().into_pyobject(py)?.into_any(),
283-
udf.func().bind(py).clone().into_any(),
284-
PyBytes::new(py, &schema_bytes).into_any(),
285-
return_field_obj.into_bound(),
286-
volatility.into_pyobject(py)?.into_any(),
287-
],
288-
)?;
289-
290-
let blob = cloudpickle.call_method1("dumps", (payload,))?;
291-
blob.extract::<Vec<u8>>()
292-
}
293-
294-
/// Inverse of [`encode_python_scalar_udf`].
295-
fn decode_python_scalar_udf(py: Python<'_>, payload: &[u8]) -> PyResult<PythonFunctionScalarUDF> {
296-
let cloudpickle = py.import("cloudpickle")?;
297-
let pyarrow = py.import("pyarrow")?;
298-
299-
let tuple = cloudpickle
300-
.call_method1("loads", (PyBytes::new(py, payload),))?
301-
.cast_into::<PyTuple>()?;
302-
303-
let name: String = tuple.get_item(0)?.extract()?;
304-
let func: Py<PyAny> = tuple.get_item(1)?.unbind();
305-
let schema_bytes: Vec<u8> = tuple.get_item(2)?.extract()?;
306-
let return_field_py = tuple.get_item(3)?;
307-
let volatility_str: String = tuple.get_item(4)?.extract()?;
308-
309-
let buffer = pyarrow.call_method1("py_buffer", (PyBytes::new(py, &schema_bytes),))?;
310-
let pa_schema = pyarrow
311-
.getattr("ipc")?
312-
.call_method1("read_schema", (buffer,))?;
313-
314-
let schema = Schema::from_pyarrow_bound(&pa_schema)
315-
.map_err(|e| pyo3::exceptions::PyValueError::new_err(format!("{e}")))?;
316-
let input_fields: Vec<Field> = schema.fields().iter().map(|f| f.as_ref().clone()).collect();
317-
318-
let return_field = Field::from_pyarrow_bound(&return_field_py)
319-
.map_err(|e| pyo3::exceptions::PyValueError::new_err(format!("{e}")))?;
320-
321-
let volatility = datafusion_python_util::parse_volatility(&volatility_str)
322-
.map_err(|e| pyo3::exceptions::PyValueError::new_err(format!("{e}")))?;
323-
324-
Ok(PythonFunctionScalarUDF::from_parts(
325-
name,
326-
func,
327-
input_fields,
328-
return_field,
329-
volatility,
330-
))
331184
}

crates/core/src/context.rs

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -398,12 +398,10 @@ impl PySessionContext {
398398
.with_default_features()
399399
.build();
400400
let ctx = Arc::new(SessionContext::new_with_state(session_state));
401-
let logical_codec = Self::default_logical_codec();
402-
let physical_codec = Self::default_physical_codec();
403401
Ok(PySessionContext {
404402
ctx,
405-
logical_codec,
406-
physical_codec,
403+
logical_codec: Arc::new(PythonLogicalCodec::default()),
404+
physical_codec: Arc::new(PythonPhysicalCodec::default()),
407405
})
408406
}
409407

@@ -419,12 +417,10 @@ impl PySessionContext {
419417
#[pyo3(signature = ())]
420418
pub fn global_ctx() -> PyResult<Self> {
421419
let ctx = get_global_ctx().clone();
422-
let logical_codec = Self::default_logical_codec();
423-
let physical_codec = Self::default_physical_codec();
424420
Ok(Self {
425421
ctx,
426-
logical_codec,
427-
physical_codec,
422+
logical_codec: Arc::new(PythonLogicalCodec::default()),
423+
physical_codec: Arc::new(PythonPhysicalCodec::default()),
428424
})
429425
}
430426

@@ -1458,14 +1454,6 @@ impl PySessionContext {
14581454
Ok(())
14591455
}
14601456

1461-
fn default_logical_codec() -> Arc<PythonLogicalCodec> {
1462-
Arc::new(PythonLogicalCodec::default())
1463-
}
1464-
1465-
fn default_physical_codec() -> Arc<PythonPhysicalCodec> {
1466-
Arc::new(PythonPhysicalCodec::default())
1467-
}
1468-
14691457
/// Session-scoped logical codec. Sibling modules read this when they
14701458
/// need to serialize/deserialize logical-layer types (LogicalPlan,
14711459
/// Expr) against the user-installed (or default) codec stack.
@@ -1525,14 +1513,10 @@ impl From<PySessionContext> for SessionContext {
15251513

15261514
impl From<SessionContext> for PySessionContext {
15271515
fn from(ctx: SessionContext) -> PySessionContext {
1528-
let ctx = Arc::new(ctx);
1529-
let logical_codec = Self::default_logical_codec();
1530-
let physical_codec = Self::default_physical_codec();
1531-
15321516
PySessionContext {
1533-
ctx,
1534-
logical_codec,
1535-
physical_codec,
1517+
ctx: Arc::new(ctx),
1518+
logical_codec: Arc::new(PythonLogicalCodec::default()),
1519+
physical_codec: Arc::new(PythonPhysicalCodec::default()),
15361520
}
15371521
}
15381522
}

crates/core/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
9797
m.add_class::<metrics::PyMetricsSet>()?;
9898
m.add_class::<metrics::PyMetric>()?;
9999
m.add_class::<physical_plan::PyExecutionPlan>()?;
100-
m.add_class::<physical_plan::PyPhysicalExpr>()?;
101100
m.add_class::<record_batch::PyRecordBatch>()?;
102101
m.add_class::<record_batch::PyRecordBatchStream>()?;
103102

0 commit comments

Comments
 (0)