Skip to content

Commit

Permalink
Add earliest partition date stat to table stats collector
Browse files Browse the repository at this point in the history
  • Loading branch information
chenselena committed Sep 20, 2024
1 parent 779a46b commit c76185f
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public IcebergTableStats collectTableStats() {
IcebergTableStats stats = IcebergTableStats.builder().build();

IcebergTableStats statsWithMetadataData =
TableStatsCollectorUtil.populateTableMetadata(table, stats);
TableStatsCollectorUtil.populateTableMetadata(table, stats, spark);
IcebergTableStats statsWithReferenceFiles =
TableStatsCollectorUtil.populateStatsOfAllReferencedFiles(
fqtn, table, spark, statsWithMetadataData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

/** Utility class to collect stats for a given table. */
@Slf4j
Expand Down Expand Up @@ -174,7 +176,8 @@ protected static IcebergTableStats populateStorageStats(
* @param table
* @param stats
*/
protected static IcebergTableStats populateTableMetadata(Table table, IcebergTableStats stats) {
protected static IcebergTableStats populateTableMetadata(
Table table, IcebergTableStats stats, SparkSession spark) {
Map<String, Object> policyMap = getTablePolicies(table);
return stats
.builder()
Expand All @@ -197,6 +200,13 @@ protected static IcebergTableStats populateTableMetadata(Table table, IcebergTab
.sharingEnabled(
policyMap.containsKey("sharingEnabled") && (Boolean) policyMap.get("sharingEnabled"))
.retentionPolicies(buildRetentionStats(policyMap))
.earliestPartitionDate(
getEarliestPartitionDate(
table,
spark,
policyMap.containsKey("columnName")
? (String) policyMap.get("columnName")
: getPartitionColumnName(table)))
.build();
}

Expand Down Expand Up @@ -247,6 +257,43 @@ private static long getSumOfFileSizeBytes(Dataset<Row> allDataFiles) {
.getLong(0);
}

/**
 * Get the earliest partition value for the given partition column of a table.
 *
 * @param table Iceberg table whose PARTITIONS metadata table is inspected
 * @param spark active Spark session used to load the metadata table
 * @param partitionColumnName partition column to order by; may be {@code null}
 * @return string form of the smallest partition value, or {@code null} when the table has no
 *     such column, no partitions yet, or a null earliest partition value
 */
private static String getEarliestPartitionDate(
    Table table, SparkSession spark, String partitionColumnName) {
  if (partitionColumnName == null) {
    return null;
  }

  Dataset<Row> partitionData =
      SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.PARTITIONS);
  String partitionColumn = String.format("partition.%s", partitionColumnName);

  Dataset<Row> orderedPartitions =
      partitionData.select(partitionColumn).orderBy(functions.asc(partitionColumn));

  // Guard against tables that declare a partition spec but contain no partitions yet:
  // Dataset.first() throws NoSuchElementException on an empty dataset.
  if (orderedPartitions.isEmpty()) {
    return null;
  }

  Row earliest = orderedPartitions.first();
  // A null partition value (e.g. rows with a null partition column) would NPE on toString().
  return earliest.isNullAt(0) ? null : earliest.get(0).toString();
}

/**
 * Get the name of the first date-typed partition field of the table, or {@code null} when the
 * table has no date-typed partition field.
 *
 * <p>NOTE(review): this returns the partition field name from the partition spec; for
 * transformed partitions (e.g. {@code days(ts)}) that name may differ from the source column
 * name — confirm it matches what the PARTITIONS metadata table exposes.
 */
private static String getPartitionColumnName(Table table) {
  // partitionType().fields() is a java.util.List, so stream() directly instead of StreamSupport.
  return table.spec().partitionType().fields().stream()
      .filter(field -> field.type() instanceof Types.DateType)
      .map(Types.NestedField::name)
      .findFirst()
      .orElse(null);
}

/**
* Get table policies.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.linkedin.openhouse.tables.client.model.Retention;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import io.opentelemetry.api.metrics.Meter;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -527,6 +529,40 @@ private static Path prepareOrphanTableDirectory(Operations ops, String tableName
return tbLoc;
}

@Test
public void testCollectEarliestPartitionDateStat() throws Exception {
  final String tableName = "db.test_collect_earliest_partition_date";
  List<String> rowValue = new ArrayList<>();

  try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
    // Unpartitioned table: the stat should be absent.
    prepareTable(ops, tableName);
    IcebergTableStats stats = ops.collectTableStats(tableName);
    Assertions.assertNull(stats.getEarliestPartitionDate());

    // String-partitioned table with multiple partition columns. The "%s" placeholders are
    // intentional: the raw format string is what lands in the string partition column, so the
    // earliest partition is the lexicographically smallest literal.
    prepareTableWithPoliciesWithMultipleStringPartition(ops, tableName, "30d", false);
    rowValue.add("202%s-07-16");
    rowValue.add("202%s-07-17");
    rowValue.add("202%s-08-16");
    rowValue.add("202%s-09-16");
    populateTableWithMultipleStringColumn(ops, tableName, 1, rowValue);
    stats = ops.collectTableStats(tableName);
    // JUnit 5 convention is assertEquals(expected, actual) — keeps failure messages accurate.
    Assertions.assertEquals("202%s-07-16", stats.getEarliestPartitionDate());
    rowValue.clear();

    // Timestamp-partitioned table: earliest partition is the oldest populated day.
    prepareTableWithPolicies(ops, tableName, "30d", false);
    populateTable(ops, tableName, 1, 2);
    populateTable(ops, tableName, 1, 1);
    populateTable(ops, tableName, 1, 0);
    stats = ops.collectTableStats(tableName);
    Assertions.assertEquals(
        LocalDate.now().minusDays(2).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")),
        stats.getEarliestPartitionDate());
  }
}

@Test
public void testCollectTablePolicyStats() throws Exception {
final String tableName = "db.test_collect_table_stats_with_policy";
Expand Down Expand Up @@ -662,6 +698,20 @@ private static void populateTableWithStringColumn(
}
}

/**
 * Inserts {@code numRows} rows per entry of {@code dataFormats} into the table. Each row stores
 * the raw format string, the format string rendered with the row index, and the index itself.
 */
private static void populateTableWithMultipleStringColumn(
    Operations ops, String tableName, int numRows, List<String> dataFormats) {
  for (String format : dataFormats) {
    for (int rowIndex = 0; rowIndex < numRows; rowIndex++) {
      String renderedValue = String.format(format, rowIndex);
      String insertSql =
          String.format(
              "INSERT INTO %s VALUES ('%s', '%s', %d)",
              tableName, format, renderedValue, rowIndex);
      ops.spark().sql(insertSql).show();
    }
  }
}

private static void prepareTable(Operations ops, String tableName) {
prepareTable(ops, tableName, false);
}
Expand Down Expand Up @@ -739,6 +789,24 @@ private static void prepareTableWithPoliciesWithStringPartition(
ops.spark().sql(String.format("DESCRIBE %s", tableName)).show();
}

/**
 * Drops and recreates a table partitioned by two columns (a string date partition and an int),
 * then applies the given retention and sharing policies and prints the table description.
 */
private static void prepareTableWithPoliciesWithMultipleStringPartition(
    Operations ops, String tableName, String retention, boolean sharing) {
  String dropSql = String.format("DROP TABLE IF EXISTS %s", tableName);
  String createSql =
      String.format(
          "CREATE TABLE %s (datepartition string, data string, num int) PARTITIONED BY (datepartition, num)",
          tableName);
  String retentionSql =
      String.format(
          "ALTER TABLE %s SET POLICY (RETENTION=%s ON COLUMN datepartition)",
          tableName, retention);
  String sharingSql =
      String.format("ALTER TABLE %s SET POLICY (SHARING=%s)", tableName, sharing);
  String describeSql = String.format("DESCRIBE %s", tableName);

  ops.spark().sql(dropSql).show();
  ops.spark().sql(createSql).show();
  // The two policy statements are executed without .show(), matching the surrounding helpers.
  ops.spark().sql(retentionSql);
  ops.spark().sql(sharingSql);
  ops.spark().sql(describeSql).show();
}

private static void prepareTableWithPoliciesWithCustomStringPartition(
Operations ops, String tableName, String retention, String pattern) {
ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ public class BaseTableMetadata {
private Boolean sharingEnabled;

private RetentionStatsSchema retentionPolicies;

private String earliestPartitionDate;
}

0 comments on commit c76185f

Please sign in to comment.