-
Notifications
You must be signed in to change notification settings - Fork 173
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Fix: Finished Configuration API Signed-off-by: LaurenceLiZhixin <[email protected]> * Fix: add configuration validation Signed-off-by: LaurenceLiZhixin <[email protected]> * Fix: fix validation test Signed-off-by: LaurenceLiZhixin <[email protected]> * Fix: remove validation Signed-off-by: LaurenceLiZhixin <[email protected]> * add ut Signed-off-by: LaurenceLiZhixin <[email protected]> * Fix: comment Signed-off-by: LaurenceLiZhixin <[email protected]>
- Loading branch information
1 parent
f0e0931
commit 6f2ae7d
Showing
12 changed files
with
688 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
package client | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io" | ||
|
||
"github.com/pkg/errors" | ||
|
||
pb "github.com/dapr/dapr/pkg/proto/runtime/v1" | ||
) | ||
|
||
type ConfigurationItem struct { | ||
Key string | ||
Value string | ||
Version string | ||
Metadata map[string]string | ||
} | ||
|
||
type ConfigurationOpt func(map[string]string) | ||
|
||
func WithConfigurationMetadata(key, value string) ConfigurationOpt { | ||
return func(m map[string]string) { | ||
m[key] = value | ||
} | ||
} | ||
|
||
func (c *GRPCClient) GetConfigurationItem(ctx context.Context, storeName, key string, opts ...ConfigurationOpt) (*ConfigurationItem, error) { | ||
items, err := c.GetConfigurationItems(ctx, storeName, []string{key}, opts...) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if len(items) == 0 { | ||
return nil, nil | ||
} | ||
return items[0], nil | ||
} | ||
|
||
func (c *GRPCClient) GetConfigurationItems(ctx context.Context, storeName string, keys []string, opts ...ConfigurationOpt) ([]*ConfigurationItem, error) { | ||
metadata := make(map[string]string) | ||
for _, opt := range opts { | ||
opt(metadata) | ||
} | ||
rsp, err := c.protoClient.GetConfigurationAlpha1(ctx, &pb.GetConfigurationRequest{ | ||
StoreName: storeName, | ||
Keys: keys, | ||
Metadata: metadata, | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
configItems := make([]*ConfigurationItem, 0) | ||
for _, v := range rsp.Items { | ||
configItems = append(configItems, &ConfigurationItem{ | ||
Key: v.Key, | ||
Value: v.Value, | ||
Version: v.Version, | ||
Metadata: v.Metadata, | ||
}) | ||
} | ||
return configItems, nil | ||
} | ||
|
||
type ConfigurationHandleFunction func(string, []*ConfigurationItem) | ||
|
||
func (c *GRPCClient) SubscribeConfigurationItems(ctx context.Context, storeName string, keys []string, handler ConfigurationHandleFunction, opts ...ConfigurationOpt) error { | ||
metadata := make(map[string]string) | ||
for _, opt := range opts { | ||
opt(metadata) | ||
} | ||
|
||
client, err := c.protoClient.SubscribeConfigurationAlpha1(ctx, &pb.SubscribeConfigurationRequest{ | ||
StoreName: storeName, | ||
Keys: keys, | ||
Metadata: metadata, | ||
}) | ||
if err != nil { | ||
return errors.Errorf("subscribe configuration failed with error = %s", err) | ||
} | ||
|
||
var subcribeID string | ||
stopCh := make(chan struct{}) | ||
go func() { | ||
for { | ||
rsp, err := client.Recv() | ||
if err == io.EOF || rsp == nil { | ||
// receive goroutine would close if unsubscribe is called | ||
fmt.Println("dapr configuration subscribe finished.") | ||
close(stopCh) | ||
break | ||
} | ||
subcribeID = rsp.Id | ||
configurationItems := make([]*ConfigurationItem, 0) | ||
for _, v := range rsp.Items { | ||
configurationItems = append(configurationItems, &ConfigurationItem{ | ||
Key: v.Key, | ||
Value: v.Value, | ||
Version: v.Version, | ||
Metadata: v.Metadata, | ||
}) | ||
} | ||
handler(rsp.Id, configurationItems) | ||
} | ||
}() | ||
select { | ||
case <-ctx.Done(): | ||
return c.UnsubscribeConfigurationItems(context.Background(), storeName, subcribeID) | ||
case <-stopCh: | ||
return nil | ||
} | ||
} | ||
|
||
func (c *GRPCClient) UnsubscribeConfigurationItems(ctx context.Context, storeName string, id string, opts ...ConfigurationOpt) error { | ||
alpha1, err := c.protoClient.UnsubscribeConfigurationAlpha1(ctx, &pb.UnsubscribeConfigurationRequest{ | ||
StoreName: storeName, | ||
Id: id, | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
if !alpha1.Ok { | ||
return errors.Errorf("unsubscribe error message = %s", alpha1.GetMessage()) | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
package client | ||
|
||
import ( | ||
"context" | ||
"strconv" | ||
"testing" | ||
"time" | ||
|
||
"go.uber.org/atomic" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
const ( | ||
valueSuffix = "_value" | ||
) | ||
|
||
func TestGetConfigurationItem(t *testing.T) { | ||
ctx := context.Background() | ||
|
||
t.Run("get configuration item", func(t *testing.T) { | ||
resp, err := testClient.GetConfigurationItem(ctx, "example-config", "mykey") | ||
assert.Nil(t, err) | ||
assert.Equal(t, "mykey"+valueSuffix, resp.Value) | ||
}) | ||
|
||
t.Run("get configuration item with invalid storeName", func(t *testing.T) { | ||
_, err := testClient.GetConfigurationItem(ctx, "", "mykey") | ||
assert.NotNil(t, err) | ||
}) | ||
} | ||
|
||
func TestGetConfigurationItems(t *testing.T) { | ||
ctx := context.Background() | ||
|
||
t.Run("Test get configuration items", func(t *testing.T) { | ||
resp, err := testClient.GetConfigurationItems(ctx, "example-config", []string{"mykey1", "mykey2", "mykey3"}) | ||
assert.Nil(t, err) | ||
for i, v := range resp { | ||
assert.Equal(t, "mykey"+strconv.Itoa(i+1)+valueSuffix, v.Value) | ||
} | ||
}) | ||
} | ||
|
||
func TestSubscribeConfigurationItems(t *testing.T) { | ||
ctx := context.Background() | ||
|
||
counter := 0 | ||
totalCounter := 0 | ||
t.Run("Test subscribe configuration items", func(t *testing.T) { | ||
err := testClient.SubscribeConfigurationItems(ctx, "example-config", | ||
[]string{"mykey", "mykey2", "mykey3"}, func(s string, items []*ConfigurationItem) { | ||
counter++ | ||
for _, v := range items { | ||
assert.Equal(t, v.Value, v.Key+"_"+strconv.Itoa(counter-1)) | ||
totalCounter++ | ||
} | ||
}) | ||
assert.Nil(t, err) | ||
}) | ||
time.Sleep(time.Second*5 + time.Millisecond*500) | ||
assert.Equal(t, 5, counter) | ||
assert.Equal(t, 15, totalCounter) | ||
} | ||
|
||
func TestUnSubscribeConfigurationItems(t *testing.T) { | ||
ctx := context.Background() | ||
|
||
counter := atomic.Int32{} | ||
totalCounter := atomic.Int32{} | ||
t.Run("Test unsubscribe configuration items", func(t *testing.T) { | ||
subscribeID := "" | ||
subscribeIDChan := make(chan string) | ||
go func() { | ||
err := testClient.SubscribeConfigurationItems(ctx, "example-config", | ||
[]string{"mykey", "mykey2", "mykey3"}, func(id string, items []*ConfigurationItem) { | ||
counter.Inc() | ||
for _, v := range items { | ||
assert.Equal(t, v.Value, v.Key+"_"+strconv.Itoa(int(counter.Load()-1))) | ||
totalCounter.Inc() | ||
} | ||
select { | ||
case subscribeIDChan <- id: | ||
default: | ||
} | ||
}) | ||
assert.Nil(t, err) | ||
}() | ||
subscribeID = <-subscribeIDChan | ||
time.Sleep(time.Second * 2) | ||
time.Sleep(time.Millisecond * 500) | ||
err := testClient.UnsubscribeConfigurationItems(ctx, "example-config", subscribeID) | ||
assert.Nil(t, err) | ||
}) | ||
time.Sleep(time.Second * 5) | ||
assert.Equal(t, 3, int(counter.Load())) | ||
assert.Equal(t, 9, int(totalCounter.Load())) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.