Skip to content

Commit 734f9e6

Browse files
Provide better snapshot interface
1 parent 239405c commit 734f9e6

File tree

16 files changed

+580
-396
lines changed

16 files changed

+580
-396
lines changed
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
package internal
2+
3+
import (
4+
"crypto/sha256"
5+
"encoding/hex"
6+
"fmt"
7+
"sync"
8+
"time"
9+
10+
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
11+
12+
"google.golang.org/protobuf/proto"
13+
"google.golang.org/protobuf/types/known/anypb"
14+
"google.golang.org/protobuf/types/known/durationpb"
15+
)
16+
17+
// Resource is the base interface for the xDS payload.
18+
type Resource interface {
19+
proto.Message
20+
}
21+
22+
// ResourceWithTTL is a Resource with an optional TTL.
23+
type ResourceWithTTL struct {
24+
Resource Resource
25+
TTL *time.Duration
26+
}
27+
28+
// CachedResource is used to track resources added by the user in the cache.
29+
// It contains the resource itself and its associated version (currently in two different modes).
30+
// It should not be altered once created, to allow concurrent access.
31+
type CachedResource struct {
32+
Name string
33+
typeURL string
34+
35+
resource Resource
36+
ttl *time.Duration
37+
38+
// cacheVersion is the version of the cache at the time of last update, used in sotw.
39+
cacheVersion string
40+
41+
marshalFunc func() ([]byte, error)
42+
computeResourceVersionFunc func() (string, error)
43+
}
44+
45+
type CachedResourceOption = func(*CachedResource)
46+
47+
// WithCacheVersion allows specifying the cacheVersion when the resource is set.
48+
func WithCacheVersion(version string) CachedResourceOption {
49+
return func(r *CachedResource) { r.cacheVersion = version }
50+
}
51+
52+
// WithMarshaledResource enables the user to provide the already marshaled bytes if they have them.
53+
// Those bytes should strive at being consistent if the object has not changed (beware protobuf non-deterministic marshaling)
54+
// or alternatively the resource version should also then be set.
55+
// By default it is computed by performing a deterministic protobuf marshaling.
56+
func WithMarshaledResource(bytes []byte) CachedResourceOption {
57+
if len(bytes) == 0 {
58+
return func(*CachedResource) {}
59+
}
60+
return func(r *CachedResource) { r.marshalFunc = func() ([]byte, error) { return bytes, nil } }
61+
}
62+
63+
// WithResourceVersion enables the user to provide the resource version to be used.
64+
// This version should be constant if the object has not changed to avoid needlessly sending resources to clients.
65+
// By default it is computed by hashing the serialized version of the resource.
66+
func WithResourceVersion(version string) CachedResourceOption {
67+
if version == "" {
68+
return func(*CachedResource) {}
69+
}
70+
return func(r *CachedResource) { r.computeResourceVersionFunc = func() (string, error) { return version, nil } }
71+
}
72+
73+
// WithResourceTTL sets a TTL on the resource, that will be sent to the client with the payload.
74+
func WithResourceTTL(ttl *time.Duration) CachedResourceOption {
75+
return func(r *CachedResource) { r.ttl = ttl }
76+
}
77+
78+
func NewCachedResource(name, typeURL string, res Resource, opts ...CachedResourceOption) *CachedResource {
79+
cachedRes := &CachedResource{
80+
Name: name,
81+
typeURL: typeURL,
82+
resource: res,
83+
}
84+
for _, opt := range opts {
85+
opt(cachedRes)
86+
}
87+
if cachedRes.marshalFunc == nil {
88+
cachedRes.marshalFunc = sync.OnceValues(func() ([]byte, error) {
89+
return marshalResource(res)
90+
})
91+
}
92+
if cachedRes.computeResourceVersionFunc == nil {
93+
cachedRes.computeResourceVersionFunc = sync.OnceValues(func() (string, error) {
94+
marshaled, err := cachedRes.marshalFunc()
95+
if err != nil {
96+
return "", fmt.Errorf("marshaling resource: %w", err)
97+
}
98+
return hashResource(marshaled), nil
99+
})
100+
}
101+
return cachedRes
102+
}
103+
104+
// SetCacheVersion updates the cache version. This violates the assumption that all fields can be safely read concurrently.
105+
// It's required today for the linear cache constructor, where we are guaranteed resources are not being used concurently,
106+
// and should not be used elsewhere.
107+
func (c *CachedResource) SetCacheVersion(version string) {
108+
c.cacheVersion = version
109+
}
110+
111+
// HasTTL returns whether the resource has a TTL set.
112+
func (c *CachedResource) HasTTL() bool {
113+
return c.ttl != nil
114+
}
115+
116+
// getMarshaledResource lazily marshals the resource and returns the bytes.
117+
func (c *CachedResource) getMarshaledResource() ([]byte, error) {
118+
return c.marshalFunc()
119+
}
120+
121+
// GetResourceVersion returns a stable version reflecting the resource content.
122+
// By default it is built by hashing the serialized version of the object, using deterministic serializing.
123+
func (c *CachedResource) GetResourceVersion() (string, error) {
124+
return c.computeResourceVersionFunc()
125+
}
126+
127+
// GetVersion returns the version for the resource.
128+
// By default it returns the cache version when the resource was added, but if requested it will return a
129+
// version specific to the resource content.
130+
func (c *CachedResource) GetVersion(useResourceVersion bool) (string, error) {
131+
if !useResourceVersion {
132+
return c.cacheVersion, nil
133+
}
134+
135+
return c.GetResourceVersion()
136+
}
137+
138+
// GetRawResource returns the underlying resource for use in legacy accessors.
139+
func (c *CachedResource) GetRawResource() ResourceWithTTL {
140+
return ResourceWithTTL{
141+
Resource: c.resource,
142+
TTL: c.ttl,
143+
}
144+
}
145+
146+
var deltaResourceTypeURL = "type.googleapis.com/" + string(proto.MessageName(&discovery.Resource{}))
147+
148+
// getResourceVersion lazily hashes the resource and returns the stable hash used to track version changes.
149+
func (c *CachedResource) GetSotwResource(isHeartbeat bool) (*anypb.Any, error) {
150+
buildResource := func() (*anypb.Any, error) {
151+
marshaled, err := c.getMarshaledResource()
152+
if err != nil {
153+
return nil, fmt.Errorf("marshaling: %w", err)
154+
}
155+
return &anypb.Any{
156+
TypeUrl: c.typeURL,
157+
Value: marshaled,
158+
}, nil
159+
}
160+
161+
if c.ttl == nil {
162+
return buildResource()
163+
}
164+
165+
wrappedResource := &discovery.Resource{
166+
Name: c.Name,
167+
Ttl: durationpb.New(*c.ttl),
168+
}
169+
170+
if !isHeartbeat {
171+
rsrc, err := buildResource()
172+
if err != nil {
173+
return nil, err
174+
}
175+
wrappedResource.Resource = rsrc
176+
}
177+
178+
marshaled, err := marshalResource(wrappedResource)
179+
if err != nil {
180+
return nil, fmt.Errorf("marshaling discovery resource: %w", err)
181+
}
182+
183+
return &anypb.Any{
184+
TypeUrl: deltaResourceTypeURL,
185+
Value: marshaled,
186+
}, nil
187+
}
188+
189+
// getResourceVersion lazily hashes the resource and returns the stable hash used to track version changes.
190+
func (c *CachedResource) GetDeltaResource() (*discovery.Resource, error) {
191+
marshaled, err := c.getMarshaledResource()
192+
if err != nil {
193+
return nil, fmt.Errorf("marshaling: %w", err)
194+
}
195+
version, err := c.GetResourceVersion()
196+
if err != nil {
197+
return nil, fmt.Errorf("computing version: %w", err)
198+
}
199+
return &discovery.Resource{
200+
Name: c.Name,
201+
Resource: &anypb.Any{
202+
TypeUrl: c.typeURL,
203+
Value: marshaled,
204+
},
205+
Version: version,
206+
}, nil
207+
}
208+
209+
// hashResource will take a resource and create a SHA256 hash sum out of the marshaled bytes
210+
func hashResource(resource []byte) string {
211+
hasher := sha256.New()
212+
hasher.Write(resource)
213+
return hex.EncodeToString(hasher.Sum(nil))
214+
}
215+
216+
// marshalResource converts the Resource to MarshaledResource.
217+
func marshalResource(resource Resource) ([]byte, error) {
218+
return proto.MarshalOptions{Deterministic: true}.Marshal(resource)
219+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package internal

pkg/cache/types/snapshot.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package types //nolint:revive // var-naming: avoid meaningless package names
2+
3+
import (
4+
"time"
5+
6+
"github.com/envoyproxy/go-control-plane/pkg/cache/internal"
7+
8+
"google.golang.org/protobuf/types/known/anypb"
9+
)
10+
11+
// SnapshotResource represents a resource to be provided to caches.
12+
type SnapshotResource struct {
13+
// Mandatory
14+
Name string
15+
// Mandatory
16+
Resource Resource
17+
// Optional
18+
TTL *time.Duration
19+
20+
// Optional
21+
Serialized *anypb.Any
22+
// Optional
23+
Version string
24+
}
25+
26+
func (r *SnapshotResource) asCachedResource(typeURL, cacheVersion string) *internal.CachedResource {
27+
var serialized []byte
28+
if r.Serialized != nil {
29+
serialized = r.Serialized.Value
30+
}
31+
return internal.NewCachedResource(r.Name, typeURL, r.Resource,
32+
internal.WithCacheVersion(cacheVersion),
33+
internal.WithResourceTTL(r.TTL),
34+
internal.WithResourceVersion(r.Version),
35+
internal.WithMarshaledResource(serialized))
36+
}
37+
38+
// TypeSnapshot represents the resources for a given type, associated with an opaque version.
39+
// The snapshot (and associated resources) must not be modified once provided to the cache.
40+
// A given TypeSnapshot instance can be provided in multiple snapshots, including for multiple node ids, as long as it remains immutable.
41+
type TypeSnapshot struct {
42+
typeURL string
43+
version string
44+
resources map[string]*internal.CachedResource
45+
}
46+
47+
func NewTypeSnapshot(typeURL, version string, resources []SnapshotResource) TypeSnapshot {
48+
s := TypeSnapshot{
49+
typeURL: typeURL,
50+
version: version,
51+
resources: make(map[string]*internal.CachedResource, len(resources)),
52+
}
53+
for _, res := range resources {
54+
s.resources[res.Name] = res.asCachedResource(typeURL, version)
55+
}
56+
return s
57+
}
58+
59+
// GetVersion returns the version of the snapshot.
60+
// Multiple TypeSnapshots in a given Snapshot can have different versions.
61+
func (s TypeSnapshot) GetVersion() string {
62+
return s.version
63+
}
64+
65+
// GetResources returns the resources in the snapshot.
66+
// The map must not be modified by the caller.
67+
func (s TypeSnapshot) GetResources() map[string]*internal.CachedResource {
68+
return s.resources
69+
}
70+
71+
// Snapshot represents a consistent set of resources for multiple types.
72+
// Once provided to a cache it should not be altered in any way.
73+
type Snapshot struct {
74+
// defaultVersion is the negative version returned if there is no snapshot set for the provided type.
75+
defaultVersion string
76+
resources map[string]TypeSnapshot
77+
}
78+
79+
// NewSnapshot creates a snapshot with a single version for all resource types.
80+
func NewSnapshot(version string, resources map[string][]SnapshotResource) (*Snapshot, error) {
81+
s := &Snapshot{
82+
defaultVersion: version,
83+
resources: make(map[string]TypeSnapshot, len(resources)),
84+
}
85+
for typeURL, res := range resources {
86+
s.resources[typeURL] = NewTypeSnapshot(typeURL, version, res)
87+
}
88+
return s, nil
89+
}
90+
91+
// NewSnapshotFromTypeSnapshots creates a snapshot from per-type snapshots.
92+
// TypeSnapshot instances can be shared across snapshots, but must not be altered in any way once provided to at least one snapshot.
93+
func NewSnapshotFromTypeSnapshots(version string, snapshots []TypeSnapshot) (*Snapshot, error) {
94+
s := &Snapshot{
95+
defaultVersion: version,
96+
resources: make(map[string]TypeSnapshot, len(snapshots)),
97+
}
98+
for _, snap := range snapshots {
99+
s.resources[snap.typeURL] = snap
100+
}
101+
return s, nil
102+
}
103+
104+
// GetVersion returns the current version of the resource indicated by typeURL.
105+
// The version string that is returned is opaque and should only be compared for equality.
106+
func (s *Snapshot) GetVersion(typeURL string) string {
107+
typeSnapshot, ok := s.resources[typeURL]
108+
if !ok {
109+
return s.defaultVersion
110+
}
111+
return typeSnapshot.version
112+
}
113+
114+
func (s *Snapshot) GetTypeSnapshot(typeURL string) TypeSnapshot {
115+
return s.resources[typeURL]
116+
}

pkg/cache/types/types.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,23 @@
11
package types //nolint:revive // var-naming: avoid meaningless package names
22

33
import (
4-
"time"
4+
"github.com/envoyproxy/go-control-plane/pkg/cache/internal"
55

66
"google.golang.org/protobuf/proto"
77
)
88

99
// Resource is the base interface for the xDS payload.
10-
type Resource interface {
11-
proto.Message
12-
}
10+
type Resource = internal.Resource
1311

1412
// ResourceWithTTL is a Resource with an optional TTL.
15-
type ResourceWithTTL struct {
16-
Resource Resource
17-
TTL *time.Duration
18-
}
13+
type ResourceWithTTL = internal.ResourceWithTTL
1914

2015
// ResourceWithName provides a name for out-of-tree resources.
2116
type ResourceWithName interface {
2217
proto.Message
2318
GetName() string
2419
}
2520

26-
// MarshaledResource is an alias for the serialized binary array.
27-
type MarshaledResource = []byte
28-
2921
// SkipFetchError is the error returned when the cache fetch is short
3022
// circuited due to the client's version already being up-to-date.
3123
type SkipFetchError struct{}

0 commit comments

Comments
 (0)