feat(supervisor): compute workload manager #3114
Merged
Changes from 26 commits
Commits
70 commits
7cbd3aa
feat(supervisor): add ComputeWorkloadManager for compute gateway
nicktrn ccc8fe2
chore: merge main into feat/compute-workload-manager
nicktrn 3175a10
fix(supervisor): strip image digest in ComputeWorkloadManager
nicktrn 56ef39f
fix: add fetch timeout and wide event logging to ComputeWorkloadManager
nicktrn 1bccd1e
feat: make gateway fetch timeout configurable
nicktrn 74817d7
refactor(supervisor): improve ComputeWorkloadManager wide event logging
nicktrn a538735
feat(supervisor): send metadata in compute sandbox creation requests
nicktrn ac3dadf
feat(supervisor): send machine cpu/memory in compute sandbox requests
nicktrn 5a7b8ce
feat(supervisor): add dequeue and warm start timing to wide event
nicktrn 7e251d4
Merge branch 'main' into HEAD
nicktrn e4915c4
Merge remote-tracking branch 'origin/main' into feat/compute-workload…
nicktrn 4d603ad
feat(supervisor): add compute checkpoint/restore support
nicktrn 9466a47
Merge remote-tracking branch 'origin/main' into feat/compute-workload…
nicktrn c1511f9
fix(cli): fix --load flag on local/self-hosted builds
nicktrn 0a6d6f1
feat(supervisor): pass name, metadata, and resources in compute resto…
nicktrn 4332743
feat(supervisor): add flag to enable compute snapshots
nicktrn 5089bba
feat(supervisor): require metadata URL when compute snapshots enabled
nicktrn 7ed9221
fix(supervisor): require workload API domain when compute snapshots e…
nicktrn e9b5fd3
fix(supervisor): don't destroy compute instance after snapshot
nicktrn 0531a23
Merge remote-tracking branch 'origin/main' into feat/compute-workload…
nicktrn 9572c7d
Merge remote-tracking branch 'origin/main' into feat/compute-workload…
nicktrn 5032b7f
Merge remote-tracking branch 'origin/main' into feat/compute-workload…
nicktrn f3e0cb8
Merge remote-tracking branch 'origin/main' into feat/compute-workload…
nicktrn 0edc308
Merge remote-tracking branch 'origin/main' into feat/compute-workload…
nicktrn 63424fa
feat(supervisor): add snapshot delay for compute path via timer wheel
nicktrn 80b62d4
Merge remote-tracking branch 'origin/main' into feat/compute-workload…
nicktrn 8b4c6bf
feat: emit compute OTel spans (provision, restore, snapshot) in run t…
nicktrn f70be68
refactor(supervisor): demote per-run logs to debug/verbose for quiete…
nicktrn bc05705
feat(supervisor): add COMPUTE_TRACE_OTLP_ENDPOINT override and demote…
nicktrn 7349acb
Merge remote-tracking branch 'origin/main' into feat/compute-workload…
nicktrn 7b37b0c
feat(database): add WorkloadType enum and column to WorkerInstanceGroup
nicktrn 441334b
feat(core): add shared compute gateway client and template creation t…
nicktrn d0149e9
feat(webapp): add compute template creation service
nicktrn 50cf672
feat(webapp): integrate template creation into deploy finalize flow
nicktrn 899a7fb
fix: deduplicate shadow template creation and reuse service instance
nicktrn 3cc7874
fix(webapp): add template creation to V1 finalize path (non-remote bu…
nicktrn 8f08403
refactor: move compute gateway client from @trigger.dev/core to @inte…
nicktrn d0445ef
fix: simplify gateway client return type, remove unused json parsing
nicktrn 1834da4
fix(supervisor): address review feedback on compute workload manager
nicktrn 76e5715
refactor: convert span-timeline-events from always-loaded rule to on-…
nicktrn 2397101
chore: merge main into feat/compute-workload-manager
nicktrn 6e32bb7
feat(webapp): add hasComputeAccess feature flag for private beta org …
nicktrn 64b08b9
fix: skip resolveMode early when compute gateway is not configured
nicktrn 6809f07
fix: shadow mode uses fire-and-forget HTTP to avoid holding connections
nicktrn 5d24dd1
feat(webapp): use background flag for shadow mode template creation
nicktrn 4cbbadb
refactor: consolidate template creation logic into ComputeTemplateCre…
nicktrn eef782f
fix: use .then() instead of .catch() for shadow mode error logging
nicktrn 2219d11
fix: update consumerPool test assertion for optional timing parameter
nicktrn b7fa420
refactor: consolidate compute gateway clients into shared @internal/c…
nicktrn d8e478a
fix: add type-safe post return, strip image digests consistently
nicktrn 641d6a3
refactor: convert remaining compute types to zod schemas
nicktrn c1021f2
fix: bound trace context map, gate on compute mode, use machine prese…
nicktrn 1005428
fix: register trace context before restore/warm-start, sanitize logge…
nicktrn 5ffc7d4
Merge remote-tracking branch 'origin/main' into feat/compute-workload…
nicktrn 64729bb
fix: shadow mode for org-level compute access, require only for MICRO…
nicktrn e9bcbe4
fix: wrap writer.write in try/catch to handle client disconnect durin…
nicktrn 061c2fb
feat: add OtlpTraceService
nicktrn 8711f5b
refactor: move otlp trace tests to services/
nicktrn 9d72ae2
refactor: remove env import from compute workload manager
nicktrn 18eb7bb
refactor: use OtlpTraceService in workload server
nicktrn 91f9fa3
refactor: wire up OtlpTraceService to workload server, delete old otl…
nicktrn 36ecdb5
refactor: inline payload builder into trace service, extract tracepar…
nicktrn 30df9e2
fix: skip k8s integration tests by default, require K8S_INTEGRATION_T…
nicktrn 05a6721
fix: review fixes - COMPUTE checkpoint type, memory_gb standardizatio…
nicktrn cacee1e
fix: make snapshot dispatch limit configurable via COMPUTE_SNAPSHOT_D…
nicktrn 680f156
refactor: extract ComputeSnapshotService from workload server, fix zo…
nicktrn 5142954
fix: remove unnecessary re-export, import schema directly from @inter…
nicktrn 9925c72
Merge remote-tracking branch 'origin/main' into feat/compute-workload…
nicktrn 48bbb87
fix: use BigInt for OTLP nanosecond timestamps to avoid precision loss
nicktrn 5b188a5
docs: add server-changes for compute template pre-warming
nicktrn
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "trigger.dev": patch | ||
| --- | ||
| | ||
| Fix `--load` flag being silently ignored on local/self-hosted builds. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,254 @@ | ||
| import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; | ||
| import { TimerWheel } from "./timerWheel.js"; | ||
| | ||
| describe("TimerWheel", () => { | ||
| beforeEach(() => { | ||
| vi.useFakeTimers(); | ||
| }); | ||
| | ||
| afterEach(() => { | ||
| vi.useRealTimers(); | ||
| }); | ||
| | ||
| it("dispatches item after delay", () => { | ||
| const dispatched: string[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
| | ||
| wheel.start(); | ||
| wheel.submit("run-1", "snapshot-data"); | ||
| | ||
| // Not yet | ||
| vi.advanceTimersByTime(2900); | ||
| expect(dispatched).toEqual([]); | ||
| | ||
| // After delay | ||
| vi.advanceTimersByTime(200); | ||
| expect(dispatched).toEqual(["run-1"]); | ||
| | ||
| wheel.stop(); | ||
| }); | ||
| | ||
| it("cancels item before it fires", () => { | ||
| const dispatched: string[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
| | ||
| wheel.start(); | ||
| wheel.submit("run-1", "data"); | ||
| | ||
| vi.advanceTimersByTime(1000); | ||
| expect(wheel.cancel("run-1")).toBe(true); | ||
| | ||
| vi.advanceTimersByTime(5000); | ||
| expect(dispatched).toEqual([]); | ||
| expect(wheel.size).toBe(0); | ||
| | ||
| wheel.stop(); | ||
| }); | ||
| | ||
| it("cancel returns false for unknown key", () => { | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: () => {}, | ||
| }); | ||
| expect(wheel.cancel("nonexistent")).toBe(false); | ||
| }); | ||
| | ||
| it("deduplicates: resubmitting same key replaces the entry", () => { | ||
| const dispatched: { key: string; data: string }[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push({ key: item.key, data: item.data }), | ||
| }); | ||
| | ||
| wheel.start(); | ||
| wheel.submit("run-1", "old-data"); | ||
| | ||
| vi.advanceTimersByTime(1000); | ||
| wheel.submit("run-1", "new-data"); | ||
| | ||
| // Original would have fired at t=3000, but was replaced | ||
| // New one fires at t=1000+3000=4000 | ||
| vi.advanceTimersByTime(2100); | ||
| expect(dispatched).toEqual([]); | ||
| | ||
| vi.advanceTimersByTime(1000); | ||
| expect(dispatched).toEqual([{ key: "run-1", data: "new-data" }]); | ||
| | ||
| wheel.stop(); | ||
| }); | ||
| | ||
| it("handles many concurrent items", () => { | ||
| const dispatched: string[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
| | ||
| wheel.start(); | ||
| | ||
| for (let i = 0; i < 1000; i++) { | ||
| wheel.submit(`run-${i}`, `data-${i}`); | ||
| } | ||
| expect(wheel.size).toBe(1000); | ||
| | ||
| vi.advanceTimersByTime(3100); | ||
| expect(dispatched.length).toBe(1000); | ||
| expect(wheel.size).toBe(0); | ||
| | ||
| wheel.stop(); | ||
| }); | ||
| | ||
| it("handles items submitted at different times", () => { | ||
| const dispatched: string[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
| | ||
| wheel.start(); | ||
| | ||
| wheel.submit("run-1", "data"); | ||
| vi.advanceTimersByTime(1000); | ||
| wheel.submit("run-2", "data"); | ||
| vi.advanceTimersByTime(1000); | ||
| wheel.submit("run-3", "data"); | ||
| | ||
| // t=2000: nothing yet | ||
| expect(dispatched).toEqual([]); | ||
| | ||
| // t=3100: run-1 fires | ||
| vi.advanceTimersByTime(1100); | ||
| expect(dispatched).toEqual(["run-1"]); | ||
| | ||
| // t=4100: run-2 fires | ||
| vi.advanceTimersByTime(1000); | ||
| expect(dispatched).toEqual(["run-1", "run-2"]); | ||
| | ||
| // t=5100: run-3 fires | ||
| vi.advanceTimersByTime(1000); | ||
| expect(dispatched).toEqual(["run-1", "run-2", "run-3"]); | ||
| | ||
| wheel.stop(); | ||
| }); | ||
| | ||
| it("setDelay changes delay for new items only", () => { | ||
| const dispatched: string[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
| | ||
| wheel.start(); | ||
| | ||
| wheel.submit("run-1", "data"); // 3s delay | ||
| | ||
| vi.advanceTimersByTime(500); | ||
| wheel.setDelay(1000); | ||
| wheel.submit("run-2", "data"); // 1s delay | ||
| | ||
| // t=1600: run-2 should have fired (submitted at t=500 with 1s delay, so due at t=1500) | ||
| vi.advanceTimersByTime(1100); | ||
| expect(dispatched).toEqual(["run-2"]); | ||
| | ||
| // t=3100: run-1 fires at its original 3s delay | ||
| vi.advanceTimersByTime(1500); | ||
| expect(dispatched).toEqual(["run-2", "run-1"]); | ||
| | ||
| wheel.stop(); | ||
| }); | ||
| | ||
| it("stop returns unprocessed items", () => { | ||
| const dispatched: string[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
| | ||
| wheel.start(); | ||
| wheel.submit("run-1", "data-1"); | ||
| wheel.submit("run-2", "data-2"); | ||
| wheel.submit("run-3", "data-3"); | ||
| | ||
| const remaining = wheel.stop(); | ||
| expect(dispatched).toEqual([]); | ||
| expect(wheel.size).toBe(0); | ||
| expect(remaining.length).toBe(3); | ||
| expect(remaining.map((r) => r.key).sort()).toEqual(["run-1", "run-2", "run-3"]); | ||
| expect(remaining.find((r) => r.key === "run-1")?.data).toBe("data-1"); | ||
| }); | ||
| | ||
| it("after stop, new submissions are silently dropped", () => { | ||
| const dispatched: string[] = []; | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
| | ||
| wheel.start(); | ||
| wheel.stop(); | ||
| | ||
| wheel.submit("run-late", "data"); | ||
| expect(dispatched).toEqual([]); | ||
| expect(wheel.size).toBe(0); | ||
| }); | ||
| | ||
| it("tracks size correctly through submit/cancel/dispatch", () => { | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: () => {}, | ||
| }); | ||
| | ||
| wheel.start(); | ||
| | ||
| wheel.submit("a", "data"); | ||
| wheel.submit("b", "data"); | ||
| expect(wheel.size).toBe(2); | ||
| | ||
| wheel.cancel("a"); | ||
| expect(wheel.size).toBe(1); | ||
| | ||
| vi.advanceTimersByTime(3100); | ||
| expect(wheel.size).toBe(0); | ||
| | ||
| wheel.stop(); | ||
| }); | ||
| | ||
| it("clamps delay to valid range", () => { | ||
| const dispatched: string[] = []; | ||
| | ||
| // Very small delay (should be at least 1 tick = 100ms) | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 0, | ||
| onExpire: (item) => dispatched.push(item.key), | ||
| }); | ||
| | ||
| wheel.start(); | ||
| wheel.submit("run-1", "data"); | ||
| | ||
| vi.advanceTimersByTime(200); | ||
| expect(dispatched).toEqual(["run-1"]); | ||
| | ||
| wheel.stop(); | ||
| }); | ||
| | ||
| it("multiple cancel calls are safe", () => { | ||
| const wheel = new TimerWheel<string>({ | ||
| delayMs: 3000, | ||
| onExpire: () => {}, | ||
| }); | ||
| | ||
| wheel.start(); | ||
| wheel.submit("run-1", "data"); | ||
| | ||
| expect(wheel.cancel("run-1")).toBe(true); | ||
| expect(wheel.cancel("run-1")).toBe(false); | ||
| | ||
| wheel.stop(); | ||
| }); | ||
| }); |
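
The `timerWheel.ts` implementation itself is not rendered in this view, so for readers following the tests above, here is a minimal sketch of a single-level timer wheel that would be consistent with the asserted behavior (delayed dispatch, cancel, replace-on-resubmit, `setDelay` for new items only, `stop()` returning leftovers, submissions dropped after stop). It is an illustration, not the code merged in this PR. The 100ms tick comes from the comment in the "clamps delay" test; `NUM_SLOTS`, the public `tick()` method, the choice to accept submissions before `start()`, and the internal layout are assumptions.

```typescript
interface WheelItem<T> {
  key: string;
  data: T;
}

interface TimerWheelOptions<T> {
  delayMs: number;
  onExpire: (item: WheelItem<T>) => void;
}

const TICK_MS = 100; // tick size mentioned in the test comment
const NUM_SLOTS = 600; // assumption: caps the delay just under 60s

class TimerWheel<T> {
  // One bucket of keys per tick; the cursor walks the ring one slot per tick.
  private readonly slots: Set<string>[] = Array.from(
    { length: NUM_SLOTS },
    () => new Set<string>()
  );
  private readonly entries = new Map<string, { data: T; slot: number }>();
  private readonly onExpire: (item: WheelItem<T>) => void;
  private cursor = 0;
  private delayTicks: number;
  private interval: ReturnType<typeof setInterval> | null = null;
  private stopped = false;

  constructor(opts: TimerWheelOptions<T>) {
    this.onExpire = opts.onExpire;
    this.delayTicks = this.toTicks(opts.delayMs);
  }

  // Clamp to [1, NUM_SLOTS - 1] ticks so an item never lands on the current slot.
  private toTicks(ms: number): number {
    return Math.min(NUM_SLOTS - 1, Math.max(1, Math.round(ms / TICK_MS)));
  }

  get size(): number {
    return this.entries.size;
  }

  start(): void {
    if (this.interval !== null || this.stopped) return;
    this.interval = setInterval(() => this.tick(), TICK_MS);
  }

  // Only affects items submitted afterwards; existing items keep their slot.
  setDelay(ms: number): void {
    this.delayTicks = this.toTicks(ms);
  }

  submit(key: string, data: T): void {
    if (this.stopped) return; // dropped silently after stop()
    this.cancel(key); // resubmitting replaces the earlier entry
    const slot = (this.cursor + this.delayTicks) % NUM_SLOTS;
    this.slots[slot].add(key);
    this.entries.set(key, { data, slot });
  }

  cancel(key: string): boolean {
    const entry = this.entries.get(key);
    if (!entry) return false;
    this.slots[entry.slot].delete(key);
    this.entries.delete(key);
    return true;
  }

  // Advance one slot and expire everything in it. Public so it can be driven manually.
  tick(): void {
    this.cursor = (this.cursor + 1) % NUM_SLOTS;
    const due = [...this.slots[this.cursor]];
    this.slots[this.cursor].clear();
    for (const key of due) {
      const entry = this.entries.get(key);
      if (!entry) continue;
      this.entries.delete(key);
      this.onExpire({ key, data: entry.data });
    }
  }

  // Stop ticking and hand back whatever has not fired yet.
  stop(): WheelItem<T>[] {
    if (this.interval !== null) clearInterval(this.interval);
    this.interval = null;
    this.stopped = true;
    const remaining = [...this.entries].map(([key, e]) => ({ key, data: e.data }));
    this.entries.clear();
    for (const slot of this.slots) slot.clear();
    return remaining;
  }
}
```

In this sketch, submit and cancel are O(1) and each tick only touches the keys in one slot, so a run that resumes before its snapshot delay elapses costs a single map lookup to cancel. Delays longer than the ring are clamped rather than tracked with a rounds counter, which keeps the sketch short.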