Skip to content

Commit bfe38ac

Browse files
committed
StreamInterface: prevent socket/reader-thread leak on handshake failure in __init__
If connect() or waitForConfig() raises during __init__ (handshake timeout, bad stream, config error), the reader thread started by connect() keeps running and the underlying stream/socket stays open — but the caller never receives a reference to the half-initialized instance, so they cannot call close() themselves. The leak compounds on every retry from a caller's reconnect loop. Fix: wrap connect() + waitForConfig() in try/except; call self.close() on any exception before re-raising. Also guard close() against RuntimeError from joining an unstarted reader thread (happens when close() runs from a failed __init__ before connect() could spawn it). Discovered while debugging a real-world Meshtastic firmware crash where a passive logger's retrying TCPInterface() calls against a node with 250-entry NodeDB produced a reconnect storm — every retry triggered a full config+NodeDB dump on the node, compounding heap pressure, which then exposed null-deref bugs in Router::perhapsDecode / MeshService (firmware side fixed in meshtastic/firmware#10226 and #10229). The client-side leak is independent of those firmware bugs and worth fixing on its own.
1 parent cec79a7 commit bfe38ac

2 files changed

Lines changed: 114 additions & 4 deletions

File tree

meshtastic/stream_interface.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Stream Interface base class
22
"""
3+
import contextlib
34
import io
45
import logging
56
import threading
@@ -61,9 +62,17 @@ def __init__( # pylint: disable=R0917
6162

6263
# Start the reader thread after superclass constructor completes init
6364
if connectNow:
64-
self.connect()
65-
if not noProto:
66-
self.waitForConfig()
65+
try:
66+
self.connect()
67+
if not noProto:
68+
self.waitForConfig()
69+
except Exception:
70+
# If the handshake raises, the caller never receives a reference
71+
# to this instance and cannot call close() themselves. Clean up
72+
# the reader thread + stream here so retries don't leak.
73+
with contextlib.suppress(Exception):
74+
self.close()
75+
raise
6776

6877
def connect(self) -> None:
6978
"""Connect to our radio
@@ -136,7 +145,13 @@ def close(self) -> None:
136145
# reader thread to close things for us
137146
self._wantExit = True
138147
if self._rxThread != threading.current_thread():
139-
self._rxThread.join() # wait for it to exit
148+
try:
149+
self._rxThread.join() # wait for it to exit
150+
except RuntimeError:
151+
# Thread was never started — happens when close() is invoked
152+
# from a failed __init__ before connect() could spawn it.
153+
# Nothing to join; safe to ignore.
154+
pass
140155

141156
def _handleLogByte(self, b):
142157
"""Handle a byte that is part of a log message from the device."""

meshtastic/tests/test_stream_interface.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,101 @@ def test_StreamInterface():
1818
assert pytest_wrapped_e.type == Exception
1919

2020

21+
@pytest.mark.unit
22+
@pytest.mark.usefixtures("reset_mt_config")
23+
def test_StreamInterface_close_safe_when_thread_never_started():
24+
"""close() must not raise RuntimeError when called before connect() has started the reader.
25+
26+
Hits the cleanup path used by __init__ when the handshake raises before the
27+
reader thread is started.
28+
"""
29+
iface = StreamInterface(noProto=True, connectNow=False)
30+
iface.stream = MagicMock()
31+
# _rxThread was created in __init__ but never .start()'d. close() should
32+
# detect that and skip join() instead of raising RuntimeError.
33+
iface.close()
34+
35+
36+
@pytest.mark.unit
37+
@pytest.mark.usefixtures("reset_mt_config")
38+
def test_StreamInterface_init_cleans_up_when_connect_raises():
39+
"""If connect() raises during __init__, close() runs and the original exception propagates."""
40+
41+
cleanup_calls = []
42+
43+
class FailingConnectStream(StreamInterface):
44+
"""Subclass whose connect() raises, to exercise the __init__ cleanup path."""
45+
46+
def __init__(self):
47+
self.stream = MagicMock() # bypass StreamInterface abstract check
48+
super().__init__(noProto=False, connectNow=True)
49+
50+
def connect(self):
51+
raise RuntimeError("simulated handshake failure")
52+
53+
def close(self):
54+
cleanup_calls.append("close")
55+
super().close()
56+
57+
with pytest.raises(RuntimeError, match="simulated handshake failure"):
58+
FailingConnectStream()
59+
assert cleanup_calls == ["close"], "close() should be invoked exactly once on handshake failure"
60+
61+
62+
@pytest.mark.unit
63+
@pytest.mark.usefixtures("reset_mt_config")
64+
def test_StreamInterface_init_cleans_up_when_waitForConfig_raises():
65+
"""If waitForConfig() raises after a successful connect(), close() runs and exception propagates."""
66+
67+
cleanup_calls = []
68+
69+
class FailingWaitStream(StreamInterface):
70+
"""Subclass whose waitForConfig() raises, to exercise the second leg of cleanup."""
71+
72+
def __init__(self):
73+
self.stream = MagicMock()
74+
super().__init__(noProto=False, connectNow=True)
75+
76+
def connect(self):
77+
# No-op connect — we are simulating handshake-stage failure, not connect-stage.
78+
pass
79+
80+
def waitForConfig(self):
81+
raise TimeoutError("simulated config-handshake timeout")
82+
83+
def close(self):
84+
cleanup_calls.append("close")
85+
super().close()
86+
87+
with pytest.raises(TimeoutError, match="simulated config-handshake timeout"):
88+
FailingWaitStream()
89+
assert cleanup_calls == ["close"], "close() should be invoked exactly once on handshake timeout"
90+
91+
92+
@pytest.mark.unit
93+
@pytest.mark.usefixtures("reset_mt_config")
94+
def test_StreamInterface_init_cleanup_does_not_shadow_original_exception():
95+
"""If close() itself raises during __init__ cleanup, the original exception still propagates.
96+
97+
The cleanup uses contextlib.suppress(Exception) so that a secondary failure
98+
in close() doesn't replace the real reason for the failed handshake.
99+
"""
100+
101+
class CleanupRaisesStream(StreamInterface):
102+
def __init__(self):
103+
self.stream = MagicMock()
104+
super().__init__(noProto=False, connectNow=True)
105+
106+
def connect(self):
107+
raise RuntimeError("original handshake failure")
108+
109+
def close(self):
110+
raise RuntimeError("secondary close failure — should be suppressed")
111+
112+
with pytest.raises(RuntimeError, match="original handshake failure"):
113+
CleanupRaisesStream()
114+
115+
21116
# Note: This takes a bit, so moving from unit to slow
22117
@pytest.mark.unitslow
23118
@pytest.mark.usefixtures("reset_mt_config")

0 commit comments

Comments
 (0)