Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle ETCD event cleared exception during blocking lock #250

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions src/etcd/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __init__(self, client, lock_name):
# prevent us from getting back the full path name. We prefix our
# lock name with a uuid and can check for its presence on retry.
self._uuid = uuid.uuid4().hex
self.path = "{}/{}".format(client.lock_prefix, lock_name)
self.path = "{}/{}".format(client.lock_prefix, lock_name)
self.is_taken = False
self._sequence = None
_log.debug("Initiating lock for %s with uuid %s", self.path, self._uuid)
Expand Down Expand Up @@ -101,8 +101,8 @@ def __exit__(self, type, value, traceback):
self.release()
return False

def _acquired(self, blocking=True, timeout=0):
locker, nearest = self._get_locker()
def _acquired(self, blocking=True, timeout=0, etcd_index_cleared=False):
locker, nearest, etcd_index = self._get_locker()
self.is_taken = False
if self.lock_key == locker:
_log.debug("Lock acquired!")
Expand All @@ -117,16 +117,27 @@ def _acquired(self, blocking=True, timeout=0):
watch_key = nearest.key
_log.debug("Lock not acquired, now watching %s", watch_key)
t = max(0, timeout)

if etcd_index_cleared:
index = etcd_index + 1
else:
index = nearest.modifiedIndex + 1

while True:
try:
r = self.client.watch(watch_key, timeout=t, index=nearest.modifiedIndex + 1)
r = self.client.watch(watch_key, timeout=t, index=index)
_log.debug("Detected variation for %s: %s", r.key, r.action)
return self._acquired(blocking=True, timeout=timeout)
except etcd.EtcdKeyNotFound:
_log.debug("Key %s not present anymore, moving on", watch_key)
return self._acquired(blocking=True, timeout=timeout)
except etcd.EtcdLockExpired as e:
raise e
except etcd.EtcdEventIndexCleared:
_log.debug("Event index cleared so recover by reading current state.")
return self._acquired(
blocking=True, timeout=timeout, etcd_index_cleared=True
)
except etcd.EtcdException:
_log.exception("Unexpected exception")

Expand Down Expand Up @@ -158,8 +169,9 @@ def _find_lock(self):
return False

def _get_locker(self):
results = [res for res in
self.client.read(self.path, recursive=True).leaves]
resp = self.client.read(self.path, recursive=True)
results = [res for res in resp.leaves]

if not self._sequence:
self._find_lock()
l = sorted([r.key for r in results])
Expand All @@ -168,10 +180,14 @@ def _get_locker(self):
i = l.index(self.lock_key)
if i == 0:
_log.debug("No key before our one, we are the locker")
return (l[0], None)
return (l[0], None, resp.etcd_index)
else:
_log.debug("Locker: %s, key to watch: %s", l[0], l[i-1])
return (l[0], next(x for x in results if x.key == l[i-1]))
return (
l[0],
next(x for x in results if x.key == l[i - 1]),
resp.etcd_index
)
except ValueError:
# Something very wrong is going on, most probably
# our lock has expired
Expand Down
39 changes: 33 additions & 6 deletions src/etcd/tests/unit/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ def test_acquired(self):
Test the acquiring primitives
"""
self.locker._sequence = '4'
retval = ('/_locks/test_lock/4', None)
retval = ('/_locks/test_lock/4', None, 1)
self.locker._get_locker = mock.MagicMock(
spec=self.locker._get_locker, return_value=retval)
self.assertTrue(self.locker._acquired())
self.assertTrue(self.locker.is_taken)
retval = ('/_locks/test_lock/1', '/_locks/test_lock/4')
retval = ('/_locks/test_lock/1', '/_locks/test_lock/4', 1)
self.locker._get_locker = mock.MagicMock(return_value=retval)
self.assertFalse(self.locker._acquired(blocking=False))
self.assertFalse(self.locker.is_taken)
Expand All @@ -107,7 +107,7 @@ def test_acquired(self):
}
}
self._mock_api(200, d)
returns = [('/_locks/test_lock/1', '/_locks/test_lock/4'), ('/_locks/test_lock/4', None)]
returns = [('/_locks/test_lock/1', '/_locks/test_lock/4', 1), ('/_locks/test_lock/4', None, 1)]

def side_effect():
return returns.pop()
Expand All @@ -119,8 +119,8 @@ def side_effect():
def test_acquired_no_timeout(self):
self.locker._sequence = 4
returns = [
('/_locks/test_lock/4', None),
('/_locks/test_lock/1', etcd.EtcdResult(node={"key": '/_locks/test_lock/4', "modifiedIndex": 1}))
('/_locks/test_lock/4', None, 1),
('/_locks/test_lock/1', etcd.EtcdResult(node={"key": '/_locks/test_lock/4', "modifiedIndex": 1}), 1)
]

def side_effect():
Expand Down Expand Up @@ -175,7 +175,7 @@ def test_find_lock(self):

def test_get_locker(self):
self.recursive_read()
self.assertEquals((u'/_locks/test_lock/1', etcd.EtcdResult(node={'newKey': False, '_children': [], 'createdIndex': 33, 'modifiedIndex': 33, 'value': u'2qwwwq', 'expiration': None, 'key': u'/_locks/test_lock/1', 'ttl': None, 'action': None, 'dir': False})),
self.assertEquals((u'/_locks/test_lock/1', etcd.EtcdResult(node={'newKey': False, '_children': [], 'createdIndex': 33, 'modifiedIndex': 33, 'value': u'2qwwwq', 'expiration': None, 'key': u'/_locks/test_lock/1', 'ttl': None, 'action': None, 'dir': False}), 1),
self.locker._get_locker())
with self.assertRaises(etcd.EtcdLockExpired):
self.locker._sequence = '35'
Expand All @@ -195,3 +195,30 @@ def test_release(self):
self.locker.is_taken = True
self.locker.release()
self.assertFalse(self.locker.is_taken)

def test_event_index_cleared(self):
self.locker._sequence = 4
returns = [
('/_locks/test_lock/4', None, 1),
('/_locks/test_lock/1', etcd.EtcdResult(node={"key": '/_locks/test_lock/4', "modifiedIndex": 1}), 1)
]

def side_effect():
return returns.pop()

d = {
u'action': u'get',
u'node': {
u'modifiedIndex': 190,
u'key': u'/_locks/test_lock/4',
u'value': self.locker.uuid
}
}
self._mock_api(200, d)
self.locker._get_locker = mock.create_autospec(
self.locker._get_locker, side_effect=side_effect)

# Raise the event index cleared exception and test that we
# can recover from it.
self._mock_exception(etcd.EtcdEventIndexCleared, self.locker.lock_key)
self.assertTrue(self.locker._acquired())