Skip to content

Commit

Permalink
linear: fix test cases
Browse files Browse the repository at this point in the history
Signed-off-by: Fu-Sheng <[email protected]>
  • Loading branch information
fscnick committed Sep 4, 2022
1 parent 342cb6f commit 54bb6cd
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
5 changes: 3 additions & 2 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@ func TestLinearMixedWatches(t *testing.T) {
assert.Equal(t, 2, c.NumResources())

sotwState := stream.NewStreamState(false, nil)
sotwState.SetKnownResourceNamesAsList(testType, []string{"a", "b"})
w := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w)
mustBlock(t, w)
Expand All @@ -754,7 +755,7 @@ func TestLinearMixedWatches(t *testing.T) {
err = c.UpdateResources(map[string]types.Resource{"a": a}, nil)
assert.NoError(t, err)
// This behavior is currently invalid for cds and lds, but due to a current limitation of linear cache sotw implementation
verifyResponse(t, w, c.getVersion(), 1)
verifyResponse(t, w, c.getVersion(), 2)
checkVersionMapNotSet(t, c)

c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w)
Expand All @@ -775,6 +776,6 @@ func TestLinearMixedWatches(t *testing.T) {
assert.NoError(t, err)
checkVersionMapSet(t, c)

verifyResponse(t, w, c.getVersion(), 0)
verifyResponse(t, w, c.getVersion(), 1)
verifyDeltaResponse(t, wd, nil, []string{"b"})
}
12 changes: 12 additions & 0 deletions pkg/server/stream/v3/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type StreamState struct { // nolint:golint,revive

// indicates whether the object has been modified since its creation
first bool

mu *sync.RWMutex
}

// GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to
Expand Down Expand Up @@ -95,10 +97,16 @@ func (s *StreamState) IsWildcard() bool {
}

func (s *StreamState) SetKnownResourceNames(url string, names map[string]struct{}) {
s.mu.Lock()
defer s.mu.Unlock()

s.knownResourceNames[url] = names
}

func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) {
s.mu.Lock()
defer s.mu.Unlock()

m := map[string]struct{}{}
for _, name := range names {
m[name] = struct{}{}
Expand All @@ -107,6 +115,9 @@ func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) {
}

func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} {
s.mu.Lock()
defer s.mu.Unlock()

return s.knownResourceNames[url]
}

Expand All @@ -118,6 +129,7 @@ func NewStreamState(wildcard bool, initialResourceVersions map[string]string) St
resourceVersions: initialResourceVersions,
first: true,
knownResourceNames: map[string]map[string]struct{}{},
mu: &sync.RWMutex{},
}

if initialResourceVersions == nil {
Expand Down

0 comments on commit 54bb6cd

Please sign in to comment.