Skip to content

Commit 7026fc2

Browse files
committed
ahc-1412 added a new close method to wait on shared netty resources
1 parent 3bb5ae4 commit 7026fc2

File tree

5 files changed

+85
-45
lines changed

5 files changed

+85
-45
lines changed

client/src/main/java/org/asynchttpclient/AsyncHttpClient.java

+8
Original file line numberDiff line numberDiff line change
@@ -287,4 +287,12 @@ public interface AsyncHttpClient extends Closeable {
287287
* @return the config associated to this client.
288288
*/
289289
AsyncHttpClientConfig getConfig();
290+
291+
/**
292+
* Similar to calling the method {@link #close()} but additionally waits for inactivity on shared resources between
293+
* multiple instances of netty. Calling this method instead of the method {@link #close()} might be helpful
294+
* on application shutdown to prevent errors like a {@link ClassNotFoundException} because the class loader was
295+
* already removed but there are still some active tasks on this shared resources which want to access these classes.
296+
*/
297+
void closeAndAwaitInactivity();
290298
}

client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java

+49-16
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@
2121
import io.netty.util.HashedWheelTimer;
2222
import io.netty.util.ThreadDeathWatcher;
2323
import io.netty.util.Timer;
24+
import io.netty.util.concurrent.GlobalEventExecutor;
2425

26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.ExecutionException;
2528
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.TimeoutException;
2630
import java.util.concurrent.atomic.AtomicBoolean;
2731
import java.util.function.Predicate;
2832

@@ -102,31 +106,60 @@ private Timer newNettyTimer() {
102106

103107
@Override
104108
public void close() {
109+
closeInternal(false);
110+
}
111+
112+
public void closeAndAwaitInactivity() {
113+
closeInternal(true);
114+
}
115+
116+
private void closeInternal(boolean awaitInactivity) {
105117
if (closeTriggered.compareAndSet(false, true)) {
106-
try {
107-
channelManager.close();
108-
} catch (Throwable t) {
109-
LOGGER.warn("Unexpected error on ChannelManager close", t);
110-
}
111-
if (allowStopNettyTimer) {
112-
try {
113-
nettyTimer.stop();
114-
} catch (Throwable t) {
115-
LOGGER.warn("Unexpected error on HashedWheelTimer close", t);
118+
CompletableFuture<Void> handledCloseFuture = channelManager.close().whenComplete((v, t) -> {
119+
if(t != null) {
120+
LOGGER.warn("Unexpected error on ChannelManager close", t);
121+
}
122+
if (allowStopNettyTimer) {
123+
try {
124+
nettyTimer.stop();
125+
} catch (Throwable th) {
126+
LOGGER.warn("Unexpected error on HashedWheelTimer close", th);
127+
}
116128
}
129+
});
130+
131+
if(awaitInactivity) {
132+
handledCloseFuture = handledCloseFuture.thenCombine(awaitInactivity(), (v1,v2) -> null) ;
117133
}
118-
119-
//see https://github.com/netty/netty/issues/2084#issuecomment-44822314
134+
120135
try {
121-
ThreadDeathWatcher.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS);
122-
} catch(InterruptedException t) {
123-
// Ignore
136+
handledCloseFuture.get(config.getShutdownTimeout(), TimeUnit.MILLISECONDS);
137+
} catch (InterruptedException | TimeoutException t) {
138+
LOGGER.warn("Unexpected error on AsyncHttpClient close", t);
139+
} catch (ExecutionException e) {
140+
// already handled and could be ignored
124141
}
125-
126142
closed.compareAndSet(false, true);
127143
}
128144
}
129145

146+
private CompletableFuture<Void> awaitInactivity() {
147+
//see https://github.com/netty/netty/issues/2084#issuecomment-44822314
148+
CompletableFuture<Void> wait1 = CompletableFuture.runAsync(() -> {
149+
try {
150+
GlobalEventExecutor.INSTANCE.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS);
151+
} catch(InterruptedException t) {
152+
// Ignore
153+
}});
154+
CompletableFuture<Void> wait2 = CompletableFuture.runAsync(() -> {
155+
try {
156+
ThreadDeathWatcher.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS);
157+
} catch(InterruptedException t) {
158+
// Ignore
159+
}});
160+
return wait1.thenCombine(wait2, (v1,v2) -> null);
161+
}
162+
130163
@Override
131164
public boolean isClosed() {
132165
return closed.get();

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

+19-29
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,13 @@
3838
import io.netty.handler.stream.ChunkedWriteHandler;
3939
import io.netty.util.Timer;
4040
import io.netty.util.concurrent.DefaultThreadFactory;
41+
import io.netty.util.concurrent.Future;
4142
import io.netty.util.concurrent.GlobalEventExecutor;
4243

4344
import java.net.InetSocketAddress;
4445
import java.util.Map;
4546
import java.util.Map.Entry;
46-
import java.util.concurrent.CountDownLatch;
47+
import java.util.concurrent.CompletableFuture;
4748
import java.util.concurrent.ThreadFactory;
4849
import java.util.concurrent.TimeUnit;
4950
import java.util.function.Function;
@@ -100,15 +101,11 @@ public class ChannelManager {
100101
private final ChannelPool channelPool;
101102
private final ChannelGroup openChannels;
102103

103-
private final CountDownLatch closeLatch;
104-
105104
private AsyncHttpClientHandler wsHandler;
106105

107106
public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
108107

109108
this.config = config;
110-
111-
closeLatch = new CountDownLatch(2);
112109

113110
this.sslEngineFactory = config.getSslEngineFactory() != null ? config.getSslEngineFactory() : new DefaultSslEngineFactory();
114111
try {
@@ -304,32 +301,25 @@ public boolean removeAll(Channel connection) {
304301
return channelPool.removeAll(connection);
305302
}
306303

307-
private void doClose() {
308-
openChannels.close().addListener(future -> closeLatch.countDown());
309-
channelPool.destroy();
310-
311-
//see https://github.com/netty/netty/issues/2084#issuecomment-44822314
312-
try {
313-
GlobalEventExecutor.INSTANCE.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS);
314-
} catch(InterruptedException t) {
315-
// Ignore
316-
} finally {
317-
closeLatch.countDown();
318-
}
319-
}
320-
321-
public void close() {
304+
public CompletableFuture<Void> close() {
305+
CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
322306
if (allowReleaseEventLoopGroup) {
323-
eventLoopGroup.shutdownGracefully(config.getShutdownQuietPeriod(), config.getShutdownTimeout(), TimeUnit.MILLISECONDS)//
324-
.addListener(future -> doClose());
325-
} else
326-
doClose();
327-
328-
try {
329-
closeLatch.await(config.getShutdownTimeout(), TimeUnit.MILLISECONDS);
330-
} catch (InterruptedException e) {
331-
// Ignore
307+
closeFuture = toCompletableFuture(
308+
eventLoopGroup.shutdownGracefully(config.getShutdownQuietPeriod(), config.getShutdownTimeout(), TimeUnit.MILLISECONDS));
332309
}
310+
return closeFuture.thenCompose(v -> toCompletableFuture(openChannels.close())).whenComplete((v,t) -> channelPool.destroy());
311+
}
312+
313+
private static CompletableFuture<Void> toCompletableFuture(final Future<?> future) {
314+
final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
315+
future.addListener(r -> {
316+
if(r.isSuccess()) {
317+
completableFuture.complete(null);
318+
} else {
319+
completableFuture.completeExceptionally(r.cause());
320+
}
321+
});
322+
return completableFuture;
333323
}
334324

335325
public void closeChannel(Channel channel) {

extras/registry/src/test/java/org/asynchttpclient/extras/registry/BadAsyncHttpClient.java

+5
Original file line numberDiff line numberDiff line change
@@ -143,4 +143,9 @@ public void flushChannelPoolPartitions(Predicate<Object> predicate) {
143143
public AsyncHttpClientConfig getConfig() {
144144
return null;
145145
}
146+
147+
@Override
148+
public void closeAndAwaitInactivity() {
149+
150+
}
146151
}

extras/registry/src/test/java/org/asynchttpclient/extras/registry/TestAsyncHttpClient.java

+4
Original file line numberDiff line numberDiff line change
@@ -139,4 +139,8 @@ public void flushChannelPoolPartitions(Predicate<Object> predicate) {
139139
public AsyncHttpClientConfig getConfig() {
140140
return null;
141141
}
142+
143+
@Override
144+
public void closeAndAwaitInactivity() {
145+
}
142146
}

0 commit comments

Comments
 (0)