@@ -95,48 +95,6 @@ actor Speaker<SendMsg: RPCMessage & Message, RecvMsg: RPCMessage & Message> {
9595 try _ = await hndsh. handshake ( )
9696 }
9797
98- public func start( ) {
99- guard readLoopTask == nil else {
100- logger. error ( " speaker is already running " )
101- return
102- }
103- readLoopTask = Task {
104- do throws ( ReceiveError) {
105- for try await msg in try await self . receiver. messages ( ) {
106- guard msg. hasRpc else {
107- await messageBuffer. push ( . message( msg) )
108- continue
109- }
110- guard msg. rpc. msgID == 0 else {
111- let req = RPCRequest < SendMsg , RecvMsg > ( req: msg, sender: self . sender)
112- await messageBuffer. push ( . RPC( req) )
113- continue
114- }
115- guard msg. rpc. responseTo == 0 else {
116- self . logger. debug ( " got RPC reply for msgID \( msg. rpc. responseTo) " )
117- do throws ( RPCError) {
118- try await self . secretary. route ( reply: msg)
119- } catch {
120- self . logger. error (
121- " couldn't route RPC reply for \( msg. rpc. responseTo) : \( error) " )
122- }
123- continue
124- }
125- }
126- } catch {
127- self . logger. error ( " failed to receive messages: \( error) " )
128- throw error
129- }
130- }
131- }
132-
133- func wait( ) async throws {
134- guard let task = readLoopTask else {
135- return
136- }
137- try await task. value
138- }
139-
14098 /// Send a unary RPC message and handle the response
14199 func unaryRPC( _ req: SendMsg ) async throws -> RecvMsg {
142100 return try await withCheckedThrowingContinuation { continuation in
@@ -212,7 +170,25 @@ extension Speaker: AsyncSequence, AsyncIteratorProtocol {
212170 }
213171
214172 func next( ) async throws -> IncomingMessage ? {
215- return await messageBuffer. next ( )
173+ for try await msg in try await receiver. messages ( ) {
174+ guard msg. hasRpc else {
175+ return . message( msg)
176+ }
177+ guard msg. rpc. msgID == 0 else {
178+ return . RPC( RPCRequest < SendMsg , RecvMsg > ( req: msg, sender: sender) )
179+ }
180+ guard msg. rpc. responseTo == 0 else {
181+ logger. debug ( " got RPC reply for msgID \( msg. rpc. responseTo) " )
182+ do throws ( RPCError) {
183+ try await self . secretary. route ( reply: msg)
184+ } catch {
185+ logger. error (
186+ " couldn't route RPC reply for \( msg. rpc. responseTo) : \( error) " )
187+ }
188+ continue
189+ }
190+ }
191+ return nil
216192 }
217193}
218194
0 commit comments