HBASE-28963 Updating Quota Factors is too expensive #6451

Merged
merged 1 commit into from
Nov 13, 2024
@@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -28,6 +29,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
@@ -48,6 +50,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;

/**
* Cache that keeps track of the quota settings for the users and tables that are interacting with
* it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will
@@ -61,6 +67,10 @@ public class QuotaCache implements Stoppable {
private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);

public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
public static final String TABLE_REGION_STATES_CACHE_TTL_MS =
"hbase.quota.cache.ttl.region.states.ms";
Member

Thank you for including a unit indicator in the config key! 🙇

public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS =
"hbase.quota.cache.ttl.servers.size.ms";

// defines the request attribute key which, when provided, will override the request's username
// from the perspective of user quotas
@@ -102,7 +112,7 @@ public void start() throws IOException {
// TODO: This will be replaced once we have the notification bus ready.
Configuration conf = rsServices.getConfiguration();
int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
refreshChore = new QuotaRefresherChore(period, this);
refreshChore = new QuotaRefresherChore(conf, period, this);
rsServices.getChoreService().scheduleChore(refreshChore);
}

@@ -140,8 +150,7 @@ public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableNa
*/
public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi),
() -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L),
this::triggerCacheRefresh);
() -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L));
Comment on lines -143 to +153
Contributor Author

The changes here and in getQuotaState are critical. They ensure that a cache miss no longer triggers an immediate refresh. Because cache entries are evicted after 5 refresh periods, refreshing on every miss was too heavy-handed.
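To make the cost concrete, here is a minimal sketch of the difference between the two call shapes. It does not use the HBase helper itself: `getOrDefault`, `refreshesTriggered`, and the `QuotaMissSketch` class are hypothetical stand-ins, and the "refresh" is just a counter rather than a real chore trigger.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class QuotaMissSketch {
  // Hypothetical stand-in for a compute-if-absent helper with a miss callback:
  // returns the cached value, or installs a default and then runs onMiss.
  static <K, V> V getOrDefault(ConcurrentMap<K, V> map, K key, Supplier<V> defaults,
    Runnable onMiss) {
    V existing = map.get(key);
    if (existing != null) {
      return existing;
    }
    V created = defaults.get();
    V raced = map.putIfAbsent(key, created);
    if (raced != null) {
      // Another thread installed a value first; that thread owns the miss.
      return raced;
    }
    onMiss.run();
    return created;
  }

  // Counts how many refreshes a sequence of lookups would trigger.
  static int refreshesTriggered(String[] users, boolean refreshOnMiss) {
    ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
    AtomicInteger refreshes = new AtomicInteger();
    Runnable onMiss = refreshOnMiss ? refreshes::incrementAndGet : () -> { };
    for (String user : users) {
      getOrDefault(cache, user, () -> "default-quota", onMiss);
    }
    return refreshes.get();
  }

  public static void main(String[] args) {
    String[] users = { "alice", "bob", "alice", "carol" };
    // Old shape: every first-time user forces a refresh.
    System.out.println(refreshesTriggered(users, true));
    // New shape: misses get a default and wait for the periodic refresh.
    System.out.println(refreshesTriggered(users, false));
  }
}
```

In this sketch the number of forced refreshes grows with the number of distinct keys seen, which is the behavior the change removes; with eviction in play, the same key can miss again later and repeat the cost.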

}

/**
@@ -202,7 +211,7 @@ private String getQuotaUserName(final UserGroupInformation ugi) {
* returned and the quota request will be enqueued for the next cache refresh.
*/
private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap, final K key) {
return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
return computeIfAbsent(quotasMap, key, QuotaState::new);
}

void triggerCacheRefresh() {
@@ -233,8 +242,33 @@ Map<String, UserQuotaState> getUserQuotaCache() {
private class QuotaRefresherChore extends ScheduledChore {
private long lastUpdate = 0;

public QuotaRefresherChore(final int period, final Stoppable stoppable) {
// Querying cluster metrics so often, per-RegionServer, limits horizontal scalability.
// So we cache the results to reduce that load.
private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics;
private final RefreshableExpiringValueCache<Integer> regionServersSize;

public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) {
super("QuotaRefresherChore", stoppable, period);

Duration tableRegionStatesCacheTtl =
Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period));
this.tableRegionStatesClusterMetrics =
new RefreshableExpiringValueCache<>("tableRegionStatesClusterMetrics",
Member

As a follow-up JIRA, should these configuration values be hot-reloadable?

Contributor Author

That would definitely be nice. It would probably be a larger refactor, so I'd rather make it a separate issue. The quota refresh period is also static, and should probably be made dynamic in that same push.

tableRegionStatesCacheTtl, () -> rsServices.getConnection().getAdmin()
.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT)));

Duration regionServersSizeCacheTtl =
Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period));
regionServersSize =
new RefreshableExpiringValueCache<>("regionServersSize", regionServersSizeCacheTtl,
() -> rsServices.getConnection().getAdmin().getRegionServers().size());
}

@Override
public synchronized boolean triggerNow() {
tableRegionStatesClusterMetrics.invalidate();
regionServersSize.invalidate();
return super.triggerNow();
}

@Override
@@ -395,21 +429,40 @@ private <K, V extends QuotaState> void fetch(final String type,
* over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor.
*/
private void updateQuotaFactors() {
// Update machine quota factor
ClusterMetrics clusterMetrics;
try {
clusterMetrics = rsServices.getConnection().getAdmin()
.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT));
} catch (IOException e) {
LOG.warn("Failed to get cluster metrics needed for updating quotas", e);
return;
boolean hasTableQuotas = !tableQuotaCache.entrySet().isEmpty()
Contributor

Not sure if this is relevant here, but this check won't be atomic. The contents of tableQuotaCache can change while userQuotaCache is being checked.

Contributor Author

I think it's okay. The two conditions are independent of each other, and the worst case is that a concurrent tableQuotaCache addition misses this refresh and is picked up by the next one.

|| userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters);
if (hasTableQuotas) {
updateTableMachineQuotaFactors();
} else {
updateOnlyMachineQuotaFactors();
Comment on lines +434 to +437
Contributor Author

This check ensures that we only pull down every region state if we actually need to. Without table machine quotas, there is no point in doing so.
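As a rough illustration of that decision, the sketch below picks between the cheap and the expensive path using plain maps in place of the real quota caches. `QuotaFactorPathSketch`, `choosePath`, and the `Path` enum are hypothetical names introduced for this example; only the `1.0 / rsSize` factor and the zero-server guard come from the diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class QuotaFactorPathSketch {
  enum Path {
    SERVER_COUNT_ONLY, // cheap: only the number of region servers is needed
    FULL_REGION_STATES // expensive: per-table region counts are needed too
  }

  // tableQuotas stands in for the table quota cache; each entry of
  // userTableLimiters stands in for one user's table limiters (possibly null).
  static Path choosePath(Map<String, String> tableQuotas,
    List<Map<String, String>> userTableLimiters) {
    boolean hasTableQuotas = !tableQuotas.isEmpty()
      || userTableLimiters.stream().anyMatch(l -> l != null && !l.isEmpty());
    return hasTableQuotas ? Path.FULL_REGION_STATES : Path.SERVER_COUNT_ONLY;
  }

  // Mirrors the guard in the diff: keep the current factor if no servers are reported.
  static double machineQuotaFactor(int rsSize, double current) {
    return rsSize != 0 ? 1.0 / rsSize : current;
  }

  public static void main(String[] args) {
    Map<String, String> noTableQuotas = Collections.emptyMap();
    List<Map<String, String>> users =
      Arrays.asList(null, Collections.<String, String> emptyMap());
    System.out.println(choosePath(noTableQuotas, users));
    System.out.println(machineQuotaFactor(4, 1.0));
  }
}
```

The two halves of the check read different collections without a common lock, so a quota added between them is simply seen on a later refresh, as discussed in the thread above.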

}
}

int rsSize = clusterMetrics.getServersName().size();
if (rsSize != 0) {
// TODO if use rs group, the cluster limit should be shared by the rs group
machineQuotaFactor = 1.0 / rsSize;
/**
* This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if
* we don't have any table quotas in the cache.
*/
private void updateOnlyMachineQuotaFactors() {
Optional<Integer> rsSize = regionServersSize.get();
if (rsSize.isPresent()) {
updateMachineQuotaFactors(rsSize.get());
} else {
regionServersSize.refresh();
}
}

/**
* This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine
* factors as well. This relies on a more expensive query for ClusterMetrics.
*/
private void updateTableMachineQuotaFactors() {
Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get();
if (!clusterMetricsMaybe.isPresent()) {
tableRegionStatesClusterMetrics.refresh();
return;
}
ClusterMetrics clusterMetrics = clusterMetricsMaybe.get();
updateMachineQuotaFactors(clusterMetrics.getServersName().size());

Map<TableName, RegionStatesCount> tableRegionStatesCount =
clusterMetrics.getTableRegionStatesCount();
@@ -436,6 +489,53 @@ private void updateQuotaFactors() {
}
}
}

private void updateMachineQuotaFactors(int rsSize) {
if (rsSize != 0) {
// TODO if use rs group, the cluster limit should be shared by the rs group
machineQuotaFactor = 1.0 / rsSize;
}
}
}

static class RefreshableExpiringValueCache<T> {
Contributor Author

I made this class because I don't think there's a good keyless equivalent to a LoadingCache, and a memoized supplier does not offer all of the functionality that I'd like (on-demand refresh and invalidation).
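For readers who want to see the three operations in isolation, here is a simplified, standard-library-only sketch of a single-value cache with expiry, on-demand refresh, and invalidation. It is not the Guava-backed class from this PR: `ExpiringValueSketch` is a hypothetical name, the injected clock is an assumption added for testability, and refresh here is synchronous.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

class ExpiringValueSketch<T> {
  private final long ttlMillis;
  private final Supplier<T> loader;
  private final LongSupplier clockMillis; // injected clock (assumption, for testing)
  private Optional<T> value = Optional.empty();
  private long loadedAt;
  private boolean loaded;

  ExpiringValueSketch(long ttlMillis, Supplier<T> loader, LongSupplier clockMillis) {
    this.ttlMillis = ttlMillis;
    this.loader = loader;
    this.clockMillis = clockMillis;
  }

  // Returns the cached value, reloading first if nothing is loaded or the TTL elapsed.
  synchronized Optional<T> get() {
    if (!loaded || clockMillis.getAsLong() - loadedAt >= ttlMillis) {
      refresh();
    }
    return value;
  }

  // Reloads now. A failed load is remembered as empty until the next reload.
  synchronized void refresh() {
    try {
      value = Optional.of(loader.get());
    } catch (RuntimeException e) {
      value = Optional.empty();
    }
    loadedAt = clockMillis.getAsLong();
    loaded = true;
  }

  // Drops the value so that the next get() reloads it.
  synchronized void invalidate() {
    loaded = false;
    value = Optional.empty();
  }

  public static void main(String[] args) {
    long[] now = { 0 };
    AtomicInteger loads = new AtomicInteger();
    ExpiringValueSketch<Integer> servers =
      new ExpiringValueSketch<>(1000, loads::incrementAndGet, () -> now[0]);
    servers.get();
    servers.get(); // still fresh, no reload
    now[0] = 1000;
    servers.get(); // TTL elapsed, reload
    servers.invalidate();
    servers.get(); // invalidated, reload
    System.out.println(loads.get()); // prints 3
  }
}
```

A plain memoized supplier covers only the first `get()` path here; the explicit `refresh()` and `invalidate()` methods are what the comment above says it lacks.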

private final String name;
private final LoadingCache<String, Optional<T>> cache;

RefreshableExpiringValueCache(String name, Duration refreshPeriod,
ThrowingSupplier<T> supplier) {
this.name = name;
this.cache =
CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS)
.build(new CacheLoader<>() {
@Override
public Optional<T> load(String key) {
try {
return Optional.of(supplier.get());
} catch (Exception e) {
LOG.warn("Failed to refresh cache {}", name, e);
return Optional.empty();
}
}
});
}

Optional<T> get() {
return cache.getUnchecked(name);
}

void refresh() {
cache.refresh(name);
}

void invalidate() {
cache.invalidate(name);
}
}

@FunctionalInterface
static interface ThrowingSupplier<T> {
T get() throws Exception;
}

static interface Fetcher<Key, Value> {
@@ -117,6 +117,10 @@ public void setQuotas(final String namespace, Quotas quotas) {
namespaceLimiters = setLimiter(namespaceLimiters, namespace, quotas);
}

public boolean hasTableLimiters() {
return tableLimiters != null && !tableLimiters.isEmpty();
}

private <K> Map<K, QuotaLimiter> setLimiter(Map<K, QuotaLimiter> limiters, final K key,
final Quotas quotas) {
if (limiters == null) {
@@ -40,7 +40,7 @@ public class TestQuotaCache {
HBaseClassTestRule.forClass(TestQuotaCache.class);

private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final int REFRESH_TIME = 30_000;
private static final int REFRESH_TIME_MS = 1000;

@After
public void tearDown() throws Exception {
@@ -52,7 +52,7 @@ public void tearDown() throws Exception {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME);
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME_MS);
TEST_UTIL.getConfiguration().setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_READ_NUM, 1000);

TEST_UTIL.startMiniCluster(1);
@@ -78,39 +78,32 @@ public static void tearDownAfterClass() throws Exception {
@Test
public void testUserGlobalThrottleWithCustomOverride() throws Exception {
final Admin admin = TEST_UTIL.getAdmin();
final String userOverrideWithQuota = User.getCurrent().getShortName() + "123";
final String userOverrideWithQuota = User.getCurrent().getShortName();
Contributor Author

I made a few small test changes to ensure that refreshes are triggered appropriately. ThrottleQuotaTestUtil.triggerUserCacheRefresh assumes that we're waiting on throttling for the current user, so I also had to rearrange this test to throttle the current user (rather than the current user plus a suffix, as before). The logic of the test is still sound.


// Add 6req/min limit
admin.setQuota(QuotaSettingsFactory.throttleUser(userOverrideWithQuota,
ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);

Table tableWithThrottle = TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null)
.setRequestAttribute(CUSTOM_OVERRIDE_KEY, Bytes.toBytes(userOverrideWithQuota)).build();
Table tableWithoutThrottle = TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null)
.setRequestAttribute(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
Bytes.toBytes(userOverrideWithQuota))
.build();
Table tableWithoutThrottle2 =
TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).build();
.setRequestAttribute(CUSTOM_OVERRIDE_KEY, Bytes.toBytes("anotherUser")).build();

// warm things up
doPuts(10, FAMILY, QUALIFIER, tableWithThrottle);
doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle);
doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2);

// should reject some requests
assertTrue(10 > doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
// should accept all puts
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
// should accept all puts
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2));

// Remove all the limits
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userOverrideWithQuota));
Thread.sleep(60_000);
ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2));
}

}