Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/metal-steaks-try.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

fix(sdk): batch triggerAndWait variants now return correct run.taskIdentifier instead of unknown
30 changes: 19 additions & 11 deletions packages/trigger-sdk/src/v3/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,7 @@ export async function batchTriggerByIdAndWait<TTask extends AnyTask>(
ctx,
});

const runs = await handleBatchTaskRunExecutionResultV2(result.items);
const runs = await handleBatchTaskRunExecutionResultV2(result.items, response.taskIdentifiers);

return {
id: result.id,
Expand Down Expand Up @@ -980,7 +980,7 @@ export async function batchTriggerByIdAndWait<TTask extends AnyTask>(
ctx,
});

const runs = await handleBatchTaskRunExecutionResultV2(result.items);
const runs = await handleBatchTaskRunExecutionResultV2(result.items, response.taskIdentifiers);

return {
id: result.id,
Expand Down Expand Up @@ -1457,7 +1457,7 @@ export async function batchTriggerAndWaitTasks<TTasks extends readonly AnyTask[]
ctx,
});

const runs = await handleBatchTaskRunExecutionResultV2(result.items);
const runs = await handleBatchTaskRunExecutionResultV2(result.items, response.taskIdentifiers);

return {
id: result.id,
Expand Down Expand Up @@ -1504,7 +1504,7 @@ export async function batchTriggerAndWaitTasks<TTasks extends readonly AnyTask[]
ctx,
});

const runs = await handleBatchTaskRunExecutionResultV2(result.items);
const runs = await handleBatchTaskRunExecutionResultV2(result.items, response.taskIdentifiers);

return {
id: result.id,
Expand Down Expand Up @@ -1545,7 +1545,7 @@ async function executeBatchTwoPhase(
spanParentAsLink?: boolean;
},
requestOptions?: TriggerApiRequestOptions
): Promise<{ id: string; runCount: number; publicAccessToken: string }> {
): Promise<{ id: string; runCount: number; publicAccessToken: string; taskIdentifiers: string[] }> {
let batch: Awaited<ReturnType<typeof apiClient.createBatch>> | undefined;

try {
Expand Down Expand Up @@ -1588,6 +1588,7 @@ async function executeBatchTwoPhase(
id: batch.id,
runCount: batch.runCount,
publicAccessToken: batch.publicAccessToken,
taskIdentifiers: items.map((item) => item.task),
};
}

Expand Down Expand Up @@ -1703,7 +1704,7 @@ async function executeBatchTwoPhaseStreaming(
spanParentAsLink?: boolean;
},
requestOptions?: TriggerApiRequestOptions
): Promise<{ id: string; runCount: number; publicAccessToken: string }> {
): Promise<{ id: string; runCount: number; publicAccessToken: string; taskIdentifiers: string[] }> {
// For streaming, we need to buffer items to get the count first
// This is because createBatch requires runCount upfront
// In the future, we could add a streaming-first endpoint that doesn't require this
Expand Down Expand Up @@ -2676,16 +2677,20 @@ async function handleBatchTaskRunExecutionResult<TIdentifier extends string, TOu
}

async function handleBatchTaskRunExecutionResultV2(
items: Array<TaskRunExecutionResult>
items: Array<TaskRunExecutionResult>,
taskIdentifiers?: string[]
): Promise<Array<AnyTaskRunResult>> {
const someObjectStoreOutputs = items.some(
(item) => item.ok && item.outputType === "application/store"
);

if (!someObjectStoreOutputs) {
const results = await Promise.all(
items.map(async (item) => {
return await handleTaskRunExecutionResult(item, item.taskIdentifier ?? "unknown");
items.map(async (item, index) => {
return await handleTaskRunExecutionResult(
item,
item.taskIdentifier ?? taskIdentifiers?.[index] ?? "unknown"
);
})
);

Expand All @@ -2696,8 +2701,11 @@ async function handleBatchTaskRunExecutionResultV2(
"store.downloadPayloads",
async (span) => {
const results = await Promise.all(
items.map(async (item) => {
return await handleTaskRunExecutionResult(item, item.taskIdentifier ?? "unknown");
items.map(async (item, index) => {
return await handleTaskRunExecutionResult(
item,
item.taskIdentifier ?? taskIdentifiers?.[index] ?? "unknown"
);
})
);

Expand Down
Loading