http-netty: let RetryingHttpRequesterFilter return responses on failure #3048
base: main
Changes from 11 commits
@@ -46,6 +46,7 @@
 import io.servicetalk.transport.api.ExecutionContext;
 import io.servicetalk.transport.api.ExecutionStrategyInfluencer;
 import io.servicetalk.transport.api.RetryableException;
+import io.servicetalk.utils.internal.ThrowableUtils;

 import java.io.IOException;
 import java.time.Duration;
@@ -91,15 +92,16 @@ public final class RetryingHttpRequesterFilter
         implements StreamingHttpClientFilterFactory, ExecutionStrategyInfluencer<HttpExecutionStrategy> {
     static final int DEFAULT_MAX_TOTAL_RETRIES = 4;
     private static final RetryingHttpRequesterFilter DISABLE_AUTO_RETRIES =
-            new RetryingHttpRequesterFilter(true, false, false, 1, null,
+            new RetryingHttpRequesterFilter(true, false, false, false, 1, null,
                     (__, ___) -> NO_RETRIES, null);
     private static final RetryingHttpRequesterFilter DISABLE_ALL_RETRIES =
-            new RetryingHttpRequesterFilter(false, true, false, 0, null,
+            new RetryingHttpRequesterFilter(false, true, false, false, 0, null,
                     (__, ___) -> NO_RETRIES, null);

     private final boolean waitForLb;
     private final boolean ignoreSdErrors;
     private final boolean mayReplayRequestPayload;
+    private final boolean returnFailedResponses;
     private final int maxTotalRetries;
     @Nullable
     private final Function<HttpResponseMetaData, HttpResponseException> responseMapper;
@@ -109,13 +111,14 @@ public final class RetryingHttpRequesterFilter

     RetryingHttpRequesterFilter(
             final boolean waitForLb, final boolean ignoreSdErrors, final boolean mayReplayRequestPayload,
-            final int maxTotalRetries,
+            final boolean returnFailedResponses, final int maxTotalRetries,
             @Nullable final Function<HttpResponseMetaData, HttpResponseException> responseMapper,
             final BiFunction<HttpRequestMetaData, Throwable, BackOffPolicy> retryFor,
             @Nullable final RetryCallbacks onRequestRetry) {
         this.waitForLb = waitForLb;
         this.ignoreSdErrors = ignoreSdErrors;
         this.mayReplayRequestPayload = mayReplayRequestPayload;
+        this.returnFailedResponses = returnFailedResponses;
         this.maxTotalRetries = maxTotalRetries;
         this.responseMapper = responseMapper;
         this.retryFor = retryFor;
@@ -210,8 +213,21 @@ public Completable apply(final int count, final Throwable t) {
         }

         Completable applyRetryCallbacks(final Completable completable, final int retryCount, final Throwable t) {
-            return retryCallbacks == null ? completable :
-                    completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t));
+            Completable result = (retryCallbacks == null ? completable :
+                    completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t)));
+            if (returnFailedResponses && t instanceof HttpResponseException &&
+                    ((HttpResponseException) t).metaData() instanceof StreamingHttpResponse) {
+                StreamingHttpResponse response = (StreamingHttpResponse) ((HttpResponseException) t).metaData();
+                // If we succeed, we need to drain the response body before we continue. If we fail we want to
+                // surface the original exception and don't worry about draining since it will be returned to
+                // the user.
+                result = result.onErrorMap(backoffError -> ThrowableUtils.addSuppressed(t, backoffError))
+                        // If we get cancelled we also need to drain the message body as there is no guarantee
+                        // we'll ever receive a completion event, error or success.
+                        .beforeCancel(() -> drain(response).subscribe())
+                        .concat(drain(response));
+            }
+            return result;
         }
     }

Review comment: Does that retry draining collide/overlap with the draining @idelpivnitskiy added in the other PR?

Reply: I don't think so. That leak should have been self-contained, and the problem there was that we didn't drain a response that the redirect filter had decided to consume itself.
@@ -258,19 +274,31 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
         if (responseMapper != null) {
             single = single.flatMap(resp -> {
                 final HttpResponseException exception = responseMapper.apply(resp);

Review comment: Unrelated to your changes, but a find: if user-defined …

-                return (exception != null ?
-                        // Drain response payload body before discarding it:
-                        resp.payloadBody().ignoreElements().onErrorComplete()
-                                .concat(Single.<StreamingHttpResponse>failed(exception)) :
-                        Single.succeeded(resp))
-                        .shareContextOnSubscribe();
+                Single<StreamingHttpResponse> response;
+                if (exception == null) {
+                    response = Single.succeeded(resp);
+                } else {
+                    response = Single.failed(exception);
+                    if (!returnFailedResponses) {
+                        response = drain(resp).concat(response);
+                    }
+                }
+                return response.shareContextOnSubscribe();
             });
         }

         // 1. Metadata is shared across retries
         // 2. Publisher state is restored to original state for each retry
         // duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2).
-        return single.retryWhen(retryStrategy(request, executionContext(), true));
+        single = single.retryWhen(retryStrategy(request, executionContext(), true));
+        if (returnFailedResponses) {
+            single = single.onErrorResume(HttpResponseException.class, t -> {
+                HttpResponseMetaData metaData = t.metaData();
+                return (metaData instanceof StreamingHttpResponse ?
+                        Single.succeeded((StreamingHttpResponse) metaData) : Single.failed(t));
+            });
+        }
+        return single;
     }
 }
@@ -719,6 +747,7 @@ public static final class Builder {

         private int maxTotalRetries = DEFAULT_MAX_TOTAL_RETRIES;
         private boolean retryExpectationFailed;
+        private boolean returnFailedResponses;

         private BiFunction<HttpRequestMetaData, RetryableException, BackOffPolicy>
                 retryRetryableExceptions = (requestMetaData, e) -> BackOffPolicy.ofImmediateBounded();

@@ -745,6 +774,11 @@ public static final class Builder {
         @Nullable
         private RetryCallbacks onRequestRetry;

+        public Builder returnFailedResponses(final boolean returnFailedResponses) {
+            this.returnFailedResponses = returnFailedResponses;
+            return this;
+        }
+

Review comment: I'm certain this can have a better name, and clearly it needs docs before merging. Name suggestions welcome.

Reply: I also think this API is a bit awkward: first you must turn a response into an HttpResponseException, and then it's going to be discarded. Alternatively, we could just have a different lambda to the tune of …

Reply: Right now we don't have RS operators to achieve retries without mapping into exceptions. If we go the route of cleanly retrying response meta-data without mapping to exceptions, it's possible but will take longer. The current rationale was that some users want to always map responses to exceptions; that's why we have independent … I agree that having a third method that works only if the other two are also configured is not intuitive. Alternatively, we can consider adding a …

Reply: I like the idea of the boolean overload, which would signal that it needs to be configured "together". Alternatively, when building, we should at least check whether this value is set to true while the others are in their default state, and reject that config.
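A hedged sketch of the boolean-overload idea discussed above. The two-argument signature, the retryResponses field name, and the reuse of the new returnFailedResponses flag are assumptions for illustration, not existing API:

    // Hypothetical Builder overload: couples the response-retry policy with the decision
    // to surface the original (mapped) response once retries are exhausted, so the two
    // settings cannot be configured inconsistently.
    public Builder retryResponses(
            final BiFunction<HttpRequestMetaData, HttpResponseException, BackOffPolicy> mapper,
            final boolean returnOriginalResponses) {
        this.retryResponses = mapper;                           // assumed field name
        this.returnFailedResponses = returnOriginalResponses;   // new flag from this PR
        return this;
    }

Such an overload would make the "configure together" constraint visible in the API instead of relying on a build-time check.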
         /**
          * By default, automatic retries wait for the associated {@link LoadBalancer} to be
          * {@link LoadBalancerReadyEvent ready} before triggering a retry for requests. This behavior may add latency to
@@ -1054,7 +1088,11 @@ public RetryingHttpRequesterFilter build() {
                 return NO_RETRIES;
             };
             return new RetryingHttpRequesterFilter(waitForLb, ignoreSdErrors, mayReplayRequestPayload,
-                    maxTotalRetries, responseMapper, allPredicate, onRequestRetry);
+                    returnFailedResponses, maxTotalRetries, responseMapper, allPredicate, onRequestRetry);
         }
     }
+
+    private static Completable drain(StreamingHttpResponse response) {
+        return response.payloadBody().ignoreElements().onErrorComplete();
+    }
 }
@@ -251,21 +251,31 @@ private void assertRequestRetryingPred(final BlockingHttpClient client) {
         assertThat("Unexpected calls to select.", (double) lbSelectInvoked.get(), closeTo(5.0, 1.0));
     }

-    @Test
-    void testResponseMapper() {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testResponseMapper(final boolean returnFailedResponses) throws Exception {
         AtomicInteger newConnectionCreated = new AtomicInteger();
         AtomicInteger responseDrained = new AtomicInteger();
         AtomicInteger onRequestRetryCounter = new AtomicInteger();
         final int maxTotalRetries = 4;
+        final String retryMessage = "Retryable header";
         normalClient = normalClientBuilder
                 .appendClientFilter(new Builder()
+                        .returnFailedResponses(returnFailedResponses)
                        .maxTotalRetries(maxTotalRetries)
                        .responseMapper(metaData -> metaData.headers().contains(RETRYABLE_HEADER) ?
-                                new HttpResponseException("Retryable header", metaData) : null)
+                                new HttpResponseException(retryMessage, metaData) : null)
                        // Disable request retrying
                        .retryRetryableExceptions((requestMetaData, e) -> ofNoRetries())
                        // Retry only responses marked so
-                        .retryResponses((requestMetaData, throwable) -> ofImmediate(maxTotalRetries - 1))
+                        .retryResponses((requestMetaData, throwable) -> {
+                            if (throwable instanceof HttpResponseException &&
+                                    retryMessage.equals(throwable.getMessage())) {
+                                return ofImmediate(maxTotalRetries - 1);
+                            } else {
+                                throw new RuntimeException("Unexpected exception");
+                            }
+                        })
                        .onRequestRetry((count, req, t) ->
                                assertThat(onRequestRetryCounter.incrementAndGet(), is(count)))
                        .build())

Review comment on the new retryResponses lambda: Is this an intermediate change that is no longer needed? I tried reverting back to the original lambda locally and it worked.
@@ -281,9 +291,14 @@ public Single<StreamingHttpResponse> request(final StreamingHttpRequest request)
                     };
                 })
                 .buildBlocking();
-        HttpResponseException e = assertThrows(HttpResponseException.class,
-                () -> normalClient.request(normalClient.get("/")));
-        assertThat("Unexpected exception.", e, instanceOf(HttpResponseException.class));
+        if (returnFailedResponses) {
+            HttpResponse response = normalClient.request(normalClient.get("/"));
+            assertThat(response.status(), is(HttpResponseStatus.OK));

Review comment: Consider enhancing the server response to also add a payload body and asserting here that the payload is not drained (a possible shape is sketched after this hunk).

+        } else {
+            HttpResponseException e = assertThrows(HttpResponseException.class,
+                    () -> normalClient.request(normalClient.get("/")));
+            assertThat("Unexpected exception.", e, instanceOf(HttpResponseException.class));
+        }
         // The load balancer is allowed to be not ready one time, which is counted against total retry attempts but not
         // against actual requests being issued.
         assertThat("Unexpected calls to select.", lbSelectInvoked.get(), allOf(greaterThanOrEqualTo(maxTotalRetries),
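A possible shape for the assertion suggested in the review comment above, assuming the test server were enhanced to write a fixed payload; the payload text and the Buffer-to-String conversion are illustrative, not part of this PR:

        // Hypothetical: if the server also writes a payload, the returned "failed" response
        // should still carry it, i.e. the body was not drained by the retry machinery.
        // UTF_8 below refers to java.nio.charset.StandardCharsets.UTF_8.
        HttpResponse response = normalClient.request(normalClient.get("/"));
        assertThat(response.status(), is(HttpResponseStatus.OK));
        assertThat(response.payloadBody().toString(UTF_8), is("some payload"));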
Review comment: It seems like we may leak a "pending response" if any user-defined function under retryFor throws. Scenario:
1. A response is mapped into an HttpResponseException.
2. OuterRetryStrategy.apply is invoked.
3. retryFor throws.
4. We never reach applyRetryCallbacks and won't drain that response.

Consider adding a try-catch inside apply to make sure we drain t (when it is an instance of HttpResponseException) in case of any unexpected exception. Adding a test will be highly appreciated.