Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .changeset/thinking-blocks-per-step.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
'@tanstack/ai': minor
'@tanstack/ai-anthropic': patch
'@tanstack/ai-client': minor
---

**Fix thinking blocks getting merged across steps and lost on turn 2+ of Anthropic tool loops.**

Each thinking step emitted by the adapter now produces its own `ThinkingPart` on the `UIMessage` instead of being merged into a single part, and thinking content + Anthropic signatures are preserved in server-side message history so multi-turn tool flows with extended thinking work correctly.

This includes a public callback signature change: `StreamProcessorEvents.onThinkingUpdate` now receives `(messageId, stepId, content)` instead of `(messageId, content)`. `ChatClient` has been updated to handle the new `stepId` argument internally, but consumers implementing `StreamProcessorEvents` directly need to add the new parameter.

`@tanstack/ai`:

- `ThinkingPart` gains optional `stepId` and `signature` fields.
- `ModelMessage` gains an optional `thinking?: Array<{ content; signature? }>` field so prior thinking can be replayed in subsequent turns.
- `StepFinishedEvent` gains an optional `signature` field for provider-supplied thinking signatures.
- `StreamProcessor` tracks thinking per-step via `stepId` and keeps step ordering. `getState().thinking` / `getResult().thinking` concatenate step contents in order.
- The `onThinkingUpdate` callback on `StreamProcessorEvents` now receives `(messageId, stepId, content)` — consumers implementing it directly must add the `stepId` parameter.
- `TextEngine` accumulates thinking + signatures per iteration and includes them in assistant messages with tool calls so the next turn can replay them.

`@tanstack/ai-anthropic`:

- Captures `signature_delta` stream events and emits the final `STEP_FINISHED` with the signature on `content_block_stop`.
- Includes thinking blocks with signatures in `formatMessages` for multi-turn history.
- Passes `betas: ['interleaved-thinking-2025-05-14']` to the `beta.messages.create` call site when a thinking budget is configured. The beta flag is scoped to the streaming path only, so `structuredOutput` (which uses the non-beta `messages.create` endpoint) is unaffected.

`@tanstack/ai-client`:

- `ChatClient`'s internal `onThinkingUpdate` wiring is updated for the new `stepId` parameter.
83 changes: 80 additions & 3 deletions packages/typescript/ai-anthropic/src/adapters/text.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import type {
URLPDFSource,
} from '@anthropic-ai/sdk/resources/messages'
import type Anthropic_SDK from '@anthropic-ai/sdk'
import type { AnthropicBeta } from '@anthropic-ai/sdk/resources/beta/beta'
import type {
ContentPart,
Modality,
Expand Down Expand Up @@ -141,8 +142,23 @@ export class AnthropicTextAdapter<
`activity=chat provider=anthropic model=${this.model} messages=${options.messages.length} tools=${options.tools?.length ?? 0} stream=true`,
{ provider: 'anthropic', model: this.model },
)

// Interleaved thinking is only supported on the beta messages endpoint,
// so the `betas` flag is attached here rather than in the shared mapper
// (structuredOutput uses the non-beta endpoint which rejects `betas`).
const modelOptions = options.modelOptions as
| InternalTextProviderOptions
| undefined
const useInterleavedThinking =
modelOptions?.thinking?.type === 'enabled' &&
typeof modelOptions.thinking.budget_tokens === 'number' &&
modelOptions.thinking.budget_tokens > 0
const betas: Array<AnthropicBeta> | undefined = useInterleavedThinking
? ['interleaved-thinking-2025-05-14']
: undefined

const stream = await this.client.beta.messages.create(
{ ...requestParams, stream: true },
{ ...requestParams, stream: true, ...(betas && { betas }) },
{
signal: options.request?.signal,
headers: options.request?.headers,
Expand Down Expand Up @@ -431,6 +447,8 @@ export class AnthropicTextAdapter<
if (role === 'assistant' && message.toolCalls?.length) {
const contentBlocks: AnthropicContentBlocks = []

this.appendThinkingBlocks(contentBlocks, message.thinking)

if (message.content) {
const content =
typeof message.content === 'string' ? message.content : ''
Expand Down Expand Up @@ -469,6 +487,28 @@ export class AnthropicTextAdapter<
continue
}

if (role === 'assistant') {
const contentBlocks: AnthropicContentBlocks = []
this.appendThinkingBlocks(contentBlocks, message.thinking)

if (Array.isArray(message.content)) {
for (const part of message.content) {
contentBlocks.push(this.convertContentPartToAnthropic(part))
}
} else if (message.content) {
contentBlocks.push({
type: 'text',
text: message.content,
})
}

formattedMessages.push({
role: 'assistant',
content: contentBlocks.length > 0 ? contentBlocks : '',
})
continue
}

if (role === 'user' && Array.isArray(message.content)) {
const contentBlocks = message.content.map((part) =>
this.convertContentPartToAnthropic(part),
Expand All @@ -481,7 +521,7 @@ export class AnthropicTextAdapter<
}

formattedMessages.push({
role: role === 'assistant' ? 'assistant' : 'user',
role: 'user',
content:
typeof message.content === 'string'
? message.content
Expand All @@ -499,6 +539,22 @@ export class AnthropicTextAdapter<
return this.mergeConsecutiveSameRoleMessages(formattedMessages)
}

/**
 * Append replayable `thinking` content blocks onto an Anthropic
 * content-block list being built for an outgoing message.
 *
 * Only entries that carry a provider signature are forwarded; unsigned
 * entries are silently dropped. (NOTE(review): presumably the API requires
 * a valid signature to accept a replayed thinking block — confirm against
 * the Anthropic extended-thinking docs.)
 *
 * @param contentBlocks - Mutable list the thinking blocks are pushed onto.
 * @param thinkingParts - Prior thinking entries from the model message,
 *   possibly undefined or empty, in which case this is a no-op.
 */
private appendThinkingBlocks(
  contentBlocks: AnthropicContentBlocks,
  thinkingParts: ModelMessage['thinking'],
): void {
  const signedParts = thinkingParts?.filter((part) => part.signature) ?? []

  for (const part of signedParts) {
    // The thinking block shape isn't part of the non-beta content-block
    // union, so the cast mirrors the original implementation.
    const block = {
      type: 'thinking',
      thinking: part.content,
      signature: part.signature,
    } as unknown as AnthropicContentBlock
    contentBlocks.push(block)
  }
}

/**
* Merge consecutive messages of the same role into a single message.
* Anthropic's API requires strictly alternating user/assistant roles.
Expand Down Expand Up @@ -572,6 +628,7 @@ export class AnthropicTextAdapter<
const model = options.model
let accumulatedContent = ''
let accumulatedThinking = ''
let accumulatedSignature = ''
const timestamp = Date.now()
const toolCallsMap = new Map<
number,
Expand Down Expand Up @@ -621,6 +678,7 @@ export class AnthropicTextAdapter<
})
} else if (event.content_block.type === 'thinking') {
accumulatedThinking = ''
accumulatedSignature = ''
// Emit REASONING and STEP_STARTED for thinking
stepId = genId()
reasoningMessageId = genId()
Expand Down Expand Up @@ -714,6 +772,11 @@ export class AnthropicTextAdapter<
delta,
content: accumulatedThinking,
})
} else if (
(event.delta as { type: string }).type === 'signature_delta'
) {
accumulatedSignature +=
(event.delta as { signature: string }).signature || ''
} else if (event.delta.type === 'input_json_delta') {
const existing = toolCallsMap.get(currentToolIndex)
if (existing) {
Expand Down Expand Up @@ -744,7 +807,21 @@ export class AnthropicTextAdapter<
}
}
} else if (event.type === 'content_block_stop') {
if (currentBlockType === 'tool_use') {
if (currentBlockType === 'thinking') {
// Emit signature so it can be replayed in multi-turn context
if (accumulatedSignature && stepId) {
yield asChunk({
type: 'STEP_FINISHED',
stepName: stepId,
stepId,
model,
timestamp,
delta: '',
content: accumulatedThinking,
signature: accumulatedSignature,
})
}
} else if (currentBlockType === 'tool_use') {
const existing = toolCallsMap.get(currentToolIndex)
if (existing) {
// If tool call wasn't started yet (no args), start it now
Expand Down
124 changes: 124 additions & 0 deletions packages/typescript/ai-anthropic/tests/anthropic-adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,28 @@ const weatherTool: Tool = {
}),
}

/**
 * Build a minimal Anthropic-style streaming response for tests: one text
 * content block carrying `text`, followed by the standard
 * `message_delta` / `message_stop` terminators. Used as the resolved value
 * of the mocked `beta.messages.create` call.
 */
function createTextStream(text: string) {
  const events = [
    {
      type: 'content_block_start',
      index: 0,
      content_block: { type: 'text', text: '' },
    },
    {
      type: 'content_block_delta',
      index: 0,
      delta: { type: 'text_delta', text },
    },
    { type: 'content_block_stop', index: 0 },
    {
      type: 'message_delta',
      delta: { stop_reason: 'end_turn' },
      usage: { output_tokens: 5 },
    },
    { type: 'message_stop' },
  ]

  return (async function* () {
    yield* events
  })()
}

describe('Anthropic adapter option mapping', () => {
beforeEach(() => {
vi.clearAllMocks()
Expand Down Expand Up @@ -377,6 +399,108 @@ describe('Anthropic adapter option mapping', () => {
}
})

it('replays signed thinking blocks before tool use in multi-turn history', async () => {
  // Queue a plain-text streaming response for the follow-up turn.
  mocks.betaMessagesCreate.mockResolvedValueOnce(
    createTextStream('Follow-up answer'),
  )

  const adapter = createAdapter('claude-3-7-sonnet-20250219')

  // History contains an assistant turn with a *signed* thinking entry plus
  // a tool call, then the tool result and a new user question — the shape
  // produced by a prior extended-thinking tool loop.
  const chunks: StreamChunk[] = []
  for await (const chunk of chat({
    adapter,
    messages: [
      { role: 'user', content: 'What is the weather in Berlin?' },
      {
        role: 'assistant',
        content: null,
        thinking: [
          {
            content: 'Need to fetch weather before answering.',
            signature: 'signed-thinking-1',
          },
        ],
        toolCalls: [
          {
            id: 'call_1',
            type: 'function',
            function: { name: 'lookup_weather', arguments: toolArguments },
          },
        ],
      },
      { role: 'tool', toolCallId: 'call_1', content: '{"temp":72}' },
      { role: 'user', content: 'What should I wear?' },
    ],
    tools: [weatherTool],
    // Thinking budget enabled so the adapter takes the beta streaming path.
    modelOptions: {
      thinking: { type: 'enabled', budget_tokens: 1024 },
    } as AnthropicTextProviderOptions,
  })) {
    chunks.push(chunk)
  }

  expect(mocks.betaMessagesCreate).toHaveBeenCalledTimes(1)
  // Inspect the raw request payload sent to the (mocked) SDK.
  const [payload] = mocks.betaMessagesCreate.mock.calls[0]

  // The interleaved-thinking beta flag must be attached when thinking is on.
  expect(payload.betas).toEqual(['interleaved-thinking-2025-05-14'])
  // The assistant turn must replay the signed thinking block *before* the
  // tool_use block, with the signature preserved verbatim.
  expect(payload.messages[1].content).toEqual([
    {
      type: 'thinking',
      thinking: 'Need to fetch weather before answering.',
      signature: 'signed-thinking-1',
    },
    {
      type: 'tool_use',
      id: 'call_1',
      name: 'lookup_weather',
      input: { location: 'Berlin' },
    },
  ])
})

it('replays signed thinking blocks for assistant messages without tool calls', async () => {
  // Queue a plain-text streaming response for the follow-up turn.
  mocks.betaMessagesCreate.mockResolvedValueOnce(
    createTextStream('Next answer'),
  )

  const adapter = createAdapter('claude-3-7-sonnet-20250219')

  // History contains a plain assistant text turn (no tool calls) that also
  // carries a signed thinking entry from a prior extended-thinking run.
  const chunks: StreamChunk[] = []
  for await (const chunk of chat({
    adapter,
    messages: [
      { role: 'user', content: 'Think then answer.' },
      {
        role: 'assistant',
        content: 'Prior answer.',
        thinking: [
          {
            content: 'Prior signed thinking.',
            signature: 'signed-thinking-text-only',
          },
        ],
      },
      { role: 'user', content: 'Continue.' },
    ],
    // Thinking budget enabled so the adapter takes the beta streaming path.
    modelOptions: {
      thinking: { type: 'enabled', budget_tokens: 1024 },
    } as AnthropicTextProviderOptions,
  })) {
    chunks.push(chunk)
  }

  const [payload] = mocks.betaMessagesCreate.mock.calls[0]

  // Even without tool calls, the thinking block is replayed first and the
  // original text follows it, with the signature preserved verbatim.
  expect(payload.messages[1].content).toEqual([
    {
      type: 'thinking',
      thinking: 'Prior signed thinking.',
      signature: 'signed-thinking-text-only',
    },
    { type: 'text', text: 'Prior answer.' },
  ])
})

it('merges multiple consecutive tool result messages into one user message', async () => {
// When multiple tools are called, each tool result becomes a role:'user' message.
// These must be merged into a single user message.
Expand Down
6 changes: 5 additions & 1 deletion packages/typescript/ai-client/src/chat-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ export class ChatClient {
this.events.textUpdated(this.currentStreamId, messageId, content)
}
},
onThinkingUpdate: (messageId: string, content: string) => {
onThinkingUpdate: (
messageId: string,
_stepId: string,
content: string,
) => {
// Emit thinking update to devtools
if (this.currentStreamId) {
this.events.thinkingUpdated(
Expand Down
Loading
Loading