Skip to content

Commit af8ef0c

Browse files
committed
HIVE-28930: Implement a metastore service that expires iceberg table snapshots periodically
1 parent 4554364 commit af8ef0c

File tree

11 files changed

+565
-79
lines changed

11 files changed

+565
-79
lines changed

common/src/java/org/apache/hadoop/hive/conf/HiveConf.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2238,7 +2238,7 @@ public static enum ConfVars {
22382238
"Use stats from iceberg table snapshot for query planning. This has two values metastore and iceberg"),
22392239
HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS("hive.iceberg.expire.snapshot.numthreads", 4,
22402240
"The number of threads to be used for deleting files during expire snapshot. If set to 0 or below it uses the" +
2241-
" defult DirectExecutorService"),
2241+
" default DirectExecutorService"),
22422242

22432243
HIVE_ICEBERG_MASK_DEFAULT_LOCATION("hive.iceberg.mask.default.location", false,
22442244
"If this is set to true the URI for auth will have the default location masked with DEFAULT_TABLE_LOCATION"),

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@
3939
import java.util.Set;
4040
import java.util.UUID;
4141
import java.util.concurrent.ExecutorService;
42-
import java.util.concurrent.Executors;
43-
import java.util.concurrent.atomic.AtomicInteger;
4442
import java.util.function.Function;
4543
import java.util.stream.Collectors;
4644
import java.util.stream.Stream;
@@ -1141,7 +1139,8 @@ private void deleteOrphanFiles(Table icebergTable, long timestampMillis, int num
11411139
try {
11421140
if (numThreads > 0) {
11431141
LOG.info("Executing delete orphan files on iceberg table {} with {} threads", icebergTable.name(), numThreads);
1144-
deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads);
1142+
deleteExecutorService = HiveIcebergUtil.getDeleteExecutorService(icebergTable.name(),
1143+
numThreads);
11451144
}
11461145

11471146
HiveIcebergDeleteOrphanFiles deleteOrphanFiles = new HiveIcebergDeleteOrphanFiles(conf, icebergTable);
@@ -1164,7 +1163,7 @@ private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnap
11641163
try {
11651164
if (numThreads > 0) {
11661165
LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads);
1167-
deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads);
1166+
deleteExecutorService = HiveIcebergUtil.getDeleteExecutorService(icebergTable.name(), numThreads);
11681167
}
11691168
if (expireSnapshotsSpec == null) {
11701169
expireSnapshotWithDefaultParams(icebergTable, deleteExecutorService);
@@ -1243,15 +1242,6 @@ private void expireSnapshotByIds(Table icebergTable, String[] idsToExpire,
12431242
}
12441243
}
12451244

1246-
private ExecutorService getDeleteExecutorService(String completeName, int numThreads) {
1247-
AtomicInteger deleteThreadsIndex = new AtomicInteger(0);
1248-
return Executors.newFixedThreadPool(numThreads, runnable -> {
1249-
Thread thread = new Thread(runnable);
1250-
thread.setName("remove-snapshot-" + completeName + "-" + deleteThreadsIndex.getAndIncrement());
1251-
return thread;
1252-
});
1253-
}
1254-
12551245
@Override
12561246
public void alterTableSnapshotRefOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
12571247
AlterTableSnapshotRefSpec alterTableSnapshotRefSpec) {
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.mr.hive;
21+
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
26+
/**
 * Static helper methods shared by Hive-Iceberg integration code.
 */
public class HiveIcebergUtil {

  private HiveIcebergUtil() {
    // utility class, not meant to be instantiated
  }

  /**
   * Creates a fixed-size thread pool for parallel file deletion during snapshot
   * expiry / orphan-file removal. Worker threads are named
   * {@code remove-snapshot-<completeName>-<index>} so they are identifiable in thread dumps.
   *
   * @param completeName fully qualified table name, embedded into each worker thread's name
   * @param numThreads   size of the pool; callers are expected to pass a positive value
   * @return a new ExecutorService; the caller owns it and is responsible for shutting it down
   */
  public static ExecutorService getDeleteExecutorService(String completeName, int numThreads) {
    AtomicInteger nextWorkerId = new AtomicInteger(0);
    return Executors.newFixedThreadPool(numThreads, task -> {
      Thread worker = new Thread(task);
      worker.setName("remove-snapshot-" + completeName + "-" + nextWorkerId.getAndIncrement());
      return worker;
    });
  }
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.iceberg.mr.hive.metastore.task;
20+
21+
import java.util.List;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.TimeUnit;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.hive.common.TableName;
26+
import org.apache.hadoop.hive.conf.HiveConf;
27+
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
28+
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
29+
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
30+
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
31+
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
32+
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
33+
import org.apache.hadoop.hive.metastore.utils.TableFetcher;
34+
import org.apache.iceberg.ExpireSnapshots;
35+
import org.apache.iceberg.Table;
36+
import org.apache.iceberg.mr.hive.HiveIcebergUtil;
37+
import org.apache.iceberg.mr.hive.IcebergTableUtil;
38+
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
39+
import org.apache.thrift.TException;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
43+
public class IcebergHouseKeeperService implements MetastoreTaskThread {
44+
private static final Logger LOG = LoggerFactory.getLogger(IcebergHouseKeeperService.class);
45+
46+
private Configuration conf;
47+
48+
@Override
49+
public long runFrequency(TimeUnit unit) {
50+
return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_INTERVAL, unit);
51+
}
52+
53+
@Override
54+
public void run() {
55+
LOG.debug("Running IcebergHouseKeeperService...");
56+
57+
String catalogName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_CATALOG_NAME);
58+
String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_DATABASE_PATTERN);
59+
String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_TABLE_PATTERN);
60+
61+
try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) {
62+
// TODO: Future improvement – modify TableFetcher to return HMS Table API objects directly,
63+
// avoiding the need for subsequent msc.getTable calls to fetch each matched table individually
64+
List<TableName> tables = getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables();
65+
66+
LOG.debug("{} candidate tables found", tables.size());
67+
68+
for (TableName table : tables) {
69+
expireSnapshotsForTable(getIcebergTable(table, msc));
70+
}
71+
} catch (Exception e) {
72+
LOG.error("Exception while running iceberg expiry service on catalog/db/table: {}/{}/{}", catalogName, dbPattern,
73+
tablePattern, e);
74+
}
75+
}
76+
77+
@VisibleForTesting
78+
TableFetcher getTableFetcher(IMetaStoreClient msc, String catalogName, String dbPattern, String tablePattern) {
79+
return new TableFetcher.Builder(msc, catalogName, dbPattern, tablePattern).tableTypes(
80+
"EXTERNAL_TABLE")
81+
.tableCondition(
82+
hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "table_type like \"ICEBERG\" ")
83+
.build();
84+
}
85+
86+
private Table getIcebergTable(TableName table, IMetaStoreClient msc) throws TException {
87+
GetTableRequest request = new GetTableRequest(table.getDb(), table.getTable());
88+
return IcebergTableUtil.getTable(conf, msc.getTable(request));
89+
}
90+
91+
/**
92+
* Deletes snapshots of an Iceberg table, using the number of threads defined by the
93+
* Hive config HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.
94+
* This is largely equivalent to the HiveIcebergStorageHandler.expireSnapshotWithDefaultParams method.
95+
*
96+
* @param icebergTable the iceberg Table reference
97+
*/
98+
private void expireSnapshotsForTable(Table icebergTable) {
99+
LOG.info("Expire snapshots for: {}", icebergTable);
100+
ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
101+
102+
int numThreads = conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname,
103+
HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
104+
105+
ExecutorService deleteExecutorService = null;
106+
try {
107+
if (numThreads > 0) {
108+
LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads);
109+
deleteExecutorService = HiveIcebergUtil.getDeleteExecutorService(icebergTable.name(), numThreads);
110+
}
111+
if (deleteExecutorService != null) {
112+
expireSnapshots.executeDeleteWith(deleteExecutorService);
113+
}
114+
expireSnapshots.commit();
115+
} finally {
116+
if (deleteExecutorService != null) {
117+
deleteExecutorService.shutdown();
118+
}
119+
}
120+
}
121+
122+
@Override
123+
public Configuration getConf() {
124+
return conf;
125+
}
126+
127+
@Override
128+
public void setConf(Configuration configuration) {
129+
conf = configuration;
130+
}
131+
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.iceberg.mr.hive.metastore.task;
20+
21+
import java.io.File;
22+
import java.util.Arrays;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.Optional;
26+
27+
import org.apache.hadoop.hive.common.TableName;
28+
import org.apache.hadoop.hive.conf.HiveConf;
29+
import org.apache.hadoop.hive.metastore.TableType;
30+
import org.apache.hadoop.hive.metastore.api.FieldSchema;
31+
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
32+
import org.apache.hadoop.hive.metastore.utils.TableFetcher;
33+
import org.apache.hadoop.hive.ql.metadata.Hive;
34+
import org.apache.hadoop.hive.ql.metadata.Table;
35+
import org.apache.iceberg.DataFile;
36+
import org.apache.iceberg.DataFiles;
37+
import org.apache.iceberg.mr.hive.IcebergTableUtil;
38+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
39+
import org.junit.AfterClass;
40+
import org.junit.Assert;
41+
import org.junit.BeforeClass;
42+
import org.junit.Test;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
45+
46+
public class TestIcebergHouseKeeperService {
47+
private static final Logger LOG = LoggerFactory.getLogger(TestIcebergHouseKeeperService.class);
48+
49+
private static final HiveConf conf = new HiveConf(TestIcebergHouseKeeperService.class);
50+
private static Hive db;
51+
52+
@BeforeClass
53+
public static void beforeClass() throws Exception {
54+
conf.set("hive.security.authorization.enabled", "false");
55+
conf.set("hive.security.authorization.manager",
56+
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory");
57+
conf.set("iceberg.engine.hive.lock-enabled", "false");
58+
59+
db = Hive.get(conf);
60+
}
61+
62+
@AfterClass
63+
public static void afterClass() {
64+
db.close(true);
65+
}
66+
67+
@Test
68+
public void testIcebergTableFetched() throws Exception {
69+
createIcebergTable("iceberg_table");
70+
71+
IcebergHouseKeeperService service = new IcebergHouseKeeperService();
72+
TableFetcher tableFetcher = service.getTableFetcher(db.getMSC(), null, "default", "*");
73+
74+
List<TableName> tables = tableFetcher.getTables();
75+
Assert.assertEquals(new TableName("hive", "default", "iceberg_table"), tables.get(0));
76+
}
77+
78+
@Test
79+
public void testExpireSnapshotsByServiceRun() throws Exception {
80+
String tableName = "iceberg_table_snapshot_expiry_e2e_test";
81+
Table table = createIcebergTable(tableName);
82+
IcebergHouseKeeperService service = getServiceForTable("default", tableName);
83+
84+
GetTableRequest request = new GetTableRequest("default", tableName);
85+
org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, db.getMSC().getTable(request));
86+
87+
String metadataDirectory = icebergTable.location().replaceAll("^[a-zA-Z]+:", "") + "/metadata";
88+
89+
DataFile datafile = DataFiles.builder(icebergTable.spec())
90+
.withRecordCount(3)
91+
.withPath("/tmp/file.parquet")
92+
.withFileSizeInBytes(10)
93+
.build();
94+
95+
icebergTable.newAppend().appendFile(datafile).commit();
96+
assertSnapshotFiles(metadataDirectory, 1);
97+
icebergTable.newAppend().appendFile(datafile).commit();
98+
assertSnapshotFiles(metadataDirectory, 2);
99+
100+
Thread.sleep(1000); // allow snapshots that are 1000ms old to become eligible for snapshot expiry
101+
service.run();
102+
103+
assertSnapshotFiles(metadataDirectory, 1);
104+
db.dropTable("default", "iceberg_table_snapshot_expiry_e2e_test");
105+
}
106+
107+
private Table createIcebergTable(String name) throws Exception {
108+
Table table = new Table("default", name);
109+
List<FieldSchema> columns = Lists.newArrayList();
110+
columns.add(new FieldSchema("col", "string", "First column"));
111+
table.setFields(columns); // Set columns
112+
113+
table.setProperty("EXTERNAL", "TRUE");
114+
table.setTableType(TableType.EXTERNAL_TABLE);
115+
table.setProperty("table_type", "ICEBERG");
116+
117+
table.setProperty("history.expire.max-snapshot-age-ms", "500");
118+
119+
db.createTable(table);
120+
return table;
121+
}
122+
123+
/**
124+
* Creates IcebergHouseKeeperService that's configured to clean up a table by database and table name.
125+
*
126+
* @param tableName to be cleaned up
127+
* @return IcebergHouseKeeperService
128+
*/
129+
private IcebergHouseKeeperService getServiceForTable(String dbName, String tableName) {
130+
IcebergHouseKeeperService service = new IcebergHouseKeeperService();
131+
HiveConf serviceConf = new HiveConf(conf);
132+
serviceConf.set("hive.metastore.iceberg.table.expiry.database.pattern", dbName);
133+
serviceConf.set("hive.metastore.iceberg.table.expiry.table.pattern", tableName);
134+
service.setConf(serviceConf);
135+
return service;
136+
}
137+
138+
private void assertSnapshotFiles(String metadataDirectory, int numberForSnapshotFiles) {
139+
File[] matchingFiles = new File(metadataDirectory).listFiles((dir, name) -> name.startsWith("snap-"));
140+
List<File> files = Optional.ofNullable(matchingFiles).map(Arrays::asList).orElse(Collections.emptyList());
141+
LOG.debug("Snapshot files found in directory({}): {}", metadataDirectory, files);
142+
Assert.assertEquals(String.format("Unexpected no. of snapshot files in metadata directory: %s",
143+
metadataDirectory), numberForSnapshotFiles, files.size());
144+
}
145+
}

itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/StartMiniHS2Cluster.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public void testRunCluster() throws Exception {
5050
String confFilesProperty = System.getProperty("miniHS2.conf", "../../data/conf/hive-site.xml");
5151
boolean usePortsFromConf = Boolean.parseBoolean(System.getProperty("miniHS2.usePortsFromConf", "false"));
5252
boolean isMetastoreRemote = Boolean.getBoolean("miniHS2.isMetastoreRemote");
53+
boolean withHouseKeepingThreads = Boolean.getBoolean("miniHS2.withHouseKeepingThreads");
5354
boolean queryHistory = Boolean.getBoolean("miniHS2.queryHistory");
5455

5556
// Load conf files
@@ -77,7 +78,8 @@ public void testRunCluster() throws Exception {
7778
}
7879

7980
miniHS2 = new MiniHS2.Builder().withConf(conf).withClusterType(clusterType).withPortsFromConf(usePortsFromConf)
80-
.withRemoteMetastore(isMetastoreRemote).withQueryHistory(queryHistory).build();
81+
.withRemoteMetastore(isMetastoreRemote).withHouseKeepingThreads(withHouseKeepingThreads)
82+
.withQueryHistory(queryHistory).build();
8183
Map<String, String> confOverlay = new HashMap<String, String>();
8284
miniHS2.start(confOverlay);
8385

0 commit comments

Comments
 (0)