feat: enable pickling of most Expr except udaf and udwf#1544
Conversation
Adds Python-aware encoding to PythonLogicalCodec/PythonPhysicalCodec so a ScalarUDF defined in Python travels inside the serialized expression (cloudpickled into fun_definition) instead of needing a matching registration on the receiver. With that in place, Expr gains __reduce__ + classmethod from_bytes(buf, ctx=None) so pickle.dumps / pickle.loads work end-to-end on expressions built from col, lit, built-in functions, and Python scalar UDFs. Wire format is framed as <DFPYUDF magic, version byte, cloudpickle tuple>; the version byte lets a too-new/too-old payload surface a clean Execution error instead of an opaque cloudpickle unpack failure. Schema serde is via arrow-rs's native IPC (no pyarrow round-trip). Cloudpickle module handle is cached per-interpreter through PyOnceLock. Worker-side context resolution lives in a new datafusion.ipc module: set_worker_ctx / get_worker_ctx / clear_worker_ctx plus a private _resolve_ctx helper consulted by Expr.from_bytes. Priority is explicit ctx > worker ctx > global SessionContext. FFI UDFs still travel by name and require the matching registration on the receiver's context. Aggregate and window UDF inline encoding, the per-session with_python_udf_inlining toggle, sender-side context, and the user-guide docs land in follow-on PRs. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…edge-case tests
Inline `.. warning::` blocks on `Expr.to_bytes`, `Expr.from_bytes`, and
`Expr.__reduce__` so the cloudpickle / arbitrary-code-execution caveat is
visible at the public API surface in advance of the user-guide page that
lands in PR 4.
Add doctest-style `Examples:` blocks to `datafusion.ipc` functions
(`set_worker_ctx`, `clear_worker_ctx`, `get_worker_ctx`, `_resolve_ctx`),
`ScalarUDF.name`, and the new `Expr` pickle methods, per CLAUDE.md.
Tighten `Expr.__reduce__` return annotation to
`tuple[Callable[[bytes], Expr], tuple[bytes]]`.
Tests: multi-arg UDF round-trip (covers synthetic `arg_{i}` schema-field
loop in the codec) plus malformed-bytes paths through `Expr.from_bytes`.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
ntjohnson1
left a comment
There was a problem hiding this comment.
The general shape of the changes here seem reasonable. I guess my only lingering thought/question is mostly around cloudpickle capabilities and the usage of it.
| }) | ||
| } | ||
|
|
||
| /// Build the cloudpickle payload for a `PythonFunctionScalarUDF`. |
There was a problem hiding this comment.
Maybe it is capture more clearly somewhere else but it feels like there is some nuance of the dependency on cloudpickle that's not fully communicated here. I didn't do too much of a deep dive on it.
-
cloudpickle only works on the same version of python (I'm not sure if it detects the mismatch with a nice error). So potentially your header might want to capture the source python version to give a nicer error and advertise that there is a limitation of only sending to the same version of python for remote workers
-
cloudpickle seems to have serialize by reference (more like dill) and by value (super cool). The former needs the function installed in the environment so when deserialized it can reference it where maybe the later tries to just capture all necessary bits (here is where I didn't deep dive a ton). Those are fairly different mental models for support.
There was a problem hiding this comment.
Good points! I updated docstring and also added in version checking.
| def test_udf_self_contained_blob(self): | ||
| e = _double_udf()(col("a")) | ||
| blob = pickle.dumps(e) | ||
| # The codec inlines the callable, so the blob is much bigger than a |
There was a problem hiding this comment.
I think this is testing the thing I was asking about but I haven't thought deeply enough if it actually does. If I know cloud pickle says it can serialize lambdas but if I instead had
from foo import double
def _double_udf():
return udf(
double,
[pa.int64()],
pa.int64(),
volatility="immutable",
name="double",
)Would I still be able to deserialize this on remote in a python environment without foo?
There was a problem hiding this comment.
Updated documentation to explain when you need foo available in the worker
cloudpickle bytecode is not portable across Python minor versions —
a payload produced on 3.11 fails to load on 3.12 with an opaque
marshal/unpickle error. Embed the sender's (major, minor) in the
DFPYUDF wire header and reject mismatches at decode time with an
actionable error that names both versions, instead of letting the
failure surface from inside cloudpickle.loads.
Header layout becomes:
DFPYUDF (7) | version (1) | py_major (1) | py_minor (1) | cloudpickle
Extend the Security warnings on Expr.to_bytes / from_bytes /
__reduce__ with a Portability section covering the cross-version
constraint and cloudpickle's by-value/by-reference behavior (the
callable inlines bytecode and closure cells, but imported names
travel by reference and must be importable on the receiver). Add
a matching Serialization model note to the datafusion.ipc module
docstring.
New tests:
- codec::wire_header_tests: py-major/minor mismatch, truncated
py-version bytes, round-trip with py-version
- test_pickle_expr::test_cross_version_error_message: patches the
py_minor byte inside an emitted payload and asserts the error
message identifies the version mismatch
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Which issue does this PR close?
Addresses part of #1517
This is PR 1 of 4. The four PRs stack sequentially on top of this one; subsequent PRs target this branch's tip until it merges.
Follow up PRs:
Rationale for this change
Today a
LogicalPlanorExprreferencing a Python-definedScalarUDFcannot survive a serialization round-trip without the receiver pre-registering a matching UDF, because the upstream protobuf codecs only carry the UDF name. That blocks shipping expressions to worker processes viapickle.dumps/multiprocessing.Pool/ Ray actors /datafusion-distributed. This PR closes the scalar-UDF case end-to-end so the naturalpickle.dumps(expr)pattern works for built-ins and Python scalar UDFs with no receiver-side setup.What changes are included in this PR?
Adds Python-aware encoding to
PythonLogicalCodecandPythonPhysicalCodec.On the Python side,
Exprgains__reduce__plus aclassmethod from_bytes(buf, ctx=None). A newdatafusion.ipcmodule exposesset_worker_ctx/get_worker_ctx/clear_worker_ctxthread-locals;_resolve_ctxconsults explicit-ctx > worker-ctx > globalSessionContext.cloudpickle>=2.0is added as a runtime dependency (lazy-imported on the encode/decode hot path). This is a tiny dependency, in the kilobyte range.Aggregate and window inline encoding, the per-session
with_python_udf_inliningtoggle, sender-side context wiring, and the user-guide docs land in PRs 2-4 of this series.Are there any user-facing changes?
Yes, but these are only additions.
Expris now picklable. Built-ins and Python scalar UDFs round-trip with no worker-side setup.Expr.to_bytes(ctx=None)/Expr.from_bytes(buf, ctx=None)shape.from_bytesis now aclassmethodwithctxas a keyword-onlyNone-default. Breaking for any directExpr.from_bytes(ctx, blob)callers — the in-tree call sites are updated.datafusion.ipcwithset_worker_ctx/get_worker_ctx/clear_worker_ctx.ScalarUDF.nameproperty.cloudpickle>=2.0.Expr.from_bytes` has a signature flip, but that is unreleased (only merged yesterday) and so not a change any user will experience.