Skip to content

KAFKA-10789: Streamlining Tests in ChangeLoggingKeyValueBytesStoreTest #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: trunk
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.TestUtils;
Expand All @@ -44,6 +43,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
Expand All @@ -53,6 +53,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

Expand All @@ -62,6 +63,10 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -72,8 +77,9 @@
public class ChangeLoggingKeyValueBytesStoreTest {

private final MockRecordCollector collector = new MockRecordCollector();
private final InMemoryKeyValueStore inner = new InMemoryKeyValueStore("kv");
private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore(inner);
@Mock
private InMemoryKeyValueStore innerMock;
private ChangeLoggingKeyValueBytesStore store;
private InternalMockProcessorContext<?, ?> context;
private final StreamsConfig streamsConfig = streamsConfigMock();
private final Bytes hi = Bytes.wrap("hi".getBytes());
Expand All @@ -89,9 +95,86 @@ public class ChangeLoggingKeyValueBytesStoreTest {
public void before() {
context = mockContext();
context.setTime(0);
store = new ChangeLoggingKeyValueBytesStore(innerMock);
store.init(context, store);
}

private void mockPosition() {
when(innerMock.getPosition()).thenReturn(Position.emptyPosition());
}

private void mockGet(final Map<Bytes, byte[]> mockMap) {
when(innerMock.get(any(Bytes.class))).thenAnswer(invocation -> mockMap.get(invocation.getArgument(0)));
}

private void mockPut(final Map<Bytes, byte[]> mockMap) {
doAnswer(invocation -> {
mockMap.put(invocation.getArgument(0), invocation.getArgument(1));
StoreQueryUtils.updatePosition(innerMock.getPosition(), context);
return null;
}).when(innerMock).put(any(Bytes.class), any(byte[].class));
}

private void mockPutAll(final Map<Bytes, byte[]> mockMap) {
doAnswer(invocation -> {
final List<KeyValue<Bytes, byte[]>> entries = invocation.getArgument(0);
for (final KeyValue<Bytes, byte[]> entry : entries) {
mockMap.put(entry.key, entry.value);
}
return null;
}).when(innerMock).putAll(anyList());
}
private void mockDelete(final Map<Bytes, byte[]> mockMap) {
doAnswer(invocation -> {
final Bytes key = invocation.getArgument(0);
final byte[] oldValue = mockMap.get(key);
mockMap.remove(key);
return oldValue;
}).when(innerMock).delete(any(Bytes.class));
}
private void mockPutIfAbsent(final Map<Bytes, byte[]> mockMap) {
doAnswer(invocation -> {
final Bytes key = invocation.getArgument(0);
final byte[] value = invocation.getArgument(1);
return mockMap.putIfAbsent(key, value);
}).when(innerMock).putIfAbsent(any(Bytes.class), any(byte[].class));
}
private void mockPrefixScan(final Map<Bytes, byte[]> mockMap) {
when(innerMock.prefixScan(anyString(), any())).thenAnswer(invocation -> {
final String prefix = invocation.getArgument(0);
final List<KeyValue<Bytes, byte[]>> matchingRecords = new ArrayList<>();
for (final Map.Entry<Bytes, byte[]> entry : mockMap.entrySet()) {
if (entry.getKey().toString().startsWith(prefix)) {
matchingRecords.add(KeyValue.pair(entry.getKey(), entry.getValue()));
}
}
return new KeyValueIterator<Bytes, byte[]>() {
private final Iterator<KeyValue<Bytes, byte[]>> iterator = matchingRecords.iterator();

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public KeyValue<Bytes, byte[]> next() {
return iterator.next();
}

@Override
public void close() {
// No resources to clean up in this mock
}

@Override
public Bytes peekNextKey() {
return null;
}
};
});
}


private InternalMockProcessorContext mockContext() {
return new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
Expand All @@ -112,28 +195,37 @@ public void after() {

@Test
public void shouldDelegateInit() {
final InternalMockProcessorContext context = mockContext();
final KeyValueStore<Bytes, byte[]> innerMock = mock(InMemoryKeyValueStore.class);
final InternalMockProcessorContext mockContext = mockContext();
final StateStore outer = new ChangeLoggingKeyValueBytesStore(innerMock);
outer.init(context, outer);
verify(innerMock).init(context, outer);
outer.init(mockContext, outer);
verify(innerMock).init(mockContext, outer);
}

@Test
public void shouldWriteKeyValueBytesToInnerStoreOnPut() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockGet(mockMap);
mockPosition();

store.put(hi, there);
assertThat(inner.get(hi), equalTo(there));
assertThat(innerMock.get(hi), equalTo(there));
assertThat(collector.collected().size(), equalTo(1));
assertThat(collector.collected().get(0).key(), equalTo(hi));
assertThat(collector.collected().get(0).value(), equalTo(there));
}

@Test
public void shouldWriteAllKeyValueToInnerStoreOnPutAll() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPutAll(mockMap);
mockGet(mockMap);
mockPosition();

store.putAll(Arrays.asList(KeyValue.pair(hi, there),
KeyValue.pair(hello, world)));
assertThat(inner.get(hi), equalTo(there));
assertThat(inner.get(hello), equalTo(world));
assertThat(innerMock.get(hi), equalTo(there));
assertThat(innerMock.get(hello), equalTo(world));

assertThat(collector.collected().size(), equalTo(2));
assertThat(collector.collected().get(0).key(), equalTo(hi));
Expand All @@ -144,20 +236,37 @@ public void shouldWriteAllKeyValueToInnerStoreOnPutAll() {

@Test
public void shouldPropagateDelete() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockGet(mockMap);
mockDelete(mockMap);
mockPosition();

store.put(hi, there);
store.delete(hi);
assertThat(inner.approximateNumEntries(), equalTo(0L));
assertThat(inner.get(hi), nullValue());

assertThat(innerMock.approximateNumEntries(), equalTo(0L));
assertThat(innerMock.get(hi), nullValue());
}

@Test
public void shouldReturnOldValueOnDelete() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockDelete(mockMap);
mockPosition();

store.put(hi, there);
assertThat(store.delete(hi), equalTo(there));
}

@Test
public void shouldLogKeyNullOnDelete() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockDelete(mockMap);
mockPosition();

store.put(hi, there);
assertThat(store.delete(hi), equalTo(there));

Expand All @@ -170,19 +279,34 @@ public void shouldLogKeyNullOnDelete() {

@Test
public void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPutIfAbsent(mockMap);
mockGet(mockMap);
mockPosition();

store.putIfAbsent(hi, there);
assertThat(inner.get(hi), equalTo(there));
assertThat(innerMock.get(hi), equalTo(there));
}

@Test
public void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockPutIfAbsent(mockMap);
mockGet(mockMap);
mockPosition();

store.put(hi, there);
store.putIfAbsent(hi, world);
assertThat(inner.get(hi), equalTo(there));
assertThat(innerMock.get(hi), equalTo(there));
}

@Test
public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPutIfAbsent(mockMap);
mockPosition();

store.putIfAbsent(hi, there);

assertThat(collector.collected().size(), equalTo(1));
Expand All @@ -192,6 +316,11 @@ public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() {

@Test
public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockPutIfAbsent(mockMap);
mockPosition();

store.put(hi, there);
store.putIfAbsent(hi, world);

Expand All @@ -202,23 +331,42 @@ public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() {

@Test
public void shouldReturnCurrentValueOnPutIfAbsent() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockPutIfAbsent(mockMap);
mockPosition();

store.put(hi, there);
assertThat(store.putIfAbsent(hi, world), equalTo(there));
}

@Test
public void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPutIfAbsent(mockMap);
mockPosition();

assertThat(store.putIfAbsent(hi, there), is(nullValue()));
}

@Test
public void shouldReturnValueOnGetWhenExists() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockGet(mockMap);
mockPosition();

store.put(hello, world);
assertThat(store.get(hello), equalTo(world));
}

@Test
public void shouldGetRecordsWithPrefixKey() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockPrefixScan(mockMap);
mockPosition();

store.put(hi, there);
store.put(Bytes.increment(hi), world);

Expand All @@ -242,11 +390,18 @@ public void shouldGetRecordsWithPrefixKey() {

@Test
public void shouldReturnNullOnGetWhenDoesntExist() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockGet(mockMap);

assertThat(store.get(hello), is(nullValue()));
}

@Test
public void shouldLogPositionOnPut() {
final Map<Bytes, byte[]> mockMap = new HashMap<>();
mockPut(mockMap);
mockPosition();

context.setRecordContext(new ProcessorRecordContext(-1, INPUT_OFFSET, INPUT_PARTITION, INPUT_TOPIC_NAME, new RecordHeaders()));
context.setTime(1L);
store.put(hi, there);
Expand All @@ -264,13 +419,13 @@ public void shouldLogPositionOnPut() {
}

private StreamsConfig streamsConfigMock() {
final StreamsConfig streamsConfig = mock(StreamsConfig.class);
final StreamsConfig mockedStreamsConfig = mock(StreamsConfig.class);

final Map<String, Object> myValues = new HashMap<>();
myValues.put(InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED, true);
when(streamsConfig.originals()).thenReturn(myValues);
when(streamsConfig.values()).thenReturn(Collections.emptyMap());
when(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("add-id");
return streamsConfig;
when(mockedStreamsConfig.originals()).thenReturn(myValues);
when(mockedStreamsConfig.values()).thenReturn(Collections.emptyMap());
when(mockedStreamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("add-id");
return mockedStreamsConfig;
}
}
Loading