diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java b/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java index 37808f742..cc4937063 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java @@ -11,6 +11,7 @@ import io.whitefox.core.*; import io.whitefox.core.Schema; import io.whitefox.core.Share; +import io.whitefox.core.services.capabilities.ClientCapabilities; import io.whitefox.core.services.capabilities.ResponseFormat; import java.util.*; import java.util.stream.Collectors; @@ -29,7 +30,8 @@ public static io.whitefox.api.deltasharing.model.v1.generated.Schema schema2api( .share(schema.share()); } - public static ReadTableRequest api2ReadTableRequest(QueryRequest request) { + public static ReadTableRequest api2ReadTableRequest( + QueryRequest request, ClientCapabilities clientCapabilities) { if (request.getStartingVersion() != null && request.getEndingVersion() != null) { throw new IllegalArgumentException("The startingVersion and endingVersion are not supported"); } else if (request.getStartingVersion() != null) { @@ -43,18 +45,21 @@ public static ReadTableRequest api2ReadTableRequest(QueryRequest request) { Optional.ofNullable(request.getPredicateHints()), Optional.ofNullable(request.getJsonPredicateHints()), Optional.ofNullable(request.getLimitHint()), - request.getVersion()); + request.getVersion(), + clientCapabilities); } else if (request.getVersion() == null && request.getTimestamp() != null) { return new ReadTableRequest.ReadTableAsOfTimestamp( Optional.ofNullable(request.getPredicateHints()), - Optional.ofNullable(request.getJsonPredicateHints()), Optional.ofNullable(request.getLimitHint()), - CommonMappers.parseTimestamp(request.getTimestamp())); + Optional.ofNullable(request.getJsonPredicateHints()), + CommonMappers.parseTimestamp(request.getTimestamp()), + clientCapabilities); } else if (request.getVersion() == null && request.getTimestamp() == null) { return new ReadTableRequest.ReadTableCurrentVersion( Optional.ofNullable(request.getPredicateHints()), Optional.ofNullable(request.getJsonPredicateHints()), - Optional.ofNullable(request.getLimitHint())); + Optional.ofNullable(request.getLimitHint()), + clientCapabilities); } else { throw new IllegalArgumentException("Cannot specify both version and timestamp"); } @@ -83,11 +88,9 @@ private static ParquetMetadata metadata2Api(Metadata metadata) { .numFiles(metadata.numFiles()) .build()) .build(); - case delta: - throw new IllegalArgumentException("Delta response format is not supported"); default: throw new IllegalArgumentException( - String.format("%s response format is not supported", metadata.format())); + String.format("%s file format is not supported", metadata.format())); } } @@ -110,10 +113,12 @@ private static ParquetFile file2Api(TableFile f) { .build(); } - public static TableReferenceAndReadRequest api2TableReferenceAndReadRequest( - QueryRequest request, String share, String schema, String table) { - return new TableReferenceAndReadRequest(share, schema, table, api2ReadTableRequest(request)); - } + // public static TableReferenceAndReadRequest api2TableReferenceAndReadRequest( + // QueryRequest request, String share, String schema, String table, ClientCapabilities + // clientCapabilities) { + // return new TableReferenceAndReadRequest(share, schema, table, api2ReadTableRequest(request, + // clientCapabilities)); + // } public static io.whitefox.api.deltasharing.model.v1.generated.Table table2api( SharedTable sharedTable) { diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java b/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java index eceda2e08..0f18599d0 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java @@ -90,12 +90,12 @@ public Response getTableMetadata( deltaSharesService.getTableVersion(share, schema, table, startingTimestamp), v -> Response.ok( tableResponseSerializer.serialize( - DeltaMappers.toTableResponseMetadata(m)), + DeltaMappers.toTableResponseMetadata(m.metadata())), ndjsonMediaType) .header(DELTA_TABLE_VERSION_HEADER, String.valueOf(v)) .header( DELTA_SHARE_CAPABILITIES_HEADER, - DeltaMappers.toResponseFormatHeader(m.format())) + DeltaMappers.toResponseFormatHeader(m.responseFormat())) .build())); }, exceptionToResponse); @@ -196,12 +196,13 @@ public Response queryTable( String deltaSharingCapabilities) { return wrapExceptions( () -> { + var clientCapabilities = + clientCapabilitiesMapper.parseDeltaSharingCapabilities(deltaSharingCapabilities); var readResult = deltaSharesService.queryTable( share, schema, table, - DeltaMappers.api2ReadTableRequest(queryRequest), - clientCapabilitiesMapper.parseDeltaSharingCapabilities(deltaSharingCapabilities)); + DeltaMappers.api2ReadTableRequest(queryRequest, clientCapabilities)); var serializedReadResult = tableQueryResponseSerializer.serialize(DeltaMappers.readTableResult2api(readResult)); return Response.ok(serializedReadResult, ndjsonMediaType) diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/ClientCapabilitiesMapperTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/ClientCapabilitiesMapperTest.java index 80a9bf0e1..ba5354012 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/ClientCapabilitiesMapperTest.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/ClientCapabilitiesMapperTest.java @@ -17,42 +17,42 @@ public class ClientCapabilitiesMapperTest implements DeltaHeaders { @Test void parseSimpleResponseFormatDelta() { Assertions.assertEquals( - ResponseFormat.delta, - mapper.parseDeltaSharingCapabilities(responseFormatDelta).responseFormat()); + Set.of(ResponseFormat.delta, ResponseFormat.parquet), + mapper.parseDeltaSharingCapabilities(responseFormatDelta).responseFormats()); } @Test void parseSimpleResponseFormatParquet() { Assertions.assertEquals( - ResponseFormat.parquet, - mapper.parseDeltaSharingCapabilities("responseformat=PaRquEt").responseFormat()); + Set.of(ResponseFormat.parquet), + mapper.parseDeltaSharingCapabilities("responseformat=PaRquEt").responseFormats()); } @Test void failToParseUnknownResponseFormatAndFail() { Assertions.assertThrows( UnknownResponseFormat.class, - () -> mapper.parseDeltaSharingCapabilities("responseformat=iceberg").responseFormat()); + () -> mapper.parseDeltaSharingCapabilities("responseformat=iceberg").responseFormats()); } @Test void failToParseUnknownResponseFormatAndReturnOthers() { Assertions.assertEquals( - ResponseFormat.delta, + Set.of(ResponseFormat.delta, ResponseFormat.parquet), mapper .parseDeltaSharingCapabilities("responseformat=iceberg,parquet,delta") - .responseFormat()); + .responseFormats()); } @Test void noCapabilitiesEqualsDefault() { Assertions.assertEquals( - ResponseFormat.parquet, - mapper.parseDeltaSharingCapabilities((String) null).responseFormat()); + Set.of(ResponseFormat.parquet), + mapper.parseDeltaSharingCapabilities((String) null).responseFormats()); Assertions.assertEquals( Set.of(), mapper.parseDeltaSharingCapabilities((String) null).readerFeatures()); Assertions.assertEquals( - ResponseFormat.parquet, mapper.parseDeltaSharingCapabilities("").responseFormat()); + Set.of(ResponseFormat.parquet), mapper.parseDeltaSharingCapabilities("").responseFormats()); Assertions.assertEquals(Set.of(), mapper.parseDeltaSharingCapabilities("").readerFeatures()); } @@ -103,7 +103,8 @@ void kitchenSink() { var responseFormat = "responseformat=iceberg,parquet,delta"; var capabilities = mapper.parseDeltaSharingCapabilities( String.format("%s;%s", readerFeatures, responseFormat)); - Assertions.assertEquals(ResponseFormat.delta, capabilities.responseFormat()); + Assertions.assertEquals( + Set.of(ResponseFormat.delta, ResponseFormat.parquet), capabilities.responseFormats()); Assertions.assertEquals( Set.of(ReaderFeatures.COLUMN_MAPPING, ReaderFeatures.DOMAIN_METADATA), capabilities.readerFeatures()); diff --git a/server/core/src/main/java/io/whitefox/core/FileFormat.java b/server/core/src/main/java/io/whitefox/core/FileFormat.java new file mode 100644 index 000000000..7a408c1a9 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/FileFormat.java @@ -0,0 +1,5 @@ +package io.whitefox.core; + +public enum FileFormat { + parquet +} diff --git a/server/core/src/main/java/io/whitefox/core/Metadata.java b/server/core/src/main/java/io/whitefox/core/Metadata.java index 53f074adf..465ea30a5 100644 --- a/server/core/src/main/java/io/whitefox/core/Metadata.java +++ b/server/core/src/main/java/io/whitefox/core/Metadata.java @@ -1,6 +1,5 @@ package io.whitefox.core; -import io.whitefox.core.services.capabilities.ResponseFormat; import java.util.List; import java.util.Map; import java.util.Optional; @@ -11,7 +10,7 @@ public class Metadata { String id; Optional name; Optional description; - ResponseFormat format; + FileFormat format; TableSchema tableSchema; List partitionColumns; Map configuration; diff --git a/server/core/src/main/java/io/whitefox/core/ReadTableRequest.java b/server/core/src/main/java/io/whitefox/core/ReadTableRequest.java index 55950d454..0230f765b 100644 --- a/server/core/src/main/java/io/whitefox/core/ReadTableRequest.java +++ b/server/core/src/main/java/io/whitefox/core/ReadTableRequest.java @@ -1,190 +1,43 @@ package io.whitefox.core; -import io.whitefox.annotations.SkipCoverageGenerated; +import io.whitefox.core.services.capabilities.ClientCapabilities; import java.util.List; -import java.util.Objects; import java.util.Optional; +import lombok.Value; public interface ReadTableRequest { - class ReadTableVersion implements ReadTableRequest { - private final Optional> predicateHints; - private final Optional jsonPredicateHints; - private final Optional limitHint; - - private final Long version; - - public ReadTableVersion( - Optional> predicateHints, - Optional jsonPredicateHints, - Optional limitHint, - Long version) { - - this.predicateHints = predicateHints; - this.jsonPredicateHints = jsonPredicateHints; - this.limitHint = limitHint; - this.version = version; - } - - public Optional jsonPredicateHints() { - return jsonPredicateHints; - } - - public Optional> predicateHints() { - return predicateHints; - } - - public Optional limitHint() { - return limitHint; - } + Optional> predicateHints(); - public Long version() { - return version; - } + Optional jsonPredicateHints(); - @Override - @SkipCoverageGenerated - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ReadTableVersion that = (ReadTableVersion) o; - return Objects.equals(predicateHints, that.predicateHints) - && Objects.equals(jsonPredicateHints, that.jsonPredicateHints) - && Objects.equals(limitHint, that.limitHint) - && Objects.equals(version, that.version); - } + Optional limitHint(); - @Override - @SkipCoverageGenerated - public int hashCode() { - return Objects.hash(predicateHints, jsonPredicateHints, limitHint, version); - } + ClientCapabilities clientCapabilities(); - @Override - @SkipCoverageGenerated - public String toString() { - return "ReadTableVersion{" + "predicateHints=" - + predicateHints + "jsonPredicateHints=" - + jsonPredicateHints + ", limitHint=" - + limitHint + ", version=" - + version + '}'; - } + @Value + class ReadTableVersion implements ReadTableRequest { + Optional> predicateHints; + Optional jsonPredicateHints; + Optional limitHint; + Long version; + ClientCapabilities clientCapabilities; } + @Value class ReadTableAsOfTimestamp implements ReadTableRequest { - private final Optional> predicateHints; - - private final Optional limitHint; - private final Optional jsonPredicateHints; - private final Long timestamp; - - public ReadTableAsOfTimestamp( - Optional> predicateHints, - Optional jsonPredicateHints, - Optional limitHint, - Long timestamp) { - - this.predicateHints = predicateHints; - this.jsonPredicateHints = jsonPredicateHints; - this.limitHint = limitHint; - this.timestamp = timestamp; - } - - public Optional jsonPredicateHints() { - return jsonPredicateHints; - } - - @Override - @SkipCoverageGenerated - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ReadTableAsOfTimestamp that = (ReadTableAsOfTimestamp) o; - return Objects.equals(predicateHints, that.predicateHints) - && Objects.equals(jsonPredicateHints, that.jsonPredicateHints) - && Objects.equals(limitHint, that.limitHint) - && Objects.equals(timestamp, that.timestamp); - } - - @Override - @SkipCoverageGenerated - public int hashCode() { - return Objects.hash(jsonPredicateHints, predicateHints, limitHint, timestamp); - } - - @Override - @SkipCoverageGenerated - public String toString() { - return "ReadTableAsOfTimestamp{" + "predicateHints=" - + predicateHints + "jsonPredicateHints=" - + jsonPredicateHints + ", limitHint=" - + limitHint + ", timestamp=" - + timestamp + '}'; - } - - public Optional> predicateHints() { - return predicateHints; - } - - public Optional limitHint() { - return limitHint; - } - - public Long timestamp() { - return timestamp; - } + Optional> predicateHints; + Optional limitHint; + Optional jsonPredicateHints; + Long timestamp; + ClientCapabilities clientCapabilities; } + @Value class ReadTableCurrentVersion implements ReadTableRequest { - private final Optional> predicateHints; - private final Optional jsonPredicateHints; - private final Optional limitHint; - - public ReadTableCurrentVersion( - Optional> predicateHints, - Optional jsonPredicateHints, - Optional limitHint) { - this.predicateHints = predicateHints; - this.jsonPredicateHints = jsonPredicateHints; - this.limitHint = limitHint; - } - - public Optional> predicateHints() { - return predicateHints; - } - - public Optional jsonPredicateHints() { - return jsonPredicateHints; - } - - public Optional limitHint() { - return limitHint; - } - - @Override - @SkipCoverageGenerated - public String toString() { - return "ReadTableCurrentVersion{" + "predicateHints=" - + predicateHints + "jsonPredicateHints=" - + jsonPredicateHints + ", limitHint=" - + limitHint + '}'; - } - - @Override - @SkipCoverageGenerated - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ReadTableCurrentVersion that = (ReadTableCurrentVersion) o; - return Objects.equals(predicateHints, that.predicateHints) - && Objects.equals(jsonPredicateHints, that.jsonPredicateHints) - && Objects.equals(limitHint, that.limitHint); - } - - @Override - @SkipCoverageGenerated - public int hashCode() { - return Objects.hash(jsonPredicateHints, predicateHints, limitHint); - } + Optional> predicateHints; + Optional jsonPredicateHints; + Optional limitHint; + ClientCapabilities clientCapabilities; } } diff --git a/server/core/src/main/java/io/whitefox/core/ReadTableResultToBeSigned.java b/server/core/src/main/java/io/whitefox/core/ReadTableResultToBeSigned.java index 20324b7d4..a34d5f3e9 100644 --- a/server/core/src/main/java/io/whitefox/core/ReadTableResultToBeSigned.java +++ b/server/core/src/main/java/io/whitefox/core/ReadTableResultToBeSigned.java @@ -1,5 +1,6 @@ package io.whitefox.core; +import io.whitefox.core.services.capabilities.ResponseFormat; import java.util.List; import lombok.Value; @@ -7,6 +8,7 @@ public class ReadTableResultToBeSigned { Protocol protocol; Metadata metadata; - List other; + List filesToBeSigned; long version; + ResponseFormat responseFormat; } diff --git a/server/core/src/main/java/io/whitefox/core/SharedTable.java b/server/core/src/main/java/io/whitefox/core/SharedTable.java index 85a6abb65..569a5b515 100644 --- a/server/core/src/main/java/io/whitefox/core/SharedTable.java +++ b/server/core/src/main/java/io/whitefox/core/SharedTable.java @@ -59,4 +59,8 @@ public String toString() { + share + '\'' + ", internalTable=" + internalTable + '}'; } + + public String description() { + return "name='" + name + '\'' + ", schema='" + schema + '\'' + ", share='" + share + '\''; + } } diff --git a/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java b/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java index abdc92656..5cc4047f4 100644 --- a/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java +++ b/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java @@ -6,28 +6,33 @@ import io.delta.standalone.DeltaLog; import io.delta.standalone.Snapshot; import io.delta.standalone.actions.AddFile; +import io.delta.standalone.internal.DeltaLogImpl; +import io.delta.standalone.internal.SnapshotImpl; import io.whitefox.core.*; import io.whitefox.core.Metadata; import io.whitefox.core.TableSchema; +import io.whitefox.core.services.capabilities.ClientCapabilities; import io.whitefox.core.services.capabilities.ResponseFormat; +import io.whitefox.core.services.exceptions.IncompatibleTableWithClient; import io.whitefox.core.types.predicates.PredicateException; import java.sql.Timestamp; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import org.apache.commons.lang3.NotImplementedException; import org.apache.log4j.Logger; public class DeltaSharedTable implements InternalSharedTable { private final Logger logger = Logger.getLogger(this.getClass()); - private final DeltaLog deltaLog; + private final DeltaLogImpl deltaLog; private final TableSchemaConverter tableSchemaConverter; private final SharedTable tableDetails; private final String location; private DeltaSharedTable( - DeltaLog deltaLog, + DeltaLogImpl deltaLog, TableSchemaConverter tableSchemaConverter, SharedTable sharedTable, String location) { @@ -53,7 +58,7 @@ public static DeltaSharedTable of( throw new IllegalArgumentException( String.format("Cannot find a delta table at %s", dataPath)); } - return new DeltaSharedTable(dt, tableSchemaConverter, sharedTable, dataPath); + return new DeltaSharedTable((DeltaLogImpl) dt, tableSchemaConverter, sharedTable, dataPath); } else { throw new IllegalArgumentException( String.format("%s is not a delta table", sharedTable.name())); @@ -64,8 +69,13 @@ public static DeltaSharedTable of(SharedTable sharedTable) { return of(sharedTable, TableSchemaConverter.INSTANCE, new HadoopConfigBuilder()); } - public Optional getMetadata(Optional startingTimestamp) { - return getSnapshot(startingTimestamp).map(this::metadataFromSnapshot); + @Override + public Optional getMetadata( + Optional startingTimestamp, ClientCapabilities clientCapabilities) { + return getSnapshot(startingTimestamp).map(snapshot -> { + final ResponseFormat responseFormat = chooseResponseFormat(snapshot, clientCapabilities); + return new MetadataResponse(metadataFromSnapshot(snapshot), responseFormat); + }); } private Metadata metadataFromSnapshot(Snapshot snapshot) { @@ -73,7 +83,7 @@ private Metadata metadataFromSnapshot(Snapshot snapshot) { snapshot.getMetadata().getId(), Optional.of(tableDetails.name()), Optional.ofNullable(snapshot.getMetadata().getDescription()), - ResponseFormat.parquet, + FileFormat.parquet, new TableSchema(tableSchemaConverter.convertDeltaSchemaToWhitefox( snapshot.getMetadata().getSchema())), snapshot.getMetadata().getPartitionColumns(), @@ -121,60 +131,82 @@ public boolean filterFilesBasedOnJsonPredicates(Optional predicates, Add } } + private ResponseFormat chooseResponseFormat( + SnapshotImpl snapshotImpl, ClientCapabilities clientCapabilities) { + return _chooseResponseFormat(snapshotImpl, clientCapabilities) + .orElseThrow(() -> new IncompatibleTableWithClient(String.format( + "Table %s cannot be read by client with capabilities %s", + tableDetails.description(), clientCapabilities))); + } + + private Optional _chooseResponseFormat( + SnapshotImpl snapshotImpl, ClientCapabilities clientCapabilities) { + if (snapshotImpl.protocol().getMinReaderVersion() == 1 + && clientCapabilities.isCompatibleWith(ResponseFormat.parquet)) { + return Optional.of(ResponseFormat.parquet); + } else if (clientCapabilities.isCompatibleWith(ResponseFormat.delta)) { + return Optional.of(ResponseFormat.delta); + } else { + return Optional.empty(); + } + } + public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) { - Optional predicates; - Optional> sqlPredicates; - Snapshot snapshot; + final Optional predicates = readTableRequest.jsonPredicateHints(); + final Optional> sqlPredicates = readTableRequest.predicateHints(); + SnapshotImpl snapshot; if (readTableRequest instanceof ReadTableRequest.ReadTableCurrentVersion) { snapshot = deltaLog.snapshot(); - predicates = - ((ReadTableRequest.ReadTableCurrentVersion) readTableRequest).jsonPredicateHints(); - sqlPredicates = - ((ReadTableRequest.ReadTableCurrentVersion) readTableRequest).predicateHints(); } else if (readTableRequest instanceof ReadTableRequest.ReadTableAsOfTimestamp) { snapshot = deltaLog.getSnapshotForTimestampAsOf( ((ReadTableRequest.ReadTableAsOfTimestamp) readTableRequest).timestamp()); - predicates = - ((ReadTableRequest.ReadTableAsOfTimestamp) readTableRequest).jsonPredicateHints(); - sqlPredicates = ((ReadTableRequest.ReadTableAsOfTimestamp) readTableRequest).predicateHints(); } else if (readTableRequest instanceof ReadTableRequest.ReadTableVersion) { snapshot = deltaLog.getSnapshotForVersionAsOf( ((ReadTableRequest.ReadTableVersion) readTableRequest).version()); - predicates = ((ReadTableRequest.ReadTableVersion) readTableRequest).jsonPredicateHints(); - sqlPredicates = ((ReadTableRequest.ReadTableVersion) readTableRequest).predicateHints(); } else { throw new IllegalArgumentException("Unknown ReadTableRequest type: " + readTableRequest); } - var metadata = metadataFromSnapshot(snapshot); - return new ReadTableResultToBeSigned( - new Protocol(Optional.of(1)), - metadata, - snapshot.getAllFiles().stream() - .filter(f -> filterFilesBasedOnJsonPredicates(predicates, f)) - .filter(f -> filterFilesBasedOnSqlPredicates(sqlPredicates, f, metadata)) - .map(f -> new TableFileToBeSigned( - location() + "/" + f.getPath(), - f.getSize(), - snapshot.getVersion(), - snapshot.getMetadata().getCreatedTime(), - f.getStats(), - f.getPartitionValues())) - .collect(Collectors.toList()), - snapshot.getVersion()); + + final ResponseFormat responseFormat = + chooseResponseFormat(snapshot, readTableRequest.clientCapabilities()); + if (ResponseFormat.parquet == responseFormat) { + final var metadata = metadataFromSnapshot(snapshot); + return new ReadTableResultToBeSigned( + new Protocol(Optional.of(1)), + metadata, + snapshot.getAllFiles().stream() + .filter(f -> filterFilesBasedOnJsonPredicates(predicates, f)) + .filter(f -> filterFilesBasedOnSqlPredicates(sqlPredicates, f, metadata)) + .map(f -> new TableFileToBeSigned( + location() + "/" + f.getPath(), + f.getSize(), + snapshot.getVersion(), + snapshot.getMetadata().getCreatedTime(), + f.getStats(), + f.getPartitionValues())) + .collect(Collectors.toList()), + snapshot.getVersion(), + responseFormat); + } else { + throw new NotImplementedException(String.format( + "Delta protocol is currently not implemented, " + + "table %s can't be read by the current version of whitefox", + tableDetails.description())); + } } - private Optional getSnapshot(Optional startingTimestamp) { + private Optional getSnapshot(Optional startingTimestamp) { return startingTimestamp .map(Timestamp::getTime) .map(this::getSnapshotForTimestampAsOf) .orElse(Optional.of(getSnapshot())); } - private Snapshot getSnapshot() { + private SnapshotImpl getSnapshot() { return deltaLog.snapshot(); } - private Optional getSnapshotForTimestampAsOf(long l) { + private Optional getSnapshotForTimestampAsOf(long l) { try { return Optional.of(deltaLog.getSnapshotForTimestampAsOf(l)); } catch (IllegalArgumentException iea) { diff --git a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java index 3d6bcc297..6eec1f154 100644 --- a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java +++ b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java @@ -1,7 +1,6 @@ package io.whitefox.core.services; import io.whitefox.core.*; -import io.whitefox.core.Metadata; import io.whitefox.core.Schema; import io.whitefox.core.Share; import io.whitefox.core.SharedTable; @@ -18,7 +17,7 @@ Optional getTableVersion( ContentAndToken> listShares( Optional nextPageToken, Optional maxResults); - Optional getTableMetadata( + Optional getTableMetadata( String share, String schema, String table, @@ -38,9 +37,5 @@ Optional>> listTablesOfShare( String share, Optional token, Optional maxResults); ReadTableResult queryTable( - String share, - String schema, - String table, - ReadTableRequest queryRequest, - ClientCapabilities clientCapabilities); + String share, String schema, String table, ReadTableRequest queryRequest); } diff --git a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java index 41e1355f0..3c3f966c8 100644 --- a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java +++ b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java @@ -2,8 +2,6 @@ import io.whitefox.core.*; import io.whitefox.core.services.capabilities.ClientCapabilities; -import io.whitefox.core.services.capabilities.ResponseFormat; -import io.whitefox.core.services.exceptions.IncompatibleTableWithClient; import io.whitefox.core.services.exceptions.TableNotFound; import io.whitefox.persistence.StorageManager; import jakarta.enterprise.context.ApplicationScoped; @@ -11,7 +9,6 @@ import java.sql.Timestamp; import java.util.List; import java.util.Optional; -import java.util.function.Function; import java.util.stream.Collectors; import org.eclipse.microprofile.config.inject.ConfigProperty; @@ -64,7 +61,7 @@ public ContentAndToken> listShares( } @Override - public Optional getTableMetadata( + public Optional getTableMetadata( String share, String schema, String tableName, @@ -73,9 +70,7 @@ public Optional getTableMetadata( var table = storageManager .getSharedTable(share, schema, tableName) .map(t -> tableLoaderFactory.newTableLoader(t.internalTable()).loadTable(t)); - return table - .flatMap(t -> t.getMetadata(startingTimestamp)) - .map(m -> checkResponseFormat(clientCapabilities, Metadata::format, m, tableName)); + return table.flatMap(t -> t.getMetadata(startingTimestamp, clientCapabilities)); } @Override @@ -134,11 +129,7 @@ public Optional>> listTablesOfShare( @Override public ReadTableResult queryTable( - String share, - String schema, - String tableName, - ReadTableRequest queryRequest, - ClientCapabilities clientCapabilities) { + String share, String schema, String tableName, ReadTableRequest queryRequest) { SharedTable sharedTable = storageManager .getSharedTable(share, schema, tableName) .orElseThrow(() -> new TableNotFound(String.format( @@ -146,36 +137,17 @@ public ReadTableResult queryTable( var fileSigner = fileSignerFactory.newFileSigner(sharedTable.internalTable().provider().storage()); - var readTableResultToBeSigned = tableLoaderFactory - .newTableLoader(sharedTable.internalTable()) - .loadTable(sharedTable) - .queryTable(queryRequest); - return checkResponseFormat( - clientCapabilities, - ReadTableResult::responseFormat, - new ReadTableResult( - readTableResultToBeSigned.protocol(), - readTableResultToBeSigned.metadata(), - readTableResultToBeSigned.other().stream() - .map(fileSigner::sign) - .collect(Collectors.toList()), - readTableResultToBeSigned.version(), - ResponseFormat.parquet), - tableName); - } + var loadedTable = + tableLoaderFactory.newTableLoader(sharedTable.internalTable()).loadTable(sharedTable); - private A checkResponseFormat( - ClientCapabilities clientCapabilities, - Function formatExtractor, - A formatContainer, - String tableName) { - if (!clientCapabilities - .responseFormat() - .isCompatibleWith(formatExtractor.apply(formatContainer))) { - throw new IncompatibleTableWithClient( - "Table " + tableName + " is not compatible with client " + clientCapabilities); - } else { - return formatContainer; - } + var readTableResultToBeSigned = loadedTable.queryTable(queryRequest); + return new ReadTableResult( + readTableResultToBeSigned.protocol(), + readTableResultToBeSigned.metadata(), + readTableResultToBeSigned.filesToBeSigned().stream() + .map(fileSigner::sign) + .collect(Collectors.toList()), + readTableResultToBeSigned.version(), + readTableResultToBeSigned.responseFormat()); } } diff --git a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java index fb160b527..54c88c34a 100644 --- a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java +++ b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java @@ -1,9 +1,7 @@ package io.whitefox.core.services; -import io.whitefox.core.Metadata; -import io.whitefox.core.ReadTableRequest; -import io.whitefox.core.ReadTableResultToBeSigned; -import io.whitefox.core.TableSchema; +import io.whitefox.core.*; +import io.whitefox.core.services.capabilities.ClientCapabilities; import io.whitefox.core.services.capabilities.ResponseFormat; import java.sql.Timestamp; import java.util.Optional; @@ -33,16 +31,12 @@ public static IcebergSharedTable of(Table icebergTable) { return new IcebergSharedTable(icebergTable, new TableSchemaConverter()); } - public Optional getMetadata(Optional startingTimestamp) { - return getSnapshot(startingTimestamp).map(this::getMetadataFromSnapshot); - } - private Metadata getMetadataFromSnapshot(Snapshot snapshot) { return new Metadata( String.valueOf(snapshot.snapshotId()), Optional.of(icebergTable.name()), Optional.empty(), - ResponseFormat.parquet, + FileFormat.parquet, new TableSchema(tableSchemaConverter.convertIcebergSchemaToWhitefox( icebergTable.schema().asStruct())), icebergTable.spec().fields().stream() @@ -80,4 +74,11 @@ public Optional getTableVersion(Optional startingTimestamp) { public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) { throw new NotImplementedException(); } + + @Override + public Optional getMetadata( + Optional startingTimestamp, ClientCapabilities clientCapabilities) { + return getSnapshot(startingTimestamp) + .map(s -> new MetadataResponse(getMetadataFromSnapshot(s), ResponseFormat.parquet)); + } } diff --git a/server/core/src/main/java/io/whitefox/core/services/InternalSharedTable.java b/server/core/src/main/java/io/whitefox/core/services/InternalSharedTable.java index 7ab99a583..d56eeb4df 100644 --- a/server/core/src/main/java/io/whitefox/core/services/InternalSharedTable.java +++ b/server/core/src/main/java/io/whitefox/core/services/InternalSharedTable.java @@ -1,14 +1,15 @@ package io.whitefox.core.services; -import io.whitefox.core.Metadata; import io.whitefox.core.ReadTableRequest; import io.whitefox.core.ReadTableResultToBeSigned; +import io.whitefox.core.services.capabilities.ClientCapabilities; import java.sql.Timestamp; import java.util.Optional; public interface InternalSharedTable { - Optional getMetadata(Optional startingTimestamp); + Optional getMetadata( + Optional startingTimestamp, ClientCapabilities clientCapabilities); Optional getTableVersion(Optional startingTimestamp); diff --git a/server/core/src/main/java/io/whitefox/core/services/MetadataResponse.java b/server/core/src/main/java/io/whitefox/core/services/MetadataResponse.java new file mode 100644 index 000000000..1afa9aec5 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/services/MetadataResponse.java @@ -0,0 +1,11 @@ +package io.whitefox.core.services; + +import io.whitefox.core.Metadata; +import io.whitefox.core.services.capabilities.ResponseFormat; +import lombok.Value; + +@Value +public class MetadataResponse { + Metadata metadata; + ResponseFormat responseFormat; +} diff --git a/server/core/src/main/java/io/whitefox/core/services/capabilities/ClientCapabilities.java b/server/core/src/main/java/io/whitefox/core/services/capabilities/ClientCapabilities.java index c4f3b2a8d..6001f5166 100644 --- a/server/core/src/main/java/io/whitefox/core/services/capabilities/ClientCapabilities.java +++ b/server/core/src/main/java/io/whitefox/core/services/capabilities/ClientCapabilities.java @@ -6,7 +6,13 @@ public interface ClientCapabilities { Set readerFeatures(); - ResponseFormat responseFormat(); + Set responseFormats(); + + /** + * This is seen from the client perspective, i.e. a parquet client is not compatible with a delta response + * while the other way around is compatible + */ + boolean isCompatibleWith(ResponseFormat other); ParquetClientCapabilities PARQUET_INSTANCE = new ParquetClientCapabilities(); @@ -27,13 +33,18 @@ public Set readerFeatures() { } @Override - public ResponseFormat responseFormat() { - return ResponseFormat.delta; + public Set responseFormats() { + return Set.of(ResponseFormat.parquet, ResponseFormat.delta); } private DeltaClientCapabilities(Set readerFeatures) { this.readerFeatures = Collections.unmodifiableSet(readerFeatures); } + + @Override + public boolean isCompatibleWith(ResponseFormat other) { + return responseFormats().contains(other); + } } class ParquetClientCapabilities implements ClientCapabilities { @@ -46,8 +57,13 @@ public Set readerFeatures() { } @Override - public ResponseFormat responseFormat() { - return ResponseFormat.parquet; + public Set responseFormats() { + return Set.of(ResponseFormat.parquet); + } + + @Override + public boolean isCompatibleWith(ResponseFormat other) { + return responseFormats().contains(other); } } } diff --git a/server/core/src/main/java/io/whitefox/core/services/capabilities/ResponseFormat.java b/server/core/src/main/java/io/whitefox/core/services/capabilities/ResponseFormat.java index bbd9b9f88..c48ffcca0 100644 --- a/server/core/src/main/java/io/whitefox/core/services/capabilities/ResponseFormat.java +++ b/server/core/src/main/java/io/whitefox/core/services/capabilities/ResponseFormat.java @@ -1,7 +1,5 @@ package io.whitefox.core.services.capabilities; -import io.whitefox.core.services.exceptions.UnknownResponseFormat; - public enum ResponseFormat { parquet("parquet"), delta("delta"); @@ -15,26 +13,4 @@ public String stringRepresentation() { ResponseFormat(String str) { stringRepresentation = str; } - - /** - * This is seen from the client perspective, i.e. a parquet client is not compatible with a delta response - * while the other way around is compatible - */ - public boolean isCompatibleWith(ResponseFormat other) { - switch (this) { - case parquet: - switch (other) { - case parquet: - return true; - case delta: - return false; - default: - throw new UnknownResponseFormat("Unknown response format: " + other); - } - case delta: - return true; - default: - throw new UnknownResponseFormat("Unknown response format: " + this); - } - } } diff --git a/server/core/src/test/java/io/whitefox/core/services/DeltaShareServiceTest.java b/server/core/src/test/java/io/whitefox/core/services/DeltaShareServiceTest.java index e20697748..f2478d0e2 100644 --- a/server/core/src/test/java/io/whitefox/core/services/DeltaShareServiceTest.java +++ b/server/core/src/test/java/io/whitefox/core/services/DeltaShareServiceTest.java @@ -7,13 +7,13 @@ import io.whitefox.core.Share; import io.whitefox.core.SharedTable; import io.whitefox.core.services.capabilities.ClientCapabilities; +import io.whitefox.core.services.capabilities.ReaderFeatures; import io.whitefox.core.services.exceptions.TableNotFound; import io.whitefox.persistence.StorageManager; import io.whitefox.persistence.memory.InMemoryStorageManager; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; + +import java.util.*; + import org.hamcrest.Matchers; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -205,11 +205,12 @@ public void getDeltaTableMetadata() { StorageManager storageManager = new InMemoryStorageManager(shares); DeltaSharesService deltaSharesService = new DeltaSharesServiceImpl(storageManager, 100, tableLoaderFactory, fileSignerFactory); - var tableMetadata = deltaSharesService.getTableMetadata( + var tableMetadataResponse = deltaSharesService.getTableMetadata( "name", "default", "table1", Optional.empty(), ClientCapabilities.parquet()); - Assertions.assertTrue(tableMetadata.isPresent()); + Assertions.assertTrue(tableMetadataResponse.isPresent()); Assertions.assertEquals( - "56d48189-cdbc-44f2-9b0e-2bded4c79ed7", tableMetadata.get().id()); + "56d48189-cdbc-44f2-9b0e-2bded4c79ed7", + tableMetadataResponse.get().metadata().id()); } @Test @@ -256,8 +257,7 @@ public void queryExistingTable() { "default", "partitioned-delta-table", new ReadTableRequest.ReadTableCurrentVersion( - Optional.empty(), Optional.empty(), Optional.empty()), - ClientCapabilities.parquet()); + Optional.empty(), Optional.empty(), Optional.empty(), ClientCapabilities.parquet())); Assertions.assertEquals(9, resultTable.files().size()); } @@ -278,7 +278,67 @@ public void queryNonExistingTable() { new DeltaSharesServiceImpl(storageManager, 100, tableLoaderFactory, fileSignerFactory); Assertions.assertThrows( TableNotFound.class, - () -> deltaSharesService.queryTable( - "name", "default", "tableNotFound", null, ClientCapabilities.parquet())); + () -> deltaSharesService.queryTable("name", "default", "tableNotFound", null)); + } + + @Test + @DisabledOnOs(OS.WINDOWS) + public void queryParquetTableWithDeltaCapabilities() { + var shares = List.of(createShare( + "name", + "key", + Map.of( + "default", + new Schema( + "default", + List.of(new SharedTable( + "partitioned-delta-table", + "default", + "name", + DeltaTestUtils.deltaTable("partitioned-delta-table"))), + "name")))); + StorageManager storageManager = new InMemoryStorageManager(shares); + DeltaSharesService deltaSharesService = + new DeltaSharesServiceImpl(storageManager, 100, tableLoaderFactory, fileSignerFactory); + var resultTable = deltaSharesService.queryTable( + "name", + "default", + "partitioned-delta-table", + new ReadTableRequest.ReadTableCurrentVersion( + Optional.empty(), Optional.empty(), Optional.empty(), + ClientCapabilities.delta(Set.of(ReaderFeatures.CHECK_CONSTRAINTS)))); + Assertions.assertEquals(9, resultTable.files().size()); + } + + @Test + @DisabledOnOs(OS.WINDOWS) + public void queryDeltaTableWithoutDeltaCapabilities() { + var shares = List.of(createShare( + "name", + "key", + Map.of( + "default", + new Schema( + "default", + List.of(new SharedTable( + "delta-table-v2", + "default", + "name", + DeltaTestUtils.deltaTable("delta-table-v2"))), + "name")))); + StorageManager storageManager = new InMemoryStorageManager(shares); + DeltaSharesService deltaSharesService = + new DeltaSharesServiceImpl(storageManager, 100, tableLoaderFactory, fileSignerFactory); + Assertions.assertThrows(IllegalArgumentException.class, () -> + deltaSharesService.queryTable( + "name", + "default", + "delta-table-v2", + new ReadTableRequest.ReadTableCurrentVersion( + Optional.empty(), Optional.empty(), Optional.empty(), + ClientCapabilities.parquet())) + ); + } + } diff --git a/server/core/src/test/java/io/whitefox/core/services/DeltaSharedTableTest.java b/server/core/src/test/java/io/whitefox/core/services/DeltaSharedTableTest.java index ae9bf1024..2ab04dff0 100644 --- a/server/core/src/test/java/io/whitefox/core/services/DeltaSharedTableTest.java +++ b/server/core/src/test/java/io/whitefox/core/services/DeltaSharedTableTest.java @@ -8,9 +8,11 @@ import io.whitefox.core.Protocol; import io.whitefox.core.ReadTableRequest; import io.whitefox.core.SharedTable; +import io.whitefox.core.services.capabilities.ClientCapabilities; import java.time.format.DateTimeParseException; import java.util.List; import java.util.Optional; +import java.util.Set; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; @@ -31,9 +33,11 @@ void getTableVersion() { void getTableMetadata() { var PTable = new SharedTable("delta-table", "default", "share1", deltaTable("delta-table")); var DTable = DeltaSharedTable.of(PTable); - var metadata = DTable.getMetadata(Optional.empty()); - assertTrue(metadata.isPresent()); - assertEquals("56d48189-cdbc-44f2-9b0e-2bded4c79ed7", metadata.get().id()); + var metadataResponse = DTable.getMetadata(Optional.empty(), ClientCapabilities.parquet()); + assertTrue(metadataResponse.isPresent()); + assertEquals( + "56d48189-cdbc-44f2-9b0e-2bded4c79ed7", + metadataResponse.get().metadata().id()); } @Test @@ -81,10 +85,10 @@ void queryTableWithoutPredicate() { "partitioned-delta-table", "default", "share1", deltaTable("partitioned-delta-table")); var DTable = DeltaSharedTable.of(PTable); var request = new ReadTableRequest.ReadTableCurrentVersion( - Optional.empty(), Optional.empty(), Optional.empty()); + Optional.empty(), Optional.empty(), Optional.empty(), ClientCapabilities.delta(Set.of())); var response = DTable.queryTable(request); assertEquals(response.protocol(), new Protocol(Optional.of(1))); - assertEquals(response.other().size(), 9); + assertEquals(response.filesToBeSigned().size(), 9); } @Test @@ -105,9 +109,12 @@ void queryTableWithSqlPredicates() { predicatesAndExpectedResult.forEach(p -> { var request = new ReadTableRequest.ReadTableCurrentVersion( - Optional.of(p.getLeft()), Optional.empty(), Optional.empty()); + Optional.of(p.getLeft()), + Optional.empty(), + Optional.empty(), + ClientCapabilities.delta(Set.of())); var response = DTable.queryTable(request); - assertEquals(p.getRight(), response.other().size()); + assertEquals(p.getRight(), response.filesToBeSigned().size()); }); } @@ -119,9 +126,12 @@ void queryTableWithNonPartitionSqlPredicate() { var PTable = new SharedTable(tableName, "default", "share1", deltaTable(tableName)); var DTable = DeltaSharedTable.of(PTable); var request = new ReadTableRequest.ReadTableCurrentVersion( - Optional.of(predicates), Optional.empty(), Optional.empty()); + Optional.of(predicates), + Optional.empty(), + Optional.empty(), + ClientCapabilities.delta(Set.of())); var response = DTable.queryTable(request); - assertEquals(1, response.other().size()); + assertEquals(1, response.filesToBeSigned().size()); } @Test @@ -210,9 +220,12 @@ void queryTableWithJsonPredicates() { predicatesAndExpectedResult.forEach(p -> { var request = new ReadTableRequest.ReadTableCurrentVersion( - Optional.empty(), Optional.of(p.getLeft()), Optional.empty()); + Optional.empty(), + Optional.of(p.getLeft()), + Optional.empty(), + ClientCapabilities.delta(Set.of())); var response = DTable.queryTable(request); - assertEquals(p.getRight(), response.other().size()); + assertEquals(p.getRight(), response.filesToBeSigned().size()); }); } } diff --git a/server/core/src/test/java/io/whitefox/core/services/IcebergAwsSharedTableTest.java b/server/core/src/test/java/io/whitefox/core/services/IcebergAwsSharedTableTest.java index d78efb1d3..0f688cb9a 100644 --- a/server/core/src/test/java/io/whitefox/core/services/IcebergAwsSharedTableTest.java +++ b/server/core/src/test/java/io/whitefox/core/services/IcebergAwsSharedTableTest.java @@ -8,6 +8,7 @@ import io.whitefox.AwsGlueTestConfig; import io.whitefox.S3TestConfig; import io.whitefox.core.SharedTable; +import io.whitefox.core.services.capabilities.ClientCapabilities; import java.util.Optional; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; @@ -30,9 +31,9 @@ void getTableMetadata() { s3IcebergTableWithAwsGlueCatalog( s3TestConfig, awsGlueTestConfig, "test_glue_db", "icebergtable1")); var DTable = icebergTableLoader.loadTable(PTable); - var metadata = DTable.getMetadata(Optional.empty()); - assertTrue(metadata.isPresent()); - assertEquals("7819530050735196523", metadata.get().id()); + var metadataResponse = DTable.getMetadata(Optional.empty(), ClientCapabilities.parquet()); + assertTrue(metadataResponse.isPresent()); + assertEquals("7819530050735196523", metadataResponse.get().metadata().id()); } @Test diff --git a/server/core/src/test/java/io/whitefox/core/services/IcebergSharedTableTest.java b/server/core/src/test/java/io/whitefox/core/services/IcebergSharedTableTest.java index d2251052a..19e92e7e8 100644 --- a/server/core/src/test/java/io/whitefox/core/services/IcebergSharedTableTest.java +++ b/server/core/src/test/java/io/whitefox/core/services/IcebergSharedTableTest.java @@ -6,6 +6,7 @@ import static org.wildfly.common.Assert.assertTrue; import io.whitefox.core.SharedTable; +import io.whitefox.core.services.capabilities.ClientCapabilities; import java.util.Optional; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; @@ -25,9 +26,9 @@ void getTableMetadata() { "share1", icebergTableWithHadoopCatalog("test_db", "icebergtable1")); var DTable = icebergTableLoader.loadTable(PTable); - var metadata = DTable.getMetadata(Optional.empty()); - assertTrue(metadata.isPresent()); - assertEquals("3369848726892806393", metadata.get().id()); + var metadataResponse = DTable.getMetadata(Optional.empty(), ClientCapabilities.parquet()); + assertTrue(metadataResponse.isPresent()); + assertEquals("3369848726892806393", metadataResponse.get().metadata().id()); } @Test @@ -38,9 +39,10 @@ void getTableMetadataWithTimestamp() { "share1", icebergTableWithHadoopCatalog("test_db", "icebergtable2")); var DTable = icebergTableLoader.loadTable(PTable); - var metadata = DTable.getMetadata(TestDateUtils.parseTimestamp("2024-01-25T01:32:15+01:00")); - assertTrue(metadata.isPresent()); - assertEquals("2174306913745765008", metadata.get().id()); + var metadataResponse = DTable.getMetadata( + TestDateUtils.parseTimestamp("2024-01-25T01:32:15+01:00"), ClientCapabilities.parquet()); + assertTrue(metadataResponse.isPresent()); + assertEquals("2174306913745765008", metadataResponse.get().metadata().id()); } @Test diff --git a/server/core/src/test/java/io/whitefox/core/types/predicates/PredicateParsingTest.java b/server/core/src/test/java/io/whitefox/core/types/predicates/PredicateParsingTest.java index fa6cb3d56..2f6b0784c 100644 --- a/server/core/src/test/java/io/whitefox/core/types/predicates/PredicateParsingTest.java +++ b/server/core/src/test/java/io/whitefox/core/types/predicates/PredicateParsingTest.java @@ -2,10 +2,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows; +import io.whitefox.core.FileFormat; import io.whitefox.core.Metadata; import io.whitefox.core.PredicateUtils; import io.whitefox.core.TableSchema; -import io.whitefox.core.services.capabilities.ResponseFormat; import io.whitefox.core.types.DateType; import io.whitefox.core.types.StructField; import io.whitefox.core.types.StructType; @@ -37,7 +37,7 @@ void testParsingOfInvalidSql() { "id", Optional.empty(), Optional.empty(), - ResponseFormat.parquet, + FileFormat.parquet, new TableSchema( new StructType(List.of(new StructField("date", DateType.DATE, true, Map.of())))), List.of("date", "age"), @@ -61,7 +61,7 @@ void testParsingOfSqlEqual() throws PredicateException { "id", Optional.empty(), Optional.empty(), - ResponseFormat.parquet, + FileFormat.parquet, new TableSchema( new StructType(List.of(new StructField("date", DateType.DATE, true, Map.of())))), List.of("date", "age"), diff --git a/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/.part-00000-e3e34531-8c96-46ec-acfd-4a530b1b6232-c000.snappy.parquet.crc b/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/.part-00000-e3e34531-8c96-46ec-acfd-4a530b1b6232-c000.snappy.parquet.crc new file mode 100644 index 000000000..a62949e71 Binary files /dev/null and b/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/.part-00000-e3e34531-8c96-46ec-acfd-4a530b1b6232-c000.snappy.parquet.crc differ diff --git a/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/_delta_log/.00000000000000000000.json.crc b/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/_delta_log/.00000000000000000000.json.crc new file mode 100644 index 000000000..f4f20c91f Binary files /dev/null and b/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/_delta_log/.00000000000000000000.json.crc differ diff --git a/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/_delta_log/.00000000000000000001.json.crc b/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/_delta_log/.00000000000000000001.json.crc new file mode 100644 index 000000000..81a32bb4c Binary files /dev/null and b/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/_delta_log/.00000000000000000001.json.crc differ diff --git a/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/_delta_log/00000000000000000000.json b/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/_delta_log/00000000000000000000.json new file mode 100644 index 000000000..216e118da --- /dev/null +++ b/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"commitInfo":{"timestamp":1709400373976,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"527"},"engineInfo":"Apache-Spark/3.5.1 Delta-Lake/3.1.0","txnId":"78af8598-9b1a-4e5d-b657-ae0331f84fb6"}} +{"metaData":{"id":"930f6f27-1b35-4d5e-a72d-7279a2f9d851","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1709400373821}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"add":{"path":"part-00000-e3e34531-8c96-46ec-acfd-4a530b1b6232-c000.snappy.parquet","partitionValues":{},"size":527,"modificationTime":1709400373911,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"id\":20},\"maxValues\":{\"id\":29},\"nullCount\":{\"id\":0}}"}} diff --git a/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/_delta_log/00000000000000000001.json b/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/_delta_log/00000000000000000001.json new file mode 100644 index 000000000..9ee891f72 --- /dev/null +++ b/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/_delta_log/00000000000000000001.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1709400379062,"operation":"SET TBLPROPERTIES","operationParameters":{"properties":"{\"delta.feature.deletionvectors\":\"supported\"}"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.1 Delta-Lake/3.1.0","txnId":"6fe1dd9e-cd19-4fb4-9588-9c04e2d8dc82"}} +{"metaData":{"id":"930f6f27-1b35-4d5e-a72d-7279a2f9d851","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1709400373821}} +{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["appendOnly","invariants","deletionVectors"]}} diff --git a/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/part-00000-e3e34531-8c96-46ec-acfd-4a530b1b6232-c000.snappy.parquet b/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/part-00000-e3e34531-8c96-46ec-acfd-4a530b1b6232-c000.snappy.parquet new file mode 100644 index 000000000..9abe262ed Binary files /dev/null and b/server/core/src/testFixtures/resources/delta/samples/delta-table-v2/part-00000-e3e34531-8c96-46ec-acfd-4a530b1b6232-c000.snappy.parquet differ