diff --git a/docs/docs/spark-configuration.md b/docs/docs/spark-configuration.md index e9382e701e49..5e9c6e5d1147 100644 --- a/docs/docs/spark-configuration.md +++ b/docs/docs/spark-configuration.md @@ -78,7 +78,7 @@ Both catalogs are configured using properties nested under the catalog name. Com | spark.sql.catalog._catalog-name_.table-default._propertyKey_ | | Default Iceberg table property value for property key _propertyKey_, which will be set on tables created by this catalog if not overridden | | spark.sql.catalog._catalog-name_.table-override._propertyKey_ | | Enforced Iceberg table property value for property key _propertyKey_, which cannot be overridden by user | -Additional properties can be found in common [catalog configuration](configuration.md#catalog-properties). +Additional properties can be found in common [catalog configuration](../configuration.md#catalog-properties). ### Using catalogs diff --git a/docs/docs/spark-ddl.md b/docs/docs/spark-ddl.md index ea85d5c9919a..e1376ddcf667 100644 --- a/docs/docs/spark-ddl.md +++ b/docs/docs/spark-ddl.md @@ -33,14 +33,14 @@ CREATE TABLE prod.db.sample ( USING iceberg; ``` -Iceberg will convert the column type in Spark to corresponding Iceberg type. Please check the section of [type compatibility on creating table](spark-writes.md#spark-type-to-iceberg-type) for details. +Iceberg will convert the column type in Spark to corresponding Iceberg type. Please check the section of [type compatibility on creating table](../spark-getting-started.md#spark-type-to-iceberg-type) for details. Table create commands, including CTAS and RTAS, support the full range of Spark create clauses, including: * `PARTITIONED BY (partition-expressions)` to configure partitioning * `LOCATION '(fully-qualified-uri)'` to set the table location * `COMMENT 'table documentation'` to set a table description -* `TBLPROPERTIES ('key'='value', ...)` to set [table configuration](configuration.md) +* `TBLPROPERTIES ('key'='value', ...)` to set [table configuration](../configuration.md) Create commands may also set the default format with the `USING` clause. This is only supported for `SparkCatalog` because Spark handles the `USING` clause differently for the built-in catalog. @@ -59,7 +59,7 @@ USING iceberg PARTITIONED BY (category); ``` -The `PARTITIONED BY` clause supports transform expressions to create [hidden partitions](partitioning.md). +The `PARTITIONED BY` clause supports transform expressions to create [hidden partitions](../partitioning.md). ```sql CREATE TABLE prod.db.sample ( @@ -86,7 +86,7 @@ Note: Old syntax of `years(ts)`, `months(ts)`, `days(ts)` and `hours(ts)` are al ## `CREATE TABLE ... AS SELECT` -Iceberg supports CTAS as an atomic operation when using a [`SparkCatalog`](spark-configuration.md#catalog-configuration). CTAS is supported, but is not atomic when using [`SparkSessionCatalog`](spark-configuration.md#replacing-the-session-catalog). +Iceberg supports CTAS as an atomic operation when using a [`SparkCatalog`](../spark-configuration.md#catalog-configuration). CTAS is supported, but is not atomic when using [`SparkSessionCatalog`](../spark-configuration.md#replacing-the-session-catalog). ```sql CREATE TABLE prod.db.sample @@ -106,7 +106,7 @@ AS SELECT ... ## `REPLACE TABLE ... AS SELECT` -Iceberg supports RTAS as an atomic operation when using a [`SparkCatalog`](spark-configuration.md#catalog-configuration). RTAS is supported, but is not atomic when using [`SparkSessionCatalog`](spark-configuration.md#replacing-the-session-catalog). 
+Iceberg supports RTAS as an atomic operation when using a [`SparkCatalog`](../spark-configuration.md#catalog-configuration). RTAS is supported, but is not atomic when using [`SparkSessionCatalog`](../spark-configuration.md#replacing-the-session-catalog). Atomic table replacement creates a new snapshot with the results of the `SELECT` query, but keeps table history. @@ -168,7 +168,7 @@ Iceberg has full `ALTER TABLE` support in Spark 3, including: * Widening the type of `int`, `float`, and `decimal` fields * Making required columns optional -In addition, [SQL extensions](spark-configuration.md#sql-extensions) can be used to add support for partition evolution and setting a table's write order +In addition, [SQL extensions](../spark-configuration.md#sql-extensions) can be used to add support for partition evolution and setting a table's write order ### `ALTER TABLE ... RENAME TO` @@ -184,7 +184,7 @@ ALTER TABLE prod.db.sample SET TBLPROPERTIES ( ); ``` -Iceberg uses table properties to control table behavior. For a list of available properties, see [Table configuration](configuration.md). +Iceberg uses table properties to control table behavior. For a list of available properties, see [Table configuration](../configuration.md). `UNSET` is used to remove properties: @@ -325,7 +325,7 @@ ALTER TABLE prod.db.sample DROP COLUMN point.z; ## `ALTER TABLE` SQL extensions -These commands are available in Spark 3 when using Iceberg [SQL extensions](spark-configuration.md#sql-extensions). +These commands are available in Spark 3 when using Iceberg [SQL extensions](../spark-configuration.md#sql-extensions). ### `ALTER TABLE ... ADD PARTITION FIELD` diff --git a/docs/docs/spark-getting-started.md b/docs/docs/spark-getting-started.md index d9dd70e4fa33..72642cc6e14f 100644 --- a/docs/docs/spark-getting-started.md +++ b/docs/docs/spark-getting-started.md @@ -40,7 +40,7 @@ spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{{ iceb ### Adding catalogs -Iceberg comes with [catalogs](spark-configuration.md#catalogs) that enable SQL commands to manage tables and load them by name. Catalogs are configured using properties under `spark.sql.catalog.(catalog_name)`. +Iceberg comes with [catalogs](../spark-configuration.md#catalogs) that enable SQL commands to manage tables and load them by name. Catalogs are configured using properties under `spark.sql.catalog.(catalog_name)`. This command creates a path-based catalog named `local` for tables under `$PWD/warehouse` and adds support for Iceberg tables to Spark's built-in catalog: @@ -56,7 +56,7 @@ spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{{ iceber ### Creating a table -To create your first Iceberg table in Spark, use the `spark-sql` shell or `spark.sql(...)` to run a [`CREATE TABLE`](spark-ddl.md#create-table) command: +To create your first Iceberg table in Spark, use the `spark-sql` shell or `spark.sql(...)` to run a [`CREATE TABLE`](../spark-ddl.md#create-table) command: ```sql -- local is the path-based catalog defined above @@ -65,21 +65,21 @@ CREATE TABLE local.db.table (id bigint, data string) USING iceberg; Iceberg catalogs support the full range of SQL DDL commands, including: -* [`CREATE TABLE ... PARTITIONED BY`](spark-ddl.md#create-table) -* [`CREATE TABLE ... AS SELECT`](spark-ddl.md#create-table-as-select) -* [`ALTER TABLE`](spark-ddl.md#alter-table) -* [`DROP TABLE`](spark-ddl.md#drop-table) +* [`CREATE TABLE ... PARTITIONED BY`](../spark-ddl.md#create-table) +* [`CREATE TABLE ... 
AS SELECT`](../spark-ddl.md#create-table-as-select)
+* [`ALTER TABLE`](../spark-ddl.md#alter-table)
+* [`DROP TABLE`](../spark-ddl.md#drop-table)

### Writing

-Once your table is created, insert data using [`INSERT INTO`](spark-writes.md#insert-into):
+Once your table is created, insert data using [`INSERT INTO`](../spark-writes.md#insert-into):

```sql
INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
INSERT INTO local.db.table SELECT id, data FROM source WHERE length(data) = 1;
```

-Iceberg also adds row-level SQL updates to Spark, [`MERGE INTO`](spark-writes.md#merge-into) and [`DELETE FROM`](spark-writes.md#delete-from):
+Iceberg also adds row-level SQL updates to Spark, [`MERGE INTO`](../spark-writes.md#merge-into) and [`DELETE FROM`](../spark-writes.md#delete-from):

```sql
MERGE INTO local.db.target t USING (SELECT * FROM updates) u ON t.id = u.id
@@ -87,7 +87,7 @@ WHEN MATCHED THEN UPDATE SET t.count = t.count + u.count
WHEN NOT MATCHED THEN INSERT *;
```

-Iceberg supports writing DataFrames using the new [v2 DataFrame write API](spark-writes.md#writing-with-dataframes):
+Iceberg supports writing DataFrames using the new [v2 DataFrame write API](../spark-writes.md#writing-with-dataframes):

```scala
spark.table("source").select("id", "data")
@@ -106,7 +106,7 @@ FROM local.db.table
GROUP BY data;
```

-SQL is also the recommended way to [inspect tables](spark-queries.md#inspecting-tables). To view all snapshots in a table, use the `snapshots` metadata table:
+SQL is also the recommended way to [inspect tables](../spark-queries.md#inspecting-tables). To view all snapshots in a table, use the `snapshots` metadata table:
```sql
SELECT * FROM local.db.table.snapshots;
```
@@ -121,18 +121,78 @@ SELECT * FROM local.db.table.snapshots;
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-----+
```

-[DataFrame reads](spark-queries.md#querying-with-dataframes) are supported and can now reference tables by name using `spark.table`:
+[DataFrame reads](../spark-queries.md#querying-with-dataframes) are supported and can now reference tables by name using `spark.table`:

```scala
val df = spark.table("local.db.table")
df.count()
```
+### Type compatibility
+
+Spark and Iceberg support different sets of types. Iceberg performs the type conversion automatically, but not for all combinations,
+so you may want to understand how Iceberg converts types before designing the column types of your tables.
+
+#### Spark type to Iceberg type
+
+This type conversion table describes how Spark types are converted to Iceberg types. The conversion applies both when creating an Iceberg table and when writing to an Iceberg table via Spark.
+
+| Spark | Iceberg | Notes |
+|-----------------|----------------------------|-------|
+| boolean | boolean | |
+| short | integer | |
+| byte | integer | |
+| integer | integer | |
+| long | long | |
+| float | float | |
+| double | double | |
+| date | date | |
+| timestamp | timestamp with timezone | |
+| timestamp_ntz | timestamp without timezone | |
+| char | string | |
+| varchar | string | |
+| string | string | |
+| binary | binary | |
+| decimal | decimal | |
+| struct | struct | |
+| array | list | |
+| map | map | |
+
+!!! info
+    The table above describes the conversion applied when creating a table. Writes support a broader set of conversions; some notes on writes:
+
+    * Iceberg numeric types (`integer`, `long`, `float`, `double`, `decimal`) support promotion during writes. For example, you can write Spark types `short`, `byte`, `integer`, and `long` to Iceberg type `long`.
+    * You can write to the Iceberg `fixed` type using the Spark `binary` type. Note that the binary length is validated against the length of the `fixed` type.
+
+#### Iceberg type to Spark type
+
+This type conversion table describes how Iceberg types are converted to Spark types. The conversion applies when reading from an Iceberg table via Spark.
+
+| Iceberg | Spark | Note |
+|----------------------------|-------------------------|---------------|
+| boolean | boolean | |
+| integer | integer | |
+| long | long | |
+| float | float | |
+| double | double | |
+| date | date | |
+| time | | Not supported |
+| timestamp with timezone | timestamp | |
+| timestamp without timezone | timestamp_ntz | |
+| string | string | |
+| uuid | string | |
+| fixed | binary | |
+| binary | binary | |
+| decimal | decimal | |
+| struct | struct | |
+| list | array | |
+| map | map | |
+
### Next steps

Next, you can learn more about Iceberg tables in Spark:

-* [DDL commands](spark-ddl.md): `CREATE`, `ALTER`, and `DROP`
-* [Querying data](spark-queries.md): `SELECT` queries and metadata tables
-* [Writing data](spark-writes.md): `INSERT INTO` and `MERGE INTO`
-* [Maintaining tables](spark-procedures.md) with stored procedures
+* [DDL commands](../spark-ddl.md): `CREATE`, `ALTER`, and `DROP`
+* [Querying data](../spark-queries.md): `SELECT` queries and metadata tables
+* [Writing data](../spark-writes.md): `INSERT INTO` and `MERGE INTO`
+* [Maintaining tables](../spark-procedures.md) with stored procedures
diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md
index 6b3cb06c3af7..ee8a8466c70d 100644
--- a/docs/docs/spark-procedures.md
+++ b/docs/docs/spark-procedures.md
@@ -20,7 +20,7 @@ title: "Procedures"

# Spark Procedures

-To use Iceberg in Spark, first configure [Spark catalogs](spark-configuration.md). Stored procedures are only available when using [Iceberg SQL extensions](spark-configuration.md#sql-extensions) in Spark 3.
+To use Iceberg in Spark, first configure [Spark catalogs](../spark-configuration.md). Stored procedures are only available when using [Iceberg SQL extensions](../spark-configuration.md#sql-extensions) in Spark 3.

## Usage

@@ -272,7 +272,7 @@ the `expire_snapshots` procedure will never remove files which are still require
| `stream_results` | | boolean | When true, deletion files will be sent to Spark driver by RDD partition (by default, all the files will be sent to Spark driver). This option is recommended to set to `true` to prevent Spark driver OOM from large file size |
| `snapshot_ids` | | array of long | Array of snapshot IDs to expire. |

-If `older_than` and `retain_last` are omitted, the table's [expiration properties](configuration.md#table-behavior-properties) will be used.
+If `older_than` and `retain_last` are omitted, the table's [expiration properties](../configuration.md#table-behavior-properties) will be used.

Snapshots that are still referenced by branches or tags won't be removed. By default, branches and tags never expire, but their retention policy can be changed with the table property `history.expire.max-ref-age-ms`. The `main` branch never expires.
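
To make the parameters above concrete, here is a hedged sketch of expiring snapshots and adjusting branch/tag retention. The `CALL` syntax and the `table` argument follow the usage pattern of Iceberg's stored procedures, and `prod.db.sample` is a placeholder catalog/table; treat the specific values as illustrative rather than recommended defaults.

```sql
-- Expire snapshots older than a timestamp, keeping at least the 10 most recent,
-- and stream results to the driver to reduce memory pressure (see `stream_results` above)
CALL prod.system.expire_snapshots(
  table => 'db.sample',
  older_than => TIMESTAMP '2024-01-01 00:00:00.000',
  retain_last => 10,
  stream_results => true
);

-- Branch and tag retention is controlled by the table property mentioned above
ALTER TABLE prod.db.sample SET TBLPROPERTIES (
  'history.expire.max-ref-age-ms' = '604800000'  -- 7 days
);
```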
#### Output @@ -357,7 +357,7 @@ Iceberg can compact data files in parallel using Spark with the `rewriteDataFile | `partial-progress.max-commits` | 10 | Maximum amount of commits that this rewrite is allowed to produce if partial progress is enabled | | `use-starting-sequence-number` | true | Use the sequence number of the snapshot at compaction start time instead of that of the newly produced snapshot | | `rewrite-job-order` | none | Force the rewrite job order based on the value. | -| `target-file-size-bytes` | 536870912 (512 MB, default value of `write.target-file-size-bytes` from [table properties](configuration.md#write-properties)) | Target output file size | +| `target-file-size-bytes` | 536870912 (512 MB, default value of `write.target-file-size-bytes` from [table properties](../configuration.md#write-properties)) | Target output file size | | `min-file-size-bytes` | 75% of target file size | Files under this threshold will be considered for rewriting regardless of any other criteria | | `max-file-size-bytes` | 180% of target file size | Files with sizes above this threshold will be considered for rewriting regardless of any other criteria | | `min-input-files` | 5 | Any file group exceeding this number of files will be rewritten regardless of other criteria | @@ -480,7 +480,7 @@ Dangling deletes are always filtered out during rewriting. | `partial-progress.enabled` | false | Enable committing groups of files prior to the entire rewrite completing | | `partial-progress.max-commits` | 10 | Maximum amount of commits that this rewrite is allowed to produce if partial progress is enabled | | `rewrite-job-order` | none | Force the rewrite job order based on the value. | -| `target-file-size-bytes` | 67108864 (64MB, default value of `write.delete.target-file-size-bytes` from [table properties](configuration.md#write-properties)) | Target output file size | +| `target-file-size-bytes` | 67108864 (64MB, default value of `write.delete.target-file-size-bytes` from [table properties](../configuration.md#write-properties)) | Target output file size | | `min-file-size-bytes` | 75% of target file size | Files under this threshold will be considered for rewriting regardless of any other criteria | | `max-file-size-bytes` | 180% of target file size | Files with sizes above this threshold will be considered for rewriting regardless of any other criteria | | `min-input-files` | 5 | Any file group exceeding this number of files will be rewritten regardless of other criteria | diff --git a/docs/docs/spark-queries.md b/docs/docs/spark-queries.md index 536c136d7e55..092ed6b1d636 100644 --- a/docs/docs/spark-queries.md +++ b/docs/docs/spark-queries.md @@ -20,11 +20,11 @@ title: "Queries" # Spark Queries -To use Iceberg in Spark, first configure [Spark catalogs](spark-configuration.md). Iceberg uses Apache Spark's DataSourceV2 API for data source and catalog implementations. +To use Iceberg in Spark, first configure [Spark catalogs](../spark-configuration.md). Iceberg uses Apache Spark's DataSourceV2 API for data source and catalog implementations. ## Querying with SQL -In Spark 3, tables use identifiers that include a [catalog name](spark-configuration.md#using-catalogs). +In Spark 3, tables use identifiers that include a [catalog name](../spark-configuration.md#using-catalogs). 
```sql
SELECT * FROM prod.db.table; -- catalog: prod, namespace: db, table: table
diff --git a/docs/docs/spark-structured-streaming.md b/docs/docs/spark-structured-streaming.md
index 0ac753808d9e..50799042073f 100644
--- a/docs/docs/spark-structured-streaming.md
+++ b/docs/docs/spark-structured-streaming.md
@@ -68,7 +68,7 @@ Iceberg supports `append` and `complete` output modes:
* `append`: appends the rows of every micro-batch to the table
* `complete`: replaces the table contents every micro-batch

-Prior to starting the streaming query, ensure you created the table. Refer to the [SQL create table](spark-ddl.md#create-table) documentation to learn how to create the Iceberg table.
+Prior to starting the streaming query, ensure you created the table. Refer to the [SQL create table](../spark-ddl.md#create-table) documentation to learn how to create the Iceberg table.

Iceberg doesn't support experimental [continuous processing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing), as it doesn't provide the interface to "commit" the output.

@@ -76,7 +76,7 @@ Iceberg requires sorting data by partition per task prior to writing the data against a partitioned table. In Spark, tasks are split by Spark partition.
For batch queries you're encouraged to do an explicit sort to fulfill the requirement
-(see [here](spark-writes.md#writing-distribution-modes)), but the approach would bring additional latency as
+(see [here](../spark-writes.md#writing-distribution-modes)), but the approach would bring additional latency as
repartition and sort are considered as heavy operations for streaming workload. To avoid additional latency, you can
enable fanout writer to eliminate the requirement.

@@ -107,13 +107,13 @@ documents how to configure the interval.

### Expire old snapshots

-Each batch written to a table produces a new snapshot. Iceberg tracks snapshots in table metadata until they are expired. Snapshots accumulate quickly with frequent commits, so it is highly recommended that tables written by streaming queries are [regularly maintained](maintenance.md#expire-snapshots). [Snapshot expiration](spark-procedures.md#expire_snapshots) is the procedure of removing the metadata and any data files that are no longer needed. By default, the procedure will expire the snapshots older than five days.
+Each batch written to a table produces a new snapshot. Iceberg tracks snapshots in table metadata until they are expired. Snapshots accumulate quickly with frequent commits, so it is highly recommended that tables written by streaming queries are [regularly maintained](../maintenance.md#expire-snapshots). [Snapshot expiration](../spark-procedures.md#expire_snapshots) is the procedure of removing the metadata and any data files that are no longer needed. By default, the procedure will expire the snapshots older than five days.

### Compacting data files

-The amount of data written from a streaming process is typically small, which can cause the table metadata to track lots of small files. [Compacting small files into larger files](maintenance.md#compact-data-files) reduces the metadata needed by the table, and increases query efficiency. Iceberg and Spark [comes with the `rewrite_data_files` procedure](spark-procedures.md#rewrite_data_files).
+The amount of data written from a streaming process is typically small, which can cause the table metadata to track lots of small files. 
[Compacting small files into larger files](../maintenance.md#compact-data-files) reduces the metadata needed by the table, and increases query efficiency. Iceberg and Spark [come with the `rewrite_data_files` procedure](../spark-procedures.md#rewrite_data_files).

### Rewrite manifests

To optimize write latency on a streaming workload, Iceberg can write the new snapshot with a "fast" append that does not automatically compact manifests.
-This could lead lots of small manifest files. Iceberg can [rewrite the number of manifest files to improve query performance](maintenance.md#rewrite-manifests). Iceberg and Spark [come with the `rewrite_manifests` procedure](spark-procedures.md#rewrite_manifests).
+This could lead to lots of small manifest files. Iceberg can [rewrite the number of manifest files to improve query performance](../maintenance.md#rewrite-manifests). Iceberg and Spark [come with the `rewrite_manifests` procedure](../spark-procedures.md#rewrite_manifests).
diff --git a/docs/docs/spark-writes.md b/docs/docs/spark-writes.md
index 1baea04deed5..efc15e7e35fc 100644
--- a/docs/docs/spark-writes.md
+++ b/docs/docs/spark-writes.md
@@ -20,9 +20,9 @@ title: "Writes"

# Spark Writes

-To use Iceberg in Spark, first configure [Spark catalogs](spark-configuration.md).
+To use Iceberg in Spark, first configure [Spark catalogs](../spark-configuration.md).

-Some plans are only available when using [Iceberg SQL extensions](spark-configuration.md#sql-extensions) in Spark 3.
+Some plans are only available when using [Iceberg SQL extensions](../spark-configuration.md#sql-extensions) in Spark 3.

Iceberg uses Apache Spark's DataSourceV2 API for data source and catalog implementations. Spark DSv2 is an evolving API with different levels of support in Spark versions:

@@ -200,7 +200,7 @@ Branch writes can also be performed as part of a write-audit-publish (WAP) workf
Note WAP branch and branch identifier cannot both be specified.
Also, the branch must exist before performing the write. The operation does **not** create the branch if it does not exist.
-For more information on branches please refer to [branches](branching.md).
+For more information on branches, please refer to [branches](../branching.md).

```sql
-- INSERT (1,' a') (2, 'b') into the audit branch.
@@ -385,7 +385,7 @@ sort-order. Further division and coalescing of tasks may take place because of

When writing data to Iceberg with Spark, it's important to note that Spark cannot write a file larger than a Spark
task and a file cannot span an Iceberg partition boundary. This means that although Iceberg will always roll over a file
-when it grows to [`write.target-file-size-bytes`](configuration.md#write-properties), but unless the Spark task is
+when it grows to [`write.target-file-size-bytes`](../configuration.md#write-properties), that will not happen unless the Spark task is
large enough. The size of the file created on disk will also be much smaller than the Spark task
since the on disk data will be both compressed and in columnar format as opposed to Spark's uncompressed row
representation. This means a 100 megabyte Spark task will create a file much smaller than 100 megabytes even if that
@@ -404,63 +404,3 @@ columnar-compressed size, so a larger value than the target file size will need
in-memory size to on disk size is data dependent. Future work in Spark should allow Iceberg to automatically adjust
this parameter at write time to match the `write.target-file-size-bytes`.
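
To make the file-size discussion above concrete, here is a hedged sketch of raising the target file size on a table and then checking the sizes that writes actually produce. `prod.db.sample` is a placeholder table; `write.target-file-size-bytes` is the table property referenced above, and the `files` metadata table is used here only for inspection.

```sql
-- Raise the target data file size to 256 MB for future writes
ALTER TABLE prod.db.sample SET TBLPROPERTIES (
  'write.target-file-size-bytes' = '268435456'
);

-- Inspect the file sizes actually written; expect them to be smaller than the
-- in-memory Spark task size because on-disk data is compressed and columnar
SELECT file_path, file_size_in_bytes
FROM prod.db.sample.files
ORDER BY file_size_in_bytes DESC;
```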
-## Type compatibility - -Spark and Iceberg support different set of types. Iceberg does the type conversion automatically, but not for all combinations, -so you may want to understand the type conversion in Iceberg in prior to design the types of columns in your tables. - -### Spark type to Iceberg type - -This type conversion table describes how Spark types are converted to the Iceberg types. The conversion applies on both creating Iceberg table and writing to Iceberg table via Spark. - -| Spark | Iceberg | Notes | -|-----------------|----------------------------|-------| -| boolean | boolean | | -| short | integer | | -| byte | integer | | -| integer | integer | | -| long | long | | -| float | float | | -| double | double | | -| date | date | | -| timestamp | timestamp with timezone | | -| timestamp_ntz | timestamp without timezone | | -| char | string | | -| varchar | string | | -| string | string | | -| binary | binary | | -| decimal | decimal | | -| struct | struct | | -| array | list | | -| map | map | | - -!!! info - The table is based on representing conversion during creating table. In fact, broader supports are applied on write. Here're some points on write: - - * Iceberg numeric types (`integer`, `long`, `float`, `double`, `decimal`) support promotion during writes. e.g. You can write Spark types `short`, `byte`, `integer`, `long` to Iceberg type `long`. - * You can write to Iceberg `fixed` type using Spark `binary` type. Note that assertion on the length will be performed. - -### Iceberg type to Spark type - -This type conversion table describes how Iceberg types are converted to the Spark types. The conversion applies on reading from Iceberg table via Spark. - -| Iceberg | Spark | Note | -|----------------------------|-------------------------|---------------| -| boolean | boolean | | -| integer | integer | | -| long | long | | -| float | float | | -| double | double | | -| date | date | | -| time | | Not supported | -| timestamp with timezone | timestamp | | -| timestamp without timezone | timestamp_ntz | | -| string | string | | -| uuid | string | | -| fixed | binary | | -| binary | binary | | -| decimal | decimal | | -| struct | struct | | -| list | array | | -| map | map | | -
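
As a worked example of the type mappings that this change moves to the getting-started page, the hedged sketch below creates a table whose Spark column types map to the Iceberg types listed in the tables above. The table name is hypothetical, and `timestamp_ntz` assumes a Spark version that supports that type.

```sql
CREATE TABLE local.db.type_demo (
    id       bigint,               -- Iceberg: long
    price    decimal(10, 2),       -- Iceberg: decimal(10, 2)
    event_ts timestamp,            -- Iceberg: timestamp with timezone
    local_ts timestamp_ntz,        -- Iceberg: timestamp without timezone
    name     string,               -- Iceberg: string
    payload  binary,               -- Iceberg: binary
    tags     array<string>,        -- Iceberg: list
    props    map<string, string>   -- Iceberg: map
)
USING iceberg;
```

Reading the table back yields the Spark types in the second table; for example, an Iceberg `uuid` or `fixed` column would surface in Spark as `string` or `binary`, respectively.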