fix(amber): order worker state by version, not timestamp #6011

Open
Yicong-Huang wants to merge 3 commits into apache:main from Yicong-Huang:fix/worker-state-causal-order


@Yicong-Huang Yicong-Huang commented Jun 29, 2026


What changes were proposed in this PR?

A fast source operator stayed orange (RUNNING) in the editor after the run finished (issue #6010).

Why it's a bug. The controller reconstructs each worker's state from several unordered channels (source RUNNING via the startWorker response, non-source RUNNING via workerStateUpdated, COMPLETED/PAUSED via queryStatistics/pauseWorker responses). Before this PR, WorkerExecution reconciled them with last-System.nanoTime()-wins, keyed on the controller's receipt time. Worker state, however, has a single writer and is strictly causally ordered. For a tiny source the run finishes almost instantly, so the startWorker response, which carries the stale RUNNING it sampled at launch, can reach the controller after COMPLETED was recorded; its later receipt timestamp wins and clobbers COMPLETED. Results render fine (they take a separate path), so only the operator border stays stuck.
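The race can be reproduced in a few lines. The sketch below is a simplified Python model, not the Scala code in WorkerExecution: the class name `TimestampOrderedWorker` and the literal receipt times are hypothetical and chosen only for illustration.

```python
from dataclasses import dataclass


@dataclass
class TimestampOrderedWorker:
    """Hypothetical stand-in for the old controller-side worker record."""

    state: str = "UNINITIALIZED"
    last_update: int = -1

    def update(self, receipt_time: int, state: str) -> None:
        # Last write wins, keyed on when the controller RECEIVED the report,
        # not on when the worker actually entered the state.
        if receipt_time > self.last_update:
            self.last_update = receipt_time
            self.state = state


worker = TimestampOrderedWorker()
# On the worker, RUNNING happened before COMPLETED, but the reports
# reach the controller in the opposite order.
worker.update(receipt_time=100, state="COMPLETED")  # e.g. a queryStatistics response
worker.update(receipt_time=105, state="RUNNING")    # the late startWorker response
final_state = worker.state
print(final_state)  # RUNNING: the stale snapshot clobbered COMPLETED
```

Because the receipt time says nothing about when the worker sampled the state, no choice of controller-side clock fixes this; the ordering information has to come from the worker.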

This PR orders worker state causally instead of by wall clock:

| Before | After |
| --- | --- |
| WorkerExecution.update(nanoTime, state): last-write-wins by receipt time | updateState(version, state): newest logical version wins |
| stale late RUNNING clobbers COMPLETED | RUNNING (v2) < COMPLETED (v3) ⇒ COMPLETED kept |
| | terminal states (COMPLETED/TERMINATED) are absorbing |

  • WorkerStateManager bumps a monotonic stateVersion on every transitTo (its state-machine logical clock).
  • The version rides on every state report: WorkerStateResponse, WorkerStateUpdatedRequest, WorkerMetrics (3 new proto fields).
  • The controller applies a state only when its version is newer. Stats stay timestamp-ordered (monotonic snapshots can share a state version).
  • A single per-worker counter needs no cross-process clock-sync assumptions, which is why worker-side timestamps were not used instead.
  • pyamber parity: the Python worker also reports state (start/pause/resume + query-statistics). Its StateManager now bumps the same monotonic version and the four handlers include it; otherwise version-0 reports would be dropped after the first and reconfiguration (RUNNING→PAUSED→RUNNING) would hang.
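The worker-side half of the bullets above can be sketched as follows. This is a minimal Python illustration, not the actual StateManager: the transition graph, the `InvalidTransitionError` name, and the choice to raise on a rejected transition are assumptions. Only the versioning rules (start at 0, bump on each successful transition, no bump on a no-op self-transition or a rejected transition) and the `get_state_version()` accessor name come from this PR.

```python
from enum import Enum


class WorkerState(Enum):
    UNINITIALIZED = "UNINITIALIZED"
    READY = "READY"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    COMPLETED = "COMPLETED"


class InvalidTransitionError(Exception):
    """Hypothetical error type for a transition the graph does not allow."""


class VersionedStateManager:
    def __init__(self, graph, initial):
        self._graph = graph
        self._state = initial
        self._state_version = 0  # logical clock of this worker's state machine

    def transit_to(self, target):
        if target == self._state:
            return  # no-op self-transition: the version is not bumped
        if target not in self._graph.get(self._state, set()):
            # Rejected transition: raise before touching state or version.
            raise InvalidTransitionError(f"{self._state.name} -> {target.name}")
        self._state = target
        self._state_version += 1  # exactly one bump per successful transition

    def get_state(self):
        return self._state

    def get_state_version(self):
        return self._state_version


# Hypothetical transition graph, for illustration only.
GRAPH = {
    WorkerState.UNINITIALIZED: {WorkerState.READY},
    WorkerState.READY: {WorkerState.RUNNING},
    WorkerState.RUNNING: {WorkerState.PAUSED, WorkerState.COMPLETED},
    WorkerState.PAUSED: {WorkerState.RUNNING},
    WorkerState.COMPLETED: set(),
}

manager = VersionedStateManager(GRAPH, WorkerState.UNINITIALIZED)
versions = []
for target in (WorkerState.READY, WorkerState.RUNNING,
               WorkerState.RUNNING, WorkerState.COMPLETED):
    manager.transit_to(target)
    versions.append(manager.get_state_version())

try:
    manager.transit_to(WorkerState.RUNNING)  # COMPLETED -> RUNNING is rejected
except InvalidTransitionError:
    pass
version_after_rejection = manager.get_state_version()
```

With this graph, RUNNING is reported at version 2 and COMPLETED at version 3, which matches the v2/v3 example in the table above.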

Any related issues, documentation, discussions?

Closes #6010. The timestamp-based update was introduced in #3557.

How was this PR tested?

JDK 17. Scala unit + Scala/Python integration + Python unit:

sbt "WorkflowExecutionService/testOnly \
  *WorkerExecutionSpec *WorkerStateManagerSpec *OperatorExecutionSpec \
  *RegionExecutionCoordinatorSpec *WorkflowExecutionCoordinatorSpec \
  *RegionExecutionSpec *WorkflowExecutionSpec"

# Scala-Python reconfiguration end-to-end (spawns Python UDF workers):
sbt "WorkflowExecutionService/testOnly *ReconfigurationIntegrationSpec"   # 3 passed

# Python unit:
cd amber && pytest src/test/python/core/architecture/managers/test_state_manager.py   # 9 passed
  • WorkerExecutionSpec: version ordering (positive + stale/equal-version negatives), terminal-state absorption, forceTerminate, independent stats-vs-state ordering, and a named regression for #6010, "Fast source operator stays orange (RUNNING) after the workflow completes" (COMPLETED survives a late RUNNING). Verified that the regression test goes red when updateState is reverted to last-write-wins.
  • WorkerStateManagerSpec / test_state_manager.py: version starts at 0, bumps per successful transition, no bump on no-op self-transition or rejected transition.
  • ReconfigurationIntegrationSpec reproduced the failure (timeout) before the pyamber fix and passes after.
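The controller-side properties these specs exercise can be sketched in Python as below. It is a simplified model, not the Scala WorkerExecution: the `WorkerRecord` class, the initial version of -1 (chosen so a version-0 report can still be applied), and the strict `>` comparison for stats timestamps are assumptions. forceTerminate is left out because its exact semantics are not described in this conversation.

```python
from dataclasses import dataclass
from typing import Optional

TERMINAL_STATES = {"COMPLETED", "TERMINATED"}


@dataclass
class WorkerRecord:
    """Hypothetical controller-side view of one worker."""

    state: str = "UNINITIALIZED"
    state_version: int = -1  # assumption: lower than any reported version
    stats: Optional[dict] = None
    stats_timestamp: int = -1

    def update_state(self, version: int, state: str) -> bool:
        if self.state in TERMINAL_STATES:
            return False  # terminal states are absorbing
        if version <= self.state_version:
            return False  # stale or equal version: ignore the report
        self.state_version = version
        self.state = state
        return True

    def update_stats(self, timestamp: int, stats: dict) -> bool:
        # Stats are ordered by timestamp, independently of the state version.
        if timestamp <= self.stats_timestamp:
            return False
        self.stats_timestamp = timestamp
        self.stats = stats
        return True


# Scenario 1 (issue #6010): COMPLETED (v3) arrives before the late RUNNING (v2).
source = WorkerRecord()
source.update_state(3, "COMPLETED")
late_running_applied = source.update_state(2, "RUNNING")

# Scenario 2: an equal-version report is rejected in a non-terminal state,
# while stats snapshots under the same state version are still ordered.
udf = WorkerRecord()
udf.update_state(2, "RUNNING")
duplicate_applied = udf.update_state(2, "PAUSED")
udf.update_stats(10, {"rows": 5})
udf.update_stats(20, {"rows": 9})
stale_stats_applied = udf.update_stats(15, {"rows": 7})
```

Keeping two separate gates is what lets several statistics snapshots share one state version without the newer snapshot being dropped.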

Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.8 (1M context), via Claude Code

Worker state was reconciled at the controller by last-nanoTime-wins.
A fast source's startWorker response carries a stale RUNNING snapshot
that can arrive after COMPLETED was recorded, clobbering it and leaving
the operator stuck orange.

Order state causally instead: WorkerStateManager now stamps every
transition with a monotonic per-worker version, carried on all state
reports (WorkerStateResponse, WorkerStateUpdatedRequest, WorkerMetrics).
The controller applies a state only if its version is newer, and treats
COMPLETED/TERMINATED as absorbing. Stats keep timestamp ordering.

Closes apache#6010
@github-actions


Automated Reviewer Suggestions

Based on the git blame history of the changed files, we recommend the following reviewers:

  • Contributors with relevant context: @aglinxinyuan, @kunwp1, @Ma77Ball
    You can notify them by mentioning @aglinxinyuan, @kunwp1, @Ma77Ball in a comment.

@codecov-commenter

codecov-commenter commented Jun 29, 2026


Codecov Report

❌ Patch coverage is 93.54839% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 56.65%. Comparing base (e5b928f) to head (ee1411a).
⚠️ Report is 8 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...hitecture/handlers/control/start_worker_handler.py | 0.00% | 1 Missing ⚠️ |
| ...ecture/deploysemantics/layer/WorkerExecution.scala | 91.66% | 0 Missing and 1 partial ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #6011      +/-   ##
============================================
- Coverage     56.67%   56.65%   -0.02%     
- Complexity     3030     3042      +12     
============================================
  Files          1124     1124              
  Lines         43298    43380      +82     
  Branches       4667     4673       +6     
============================================
+ Hits          24539    24577      +38     
- Misses        17319    17362      +43     
- Partials       1440     1441       +1     

| Flag | Coverage Δ | *Carryforward flag |
| --- | --- | --- |
| access-control-service | 70.00% <ø> (ø) | Carriedforward from af62bb2 |
| agent-service | 44.59% <ø> (ø) | Carriedforward from af62bb2 |
| amber | 58.55% <95.83%> (-0.08%) ⬇️ | |
| computing-unit-managing-service | 0.00% <ø> (ø) | Carriedforward from af62bb2 |
| config-service | 52.30% <ø> (ø) | Carriedforward from af62bb2 |
| file-service | 62.81% <ø> (ø) | Carriedforward from af62bb2 |
| frontend | 49.45% <ø> (ø) | Carriedforward from af62bb2 |
| notebook-migration-service | 78.57% <ø> (ø) | Carriedforward from af62bb2 |
| pyamber | 90.23% <85.71%> (+0.03%) ⬆️ | |
| python | 90.78% <ø> (+0.01%) ⬆️ | Carriedforward from af62bb2 |
| workflow-compiling-service | 55.14% <ø> (ø) | Carriedforward from af62bb2 |

*This pull request uses carry forward flags. Click here to find out more.


@github-actions

github-actions Bot commented Jun 29, 2026


✅ No material benchmark regressions detected

🟢 2 better · 🔴 0 worse · ⚪ 13 noise (<±5%) · 0 without baseline

Compared against main e5b928f benchmarked on this same runner, so the delta is largely free of cross-runner hardware noise. The "7d avg" column still reflects the gh-pages dashboard. Treat <±5% as noise unless repeated.


| config | throughput | MB/s | latency | max Δ latest / 7d |
| --- | --- | --- | --- | --- |
| bs=10 sw=10 sl=64 | 429 | 0.262 | 22,008/41,101/41,101 us | ⚪ within ±5% / 🔴 +172.7% |
| 🟢 bs=100 sw=10 sl=64 | 943 | 0.576 | 103,986/132,673/132,673 us | 🟢 -18.6% / 🔴 +21.9% |
| bs=1000 sw=10 sl=64 | 1,090 | 0.665 | 918,484/1,031,370/1,031,370 us | ⚪ within ±5% / 🟢 +8.6% |

Baseline details

Latest main e5b928f from same runner

| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
| --- | --- | --- | --- | --- | --- | --- |
| bs=10 sw=10 sl=64 | throughput | 429 tuples/sec | 444 tuples/sec | 770.82 tuples/sec | -3.4% | -44.3% |
| bs=10 sw=10 sl=64 | MB/s | 0.262 MB/s | 0.271 MB/s | 0.47 MB/s | -3.3% | -44.3% |
| bs=10 sw=10 sl=64 | p50 | 22,008 us | 21,033 us | 12,723 us | +4.6% | +73.0% |
| bs=10 sw=10 sl=64 | p95 | 41,101 us | 40,826 us | 15,070 us | +0.7% | +172.7% |
| bs=10 sw=10 sl=64 | p99 | 41,101 us | 40,826 us | 18,429 us | +0.7% | +123.0% |
| bs=100 sw=10 sl=64 | throughput | 943 tuples/sec | 918 tuples/sec | 973.75 tuples/sec | +2.7% | -3.2% |
| bs=100 sw=10 sl=64 | MB/s | 0.576 MB/s | 0.56 MB/s | 0.594 MB/s | +2.9% | -3.1% |
| bs=100 sw=10 sl=64 | p50 | 103,986 us | 103,381 us | 102,519 us | +0.6% | +1.4% |
| bs=100 sw=10 sl=64 | p95 | 132,673 us | 163,015 us | 108,855 us | -18.6% | +21.9% |
| bs=100 sw=10 sl=64 | p99 | 132,673 us | 163,015 us | 117,788 us | -18.6% | +12.6% |
| bs=1000 sw=10 sl=64 | throughput | 1,090 tuples/sec | 1,126 tuples/sec | 1,004 tuples/sec | -3.2% | +8.6% |
| bs=1000 sw=10 sl=64 | MB/s | 0.665 MB/s | 0.688 MB/s | 0.613 MB/s | -3.3% | +8.5% |
| bs=1000 sw=10 sl=64 | p50 | 918,484 us | 879,252 us | 1,001,930 us | +4.5% | -8.3% |
| bs=1000 sw=10 sl=64 | p95 | 1,031,370 us | 986,070 us | 1,042,923 us | +4.6% | -1.1% |
| bs=1000 sw=10 sl=64 | p99 | 1,031,370 us | 986,070 us | 1,074,893 us | +4.6% | -4.0% |

Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,466.65,200,128000,429,0.262,22008.46,41100.98,41100.98
1,100,10,64,20,2119.90,2000,1280000,943,0.576,103986.16,132672.71,132672.71
2,1000,10,64,20,18343.72,20000,12800000,1090,0.665,918483.88,1031369.58,1031369.58

The pyamber worker reports its state to the controller via start/pause/
resume responses and query-statistics metrics. Without a version these
defaulted to 0, so after the first report the controller's version gate
dropped every later state change — breaking reconfiguration, which waits
to observe RUNNING -> PAUSED -> RUNNING (ReconfigurationIntegrationSpec
timed out).

Mirror the Scala change in pyamber: StateManager bumps a monotonic
state version on each transition, and the four state-reporting handlers
include it.
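
The failure mode described in this commit message can be illustrated with a small Python sketch. The `observed_states` helper and the starting value of -1 for the controller's last-seen version are hypothetical; the sketch only models a strictly-newer version gate like the one this PR adds on the controller.

```python
def observed_states(reports):
    """Feed (version, state) reports through a strictly-newer gate.

    Returns the states the controller would actually apply.
    """
    last_version = -1  # assumption: lower than any reported version
    accepted = []
    for version, state in reports:
        if version > last_version:
            last_version = version
            accepted.append(state)
    return accepted


# Python worker before the pyamber change: every report defaults to version 0.
unversioned = [(0, "RUNNING"), (0, "PAUSED"), (0, "RUNNING")]
# Python worker after the change: each transition bumps the version.
versioned = [(1, "RUNNING"), (2, "PAUSED"), (3, "RUNNING")]

seen_without_version = observed_states(unversioned)
seen_with_version = observed_states(versioned)
```

In the unversioned case only the first RUNNING is applied, so a reconfiguration that waits to observe RUNNING -> PAUSED -> RUNNING never sees PAUSED and times out.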

@aglinxinyuan aglinxinyuan left a comment


LGTM!

The pause/resume/query-statistics assertions compared full messages
against literals that defaulted state_version to 0; read the actual
version from the report (as the stats sizes already are) so the
equality holds while StateManager tests cover the version itself.
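
The assertion pattern this commit describes might look like the sketch below. `StateReport` and `pause_worker` are hypothetical stand-ins for the real response message and handler, and the version value 3 is arbitrary; the point is only that the expected message takes its version from the actual report instead of relying on the default of 0.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StateReport:
    """Hypothetical stand-in for a state-reporting response message."""

    state: str
    state_version: int = 0


def pause_worker() -> StateReport:
    # Hypothetical handler: the version depends on how many transitions
    # the worker went through before pausing.
    return StateReport("PAUSED", state_version=3)


actual = pause_worker()

# Brittle: the literal leaves state_version at its default of 0.
brittle_expected = StateReport("PAUSED")
# Robust: carry the reported version through, and compare everything else.
robust_expected = StateReport("PAUSED", state_version=actual.state_version)

brittle_matches = actual == brittle_expected
robust_matches = actual == robust_expected
```

This keeps the main-loop tests focused on message shape, while the version semantics themselves stay covered by the StateManager tests.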
@Yicong-Huang Yicong-Huang added release/v1.2 back porting to release/v1.2 and removed release/v1.2 back porting to release/v1.2 labels Jun 29, 2026
@Yicong-Huang Yicong-Huang requested a review from Copilot June 29, 2026 05:30

Copilot AI left a comment


Pull request overview

Fixes a race in Amber’s controller-side worker-state reconstruction where late-arriving RUNNING snapshots (e.g., from startWorker) could overwrite a newer terminal state (COMPLETED), causing operators to remain visually “RUNNING” after completion. The PR replaces receipt-time ordering with causal ordering via a per-worker logical stateVersion reported by the worker state machine, and makes terminal states absorbing.

Changes:

  • Introduce a monotonic per-worker stateVersion (Scala + Python) and propagate it through RPC/proto messages that report state.
  • Update controller-side reconciliation to apply worker state only when stateVersion is strictly newer; keep worker statistics ordered by timestamp independently.
  • Add/adjust Scala and Python tests to cover version ordering, stale/equal-version rejection, terminal absorption, and regression for #6010.

Reviewed changes

Copilot reviewed 26 out of 26 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| amber/src/main/scala/org/apache/texera/amber/engine/common/statetransition/StateManager.scala | Adds a monotonic stateVersion logical clock bumped on successful transitions. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala | Replaces timestamp-based state updates with updateState(stateVersion, state); adds terminal-state absorption and separate updateStats. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala | Applies startWorker state using stateVersion; switches termination path to forceTerminate(). |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala | Orders pushed state updates by stateVersion instead of controller receipt time. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala | Applies resume responses using stateVersion. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala | Applies pause responses using stateVersion; updates stats via updateStats. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala | Splits metrics application into updateState(stateVersion, ...) and updateStats(timestamp, ...). |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/StartHandler.scala | Includes stateVersion in WorkerStateResponse for start responses. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/ResumeHandler.scala | Includes stateVersion in WorkerStateResponse for resume responses. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/PauseHandler.scala | Includes stateVersion in WorkerStateResponse for pause responses. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/QueryStatisticsHandler.scala | Includes stateVersion in WorkerMetrics returned by queryStatistics. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala | Includes stateVersion in WorkerStateUpdatedRequest push events. |
| amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto | Adds state_version to WorkerMetrics for causal state ordering. |
| amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto | Adds state_version to WorkerStateResponse. |
| amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto | Adds state_version to WorkerStateUpdatedRequest. |
| amber/src/main/python/core/architecture/managers/state_manager.py | Adds a monotonic _state_version and get_state_version(); bumps on transitions. |
| amber/src/main/python/core/architecture/handlers/control/start_worker_handler.py | Returns WorkerStateResponse with state_version. |
| amber/src/main/python/core/architecture/handlers/control/pause_worker_handler.py | Returns WorkerStateResponse with state_version. |
| amber/src/main/python/core/architecture/handlers/control/resume_worker_handler.py | Returns WorkerStateResponse with state_version. |
| amber/src/main/python/core/architecture/handlers/control/query_statistics_handler.py | Includes state_version in WorkerMetrics. |
| amber/src/test/scala/org/apache/texera/amber/engine/common/statetransition/WorkerStateManagerSpec.scala | Adds unit tests for initial version, bumping rules, and no-op/rejected transitions. |
| amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala | Updates test stub StartWorker response to include stateVersion. |
| amber/src/test/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecutionSpec.scala | Adds coverage for version-based ordering, terminal absorption, and #6010 regression. |
| amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecutionSpec.scala | Updates test helper to use updateState + updateStats with a monotonic surrogate. |
| amber/src/test/python/core/architecture/managers/test_state_manager.py | Adds Python unit tests mirroring Scala version semantics. |
| amber/src/test/python/core/runnables/test_main_loop.py | Updates expectations to carry through the reported state_version instead of hardcoding counts. |


@xuang7 xuang7 added the release/v1.2 back porting to release/v1.2 label Jun 29, 2026
@xuang7

xuang7 commented Jun 29, 2026


Hi @Yicong-Huang, it seems this PR cannot be directly backported. Do we still need to backport this change? I noticed that OperatorExecutionSpec.scala does not exist on the release branch.

@xuang7 xuang7 removed the release/v1.2 back porting to release/v1.2 label Jun 29, 2026

Development

Successfully merging this pull request may close these issues.

Fast source operator stays orange (RUNNING) after the workflow completes

5 participants