Skip to content

Commit e14cfad

Browse files
sbtouristnormanmaurer
authored andcommitted
Improved DefaultChannelPipeline#destroy() to avoid spinning continuously in case of custom executors.
Motivation: The previous DefaultChannelPipeline#destroy() implementation, introduced in netty#3156, is suboptimal as it can cause the for loop to continuously spin if the executor used by a given handler is unable to "recognize" the event loop. It could be objected that it's the custom executor responsibility to properly implement the inEventLoop() method, but some implementetaions might not be able to do that for performance reasons, and even so, it's always better to be safe against API misuse, in particular when it is not possible to fail fast and the alternative is rather some sutle behaviour. Modifications: The patch simply avoids the recursive spin by explicitly passing the "in event loop" condition as a boolean parameter, preserving the same guarantees offered by netty#3156. A unit test has also been added. Result: All channel events are correctly called and no high CPU usage is seen anymore.
1 parent 8b243db commit e14cfad

File tree

2 files changed

+106
-8
lines changed

2 files changed

+106
-8
lines changed

transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java

+10-8
Original file line numberDiff line numberDiff line change
@@ -867,35 +867,36 @@ public ChannelPipeline fireChannelUnregistered() {
867867
* See: https://github.com/netty/netty/issues/3156
868868
*/
869869
private void destroy() {
870-
destroyUp(head.next);
870+
destroyUp(head.next, false);
871871
}
872872

873-
private void destroyUp(AbstractChannelHandlerContext ctx) {
873+
private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
874874
final Thread currentThread = Thread.currentThread();
875875
final AbstractChannelHandlerContext tail = this.tail;
876876
for (;;) {
877877
if (ctx == tail) {
878-
destroyDown(currentThread, tail.prev);
878+
destroyDown(currentThread, tail.prev, inEventLoop);
879879
break;
880880
}
881881

882882
final EventExecutor executor = ctx.executor();
883-
if (!executor.inEventLoop(currentThread)) {
883+
if (!inEventLoop && !executor.inEventLoop(currentThread)) {
884884
final AbstractChannelHandlerContext finalCtx = ctx;
885885
executor.unwrap().execute(new OneTimeTask() {
886886
@Override
887887
public void run() {
888-
destroyUp(finalCtx);
888+
destroyUp(finalCtx, true);
889889
}
890890
});
891891
break;
892892
}
893893

894894
ctx = ctx.next;
895+
inEventLoop = false;
895896
}
896897
}
897898

898-
private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx) {
899+
private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
899900
// We have reached at tail; now traverse backwards.
900901
final AbstractChannelHandlerContext head = this.head;
901902
for (;;) {
@@ -904,7 +905,7 @@ private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx
904905
}
905906

906907
final EventExecutor executor = ctx.executor();
907-
if (executor.inEventLoop(currentThread)) {
908+
if (inEventLoop || executor.inEventLoop(currentThread)) {
908909
synchronized (this) {
909910
remove0(ctx);
910911
}
@@ -913,13 +914,14 @@ private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx
913914
executor.unwrap().execute(new OneTimeTask() {
914915
@Override
915916
public void run() {
916-
destroyDown(Thread.currentThread(), finalCtx);
917+
destroyDown(Thread.currentThread(), finalCtx, true);
917918
}
918919
});
919920
break;
920921
}
921922

922923
ctx = ctx.prev;
924+
inEventLoop = false;
923925
}
924926
}
925927

transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java

+96
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
import io.netty.util.AbstractReferenceCounted;
2929
import io.netty.util.ReferenceCountUtil;
3030
import io.netty.util.ReferenceCounted;
31+
import io.netty.util.concurrent.AbstractEventExecutor;
32+
import io.netty.util.concurrent.EventExecutorGroup;
33+
import io.netty.util.concurrent.Future;
3134
import org.junit.After;
3235
import org.junit.AfterClass;
3336
import org.junit.Test;
@@ -38,6 +41,8 @@
3841
import java.util.List;
3942
import java.util.Queue;
4043
import java.util.concurrent.CountDownLatch;
44+
import java.util.concurrent.ExecutorService;
45+
import java.util.concurrent.Executors;
4146
import java.util.concurrent.TimeUnit;
4247
import java.util.concurrent.atomic.AtomicReference;
4348

@@ -572,6 +577,37 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
572577
assertSame(exception, error.get());
573578
}
574579

580+
@Test
581+
public void testChannelUnregistrationWithCustomExecutor() throws Exception {
582+
final CountDownLatch channelLatch = new CountDownLatch(1);
583+
final CountDownLatch handlerLatch = new CountDownLatch(1);
584+
ChannelPipeline pipeline = new LocalChannel().pipeline();
585+
pipeline.addLast(new ChannelInitializer<Channel>() {
586+
@Override
587+
protected void initChannel(Channel ch) throws Exception {
588+
ch.pipeline().addLast(new WrapperExecutor(),
589+
new ChannelInboundHandlerAdapter() {
590+
591+
@Override
592+
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
593+
channelLatch.countDown();
594+
}
595+
596+
@Override
597+
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
598+
handlerLatch.countDown();
599+
}
600+
});
601+
}
602+
});
603+
Channel channel = pipeline.channel();
604+
group.register(channel);
605+
channel.close();
606+
channel.deregister();
607+
assertTrue(channelLatch.await(2, TimeUnit.SECONDS));
608+
assertTrue(handlerLatch.await(2, TimeUnit.SECONDS));
609+
}
610+
575611
private static int next(AbstractChannelHandlerContext ctx) {
576612
AbstractChannelHandlerContext next = ctx.next;
577613
if (next == null) {
@@ -683,4 +719,64 @@ public void handlerRemoved(ChannelHandlerContext ctx) {
683719
afterRemove = true;
684720
}
685721
}
722+
723+
private static final class WrapperExecutor extends AbstractEventExecutor {
724+
725+
private final ExecutorService wrapped = Executors.newSingleThreadExecutor();
726+
727+
@Override
728+
public boolean isShuttingDown() {
729+
return wrapped.isShutdown();
730+
}
731+
732+
@Override
733+
public Future<?> shutdownGracefully(long l, long l2, TimeUnit timeUnit) {
734+
throw new IllegalStateException();
735+
}
736+
737+
@Override
738+
public Future<?> terminationFuture() {
739+
throw new IllegalStateException();
740+
}
741+
742+
@Override
743+
public void shutdown() {
744+
wrapped.shutdown();
745+
}
746+
747+
@Override
748+
public List<Runnable> shutdownNow() {
749+
return wrapped.shutdownNow();
750+
}
751+
752+
@Override
753+
public boolean isShutdown() {
754+
return wrapped.isShutdown();
755+
}
756+
757+
@Override
758+
public boolean isTerminated() {
759+
return wrapped.isTerminated();
760+
}
761+
762+
@Override
763+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
764+
return wrapped.awaitTermination(timeout, unit);
765+
}
766+
767+
@Override
768+
public EventExecutorGroup parent() {
769+
return null;
770+
}
771+
772+
@Override
773+
public boolean inEventLoop(Thread thread) {
774+
return false;
775+
}
776+
777+
@Override
778+
public void execute(Runnable command) {
779+
wrapped.execute(command);
780+
}
781+
}
686782
}

0 commit comments

Comments
 (0)