diff --git a/cuegui/cuegui/FrameMonitor.py b/cuegui/cuegui/FrameMonitor.py index f87da3e9e..5aacd93ef 100644 --- a/cuegui/cuegui/FrameMonitor.py +++ b/cuegui/cuegui/FrameMonitor.py @@ -30,6 +30,7 @@ import grpc import FileSequence +from opencue.cuebot import Cuebot from opencue_proto import job_pb2 import cuegui.FrameMonitorTree @@ -165,6 +166,9 @@ def _frameRangeSelectionFilterUpdate(self): grpc.StatusCode.UNAVAILABLE]: log.warning( "gRPC connection interrupted while updating frame range filter, will retry") + # Record failed call and potentially reset the channel + if Cuebot.recordFailedCall(): + log.info("Channel was reset due to connection issues") else: log.error("gRPC error in _frameRangeSelectionFilterUpdate: %s", e) # pylint: enable=no-member @@ -377,6 +381,9 @@ def _filterLayersUpdate(self): grpc.StatusCode.UNAVAILABLE]: log.warning( "gRPC connection interrupted while updating layer filter, will retry") + # Record failed call and potentially reset the channel + if Cuebot.recordFailedCall(): + log.info("Channel was reset due to connection issues") else: log.error("gRPC error in _filterLayersUpdate: %s", e) # pylint: enable=no-member diff --git a/cuegui/cuegui/FrameMonitorTree.py b/cuegui/cuegui/FrameMonitorTree.py index 11e1d1849..4b33272cb 100644 --- a/cuegui/cuegui/FrameMonitorTree.py +++ b/cuegui/cuegui/FrameMonitorTree.py @@ -36,6 +36,7 @@ import grpc import opencue +from opencue.cuebot import Cuebot from opencue_proto import job_pb2 import cuegui.AbstractTreeWidget @@ -483,7 +484,10 @@ def _getUpdate(self): try: if self.__job: self.__lastUpdateTime = int(time.time()) - return self.__job.getFrames(**self.frameSearch.options) + result = self.__job.getFrames(**self.frameSearch.options) + # Record successful call for connection health tracking + Cuebot.recordSuccessfulCall() + return result return [] except grpc.RpcError as e: # Handle gRPC errors - log but don't crash, allow UI to retry @@ -491,6 +495,9 @@ def _getUpdate(self): if hasattr(e, 'code') and e.code() in [grpc.StatusCode.CANCELLED, grpc.StatusCode.UNAVAILABLE]: logger.warning("gRPC connection interrupted during frame update, will retry") + # Record failed call and potentially reset the channel + if Cuebot.recordFailedCall(): + logger.info("Channel reset due to connection issues, retrying") else: logger.error("gRPC error in _getUpdate: %s", e) # pylint: enable=no-member @@ -515,6 +522,8 @@ def _getUpdateChanged(self): self.__lastUpdateTime = updated_data.server_time self.__jobState = updated_data.state updatedFrames = updated_data.updated_frames.updated_frames + # Record successful call for connection health tracking + Cuebot.recordSuccessfulCall() except grpc.RpcError as e: # Handle gRPC errors - allow UI to continue and retry @@ -522,6 +531,9 @@ def _getUpdateChanged(self): if hasattr(e, 'code'): if e.code() in [grpc.StatusCode.CANCELLED, grpc.StatusCode.UNAVAILABLE]: logger.warning("gRPC connection interrupted during frame update, will retry") + # Record failed call and potentially reset the channel + if Cuebot.recordFailedCall(): + logger.info("Channel reset due to connection issues, retrying") # Return None to trigger a full update on next cycle return None if e.code() == grpc.StatusCode.NOT_FOUND: diff --git a/cuegui/cuegui/ThreadPool.py b/cuegui/cuegui/ThreadPool.py index 79f000e07..243c63e23 100644 --- a/cuegui/cuegui/ThreadPool.py +++ b/cuegui/cuegui/ThreadPool.py @@ -52,6 +52,7 @@ def someWorkCallback(work, result): from qtpy import QtCore import grpc +from opencue.cuebot import Cuebot import cuegui.Logger @@ -220,7 +221,12 @@ def run(self): if hasattr(e, 'code') and e.code() in [grpc.StatusCode.CANCELLED, grpc.StatusCode.UNAVAILABLE]: logger.warning("gRPC connection issue for '%s': %s - " - "UI will retry on next update", work[2], e.details()) + "UI will retry on next update", work[2], + e.details() if hasattr(e, 'details') else str(e)) + # Record failed call and potentially reset the channel + if Cuebot.recordFailedCall(): + logger.info("Channel was reset due to connection issues, " + "subsequent operations should recover") else: logger.error("gRPC error processing work for '%s': %s", work[2], e) # pylint: enable=no-member diff --git a/pycue/opencue/cuebot.py b/pycue/opencue/cuebot.py index 4de529e6f..438fa28d0 100644 --- a/pycue/opencue/cuebot.py +++ b/pycue/opencue/cuebot.py @@ -73,6 +73,14 @@ DEFAULT_MAX_MESSAGE_BYTES = 1024 ** 2 * 10 DEFAULT_GRPC_PORT = 8443 +# gRPC keepalive settings to prevent "Connection reset by peer" errors +# These settings help maintain long-lived connections through load balancers and firewalls +DEFAULT_KEEPALIVE_TIME_MS = 30000 # Send keepalive ping every 30 seconds +DEFAULT_KEEPALIVE_TIMEOUT_MS = 10000 # Wait 10 seconds for keepalive response +DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS = True # Send keepalive even when no active RPCs +DEFAULT_MAX_CONNECTION_IDLE_MS = 0 # Disable max idle time (keep connection open) +DEFAULT_MAX_CONNECTION_AGE_MS = 0 # Disable max connection age + if platform.system() != 'Darwin': # Avoid spamming users with epoll fork warning messages os.environ["GRPC_POLL_STRATEGY"] = "epoll1" @@ -92,6 +100,12 @@ class Cuebot(object): Config = opencue.config.load_config_from_file() Timeout = Config.get('cuebot.timeout', 10000) + # Connection health tracking + _lastSuccessfulCall = 0 + _consecutiveFailures = 0 + _maxConsecutiveFailures = 3 # Reset channel after this many failures + _channelResetInProgress = False + PROTO_MAP = { 'action': filter_pb2, 'allocation': facility_pb2, @@ -199,10 +213,32 @@ def setChannel(): # pylint: enable=logging-not-lazy # TODO(bcipriano) Configure gRPC TLS. (Issue #150) try: + # Configure keepalive settings to prevent "Connection reset by peer" errors + # These are essential for long-lived connections through load balancers + keepalive_time_ms = Cuebot.Config.get( + 'cuebot.keepalive_time_ms', DEFAULT_KEEPALIVE_TIME_MS) + keepalive_timeout_ms = Cuebot.Config.get( + 'cuebot.keepalive_timeout_ms', DEFAULT_KEEPALIVE_TIMEOUT_MS) + keepalive_permit_without_calls = Cuebot.Config.get( + 'cuebot.keepalive_permit_without_calls', DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS) + + channel_options = [ + ('grpc.max_send_message_length', maxMessageBytes), + ('grpc.max_receive_message_length', maxMessageBytes), + # Keepalive settings to maintain connection health + ('grpc.keepalive_time_ms', keepalive_time_ms), + ('grpc.keepalive_timeout_ms', keepalive_timeout_ms), + ('grpc.keepalive_permit_without_calls', keepalive_permit_without_calls), + # Allow client to send keepalive pings even without data + ('grpc.http2.max_pings_without_data', 0), + # Minimum time between pings (allows more frequent pings) + ('grpc.http2.min_time_between_pings_ms', 10000), + # Don't limit ping strikes (server may reject too many pings) + ('grpc.http2.min_ping_interval_without_data_ms', 5000), + ] + Cuebot.RpcChannel = grpc.intercept_channel( - grpc.insecure_channel(connectStr, options=[ - ('grpc.max_send_message_length', maxMessageBytes), - ('grpc.max_receive_message_length', maxMessageBytes)]), + grpc.insecure_channel(connectStr, options=channel_options), *interceptors) # Test the connection Cuebot.getStub('cue').GetSystemStats( @@ -302,6 +338,62 @@ def getConfig(): """Gets the Cuebot config object, originally read in from the config file on disk.""" return Cuebot.Config + @staticmethod + def recordSuccessfulCall(): + """Record a successful gRPC call to track connection health.""" + Cuebot._lastSuccessfulCall = time.time() + Cuebot._consecutiveFailures = 0 + + @staticmethod + def recordFailedCall(): + """Record a failed gRPC call and trigger channel reset if needed. + + Returns True if the channel was reset and the caller should retry.""" + Cuebot._consecutiveFailures += 1 + + if Cuebot._consecutiveFailures >= Cuebot._maxConsecutiveFailures: + if not Cuebot._channelResetInProgress: + Cuebot._channelResetInProgress = True + try: + logger.warning( + "Connection appears unhealthy after %d consecutive failures, " + "resetting gRPC channel...", Cuebot._consecutiveFailures) + Cuebot.resetChannel() + Cuebot._consecutiveFailures = 0 + return True + except Exception as e: + logger.error("Failed to reset gRPC channel: %s", e) + finally: + Cuebot._channelResetInProgress = False + return False + + @staticmethod + def checkChannelHealth(): + """Check if the gRPC channel is healthy by making a simple call. + + Returns True if healthy, False otherwise.""" + if Cuebot.RpcChannel is None: + return False + + try: + Cuebot.getStub('cue').GetSystemStats( + cue_pb2.CueGetSystemStatsRequest(), timeout=5000) + Cuebot.recordSuccessfulCall() + return True + except grpc.RpcError as e: + # pylint: disable=no-member + if hasattr(e, 'code') and e.code() == grpc.StatusCode.UNAVAILABLE: + details = e.details() if hasattr(e, 'details') else str(e) + logger.warning("Channel health check failed: %s", details) + Cuebot.recordFailedCall() + return False + # pylint: enable=no-member + # Other errors might be OK (e.g., permission issues) + return True + except Exception as e: + logger.warning("Channel health check failed with unexpected error: %s", e) + return False + # Python 2/3 compatible implementation of ABC ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) diff --git a/pycue/tests/test_cuebot.py b/pycue/tests/test_cuebot.py index 9c210e943..0dc6c4d83 100644 --- a/pycue/tests/test_cuebot.py +++ b/pycue/tests/test_cuebot.py @@ -17,10 +17,14 @@ """Tests for `opencue.cuebot`.""" import os +import time import unittest import mock +import grpc + import opencue +from opencue import cuebot TESTING_CONFIG = { @@ -72,5 +76,125 @@ def test__should_ignore_unknown_facility(self): self.assertEqual(['fake-cuebot-01'], self.cuebot.Hosts) +class ConnectionHealthTests(unittest.TestCase): + """Tests for gRPC connection health tracking and recovery.""" + + def setUp(self): + """Reset connection health state before each test.""" + opencue.Cuebot._consecutiveFailures = 0 + opencue.Cuebot._lastSuccessfulCall = 0 + opencue.Cuebot._channelResetInProgress = False + + def tearDown(self): + """Reset connection health state after each test.""" + opencue.Cuebot._consecutiveFailures = 0 + opencue.Cuebot._lastSuccessfulCall = 0 + opencue.Cuebot._channelResetInProgress = False + + def test__keepalive_constants_defined(self): + """Test that keepalive constants are defined with expected values.""" + self.assertEqual(cuebot.DEFAULT_KEEPALIVE_TIME_MS, 30000) + self.assertEqual(cuebot.DEFAULT_KEEPALIVE_TIMEOUT_MS, 10000) + self.assertTrue(cuebot.DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS) + + def test__record_successful_call_updates_timestamp(self): + """Test that recordSuccessfulCall updates the last successful call timestamp.""" + before = time.time() + opencue.Cuebot.recordSuccessfulCall() + after = time.time() + + self.assertGreaterEqual(opencue.Cuebot._lastSuccessfulCall, before) + self.assertLessEqual(opencue.Cuebot._lastSuccessfulCall, after) + + def test__record_successful_call_resets_failure_counter(self): + """Test that recordSuccessfulCall resets the consecutive failure counter.""" + opencue.Cuebot._consecutiveFailures = 2 + + opencue.Cuebot.recordSuccessfulCall() + + self.assertEqual(opencue.Cuebot._consecutiveFailures, 0) + + def test__record_failed_call_increments_counter(self): + """Test that recordFailedCall increments the consecutive failure counter.""" + self.assertEqual(opencue.Cuebot._consecutiveFailures, 0) + + opencue.Cuebot.recordFailedCall() + self.assertEqual(opencue.Cuebot._consecutiveFailures, 1) + + opencue.Cuebot.recordFailedCall() + self.assertEqual(opencue.Cuebot._consecutiveFailures, 2) + + @mock.patch.object(opencue.Cuebot, 'resetChannel') + def test__record_failed_call_resets_channel_after_max_failures(self, mock_reset): + """Test that recordFailedCall triggers channel reset after max consecutive failures.""" + # Simulate failures up to the threshold + for i in range(opencue.Cuebot._maxConsecutiveFailures - 1): + result = opencue.Cuebot.recordFailedCall() + self.assertFalse(result) + mock_reset.assert_not_called() + + # The next failure should trigger a reset + result = opencue.Cuebot.recordFailedCall() + self.assertTrue(result) + mock_reset.assert_called_once() + + # Counter should be reset after channel reset + self.assertEqual(opencue.Cuebot._consecutiveFailures, 0) + + @mock.patch.object(opencue.Cuebot, 'resetChannel') + def test__record_failed_call_does_not_reset_when_already_in_progress(self, mock_reset): + """Test that recordFailedCall doesn't trigger reset if one is already in progress.""" + opencue.Cuebot._channelResetInProgress = True + opencue.Cuebot._consecutiveFailures = opencue.Cuebot._maxConsecutiveFailures + + result = opencue.Cuebot.recordFailedCall() + + self.assertFalse(result) + mock_reset.assert_not_called() + + @mock.patch.object(opencue.Cuebot, 'getStub') + def test__check_channel_health_returns_true_on_success(self, mock_get_stub): + """Test that checkChannelHealth returns True when the health check succeeds.""" + mock_stub = mock.Mock() + mock_get_stub.return_value = mock_stub + opencue.Cuebot.RpcChannel = mock.Mock() + + result = opencue.Cuebot.checkChannelHealth() + + self.assertTrue(result) + mock_get_stub.assert_called_with('cue') + self.assertEqual(opencue.Cuebot._consecutiveFailures, 0) + + @mock.patch.object(opencue.Cuebot, 'getStub') + @mock.patch.object(opencue.Cuebot, 'recordFailedCall') + def test__check_channel_health_returns_false_on_unavailable( + self, mock_record_failed, mock_get_stub): + """Test that checkChannelHealth returns False on UNAVAILABLE error.""" + mock_stub = mock.Mock() + error = grpc.RpcError() + error.code = mock.Mock(return_value=grpc.StatusCode.UNAVAILABLE) + error.details = mock.Mock(return_value="Connection refused") + mock_stub.GetSystemStats.side_effect = error + mock_get_stub.return_value = mock_stub + opencue.Cuebot.RpcChannel = mock.Mock() + + result = opencue.Cuebot.checkChannelHealth() + + self.assertFalse(result) + mock_record_failed.assert_called_once() + + def test__check_channel_health_returns_false_when_no_channel(self): + """Test that checkChannelHealth returns False when RpcChannel is None.""" + opencue.Cuebot.RpcChannel = None + + result = opencue.Cuebot.checkChannelHealth() + + self.assertFalse(result) + + def test__max_consecutive_failures_default(self): + """Test that the default max consecutive failures is set correctly.""" + self.assertEqual(opencue.Cuebot._maxConsecutiveFailures, 3) + + if __name__ == '__main__': unittest.main()