Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve code by adding some null checks #3115

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {

/**
* When we encounter an unexpected IOException we look for these {@link Throwable#getMessage() messages} (because we have no
* better way to distinguish) and log them at DEBUG rather than WARN, since they are generally caused by unclean client
* disconnects rather than an actual problem.
* When we encounter an unexpected {@link IOException} we look for these {@link Throwable#getMessage() messages} (because we
* have no better way to distinguish) and log them at DEBUG rather than WARN, since they are generally caused by unclean
* client disconnects rather than an actual problem.
*/
static final Set<String> SUPPRESS_IO_EXCEPTION_MESSAGES = LettuceSets.unmodifiableSet("Connection reset by peer",
"Broken pipe", "Connection timed out");
Expand Down Expand Up @@ -123,7 +123,7 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom

private Channel channel;

private ByteBuf buffer;
private ByteBuf readBuffer;

private boolean hasDecodeProgress;

Expand Down Expand Up @@ -182,8 +182,8 @@ protected void setState(LifecycleState lifecycleState) {
}
}

void setBuffer(ByteBuf buffer) {
this.buffer = buffer;
void setReadBuffer(ByteBuf readBuffer) {
this.readBuffer = readBuffer;
}

@Override
Expand Down Expand Up @@ -222,7 +222,7 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

setState(LifecycleState.REGISTERED);

buffer = ctx.alloc().buffer(8192 * 8);
readBuffer = ctx.alloc().buffer(8192 * 8);
rsm = new RedisStateMachine();
ctx.fireChannelRegistered();
}
Expand All @@ -242,10 +242,15 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
return;
}

channel = null;
buffer.release();
rsm.close();

if (readBuffer != null) {
readBuffer.release();
}

if (rsm != null) {
rsm.close();
}
rsm = null;

reset();
Expand Down Expand Up @@ -596,7 +601,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}

try {
if (buffer.refCnt() < 1) {
if (readBuffer == null || readBuffer.refCnt() < 1) {
logger.warn("{} Ignoring received data for closed or abandoned connection", logPrefix());
return;
}
Expand All @@ -610,10 +615,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
logger.trace("{} Buffer: {}", logPrefix(), input.toString(Charset.defaultCharset()).trim());
}

buffer.touch("CommandHandler.read(…)");
buffer.writeBytes(input);
readBuffer.touch("CommandHandler.read(…)");
readBuffer.writeBytes(input);

decode(ctx, buffer);
decode(ctx, readBuffer);
} finally {
input.release();
}
Expand Down Expand Up @@ -829,7 +834,7 @@ private boolean decode0(ChannelHandlerContext ctx, ByteBuf buffer, RedisCommand<

private boolean decode0(ChannelHandlerContext ctx, ByteBuf buffer, CommandOutput<?, ?, ?> pushOutput) {

if (!rsm.decode(buffer, pushOutput, ctx::fireExceptionCaught)) {
if (rsm != null && !rsm.decode(buffer, pushOutput, ctx::fireExceptionCaught)) {
return false;
}

Expand All @@ -852,11 +857,11 @@ private boolean decode0(ChannelHandlerContext ctx, ByteBuf buffer, CommandOutput
}

protected boolean decode(ByteBuf buffer, CommandOutput<?, ?, ?> output) {
return rsm.decode(buffer, output);
return rsm != null && rsm.decode(buffer, output);
}

protected boolean decode(ByteBuf buffer, RedisCommand<?, ?, ?> command, CommandOutput<?, ?, ?> output) {
return rsm.decode(buffer, output, command::completeExceptionally);
return rsm != null && rsm.decode(buffer, output, command::completeExceptionally);
}

/**
Expand Down Expand Up @@ -918,7 +923,7 @@ private void onProtectedMode(String message) {
* @param command
*/
protected void afterDecode(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command) {
decodeBufferPolicy.afterCommandDecoded(buffer);
decodeBufferPolicy.afterCommandDecoded(readBuffer);
}

private void recordLatency(WithLatency withLatency, RedisCommand<?, ?, ?> command) {
Expand Down Expand Up @@ -960,8 +965,8 @@ private void resetInternals() {
rsm.reset();
}

if (buffer.refCnt() > 0) {
buffer.clear();
if (readBuffer != null && readBuffer.refCnt() > 0) {
readBuffer.clear();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static DecodeBufferPolicy ratio(float bufferUsageRatio) {

/**
* {@link DecodeBufferPolicy} that {@link ByteBuf#discardReadBytes() discards read bytes} after each decoding phase. This
* strategy hast the most memory efficiency but also leads to more CPU pressure.
* strategy has the most memory efficiency but also leads to more CPU pressure.
*
* @return the strategy object.
*/
Expand All @@ -80,7 +80,7 @@ public static DecodeBufferPolicy always() {
/**
* {@link DecodeBufferPolicy} that {@link ByteBuf#discardSomeReadBytes() discards some read bytes} after each decoding
* phase. This strategy might discard some, all, or none of read bytes depending on its internal implementation to reduce
* overall memory bandwidth consumption at the cost of potentially additional memory consumption.
* overall CPU bandwidth consumption at the cost of potentially additional memory consumption.
*
* @return the strategy object.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ void shouldNotDiscardReadBytes() throws Exception {

// set the command handler buffer capacity to 30, make it easy to test
ByteBuf internalBuffer = context.alloc().buffer(30);
sut.setBuffer(internalBuffer);
sut.setReadBuffer(internalBuffer);

// mock a multi reply, which will reach the buffer usage ratio
ByteBuf msg = context.alloc().buffer(100);
Expand Down Expand Up @@ -559,7 +559,7 @@ void shouldDiscardReadBytes() throws Exception {

// set the command handler buffer capacity to 30, make it easy to test
ByteBuf internalBuffer = context.alloc().buffer(30);
sut.setBuffer(internalBuffer);
sut.setReadBuffer(internalBuffer);

// mock a multi reply, which will reach the buffer usage ratio
ByteBuf msg = context.alloc().buffer(100);
Expand Down Expand Up @@ -638,4 +638,16 @@ void shouldHandleIncompleteResponses() throws Exception {
assertThat(hmgetCommand.get()).hasSize(3);
}

/**
* @see <a href="https://github.com/redis/lettuce/issues/3087">Issue 3087</a>
*/
@Test
void shouldHandleNullBuffers() throws Exception {
sut.setReadBuffer(null);
sut.channelRead(context, Unpooled.wrappedBuffer(("*4\r\n" + "$3\r\nONE\r\n" + "$4\r\n>TW").getBytes()));
assertThat(stack).hasSize(0);

sut.channelUnregistered(context);
}

}
Loading