Description
Hello, dear team. I use Spring Boot starters of recent 3.4.5 version to investigate Spring RSocket Interfaces. Here is my sample server.
@MessageMapping("uuid")
suspend fun uuid(): String = UUID.randomUUID().toString()
And I want to consume it using RSocket Interface. And while for HTTP Interface I can easily use both Mono<String> and suspend fun: String and it is working perfectly, with RSocket interfaces if I use it this way
interface ServerRockerClient {
@RSocketExchange("uuid")
fun uuid(): Mono<String>
}
and
val result = serverRocketClient.uuid().awaitSingle()
it works, but if I have it in coroutine way
interface ServerRockerClient {
@RSocketExchange("uuid")
suspend fun uuid(): String
}
and
val result = serverRocketClient.uuid()
I got exception
org.springframework.core.codec.EncodingException: JSON encoding error: kotlinx.atomicfu.AtomicRef
at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:260)
at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.encodeData(DefaultRSocketRequester.java:254)
at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.lambda$createPayload$2(DefaultRSocketRequester.java:208)
at reactor.core.publisher.MonoCallable$MonoCallableSubscription.request(MonoCallable.java:137)
at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:470)
at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:48)
at reactor.core.publisher.MonoZip$ZipCoordinator.request(MonoZip.java:220)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.request(FluxMap.java:295)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
at io.rsocket.core.DefaultRSocketClient$FlatMapMain.request(DefaultRSocketClient.java:327)
at io.rsocket.core.DefaultRSocketClient$FlattingInner.request(DefaultRSocketClient.java:454)
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:54)
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
at io.rsocket.core.DefaultRSocketClient$FlatMapMain.onSubscribe(DefaultRSocketClient.java:256)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onSubscribe(FluxMap.java:194)
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:129)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at io.rsocket.core.DefaultRSocketClient$RSocketClientMonoOperator.subscribe(DefaultRSocketClient.java:530)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.Mono.block(Mono.java:1778)
at org.springframework.messaging.rsocket.service.RSocketServiceMethod.lambda$initResponseFunction$3(RSocketServiceMethod.java:173)
at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
at org.springframework.messaging.rsocket.service.RSocketServiceMethod.invoke(RSocketServiceMethod.java:224)
at org.springframework.messaging.rsocket.service.RSocketServiceProxyFactory$ServiceMethodInterceptor.invoke(RSocketServiceProxyFactory.java:255)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223)
at jdk.proxy2/jdk.proxy2.$Proxy51.uuid(Unknown Source)
at com.german.clientrocket.verticles.ServerVerticle$start$httpServerFuture$1$1.invokeSuspend(ServerVerticle.kt:42)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:363)
at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:26)
at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable$default(Cancellable.kt:21)
at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:88)
at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:123)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch(Builders.common.kt:52)
at kotlinx.coroutines.BuildersKt.launch(Unknown Source)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch$default(Builders.common.kt:43)
at kotlinx.coroutines.BuildersKt.launch$default(Unknown Source)
at com.german.clientrocket.verticles.ServerVerticle.start$lambda$0(ServerVerticle.kt:24)
at io.vertx.core.http.impl.Http1xServerRequestHandler.handle(Http1xServerRequestHandler.java:150)
at io.vertx.core.http.impl.Http1xServerRequestHandler.handle(Http1xServerRequestHandler.java:42)
at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:342)
at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:163)
at io.vertx.core.http.impl.Http1xServerConnection.handleMessage(Http1xServerConnection.java:174)
at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:159)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
at io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.onHttpRequestChannelRead(WebSocketServerExtensionHandler.java:158)
at io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.channelRead(WebSocketServerExtensionHandler.java:82)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.vertx.core.http.impl.Http1xUpgradeToH2CHandler.channelRead(Http1xUpgradeToH2CHandler.java:124)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.vertx.core.http.impl.Http1xOrH2CHandler.end(Http1xOrH2CHandler.java:61)
at io.vertx.core.http.impl.Http1xOrH2CHandler.channelRead(Http1xOrH2CHandler.java:38)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:1583)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104)
at reactor.core.publisher.Mono.block(Mono.java:1779)
... 62 more
Caused by: com.fasterxml.jackson.databind.JsonMappingException: kotlinx.atomicfu.AtomicRef (through reference chain: com.german.clientrocket.verticles.ServerVerticle$start$httpServerFuture$1$1["completion"])
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:401)
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:360)
at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:323)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:778)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:184)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:502)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:341)
at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1587)
at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:1061)
at org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:253)
... 87 more
Caused by: java.lang.ClassNotFoundException: kotlinx.atomicfu.AtomicRef
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
at kotlin.reflect.jvm.internal.KDeclarationContainerImpl.parseType(KDeclarationContainerImpl.kt:291)
at kotlin.reflect.jvm.internal.KDeclarationContainerImpl.loadReturnType(KDeclarationContainerImpl.kt:306)
at kotlin.reflect.jvm.internal.KDeclarationContainerImpl.findMethodBySignature(KDeclarationContainerImpl.kt:216)
at kotlin.reflect.jvm.internal.KPropertyImplKt.computeCallerForAccessor(KPropertyImpl.kt:313)
at kotlin.reflect.jvm.internal.KPropertyImplKt.access$computeCallerForAccessor(KPropertyImpl.kt:1)
at kotlin.reflect.jvm.internal.KPropertyImpl$Getter$caller$2.invoke(KPropertyImpl.kt:180)
at kotlin.reflect.jvm.internal.KPropertyImpl$Getter$caller$2.invoke(KPropertyImpl.kt:179)
at kotlin.SafePublicationLazyImpl.getValue(LazyJVM.kt:107)
at kotlin.reflect.jvm.internal.KPropertyImpl$Getter.getCaller(KPropertyImpl.kt:179)
at kotlin.reflect.jvm.ReflectJvmMapping.getJavaMethod(ReflectJvmMapping.kt:64)
at kotlin.reflect.jvm.ReflectJvmMapping.getJavaGetter(ReflectJvmMapping.kt:49)
at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector.getRequiredMarkerFromCorrespondingAccessor(KotlinAnnotationIntrospector.kt:138)
at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector.hasRequiredMarker(KotlinAnnotationIntrospector.kt:133)
at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector.access$hasRequiredMarker(KotlinAnnotationIntrospector.kt:27)
at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector$hasRequiredMarker$hasRequired$1.invoke(KotlinAnnotationIntrospector.kt:47)
at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector$hasRequiredMarker$hasRequired$1.invoke(KotlinAnnotationIntrospector.kt:40)
at com.fasterxml.jackson.module.kotlin.ReflectionCache.javaMemberIsRequired(ReflectionCache.kt:106)
at com.fasterxml.jackson.module.kotlin.KotlinAnnotationIntrospector.hasRequiredMarker(KotlinAnnotationIntrospector.kt:40)
at com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.hasRequiredMarker(AnnotationIntrospectorPair.java:310)
at com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.hasRequiredMarker(AnnotationIntrospectorPair.java:310)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.getMetadata(POJOPropertyBuilder.java:225)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._anyIndexed(POJOPropertiesCollector.java:1671)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._sortProperties(POJOPropertiesCollector.java:1563)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:503)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueAccessor(POJOPropertiesCollector.java:269)
at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueAccessor(BasicBeanDescription.java:248)
at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:393)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:225)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:174)
at com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1535)
at com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1503)
at com.fasterxml.jackson.databind.SerializerProvider.findPrimaryPropertySerializer(SerializerProvider.java:721)
at com.fasterxml.jackson.databind.ser.impl.PropertySerializerMap.findAndAddPrimarySerializer(PropertySerializerMap.java:72)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter._findAndAddDynamic(BeanPropertyWriter.java:899)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:710)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
... 93 more
My config for proxy client is
@Configuration
class RSocketConfig {
@Bean
fun rSocketRequester(rSocketBuilder: RSocketRequester.Builder): RSocketRequester =
rSocketBuilder.rsocketConnector {
it.reconnect(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)))
}.transport(WebsocketClientTransport.create(URI.create("ws://127.0.0.1:8090")))
@Bean
fun rSocketServiceProxyFactory(@Qualifier("rSocketRequester") rSocketRequester: RSocketRequester): RSocketServiceProxyFactory =
RSocketServiceProxyFactory.builder(rSocketRequester).build()
@Bean
fun serverRocketClient(@Qualifier("rSocketServiceProxyFactory") rSocketServiceProxyFactory: RSocketServiceProxyFactory): ServerRockerClient =
rSocketServiceProxyFactory.createClient(ServerRockerClient::class.java)
}
I don't exactly know if it is bug or not implemented feature, but wanted to ask is it planned to implement RSocket Interfaces for coroutines, considering that all other RSocket components both on client and server side are very coroutine friendly.
I would be very grateful for your assist and thank you very much for all your input to community.
Activity
doxlik commentedon Jun 3, 2025
Hi everyone,
I cloned the Spring Framework and locally added changes to the RSocket interface method interception logic, taking the HTTP interfaces as a reference point. After building Spring with these adjustments, coroutine support started working properly with RSocket interfaces.
The changes are relatively small, and I’m not sure if guiding an external contribution like this would be easier than applying similar logic directly within the codebase you know inside and out.
P.S. I’m not requesting or expecting any action—just wanted to share this in case it’s helpful. I truly appreciate all the work you do for the community and really hope RSocket support becomes even more coroutine-friendly over time.
Thanks again!