Skip to content

Commit e9c5d03

Browse files
committed
Integrate circuit breaker with vMCP routing and aggregation
Complete the circuit breaker integration by wiring it through the Virtual MCP Server stack for automatic backend failure handling.

Configuration wiring (cmd/vmcp/app/commands.go):
- Wire circuit breaker config from YAML to health monitor
- Pass configurable failure threshold and timeout to monitor
- Log circuit breaker enablement on server startup

Capability filtering (pkg/vmcp/aggregator/default_aggregator.go):
- Filter tools, resources, and prompts from unhealthy backends
- Skip capabilities when backend status is Unhealthy, Unknown, or Unauthenticated
- Keep capabilities from Healthy and Degraded backends (degraded backends are still operational)
- Add isBackendHealthy helper for status evaluation

Routing protection (pkg/vmcp/router/default_router.go):
- Check backend health before routing requests
- Return ErrBackendUnavailable when backend is unhealthy
- Add isTargetHealthy helper for status evaluation
- Log warnings when routing fails due to backend unavailability

Testing (pkg/vmcp/router/default_router_test.go):
- Add HealthStatus to all test fixtures
- Add test cases for unhealthy backend routing failures
- Add test cases for unauthenticated backend failures
- Add test cases for degraded backend success (still operational)
- Cover all three routing methods: tools, resources, prompts

Related-to: #3036
1 parent 3556e04 commit e9c5d03

4 files changed

Lines changed: 231 additions & 40 deletions

File tree

cmd/vmcp/app/commands.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,19 @@ func runServe(cmd *cobra.Command, _ []string) error {
433433
Timeout: defaults.Timeout,
434434
DegradedThreshold: defaults.DegradedThreshold,
435435
}
436+
437+
// Configure circuit breaker if enabled
438+
if cfg.Operational.FailureHandling.CircuitBreaker != nil && cfg.Operational.FailureHandling.CircuitBreaker.Enabled {
439+
healthMonitorConfig.CircuitBreaker = &health.CircuitBreakerConfig{
440+
Enabled: true,
441+
FailureThreshold: cfg.Operational.FailureHandling.CircuitBreaker.FailureThreshold,
442+
Timeout: time.Duration(cfg.Operational.FailureHandling.CircuitBreaker.Timeout),
443+
}
444+
logger.Infof("Circuit breaker enabled (failure threshold: %d, timeout: %v)",
445+
cfg.Operational.FailureHandling.CircuitBreaker.FailureThreshold,
446+
time.Duration(cfg.Operational.FailureHandling.CircuitBreaker.Timeout))
447+
}
448+
436449
logger.Info("Health monitoring configured from operational settings")
437450
}
438451

pkg/vmcp/aggregator/default_aggregator.go

Lines changed: 63 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -302,65 +302,77 @@ func (a *defaultAggregator) MergeCapabilities(
302302
// Convert resolved tools to final vmcp.Tool format
303303
tools := make([]vmcp.Tool, 0, len(resolved.Tools))
304304
for _, resolvedTool := range resolved.Tools {
305+
// Look up full backend information from registry
306+
backend := registry.Get(ctx, resolvedTool.BackendID)
307+
if backend == nil {
308+
logger.Warnf("Backend %s not found in registry for tool %s, skipping",
309+
resolvedTool.BackendID, resolvedTool.ResolvedName)
310+
continue
311+
}
312+
313+
// Filter out tools from unhealthy backends
314+
if !isBackendHealthy(backend.HealthStatus) {
315+
logger.Debugf("Skipping tool %s from unhealthy backend %s (status: %s)",
316+
resolvedTool.ResolvedName, backend.Name, backend.HealthStatus)
317+
continue
318+
}
319+
305320
tools = append(tools, vmcp.Tool{
306321
Name: resolvedTool.ResolvedName,
307322
Description: resolvedTool.Description,
308323
InputSchema: resolvedTool.InputSchema,
309324
BackendID: resolvedTool.BackendID,
310325
})
311326

312-
// Look up full backend information from registry
313-
backend := registry.Get(ctx, resolvedTool.BackendID)
314-
if backend == nil {
315-
logger.Warnf("Backend %s not found in registry for tool %s, creating minimal target",
316-
resolvedTool.BackendID, resolvedTool.ResolvedName)
317-
routingTable.Tools[resolvedTool.ResolvedName] = &vmcp.BackendTarget{
318-
WorkloadID: resolvedTool.BackendID,
319-
OriginalCapabilityName: resolvedTool.OriginalName,
320-
}
321-
} else {
322-
// Use the backendToTarget helper from registry package
323-
target := vmcp.BackendToTarget(backend)
324-
// Store the original tool name for forwarding to backend
325-
target.OriginalCapabilityName = resolvedTool.OriginalName
326-
routingTable.Tools[resolvedTool.ResolvedName] = target
327-
}
327+
// Use the backendToTarget helper from registry package
328+
target := vmcp.BackendToTarget(backend)
329+
// Store the original tool name for forwarding to backend
330+
target.OriginalCapabilityName = resolvedTool.OriginalName
331+
routingTable.Tools[resolvedTool.ResolvedName] = target
328332
}
329333

330334
// Add resources to routing table
331335
for _, resource := range resolved.Resources {
332336
backend := registry.Get(ctx, resource.BackendID)
333337
if backend == nil {
334-
logger.Warnf("Backend %s not found in registry for resource %s, creating minimal target",
338+
logger.Warnf("Backend %s not found in registry for resource %s, skipping",
335339
resource.BackendID, resource.URI)
336-
routingTable.Resources[resource.URI] = &vmcp.BackendTarget{
337-
WorkloadID: resource.BackendID,
338-
OriginalCapabilityName: resource.URI,
339-
}
340-
} else {
341-
target := vmcp.BackendToTarget(backend)
342-
// Store the original resource URI for forwarding to backend
343-
target.OriginalCapabilityName = resource.URI
344-
routingTable.Resources[resource.URI] = target
340+
continue
341+
}
342+
343+
// Filter out resources from unhealthy backends
344+
if !isBackendHealthy(backend.HealthStatus) {
345+
logger.Debugf("Skipping resource %s from unhealthy backend %s (status: %s)",
346+
resource.URI, backend.Name, backend.HealthStatus)
347+
continue
345348
}
349+
350+
target := vmcp.BackendToTarget(backend)
351+
// Store the original resource URI for forwarding to backend
352+
target.OriginalCapabilityName = resource.URI
353+
routingTable.Resources[resource.URI] = target
346354
}
347355

348356
// Add prompts to routing table
349357
for _, prompt := range resolved.Prompts {
350358
backend := registry.Get(ctx, prompt.BackendID)
351359
if backend == nil {
352-
logger.Warnf("Backend %s not found in registry for prompt %s, creating minimal target",
360+
logger.Warnf("Backend %s not found in registry for prompt %s, skipping",
353361
prompt.BackendID, prompt.Name)
354-
routingTable.Prompts[prompt.Name] = &vmcp.BackendTarget{
355-
WorkloadID: prompt.BackendID,
356-
OriginalCapabilityName: prompt.Name,
357-
}
358-
} else {
359-
target := vmcp.BackendToTarget(backend)
360-
// Store the original prompt name for forwarding to backend
361-
target.OriginalCapabilityName = prompt.Name
362-
routingTable.Prompts[prompt.Name] = target
362+
continue
363363
}
364+
365+
// Filter out prompts from unhealthy backends
366+
if !isBackendHealthy(backend.HealthStatus) {
367+
logger.Debugf("Skipping prompt %s from unhealthy backend %s (status: %s)",
368+
prompt.Name, backend.Name, backend.HealthStatus)
369+
continue
370+
}
371+
372+
target := vmcp.BackendToTarget(backend)
373+
// Store the original prompt name for forwarding to backend
374+
target.OriginalCapabilityName = prompt.Name
375+
routingTable.Prompts[prompt.Name] = target
364376
}
365377

366378
// Determine conflict strategy used
@@ -466,3 +478,18 @@ func (a *defaultAggregator) AggregateCapabilities(
466478

467479
return aggregated, nil
468480
}
481+
482+
// isBackendHealthy determines if a backend is healthy enough to have its capabilities included.
483+
// Returns true for healthy and degraded backends (degraded backends are still operational).
484+
// Returns false for unhealthy, unknown, and unauthenticated backends.
485+
func isBackendHealthy(status vmcp.BackendHealthStatus) bool {
486+
switch status {
487+
case vmcp.BackendHealthy, vmcp.BackendDegraded:
488+
return true
489+
case vmcp.BackendUnhealthy, vmcp.BackendUnknown, vmcp.BackendUnauthenticated:
490+
return false
491+
default:
492+
// Unknown status - err on the side of caution
493+
return false
494+
}
495+
}

pkg/vmcp/router/default_router.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ func routeCapability(
7171
return nil, fmt.Errorf("%w: %s", notFoundErr, key)
7272
}
7373

74+
// Check if the backend is healthy before routing
75+
if !isTargetHealthy(target.HealthStatus) {
76+
logger.Warnf("%s %s found but backend %s is unavailable (status: %s)",
77+
entityType, key, target.WorkloadName, target.HealthStatus)
78+
return nil, fmt.Errorf("%w: backend %s is %s", ErrBackendUnavailable, target.WorkloadName, target.HealthStatus)
79+
}
80+
7481
logger.Debugf("Routed %s %s to backend %s", entityType, key, target.WorkloadID)
7582
return target, nil
7683
}
@@ -116,3 +123,18 @@ func (*defaultRouter) RoutePrompt(ctx context.Context, name string) (*vmcp.Backe
116123
ErrPromptNotFound,
117124
)
118125
}
126+
127+
// isTargetHealthy determines if a backend target is healthy enough to route requests to.
128+
// Returns true for healthy and degraded backends (degraded backends are still operational).
129+
// Returns false for unhealthy, unknown, and unauthenticated backends.
130+
func isTargetHealthy(status vmcp.BackendHealthStatus) bool {
131+
switch status {
132+
case vmcp.BackendHealthy, vmcp.BackendDegraded:
133+
return true
134+
case vmcp.BackendUnhealthy, vmcp.BackendUnknown, vmcp.BackendUnauthenticated:
135+
return false
136+
default:
137+
// Unknown status - err on the side of caution
138+
return false
139+
}
140+
}

pkg/vmcp/router/default_router_test.go

Lines changed: 133 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ func TestDefaultRouter_RouteTool(t *testing.T) {
3535
WorkloadID: "backend1",
3636
WorkloadName: "Backend 1",
3737
BaseURL: "http://backend1:8080",
38+
HealthStatus: vmcp.BackendHealthy,
3839
},
3940
},
4041
Resources: make(map[string]*vmcp.BackendTarget),
@@ -73,6 +74,60 @@ func TestDefaultRouter_RouteTool(t *testing.T) {
7374
expectError: true,
7475
errorContains: "routing table tools map not initialized",
7576
},
77+
{
78+
name: "backend is unhealthy",
79+
setupTable: &vmcp.RoutingTable{
80+
Tools: map[string]*vmcp.BackendTarget{
81+
"test_tool": {
82+
WorkloadID: "backend1",
83+
WorkloadName: "Backend 1",
84+
BaseURL: "http://backend1:8080",
85+
HealthStatus: vmcp.BackendUnhealthy,
86+
},
87+
},
88+
Resources: make(map[string]*vmcp.BackendTarget),
89+
Prompts: make(map[string]*vmcp.BackendTarget),
90+
},
91+
toolName: "test_tool",
92+
expectError: true,
93+
errorContains: "backend unavailable",
94+
},
95+
{
96+
name: "backend is unauthenticated",
97+
setupTable: &vmcp.RoutingTable{
98+
Tools: map[string]*vmcp.BackendTarget{
99+
"test_tool": {
100+
WorkloadID: "backend1",
101+
WorkloadName: "Backend 1",
102+
BaseURL: "http://backend1:8080",
103+
HealthStatus: vmcp.BackendUnauthenticated,
104+
},
105+
},
106+
Resources: make(map[string]*vmcp.BackendTarget),
107+
Prompts: make(map[string]*vmcp.BackendTarget),
108+
},
109+
toolName: "test_tool",
110+
expectError: true,
111+
errorContains: "backend unavailable",
112+
},
113+
{
114+
name: "backend is degraded but still works",
115+
setupTable: &vmcp.RoutingTable{
116+
Tools: map[string]*vmcp.BackendTarget{
117+
"test_tool": {
118+
WorkloadID: "backend1",
119+
WorkloadName: "Backend 1",
120+
BaseURL: "http://backend1:8080",
121+
HealthStatus: vmcp.BackendDegraded,
122+
},
123+
},
124+
Resources: make(map[string]*vmcp.BackendTarget),
125+
Prompts: make(map[string]*vmcp.BackendTarget),
126+
},
127+
toolName: "test_tool",
128+
expectedID: "backend1",
129+
expectError: false,
130+
},
76131
}
77132

78133
for _, tt := range tests {
@@ -126,6 +181,7 @@ func TestDefaultRouter_RouteResource(t *testing.T) {
126181
WorkloadID: "backend2",
127182
WorkloadName: "Backend 2",
128183
BaseURL: "http://backend2:8080",
184+
HealthStatus: vmcp.BackendHealthy,
129185
},
130186
},
131187
Prompts: make(map[string]*vmcp.BackendTarget),
@@ -163,6 +219,42 @@ func TestDefaultRouter_RouteResource(t *testing.T) {
163219
expectError: true,
164220
errorContains: "routing table resources map not initialized",
165221
},
222+
{
223+
name: "backend is unhealthy",
224+
setupTable: &vmcp.RoutingTable{
225+
Tools: make(map[string]*vmcp.BackendTarget),
226+
Resources: map[string]*vmcp.BackendTarget{
227+
"file:///path/to/resource": {
228+
WorkloadID: "backend2",
229+
WorkloadName: "Backend 2",
230+
BaseURL: "http://backend2:8080",
231+
HealthStatus: vmcp.BackendUnhealthy,
232+
},
233+
},
234+
Prompts: make(map[string]*vmcp.BackendTarget),
235+
},
236+
uri: "file:///path/to/resource",
237+
expectError: true,
238+
errorContains: "backend unavailable",
239+
},
240+
{
241+
name: "backend is degraded but still works",
242+
setupTable: &vmcp.RoutingTable{
243+
Tools: make(map[string]*vmcp.BackendTarget),
244+
Resources: map[string]*vmcp.BackendTarget{
245+
"file:///path/to/resource": {
246+
WorkloadID: "backend2",
247+
WorkloadName: "Backend 2",
248+
BaseURL: "http://backend2:8080",
249+
HealthStatus: vmcp.BackendDegraded,
250+
},
251+
},
252+
Prompts: make(map[string]*vmcp.BackendTarget),
253+
},
254+
uri: "file:///path/to/resource",
255+
expectedID: "backend2",
256+
expectError: false,
257+
},
166258
}
167259

168260
for _, tt := range tests {
@@ -217,6 +309,7 @@ func TestDefaultRouter_RoutePrompt(t *testing.T) {
217309
WorkloadID: "backend3",
218310
WorkloadName: "Backend 3",
219311
BaseURL: "http://backend3:8080",
312+
HealthStatus: vmcp.BackendHealthy,
220313
},
221314
},
222315
},
@@ -253,6 +346,42 @@ func TestDefaultRouter_RoutePrompt(t *testing.T) {
253346
expectError: true,
254347
errorContains: "routing table prompts map not initialized",
255348
},
349+
{
350+
name: "backend is unhealthy",
351+
setupTable: &vmcp.RoutingTable{
352+
Tools: make(map[string]*vmcp.BackendTarget),
353+
Resources: make(map[string]*vmcp.BackendTarget),
354+
Prompts: map[string]*vmcp.BackendTarget{
355+
"greeting": {
356+
WorkloadID: "backend3",
357+
WorkloadName: "Backend 3",
358+
BaseURL: "http://backend3:8080",
359+
HealthStatus: vmcp.BackendUnhealthy,
360+
},
361+
},
362+
},
363+
promptName: "greeting",
364+
expectError: true,
365+
errorContains: "backend unavailable",
366+
},
367+
{
368+
name: "backend is degraded but still works",
369+
setupTable: &vmcp.RoutingTable{
370+
Tools: make(map[string]*vmcp.BackendTarget),
371+
Resources: make(map[string]*vmcp.BackendTarget),
372+
Prompts: map[string]*vmcp.BackendTarget{
373+
"greeting": {
374+
WorkloadID: "backend3",
375+
WorkloadName: "Backend 3",
376+
BaseURL: "http://backend3:8080",
377+
HealthStatus: vmcp.BackendDegraded,
378+
},
379+
},
380+
},
381+
promptName: "greeting",
382+
expectedID: "backend3",
383+
expectError: false,
384+
},
256385
}
257386

258387
for _, tt := range tests {
@@ -292,14 +421,14 @@ func TestDefaultRouter_ConcurrentAccess(t *testing.T) {
292421
// Setup routing table
293422
table := &vmcp.RoutingTable{
294423
Tools: map[string]*vmcp.BackendTarget{
295-
"tool1": {WorkloadID: "backend1"},
296-
"tool2": {WorkloadID: "backend2"},
424+
"tool1": {WorkloadID: "backend1", HealthStatus: vmcp.BackendHealthy},
425+
"tool2": {WorkloadID: "backend2", HealthStatus: vmcp.BackendHealthy},
297426
},
298427
Resources: map[string]*vmcp.BackendTarget{
299-
"res1": {WorkloadID: "backend1"},
428+
"res1": {WorkloadID: "backend1", HealthStatus: vmcp.BackendHealthy},
300429
},
301430
Prompts: map[string]*vmcp.BackendTarget{
302-
"prompt1": {WorkloadID: "backend2"},
431+
"prompt1": {WorkloadID: "backend2", HealthStatus: vmcp.BackendHealthy},
303432
},
304433
}
305434

0 commit comments

Comments
 (0)