Move TSC took-time policy to guard both heap and disk tier #17190

Open · wants to merge 9 commits into base: main

Changes from 3 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -82,6 +82,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Use the correct type to widen the sort fields when merging top docs ([#16881](https://github.com/opensearch-project/OpenSearch/pull/16881))
- Limit reader writer separation to remote store enabled clusters [#16760](https://github.com/opensearch-project/OpenSearch/pull/16760)
- Optimize innerhits query performance [#16937](https://github.com/opensearch-project/OpenSearch/pull/16937)
- TieredSpilloverCache took-time threshold now guards heap tier as well as disk tier [#17190](https://github.com/opensearch-project/OpenSearch/pull/17190)

### Deprecated
- Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712))
TieredSpilloverCache.java
@@ -145,9 +145,12 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {
ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());

private final Map<ICache<K, V>, TierInfo> caches;

// Policies guarding access to the cache overall.
private final List<Predicate<V>> policies;

// Policies guarding access to the disk tier.
private final List<Predicate<V>> diskPolicies;

private final TieredSpilloverCacheStatsHolder statsHolder;

private final long onHeapCacheMaxWeight;
@@ -220,7 +223,8 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {
cacheListMap.put(onHeapCache, new TierInfo(true, TIER_DIMENSION_VALUE_ON_HEAP));
cacheListMap.put(diskCache, new TierInfo(isDiskCacheEnabled, TIER_DIMENSION_VALUE_DISK));
this.caches = Collections.synchronizedMap(cacheListMap);
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
this.policies = builder.policies;
this.diskPolicies = builder.diskPolicies; // Will never be null; builder initializes it to an empty list
this.onHeapCacheMaxWeight = onHeapCacheSizeInBytes;
this.diskCacheMaxWeight = diskCacheSizeInBytes;
}
@@ -257,19 +261,23 @@ public void put(ICacheKey<K> key, V value) {
Tuple<V, String> cacheValueTuple = getValueFromTieredCache(true).apply(key);
if (cacheValueTuple == null) {
// In case it is not present in any tier, put it inside onHeap cache by default.
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(key, value);
if (evaluatePoliciesList(value, policies)) {
Contributor:

Similar to the removal listener logic in computeIfAbsent, should we also trigger a removal listener from here?
Otherwise there is no way to communicate to the caller that the put call was actually skipped when a policy returned false.

Contributor Author:

Yes, good catch. I only noticed the removal listener issue later on and forgot to add it back into put().

Contributor Author:

Actually, it isn't necessary, since the old indices stats API onCached listener only gets called on loader.load(), and for put() there's no loader involved.

Contributor:

I think we should consider the tiered cache in generic terms, since the framework is agnostic of its users. If someone using TSC calls put() but the item isn't added to the on-heap cache, shouldn't we notify them via a removal listener?

Contributor Author:

I think in a fully generic setup, we shouldn't call onRemoval() in either put() or computeIfAbsent() as nothing was ever actually removed from the cache. Adding the call in computeIfAbsent() is kind of a gross hack we do because we know the IRC's external stats tracking incorrectly assumes loader.load() always results in a value entering the cache.

I'm wondering if it might be better to keep this generic and not have either method call the removal listener? This would mean the old indices stats API can be incorrect, but we're planning on removing that in 3.0 anyway.

Or, we can put it in both and define "cache removal" to include "rejected by policy". If the IRC ever called put() directly it would make the old stats API incorrect, but it never actually does call put(), so it should be fine?
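For concreteness, here is a minimal sketch of the second option (treating "rejected by policy" as a removal) as it might look in put(). This is hypothetical and not part of this PR's diff; it assumes the segment's existing removalListener field, and RemovalReason.EXPLICIT is a placeholder for whatever reason would actually be chosen:

// Hypothetical sketch only: notify the removal listener when a policy rejects a put(),
// so callers can observe that the value never entered any tier.
// (The existing tier lookup in put() is elided for brevity.)
public void put(ICacheKey<K> key, V value) {
    if (!evaluatePoliciesList(value, policies)) {
        // EXPLICIT is a stand-in; a dedicated reason like REJECTED_BY_POLICY (which does
        // not currently exist) would communicate intent more clearly.
        removalListener.onRemoval(new RemovalNotification<>(key, value, RemovalReason.EXPLICIT));
        return;
    }
    try (ReleasableLock ignore = writeLock.acquire()) {
        onHeapCache.put(key, value);
    }
    updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value);
}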

try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(key, value);
}
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value);
}
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value);
} else {
// Put it inside desired tier.
try (ReleasableLock ignore = writeLock.acquire()) {
for (Map.Entry<ICache<K, V>, TierInfo> entry : this.caches.entrySet()) {
if (cacheValueTuple.v2().equals(entry.getValue().tierName)) {
entry.getKey().put(key, value);
if (evaluatePoliciesList(value, policies)) {
try (ReleasableLock ignore = writeLock.acquire()) {
for (Map.Entry<ICache<K, V>, TierInfo> entry : this.caches.entrySet()) {
if (cacheValueTuple.v2().equals(entry.getValue().tierName)) {
entry.getKey().put(key, value);
}
}
updateStatsOnPut(cacheValueTuple.v2(), key, value);
}
updateStatsOnPut(cacheValueTuple.v2(), key, value);
}
}
}
@@ -297,13 +305,13 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
// This is needed as there can be many requests for the same key at the same time and we only want to load
// the value once.
V value = compute(key, loader, future);
Tuple<V, Boolean> computedValueTuple = compute(key, loader, future);
// Handle stats
if (loader.isLoaded()) {
if (computedValueTuple.v2()) {
// The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk
// cache
// if present
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value);
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, computedValueTuple.v1());
statsHolder.incrementMisses(heapDimensionValues);
if (caches.get(diskCache).isEnabled()) {
statsHolder.incrementMisses(diskDimensionValues);
Expand All @@ -312,7 +320,7 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
// Another thread requesting this key already loaded the value. Register a hit for the heap cache
statsHolder.incrementHits(heapDimensionValues);
}
return value;
return computedValueTuple.v1();
} else {
// Handle stats for an initial hit from getValueFromTieredCache()
if (cacheValueTuple.v2().equals(TIER_DIMENSION_VALUE_ON_HEAP)) {
Expand All @@ -327,32 +335,45 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
return cacheValueTuple.v1();
}

private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader, CompletableFuture<Tuple<ICacheKey<K>, V>> future)
throws Exception {
private Tuple<V, Boolean> compute(
ICacheKey<K> key,
LoadAwareCacheLoader<ICacheKey<K>, V> loader,
CompletableFuture<Tuple<ICacheKey<K>, V>> future
) throws Exception {
// Handler to handle results post-processing. Takes a tuple<key, value> or exception as an input and returns
// the value. Also before returning value, puts the value in cache.
BiFunction<Tuple<ICacheKey<K>, V>, Throwable, Void> handler = (pair, ex) -> {
// a tuple of the value and a boolean for whether it entered the cache. Also before returning value, puts the value in cache
// if this is allowed by the cache policies.
boolean didPutIntoCache = false;
BiFunction<Tuple<ICacheKey<K>, V>, Throwable, Boolean> handler = (pair, ex) -> {
boolean lambdaDidPutIntoCache = false;
if (pair != null) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(pair.v1(), pair.v2());
} catch (Exception e) {
// TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal
// listeners/stats. Needs better exception handling at underlying layers. For now swallowing
// exception.
logger.warn("Exception occurred while putting item onto heap cache", e);
if (evaluatePoliciesList(pair.v2(), policies)) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(pair.v1(), pair.v2());
// We must load the value for the policy to check it, so we can't rely on loader.isLoaded()
// to determine if the new value entered the cache since the policy may have blocked it.
// Instead return this boolean as well.
lambdaDidPutIntoCache = true;
} catch (Exception e) {
// TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal
// listeners/stats. Needs better exception handling at underlying layers. For now swallowing
// exception.
logger.warn("Exception occurred while putting item onto heap cache", e);
}
}
} else {
if (ex != null) {
logger.warn("Exception occurred while trying to compute the value", ex);
}
}
// Safe to remove from the map even if policy blocks value from entering the cache
completableFutureMap.remove(key); // Remove key from map as not needed anymore.
return null;
return lambdaDidPutIntoCache;
};
V value = null;
if (future == null) {
future = completableFutureMap.get(key);
future.handle(handler);
CompletableFuture<Boolean> didPutIntoCacheFuture = future.handle(handler);
try {
value = loader.load(key);
} catch (Exception ex) {
@@ -365,6 +386,8 @@ private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader
throw new ExecutionException(npe);
} else {
future.complete(new Tuple<>(key, value));
// If the future is completed, didPutIntoCacheFuture should also be completed, so it's safe to call .get() on it
didPutIntoCache = didPutIntoCacheFuture.get();
}
} else {
try {
@@ -373,7 +396,7 @@ private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader
throw new IllegalStateException(ex);
}
}
return value;
return new Tuple<>(value, didPutIntoCache);
}
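To make the shape of compute() easier to follow, here is a stripped-down, self-contained sketch of the same dedup-and-handle pattern, with illustrative names rather than this PR's actual API; error handling (completing the future exceptionally when the loader throws) is elided:

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

// Sketch: concurrent callers for the same key share one CompletableFuture; the first
// caller runs the loader, and a handle() callback decides whether the loaded value
// enters the cache (mirroring the policy check in compute() above).
class DedupLoader<K, V> {
    private final Map<K, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Predicate<V> policy; // e.g. a took-time threshold

    DedupLoader(Predicate<V> policy) {
        this.policy = policy;
    }

    V computeIfAbsent(K key, Function<K, V> loader) throws Exception {
        CompletableFuture<V> future = new CompletableFuture<>();
        CompletableFuture<V> existing = inFlight.putIfAbsent(key, future);
        if (existing != null) {
            return existing.get(); // another thread is already loading this key
        }
        // Registered before completion, so it runs synchronously inside complete().
        CompletableFuture<Boolean> didCache = future.handle((value, ex) -> {
            boolean cached = false;
            if (value != null && policy.test(value)) {
                cache.put(key, value); // only policy-approved values enter the cache
                cached = true;
            }
            inFlight.remove(key); // safe to remove even when the policy rejected the value
            return cached;
        });
        V value = loader.apply(key);
        future.complete(value); // triggers the handler above
        // The handler has already run, so this get() does not block; the real compute()
        // returns this flag alongside the value for stats accounting.
        boolean enteredCache = didCache.get();
        return value;
    }
}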

@Override
@@ -442,7 +465,9 @@ void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification
boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason());
boolean countEvictionTowardsTotal = false; // Don't count this eviction towards the cache's total if it ends up in the disk tier
boolean exceptionOccurredOnDiskCachePut = false;
boolean canCacheOnDisk = caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue());
boolean canCacheOnDisk = caches.get(diskCache).isEnabled()
&& wasEvicted
&& evaluatePoliciesList(notification.getValue(), diskPolicies);
if (canCacheOnDisk) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats
@@ -465,8 +490,8 @@ void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification
updateStatsOnRemoval(TIER_DIMENSION_VALUE_ON_HEAP, wasEvicted, key, notification.getValue(), countEvictionTowardsTotal);
}

boolean evaluatePolicies(V value) {
for (Predicate<V> policy : policies) {
boolean evaluatePoliciesList(V value, List<Predicate<V>> policiesList) {
for (Predicate<V> policy : policiesList) {
if (!policy.test(value)) {
return false;
}
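evaluatePoliciesList() simply ANDs a list of Predicate<V>. For a concrete sense of what one of those predicates contains, here is a minimal took-time style check; the names and the treatment of missing values are illustrative, not necessarily the real TookTimePolicy implementation:

import java.util.function.Function;
import java.util.function.Predicate;

// Sketch of a took-time admission policy: only values that were expensive to produce
// are worth cache space; cheap results can simply be recomputed.
class TookTimePredicate<V> implements Predicate<V> {
    private final long thresholdNanos;
    private final Function<V, Long> tookTimeExtractor; // reads the recorded took-time from the cached value

    TookTimePredicate(long thresholdNanos, Function<V, Long> tookTimeExtractor) {
        this.thresholdNanos = thresholdNanos;
        this.tookTimeExtractor = tookTimeExtractor;
    }

    @Override
    public boolean test(V value) {
        Long tookTimeNanos = tookTimeExtractor.apply(value);
        // Admitting values with no recorded took-time is one possible choice, not the only one.
        return tookTimeNanos == null || tookTimeNanos >= thresholdNanos;
    }
}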
@@ -873,7 +898,8 @@ public static class Builder<K, V> {
private CacheConfig<K, V> cacheConfig;
private CacheType cacheType;
private Map<String, ICache.Factory> cacheFactories;
private final ArrayList<Predicate<V>> policies = new ArrayList<>();
private final List<Predicate<V>> policies = new ArrayList<>();
private final List<Predicate<V>> diskPolicies = new ArrayList<>();

private int numberOfSegments;
private long onHeapCacheSizeInBytes;
@@ -945,7 +971,7 @@ public Builder<K, V> setCacheFactories(Map<String, ICache.Factory> cacheFactorie
}

/**
* Set a cache policy to be used to limit access to this cache's disk tier.
* Set a cache policy to be used to limit access to this cache.
* @param policy the policy
* @return builder
*/
@@ -955,7 +981,7 @@ public Builder<K, V> addPolicy(Predicate<V> policy) {
}

/**
* Set multiple policies to be used to limit access to this cache's disk tier.
* Set multiple policies to be used to limit access to this cache.
* @param policies the policies
* @return builder
*/
@@ -964,6 +990,26 @@ public Builder<K, V> addPolicies(List<Predicate<V>> policies) {
return this;
}

/**
* Set a cache policy to be used to limit access to this cache's disk tier.
* @param diskPolicy the policy
* @return builder
*/
public Builder<K, V> addDiskPolicy(Predicate<V> diskPolicy) {
this.diskPolicies.add(diskPolicy);
return this;
}

/**
* Set multiple policies to be used to limit access to this cache's disk tier.
* @param diskPolicies the policies
* @return builder
*/
public Builder<K, V> addDiskPolicies(List<Predicate<V>> diskPolicies) {
this.diskPolicies.addAll(diskPolicies);
return this;
}
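A hedged sketch of how a caller might wire both policy lists through the new builder methods; the value type and thresholds are illustrative stand-ins, and a real builder call also needs the cache config, factories, and removal listener:

import java.util.List;
import java.util.function.Predicate;
import org.opensearch.cache.common.tier.TieredSpilloverCache; // package assumed

// Hypothetical wiring of the two policy lists this PR separates. "CachedValue" is a
// stand-in type; the real caller builds took-time policies from cluster settings.
class PolicyWiringSketch {
    record CachedValue(long tookTimeNanos) {}

    static void wire(TieredSpilloverCache.Builder<String, CachedValue> builder,
                     long allTiersThresholdNanos, long diskOnlyThresholdNanos) {
        // Guards entry into any tier (heap or disk):
        Predicate<CachedValue> tookTimePolicy = v -> v.tookTimeNanos() >= allTiersThresholdNanos;
        // Guards only spillover into the disk tier:
        Predicate<CachedValue> diskPolicy = v -> v.tookTimeNanos() >= diskOnlyThresholdNanos;
        builder.addPolicies(List.of(tookTimePolicy)).addDiskPolicies(List.of(diskPolicy));
    }
}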

/**
* Sets number of segments for tiered cache
* @param numberOfSegments number of segments
TieredSpilloverCacheSettings.java
@@ -38,6 +38,8 @@ public class TieredSpilloverCacheSettings {
*/
public static final long MIN_DISK_CACHE_SIZE_IN_BYTES = 10485760L;

public static final TimeValue DEFAULT_TOOK_TIME_THRESHOLD = new TimeValue(10, TimeUnit.MILLISECONDS);

/**
* Setting which defines the onHeap cache store to be used in TieredSpilloverCache.
*
@@ -109,13 +111,15 @@ public class TieredSpilloverCacheSettings {
);

/**
* Setting defining the minimum took time for a query to be allowed into the disk cache.
* Setting defining the minimum took time for a query to be allowed in the cache.
* For backwards compatibility, the setting key still contains "disk", but the threshold now applies to entry into
* any tier of the cache.
*/
private static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
private static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold",
(key) -> Setting.timeSetting(
key,
new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting
DEFAULT_TOOK_TIME_THRESHOLD,
TimeValue.ZERO, // Minimum value for this setting
NodeScope,
Setting.Property.Dynamic
@@ -143,7 +147,7 @@ public class TieredSpilloverCacheSettings {
for (CacheType cacheType : CacheType.values()) {
concreteTookTimePolicySettingMap.put(
cacheType,
TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
TIERED_SPILLOVER_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
diskCacheSettingMap.put(
cacheType,
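Finally, a sketch of overriding the threshold at runtime. The concrete key below assumes the request-cache setting prefix ("indices.requests.cache"); per the javadoc above, the key keeps its legacy "disk" segment even though the threshold now guards every tier:

import org.opensearch.common.settings.Settings;

class TookTimeThresholdSettingExample {
    // Assumed full key: <cache-type prefix> + ".tiered_spillover.disk.store.policies.took_time.threshold",
    // where "indices.requests.cache" is the assumed prefix for the request cache type.
    static Settings lowerThresholdTo5ms() {
        return Settings.builder()
            .put("indices.requests.cache.tiered_spillover.disk.store.policies.took_time.threshold", "5ms")
            .build();
    }
}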