feat: Sessions - bidirectional durable agent streams #3417
Merged
ericallam merged 23 commits on Apr 28, 2026
What this gives Trigger.dev users
A new first-class primitive, `Session`, for durable, task-bound, bidirectional I/O that outlives any single run. Sessions are the run manager for `chat.agent` going forward, and they unblock anything else that needs "one identifier, many runs over time" with a stable channel pair the client can write to and subscribe to.

Use cases unblocked
- [...] `chatId` via `externalId`), turns 1..N attach to the same Session, the UI subscribes once and keeps receiving output as new runs take over.
- [...] `.in`, the client writes to `.in`, the server enforces no-writes-after-close.
- [...] `.out` after the task finishes to replay history.

How it works (Session-as-run-manager)
A Session row is task-bound (`taskIdentifier` + `triggerConfig` are required) and owns its current run via `currentRunId` + `currentRunVersion` for optimistic claim. Three trigger paths:

- `POST /api/v1/sessions` creates the row and triggers the first run synchronously.
- `POST /realtime/v1/sessions/:session/in/append` checks if the current run is alive; if it has terminated (idle exit, crash, etc.), the server triggers a new run before processing the append.
- `POST /api/v1/sessions/:session/end-and-continue`, called by the running agent, triggers a fresh run and atomically swaps `currentRunId`. Used by `chat.requestUpgrade()` for version handoffs.

Every triggered run is recorded in the `SessionRun` audit table with a reason (`initial`, `continuation`, `upgrade`, `manual`).

Public API surface
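As a rough illustration of the three trigger paths from "How it works", here is a minimal in-memory sketch of how a server might decide whether a request triggers a run and which `SessionRun` reason it records. The path-to-reason mapping is an assumption (the description lists the reasons but does not say which path uses which), and `reasonForTrigger`, `TriggerPath`, and `isRunAlive` are hypothetical names, not the real implementation.

```typescript
// Reasons listed in the PR description for the SessionRun audit table.
type SessionRunReason = "initial" | "continuation" | "upgrade" | "manual";

// Hypothetical labels for the three trigger paths.
type TriggerPath = "create" | "append" | "end-and-continue";

// Only the fields this sketch needs; not the full Session schema.
interface SessionRow {
  id: string;
  currentRunId: string | null;
  currentRunVersion: number;
}

// Returns the reason to record if a new run should be triggered, or null
// if the existing run can handle the request. `isRunAlive` stands in for
// whatever liveness check the server actually performs.
function reasonForTrigger(
  path: TriggerPath,
  session: SessionRow,
  isRunAlive: (runId: string) => boolean
): SessionRunReason | null {
  switch (path) {
    case "create":
      // Assumption: the first run of a session is recorded as "initial".
      return "initial";
    case "append":
      // Only trigger when no live run can receive the input.
      if (session.currentRunId !== null && isRunAlive(session.currentRunId)) {
        return null;
      }
      return "continuation";
    case "end-and-continue":
      // Assumption: agent-requested handoffs are recorded as "upgrade".
      return "upgrade";
  }
}
```

The `manual` reason is left unmapped here because the description does not say which path produces it.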
Control plane
- `POST /api/v1/sessions`: create. Idempotent on `(env, externalId)`. Triggers the first run, returns the session and a session-scoped public access token. Returns 409 if the upserted row is already closed.
- `GET /api/v1/sessions/:session`: retrieve by friendlyId (`session_abc...`) or by your own externalId (server disambiguates by prefix).
- `GET /api/v1/sessions`: list with filters (`type`, `tag`, `taskIdentifier`, `externalId`, derived `status` `ACTIVE`/`CLOSED`/`EXPIRED`, created-at range) and cursor pagination. Backed by ClickHouse.
- `PATCH /api/v1/sessions/:session`: update tags / metadata / externalId.
- `POST /api/v1/sessions/:session/close`: terminate. Idempotent, hard-blocks new server-brokered writes.
- `POST /api/v1/sessions/:session/end-and-continue`: agent-only handoff to a fresh run.

Realtime
- `PUT /realtime/v1/sessions/:session/:io`: initialize a channel. Returns S2 credentials in headers so high-throughput clients can write directly to S2.
- `GET /realtime/v1/sessions/:session/:io`: SSE subscribe. Supports `Last-Event-ID` resume and an opt-in `X-Peek-Settled: 1` header that fast-closes the stream when the upstream is already settled (`trigger:turn-complete`), eliminating long-poll wait on reconnect-on-reload paths.
- `POST /realtime/v1/sessions/:session/:io/append`: server-side appends.
- `POST /api/v1/runs/:runFriendlyId/session-streams/wait`: runs wait on a session stream as a waitpoint, with a race-check to avoid suspending if data already landed.

Auth scopes
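For the SSE subscribe route in the Realtime list, a client-side request could be assembled roughly as follows. This is a sketch, not SDK code: `buildSubscribeRequest`, `SubscribeOptions`, `baseUrl`, and the `Authorization: Bearer` header are assumptions; only the path shape and the `Last-Event-ID` and `X-Peek-Settled: 1` headers come from this description.

```typescript
interface SubscribeOptions {
  baseUrl: string;        // hypothetical API origin
  session: string;        // friendlyId or your own externalId
  io: "in" | "out";       // which channel to subscribe to
  accessToken: string;    // session-scoped public access token
  lastEventId?: string;   // resume point from a previous connection
  peekSettled?: boolean;  // opt in to the X-Peek-Settled fast-close
}

// Builds the URL and headers for GET /realtime/v1/sessions/:session/:io.
// It does not open the connection; pass the result to fetch() or an SSE client.
function buildSubscribeRequest(
  opts: SubscribeOptions
): { url: string; headers: Record<string, string> } {
  const headers: Record<string, string> = {
    Accept: "text/event-stream",
    // Assumption: the token is sent as a bearer token.
    Authorization: `Bearer ${opts.accessToken}`,
  };
  if (opts.lastEventId !== undefined) {
    headers["Last-Event-ID"] = opts.lastEventId;
  }
  if (opts.peekSettled) {
    headers["X-Peek-Settled"] = "1";
  }
  const session = encodeURIComponent(opts.session);
  const url = `${opts.baseUrl}/realtime/v1/sessions/${session}/${opts.io}`;
  return { url, headers };
}
```

On a page reload, a client might call this with `peekSettled: true` and the last event ID it stored, so that an already-settled stream closes immediately instead of holding the connection open.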
`sessions` is a new resource type. `read:sessions:{id}`, `write:sessions:{id}`, and `admin:sessions:{id}` flow through the existing JWT validator. Session-scoped public access tokens minted by the server replace browser-held trigger-task tokens for chat-style flows, so the browser never sees a run identifier or a run-scoped token in steady state.

What's coming after this PR
- [...] `@trigger.dev/sdk` prerelease alongside this server deploy. Customers using the prerelease `chat.agent` will follow the upgrade guide.

Implementation notes
- `Session` table: scalar scoping columns (`projectId`, `runtimeEnvironmentId`, `environmentType`, `organizationId`) without FKs, matching the January TaskRun FK-removal decision. Point-lookup indexes only; list queries go to ClickHouse. Terminal markers (`closedAt`, `expiresAt`) are write-once.
- `sessions_v1`: ReplacingMergeTree, partitioned by month, ordered by `(org_id, project_id, environment_id, created_at, session_id)`. Tags indexed via `tokenbf_v1` skip index.
- `SessionsReplicationService`: mirrors `RunsReplicationService` exactly: leader-locked logical replication consumer, `ConcurrentFlushScheduler`, retry with exponential backoff + jitter, identical metric shape. Dedicated slot + publication so the two consume independently.
- [...] `sessions/{addressingKey}/{out|in}`. The existing `runs/{runId}/{streamId}` key format for run-scoped streams is untouched.
- `ensureRunForSession` triggers a run upfront (cheap to cancel if it loses the race), then attempts an `updateMany` keyed on `currentRunVersion`. Loser cancels its triggered run and reuses the winner's. No DB lock held across the trigger.

What did NOT change
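The optimistic claim described for `ensureRunForSession` in the implementation notes can be illustrated with a small in-memory sketch. It is not the actual implementation: `claimRun` stands in for the `updateMany` keyed on `currentRunVersion`, and `ClaimableSession`, `ensureRunForSessionSketch`, `triggerRun`, and `cancelRun` are hypothetical names.

```typescript
// Only the two columns involved in the claim; not the full Session schema.
interface ClaimableSession {
  currentRunId: string | null;
  currentRunVersion: number;
}

// Stand-in for `updateMany ... where currentRunVersion = expectedVersion`.
// Returns the number of rows updated: 1 if the claim won, 0 if it lost.
function claimRun(
  row: ClaimableSession,
  expectedVersion: number,
  runId: string
): number {
  if (row.currentRunVersion !== expectedVersion) return 0;
  row.currentRunId = runId;
  row.currentRunVersion = expectedVersion + 1;
  return 1;
}

// Trigger first, claim second; the loser cancels its run and reuses the winner's.
function ensureRunForSessionSketch(
  row: ClaimableSession,
  observedVersion: number,
  triggerRun: () => string,
  cancelRun: (runId: string) => void
): string {
  // The run is triggered before the claim, so no lock is held across the trigger.
  const candidate = triggerRun();
  if (claimRun(row, observedVersion, candidate) === 1) {
    return candidate;
  }
  // Another caller bumped the version first: discard our run.
  cancelRun(candidate);
  if (row.currentRunId === null) {
    throw new Error("claim lost but no current run is recorded");
  }
  return row.currentRunId;
}
```

If two callers both observe version 0, the first claim bumps the version to 1 and the second claim matches no rows, so the second caller cancels its own run and returns the first caller's run ID.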
Run-scoped `streams.pipe` / `streams.input` and the existing `/realtime/v1/streams/{runId}/...` routes are unchanged. Sessions are net-new, not a reshaping of the current streams API.

Deploy notes
- [...] `SESSION_REPLICATION_CLICKHOUSE_URL` and `SESSION_REPLICATION_ENABLED=1` to enable the replication consumer.
- `Session` table needs `REPLICA IDENTITY FULL` set on the prod source DB before the publication is created (same one-time DDL we did for `TaskRun`). Required for delete events to carry full column values.
- [...] `GET /api/v1/sessions/:session` loader (a JWT minted for either form authorizes both URL forms). Action routes are URL-form-specific, matching how the SDK mints PATs.

Verification
- `apps/webapp/test/sessionsReplicationService.test.ts`: round-trip tests for insert/update/delete through Postgres logical replication into ClickHouse via testcontainers.
- [...] `.out.initialize` + `.out.append` x2 + `.in.send` + `.out.subscribe` over SSE, list with all filter combinations + pagination, `end-and-continue` swap, `X-Peek-Settled` fast-close (verified in browser via reconnect-on-reload and via curl). Replicated row lands in ClickHouse within ~1s.
- [...] `prisma` writer, info-leak on auth-routes masked as 403, peek-settled discriminator parsing fix, etc.).

Test plan
- `pnpm run typecheck --filter webapp`
- `pnpm run test --filter webapp ./test/sessionsReplicationService.test.ts --run`
- [...] `SESSION_REPLICATION_CLICKHOUSE_URL` and `SESSION_REPLICATION_ENABLED=1`. Confirm the slot and publication auto-create on boot.
- [...] `POST /api/v1/sessions` and verify the row replicates to `trigger_dev.sessions_v1` within a couple of seconds.
- [...] `POST /api/v1/sessions/:id/close`, then confirm `POST /realtime/v1/sessions/:id/out/append` returns 400.
- [...] `externalId` on `POST /api/v1/sessions` and confirm 409.
- [...] `GET /realtime/v1/sessions/:id/out` with `X-Peek-Settled: 1` after a turn completes and confirm `X-Session-Settled: true` response header + immediate close.
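Two of the manual checks in the test plan (append-after-close returning 400, and the `X-Peek-Settled` fast-close header) could be written down as data and compared against observed responses, roughly as below. This is a sketch under assumptions: `PlanStep`, `closedSessionSteps`, `peekSettledStep`, and `stepPassed` are hypothetical helpers, and request bodies are omitted because their shape is not specified in this description.

```typescript
// One request from the test plan plus what the plan expects back.
interface PlanStep {
  method: "GET" | "POST";
  path: string;
  headers?: Record<string, string>;
  expectStatus?: number;
  expectHeader?: { name: string; value: string };
}

// Close the session, then expect a server-side append to be rejected with 400.
function closedSessionSteps(id: string): PlanStep[] {
  const s = encodeURIComponent(id);
  return [
    { method: "POST", path: `/api/v1/sessions/${s}/close` },
    {
      method: "POST",
      path: `/realtime/v1/sessions/${s}/out/append`,
      expectStatus: 400,
    },
  ];
}

// Subscribe with the peek header after a turn completes and expect the
// settled marker in the response headers.
function peekSettledStep(id: string): PlanStep {
  return {
    method: "GET",
    path: `/realtime/v1/sessions/${encodeURIComponent(id)}/out`,
    headers: { "X-Peek-Settled": "1" },
    expectHeader: { name: "X-Session-Settled", value: "true" },
  };
}

// Compares an observed status and header map against a step's expectations.
function stepPassed(
  step: PlanStep,
  status: number,
  headers: Record<string, string>
): boolean {
  if (step.expectStatus !== undefined && status !== step.expectStatus) {
    return false;
  }
  if (step.expectHeader !== undefined) {
    // HTTP header names are case-insensitive.
    const wanted = step.expectHeader.name.toLowerCase();
    const found = Object.entries(headers).find(
      ([name]) => name.toLowerCase() === wanted
    );
    if (found === undefined || found[1] !== step.expectHeader.value) {
      return false;
    }
  }
  return true;
}
```

A runner would send each step with `fetch` against the deployed webapp and pass the response status and headers to `stepPassed`; the block itself makes no network calls.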