|
8 | 8 | import logging |
9 | 9 | import subprocess |
10 | 10 | import tempfile |
| 11 | +import threading |
11 | 12 | import unittest |
12 | 13 | from collections.abc import Mapping |
13 | 14 | from contextlib import redirect_stderr, redirect_stdout |
|
36 | 37 | from src_py_lib.clients.sourcegraph import ( |
37 | 38 | SourcegraphClient, |
38 | 39 | SourcegraphClientConfig, |
| 40 | + SourcegraphTrace, |
39 | 41 | decode_external_service_id, |
40 | 42 | decode_repository_id, |
41 | 43 | encode_repository_id, |
@@ -1535,6 +1537,50 @@ def handler(request: httpx.Request) -> httpx.Response: |
1535 | 1537 | self.assertEqual(summaries[0].graphql_operations[0]["operation"], "Viewer") |
1536 | 1538 | self.assertEqual(summaries[0].errored_spans[0]["description"], "boom") |
1537 | 1539 |
|
| 1540 | + def test_sourcegraph_streams_jaeger_summaries_in_parallel(self) -> None: |
| 1541 | + trace_ids = ("1" * 32, "2" * 32, "3" * 32) |
| 1542 | + requested_trace_ids: list[str] = [] |
| 1543 | + first_batch_barrier = threading.Barrier(2, timeout=1) |
| 1544 | + |
| 1545 | + def handler(request: httpx.Request) -> httpx.Response: |
| 1546 | + trace_id = request.url.path.rsplit("/", 1)[-1] |
| 1547 | + requested_trace_ids.append(trace_id) |
| 1548 | + if trace_id in trace_ids[:2]: |
| 1549 | + first_batch_barrier.wait() |
| 1550 | + return httpx.Response( |
| 1551 | + 200, |
| 1552 | + json={ |
| 1553 | + "data": [ |
| 1554 | + { |
| 1555 | + "spans": [ |
| 1556 | + { |
| 1557 | + "operationName": f"trace {trace_id[0]}", |
| 1558 | + "duration": 1_000, |
| 1559 | + "tags": [], |
| 1560 | + } |
| 1561 | + ] |
| 1562 | + } |
| 1563 | + ] |
| 1564 | + }, |
| 1565 | + ) |
| 1566 | + |
| 1567 | + client = SourcegraphClient( |
| 1568 | + "https://sourcegraph.example.com/", |
| 1569 | + "token", |
| 1570 | + http=HTTPClient(max_attempts=1, transport=httpx.MockTransport(handler)), |
| 1571 | + ) |
| 1572 | + |
| 1573 | + summaries = list( |
| 1574 | + client.stream_jaeger_trace_summaries( |
| 1575 | + [SourcegraphTrace(trace_id) for trace_id in trace_ids], |
| 1576 | + retry_delays_seconds=(0,), |
| 1577 | + parallelism=2, |
| 1578 | + ) |
| 1579 | + ) |
| 1580 | + |
| 1581 | + self.assertCountEqual(requested_trace_ids, trace_ids) |
| 1582 | + self.assertCountEqual([summary.trace.trace_id for summary in summaries], trace_ids) |
| 1583 | + |
1538 | 1584 | def test_graphql_client_paginates_cursor_results(self) -> None: |
1539 | 1585 | http = RecordingHTTP( |
1540 | 1586 | [ |
|
0 commit comments