Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@

package org.apache.hadoop.hive.metastore;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.leader.LeaderElection;
import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext;
import org.apache.hadoop.hive.metastore.leader.LeaderElectionFactory;
import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.ql.stats.StatsUpdaterThread;
import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
Expand All @@ -31,18 +37,23 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Base class for HMS leader config testing.
*/
class MetastoreHousekeepingLeaderTestBase {
abstract class MetastoreHousekeepingLeaderTestBase {
private static final Logger LOG = LoggerFactory.getLogger(MetastoreHousekeepingLeaderTestBase.class);
private static HiveMetaStoreClient client;
protected static Configuration conf = MetastoreConf.newMetastoreConf();
protected Configuration conf;
private static Warehouse warehouse;
private static boolean isServerStarted = false;
private static int port;
Expand All @@ -54,12 +65,15 @@ class MetastoreHousekeepingLeaderTestBase {
static Map<String, Boolean> threadNames = new HashMap<>();
static Map<Class<? extends Thread>, Boolean> threadClasses = new HashMap<>();

void internalSetup(final String leaderHostName, boolean configuredLeader) throws Exception {
void setup(final String leaderHostName, Configuration configuration) throws Exception {
this.conf = configuration;
MetaStoreTestUtils.setConfForStandloneMode(conf);
MetastoreConf.setVar(conf, ConfVars.THRIFT_BIND_HOST, "localhost");
MetastoreConf.setVar(conf, ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME, leaderHostName);
MetastoreConf.setVar(conf, ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION,
configuredLeader ? "host" : "lock");
leaderHostName != null ? LeaderElectionFactory.Method.HOST.name() : LeaderElectionFactory.Method.LOCK.name());
if (leaderHostName != null) {
MetastoreConf.setVar(conf, ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME, leaderHostName);
}
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);

Expand Down Expand Up @@ -193,5 +207,199 @@ private void resetThreadStatus() {
threadNames.forEach((name, status) -> threadNames.put(name, false));
threadClasses.forEach((thread, status) -> threadClasses.put(thread, false));
}

static class CombinedLeaderElector implements AutoCloseable {
List<Pair<TableName, LeaderElection<TableName>>> elections = new ArrayList<>();
private final Configuration configuration;
private String name;

CombinedLeaderElector(Configuration conf) throws IOException {
this.configuration = conf;
for (LeaderElectionContext.TTYPE type : LeaderElectionContext.TTYPE.values()) {
TableName table = type.getTableName();
elections.add(Pair.of(table, new LeaseLeaderElection()));
}
}

public void tryBeLeader() throws Exception {
int i = 0;
for (Pair<TableName, LeaderElection<TableName>> election : elections) {
LeaderElection<TableName> le = election.getRight();
le.setName(name + "-" + i++);
le.tryBeLeader(configuration, election.getLeft());
}
}

public boolean isLeader() {
boolean isLeader = true;
for (Pair<TableName, LeaderElection<TableName>> election : elections) {
isLeader &= election.getRight().isLeader();
}
return isLeader;
}

public void setName(String name) {
this.name = name;
}

@Override
public void close() throws Exception {
for (Pair<TableName, LeaderElection<TableName>> election : elections) {
election.getRight().close();
}
}
}

static class ReleaseAndRequireLease extends LeaseLeaderElection {
private static CountDownLatch latch;
private final Configuration configuration;
private final boolean needRenewLease;
private TableName tableName;

public static void setMonitor(CountDownLatch latch) {
ReleaseAndRequireLease.latch = latch;
}
public static void reset() {
ReleaseAndRequireLease.latch = null;
}

public ReleaseAndRequireLease(Configuration conf, boolean needRenewLease) throws IOException {
super();
this.configuration = conf;
this.needRenewLease = needRenewLease;
}

@Override
public void setName(String name) {
super.setName(name);
LeaderElectionContext.TTYPE type = null;
for (LeaderElectionContext.TTYPE value : LeaderElectionContext.TTYPE.values()) {
if (value.getName().equalsIgnoreCase(name)) {
type = value;
break;
}
}
if (type == null) {
// This shouldn't happen at all
throw new AssertionError("Unknown elector name: " + name);
}
this.tableName = type.getTableName();
}

@Override
protected void notifyListener() {
ScheduledExecutorService service = null;
if (!isLeader) {
try {
service = ThreadPool.getPool();
} catch (Exception ignored) {
}
}
super.notifyListener();
if (isLeader) {
if (!needRenewLease) {
super.shutdownWatcher();
// In our tests, the time spent on notifying the listener might be greater than the lease timeout,
// which makes the leader loss the leadership quickly after wake up, and kill all housekeeping services.
// Make sure the leader is still valid while notifying the listener, and switch to ReleaseAndRequireWatcher
// after all listeners finish their work.
heartbeater = new ReleaseAndRequireWatcher(configuration, tableName);
heartbeater.startWatch();
}
} else {
if (service != null) {
// If the housekeeping task is running behind
Assert.assertTrue(service.isShutdown());
// Interrupt all sleeping tasks
service.shutdownNow();
try {
// This is the last one get notified, sleep some time to make sure all other
// services have been stopped before return
Thread.sleep(12000);
} catch (InterruptedException ignore) {
}
}
}
if (latch != null) {
latch.countDown();
}
}

// For testing purpose only, lock would become timeout and then acquire it again
private class ReleaseAndRequireWatcher extends LeaseWatcher {
long timeout;
public ReleaseAndRequireWatcher(Configuration conf,
TableName tableName) {
super(conf, tableName);
timeout = MetastoreConf.getTimeVar(conf,
MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) + 3000;
setName("ReleaseAndRequireWatcher-" + ((name != null) ? name + "-" : "") + ID.incrementAndGet());
}

@Override
public void beforeRun() {
try {
Thread.sleep(timeout);
} catch (InterruptedException e) {
// ignore this
}
}

@Override
public void runInternal() {
shutDown();
// The timeout lock should be cleaned,
// sleep some time to let others take the chance to become the leader
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// ignore
}
// Acquire the lock again
conf = new Configuration(conf);
reclaim();
}
}
}

public void checkHouseKeepingThreadExistence(boolean isLeader) throws Exception {
searchHousekeepingThreads();

// Verify existence of threads
for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
if (entry.getValue()) {
LOG.info("Found thread with name {}", entry.getKey());
} else {
LOG.info("No thread found with name {}", entry.getKey());
}
if (isLeader) {
Assert.assertTrue("No thread with name " + entry.getKey() + " found.", entry.getValue());
} else {
Assert.assertFalse("Thread with name " + entry.getKey() + " found.", entry.getValue());
}
}

for (Map.Entry<Class<? extends Thread>, Boolean> entry : threadClasses.entrySet()) {
if (isLeader) {
if (entry.getValue()) {
LOG.info("Found thread for {}", entry.getKey().getSimpleName());
}
Assert.assertTrue("No thread found for class " + entry.getKey().getSimpleName(), entry.getValue());
} else {
// A non-leader HMS will still run the configured number of Compaction worker threads.
if (entry.getKey() == Worker.class) {
if (entry.getValue()) {
LOG.info("Thread found for " + entry.getKey().getSimpleName());
}
} else {
if (!entry.getValue()) {
LOG.info("No thread found for " + entry.getKey().getSimpleName());
}
Assert.assertFalse("Thread found for class " + entry.getKey().getSimpleName(),
entry.getValue());
}
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,24 @@

package org.apache.hadoop.hive.metastore;

import org.junit.Assert;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* Test for specifying a valid hostname as HMS leader.
*/
public class TestMetastoreHousekeepingLeader extends MetastoreHousekeepingLeaderTestBase {
private static final Logger LOG = LoggerFactory.getLogger(TestMetastoreHousekeepingLeader.class);

@Before
public void setUp() throws Exception {
internalSetup("localhost", true);
setup("localhost", MetastoreConf.newMetastoreConf());
}

@Test
public void testHouseKeepingThreadExistence() throws Exception {
searchHousekeepingThreads();

// Verify existence of threads
for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
if (entry.getValue()) {
LOG.info("Found thread with name " + entry.getKey());
}
Assert.assertTrue("No thread with name " + entry.getKey() + " found.", entry.getValue());
}

for (Map.Entry<Class<? extends Thread>, Boolean> entry : threadClasses.entrySet()) {
if (entry.getValue()) {
LOG.info("Found thread for " + entry.getKey().getSimpleName());
}
Assert.assertTrue("No thread found for class " + entry.getKey().getSimpleName(),
entry.getValue());
}
checkHouseKeepingThreadExistence(true);
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,24 @@

package org.apache.hadoop.hive.metastore;

import org.junit.Assert;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* Test for specifying empty HMS leader.
*/
public class TestMetastoreHousekeepingLeaderEmptyConfig extends MetastoreHousekeepingLeaderTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestMetastoreHousekeepingLeaderEmptyConfig.class);

@Before
public void setUp() throws Exception {
// Empty string for leader indicates that the HMS is leader.
internalSetup("", true);
setup("", MetastoreConf.newMetastoreConf());
}

@Test
public void testHouseKeepingThreadExistence() throws Exception {
searchHousekeepingThreads();

// Verify existence of threads
for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
if (entry.getValue()) {
LOG.info("Found thread with name " + entry.getKey());
}
Assert.assertTrue("No thread with name " + entry.getKey() + " found.", entry.getValue());
}

for (Map.Entry<Class<? extends Thread>, Boolean> entry : threadClasses.entrySet()) {
if (entry.getValue()) {
LOG.info("Found thread for " + entry.getKey().getSimpleName());
}
Assert.assertTrue("No thread found for class " + entry.getKey().getSimpleName(),
entry.getValue());
}
checkHouseKeepingThreadExistence(true);
}

}
Loading