diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 697501af98..3a9588cf6b 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/dns" httpgrpc_server "github.com/grafana/dskit/httpgrpc/server" "github.com/grafana/dskit/kv" @@ -852,7 +853,12 @@ func (t *Mimir) initRuler() (serv services.Service, err error) { if err != nil { return nil, err } - remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware) + retryConfig := backoff.Config{ + MinBackoff: t.Cfg.Ruler.QueryFrontend.MinRetryBackoff, + MaxBackoff: t.Cfg.Ruler.QueryFrontend.MaxRetryBackoff, + MaxRetries: t.Cfg.Ruler.QueryFrontend.MaxRetries, + } + remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, retryConfig, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware) embeddedQueryable = prom_remote.NewSampleAndChunkQueryableClient( remoteQuerier, diff --git a/pkg/ruler/remotequerier.go b/pkg/ruler/remotequerier.go index 57a13f7ef6..1ec5497940 100644 --- a/pkg/ruler/remotequerier.go +++ b/pkg/ruler/remotequerier.go @@ -50,8 +50,6 @@ const ( statusError = "error" - maxRequestRetries = 3 - formatJSON = "json" formatProtobuf = "protobuf" ) @@ -63,6 +61,11 @@ type QueryFrontendConfig struct { // Address is the address of the query-frontend to connect to. Address string `yaml:"address"` + // Retry configuration. + MaxRetries int `yaml:"max_retries"` + MinRetryBackoff time.Duration `yaml:"min_retry_backoff"` + MaxRetryBackoff time.Duration `yaml:"max_retry_backoff"` + // GRPCClientConfig contains gRPC specific config options. GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the rulers and query-frontends."` @@ -75,6 +78,9 @@ func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) { "", "GRPC listen address of the query-frontend(s). Must be a DNS address (prefixed with dns:///) "+ "to enable client side load balancing.") + f.IntVar(&c.MaxRetries, "ruler.query-frontend.max-retries", 3, "Maximum number of retries for a single request.") + f.DurationVar(&c.MinRetryBackoff, "ruler.query-frontend.min-retry-backoff", 100*time.Millisecond, "Minimum backoff duration for retries.") + f.DurationVar(&c.MaxRetryBackoff, "ruler.query-frontend.max-retry-backoff", 2*time.Second, "Maximum backoff duration for retries.") c.GRPCClientConfig.CustomCompressors = []string{s2.Name} c.GRPCClientConfig.RegisterFlagsWithPrefix("ruler.query-frontend.grpc-client-config", f) @@ -115,6 +121,7 @@ type Middleware func(ctx context.Context, req *httpgrpc.HTTPRequest) error // RemoteQuerier executes read operations against a httpgrpc.HTTPClient. type RemoteQuerier struct { client httpgrpc.HTTPClient + retryConfig backoff.Config timeout time.Duration middlewares []Middleware promHTTPPrefix string @@ -129,6 +136,7 @@ var protobufDecoderInstance = protobufDecoder{} // NewRemoteQuerier creates and initializes a new RemoteQuerier instance. func NewRemoteQuerier( client httpgrpc.HTTPClient, + retryConfig backoff.Config, timeout time.Duration, preferredQueryResultResponseFormat string, prometheusHTTPPrefix string, @@ -137,6 +145,7 @@ func NewRemoteQuerier( ) *RemoteQuerier { return &RemoteQuerier{ client: client, + retryConfig: retryConfig, timeout: timeout, middlewares: middlewares, promHTTPPrefix: prometheusHTTPPrefix, @@ -309,12 +318,7 @@ func (q *RemoteQuerier) createRequest(ctx context.Context, query string, ts time func (q *RemoteQuerier) sendRequest(ctx context.Context, req *httpgrpc.HTTPRequest, logger log.Logger) (*httpgrpc.HTTPResponse, error) { // Ongoing request may be cancelled during evaluation due to some transient error or server shutdown, // so we'll keep retrying until we get a successful response or backoff is terminated. - retryConfig := backoff.Config{ - MinBackoff: 100 * time.Millisecond, - MaxBackoff: 2 * time.Second, - MaxRetries: maxRequestRetries, - } - retry := backoff.New(ctx, retryConfig) + retry := backoff.New(ctx, q.retryConfig) for { resp, err := q.client.Handle(ctx, req) diff --git a/pkg/ruler/remotequerier_test.go b/pkg/ruler/remotequerier_test.go index 1105bb7709..fdfea416b5 100644 --- a/pkg/ruler/remotequerier_test.go +++ b/pkg/ruler/remotequerier_test.go @@ -15,6 +15,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/gogo/status" "github.com/golang/snappy" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/grpcutil" "github.com/grafana/dskit/httpgrpc" "github.com/prometheus/prometheus/model/histogram" @@ -30,6 +31,12 @@ import ( "github.com/grafana/mimir/pkg/querier/api" ) +var retryConfig = backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 1 * time.Second, + MaxRetries: 3, +} + type mockHTTPGRPCClient func(ctx context.Context, req *httpgrpc.HTTPRequest, _ ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) func (c mockHTTPGRPCClient) Handle(ctx context.Context, req *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { @@ -64,7 +71,7 @@ func TestRemoteQuerier_Read(t *testing.T) { t.Run("should issue a remote read request", func(t *testing.T) { client, inReq := setup() - q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) + q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) _, err := q.Read(context.Background(), &prompb.Query{}, false) require.NoError(t, err) @@ -76,7 +83,7 @@ func TestRemoteQuerier_Read(t *testing.T) { t.Run("should not inject the read consistency header if none is defined in the context", func(t *testing.T) { client, inReq := setup() - q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) + q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) _, err := q.Read(context.Background(), &prompb.Query{}, false) require.NoError(t, err) @@ -86,7 +93,7 @@ func TestRemoteQuerier_Read(t *testing.T) { t.Run("should inject the read consistency header if it is defined in the context", func(t *testing.T) { client, inReq := setup() - q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) + q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong) _, err := q.Read(ctx, &prompb.Query{}, false) @@ -101,7 +108,7 @@ func TestRemoteQuerier_ReadReqTimeout(t *testing.T) { <-ctx.Done() return nil, ctx.Err() } - q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, formatJSON, "/prometheus", log.NewNopLogger()) + q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Second, formatJSON, "/prometheus", log.NewNopLogger()) _, err := q.Read(context.Background(), &prompb.Query{}, false) require.Error(t, err) @@ -139,7 +146,7 @@ func TestRemoteQuerier_Query(t *testing.T) { t.Run(fmt.Sprintf("format = %s", format), func(t *testing.T) { client, inReq := setup() - q := NewRemoteQuerier(client, time.Minute, format, "/prometheus", log.NewNopLogger()) + q := NewRemoteQuerier(client, retryConfig, time.Minute, format, "/prometheus", log.NewNopLogger()) _, err := q.Query(context.Background(), "qs", tm) require.NoError(t, err) @@ -165,7 +172,7 @@ func TestRemoteQuerier_Query(t *testing.T) { t.Run("should not inject the read consistency header if none is defined in the context", func(t *testing.T) { client, inReq := setup() - q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) + q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) _, err := q.Query(context.Background(), "qs", tm) require.NoError(t, err) @@ -175,7 +182,7 @@ func TestRemoteQuerier_Query(t *testing.T) { t.Run("should inject the read consistency header if it is defined in the context", func(t *testing.T) { client, inReq := setup() - q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) + q := NewRemoteQuerier(client, retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong) _, err := q.Query(ctx, "qs", tm) @@ -276,7 +283,7 @@ func TestRemoteQuerier_QueryRetryOnFailure(t *testing.T) { } return testCase.response, nil } - q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) + q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) require.Equal(t, int64(0), count.Load()) _, err := q.Query(ctx, "qs", time.Now()) if testCase.err == nil { @@ -405,7 +412,7 @@ func TestRemoteQuerier_QueryJSONDecoding(t *testing.T) { Body: []byte(scenario.body), }, nil } - q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) + q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) tm := time.Unix(1649092025, 515834) actual, err := q.Query(context.Background(), "qs", tm) @@ -678,7 +685,7 @@ func TestRemoteQuerier_QueryProtobufDecoding(t *testing.T) { Body: b, }, nil } - q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatProtobuf, "/prometheus", log.NewNopLogger()) + q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatProtobuf, "/prometheus", log.NewNopLogger()) tm := time.Unix(1649092025, 515834) actual, err := q.Query(context.Background(), "qs", tm) @@ -701,7 +708,7 @@ func TestRemoteQuerier_QueryUnknownResponseContentType(t *testing.T) { Body: []byte("some body content"), }, nil } - q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) + q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatJSON, "/prometheus", log.NewNopLogger()) tm := time.Unix(1649092025, 515834) _, err := q.Query(context.Background(), "qs", tm) @@ -713,7 +720,7 @@ func TestRemoteQuerier_QueryReqTimeout(t *testing.T) { <-ctx.Done() return nil, ctx.Err() } - q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, formatJSON, "/prometheus", log.NewNopLogger()) + q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Second, formatJSON, "/prometheus", log.NewNopLogger()) tm := time.Unix(1649092025, 515834) _, err := q.Query(context.Background(), "qs", tm) @@ -771,7 +778,7 @@ func TestRemoteQuerier_StatusErrorResponses(t *testing.T) { return testCase.resp, testCase.err } logger := newLoggerWithCounter() - q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", logger) + q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), retryConfig, time.Minute, formatJSON, "/prometheus", logger) tm := time.Unix(1649092025, 515834)