Skip to content

Commit

Permalink
Merge pull request #102 from Duke-GCB/k8s-api-retry-simple
Browse files Browse the repository at this point in the history
Implements simple k8s API retries
  • Loading branch information
dleehr authored Oct 14, 2019
2 parents a451165 + 25d1bfa commit 3f781c7
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 6 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,22 @@ To view open issues related to conformance, see the [conformance](https://github
## Setup

Please see [examples](examples) for installation and setup instructions.

## Environment Variables

Calrissian's behaviors can be customized by setting the following environment variables in the container specification.

### Pod lifecycle

By default, pods for a job step will be deleted after termination

- `CALRISSIAN_DELETE_PODS`: Default `true`. If `false`, job step pods will not be deleted.

### Kubernetes API retries

When encountering a Kubernetes API exception, Calrissian uses a library to retry API calls with an exponential backoff. See the [tenacity documentation](https://tenacity.readthedocs.io/en/latest/index.html#waiting-before-retrying) for details.

- `RETRY_MULTIPLIER`: Default `5`. Unit for multiplying the exponent interval.
- `RETRY_MIN`: Default `5`. Minimum interval between retries.
- `RETRY_MAX`: Default `1200`. Maximum interval between retries.
- `RETRY_ATTEMPTS`: Default `10`. Max number of retries before giving up.
16 changes: 15 additions & 1 deletion calrissian/k8s.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from kubernetes import client, config, watch
from kubernetes.client.api_client import ApiException
from kubernetes.config.config_exception import ConfigException
from calrissian.retry import retry_exponential_if_exception_type
import threading
import logging
import os
from urllib3.exceptions import HTTPError

log = logging.getLogger('calrissian.k8s')

Expand Down Expand Up @@ -70,6 +72,7 @@ def __init__(self):
self.namespace = load_config_get_namespace()
self.core_api_instance = client.CoreV1Api()

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def submit_pod(self, pod_body):
with PodMonitor() as monitor:
pod = self.core_api_instance.create_namespaced_pod(self.namespace, pod_body)
Expand All @@ -89,11 +92,17 @@ def should_delete_pod(self):
else:
return True

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def delete_pod_name(self, pod_name):
try:
self.core_api_instance.delete_namespaced_pod(pod_name, self.namespace)
except ApiException as e:
raise CalrissianJobException('Error deleting pod named {}'.format(pod_name), e)
if e.status == 404:
# pod was not found - already deleted, so do not retry
pass
else:
# Re-raise
raise

def _handle_completion(self, state, container):
"""
Expand All @@ -117,6 +126,7 @@ def _handle_completion(self, state, container):
)
log.info('handling completion with {}'.format(exit_code))

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def follow_logs(self):
pod_name = self.pod.metadata.name
log.info('[{}] follow_logs start'.format(pod_name))
Expand All @@ -131,6 +141,7 @@ def follow_logs(self):
log.debug('[{}] {}'.format(pod_name, line))
log.info('[{}] follow_logs end'.format(pod_name))

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def wait_for_completion(self):
w = watch.Watch()
for event in w.stream(self.core_api_instance.list_namespaced_pod, self.namespace, field_selector=self._get_pod_field_selector()):
Expand Down Expand Up @@ -212,6 +223,7 @@ def _extract_start_finish_times(self, state):
"""
return (state.terminated.started_at, state.terminated.finished_at,)

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def get_pod_for_name(self, pod_name):
"""
Given a pod name return details about this pod
Expand Down Expand Up @@ -274,6 +286,7 @@ def remove(self, pod):

@staticmethod
def cleanup():
log.info('Starting Cleanup')
with PodMonitor() as monitor:
k8s_client = KubernetesClient()
for pod_name in PodMonitor.pod_names:
Expand All @@ -283,6 +296,7 @@ def cleanup():
except Exception:
log.error('Error deleting pod named {}, ignoring'.format(pod_name))
PodMonitor.pod_names = []
log.info('Finishing Cleanup')


def delete_pods():
Expand Down
24 changes: 24 additions & 0 deletions calrissian/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from tenacity import retry, wait_exponential, retry_if_exception_type, stop_after_attempt, before_sleep_log
import logging
import os


class RetryParameters(object):
MULTIPLIER = float(os.getenv('RETRY_MULTIPLIER', 5)) # Unit for multiplying the exponent
MIN = float(os.getenv('RETRY_MIN', 5)) # Min time for retrying
MAX = float(os.getenv('RETRY_MAX', 1200)) # Max interval between retries
ATTEMPTS = int(os.getenv('RETRY_ATTEMPTS', 10)) # Max number of retries before giving up


def retry_exponential_if_exception_type(exc_type, logger):
"""
Decorator function that returns the tenacity @retry decorator with our commonly-used config
:param exc_type: Type of exception (or tuple of types) to retry if encountered
:param logger: A logger instance to send retry logs to
:return: Result of tenacity.retry decorator function
"""
return retry(retry=retry_if_exception_type(exc_type),
wait=wait_exponential(multiplier=RetryParameters.MULTIPLIER, min=RetryParameters.MIN, max=RetryParameters.MAX),
stop=stop_after_attempt(RetryParameters.ATTEMPTS),
before_sleep=before_sleep_log(logger, logging.DEBUG),
reraise=True)
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ scandir==1.10.0
schema-salad==4.5.20190621200723
shellescape==3.4.1
six==1.12.0
tenacity==5.1.1
typing-extensions==3.7.4
urllib3==1.24.3
websocket-client==0.56.0
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def run(self):
'urllib3<1.25,>=1.24.2',
'kubernetes==10.0.1',
'cwltool==1.0.20190621234233',
'tenacity==5.1.1',
],
test_suite='nose2.collector.collector',
tests_require=['nose2'],
Expand Down
11 changes: 6 additions & 5 deletions tests/test_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,11 @@ def test_delete_pod_name_calls_api(self, mock_get_namespace, mock_client):
kc.delete_pod_name('pod-123')
self.assertEqual('pod-123', mock_client.CoreV1Api.return_value.delete_namespaced_pod.call_args[0][0])

def test_delete_pod_name_raises(self, mock_get_namespace, mock_client):
mock_client.rest.ApiException = Exception
mock_client.CoreV1Api.return_value.delete_namespaced_pod.side_effect = ApiException
def test_delete_pod_name_ignores_404(self, mock_get_namespace, mock_client):
mock_client.CoreV1Api.return_value.delete_namespaced_pod.side_effect = ApiException(status=404)
kc = KubernetesClient()
with self.assertRaisesRegex(CalrissianJobException, 'Error deleting pod named pod-123'):
kc.delete_pod_name('pod-123')
kc.delete_pod_name('pod-123')
self.assertEqual('pod-123', mock_client.CoreV1Api.return_value.delete_namespaced_pod.call_args[0][0])

@patch('calrissian.k8s.log')
def test_follow_logs_streams_to_logging(self, mock_log, mock_get_namespace, mock_client):
Expand Down Expand Up @@ -370,7 +369,9 @@ def test_remove_after_cleanup(self, mock_log, mock_client):
monitor.remove(pod)
mock_log.info.assert_has_calls([
call('PodMonitor adding pod-123'),
call('Starting Cleanup'),
call('PodMonitor deleting pod pod-123'),
call('Finishing Cleanup'),
])
mock_log.warning.assert_called_with('PodMonitor pod-123 has already been removed')

Expand Down
72 changes: 72 additions & 0 deletions tests/test_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from calrissian.retry import retry_exponential_if_exception_type
from unittest import TestCase
from unittest.mock import Mock, patch


class RetryTestCase(TestCase):
def setUp(self):
self.logger = Mock()
self.mock = Mock()

def setup_mock_retry_parameters(self, mock_retry_parameters):
mock_retry_parameters.MULTIPLIER = 0.001
mock_retry_parameters.MIN = 0.001
mock_retry_parameters.MAX = 0.010
mock_retry_parameters.ATTEMPTS = 5

def test_retry_calls_wrapped_function(self):
@retry_exponential_if_exception_type(ValueError, self.logger)
def func():
return self.mock()

result = func()
self.assertEqual(result, self.mock.return_value)
self.assertEqual(self.mock.call_count, 1)

@patch('calrissian.retry.RetryParameters')
def test_retry_gives_up_and_raises(self, mock_retry_parameters):
self.setup_mock_retry_parameters(mock_retry_parameters)
self.mock.side_effect = ValueError('value error')

@retry_exponential_if_exception_type(ValueError, self.logger)
def func():
self.mock()

with self.assertRaisesRegex(ValueError, 'value error'):
func()

self.assertEqual(self.mock.call_count, 5)

@patch('calrissian.retry.RetryParameters')
def test_retry_eventually_succeeds_without_exception(self, mock_retry_parameters):
self.setup_mock_retry_parameters(mock_retry_parameters)

@retry_exponential_if_exception_type(ValueError, self.logger)
def func():
r = self.mock()
if self.mock.call_count < 3:
raise ValueError('value error')
return r

result = func()

self.assertEqual(result, self.mock.return_value)
self.assertEqual(self.mock.call_count, 3)

@patch('calrissian.retry.RetryParameters')
def test_retry_raises_other_exceptions_without_second_attempt(self, mock_retry_parameters):
self.setup_mock_retry_parameters(mock_retry_parameters)

class ExceptionA(Exception): pass
class ExceptionB(Exception): pass

self.mock.side_effect = ExceptionA('exception a')

@retry_exponential_if_exception_type(ExceptionB, self.logger)
def func():
self.mock()

with self.assertRaisesRegex(ExceptionA, 'exception a'):
func()

self.assertEqual(self.mock.call_count, 1)

0 comments on commit 3f781c7

Please sign in to comment.