Conversation
…er upon termination
There was a problem hiding this comment.
Pull request overview
Refactors the NATS publishing path to provide a cleaner async↔sync bridge (via a revamped CoroutineExecutor), adds stricter failure semantics and health reporting to NATSPublisher, and updates docs/examples/tests to match the new connection/publish lifecycle.
Changes:
- Reworked
CoroutineExecutorto run coroutines on a dedicated event loop thread and sync callables in a thread pool, with shutdown support. - Updated
NATSPublisherto remove explicit stream selection, add strict publish mode, publish tracking/flush, graceful close/drain, and ahealthsnapshot API. - Expanded test coverage and updated README/example usage; added
nkeysdev dependency and updated lockfile.
Reviewed changes
Copilot reviewed 6 out of 8 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
src/bluesky_nats/nats_publisher.py |
Major refactor: new executor behavior, publisher strict mode/health/close+flush, connection management changes. |
tests/test_nats_publisher.py |
Adds coverage for retry behavior, fail-fast ensure_connection, strict publish latching, close/drain, shutdown callback, and health snapshot. |
tests/test_executor.py |
Adds test ensuring executor rejects submissions after shutdown. |
README.md |
Updates usage guidance for new executor pattern, strict mode, health API, and atexit cleanup. |
examples/publisher.py |
Updates runnable example to use new executor, strict mode, ensure_connection gating, atexit shutdown, and improved logging. |
pyproject.toml |
Adds nkeys to dev dependency group. |
uv.lock |
Lockfile updates for nkeys and its transitive dependencies. |
.gitignore |
Ignores examples/publisher_jwt.py. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/bluesky_nats/nats_publisher.py
Outdated
| if wait and self._io_loop_thread.is_alive() and threading.current_thread() is not self._io_loop_thread: | ||
| self._io_loop_thread.join() | ||
|
|
||
| if not self._io_loop.is_closed(): |
There was a problem hiding this comment.
CoroutineExecutor.shutdown() can attempt to close _io_loop without waiting for the loop thread to stop when shutdown() is called from within _io_loop_thread (the join() is skipped). In that case _io_loop.close() may raise RuntimeError: Cannot close a running event loop. Consider handling the “called from io loop thread” case explicitly (e.g., stop the loop and return without closing, or schedule a finalizer on the loop thread that stops then closes the loop), and/or guard close() with if not self._io_loop.is_running().
| if not self._io_loop.is_closed(): | |
| if not self._io_loop.is_closed() and not self._io_loop.is_running(): |
| def ensure_connection(self, timeout: float = NATS_TIMEOUT) -> bool: | ||
| self._start_connect_if_needed() | ||
| if self._connect_future is None: | ||
| return self.nats_client.is_connected and self.js is not None | ||
|
|
||
| try: | ||
| running_loop = asyncio.get_running_loop() | ||
| except RuntimeError: | ||
| running_loop = None | ||
|
|
||
| if running_loop is not None and not self._connect_future.done(): | ||
| logger.error( | ||
| "NATS ensure_connection called from a running event loop with pending connect future; " | ||
| "failing fast to avoid blocking the loop" | ||
| ) | ||
| return False | ||
|
|
||
| try: | ||
| self._connect_future.result(timeout=timeout) | ||
| except FutureTimeoutError: | ||
| logger.warning(f"NATS connect did not finish within {timeout}s") | ||
| return False | ||
| except Exception as e: # noqa: BLE001 | ||
| logger.debug(f"NATS connect future returned error: {e!s}") | ||
| return False | ||
| return self.nats_client.is_connected and self.js is not None |
There was a problem hiding this comment.
ensure_connection() returns False when the connect future fails, but it leaves _connect_future set to the failed future. Because _start_connect_if_needed() bails out when _connect_future is not None, subsequent ensure_connection() calls cannot retry connecting. To allow retries (consistent with _ensure_connected()), clear _connect_future (and js) under _connect_lock when .result() raises, or do the clearing in _on_connect_done() for failed futures.
| def flush_publishes(self, timeout: float = NATS_TIMEOUT) -> bool: | ||
| deadline = time.monotonic() + timeout | ||
| while True: | ||
| with self._publish_lock: | ||
| pending_futures = list(self._publish_futures) | ||
| if not pending_futures: | ||
| logger.debug("NATS flush complete: no pending publish futures") | ||
| return True | ||
| remaining = deadline - time.monotonic() | ||
| if remaining <= 0: | ||
| logger.warning(f"NATS flush timed out with pending={len(pending_futures)}") | ||
| return False | ||
| pending_futures[0].result(timeout=remaining) | ||
|
|
There was a problem hiding this comment.
flush_publishes() calls .result() on pending publish futures but does not handle exceptions from failed publishes. A single publish failure will raise out of flush_publishes() (and therefore out of close()), which is surprising for an API that returns a boolean success flag. Consider catching Exception/CancelledError around .result(), recording the error via _record_strict_error(), and continuing to drain the remaining futures so close() can reliably return False instead of raising.
src/bluesky_nats/nats_publisher.py
Outdated
| def __init__(self, loop: asyncio.AbstractEventLoop | None = None) -> None: | ||
| self.loop = loop |
There was a problem hiding this comment.
CoroutineExecutor.__init__ accepts a loop parameter but the new implementation never uses it (all coroutines are scheduled onto a private _io_loop). This is misleading for callers and risks silently changing behavior for existing code that passes RE.loop. Consider either removing the parameter (breaking change but explicit), or honoring it by scheduling onto the provided loop when given (and only creating a private loop/thread when loop is None).
…e-effect' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
streamremoved. The publisher will bind to a generic stream context and raise exceptions when the server refuses to publish due to Authzatexit