Skip to content

Connection Leak 0.5.2 #611

@moderakh

Description

@moderakh

For our project, https://github.com/Azure/azure-cosmosdb-java, we are trying to upgrade our dependency on RxNetty from 0.4.x to 0.5.2. However, we appear to be hitting a connection leak in 0.5.2.

Code to reproduce:

/**
 * Stand-alone reproduction program: queues a burst of HTTPS GET requests against
 * github.com:443 through a pooled client, merges them with bounded concurrency and
 * blocks until every response body has been discarded.
 */
public class testing {

    /** Maximum number of connections configured on the pool. */
    private static final int POOL_LIMIT = 1000;

    /** Maximum number of request observables the merge subscribes to at once. */
    private static final int MERGE_CONCURRENCY = 1000;

    /** Total number of request observables that are queued. */
    private static final int REQUEST_TOTAL = 4000;

    /**
     * Hands out client-side {@link SSLEngine} instances, all derived from one
     * {@link SslContext} that is built when the factory is constructed.
     */
    private static class DefaultSSLEngineFactory implements Func1<ByteBufAllocator, SSLEngine> {
        private final SslContext clientContext;

        private DefaultSSLEngineFactory() {
            clientContext = buildClientContext();
        }

        /**
         * Builds the client SSL context using the default client provider.
         *
         * @throws IllegalStateException if the context cannot be built
         */
        private static SslContext buildClientContext() {
            try {
                // Resolve the provider first, then build -- same order as the inline version.
                SslProvider provider = SslContext.defaultClientProvider();
                return SslContextBuilder.forClient().sslProvider(provider).build();
            } catch (SSLException cause) {
                throw new IllegalStateException("Failed to create default SSL context", cause);
            }
        }

        /** Creates a new engine from the shared context using the supplied allocator. */
        @Override
        public SSLEngine call(ByteBufAllocator byteBufAllocator) {
            return clientContext.newEngine(byteBufAllocator);
        }
    }

    /**
     * Describes one GET "/" request with an empty byte payload whose response content
     * is discarded.
     *
     * @param client the client used to create the request
     * @return the observable obtained by discarding the response content
     */
    private static Observable<Void> discardedGet(HttpClient<ByteBuf, ByteBuf> client) {
        Observable<HttpClientResponse<ByteBuf>> response = client
                .createRequest(HttpMethod.GET, "/")
                .writeBytesContent(Observable.just(new byte[] {}));
        return response.flatMap(r -> r.discardContent());
    }

    /**
     * Entry point: configures the pool and client, queues the requests, waits for all
     * of them, then prints {@code DONE}.
     */
    public static void main(String[] args) throws Exception {

        PoolConfig<ByteBuf, ByteBuf> poolConfig = new PoolConfig<>();
        poolConfig.maxConnections(POOL_LIMIT);

        HttpClient<ByteBuf, ByteBuf> client = HttpClientImpl
                .newClient(
                        SingleHostPoolingProviderFactory.create(poolConfig),
                        Observable.just(new Host(new InetSocketAddress(InetAddress.getByName("github.com"), 443))))
                .secure(new DefaultSSLEngineFactory());

        // Build every request observable up front; they are only handed to merge below.
        List<Observable<Void>> pending = new ArrayList<>();
        while (pending.size() < REQUEST_TOTAL) {
            pending.add(discardedGet(client));
        }

        Observable<Void> merged = Observable.merge(pending, MERGE_CONCURRENCY);
        List<Void> results = merged
                .observeOn(Schedulers.computation())
                .toList()
                .toBlocking()
                .single();

        System.out.println("DONE");
    }
}

Since the connection pool size is 1000 and we merge the results with a maximum concurrency of 1000, the above code should work. However, we get the following failure:

Exception in thread "main" java.lang.RuntimeException: io.reactivex.netty.client.pool.PoolExhaustedException: Client connection pool exhausted.
	at rx.exceptions.Exceptions.propagate(Exceptions.java:57)
	at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:463)
	at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
	at testing.main(testing.java:66)
Caused by: io.reactivex.netty.client.pool.PoolExhaustedException: Client connection pool exhausted.
	at io.reactivex.netty.client.pool.PooledConnectionProviderImpl$7.call(PooledConnectionProviderImpl.java:193)
	at io.reactivex.netty.client.pool.PooledConnectionProviderImpl$7.call(PooledConnectionProviderImpl.java:173)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
	at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
	at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
	at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
	at rx.Subscriber.setProducer(Subscriber.java:211)
	at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
	at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
	at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:45)
	at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at io.reactivex.netty.client.pool.PooledConnectionProviderImpl$4.call(PooledConnectionProviderImpl.java:129)
	at io.reactivex.netty.client.pool.PooledConnectionProviderImpl$4.call(PooledConnectionProviderImpl.java:108)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at io.reactivex.netty.protocol.tcp.client.ConnectionRequestImpl$1.call(ConnectionRequestImpl.java:30)
	at io.reactivex.netty.protocol.tcp.client.ConnectionRequestImpl$1.call(ConnectionRequestImpl.java:27)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
	at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at io.reactivex.netty.protocol.http.client.internal.HttpClientRequestImpl$OnSubscribeFuncImpl.call(HttpClientRequestImpl.java:447)
	at io.reactivex.netty.protocol.http.client.internal.HttpClientRequestImpl$OnSubscribeFuncImpl.call(HttpClientRequestImpl.java:420)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
	at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:248)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)
	at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowPath(OnSubscribeFromIterable.java:117)
	at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:89)
	at rx.Subscriber.request(Subscriber.java:157)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:781)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)
	at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:857)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:656)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)
	at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:857)
	at rx.internal.operators.OperatorIgnoreElements$1.onCompleted(OperatorIgnoreElements.java:42)
	at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
	at io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge$ConnectionInputSubscriber.contentComplete(AbstractHttpConnectionBridge.java:508)
	at io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge.processNextItemInEventloop(AbstractHttpConnectionBridge.java:283)
	at io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge.access$1200(AbstractHttpConnectionBridge.java:56)
	at io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge$ConnectionInputSubscriber.onNext(AbstractHttpConnectionBridge.java:431)
	at io.reactivex.netty.channel.AbstractConnectionToChannelBridge$ReadProducer.sendOnNext(AbstractConnectionToChannelBridge.java:373)
	at io.reactivex.netty.channel.AbstractConnectionToChannelBridge.newMessage(AbstractConnectionToChannelBridge.java:189)
	at io.reactivex.netty.channel.BackpressureManagingHandler.channelRead(BackpressureManagingHandler.java:77)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.reactivex.netty.protocol.http.ws.client.Ws7To13UpgradeHandler.channelRead(Ws7To13UpgradeHandler.java:135)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1389)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1159)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1203)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.reactivex.netty.channel.BytesInspector.channelRead(BytesInspector.java:56)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1414)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:945)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:146)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

Can someone please look at this? I only face this problem with 0.5.2, not with 0.4.x.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions