Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add earliest partition date stat to table stats collector #205

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public IcebergTableStats collectTableStats() {
IcebergTableStats stats = IcebergTableStats.builder().build();

IcebergTableStats statsWithMetadataData =
TableStatsCollectorUtil.populateTableMetadata(table, stats);
TableStatsCollectorUtil.populateTableMetadata(table, spark, stats);
IcebergTableStats statsWithReferenceFiles =
TableStatsCollectorUtil.populateStatsOfAllReferencedFiles(
fqtn, table, spark, statsWithMetadataData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

/** Utility class to collect stats for a given table. */
@Slf4j
Expand Down Expand Up @@ -172,9 +174,11 @@ protected static IcebergTableStats populateStorageStats(
* Collect table metadata for a given table.
*
* @param table
* @param spark
* @param stats
*/
protected static IcebergTableStats populateTableMetadata(Table table, IcebergTableStats stats) {
protected static IcebergTableStats populateTableMetadata(
Table table, SparkSession spark, IcebergTableStats stats) {
Map<String, Object> policyMap = getTablePolicies(table);
return stats
.builder()
Expand All @@ -197,6 +201,7 @@ protected static IcebergTableStats populateTableMetadata(Table table, IcebergTab
.sharingEnabled(
policyMap.containsKey("sharingEnabled") && (Boolean) policyMap.get("sharingEnabled"))
.retentionPolicies(buildRetentionStats(policyMap))
.earliestPartitionDate(getEarliestPartitionDate(table, spark, policyMap))
chenselena marked this conversation as resolved.
Show resolved Hide resolved
.build();
}

Expand Down Expand Up @@ -247,6 +252,47 @@ private static long getSumOfFileSizeBytes(Dataset<Row> allDataFiles) {
.getLong(0);
}

/**
* Get the earliest partition date on a table.
*
* @param table
* @param spark
* @param policyMap
* @return
*/
private static String getEarliestPartitionDate(
Table table, SparkSession spark, Map<String, Object> policyMap) {
String partitionColumnName =
policyMap.containsKey("columnName")
? (String) policyMap.get("columnName")
: getPartitionColumnName(table);
if (partitionColumnName == null) {
return null;
}

Dataset<Row> partitionData =
SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.PARTITIONS);
String partitionColumn = String.format("partition.%s", partitionColumnName);

String earliestPartitionDate =
partitionData
.select(partitionColumn)
.orderBy(functions.asc(partitionColumn))
.first()
.get(0)
.toString();

return earliestPartitionDate;
}

private static String getPartitionColumnName(Table table) {
return StreamSupport.stream(table.spec().partitionType().fields().spliterator(), false)
.filter(field -> field.type() instanceof Types.DateType)
chenselena marked this conversation as resolved.
Show resolved Hide resolved
.map(Types.NestedField::name)
.findFirst()
.orElse(null);
}

/**
* Get table policies.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.linkedin.openhouse.tables.client.model.Retention;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import io.opentelemetry.api.metrics.Meter;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -527,6 +529,40 @@ private static Path prepareOrphanTableDirectory(Operations ops, String tableName
return tbLoc;
}

@Test
public void testCollectEarliestPartitionDateStat() throws Exception {
final String tableName = "db.test_collect_earliest_partition_date";
List<String> rowValue = new ArrayList<>();

try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
// Test table with no partition
prepareTable(ops, tableName);
IcebergTableStats stats = ops.collectTableStats(tableName);
Assertions.assertNull(stats.getEarliestPartitionDate());

// Test yyyy-mm-dd format on table with multiple partitioned columns
prepareTableWithPoliciesWithMultipleStringPartition(ops, tableName, "30d", false);
rowValue.add("202%s-07-16");
rowValue.add("202%s-07-17");
rowValue.add("202%s-08-16");
rowValue.add("202%s-09-16");
populateTableWithMultipleStringColumn(ops, tableName, 1, rowValue);
stats = ops.collectTableStats(tableName);
Assertions.assertEquals(stats.getEarliestPartitionDate(), "202%s-07-16");
rowValue.clear();

// Test timestamp format
prepareTableWithPolicies(ops, tableName, "30d", false);
populateTable(ops, tableName, 1, 2);
populateTable(ops, tableName, 1, 1);
populateTable(ops, tableName, 1, 0);
stats = ops.collectTableStats(tableName);
Assertions.assertEquals(
stats.getEarliestPartitionDate(),
LocalDate.now().minusDays(2).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
}
}

@Test
public void testCollectTablePolicyStats() throws Exception {
final String tableName = "db.test_collect_table_stats_with_policy";
Expand Down Expand Up @@ -662,6 +698,20 @@ private static void populateTableWithStringColumn(
}
}

private static void populateTableWithMultipleStringColumn(
Operations ops, String tableName, int numRows, List<String> dataFormats) {
for (String dataFormat : dataFormats) {
for (int row = 0; row < numRows; ++row) {
ops.spark()
.sql(
String.format(
"INSERT INTO %s VALUES ('%s', '%s', %d)",
tableName, dataFormat, String.format(dataFormat, row), row))
.show();
}
}
}

private static void prepareTable(Operations ops, String tableName) {
prepareTable(ops, tableName, false);
}
Expand Down Expand Up @@ -739,6 +789,24 @@ private static void prepareTableWithPoliciesWithStringPartition(
ops.spark().sql(String.format("DESCRIBE %s", tableName)).show();
}

private static void prepareTableWithPoliciesWithMultipleStringPartition(
Operations ops, String tableName, String retention, boolean sharing) {
ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
ops.spark()
.sql(
String.format(
"CREATE TABLE %s (datepartition string, data string, num int) PARTITIONED BY (datepartition, num)",
tableName))
.show();
ops.spark()
.sql(
String.format(
"ALTER TABLE %s SET POLICY (RETENTION=%s ON COLUMN datepartition)",
tableName, retention));
ops.spark().sql(String.format("ALTER TABLE %s SET POLICY (SHARING=%s)", tableName, sharing));
ops.spark().sql(String.format("DESCRIBE %s", tableName)).show();
}

private static void prepareTableWithPoliciesWithCustomStringPartition(
Operations ops, String tableName, String retention, String pattern) {
ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ public class BaseTableMetadata {
private Boolean sharingEnabled;

private RetentionStatsSchema retentionPolicies;
chenselena marked this conversation as resolved.
Show resolved Hide resolved

private String earliestPartitionDate;
}
Loading