diff --git a/internal/datastore/crdb/crdb_test.go b/internal/datastore/crdb/crdb_test.go index 11e7991e0f..e874cced01 100644 --- a/internal/datastore/crdb/crdb_test.go +++ b/internal/datastore/crdb/crdb_test.go @@ -36,6 +36,7 @@ import ( testdatastore "github.com/authzed/spicedb/internal/testserver/datastore" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/test" + "github.com/authzed/spicedb/pkg/genutil/mapz" "github.com/authzed/spicedb/pkg/migrate" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/tuple" @@ -74,6 +75,29 @@ func TestCRDBDatastoreWithoutIntegrity(t *testing.T) { return ds, nil }), false) + + t.Run("TestWatchStreaming", createDatastoreTest( + b, + StreamingWatchTest, + RevisionQuantization(0), + GCWindow(veryLargeGCWindow), + )) +} + +type datastoreTestFunc func(t *testing.T, ds datastore.Datastore) + +func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestFunc, options ...Option) func(*testing.T) { + return func(t *testing.T) { + ctx := context.Background() + ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { + ds, err := NewCRDBDatastore(ctx, uri, options...) + require.NoError(t, err) + return ds + }) + defer ds.Close() + + tf(t, ds) + } } func TestCRDBDatastoreWithFollowerReads(t *testing.T) { @@ -593,3 +617,97 @@ func RelationshipIntegrityWatchTest(t *testing.T, tester test.DatastoreTester) { require.Fail("Timed out") } } + +func StreamingWatchTest(t *testing.T, rawDS datastore.Datastore) { + require := require.New(t) + + ds, rev := testfixtures.DatastoreFromSchemaAndTestRelationships(rawDS, ` + caveat somecaveat(somecondition int) { + somecondition == 42 + } + + caveat somecaveat2(somecondition int) { + somecondition == 42 + } + + definition user {} + + definition user2 {} + + definition resource { + relation viewer: user + } + + definition resource2 { + relation viewer: user2 + } + `, []tuple.Relationship{ + tuple.MustParse("resource:foo#viewer@user:tom"), + tuple.MustParse("resource:foo#viewer@user:fred"), + }, require) + ctx := context.Background() + + // Touch and delete some relationships, add a namespace and caveat and delete a namespace and caveat. + _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + err := rwt.WriteRelationships(ctx, []tuple.RelationshipUpdate{ + tuple.Touch(tuple.MustParse("resource:foo#viewer@user:tom")), + tuple.Delete(tuple.MustParse("resource:foo#viewer@user:fred")), + }) + require.NoError(err) + + err = rwt.DeleteNamespaces(ctx, "resource2") + require.NoError(err) + + err = rwt.DeleteCaveats(ctx, []string{"somecaveat2"}) + require.NoError(err) + + err = rwt.WriteNamespaces(ctx, &core.NamespaceDefinition{ + Name: "somenewnamespace", + }) + require.NoError(err) + + err = rwt.WriteCaveats(ctx, []*core.CaveatDefinition{{ + Name: "somenewcaveat", + }}) + require.NoError(err) + + return nil + }) + require.NoError(err) + + // Ensure the watch API returns the integrity information. + opts := datastore.WatchOptions{ + Content: datastore.WatchRelationships | datastore.WatchSchema | datastore.WatchCheckpoints, + WatchBufferLength: 128, + WatchBufferWriteTimeout: 1 * time.Minute, + EmissionStrategy: datastore.EmitImmediatelyStrategy, + } + + expectedChanges := mapz.NewSet[string]() + expectedChanges.Add("DELETE(resource:foo#viewer@user:fred)\n") + expectedChanges.Add("DeletedCaveat: somecaveat2\n") + expectedChanges.Add("DeletedNamespace: resource2\n") + expectedChanges.Add("Definition: *corev1.NamespaceDefinition:somenewnamespace\n") + expectedChanges.Add("Definition: *corev1.CaveatDefinition:somenewcaveat\n") + + changes, errchan := ds.Watch(ctx, rev, opts) + for { + select { + case change, ok := <-changes: + if !ok { + require.Fail("Timed out waiting for WatchDisconnectedError") + } + + debugString := change.DebugString() + require.True(expectedChanges.Has(debugString), "unexpected change: %s", debugString) + expectedChanges.Delete(change.DebugString()) + if expectedChanges.IsEmpty() { + return + } + case err := <-errchan: + require.Failf("Failed waiting for changes with error", "error: %v", err) + case <-time.NewTimer(10 * time.Second).C: + require.Fail("Timed out") + } + } +} diff --git a/internal/datastore/crdb/pool/slqerrors.go b/internal/datastore/crdb/pool/sqlerrors.go similarity index 100% rename from internal/datastore/crdb/pool/slqerrors.go rename to internal/datastore/crdb/pool/sqlerrors.go diff --git a/internal/datastore/memdb/readwrite.go b/internal/datastore/memdb/readwrite.go index 885a406553..4add605c2a 100644 --- a/internal/datastore/memdb/readwrite.go +++ b/internal/datastore/memdb/readwrite.go @@ -312,7 +312,7 @@ func (rwt *memdbReadWriteTx) DeleteNamespaces(_ context.Context, nsNames ...stri } if foundRaw == nil { - return fmt.Errorf("unable to find namespace to delete") + return fmt.Errorf("namespace not found") } if err := tx.Delete(tableNamespace, foundRaw); err != nil { diff --git a/internal/datastore/spanner/readwrite.go b/internal/datastore/spanner/readwrite.go index 74411a0829..57cc91dd5b 100644 --- a/internal/datastore/spanner/readwrite.go +++ b/internal/datastore/spanner/readwrite.go @@ -15,6 +15,7 @@ import ( log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" + "github.com/authzed/spicedb/pkg/genutil/mapz" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/spiceerrors" "github.com/authzed/spicedb/pkg/tuple" @@ -372,7 +373,23 @@ func (rwt spannerReadWriteTXN) WriteNamespaces(_ context.Context, newConfigs ... } func (rwt spannerReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...string) error { + namespaces, err := rwt.LookupNamespacesWithNames(ctx, nsNames) + if err != nil { + return fmt.Errorf(errUnableToDeleteConfig, err) + } + + if len(namespaces) != len(nsNames) { + expectedNamespaceNames := mapz.NewSet[string](nsNames...) + for _, ns := range namespaces { + expectedNamespaceNames.Delete(ns.Definition.Name) + } + + return fmt.Errorf(errUnableToDeleteConfig, fmt.Errorf("namespaces not found: %v", expectedNamespaceNames.AsSlice())) + } + for _, nsName := range nsNames { + // Ensure the namespace exists. + relFilter := &v1.RelationshipFilter{ResourceType: nsName} if _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter); err != nil { return fmt.Errorf(errUnableToDeleteConfig, err) diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index c3fcbf63bf..6505db7bd4 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -68,6 +68,32 @@ type RevisionChanges struct { Metadata *structpb.Struct } +func (rc *RevisionChanges) DebugString() string { + if rc.IsCheckpoint { + return "[checkpoint]" + } + + debugString := "" + + for _, relChange := range rc.RelationshipChanges { + debugString += relChange.DebugString() + "\n" + } + + for _, def := range rc.ChangedDefinitions { + debugString += fmt.Sprintf("Definition: %T:%s\n", def, def.GetName()) + } + + for _, ns := range rc.DeletedNamespaces { + debugString += fmt.Sprintf("DeletedNamespace: %s\n", ns) + } + + for _, caveat := range rc.DeletedCaveats { + debugString += fmt.Sprintf("DeletedCaveat: %s\n", caveat) + } + + return debugString +} + func (rc *RevisionChanges) MarshalZerologObject(e *zerolog.Event) { e.Str("revision", rc.Revision.String()) e.Bool("is-checkpoint", rc.IsCheckpoint) diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go index 54a898db26..8617c12248 100644 --- a/pkg/datastore/test/datastore.go +++ b/pkg/datastore/test/datastore.go @@ -113,6 +113,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories, t.Run("TestNamespaceMultiDelete", runner(tester, NamespaceMultiDeleteTest)) t.Run("TestEmptyNamespaceDelete", runner(tester, EmptyNamespaceDeleteTest)) t.Run("TestStableNamespaceReadWrite", runner(tester, StableNamespaceReadWriteTest)) + t.Run("TestNamespaceDeleteInvalidNamespace", runner(tester, NamespaceDeleteInvalidNamespaceTest)) t.Run("TestSimple", runner(tester, SimpleTest)) t.Run("TestObjectIDs", runner(tester, ObjectIDsTest)) diff --git a/pkg/datastore/test/namespace.go b/pkg/datastore/test/namespace.go index 9cca0b6e5d..c605a2eba6 100644 --- a/pkg/datastore/test/namespace.go +++ b/pkg/datastore/test/namespace.go @@ -237,6 +237,47 @@ func EmptyNamespaceDeleteTest(t *testing.T, tester DatastoreTester) { require.True(errors.As(err, &datastore.NamespaceNotFoundError{})) } +// NamespaceDeleteInvalidNamespaceTest tests deleting an invalid namespace in the datastore. +func NamespaceDeleteInvalidNamespaceTest(t *testing.T, tester DatastoreTester) { + require := require.New(t) + + schemaString := `definition user {} + +definition document { + relation viewer: user +}` + + // Compile namespace to write to the datastore. + compiled, err := compiler.Compile(compiler.InputSchema{ + Source: input.Source("schema"), + SchemaString: schemaString, + }, compiler.AllowUnprefixedObjectType()) + require.NoError(err) + require.Equal(2, len(compiled.OrderedDefinitions)) + + // Write the namespace definition to the datastore. + ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) + require.NoError(err) + + ctx := context.Background() + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + err := rwt.WriteCaveats(ctx, compiled.CaveatDefinitions) + if err != nil { + return err + } + + return rwt.WriteNamespaces(ctx, compiled.ObjectDefinitions...) + }) + require.NoError(err) + + // Attempt to delete the invalid namespace. + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return rwt.DeleteNamespaces(ctx, "invalid") + }) + require.Error(err) + require.ErrorContains(err, "not found") +} + // StableNamespaceReadWriteTest tests writing a namespace to the datastore and reading it back, // ensuring that it does not change in any way and that the deserialized data matches that stored. func StableNamespaceReadWriteTest(t *testing.T, tester DatastoreTester) {