Skip to content

Commit 4361370

Browse files
committed
feat(supervisor): wide events on dequeue + warm-start trace
1 parent 5a3a442 commit 4361370

1 file changed

Lines changed: 207 additions & 127 deletions

File tree

apps/supervisor/src/index.ts

Lines changed: 207 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,14 @@ import { FailedPodHandler } from "./services/failedPodHandler.js";
2828
import { getWorkerToken } from "./workerToken.js";
2929
import { OtlpTraceService } from "./services/otlpTraceService.js";
3030
import { extractTraceparent, getRestoreRunnerId } from "./util.js";
31+
import {
32+
fromContext,
33+
recordPhaseSince,
34+
runWideEvent,
35+
setExtra,
36+
setMeta,
37+
type WideEventOptions,
38+
} from "./wideEvents/index.js";
3139

3240
if (env.METRICS_COLLECT_DEFAULTS) {
3341
collectDefaultMetrics({ register });
@@ -50,6 +58,12 @@ class ManagedSupervisor {
5058
private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED);
5159
private readonly warmStartUrl = env.TRIGGER_WARM_START_URL;
5260

61+
private readonly wideEventOpts: WideEventOptions = {
62+
service: "supervisor",
63+
env: { nodeId: env.TRIGGER_WORKER_INSTANCE_NAME },
64+
enabled: env.TRIGGER_WIDE_EVENTS_ENABLED,
65+
};
66+
5367
constructor() {
5468
const {
5569
TRIGGER_WORKER_TOKEN,
@@ -239,149 +253,202 @@ class ManagedSupervisor {
239253
async ({ time, message, dequeueResponseMs, pollingIntervalMs }) => {
240254
this.logger.verbose(`Received message with timestamp ${time.toLocaleString()}`, message);
241255

242-
if (message.completedWaitpoints.length > 0) {
243-
this.logger.debug("Run has completed waitpoints", {
244-
runId: message.run.id,
245-
completedWaitpoints: message.completedWaitpoints.length,
246-
});
247-
}
248-
249-
if (!message.image) {
250-
this.logger.error("Run has no image", { runId: message.run.id });
251-
return;
252-
}
256+
const traceparent = extractTraceparent(message.run.traceContext);
257+
258+
await runWideEvent(
259+
{
260+
...this.wideEventOpts,
261+
traceparent,
262+
setup: (state) => {
263+
setMeta(state, "run_id", message.run.id);
264+
setMeta(state, "env_id", message.environment.id);
265+
setMeta(state, "org_id", message.organization.id);
266+
setMeta(state, "project_id", message.project.id);
267+
if (message.deployment.friendlyId) {
268+
setMeta(state, "deployment_id", message.deployment.friendlyId);
269+
}
270+
setMeta(state, "machine_preset", message.run.machine.name);
271+
state.extras.iteration = "dequeue";
272+
state.extras.dequeue_response_ms = dequeueResponseMs;
273+
state.extras.polling_interval_ms = pollingIntervalMs;
274+
state.extras.completed_waitpoints = message.completedWaitpoints.length;
275+
},
276+
},
277+
async () => {
278+
if (message.completedWaitpoints.length > 0) {
279+
this.logger.debug("Run has completed waitpoints", {
280+
runId: message.run.id,
281+
completedWaitpoints: message.completedWaitpoints.length,
282+
});
283+
}
253284

254-
const { checkpoint, ...rest } = message;
255-
256-
// Register trace context early so snapshot spans work for all paths
257-
// (cold create, restore, warm start). Re-registration on restore is safe
258-
// since dequeue always provides fresh context.
259-
if (this.computeManager?.traceSpansEnabled) {
260-
const traceparent = extractTraceparent(message.run.traceContext);
261-
262-
if (traceparent) {
263-
this.workloadServer.registerRunTraceContext(message.run.friendlyId, {
264-
traceparent,
265-
envId: message.environment.id,
266-
orgId: message.organization.id,
267-
projectId: message.project.id,
268-
});
269-
}
270-
}
285+
if (!message.image) {
286+
setExtra(fromContext(), "path_taken", "skipped_no_image");
287+
this.logger.error("Run has no image", { runId: message.run.id });
288+
return;
289+
}
271290

272-
if (checkpoint) {
273-
this.logger.debug("Restoring run", { runId: message.run.id });
291+
const { checkpoint, ...rest } = message;
274292

275-
if (this.computeManager) {
276-
try {
277-
const runnerId = getRestoreRunnerId(message.run.friendlyId, checkpoint.id);
278-
279-
const didRestore = await this.computeManager.restore({
280-
snapshotId: checkpoint.location,
281-
runnerId,
282-
runFriendlyId: message.run.friendlyId,
283-
snapshotFriendlyId: message.snapshot.friendlyId,
284-
machine: message.run.machine,
285-
traceContext: message.run.traceContext,
293+
// Register trace context early so snapshot spans work for all paths
294+
// (cold create, restore, warm start). Re-registration on restore is safe
295+
// since dequeue always provides fresh context.
296+
if (this.computeManager?.traceSpansEnabled && traceparent) {
297+
this.workloadServer.registerRunTraceContext(message.run.friendlyId, {
298+
traceparent,
286299
envId: message.environment.id,
287300
orgId: message.organization.id,
288301
projectId: message.project.id,
289-
dequeuedAt: message.dequeuedAt,
290302
});
303+
}
304+
305+
if (checkpoint) {
306+
setExtra(fromContext(), "path_taken", "restore");
307+
this.logger.debug("Restoring run", { runId: message.run.id });
308+
309+
if (this.computeManager) {
310+
const restoreStart = performance.now();
311+
try {
312+
const runnerId = getRestoreRunnerId(message.run.friendlyId, checkpoint.id);
313+
314+
const didRestore = await this.computeManager.restore({
315+
snapshotId: checkpoint.location,
316+
runnerId,
317+
runFriendlyId: message.run.friendlyId,
318+
snapshotFriendlyId: message.snapshot.friendlyId,
319+
machine: message.run.machine,
320+
traceContext: message.run.traceContext,
321+
envId: message.environment.id,
322+
orgId: message.organization.id,
323+
projectId: message.project.id,
324+
dequeuedAt: message.dequeuedAt,
325+
});
326+
recordPhaseSince("restore", restoreStart, undefined);
327+
setExtra(fromContext(), "did_restore", didRestore);
328+
329+
if (didRestore) {
330+
this.logger.debug("Compute restore successful", {
331+
runId: message.run.id,
332+
runnerId,
333+
});
334+
} else {
335+
this.logger.error("Compute restore failed", {
336+
runId: message.run.id,
337+
runnerId,
338+
});
339+
}
340+
} catch (error) {
341+
recordPhaseSince(
342+
"restore",
343+
restoreStart,
344+
error instanceof Error ? error : new Error(String(error))
345+
);
346+
this.logger.error("Failed to restore run (compute)", { error });
347+
}
348+
349+
return;
350+
}
291351

292-
if (didRestore) {
293-
this.logger.debug("Compute restore successful", {
294-
runId: message.run.id,
295-
runnerId,
352+
if (!this.checkpointClient) {
353+
this.logger.error("No checkpoint client", { runId: message.run.id });
354+
return;
355+
}
356+
357+
const restoreStart = performance.now();
358+
try {
359+
const didRestore = await this.checkpointClient.restoreRun({
360+
runFriendlyId: message.run.friendlyId,
361+
snapshotFriendlyId: message.snapshot.friendlyId,
362+
body: {
363+
...rest,
364+
checkpoint,
365+
},
296366
});
297-
} else {
298-
this.logger.error("Compute restore failed", { runId: message.run.id, runnerId });
367+
recordPhaseSince("restore", restoreStart, undefined);
368+
setExtra(fromContext(), "did_restore", didRestore);
369+
370+
if (didRestore) {
371+
this.logger.debug("Restore successful", { runId: message.run.id });
372+
} else {
373+
this.logger.error("Restore failed", { runId: message.run.id });
374+
}
375+
} catch (error) {
376+
recordPhaseSince(
377+
"restore",
378+
restoreStart,
379+
error instanceof Error ? error : new Error(String(error))
380+
);
381+
this.logger.error("Failed to restore run", { error });
299382
}
300-
} catch (error) {
301-
this.logger.error("Failed to restore run (compute)", { error });
383+
384+
return;
302385
}
303386

304-
return;
305-
}
387+
this.logger.debug("Scheduling run", { runId: message.run.id });
306388

307-
if (!this.checkpointClient) {
308-
this.logger.error("No checkpoint client", { runId: message.run.id });
309-
return;
310-
}
389+
const warmStartStart = performance.now();
390+
const didWarmStart = await this.tryWarmStart(message, traceparent);
391+
const warmStartCheckMs = Math.round(performance.now() - warmStartStart);
392+
recordPhaseSince("warm_start", warmStartStart, undefined);
393+
setExtra(fromContext(), "did_warm_start", didWarmStart);
311394

312-
try {
313-
const didRestore = await this.checkpointClient.restoreRun({
314-
runFriendlyId: message.run.friendlyId,
315-
snapshotFriendlyId: message.snapshot.friendlyId,
316-
body: {
317-
...rest,
318-
checkpoint,
319-
},
320-
});
321-
322-
if (didRestore) {
323-
this.logger.debug("Restore successful", { runId: message.run.id });
324-
} else {
325-
this.logger.error("Restore failed", { runId: message.run.id });
395+
if (didWarmStart) {
396+
setExtra(fromContext(), "path_taken", "warm_start");
397+
this.logger.debug("Warm start successful", { runId: message.run.id });
398+
return;
326399
}
327-
} catch (error) {
328-
this.logger.error("Failed to restore run", { error });
329-
}
330400

331-
return;
332-
}
333-
334-
this.logger.debug("Scheduling run", { runId: message.run.id });
401+
setExtra(fromContext(), "path_taken", "cold_create");
335402

336-
const warmStartStart = performance.now();
337-
const didWarmStart = await this.tryWarmStart(message);
338-
const warmStartCheckMs = Math.round(performance.now() - warmStartStart);
403+
const createStart = performance.now();
404+
try {
405+
if (!message.deployment.friendlyId) {
406+
// mostly a type guard, deployments always exists for deployed environments
407+
// a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments.
408+
throw new Error("Deployment is missing");
409+
}
339410

340-
if (didWarmStart) {
341-
this.logger.debug("Warm start successful", { runId: message.run.id });
342-
return;
343-
}
411+
await this.workloadManager.create({
412+
dequeuedAt: message.dequeuedAt,
413+
dequeueResponseMs,
414+
pollingIntervalMs,
415+
warmStartCheckMs,
416+
envId: message.environment.id,
417+
envType: message.environment.type,
418+
image: message.image,
419+
machine: message.run.machine,
420+
orgId: message.organization.id,
421+
projectId: message.project.id,
422+
deploymentFriendlyId: message.deployment.friendlyId,
423+
deploymentVersion: message.backgroundWorker.version,
424+
runId: message.run.id,
425+
runFriendlyId: message.run.friendlyId,
426+
version: message.version,
427+
nextAttemptNumber: message.run.attemptNumber,
428+
snapshotId: message.snapshot.id,
429+
snapshotFriendlyId: message.snapshot.friendlyId,
430+
placementTags: message.placementTags,
431+
traceContext: message.run.traceContext,
432+
annotations: message.run.annotations,
433+
hasPrivateLink: message.organization.hasPrivateLink,
434+
});
435+
recordPhaseSince("workload_create", createStart, undefined);
344436

345-
try {
346-
if (!message.deployment.friendlyId) {
347-
// mostly a type guard, deployments always exists for deployed environments
348-
// a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments.
349-
throw new Error("Deployment is missing");
437+
// Disabled for now
438+
// this.resourceMonitor.blockResources({
439+
// cpu: message.run.machine.cpu,
440+
// memory: message.run.machine.memory,
441+
// });
442+
} catch (error) {
443+
recordPhaseSince(
444+
"workload_create",
445+
createStart,
446+
error instanceof Error ? error : new Error(String(error))
447+
);
448+
this.logger.error("Failed to create workload", { error });
449+
}
350450
}
351-
352-
await this.workloadManager.create({
353-
dequeuedAt: message.dequeuedAt,
354-
dequeueResponseMs,
355-
pollingIntervalMs,
356-
warmStartCheckMs,
357-
envId: message.environment.id,
358-
envType: message.environment.type,
359-
image: message.image,
360-
machine: message.run.machine,
361-
orgId: message.organization.id,
362-
projectId: message.project.id,
363-
deploymentFriendlyId: message.deployment.friendlyId,
364-
deploymentVersion: message.backgroundWorker.version,
365-
runId: message.run.id,
366-
runFriendlyId: message.run.friendlyId,
367-
version: message.version,
368-
nextAttemptNumber: message.run.attemptNumber,
369-
snapshotId: message.snapshot.id,
370-
snapshotFriendlyId: message.snapshot.friendlyId,
371-
placementTags: message.placementTags,
372-
traceContext: message.run.traceContext,
373-
annotations: message.run.annotations,
374-
hasPrivateLink: message.organization.hasPrivateLink,
375-
});
376-
377-
// Disabled for now
378-
// this.resourceMonitor.blockResources({
379-
// cpu: message.run.machine.cpu,
380-
// memory: message.run.machine.memory,
381-
// });
382-
} catch (error) {
383-
this.logger.error("Failed to create workload", { error });
384-
}
451+
);
385452
}
386453
);
387454

@@ -404,6 +471,7 @@ class ManagedSupervisor {
404471
checkpointClient: this.checkpointClient,
405472
computeManager: this.computeManager,
406473
tracing: this.tracing,
474+
wideEventOpts: this.wideEventOpts,
407475
});
408476

409477
this.workloadServer.on("runConnected", this.onRunConnected.bind(this));
@@ -420,19 +488,31 @@ class ManagedSupervisor {
420488
this.workerSession.unsubscribeFromRunNotifications([run.friendlyId]);
421489
}
422490

423-
private async tryWarmStart(dequeuedMessage: DequeuedMessage): Promise<boolean> {
491+
private async tryWarmStart(
492+
dequeuedMessage: DequeuedMessage,
493+
traceparent: string | undefined
494+
): Promise<boolean> {
424495
if (!this.warmStartUrl) {
425496
return false;
426497
}
427498

428499
const warmStartUrlWithPath = new URL("/warm-start", this.warmStartUrl);
429500

501+
const headers: Record<string, string> = {
502+
"Content-Type": "application/json",
503+
};
504+
// Propagate the inbound W3C traceparent so the upstream warm-start
505+
// receiver continues the same trace instead of minting a new one. Gated
506+
// by the same kill switch as the wide-event emission so the whole PR is
507+
// a no-op on the wire when disabled.
508+
if (this.wideEventOpts.enabled && traceparent) {
509+
headers.traceparent = traceparent;
510+
}
511+
430512
try {
431513
const res = await fetch(warmStartUrlWithPath.href, {
432514
method: "POST",
433-
headers: {
434-
"Content-Type": "application/json",
435-
},
515+
headers,
436516
body: JSON.stringify({ dequeuedMessage }),
437517
});
438518

0 commit comments

Comments
 (0)