Skip to content

Commit 6ead0d0

Browse files
Cover reconnect and tracking phases in onError callback tests
Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent f235e1d commit 6ead0d0

File tree

2 files changed

+103
-2
lines changed

2 files changed

+103
-2
lines changed

packages/ai/src/chatTransport.test.ts

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -994,6 +994,107 @@ describe("TriggerChatTransport", function () {
994994
expect(chunks).toHaveLength(2);
995995
});
996996

997+
it("reports consumeTrackingStream failures through onError", async function () {
998+
const errors: TriggerChatTransportError[] = [];
999+
const runStore = new TrackedRunStore();
1000+
1001+
const server = await startServer(function (req, res) {
1002+
if (req.method === "POST" && req.url === "/api/v1/tasks/chat-task/trigger") {
1003+
res.writeHead(200, {
1004+
"content-type": "application/json",
1005+
"x-trigger-jwt": "pk_run_tracking_error",
1006+
});
1007+
res.end(JSON.stringify({ id: "run_tracking_error" }));
1008+
return;
1009+
}
1010+
1011+
res.writeHead(404);
1012+
res.end();
1013+
});
1014+
1015+
const transport = new TriggerChatTransport({
1016+
task: "chat-task",
1017+
stream: "chat-stream",
1018+
accessToken: "pk_trigger",
1019+
baseURL: server.url,
1020+
runStore,
1021+
onError: function onError(error) {
1022+
errors.push(error);
1023+
},
1024+
});
1025+
1026+
(transport as any).fetchRunStream = async function fetchRunStream() {
1027+
return new ReadableStream({
1028+
start(controller) {
1029+
controller.error(new Error("tracking failed root cause"));
1030+
},
1031+
});
1032+
};
1033+
1034+
const stream = await transport.sendMessages({
1035+
trigger: "submit-message",
1036+
chatId: "chat-tracking-error",
1037+
messageId: undefined,
1038+
messages: [],
1039+
abortSignal: undefined,
1040+
});
1041+
1042+
await expect(readChunks(stream)).rejects.toThrowError("tracking failed root cause");
1043+
1044+
await waitForCondition(function () {
1045+
return errors.length === 1;
1046+
});
1047+
1048+
expect(errors[0]).toMatchObject({
1049+
phase: "consumeTrackingStream",
1050+
chatId: "chat-tracking-error",
1051+
runId: "run_tracking_error",
1052+
});
1053+
expect(errors[0]?.error.message).toBe("tracking failed root cause");
1054+
expect(runStore.get("chat-tracking-error")).toBeUndefined();
1055+
});
1056+
1057+
it("reports reconnect failures through onError", async function () {
1058+
const errors: TriggerChatTransportError[] = [];
1059+
const runStore = new InMemoryTriggerChatRunStore();
1060+
runStore.set({
1061+
chatId: "chat-reconnect-error",
1062+
runId: "run_reconnect_error",
1063+
publicAccessToken: "pk_reconnect_error",
1064+
streamKey: "chat-stream",
1065+
lastEventId: "100-0",
1066+
isActive: true,
1067+
});
1068+
1069+
const transport = new TriggerChatTransport({
1070+
task: "chat-task",
1071+
stream: "chat-stream",
1072+
accessToken: "pk_trigger",
1073+
runStore,
1074+
onError: function onError(error) {
1075+
errors.push(error);
1076+
},
1077+
});
1078+
1079+
(transport as any).fetchRunStream = async function fetchRunStream() {
1080+
throw new Error("reconnect root cause");
1081+
};
1082+
1083+
const stream = await transport.reconnectToStream({
1084+
chatId: "chat-reconnect-error",
1085+
});
1086+
1087+
expect(stream).toBeNull();
1088+
expect(errors).toHaveLength(1);
1089+
expect(errors[0]).toMatchObject({
1090+
phase: "reconnect",
1091+
chatId: "chat-reconnect-error",
1092+
runId: "run_reconnect_error",
1093+
});
1094+
expect(errors[0]?.error.message).toBe("reconnect root cause");
1095+
expect(runStore.get("chat-reconnect-error")).toBeUndefined();
1096+
});
1097+
9971098
it("cleans run store state when stream completes", async function () {
9981099
const trackedRunStore = new TrackedRunStore();
9991100

packages/ai/src/chatTransport.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ export class TriggerChatTransport<
301301
await this.runStore.set(runState);
302302
await this.runStore.delete(chatId);
303303
}
304-
} catch {
304+
} catch (error) {
305305
const runState = await this.runStore.get(chatId);
306306
if (runState) {
307307
runState.isActive = false;
@@ -311,7 +311,7 @@ export class TriggerChatTransport<
311311
phase: "consumeTrackingStream",
312312
chatId: runState.chatId,
313313
runId: runState.runId,
314-
error: new Error("Stream tracking failed"),
314+
error: normalizeError(error),
315315
});
316316
}
317317
}

0 commit comments

Comments
 (0)