Skip to content

feat(amber): add loop-bookkeeping columns to materialized State (dormant)#5900

Open
aglinxinyuan wants to merge 4 commits into
apache:mainfrom
aglinxinyuan:state-format-loop-columns
Open

feat(amber): add loop-bookkeeping columns to materialized State (dormant)#5900
aglinxinyuan wants to merge 4 commits into
apache:mainfrom
aglinxinyuan:state-format-loop-columns

Conversation

@aglinxinyuan

@aglinxinyuan aglinxinyuan commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this PR?

Extends the cross-region State materialization format from a single content column to 4 columnscontent, loop_counter, loop_start_id, loop_start_state_uri — promoting loop bookkeeping to first-class columns (never inside the content JSON). The transport carries them end to end: the OutputManager state writer + emit_state, the Python network sender/receiver, the materialization reader, and the Scala state.toTuple call sites. In memory the three loop fields ride on the StateFrame envelope; they are materialized/serialized as their own columns (parallel to content), and from_tuple / fromTuple read only content back into the State.

On the Python side the column-name → value mapping is defined once in State.to_columns and reused by both to_tuple (iceberg) and the network sender's StateFrame branch, so adding a column later is a single-line change in one place rather than an edit in every serializer.

Dormant on main — nothing observable changes without the loop operators:

  • to_tuple() / toTuple() and OutputManager.save_state_to_storage_if_needed / emit_state default the three loop columns to 0 / "", so every existing non-loop caller is unchanged.
  • from_tuple / fromTuple read only the content column, so round-trip identity is preserved and the extra columns are inert.

No backward-compatible read of old 1-column State is needed: State materialization is intra-execution only — the iceberg state-document URI is execution-scoped (…/eid/{executionId}/) and recreated fresh each run, and State tuples are never replayed across executions or engine versions, so a 1-column tuple can never reach the 4-column reader.

This is the state-format prerequisite the loop operators build on; the columns carry non-default values only once Loop Start/End set them (follow-up PR).

Any related issues, documentation, discussions?

Extracted from #5700 (loop operators) per @Xiao-zhen-Liu's split request; part of #4442 ("Introduce for loop").

How was this PR tested?

  • Format / round-trip: test_state.py (loop columns are their own columns, never in content JSON, default to 0 / ""), Scala StateSpec (all three loop columns round-trip through a tuple with non-default values, not just content), ArrowUtilsSpec (4-column Arrow vector round-trip), IcebergDocumentSpec (iceberg state-doc round-trip).
  • Transport: test_network_receiver.py, test_input_port_materialization_reader_runnable.py, and test_state_materialization_e2e.py — the e2e (hermetic sqlite catalog) writes non-default values for all three loop columns end-to-end and asserts they replay both on the StateFrame and on the raw iceberg row, exercising the real Tuple/Schema/iceberg path.
  • Dormancy: new test_output_manager.py::test_defaults_loop_columns_when_omitted pins that a no-loop caller (no loop_counter) still produces a valid 4-column tuple with the loop columns at 0 / "".
  • Local: workflow-core + amber compile; StateSpec + ArrowUtilsSpec pass; Python state + transport + e2e tests pass; scalafmt + scalafix + black clean. (IcebergDocumentSpec needs the iceberg catalog backend, so it runs in CI.)

Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.

…ant)

Extend the cross-region State materialization format from 1 column (content)
to 4 columns: content + loop_counter + loop_start_id + loop_start_state_uri.
The loop bookkeeping is promoted to first-class columns (never inside the
content JSON), and the transport carries them: OutputManager state writer +
emit, the Python network sender/receiver, the materialization reader, and the
Scala state.toTuple call sites.

Dormant on main: to_tuple()/toTuple() and OutputManager.save_state_to_storage_if_needed
/ emit_state default the loop columns to 0/"", so every existing non-loop caller
is unchanged, and fromTuple/from_tuple read only the content column. The columns
activate only once the loop operators set them (follow-up PR). State
materialization is intra-execution (execution-scoped iceberg URI, recreated
fresh each run), so no backward-compatible read of old 1-column data is needed.

Extracted from apache#5700 (loop operators); part of apache#4442.
Copilot AI review requested due to automatic review settings June 23, 2026 01:35
@github-actions

Copy link
Copy Markdown
Contributor

Automated Reviewer Suggestions

Based on the git blame history of the changed files, we recommend the following reviewers:

  • Contributors with relevant context: @Yicong-Huang, @Ma77Ball
    You can notify them by mentioning @Yicong-Huang, @Ma77Ball in a comment.

@codecov-commenter

codecov-commenter commented Jun 23, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 95.83333% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 55.02%. Comparing base (8803d08) to head (74eef1d).
⚠️ Report is 41 commits behind head on main.

Files with missing lines Patch % Lines
...ne/architecture/messaginglayer/OutputManager.scala 0.00% 1 Missing ⚠️
.../architecture/pythonworker/PythonProxyClient.scala 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #5900      +/-   ##
============================================
+ Coverage     54.11%   55.02%   +0.91%     
- Complexity     2819     2988     +169     
============================================
  Files          1103     1111       +8     
  Lines         42650    42925     +275     
  Branches       4588     4623      +35     
============================================
+ Hits          23079    23621     +542     
+ Misses        18226    17916     -310     
- Partials       1345     1388      +43     
Flag Coverage Δ *Carryforward flag
access-control-service 70.00% <ø> (-0.45%) ⬇️
agent-service 34.36% <ø> (ø) Carriedforward from 7d5ab80
amber 57.84% <86.66%> (+2.20%) ⬆️
computing-unit-managing-service 0.00% <ø> (-1.66%) ⬇️
config-service 51.56% <ø> (-5.80%) ⬇️
file-service 59.02% <ø> (+0.42%) ⬆️
frontend 48.09% <ø> (-0.03%) ⬇️ Carriedforward from 7d5ab80
notebook-migration-service 78.57% <ø> (?)
pyamber 91.15% <100.00%> (+0.95%) ⬆️
python 90.69% <ø> (-0.08%) ⬇️ Carriedforward from 7d5ab80
workflow-compiling-service 55.14% <ø> (-3.55%) ⬇️

*This pull request uses carry forward flags. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@github-actions

github-actions Bot commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

⚠️ Benchmark changes need a look

🟢 0 better · 🔴 10 worse · ⚪ 5 noise (<±5%) · 0 without baseline

Compared against main 8803d08 benchmarked on this same runner, so the delta is largely free of cross-runner hardware noise. The "7d avg" column still reflects the gh-pages dashboard. Treat <±5% as noise unless repeated.

Dashboard · Run

config throughput MB/s latency max Δ latest / 7d
🔴 bs=10 sw=10 sl=64 391 0.239 25,144/35,671/35,671 us 🔴 +14.1% / 🔴 +132.6%
🔴 bs=100 sw=10 sl=64 809 0.494 122,134/140,551/140,551 us 🔴 +6.7% / 🔴 +27.1%
bs=1000 sw=10 sl=64 941 0.575 1,062,243/1,107,215/1,107,215 us ⚪ within ±5% / ⚪ within ±5%
Baseline details

Latest main 8803d08 from same runner

config metric PR latest main 7d avg Δ latest Δ 7d
bs=10 sw=10 sl=64 throughput 391 tuples/sec 443 tuples/sec 757.16 tuples/sec -11.7% -48.4%
bs=10 sw=10 sl=64 MB/s 0.239 MB/s 0.271 MB/s 0.462 MB/s -11.8% -48.3%
bs=10 sw=10 sl=64 p50 25,144 us 22,045 us 12,971 us +14.1% +93.8%
bs=10 sw=10 sl=64 p95 35,671 us 32,753 us 15,333 us +8.9% +132.6%
bs=10 sw=10 sl=64 p99 35,671 us 32,753 us 18,732 us +8.9% +90.4%
bs=100 sw=10 sl=64 throughput 809 tuples/sec 860 tuples/sec 957.66 tuples/sec -5.9% -15.5%
bs=100 sw=10 sl=64 MB/s 0.494 MB/s 0.525 MB/s 0.585 MB/s -5.9% -15.5%
bs=100 sw=10 sl=64 p50 122,134 us 114,435 us 103,982 us +6.7% +17.5%
bs=100 sw=10 sl=64 p95 140,551 us 132,053 us 110,583 us +6.4% +27.1%
bs=100 sw=10 sl=64 p99 140,551 us 132,053 us 118,369 us +6.4% +18.7%
bs=1000 sw=10 sl=64 throughput 941 tuples/sec 936 tuples/sec 979.6 tuples/sec +0.5% -3.9%
bs=1000 sw=10 sl=64 MB/s 0.575 MB/s 0.571 MB/s 0.598 MB/s +0.7% -3.8%
bs=1000 sw=10 sl=64 p50 1,062,243 us 1,070,452 us 1,024,553 us -0.8% +3.7%
bs=1000 sw=10 sl=64 p95 1,107,215 us 1,115,452 us 1,063,789 us -0.7% +4.1%
bs=1000 sw=10 sl=64 p99 1,107,215 us 1,115,452 us 1,096,239 us -0.7% +1.0%
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,511.44,200,128000,391,0.239,25144.48,35671.08,35671.08
1,100,10,64,20,2472.42,2000,1280000,809,0.494,122133.94,140550.88,140550.88
2,1000,10,64,20,21246.30,20000,12800000,941,0.575,1062242.89,1107214.64,1107214.64

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates Amber’s cross-region State materialization wire format from a single content column to a 4-column schema (content, loop_counter, loop_start_id, loop_start_state_uri) so loop bookkeeping is carried as first-class columns rather than embedded in the user state JSON. It touches both Scala and Python runtimes plus tests to keep the change dormant for non-loop workflows via default values.

Changes:

  • Extend Scala/Python State schemas and toTuple/to_tuple writers to emit the 4-column state tuple with defaults for non-loop callers.
  • Update Python materialization reader and Python network sender/receiver to read/write the new loop bookkeeping columns on StateFrame.
  • Add/adjust Scala + Python tests to pin Arrow vector round-trips and state materialization behavior.

Reviewed changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala Expands State.schema to 4 columns and updates toTuple to write loop bookkeeping columns with defaults.
common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala Updates state tuple tests to align with the new toTuple() signature and schema.
common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala Adds a data-level Arrow encode/decode round-trip test to ensure multi-column State tuples survive Arrow vector conversion.
common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala Updates call sites to toTuple() in state-document tests (but some test semantics still use reserved loop keys in JSON).
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala Updates Scala state persistence path to call state.toTuple().
amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala Updates Scala→Python Arrow send path to call state.toTuple().
amber/src/main/python/core/models/state.py Expands Python State schema and adds loop-bookkeeping parameters to to_tuple(...).
amber/src/main/python/core/models/payload.py Extends Python StateFrame envelope with loop bookkeeping fields (defaulting to no-loop values).
amber/src/main/python/core/architecture/packaging/output_manager.py Threads loop bookkeeping through save_state_to_storage_if_needed(...) and emit_state(...) and refactors writer setup/close logic.
amber/src/main/python/core/runnables/network_sender.py Serializes StateFrame as a 4-column Arrow table (content + loop bookkeeping columns).
amber/src/main/python/core/runnables/network_receiver.py Deserializes StateFrame from a 4-column Arrow table and populates loop bookkeeping fields.
amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py Reads loop bookkeeping columns from the state table rows and carries them on emitted StateFrames.
amber/src/test/python/core/models/test_state.py Updates tests to assert the expanded schema and that loop bookkeeping does not leak into the content JSON.
amber/src/test/python/core/runnables/test_network_receiver.py Updates unit test to verify non-zero loop_counter survives Python sender→receiver serialization.
amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py Updates reader test to assert loop bookkeeping values are carried on emitted StateFrames.
amber/src/test/python/core/architecture/packaging/test_output_manager.py Updates tests for new signature and adds a dormancy test ensuring omitted loop args default to 0/empty strings.
amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py Updates e2e state materialization test to use sqlite-backed catalog and assert loop_counter column round-trip.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread amber/src/main/python/core/runnables/network_sender.py Outdated
Comment thread amber/src/main/python/core/runnables/network_receiver.py
Address review (Copilot): the materialization round-trip tests used
"loop_counter" as a user-state key (landing in the content JSON), which is
misleading now that loop bookkeeping is a dedicated column. Rename the user key
to "i" and write/assert the loop columns via toTuple(loopCounter = ...) +
row.getField("loop_counter"/"loop_start_id"/"loop_start_state_uri"), matching
how StateSpec/test_state were updated.
aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 23, 2026

@Xiao-zhen-Liu Xiao-zhen-Liu left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed the part of the loop-split change that handles the state format. The main logic looks correct:

  • State only lives within a single execution, so no pre-existing single-column state can reach the new four-column reader. I checked this across the restart, fault-recovery, and result-reuse paths.
  • The non-loop defaults keep existing behavior unchanged.
  • The Python and Scala four-column schemas agree on names, order, and types.

The comments below are about test coverage and two design points. I'm not recording an approve / request-changes decision.

One accuracy note on the description: it lists only to_tuple/toTuple and two OutputManager methods, but the change also adds the three fields to the Python StateFrame and widens the wire format to four columns. Behavior is unchanged; just worth stating accurately.

Comment thread amber/src/main/python/core/models/state.py Outdated
# that need these values read the corresponding columns off the tuple.
LOOP_COUNTER = "loop_counter"
LOOP_START_ID = "loop_start_id"
LOOP_START_STATE_URI = "loop_start_state_uri"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design question, not a request: does the saved state need to carry the storage URI, or is loop_start_id enough for the controller to resolve where Loop End writes the next iteration? Raising it partly because it looks different from what we discussed offline.

@aglinxinyuan aglinxinyuan Jun 28, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question — and you're right it differs from the offline sketch. As currently wired in the loop branch, the URI isn't derivable from loop_start_id alone:

  • loop_start_id is just LoopStart's logical op id — it's what JumpToOperatorRegionRequest(OperatorIdentity(loop_start_id)) jumps to.
  • loop_start_state_uri is VFSURIFactory.state_uri(readers[0].uri) — the physical iceberg URI of LoopStart's input port, which also encodes the execution id, the physical layer name, and which input port/reader. LoopEnd writes the next iteration's state straight into that URI, so it needs the full path, not just the logical id.

Basically, the LOOP_START_STATE_URI is the storage URI for the operator right before the LoopStart.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks — makes sense. I'll take this up in the loop PR: since the controller allocates the port URIs and owns the topology, it can likely resolve LoopEnd's write location from loop_start_id and keep the URI out of the saved State. Noted as a follow-up for that PR; I'll weigh it against the extra worker↔controller round-trip it implies. Leaving the format as-is here per your note.

🤖 Addressed by Claude Code

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify the question: it isn't whether loop_start_id can encode the URI (agreed it can't — the URI is LoopStart's input-port path). It's whether that write location should be in State at all.

The reason: the loop-back target is the same every iteration — same LoopStart, same input port, same execution. So it's loop config that happens to be constant, not per-iteration data, and State is the per-iteration payload.

A few alternatives, your call on which fits:

  1. Give it to LoopEnd at setup, the way workers already receive their port storage URIs from the scheduler (set_up_port_storage_writer / the mat-reader setup). LoopEnd still does the write, but holds the address as config instead of in every State row, and it never gets persisted to iceberg.
  2. Let the controller resolve it at jump time. It already gets jump_to_operator_region(loop_start_id), and it owns the topology and allocated the URIs, so it can resolve the target from the id and either do the write or hand the URI back to LoopEnd.
  3. Smaller option: carry it on the in-memory StateFrame only, never in the persisted columns.

Not blocking this PR — for the loop PR. Which fits best, or is there a constraint I'm missing?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dug through the loop branch to answer this properly. Short version: agreed it's constant config, and I'd go with (1) hand it to LoopEnd at setup, which then lets us drop the persisted column as the cleanup it unlocks (your (3)). I'd skip (2).

One wrinkle worth flagging first — (3) "in-memory StateFrame only" isn't where we are today. LoopStart and LoopEnd are separate workers in separate regions, and the only thing carrying the URI across that boundary to LoopEnd is the iceberg mat-reader rehydrating the StateFrame from the persisted loop_start_state_uri column (input_port_materialization_reader_runnable.py:161-168; from_tuple reads only content, so the loop fields come straight from the columns). At the write instant LoopEnd does read it from the in-memory self._loop_start_state_uri (main_loop.py:139), but that field was populated from frame.loop_start_state_uri (:406), whose value came off the persisted column. So dropping the column with nothing in its place hands LoopEnd the empty default across the region boundary — (3) needs a replacement carrier; it can't stand alone.

That's exactly why (1) fits best: the URI is fully resolvable from topology at schedule time — it's stateURI(LoopStart's input-port base), and that base is the upstream output port's storageURIBase the scheduler already allocates and looks up by GlobalPortIdentity (CostBasedScheduleGenerator.scala:244-257). set_up_port_storage_writer already derives state_uri(base) the exact same way LoopStart does at runtime (output_manager.py:160-169 vs main_loop.py:129). Inject it at setup and both the persisted column and the StateFrame field go away — fully realizing "config, not per-iteration data."

The honest cost of (1): (a) AssignPort today only carries a worker's own port URIs, so handing LoopEnd a sibling's (LoopStart's input) URI needs a new field / dedicated setup message; (b) the scheduler has to know the LoopStart↔LoopEnd pairing at schedule time — today that's only known at runtime via loop_start_id on the frame.

On (2): the controller can resolve the URI from loop_start_id (it holds the plan + allocated port URIs and already calls VFSURIFactory.stateURIRegionExecutionCoordinator.scala:572-574), so that half is fine. But the "controller already does the write or hands it back" premise isn't true today — JumpToOperatorRegionHandler only re-points the schedule level and returns EmptyReturn, and the back-edge write is worker-side (main_loop.py:146-150). Making the controller perform the write is a brand-new cross-language storage write (plus shipping the state payload controller-ward), and the "return the URI in the jump reply" sub-variant collapses to (3)'s effect while adding a round-trip on the hot jump path — so it's dominated by (1).

Net: (1) for the durable fix, (3) as the cleanup it enables, skip (2). I'll carry this into the loop PR — and you weren't missing a constraint so much as (3) being further along than it actually is: the cross-region hop is what keeps the URI on the persisted row today.

🤖 Addressed by Claude Code

…ip tests

Address review feedback on apache#5900:

- Add `State.to_columns`, the single column-name -> value mapping for the
  State wire/storage format, and route both `to_tuple` (iceberg) and the
  network sender's StateFrame branch through it, so adding a column is a
  one-line change rather than an edit in every serializer.
- e2e materialization test and StateSpec now round-trip all three loop
  columns (loop_counter, loop_start_id, loop_start_state_uri) with
  non-default values, not just loop_counter, so a regression in any single
  column's plumbing is caught.
- Document why the e2e deliberately uses a hermetic sqlite catalog while
  the other iceberg tests use postgres/REST.

@Xiao-zhen-Liu Xiao-zhen-Liu left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving. I verified the four code fixes are in 74eef1d / db66a7f: the e2e now round-trips all three loop columns through real storage with non-default values, StateSpec asserts them via getField, the column mapping now lives in one place (State.to_columns), and the sqlite-catalog difference is documented. The state-format change is correct and isn't active on main yet.

One thing is still open, though: whether loop_start_state_uri belongs in State (see the thread on state.py). content, loop_counter, and loop_start_id aren't in question, but the URI is a write address that stays the same across the whole loop. If that thread lands on 'not in State', it's better to drop the column from this PR than to ship it now and remove it later.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants