Skip to content

Commit e57e08b

Browse files
committed
I/O: Adapter for Apache Iceberg
1 parent 991c1ca commit e57e08b

File tree

6 files changed

+238
-4
lines changed

6 files changed

+238
-4
lines changed

cratedb_toolkit/cli.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
from .cmd.tail.cli import cli as tail_cli
1010
from .docs.cli import cli as docs_cli
1111
from .info.cli import cli as info_cli
12-
from .io.cli import cli as io_cli
12+
from .io.cli import cli_load as io_cli_load
13+
from .io.cli import cli_save as io_cli_save
1314
from .query.cli import cli as query_cli
1415
from .settings.cli import cli as settings_cli
1516
from .shell.cli import cli as shell_cli
@@ -30,7 +31,8 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):
3031
cli.add_command(cfr_cli, name="cfr")
3132
cli.add_command(cloud_cli, name="cluster")
3233
cli.add_command(docs_cli, name="docs")
33-
cli.add_command(io_cli, name="load")
34+
cli.add_command(io_cli_load, name="load")
35+
cli.add_command(io_cli_save, name="save")
3436
cli.add_command(query_cli, name="query")
3537
cli.add_command(rockset_cli, name="rockset")
3638
cli.add_command(shell_cli, name="shell")

cratedb_toolkit/cluster/core.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
DatabaseAddressMissingError,
2121
OperationFailed,
2222
)
23+
from cratedb_toolkit.io.iceberg import from_iceberg, to_iceberg
2324
from cratedb_toolkit.model import ClusterAddressOptions, DatabaseAddress, InputOutputResource, TableAddress
2425
from cratedb_toolkit.util.client import jwt_token_patch
2526
from cratedb_toolkit.util.data import asbool
@@ -569,6 +570,9 @@ def load_table(
569570
else:
570571
raise NotImplementedError("Loading full data via Kinesis not implemented yet")
571572

573+
elif source_url_obj.scheme.startswith("iceberg") or source_url_obj.scheme.endswith("iceberg"):
574+
return from_iceberg(str(source_url_obj), target_url)
575+
572576
elif source_url_obj.scheme in ["file+bson", "http+bson", "https+bson", "mongodb", "mongodb+srv"]:
573577
if "+cdc" in source_url_obj.scheme:
574578
source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "")
@@ -599,6 +603,30 @@ def load_table(
599603

600604
return self
601605

606+
def save_table(
    self, source: TableAddress, target: InputOutputResource, transformation: t.Union[Path, None] = None
) -> "StandaloneCluster":
    """
    Export data from a database table on a standalone CrateDB Server.

    :param source: Source table address. NOTE(review): currently unused,
        the source table is taken from the cluster address — confirm intent.
    :param target: Target resource, e.g. an Iceberg catalog URL.
    :param transformation: Optional path to a Zyp transformation file.
        NOTE(review): currently not applied on the export path.

    Synopsis
    --------
    export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo

    ctk save table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
    """
    source_url = self.address.dburi
    target_url_obj = URL(target.url)

    if target_url_obj.scheme.startswith("iceberg") or target_url_obj.scheme.endswith("iceberg"):
        to_iceberg(source_url, target.url)
        # Return `self`, to satisfy the declared return type and permit chaining.
        return self
    raise NotImplementedError(f"Exporting resource not implemented yet: {target_url_obj}")
629+
602630

603631
class DatabaseCluster:
604632
"""

cratedb_toolkit/io/cli.py

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
1919
@click.version_option()
2020
@click.pass_context
21-
def cli(ctx: click.Context, verbose: bool, debug: bool):
21+
def cli_load(ctx: click.Context, verbose: bool, debug: bool):
2222
"""
2323
Load data into CrateDB.
2424
"""
2525
return boot_click(ctx, verbose, debug)
2626

2727

28-
@make_command(cli, name="table")
28+
@make_command(cli_load, name="table")
2929
@click.argument("url")
3030
@option_cluster_id
3131
@option_cluster_name
@@ -67,3 +67,60 @@ def load_table(
6767
cluster_url=cluster_url,
6868
)
6969
cluster.load_table(source=source, target=target, transformation=transformation)
70+
71+
72+
@click.group(cls=ClickAliasedGroup)  # type: ignore[arg-type]
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
@click.version_option()
@click.pass_context
def cli_save(ctx: click.Context, verbose: bool, debug: bool):
    """
    Export data from CrateDB.
    """
    return boot_click(ctx, verbose, debug)


@make_command(cli_save, name="table")
@click.argument("url")
@option_cluster_id
@option_cluster_name
@option_cluster_url
@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema to export the data from")
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table to export the data from")
@click.option("--format", "format_", type=str, required=False, help="File format of the export resource")
@click.option("--compression", type=str, required=False, help="Compression format of the export resource")
@click.option("--transformation", type=Path, required=False, help="Path to Zyp transformation file")
@click.pass_context
def save_table(
    ctx: click.Context,
    url: str,
    cluster_id: str,
    cluster_name: str,
    cluster_url: str,
    schema: str,
    table: str,
    format_: str,
    compression: str,
    transformation: t.Union[Path, None],
):
    """
    Export data from CrateDB and CrateDB Cloud clusters.
    """

    # When `--transformation` is given, but empty, fix it.
    if transformation is not None and transformation.name == "":
        transformation = None

    # Encapsulate source and target parameters.
    source = TableAddress(schema=schema, table=table)
    target = InputOutputResource(url=url, format=format_, compression=compression)

    # Dispatch "save table" operation.
    cluster = DatabaseCluster.create(
        cluster_id=cluster_id,
        cluster_name=cluster_name,
        cluster_url=cluster_url,
    )
    cluster.save_table(source=source, target=target, transformation=transformation)

cratedb_toolkit/io/iceberg.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import dataclasses
import logging
import typing as t

import polars as pl
import pyarrow.parquet as pq
import sqlalchemy as sa
from boltons.urlutils import URL
from pyiceberg.catalog import Catalog, load_catalog
from sqlalchemy_cratedb import insert_bulk

from cratedb_toolkit.model import DatabaseAddress
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
CHUNK_SIZE = 75_000
17+
18+
19+
@dataclasses.dataclass
class IcebergAddress:
    """
    Decode an Iceberg resource address from a URL.

    Two flavors are supported:

    - Path-only, e.g. ``file+iceberg:var/lib/iceberg/.../metadata.json``,
      pointing directly at a table's metadata file.
    - Catalog-based, e.g. ``iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset``,
      selecting a table by name from a SQLite-backed catalog.
    """

    # Filesystem path to the metadata file or the warehouse directory.
    path: str
    # Catalog name; `None` when addressing a metadata file directly.
    catalog: t.Optional[str]
    # Table name within the catalog; `None` when addressing a metadata file directly.
    table: t.Optional[str]

    @classmethod
    def from_url(cls, url: str) -> "IcebergAddress":
        """
        Parse an `iceberg://` or `file+iceberg:` URL into an `IcebergAddress`.
        """
        iceberg_url = URL(url)
        # `iceberg://./path` encodes a relative filesystem path: the "." host
        # means "current directory", so strip the leading slash from the path.
        if iceberg_url.host == ".":
            iceberg_url.path = iceberg_url.path.lstrip("/")
        return cls(
            path=iceberg_url.path,
            catalog=iceberg_url.query_params.get("catalog"),
            table=iceberg_url.query_params.get("table"),
        )

    def load_catalog(self) -> Catalog:
        """
        Load the SQLite-backed ("sql" type) Iceberg catalog located at `self.path`.
        """
        return load_catalog(
            self.catalog,
            **{
                "type": "sql",
                "uri": f"sqlite:///{self.path}/pyiceberg_catalog.db",
                "warehouse": f"file://{self.path}",
            },
        )

    @property
    def identifier(self) -> t.Tuple[t.Optional[str], t.Optional[str]]:
        """Catalog table identifier, as a `(namespace, table)` tuple."""
        return (self.catalog, self.table)

    def load_table(self) -> pl.LazyFrame:
        """
        Return the addressed Iceberg table as a Polars `LazyFrame`.

        Uses the catalog when one is given, otherwise scans the
        metadata file at `self.path` directly.
        """
        if self.catalog is not None:
            catalog = self.load_catalog()
            return catalog.load_table(self.identifier).to_polars()
        else:
            return pl.scan_iceberg(self.path)
56+
57+
58+
def from_iceberg(source_url, cratedb_url, progress: bool = False):
    """
    Scan an Iceberg table from local filesystem or object store, and load into CrateDB.
    https://docs.pola.rs/api/python/stable/reference/api/polars.scan_iceberg.html

    :param source_url: Iceberg source address, either a metadata file
        (``file+iceberg:...metadata.json``) or a catalog URL
        (``iceberg://./path?catalog=...&table=...``).
    :param cratedb_url: CrateDB database URL, including schema and table name.
    :param progress: Currently unused; accepted for interface parity with other loaders.

    Synopsis
    --------
    export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
    ctk load table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
    ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
    """

    # Parse source (Iceberg) and target (CrateDB) addresses.
    iceberg_address = IcebergAddress.from_url(source_url)
    logger.info(
        f"Iceberg address: Path: {iceberg_address.path}, catalog: {iceberg_address.catalog}, table: {iceberg_address.table}"
    )

    cratedb_address = DatabaseAddress.from_string(cratedb_url)
    # Use a distinct name for the decoded SQLAlchemy URL, to avoid
    # shadowing the `cratedb_url` parameter.
    cratedb_sqlalchemy_url, cratedb_table = cratedb_address.decode()
    if cratedb_table.table is None:
        raise ValueError("Table name is missing. Please adjust CrateDB database URL.")
    logger.info(f"Target address: {cratedb_address}")

    # Invoke copy operation.
    logger.info("Running Iceberg copy")
    engine = sa.create_engine(str(cratedb_sqlalchemy_url))

    # Stream the Iceberg table in chunks, to bound memory usage.
    pl.Config.set_streaming_chunk_size(CHUNK_SIZE)
    frame = iceberg_address.load_table()

    # This conversion to pandas is zero-copy,
    # so we can utilize their SQL utils for free.
    # https://github.com/pola-rs/polars/issues/7852
    # Note: This code also uses the most efficient `insert_bulk` method with CrateDB.
    # https://cratedb.com/docs/sqlalchemy-cratedb/dataframe.html#efficient-insert-operations-with-pandas
    frame.collect(streaming=True).to_pandas().to_sql(
        name=cratedb_table.table,
        schema=cratedb_table.schema,
        con=engine,
        if_exists="replace",
        index=False,
        chunksize=CHUNK_SIZE,
        method=insert_bulk,
    )

    # Note: This was much slower.
    # table.to_polars().collect(streaming=True).write_database(table_name=table_address.fullname, connection=engine, if_table_exists="replace")
107+
108+
109+
def to_iceberg(source_url, target_url, progress: bool = False):
    """
    Export a CrateDB table into an Iceberg table via a SQLite-backed catalog.

    :param source_url: CrateDB database URL, including schema and table name.
    :param target_url: Iceberg catalog address, e.g.
        ``iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset``.
    :param progress: Currently unused; accepted for interface parity with other exporters.

    Synopsis
    --------
    export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
    ctk save table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
    """

    # Decode the target (Iceberg) address. The catalog flavor is required
    # for exporting, because the table is created through the catalog.
    iceberg_address = IcebergAddress.from_url(target_url)
    if iceberg_address.catalog is None or iceberg_address.table is None:
        raise ValueError("Iceberg target URL needs `catalog` and `table` query parameters")
    catalog = iceberg_address.load_catalog()
    logger.info(f"Iceberg catalog: {iceberg_address.catalog}")

    # Decode the source (CrateDB) address.
    cratedb_address = DatabaseAddress.from_string(source_url)
    cratedb_sqlalchemy_url, cratedb_table = cratedb_address.decode()
    if cratedb_table.table is None:
        raise ValueError("Table name is missing. Please adjust CrateDB database URL.")

    # Read the CrateDB table into a pyarrow Table.
    logger.info("Running Iceberg export")
    engine = sa.create_engine(str(cratedb_sqlalchemy_url))
    if cratedb_table.schema:
        table_fqn = f'"{cratedb_table.schema}"."{cratedb_table.table}"'
    else:
        table_fqn = f'"{cratedb_table.table}"'
    data = pl.read_database(query=f"SELECT * FROM {table_fqn}", connection=engine).to_arrow()

    # Create the Iceberg namespace and table if needed, then append the data.
    # https://py.iceberg.apache.org/#write-a-pyarrow-dataframe
    catalog.create_namespace_if_not_exists(iceberg_address.catalog)
    table = catalog.create_table_if_not_exists(
        iceberg_address.identifier,
        schema=data.schema,
    )
    table.append(data)
    logger.info(f"Exported {data.num_rows} records to Iceberg table: {iceberg_address.identifier}")

doc/io/iceberg/index.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
(iceberg)=
2+
# Apache Iceberg I/O
3+
4+
## About
5+
Import and export data into/from Iceberg tables, for humans and machines.
6+
7+
8+
```{toctree}
9+
:maxdepth: 1
10+
11+
loader
12+
```

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ optional-dependencies.io = [
173173
"fsspec[s3,http]",
174174
"pandas>=1,<2.3",
175175
"polars<1.30",
176+
"pyiceberg[pyarrow,sql-postgres]<0.10",
176177
"sqlalchemy>=2",
177178
"universal-pathlib<0.3",
178179
]

0 commit comments

Comments
 (0)