Add earliest partition date stat to table stats collector #205

Merged
@@ -21,9 +21,11 @@
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

/** Utility class to collect stats for a given table. */
@Slf4j
@@ -106,12 +108,15 @@ protected static IcebergTableStats populateStatsForSnapshots(
.map(snapshot -> snapshot.timestampMillis())
.orElse(null);

String earliestPartitionDate = getEarliestPartitionDate(table, spark, getTablePolicies(table));

log.info(
"Table: {}, Count of total Data files: {}, Sum of file sizes in bytes: {} for snaphot: {}",
"Table: {}, Count of total Data files: {}, Sum of file sizes in bytes: {}, Earliest partition date: {}, for snapshot: {}",
fqtn,
countOfDataFiles,
sumOfFileSizeBytes,
currentSnapshotId);
earliestPartitionDate,
currentSnapshotId);

// Find minimum timestamp of all snapshots where snapshots is iterator
Long oldestSnapshotTimestamp =
@@ -127,6 +132,7 @@
.oldestSnapshotTimestamp(oldestSnapshotTimestamp)
.numCurrentSnapshotReferencedDataFiles(countOfDataFiles)
.totalCurrentSnapshotReferencedDataFilesSizeInBytes(sumOfFileSizeBytes)
.earliestPartitionDate(earliestPartitionDate)
.build();
}
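For reference, the log.info call above emits a line of the following shape, with each {} placeholder filled in order (all values illustrative):

    Table: db.sample_table, Count of total Data files: 3, Sum of file sizes in bytes: 10240, Earliest partition date: 2024-07-16, for snapshot: 5231658854638766127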

@@ -247,6 +253,47 @@ private static long getSumOfFileSizeBytes(Dataset<Row> allDataFiles) {
.getLong(0);
}

/**
* Get the earliest partition date on a table.
*
* @param table the Iceberg table to inspect
* @param spark the Spark session used to load the table's partitions metadata
* @param policyMap the table's policies; a "columnName" entry, when present, names the partition column to read
* @return the earliest partition value as a string, or null if no suitable partition column exists
*/
private static String getEarliestPartitionDate(
Table table, SparkSession spark, Map<String, Object> policyMap) {
String partitionColumnName =
policyMap.containsKey("columnName")
? (String) policyMap.get("columnName")
: getPartitionColumnName(table);
if (partitionColumnName == null) {
return null;
}

Dataset<Row> partitionData =
SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.PARTITIONS);
String partitionColumn = String.format("partition.%s", partitionColumnName);

String earliestPartitionDate =
partitionData
.select(partitionColumn)
.orderBy(functions.asc(partitionColumn))
.first()
.get(0)
.toString();

return earliestPartitionDate;
}
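A minimal sketch of the same lookup expressed as a single Spark SQL query against Iceberg's partitions metadata table, assuming an active SparkSession named spark and a hypothetical table db.tbl whose chosen partition column is datepartition:

    // Equivalent to the select/orderBy/first chain above (assumed names).
    spark.sql(
        "SELECT partition.datepartition FROM db.tbl.partitions "
            + "ORDER BY partition.datepartition ASC LIMIT 1")
        .show();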

private static String getPartitionColumnName(Table table) {
return StreamSupport.stream(table.spec().partitionType().fields().spliterator(), false)
.filter(field -> field.type() instanceof Types.DateType)
.map(Types.NestedField::name)
.findFirst()
.orElse(null);
}
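A rough sketch of the spec shape this filter matches, built with the Iceberg API and hypothetical field names; note that a days(ts) transform on a timestamp column also yields a DateType partition field, which is why the timestamp-partitioned test below still resolves a date:

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    class PartitionColumnSketch {
      static String demo() {
        Schema schema =
            new Schema(
                Types.NestedField.required(1, "datepartition", Types.DateType.get()),
                Types.NestedField.required(2, "data", Types.StringType.get()));
        // Identity-partitioning the date column keeps DateType in partitionType(),
        // so the DateType filter above would select "datepartition".
        PartitionSpec spec =
            PartitionSpec.builderFor(schema).identity("datepartition").build();
        return spec.partitionType().fields().get(0).name(); // "datepartition"
      }
    }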

/**
* Get table policies.
*
@@ -9,6 +9,8 @@
import com.linkedin.openhouse.tables.client.model.Retention;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import io.opentelemetry.api.metrics.Meter;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -527,6 +529,40 @@ private static Path prepareOrphanTableDirectory(Operations ops, String tableName
return tbLoc;
}

@Test
public void testCollectEarliestPartitionDateStat() throws Exception {
final String tableName = "db.test_collect_earliest_partition_date";
List<String> rowValue = new ArrayList<>();

try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
// Test table with no partition
prepareTable(ops, tableName);
IcebergTableStats stats = ops.collectTableStats(tableName);
Assertions.assertNull(stats.getEarliestPartitionDate());

// Test yyyy-MM-dd string format on a table with multiple partition columns
prepareTableWithPoliciesWithMultipleStringPartition(ops, tableName, "30d", false);
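// Each entry is a raw template: the literal string (e.g. "202%s-07-16") lands
// in the partition column, so the lexicographically smallest template is the
// expected earliest partition value.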
rowValue.add("202%s-07-16");
rowValue.add("202%s-07-17");
rowValue.add("202%s-08-16");
rowValue.add("202%s-09-16");
populateTableWithMultipleStringColumn(ops, tableName, 1, rowValue);
stats = ops.collectTableStats(tableName);
Assertions.assertEquals("202%s-07-16", stats.getEarliestPartitionDate());
rowValue.clear();

// Test timestamp format
prepareTableWithPolicies(ops, tableName, "30d", false);
populateTable(ops, tableName, 1, 2);
populateTable(ops, tableName, 1, 1);
populateTable(ops, tableName, 1, 0);
stats = ops.collectTableStats(tableName);
Assertions.assertEquals(
LocalDate.now().minusDays(2).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")),
stats.getEarliestPartitionDate());
}
}

@Test
public void testCollectTablePolicyStats() throws Exception {
final String tableName = "db.test_collect_table_stats_with_policy";
@@ -662,6 +698,20 @@ private static void populateTableWithStringColumn(
}
}

private static void populateTableWithMultipleStringColumn(
Operations ops, String tableName, int numRows, List<String> dataFormats) {
for (String dataFormat : dataFormats) {
for (int row = 0; row < numRows; ++row) {
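// The raw template is written to the partition column as-is; %s is only
// substituted (with the row index) to build the data column value.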
ops.spark()
.sql(
String.format(
"INSERT INTO %s VALUES ('%s', '%s', %d)",
tableName, dataFormat, String.format(dataFormat, row), row))
.show();
}
}
}
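For one row per template, the helper above issues statements of this shape; with row index 0, String.format turns the template "202%s-07-16" into the data value "2020-07-16":

    INSERT INTO db.test_collect_earliest_partition_date VALUES ('202%s-07-16', '2020-07-16', 0)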

private static void prepareTable(Operations ops, String tableName) {
prepareTable(ops, tableName, false);
}
@@ -739,6 +789,24 @@ private static void prepareTableWithPoliciesWithStringPartition(
ops.spark().sql(String.format("DESCRIBE %s", tableName)).show();
}

private static void prepareTableWithPoliciesWithMultipleStringPartition(
Operations ops, String tableName, String retention, boolean sharing) {
ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
ops.spark()
.sql(
String.format(
"CREATE TABLE %s (datepartition string, data string, num int) PARTITIONED BY (datepartition, num)",
tableName))
.show();
ops.spark()
.sql(
String.format(
"ALTER TABLE %s SET POLICY (RETENTION=%s ON COLUMN datepartition)",
tableName, retention));
ops.spark().sql(String.format("ALTER TABLE %s SET POLICY (SHARING=%s)", tableName, sharing));
ops.spark().sql(String.format("DESCRIBE %s", tableName)).show();
}

private static void prepareTableWithPoliciesWithCustomStringPartition(
Operations ops, String tableName, String retention, String pattern) {
ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
@@ -35,4 +35,6 @@ public class IcebergTableStats extends BaseTableMetadata {
private Long numReferencedManifestFiles;

private Long numReferencedManifestLists;

private String earliestPartitionDate;
}