Skip to content

Commit

Permalink
Finish v4.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
zouyx committed Dec 18, 2022
2 parents c9dde04 + 15f242d commit cde7163
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 24 deletions.
14 changes: 12 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
"github.com/apolloconfig/agollo/v4/utils/parse/yml"
)

const separator = ","

func init() {
extension.SetCacheFactory(&memory.DefaultCacheFactory{})
extension.SetLoadBalance(&roundrobin.RoundRobin{})
Expand Down Expand Up @@ -75,13 +77,15 @@ type Client interface {
RemoveChangeListener(listener storage.ChangeListener)
GetChangeListeners() *list.List
UseEventDispatch()
Close()
}

// internalClient apollo 客户端实例
type internalClient struct {
initAppConfigFunc func() (*config.AppConfig, error)
appConfig *config.AppConfig
cache *storage.Cache
configComponent *notify.ConfigComponent
}

func (c *internalClient) getAppConfig() config.AppConfig {
Expand Down Expand Up @@ -135,6 +139,7 @@ func StartWithConfig(loadAppConfig func() (*config.AppConfig, error)) (Client, e
configComponent.SetAppConfig(c.getAppConfig)
configComponent.SetCache(c.cache)
go component.StartRefreshConfig(configComponent)
c.configComponent = configComponent

log.Info("agollo start finished ! ")

Expand Down Expand Up @@ -218,12 +223,12 @@ func (c *internalClient) GetBoolValue(key string, defaultValue bool) bool {

//GetStringSliceValue 获取[]string 配置值
func (c *internalClient) GetStringSliceValue(key string, defaultValue []string) []string {
return c.GetConfig(storage.GetDefaultNamespace()).GetStringSliceValue(key, defaultValue)
return c.GetConfig(storage.GetDefaultNamespace()).GetStringSliceValue(key, separator, defaultValue)
}

//GetIntSliceValue 获取[]int 配置值
func (c *internalClient) GetIntSliceValue(key string, defaultValue []int) []int {
return c.GetConfig(storage.GetDefaultNamespace()).GetIntSliceValue(key, defaultValue)
return c.GetConfig(storage.GetDefaultNamespace()).GetIntSliceValue(key, separator, defaultValue)
}

func (c *internalClient) getConfigValue(key string) interface{} {
Expand Down Expand Up @@ -260,3 +265,8 @@ func (c *internalClient) GetChangeListeners() *list.List {
func (c *internalClient) UseEventDispatch() {
c.AddChangeListener(storage.UseEventDispatch())
}

// Close 停止轮询
func (c *internalClient) Close() {
c.configComponent.Stop()
}
15 changes: 15 additions & 0 deletions component/notify/componet_notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
type ConfigComponent struct {
appConfigFunc func() config.AppConfig
cache *storage.Cache
stopCh chan interface{}
}

// SetAppConfig nolint
Expand All @@ -48,9 +49,14 @@ func (c *ConfigComponent) SetCache(cache *storage.Cache) {

//Start 启动配置组件定时器
func (c *ConfigComponent) Start() {
if c.stopCh == nil {
c.stopCh = make(chan interface{})
}

t2 := time.NewTimer(longPollInterval)
instance := remote.CreateAsyncApolloConfig()
//long poll for sync
loop:
for {
select {
case <-t2.C:
Expand All @@ -59,6 +65,15 @@ func (c *ConfigComponent) Start() {
c.cache.UpdateApolloConfig(apolloConfig, c.appConfigFunc)
}
t2.Reset(longPollInterval)
case <-c.stopCh:
break loop
}
}
}

// Stop 停止配置组件定时器
func (c *ConfigComponent) Stop() {
if c.stopCh != nil {
close(c.stopCh)
}
}
7 changes: 6 additions & 1 deletion component/remote/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,12 @@ func createApolloConfigWithJSON(b []byte, callback http.CallBack) (o interface{}
if parser == nil {
return apolloConfig, nil
}
m, err := parser.Parse(apolloConfig.Configurations[defaultContentKey])

content, ok := apolloConfig.Configurations[defaultContentKey]
if !ok {
content = string(b)
}
m, err := parser.Parse(content)
if err != nil {
log.Debugf("GetContent fail ! error: %v", err)
}
Expand Down
7 changes: 6 additions & 1 deletion component/remote/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ func processJSONFiles(b []byte, callback http.CallBack) (o interface{}, err erro
if parser == nil {
return apolloConfig, nil
}
m, err := parser.Parse(configurations[defaultContentKey])

content, ok := configurations[defaultContentKey]
if !ok {
content = string(b)
}
m, err := parser.Parse(content)
if err != nil {
log.Debugf("GetContent fail ! error: %v", err)
}
Expand Down
24 changes: 18 additions & 6 deletions env/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,33 @@ func SetServers(configIp string, serverMap map[string]*config.ServerInfo) {
}

//SetDownNode 设置失效节点
func SetDownNode(configIp string, host string) {
func SetDownNode(configService string, serverHost string) {
serverLock.Lock()
defer serverLock.Unlock()
s := ipMap[configIp]
if host == "" || s == nil || len(s.serverMap) == 0 {
s := ipMap[configService]
if serverHost == "" {
return
}

if host == configIp {
s.nextTryConnTime = nextTryConnectPeriod
if s == nil || len(s.serverMap) == 0 {
// init server map
ipMap[configService] = &Info{
serverMap: map[string]*config.ServerInfo{
serverHost: {
HomepageURL: serverHost,
},
},
}
s = ipMap[configService]
}

if serverHost == configService {
s.nextTryConnTime = time.Now().Unix() + nextTryConnectPeriod
}

for k, server := range s.serverMap {
// if some node has down then select next node
if strings.Index(k, host) > -1 {
if strings.Index(k, serverHost) > -1 {
server.IsDown = true
}
}
Expand Down
6 changes: 1 addition & 5 deletions protocol/http/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,7 @@ func RequestRecovery(appConfig config.AppConfig,
return response, nil
}

if host == appConfig.GetHost() {
return response, err
}

server.SetDownNode(host, appConfig.GetHost())
server.SetDownNode(appConfig.GetHost(), host)
}
}

Expand Down
30 changes: 23 additions & 7 deletions storage/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -314,31 +315,46 @@ func (c *Config) GetStringValue(key string, defaultValue string) string {
}

// GetStringSliceValue 获取配置值([]string)
func (c *Config) GetStringSliceValue(key string, defaultValue []string) []string {
func (c *Config) GetStringSliceValue(key, separator string, defaultValue []string) []string {
value := c.getConfigValue(key, true)
if value == nil {
return defaultValue
}

v, ok := value.([]string)
if !ok {
log.Debugf("convert to []string fail ! source type:%T", value)
return defaultValue
s, ok := value.(string)
if !ok {
log.Debugf("convert to []string fail ! source type:%T", value)
return defaultValue
}
return strings.Split(s, separator)
}
return v
}

// GetIntSliceValue 获取配置值([]int)
func (c *Config) GetIntSliceValue(key string, defaultValue []int) []int {
func (c *Config) GetIntSliceValue(key, separator string, defaultValue []int) []int {
value := c.getConfigValue(key, true)
if value == nil {
return defaultValue
}

v, ok := value.([]int)
if !ok {
log.Debugf("convert to []int fail ! source type:%T", value)
return defaultValue
sl := c.GetStringSliceValue(key, separator, nil)
if sl == nil {
return defaultValue
}
v = make([]int, 0, len(sl))
for index := range sl {
i, err := strconv.Atoi(sl[index])
if err != nil {
log.Debugf("convert to []int fail! value:%s, source type:%T", sl[index], sl[index])
return defaultValue
}
v = append(v, i)
}
}
return v
}
Expand Down Expand Up @@ -573,7 +589,7 @@ func (c *Cache) AddChangeListener(listener ChangeListener) {
c.changeListeners.PushBack(listener)
}

// RemoveChangeListener 增加变更监控
// RemoveChangeListener 删除变更监控
func (c *Cache) RemoveChangeListener(listener ChangeListener) {
if listener == nil {
return
Expand Down
11 changes: 9 additions & 2 deletions storage/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ func TestGetConfig(t *testing.T) {
configurations["bool"] = false
configurations["string_bool"] = "false"
configurations["sliceString"] = []string{"1", "2", "3"}
configurations["sliceStringWithSeparator"] = "1,2,3"
configurations["sliceInt"] = []int{1, 2, 3}
configurations["sliceIntWithSeparator"] = "1,2,3"
configurations["sliceInter"] = []interface{}{1, "2", 3}
c := creatTestApolloConfig(configurations, "test")
config := c.GetConfig("test")
Expand Down Expand Up @@ -148,10 +150,15 @@ func TestGetConfig(t *testing.T) {
b = config.GetBoolValue("b", false)
Assert(t, b, Equal(false))

slice := config.GetStringSliceValue("sliceString", []string{})
slice := config.GetStringSliceValue("sliceString", ",", []string{})
Assert(t, slice, Equal([]string{"1", "2", "3"}))
slice = config.GetStringSliceValue("sliceStringWithSeparator", ",", []string{})
Assert(t, slice, Equal([]string{"1", "2", "3"}))

sliceInt := config.GetIntSliceValue("sliceInt", ",", []int{})
Assert(t, sliceInt, Equal([]int{1, 2, 3}))

sliceInt := config.GetIntSliceValue("sliceInt", []int{})
sliceInt = config.GetIntSliceValue("sliceIntWithSeparator", ",", []int{})
Assert(t, sliceInt, Equal([]int{1, 2, 3}))

sliceInter := config.GetSliceValue("sliceInter", []interface{}{})
Expand Down

0 comments on commit cde7163

Please sign in to comment.