diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java index e8ec181a684d..61c81c29af7f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java @@ -18,10 +18,16 @@ package org.apache.hadoop.hive.metastore; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.leader.LeaderElection; +import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext; +import org.apache.hadoop.hive.metastore.leader.LeaderElectionFactory; +import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection; import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; import org.apache.hadoop.hive.ql.stats.StatsUpdaterThread; import org.apache.hadoop.hive.ql.txn.compactor.Cleaner; @@ -31,18 +37,23 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * Base class for HMS leader config testing. */ -class MetastoreHousekeepingLeaderTestBase { +abstract class MetastoreHousekeepingLeaderTestBase { private static final Logger LOG = LoggerFactory.getLogger(MetastoreHousekeepingLeaderTestBase.class); private static HiveMetaStoreClient client; - protected static Configuration conf = MetastoreConf.newMetastoreConf(); + protected Configuration conf; private static Warehouse warehouse; private static boolean isServerStarted = false; private static int port; @@ -54,12 +65,15 @@ class MetastoreHousekeepingLeaderTestBase { static Map threadNames = new HashMap<>(); static Map, Boolean> threadClasses = new HashMap<>(); - void internalSetup(final String leaderHostName, boolean configuredLeader) throws Exception { + void setup(final String leaderHostName, Configuration configuration) throws Exception { + this.conf = configuration; MetaStoreTestUtils.setConfForStandloneMode(conf); MetastoreConf.setVar(conf, ConfVars.THRIFT_BIND_HOST, "localhost"); - MetastoreConf.setVar(conf, ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME, leaderHostName); MetastoreConf.setVar(conf, ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION, - configuredLeader ? "host" : "lock"); + leaderHostName != null ? LeaderElectionFactory.Method.HOST.name() : LeaderElectionFactory.Method.LOCK.name()); + if (leaderHostName != null) { + MetastoreConf.setVar(conf, ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME, leaderHostName); + } MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true); MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true); @@ -193,5 +207,199 @@ private void resetThreadStatus() { threadNames.forEach((name, status) -> threadNames.put(name, false)); threadClasses.forEach((thread, status) -> threadClasses.put(thread, false)); } + + static class CombinedLeaderElector implements AutoCloseable { + List>> elections = new ArrayList<>(); + private final Configuration configuration; + private String name; + + CombinedLeaderElector(Configuration conf) throws IOException { + this.configuration = conf; + for (LeaderElectionContext.TTYPE type : LeaderElectionContext.TTYPE.values()) { + TableName table = type.getTableName(); + elections.add(Pair.of(table, new LeaseLeaderElection())); + } + } + + public void tryBeLeader() throws Exception { + int i = 0; + for (Pair> election : elections) { + LeaderElection le = election.getRight(); + le.setName(name + "-" + i++); + le.tryBeLeader(configuration, election.getLeft()); + } + } + + public boolean isLeader() { + boolean isLeader = true; + for (Pair> election : elections) { + isLeader &= election.getRight().isLeader(); + } + return isLeader; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public void close() throws Exception { + for (Pair> election : elections) { + election.getRight().close(); + } + } + } + + static class ReleaseAndRequireLease extends LeaseLeaderElection { + private static CountDownLatch latch; + private final Configuration configuration; + private final boolean needRenewLease; + private TableName tableName; + + public static void setMonitor(CountDownLatch latch) { + ReleaseAndRequireLease.latch = latch; + } + public static void reset() { + ReleaseAndRequireLease.latch = null; + } + + public ReleaseAndRequireLease(Configuration conf, boolean needRenewLease) throws IOException { + super(); + this.configuration = conf; + this.needRenewLease = needRenewLease; + } + + @Override + public void setName(String name) { + super.setName(name); + LeaderElectionContext.TTYPE type = null; + for (LeaderElectionContext.TTYPE value : LeaderElectionContext.TTYPE.values()) { + if (value.getName().equalsIgnoreCase(name)) { + type = value; + break; + } + } + if (type == null) { + // This shouldn't happen at all + throw new AssertionError("Unknown elector name: " + name); + } + this.tableName = type.getTableName(); + } + + @Override + protected void notifyListener() { + ScheduledExecutorService service = null; + if (!isLeader) { + try { + service = ThreadPool.getPool(); + } catch (Exception ignored) { + } + } + super.notifyListener(); + if (isLeader) { + if (!needRenewLease) { + super.shutdownWatcher(); + // In our tests, the time spent on notifying the listener might be greater than the lease timeout, + // which makes the leader loss the leadership quickly after wake up, and kill all housekeeping services. + // Make sure the leader is still valid while notifying the listener, and switch to ReleaseAndRequireWatcher + // after all listeners finish their work. + heartbeater = new ReleaseAndRequireWatcher(configuration, tableName); + heartbeater.startWatch(); + } + } else { + if (service != null) { + // If the housekeeping task is running behind + Assert.assertTrue(service.isShutdown()); + // Interrupt all sleeping tasks + service.shutdownNow(); + try { + // This is the last one get notified, sleep some time to make sure all other + // services have been stopped before return + Thread.sleep(12000); + } catch (InterruptedException ignore) { + } + } + } + if (latch != null) { + latch.countDown(); + } + } + + // For testing purpose only, lock would become timeout and then acquire it again + private class ReleaseAndRequireWatcher extends LeaseWatcher { + long timeout; + public ReleaseAndRequireWatcher(Configuration conf, + TableName tableName) { + super(conf, tableName); + timeout = MetastoreConf.getTimeVar(conf, + MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) + 3000; + setName("ReleaseAndRequireWatcher-" + ((name != null) ? name + "-" : "") + ID.incrementAndGet()); + } + + @Override + public void beforeRun() { + try { + Thread.sleep(timeout); + } catch (InterruptedException e) { + // ignore this + } + } + + @Override + public void runInternal() { + shutDown(); + // The timeout lock should be cleaned, + // sleep some time to let others take the chance to become the leader + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // ignore + } + // Acquire the lock again + conf = new Configuration(conf); + reclaim(); + } + } + } + + public void checkHouseKeepingThreadExistence(boolean isLeader) throws Exception { + searchHousekeepingThreads(); + + // Verify existence of threads + for (Map.Entry entry : threadNames.entrySet()) { + if (entry.getValue()) { + LOG.info("Found thread with name {}", entry.getKey()); + } else { + LOG.info("No thread found with name {}", entry.getKey()); + } + if (isLeader) { + Assert.assertTrue("No thread with name " + entry.getKey() + " found.", entry.getValue()); + } else { + Assert.assertFalse("Thread with name " + entry.getKey() + " found.", entry.getValue()); + } + } + + for (Map.Entry, Boolean> entry : threadClasses.entrySet()) { + if (isLeader) { + if (entry.getValue()) { + LOG.info("Found thread for {}", entry.getKey().getSimpleName()); + } + Assert.assertTrue("No thread found for class " + entry.getKey().getSimpleName(), entry.getValue()); + } else { + // A non-leader HMS will still run the configured number of Compaction worker threads. + if (entry.getKey() == Worker.class) { + if (entry.getValue()) { + LOG.info("Thread found for " + entry.getKey().getSimpleName()); + } + } else { + if (!entry.getValue()) { + LOG.info("No thread found for " + entry.getKey().getSimpleName()); + } + Assert.assertFalse("Thread found for class " + entry.getKey().getSimpleName(), + entry.getValue()); + } + } + } + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java index 3df58ed2919b..ae7cf125759a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java @@ -18,44 +18,24 @@ package org.apache.hadoop.hive.metastore; -import org.junit.Assert; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; /** * Test for specifying a valid hostname as HMS leader. */ public class TestMetastoreHousekeepingLeader extends MetastoreHousekeepingLeaderTestBase { - private static final Logger LOG = LoggerFactory.getLogger(TestMetastoreHousekeepingLeader.class); @Before public void setUp() throws Exception { - internalSetup("localhost", true); + setup("localhost", MetastoreConf.newMetastoreConf()); } @Test public void testHouseKeepingThreadExistence() throws Exception { - searchHousekeepingThreads(); - - // Verify existence of threads - for (Map.Entry entry : threadNames.entrySet()) { - if (entry.getValue()) { - LOG.info("Found thread with name " + entry.getKey()); - } - Assert.assertTrue("No thread with name " + entry.getKey() + " found.", entry.getValue()); - } - - for (Map.Entry, Boolean> entry : threadClasses.entrySet()) { - if (entry.getValue()) { - LOG.info("Found thread for " + entry.getKey().getSimpleName()); - } - Assert.assertTrue("No thread found for class " + entry.getKey().getSimpleName(), - entry.getValue()); - } + checkHouseKeepingThreadExistence(true); } + } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java index 7f05902cc8f6..69f01a85c2c4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java @@ -18,45 +18,24 @@ package org.apache.hadoop.hive.metastore; -import org.junit.Assert; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; /** * Test for specifying empty HMS leader. */ public class TestMetastoreHousekeepingLeaderEmptyConfig extends MetastoreHousekeepingLeaderTestBase { - private static final Logger LOG = - LoggerFactory.getLogger(TestMetastoreHousekeepingLeaderEmptyConfig.class); @Before public void setUp() throws Exception { // Empty string for leader indicates that the HMS is leader. - internalSetup("", true); + setup("", MetastoreConf.newMetastoreConf()); } @Test public void testHouseKeepingThreadExistence() throws Exception { - searchHousekeepingThreads(); - - // Verify existence of threads - for (Map.Entry entry : threadNames.entrySet()) { - if (entry.getValue()) { - LOG.info("Found thread with name " + entry.getKey()); - } - Assert.assertTrue("No thread with name " + entry.getKey() + " found.", entry.getValue()); - } - - for (Map.Entry, Boolean> entry : threadClasses.entrySet()) { - if (entry.getValue()) { - LOG.info("Found thread for " + entry.getKey().getSimpleName()); - } - Assert.assertTrue("No thread found for class " + entry.getKey().getSimpleName(), - entry.getValue()); - } + checkHouseKeepingThreadExistence(true); } + } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java index 4a0212d6860d..1a113c9cc5c2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java @@ -18,53 +18,24 @@ package org.apache.hadoop.hive.metastore; -import org.apache.hadoop.hive.ql.txn.compactor.Worker; -import org.junit.Assert; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; /** * Test for specifying HMS leader other than the current one. */ public class TestMetastoreHousekeepingNonLeader extends MetastoreHousekeepingLeaderTestBase { - private static final Logger LOG = - LoggerFactory.getLogger(TestMetastoreHousekeepingLeaderEmptyConfig.class); @Before public void setUp() throws Exception { // Empty string for leader indicates that the HMS is leader. - internalSetup("some_non_leader_host.domain1.domain", true); + setup("some_non_leader_host.domain1.domain", MetastoreConf.newMetastoreConf()); } @Test public void testHouseKeepingThreadExistence() throws Exception { - searchHousekeepingThreads(); - - // Verify existence of threads - for (Map.Entry entry : threadNames.entrySet()) { - if (!entry.getValue()) { - LOG.info("No thread found with name " + entry.getKey()); - } - Assert.assertFalse("Thread with name " + entry.getKey() + " found.", entry.getValue()); - } - - for (Map.Entry, Boolean> entry : threadClasses.entrySet()) { - // A non-leader HMS will still run the configured number of Compaction worker threads. - if (entry.getKey() == Worker.class) { - if (entry.getValue()) { - LOG.info("Thread found for " + entry.getKey().getSimpleName()); - } - } else { - if (!entry.getValue()) { - LOG.info("No thread found for " + entry.getKey().getSimpleName()); - } - Assert.assertFalse("Thread found for class " + entry.getKey().getSimpleName(), - entry.getValue()); - } - } + checkHouseKeepingThreadExistence(false); } + } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseLeader.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseLeader.java index 88231b3ebbaa..8e570c9b3890 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseLeader.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseLeader.java @@ -19,65 +19,65 @@ package org.apache.hadoop.hive.metastore; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.leader.LeaderElection; import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext; -import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection; +import org.apache.hadoop.hive.metastore.leader.LeaderElectionFactory; +import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -public class TestMetastoreLeaseLeader { +public class TestMetastoreLeaseLeader extends MetastoreHousekeepingLeaderTestBase { - LeaderElection election; - - TestMetastoreHousekeepingLeader hms; + CombinedLeaderElector elector; @Before public void setUp() throws Exception { - hms = new TestMetastoreHousekeepingLeader(); - MetastoreConf.setTimeVar(hms.conf, MetastoreConf.ConfVars.TXN_TIMEOUT, 3, TimeUnit.SECONDS); - MetastoreConf.setTimeVar(hms.conf, MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 1, TimeUnit.SECONDS); - hms.conf.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, false); - hms.conf.setBoolean(LeaderElectionContext.LEADER_IN_TEST, true); - hms.conf.set("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); - hms.internalSetup("", false); + Configuration configuration = MetastoreConf.newMetastoreConf(); + MetastoreConf.setTimeVar(configuration, MetastoreConf.ConfVars.TXN_TIMEOUT, 3, TimeUnit.SECONDS); + MetastoreConf.setTimeVar(configuration, MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 200, TimeUnit.MILLISECONDS); + MetastoreConf.setLongVar(configuration, MetastoreConf.ConfVars.HMS_HANDLER_ATTEMPTS, 3); + MetastoreConf.setTimeVar(configuration, MetastoreConf.ConfVars.HMS_HANDLER_INTERVAL, 100, TimeUnit.MILLISECONDS); + configuration.set("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + LeaderElectionFactory.addElectionCreator(LeaderElectionFactory.Method.LOCK, conf -> new ReleaseAndRequireLease(conf, false)); + setup(null, configuration); - Configuration conf = MetastoreConf.newMetastoreConf(); - MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 1, TimeUnit.SECONDS); - MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, 3, TimeUnit.SECONDS); - MetastoreConf.setVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION, "lock"); - election = new LeaseLeaderElection(); - TableName tableName = (TableName) LeaderElectionContext.getLeaderMutex(conf, - LeaderElectionContext.TTYPE.HOUSEKEEPING, null); - election.tryBeLeader(conf, tableName); + Configuration conf = new Configuration(configuration); + elector = new CombinedLeaderElector(conf); + elector.setName("TestMetastoreLeaseLeader"); + elector.tryBeLeader(); } @Test public void testHouseKeepingThreads() throws Exception { + int size = LeaderElectionContext.TTYPE.values().length; + CountDownLatch latch = new CountDownLatch(size); + MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.setMonitor(latch); // hms is the leader now - hms.testHouseKeepingThreadExistence(); - assertFalse(election.isLeader()); - Thread.sleep(15 * 1000); - // the lease of hms is timeout, election becomes leader now - assertTrue(election.isLeader()); - try { - // hms should shutdown all housekeeping tasks - hms.testHouseKeepingThreadExistence(); - throw new IllegalStateException("HMS should shutdown all housekeeping tasks"); - } catch (AssertionError e) { - // expected - } + checkHouseKeepingThreadExistence(true); + assertFalse(elector.isLeader()); + latch.await(); + // the lease of hms is timeout, the elector becomes leader now + assertTrue(elector.isLeader()); + // hms should shut down all housekeeping tasks + checkHouseKeepingThreadExistence(false); - election.close(); - Thread.sleep(10000); + latch = new CountDownLatch(size); + MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.setMonitor(latch); + elector.close(); + latch.await(); // hms becomes leader again - hms.testHouseKeepingThreadExistence(); + checkHouseKeepingThreadExistence(true); + } + + @After + public void afterTest() { + MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.reset(); } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseNonLeader.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseNonLeader.java index 6a340e4e6611..0873c129454e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseNonLeader.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseNonLeader.java @@ -19,56 +19,54 @@ package org.apache.hadoop.hive.metastore; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.leader.LeaderElection; import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext; -import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection; +import org.apache.hadoop.hive.metastore.leader.LeaderElectionFactory; import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; +import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertTrue; -public class TestMetastoreLeaseNonLeader { +public class TestMetastoreLeaseNonLeader extends MetastoreHousekeepingLeaderTestBase { - LeaderElection election; - - TestMetastoreHousekeepingLeader hms; + CombinedLeaderElector elector; @Before public void setUp() throws Exception { Configuration conf = MetastoreConf.newMetastoreConf(); TestTxnDbUtil.setConfValues(conf); TestTxnDbUtil.prepDb(conf); - election = new LeaseLeaderElection(); - MetastoreConf.setVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION, "lock"); - TableName tableName = (TableName) LeaderElectionContext.getLeaderMutex(conf, - LeaderElectionContext.TTYPE.HOUSEKEEPING, null); - election.tryBeLeader(conf, tableName); - assertTrue("The elector should hold the lease now", election.isLeader()); + elector = new CombinedLeaderElector(conf); + elector.setName("TestMetastoreLeaseNonLeader"); + elector.tryBeLeader(); + assertTrue("The elector should hold the lease now", elector.isLeader()); // start the non-leader hms now - hms = new TestMetastoreHousekeepingLeader(); - MetastoreConf.setTimeVar(hms.conf, MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 1, TimeUnit.SECONDS); - hms.conf.setBoolean(LeaderElectionContext.LEADER_IN_TEST, true); - hms.internalSetup("", false); + Configuration configuration = new Configuration(conf); + MetastoreConf.setTimeVar(configuration, MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 100, TimeUnit.MILLISECONDS); + LeaderElectionFactory.addElectionCreator(LeaderElectionFactory.Method.LOCK, c -> new ReleaseAndRequireLease(c, true)); + setup(null, configuration); } @Test public void testHouseKeepingThreads() throws Exception { - try { - hms.testHouseKeepingThreadExistence(); - throw new IllegalStateException("HMS shouldn't start any housekeeping tasks"); - } catch (AssertionError e) { - // expected - } + checkHouseKeepingThreadExistence(false); // elector releases the lease - election.close(); - Thread.sleep(10 * 1000); + CountDownLatch latch = new CountDownLatch(LeaderElectionContext.TTYPE.values().length); + MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.setMonitor(latch); + elector.close(); + latch.await(); // housing threads are here now as the hms wins the election - hms.testHouseKeepingThreadExistence(); + checkHouseKeepingThreadExistence(true); + } + + @After + public void afterTest() { + MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.reset(); } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java index bf99745cc7d5..0ea4f54b297b 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java @@ -173,10 +173,10 @@ private static void logPropChanges(Properties newProps) { return; } LOG.info("Updating the pmf due to property change"); - if (LOG.isDebugEnabled() && !newProps.equals(prop)) { + if (!newProps.equals(prop)) { for (String key : prop.stringPropertyNames()) { if (!key.equals(newProps.get(key))) { - if (LOG.isDebugEnabled() && MetastoreConf.isPrintable(key)) { + if (MetastoreConf.isPrintable(key)) { // The jdbc connection url can contain sensitive information like username and password // which should be masked out before logging. String oldVal = prop.getProperty(key); @@ -185,10 +185,10 @@ private static void logPropChanges(Properties newProps) { oldVal = MetaStoreServerUtils.anonymizeConnectionURL(oldVal); newVal = MetaStoreServerUtils.anonymizeConnectionURL(newVal); } - LOG.debug("Found {} to be different. Old val : {} : New Val : {}", key, + LOG.warn("Found {} to be different. Old val : {} : New Val : {}", key, oldVal, newVal); } else { - LOG.debug("Found masked property {} to be different", key); + LOG.warn("Found masked property {} to be different", key); } } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java index 5a6ab5d77bba..88d463e2abb6 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java @@ -44,49 +44,49 @@ public interface LeaderElection extends Closeable { * It can be a path in Zookeeper, or a table that going to be locked. * @throws LeaderException */ - public void tryBeLeader(Configuration conf, T mutex) + void tryBeLeader(Configuration conf, T mutex) throws LeaderException; /** * Getting the result of election. * @return true if wins the election, false otherwise. */ - public boolean isLeader(); + boolean isLeader(); /** * Register listeners to get notified when leadership changes. * @param listener The listener to be added */ - public void addStateListener(LeadershipStateListener listener); + void addStateListener(LeadershipStateListener listener); /** * Set the name of this leader candidate * @param name the name */ - public void setName(String name); + void setName(String name); /** * Get the name of this leader candidate */ - public String getName(); + String getName(); default boolean enforceMutex() { return true; } - public interface LeadershipStateListener { + interface LeadershipStateListener { /** * Invoked when won the election. * @param election the election where happens. */ - public void takeLeadership(LeaderElection election) throws Exception; + void takeLeadership(LeaderElection election) throws Exception; /** * Invoked when lost the election * @param election the election where happens. */ - public void lossLeadership(LeaderElection election) throws Exception; + void lossLeadership(LeaderElection election) throws Exception; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java index a3652d1c0019..14d18e832db4 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -44,8 +45,6 @@ public class LeaderElectionContext { public enum TTYPE { HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "__METASTORE_LEADER_ELECTION__", "metastore_housekeeping"), "housekeeping"), - WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "__METASTORE_LEADER_ELECTION__", - "metastore_compactor_worker"), "compactor_worker"), ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "__METASTORE_LEADER_ELECTION__", "metastore_always_tasks"), "always_tasks"); // Mutex of TTYPE, which can be a nonexistent table @@ -74,9 +73,8 @@ public String getName() { // State change listeners group by type private final Map> listeners; // Collection of leader candidates - private final List leaderElections = new ArrayList<>(); + private final List> leaderElections = new ArrayList<>(); // Property for testing, a single leader will be created - public final static String LEADER_IN_TEST = "metastore.leader.election.in.test"; private LeaderElectionContext(String servHost, Configuration conf, Map> listeners, @@ -97,40 +95,32 @@ private LeaderElectionContext(String servHost, Configuration conf, } public void start() throws Exception { - Map> listenerMap = this.listeners; - if (conf.getBoolean(LEADER_IN_TEST, false)) { - Map> newListeners = new HashMap<>(); - newListeners.put(TTYPE.HOUSEKEEPING, new ArrayList<>()); - listenerMap.forEach((k, v) -> newListeners.get(TTYPE.HOUSEKEEPING).addAll(v)); - listenerMap = newListeners; - } - for (Map.Entry> entry : - listenerMap.entrySet()) { - List listenerList = entry.getValue(); + List ttypes = new ArrayList<>(listeners.keySet()); + Collections.shuffle(ttypes); + for (TTYPE ttype : ttypes) { + List listenerList = listeners.get(ttype); if (listenerList.isEmpty()) { continue; } if (auditLeaderListener != null) { - listenerList.add(0, auditLeaderListener); + listenerList.addFirst(auditLeaderListener); } - TTYPE ttype = entry.getKey(); LeaderElection leaderElection = LeaderElectionFactory.create(conf); leaderElection.setName(ttype.name); - listenerList.forEach(listener -> leaderElection.addStateListener(listener)); + listenerList.forEach(leaderElection::addStateListener); leaderElections.add(leaderElection); - Thread daemon = new Thread(() -> { try { - Object mutex = getLeaderMutex(conf, ttype, servHost); + Object mutex = LeaderElectionFactory.getMutex(conf, ttype, servHost); leaderElection.tryBeLeader(conf, mutex); } catch (LeaderException e) { throw new RuntimeException("Error claiming to be leader: " + leaderElection.getName(), e); } }); - daemon.setName("Metastore Election " + leaderElection.getName()); - daemon.setDaemon(true); if (startAsDaemon) { + daemon.setName("Metastore Election " + leaderElection.getName()); + daemon.setDaemon(true); daemon.start(); } else { daemon.run(); @@ -148,25 +138,6 @@ public void close() { }); } - public static Object getLeaderMutex(Configuration conf, TTYPE ttype, String servHost) { - String method = - MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION); - switch (method.toLowerCase()) { - case "host": - return servHost; - case "lock": - TableName mutex = ttype.getTableName(); - String namespace = - MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE); - if (StringUtils.isNotEmpty(namespace)) { - return new TableName(mutex.getCat(), namespace, mutex.getTable()); - } - return mutex; - default: - throw new UnsupportedOperationException(method + " not supported for leader election"); - } - } - public static class ContextBuilder { private Configuration configuration; private boolean startAsDaemon; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java index 5055ad8a0034..ab4ca8bc4fac 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java @@ -18,27 +18,78 @@ package org.apache.hadoop.hive.metastore.leader; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.lang3.EnumUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; /** * Simple factory for creating the elector */ public class LeaderElectionFactory { + public enum Method { + HOST, LOCK + } + + private static final Map ELECTION_CREATOR_MAP = new ConcurrentHashMap<>(); + static { + addElectionCreator(Method.HOST, conf -> new StaticLeaderElection()); + addElectionCreator(Method.LOCK, conf -> new LeaseLeaderElection()); + } + + private LeaderElectionFactory() { + throw new AssertionError("The constructor shouldn't be called"); + } + + public static LeaderElection create(Configuration conf) throws IOException { + String method = + MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION); + Method m = EnumUtils.getEnum(Method.class, method.toUpperCase()); + ElectionCreator creator = null; + if (m != null) { + creator = ELECTION_CREATOR_MAP.get(m); + } + if (creator == null) { + throw new UnsupportedOperationException(method + " not supported for electing the leader"); + } + return creator.createElector(conf); + } - public static LeaderElection create(Configuration conf) throws IOException { + public static Object getMutex(Configuration conf, LeaderElectionContext.TTYPE ttype, String servHost) { String method = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION); - switch (method.toLowerCase()) { - case "host": - return new StaticLeaderElection(); - case "lock": - return new LeaseLeaderElection(); - default: - throw new UnsupportedOperationException(method + " is not supported for electing the leader"); + Method m = EnumUtils.getEnum(Method.class, method.toUpperCase()); + if (m != null) { + switch (m) { + case HOST: + return servHost; + case LOCK: + TableName mutex = ttype.getTableName(); + String namespace = + MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE); + if (StringUtils.isNotEmpty(namespace)) { + return new TableName(mutex.getCat(), namespace, mutex.getTable()); + } + return mutex; + } } + throw new UnsupportedOperationException(method + " not supported for leader election"); + } + + @VisibleForTesting + public static void addElectionCreator(Method method, ElectionCreator creator) { + ELECTION_CREATOR_MAP.put(method, creator); + } + + public interface ElectionCreator { + LeaderElection createElector(Configuration conf) throws IOException; } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java index fc4d4078df24..ee635a6f9f5f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java @@ -64,10 +64,10 @@ public class LeaseLeaderElection implements LeaderElection { private static final Logger LOG = LoggerFactory.getLogger(LeaseLeaderElection.class); - private static final AtomicLong ID = new AtomicLong(); + protected static final AtomicLong ID = new AtomicLong(); // Result of election - private volatile boolean isLeader; + protected volatile boolean isLeader; private TxnStore store; @@ -76,7 +76,7 @@ public class LeaseLeaderElection implements LeaderElection { // A daemon used for renewing the lock before timeout, // this happens when the current instance wins the election. - private LeaseWatcher heartbeater; + protected LeaseWatcher heartbeater; // For non-leader instances to check the lock periodically to // see if there is a chance to take over the leadership. @@ -86,15 +86,14 @@ public class LeaseLeaderElection implements LeaderElection { // Current lock id private volatile long lockId = -1; - // Leadership change listeners - private List listeners = new ArrayList<>(); + private volatile boolean stopped = false; - // Property for testing only - public static final String METASTORE_RENEW_LEASE = "metastore.renew.leader.lease"; + // Leadership change listeners + private final List listeners = new ArrayList<>(); - private String name; - private String userName; - private String hostName; + protected String name; + private final String userName; + private final String hostName; private boolean enforceMutex; public LeaseLeaderElection() throws IOException { @@ -111,10 +110,8 @@ private synchronized void doWork(LockResponse resp, Configuration conf, switch (resp.getState()) { case ACQUIRED: - boolean renewLease = conf.getBoolean(METASTORE_RENEW_LEASE, true); - heartbeater = renewLease ? - new Heartbeater(conf, tableName) : new ReleaseAndRequireWatcher(conf, tableName); - heartbeater.perform(); + heartbeater = new Heartbeater(conf, tableName); + heartbeater.startWatch(); if (!isLeader) { isLeader = true; notifyListener(); @@ -122,7 +119,7 @@ private synchronized void doWork(LockResponse resp, Configuration conf, break; case WAITING: nonLeaderWatcher = new NonLeaderWatcher(conf, tableName); - nonLeaderWatcher.perform(); + nonLeaderWatcher.startWatch(); if (isLeader) { isLeader = false; notifyListener(); @@ -134,7 +131,7 @@ private synchronized void doWork(LockResponse resp, Configuration conf, LOG.debug("Spent {}ms to notify the listeners, isLeader: {}", System.currentTimeMillis() - start, isLeader); } - private void notifyListener() { + protected void notifyListener() { listeners.forEach(listener -> { try { if (isLeader) { @@ -143,8 +140,7 @@ private void notifyListener() { listener.lossLeadership(this); } } catch (Exception e) { - LOG.error("Error notifying the listener: " + listener + - ", leader: " + isLeader, e); + LOG.error("Error notifying the listener: {}, leader: {}", listener, isLeader, e); } }); } @@ -157,14 +153,15 @@ public void tryBeLeader(Configuration conf, TableName table) throws LeaderExcept if (store == null) { store = TxnUtils.getTxnStore(conf); } - LockComponent component = new LockComponentBuilder() - .setDbName(table.getDb()) - .setTableName(table.getTable()) - .setLock(LockType.EXCL_WRITE) - .setOperationType(DataOperationType.NO_TXN) - .build(); - List components = new ArrayList(1); - components.add(component); + + List components = new ArrayList<>(); + components.add( + new LockComponentBuilder() + .setDbName(table.getDb()) + .setTableName(table.getTable()) + .setLock(LockType.EXCL_WRITE) + .setOperationType(DataOperationType.NO_TXN) + .build()); boolean lockable = false; Exception recentException = null; @@ -173,28 +170,27 @@ public void tryBeLeader(Configuration conf, TableName table) throws LeaderExcept int numRetries = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.LOCK_NUMRETRIES); long maxSleep = MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS); - for (int i = 0; i < numRetries; i++) { + for (int i = 0; i < numRetries && !stopped; i++) { try { LockResponse res = store.lock(req); if (res.getState() == LockState.WAITING || res.getState() == LockState.ACQUIRED) { lockable = true; + LOG.debug("{} Spent {}ms to take part in election, retries: {}", getName(), System.currentTimeMillis() - start, i); doWork(res, conf, table); - LOG.debug("Spent {}ms to lock the table {}, retries: {}", System.currentTimeMillis() - start, table, i); break; } } catch (NoSuchTxnException | TxnAbortedException e) { throw new AssertionError("This should not happen, we didn't open txn", e); } catch (MetaException e) { recentException = e; - LOG.warn("Error while locking the table: {}, num retries: {}," + - " max retries: {}, exception: {}", table, i, numRetries, e); + LOG.warn("Error while locking the table: {}, num retries: {}, max retries: {}", + table, i, numRetries, e); } backoff(maxSleep); } if (!lockable) { throw new LeaderException("Error locking the table: " + table + " in " + numRetries + - " retries, time spent: " + (System.currentTimeMillis() - start) + " ms", - recentException); + " retries, time spent: " + (System.currentTimeMillis() - start) + " ms", recentException); } } @@ -206,11 +202,11 @@ private void backoff(long maxSleep) { nextSleep = maxSleep; try { Thread.sleep(nextSleep); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } } - private void shutdownWatcher() { + protected void shutdownWatcher() { if (heartbeater != null) { heartbeater.shutDown(); heartbeater = null; @@ -232,7 +228,7 @@ public boolean isLeader() { return isLeader; } - private abstract class LeaseWatcher extends Thread { + protected abstract class LeaseWatcher extends Thread { protected Configuration conf; @@ -240,17 +236,17 @@ private abstract class LeaseWatcher extends Thread { private volatile boolean stopped = false; - LeaseWatcher(Configuration conf, TableName tableName) { + protected LeaseWatcher(Configuration conf, TableName tableName) { this.conf = conf; this.tableName = tableName; setDaemon(true); - StringBuilder builder = new StringBuilder("Leader-Watcher-") - .append(name != null ? name : "") + StringBuilder builder = new StringBuilder("Lease-Watcher-") + .append(name != null ? name + "-" : "") .append(ID.incrementAndGet()); setName(builder.toString()); } - public void perform() { + public void startWatch() { LOG.info("Starting a watcher: {} for {}", getClass().getName(), name); start(); } @@ -275,11 +271,9 @@ public void shutDown() { } public void beforeRun() { - // do nothing } public void afterRun() { - // do nothing } public abstract void runInternal(); @@ -288,15 +282,15 @@ public void reclaim() { try { tryBeLeader(conf, tableName); } catch (Exception e) { - LOG.error("Error reclaiming the leader, will retry in next cycle", e); + LOG.error("Error reclaiming the lease, will retry in next cycle", e); } } } private class NonLeaderWatcher extends LeaseWatcher { - private long sleep; + private final long sleep; private int count; - private CheckLockRequest request; + private final CheckLockRequest request; NonLeaderWatcher(Configuration conf, TableName table) { super(conf, table); @@ -348,8 +342,8 @@ public void afterRun() { } private class Heartbeater extends LeaseWatcher { - private HeartbeatRequest req; - private long heartbeatInterval; + private final HeartbeatRequest req; + private final long heartbeatInterval; Heartbeater(Configuration conf, TableName table) { super(conf, table); @@ -389,7 +383,7 @@ public void runInternal() { reclaim(); } catch (Exception e) { // Wait for next cycle. - LOG.warn("Heartbeat failed with exception: " + e.getMessage(), e); + LOG.warn("Heartbeat failed with exception: {}", e.getMessage(), e); } } @@ -403,45 +397,9 @@ public void afterRun() { } } - // For testing purpose only, lock would become timeout and then acquire it again - private class ReleaseAndRequireWatcher extends LeaseWatcher { - long timeout; - public ReleaseAndRequireWatcher(Configuration conf, - TableName tableName) { - super(conf, tableName); - timeout = MetastoreConf.getTimeVar(conf, - MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) + 3000; - setName("ReleaseAndRequireWatcher"); - } - - @Override - public void beforeRun() { - try { - Thread.sleep(timeout); - } catch (InterruptedException e) { - // ignore this - } - } - - @Override - public void runInternal() { - shutDown(); - // The timeout lock should be cleaned, - // sleep some time to let others take the chance to become the leader - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - // ignore - } - // Acquire the lock again - conf = new Configuration(conf); - conf.setBoolean(METASTORE_RENEW_LEASE, true); - reclaim(); - } - } - @Override public void close() { + stopped = true; shutdownWatcher(); if (isLeader) { isLeader = false; @@ -454,15 +412,11 @@ public void close() { } catch (NoSuchLockException | TxnOpenException e) { // ignore } catch (Exception e) { - LOG.error("Error while unlocking: " + lockId, e); + LOG.error("Error while unlocking: {}", lockId, e); } } } - public long getLockId() { - return lockId; - } - @Override public void setName(String name) { this.name = name; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 13f32f646302..b77dd08601eb 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -388,6 +388,7 @@ LockResponse lock(LockRequest rqst) * @throws TxnAbortedException * @throws MetaException */ + @SqlRetry(lockInternally = true) @Transactional(POOL_TX) @RetrySemantics.SafeToRetry LockResponse checkLock(CheckLockRequest rqst) @@ -425,7 +426,7 @@ void unlock(UnlockRequest rqst) * @throws TxnAbortedException * @throws MetaException */ - @SqlRetry + @SqlRetry(lockInternally = true) @Transactional(POOL_TX) @RetrySemantics.SafeToRetry void heartbeat(HeartbeatRequest ids) diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java index 59f7dbc8fd64..c3d24cd2dced 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java @@ -20,16 +20,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.TableName; -import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.txn.TxnStore; -import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; import org.junit.Test; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -40,7 +34,7 @@ public class TestLeaderElection { @Test public void testConfigLeaderElection() throws Exception { - LeaderElection election = new StaticLeaderElection(); + LeaderElection election = new StaticLeaderElection(); String leaderHost = "host1.work"; Configuration configuration = MetastoreConf.newMetastoreConf(); election.tryBeLeader(configuration, leaderHost); @@ -87,9 +81,7 @@ public void testLeaseLeaderElection() throws Exception { MetastoreConf.setBoolVar(configuration, MetastoreConf.ConfVars.HIVE_IN_TEST, true); TestTxnDbUtil.setConfValues(configuration); TestTxnDbUtil.prepDb(configuration); - TxnStore txnStore = TxnUtils.getTxnStore(configuration); - configuration.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, false); TableName mutex = new TableName("hive", "default", "leader_lease_ms"); LeaseLeaderElection instance1 = new LeaseLeaderElection(); AtomicBoolean flag1 = new AtomicBoolean(false); @@ -98,7 +90,6 @@ public void testLeaseLeaderElection() throws Exception { // elect1 as a leader now assertTrue(flag1.get() && instance1.isLeader()); - configuration.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, true); LeaseLeaderElection instance2 = new LeaseLeaderElection(); AtomicBoolean flag2 = new AtomicBoolean(false); instance2.addStateListener(new TestLeaderListener(flag2)); @@ -106,52 +97,14 @@ public void testLeaseLeaderElection() throws Exception { // instance2 should not be leader as elect1 holds the lease assertFalse(flag2.get() || instance2.isLeader()); - ExecutorService service = Executors.newFixedThreadPool(4); - wait(service, flag1, flag2); - // now instance1 lease is timeout, the instance2 should be leader now + instance1.close(); + synchronized (flag2) { + flag2.wait(); + } + // now instance1 lease is released, the instance2 should be leader now assertTrue(instance2.isLeader() && flag2.get()); assertFalse(flag1.get() || instance1.isLeader()); assertTrue(flag2.get() && instance2.isLeader()); - - // remove leader's lease (instance2) - long lockId2 = instance2.getLockId(); - txnStore.unlock(new UnlockRequest(lockId2)); - wait(service, flag1, flag2); - assertFalse(flag2.get() || instance2.isLeader()); - assertTrue(lockId2 > 0); - assertFalse(instance2.getLockId() == lockId2); - - // remove leader's lease(instance1) - long lockId1 = instance1.getLockId(); - txnStore.unlock(new UnlockRequest(lockId1)); - wait(service, flag1, flag2); - assertFalse(lockId1 == instance1.getLockId()); - assertTrue(lockId1 > 0); - - for (int i = 0; i < 10; i++) { - assertFalse(flag1.get() || instance1.isLeader()); - assertTrue(flag2.get() && instance2.isLeader()); - Thread.sleep(1 * 1000); - } - } - - private void wait(ExecutorService service, Object... obj) throws Exception { - Future[] fs = new Future[obj.length]; - for (int i = 0; i < obj.length; i++) { - Object monitor = obj[i]; - fs[i] = service.submit(() -> { - try { - synchronized (monitor) { - monitor.wait(); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); - } - for (Future f : fs) { - f.get(); - } } }