7 changes: 7 additions & 0 deletions cuegui/cuegui/FrameMonitor.py
@@ -30,6 +30,7 @@
import grpc

import FileSequence
from opencue.cuebot import Cuebot
from opencue_proto import job_pb2

import cuegui.FrameMonitorTree
@@ -165,6 +166,9 @@ def _frameRangeSelectionFilterUpdate(self):
grpc.StatusCode.UNAVAILABLE]:
log.warning(
"gRPC connection interrupted while updating frame range filter, will retry")
# Record failed call and potentially reset the channel
if Cuebot.recordFailedCall():
log.info("Channel was reset due to connection issues")
else:
log.error("gRPC error in _frameRangeSelectionFilterUpdate: %s", e)
# pylint: enable=no-member
@@ -377,6 +381,9 @@ def _filterLayersUpdate(self):
grpc.StatusCode.UNAVAILABLE]:
log.warning(
"gRPC connection interrupted while updating layer filter, will retry")
# Record failed call and potentially reset the channel
if Cuebot.recordFailedCall():
log.info("Channel was reset due to connection issues")
else:
log.error("gRPC error in _filterLayersUpdate: %s", e)
# pylint: enable=no-member
14 changes: 13 additions & 1 deletion cuegui/cuegui/FrameMonitorTree.py
@@ -36,6 +36,7 @@
import grpc

import opencue
from opencue.cuebot import Cuebot
from opencue_proto import job_pb2

import cuegui.AbstractTreeWidget
@@ -483,14 +484,20 @@ def _getUpdate(self):
try:
if self.__job:
self.__lastUpdateTime = int(time.time())
return self.__job.getFrames(**self.frameSearch.options)
result = self.__job.getFrames(**self.frameSearch.options)
# Record successful call for connection health tracking
Cuebot.recordSuccessfulCall()
return result
return []
except grpc.RpcError as e:
# Handle gRPC errors - log but don't crash, allow UI to retry
# pylint: disable=no-member
if hasattr(e, 'code') and e.code() in [grpc.StatusCode.CANCELLED,
grpc.StatusCode.UNAVAILABLE]:
logger.warning("gRPC connection interrupted during frame update, will retry")
# Record failed call and potentially reset the channel
if Cuebot.recordFailedCall():
logger.info("Channel reset due to connection issues, retrying")
else:
logger.error("gRPC error in _getUpdate: %s", e)
# pylint: enable=no-member
@@ -515,13 +522,18 @@ def _getUpdateChanged(self):
self.__lastUpdateTime = updated_data.server_time
self.__jobState = updated_data.state
updatedFrames = updated_data.updated_frames.updated_frames
# Record successful call for connection health tracking
Cuebot.recordSuccessfulCall()
Comment on lines +525 to +526 (Collaborator):

Using a static method call to act as a channel health check is error prone and not recommended. What is the rationale behind having this on FrameMonitorTree and not LayerMonitorTree, for example? It looks like a patch for the symptom rather than a cure for the actual illness.

If you want to implement logic to keep track of successful calls, please use the RetryOnRpcErrorClientInterceptor class, which intercepts every call to the gRPC channel.
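
For reference, a rough sketch of the interceptor-based bookkeeping described above might look like the following. This is illustrative only and not part of the diff; the on_success/on_failure hooks are assumptions standing in for Cuebot.recordSuccessfulCall/Cuebot.recordFailedCall, and only the unary-unary case is shown.

    import grpc

    class ConnectionHealthInterceptor(grpc.UnaryUnaryClientInterceptor):
        """Records the outcome of every unary RPC passing through the channel."""

        def __init__(self, on_success, on_failure):
            self._on_success = on_success
            self._on_failure = on_failure

        def intercept_unary_unary(self, continuation, client_call_details, request):
            outcome = continuation(client_call_details, request)
            # On an intercepted channel a failed RPC is returned as an RpcError
            # outcome rather than raised here -- the same pattern retry
            # interceptors commonly rely on.
            if isinstance(outcome, grpc.RpcError):
                self._on_failure()
            else:
                self._on_success()
            return outcome

Such an interceptor could be appended to the interceptor list already passed to grpc.intercept_channel() in Cuebot.setChannel(), so no per-widget calls would be needed.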


except grpc.RpcError as e:
# Handle gRPC errors - allow UI to continue and retry
# pylint: disable=no-member
if hasattr(e, 'code'):
if e.code() in [grpc.StatusCode.CANCELLED, grpc.StatusCode.UNAVAILABLE]:
logger.warning("gRPC connection interrupted during frame update, will retry")
# Record failed call and potentially reset the channel
if Cuebot.recordFailedCall():
logger.info("Channel reset due to connection issues, retrying")
Comment on lines +534 to +536 (Collaborator):

Move this check to RetryOnRpcErrorClientInterceptor as explained above.

# Return None to trigger a full update on next cycle
return None
if e.code() == grpc.StatusCode.NOT_FOUND:
8 changes: 7 additions & 1 deletion cuegui/cuegui/ThreadPool.py
@@ -52,6 +52,7 @@ def someWorkCallback(work, result):
from qtpy import QtCore
import grpc

from opencue.cuebot import Cuebot
import cuegui.Logger


@@ -220,7 +221,12 @@ def run(self):
if hasattr(e, 'code') and e.code() in [grpc.StatusCode.CANCELLED,
grpc.StatusCode.UNAVAILABLE]:
logger.warning("gRPC connection issue for '%s': %s - "
"UI will retry on next update", work[2], e.details())
"UI will retry on next update", work[2],
e.details() if hasattr(e, 'details') else str(e))
# Record failed call and potentially reset the channel
if Cuebot.recordFailedCall():
logger.info("Channel was reset due to connection issues, "
"subsequent operations should recover")
else:
logger.error("gRPC error processing work for '%s': %s", work[2], e)
# pylint: enable=no-member
98 changes: 95 additions & 3 deletions pycue/opencue/cuebot.py
@@ -73,6 +73,14 @@
DEFAULT_MAX_MESSAGE_BYTES = 1024 ** 2 * 10
DEFAULT_GRPC_PORT = 8443

# gRPC keepalive settings to prevent "Connection reset by peer" errors
# These settings help maintain long-lived connections through load balancers and firewalls
DEFAULT_KEEPALIVE_TIME_MS = 30000 # Send keepalive ping every 30 seconds
DEFAULT_KEEPALIVE_TIMEOUT_MS = 10000 # Wait 10 seconds for keepalive response
DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS = True # Send keepalive even when no active RPCs
DEFAULT_MAX_CONNECTION_IDLE_MS = 0 # Disable max idle time (keep connection open)
DEFAULT_MAX_CONNECTION_AGE_MS = 0 # Disable max connection age
Comment on lines +78 to +82 (Collaborator):

Not all these constants are being used when creating the channel. Please sanitize.
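
For reference, only the first three constants are read back in setChannel() below; a sanitized version along the lines requested would keep just:

    # Keepalive constants actually consumed when building the channel options.
    DEFAULT_KEEPALIVE_TIME_MS = 30000
    DEFAULT_KEEPALIVE_TIMEOUT_MS = 10000
    DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS = True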


if platform.system() != 'Darwin':
# Avoid spamming users with epoll fork warning messages
os.environ["GRPC_POLL_STRATEGY"] = "epoll1"
@@ -92,6 +100,12 @@ class Cuebot(object):
Config = opencue.config.load_config_from_file()
Timeout = Config.get('cuebot.timeout', 10000)

# Connection health tracking
_lastSuccessfulCall = 0
_consecutiveFailures = 0
_maxConsecutiveFailures = 3 # Reset channel after this many failures
_channelResetInProgress = False

PROTO_MAP = {
'action': filter_pb2,
'allocation': facility_pb2,
@@ -199,10 +213,32 @@ def setChannel():
# pylint: enable=logging-not-lazy
# TODO(bcipriano) Configure gRPC TLS. (Issue #150)
try:
# Configure keepalive settings to prevent "Connection reset by peer" errors
# These are essential for long-lived connections through load balancers
keepalive_time_ms = Cuebot.Config.get(
'cuebot.keepalive_time_ms', DEFAULT_KEEPALIVE_TIME_MS)
keepalive_timeout_ms = Cuebot.Config.get(
'cuebot.keepalive_timeout_ms', DEFAULT_KEEPALIVE_TIMEOUT_MS)
keepalive_permit_without_calls = Cuebot.Config.get(
'cuebot.keepalive_permit_without_calls', DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS)
Comment on lines +222 to +223 (Collaborator):

This config has to be set on both server and client to have effect. As this is not configured on the server, it has no effect.
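
To illustrate the point about matching settings: keepalive pings without active calls are only accepted if the server permits them. Below is a minimal sketch of the corresponding server-side channel arguments, shown with Python's grpc.server purely for illustration and with hypothetical values (Cuebot itself is a Java server, so the equivalent settings would have to be applied there):

    from concurrent import futures
    import grpc

    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=4),
        options=[
            # Server must explicitly allow keepalive pings when no RPC is active.
            ('grpc.keepalive_permit_without_calls', 1),
            # Must not be stricter than the client's grpc.keepalive_time_ms,
            # otherwise the server answers with GOAWAY (ENHANCE_YOUR_CALM).
            ('grpc.http2.min_ping_interval_without_data_ms', 30000),
        ])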


channel_options = [
('grpc.max_send_message_length', maxMessageBytes),
('grpc.max_receive_message_length', maxMessageBytes),
# Keepalive settings to maintain connection health
('grpc.keepalive_time_ms', keepalive_time_ms),
('grpc.keepalive_timeout_ms', keepalive_timeout_ms),
('grpc.keepalive_permit_without_calls', keepalive_permit_without_calls),
# Allow client to send keepalive pings even without data
('grpc.http2.max_pings_without_data', 0),
Comment (Collaborator):

I don't recommend removing the limit on empty pings. If you open a Python shell and import pycue, for example, a connection is created; if you never send any data, the channel will live forever until the shell is closed. This has the potential to overwhelm the server with too many open, idle channels.

The default value is 2; maybe we can increase it to 10. But given how noisy cuegui's communication with Cuebot is, I doubt there are periods of inactivity long enough for pings to be sent without payload.
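
As a concrete illustration of the bounded alternative floated above (10 is the reviewer's example value, not a tested recommendation):

    # Illustrative only: keep a finite limit instead of 0 ("unlimited").
    # gRPC's default for this option is 2.
    bounded_ping_option = ('grpc.http2.max_pings_without_data', 10)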

# Minimum time between pings (allows more frequent pings)
('grpc.http2.min_time_between_pings_ms', 10000),
# Minimum allowed interval between pings when no data is in flight
('grpc.http2.min_ping_interval_without_data_ms', 5000),
Comment (Collaborator):

This is mainly a server-side configuration, and it needs to be set on both server and client. Reducing the empty-ping interval might have the opposite effect from what you expect.

Read grpc.config

Why am I receiving a GOAWAY with error code ENHANCE_YOUR_CALM?
A server sends a GOAWAY with ENHANCE_YOUR_CALM if the client sends too many misbehaving pings as described in A8-client-side-keepalive.md. Some scenarios where this can happen are

  • if a server has GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS set to false while the client has set this to true resulting in keepalive pings being sent even when there is no call in flight.
  • if the client's GRPC_ARG_KEEPALIVE_TIME_MS setting is lower than the server's GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS.

]

Cuebot.RpcChannel = grpc.intercept_channel(
grpc.insecure_channel(connectStr, options=[
('grpc.max_send_message_length', maxMessageBytes),
('grpc.max_receive_message_length', maxMessageBytes)]),
grpc.insecure_channel(connectStr, options=channel_options),
*interceptors)
# Test the connection
Cuebot.getStub('cue').GetSystemStats(
@@ -302,6 +338,62 @@ def getConfig():
"""Gets the Cuebot config object, originally read in from the config file on disk."""
return Cuebot.Config

@staticmethod
def recordSuccessfulCall():
"""Record a successful gRPC call to track connection health."""
Cuebot._lastSuccessfulCall = time.time()
Cuebot._consecutiveFailures = 0

@staticmethod
def recordFailedCall():
"""Record a failed gRPC call and trigger channel reset if needed.

Returns True if the channel was reset and the caller should retry."""
Cuebot._consecutiveFailures += 1

if Cuebot._consecutiveFailures >= Cuebot._maxConsecutiveFailures:
if not Cuebot._channelResetInProgress:
Cuebot._channelResetInProgress = True
try:
logger.warning(
"Connection appears unhealthy after %d consecutive failures, "
"resetting gRPC channel...", Cuebot._consecutiveFailures)
Cuebot.resetChannel()
Cuebot._consecutiveFailures = 0
return True
except Exception as e:
logger.error("Failed to reset gRPC channel: %s", e)
finally:
Cuebot._channelResetInProgress = False
return False

@staticmethod
def checkChannelHealth():
Comment (Collaborator):

I can't find where this method is being called outside of a unit test context. Am I missing something?

"""Check if the gRPC channel is healthy by making a simple call.

Returns True if healthy, False otherwise."""
if Cuebot.RpcChannel is None:
return False

try:
Cuebot.getStub('cue').GetSystemStats(
cue_pb2.CueGetSystemStatsRequest(), timeout=5000)
Cuebot.recordSuccessfulCall()
return True
except grpc.RpcError as e:
# pylint: disable=no-member
if hasattr(e, 'code') and e.code() == grpc.StatusCode.UNAVAILABLE:
details = e.details() if hasattr(e, 'details') else str(e)
logger.warning("Channel health check failed: %s", details)
Cuebot.recordFailedCall()
return False
# pylint: enable=no-member
# Other errors might be OK (e.g., permission issues)
return True
except Exception as e:
logger.warning("Channel health check failed with unexpected error: %s", e)
return False


# Python 2/3 compatible implementation of ABC
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})
124 changes: 124 additions & 0 deletions pycue/tests/test_cuebot.py
@@ -17,10 +17,14 @@
"""Tests for `opencue.cuebot`."""

import os
import time
import unittest
import mock

import grpc

import opencue
from opencue import cuebot


TESTING_CONFIG = {
@@ -72,5 +76,125 @@ def test__should_ignore_unknown_facility(self):
self.assertEqual(['fake-cuebot-01'], self.cuebot.Hosts)


class ConnectionHealthTests(unittest.TestCase):
"""Tests for gRPC connection health tracking and recovery."""

def setUp(self):
"""Reset connection health state before each test."""
opencue.Cuebot._consecutiveFailures = 0
opencue.Cuebot._lastSuccessfulCall = 0
opencue.Cuebot._channelResetInProgress = False

def tearDown(self):
"""Reset connection health state after each test."""
opencue.Cuebot._consecutiveFailures = 0
opencue.Cuebot._lastSuccessfulCall = 0
opencue.Cuebot._channelResetInProgress = False

def test__keepalive_constants_defined(self):
"""Test that keepalive constants are defined with expected values."""
self.assertEqual(cuebot.DEFAULT_KEEPALIVE_TIME_MS, 30000)
self.assertEqual(cuebot.DEFAULT_KEEPALIVE_TIMEOUT_MS, 10000)
self.assertTrue(cuebot.DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS)

def test__record_successful_call_updates_timestamp(self):
"""Test that recordSuccessfulCall updates the last successful call timestamp."""
before = time.time()
opencue.Cuebot.recordSuccessfulCall()
after = time.time()

self.assertGreaterEqual(opencue.Cuebot._lastSuccessfulCall, before)
self.assertLessEqual(opencue.Cuebot._lastSuccessfulCall, after)

def test__record_successful_call_resets_failure_counter(self):
"""Test that recordSuccessfulCall resets the consecutive failure counter."""
opencue.Cuebot._consecutiveFailures = 2

opencue.Cuebot.recordSuccessfulCall()

self.assertEqual(opencue.Cuebot._consecutiveFailures, 0)

def test__record_failed_call_increments_counter(self):
"""Test that recordFailedCall increments the consecutive failure counter."""
self.assertEqual(opencue.Cuebot._consecutiveFailures, 0)

opencue.Cuebot.recordFailedCall()
self.assertEqual(opencue.Cuebot._consecutiveFailures, 1)

opencue.Cuebot.recordFailedCall()
self.assertEqual(opencue.Cuebot._consecutiveFailures, 2)

@mock.patch.object(opencue.Cuebot, 'resetChannel')
def test__record_failed_call_resets_channel_after_max_failures(self, mock_reset):
"""Test that recordFailedCall triggers channel reset after max consecutive failures."""
# Simulate failures up to the threshold
for i in range(opencue.Cuebot._maxConsecutiveFailures - 1):
result = opencue.Cuebot.recordFailedCall()
self.assertFalse(result)
mock_reset.assert_not_called()

# The next failure should trigger a reset
result = opencue.Cuebot.recordFailedCall()
self.assertTrue(result)
mock_reset.assert_called_once()

# Counter should be reset after channel reset
self.assertEqual(opencue.Cuebot._consecutiveFailures, 0)

@mock.patch.object(opencue.Cuebot, 'resetChannel')
def test__record_failed_call_does_not_reset_when_already_in_progress(self, mock_reset):
"""Test that recordFailedCall doesn't trigger reset if one is already in progress."""
opencue.Cuebot._channelResetInProgress = True
opencue.Cuebot._consecutiveFailures = opencue.Cuebot._maxConsecutiveFailures

result = opencue.Cuebot.recordFailedCall()

self.assertFalse(result)
mock_reset.assert_not_called()

@mock.patch.object(opencue.Cuebot, 'getStub')
def test__check_channel_health_returns_true_on_success(self, mock_get_stub):
"""Test that checkChannelHealth returns True when the health check succeeds."""
mock_stub = mock.Mock()
mock_get_stub.return_value = mock_stub
opencue.Cuebot.RpcChannel = mock.Mock()

result = opencue.Cuebot.checkChannelHealth()

self.assertTrue(result)
mock_get_stub.assert_called_with('cue')
self.assertEqual(opencue.Cuebot._consecutiveFailures, 0)

@mock.patch.object(opencue.Cuebot, 'getStub')
@mock.patch.object(opencue.Cuebot, 'recordFailedCall')
def test__check_channel_health_returns_false_on_unavailable(
self, mock_record_failed, mock_get_stub):
"""Test that checkChannelHealth returns False on UNAVAILABLE error."""
mock_stub = mock.Mock()
error = grpc.RpcError()
error.code = mock.Mock(return_value=grpc.StatusCode.UNAVAILABLE)
error.details = mock.Mock(return_value="Connection refused")
mock_stub.GetSystemStats.side_effect = error
mock_get_stub.return_value = mock_stub
opencue.Cuebot.RpcChannel = mock.Mock()

result = opencue.Cuebot.checkChannelHealth()

self.assertFalse(result)
mock_record_failed.assert_called_once()

def test__check_channel_health_returns_false_when_no_channel(self):
"""Test that checkChannelHealth returns False when RpcChannel is None."""
opencue.Cuebot.RpcChannel = None

result = opencue.Cuebot.checkChannelHealth()

self.assertFalse(result)

def test__max_consecutive_failures_default(self):
"""Test that the default max consecutive failures is set correctly."""
self.assertEqual(opencue.Cuebot._maxConsecutiveFailures, 3)


if __name__ == '__main__':
unittest.main()