diff --git a/dotnet/src/Client.cs b/dotnet/src/Client.cs index 71c7c8795..6041fe239 100644 --- a/dotnet/src/Client.cs +++ b/dotnet/src/Client.cs @@ -8,6 +8,7 @@ using Microsoft.Extensions.Logging.Abstractions; using System.Collections.Concurrent; using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using System.Globalization; using System.Net.Sockets; using System.Runtime.ExceptionServices; @@ -1037,7 +1038,8 @@ public async Task CreateSessionAsync(SessionConfig config, Cance Providers: config.Providers, Models: config.Models, ToolFilterPrecedence: toolFilter.ToolFilterPrecedence, - ExpAssignments: config.ExpAssignments); + ExpAssignments: config.ExpAssignments, + EnableGitHubTelemetryForwarding: _options.OnGitHubTelemetry != null ? true : null); var rpcTimestamp = Stopwatch.GetTimestamp(); @@ -1246,7 +1248,8 @@ public async Task ResumeSessionAsync(string sessionId, ResumeSes Providers: config.Providers, Models: config.Models, ToolFilterPrecedence: toolFilter.ToolFilterPrecedence, - ExpAssignments: config.ExpAssignments); + ExpAssignments: config.ExpAssignments, + EnableGitHubTelemetryForwarding: _options.OnGitHubTelemetry != null ? true : null); var rpcTimestamp = Stopwatch.GetTimestamp(); var response = await InvokeRpcAsync( @@ -1724,21 +1727,24 @@ await Rpc.SessionFs.SetProviderAsync( } /// - /// Builds the client-global RPC handler bag at construction time. Currently - /// only the LLM inference provider adapter is registered; returns null when no + /// Builds the client-global RPC handler bag at construction time. Registers + /// the LLM inference provider adapter and/or the GitHub telemetry adapter + /// depending on which options are configured; returns null when no /// client-global API is configured so the registration is skipped entirely. /// private ClientGlobalApiHandlers? BuildClientGlobalApis() { var handler = _options.RequestHandler; - if (handler is null) + var onGitHubTelemetry = _options.OnGitHubTelemetry; + if (handler is null && onGitHubTelemetry is null) { return null; } return new ClientGlobalApiHandlers { - LlmInference = new LlmInferenceAdapter(handler, () => _serverRpc), + LlmInference = handler is null ? null : new LlmInferenceAdapter(handler, () => _serverRpc), + GitHubTelemetry = onGitHubTelemetry is null ? null : new GitHubTelemetryAdapter(onGitHubTelemetry, _logger), }; } @@ -2495,7 +2501,8 @@ internal record CreateSessionRequest( IList? Providers = null, IList? Models = null, OptionsUpdateToolFilterPrecedence? ToolFilterPrecedence = null, - [property: JsonPropertyName("expAssignments")] JsonElement? ExpAssignments = null); + [property: JsonPropertyName("expAssignments")] JsonElement? ExpAssignments = null, + bool? EnableGitHubTelemetryForwarding = null); #pragma warning restore GHCP001 internal record ToolDefinition( @@ -2594,7 +2601,8 @@ internal record ResumeSessionRequest( IList? Providers = null, IList? Models = null, OptionsUpdateToolFilterPrecedence? ToolFilterPrecedence = null, - [property: JsonPropertyName("expAssignments")] JsonElement? ExpAssignments = null); + [property: JsonPropertyName("expAssignments")] JsonElement? ExpAssignments = null, + bool? EnableGitHubTelemetryForwarding = null); #pragma warning restore GHCP001 internal record ResumeSessionResponse( @@ -2713,3 +2721,28 @@ public sealed class ToolResultAIContent(ToolResultObject toolResult) : AIContent /// public ToolResultObject Result => toolResult; } + +/// +/// Bridges the generated client-global handler to +/// the public OnGitHubTelemetry callback, forwarding the generated +/// payload unchanged. +/// +[Experimental(Diagnostics.Experimental)] +internal sealed class GitHubTelemetryAdapter(Func callback, ILogger logger) : Rpc.IGitHubTelemetryHandler +{ + private readonly Func _callback = callback ?? throw new ArgumentNullException(nameof(callback)); + private readonly ILogger _logger = logger ?? NullLogger.Instance; + + public async Task EventAsync(Rpc.GitHubTelemetryNotification request, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + try + { + await _callback(request).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error handling gitHubTelemetry.event notification"); + } + } +} diff --git a/dotnet/src/Types.cs b/dotnet/src/Types.cs index 172d9305f..37cb0ebe6 100644 --- a/dotnet/src/Types.cs +++ b/dotnet/src/Types.cs @@ -281,6 +281,7 @@ private CopilotClientOptions(CopilotClientOptions? other) OnListModels = other.OnListModels; SessionFs = other.SessionFs; RequestHandler = other.RequestHandler; + OnGitHubTelemetry = other.OnGitHubTelemetry; SessionIdleTimeoutSeconds = other.SessionIdleTimeoutSeconds; EnableRemoteSessions = other.EnableRemoteSessions; Mode = other.Mode; @@ -378,6 +379,15 @@ private CopilotClientOptions(CopilotClientOptions? other) [Experimental(Diagnostics.Experimental)] public CopilotRequestHandler? RequestHandler { get; set; } + /// + /// Experimental. Receives GitHub telemetry events the runtime forwards to this + /// connection; setting a handler opts created/resumed sessions into forwarding. + /// The SDK awaits the handler task so it may perform asynchronous work. + /// + [Experimental(Diagnostics.Experimental)] + [EditorBrowsable(EditorBrowsableState.Never)] + public Func? OnGitHubTelemetry { get; set; } + /// /// OpenTelemetry configuration for the runtime. /// When set to a non- instance, the runtime is started with OpenTelemetry instrumentation enabled. diff --git a/dotnet/test/E2E/GitHubTelemetryForwardingE2ETests.cs b/dotnet/test/E2E/GitHubTelemetryForwardingE2ETests.cs new file mode 100644 index 000000000..85f3706e2 --- /dev/null +++ b/dotnet/test/E2E/GitHubTelemetryForwardingE2ETests.cs @@ -0,0 +1,63 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +using System.Collections.Concurrent; +using GitHub.Copilot.Rpc; +using GitHub.Copilot.Test.Harness; +using Xunit; +using Xunit.Abstractions; + +namespace GitHub.Copilot.Test.E2E; + +#pragma warning disable GHCP001 // GitHub telemetry forwarding is experimental. + +public class GitHubTelemetryForwardingE2ETests(E2ETestFixture fixture, ITestOutputHelper output) + : E2ETestBase(fixture, "github_telemetry", output) +{ + [Fact] + public async Task Should_Forward_GitHub_Telemetry_For_A_Live_Session() + { + var notifications = new ConcurrentQueue(); + + await using var client = Ctx.CreateClient(options: new CopilotClientOptions + { + OnGitHubTelemetry = notification => + { + notifications.Enqueue(notification); + return Task.CompletedTask; + }, + }); + + CopilotSession? session = null; + try + { + session = await client.CreateSessionAsync(new SessionConfig + { + OnPermissionRequest = PermissionHandler.ApproveAll, + }); + + await TestHelper.WaitForConditionAsync( + () => !notifications.IsEmpty, + timeout: TimeSpan.FromSeconds(30), + timeoutMessage: "Timed out waiting for GitHub telemetry notification."); + + Assert.True(notifications.TryPeek(out var notification)); + Assert.NotEmpty(notification.SessionId); + Assert.NotNull(notification.Event); + Assert.NotEmpty(notification.Event.Kind); + Assert.IsType(notification.Restricted); + } + finally + { + if (session is not null) + { + await session.DisposeAsync(); + } + + await client.StopAsync(); + } + } +} + +#pragma warning restore GHCP001 diff --git a/dotnet/test/Unit/GitHubTelemetryTests.cs b/dotnet/test/Unit/GitHubTelemetryTests.cs new file mode 100644 index 000000000..4a41c1cb8 --- /dev/null +++ b/dotnet/test/Unit/GitHubTelemetryTests.cs @@ -0,0 +1,441 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +#if NET8_0_OR_GREATER +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Text.Json; +using Xunit; + +using GitHub.Copilot.Rpc; + +namespace GitHub.Copilot.Test.Unit; + +#pragma warning disable GHCP001 // GitHub telemetry forwarding is experimental. + +public sealed class GitHubTelemetryTests +{ + [Fact] + public async Task CreateSession_Opts_Into_Forwarding_When_Handler_Provided() + { + await using var server = await FakeTelemetryServer.StartAsync(); + await using var client = new CopilotClient(new CopilotClientOptions + { + Connection = RuntimeConnection.ForUri(server.Url), + OnGitHubTelemetry = _ => Task.CompletedTask, + }); + await client.StartAsync(); + + await client.CreateSessionAsync(new SessionConfig { OnPermissionRequest = PermissionHandler.ApproveAll }); + + var createParams = server.LastCreateParams ?? throw new InvalidOperationException("session.create was not captured."); + Assert.True(createParams.TryGetProperty("enableGitHubTelemetryForwarding", out var flag)); + Assert.True(flag.GetBoolean()); + } + + [Fact] + public async Task ResumeSession_Opts_Into_Forwarding_When_Handler_Provided() + { + await using var server = await FakeTelemetryServer.StartAsync(); + await using var client = new CopilotClient(new CopilotClientOptions + { + Connection = RuntimeConnection.ForUri(server.Url), + OnGitHubTelemetry = _ => Task.CompletedTask, + }); + await client.StartAsync(); + + await client.ResumeSessionAsync("session-1", new ResumeSessionConfig { OnPermissionRequest = PermissionHandler.ApproveAll }); + + var resumeParams = server.LastResumeParams ?? throw new InvalidOperationException("session.resume was not captured."); + Assert.True(resumeParams.TryGetProperty("enableGitHubTelemetryForwarding", out var flag)); + Assert.True(flag.GetBoolean()); + } + + [Fact] + public async Task CreateSession_Does_Not_Opt_In_Without_Handler() + { + await using var server = await FakeTelemetryServer.StartAsync(); + await using var client = new CopilotClient(new CopilotClientOptions + { + Connection = RuntimeConnection.ForUri(server.Url), + }); + await client.StartAsync(); + + await client.CreateSessionAsync(new SessionConfig { OnPermissionRequest = PermissionHandler.ApproveAll }); + + var createParams = server.LastCreateParams ?? throw new InvalidOperationException("session.create was not captured."); + var optedIn = createParams.TryGetProperty("enableGitHubTelemetryForwarding", out var flag) + && flag.ValueKind == JsonValueKind.True; + Assert.False(optedIn); + } + + [Fact] + public async Task GitHubTelemetry_Event_Is_Forwarded_To_OnGitHubTelemetry() + { + var received = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + await using var server = await FakeTelemetryServer.StartAsync(); + await using var client = new CopilotClient(new CopilotClientOptions + { + Connection = RuntimeConnection.ForUri(server.Url), + OnGitHubTelemetry = notification => + { + received.TrySetResult(notification); + return Task.CompletedTask; + }, + }); + await client.StartAsync(); + + await server.SendGitHubTelemetryEventAsync(new Dictionary + { + ["sessionId"] = "session-1", + ["restricted"] = false, + ["event"] = new Dictionary + { + ["kind"] = "tool_call_executed", + ["properties"] = new Dictionary { ["tool"] = "shell" }, + ["metrics"] = new Dictionary { ["duration_ms"] = 42 }, + ["session_id"] = "session-1", + }, + }); + + var notification = await received.Task.WaitAsync(TimeSpan.FromSeconds(10)); + Assert.Equal("session-1", notification.SessionId); + Assert.False(notification.Restricted); + Assert.Equal("tool_call_executed", notification.Event.Kind); + Assert.Equal("shell", notification.Event.Properties["tool"]); + Assert.Equal(42, notification.Event.Metrics["duration_ms"]); + Assert.Equal("session-1", notification.Event.SessionId); + } + + [Fact] + public async Task GitHubTelemetry_Event_Maps_Restricted_And_ClientInfo() + { + var received = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + await using var server = await FakeTelemetryServer.StartAsync(); + await using var client = new CopilotClient(new CopilotClientOptions + { + Connection = RuntimeConnection.ForUri(server.Url), + OnGitHubTelemetry = notification => + { + received.TrySetResult(notification); + return Task.CompletedTask; + }, + }); + await client.StartAsync(); + + await server.SendGitHubTelemetryEventAsync(new Dictionary + { + ["sessionId"] = "session-2", + ["restricted"] = true, + ["event"] = new Dictionary + { + ["kind"] = "model_call", + ["properties"] = new Dictionary { ["model"] = "gpt-5" }, + ["metrics"] = new Dictionary { ["tokens"] = 128 }, + ["session_id"] = "session-2", + ["client"] = new Dictionary + { + ["cli_version"] = "1.2.3", + ["os_platform"] = "win32", + ["os_arch"] = "x64", + ["node_version"] = "20.0.0", + ["is_staff"] = false, + }, + }, + }); + + var notification = await received.Task.WaitAsync(TimeSpan.FromSeconds(10)); + Assert.True(notification.Restricted); + + var clientInfo = notification.Event.Client; + Assert.NotNull(clientInfo); + Assert.Equal("1.2.3", clientInfo!.CliVersion); + Assert.Equal("win32", clientInfo.OsPlatform); + Assert.Equal("x64", clientInfo.OsArch); + Assert.Equal("20.0.0", clientInfo.NodeVersion); + Assert.Equal(false, clientInfo.IsStaff); + } + + private sealed class FakeTelemetryServer : IAsyncDisposable + { + private readonly TcpListener _listener; + private readonly CancellationTokenSource _cts = new(); + private readonly SemaphoreSlim _writeLock = new(1, 1); + private readonly TaskCompletionSource _connected = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly Task _serverTask; + + private FakeTelemetryServer(TcpListener listener) + { + _listener = listener; + _serverTask = RunAsync(); + } + + public string Url + { + get + { + var endpoint = (IPEndPoint)_listener.LocalEndpoint; + return $"http://127.0.0.1:{endpoint.Port}"; + } + } + + public JsonElement? LastCreateParams { get; private set; } + + public JsonElement? LastResumeParams { get; private set; } + + public static Task StartAsync() + { + var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + return Task.FromResult(new FakeTelemetryServer(listener)); + } + + public async Task SendGitHubTelemetryEventAsync(Dictionary notificationParams) + { + var stream = await _connected.Task.WaitAsync(_cts.Token); + + // Send a genuine JSON-RPC notification (no "id"), exactly as the runtime + // does via sendNotification. This exercises the real notification dispatch + // path rather than masking it behind a request that carries an id. + await WriteMessageAsync(stream, new Dictionary + { + ["jsonrpc"] = "2.0", + ["method"] = "gitHubTelemetry.event", + ["params"] = notificationParams, + }, _cts.Token); + } + + public async ValueTask DisposeAsync() + { + _cts.Cancel(); + _listener.Stop(); + + try + { + await _serverTask; + } + catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException or IOException or SocketException) + { + // Expected during teardown: the listener/socket is torn down while the + // server loop is still awaiting I/O. Observe the exception and move on. + _ = ex; + } + + _cts.Dispose(); + _writeLock.Dispose(); + } + + private async Task RunAsync() + { + using var tcpClient = await _listener.AcceptTcpClientAsync(_cts.Token); + using var stream = tcpClient.GetStream(); + _connected.TrySetResult(stream); + + while (!_cts.Token.IsCancellationRequested) + { + using var message = await ReadMessageAsync(stream, _cts.Token); + if (message is null) + { + return; + } + + // Inbound messages without a "method" are responses to our own + // server-initiated requests (e.g. session.* the SDK answers); the + // SDK never replies to the gitHubTelemetry.event notification. + if (!message.RootElement.TryGetProperty("method", out _)) + { + continue; + } + + await HandleRequestAsync(stream, message.RootElement, _cts.Token); + } + } + + private async Task HandleRequestAsync(Stream stream, JsonElement request, CancellationToken cancellationToken) + { + if (!request.TryGetProperty("id", out var idElement)) + { + return; + } + + var id = idElement.Clone(); + var method = request.GetProperty("method").GetString(); + + object? result = method switch + { + "connect" => new Dictionary + { + ["ok"] = true, + ["protocolVersion"] = 3, + ["version"] = "test", + }, + "session.create" => CaptureCreate(request), + "session.resume" => CaptureResume(request), + "session.send" => new Dictionary { ["messageId"] = "message-1" }, + "session.destroy" => new Dictionary(), + "runtime.shutdown" => new Dictionary(), + _ => throw new InvalidOperationException($"Unexpected RPC method '{method}'."), + }; + + await WriteMessageAsync(stream, new Dictionary + { + ["jsonrpc"] = "2.0", + ["id"] = id, + ["result"] = result, + }, cancellationToken); + } + + private Dictionary CaptureCreate(JsonElement request) + { + LastCreateParams = request.TryGetProperty("params", out var p) ? p.Clone() : null; + return SessionResult(LastCreateParams); + } + + private Dictionary CaptureResume(JsonElement request) + { + LastResumeParams = request.TryGetProperty("params", out var p) ? p.Clone() : null; + return SessionResult(LastResumeParams); + } + + private static Dictionary SessionResult(JsonElement? paramsElement) + { + string sessionId = "session-1"; + if (paramsElement is { ValueKind: JsonValueKind.Object } p + && p.TryGetProperty("sessionId", out var sidProp) + && sidProp.ValueKind == JsonValueKind.String + && sidProp.GetString() is string sid + && !string.IsNullOrEmpty(sid)) + { + sessionId = sid; + } + + return new Dictionary + { + ["sessionId"] = sessionId, + ["workspacePath"] = null, + ["capabilities"] = null, + }; + } + + private async Task WriteMessageAsync(Stream stream, object payload, CancellationToken cancellationToken) + { + using var bodyStream = new MemoryStream(); + using (var writer = new Utf8JsonWriter(bodyStream)) + { + WriteJsonValue(writer, payload); + } + + var body = bodyStream.ToArray(); + var header = Encoding.ASCII.GetBytes($"Content-Length: {body.Length}\r\n\r\n"); + + await _writeLock.WaitAsync(cancellationToken); + try + { + await stream.WriteAsync(header, cancellationToken); + await stream.WriteAsync(body, cancellationToken); + await stream.FlushAsync(cancellationToken); + } + finally + { + _writeLock.Release(); + } + } + + private static void WriteJsonValue(Utf8JsonWriter writer, object? value) + { + switch (value) + { + case null: + writer.WriteNullValue(); + break; + case string stringValue: + writer.WriteStringValue(stringValue); + break; + case bool boolValue: + writer.WriteBooleanValue(boolValue); + break; + case int intValue: + writer.WriteNumberValue(intValue); + break; + case long longValue: + writer.WriteNumberValue(longValue); + break; + case JsonElement jsonElement: + jsonElement.WriteTo(writer); + break; + case Dictionary dictionary: + writer.WriteStartObject(); + foreach (var (propertyName, propertyValue) in dictionary) + { + writer.WritePropertyName(propertyName); + WriteJsonValue(writer, propertyValue); + } + writer.WriteEndObject(); + break; + default: + throw new InvalidOperationException($"Unexpected JSON value type '{value.GetType().Name}'."); + } + } + + private static async Task ReadMessageAsync(Stream stream, CancellationToken cancellationToken) + { + var headerBytes = new List(); + while (true) + { + var value = await ReadByteAsync(stream, cancellationToken); + if (value < 0) + { + return null; + } + + headerBytes.Add((byte)value); + var count = headerBytes.Count; + if (count >= 4 && + headerBytes[count - 4] == '\r' && + headerBytes[count - 3] == '\n' && + headerBytes[count - 2] == '\r' && + headerBytes[count - 1] == '\n') + { + break; + } + } + + var header = Encoding.ASCII.GetString([.. headerBytes]); + var contentLength = header + .Split(["\r\n"], StringSplitOptions.RemoveEmptyEntries) + .Select(line => line.Split(':', 2)) + .Where(parts => parts.Length == 2 && parts[0].Equals("Content-Length", StringComparison.OrdinalIgnoreCase)) + .Select(parts => int.Parse(parts[1].Trim(), System.Globalization.CultureInfo.InvariantCulture)) + .Single(); + + var body = new byte[contentLength]; + var offset = 0; + while (offset < body.Length) + { + var read = await stream.ReadAsync(body.AsMemory(offset, body.Length - offset), cancellationToken); + if (read == 0) + { + return null; + } + + offset += read; + } + + return JsonDocument.Parse(body); + } + + private static async Task ReadByteAsync(Stream stream, CancellationToken cancellationToken) + { + var buffer = new byte[1]; + var read = await stream.ReadAsync(buffer, cancellationToken); + return read == 0 ? -1 : buffer[0]; + } + } +} + +#pragma warning restore GHCP001 +#endif diff --git a/go/client.go b/go/client.go index 4e7f1f549..1bbf615d7 100644 --- a/go/client.go +++ b/go/client.go @@ -760,6 +760,9 @@ func (c *Client) CreateSession(ctx context.Context, config *SessionConfig) (*Ses } else { req.IncludeSubAgentStreamingEvents = Bool(true) } + if c.options.OnGitHubTelemetry != nil { + req.EnableGitHubTelemetryForwarding = Bool(true) + } if config.OnUserInputRequest != nil { req.RequestUserInput = Bool(true) } @@ -1038,6 +1041,9 @@ func (c *Client) ResumeSessionWithOptions(ctx context.Context, sessionID string, } else { req.IncludeSubAgentStreamingEvents = Bool(true) } + if c.options.OnGitHubTelemetry != nil { + req.EnableGitHubTelemetryForwarding = Bool(true) + } if config.OnUserInputRequest != nil { req.RequestUserInput = Bool(true) } @@ -2057,17 +2063,35 @@ func (c *Client) setupNotificationHandler() { } return session.clientSessionAPIs }) - if c.options.RequestHandler != nil { - adapter := newCopilotRequestAdapter(c.options.RequestHandler, func() *rpc.ServerLlmInferenceAPI { - if c.RPC == nil { - return nil - } - return c.RPC.LlmInference - }) - rpc.RegisterClientGlobalAPIHandlers(c.client, &rpc.ClientGlobalAPIHandlers{LlmInference: adapter}) + if c.options.RequestHandler != nil || c.options.OnGitHubTelemetry != nil { + handlers := &rpc.ClientGlobalAPIHandlers{} + if c.options.RequestHandler != nil { + handlers.LlmInference = newCopilotRequestAdapter(c.options.RequestHandler, func() *rpc.ServerLlmInferenceAPI { + if c.RPC == nil { + return nil + } + return c.RPC.LlmInference + }) + } + if c.options.OnGitHubTelemetry != nil { + handlers.GitHubTelemetry = &gitHubTelemetryAdapter{callback: c.options.OnGitHubTelemetry} + } + rpc.RegisterClientGlobalAPIHandlers(c.client, handlers) } } +// gitHubTelemetryAdapter adapts the OnGitHubTelemetry option to the generated +// rpc.GitHubTelemetryHandler interface. +type gitHubTelemetryAdapter struct { + callback func(notification *rpc.GitHubTelemetryNotification) +} + +func (a *gitHubTelemetryAdapter) Event(request *rpc.GitHubTelemetryNotification) error { + defer func() { recover() }() // Ignore handler panics + a.callback(request) + return nil +} + func (c *Client) handleSessionEvent(req sessionEventRequest) { if req.SessionID == "" { return diff --git a/go/client_test.go b/go/client_test.go index 059f098a0..f7d5f50c6 100644 --- a/go/client_test.go +++ b/go/client_test.go @@ -15,6 +15,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/github/copilot-sdk/go/internal/jsonrpc2" "github.com/github/copilot-sdk/go/internal/truncbuffer" @@ -2337,6 +2338,235 @@ func TestResumeSessionRequest_IncludeSubAgentStreamingEvents(t *testing.T) { }) } +func TestCreateSessionRequest_EnableGitHubTelemetryForwarding(t *testing.T) { + t.Run("forwards explicit true", func(t *testing.T) { + req := createSessionRequest{ + EnableGitHubTelemetryForwarding: Bool(true), + } + data, err := json.Marshal(req) + if err != nil { + t.Fatalf("Failed to marshal: %v", err) + } + var m map[string]any + if err := json.Unmarshal(data, &m); err != nil { + t.Fatalf("Failed to unmarshal: %v", err) + } + if m["enableGitHubTelemetryForwarding"] != true { + t.Errorf("Expected enableGitHubTelemetryForwarding to be true, got %v", m["enableGitHubTelemetryForwarding"]) + } + }) + + t.Run("omits when not set", func(t *testing.T) { + req := createSessionRequest{} + data, _ := json.Marshal(req) + var m map[string]any + json.Unmarshal(data, &m) + if _, ok := m["enableGitHubTelemetryForwarding"]; ok { + t.Error("Expected enableGitHubTelemetryForwarding to be omitted when not set") + } + }) +} + +func TestResumeSessionRequest_EnableGitHubTelemetryForwarding(t *testing.T) { + t.Run("forwards explicit true", func(t *testing.T) { + req := resumeSessionRequest{ + SessionID: "s1", + EnableGitHubTelemetryForwarding: Bool(true), + } + data, err := json.Marshal(req) + if err != nil { + t.Fatalf("Failed to marshal: %v", err) + } + var m map[string]any + if err := json.Unmarshal(data, &m); err != nil { + t.Fatalf("Failed to unmarshal: %v", err) + } + if m["enableGitHubTelemetryForwarding"] != true { + t.Errorf("Expected enableGitHubTelemetryForwarding to be true, got %v", m["enableGitHubTelemetryForwarding"]) + } + }) + + t.Run("omits when not set", func(t *testing.T) { + req := resumeSessionRequest{SessionID: "s1"} + data, _ := json.Marshal(req) + var m map[string]any + json.Unmarshal(data, &m) + if _, ok := m["enableGitHubTelemetryForwarding"]; ok { + t.Error("Expected enableGitHubTelemetryForwarding to be omitted when not set") + } + }) +} + +func TestClient_ForwardsGitHubTelemetryForwardingToSessionRequests(t *testing.T) { + rpcClient, server, _ := newRuntimeShutdownRpcPair(t) + t.Cleanup(server.Stop) + client := &Client{ + client: rpcClient, + RPC: rpc.NewServerRPC(rpcClient), + sessions: make(map[string]*Session), + options: ClientOptions{OnGitHubTelemetry: func(*rpc.GitHubTelemetryNotification) {}}, + } + + createParams := make(chan json.RawMessage, 1) + server.SetRequestHandler("session.create", func(params json.RawMessage) (json.RawMessage, *jsonrpc2.Error) { + createParams <- append(json.RawMessage(nil), params...) + sessionID := sessionIDFromParams(t, params) + return []byte(`{"sessionId":"` + sessionID + `","workspacePath":"/workspace"}`), nil + }) + + if _, err := client.CreateSession(t.Context(), &SessionConfig{}); err != nil { + t.Fatalf("CreateSession failed: %v", err) + } + assertForwardingFlagTrue(t, <-createParams) + + resumeParams := make(chan json.RawMessage, 1) + server.SetRequestHandler("session.resume", func(params json.RawMessage) (json.RawMessage, *jsonrpc2.Error) { + resumeParams <- append(json.RawMessage(nil), params...) + return []byte(`{"sessionId":"resumed","workspacePath":"/workspace"}`), nil + }) + + if _, err := client.ResumeSessionWithOptions(t.Context(), "resumed", &ResumeSessionConfig{}); err != nil { + t.Fatalf("ResumeSessionWithOptions failed: %v", err) + } + assertForwardingFlagTrue(t, <-resumeParams) +} + +func assertForwardingFlagTrue(t *testing.T, params json.RawMessage) { + t.Helper() + var decoded map[string]any + if err := json.Unmarshal(params, &decoded); err != nil { + t.Fatalf("failed to unmarshal request params: %v", err) + } + if decoded["enableGitHubTelemetryForwarding"] != true { + t.Fatalf("expected enableGitHubTelemetryForwarding=true, got %v", decoded["enableGitHubTelemetryForwarding"]) + } +} + +func TestClient_OmitsGitHubTelemetryForwardingWhenNoHandler(t *testing.T) { + rpcClient, server, _ := newRuntimeShutdownRpcPair(t) + t.Cleanup(server.Stop) + client := &Client{ + client: rpcClient, + RPC: rpc.NewServerRPC(rpcClient), + sessions: make(map[string]*Session), + options: ClientOptions{}, + } + + createParams := make(chan json.RawMessage, 1) + server.SetRequestHandler("session.create", func(params json.RawMessage) (json.RawMessage, *jsonrpc2.Error) { + createParams <- append(json.RawMessage(nil), params...) + sessionID := sessionIDFromParams(t, params) + return []byte(`{"sessionId":"` + sessionID + `","workspacePath":"/workspace"}`), nil + }) + + if _, err := client.CreateSession(t.Context(), &SessionConfig{}); err != nil { + t.Fatalf("CreateSession failed: %v", err) + } + assertForwardingFlagAbsent(t, <-createParams) + + resumeParams := make(chan json.RawMessage, 1) + server.SetRequestHandler("session.resume", func(params json.RawMessage) (json.RawMessage, *jsonrpc2.Error) { + resumeParams <- append(json.RawMessage(nil), params...) + return []byte(`{"sessionId":"resumed","workspacePath":"/workspace"}`), nil + }) + + if _, err := client.ResumeSessionWithOptions(t.Context(), "resumed", &ResumeSessionConfig{}); err != nil { + t.Fatalf("ResumeSessionWithOptions failed: %v", err) + } + assertForwardingFlagAbsent(t, <-resumeParams) +} + +func assertForwardingFlagAbsent(t *testing.T, params json.RawMessage) { + t.Helper() + var decoded map[string]any + if err := json.Unmarshal(params, &decoded); err != nil { + t.Fatalf("failed to unmarshal request params: %v", err) + } + if _, ok := decoded["enableGitHubTelemetryForwarding"]; ok { + t.Fatalf("expected enableGitHubTelemetryForwarding to be omitted, got %v", decoded["enableGitHubTelemetryForwarding"]) + } +} + +func TestGitHubTelemetryNotificationRoutesToCallback(t *testing.T) { + // The runtime forwards telemetry via a JSON-RPC *notification* (no id). + // Drive a real Content-Length-framed notification through the transport and + // verify that a real Client wired with OnGitHubTelemetry routes it to the + // callback through the client's own client-global handler registration + // (setupNotificationHandler), rather than registering the adapter by hand. + clientConn, serverConn := net.Pipe() + defer clientConn.Close() + defer serverConn.Close() + + rpcClient := jsonrpc2.NewClient(clientConn, clientConn) + rpcClient.Start() + defer rpcClient.Stop() + + // Drain the client->server direction so net.Pipe writes never block. + go func() { + buf := make([]byte, 4096) + for { + if _, err := serverConn.Read(buf); err != nil { + return + } + } + }() + + received := make(chan *rpc.GitHubTelemetryNotification, 1) + client := &Client{ + client: rpcClient, + RPC: rpc.NewServerRPC(rpcClient), + sessions: make(map[string]*Session), + options: ClientOptions{ + OnGitHubTelemetry: func(n *rpc.GitHubTelemetryNotification) { received <- n }, + }, + } + // setupNotificationHandler is what registers the gitHubTelemetryAdapter when + // OnGitHubTelemetry is set; exercising it here covers the real client wiring. + client.setupNotificationHandler() + + notification := map[string]any{ + "jsonrpc": "2.0", + "method": "gitHubTelemetry.event", + "params": map[string]any{ + "sessionId": "sess-telemetry", + "restricted": true, + "event": map[string]any{ + "kind": "tool_call_executed", + "metrics": map[string]any{"duration_ms": 12.5}, + "properties": map[string]any{"tool": "shell"}, + }, + }, + } + data, err := json.Marshal(notification) + if err != nil { + t.Fatalf("marshal notification: %v", err) + } + go func() { + _, _ = fmt.Fprintf(serverConn, "Content-Length: %d\r\n\r\n%s", len(data), data) + }() + + select { + case n := <-received: + if n.SessionID != "sess-telemetry" { + t.Errorf("session id = %q, want sess-telemetry", n.SessionID) + } + if !n.Restricted { + t.Error("expected restricted to be true") + } + if n.Event.Kind != "tool_call_executed" { + t.Errorf("kind = %q, want tool_call_executed", n.Event.Kind) + } + if n.Event.Metrics["duration_ms"] != 12.5 { + t.Errorf("metrics[duration_ms] = %v, want 12.5", n.Event.Metrics["duration_ms"]) + } + if n.Event.Properties["tool"] != "shell" { + t.Errorf("properties[tool] = %q, want shell", n.Event.Properties["tool"]) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for telemetry notification") + } +} + func TestCreateSessionRequest_EnableOnDemandInstructionDiscovery(t *testing.T) { t.Run("forwards explicit true", func(t *testing.T) { req := createSessionRequest{ diff --git a/go/internal/e2e/github_telemetry_e2e_test.go b/go/internal/e2e/github_telemetry_e2e_test.go new file mode 100644 index 000000000..666817451 --- /dev/null +++ b/go/internal/e2e/github_telemetry_e2e_test.go @@ -0,0 +1,68 @@ +package e2e + +import ( + "sync" + "testing" + "time" + + copilot "github.com/github/copilot-sdk/go" + "github.com/github/copilot-sdk/go/internal/e2e/testharness" + "github.com/github/copilot-sdk/go/rpc" +) + +func TestGitHubTelemetryE2E(t *testing.T) { + t.Run("should forward github telemetry for a live session", func(t *testing.T) { + ctx := testharness.NewTestContext(t) + ctx.ConfigureForTest(t) + + var mu sync.Mutex + var notifications []*rpc.GitHubTelemetryNotification + client := ctx.NewClient(func(opts *copilot.ClientOptions) { + opts.OnGitHubTelemetry = func(notification *rpc.GitHubTelemetryNotification) { + mu.Lock() + notifications = append(notifications, notification) + mu.Unlock() + } + }) + t.Cleanup(func() { client.ForceStop() }) + + session, err := client.CreateSession(t.Context(), &copilot.SessionConfig{ + OnPermissionRequest: copilot.PermissionHandler.ApproveAll, + }) + if err != nil { + t.Fatalf("CreateSession failed: %v", err) + } + t.Cleanup(func() { session.Disconnect() }) + + notification := waitForGitHubTelemetryNotification(t, &mu, ¬ifications, 30*time.Second) + if notification.SessionID == "" { + t.Fatal("Expected a non-empty SessionID") + } + if notification.Event.Kind == "" { + t.Fatal("Expected a non-empty Event.Kind") + } + }) +} + +func waitForGitHubTelemetryNotification(t *testing.T, mu *sync.Mutex, notifications *[]*rpc.GitHubTelemetryNotification, timeout time.Duration) *rpc.GitHubTelemetryNotification { + t.Helper() + + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + mu.Lock() + if len(*notifications) > 0 { + notification := (*notifications)[0] + mu.Unlock() + if notification != nil { + return notification + } + t.Fatal("Received nil GitHub telemetry notification") + } + mu.Unlock() + + time.Sleep(50 * time.Millisecond) + } + + t.Fatalf("Timed out waiting for GitHub telemetry notification after %s", timeout) + return nil +} diff --git a/go/rpc/zrpc.go b/go/rpc/zrpc.go index 03cec0dc3..7a021c0ab 100644 --- a/go/rpc/zrpc.go +++ b/go/rpc/zrpc.go @@ -18721,7 +18721,7 @@ type GitHubTelemetryHandler interface { // Parameters: Payload for a `gitHubTelemetry.event` notification: a single GitHub telemetry // event the runtime forwards to a host connection that opted into telemetry forwarding for // the session. - Event(request *GitHubTelemetryNotification) (*GitHubTelemetryEventResult, error) + Event(request *GitHubTelemetryNotification) error } // Experimental: LlmInferenceHandler contains experimental APIs that may change or be @@ -18784,17 +18784,12 @@ func RegisterClientGlobalAPIHandlers(client *jsonrpc2.Client, handlers *ClientGl return nil, &jsonrpc2.Error{Code: -32602, Message: fmt.Sprintf("Invalid params: %v", err)} } if handlers == nil || handlers.GitHubTelemetry == nil { - return nil, &jsonrpc2.Error{Code: -32603, Message: "No gitHubTelemetry client-global handler registered"} + return nil, nil } - result, err := handlers.GitHubTelemetry.Event(&request) - if err != nil { + if err := handlers.GitHubTelemetry.Event(&request); err != nil { return nil, clientGlobalHandlerError(err) } - raw, err := json.Marshal(result) - if err != nil { - return nil, &jsonrpc2.Error{Code: -32603, Message: fmt.Sprintf("Failed to marshal response: %v", err)} - } - return raw, nil + return nil, nil }) client.SetRequestHandler("llmInference.httpRequestChunk", func(params json.RawMessage) (json.RawMessage, *jsonrpc2.Error) { var request LlmInferenceHTTPRequestChunkRequest diff --git a/go/types.go b/go/types.go index 3586a2a91..1e424514e 100644 --- a/go/types.go +++ b/go/types.go @@ -122,6 +122,11 @@ type ClientOptions struct { // this handler instead of issuing the calls itself. Works for both CAPI // and BYOK sessions. RequestHandler *CopilotRequestHandler + // OnGitHubTelemetry registers a connection-level callback (experimental) + // that receives GitHub telemetry events the runtime forwards for sessions + // opened by this client. When non-nil, every session created or resumed by + // this client opts into telemetry forwarding (enableGitHubTelemetryForwarding). + OnGitHubTelemetry func(notification *rpc.GitHubTelemetryNotification) // Telemetry configures OpenTelemetry integration for the runtime. // When non-nil, COPILOT_OTEL_ENABLED=true is set and any populated // fields are mapped to the corresponding environment variables. @@ -2081,6 +2086,7 @@ type createSessionRequest struct { WorkingDirectory string `json:"workingDirectory,omitempty"` Streaming *bool `json:"streaming,omitempty"` IncludeSubAgentStreamingEvents *bool `json:"includeSubAgentStreamingEvents,omitempty"` + EnableGitHubTelemetryForwarding *bool `json:"enableGitHubTelemetryForwarding,omitempty"` MCPServers map[string]MCPServerConfig `json:"mcpServers,omitempty"` MCPOAuthTokenStorage string `json:"mcpOAuthTokenStorage,omitempty"` EnvValueMode string `json:"envValueMode,omitempty"` @@ -2179,6 +2185,7 @@ type resumeSessionRequest struct { ContinuePendingWork *bool `json:"continuePendingWork,omitempty"` Streaming *bool `json:"streaming,omitempty"` IncludeSubAgentStreamingEvents *bool `json:"includeSubAgentStreamingEvents,omitempty"` + EnableGitHubTelemetryForwarding *bool `json:"enableGitHubTelemetryForwarding,omitempty"` MCPServers map[string]MCPServerConfig `json:"mcpServers,omitempty"` MCPOAuthTokenStorage string `json:"mcpOAuthTokenStorage,omitempty"` EnvValueMode string `json:"envValueMode,omitempty"` diff --git a/java/src/main/java/com/github/copilot/CopilotClient.java b/java/src/main/java/com/github/copilot/CopilotClient.java index 52c0cfa48..31a892914 100644 --- a/java/src/main/java/com/github/copilot/CopilotClient.java +++ b/java/src/main/java/com/github/copilot/CopilotClient.java @@ -17,6 +17,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; @@ -26,6 +27,7 @@ import com.github.copilot.generated.rpc.SessionOptionsUpdateParams; import com.github.copilot.generated.rpc.SessionInstalledPlugin; import com.github.copilot.generated.rpc.ConnectParams; +import com.github.copilot.generated.rpc.GitHubTelemetryNotification; import com.github.copilot.generated.rpc.ServerRpc; import com.github.copilot.generated.rpc.SessionEventLogRegisterInterestParams; import com.github.copilot.rpc.DeleteSessionResponse; @@ -258,6 +260,14 @@ private Connection startCoreBody() { llmAdapter.registerHandlers(rpc); } + // Register the GitHub telemetry forwarding handler when configured. + Function> onGitHubTelemetry = this.options + .getOnGitHubTelemetry(); + if (onGitHubTelemetry != null) { + GitHubTelemetryAdapter telemetryAdapter = new GitHubTelemetryAdapter(onGitHubTelemetry); + telemetryAdapter.registerHandlers(rpc); + } + // Verify protocol version verifyProtocolVersion(connection); LoggingHelpers.logTiming(LOG, Level.FINE, @@ -579,6 +589,13 @@ public CompletableFuture createSession(SessionConfig config) { request.setSystemMessage(extracted.wireSystemMessage()); } + // Opt this session into GitHub telemetry forwarding when a + // connection-level handler is registered (mirrors the runtime's + // hand-written capability flag, not part of the codegen'd contract). + if (options.getOnGitHubTelemetry() != null) { + request.setEnableGitHubTelemetryForwarding(true); + } + // Empty mode: validate availableTools and set toolFilterPrecedence if (options.getMode() == CopilotClientMode.EMPTY) { if (config.getAvailableTools() == null) { @@ -733,6 +750,13 @@ public CompletableFuture resumeSession(String sessionId, ResumeS request.setSystemMessage(extracted.wireSystemMessage()); } + // Opt this session into GitHub telemetry forwarding when a + // connection-level handler is registered (mirrors the runtime's + // hand-written capability flag, not part of the codegen'd contract). + if (options.getOnGitHubTelemetry() != null) { + request.setEnableGitHubTelemetryForwarding(true); + } + // Empty mode: validate availableTools and set toolFilterPrecedence for resume // path if (options.getMode() == CopilotClientMode.EMPTY) { diff --git a/java/src/main/java/com/github/copilot/GitHubTelemetryAdapter.java b/java/src/main/java/com/github/copilot/GitHubTelemetryAdapter.java new file mode 100644 index 000000000..1fdb2a473 --- /dev/null +++ b/java/src/main/java/com/github/copilot/GitHubTelemetryAdapter.java @@ -0,0 +1,54 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +package com.github.copilot; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.copilot.generated.rpc.GitHubTelemetryNotification; + +/** + * Bridges the runtime's {@code gitHubTelemetry.event} client-global + * notification to a consumer's async {@code onGitHubTelemetry} callback. The + * notification carries per-session GitHub (hydro) telemetry the runtime + * forwards to connections that opted into telemetry forwarding. + */ +final class GitHubTelemetryAdapter { + + private static final Logger LOG = Logger.getLogger(GitHubTelemetryAdapter.class.getName()); + private static final ObjectMapper MAPPER = JsonRpcClient.getObjectMapper(); + + private final Function> callback; + + GitHubTelemetryAdapter(Function> callback) { + this.callback = callback; + } + + void registerHandlers(JsonRpcClient rpc) { + rpc.registerMethodHandler("gitHubTelemetry.event", (rpcId, params) -> handleEvent(params)); + } + + private void handleEvent(JsonNode params) { + try { + GitHubTelemetryNotification notification = MAPPER.treeToValue(params, GitHubTelemetryNotification.class); + if (notification != null) { + CompletableFuture result = callback.apply(notification); + if (result != null) { + result.whenComplete((unused, error) -> { + if (error != null) { + LOG.log(Level.WARNING, "Error handling gitHubTelemetry.event notification", error); + } + }); + } + } + } catch (Exception e) { + LOG.log(Level.WARNING, "Error handling gitHubTelemetry.event notification", e); + } + } +} diff --git a/java/src/main/java/com/github/copilot/rpc/CopilotClientOptions.java b/java/src/main/java/com/github/copilot/rpc/CopilotClientOptions.java index e9f59aa64..0d4494d73 100644 --- a/java/src/main/java/com/github/copilot/rpc/CopilotClientOptions.java +++ b/java/src/main/java/com/github/copilot/rpc/CopilotClientOptions.java @@ -11,11 +11,14 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.function.Function; import java.util.function.Supplier; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.github.copilot.CopilotExperimental; import com.github.copilot.CopilotRequestHandler; +import com.github.copilot.generated.rpc.GitHubTelemetryNotification; import java.util.Optional; import java.util.OptionalInt; @@ -57,6 +60,7 @@ public class CopilotClientOptions { private CopilotClientMode mode = CopilotClientMode.COPILOT_CLI; private Supplier>> onListModels; private CopilotRequestHandler requestHandler; + private Function> onGitHubTelemetry; private int port; private TelemetryConfig telemetry; private Integer sessionIdleTimeoutSeconds; @@ -484,6 +488,44 @@ public CopilotClientOptions setRequestHandler(CopilotRequestHandler requestHandl return this; } + /** + * Gets the connection-level GitHub telemetry forwarding handler. + * + *

+ * Experimental: this option may change or be removed without notice. + * + * @return the async telemetry handler, or {@code null} if not set + */ + @JsonIgnore + @CopilotExperimental + public Function> getOnGitHubTelemetry() { + return onGitHubTelemetry; + } + + /** + * Sets a connection-level handler for GitHub telemetry forwarding + * (experimental). + * + *

+ * When provided, the client opts every session it creates or resumes into + * telemetry forwarding, and the runtime forwards each per-session telemetry + * event to this handler via the {@code gitHubTelemetry.event} notification. The + * handler returns a {@link CompletableFuture} that completes when asynchronous + * processing is finished. + * + * @param onGitHubTelemetry + * the async telemetry handler (must not be {@code null}) + * @return this options instance for method chaining + * @throws IllegalArgumentException + * if {@code onGitHubTelemetry} is {@code null} + */ + @CopilotExperimental + public CopilotClientOptions setOnGitHubTelemetry( + Function> onGitHubTelemetry) { + this.onGitHubTelemetry = Objects.requireNonNull(onGitHubTelemetry, "onGitHubTelemetry must not be null"); + return this; + } + /** * Gets the TCP port for the CLI server. * @@ -720,6 +762,7 @@ public CopilotClientOptions clone() { copy.logLevel = this.logLevel; copy.onListModels = this.onListModels; copy.requestHandler = this.requestHandler; + copy.onGitHubTelemetry = this.onGitHubTelemetry; copy.port = this.port; copy.remote = this.remote; copy.sessionIdleTimeoutSeconds = this.sessionIdleTimeoutSeconds; diff --git a/java/src/main/java/com/github/copilot/rpc/CreateSessionRequest.java b/java/src/main/java/com/github/copilot/rpc/CreateSessionRequest.java index 3ec3dc569..2510b0bd6 100644 --- a/java/src/main/java/com/github/copilot/rpc/CreateSessionRequest.java +++ b/java/src/main/java/com/github/copilot/rpc/CreateSessionRequest.java @@ -102,6 +102,9 @@ public final class CreateSessionRequest { @JsonProperty("includeSubAgentStreamingEvents") private Boolean includeSubAgentStreamingEvents; + @JsonProperty("enableGitHubTelemetryForwarding") + private Boolean enableGitHubTelemetryForwarding; + @JsonProperty("mcpServers") private Map mcpServers; @@ -820,6 +823,27 @@ public void clearIncludeSubAgentStreamingEvents() { this.includeSubAgentStreamingEvents = null; } + /** Gets the GitHub telemetry forwarding flag. @return the flag */ + public Boolean getEnableGitHubTelemetryForwarding() { + return enableGitHubTelemetryForwarding; + } + + /** + * Sets the GitHub telemetry forwarding flag. @param + * enableGitHubTelemetryForwarding the flag + */ + public void setEnableGitHubTelemetryForwarding(boolean enableGitHubTelemetryForwarding) { + this.enableGitHubTelemetryForwarding = enableGitHubTelemetryForwarding; + } + + /** + * Clears the enableGitHubTelemetryForwarding setting, reverting to the default + * behavior. + */ + public void clearEnableGitHubTelemetryForwarding() { + this.enableGitHubTelemetryForwarding = null; + } + /** Gets the commands wire definitions. @return the commands */ public List getCommands() { return commands == null ? null : Collections.unmodifiableList(commands); diff --git a/java/src/main/java/com/github/copilot/rpc/ResumeSessionRequest.java b/java/src/main/java/com/github/copilot/rpc/ResumeSessionRequest.java index 89776f86c..4917f1d8c 100644 --- a/java/src/main/java/com/github/copilot/rpc/ResumeSessionRequest.java +++ b/java/src/main/java/com/github/copilot/rpc/ResumeSessionRequest.java @@ -145,6 +145,9 @@ public final class ResumeSessionRequest { @JsonProperty("includeSubAgentStreamingEvents") private Boolean includeSubAgentStreamingEvents; + @JsonProperty("enableGitHubTelemetryForwarding") + private Boolean enableGitHubTelemetryForwarding; + @JsonProperty("mcpServers") private Map mcpServers; @@ -705,6 +708,27 @@ public void clearIncludeSubAgentStreamingEvents() { this.includeSubAgentStreamingEvents = null; } + /** Gets the GitHub telemetry forwarding flag. @return the flag */ + public Boolean getEnableGitHubTelemetryForwarding() { + return enableGitHubTelemetryForwarding; + } + + /** + * Sets the GitHub telemetry forwarding flag. @param + * enableGitHubTelemetryForwarding the flag + */ + public void setEnableGitHubTelemetryForwarding(boolean enableGitHubTelemetryForwarding) { + this.enableGitHubTelemetryForwarding = enableGitHubTelemetryForwarding; + } + + /** + * Clears the enableGitHubTelemetryForwarding setting, reverting to the default + * behavior. + */ + public void clearEnableGitHubTelemetryForwarding() { + this.enableGitHubTelemetryForwarding = null; + } + /** Gets MCP servers. @return the servers map */ public Map getMcpServers() { return mcpServers == null ? null : Collections.unmodifiableMap(mcpServers); diff --git a/java/src/test/java/com/github/copilot/GitHubTelemetryForwardingIT.java b/java/src/test/java/com/github/copilot/GitHubTelemetryForwardingIT.java new file mode 100644 index 000000000..d41c5f97d --- /dev/null +++ b/java/src/test/java/com/github/copilot/GitHubTelemetryForwardingIT.java @@ -0,0 +1,59 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +package com.github.copilot; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import com.github.copilot.generated.rpc.GitHubTelemetryNotification; +import com.github.copilot.rpc.CopilotClientOptions; +import com.github.copilot.rpc.PermissionHandler; +import com.github.copilot.rpc.SessionConfig; + +/** + * Failsafe integration test that verifies the live CLI forwards GitHub + * telemetry notifications during session creation. + */ +@AllowCopilotExperimental +class GitHubTelemetryForwardingIT { + + @Test + void forwardsGitHubTelemetryForALiveSession() throws Exception { + var notifications = new CopyOnWriteArrayList(); + var firstNotification = new CompletableFuture(); + + try (E2ETestContext ctx = E2ETestContext.create()) { + var options = new CopilotClientOptions().setOnGitHubTelemetry(notification -> { + notifications.add(notification); + firstNotification.complete(notification); + return CompletableFuture.completedFuture(null); + }); + + try (CopilotClient client = ctx.createClient(options); + CopilotSession session = client + .createSession(new SessionConfig().setOnPermissionRequest(PermissionHandler.APPROVE_ALL)) + .get(30, TimeUnit.SECONDS)) { + + GitHubTelemetryNotification notification = firstNotification.get(30, TimeUnit.SECONDS); + + assertFalse(notifications.isEmpty(), "Expected at least one GitHub telemetry notification"); + assertNotNull(notification, "Expected a GitHub telemetry notification"); + assertNotNull(notification.sessionId(), "Telemetry notification sessionId must be present"); + assertTrue(!notification.sessionId().isBlank(), "Telemetry notification sessionId must be non-empty"); + assertNotNull(notification.restricted(), "Telemetry notification restricted flag must be present"); + assertNotNull(notification.event(), "Telemetry notification event must be present"); + assertNotNull(notification.event().kind(), "Telemetry event kind must be present"); + assertTrue(!notification.event().kind().isBlank(), "Telemetry event kind must be non-empty"); + } + } + } +} diff --git a/java/src/test/java/com/github/copilot/GitHubTelemetryTest.java b/java/src/test/java/com/github/copilot/GitHubTelemetryTest.java new file mode 100644 index 000000000..ad950b823 --- /dev/null +++ b/java/src/test/java/com/github/copilot/GitHubTelemetryTest.java @@ -0,0 +1,289 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +package com.github.copilot; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.JsonNode; +import com.github.copilot.generated.rpc.GitHubTelemetryNotification; +import com.github.copilot.rpc.CopilotClientOptions; +import com.github.copilot.rpc.PermissionHandler; +import com.github.copilot.rpc.ResumeSessionConfig; +import com.github.copilot.rpc.SessionConfig; + +/** + * Exercises the hand-written GitHub telemetry forwarding surface: the + * {@code gitHubTelemetry.event} notification adapter, the + * {@code enableGitHubTelemetryForwarding} capability flag on the create/resume + * requests, and the {@code onGitHubTelemetry} client option. + */ +@AllowCopilotExperimental +class GitHubTelemetryTest { + + private record SocketPair(JsonRpcClient client, Socket serverSide, + ServerSocket serverSocket) implements AutoCloseable { + + @Override + public void close() throws Exception { + client.close(); + serverSide.close(); + serverSocket.close(); + } + } + + private SocketPair createSocketPair() throws Exception { + var serverSocket = new ServerSocket(0); + var clientSocket = new Socket("localhost", serverSocket.getLocalPort()); + var serverSide = serverSocket.accept(); + var client = JsonRpcClient.fromSocket(clientSocket); + return new SocketPair(client, serverSide, serverSocket); + } + + private void writeRpcMessage(OutputStream out, String json) throws IOException { + byte[] content = json.getBytes(StandardCharsets.UTF_8); + String header = "Content-Length: " + content.length + "\r\n\r\n"; + out.write(header.getBytes(StandardCharsets.UTF_8)); + out.write(content); + out.flush(); + } + + @Test + void adapterDispatchesNotificationToHandlerWithTypedPayload() throws Exception { + try (var pair = createSocketPair()) { + var received = new CompletableFuture(); + Function> handler = notification -> { + received.complete(notification); + return CompletableFuture.completedFuture(null); + }; + new GitHubTelemetryAdapter(handler).registerHandlers(pair.client()); + + String notification = """ + { + "jsonrpc": "2.0", + "method": "gitHubTelemetry.event", + "params": { + "sessionId": "sess-123", + "restricted": true, + "event": { + "kind": "tool_call_executed", + "created_at": "2024-01-01T00:00:00Z", + "model_call_id": "call-9", + "properties": { "tool": "shell" }, + "metrics": { "duration_ms": 42.5 }, + "exp_assignment_context": "ctx", + "features": { "flag_a": "on" }, + "session_id": "sess-123", + "copilot_tracking_id": "track-1", + "client": { + "cli_version": "1.2.3", + "os_platform": "win32", + "os_version": "10", + "os_arch": "x64", + "node_version": "20.0.0", + "is_staff": false + } + } + } + } + """; + writeRpcMessage(pair.serverSide().getOutputStream(), notification); + + GitHubTelemetryNotification result = received.get(5, TimeUnit.SECONDS); + assertEquals("sess-123", result.sessionId()); + assertTrue(result.restricted()); + + var event = result.event(); + assertNotNull(event); + assertEquals("tool_call_executed", event.kind()); + assertEquals("2024-01-01T00:00:00Z", event.createdAt()); + assertEquals("call-9", event.modelCallId()); + assertEquals("shell", event.properties().get("tool")); + assertEquals(42.5, event.metrics().get("duration_ms")); + assertEquals("ctx", event.expAssignmentContext()); + assertEquals("on", event.features().get("flag_a")); + assertEquals("sess-123", event.sessionId()); + assertEquals("track-1", event.copilotTrackingId()); + + var client = event.client(); + assertNotNull(client); + assertEquals("1.2.3", client.cliVersion()); + assertEquals("win32", client.osPlatform()); + assertEquals("x64", client.osArch()); + assertEquals("20.0.0", client.nodeVersion()); + assertEquals(Boolean.FALSE, client.isStaff()); + } + } + + @Test + void clientOptsSessionsIntoForwardingAndReceivesEvents() throws Exception { + var received = new CompletableFuture(); + Function> handler = notification -> { + received.complete(notification); + return CompletableFuture.completedFuture(null); + }; + + try (var server = new FakeRuntimeServer(); + var client = new CopilotClient( + new CopilotClientOptions().setCliUrl(server.url()).setOnGitHubTelemetry(handler))) { + + client.start().get(15, TimeUnit.SECONDS); + + // Creating a session must opt it into telemetry forwarding. + client.createSession(new SessionConfig().setOnPermissionRequest(PermissionHandler.APPROVE_ALL)).get(15, + TimeUnit.SECONDS); + JsonNode createParams = server.awaitCreate(); + assertTrue(createParams.path("enableGitHubTelemetryForwarding").asBoolean(), + "create request should carry enableGitHubTelemetryForwarding=true"); + + // The adapter registered on connect should forward server-pushed events. + server.sendTelemetry(Map.of("sessionId", "sess-xyz", "restricted", false, "event", + Map.of("kind", "session_started", "session_id", "sess-xyz"))); + GitHubTelemetryNotification event = received.get(5, TimeUnit.SECONDS); + assertEquals("sess-xyz", event.sessionId()); + assertFalse(event.restricted()); + assertEquals("session_started", event.event().kind()); + + // Resuming a session must opt it in as well. + client.resumeSession("resume-1", + new ResumeSessionConfig().setOnPermissionRequest(PermissionHandler.APPROVE_ALL)) + .get(15, TimeUnit.SECONDS); + JsonNode resumeParams = server.awaitResume(); + assertTrue(resumeParams.path("enableGitHubTelemetryForwarding").asBoolean(), + "resume request should carry enableGitHubTelemetryForwarding=true"); + } + } + + @Test + void clientOmitsForwardingWhenNoHandler() throws Exception { + try (var server = new FakeRuntimeServer(); + var client = new CopilotClient(new CopilotClientOptions().setCliUrl(server.url()))) { + + client.start().get(15, TimeUnit.SECONDS); + + client.createSession(new SessionConfig().setOnPermissionRequest(PermissionHandler.APPROVE_ALL)).get(15, + TimeUnit.SECONDS); + JsonNode createParams = server.awaitCreate(); + assertFalse(createParams.has("enableGitHubTelemetryForwarding"), + "create request should omit the flag when no handler is registered"); + + client.resumeSession("resume-1", + new ResumeSessionConfig().setOnPermissionRequest(PermissionHandler.APPROVE_ALL)) + .get(15, TimeUnit.SECONDS); + JsonNode resumeParams = server.awaitResume(); + assertFalse(resumeParams.has("enableGitHubTelemetryForwarding"), + "resume request should omit the flag when no handler is registered"); + } + } + + @Test + void optionsRetainAndCloneTelemetryHandler() { + Function> handler = n -> CompletableFuture + .completedFuture(null); + var options = new CopilotClientOptions().setOnGitHubTelemetry(handler); + assertSame(handler, options.getOnGitHubTelemetry()); + + var copy = options.clone(); + assertSame(handler, copy.getOnGitHubTelemetry()); + } + + /** + * A minimal in-process JSON-RPC runtime that answers the connect/create/resume + * handshake so a real {@link CopilotClient} can be driven over a socket, and + * can push {@code gitHubTelemetry.event} notifications back to the client. + */ + private static final class FakeRuntimeServer implements AutoCloseable { + + private final ServerSocket serverSocket; + private final Thread acceptThread; + private final CompletableFuture ready = new CompletableFuture<>(); + private final CompletableFuture createParams = new CompletableFuture<>(); + private final CompletableFuture resumeParams = new CompletableFuture<>(); + + FakeRuntimeServer() throws IOException { + serverSocket = new ServerSocket(0); + acceptThread = new Thread(this::acceptLoop, "fake-runtime-accept"); + acceptThread.setDaemon(true); + acceptThread.start(); + } + + String url() { + return "127.0.0.1:" + serverSocket.getLocalPort(); + } + + JsonNode awaitCreate() throws Exception { + return createParams.get(15, TimeUnit.SECONDS); + } + + JsonNode awaitResume() throws Exception { + return resumeParams.get(15, TimeUnit.SECONDS); + } + + void sendTelemetry(Object params) throws Exception { + ready.get(15, TimeUnit.SECONDS).notify("gitHubTelemetry.event", params); + } + + private void acceptLoop() { + try { + Socket socket = serverSocket.accept(); + JsonRpcClient server = JsonRpcClient.fromSocket(socket); + server.registerMethodHandler("connect", + (id, params) -> respond(server, id, Map.of("protocolVersion", 2))); + server.registerMethodHandler("session.create", (id, params) -> { + createParams.complete(params); + respond(server, id, Map.of("sessionId", params.path("sessionId").asText("created"), "workspacePath", + "/workspace")); + }); + server.registerMethodHandler("session.resume", (id, params) -> { + resumeParams.complete(params); + respond(server, id, Map.of("sessionId", params.path("sessionId").asText("resume-1"), + "workspacePath", "/workspace")); + }); + server.registerMethodHandler("session.destroy", (id, params) -> respond(server, id, Map.of())); + server.registerMethodHandler("runtime.shutdown", (id, params) -> respond(server, id, Map.of())); + ready.complete(server); + } catch (IOException e) { + ready.completeExceptionally(e); + createParams.completeExceptionally(e); + resumeParams.completeExceptionally(e); + } + } + + private static void respond(JsonRpcClient server, String id, Object result) { + if (id == null) { + return; + } + try { + server.sendResponse(id, result); + } catch (IOException e) { + // Connection torn down (e.g. client closing); ignore. + } + } + + @Override + public void close() throws Exception { + JsonRpcClient server = ready.getNow(null); + if (server != null) { + server.close(); + } + serverSocket.close(); + } + } +} diff --git a/nodejs/src/client.ts b/nodejs/src/client.ts index 5d0f51ac1..160a12d48 100644 --- a/nodejs/src/client.ts +++ b/nodejs/src/client.ts @@ -33,7 +33,11 @@ import { registerClientGlobalApiHandlers, registerClientSessionApiHandlers, } from "./generated/rpc.js"; -import type { OpenCanvasInstance, SessionUpdateOptionsParams } from "./generated/rpc.js"; +import type { + GitHubTelemetryNotification, + OpenCanvasInstance, + SessionUpdateOptionsParams, +} from "./generated/rpc.js"; import { getSdkProtocolVersion } from "./sdkProtocolVersion.js"; import { CopilotSession } from "./session.js"; import { createSessionFsAdapter, type SessionFsProvider } from "./sessionFsProvider.js"; @@ -514,7 +518,8 @@ export class CopilotClient { /** Connection-level session filesystem config, set via constructor option. */ private sessionFsConfig: SessionFsConfig | null = null; private requestHandler: CopilotRequestHandler | null = null; - private llmInferenceHandlers: import("./generated/rpc.js").ClientGlobalApiHandlers = {}; + private onGitHubTelemetry?: (notification: GitHubTelemetryNotification) => void | Promise; + private clientGlobalHandlers: import("./generated/rpc.js").ClientGlobalApiHandlers = {}; /** * Typed server-scoped RPC methods. @@ -634,7 +639,8 @@ export class CopilotClient { this.onGetTraceContext = options.onGetTraceContext; this.sessionFsConfig = options.sessionFs ?? null; this.requestHandler = options.requestHandler ?? null; - this.setupLlmInference(); + this.onGitHubTelemetry = options.onGitHubTelemetry; + this.setupClientGlobalHandlers(); const effectiveEnv = options.env ?? process.env; this.resolvedEnv = effectiveEnv; @@ -751,19 +757,30 @@ export class CopilotClient { session.clientSessionApis.sessionFs = createSessionFsAdapter(provider); } - private setupLlmInference(): void { - if (!this.requestHandler) { - return; - } - this.llmInferenceHandlers = { - llmInference: createCopilotRequestAdapter(this.requestHandler, () => { + private setupClientGlobalHandlers(): void { + const handlers: import("./generated/rpc.js").ClientGlobalApiHandlers = {}; + if (this.requestHandler) { + handlers.llmInference = createCopilotRequestAdapter(this.requestHandler, () => { if (!this.connection) { return undefined; } this._rpc ??= createServerRpc(this.connection); return this._rpc; - }), - }; + }); + } + if (this.onGitHubTelemetry) { + const onGitHubTelemetry = this.onGitHubTelemetry; + handlers.gitHubTelemetry = { + event: async (notification) => { + try { + await onGitHubTelemetry(notification); + } catch { + // Ignore handler errors + } + }, + }; + } + this.clientGlobalHandlers = handlers; } /** @@ -1426,6 +1443,9 @@ export class CopilotClient { workingDirectory: config.workingDirectory, streaming: config.streaming, includeSubAgentStreamingEvents: config.includeSubAgentStreamingEvents ?? true, + ...(this.onGitHubTelemetry != null + ? { enableGitHubTelemetryForwarding: true } + : {}), mcpServers: toWireMcpServers(config.mcpServers), mcpOAuthTokenStorage: config.mcpOAuthTokenStorage, envValueMode: "direct", @@ -1642,6 +1662,9 @@ export class CopilotClient { enableSkills: config.enableSkills, streaming: config.streaming, includeSubAgentStreamingEvents: config.includeSubAgentStreamingEvents ?? true, + ...(this.onGitHubTelemetry != null + ? { enableGitHubTelemetryForwarding: true } + : {}), mcpServers: toWireMcpServers(config.mcpServers), mcpOAuthTokenStorage: config.mcpOAuthTokenStorage, envValueMode: "direct", @@ -2565,7 +2588,7 @@ export class CopilotClient { // Register client *global* API handlers (e.g. LLM inference) on the // same connection. These methods carry no implicit sessionId dispatch // — the runtime calls into a single handler for the whole connection. - registerClientGlobalApiHandlers(this.connection, this.llmInferenceHandlers); + registerClientGlobalApiHandlers(this.connection, this.clientGlobalHandlers); this.connection.onClose(() => { this.state = "disconnected"; diff --git a/nodejs/src/generated/rpc.ts b/nodejs/src/generated/rpc.ts index 79991dae9..02c6f19e5 100644 --- a/nodejs/src/generated/rpc.ts +++ b/nodejs/src/generated/rpc.ts @@ -17151,9 +17151,9 @@ export function registerClientGlobalApiHandlers( if (!handler) throw new Error("No llmInference client-global handler registered"); return handler.httpRequestChunk(params); }); - connection.onRequest("gitHubTelemetry.event", async (params: GitHubTelemetryNotification) => { + connection.onNotification("gitHubTelemetry.event", async (params: GitHubTelemetryNotification) => { const handler = handlers.gitHubTelemetry; - if (!handler) throw new Error("No gitHubTelemetry client-global handler registered"); - return handler.event(params); + if (!handler) return; + await handler.event(params); }); } diff --git a/nodejs/src/index.ts b/nodejs/src/index.ts index eebf9add5..e05b33c15 100644 --- a/nodejs/src/index.ts +++ b/nodejs/src/index.ts @@ -76,6 +76,9 @@ export type { ForegroundSessionInfo, GetAuthStatusResponse, GetStatusResponse, + GitHubTelemetryNotification, + GitHubTelemetryEvent, + GitHubTelemetryClientInfo, InfiniteSessionConfig, LargeToolOutputConfig, MemoryConfiguration, diff --git a/nodejs/src/types.ts b/nodejs/src/types.ts index 898ca0256..97182b5f1 100644 --- a/nodejs/src/types.ts +++ b/nodejs/src/types.ts @@ -17,12 +17,18 @@ import type { } from "./generated/session-events.js"; import type { CopilotSession } from "./session.js"; import type { + GitHubTelemetryNotification, ModelBillingTokenPrices, OpenCanvasInstance, RemoteSessionMode, } from "./generated/rpc.js"; import type { ToolSet } from "./toolSet.js"; export type { RemoteSessionMode } from "./generated/rpc.js"; +export type { + GitHubTelemetryNotification, + GitHubTelemetryEvent, + GitHubTelemetryClientInfo, +} from "./generated/rpc.js"; export type { ModelBillingTokenPrices, ModelBillingTokenPricesLongContext, @@ -339,6 +345,18 @@ export interface CopilotClientOptions { */ requestHandler?: CopilotRequestHandler; + /** + * Experimental. Receives GitHub telemetry events the runtime forwards to + * this connection. When set, the client opts each session it creates or + * resumes into telemetry forwarding and dispatches each + * `gitHubTelemetry.event` notification to this connection-global handler; + * each {@link GitHubTelemetryNotification} carries its originating + * `sessionId`. + * + * @experimental + */ + onGitHubTelemetry?: (notification: GitHubTelemetryNotification) => void | Promise; + /** * Server-wide idle timeout for sessions in seconds. * Sessions without activity for this duration are automatically cleaned up. diff --git a/nodejs/test/client.test.ts b/nodejs/test/client.test.ts index 8c52512fd..b17449454 100644 --- a/nodejs/test/client.test.ts +++ b/nodejs/test/client.test.ts @@ -7,6 +7,7 @@ import { CopilotClient, createCanvas, RuntimeConnection, + type GitHubTelemetryNotification, type ModelInfo, } from "../src/index.js"; import { CopilotSession } from "../src/session.js"; @@ -461,6 +462,137 @@ describe("CopilotClient", () => { expect(resumePayload.sessionLimits).toEqual({ maxAiCredits: 15 }); }); + it("opts into GitHub telemetry forwarding when onGitHubTelemetry is provided", async () => { + const client = new CopilotClient({ onGitHubTelemetry: () => {} }); + await client.start(); + onTestFinished(() => client.forceStop()); + + const spy = vi + .spyOn((client as any).connection!, "sendRequest") + .mockImplementation(async (method: string, params: any) => { + if (method === "session.create") return { sessionId: params.sessionId }; + if (method === "session.resume") return { sessionId: params.sessionId }; + throw new Error(`Unexpected method: ${method}`); + }); + + const session = await client.createSession({ onPermissionRequest: approveAll }); + await client.resumeSession(session.sessionId, { onPermissionRequest: approveAll }); + + const createPayload = spy.mock.calls.find( + ([method]) => method === "session.create" + )![1] as any; + const resumePayload = spy.mock.calls.find( + ([method]) => method === "session.resume" + )![1] as any; + expect(createPayload.enableGitHubTelemetryForwarding).toBe(true); + expect(resumePayload.enableGitHubTelemetryForwarding).toBe(true); + }); + + it("does not opt into GitHub telemetry forwarding without a handler", async () => { + const client = new CopilotClient(); + await client.start(); + onTestFinished(() => client.forceStop()); + + const spy = vi + .spyOn((client as any).connection!, "sendRequest") + .mockImplementation(async (method: string, params: any) => { + if (method === "session.create") return { sessionId: params.sessionId }; + throw new Error(`Unexpected method: ${method}`); + }); + + await client.createSession({ onPermissionRequest: approveAll }); + + const createPayload = spy.mock.calls.find( + ([method]) => method === "session.create" + )![1] as any; + expect(createPayload.enableGitHubTelemetryForwarding).toBeUndefined(); + }); + + it("dispatches a real gitHubTelemetry.event wire message to the handler", async () => { + const { createMessageConnection, StreamMessageReader, StreamMessageWriter } = + await import("vscode-jsonrpc/node.js"); + const { registerClientGlobalApiHandlers } = await import("../src/generated/rpc.js"); + + const clientToServer = new PassThrough(); + const serverToClient = new PassThrough(); + + const clientConn = createMessageConnection( + new StreamMessageReader(serverToClient), + new StreamMessageWriter(clientToServer) + ); + const serverConn = createMessageConnection( + new StreamMessageReader(clientToServer), + new StreamMessageWriter(serverToClient) + ); + onTestFinished(() => { + clientConn.dispose(); + serverConn.dispose(); + }); + + const received: GitHubTelemetryNotification[] = []; + let resolveReceived: () => void; + const got = new Promise((resolve) => { + resolveReceived = resolve; + }); + + registerClientGlobalApiHandlers(clientConn, { + gitHubTelemetry: { + event: async (notification) => { + received.push(notification); + resolveReceived(); + }, + }, + }); + + clientConn.listen(); + serverConn.listen(); + + const notification: GitHubTelemetryNotification = { + sessionId: "session-1", + restricted: false, + event: { + kind: "tool_call_executed", + properties: { tool: "shell" }, + metrics: { duration_ms: 42 }, + }, + }; + + // Deliver the event as a real JSON-RPC *notification* (no id) and confirm + // the generated dispatcher routes it to the registered handler. The runtime + // forwards telemetry via `sendNotification`, which only fires `onNotification` + // handlers — an `onRequest` registration would never be invoked, so sending a + // notification here guards against regressing back to request-style dispatch. + serverConn.sendNotification("gitHubTelemetry.event", notification); + await got; + + expect(received).toEqual([notification]); + }); + + it("registers no gitHubTelemetry handler when onGitHubTelemetry is omitted", () => { + const client = new CopilotClient(); + onTestFinished(() => client.forceStop()); + + const handlers = (client as any).clientGlobalHandlers; + expect(handlers.gitHubTelemetry).toBeUndefined(); + }); + + it("forwards gitHubTelemetry events to the onGitHubTelemetry handler", () => { + const received: GitHubTelemetryNotification[] = []; + const client = new CopilotClient({ onGitHubTelemetry: (n) => received.push(n) }); + onTestFinished(() => client.forceStop()); + + const handlers = (client as any).clientGlobalHandlers; + expect(handlers.gitHubTelemetry).toBeDefined(); + + const notification: GitHubTelemetryNotification = { + sessionId: "session-1", + restricted: false, + event: { kind: "tool_call_executed", properties: {}, metrics: {} }, + }; + handlers.gitHubTelemetry.event(notification); + expect(received).toEqual([notification]); + }); + it("forwards expAssignments in session.create and session.resume", async () => { const client = new CopilotClient(); await client.start(); diff --git a/nodejs/test/e2e/github_telemetry.e2e.test.ts b/nodejs/test/e2e/github_telemetry.e2e.test.ts new file mode 100644 index 000000000..e33178f9d --- /dev/null +++ b/nodejs/test/e2e/github_telemetry.e2e.test.ts @@ -0,0 +1,57 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +import { describe, expect, it } from "vitest"; +import { approveAll, GitHubTelemetryNotification } from "../../src/index.js"; +import { createSdkTestContext } from "./harness/sdkTestContext.js"; +import { waitForCondition } from "./harness/sdkTestHelper.js"; + +// Experimental: exercises the end-to-end GitHub (hydro) telemetry forwarding +// path. The runtime forwards per-session telemetry to opted-in connections via +// the `gitHubTelemetry.event` JSON-RPC *notification*; the SDK opts in +// automatically whenever an `onGitHubTelemetry` handler is registered. Creating +// a session emits an early `session.start` hydro event, so no model round-trip +// (and therefore no recorded CAPI exchange) is needed to observe forwarding. +describe("GitHub telemetry forwarding", async () => { + const received: GitHubTelemetryNotification[] = []; + + const { copilotClient: client } = await createSdkTestContext({ + copilotClientOptions: { + onGitHubTelemetry: (notification) => { + received.push(notification); + }, + }, + }); + + it( + "forwards gitHubTelemetry.event notifications from a live session", + { timeout: 60_000 }, + async () => { + received.length = 0; + + const session = await client.createSession({ + onPermissionRequest: approveAll, + }); + + // The CLI forwards telemetry over the JSON-RPC connection + // asynchronously, so wait until at least one event arrives or we + // time out. + await waitForCondition(() => received.length > 0, { + timeoutMs: 30_000, + timeoutMessage: "Timed out waiting for a gitHubTelemetry.event notification.", + }); + + expect(received.length).toBeGreaterThan(0); + + const notification = received[0]; + expect(typeof notification.sessionId).toBe("string"); + expect(notification.sessionId.length).toBeGreaterThan(0); + expect(typeof notification.restricted).toBe("boolean"); + expect(notification.event).toBeDefined(); + expect(typeof notification.event.kind).toBe("string"); + + await session.disconnect(); + } + ); +}); diff --git a/python/copilot/__init__.py b/python/copilot/__init__.py index cdcfdc6ae..fdf422d2a 100644 --- a/python/copilot/__init__.py +++ b/python/copilot/__init__.py @@ -74,6 +74,9 @@ LlmInferenceHeaders, ) from .generated.rpc import ( + GitHubTelemetryClientInfo, + GitHubTelemetryEvent, + GitHubTelemetryNotification, ModelBillingTokenPrices, ModelBillingTokenPricesLongContext, ) @@ -226,6 +229,9 @@ "GetAuthStatusResponse", "BearerTokenProvider", "GetStatusResponse", + "GitHubTelemetryClientInfo", + "GitHubTelemetryEvent", + "GitHubTelemetryNotification", "InfiniteSessionConfig", "InputOptions", "LargeToolOutputConfig", diff --git a/python/copilot/_jsonrpc.py b/python/copilot/_jsonrpc.py index a58908d08..ed70e4e8d 100644 --- a/python/copilot/_jsonrpc.py +++ b/python/copilot/_jsonrpc.py @@ -80,6 +80,7 @@ def __init__(self, process): self.pending_requests: dict[str, asyncio.Future] = {} self._pending_inline_callbacks: dict[str, Callable[[Any], None]] = {} self.notification_handler: Callable[[str, dict], None] | None = None + self.notification_method_handlers: dict[str, Callable[[dict], Any]] = {} self.request_handlers: dict[str, RequestHandler] = {} self._running = False self._read_thread: threading.Thread | None = None @@ -232,6 +233,19 @@ def set_notification_handler(self, handler: Callable[[str, dict], None]): """Set the handler for incoming notifications from the server.""" self.notification_handler = handler + def set_notification_method_handler(self, method: str, handler: Callable[[dict], Any] | None): + """Register a handler for a specific server-to-client notification method. + + Notifications carry no ``id`` and expect no response, so they are + dispatched separately from request handlers. A registered method + handler takes precedence over the generic notification handler. The + handler may be a coroutine function; its result is awaited. + """ + if handler is None: + self.notification_method_handlers.pop(method, None) + else: + self.notification_method_handlers[method] = handler + def set_request_handler(self, method: str, handler: RequestHandler): if handler is None: self.request_handlers.pop(method, None) @@ -397,9 +411,14 @@ def _handle_message(self, message: dict): # Check if it's a notification from the server if "method" in message and "id" not in message: + method = message["method"] + params = message.get("params", {}) + handler = self.notification_method_handlers.get(method) + if handler is not None and self._loop: + # Method-specific notification handler takes precedence. + self._loop.call_soon_threadsafe(self._dispatch_notification, handler, params) + return if self.notification_handler and self._loop: - method = message["method"] - params = message.get("params", {}) # Schedule notification handler on the event loop for thread safety self._loop.call_soon_threadsafe(self.notification_handler, method, params) return @@ -427,6 +446,25 @@ def _handle_request(self, message: dict): self._loop, ) + def _dispatch_notification(self, handler: Callable[[dict], Any], params: dict): + """Invoke a method-specific notification handler. Runs on the event loop; + coroutine results are scheduled and any error is logged (notifications + carry no response, so failures never propagate to the server).""" + try: + outcome = handler(params) + except Exception: # pylint: disable=broad-except + logger.warning("Notification handler raised", exc_info=True) + return + if inspect.isawaitable(outcome): + + async def _await_outcome(): + try: + await outcome + except Exception: # pylint: disable=broad-except + logger.warning("Notification handler raised", exc_info=True) + + asyncio.create_task(_await_outcome()) + async def _dispatch_request(self, message: dict, handler: RequestHandler): try: params = message.get("params", {}) diff --git a/python/copilot/client.py b/python/copilot/client.py index 6bbdf9136..269aaf96c 100644 --- a/python/copilot/client.py +++ b/python/copilot/client.py @@ -64,6 +64,7 @@ from .generated.rpc import ( ClientGlobalApiHandlers, ClientSessionApiHandlers, + GitHubTelemetryNotification, ModelBillingTokenPrices, ModelBillingTokenPricesLongContext, # noqa: F401 OpenCanvasInstance, @@ -399,6 +400,26 @@ class UriRuntimeConnection(RuntimeConnection): """Shared secret to authenticate the connection.""" +class _GitHubTelemetryAdapter: + """Adapts a user-provided ``on_github_telemetry`` callback to the generated + ``GitHubTelemetryHandler`` protocol. + """ + + def __init__( + self, + callback: Callable[[GitHubTelemetryNotification], None | Awaitable[None]], + ) -> None: + self._callback = callback + + async def event(self, params: GitHubTelemetryNotification) -> None: + try: + result = self._callback(params) + if inspect.isawaitable(result): + await result + except Exception: + logger.warning("Error handling gitHubTelemetry.event notification", exc_info=True) + + @dataclass class _CopilotClientOptions: """Internal configuration carrier used by :class:`CopilotClient`. @@ -420,6 +441,9 @@ class _CopilotClientOptions: session_idle_timeout_seconds: int | None = None enable_remote_sessions: bool = False on_list_models: Callable[[], list[ModelInfo] | Awaitable[list[ModelInfo]]] | None = None + on_github_telemetry: Callable[[GitHubTelemetryNotification], None | Awaitable[None]] | None = ( + None + ) mode: CopilotClientMode = "copilot-cli" @@ -1109,6 +1133,8 @@ def __init__( session_idle_timeout_seconds: int | None = None, enable_remote_sessions: bool = False, on_list_models: Callable[[], list[ModelInfo] | Awaitable[list[ModelInfo]]] | None = None, + on_github_telemetry: Callable[[GitHubTelemetryNotification], None | Awaitable[None]] + | None = None, mode: CopilotClientMode = "copilot-cli", ): """ @@ -1153,6 +1179,10 @@ def __init__( on_list_models: Custom handler for :meth:`list_models`. When provided, the handler is called instead of querying the runtime server. + on_github_telemetry: Internal. Callback invoked when the runtime + forwards a GitHub telemetry event for a session. The callback + may be sync or async. Registering a handler opts every session + opened by this client into telemetry forwarding. Example: >>> # Default — spawns runtime using stdio with the bundled binary @@ -1183,6 +1213,7 @@ def __init__( session_idle_timeout_seconds=session_idle_timeout_seconds, enable_remote_sessions=enable_remote_sessions, on_list_models=on_list_models, + on_github_telemetry=on_github_telemetry, mode=mode, ) connection = ( @@ -1198,6 +1229,7 @@ def __init__( self._options: _CopilotClientOptions = options self._connection: RuntimeConnection = connection self._on_list_models = options.on_list_models + self._on_github_telemetry = options.on_github_telemetry # Resolve connection-mode-specific state. self._actual_host: str = "localhost" @@ -2002,6 +2034,11 @@ async def create_session( else True ) + # Opt this connection into gitHubTelemetry.event notifications when a + # telemetry handler was registered on the client. + if self._on_github_telemetry is not None: + payload["enableGitHubTelemetryForwarding"] = True + # Add provider configuration if provided if provider: payload["provider"] = self._convert_provider_to_wire_format(provider) @@ -2620,6 +2657,11 @@ async def resume_session( else True ) + # Opt this connection into gitHubTelemetry.event notifications when a + # telemetry handler was registered on the client. + if self._on_github_telemetry is not None: + payload["enableGitHubTelemetryForwarding"] = True + # Enable permission request callback if handler provided payload["requestPermission"] = bool(on_permission_request) @@ -3690,7 +3732,7 @@ def handle_notification(method: str, params: dict): "systemMessage.transform", self._handle_system_message_transform ) register_client_session_api_handlers(self._client, self._get_client_session_handlers) - self._register_llm_inference_handlers() + self._register_client_global_handlers() # Start listening for messages loop = asyncio.get_running_loop() @@ -3810,7 +3852,7 @@ def handle_notification(method: str, params: dict): "systemMessage.transform", self._handle_system_message_transform ) register_client_session_api_handlers(self._client, self._get_client_session_handlers) - self._register_llm_inference_handlers() + self._register_client_global_handlers() # Start listening for messages loop = asyncio.get_running_loop() @@ -3883,15 +3925,26 @@ async def _set_session_fs_provider(self) -> None: await self._client.request("sessionFs.setProvider", params) - def _register_llm_inference_handlers(self) -> None: - if self._request_handler is None or not self._client: + def _register_client_global_handlers(self) -> None: + if not self._client: + return + llm_inference_adapter = None + if self._request_handler is not None: + llm_inference_adapter = create_copilot_request_adapter( + self._request_handler, + lambda: self._rpc.llm_inference if self._rpc is not None else None, + ) + github_telemetry_adapter = None + if self._on_github_telemetry is not None: + github_telemetry_adapter = _GitHubTelemetryAdapter(self._on_github_telemetry) + if llm_inference_adapter is None and github_telemetry_adapter is None: return - adapter = create_copilot_request_adapter( - self._request_handler, - lambda: self._rpc.llm_inference if self._rpc is not None else None, - ) register_client_global_api_handlers( - self._client, ClientGlobalApiHandlers(llm_inference=adapter) + self._client, + ClientGlobalApiHandlers( + llm_inference=llm_inference_adapter, + git_hub_telemetry=github_telemetry_adapter, + ), ) async def _set_llm_inference_provider(self) -> None: diff --git a/python/copilot/generated/rpc.py b/python/copilot/generated/rpc.py index 89b724ce9..72e1023b1 100644 --- a/python/copilot/generated/rpc.py +++ b/python/copilot/generated/rpc.py @@ -27096,13 +27096,13 @@ async def handle_llm_inference_http_request_chunk(params: dict) -> dict | None: result = await handler.http_request_chunk(request) return result.to_dict() client.set_request_handler("llmInference.httpRequestChunk", handle_llm_inference_http_request_chunk) - async def handle_git_hub_telemetry_event(params: dict) -> dict | None: + async def handle_git_hub_telemetry_event(params: dict) -> None: request = GitHubTelemetryNotification.from_dict(params) handler = handlers.git_hub_telemetry - if handler is None: raise RuntimeError("No git_hub_telemetry client-global handler registered") + if handler is None: return None await handler.event(request) return None - client.set_request_handler("gitHubTelemetry.event", handle_git_hub_telemetry_event) + client.set_notification_method_handler("gitHubTelemetry.event", handle_git_hub_telemetry_event) __all__ = [ "APIKeyAuthInfo", diff --git a/python/e2e/test_github_telemetry_e2e.py b/python/e2e/test_github_telemetry_e2e.py new file mode 100644 index 000000000..976b0b616 --- /dev/null +++ b/python/e2e/test_github_telemetry_e2e.py @@ -0,0 +1,57 @@ +"""Live CLI E2E coverage for forwarded GitHub telemetry notifications.""" + +from __future__ import annotations + +import asyncio + +import pytest + +from copilot import CopilotClient, GitHubTelemetryNotification, RuntimeConnection +from copilot.session import PermissionHandler + +from .testharness import DEFAULT_GITHUB_TOKEN, E2ETestContext +from .testharness.context import get_cli_path_for_tests + +pytestmark = pytest.mark.asyncio(loop_scope="module") + + +class TestGitHubTelemetryE2E: + async def test_should_receive_session_start_github_telemetry(self, ctx: E2ETestContext): + received: list[GitHubTelemetryNotification] = [] + + def on_github_telemetry(notification: GitHubTelemetryNotification) -> None: + received.append(notification) + + client = CopilotClient( + connection=RuntimeConnection.for_stdio(path=get_cli_path_for_tests(), args=()), + working_directory=ctx.work_dir, + env=ctx.get_env(), + github_token=DEFAULT_GITHUB_TOKEN, + on_github_telemetry=on_github_telemetry, + ) + + session = None + try: + await client.start() + session = await client.create_session( + on_permission_request=PermissionHandler.approve_all, + ) + + for _ in range(600): + if received: + break + await asyncio.sleep(0.05) + + assert received + notification = received[0] + assert isinstance(notification.session_id, str) + assert notification.session_id + assert isinstance(notification.restricted, bool) + assert notification.event is not None + assert isinstance(notification.event.kind, str) + finally: + try: + if session is not None: + await session.disconnect() + finally: + await client.stop() diff --git a/python/test_client.py b/python/test_client.py index 08076e87c..13fc50e73 100644 --- a/python/test_client.py +++ b/python/test_client.py @@ -2272,3 +2272,230 @@ def on_failure(input_data, invocation): }, ) assert result == {"additionalContext": "sync-ok"} + + +class TestGitHubTelemetry: + """Unit tests for the experimental gitHubTelemetry.event consumer surface.""" + + @pytest.mark.asyncio + async def test_create_session_enables_forwarding_when_handler_registered(self): + client = CopilotClient( + connection=RuntimeConnection.for_stdio(path=CLI_PATH), + on_github_telemetry=lambda _notification: None, + ) + await client.start() + + try: + captured = {} + original_request = client._client.request + + async def mock_request(method, params, **kwargs): + captured[method] = params + return await original_request(method, params, **kwargs) + + client._client.request = mock_request + await client.create_session( + on_permission_request=PermissionHandler.approve_all, + ) + assert captured["session.create"]["enableGitHubTelemetryForwarding"] is True + finally: + await client.force_stop() + + @pytest.mark.asyncio + async def test_create_session_omits_forwarding_without_handler(self): + client = CopilotClient(connection=RuntimeConnection.for_stdio(path=CLI_PATH)) + await client.start() + + try: + captured = {} + original_request = client._client.request + + async def mock_request(method, params, **kwargs): + captured[method] = params + return await original_request(method, params, **kwargs) + + client._client.request = mock_request + await client.create_session( + on_permission_request=PermissionHandler.approve_all, + ) + assert "enableGitHubTelemetryForwarding" not in captured["session.create"] + finally: + await client.force_stop() + + @pytest.mark.asyncio + async def test_resume_session_enables_forwarding_when_handler_registered(self): + client = CopilotClient( + connection=RuntimeConnection.for_stdio(path=CLI_PATH), + on_github_telemetry=lambda _notification: None, + ) + await client.start() + + try: + session = await client.create_session( + on_permission_request=PermissionHandler.approve_all + ) + + captured = {} + original_request = client._client.request + + async def mock_request(method, params, **kwargs): + captured[method] = params + if method == "session.resume": + return {"sessionId": session.session_id} + return await original_request(method, params, **kwargs) + + client._client.request = mock_request + await client.resume_session( + session.session_id, + on_permission_request=PermissionHandler.approve_all, + ) + assert captured["session.resume"]["enableGitHubTelemetryForwarding"] is True + finally: + await client.force_stop() + + @pytest.mark.asyncio + async def test_resume_session_omits_forwarding_without_handler(self): + client = CopilotClient(connection=RuntimeConnection.for_stdio(path=CLI_PATH)) + await client.start() + + try: + session = await client.create_session( + on_permission_request=PermissionHandler.approve_all + ) + + captured = {} + original_request = client._client.request + + async def mock_request(method, params, **kwargs): + captured[method] = params + if method == "session.resume": + return {"sessionId": session.session_id} + return await original_request(method, params, **kwargs) + + client._client.request = mock_request + await client.resume_session( + session.session_id, + on_permission_request=PermissionHandler.approve_all, + ) + assert "enableGitHubTelemetryForwarding" not in captured["session.resume"] + finally: + await client.force_stop() + + @pytest.mark.asyncio + async def test_event_routes_to_handler(self): + from copilot.generated.rpc import GitHubTelemetryNotification + + received: list = [] + + def on_telemetry(notification): + received.append(notification) + + client = CopilotClient( + connection=RuntimeConnection.for_stdio(path=CLI_PATH), + on_github_telemetry=on_telemetry, + ) + await client.start() + + try: + # gitHubTelemetry.event is a JSON-RPC *notification*: the generated + # client-global dispatcher wires it into the notification-handler + # table, never the request-handler table. Regressing to request-style + # dispatch would drop the runtime's id-less telemetry frames. + assert "gitHubTelemetry.event" in client._client.notification_method_handlers + assert "gitHubTelemetry.event" not in client._client.request_handlers + + # Drive a real id-less notification frame through the dispatcher to + # exercise the full from_dict decode + adapter + user-callback path. + client._client._handle_message( + { + "jsonrpc": "2.0", + "method": "gitHubTelemetry.event", + "params": { + "sessionId": "sess-telemetry", + "restricted": True, + "event": { + "kind": "tool_call_executed", + "metrics": {"duration_ms": 12.5}, + "properties": {"tool": "shell"}, + "session_id": "sess-telemetry", + }, + }, + } + ) + + # Notifications dispatch onto the event loop; yield until delivered. + for _ in range(100): + if received: + break + await asyncio.sleep(0.01) + + assert len(received) == 1 + notification = received[0] + assert isinstance(notification, GitHubTelemetryNotification) + assert notification.session_id == "sess-telemetry" + assert notification.restricted is True + assert notification.event.kind == "tool_call_executed" + assert notification.event.metrics["duration_ms"] == 12.5 + assert notification.event.properties["tool"] == "shell" + finally: + await client.force_stop() + + @pytest.mark.asyncio + async def test_event_routes_to_async_handler(self): + from copilot.generated.rpc import GitHubTelemetryNotification + + received: list = [] + delivered = asyncio.Event() + + async def on_telemetry(notification): + await asyncio.sleep(0) + received.append(notification) + delivered.set() + + client = CopilotClient( + connection=RuntimeConnection.for_stdio(path=CLI_PATH), + on_github_telemetry=on_telemetry, + ) + await client.start() + + try: + client._client._handle_message( + { + "jsonrpc": "2.0", + "method": "gitHubTelemetry.event", + "params": { + "sessionId": "sess-async-telemetry", + "restricted": False, + "event": { + "kind": "tool_call_executed", + "metrics": {"duration_ms": 3.5}, + "properties": {"tool": "python"}, + "session_id": "sess-async-telemetry", + }, + }, + } + ) + + await asyncio.wait_for(delivered.wait(), timeout=1) + + assert len(received) == 1 + notification = received[0] + assert isinstance(notification, GitHubTelemetryNotification) + assert notification.session_id == "sess-async-telemetry" + assert notification.restricted is False + assert notification.event.kind == "tool_call_executed" + assert notification.event.metrics["duration_ms"] == 3.5 + assert notification.event.properties["tool"] == "python" + finally: + await client.force_stop() + + @pytest.mark.asyncio + async def test_event_handler_not_registered_without_option(self): + client = CopilotClient(connection=RuntimeConnection.for_stdio(path=CLI_PATH)) + await client.start() + + try: + assert "gitHubTelemetry.event" not in client._client.notification_method_handlers + assert "gitHubTelemetry.event" not in client._client.request_handlers + finally: + await client.force_stop() diff --git a/rust/src/github_telemetry.rs b/rust/src/github_telemetry.rs new file mode 100644 index 000000000..9ef5c6e2e --- /dev/null +++ b/rust/src/github_telemetry.rs @@ -0,0 +1,28 @@ +//! GitHub telemetry forwarding callback surface. +//! +//! The runtime forwards per-session GitHub (hydro) telemetry to opted-in host +//! connections via the `gitHubTelemetry.event` JSON-RPC notification. The +//! payload types (`GitHubTelemetryNotification`, `GitHubTelemetryEvent`, +//! `GitHubTelemetryClientInfo`) are generated from the protocol schema and +//! re-exported here so consumers can register a callback against them via +//! [`ClientOptions::on_github_telemetry`](crate::ClientOptions::on_github_telemetry). +//! +//! Experimental: this surface is part of the GitHub telemetry forwarding +//! feature and may change or be removed without notice. + +use std::sync::Arc; + +#[doc(hidden)] +pub use crate::generated::api_types::{ + GitHubTelemetryClientInfo, GitHubTelemetryEvent, GitHubTelemetryNotification, +}; + +/// Callback invoked for each `gitHubTelemetry.event` notification forwarded by +/// the runtime to a connection that opted into telemetry forwarding. +/// +/// Set via +/// [`ClientOptions::on_github_telemetry`](crate::ClientOptions::on_github_telemetry). +/// Registering a callback auto-enables telemetry forwarding on every session +/// created or resumed by the client. +#[doc(hidden)] +pub type GitHubTelemetryCallback = Arc; diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 22fdc53d7..c31e80dc5 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -15,6 +15,10 @@ pub use errors::*; /// model-layer HTTP and WebSocket traffic the runtime issues for both CAPI and /// BYOK sessions. pub mod copilot_request_handler; +/// GitHub telemetry forwarding callback surface (experimental). Public but +/// `#[doc(hidden)]` — re-exports the generated telemetry payload types. +#[doc(hidden)] +pub mod github_telemetry; /// Event handler traits for session lifecycle. pub mod handler; /// Lifecycle hook callbacks (pre/post tool use, prompt submission, session start/end). @@ -257,6 +261,15 @@ pub struct ClientOptions { /// [`CopilotRequestHandler`] /// instead of issuing the calls itself. pub request_handler: Option>, + /// Connection-level GitHub telemetry forwarding callback (experimental). + /// + /// When set, every session created or resumed on this client opts into + /// telemetry forwarding (`enableGitHubTelemetryForwarding`) and the + /// callback is invoked for each `gitHubTelemetry.event` notification the + /// runtime forwards. `#[doc(hidden)]`, consistent with the experimental + /// telemetry payload types. + #[doc(hidden)] + pub on_github_telemetry: Option, /// Optional [`TraceContextProvider`] used to inject W3C Trace Context /// headers (`traceparent` / `tracestate`) on outbound `session.create`, /// `session.resume`, and `session.send` requests. @@ -336,6 +349,10 @@ impl std::fmt::Debug for ClientOptions { "request_handler", &self.request_handler.as_ref().map(|_| ""), ) + .field( + "on_github_telemetry", + &self.on_github_telemetry.as_ref().map(|_| ""), + ) .field( "on_get_trace_context", &self.on_get_trace_context.as_ref().map(|_| ""), @@ -584,6 +601,7 @@ impl Default for ClientOptions { on_list_models: None, session_fs: None, request_handler: None, + on_github_telemetry: None, on_get_trace_context: None, telemetry: None, base_directory: None, @@ -728,6 +746,20 @@ impl ClientOptions { self } + /// Register a connection-level GitHub telemetry forwarding callback + /// (internal/experimental). Registering a callback auto-enables telemetry + /// forwarding on every session created or resumed on this client; the + /// callback fires for each forwarded `gitHubTelemetry.event` notification. + /// The callback is wrapped in `Arc` internally. + #[doc(hidden)] + pub fn with_on_github_telemetry(mut self, callback: F) -> Self + where + F: Fn(crate::github_telemetry::GitHubTelemetryNotification) + Send + Sync + 'static, + { + self.on_github_telemetry = Some(Arc::new(callback)); + self + } + /// Set the [`TraceContextProvider`] used to inject W3C Trace Context /// headers on outbound `session.create` / `session.resume` / /// `session.send` requests. The provider is wrapped in `Arc` internally. @@ -853,6 +885,11 @@ struct ClientInner { /// Inbound `llmInference.*` dispatcher, installed when /// [`ClientOptions::request_handler`] is set. llm_inference: OnceLock>, + /// Connection-level GitHub telemetry forwarding callback, set from + /// [`ClientOptions::on_github_telemetry`]. Drives the + /// `enableGitHubTelemetryForwarding` wire flag and the + /// `gitHubTelemetry.event` notification dispatch. + on_github_telemetry: Option, on_get_trace_context: Option>, /// Token sent in the `connect` handshake. Auto-generated when the /// SDK spawns its own CLI in TCP mode and no explicit token is set; @@ -1005,6 +1042,7 @@ impl Client { session_fs_config.is_some(), session_fs_sqlite_declared, options.on_get_trace_context, + options.on_github_telemetry, effective_connection_token.clone(), options.mode, )? @@ -1032,6 +1070,7 @@ impl Client { session_fs_config.is_some(), session_fs_sqlite_declared, options.on_get_trace_context, + options.on_github_telemetry, effective_connection_token.clone(), options.mode, )? @@ -1050,6 +1089,7 @@ impl Client { session_fs_config.is_some(), session_fs_sqlite_declared, options.on_get_trace_context, + options.on_github_telemetry, effective_connection_token.clone(), options.mode, )? @@ -1097,6 +1137,7 @@ impl Client { &client.inner.notification_tx, &client.inner.request_rx, Some(dispatcher.clone()), + client.inner.on_github_telemetry.clone(), ); client.rpc().llm_inference().set_provider().await?; debug!( @@ -1129,6 +1170,7 @@ impl Client { false, None, None, + None, ClientMode::default(), ) } @@ -1157,6 +1199,7 @@ impl Client { false, Some(provider), None, + None, ClientMode::default(), ) } @@ -1180,11 +1223,37 @@ impl Client { false, false, None, + None, token, ClientMode::default(), ) } + /// Construct a [`Client`] from raw streams with a preset GitHub telemetry + /// callback, for integration testing telemetry forwarding. + #[doc(hidden)] + #[cfg(any(test, feature = "test-support"))] + pub fn from_streams_with_github_telemetry( + reader: impl AsyncRead + Unpin + Send + 'static, + writer: impl AsyncWrite + Unpin + Send + 'static, + cwd: PathBuf, + on_github_telemetry: crate::github_telemetry::GitHubTelemetryCallback, + ) -> Result { + Self::from_transport( + reader, + writer, + None, + cwd, + None, + false, + false, + None, + Some(on_github_telemetry), + None, + ClientMode::default(), + ) + } + /// Public test-only wrapper around the random connection-token /// generator used by [`Client::start`] when the SDK spawns a TCP /// server without an explicit token. Lets integration tests @@ -1205,6 +1274,7 @@ impl Client { session_fs_configured: bool, session_fs_sqlite_declared: bool, on_get_trace_context: Option>, + on_github_telemetry: Option, effective_connection_token: Option, mode: ClientMode, ) -> Result { @@ -1237,6 +1307,7 @@ impl Client { session_fs_configured, session_fs_sqlite_declared, llm_inference: OnceLock::new(), + on_github_telemetry, on_get_trace_context, effective_connection_token, mode, @@ -1646,6 +1717,7 @@ impl Client { &self.inner.notification_tx, &self.inner.request_rx, self.inner.llm_inference.get().cloned(), + self.inner.on_github_telemetry.clone(), ); self.inner.router.register(session_id) } @@ -2732,6 +2804,7 @@ mod tests { session_fs_configured: false, session_fs_sqlite_declared: false, llm_inference: OnceLock::new(), + on_github_telemetry: None, on_get_trace_context: None, effective_connection_token: None, mode: ClientMode::default(), diff --git a/rust/src/router.rs b/rust/src/router.rs index cc621c287..adc192382 100644 --- a/rust/src/router.rs +++ b/rust/src/router.rs @@ -86,6 +86,7 @@ impl SessionRouter { notification_tx: &broadcast::Sender, request_rx: &Mutex>>, llm_inference: Option>, + github_telemetry: Option, ) { let mut started = self.started.lock(); if *started { @@ -100,6 +101,40 @@ impl SessionRouter { loop { match notif_rx.recv().await { Ok(notification) => { + // Client-global `gitHubTelemetry.event` notifications carry + // no routable session and are surfaced to the consumer + // callback (if any) registered at client construction. + if notification.method == "gitHubTelemetry.event" { + if let Some(ref callback) = github_telemetry { + let Some(ref params) = notification.params else { + continue; + }; + match serde_json::from_value::< + crate::github_telemetry::GitHubTelemetryNotification, + >(params.clone()) + { + Ok(telemetry) => { + if std::panic::catch_unwind(std::panic::AssertUnwindSafe( + || callback(telemetry), + )) + .is_err() + { + warn!( + "gitHubTelemetry.event callback panicked; \ + continuing notification routing" + ); + } + } + Err(e) => { + warn!( + error = %e, + "failed to deserialize gitHubTelemetry.event notification" + ); + } + } + } + continue; + } if notification.method != "session.event" { continue; } diff --git a/rust/src/session.rs b/rust/src/session.rs index e5a2dc4dd..d0fadd044 100644 --- a/rust/src/session.rs +++ b/rust/src/session.rs @@ -876,7 +876,9 @@ impl Client { let opt_custom_agents_local_only = config.custom_agents_local_only; let opt_coauthor_enabled = config.coauthor_enabled; let opt_manage_schedule_enabled = config.manage_schedule_enabled; - let (wire, mut runtime) = config.into_wire(local_session_id.clone())?; + let (mut wire, mut runtime) = config.into_wire(local_session_id.clone())?; + wire.enable_github_telemetry_forwarding = + self.inner.on_github_telemetry.is_some().then_some(true); let permission_handler = crate::permission::resolve_handler( runtime.permission_handler.take(), @@ -1139,7 +1141,9 @@ impl Client { let opt_custom_agents_local_only = config.custom_agents_local_only; let opt_coauthor_enabled = config.coauthor_enabled; let opt_manage_schedule_enabled = config.manage_schedule_enabled; - let (wire, mut runtime) = config.into_wire()?; + let (mut wire, mut runtime) = config.into_wire()?; + wire.enable_github_telemetry_forwarding = + self.inner.on_github_telemetry.is_some().then_some(true); let permission_handler = crate::permission::resolve_handler( runtime.permission_handler.take(), diff --git a/rust/src/types.rs b/rust/src/types.rs index e42ecdd11..e5ba28a56 100644 --- a/rust/src/types.rs +++ b/rust/src/types.rs @@ -2163,6 +2163,7 @@ impl SessionConfig { remote_session: self.remote_session, cloud: self.cloud, include_sub_agent_streaming_events: self.include_sub_agent_streaming_events, + enable_github_telemetry_forwarding: None, commands: wire_commands, exp_assignments: self.exp_assignments, }; @@ -3173,6 +3174,7 @@ impl ResumeSessionConfig { github_token: self.github_token, remote_session: self.remote_session, include_sub_agent_streaming_events: self.include_sub_agent_streaming_events, + enable_github_telemetry_forwarding: None, commands: wire_commands, exp_assignments: self.exp_assignments, suppress_resume_event: self.suppress_resume_event, diff --git a/rust/src/wire.rs b/rust/src/wire.rs index f888e032a..f73870fa5 100644 --- a/rust/src/wire.rs +++ b/rust/src/wire.rs @@ -160,6 +160,11 @@ pub(crate) struct SessionCreateWire { pub cloud: Option, #[serde(skip_serializing_if = "Option::is_none")] pub include_sub_agent_streaming_events: Option, + #[serde( + rename = "enableGitHubTelemetryForwarding", + skip_serializing_if = "Option::is_none" + )] + pub enable_github_telemetry_forwarding: Option, #[serde(skip_serializing_if = "Option::is_none")] pub commands: Option>, #[serde(skip_serializing_if = "Option::is_none")] @@ -283,6 +288,11 @@ pub(crate) struct SessionResumeWire { pub remote_session: Option, #[serde(skip_serializing_if = "Option::is_none")] pub include_sub_agent_streaming_events: Option, + #[serde( + rename = "enableGitHubTelemetryForwarding", + skip_serializing_if = "Option::is_none" + )] + pub enable_github_telemetry_forwarding: Option, #[serde(skip_serializing_if = "Option::is_none")] pub commands: Option>, /// Maps to wire field `disableResume`. diff --git a/rust/tests/e2e.rs b/rust/tests/e2e.rs index 79059c7f2..62412963b 100644 --- a/rust/tests/e2e.rs +++ b/rust/tests/e2e.rs @@ -31,6 +31,8 @@ mod elicitation; mod error_resilience; #[path = "e2e/event_fidelity.rs"] mod event_fidelity; +#[path = "e2e/github_telemetry.rs"] +mod github_telemetry; #[path = "e2e/hooks.rs"] mod hooks; #[path = "e2e/hooks_extended.rs"] diff --git a/rust/tests/e2e/github_telemetry.rs b/rust/tests/e2e/github_telemetry.rs new file mode 100644 index 000000000..26e2a3f94 --- /dev/null +++ b/rust/tests/e2e/github_telemetry.rs @@ -0,0 +1,60 @@ +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use github_copilot_sdk::github_telemetry::GitHubTelemetryNotification; +use github_copilot_sdk::handler::ApproveAllHandler; +use github_copilot_sdk::{Client, SessionConfig}; + +use super::support::{DEFAULT_TEST_TOKEN, with_e2e_context_no_snapshot}; + +#[tokio::test] +async fn should_forward_github_telemetry_on_session_create() { + with_e2e_context_no_snapshot(|ctx| { + Box::pin(async move { + ctx.set_default_copilot_user(); + + let notifications = Arc::new(Mutex::new(Vec::::new())); + let collected = notifications.clone(); + let client = Client::start(ctx.client_options().with_on_github_telemetry(move |n| { + collected.lock().unwrap().push(n); + })) + .await + .expect("start client"); + let session = client + .create_session( + SessionConfig::default() + .with_github_token(DEFAULT_TEST_TOKEN) + .with_permission_handler(Arc::new(ApproveAllHandler)), + ) + .await + .expect("create session"); + + let deadline = tokio::time::Instant::now() + Duration::from_secs(30); + loop { + if !notifications.lock().unwrap().is_empty() { + break; + } + assert!( + tokio::time::Instant::now() < deadline, + "timed out waiting for github telemetry notification" + ); + tokio::time::sleep(Duration::from_millis(100)).await; + } + + { + let notifications = notifications.lock().unwrap(); + assert!(!notifications.is_empty()); + let first = notifications + .first() + .expect("github telemetry notification"); + assert!(!first.session_id.is_empty()); + let _: bool = first.restricted; + assert!(!first.event.kind.is_empty()); + } + + session.disconnect().await.expect("disconnect session"); + client.stop().await.expect("stop client"); + }) + }) + .await; +} diff --git a/rust/tests/e2e/support.rs b/rust/tests/e2e/support.rs index 5052ef1be..7e47d8fba 100644 --- a/rust/tests/e2e/support.rs +++ b/rust/tests/e2e/support.rs @@ -157,12 +157,7 @@ impl E2eContext { } pub fn client_options(&self) -> ClientOptions { - ClientOptions::new() - .with_program(CliProgram::Path(PathBuf::from(node_program()))) - .with_prefix_args([self.cli_path.as_os_str().to_owned()]) - .with_cwd(self.work_dir.path()) - .with_env(self.environment()) - .with_use_logged_in_user(false) + client_options_for_cli(&self.cli_path, self.work_dir.path(), self.environment()) } pub fn client_options_with_transport(&self, transport: Transport) -> ClientOptions { @@ -188,12 +183,7 @@ impl E2eContext { .iter() .map(|(key, value)| (OsString::from(*key), OsString::from(*value))), ); - let options = ClientOptions::new() - .with_program(CliProgram::Path(PathBuf::from(node_program()))) - .with_prefix_args([self.cli_path.as_os_str().to_owned()]) - .with_cwd(self.work_dir.path()) - .with_env(env) - .with_use_logged_in_user(false) + let options = client_options_for_cli(&self.cli_path, self.work_dir.path(), env) .with_request_handler(handler); Client::start(options).await.expect("start E2E LLM client") } @@ -627,6 +617,28 @@ fn cli_path(repo_root: &Path) -> std::io::Result { )) } +fn client_options_for_cli( + cli_path: &Path, + cwd: &Path, + env: Vec<(OsString, OsString)>, +) -> ClientOptions { + let options = ClientOptions::new() + .with_cwd(cwd) + .with_env(env) + .with_use_logged_in_user(false); + if cli_path + .extension() + .and_then(|extension| extension.to_str()) + .is_some_and(|extension| extension.eq_ignore_ascii_case("js")) + { + options + .with_program(CliProgram::Path(PathBuf::from(node_program()))) + .with_prefix_args([cli_path.as_os_str().to_owned()]) + } else { + options.with_program(CliProgram::Path(cli_path.to_path_buf())) + } +} + fn canonical_temp_path(path: &Path) -> PathBuf { std::fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf()) } diff --git a/rust/tests/session_test.rs b/rust/tests/session_test.rs index 2a2ceabf8..08f8a7653 100644 --- a/rust/tests/session_test.rs +++ b/rust/tests/session_test.rs @@ -754,6 +754,234 @@ async fn create_session_sends_canvas_wire_fields() { timeout(TIMEOUT, create_handle).await.unwrap().unwrap(); } +fn make_client_with_telemetry( + callback: github_copilot_sdk::github_telemetry::GitHubTelemetryCallback, +) -> (Client, tokio::io::DuplexStream, tokio::io::DuplexStream) { + let (client_write, server_read) = duplex(8192); + let (server_write, client_read) = duplex(8192); + let client = Client::from_streams_with_github_telemetry( + client_read, + client_write, + std::env::temp_dir(), + callback, + ) + .unwrap(); + (client, server_read, server_write) +} + +#[tokio::test] +async fn create_and_resume_send_github_telemetry_forwarding_when_callback_registered() { + use github_copilot_sdk::types::ResumeSessionConfig; + + let callback: github_copilot_sdk::github_telemetry::GitHubTelemetryCallback = + Arc::new(|_notification| {}); + let (client, mut server_read, mut server_write) = make_client_with_telemetry(callback); + + let create_handle = tokio::spawn({ + let client = client.clone(); + async move { + client + .create_session(SessionConfig::default()) + .await + .unwrap() + } + }); + + let request = read_framed(&mut server_read).await; + assert_eq!(request["method"], "session.create"); + assert_eq!(request["params"]["enableGitHubTelemetryForwarding"], true); + + let id = request["id"].as_u64().unwrap(); + let session_id = requested_session_id(&request).to_string(); + let response = serde_json::json!({ + "jsonrpc": "2.0", + "id": id, + "result": { "sessionId": session_id.clone() }, + }); + write_framed(&mut server_write, &serde_json::to_vec(&response).unwrap()).await; + timeout(TIMEOUT, create_handle).await.unwrap().unwrap(); + + let resume_handle = tokio::spawn({ + let client = client.clone(); + let session_id = session_id.clone(); + async move { + client + .resume_session(ResumeSessionConfig::new(SessionId::from(session_id))) + .await + .unwrap() + } + }); + + let request = read_framed(&mut server_read).await; + assert_eq!(request["method"], "session.resume"); + assert_eq!(request["params"]["enableGitHubTelemetryForwarding"], true); + + let id = request["id"].as_u64().unwrap(); + let response = serde_json::json!({ + "jsonrpc": "2.0", + "id": id, + "result": { "sessionId": session_id }, + }); + write_framed(&mut server_write, &serde_json::to_vec(&response).unwrap()).await; + + let reload = read_framed(&mut server_read).await; + assert_eq!(reload["method"], "session.skills.reload"); + let id = reload["id"].as_u64().unwrap(); + let response = serde_json::json!({ "jsonrpc": "2.0", "id": id, "result": {} }); + write_framed(&mut server_write, &serde_json::to_vec(&response).unwrap()).await; + + timeout(TIMEOUT, resume_handle).await.unwrap().unwrap(); +} + +#[tokio::test] +async fn create_session_omits_github_telemetry_forwarding_without_callback() { + let (client, mut server_read, mut server_write) = make_client(); + + let create_handle = tokio::spawn({ + let client = client.clone(); + async move { + client + .create_session(SessionConfig::default()) + .await + .unwrap() + } + }); + + let request = read_framed(&mut server_read).await; + assert_eq!(request["method"], "session.create"); + assert!( + request["params"] + .get("enableGitHubTelemetryForwarding") + .is_none_or(Value::is_null), + "forwarding flag should be omitted when no callback is registered" + ); + + let id = request["id"].as_u64().unwrap(); + let session_id = requested_session_id(&request).to_string(); + let response = serde_json::json!({ + "jsonrpc": "2.0", + "id": id, + "result": { "sessionId": session_id }, + }); + write_framed(&mut server_write, &serde_json::to_vec(&response).unwrap()).await; + timeout(TIMEOUT, create_handle).await.unwrap().unwrap(); +} + +#[tokio::test] +async fn resume_session_omits_github_telemetry_forwarding_without_callback() { + use github_copilot_sdk::types::ResumeSessionConfig; + + let (client, mut server_read, mut server_write) = make_client(); + + let resume_handle = tokio::spawn({ + let client = client.clone(); + async move { + client + .resume_session(ResumeSessionConfig::new(SessionId::from( + "sess-1".to_string(), + ))) + .await + .unwrap() + } + }); + + let request = read_framed(&mut server_read).await; + assert_eq!(request["method"], "session.resume"); + assert!( + request["params"] + .get("enableGitHubTelemetryForwarding") + .is_none_or(Value::is_null), + "forwarding flag should be omitted when no callback is registered" + ); + + let id = request["id"].as_u64().unwrap(); + let response = serde_json::json!({ + "jsonrpc": "2.0", + "id": id, + "result": { "sessionId": "sess-1" }, + }); + write_framed(&mut server_write, &serde_json::to_vec(&response).unwrap()).await; + + let reload = read_framed(&mut server_read).await; + assert_eq!(reload["method"], "session.skills.reload"); + let id = reload["id"].as_u64().unwrap(); + let response = serde_json::json!({ "jsonrpc": "2.0", "id": id, "result": {} }); + write_framed(&mut server_write, &serde_json::to_vec(&response).unwrap()).await; + + timeout(TIMEOUT, resume_handle).await.unwrap().unwrap(); +} + +#[tokio::test] +async fn github_telemetry_event_dispatches_to_callback() { + use github_copilot_sdk::github_telemetry::GitHubTelemetryNotification; + + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); + let callback: github_copilot_sdk::github_telemetry::GitHubTelemetryCallback = + Arc::new(move |notification| { + let _ = tx.send(notification); + }); + let (client, mut server_read, mut server_write) = make_client_with_telemetry(callback); + + let create_handle = tokio::spawn({ + let client = client.clone(); + async move { + client + .create_session(SessionConfig::default()) + .await + .unwrap() + } + }); + + let request = read_framed(&mut server_read).await; + let id = request["id"].as_u64().unwrap(); + let session_id = requested_session_id(&request).to_string(); + let response = serde_json::json!({ + "jsonrpc": "2.0", + "id": id, + "result": { "sessionId": session_id.clone() }, + }); + write_framed(&mut server_write, &serde_json::to_vec(&response).unwrap()).await; + timeout(TIMEOUT, create_handle).await.unwrap().unwrap(); + + let notification = serde_json::json!({ + "jsonrpc": "2.0", + "method": "gitHubTelemetry.event", + "params": { + "sessionId": session_id.clone(), + "restricted": false, + "event": { + "kind": "tool_call_executed", + "properties": { "tool": "bash" }, + "metrics": { "duration_ms": 12.0 }, + "session_id": session_id.clone(), + "created_at": "2025-01-01T00:00:00Z" + } + } + }); + write_framed( + &mut server_write, + &serde_json::to_vec(¬ification).unwrap(), + ) + .await; + + let received = timeout(TIMEOUT, rx.recv()).await.unwrap().unwrap(); + assert_eq!(received.session_id, session_id); + assert!(!received.restricted); + assert_eq!(received.event.kind, "tool_call_executed"); + assert_eq!( + received.event.properties.get("tool").map(String::as_str), + Some("bash") + ); + assert_eq!( + received.event.metrics.get("duration_ms").copied(), + Some(12.0) + ); + assert_eq!( + received.event.created_at.as_deref(), + Some("2025-01-01T00:00:00Z") + ); +} + #[tokio::test] async fn provider_canvas_dispatch_routes_direct_canvas_action_requests() { let (session, mut server) = create_session_pair_with_config(|cfg| { diff --git a/scripts/codegen/go.ts b/scripts/codegen/go.ts index 57957499e..1e6cf0a42 100644 --- a/scripts/codegen/go.ts +++ b/scripts/codegen/go.ts @@ -4385,6 +4385,11 @@ function emitClientGlobalApiRegistration(lines: string[], clientSchema: Record None:`); + lines.push(` request = ${paramsType}.from_dict(params)`); + lines.push(` handler = handlers.${handlerField}`); + lines.push(` if handler is None: return None`); + lines.push(` await handler.${handlerMethod}(request)`); + lines.push(` return None`); + lines.push(` client.set_notification_method_handler("${method.rpcMethod}", ${handlerVariableName})`); + return; + } + lines.push(` async def ${handlerVariableName}(params: dict) -> dict | None:`); lines.push(` request = ${paramsType}.from_dict(params)`); lines.push(` handler = handlers.${handlerField}`); diff --git a/scripts/codegen/typescript.ts b/scripts/codegen/typescript.ts index 1303a4979..497c909ea 100644 --- a/scripts/codegen/typescript.ts +++ b/scripts/codegen/typescript.ts @@ -1011,7 +1011,24 @@ function emitClientGlobalApiRegistration(clientSchema: Record): const pType = paramsTypeName(method); const hasParams = hasSchemaPayload(getMethodParamsSchema(method)); - if (hasParams) { + if (method.notification) { + // Notification methods carry no response; the server dispatches + // them via `sendNotification`, which only fires `onNotification` + // handlers (an `onRequest` handler would never be invoked). + if (hasParams) { + lines.push(` connection.onNotification("${method.rpcMethod}", async (params: ${pType}) => {`); + lines.push(` const handler = handlers.${groupName};`); + lines.push(` if (!handler) return;`); + lines.push(` await handler.${name}(params);`); + lines.push(` });`); + } else { + lines.push(` connection.onNotification("${method.rpcMethod}", async () => {`); + lines.push(` const handler = handlers.${groupName};`); + lines.push(` if (!handler) return;`); + lines.push(` await handler.${name}();`); + lines.push(` });`); + } + } else if (hasParams) { lines.push(` connection.onRequest("${method.rpcMethod}", async (params: ${pType}) => {`); lines.push(` const handler = handlers.${groupName};`); lines.push(` if (!handler) throw new Error("No ${groupName} client-global handler registered");`); diff --git a/scripts/codegen/utils.ts b/scripts/codegen/utils.ts index c63f9732c..9ab335b05 100644 --- a/scripts/codegen/utils.ts +++ b/scripts/codegen/utils.ts @@ -383,6 +383,7 @@ export interface RpcMethod { stability?: string; visibility?: string; deprecated?: boolean; + notification?: boolean; } export function getRpcSchemaTypeName(schema: JSONSchema7 | null | undefined, fallback: string): string {