Skip to content
Draft

debug #105

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .github/workflows/conformance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ jobs:

client:
runs-on: ubuntu-2404-4-cores
timeout-minutes: 30

steps:
# https://github.com/actions/checkout
Expand Down Expand Up @@ -93,7 +94,10 @@ jobs:
run: |
go install -v connectrpc.com/conformance/cmd/connectconformance@latest

- name: Run the connectconformance for client
- name: Run the connectconformance for client (10 times)
run: |
connectconformance --trace --conf ./client_config.yaml --mode client --known-flaky @./client_known_flaky.yaml -- uv run python client_runner.py
for i in {1..10}; do
echo "Running conformance test iteration $i"
connectconformance --trace --conf ./client_config.yaml --mode client --known-flaky @./client_known_flaky.yaml -- uv run python client_runner.py
done

29 changes: 26 additions & 3 deletions conformance/client_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ async def delayed_abort() -> None:
UnaryRequest(
content=req,
headers=headers,
metadata={
"test_name": msg.test_name,
},
),
CallOptions(
timeout=msg.timeout_ms / 1000,
Expand All @@ -279,6 +282,7 @@ async def _reqs() -> AsyncGenerator[service_pb2.ClientStreamRequest]:
async for req in reqs:
if msg.request_delay_ms > 0:
await asyncio.sleep(msg.request_delay_ms / 1000)
print(f"[{msg.test_name}] Sending request", file=sys.stderr)
yield req

if msg.cancel.HasField("before_close_send"):
Expand All @@ -293,7 +297,13 @@ async def delayed_abort() -> None:
asyncio.create_task(delayed_abort())

async with getattr(client, msg.method)(
StreamRequest(content=_reqs(), headers=headers),
StreamRequest(
content=_reqs(),
headers=headers,
metadata={
"test_name": msg.test_name,
},
),
CallOptions(
timeout=msg.timeout_ms / 1000,
abort_event=abort_event,
Expand All @@ -319,7 +329,13 @@ async def delayed_abort() -> None:
headers = to_connect_headers(msg.request_headers)

async with getattr(client, msg.method)(
StreamRequest(content=reqs, headers=headers),
StreamRequest(
content=reqs,
headers=headers,
metadata={
"test_name": msg.test_name,
},
),
CallOptions(
timeout=msg.timeout_ms / 1000,
abort_event=abort_event,
Expand Down Expand Up @@ -358,12 +374,19 @@ async def _reqs() -> AsyncGenerator[service_pb2.ClientStreamRequest]:
async for req in reqs:
if msg.request_delay_ms > 0:
await asyncio.sleep(msg.request_delay_ms / 1000)
print(f"[{msg.test_name}] Sending request", file=sys.stderr)
yield req

headers = to_connect_headers(msg.request_headers)

async with getattr(client, msg.method)(
StreamRequest(content=_reqs(), headers=headers),
StreamRequest(
content=_reqs(),
headers=headers,
metadata={
"test_name": msg.test_name,
},
),
CallOptions(
timeout=msg.timeout_ms / 1000,
abort_event=abort_event,
Expand Down
1 change: 1 addition & 0 deletions conformance/run-testcase.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Errors/HTTPVersion:1/Protocol:PROTOCOL_GRPC_WEB/Codec:CODEC_PROTO/Compression:COMPRESSION_GZIP/TLS:false/(grpc server impl)/bidi-stream/half-duplex/error-with-no-responses
62 changes: 62 additions & 0 deletions spec/conformance-error.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
```
cd conformance
❯ connectconformance -vv --trace --conf ./client_config.yaml --mode client -- uv run python client_runner.py
```

# testcase
- request:
testName: bidi-stream/half-duplex/error-with-no-responses
streamType: STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM
requestHeaders:
- name: X-Conformance-Test
value: ["Value1","Value2"]
requestMessages:
- "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest
responseDefinition:
responseHeaders:
- name: x-custom-header
value: ["foo"]
error:
code: CODE_INTERNAL
message: "bidi half duplex stream failed"
responseTrailers:
- name: x-custom-trailer
value: ["bing"]
- "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest
requestData: "dGVzdCByZXNwb25zZQ=="

# flaky failed
FAILED: Errors/HTTPVersion:1/Protocol:PROTOCOL_GRPC_WEB/Codec:CODEC_PROTO/Compression:COMPRESSION_GZIP/TLS:false/(grpc server impl)/bidi-stream/half-duplex/error-with-no-responses:
actual error {code: 14 (unavailable), message: "http: invalid Read on closed Body"} does not match expected code 13 (internal)
actual error {code: 14 (unavailable), message: "http: invalid Read on closed Body"} does not match expected message "bidi half duplex stream failed"
actual error contain 0 details; expecting 1
---- HTTP Trace ----
request> 0.000ms POST http://.../connectrpc.conformance.v1.ConformanceService/BidiStream HTTP/1.1
request> Accept-Encoding: identity
request> Content-Type: application/grpc-web+proto
request> Grpc-Accept-Encoding: gzip
request> Grpc-Encoding: gzip
request> User-Agent: connect-python/0.0.1 (Python/3.13)
request> X-Conformance-Test: Value1, Value2
request> X-Test-Case-Name: Errors/HTTPVersion:1/Protocol:PROTOCOL_GRPC_WEB/Codec:CODEC_PROTO/Compression:COMPRESSION_GZIP/TLS:false/(grpc server impl)/bidi-stream/half-duplex/error-with-no-responses
request> X-User-Agent: connect-python/0.0.1 (Python/3.13)
request>
request> 1.545ms message #1: prefix: flags=1, len=99
request> message #1: data: 99/99 bytes
response< 1.671ms 200 OK
response< Access-Control-Expose-Headers: Server, X-Custom-Header, Vary, Date, Content-Type, Grpc-Encoding, grpc-status, grpc-message
response< Content-Type: application/grpc-web+proto
response< Grpc-Encoding: gzip
response< Server: connectconformance-grpcserver/v1.0.4
response< Vary: Origin
response< X-Custom-Header: foo
response<
request> 2.714ms message #2: prefix: flags=1, len=35
request> message #2: data: 35/35 bytes
request> 3.677ms body end (err=http: invalid Read on closed Body)
--------------------

Total cases: 4282
4279 passed, 2 failed
(Another 1 failed as expected due to being known failures/flakes.)
Error: Process completed with exit code 1.
6 changes: 4 additions & 2 deletions src/connect/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,11 @@ def on_request_send(r: httpcore.Request) -> None:

conn.on_request_send(on_request_send)

await conn.send(request.messages, call_options.timeout, call_options.abort_event)
await conn.send(request.messages, call_options.timeout, call_options.abort_event, request.matadata)

response = await receive_stream_response(conn, output, request.spec, call_options.abort_event)
response = await receive_stream_response(
conn, output, request.spec, call_options.abort_event, request.matadata
)
return response

stream_func = apply_interceptors(_stream_func, options.interceptors)
Expand Down
37 changes: 30 additions & 7 deletions src/connect/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,15 @@ class RequestCommon:
_peer: Peer
_headers: Headers
_method: str
matadata: dict[str, Any] | None = None

def __init__(
self,
spec: Spec | None = None,
peer: Peer | None = None,
headers: Headers | None = None,
method: str | None = None,
matadata: dict[str, Any] | None = None,
) -> None:
"""Initializes the RPC context.

Expand All @@ -134,6 +136,7 @@ def __init__(
self._peer = peer if peer else Peer(address=None, protocol="", query={})
self._headers = headers if headers is not None else Headers()
self._method = method if method else HTTPMethod.POST.value
self.matadata = matadata if matadata is not None else {}

@property
def spec(self) -> Spec:
Expand Down Expand Up @@ -229,6 +232,7 @@ def __init__(
peer: Peer | None = None,
headers: Headers | None = None,
method: str | None = None,
metadata: dict[str, Any] | None = None,
) -> None:
"""Initializes a new request.

Expand All @@ -240,7 +244,13 @@ def __init__(
headers: The headers associated with the request. Defaults to None.
method: The method name for the request. Defaults to None.
"""
super().__init__(spec, peer, headers, method)
super().__init__(
spec,
peer,
headers,
method,
metadata,
)
self._messages = content if isinstance(content, AsyncIterable) else aiterate([content])

@property
Expand Down Expand Up @@ -298,6 +308,7 @@ def __init__(
peer: Peer | None = None,
headers: Headers | None = None,
method: str | None = None,
metadata: dict[str, Any] | None = None,
) -> None:
"""Initializes the request object.

Expand All @@ -308,7 +319,7 @@ def __init__(
headers (Headers | None, optional): The request headers. Defaults to None.
method (str | None, optional): The request method. Defaults to None.
"""
super().__init__(spec, peer, headers, method)
super().__init__(spec, peer, headers, method, metadata)
self._message = content

@property
Expand Down Expand Up @@ -874,7 +885,9 @@ def peer(self) -> Peer:
raise NotImplementedError()

@abc.abstractmethod
def receive(self, message: Any, abort_event: asyncio.Event | None) -> AsyncIterator[Any]:
def receive(
self, message: Any, abort_event: asyncio.Event | None, metadata: dict[str, Any] | None = None
) -> AsyncIterator[Any]:
"""Asynchronously receives a stream of messages.

This method sends an initial message and then listens for a stream of
Expand Down Expand Up @@ -912,7 +925,11 @@ def request_headers(self) -> Headers:

@abc.abstractmethod
async def send(
self, messages: AsyncIterable[Any], timeout: float | None, abort_event: asyncio.Event | None
self,
messages: AsyncIterable[Any],
timeout: float | None,
abort_event: asyncio.Event | None,
metadata: dict[str, Any] | None = None,
) -> None:
"""Asynchronously sends a stream of messages.

Expand Down Expand Up @@ -1096,7 +1113,11 @@ async def receive_unary_response[T](


async def receive_stream_response[T](
conn: StreamingClientConn, t: type[T], spec: Spec, abort_event: asyncio.Event | None
conn: StreamingClientConn,
t: type[T],
spec: Spec,
abort_event: asyncio.Event | None,
metadata: dict[str, Any] | None = None,
) -> StreamResponse[T]:
"""Receives a streaming response from the server.

Expand All @@ -1116,12 +1137,14 @@ async def receive_stream_response[T](
headers, and trailers.
"""
if spec.stream_type == StreamType.ClientStream:
single_message = await ensure_single(conn.receive(t, abort_event))
single_message = await ensure_single(conn.receive(t, abort_event, metadata))

return StreamResponse(
AsyncDataStream[T](aiterate([single_message]), conn.aclose), conn.response_headers, conn.response_trailers
)
else:
return StreamResponse(
AsyncDataStream[T](conn.receive(t, abort_event), conn.aclose), conn.response_headers, conn.response_trailers
AsyncDataStream[T](conn.receive(t, abort_event, metadata), conn.aclose),
conn.response_headers,
conn.response_trailers,
)
26 changes: 22 additions & 4 deletions src/connect/protocol_connect/connect_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,12 @@ async def _receive_messages(self, message: Any) -> AsyncIterator[Any]:
obj = await self.unmarshaler.unmarshal(message)
yield obj

def receive(self, message: Any, _abort_event: asyncio.Event | None) -> AsyncIterator[Any]:
def receive(
self,
message: Any,
_abort_event: asyncio.Event | None,
metadata: dict[str, Any] | None = None,
) -> AsyncIterator[Any]:
"""Receives messages asynchronously based on the provided input message.

Args:
Expand Down Expand Up @@ -367,7 +372,11 @@ def on_request_send(self, fn: EventHook) -> None:
self._event_hooks["request"].append(fn)

async def send(
self, messages: AsyncIterable[Any], timeout: float | None, abort_event: asyncio.Event | None
self,
messages: AsyncIterable[Any],
timeout: float | None,
abort_event: asyncio.Event | None,
metadata: dict[str, Any] | None = None,
) -> None:
"""Sends a single message asynchronously using either HTTP GET or POST, with optional timeout and abort support.

Expand Down Expand Up @@ -708,7 +717,12 @@ def on_request_send(self, fn: EventHook) -> None:
"""
self._event_hooks["request"].append(fn)

async def receive(self, message: Any, abort_event: asyncio.Event | None = None) -> AsyncIterator[Any]:
async def receive(
self,
message: Any,
abort_event: asyncio.Event | None = None,
metadata: dict[str, Any] | None = None,
) -> AsyncIterator[Any]:
"""Asynchronously receives and yields messages from the unmarshaler, handling stream control and errors.

Args:
Expand Down Expand Up @@ -757,7 +771,11 @@ async def receive(self, message: Any, abort_event: asyncio.Event | None = None)
raise ConnectError("missing end stream message", Code.INVALID_ARGUMENT)

async def send(
self, messages: AsyncIterable[Any], timeout: float | None, abort_event: asyncio.Event | None
self,
messages: AsyncIterable[Any],
timeout: float | None,
abort_event: asyncio.Event | None,
metadata: dict[str, Any] | None = None,
) -> None:
"""Sends a stream of messages asynchronously to the server using HTTP POST.

Expand Down
Loading
Loading