Skip to content

Commit 0753b6e

Browse files
committed
provides fixes to WrappedDirectBufferByteBuf and changes test to use IPC
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 77de699 commit 0753b6e

File tree

4 files changed

+53
-23
lines changed

4 files changed

+53
-23
lines changed

rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronClientTransport.java

+25-5
Original file line numberDiff line numberDiff line change
@@ -231,10 +231,10 @@ public static AeronClientTransport createUdp(
231231
final Supplier<IdleStrategy> idleStrategySupplier =
232232
() ->
233233
new BackoffIdleStrategy(
234-
/* maxSpins */ 100, /* maxYields */
235-
1000, /* minParkPeriodNs */
236-
10000, /* maxParkPeriodNs */
237-
100000);
234+
/* maxSpins */ 100,
235+
/* maxYields */ 1000,
236+
/* minParkPeriodNs */ 10000,
237+
/* maxParkPeriodNs */ 100000);
238238
return new AeronClientTransport(
239239
aeron,
240240
new ChannelUriStringBuilder()
@@ -247,6 +247,26 @@ public static AeronClientTransport createUdp(
247247
ByteBufAllocator.DEFAULT,
248248
256,
249249
256,
250-
Duration.ofSeconds(50).toNanos());
250+
Duration.ofSeconds(5).toNanos());
251+
}
252+
253+
public static AeronClientTransport createIpc(Aeron aeron, EventLoopGroup resources) {
254+
final Supplier<IdleStrategy> idleStrategySupplier =
255+
() ->
256+
new BackoffIdleStrategy(
257+
/* maxSpins */ 100,
258+
/* maxYields */ 1000,
259+
/* minParkPeriodNs */ 10000,
260+
/* maxParkPeriodNs */ 100000);
261+
return new AeronClientTransport(
262+
aeron,
263+
new ChannelUriStringBuilder().media(CommonContext.IPC_MEDIA).build(),
264+
Schedulers.boundedElastic(),
265+
resources,
266+
idleStrategySupplier.get(),
267+
ByteBufAllocator.DEFAULT,
268+
256,
269+
256,
270+
Duration.ofSeconds(5).toNanos());
251271
}
252272
}

rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/AeronServerTransport.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,21 @@ public static AeronServerTransport createUdp(
205205
ByteBufAllocator.DEFAULT,
206206
256,
207207
256,
208-
Duration.ofSeconds(50).toNanos());
208+
Duration.ofSeconds(5).toNanos());
209+
}
210+
211+
public static AeronServerTransport createIpc(Aeron aeron, EventLoopGroup resources) {
212+
final Supplier<IdleStrategy> idleStrategySupplier =
213+
() -> new BackoffIdleStrategy(100, 1000, 10000, 100000);
214+
return new AeronServerTransport(
215+
aeron,
216+
new ChannelUriStringBuilder().media(CommonContext.IPC_MEDIA).build(),
217+
Schedulers.boundedElastic(),
218+
resources,
219+
idleStrategySupplier.get(),
220+
ByteBufAllocator.DEFAULT,
221+
256,
222+
256,
223+
Duration.ofSeconds(5).toNanos());
209224
}
210225
}

rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/WrappedDirectBufferByteBuf.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import java.nio.channels.FileChannel;
2626
import java.nio.channels.GatheringByteChannel;
2727
import java.nio.channels.ScatteringByteChannel;
28-
import org.agrona.BitUtil;
29-
import org.agrona.BufferUtil;
3028
import org.agrona.DirectBuffer;
3129

3230
class WrappedDirectBufferByteBuf extends AbstractByteBuf {
@@ -245,21 +243,23 @@ public int nioBufferCount() {
245243

246244
@Override
247245
public ByteBuffer nioBuffer(int index, int length) {
248-
final ByteBuffer byteBuffer =
249-
BufferUtil.allocateDirectAligned(length, BitUtil.CACHE_LINE_LENGTH);
250-
directBuffer.getBytes(index, byteBuffer, length);
251-
byteBuffer.flip();
252-
return byteBuffer;
246+
final ByteBuffer buffer = directBuffer.byteBuffer();
247+
if (buffer != null) {
248+
return buffer.duplicate().position(index).limit(index + length);
249+
} else {
250+
final byte[] bytes = directBuffer.byteArray();
251+
return ByteBuffer.wrap(bytes, index, length);
252+
}
253253
}
254254

255255
@Override
256256
public ByteBuffer internalNioBuffer(int index, int length) {
257-
return directBuffer.byteBuffer();
257+
return nioBuffer(index, length);
258258
}
259259

260260
@Override
261261
public ByteBuffer[] nioBuffers(int index, int length) {
262-
return new ByteBuffer[] {directBuffer.byteBuffer().duplicate()};
262+
return new ByteBuffer[] {nioBuffer(index, length)};
263263
}
264264

265265
@Override

rsocket-transport-aeron/src/test/java/io/rsocket/transport/aeron/AeronTransportTest.java

+3-8
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ final class AeronTransportTest implements TransportTest {
2828

2929
static final MediaDriver mediaDriver =
3030
MediaDriver.launch(
31-
new MediaDriver.Context()
32-
.threadingMode(ThreadingMode.SHARED_NETWORK)
33-
.dirDeleteOnStart(true));
31+
new MediaDriver.Context().threadingMode(ThreadingMode.DEDICATED).dirDeleteOnStart(true));
3432

3533
static final Aeron clientAeron = Aeron.connect();
3634
static final Aeron serverAeron = Aeron.connect();
@@ -61,11 +59,8 @@ public AeronTransportPair(MediaDriver driver, Aeron clientAeron, Aeron serverAer
6159
InetSocketAddress.createUnresolved(
6260
"127.0.0.1", ThreadLocalRandom.current().nextInt(20000) + 5000),
6361
(address, server, allocator) ->
64-
AeronClientTransport.createUdp(
65-
clientAeron, address.getHostName(), address.getPort(), eventLoopGroup),
66-
(address, allocator) ->
67-
AeronServerTransport.createUdp(
68-
serverAeron, address.getHostName(), address.getPort(), eventLoopGroup),
62+
AeronClientTransport.createIpc(clientAeron, eventLoopGroup),
63+
(address, allocator) -> AeronServerTransport.createIpc(serverAeron, eventLoopGroup),
6964
false,
7065
false,
7166
false);

0 commit comments

Comments
 (0)