Skip to content

Commit 6c7a557

Browse files
alandiegosantosAlan Diego
andauthored
Fix panic in case the resourceType is not know by the caches (#1036)
* Fix panic in case the resourceType is not know by the caches Signed-off-by: Alan Diego dos Santos <[email protected]> Signed-off-by: Alan Diego <[email protected]> Co-authored-by: Alan Diego <[email protected]>
1 parent b9e8c8a commit 6c7a557

File tree

2 files changed

+28
-1
lines changed

2 files changed

+28
-1
lines changed

pkg/server/sotw/v3/xds.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,13 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
149149
sw.watches.recompute(s.ctx, reqCh)
150150
default:
151151
// Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
152-
if !ok {
152+
// nil is used to close the streams in the caches
153+
if value.IsNil() || !ok {
153154
// Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
154155
return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index)
155156
}
156157

158+
// If a non cache.Response arrived here, there are serious issues
157159
res := value.Interface().(cache.Response)
158160
nonce, err := sw.send(res)
159161
if err != nil {

pkg/server/v3/server_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"errors"
2020
"fmt"
2121
"reflect"
22+
"strings"
2223
"sync"
2324
"testing"
2425
"time"
@@ -52,6 +53,11 @@ type mockConfigWatcher struct {
5253

5354
func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ stream.StreamState, out chan cache.Response) func() {
5455
config.counts[req.GetTypeUrl()] = config.counts[req.GetTypeUrl()] + 1
56+
57+
if strings.Contains(req.GetTypeUrl(), nilType) {
58+
out <- nil
59+
}
60+
5561
if len(config.responses[req.GetTypeUrl()]) > 0 {
5662
out <- config.responses[req.GetTypeUrl()][0]
5763
config.responses[req.GetTypeUrl()] = config.responses[req.GetTypeUrl()][1:]
@@ -163,6 +169,7 @@ var (
163169
extensionConfig = resource.MakeExtensionConfig(resource.Ads, extensionConfigName, routeName)
164170
opaque = &core.Address{}
165171
opaqueType = "unknown-type"
172+
nilType = "nil-stream-type" // This type will force the close of the connection
166173
testTypes = []string{
167174
rsrc.EndpointType,
168175
rsrc.ClusterType,
@@ -659,6 +666,24 @@ func TestOpaqueRequestsChannelMuxing(t *testing.T) {
659666
assert.Equal(t, 0, config.watches)
660667
}
661668

669+
func TestNilPropagationOverResponseChannelShouldCloseTheStream(t *testing.T) {
670+
config := makeMockConfigWatcher()
671+
resp := makeMockStream(t)
672+
for i := 0; i < 10; i++ {
673+
resp.recv <- &discovery.DiscoveryRequest{
674+
Node: node,
675+
TypeUrl: nilType,
676+
// each subsequent request is assumed to supercede the previous request
677+
ResourceNames: []string{fmt.Sprintf("%d", i)},
678+
}
679+
}
680+
close(resp.recv)
681+
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
682+
err := s.StreamAggregatedResources(resp)
683+
require.Error(t, err)
684+
assert.Equal(t, 0, config.watches)
685+
}
686+
662687
func TestCallbackError(t *testing.T) {
663688
for _, typ := range testTypes {
664689
t.Run(typ, func(t *testing.T) {

0 commit comments

Comments
 (0)