KAFKA-17109: Move lock backoff retry to streams TaskManager (#17209)
This PR implements exponential backoff for task initializations that fail due to lock exceptions. It increases the wait time between two consecutive attempts to initialize such a task.

Reviewer: Bruno Cadonna <[email protected]>
aliehsaeedii authored Sep 30, 2024
1 parent 4036081 commit bb11257
Showing 9 changed files with 107 additions and 93 deletions.
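Before the file-by-file diff, here is a minimal, self-contained sketch of the pattern this commit introduces in TaskManager: a per-task backoff record gates how soon a task whose state-directory lock acquisition failed may be retried. Everything in the sketch is illustrative, not the committed code: LockBackoffSketch, the plain doubling-with-cap formula standing in for Kafka's ExponentialBackoff(1000, 2, 10000, 0.5) (which also adds jitter), String task ids in place of TaskId, and IllegalStateException in place of LockException.

import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the retry-gating pattern moved into TaskManager by this commit.
public class LockBackoffSketch {

    // One record per task whose initialization failed because the state directory was locked.
    static final class BackoffRecord {
        long attempts = 1;
        long lastAttemptMs;

        BackoffRecord(final long nowMs) {
            this.lastAttemptMs = nowMs;
        }

        void recordAttempt(final long nowMs) {
            attempts++;
            lastAttemptMs = nowMs;
        }

        // Simplified stand-in for ExponentialBackoff.backoff(attempts):
        // 1s, 2s, 4s, 8s, capped at 10s (the real class also randomizes by +/-50%).
        long backoffMs() {
            return Math.min(10_000L, 1_000L << Math.min(attempts - 1, 4));
        }

        boolean canAttempt(final long nowMs) {
            return nowMs - lastAttemptMs >= backoffMs();
        }
    }

    private final Map<String, BackoffRecord> taskIdToBackoffRecord = new HashMap<>();

    // Called on each state-updater pass for every task still pending initialization.
    boolean maybeInitialize(final String taskId, final long nowMs) {
        final BackoffRecord record = taskIdToBackoffRecord.get(taskId);
        if (record != null && !record.canAttempt(nowMs)) {
            return false;                          // still inside the backoff window: skip this pass
        }
        try {
            initializeTask(taskId);                // stand-in for task.initializeIfNeeded()
            taskIdToBackoffRecord.remove(taskId);  // success clears the backoff state
            return true;
        } catch (final IllegalStateException lockStillHeld) {
            if (record != null) {
                record.recordAttempt(nowMs);       // lengthen the wait before the next try
            } else {
                taskIdToBackoffRecord.put(taskId, new BackoffRecord(nowMs));
            }
            return false;
        }
    }

    // Hypothetical placeholder: fails while another thread "holds" the directory lock.
    private void initializeTask(final String taskId) {
        if (taskId.startsWith("locked")) {
            throw new IllegalStateException("state directory still locked for " + taskId);
        }
    }

    public static void main(final String[] args) {
        final LockBackoffSketch sketch = new LockBackoffSketch();
        System.out.println(sketch.maybeInitialize("locked-0_0", 0L));     // false: lock failure recorded
        System.out.println(sketch.maybeInitialize("locked-0_0", 500L));   // false: inside the 1s backoff window, skipped
        System.out.println(sketch.maybeInitialize("locked-0_0", 5_000L)); // false: retried, lock still held, window doubles
        System.out.println(sketch.maybeInitialize("task-0_1", 0L));       // true: unaffected task initializes normally
    }
}

Note the two details the diff below also relies on: a skipped pass does not bump the attempt counter, and a successful initialization removes the record, so a task that later hits a lock exception again starts over with the shortest wait.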
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
@@ -98,8 +97,6 @@ public StateDirectoryProcessFile() {

private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();

private final HashMap<TaskId, BackoffRecord> lockedTasksToBackoffRecord = new HashMap<>();

private FileChannel stateDirLockChannel;
private FileLock stateDirLock;

@@ -356,11 +353,9 @@ synchronized boolean lock(final TaskId taskId) {
if (lockOwner.equals(Thread.currentThread())) {
log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
// we already own the lock
lockedTasksToBackoffRecord.remove(taskId);
return true;
} else {
// another thread owns the lock
updateOrCreateBackoffRecord(taskId, System.currentTimeMillis());
return false;
}
} else if (!stateDir.exists()) {
@@ -369,23 +364,10 @@ synchronized boolean lock(final TaskId taskId) {
} else {
lockedTasksToOwner.put(taskId, Thread.currentThread());
// make sure the task directory actually exists, and create it if not
getOrCreateDirectoryForTask(taskId);
return true;
}
}

public boolean canTryLock(final TaskId taskId, final long nowMs) {
return !lockedTasksToBackoffRecord.containsKey(taskId) || lockedTasksToBackoffRecord.get(taskId).canAttempt(nowMs);
}

private void updateOrCreateBackoffRecord(final TaskId taskId, final long nowMs) {
if (lockedTasksToBackoffRecord.containsKey(taskId)) {
lockedTasksToBackoffRecord.get(taskId).recordAttempt(nowMs);
} else {
lockedTasksToBackoffRecord.put(taskId, new BackoffRecord(nowMs));
}
}

/**
* Unlock the state directory for the given {@link TaskId}.
*/
@@ -697,25 +679,4 @@ public int hashCode() {
return Objects.hash(file, namedTopology);
}
}

public static class BackoffRecord {
private long attempts;
private long lastAttemptMs;
private static final ExponentialBackoff EXPONENTIAL_BACKOFF = new ExponentialBackoff(1, 2, 10000, 0.5);


public BackoffRecord(final long nowMs) {
this.attempts = 1;
this.lastAttemptMs = nowMs;
}

public void recordAttempt(final long nowMs) {
this.attempts++;
this.lastAttemptMs = nowMs;
}

public boolean canAttempt(final long nowMs) {
return nowMs - lastAttemptMs >= EXPONENTIAL_BACKOFF.backoff(attempts);
}
}
}
@@ -93,24 +93,21 @@ static void registerStateStores(final Logger log,
}

final TaskId id = stateMgr.taskId();
if (!stateDirectory.canTryLock(id, System.currentTimeMillis())) {
log.trace("Task {} is still not allowed to retry acquiring the state directory lock", id);
} else if (!stateDirectory.lock(id)) {
if (!stateDirectory.lock(id)) {
throw new LockException(String.format("%sFailed to lock the state directory for task %s", logPrefix, id));
} else {
log.debug("Acquired state directory lock");
}
log.debug("Acquired state directory lock");

final boolean storeDirsEmpty = stateDirectory.directoryForTaskIsEmpty(id);
final boolean storeDirsEmpty = stateDirectory.directoryForTaskIsEmpty(id);

stateMgr.registerStateStores(topology.stateStores(), processorContext);
log.debug("Registered state stores");
stateMgr.registerStateStores(topology.stateStores(), processorContext);
log.debug("Registered state stores");

// We should only load checkpoint AFTER the corresponding state directory lock has been acquired and
// the state stores have been registered; we should not try to load at the state manager construction time.
// See https://issues.apache.org/jira/browse/KAFKA-8574
stateMgr.initializeStoreOffsetsFromCheckpoint(storeDirsEmpty);
log.debug("Initialized state stores");
}
// We should only load checkpoint AFTER the corresponding state directory lock has been acquired and
// the state stores have been registered; we should not try to load at the state manager construction time.
// See https://issues.apache.org/jira/browse/KAFKA-8574
stateMgr.initializeStoreOffsetsFromCheckpoint(storeDirsEmpty);
log.debug("Initialized state stores");
}

/**
@@ -28,6 +28,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LockException;
@@ -104,6 +105,8 @@ public class TaskManager {
// includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance
private final Set<TaskId> lockedTaskDirectories = new HashSet<>();

private Map<TaskId, BackoffRecord> taskIdToBackoffRecord = new HashMap<>();

private final ActiveTaskCreator activeTaskCreator;
private final StandbyTaskCreator standbyTaskCreator;
private final StateUpdater stateUpdater;
@@ -1006,14 +1009,22 @@ private void addTasksToStateUpdater() {
}

private void addTaskToStateUpdater(final Task task) {
final long nowMs = time.milliseconds();
try {
task.initializeIfNeeded();
stateUpdater.add(task);
if (canTryInitializeTask(task.id(), nowMs)) {
task.initializeIfNeeded();
taskIdToBackoffRecord.remove(task.id());
stateUpdater.add(task);
} else {
log.trace("Task {} is still not allowed to retry acquiring the state directory lock", task.id());
tasks.addPendingTasksToInit(Collections.singleton(task));
}
} catch (final LockException lockException) {
// The state directory may still be locked by another thread, when the rebalance just happened.
// Retry in the next iteration.
log.info("Encountered lock exception. Reattempting locking the state in the next iteration.", lockException);
tasks.addPendingTasksToInit(Collections.singleton(task));
updateOrCreateBackoffRecord(task.id(), nowMs);
}
}

@@ -1769,7 +1780,6 @@ private Stream<Task> standbyTaskStream() {
return standbyTasksInTaskRegistry;
}
}

// For testing only.
int commitAll() {
return commit(tasks.allTasks());
@@ -2116,4 +2126,37 @@ boolean needsInitializationOrRestoration() {
void addTask(final Task task) {
tasks.addTask(task);
}

private boolean canTryInitializeTask(final TaskId taskId, final long nowMs) {
return !taskIdToBackoffRecord.containsKey(taskId) || taskIdToBackoffRecord.get(taskId).canAttempt(nowMs);
}

private void updateOrCreateBackoffRecord(final TaskId taskId, final long nowMs) {
if (taskIdToBackoffRecord.containsKey(taskId)) {
taskIdToBackoffRecord.get(taskId).recordAttempt(nowMs);
} else {
taskIdToBackoffRecord.put(taskId, new BackoffRecord(nowMs));
}
}

public static class BackoffRecord {
private long attempts;
private long lastAttemptMs;
private static final ExponentialBackoff EXPONENTIAL_BACKOFF = new ExponentialBackoff(1000, 2, 10000, 0.5);


public BackoffRecord(final long nowMs) {
this.attempts = 1;
this.lastAttemptMs = nowMs;
}

public void recordAttempt(final long nowMs) {
this.attempts++;
this.lastAttemptMs = nowMs;
}

public boolean canAttempt(final long nowMs) {
return nowMs - lastAttemptMs >= EXPONENTIAL_BACKOFF.backoff(attempts);
}
}
}
@@ -81,7 +81,6 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -181,7 +180,6 @@ public void cleanup() throws IOException {
@Test
public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
stateDirectory = mock(StateDirectory.class);
when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(true);
when(stateDirectory.lock(taskId)).thenReturn(false);
when(stateManager.taskType()).thenReturn(TaskType.STANDBY);

@@ -444,22 +444,6 @@ public void shouldNotLockStateDirLockedByAnotherThread() throws Exception {
assertFalse(directory.lock(taskId));
}

@Test
public void shouldBackoffRetryIfStateDirLockedByAnotherThread() throws Exception {
final TaskId taskId = new TaskId(0, 0);
final Thread thread = new Thread(() -> directory.lock(taskId));
thread.start();
thread.join(30000);

assertTrue(directory.canTryLock(taskId, System.currentTimeMillis()));

assertFalse(directory.lock(taskId));
assertFalse(directory.lock(taskId));
assertFalse(directory.lock(taskId));
// after 3 unsuccessful retries, backoff time is > 0
assertFalse(directory.canTryLock(taskId, System.currentTimeMillis()));
}

@Test
public void shouldNotUnLockStateDirLockedByAnotherThread() throws Exception {
final TaskId taskId = new TaskId(0, 0);
@@ -43,11 +43,8 @@

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mockStatic;
@@ -88,7 +85,6 @@ public void testRegisterStateStoreWhenTopologyEmpty() {
public void testRegisterStateStoreFailToLockStateDirectory() {
when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false)));
when(stateManager.taskId()).thenReturn(taskId);
when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(true);
when(stateDirectory.lock(taskId)).thenReturn(false);

final LockException thrown = assertThrows(LockException.class,
@@ -98,17 +94,6 @@ public void testRegisterStateStoreFailToLockStateDirectory() {
assertEquals("logPrefix:Failed to lock the state directory for task 0_0", thrown.getMessage());
}

@Test
public void testRegisterStateStoreWhenTryLockIsNotAllowed() {
when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false)));
when(stateManager.taskId()).thenReturn(taskId);
when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(false);

assertDoesNotThrow(() -> StateManagerUtil.registerStateStores(logger, "logPrefix:",
topology, stateManager, stateDirectory, processorContext));

}

@Test
public void testRegisterStateStores() {
final MockKeyValueStore store1 = new MockKeyValueStore("store1", false);
@@ -117,7 +102,6 @@ public void testRegisterStateStores() {
final InOrder inOrder = inOrder(stateManager);
when(topology.stateStores()).thenReturn(stateStores);
when(stateManager.taskId()).thenReturn(taskId);
when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(true);
when(stateDirectory.lock(taskId)).thenReturn(true);
when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
when(topology.stateStores()).thenReturn(stateStores);
@@ -133,7 +133,6 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@@ -315,7 +314,6 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() {
// Clean up state directory created as part of setup
stateDirectory.close();
stateDirectory = mock(StateDirectory.class);
when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(true);
when(stateDirectory.lock(taskId)).thenReturn(false);

task = createStatefulTask(createConfig("100"), false);
@@ -1147,7 +1147,7 @@ public void shouldRecordCommitLatency(final boolean stateUpdaterEnabled, final b
topologyMetadata.buildAndRewriteTopology();

final TaskManager taskManager = new TaskManager(
null,
new MockTime(),
changelogReader,
null,
null,
@@ -121,6 +121,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.lenient;
@@ -1243,6 +1244,54 @@ public void shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}

@Test
public void shouldRetryInitializationWithBackoffWhenInitializationFails() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();
final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, task01));
doThrow(new LockException("Lock Exception!")).when(task00).initializeIfNeeded();
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);

taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

// task00 should not be initialized due to LockException, task01 should be initialized
verify(task00).initializeIfNeeded();
verify(task01).initializeIfNeeded();
verify(tasks).addPendingTasksToInit(
argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01))
);
verify(stateUpdater, never()).add(task00);
verify(stateUpdater).add(task01);

time.sleep(500);

taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

// task00 should not be initialized since the backoff period has not passed
verify(task00, times(1)).initializeIfNeeded();
verify(tasks, times(2)).addPendingTasksToInit(
argThat(tasksToInit -> tasksToInit.contains(task00))
);
verify(stateUpdater, never()).add(task00);

time.sleep(5000);

// task00 should call initialize since the backoff period has passed
doNothing().when(task00).initializeIfNeeded();
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

verify(task00, times(2)).initializeIfNeeded();
verify(tasks, times(2)).addPendingTasksToInit(
argThat(tasksToInit -> tasksToInit.contains(task00))
);
verify(stateUpdater).add(task00);
}

@Test
public void shouldRethrowRuntimeExceptionInInitTaskWithStateUpdater() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)