Skip to content

Commit

Permalink
separate watcher struct
Browse files Browse the repository at this point in the history
  • Loading branch information
purnesh42H committed Nov 19, 2024
1 parent 0d031aa commit 8ed2a3a
Showing 1 changed file with 40 additions and 21 deletions.
61 changes: 40 additions & 21 deletions xds/internal/xdsclient/tests/lds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,34 +71,45 @@ func newListenerWatcher() *listenerWatcher {
return &listenerWatcher{updateCh: testutils.NewChannel()}
}

func newListenerWatcherWithSize(size int) *listenerWatcher {
return &listenerWatcher{updateCh: testutils.NewChannelWithSize(size)}
}

func (lw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
onDone()
}

func (lw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
if len(lw.updateCh.C) == 1 {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
lw.updateCh.Replace(listenerUpdateErrTuple{err: err})
} else {
lw.updateCh.Send(listenerUpdateErrTuple{err: err})
}
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
lw.updateCh.Replace(listenerUpdateErrTuple{err: err})
onDone()
}

func (lw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
if len(lw.updateCh.C) == 1 {
lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
} else {
lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
}
lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
onDone()
}

type listenerWatcherMultiple struct {
updateCh *testutils.Channel
}

func newListenerWatcherMultiple(size int) *listenerWatcherMultiple {
return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)}
}

func (lw *listenerWatcherMultiple) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
onDone()
}

func (lw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneFunc) {
lw.updateCh.Send(listenerUpdateErrTuple{err: err})
onDone()
}

func (lw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
onDone()
}

Expand Down Expand Up @@ -949,7 +960,7 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {

// TestLDSWatch_ResourceCaching_WithNACKError covers the case where a watch is
// registered for a resource which is already present in the cache with an old
// good update as well latest NACK error. The test verifies that new watcher
// good update as well as latest NACK error. The test verifies that new watcher
// receives both good update and error without a new resource request being
// sent to the management server.
func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
Expand All @@ -970,7 +981,6 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {

nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
testutils.CreateBootstrapFileForTesting(t, bc)

// Create an xDS client with the above bootstrap contents.
client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
Expand Down Expand Up @@ -1032,7 +1042,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {

// Register another watch for the same resource. This should get the update
// and error from the cache.
lw2 := newListenerWatcherWithSize(2)
lw2 := newListenerWatcherMultiple(2)
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
defer ldsCancel2()
if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil {
Expand All @@ -1046,6 +1056,15 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
}
// No request should get sent out as part of this watch.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
select {
case <-sCtx.Done():
case <-secondRequestReceived.Done():
t.Fatal("xdsClient sent out request instead of using update from cache")
default:
}
}

// TestLDSWatch_PartialValid covers the case where a response from the
Expand Down

0 comments on commit 8ed2a3a

Please sign in to comment.