feat(amber): add loop-bookkeeping columns to materialized State (dormant)#5900
aglinxinyuan wants to merge 4 commits into
…ant)

Extend the cross-region State materialization format from 1 column (content) to 4 columns: content + loop_counter + loop_start_id + loop_start_state_uri. The loop bookkeeping is promoted to first-class columns (never inside the content JSON), and the transport carries them: OutputManager state writer + emit, the Python network sender/receiver, the materialization reader, and the Scala state.toTuple call sites.

Dormant on main: to_tuple()/toTuple() and OutputManager.save_state_to_storage_if_needed / emit_state default the loop columns to 0/"", so every existing non-loop caller is unchanged, and fromTuple/from_tuple read only the content column. The columns activate only once the loop operators set them (follow-up PR).

State materialization is intra-execution (execution-scoped iceberg URI, recreated fresh each run), so no backward-compatible read of old 1-column data is needed.

Extracted from apache#5700 (loop operators); part of apache#4442.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #5900 +/- ##
============================================
+ Coverage 54.11% 55.02% +0.91%
- Complexity 2819 2988 +169
============================================
Files 1103 1111 +8
Lines 42650 42925 +275
Branches 4588 4623 +35
============================================
+ Hits 23079 23621 +542
+ Misses 18226 17916 -310
- Partials 1345 1388 +43
| | config | throughput (tuples/sec) | MB/s | latency p50/p95/p99 | max Δ latest / 7d |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 391 | 0.239 | 25,144/35,671/35,671 us | 🔴 +14.1% / 🔴 +132.6% |
| 🔴 | bs=100 sw=10 sl=64 | 809 | 0.494 | 122,134/140,551/140,551 us | 🔴 +6.7% / 🔴 +27.1% |
| ⚪ | bs=1000 sw=10 sl=64 | 941 | 0.575 | 1,062,243/1,107,215/1,107,215 us | ⚪ within ±5% / ⚪ within ±5% |
Baseline details
Latest main 8803d08 from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 391 tuples/sec | 443 tuples/sec | 757.16 tuples/sec | -11.7% | -48.4% |
| bs=10 sw=10 sl=64 | MB/s | 0.239 MB/s | 0.271 MB/s | 0.462 MB/s | -11.8% | -48.3% |
| bs=10 sw=10 sl=64 | p50 | 25,144 us | 22,045 us | 12,971 us | +14.1% | +93.8% |
| bs=10 sw=10 sl=64 | p95 | 35,671 us | 32,753 us | 15,333 us | +8.9% | +132.6% |
| bs=10 sw=10 sl=64 | p99 | 35,671 us | 32,753 us | 18,732 us | +8.9% | +90.4% |
| bs=100 sw=10 sl=64 | throughput | 809 tuples/sec | 860 tuples/sec | 957.66 tuples/sec | -5.9% | -15.5% |
| bs=100 sw=10 sl=64 | MB/s | 0.494 MB/s | 0.525 MB/s | 0.585 MB/s | -5.9% | -15.5% |
| bs=100 sw=10 sl=64 | p50 | 122,134 us | 114,435 us | 103,982 us | +6.7% | +17.5% |
| bs=100 sw=10 sl=64 | p95 | 140,551 us | 132,053 us | 110,583 us | +6.4% | +27.1% |
| bs=100 sw=10 sl=64 | p99 | 140,551 us | 132,053 us | 118,369 us | +6.4% | +18.7% |
| bs=1000 sw=10 sl=64 | throughput | 941 tuples/sec | 936 tuples/sec | 979.6 tuples/sec | +0.5% | -3.9% |
| bs=1000 sw=10 sl=64 | MB/s | 0.575 MB/s | 0.571 MB/s | 0.598 MB/s | +0.7% | -3.8% |
| bs=1000 sw=10 sl=64 | p50 | 1,062,243 us | 1,070,452 us | 1,024,553 us | -0.8% | +3.7% |
| bs=1000 sw=10 sl=64 | p95 | 1,107,215 us | 1,115,452 us | 1,063,789 us | -0.7% | +4.1% |
| bs=1000 sw=10 sl=64 | p99 | 1,107,215 us | 1,115,452 us | 1,096,239 us | -0.7% | +1.0% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,511.44,200,128000,391,0.239,25144.48,35671.08,35671.08
1,100,10,64,20,2472.42,2000,1280000,809,0.494,122133.94,140550.88,140550.88
2,1000,10,64,20,21246.30,20000,12800000,941,0.575,1062242.89,1107214.64,1107214.64
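The derived columns in the benchmark tables can be cross-checked from the raw CSV. The following is a minimal sketch, not the benchmark's own code: it assumes throughput is `total_tuples / total_ms`, that "MB" means 1024 × 1024 bytes (that choice reproduces the reported `mb_per_sec` values), and that each Δ is `(PR - baseline) / baseline`. The helper names `derive` and `pct_change` are made up for illustration.

```python
import csv
import io

# Rows copied verbatim from the "Raw CSV" section above.
RAW_CSV = """config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,511.44,200,128000,391,0.239,25144.48,35671.08,35671.08
1,100,10,64,20,2472.42,2000,1280000,809,0.494,122133.94,140550.88,140550.88
2,1000,10,64,20,21246.30,20000,12800000,941,0.575,1062242.89,1107214.64,1107214.64
"""


def derive(row):
    """Recompute (tuples/sec, MB/s) from the raw totals of one CSV row."""
    seconds = float(row["total_ms"]) / 1000.0
    tuples_per_sec = int(row["total_tuples"]) / seconds
    # Assumption: MB = 1024 * 1024 bytes.
    mb_per_sec = int(row["total_bytes"]) / seconds / (1024 * 1024)
    return round(tuples_per_sec), round(mb_per_sec, 3)


def pct_change(pr_value, baseline):
    """Relative change of the PR value against a baseline, in percent."""
    return round((pr_value - baseline) / baseline * 100.0, 1)


rows = list(csv.DictReader(io.StringIO(RAW_CSV)))
derived = [derive(r) for r in rows]

for row, (tps, mbs) in zip(rows, derived):
    print(f"bs={row['batch_size']}: {tps} tuples/sec, {mbs} MB/s")

# bs=10 against latest main, using the values from the baseline table.
print("throughput Δ latest:", pct_change(391, 443), "%")
print("p50 Δ latest:", pct_change(25144, 22045), "%")
```

Under these assumptions the recomputed values agree with the reported columns, which suggests the regression flags on the two small-batch configs follow directly from the raw totals rather than from a reporting artifact.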
Pull request overview
This PR updates Amber’s cross-region State materialization wire format from a single content column to a 4-column schema (content, loop_counter, loop_start_id, loop_start_state_uri) so loop bookkeeping is carried as first-class columns rather than embedded in the user state JSON. It touches both Scala and Python runtimes plus tests to keep the change dormant for non-loop workflows via default values.
Changes:
- Extend Scala/Python `State` schemas and `toTuple`/`to_tuple` writers to emit the 4-column state tuple with defaults for non-loop callers.
- Update Python materialization reader and Python network sender/receiver to read/write the new loop bookkeeping columns on `StateFrame`.
- Add/adjust Scala + Python tests to pin Arrow vector round-trips and state materialization behavior.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala | Expands State.schema to 4 columns and updates toTuple to write loop bookkeeping columns with defaults. |
| common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala | Updates state tuple tests to align with the new toTuple() signature and schema. |
| common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala | Adds a data-level Arrow encode/decode round-trip test to ensure multi-column State tuples survive Arrow vector conversion. |
| common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala | Updates call sites to toTuple() in state-document tests (but some test semantics still use reserved loop keys in JSON). |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala | Updates Scala state persistence path to call state.toTuple(). |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala | Updates Scala→Python Arrow send path to call state.toTuple(). |
| amber/src/main/python/core/models/state.py | Expands Python State schema and adds loop-bookkeeping parameters to to_tuple(...). |
| amber/src/main/python/core/models/payload.py | Extends Python StateFrame envelope with loop bookkeeping fields (defaulting to no-loop values). |
| amber/src/main/python/core/architecture/packaging/output_manager.py | Threads loop bookkeeping through save_state_to_storage_if_needed(...) and emit_state(...) and refactors writer setup/close logic. |
| amber/src/main/python/core/runnables/network_sender.py | Serializes StateFrame as a 4-column Arrow table (content + loop bookkeeping columns). |
| amber/src/main/python/core/runnables/network_receiver.py | Deserializes StateFrame from a 4-column Arrow table and populates loop bookkeeping fields. |
| amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py | Reads loop bookkeeping columns from the state table rows and carries them on emitted StateFrames. |
| amber/src/test/python/core/models/test_state.py | Updates tests to assert the expanded schema and that loop bookkeeping does not leak into the content JSON. |
| amber/src/test/python/core/runnables/test_network_receiver.py | Updates unit test to verify non-zero loop_counter survives Python sender→receiver serialization. |
| amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py | Updates reader test to assert loop bookkeeping values are carried on emitted StateFrames. |
| amber/src/test/python/core/architecture/packaging/test_output_manager.py | Updates tests for new signature and adds a dormancy test ensuring omitted loop args default to 0/empty strings. |
| amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py | Updates e2e state materialization test to use sqlite-backed catalog and assert loop_counter column round-trip. |
Address review (Copilot): the materialization round-trip tests used
"loop_counter" as a user-state key (landing in the content JSON), which is
misleading now that loop bookkeeping is a dedicated column. Rename the user key
to "i" and write/assert the loop columns via toTuple(loopCounter = ...) +
row.getField("loop_counter"/"loop_start_id"/"loop_start_state_uri"), matching
how StateSpec/test_state were updated.
Xiao-zhen-Liu
left a comment
There was a problem hiding this comment.
I reviewed the part of the loop-split change that handles the state format. The main logic looks correct:
- State only lives within a single execution, so no pre-existing single-column state can reach the new four-column reader. I checked this across the restart, fault-recovery, and result-reuse paths.
- The non-loop defaults keep existing behavior unchanged.
- The Python and Scala four-column schemas agree on names, order, and types.
The comments below are about test coverage and two design points. I'm not recording an approve / request-changes decision.
One accuracy note on the description: it lists only to_tuple/toTuple and two OutputManager methods, but the change also adds the three fields to the Python StateFrame and widens the wire format to four columns. Behavior is unchanged; just worth stating accurately.
# that need these values read the corresponding columns off the tuple.
LOOP_COUNTER = "loop_counter"
LOOP_START_ID = "loop_start_id"
LOOP_START_STATE_URI = "loop_start_state_uri"
There was a problem hiding this comment.
Design question, not a request: does the saved state need to carry the storage URI, or is loop_start_id enough for the controller to resolve where Loop End writes the next iteration? Raising it partly because it looks different from what we discussed offline.
There was a problem hiding this comment.
Good question — and you're right it differs from the offline sketch. As currently wired in the loop branch, the URI isn't derivable from loop_start_id alone:
- `loop_start_id` is just LoopStart's logical op id; it's what `JumpToOperatorRegionRequest(OperatorIdentity(loop_start_id))` jumps to.
- `loop_start_state_uri` is `VFSURIFactory.state_uri(readers[0].uri)`: the physical iceberg URI of LoopStart's input port, which also encodes the execution id, the physical layer name, and which input port/reader. LoopEnd writes the next iteration's state straight into that URI, so it needs the full path, not just the logical id.
Basically, the LOOP_START_STATE_URI is the storage URI for the operator right before the LoopStart.
There was a problem hiding this comment.
Thanks — makes sense. I'll take this up in the loop PR: since the controller allocates the port URIs and owns the topology, it can likely resolve LoopEnd's write location from loop_start_id and keep the URI out of the saved State. Noted as a follow-up for that PR; I'll weigh it against the extra worker↔controller round-trip it implies. Leaving the format as-is here per your note.
🤖 Addressed by Claude Code
There was a problem hiding this comment.
To clarify the question: it isn't whether loop_start_id can encode the URI (agreed it can't — the URI is LoopStart's input-port path). It's whether that write location should be in State at all.
The reason: the loop-back target is the same every iteration — same LoopStart, same input port, same execution. So it's loop config that happens to be constant, not per-iteration data, and State is the per-iteration payload.
A few alternatives, your call on which fits:
1. Give it to LoopEnd at setup, the way workers already receive their port storage URIs from the scheduler (`set_up_port_storage_writer` / the mat-reader setup). LoopEnd still does the write, but holds the address as config instead of in every State row, and it never gets persisted to iceberg.
2. Let the controller resolve it at jump time. It already gets `jump_to_operator_region(loop_start_id)`, and it owns the topology and allocated the URIs, so it can resolve the target from the id and either do the write or hand the URI back to LoopEnd.
3. Smaller option: carry it on the in-memory `StateFrame` only, never in the persisted columns.
Not blocking this PR — for the loop PR. Which fits best, or is there a constraint I'm missing?
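To make the "config, not per-iteration data" distinction concrete, here is a rough sketch of the first alternative. Everything in it is hypothetical: `LoopEndConfig`, `IterationState`, `LoopEndSketch`, the in-memory `storage` dict standing in for iceberg, and the `mem://` URI are illustrative names, not code from the loop branch.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass(frozen=True)
class LoopEndConfig:
    """Hypothetical setup-time config: constant for the whole loop."""

    loop_start_id: str
    loop_start_state_uri: str


@dataclass
class IterationState:
    """Hypothetical per-iteration payload: only data that changes per pass."""

    content: Dict[str, Any] = field(default_factory=dict)
    loop_counter: int = 0


class LoopEndSketch:
    """Holds the write address as config and never copies it into a row."""

    def __init__(self, config: LoopEndConfig, storage: Dict[str, List[dict]]):
        self._config = config
        self._storage = storage  # stand-in for the real state storage

    def write_back(self, state: IterationState) -> dict:
        row = {"content": dict(state.content), "loop_counter": state.loop_counter}
        # The address comes from config, so the row itself carries no URI.
        self._storage.setdefault(self._config.loop_start_state_uri, []).append(row)
        return row


storage: Dict[str, List[dict]] = {}
config = LoopEndConfig("LoopStart-1", "mem://loop-start-input/state")  # made-up values
loop_end = LoopEndSketch(config, storage)
written = loop_end.write_back(IterationState(content={"i": 1}, loop_counter=2))
print(written)
```

In this shape the persisted row has no URI column at all; the trade-off is that something has to deliver `LoopEndConfig` to the worker before the first iteration.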
There was a problem hiding this comment.
Dug through the loop branch to answer this properly. Short version: agreed it's constant config, and I'd go with (1) hand it to LoopEnd at setup, which then lets us drop the persisted column as the cleanup it unlocks (your (3)). I'd skip (2).
One wrinkle worth flagging first — (3) "in-memory StateFrame only" isn't where we are today. LoopStart and LoopEnd are separate workers in separate regions, and the only thing carrying the URI across that boundary to LoopEnd is the iceberg mat-reader rehydrating the StateFrame from the persisted loop_start_state_uri column (input_port_materialization_reader_runnable.py:161-168; from_tuple reads only content, so the loop fields come straight from the columns). At the write instant LoopEnd does read it from the in-memory self._loop_start_state_uri (main_loop.py:139), but that field was populated from frame.loop_start_state_uri (:406), whose value came off the persisted column. So dropping the column with nothing in its place hands LoopEnd the empty default across the region boundary — (3) needs a replacement carrier; it can't stand alone.
That's exactly why (1) fits best: the URI is fully resolvable from topology at schedule time — it's stateURI(LoopStart's input-port base), and that base is the upstream output port's storageURIBase the scheduler already allocates and looks up by GlobalPortIdentity (CostBasedScheduleGenerator.scala:244-257). set_up_port_storage_writer already derives state_uri(base) the exact same way LoopStart does at runtime (output_manager.py:160-169 vs main_loop.py:129). Inject it at setup and both the persisted column and the StateFrame field go away — fully realizing "config, not per-iteration data."
The honest cost of (1): (a) AssignPort today only carries a worker's own port URIs, so handing LoopEnd a sibling's (LoopStart's input) URI needs a new field / dedicated setup message; (b) the scheduler has to know the LoopStart↔LoopEnd pairing at schedule time — today that's only known at runtime via loop_start_id on the frame.
On (2): the controller can resolve the URI from loop_start_id (it holds the plan + allocated port URIs and already calls VFSURIFactory.stateURI — RegionExecutionCoordinator.scala:572-574), so that half is fine. But the "controller already does the write or hands it back" premise isn't true today — JumpToOperatorRegionHandler only re-points the schedule level and returns EmptyReturn, and the back-edge write is worker-side (main_loop.py:146-150). Making the controller perform the write is a brand-new cross-language storage write (plus shipping the state payload controller-ward), and the "return the URI in the jump reply" sub-variant collapses to (3)'s effect while adding a round-trip on the hot jump path — so it's dominated by (1).
Net: (1) for the durable fix, (3) as the cleanup it enables, skip (2). I'll carry this into the loop PR — and you weren't missing a constraint so much as (3) being further along than it actually is: the cross-region hop is what keeps the URI on the persisted row today.
🤖 Addressed by Claude Code
…ip tests

Address review feedback on apache#5900:
- Add `State.to_columns`, the single column-name -> value mapping for the State wire/storage format, and route both `to_tuple` (iceberg) and the network sender's StateFrame branch through it, so adding a column is a one-line change rather than an edit in every serializer.
- e2e materialization test and StateSpec now round-trip all three loop columns (loop_counter, loop_start_id, loop_start_state_uri) with non-default values, not just loop_counter, so a regression in any single column's plumbing is caught.
- Document why the e2e deliberately uses a hermetic sqlite catalog while the other iceberg tests use postgres/REST.
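The single-mapping idea can be sketched roughly as follows. This is not the actual `State.to_columns`: the class `StateSketch`, the function `to_wire_columns`, and the dict/list stand-ins for the iceberg tuple and the Arrow table are assumptions made for illustration. Only the four column names and the `0`/`""` defaults come from the PR text.

```python
import json
from typing import Any, Dict, List


class StateSketch:
    """Illustrative stand-in for State; not the real class."""

    def __init__(self, content: Dict[str, Any]):
        self.content = content

    def to_columns(
        self,
        loop_counter: int = 0,
        loop_start_id: str = "",
        loop_start_state_uri: str = "",
    ) -> Dict[str, Any]:
        # The one place that defines column names, order, and values.
        return {
            "content": json.dumps(self.content),
            "loop_counter": loop_counter,
            "loop_start_id": loop_start_id,
            "loop_start_state_uri": loop_start_state_uri,
        }

    def to_tuple(self, **loop_fields: Any) -> List[Any]:
        # Storage path: a row is the column values in mapping order.
        return list(self.to_columns(**loop_fields).values())


def to_wire_columns(state: StateSketch, **loop_fields: Any) -> Dict[str, List[Any]]:
    # Network path: column-oriented, one single-element list per column.
    return {name: [value] for name, value in state.to_columns(**loop_fields).items()}


state = StateSketch({"i": 1})
row = state.to_tuple(loop_counter=3)
wire = to_wire_columns(state, loop_counter=3)
print(row)
print(list(wire))
```

Because both serializers derive from `to_columns`, adding a fifth column in this sketch would mean touching only that method.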
There was a problem hiding this comment.
Approving. I verified the four code fixes are in 74eef1d / db66a7f: the e2e now round-trips all three loop columns through real storage with non-default values, StateSpec asserts them via getField, the column mapping now lives in one place (State.to_columns), and the sqlite-catalog difference is documented. The state-format change is correct and isn't active on main yet.
One thing is still open, though: whether loop_start_state_uri belongs in State (see the thread on state.py). content, loop_counter, and loop_start_id aren't in question, but the URI is a write address that stays the same across the whole loop. If that thread lands on 'not in State', it's better to drop the column from this PR than to ship it now and remove it later.
What changes were proposed in this PR?
Extends the cross-region State materialization format from a single `content` column to 4 columns (`content`, `loop_counter`, `loop_start_id`, `loop_start_state_uri`), promoting loop bookkeeping to first-class columns (never inside the content JSON). The transport carries them end to end: the `OutputManager` state writer + `emit_state`, the Python network sender/receiver, the materialization reader, and the Scala `state.toTuple` call sites. In memory the three loop fields ride on the `StateFrame` envelope; they are materialized/serialized as their own columns (parallel to `content`), and `from_tuple`/`fromTuple` read only `content` back into the `State`.

On the Python side the column-name → value mapping is defined once in `State.to_columns` and reused by both `to_tuple` (iceberg) and the network sender's `StateFrame` branch, so adding a column later is a single-line change in one place rather than an edit in every serializer.

Dormant on `main`: nothing observable changes without the loop operators.
- `to_tuple()`/`toTuple()` and `OutputManager.save_state_to_storage_if_needed`/`emit_state` default the three loop columns to `0`/`""`, so every existing non-loop caller is unchanged.
- `from_tuple`/`fromTuple` read only the `content` column, so round-trip identity is preserved and the extra columns are inert.

No backward-compatible read of old 1-column State is needed: State materialization is intra-execution only. The iceberg state-document URI is execution-scoped (`…/eid/{executionId}/`) and recreated fresh each run, and State tuples are never replayed across executions or engine versions, so a 1-column tuple can never reach the 4-column reader.

This is the state-format prerequisite the loop operators build on; the columns carry non-default values only once Loop Start/End set them (follow-up PR).
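A small sketch of why the extra columns stay inert for non-loop callers, and how a reader can still rehydrate the loop fields from the columns. The names `StateFrameSketch`, `write_row`, and `read_frame` are hypothetical, and a plain dict stands in for the materialized row; only the column names, the `0`/`""` defaults, and the "`from_tuple` reads only `content`" rule are taken from the description.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class StateFrameSketch:
    """Illustrative envelope: user state plus the three loop fields."""

    content: Dict[str, Any]
    loop_counter: int = 0
    loop_start_id: str = ""
    loop_start_state_uri: str = ""


def write_row(content, loop_counter=0, loop_start_id="", loop_start_state_uri=""):
    # Loop bookkeeping goes into its own columns, never into the content JSON.
    return {
        "content": json.dumps(content),
        "loop_counter": loop_counter,
        "loop_start_id": loop_start_id,
        "loop_start_state_uri": loop_start_state_uri,
    }


def from_tuple(row):
    # Reads only the content column, so the loop columns cannot alter user state.
    return json.loads(row["content"])


def read_frame(row):
    # A reader rebuilds the envelope: content via from_tuple, loop fields from columns.
    return StateFrameSketch(
        content=from_tuple(row),
        loop_counter=row["loop_counter"],
        loop_start_id=row["loop_start_id"],
        loop_start_state_uri=row["loop_start_state_uri"],
    )


plain = read_frame(write_row({"i": 1}))  # non-loop caller, defaults apply
looped = read_frame(write_row({"i": 1}, loop_counter=2, loop_start_id="LoopStart-1"))
print(plain)
print(looped)
```

In this sketch the user-visible `content` is identical in both cases; only the envelope fields differ, which is the sense in which the format change is dormant.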
Any related issues, documentation, discussions?
Extracted from #5700 (loop operators) per @Xiao-zhen-Liu's split request; part of #4442 ("Introduce for loop").
How was this PR tested?
- `test_state.py` (loop columns are their own columns, never in content JSON, default to `0`/`""`), Scala `StateSpec` (all three loop columns round-trip through a tuple with non-default values, not just `content`), `ArrowUtilsSpec` (4-column Arrow vector round-trip), `IcebergDocumentSpec` (iceberg state-doc round-trip).
- `test_network_receiver.py`, `test_input_port_materialization_reader_runnable.py`, and `test_state_materialization_e2e.py`: the e2e (hermetic sqlite catalog) writes non-default values for all three loop columns end-to-end and asserts they replay both on the `StateFrame` and on the raw iceberg row, exercising the real Tuple/Schema/iceberg path.
- `test_output_manager.py::test_defaults_loop_columns_when_omitted` pins that a no-loop caller (no `loop_counter`) still produces a valid 4-column tuple with the loop columns at `0`/`""`.
- `workflow-core` + `amber` compile; `StateSpec` + `ArrowUtilsSpec` pass; Python state + transport + e2e tests pass; scalafmt + scalafix + black clean. (`IcebergDocumentSpec` needs the iceberg catalog backend, so it runs in CI.)
Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.