fix(amber): order worker state by version, not timestamp #6011
Yicong-Huang wants to merge 3 commits into
Worker state was reconciled at the controller by last-nanoTime-wins. A fast source's startWorker response carries a stale RUNNING snapshot that can arrive after COMPLETED was recorded, clobbering it and leaving the operator stuck orange. Order state causally instead: WorkerStateManager now stamps every transition with a monotonic per-worker version, carried on all state reports (WorkerStateResponse, WorkerStateUpdatedRequest, WorkerMetrics). The controller applies a state only if its version is newer, and treats COMPLETED/TERMINATED as absorbing. Stats keep timestamp ordering. Closes apache#6010
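The race described above can be reproduced in a few lines. This is a minimal sketch, assuming a hypothetical `LegacyWorkerExecution` class that keeps only the most recently received state; it is not the actual `WorkerExecution` code, and the receipt times are made-up values.

```python
from dataclasses import dataclass


@dataclass
class LegacyWorkerExecution:
    """Hypothetical stand-in for the pre-fix controller-side record."""

    state: str = "UNINITIALIZED"
    last_update_ns: int = 0

    def update(self, receipt_ns: int, state: str) -> None:
        # Last-receipt-wins: whichever report arrives later overwrites the state,
        # regardless of when the worker actually entered that state.
        if receipt_ns > self.last_update_ns:
            self.last_update_ns = receipt_ns
            self.state = state


# The worker went RUNNING and then COMPLETED, but the reports arrive out of order.
legacy = LegacyWorkerExecution()
legacy.update(receipt_ns=100, state="COMPLETED")  # completion is recorded first
legacy.update(receipt_ns=200, state="RUNNING")    # late startWorker response
print(legacy.state)  # RUNNING: the terminal state was clobbered
```

Receipt time says nothing about when the worker sampled the state, which is why the stale snapshot wins here.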
Codecov Report
❌ Patch coverage is
Additional details and impacted files
Coverage Diff

| Metric | main | #6011 | +/- |
|---|---|---|---|
| Coverage | 56.67% | 56.65% | -0.02% |
| Complexity | 3030 | 3042 | +12 |
| Files | 1124 | 1124 | |
| Lines | 43298 | 43380 | +82 |
| Branches | 4667 | 4673 | +6 |
| Hits | 24539 | 24577 | +38 |
| Misses | 17319 | 17362 | +43 |
| Partials | 1440 | 1441 | +1 |

*This pull request uses carry forward flags.
✅ No material benchmark regressions detected
🟢 2 better · 🔴 0 worse · ⚪ 13 noise (<±5%) · 0 without baseline
Baseline details
Latest main
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,466.65,200,128000,429,0.262,22008.46,41100.98,41100.98
1,100,10,64,20,2119.90,2000,1280000,943,0.576,103986.16,132672.71,132672.71
2,1000,10,64,20,18343.72,20000,12800000,1090,0.665,918483.88,1031369.58,1031369.58
The pyamber worker reports its state to the controller via start/pause/resume responses and query-statistics metrics. Without a version these defaulted to 0, so after the first report the controller's version gate dropped every later state change. That broke reconfiguration, which waits to observe RUNNING -> PAUSED -> RUNNING (ReconfigurationIntegrationSpec timed out). Mirror the Scala change in pyamber: StateManager bumps a monotonic state version on each transition, and the four state-reporting handlers include it.
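The versioned state manager described in this commit can be sketched as follows. This is an illustration under stated assumptions, not the pyamber source: `VersionedStateManager`, `InvalidTransitionError`, and the `TRANSITIONS` table are hypothetical names, and raising on a rejected transition is an assumed behavior. Only the rules come from the PR text: the version starts at 0, bumps once per successful transition, and does not bump on a no-op self-transition or a rejected transition.

```python
class InvalidTransitionError(Exception):
    """Raised when the transition table forbids a move (hypothetical name)."""


class VersionedStateManager:
    def __init__(self, transitions, initial_state):
        self._transitions = transitions
        self._state = initial_state
        self._state_version = 0  # starts at 0, as the PR's tests describe

    def transit_to(self, target):
        if target == self._state:
            return  # no-op self-transition: version unchanged
        if target not in self._transitions.get(self._state, set()):
            # Rejected transition: raise before touching state or version.
            raise InvalidTransitionError(f"{self._state} -> {target}")
        self._state = target
        self._state_version += 1  # one bump per successful transition

    def get_state(self):
        return self._state

    def get_state_version(self):
        return self._state_version


# Illustrative transition table; the real worker state graph may differ.
TRANSITIONS = {
    "READY": {"RUNNING"},
    "RUNNING": {"PAUSED", "COMPLETED"},
    "PAUSED": {"RUNNING"},
}

sm = VersionedStateManager(TRANSITIONS, "READY")
versions = []
for target in ["RUNNING", "PAUSED", "RUNNING"]:  # the reconfiguration sequence
    sm.transit_to(target)
    versions.append(sm.get_state_version())
print(versions)  # [1, 2, 3]

sm.transit_to("RUNNING")  # self-transition, no bump
try:
    sm.transit_to("READY")  # not allowed from RUNNING, no bump
except InvalidTransitionError:
    pass
print(sm.get_state_version())  # 3
```

Because each report in the RUNNING -> PAUSED -> RUNNING sequence carries a distinct, increasing version, a controller that only accepts strictly newer versions can observe all three states.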
The pause/resume/query-statistics assertions compared full messages against literals that defaulted state_version to 0; read the actual version from the report (as the stats sizes already are) so the equality holds while StateManager tests cover the version itself.
Pull request overview
Fixes a race in Amber’s controller-side worker-state reconstruction where late-arriving RUNNING snapshots (e.g., from startWorker) could overwrite a newer terminal state (COMPLETED), causing operators to remain visually “RUNNING” after completion. The PR replaces receipt-time ordering with causal ordering via a per-worker logical stateVersion reported by the worker state machine, and makes terminal states absorbing.
Changes:
- Introduce a monotonic per-worker stateVersion (Scala + Python) and propagate it through RPC/proto messages that report state.
- Update controller-side reconciliation to apply worker state only when stateVersion is strictly newer; keep worker statistics ordered by timestamp independently.
- Add/adjust Scala and Python tests to cover version ordering, stale/equal-version rejection, terminal absorption, and regression for #6010.
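The controller-side rules listed above can be sketched in Python. This is a hedged illustration, not the Scala `WorkerExecution` implementation: the class name, the boolean return values, the `-1` initial version (chosen so that a first version-0 report is accepted), and the `output_tuples` stats key are all assumptions made for the example.

```python
from dataclasses import dataclass, field

TERMINAL_STATES = {"COMPLETED", "TERMINATED"}


@dataclass
class WorkerExecutionSketch:
    state: str = "UNINITIALIZED"
    state_version: int = -1  # assumed initial value, below any reported version
    stats: dict = field(default_factory=dict)
    stats_timestamp_ns: int = 0

    def update_state(self, version: int, state: str) -> bool:
        if self.state in TERMINAL_STATES:
            return False  # terminal states are absorbing
        if version <= self.state_version:
            return False  # stale or equal version: reject
        self.state_version = version
        self.state = state
        return True

    def update_stats(self, timestamp_ns: int, stats: dict) -> bool:
        # Stats keep timestamp ordering, independent of the state version.
        if timestamp_ns <= self.stats_timestamp_ns:
            return False
        self.stats_timestamp_ns = timestamp_ns
        self.stats = stats
        return True


# The #6010 scenario: COMPLETED (version 2) is recorded before the late
# startWorker response carrying RUNNING (version 1) arrives.
worker = WorkerExecutionSketch()
accepted = [worker.update_state(2, "COMPLETED"), worker.update_state(1, "RUNNING")]
print(accepted, worker.state)  # [True, False] COMPLETED

# Stats still advance by timestamp even though the state is terminal.
stats_accepted = [
    worker.update_stats(200, {"output_tuples": 10}),
    worker.update_stats(100, {"output_tuples": 5}),
]
print(stats_accepted, worker.stats)  # [True, False] {'output_tuples': 10}
```

In this sketch the late RUNNING report is rejected twice over: its version is older, and COMPLETED is absorbing. Keeping both checks means a report with a higher version still cannot revive a finished worker.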
Reviewed changes
Copilot reviewed 26 out of 26 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| amber/src/main/scala/org/apache/texera/amber/engine/common/statetransition/StateManager.scala | Adds a monotonic stateVersion logical clock bumped on successful transitions. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala | Replaces timestamp-based state updates with updateState(stateVersion, state); adds terminal-state absorption and separate updateStats. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala | Applies startWorker state using stateVersion; switches termination path to forceTerminate(). |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala | Orders pushed state updates by stateVersion instead of controller receipt time. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala | Applies resume responses using stateVersion. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala | Applies pause responses using stateVersion; updates stats via updateStats. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala | Splits metrics application into updateState(stateVersion, ...) and updateStats(timestamp, ...). |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/StartHandler.scala | Includes stateVersion in WorkerStateResponse for start responses. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/ResumeHandler.scala | Includes stateVersion in WorkerStateResponse for resume responses. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/PauseHandler.scala | Includes stateVersion in WorkerStateResponse for pause responses. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/QueryStatisticsHandler.scala | Includes stateVersion in WorkerMetrics returned by queryStatistics. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala | Includes stateVersion in WorkerStateUpdatedRequest push events. |
| amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto | Adds state_version to WorkerMetrics for causal state ordering. |
| amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto | Adds state_version to WorkerStateResponse. |
| amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto | Adds state_version to WorkerStateUpdatedRequest. |
| amber/src/main/python/core/architecture/managers/state_manager.py | Adds a monotonic _state_version and get_state_version(); bumps on transitions. |
| amber/src/main/python/core/architecture/handlers/control/start_worker_handler.py | Returns WorkerStateResponse with state_version. |
| amber/src/main/python/core/architecture/handlers/control/pause_worker_handler.py | Returns WorkerStateResponse with state_version. |
| amber/src/main/python/core/architecture/handlers/control/resume_worker_handler.py | Returns WorkerStateResponse with state_version. |
| amber/src/main/python/core/architecture/handlers/control/query_statistics_handler.py | Includes state_version in WorkerMetrics. |
| amber/src/test/scala/org/apache/texera/amber/engine/common/statetransition/WorkerStateManagerSpec.scala | Adds unit tests for initial version, bumping rules, and no-op/rejected transitions. |
| amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala | Updates test stub StartWorker response to include stateVersion. |
| amber/src/test/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecutionSpec.scala | Adds coverage for version-based ordering, terminal absorption, and #6010 regression. |
| amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecutionSpec.scala | Updates test helper to use updateState + updateStats with a monotonic surrogate. |
| amber/src/test/python/core/architecture/managers/test_state_manager.py | Adds Python unit tests mirroring Scala version semantics. |
| amber/src/test/python/core/runnables/test_main_loop.py | Updates expectations to carry through the reported state_version instead of hardcoding counts. |
Hi @Yicong-Huang, it seems this PR cannot be directly backported. Do we still need to backport this change? I noticed that OperatorExecutionSpec.scala does not exist on the release branch.
What changes were proposed in this PR?
A fast source operator stayed orange (RUNNING) in the editor after the run finished (issue #6010).

Why it's a bug. The controller reconstructs each worker's state from several unordered channels (source RUNNING via the startWorker response, non-source RUNNING via workerStateUpdated, COMPLETED/PAUSED via queryStatistics/pauseWorker responses) and reconciled them with last-System.nanoTime()-wins in WorkerExecution. Worker state, however, is single-writer and strictly ordered causally. For a tiny source the run finishes almost instantly, so the startWorker response, carrying the stale RUNNING it sampled at launch, can reach the controller after COMPLETED was recorded; its later receipt timestamp wins and clobbers COMPLETED. Results render fine (separate path), so only the border is stuck.

This PR orders worker state causally instead of by wall clock:
- Before: WorkerExecution.update(nanoTime, state), last-write-wins by receipt time.
- After: updateState(version, state), newest logical version wins.
- WorkerStateManager bumps a monotonic stateVersion on every successful transitTo (its state-machine logical clock).
- The version is carried on WorkerStateResponse, WorkerStateUpdatedRequest, and WorkerMetrics (3 new proto fields).
- In pyamber, StateManager now bumps the same monotonic version and the four handlers include it; otherwise version-0 reports would be dropped after the first and reconfiguration (RUNNING→PAUSED→RUNNING) would hang.

Any related issues, documentation, discussions?
Closes #6010. The timestamp-based update was introduced in #3557.

How was this PR tested?
JDK 17. Scala unit + Scala/Python integration + Python unit:
- WorkerExecutionSpec: version ordering (positive + stale/equal-version negatives), terminal-state absorption, forceTerminate, independent stats-vs-state ordering, and a named regression for #6010, "Fast source operator stays orange (RUNNING) after the workflow completes" (COMPLETED survives a late RUNNING). Verified the regression goes red when updateState is reverted to last-write-wins.
- WorkerStateManagerSpec / test_state_manager.py: version starts at 0, bumps per successful transition, no bump on no-op self-transition or rejected transition.
- ReconfigurationIntegrationSpec reproduced the failure (timeout) before the pyamber fix and passes after.

Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.8 (1M context), via Claude Code