Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: fix new watcher to get both old good update and nack error (if exist) from the cache #7851

Merged
merged 10 commits into from
Nov 21, 2024
11 changes: 10 additions & 1 deletion xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,14 +633,23 @@
// Always add the new watcher to the set of watchers.
state.watchers[watcher] = true

// If we have a cached copy of the resource, notify the new watcher.
// If we have a cached copy of the resource, notify the new watcher
// immediately.
if state.cache != nil {
if a.logger.V(2) {
a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
}
resource := state.cache
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) })
}
// If last update was NACK'd, notify the new watcher of error
easwars marked this conversation as resolved.
Show resolved Hide resolved
// immediately as well.
if state.md.Status == xdsresource.ServiceStatusNACKed {
if a.logger.V(2) {
a.logger.Infof("Resource type %q with resource name %q was NACKed: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
}

Check warning on line 650 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L649-L650

Added lines #L649 - L650 were not covered by tests
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) })
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
// If the metadata field is updated to indicate that the management
// server does not have this resource, notify the new watcher.
if state.md.Status == xdsresource.ServiceStatusNotExist {
Expand Down
198 changes: 177 additions & 21 deletions xds/internal/xdsclient/tests/lds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,47 @@ func newListenerWatcher() *listenerWatcher {
return &listenerWatcher{updateCh: testutils.NewChannel()}
}

func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
func (lw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
onDone()
}

func (cw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
func (lw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here and in OnResourceDoesNotExist() simplifies tests which will have
// access to the most recently received error.
cw.updateCh.Replace(listenerUpdateErrTuple{err: err})
lw.updateCh.Replace(listenerUpdateErrTuple{err: err})
onDone()
}

func (cw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
cw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
func (lw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
onDone()
}

type listenerWatcherMultiple struct {
updateCh *testutils.Channel
}

// TODO: delete this once `newListenerWatcher` is modified to handle multiple
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please link the issue here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// updates (https://github.com/grpc/grpc-go/issues/7864).
func newListenerWatcherMultiple(size int) *listenerWatcherMultiple {
return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)}
}

func (lw *listenerWatcherMultiple) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
onDone()
}

func (lw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneFunc) {
lw.updateCh.Send(listenerUpdateErrTuple{err: err})
onDone()
}

func (lw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
onDone()
}

Expand Down Expand Up @@ -155,6 +180,18 @@ func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, want
return nil
}

func verifyUnknownListenerError(ctx context.Context, updateCh *testutils.Channel, wantErr string) error {
u, err := updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout when waiting for a listener error from the management server: %v", err)
}
gotErr := u.(listenerUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) {
return fmt.Errorf("update received with error: %v, want %q", gotErr, wantErr)
}
return nil
}

// TestLDSWatch covers the case where a single watcher exists for a single
// listener resource. The test verifies the following scenarios:
// 1. An update from the management server containing the resource being
Expand Down Expand Up @@ -953,8 +990,9 @@ func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) {
}

// TestLDSWatch_NACKError covers the case where an update from the management
// server is NACK'ed by the xdsclient. The test verifies that the error is
// propagated to the watcher.
// server is NACKed by the xdsclient. The test verifies that the error is
// propagated to the existing watcher. After NACK, if a new watcher registers
// for the resource, error is propagated to the new watcher as well.
func (s) TestLDSWatch_NACKError(t *testing.T) {
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

Expand Down Expand Up @@ -992,19 +1030,141 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
}

// Verify that the expected error is propagated to the watcher.
u, err := lw.updateCh.Receive(ctx)
// Verify that the expected error is propagated to the existing watcher.
if err := verifyUnknownListenerError(ctx, lw.updateCh, wantListenerNACKErr); err != nil {
t.Fatal(err)
}

// Verify that the expected error is propagated to the new watcher as well.
lw2 := newListenerWatcher()
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
defer ldsCancel2()
// Verify that the expected error is propagated to the existing watcher.
if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil {
t.Fatal(err)
}
}

// TestLDSWatch_ResourceCaching_WithNACKError covers the case where a watch is
// registered for a resource which is already present in the cache with an old
// good update as well as latest NACK error. The test verifies that new watcher
// receives both good update and error without a new resource request being
// sent to the management server.
easwars marked this conversation as resolved.
Show resolved Hide resolved
func TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
firstRequestReceived := false
firstAckReceived := grpcsync.NewEvent()
secondAckReceived := grpcsync.NewEvent()
secondRequestReceived := grpcsync.NewEvent()

mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
// The first request has an empty version string.
if !firstRequestReceived && req.GetVersionInfo() == "" {
firstRequestReceived = true
return nil
}
// The first ack has a non-empty version string.
if !firstAckReceived.HasFired() && req.GetVersionInfo() != "" {
firstAckReceived.Fire()
return nil
}
// The second ack has a non-empty version string.
if !secondAckReceived.HasFired() && req.GetVersionInfo() != "" {
secondAckReceived.Fire()
return nil
}
// Any requests after the first request and two acks, are not expected.
secondRequestReceived.Fire()
return nil
},
})

nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)

// Create an xDS client with the above bootstrap contents.
client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
Name: t.Name(),
Contents: bc,
})
if err != nil {
t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
t.Fatalf("Failed to create xDS client: %v", err)
}
gotErr := u.(listenerUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
defer close()

// Register a watch for a listener resource and have the watch
// callback push the received update on to a channel.
lw1 := newListenerWatcher()
ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1)
defer ldsCancel1()

// Configure the management server to return a single listener
easwars marked this conversation as resolved.
Show resolved Hide resolved
// resource, corresponding to the one we registered a watch for.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), 1000*defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}

// Verify the contents of the received update.
easwars marked this conversation as resolved.
Show resolved Hide resolved
wantUpdate := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}

// Configure the management server to return a single listener resource
// which is expected to be NACKed by the client.
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{badListenerResource(t, ldsName)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}

// Verify that the expected error is propagated to the existing watcher.
if err := verifyUnknownListenerError(ctx, lw1.updateCh, wantListenerNACKErr); err != nil {
t.Fatal(err)
}

// Register another watch for the same resource. This should get the update
// and error from the cache.
lw2 := newListenerWatcherMultiple(2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this new listener watcher type? Why can't we handle this case with the existing listenerWatcher type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current listenerWatcher has channel size of 1 and notifications gets replaced. For this fix we need both good update and error as 2 different notifications to be verified so we need a channel with buffer size > 1. But yeah we don't need a new listenerWatcher struct, we can just modify the current one to have another constructor which accept the size and update OnError to not replace if size > 1, which is what I did.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah the way i used existing listenerWatcher struct, it was missing resource update during race test. I have added the separate struct back for handling variable size update channel and the race went away. Didn't get a chance to fully debug why it was happening. But may be its fine to have separate struct to hold multiple updates?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the correct way would be to change the listenerWatcher to have multiple channels: one each for update, error and resource not found. That way one callback will not interfere with another callback. But this change would touch a lot of tests.

I wanted to do this change when I was working on some of the refactors recently, but never got around to doing that. I would recommend making that change in a separate PR though. What do you think?

Copy link
Contributor Author

@purnesh42H purnesh42H Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yeah i think i can send a separate PR for that. The idea of having 3 channels for each callback is a good idea. Should this fix be blocked for that though?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be Ok if we create an issue for the same and add a TODO in here to remove this new listener watcher type once that issue is taken care of.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed an issue #7864. It should be simple as well. Added TODO for the new watcher in this PR.

ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
defer ldsCancel2()
if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
// Verify that the expected error is propagated to the existing watcher.
if err := verifyUnknownListenerError(ctx, lw2.updateCh, wantListenerNACKErr); err != nil {
t.Fatal(err)
}

// No request should get sent out as part of this watch.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
select {
case <-sCtx.Done():
case <-secondRequestReceived.Done():
t.Fatal("xdsClient sent out request instead of using update from cache")
default:
}
}

// TestLDSWatch_PartialValid covers the case where a response from the
// management server contains both valid and invalid resources and is expected
// to be NACK'ed by the xdsclient. The test verifies that watchers corresponding
// to be NACKed by the xdsclient. The test verifies that watchers corresponding
// to the valid resource receive the update, while watchers corresponding to the
// invalid resource receive an error.
func (s) TestLDSWatch_PartialValid(t *testing.T) {
Expand Down Expand Up @@ -1071,13 +1231,9 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) {

// Verify that the expected error is propagated to the watcher which
// requested for the bad resource.
u, err := lw1.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
}
gotErr := u.(listenerUpdateErrTuple).err
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
// Verify that the expected error is propagated to the existing watcher.
if err := verifyUnknownListenerError(ctx, lw1.updateCh, wantListenerNACKErr); err != nil {
t.Fatal(err)
}

// Verify that the watcher watching the good resource receives a good
Expand Down