Skip to content

Commit

Permalink
If a chunk fails to load when assigned, run the eviction logic (#22)
Browse files Browse the repository at this point in the history
Co-authored-by: Nora Howard <[email protected]>
  • Loading branch information
autata and baroquebobcat authored Feb 7, 2025
1 parent 5b092fd commit 28eb1b7
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 9 deletions.
14 changes: 9 additions & 5 deletions astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.EnumSet;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
Expand Down Expand Up @@ -341,7 +342,10 @@ private void cacheNodeListener(CacheSlotMetadata cacheSlotMetadata) {
Thread.ofVirtual().start(() -> handleChunkAssignment(cacheSlotMetadata));
} else if (newSlotState.equals(Metadata.CacheSlotMetadata.CacheSlotState.EVICT)) {
LOG.info("Chunk - EVICT received - {}", cacheSlotMetadata);
if (!cacheSlotLastKnownState.equals(Metadata.CacheSlotMetadata.CacheSlotState.LIVE)) {
if (!EnumSet.of(
Metadata.CacheSlotMetadata.CacheSlotState.LIVE,
Metadata.CacheSlotMetadata.CacheSlotState.LOADING)
.contains(cacheSlotLastKnownState)) {
LOG.warn(
"Unexpected state transition from {} to {} - {}",
cacheSlotLastKnownState,
Expand Down Expand Up @@ -433,11 +437,11 @@ private void handleChunkAssignment(CacheSlotMetadata cacheSlotMetadata) {
TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS),
FileUtils.byteCountToDisplaySize(FileUtils.sizeOfDirectory(dataDirectory.toFile())));
} catch (Exception e) {
// if any error occurs during the chunk assignment, try to release the slot for re-assignment,
// disregarding any errors
setChunkMetadataState(cacheSlotMetadata, Metadata.CacheSlotMetadata.CacheSlotState.FREE);
LOG.error("Error handling chunk assignment", e);
LOG.error("Error handling chunk assignment for cache slot {}:", cacheSlotMetadata, e);
assignmentTimer.stop(chunkAssignmentTimerFailure);
// If any error occurs during the chunk assignment, evict the chunk so eviction code cleans up
// the files.
setChunkMetadataState(cacheSlotMetadata, Metadata.CacheSlotMetadata.CacheSlotState.EVICT);
} finally {
chunkAssignmentLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.slack.astra.metadata.replica.ReplicaMetadata;
import com.slack.astra.metadata.replica.ReplicaMetadataStore;
import com.slack.astra.metadata.schema.ChunkSchema;
import com.slack.astra.metadata.schema.FieldType;
import com.slack.astra.metadata.schema.LuceneFieldDef;
import com.slack.astra.metadata.search.SearchMetadata;
import com.slack.astra.metadata.search.SearchMetadataStore;
import com.slack.astra.metadata.snapshot.SnapshotMetadata;
Expand Down Expand Up @@ -121,7 +123,7 @@ public void shouldHandleChunkLivecycle() throws Exception {
// setup Zk, BlobFs so data can be loaded
initializeZkReplica(curatorFramework, replicaId, snapshotId);
initializeZkSnapshot(curatorFramework, snapshotId, 0);
initializeBlobStorageWithIndex(snapshotId);
initializeBlobStorageWithIndex(snapshotId, false);

SearchContext searchContext =
SearchContext.fromConfig(AstraConfig.getCacheConfig().getServerConfig());
Expand Down Expand Up @@ -392,7 +394,7 @@ public void closeShouldCleanupLiveChunkCorrectly() throws Exception {
// setup Zk, BlobFs so data can be loaded
initializeZkReplica(curatorFramework, replicaId, snapshotId);
initializeZkSnapshot(curatorFramework, snapshotId, 0);
initializeBlobStorageWithIndex(snapshotId);
initializeBlobStorageWithIndex(snapshotId, false);

ReadOnlyChunkImpl<LogMessage> readOnlyChunk =
new ReadOnlyChunkImpl<>(
Expand Down Expand Up @@ -500,7 +502,7 @@ public void shouldHandleDynamicChunkSizeLifecycle() throws Exception {
// setup Zk, BlobFs so data can be loaded
initializeZkReplica(curatorFramework, replicaId, snapshotId);
initializeZkSnapshot(curatorFramework, snapshotId, 29);
initializeBlobStorageWithIndex(snapshotId);
initializeBlobStorageWithIndex(snapshotId, false);
initializeCacheNodeAssignment(
cacheNodeAssignmentStore, assignmentId, snapshotId, cacheNodeId, replicaSet, replicaId);

Expand Down Expand Up @@ -582,6 +584,84 @@ public void shouldHandleDynamicChunkSizeLifecycle() throws Exception {
curatorFramework.unwrap().close();
}

@Test
public void shouldEvictChunkOnAssignmentFailure() throws Exception {
  // Build a cache config plus a ZK namespace dedicated to this test.
  AstraConfigs.AstraConfig config = makeCacheConfig();
  AstraConfigs.ZookeeperConfig zookeeperConfig =
      AstraConfigs.ZookeeperConfig.newBuilder()
          .setZkConnectString(testingServer.getConnectString())
          .setZkPathPrefix("shouldEvictChunkOnAssignmentFailure")
          .setZkSessionTimeoutMs(5000)
          .setZkConnectionTimeoutMs(5000)
          .setSleepBetweenRetriesMs(1000)
          .build();

  AsyncCuratorFramework curator = CuratorBuilder.build(meterRegistry, zookeeperConfig);
  ReplicaMetadataStore replicaStore = new ReplicaMetadataStore(curator);
  SnapshotMetadataStore snapshotStore = new SnapshotMetadataStore(curator);
  SearchMetadataStore searchStore = new SearchMetadataStore(curator, true);
  CacheSlotMetadataStore slotStore = new CacheSlotMetadataStore(curator);

  String replica = "foo";
  String snapshot = "bar";

  // Register the replica and snapshot in ZK so the chunk has something to load.
  initializeZkReplica(curator, replica, snapshot);
  initializeZkSnapshot(curator, snapshot, 0);

  // Upload an index whose schema is intentionally corrupt, so the assignment
  // fails part-way through loading.
  initializeBlobStorageWithIndex(snapshot, true);

  ReadOnlyChunkImpl<LogMessage> chunk =
      new ReadOnlyChunkImpl<>(
          curator,
          meterRegistry,
          blobStore,
          SearchContext.fromConfig(config.getCacheConfig().getServerConfig()),
          config.getS3Config().getS3Bucket(),
          config.getCacheConfig().getDataDirectory(),
          config.getCacheConfig().getReplicaSet(),
          slotStore,
          replicaStore,
          snapshotStore,
          searchStore);

  // A freshly registered chunk starts out in the FREE state.
  await()
      .until(
          () -> chunk.getChunkMetadataState() == Metadata.CacheSlotMetadata.CacheSlotState.FREE);

  assignReplicaToChunk(slotStore, replica, chunk);

  // Expected transitions after the assignment:
  // ASSIGNED -> LOADING (hits the manufactured schema error) -> EVICT -> EVICTING -> FREE.
  // The slot must settle back at FREE.
  await()
      .until(
          () -> chunk.getChunkMetadataState() == Metadata.CacheSlotMetadata.CacheSlotState.FREE);

  // The failed assignment must not have registered any search metadata.
  assertThat(searchStore.listSync().size()).isEqualTo(0);

  // Metrics should record exactly one successful eviction and one failed assignment.
  assertThat(meterRegistry.get(CHUNK_EVICTION_TIMER).tag("successful", "true").timer().count())
      .isEqualTo(1);
  assertThat(meterRegistry.get(CHUNK_ASSIGNMENT_TIMER).tag("successful", "false").timer().count())
      .isEqualTo(1);

  // Eviction should have emptied the chunk's data directory.
  try (var remaining = java.nio.file.Files.list(chunk.getDataDirectory())) {
    assertThat(remaining.findFirst().isPresent()).isFalse();
  }

  curator.unwrap().close();
}

private void assignReplicaToChunk(
CacheSlotMetadataStore cacheSlotMetadataStore,
String replicaId,
Expand Down Expand Up @@ -626,7 +706,7 @@ private void initializeZkReplica(
false));
}

private void initializeBlobStorageWithIndex(String snapshotId) throws Exception {
private void initializeBlobStorageWithIndex(String snapshotId, boolean badData) throws Exception {
LuceneIndexStoreImpl logStore =
LuceneIndexStoreImpl.makeLogStore(
Files.newTemporaryFolder(),
Expand All @@ -646,6 +726,15 @@ private void initializeBlobStorageWithIndex(String snapshotId) throws Exception
// Create schema file to upload
ChunkSchema chunkSchema =
new ChunkSchema(snapshotId, logStore.getSchema(), new ConcurrentHashMap<>());

// Introduce bad data in the schema. Specifically, the key in the map should match the field
// name when the data is "good".
if (badData) {
chunkSchema.fieldDefMap.put(
"field_name",
new LuceneFieldDef(
"field_name_does_not_match", FieldType.INTEGER.name, true, true, true));
}
File schemaFile = new File(dirPath + "/" + SCHEMA_FILE_NAME);
ChunkSchema.serializeToFile(chunkSchema, schemaFile);

Expand Down

0 comments on commit 28eb1b7

Please sign in to comment.