Skip to content

Commit cff5244

Browse files
Use the same mcs-shmem-locks tool that we used to inspect locks state to unlock them
1 parent 384dd56 commit cff5244

File tree

5 files changed

+161
-48
lines changed

5 files changed

+161
-48
lines changed

cmapi/cmapi_server/constants.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,8 @@ class ProgInfo(NamedTuple):
9292
LIBJEMALLOC_DEFAULT_PATH = os.path.join(MCS_DATA_PATH, 'libjemalloc.so.2')
9393
MCS_LOG_PATH = '/var/log/mariadb/columnstore'
9494

95-
# tools for BRM shmem lock inspection/reset
95+
# BRM shmem lock inspection/reset tool
9696
SHMEM_LOCKS_PATH = os.path.join(MCS_INSTALL_BIN, 'mcs-shmem-locks')
97-
RESET_LOCKS_PATH = os.path.join(MCS_INSTALL_BIN, 'reset_locks')
9897

9998
# client constants
10099
CMAPI_PORT = 8640 #TODO: use it in all places

cmapi/cmapi_server/process_dispatchers/container.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,7 @@ def stop(
223223
# Run pre-stop lock reset before saving BRM
224224
# These stale locks can occur if the controllernode couldn't stop correctly
225225
# and they cause mcs-savebrm.py to hang
226-
227-
dispatcher_utils.reset_shmem_locks(logger)
226+
dispatcher_utils.release_shmem_locks(logger)
228227

229228
# start mcs-savebrm.py before stopping workernode
230229
logger.debug('Waiting to save BRM.')

cmapi/cmapi_server/process_dispatchers/systemd.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def stop(
168168
# Run pre-stop lock reset before saving BRM
169169
# These stale locks can occur if the controllernode couldn't stop correctly
170170
# and they cause mcs-savebrm.py to hang
171-
dispatcher_utils.reset_shmem_locks(logging.getLogger(__name__))
171+
dispatcher_utils.release_shmem_locks(logging.getLogger(__name__))
172172

173173
service_name = f'{service_name}@1.service {service_name}@2.service'
174174
cls._workernode_enable(False, use_sudo)
Lines changed: 83 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,104 @@
11
import logging
22
import re
33
from time import sleep
4-
from typing import Optional
5-
from cmapi_server.constants import SHMEM_LOCKS_PATH, RESET_LOCKS_PATH
4+
from typing import Optional, List, Tuple
5+
from cmapi_server.constants import SHMEM_LOCKS_PATH
66
from cmapi_server.process_dispatchers.base import BaseDispatcher
77

88

9-
def parse_locks_num(cmd_output: str) -> int:
10-
"""Parse output of mcs-shmem-locks command."""
11-
active_total = 0
9+
def parse_locks_state(cmd_output: str, logger: logging.Logger) -> List[Tuple[int, int, int]]:
10+
"""Parse per-lock state from mcs-shmem-locks output.
11+
12+
Returns a list of tuples: (lock_id, readers, writers)
13+
"""
14+
locks: List[Tuple[int, int, int]] = []
15+
current_id = 0
16+
readers = None
17+
writers = None
18+
1219
for line in cmd_output.splitlines():
13-
m = re.search(r'^\s*(readers|writers)\s*=\s*(\d+)', line)
20+
if line.strip().endswith('RWLock'):
21+
# flush previous section counts (if we have both)
22+
if current_id > 0 and readers is not None and writers is not None:
23+
locks.append((current_id, readers, writers))
24+
current_id += 1
25+
readers = None
26+
writers = None
27+
continue
28+
29+
m = re.search(r'^\s*readers\s*=\s*(\d+)', line)
1430
if m:
1531
try:
16-
active_total += int(m.group(2))
32+
readers = int(m.group(1))
1733
except ValueError:
18-
pass
19-
return active_total
34+
logger.warning('Failed to parse readers count from line: %s', line)
35+
readers = 0
36+
continue
2037

38+
m = re.search(r'^\s*writers\s*=\s*(\d+)', line)
39+
if m:
40+
try:
41+
writers = int(m.group(1))
42+
except ValueError:
43+
logger.warning('Failed to parse writers count from line: %s', line)
44+
writers = 0
45+
continue
2146

22-
def get_active_shmem_locks_num(logger: logging.Logger) -> Optional[int]:
23-
"""Get number of active shmem locks."""
24-
cmd = f'{SHMEM_LOCKS_PATH} --lock-id 0'
25-
success, out = BaseDispatcher.exec_command(cmd)
26-
if not success:
27-
logger.error('Failed to inspect shmem locks (command failed)')
28-
return None
29-
if not out:
30-
logger.error('Failed to inspect shmem locks (empty output)')
31-
return None
47+
# flush the last parsed lock
48+
if current_id > 0 and readers is not None and writers is not None:
49+
locks.append((current_id, readers, writers))
3250

33-
logger.debug('Current lock state:\n%s', (out or '').strip())
51+
return locks
3452

35-
return parse_locks_num(out)
3653

54+
def release_shmem_locks(logger: logging.Logger, max_iterations: int = 5) -> bool:
55+
"""Attempt to release active shmem locks.
3756
38-
def reset_shmem_locks(logger: logging.Logger) -> None:
39-
"""Inspect and reset BRM shmem locks"""
40-
logger.debug('Inspecting and resetting shmem locks.')
57+
- Inspect all locks.
58+
- Unlock writer lock (there can be only one)
59+
- Unlock each reader lock sequentially
60+
- Re-check and repeat up to max_iterations.
4161
42-
# Get current lock state
43-
active_locks_num = get_active_shmem_locks_num(logger)
44-
if active_locks_num is None:
45-
return
62+
Returns True on success (no active readers/writers remain), False otherwise.
63+
"""
64+
for attempt in range(1, max_iterations + 1):
65+
success, out = BaseDispatcher.exec_command(f'{SHMEM_LOCKS_PATH} --lock-id 0')
66+
if not success or not out:
67+
logger.error('Failed to inspect shmem locks during unlock (attempt %d)', attempt)
68+
return False
4669

47-
# Reset if any read/write locks are active
48-
if active_locks_num > 0:
49-
logger.info('Detected active shmem locks (sum readers+writers=%d). Attempting reset.', active_locks_num)
70+
locks = parse_locks_state(out, logger=logger)
5071

51-
# Reset locks
52-
success, out = BaseDispatcher.exec_command(f'{RESET_LOCKS_PATH} -s')
53-
if not success:
54-
logger.error('Failed to reset shmem locks (command failed)')
55-
return
72+
total_active = sum_active_locks(locks)
73+
if total_active == 0:
74+
logger.debug('Unlock attempt %d: no active locks', attempt)
75+
return True
76+
logger.debug('Unlock attempt %d: active total=%d; detail=%s', attempt, total_active, locks)
5677

57-
# Check that locks were reset
78+
# Issue unlocks per lock
79+
for lock_id, readers, writers in locks:
80+
# Unlock writer
81+
if writers > 0:
82+
cmd = f'{SHMEM_LOCKS_PATH} -i {lock_id} -w -u'
83+
ok, _ = BaseDispatcher.exec_command(cmd)
84+
if not ok:
85+
logger.warning('Failed to unlock writer for lock-id=%d', lock_id)
86+
87+
# Unlock all readers
88+
if readers > 0:
89+
for _ in range(readers):
90+
cmd = f'{SHMEM_LOCKS_PATH} -i {lock_id} -r -u'
91+
ok, _ = BaseDispatcher.exec_command(cmd)
92+
if not ok:
93+
logger.warning('Failed to unlock a reader for lock-id=%d', lock_id)
94+
break
95+
96+
# Wait some time for state to settle
5897
sleep(1)
59-
active_locks_num = get_active_shmem_locks_num(logger)
60-
if active_locks_num is not None and active_locks_num > 0:
61-
logger.error('Failed to reset shmem locks (locks are still active)')
62-
return
63-
else:
64-
logger.info('No active shmem locks detected.')
98+
99+
logger.error('Failed to fully release shmem locks using mcs-shmem-locks after %d attempts', max_iterations)
100+
return False
101+
102+
103+
def sum_active_locks(locks: List[Tuple[int, int, int]]) -> int:
    """Return the total number of active readers and writers.

    Takes a list of (lock_id, readers, writers) tuples and returns the sum
    of all reader and writer counts; the lock id is ignored. An empty list
    yields 0.
    """
    return sum(readers + writers for _, readers, writers in locks)
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import unittest
2+
import logging
3+
4+
from cmapi_server.process_dispatchers.utils import (
5+
parse_locks_state,
6+
sum_active_locks,
7+
)
8+
9+
logger = logging.getLogger(__name__)
10+
11+
class TestShmemLocksParsing(unittest.TestCase):
    """Tests for parsing mcs-shmem-locks output into per-lock state."""

    def test_parse_locks_state_basic(self):
        """Complete sections are numbered from 1 in order of appearance."""
        sample_output = """
VSS RWLock
readers = 2
writers = 1
readers waiting = 0
writers waiting = 0
mutex locked = 0
ExtentMap RWLock
readers = 0
writers = 0
readers waiting = 0
writers waiting = 0
mutex locked = 0
"""
        state = parse_locks_state(sample_output, logger)
        # Two sections => IDs 1 and 2
        self.assertEqual(state, [(1, 2, 1), (2, 0, 0)])
        # Only the first section holds locks: 2 readers + 1 writer
        self.assertEqual(sum_active_locks(state), 3)

    def test_parse_locks_state_malformed_values(self):
        """Sections with a non-numeric readers/writers value are dropped."""
        sample_output = """
VSS RWLock
readers = blorg
writers = 1
readers waiting = 0
writers waiting = 0
mutex locked = 0
ExtentMap RWLock
readers = 3
writers = one
readers waiting = 0
writers waiting = 0
mutex locked = 0
FreeList RWLock
readers = 1
writers = 0
readers waiting = 0
writers waiting = 0
mutex locked = 0
"""
        state = parse_locks_state(sample_output, logger)
        # 'blorg' and 'one' do not match the numeric patterns, so sections
        # 1 and 2 each lack one count and are skipped; the section counter
        # still advances, which is why the surviving lock has ID 3.
        self.assertEqual(state, [(3, 1, 0)])

    def test_parse_locks_state_partial_section_ignored(self):
        """A section that lacks a writers line is not reported."""
        sample_output = """
VSS RWLock
readers = 4
writers = 0
readers waiting = 0
writers waiting = 0
mutex locked = 0
ExtentMap RWLock
readers = 1
readers waiting = 0
writers waiting = 0
mutex locked = 0
"""
        state = parse_locks_state(sample_output, logger)
        # Second section missing writers, so we skip it
        self.assertEqual(state, [(1, 4, 0)])

    def test_sum_active_locks(self):
        """Readers and writers of every lock are added together."""
        self.assertEqual(sum_active_locks([]), 0)
        self.assertEqual(
            sum_active_locks([(1, 2, 1), (2, 0, 0), (3, 1, 0)]), 4
        )
72+
73+
74+
if __name__ == "__main__":
75+
unittest.main()

0 commit comments

Comments
 (0)