Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix memory leak of grpc resolver #4490

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions core/discov/internal/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"errors"
"fmt"
"io"
"slices"

Check failure on line 8 in core/discov/internal/registry.go

View workflow job for this annotation

GitHub Actions / Windows

package slices is not in GOROOT (C:\hostedtoolcache\windows\go\1.20.14\x64\src\slices)
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -59,6 +60,17 @@
return c.monitor(key, l, exactMatch)
}

// Unmonitor cancel monitoring of given endpoints and keys, and remove the listener.
func (r *Registry) Unmonitor(endpoints []string, key string, l UpdateListener) {
c, exists := r.getCluster(endpoints)
// if not exists, return.
if !exists {
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not exist, getCluster will create a cluster and save it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have considered this issue originally. I have written a new method to obtain the cluster, but usually the cluster has already been created when the Monitor is called first, so I deleted this method. To take a step back, generally speaking, when we use etcd, the endpoints are often the same, so even if a new cluster is created, it will have little impact.

}

c.unmonitor(key, l)
}

func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) {
clusterKey := getClusterKey(endpoints)
r.lock.RLock()
Expand Down Expand Up @@ -273,6 +285,14 @@
return nil
}

func (c *cluster) unmonitor(key string, l UpdateListener) {
c.lock.Lock()
defer c.lock.Unlock()
c.listeners[key] = slices.DeleteFunc(c.listeners[key], func(listener UpdateListener) bool {
return l == listener
})
}

func (c *cluster) newClient() (EtcdClient, error) {
cli, err := NewClient(c.endpoints)
if err != nil {
Expand Down
25 changes: 25 additions & 0 deletions core/discov/internal/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,31 @@ func TestRegistry_Monitor(t *testing.T) {
assert.Error(t, GetRegistry().Monitor(endpoints, "foo", new(mockListener), false))
}

func TestRegistry_Unmonitor(t *testing.T) {
svr, err := mockserver.StartMockServers(1)
assert.NoError(t, err)
svr.StartAt(0)

endpoints := []string{svr.Servers[0].Address}
GetRegistry().lock.Lock()
GetRegistry().clusters = map[string]*cluster{
getClusterKey(endpoints): {
listeners: map[string][]UpdateListener{},
values: map[string]map[string]string{
"foo": {
"bar": "baz",
},
},
},
}
GetRegistry().lock.Unlock()
l := new(mockListener)
assert.Error(t, GetRegistry().Monitor(endpoints, "foo", l, false))
assert.Equal(t, 1, len(GetRegistry().clusters[getClusterKey(endpoints)].listeners["foo"]))
GetRegistry().Unmonitor(endpoints, "foo", l)
assert.Equal(t, 0, len(GetRegistry().clusters[getClusterKey(endpoints)].listeners["foo"]))
}

type mockListener struct {
}

Expand Down
35 changes: 21 additions & 14 deletions core/discov/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"
"sync/atomic"

"github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/syncx"
Expand All @@ -16,6 +17,7 @@ type (
// A Subscriber is used to subscribe the given key on an etcd cluster.
Subscriber struct {
endpoints []string
key string
exclusive bool
exactMatch bool
items *container
Expand Down Expand Up @@ -52,6 +54,11 @@ func (s *Subscriber) Values() []string {
return s.items.getValues()
}

// Close s.
func (s *Subscriber) Close() {
internal.GetRegistry().Unmonitor(s.endpoints, s.key, s.items)
}

// Exclusive means that key value can only be 1:1,
// which means later added value will remove the keys associated with the same value previously.
func Exclusive() SubOption {
Expand Down Expand Up @@ -83,7 +90,7 @@ func WithSubEtcdTLS(certFile, certKeyFile, caFile string, insecureSkipVerify boo

type container struct {
exclusive bool
values map[string][]string
values map[string]*collection.Set
mapping map[string]string
snapshot atomic.Value
dirty *syncx.AtomicBool
Expand All @@ -94,7 +101,7 @@ type container struct {
func newContainer(exclusive bool) *container {
return &container{
exclusive: exclusive,
values: make(map[string][]string),
values: make(map[string]*collection.Set),
mapping: make(map[string]string),
dirty: syncx.ForAtomicBool(true),
}
Expand All @@ -116,15 +123,21 @@ func (c *container) addKv(key, value string) ([]string, bool) {
defer c.lock.Unlock()

c.dirty.Set(true)
keys := c.values[value]
if c.values[value] == nil {
c.values[value] = collection.NewSet()
}
keys := c.values[value].KeysStr()
previous := append([]string(nil), keys...)
early := len(keys) > 0
if c.exclusive && early {
for _, each := range keys {
c.doRemoveKey(each)
}
if c.values[value] == nil {
c.values[value] = collection.NewSet()
}
}
c.values[value] = append(c.values[value], key)
c.values[value].AddStr(key)
c.mapping[key] = value

if early {
Expand All @@ -147,18 +160,12 @@ func (c *container) doRemoveKey(key string) {
}

delete(c.mapping, key)
keys := c.values[server]
remain := keys[:0]

for _, k := range keys {
if k != key {
remain = append(remain, k)
}
if c.values[server] == nil {
return
}
c.values[server].Remove(key)

if len(remain) > 0 {
c.values[server] = remain
} else {
if c.values[server].Count() == 0 {
delete(c.values, server)
}
}
Expand Down
2 changes: 1 addition & 1 deletion zrpc/resolver/internal/discovbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _
sub.AddListener(update)
update()

return &nopResolver{cc: cc}, nil
return &nopResolver{cc: cc, closeFunc: func() { sub.Close() }}, nil
}

func (b *discovBuilder) Scheme() string {
Expand Down
6 changes: 5 additions & 1 deletion zrpc/resolver/internal/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@ func register() {
}

type nopResolver struct {
cc resolver.ClientConn
cc resolver.ClientConn
closeFunc func()
}

func (r *nopResolver) Close() {
if r.closeFunc != nil {
r.closeFunc()
}
}

func (r *nopResolver) ResolveNow(_ resolver.ResolveNowOptions) {
Expand Down
14 changes: 14 additions & 0 deletions zrpc/resolver/internal/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ func TestNopResolver(t *testing.T) {
})
}

func TestNopResolver_Close(t *testing.T) {
var isChanged bool
r := nopResolver{}
r.Close()
assert.False(t, isChanged)
r = nopResolver{
closeFunc: func() {
isChanged = true
},
}
r.Close()
assert.True(t, isChanged)
}

type mockedClientConn struct {
state resolver.State
err error
Expand Down
Loading