
Commit ca3d50b

feat: Add Timestream UNLOAD (#2284)
* feat: Add Timestream UNLOAD
* Add unload & unload_to_files
* [skip ci] Update api.rst
* Read column names from CSV metadata
* Add missing parameter to s3.list_objects type stubs
* [skip ci] Docstrings
* [skip ci] Update scale docs
* [skip ci] Update docstrings
* [skip ci] Update tutorial
* Error handling
* [skip ci] Docs formatting fix

---------

Co-authored-by: Lucas Hanson <[email protected]>
1 parent 097110f commit ca3d50b
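
This commit adds two public entry points, wr.timestream.unload_to_files and wr.timestream.unload. As a quick orientation, here is a minimal usage sketch assembled from the docstring examples in the diff below; the database, table, and bucket names are placeholders.

# Editor's sketch based on the docstrings added in awswrangler/timestream/_read.py below.
import awswrangler as wr

# Export query results to S3 without reading them back:
wr.timestream.unload_to_files(
    sql="SELECT time, measure, dimension FROM database.mytable",
    path="s3://bucket/extracted_parquet_files/",
)

# Export and load the results straight into a DataFrame:
df = wr.timestream.unload(
    sql="SELECT time, measure, dimension FROM database.mytable",
    path="s3://bucket/extracted_parquet_files/",
    unload_format="CSV",
)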

File tree

8 files changed, +445 -15 lines


awswrangler/s3/_list.pyi (+4)
@@ -69,6 +69,7 @@ def list_directories(
 def list_objects(
     path: str,
     chunked: Literal[False],
+    suffix: Union[str, List[str], None] = ...,
     ignore_suffix: Union[str, List[str], None] = ...,
     last_modified_begin: Optional[datetime.datetime] = ...,
     last_modified_end: Optional[datetime.datetime] = ...,
@@ -79,6 +80,7 @@ def list_objects(
 @overload
 def list_objects(
     path: str,
+    suffix: Union[str, List[str], None] = ...,
     ignore_suffix: Union[str, List[str], None] = ...,
     last_modified_begin: Optional[datetime.datetime] = ...,
     last_modified_end: Optional[datetime.datetime] = ...,
@@ -90,6 +92,7 @@ def list_objects(
 def list_objects(
     path: str,
     chunked: Literal[True],
+    suffix: Union[str, List[str], None] = ...,
     ignore_suffix: Union[str, List[str], None] = ...,
     last_modified_begin: Optional[datetime.datetime] = ...,
     last_modified_end: Optional[datetime.datetime] = ...,
@@ -101,6 +104,7 @@ def list_objects(
 def list_objects(
     path: str,
     chunked: bool,
+    suffix: Union[str, List[str], None] = ...,
     ignore_suffix: Union[str, List[str], None] = ...,
     last_modified_begin: Optional[datetime.datetime] = ...,
     last_modified_end: Optional[datetime.datetime] = ...,
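
The stub additions above type the suffix keyword that the runtime wr.s3.list_objects already accepts (per the commit message, the parameter was missing only from the type stubs); the unload path added in _read.py below relies on it to locate the UNLOAD metadata object. A minimal sketch, with a placeholder bucket path:

# Sketch of the suffix filter now covered by the type stubs (placeholder path).
import awswrangler as wr

metadata_files = wr.s3.list_objects(
    path="s3://bucket/extracted_parquet_files/",
    suffix="_metadata.json",  # keep only keys ending with this suffix
)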

awswrangler/timestream/__init__.py (+9 -2)
@@ -4,8 +4,13 @@
 from awswrangler.timestream._create import create_database, create_table
 from awswrangler.timestream._delete import delete_database, delete_table
 from awswrangler.timestream._list import list_databases, list_tables
-from awswrangler.timestream._read import query
-from awswrangler.timestream._write import batch_load, batch_load_from_files, wait_batch_load_task, write
+from awswrangler.timestream._read import query, unload, unload_to_files
+from awswrangler.timestream._write import (
+    batch_load,
+    batch_load_from_files,
+    wait_batch_load_task,
+    write,
+)

 __all__ = [
     "create_database",
@@ -19,4 +24,6 @@
     "batch_load",
     "batch_load_from_files",
     "wait_batch_load_task",
+    "unload_to_files",
+    "unload",
 ]
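
With these re-exports, both functions are reachable from the top-level package namespace; a trivial check, shown only to illustrate the expanded surface:

# The new entry points sit alongside the existing Timestream helpers.
import awswrangler as wr

assert "unload" in wr.timestream.__all__
assert "unload_to_files" in wr.timestream.__all__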

awswrangler/timestream/_read.py (+283 -2)
@@ -1,14 +1,16 @@
 """Amazon Timestream Read Module."""

+import json
 import logging
 from datetime import datetime
-from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Union, cast
+from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Literal, Optional, Union, cast

 import boto3
 import pandas as pd
 from botocore.config import Config

-from awswrangler import _utils
+from awswrangler import _utils, exceptions, s3
+from awswrangler._config import apply_configs

 _logger: logging.Logger = logging.getLogger(__name__)

@@ -113,6 +115,16 @@ def _paginate_query(
             rows = []


+def _get_column_names_from_metadata(unload_path: str, boto3_session: Optional[boto3.Session] = None) -> List[str]:
+    client_s3 = _utils.client(service_name="s3", session=boto3_session)
+    metadata_path = s3.list_objects(path=unload_path, suffix="_metadata.json", boto3_session=boto3_session)[0]
+    bucket, key = _utils.parse_path(metadata_path)
+    metadata_content = json.loads(client_s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8"))
+    columns = [column["Name"] for column in metadata_content["ColumnInfo"]]
+    _logger.debug("Read %d columns from metadata file in: %s", len(columns), metadata_path)
+    return columns
+
+
 def query(
     sql: str,
     chunked: bool = False,
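
The helper above expects the _metadata.json object written by Timestream UNLOAD to carry a ColumnInfo array; only the Name field of each entry is read. A hedged illustration of that parsing against a trimmed-down payload (the real metadata file contains additional fields not shown here):

# Illustrative payload only; the actual metadata schema is richer than this.
import json

metadata_content = json.loads(
    '{"ColumnInfo": [{"Name": "time", "Type": {"ScalarType": "TIMESTAMP"}},'
    ' {"Name": "measure", "Type": {"ScalarType": "DOUBLE"}}]}'
)
columns = [column["Name"] for column in metadata_content["ColumnInfo"]]
assert columns == ["time", "measure"]
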
@@ -155,3 +167,272 @@ def query(
         # Modin's concat() can not concatenate empty data frames
         return pd.concat(results, ignore_index=True)
     return pd.DataFrame()
+
+
+@_utils.validate_distributed_kwargs(
+    unsupported_kwargs=["boto3_session", "s3_additional_kwargs"],
+)
+@apply_configs
+def unload(
+    sql: str,
+    path: str,
+    unload_format: Optional[Literal["CSV", "PARQUET"]] = None,
+    compression: Optional[Literal["GZIP", "NONE"]] = None,
+    partition_cols: Optional[List[str]] = None,
+    encryption: Optional[Literal["SSE_KMS", "SSE_S3"]] = None,
+    kms_key_id: Optional[str] = None,
+    field_delimiter: Optional[str] = ",",
+    escaped_by: Optional[str] = "\\",
+    chunked: Union[bool, int] = False,
+    keep_files: bool = False,
+    use_threads: Union[bool, int] = True,
+    boto3_session: Optional[boto3.Session] = None,
+    s3_additional_kwargs: Optional[Dict[str, str]] = None,
+    pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
+) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
+    """
+    Unload query results to Amazon S3 and read the results as a Pandas DataFrame.
+
+    https://docs.aws.amazon.com/timestream/latest/developerguide/export-unload.html
+
+    Parameters
+    ----------
+    sql : str
+        SQL query.
+    path : str
+        S3 path to write stage files (e.g. s3://bucket_name/any_name/).
+    unload_format : str, optional
+        Format of the unloaded S3 objects from the query.
+        Valid values: "CSV", "PARQUET". Case sensitive. Defaults to "PARQUET".
+    compression : str, optional
+        Compression of the unloaded S3 objects from the query.
+        Valid values: "GZIP", "NONE". Defaults to "GZIP".
+    partition_cols : List[str], optional
+        Specifies the partition keys for the unload operation.
+    encryption : str, optional
+        Encryption of the unloaded S3 objects from the query.
+        Valid values: "SSE_KMS", "SSE_S3". Defaults to "SSE_S3".
+    kms_key_id : str, optional
+        Specifies the key ID for an AWS Key Management Service (AWS KMS) key to be
+        used to encrypt data files on Amazon S3.
+    field_delimiter : str, optional
+        A single ASCII character that is used to separate fields in the output file,
+        such as a pipe character (|), a comma (,), or a tab (\\t). Only used with CSV format.
+    escaped_by : str, optional
+        The character that should be treated as an escape character in the data file
+        written to the S3 bucket. Only used with CSV format.
+    chunked : Union[int, bool]
+        If passed, the data is split into an iterable of DataFrames (memory friendly).
+        If `True`, awswrangler iterates on the data by files in the most efficient way without guarantee of chunksize.
+        If an `INTEGER` is passed, awswrangler iterates on the data by number of rows equal to the received INTEGER.
+    keep_files : bool
+        Whether to keep the stage files in S3. Defaults to False (stage files are deleted).
+    use_threads : bool, int
+        True to enable concurrent requests, False to disable multiple threads.
+        If enabled, os.cpu_count() is used as the max number of threads.
+        If an integer is provided, that number is used.
+    boto3_session : boto3.Session(), optional
+        Boto3 Session. The default boto3 session is used if None.
+    s3_additional_kwargs : Dict[str, str], optional
+        Forwarded to botocore requests.
+    pyarrow_additional_kwargs : Dict[str, Any], optional
+        Forwarded to the `to_pandas` method converting from PyArrow tables to Pandas DataFrame.
+        Valid values include "split_blocks", "self_destruct", "ignore_metadata".
+        e.g. pyarrow_additional_kwargs={'split_blocks': True}.
+
+    Returns
+    -------
+    Union[pandas.DataFrame, Iterator[pandas.DataFrame]]
+        Result as Pandas DataFrame(s).
+
+    Examples
+    --------
+    Unload and read as Parquet (default).
+
+    >>> import awswrangler as wr
+    >>> df = wr.timestream.unload(
+    ...     sql="SELECT time, measure, dimension FROM database.mytable",
+    ...     path="s3://bucket/extracted_parquet_files/",
+    ... )
+
+    Unload and read partitioned Parquet. Note: partition columns must be at the end of the table.
+
+    >>> import awswrangler as wr
+    >>> df = wr.timestream.unload(
+    ...     sql="SELECT time, measure, dim1, dim2 FROM database.mytable",
+    ...     path="s3://bucket/extracted_parquet_files/",
+    ...     partition_cols=["dim2"],
+    ... )
+
+    Unload and read as CSV.
+
+    >>> import awswrangler as wr
+    >>> df = wr.timestream.unload(
+    ...     sql="SELECT time, measure, dimension FROM database.mytable",
+    ...     path="s3://bucket/extracted_parquet_files/",
+    ...     unload_format="CSV",
+    ... )
+
+    """
+    path = path if path.endswith("/") else f"{path}/"
+
+    if unload_format not in [None, "CSV", "PARQUET"]:
+        raise exceptions.InvalidArgumentValue("<unload_format> argument must be 'CSV' or 'PARQUET'")
+
+    unload_to_files(
+        sql=sql,
+        path=path,
+        unload_format=unload_format,
+        compression=compression,
+        partition_cols=partition_cols,
+        encryption=encryption,
+        kms_key_id=kms_key_id,
+        field_delimiter=field_delimiter,
+        escaped_by=escaped_by,
+        boto3_session=boto3_session,
+    )
+    results_path = f"{path}results/"
+    try:
+        if unload_format == "CSV":
+            column_names: List[str] = _get_column_names_from_metadata(path, boto3_session)
+            return s3.read_csv(
+                path=results_path,
+                header=None,
+                names=[column for column in column_names if column not in set(partition_cols)]
+                if partition_cols is not None
+                else column_names,
+                dataset=True,
+                use_threads=use_threads,
+                boto3_session=boto3_session,
+                s3_additional_kwargs=s3_additional_kwargs,
+            )
+        else:
+            return s3.read_parquet(
+                path=results_path,
+                chunked=chunked,
+                dataset=True,
+                use_threads=use_threads,
+                boto3_session=boto3_session,
+                s3_additional_kwargs=s3_additional_kwargs,
+                pyarrow_additional_kwargs=pyarrow_additional_kwargs,
+            )
+    finally:
+        if keep_files is False:
+            _logger.debug("Deleting objects in S3 path: %s", path)
+            s3.delete_objects(
+                path=path,
+                use_threads=use_threads,
+                boto3_session=boto3_session,
+                s3_additional_kwargs=s3_additional_kwargs,
+            )
+
+
+@_utils.validate_distributed_kwargs(
+    unsupported_kwargs=["boto3_session"],
+)
+@apply_configs
+def unload_to_files(
+    sql: str,
+    path: str,
+    unload_format: Optional[Literal["CSV", "PARQUET"]] = None,
+    compression: Optional[Literal["GZIP", "NONE"]] = None,
+    partition_cols: Optional[List[str]] = None,
+    encryption: Optional[Literal["SSE_KMS", "SSE_S3"]] = None,
+    kms_key_id: Optional[str] = None,
+    field_delimiter: Optional[str] = ",",
+    escaped_by: Optional[str] = "\\",
+    boto3_session: Optional[boto3.Session] = None,
+) -> None:
+    """
+    Unload query results to Amazon S3.
+
+    https://docs.aws.amazon.com/timestream/latest/developerguide/export-unload.html
+
+    Parameters
+    ----------
+    sql : str
+        SQL query.
+    path : str
+        S3 path to write stage files (e.g. s3://bucket_name/any_name/).
+    unload_format : str, optional
+        Format of the unloaded S3 objects from the query.
+        Valid values: "CSV", "PARQUET". Case sensitive. Defaults to "PARQUET".
+    compression : str, optional
+        Compression of the unloaded S3 objects from the query.
+        Valid values: "GZIP", "NONE". Defaults to "GZIP".
+    partition_cols : List[str], optional
+        Specifies the partition keys for the unload operation.
+    encryption : str, optional
+        Encryption of the unloaded S3 objects from the query.
+        Valid values: "SSE_KMS", "SSE_S3". Defaults to "SSE_S3".
+    kms_key_id : str, optional
+        Specifies the key ID for an AWS Key Management Service (AWS KMS) key to be
+        used to encrypt data files on Amazon S3.
+    field_delimiter : str, optional
+        A single ASCII character that is used to separate fields in the output file,
+        such as a pipe character (|), a comma (,), or a tab (\\t). Only used with CSV format.
+    escaped_by : str, optional
+        The character that should be treated as an escape character in the data file
+        written to the S3 bucket. Only used with CSV format.
+    boto3_session : boto3.Session(), optional
+        Boto3 Session. The default boto3 session is used if None.
+
+    Returns
+    -------
+    None
+
+    Examples
+    --------
+    Unload to Parquet files (default).
+
+    >>> import awswrangler as wr
+    >>> wr.timestream.unload_to_files(
+    ...     sql="SELECT time, measure, dimension FROM database.mytable",
+    ...     path="s3://bucket/extracted_parquet_files/",
+    ... )
+
+    Unload to partitioned Parquet files. Note: partition columns must be at the end of the table.
+
+    >>> import awswrangler as wr
+    >>> wr.timestream.unload_to_files(
+    ...     sql="SELECT time, measure, dim1, dim2 FROM database.mytable",
+    ...     path="s3://bucket/extracted_parquet_files/",
+    ...     partition_cols=["dim2"],
+    ... )
+
+    Unload to CSV files.
+
+    >>> import awswrangler as wr
+    >>> wr.timestream.unload_to_files(
+    ...     sql="SELECT time, measure, dimension FROM database.mytable",
+    ...     path="s3://bucket/extracted_parquet_files/",
+    ...     unload_format="CSV",
+    ... )
+
+    """
+    timestream_client = _utils.client(service_name="timestream-query", session=boto3_session)
+
+    partitioned_by_str: str = (
+        f"""partitioned_by = ARRAY [{','.join([f"'{col}'" for col in partition_cols])}],\n"""
+        if partition_cols is not None
+        else ""
+    )
+    kms_key_id_str: str = f"kms_key = '{kms_key_id}',\n" if kms_key_id is not None else ""
+    field_delimiter_str: str = f"field_delimiter = '{field_delimiter}',\n" if unload_format == "CSV" else ""
+    escaped_by_str: str = f"escaped_by = '{escaped_by}',\n" if unload_format == "CSV" else ""
+
+    sql = (
+        f"UNLOAD ({sql})\n"
+        f"TO '{path}'\n"
+        f"WITH (\n"
+        f"{partitioned_by_str}"
+        f"format='{unload_format or 'PARQUET'}',\n"
+        f"compression='{compression or 'GZIP'}',\n"
+        f"{field_delimiter_str}"
+        f"{escaped_by_str}"
+        f"{kms_key_id_str}"
+        f"encryption='{encryption or 'SSE_S3'}'\n"
+        f")"
+    )
+
+    timestream_client.query(QueryString=sql)
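
unload_to_files assembles the UNLOAD statement from the string fragments above rather than binding parameters, so omitted options fall back to PARQUET, GZIP, and SSE_S3. As a rough illustration with placeholder query and path, a CSV export with partition_cols=["dim2"] would produce a statement equivalent to the string below:

# Editor's illustration of the rendered statement for a CSV unload with one partition column.
rendered = (
    "UNLOAD (SELECT time, measure, dim1, dim2 FROM database.mytable)\n"
    "TO 's3://bucket/extracted_parquet_files/'\n"
    "WITH (\n"
    "partitioned_by = ARRAY ['dim2'],\n"
    "format='CSV',\n"
    "compression='GZIP',\n"
    "field_delimiter = ',',\n"
    "escaped_by = '\\',\n"
    "encryption='SSE_S3'\n"
    ")"
)
print(rendered)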
