Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements simple k8s API retries #102

Merged
merged 13 commits into from
Oct 14, 2019
Merged
9 changes: 9 additions & 0 deletions calrissian/k8s.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from kubernetes import client, config, watch
from kubernetes.client.api_client import ApiException
from kubernetes.config.config_exception import ConfigException
from calrissian.retry import retry_exponential_if_exception_type
import threading
import logging
import os
from urllib3.exceptions import HTTPError

log = logging.getLogger('calrissian.k8s')

Expand Down Expand Up @@ -70,6 +72,7 @@ def __init__(self):
self.namespace = load_config_get_namespace()
self.core_api_instance = client.CoreV1Api()

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def submit_pod(self, pod_body):
with PodMonitor() as monitor:
pod = self.core_api_instance.create_namespaced_pod(self.namespace, pod_body)
Expand All @@ -89,6 +92,7 @@ def should_delete_pod(self):
else:
return True

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def delete_pod_name(self, pod_name):
try:
self.core_api_instance.delete_namespaced_pod(pod_name, self.namespace)
Expand Down Expand Up @@ -117,6 +121,7 @@ def _handle_completion(self, state, container):
)
log.info('handling completion with {}'.format(exit_code))

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def follow_logs(self):
pod_name = self.pod.metadata.name
log.info('[{}] follow_logs start'.format(pod_name))
Expand All @@ -131,6 +136,7 @@ def follow_logs(self):
log.debug('[{}] {}'.format(pod_name, line))
log.info('[{}] follow_logs end'.format(pod_name))

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def wait_for_completion(self):
w = watch.Watch()
for event in w.stream(self.core_api_instance.list_namespaced_pod, self.namespace, field_selector=self._get_pod_field_selector()):
Expand Down Expand Up @@ -212,6 +218,7 @@ def _extract_start_finish_times(self, state):
"""
return (state.terminated.started_at, state.terminated.finished_at,)

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def get_pod_for_name(self, pod_name):
"""
Given a pod name return details about this pod
Expand Down Expand Up @@ -274,6 +281,7 @@ def remove(self, pod):

@staticmethod
def cleanup():
log.info('Starting Cleanup')
with PodMonitor() as monitor:
k8s_client = KubernetesClient()
for pod_name in PodMonitor.pod_names:
Expand All @@ -283,6 +291,7 @@ def cleanup():
except Exception:
log.error('Error deleting pod named {}, ignoring'.format(pod_name))
PodMonitor.pod_names = []
log.info('Finishing Cleanup')


def delete_pods():
Expand Down
24 changes: 24 additions & 0 deletions calrissian/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from tenacity import retry, wait_exponential, retry_if_exception_type, stop_after_attempt, before_sleep_log
import logging
import os


class RetryParameters(object):
    """
    Settings consumed by retry_exponential_if_exception_type.
    Every value is read from the environment when this class body executes and
    falls back to the default given here when the variable is not set.
    """
    # Multiplier handed to the exponential wait
    MULTIPLIER = float(os.environ.get('RETRY_MULTIPLIER', 5))
    # Lower bound handed to the exponential wait
    MIN = float(os.environ.get('RETRY_MIN', 5))
    # Upper bound handed to the exponential wait
    MAX = float(os.environ.get('RETRY_MAX', 1200))
    # Attempt count at which retrying stops
    ATTEMPTS = int(os.environ.get('RETRY_ATTEMPTS', 10))


def retry_exponential_if_exception_type(exc_type, logger):
    """
    Build the tenacity @retry decorator using the values held in RetryParameters
    :param exc_type: Exception type (or tuple of types) that triggers another attempt
    :param logger: Logger instance that receives a DEBUG-level record before each sleep
    :return: The decorator returned by tenacity.retry
    """
    # Components are created in the same order the keyword arguments were originally evaluated
    should_retry = retry_if_exception_type(exc_type)
    backoff = wait_exponential(
        multiplier=RetryParameters.MULTIPLIER,
        min=RetryParameters.MIN,
        max=RetryParameters.MAX,
    )
    give_up = stop_after_attempt(RetryParameters.ATTEMPTS)
    announce_sleep = before_sleep_log(logger, logging.DEBUG)
    return retry(
        retry=should_retry,
        wait=backoff,
        stop=give_up,
        before_sleep=announce_sleep,
        # Propagate the wrapped function's own exception once attempts are exhausted
        reraise=True,
    )
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ scandir==1.10.0
schema-salad==4.5.20190621200723
shellescape==3.4.1
six==1.12.0
tenacity==5.1.1
typing-extensions==3.7.4
urllib3==1.24.3
websocket-client==0.56.0
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def run(self):
'urllib3<1.25,>=1.24.2',
'kubernetes==10.0.1',
'cwltool==1.0.20190621234233',
'tenacity==5.1.1',
],
test_suite='nose2.collector.collector',
tests_require=['nose2'],
Expand Down
2 changes: 2 additions & 0 deletions tests/test_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,9 @@ def test_remove_after_cleanup(self, mock_log, mock_client):
monitor.remove(pod)
mock_log.info.assert_has_calls([
call('PodMonitor adding pod-123'),
call('Starting Cleanup'),
call('PodMonitor deleting pod pod-123'),
call('Finishing Cleanup'),
])
mock_log.warning.assert_called_with('PodMonitor pod-123 has already been removed')

Expand Down
72 changes: 72 additions & 0 deletions tests/test_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from calrissian.retry import retry_exponential_if_exception_type
from unittest import TestCase
from unittest.mock import Mock, patch


class RetryTestCase(TestCase):
    """Tests for retry_exponential_if_exception_type using Mock callables and a Mock logger."""

    def setUp(self):
        self.logger = Mock()
        self.mock = Mock()

    def setup_mock_retry_parameters(self, mock_retry_parameters):
        """Assign very small wait values and a 5-attempt limit to the patched RetryParameters."""
        overrides = {
            'MULTIPLIER': 0.001,
            'MIN': 0.001,
            'MAX': 0.010,
            'ATTEMPTS': 5,
        }
        for attribute, value in overrides.items():
            setattr(mock_retry_parameters, attribute, value)

    def test_retry_calls_wrapped_function(self):
        """A wrapped function that succeeds is invoked once and its result is passed through."""
        @retry_exponential_if_exception_type(ValueError, self.logger)
        def func():
            return self.mock()

        self.assertEqual(func(), self.mock.return_value)
        self.assertEqual(self.mock.call_count, 1)

    @patch('calrissian.retry.RetryParameters')
    def test_retry_gives_up_and_raises(self, mock_retry_parameters):
        """A function that always raises is attempted 5 times, then the ValueError is raised."""
        self.setup_mock_retry_parameters(mock_retry_parameters)
        self.mock.side_effect = ValueError('value error')

        @retry_exponential_if_exception_type(ValueError, self.logger)
        def func():
            self.mock()

        self.assertRaisesRegex(ValueError, 'value error', func)
        self.assertEqual(self.mock.call_count, 5)

    @patch('calrissian.retry.RetryParameters')
    def test_retry_eventually_succeeds_without_exception(self, mock_retry_parameters):
        """Two failures followed by a success yield the successful result after 3 calls."""
        self.setup_mock_retry_parameters(mock_retry_parameters)

        @retry_exponential_if_exception_type(ValueError, self.logger)
        def func():
            outcome = self.mock()
            if self.mock.call_count >= 3:
                return outcome
            raise ValueError('value error')

        returned = func()

        self.assertEqual(returned, self.mock.return_value)
        self.assertEqual(self.mock.call_count, 3)

    @patch('calrissian.retry.RetryParameters')
    def test_retry_raises_other_exceptions_without_second_attempt(self, mock_retry_parameters):
        """An exception type that was not registered for retry is raised after a single call."""
        self.setup_mock_retry_parameters(mock_retry_parameters)

        class ExceptionA(Exception):
            pass

        class ExceptionB(Exception):
            pass

        self.mock.side_effect = ExceptionA('exception a')

        @retry_exponential_if_exception_type(ExceptionB, self.logger)
        def func():
            self.mock()

        self.assertRaisesRegex(ExceptionA, 'exception a', func)
        self.assertEqual(self.mock.call_count, 1)