Skip to content

Commit

Permalink
Add earliest partition date stat to table stats collector (#205)
Browse files Browse the repository at this point in the history
## Summary

<!--- HINT: Replace #nnn with corresponding Issue number, if you are
fixing an existing issue -->

This PR adds the earliest partition date on a table as a stat to the
table stats collector.

For tables where the retention policy has been set on a string type
column, the stat is retrieved based on the retention column name, since
this stat will be used as a comparison point for Openhouse alerting to
identify if older partitions should have been cleaned up through
retention jobs.
For tables that are partitioned on timestamp, it searches for the
partition column name that is `Timestamp`. Openhouse only allows a table
to be partitioned on 1 timestamp type column so we can identify that as
the column set for retention.

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done
<!--- Check any relevant boxes with "x" -->

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

Added unit testing.

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.
  • Loading branch information
chenselena authored Sep 23, 2024
1 parent 1802fc9 commit 67a957f
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

/** Utility class to collect stats for a given table. */
@Slf4j
Expand Down Expand Up @@ -106,12 +108,15 @@ protected static IcebergTableStats populateStatsForSnapshots(
.map(snapshot -> snapshot.timestampMillis())
.orElse(null);

String earliestPartitionDate = getEarliestPartitionDate(table, spark, getTablePolicies(table));

log.info(
"Table: {}, Count of total Data files: {}, Sum of file sizes in bytes: {} for snaphot: {}",
"Table: {}, Count of total Data files: {}, Sum of file sizes in bytes: {}, Earliest partition date: {}, for snapshot: {}",
fqtn,
countOfDataFiles,
sumOfFileSizeBytes,
currentSnapshotId);
currentSnapshotId,
earliestPartitionDate);

// Find minimum timestamp of all snapshots where snapshots is iterator
Long oldestSnapshotTimestamp =
Expand All @@ -127,6 +132,7 @@ protected static IcebergTableStats populateStatsForSnapshots(
.oldestSnapshotTimestamp(oldestSnapshotTimestamp)
.numCurrentSnapshotReferencedDataFiles(countOfDataFiles)
.totalCurrentSnapshotReferencedDataFilesSizeInBytes(sumOfFileSizeBytes)
.earliestPartitionDate(earliestPartitionDate)
.build();
}

Expand Down Expand Up @@ -247,6 +253,47 @@ private static long getSumOfFileSizeBytes(Dataset<Row> allDataFiles) {
.getLong(0);
}

/**
* Get the earliest partition date on a table.
*
* @param table
* @param spark
* @param policyMap
* @return
*/
private static String getEarliestPartitionDate(
Table table, SparkSession spark, Map<String, Object> policyMap) {
String partitionColumnName =
policyMap.containsKey("columnName")
? (String) policyMap.get("columnName")
: getPartitionColumnName(table);
if (partitionColumnName == null) {
return null;
}

Dataset<Row> partitionData =
SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.PARTITIONS);
String partitionColumn = String.format("partition.%s", partitionColumnName);

String earliestPartitionDate =
partitionData
.select(partitionColumn)
.orderBy(functions.asc(partitionColumn))
.first()
.get(0)
.toString();

return earliestPartitionDate;
}

private static String getPartitionColumnName(Table table) {
return StreamSupport.stream(table.spec().partitionType().fields().spliterator(), false)
.filter(field -> field.type() instanceof Types.DateType)
.map(Types.NestedField::name)
.findFirst()
.orElse(null);
}

/**
* Get table policies.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.linkedin.openhouse.tables.client.model.Retention;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import io.opentelemetry.api.metrics.Meter;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -527,6 +529,40 @@ private static Path prepareOrphanTableDirectory(Operations ops, String tableName
return tbLoc;
}

@Test
public void testCollectEarliestPartitionDateStat() throws Exception {
final String tableName = "db.test_collect_earliest_partition_date";
List<String> rowValue = new ArrayList<>();

try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
// Test table with no partition
prepareTable(ops, tableName);
IcebergTableStats stats = ops.collectTableStats(tableName);
Assertions.assertNull(stats.getEarliestPartitionDate());

// Test yyyy-mm-dd format on table with multiple partitioned columns
prepareTableWithPoliciesWithMultipleStringPartition(ops, tableName, "30d", false);
rowValue.add("202%s-07-16");
rowValue.add("202%s-07-17");
rowValue.add("202%s-08-16");
rowValue.add("202%s-09-16");
populateTableWithMultipleStringColumn(ops, tableName, 1, rowValue);
stats = ops.collectTableStats(tableName);
Assertions.assertEquals(stats.getEarliestPartitionDate(), "202%s-07-16");
rowValue.clear();

// Test timestamp format
prepareTableWithPolicies(ops, tableName, "30d", false);
populateTable(ops, tableName, 1, 2);
populateTable(ops, tableName, 1, 1);
populateTable(ops, tableName, 1, 0);
stats = ops.collectTableStats(tableName);
Assertions.assertEquals(
stats.getEarliestPartitionDate(),
LocalDate.now().minusDays(2).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
}
}

@Test
public void testCollectTablePolicyStats() throws Exception {
final String tableName = "db.test_collect_table_stats_with_policy";
Expand Down Expand Up @@ -662,6 +698,20 @@ private static void populateTableWithStringColumn(
}
}

private static void populateTableWithMultipleStringColumn(
Operations ops, String tableName, int numRows, List<String> dataFormats) {
for (String dataFormat : dataFormats) {
for (int row = 0; row < numRows; ++row) {
ops.spark()
.sql(
String.format(
"INSERT INTO %s VALUES ('%s', '%s', %d)",
tableName, dataFormat, String.format(dataFormat, row), row))
.show();
}
}
}

private static void prepareTable(Operations ops, String tableName) {
prepareTable(ops, tableName, false);
}
Expand Down Expand Up @@ -739,6 +789,24 @@ private static void prepareTableWithPoliciesWithStringPartition(
ops.spark().sql(String.format("DESCRIBE %s", tableName)).show();
}

private static void prepareTableWithPoliciesWithMultipleStringPartition(
Operations ops, String tableName, String retention, boolean sharing) {
ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
ops.spark()
.sql(
String.format(
"CREATE TABLE %s (datepartition string, data string, num int) PARTITIONED BY (datepartition, num)",
tableName))
.show();
ops.spark()
.sql(
String.format(
"ALTER TABLE %s SET POLICY (RETENTION=%s ON COLUMN datepartition)",
tableName, retention));
ops.spark().sql(String.format("ALTER TABLE %s SET POLICY (SHARING=%s)", tableName, sharing));
ops.spark().sql(String.format("DESCRIBE %s", tableName)).show();
}

private static void prepareTableWithPoliciesWithCustomStringPartition(
Operations ops, String tableName, String retention, String pattern) {
ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ public class IcebergTableStats extends BaseTableMetadata {
private Long numReferencedManifestFiles;

private Long numReferencedManifestLists;

private String earliestPartitionDate;
}

0 comments on commit 67a957f

Please sign in to comment.