diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/span-streaming-v4/test.ts b/dev-packages/node-integration-tests/suites/tracing/vercelai/span-streaming-v4/test.ts index a4891d7850ce..a5219f76a88e 100644 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/span-streaming-v4/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/span-streaming-v4/test.ts @@ -19,6 +19,7 @@ import { GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, } from '../../../../../../packages/core/src/tracing/ai/gen-ai-attributes'; import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../../utils/runner'; +import { isOrchestrionEnabled } from '../../../../utils'; /** * Helper to match a typed attribute value in a SerializedStreamedSpan. @@ -29,7 +30,7 @@ function attr(value: unknown) { return expect.objectContaining({ value }); } -describe('Vercel AI integration (streaming)', () => { +describe.skipIf(isOrchestrionEnabled())('Vercel AI integration (streaming v4)', () => { afterAll(() => { cleanupChildProcesses(); }); diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/span-streaming-v6/test.ts b/dev-packages/node-integration-tests/suites/tracing/vercelai/span-streaming-v6/test.ts index 4c8c6c72749e..2e8ccc0c1602 100644 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/span-streaming-v6/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/span-streaming-v6/test.ts @@ -19,6 +19,7 @@ import { GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, } from '../../../../../../packages/core/src/tracing/ai/gen-ai-attributes'; import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../../utils/runner'; +import { isOrchestrionEnabled } from '../../../../utils'; /** * Helper to match a typed attribute value in a SerializedStreamedSpan. @@ -29,11 +30,13 @@ function attr(value: unknown) { return expect.objectContaining({ value }); } -describe('Vercel AI integration (streaming, V6)', () => { +describe('Vercel AI integration (streaming, v6)', () => { afterAll(() => { cleanupChildProcesses(); }); + const origin = isOrchestrionEnabled() ? 'auto.vercelai.channel' : 'auto.vercelai.otel'; + const EXPECTED_SPANS_DEFAULT_PII_FALSE = { items: expect.arrayContaining([ // First span - invoke_agent for simple generateText @@ -48,10 +51,7 @@ describe('Vercel AI integration (streaming, V6)', () => { [GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]: attr(30), [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('invoke_agent'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.invoke_agent'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), - 'vercel.ai.pipeline.name': attr('generateText'), - 'vercel.ai.streaming': attr(false), - 'vercel.ai.request.headers.user-agent': expect.objectContaining({ value: expect.any(String) }), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), // Second span - generate_content for simple generateText @@ -66,9 +66,7 @@ describe('Vercel AI integration (streaming, V6)', () => { [GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]: attr(30), [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('generate_content'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.generate_content'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), - 'vercel.ai.pipeline.name': attr('generateText.doGenerate'), - 'vercel.ai.streaming': attr(false), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), // Third span - invoke_agent for explicit telemetry generateText @@ -82,7 +80,7 @@ describe('Vercel AI integration (streaming, V6)', () => { [GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]: attr(30), [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('invoke_agent'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.invoke_agent'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), // Fourth span - tool call invoke_agent @@ -96,7 +94,7 @@ describe('Vercel AI integration (streaming, V6)', () => { [GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]: attr(40), [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('invoke_agent'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.invoke_agent'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), // Fifth span - tool call generate_content @@ -110,7 +108,7 @@ describe('Vercel AI integration (streaming, V6)', () => { [GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]: attr(40), [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('generate_content'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.generate_content'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), // Sixth span - execute_tool @@ -124,7 +122,7 @@ describe('Vercel AI integration (streaming, V6)', () => { [GEN_AI_TOOL_TYPE_ATTRIBUTE]: attr('function'), [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('execute_tool'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.execute_tool'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), ]), @@ -149,9 +147,7 @@ describe('Vercel AI integration (streaming, V6)', () => { [GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]: attr(30), [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('invoke_agent'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.invoke_agent'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), - 'vercel.ai.pipeline.name': attr('generateText'), - 'vercel.ai.streaming': attr(false), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), // Second span - generate_content with input/output messages @@ -170,7 +166,7 @@ describe('Vercel AI integration (streaming, V6)', () => { [GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]: attr(30), [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('generate_content'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.generate_content'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), // Third span - explicit telemetry invoke_agent with messages @@ -188,7 +184,7 @@ describe('Vercel AI integration (streaming, V6)', () => { [GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]: attr(30), [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('invoke_agent'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.invoke_agent'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), // Fourth span - tool call invoke_agent with messages (V6: no text part, only tool_call) @@ -208,7 +204,7 @@ describe('Vercel AI integration (streaming, V6)', () => { [GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]: attr(40), [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('invoke_agent'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.invoke_agent'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), // Fifth span - tool call generate_content with available_tools @@ -225,7 +221,7 @@ describe('Vercel AI integration (streaming, V6)', () => { [GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]: attr(40), [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('generate_content'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.generate_content'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), // Sixth span - execute_tool with description and input/output @@ -241,7 +237,7 @@ describe('Vercel AI integration (streaming, V6)', () => { [GEN_AI_TOOL_TYPE_ATTRIBUTE]: attr('function'), [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('execute_tool'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.execute_tool'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), ]), @@ -254,7 +250,7 @@ describe('Vercel AI integration (streaming, V6)', () => { attributes: expect.objectContaining({ [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('invoke_agent'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.invoke_agent'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), expect.objectContaining({ @@ -267,7 +263,7 @@ describe('Vercel AI integration (streaming, V6)', () => { [GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]: attr(40), [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('generate_content'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.generate_content'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), expect.objectContaining({ @@ -279,7 +275,7 @@ describe('Vercel AI integration (streaming, V6)', () => { [GEN_AI_TOOL_TYPE_ATTRIBUTE]: attr('function'), [GEN_AI_OPERATION_NAME_ATTRIBUTE]: attr('execute_tool'), [SEMANTIC_ATTRIBUTE_SENTRY_OP]: attr('gen_ai.execute_tool'), - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr('auto.vercelai.otel'), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: attr(origin), }), }), ]), diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/test.ts b/dev-packages/node-integration-tests/suites/tracing/vercelai/test.ts index d56815058097..b559610b81a9 100644 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/test.ts @@ -22,8 +22,9 @@ import { GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, } from '../../../../../packages/core/src/tracing/ai/gen-ai-attributes'; import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../utils/runner'; +import { isOrchestrionEnabled } from '../../../utils'; -describe('Vercel AI integration', () => { +describe.skipIf(isOrchestrionEnabled())('Vercel AI integration (v4)', () => { afterAll(() => { cleanupChildProcesses(); }); diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/v5/test.ts b/dev-packages/node-integration-tests/suites/tracing/vercelai/v5/test.ts index 0a5e6cebebbf..e8920c4e845b 100644 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/v5/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/v5/test.ts @@ -18,8 +18,9 @@ import { GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, } from '../../../../../../packages/core/src/tracing/ai/gen-ai-attributes'; import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../../utils/runner'; +import { isOrchestrionEnabled } from '../../../../utils'; -describe('Vercel AI integration (V5)', () => { +describe.skipIf(isOrchestrionEnabled())('Vercel AI integration (v5)', () => { afterAll(() => { cleanupChildProcesses(); }); diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/scenario-concurrent-stream.mjs b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/scenario-concurrent-stream.mjs new file mode 100644 index 000000000000..e384661b3a4a --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/scenario-concurrent-stream.mjs @@ -0,0 +1,61 @@ +import * as Sentry from '@sentry/node'; +import { streamText } from 'ai'; +import { MockLanguageModelV3, simulateReadableStream } from 'ai/test'; + +function makeStreamModel(text) { + return new MockLanguageModelV3({ + doStream: async () => ({ + stream: simulateReadableStream({ + chunks: [ + { type: 'stream-start', warnings: [] }, + { type: 'text-start', id: '0' }, + { type: 'text-delta', id: '0', delta: text }, + { type: 'text-end', id: '0' }, + { + type: 'finish', + finishReason: { unified: 'stop', raw: 'stop' }, + usage: { + inputTokens: { total: 10, noCache: 10, cached: 0 }, + outputTokens: { total: 20, noCache: 20, cached: 0 }, + totalTokens: { total: 30, noCache: 30, cached: 0 }, + }, + }, + ], + }), + }), + }); +} + +async function consume(result) { + for await (const _part of result.fullStream) { + void _part; + } +} + +async function run() { + // A single model instance shared by two *concurrent* streamText calls. The shared model carries + // only a single captured-parent slot, so naive parent tracking could attribute a model call to + // whichever operation resolved the model last. Each `generate_content` (doStream) must land under + // its own `invoke_agent`, and both invoke_agents under `main`. + const sharedModel = makeStreamModel('shared!'); + + await Sentry.startSpan({ op: 'function', name: 'main' }, async () => { + // Start both operations before consuming either, so both resolve the shared model first. + const streams = [ + streamText({ + experimental_telemetry: { isEnabled: true }, + model: sharedModel, + prompt: 'Concurrent stream A?', + }), + streamText({ + experimental_telemetry: { isEnabled: true }, + model: sharedModel, + prompt: 'Concurrent stream B?', + }), + ]; + + await Promise.all(streams.map(consume)); + }); +} + +run(); diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/scenario-embeddings.mjs b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/scenario-embeddings.mjs new file mode 100644 index 000000000000..52cbf4d3ded5 --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/scenario-embeddings.mjs @@ -0,0 +1,35 @@ +import * as Sentry from '@sentry/node'; +import { embed, embedMany } from 'ai'; +import { MockEmbeddingModelV3 } from 'ai/test'; + +async function run() { + await Sentry.startSpan({ op: 'function', name: 'main' }, async () => { + // Single embedding + await embed({ + model: new MockEmbeddingModelV3({ + doEmbed: async () => ({ + embeddings: [[0.1, 0.2, 0.3]], + usage: { tokens: 10 }, + }), + }), + value: 'Embedding test!', + }); + + // Multiple embeddings + await embedMany({ + model: new MockEmbeddingModelV3({ + maxEmbeddingsPerCall: 5, + doEmbed: async () => ({ + embeddings: [ + [0.1, 0.2, 0.3], + [0.4, 0.5, 0.6], + ], + usage: { tokens: 20 }, + }), + }), + values: ['First input', 'Second input'], + }); + }); +} + +run(); diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts index bb579a8f9443..bb7fa96b197e 100644 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts @@ -2,6 +2,7 @@ import { NODE_VERSION, type Event } from '@sentry/node'; import { afterAll, describe, expect } from 'vitest'; import { GEN_AI_CONVERSATION_ID_ATTRIBUTE, + GEN_AI_EMBEDDINGS_INPUT_ATTRIBUTE, GEN_AI_INPUT_MESSAGES_ATTRIBUTE, GEN_AI_OUTPUT_MESSAGES_ATTRIBUTE, GEN_AI_REQUEST_AVAILABLE_TOOLS_ATTRIBUTE, @@ -22,6 +23,7 @@ import { GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, } from '../../../../../../packages/core/src/tracing/ai/gen-ai-attributes'; import { cleanupChildProcesses, createEsmAndCjsTests, createEsmTests } from '../../../../utils/runner'; +import { isOrchestrionEnabled } from '../../../../utils'; describe.each([ ['6', '^6.0.0'], @@ -36,9 +38,11 @@ describe.each([ const nodeVersion = NODE_VERSION.major; const failsOnCjs = version === '7' && nodeVersion === 18; - // v6 is instrumented via the OTel processor, v7 via the `ai:telemetry` tracing-channel subscriber, - // so the span origin differs by version. - const expectedOrigin = version === '7' ? 'auto.vercelai.channel' : 'auto.vercelai.otel'; + const usesChannels = version === '7' || isOrchestrionEnabled(); + + // in v7 and orchestrion mode, we use the channel-based integration + // else, we use the OTel processor + const expectedOrigin = usesChannels ? 'auto.vercelai.channel' : 'auto.vercelai.otel'; // We only run this in ESM and CJS to verify full support // Other suites we only run in ESM to simplify the test setup @@ -233,10 +237,10 @@ describe.each([ // On v6, vercel AI natively defaults to recording inputs and outputs by default when telemetry is enabled // On v7, we do not have access to this, so this defaults to false in this case expect(secondInvokeAgentSpan.attributes?.[GEN_AI_INPUT_MESSAGES_ATTRIBUTE]?.value).toEqual( - version === '6' ? '[{"role":"user","content":"Where is the second span?"}]' : undefined, + !usesChannels ? '[{"role":"user","content":"Where is the second span?"}]' : undefined, ); expect(secondInvokeAgentSpan.attributes?.[GEN_AI_OUTPUT_MESSAGES_ATTRIBUTE]?.value).toEqual( - version === '6' + !usesChannels ? '[{"role":"assistant","parts":[{"type":"text","content":"Second span here!"}],"finish_reason":"stop"}]' : undefined, ); @@ -515,6 +519,56 @@ describe.each([ }, ); + createEsmTests( + __dirname, + 'scenario-concurrent-stream.mjs', + 'instrument.mjs', + (createRunner, test) => { + // A single model instance shared by two concurrent `streamText` calls carries only one + // captured-parent slot, so both model calls must still land under their own `invoke_agent` — not + // collapse onto whichever operation resolved the shared model last. + test.skipIf(version === '7' && nodeVersion === 18)( + 'parents concurrent streamText calls that share one model instance correctly', + async () => { + await createRunner() + .expect({ transaction: { transaction: 'main' } }) + .expect({ + span: container => { + const invokeAgents = container.items.filter( + span => span.attributes?.['sentry.op']?.value === 'gen_ai.invoke_agent', + ); + const generateContents = container.items.filter( + span => span.attributes?.['sentry.op']?.value === 'gen_ai.generate_content', + ); + + // Two concurrent operations -> two invoke_agent + two generate_content spans. + expect(invokeAgents).toHaveLength(2); + expect(generateContents).toHaveLength(2); + + const agentSpanIds = new Set(invokeAgents.map(span => span.span_id)); + + // Each model call lands under an invoke_agent span... + for (const span of generateContents) { + expect(agentSpanIds.has(span.parent_span_id!)).toBe(true); + } + // ...a distinct one each (no cross-attribution despite the shared model instance)... + expect(new Set(generateContents.map(span => span.parent_span_id)).size).toBe(2); + // ...and both operations sit under the same `main` parent. + expect(new Set(invokeAgents.map(span => span.parent_span_id)).size).toBe(1); + }, + }) + .start() + .completed(); + }, + ); + }, + { + additionalDependencies: { + ai: vercelAiVersion, + }, + }, + ); + createEsmTests( __dirname, 'scenario-stream-text.mjs', @@ -652,4 +706,61 @@ describe.each([ }, }, ); + + createEsmTests( + __dirname, + 'scenario-embeddings.mjs', + 'instrument-with-pii.mjs', + (createRunner, test) => { + // `ai` v7 only routes `embed` through its telemetry tracing channel — `embedMany` is dispatched + // via the callback-only path and never published — so the channel-based integration (v7 and v6 + // orchestrion) cannot see it there. On v6 both the OTel processor and the orchestrion channels + // instrument `embedMany`, so its span is expected only on v6. + const embedManyInstrumented = version === '6'; + + test('creates embeddings spans for embed and embedMany', async () => { + await createRunner() + .expect({ transaction: { transaction: 'main' } }) + .expect({ + span: container => { + // Every emitted gen_ai span carries the version-appropriate origin. + container.items + .filter(s => String(s.attributes?.['sentry.op']?.value ?? '').startsWith('gen_ai.')) + .forEach(s => expect(s.attributes?.['sentry.origin']?.value).toBe(expectedOrigin)); + + const embedSpan = container.items.find( + span => span.attributes?.[GEN_AI_EMBEDDINGS_INPUT_ATTRIBUTE]?.value === 'Embedding test!', + )!; + expect(embedSpan).toBeDefined(); + expect(embedSpan.name).toBe('embeddings mock-model-id'); + expect(embedSpan.status).toBe('ok'); + expect(embedSpan.attributes?.['sentry.op']?.value).toBe('gen_ai.embeddings'); + expect(embedSpan.attributes?.[GEN_AI_REQUEST_MODEL_ATTRIBUTE]?.value).toBe('mock-model-id'); + expect(embedSpan.attributes?.[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE]?.value).toBe(10); + + const embedManySpan = container.items.find( + span => + span.attributes?.[GEN_AI_EMBEDDINGS_INPUT_ATTRIBUTE]?.value === '["First input","Second input"]', + ); + if (embedManyInstrumented) { + expect(embedManySpan).toBeDefined(); + expect(embedManySpan!.name).toBe('embeddings mock-model-id'); + expect(embedManySpan!.status).toBe('ok'); + expect(embedManySpan!.attributes?.['sentry.op']?.value).toBe('gen_ai.embeddings'); + expect(embedManySpan!.attributes?.[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE]?.value).toBe(20); + } else { + expect(embedManySpan).toBeUndefined(); + } + }, + }) + .start() + .completed(); + }); + }, + { + additionalDependencies: { + ai: vercelAiVersion, + }, + }, + ); }); diff --git a/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts b/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts index 6bba862cdfc0..247f93cf3526 100644 --- a/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts +++ b/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts @@ -2,6 +2,7 @@ import { mysqlChannelIntegration, lruMemoizerChannelIntegration, detectOrchestrionSetup, + vercelAiChannelIntegration, } from '@sentry/server-utils/orchestrion'; import { registerDiagnosticsChannelInjection } from '@sentry/server-utils/orchestrion/register'; import type { DiagnosticsChannelInjection } from './diagnosticsChannelInjection'; @@ -41,7 +42,11 @@ import { setDiagnosticsChannelInjectionLoader } from './diagnosticsChannelInject */ export function experimentalUseDiagnosticsChannelInjection(): void { setDiagnosticsChannelInjectionLoader((): DiagnosticsChannelInjection => { - const integrations = [mysqlChannelIntegration(), lruMemoizerChannelIntegration()] as const; + const integrations = [ + mysqlChannelIntegration(), + lruMemoizerChannelIntegration(), + vercelAiChannelIntegration(), + ] as const; const replacedOtelIntegrationNames = integrations.map(i => i.name); return { diff --git a/packages/server-utils/src/integrations/tracing-channel/vercel-ai.ts b/packages/server-utils/src/integrations/tracing-channel/vercel-ai.ts new file mode 100644 index 000000000000..28fea1cb5229 --- /dev/null +++ b/packages/server-utils/src/integrations/tracing-channel/vercel-ai.ts @@ -0,0 +1,40 @@ +import type { IntegrationFn } from '@sentry/core'; +import { defineIntegration, extendIntegration, waitForTracingChannelBinding } from '@sentry/core'; +import { vercelAiIntegration as baseVercelAiIntegration } from '../../vercel-ai'; +import * as dc from 'node:diagnostics_channel'; +import { subscribeVercelAiOrchestrionChannels } from '../../vercel-ai/vercel-ai-orchestrion-v6-subscriber'; + +type VercelAiOptions = Parameters[0]; + +// In channel-based (orchestrion) mode we emit our own `gen_ai.*` spans from the +// diagnostics channels. The `ai` SDK would otherwise emit its own native +// OpenTelemetry spans whenever the user enables `experimental_telemetry`, which +// would be duplicates. Rather than dropping those after the fact, the v6 +// subscriber suppresses them at the source: it flips the wrapped call's +// `experimental_telemetry.isEnabled` to `false`, so `ai` falls back to its +// internal no-op tracer and never creates the native spans in the first place. +// See `subscribeVercelAiOrchestrionChannels`. +const _vercelAiChannelIntegration = ((options: VercelAiOptions = {}) => { + const parentIntegration = baseVercelAiIntegration(options); + + return extendIntegration(parentIntegration, { + options, + setupOnce() { + // Bail if this is not available + if (!dc.tracingChannel) { + return; + } + + waitForTracingChannelBinding(() => { + subscribeVercelAiOrchestrionChannels(dc.tracingChannel, options); + }); + }, + }); +}) satisfies IntegrationFn; + +/** + * Auto-instrument the `ai` SDK. Supported are: + * - v7 via native `ai:telemetry` tracing channel + * - v6 via orchestrion `orchestrion:ai:*` channels + */ +export const vercelAiChannelIntegration = defineIntegration(_vercelAiChannelIntegration); diff --git a/packages/server-utils/src/orchestrion/channels.ts b/packages/server-utils/src/orchestrion/channels.ts index ad2d8ccdd4dd..57202cd20f73 100644 --- a/packages/server-utils/src/orchestrion/channels.ts +++ b/packages/server-utils/src/orchestrion/channels.ts @@ -14,6 +14,19 @@ export const CHANNELS = { MYSQL_QUERY: 'orchestrion:mysql:query', LRU_MEMOIZER_LOAD: 'orchestrion:lru-memoizer:load', + // Vercel AI (`ai`) v6: orchestrion injects these so the same channel-based + // integration that consumes `ai`'s native `ai:telemetry` channel (v7) can + // also instrument v6. Each maps to a top-level function in `ai`'s bundle. + VERCEL_AI_GENERATE_TEXT: 'orchestrion:ai:generateText', + VERCEL_AI_STREAM_TEXT: 'orchestrion:ai:streamText', + VERCEL_AI_EMBED: 'orchestrion:ai:embed', + VERCEL_AI_EMBED_MANY: 'orchestrion:ai:embedMany', + VERCEL_AI_EXECUTE_TOOL_CALL: 'orchestrion:ai:executeToolCall', + // `resolveLanguageModel` is the single chokepoint every model call flows + // through; we wrap it to monkey-patch `doGenerate`/`doStream` on the returned + // model (the model-call site itself is an inline call with no injectable + // definition). + VERCEL_AI_RESOLVE_LANGUAGE_MODEL: 'orchestrion:ai:resolveLanguageModel', } as const; export type ChannelName = (typeof CHANNELS)[keyof typeof CHANNELS]; diff --git a/packages/server-utils/src/orchestrion/config.ts b/packages/server-utils/src/orchestrion/config.ts index 104df2185386..7cc53d3d74b6 100644 --- a/packages/server-utils/src/orchestrion/config.ts +++ b/packages/server-utils/src/orchestrion/config.ts @@ -11,6 +11,19 @@ import type { InstrumentationConfig } from '@apm-js-collab/code-transformer'; * `channelName` here is the unprefixed suffix; the actual diagnostics_channel * name is `orchestrion:${module.name}:${channelName}` (see `channels.ts`). */ +/** + * `ai` ships a single bundled entry per module system, so each instrumented + * function needs one config entry per file (the app loads whichever matches its + * module system). This expands a single target into both. + */ +function vercelAiV6Entries(channelName: string, functionName: string, kind: 'Async' | 'Sync'): InstrumentationConfig[] { + return ['dist/index.js', 'dist/index.mjs'].map(filePath => ({ + channelName, + module: { name: 'ai', versionRange: '>=6.0.0 <7.0.0', filePath }, + functionQuery: { functionName, kind }, + })); +} + export const SENTRY_INSTRUMENTATIONS: InstrumentationConfig[] = [ { channelName: 'query', @@ -38,6 +51,20 @@ export const SENTRY_INSTRUMENTATIONS: InstrumentationConfig[] = [ module: { name: 'lru-memoizer', versionRange: '>=2.1.0 <4', filePath: 'lib/async.js' }, functionQuery: { functionName: 'memoizedFunction', kind: 'Callback' }, }, + // Vercel AI v6: mirror the v7 native `ai:telemetry` channel by injecting + // channels into the top-level entry points. `resolveLanguageModel` is wrapped + // not to span it, but so the subscriber can monkey-patch `doGenerate`/ + // `doStream` on the returned model (the only way to span the model call, + // which is an inline call with no injectable definition in `ai`). + // `streamText` returns its result synchronously (streaming is lazy), so it's + // `Sync`; the subscriber binds the span via `bindTracingChannelToSpan`, which + // ends it when the (synchronous) call returns. + ...vercelAiV6Entries('generateText', 'generateText', 'Async'), + ...vercelAiV6Entries('streamText', 'streamText', 'Sync'), + ...vercelAiV6Entries('embed', 'embed', 'Async'), + ...vercelAiV6Entries('embedMany', 'embedMany', 'Async'), + ...vercelAiV6Entries('executeToolCall', 'executeToolCall', 'Async'), + ...vercelAiV6Entries('resolveLanguageModel', 'resolveLanguageModel', 'Sync'), ]; /** diff --git a/packages/server-utils/src/orchestrion/index.ts b/packages/server-utils/src/orchestrion/index.ts index 4b182e51ec13..23eb7a7ac4f7 100644 --- a/packages/server-utils/src/orchestrion/index.ts +++ b/packages/server-utils/src/orchestrion/index.ts @@ -1,3 +1,4 @@ export { detectOrchestrionSetup } from './detect'; export { mysqlChannelIntegration } from '../integrations/tracing-channel/mysql'; export { lruMemoizerChannelIntegration } from '../integrations/tracing-channel/lru-memoizer'; +export { vercelAiChannelIntegration } from '../integrations/tracing-channel/vercel-ai'; diff --git a/packages/server-utils/src/vercel-ai/vercel-ai-dc-subscriber.ts b/packages/server-utils/src/vercel-ai/vercel-ai-dc-subscriber.ts index f2b4b0debff9..1f25d48edff4 100644 --- a/packages/server-utils/src/vercel-ai/vercel-ai-dc-subscriber.ts +++ b/packages/server-utils/src/vercel-ai/vercel-ai-dc-subscriber.ts @@ -86,7 +86,7 @@ const toolDescriptionsByCallId = new Map>(); // Only top-level operations own the `callId` → operationId mapping; `step`/`languageModelCall`/ // `executeTool` share the parent's `callId`, so they must not clear it. -const ROOT_OPERATION_TYPES = new Set(['generateText', 'streamText', 'embed', 'rerank']); +const ROOT_OPERATION_TYPES = new Set(['generateText', 'streamText', 'embed', 'embedMany', 'rerank']); /** Drop the per-operation `callId` maps once the owning top-level operation settles (success or error). */ export function clearOperationId(data: VercelAiChannelMessage): void { @@ -95,11 +95,21 @@ export function clearOperationId(data: VercelAiChannelMessage): void { } const callId = asString(data.event.callId); if (callId) { - operationIdByCallId.delete(callId); - toolDescriptionsByCallId.delete(callId); + clearOperationCallId(callId); } } +/** + * Drop the per-operation `callId` maps for a single id. The v6 orchestrion adapter uses this to clear a + * `streamText` operation only after its lazily-run model call settles — the operation's own span ends + * synchronously (when `streamText` returns) but the model call runs later as the stream is consumed, and + * it still needs the operation's `operationId`/`isStream` entry to name itself `ai.streamText.doStream`. + */ +export function clearOperationCallId(callId: string): void { + operationIdByCallId.delete(callId); + toolDescriptionsByCallId.delete(callId); +} + /** Record tool name → description from an event's `tools`, so tool spans can backfill the description. */ function recordToolDescriptions(callId: string | undefined, tools: unknown): void { if (!callId || !Array.isArray(tools)) { @@ -148,6 +158,7 @@ export type ChannelEventType = | 'languageModelCall' | 'executeTool' | 'embed' + | 'embedMany' | 'rerank'; /** @@ -172,10 +183,10 @@ export interface VercelAiChannelMessage { * nested AI SDK operations (model calls, tool calls) become children of the enclosing span without * any manual parent bookkeeping here. */ -type VercelAiTracingChannelFactory = (name: string) => TracingChannel; +export type VercelAiTracingChannelFactory = (name: string) => TracingChannel; /** Integration-level recording options, pinned at subscribe time so we never look the integration up per event. */ -interface VercelAiChannelOptions { +export interface VercelAiChannelOptions { recordInputs?: boolean; recordOutputs?: boolean; enableTruncation?: boolean; @@ -264,10 +275,14 @@ export function createSpanFromMessage( case 'executeTool': return buildToolSpan(event, recordInputs); case 'embed': + case 'embedMany': { + // `embed` carries a single `value`; `embedMany` a `values` array — both map to the embeddings input. + const input = type === 'embedMany' ? event.values : event.value; return startGenAiSpan(GEN_AI_EMBEDDINGS_OPERATION, modelId, { ...baseAttributes, - ...(recordInputs && event.value !== undefined ? { [GEN_AI_EMBEDDINGS_INPUT]: safeStringify(event.value) } : {}), + ...(recordInputs && input !== undefined ? { [GEN_AI_EMBEDDINGS_INPUT]: safeStringify(input) } : {}), }); + } case 'rerank': return startGenAiSpan(GEN_AI_RERANK_OPERATION, modelId, baseAttributes); default: diff --git a/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts b/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts new file mode 100644 index 000000000000..cc7f82e194c1 --- /dev/null +++ b/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts @@ -0,0 +1,438 @@ +import type { Span } from '@sentry/core'; +import { debug, getActiveSpan, SPAN_STATUS_ERROR, withActiveSpan } from '@sentry/core'; +import { DEBUG_BUILD } from '../debug-build'; +import { CHANNELS } from '../orchestrion/channels'; +import { bindTracingChannelToSpan, type TracingChannelPayloadWithSpan } from '../tracing-channel'; +import { + clearOperationCallId, + clearOperationId, + createSpanFromMessage, + enrichSpanOnEnd, + type VercelAiChannelMessage, + type VercelAiChannelOptions, + type VercelAiTracingChannelFactory, +} from './vercel-ai-dc-subscriber'; + +/** + * v6 channel adapter for the Vercel AI (`ai`) SDK. + * + * `ai` >= 7 publishes a normalized `ai:telemetry` tracing channel natively + * (consumed by `subscribeVercelAiTracingChannel`). v6 has no such channel, so + * orchestrion injects `orchestrion:ai:*` channels around the top-level + * functions (see `orchestrion/config.ts`). The injected channels carry only the + * wrapped call's `{ arguments, result, error }` — NOT v7's normalized `event` + * object — so this adapter reconstructs an equivalent {@link VercelAiChannelMessage} + * from v6's argument/result shapes and delegates to the SAME span-building core + * (`createSpanFromMessage` / `enrichSpanOnEnd`) the v7 subscriber uses, so the + * emitted spans are identical between v6 and v7. + * + * Like the v7 subscriber, each operation channel is wired up via + * {@link bindTracingChannelToSpan}, which binds the opened span into the runtime's + * async context for the duration of the traced call and ends it when the call + * settles. That binding is what lets the model call below find its enclosing + * `invoke_agent` span via the active context (see {@link resolveModelCallParent}). + * + * The model call (`languageModelCall` / `generate_content` span) has no + * injectable definition in `ai`, so we instead wrap `resolveLanguageModel` (the + * single chokepoint every model call flows through) and monkey-patch + * `doGenerate`/`doStream` on the returned model. + */ + +/** Shape orchestrion's transform attaches to the tracing-channel context. */ +interface OrchestrionContext { + arguments: unknown[]; + result?: unknown; + error?: unknown; +} + +/** Builds the normalized message for a channel from the wrapped call's first-arg options. */ +type MessageBuilder = (options: Record, telemetry: Record) => VercelAiChannelMessage; + +/** A resolved `ai` language model — has `doGenerate`/`doStream` and identity fields. */ +interface ResolvedModel { + modelId?: string; + provider?: string; + doGenerate?: (...args: unknown[]) => Promise; + doStream?: (...args: unknown[]) => Promise; +} + +const PATCHED = Symbol('SentryVercelAiModelPatched'); + +/** A resolved model with our patch bookkeeping (idempotency flag). */ +type PatchableModel = ResolvedModel & { [PATCHED]?: boolean }; + +// Per-operation correlation id. No Date/random (unavailable / non-deterministic) — a counter is enough. +let callIdCounter = 0; +function nextCallId(): string { + return `v6-${++callIdCounter}`; +} + +// The message built on `start` for each operation, keyed by the (stable-identity) channel context, so +// the `beforeSpanEnd` handler can enrich the span from the settled result and clear the `callId` maps. +const messages = new WeakMap(); +// The spans we opened for top-level operations, and each one's `callId`. A model call resolves its +// parent against this set (so it never mis-attributes to the enclosing `main`/user span) and reads the +// parent's `callId` so its span can be named after the operation (e.g. `ai.streamText.doStream`). +const operationSpans = new WeakSet(); +const callIdBySpan = new WeakMap(); + +// The `experimental_telemetry` objects we swapped in to suppress `ai`'s native OTel spans (see +// `suppressNativeTelemetry`). Our skip logic treats `isEnabled === false` as "user disabled telemetry, +// emit no span"; without this set, a call whose options object we already neutralized — or a user-shared +// telemetry object we replaced on a prior call — would be misread as user-disabled and lose its span. +const suppressedTelemetry = new WeakSet(); + +let subscribed = false; + +/** + * Subscribe the v6 orchestrion channel adapter. Safe to always call: inert on + * `ai` >= 7 (those channels are never published) and when orchestrion injection + * isn't active. Idempotent. + * + * `tracingChannel` is the platform-provided factory (the same one passed to + * `subscribeVercelAiTracingChannel`); `options` pins the recording settings at + * subscribe time so we never look the integration up per event. + */ +export function subscribeVercelAiOrchestrionChannels( + tracingChannel: VercelAiTracingChannelFactory, + options: VercelAiChannelOptions = {}, +): void { + if (subscribed) { + return; + } + subscribed = true; + + try { + bindOperation(tracingChannel, CHANNELS.VERCEL_AI_GENERATE_TEXT, buildTextMessage('generateText'), options); + bindOperation(tracingChannel, CHANNELS.VERCEL_AI_STREAM_TEXT, buildTextMessage('streamText'), options); + bindOperation( + tracingChannel, + CHANNELS.VERCEL_AI_EMBED, + (callOptions, telemetry) => ({ + type: 'embed', + event: { + callId: nextCallId(), + ...modelFields(callOptions.model), + maxRetries: callOptions.maxRetries, + value: callOptions.value, + ...recording(telemetry), + }, + }), + options, + ); + bindOperation( + tracingChannel, + CHANNELS.VERCEL_AI_EMBED_MANY, + // `embedMany` takes a `values` array (vs `embed`'s single `value`); the shared core reads it as the + // embeddings input, matching the OTel path's batch `ai.embedMany` span. + (callOptions, telemetry) => ({ + type: 'embedMany', + event: { + callId: nextCallId(), + ...modelFields(callOptions.model), + maxRetries: callOptions.maxRetries, + values: callOptions.values, + ...recording(telemetry), + }, + }), + options, + ); + bindOperation( + tracingChannel, + CHANNELS.VERCEL_AI_EXECUTE_TOOL_CALL, + (callOptions, telemetry) => ({ + type: 'executeTool', + // v6 carries the tool definitions on the executeToolCall args (a record keyed by name); + // the shared core reads the matching tool's `description` for the span. + event: { + callId: nextCallId(), + toolCall: callOptions.toolCall, + tools: callOptions.tools, + ...recording(telemetry), + }, + }), + options, + ); + subscribeResolveLanguageModel(tracingChannel, CHANNELS.VERCEL_AI_RESOLVE_LANGUAGE_MODEL, options); + } catch { + DEBUG_BUILD && debug.log('Vercel AI orchestrion channel subscription failed.'); + } +} + +/** + * Bind one operation channel: `getSpan` opens a span from the message reconstructed out of the wrapped + * call's first argument; `beforeSpanEnd` enriches it from the settled result (tokens, output messages, + * finish reasons, …) before the helper ends the span. + * + * An operation whose `experimental_telemetry.isEnabled` is explicitly `false` is skipped entirely (no + * span): the orchestrion channel fires regardless of that flag, whereas v7's native `ai:telemetry` + * channel is simply not published in that case — so we reproduce v7's "no telemetry → no span". + */ +function bindOperation( + tracingChannel: VercelAiTracingChannelFactory, + channelName: string, + build: MessageBuilder, + options: VercelAiChannelOptions, +): void { + const channel = tracingChannel(channelName); + + // Build the operation span from the wrapped call's first argument and track it (so a model call can + // resolve it as its parent). `bindTracingChannelToSpan` calls this once at channel `start` and makes + // the returned span the active async context for the operation's duration — that active span is what + // `resolveModelCallParent` reads. It also sets `data._sentrySpan`, so we don't here. + const buildOperationSpan = (data: TracingChannelPayloadWithSpan): Span | undefined => { + const callOptions = isRecord(data.arguments[0]) ? data.arguments[0] : {}; + const telemetry = isRecord(callOptions.experimental_telemetry) ? callOptions.experimental_telemetry : {}; + // `isEnabled === false` means the user opted out — emit no span. But `isEnabled` is also `false` on + // the telemetry object we swap in to suppress native spans, so don't mistake our own object for a + // user opt-out (which would drop the span on a call whose options we already neutralized). + if (telemetry.isEnabled === false && !suppressedTelemetry.has(telemetry)) { + return undefined; + } + const message = build(callOptions, telemetry); + // Stop `ai` from emitting its own native OTel spans for this call — we build the equivalent spans + // from the channels, so the SDK's would be duplicates. Reads above have already captured everything + // we need off `telemetry`. + suppressNativeTelemetry(callOptions, telemetry); + const span = createSpanFromMessage(message, options); + if (span) { + messages.set(data, message); + operationSpans.add(span); + const callId = asString(message.event.callId); + if (callId) { + callIdBySpan.set(span, callId); + } + } + return span; + }; + + bindTracingChannelToSpan( + channel, + (data: TracingChannelPayloadWithSpan) => buildOperationSpan(data), + { + beforeSpanEnd: (span, data) => { + const message = messages.get(data); + if (!message) { + return; + } + // The helper's `error` handler already set the span status; only enrich from a successful result. + if (!('error' in data)) { + // v6's `executeToolCall` returns the tool result/error object directly, whereas the shared core + // (matching v7) expects it nested under `output`; wrap it so tool-error detection works. + message.result = message.type === 'executeTool' ? { output: data.result } : data.result; + enrichSpanOnEnd(span, message, options); + } + // A `streamText` model call runs after this (synchronously-returning) operation's span has + // already ended, so its `callId` entry must outlive the operation — it's cleared once the model + // call settles (see `patchModelMethod`). Every other operation can clear here. + if (message.type !== 'streamText') { + clearOperationId(message); + } + messages.delete(data); + }, + }, + ); +} + +/** + * Neutralize `ai`'s native OpenTelemetry instrumentation for this call by pointing + * `experimental_telemetry` at a copy with `isEnabled: false`. `ai`'s `getTracer` then returns its + * internal no-op tracer, so it never creates (nor sets active) the duplicate `ai.*` spans we'd + * otherwise have to drop. + * + * Only a call that explicitly enabled telemetry emits native spans — otherwise `ai` already uses its + * no-op tracer, so there's nothing to suppress and we leave the user's options untouched. When + * `isEnabled === true`, `telemetry` is `callOptions.experimental_telemetry` and `callOptions` is the + * real first argument the SDK will read, so the reassignment takes effect for the wrapped call. We + * replace rather than mutate in place, so a telemetry object the user shares across calls keeps its own + * `isEnabled: true`; the replacement is tracked in `suppressedTelemetry` so our skip logic doesn't + * later read it back as a user opt-out. + */ +function suppressNativeTelemetry(callOptions: Record, telemetry: Record): void { + if (telemetry.isEnabled !== true) { + return; + } + const suppressed = { ...telemetry, isEnabled: false }; + suppressedTelemetry.add(suppressed); + callOptions.experimental_telemetry = suppressed; +} + +/** + * `resolveLanguageModel` returns the model every call flows through. We don't span it — on `end` we + * monkey-patch `doGenerate`/`doStream` on the returned model so each invocation produces a + * `languageModelCall` span parented to the enclosing invoke_agent span. + */ +function subscribeResolveLanguageModel( + tracingChannel: VercelAiTracingChannelFactory, + channelName: string, + options: VercelAiChannelOptions, +): void { + tracingChannel(channelName).subscribe({ + end(rawCtx) { + const ctx = rawCtx as OrchestrionContext; + if (!isRecord(ctx.result)) { + return; + } + const model = ctx.result as PatchableModel; + // Patch the model's `doGenerate`/`doStream` once. The model call recovers its parent from the + // active async context at call time (the operation span `bindTracingChannelToSpan` bound), which + // propagates into the model call for `streamText` too, so there is nothing to capture on the model here. + if (!model[PATCHED]) { + model[PATCHED] = true; + patchModelMethod(model, 'doGenerate', options); + patchModelMethod(model, 'doStream', options); + } + }, + start() { + /* no-op */ + }, + asyncStart() { + /* no-op */ + }, + asyncEnd() { + /* no-op */ + }, + error() { + /* no-op */ + }, + }); +} + +/** + * Pick the invoke_agent span a model call should hang under: the operation span that + * `bindTracingChannelToSpan` planted as the active async context for the enclosing operation. + * + * Because we suppress `ai`'s native telemetry (so it never installs its own span as active), the active + * span inside `doGenerate`/`doStream` is our operation span. The `operationSpans` gate makes this return + * `undefined` when the active span isn't one of ours — e.g. telemetry was disabled for the call so we + * opened no operation span and the active span is the user's enclosing span — so the model call is + * skipped rather than mis-parented. + * + * This covers `generateText`/`embed` (whose model call is awaited inside the operation body) and + * `streamText` alike — `ai` initiates the stream synchronously within the operation's bound context, so + * the later `doStream` continuation restores the same active span even though the operation's span has + * already ended. The per-operation binding also disambiguates concurrent calls that share one model + * instance (a single mutable slot on the shared model could not — it would hold whichever operation + * resolved the model last). + */ +function resolveModelCallParent(): Span | undefined { + const active = getActiveSpan(); + return active && operationSpans.has(active) ? active : undefined; +} + +function patchModelMethod( + model: PatchableModel, + method: 'doGenerate' | 'doStream', + options: VercelAiChannelOptions, +): void { + const original = model[method]; + if (typeof original !== 'function') { + return; + } + model[method] = function (this: unknown, ...args: unknown[]): Promise { + const parent = resolveModelCallParent(); + // No enclosing operation span (e.g. telemetry disabled for the call) → don't open a model-call span. + if (!parent) { + return Promise.resolve(original.apply(this, args)); + } + + const callArgs = isRecord(args[0]) ? args[0] : {}; + // Carry the operation's `callId` so the shared core can name the span after it + // (`ai.generateText.doGenerate` / `ai.streamText.doStream`). + const callId = callIdBySpan.get(parent); + const message: VercelAiChannelMessage = { + type: 'languageModelCall', + event: { + callId, + provider: model.provider, + modelId: model.modelId, + tools: callArgs.tools, + messages: callArgs.prompt, + }, + }; + const span = withActiveSpan(parent, () => createSpanFromMessage(message, options)); + // `languageModelCall` always opens a span; the guard just keeps the wrapper safe if that changes. + if (!span) { + return Promise.resolve(original.apply(this, args)); + } + + // `streamText` ends its operation span synchronously, so its `callId` entry was deliberately left in + // place for this later model call; drop it now that we've used it. + const clearStreamCallId = (): void => { + if (method === 'doStream' && callId) { + clearOperationCallId(callId); + } + }; + + // Both the synchronous throw and the async rejection of the model call must end the span with an + // error status (an `async` `doGenerate`/`doStream` that throws rejects rather than throwing here). + const failSpan = (error: unknown): never => { + span.setStatus({ code: SPAN_STATUS_ERROR, message: error instanceof Error ? error.message : 'unknown_error' }); + span.end(); + clearStreamCallId(); + throw error; + }; + + try { + const result = Promise.resolve(original.apply(this, args)); + // `doStream` resolves to `{ stream, ... }` before the stream is consumed; we end here (start/end + // bracket the call) to match the channel timing. + return result.then(value => { + message.result = value; + enrichSpanOnEnd(span, message, options); + span.end(); + clearStreamCallId(); + return value; + }, failSpan); + } catch (error) { + return failSpan(error); + } + }; +} + +function buildTextMessage(type: 'generateText' | 'streamText'): MessageBuilder { + return (options, telemetry) => ({ + type, + event: { + callId: nextCallId(), + operationId: `ai.${type}`, + functionId: asString(telemetry.functionId), + ...modelFields(options.model), + maxRetries: options.maxRetries, + // Normalize to the message-array shape the shared core (and v7's channel) expects: a bare string + // `prompt` becomes a single user message, matching the SDK's own normalization. + messages: normalizePromptMessages(options), + ...recording(telemetry), + }, + }); +} + +function normalizePromptMessages(options: Record): unknown { + if (Array.isArray(options.messages)) { + return options.messages; + } + if (typeof options.prompt === 'string') { + return [{ role: 'user', content: options.prompt }]; + } + return options.messages ?? options.prompt; +} + +function recording(telemetry: Record): { recordInputs: unknown; recordOutputs: unknown } { + return { recordInputs: telemetry.recordInputs, recordOutputs: telemetry.recordOutputs }; +} + +function modelFields(model: unknown): { provider?: string; modelId?: string } { + return { provider: modelField(model, 'provider'), modelId: modelField(model, 'modelId') }; +} + +function modelField(model: unknown, field: 'modelId' | 'provider'): string | undefined { + return isRecord(model) ? asString(model[field]) : undefined; +} + +function asString(value: unknown): string | undefined { + return typeof value === 'string' ? value : undefined; +} + +function isRecord(value: unknown): value is Record { + return typeof value === 'object' && value !== null; +}