Skip to content

Commit

Permalink
test to verify both update and error are sent
Browse files Browse the repository at this point in the history
  • Loading branch information
purnesh42H committed Nov 17, 2024
1 parent c007e8a commit a3c89bb
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 39 deletions.
8 changes: 7 additions & 1 deletion xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,13 +633,19 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
// Always add the new watcher to the set of watchers.
state.watchers[watcher] = true

// If we have a cached copy of the resource, notify the new watcher.
// If we have a cached copy of the resource, notify the new watcher
// immediately.
if state.cache != nil {
if a.logger.V(2) {
a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
}
resource := state.cache
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) })
// If last update was NACK'd, notify the new watcher of error
// immediately as well.
if state.md.Status == xdsresource.ServiceStatusNACKed && state.md.ErrState != nil {
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) })
}
}
cleanup = a.unwatchResource(rType, resourceName, watcher)
}, func() {
Expand Down
70 changes: 32 additions & 38 deletions xds/internal/xdsclient/tests/lds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,33 @@ func (cw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc)
onDone()
}

type listenerWatcherMultiple struct {
updateCh *testutils.Channel
}

func newListenerWatcherMultiple() *listenerWatcherMultiple {
return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(2)}
}

func (cw *listenerWatcherMultiple) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
onDone()
}

func (cw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneFunc) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
cw.updateCh.Send(listenerUpdateErrTuple{err: err})
onDone()
}

func (cw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
cw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
onDone()
}

// badListenerResource returns a listener resource for the given name which does
// not contain the `RouteSpecifier` field in the HTTPConnectionManager, and
// hence is expected to be NACKed by the client.
Expand Down Expand Up @@ -547,7 +574,7 @@ func (s) TestLDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
// a resource which is already present in the cache. The test verifies that the
// watch callback is invoked with the contents from the cache, instead of a
// request being sent to the management server.
func (s) TestLDSWatch_ResourceCaching(t *testing.T) {
func TestLDSWatch_ResourceCaching(t *testing.T) {
firstRequestReceived := false
firstAckReceived := grpcsync.NewEvent()
secondRequestReceived := grpcsync.NewEvent()
Expand Down Expand Up @@ -926,28 +953,9 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
// good update and latest NACK error. The test verifies that new watcher
// receives both good update and error without request being sent to the
// management server.
func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
firstRequestReceived := false
firstAckReceived := grpcsync.NewEvent()
secondRequestReceived := grpcsync.NewEvent()
func TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {

mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
// The first request has an empty version string.
if !firstRequestReceived && req.GetVersionInfo() == "" {
firstRequestReceived = true
return nil
}
// The first ack has a non-empty version string.
if !firstAckReceived.HasFired() && req.GetVersionInfo() != "" {
firstAckReceived.Fire()
return nil
}
// Any requests after the first request and ack, are not expected.
secondRequestReceived.Fire()
return nil
},
})
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
Expand Down Expand Up @@ -975,7 +983,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
ctx, cancel := context.WithTimeout(context.Background(), 1000000*defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
Expand All @@ -990,11 +998,6 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("timeout when waiting for receipt of ACK at the management server")
case <-firstAckReceived.Done():
}

// Configure the management server to return a single listener resource
// which is expected to be NACKed by the client.
Expand All @@ -1018,7 +1021,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {

// Register another watch for the same resource. This should get the update
// and error from the cache.
lw2 := newListenerWatcher()
lw2 := newListenerWatcherMultiple()
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
defer ldsCancel2()
if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil {
Expand All @@ -1032,15 +1035,6 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
}

// No request should get sent out as part of this watch.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
select {
case <-sCtx.Done():
case <-secondRequestReceived.Done():
t.Fatal("xdsClient sent out request instead of using update from cache")
}
}

// TestLDSWatch_PartialValid covers the case where a response from the
Expand Down

0 comments on commit a3c89bb

Please sign in to comment.