Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion edx_prefectutils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Top-level package for edx-prefectutils.
"""

__version__ = '2.3.6'
__version__ = '2.3.7'
7 changes: 7 additions & 0 deletions edx_prefectutils/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ def export_snowflake_table_to_s3(
escape_unenclosed_field: str = None,
null_marker: str = None,
binary_format: str = None,
encoding: str = None,
overwrite: bool = True,
single: bool = False,
generate_manifest: bool = False,
Expand Down Expand Up @@ -459,6 +460,7 @@ def export_snowflake_table_to_s3(
null_marker (str, optional): String used to convert SQL NULL. Defaults to None which lets snowflake
use its default '\\N'
binary_format (str, optional): String to define encoding for binary output
encoding (str, optional): String to define file encoding
overwrite (bool, optional): Whether to overwrite existing data in S3. Defaults to `TRUE`.
single (bool, optional): Whether to generate a single file in S3. Defaults to `FALSE`. The maximum file size
for a single file defaults to 16MB, although that default can be updated by adding a MAX_FILE_SIZE
Expand Down Expand Up @@ -498,6 +500,9 @@ def export_snowflake_table_to_s3(
binary_format_clause = '' if binary_format is None \
else "BINARY_FORMAT = {binary_format}".format(
binary_format=binary_format)
encoding_clause = '' if encoding is None \
else "ENCODING = {encoding}".format(
encoding=encoding)

query = """
COPY INTO '{export_path}'
Expand All @@ -509,6 +514,7 @@ def export_snowflake_table_to_s3(
{null_if_clause}
{binary_format_clause}
COMPRESSION = NONE
{encoding_clause}
)
OVERWRITE={overwrite}
SINGLE={single}
Expand All @@ -523,6 +529,7 @@ def export_snowflake_table_to_s3(
escape_clause=escape_clause,
null_if_clause=null_if_clause,
binary_format_clause=binary_format_clause,
encoding_clause=encoding_clause,
overwrite=overwrite,
single=single,
max_file_size=EXPORT_MAX_FILESIZE,
Expand Down
42 changes: 36 additions & 6 deletions tests/test_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def test_export_snowflake_table_to_s3_overwrite(mock_sf_connection): # noqa: F8

mock_cursor.execute.assert_has_calls(
[
mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = NONE\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa
mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = NONE\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n \n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa
]
)

Expand Down Expand Up @@ -356,7 +356,7 @@ def test_export_snowflake_table_to_s3_no_escape(mock_sf_connection): # noqa: F8

mock_cursor.execute.assert_has_calls(
[
mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = NONE\n \n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa
mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = NONE\n \n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n \n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa
]
)

Expand All @@ -383,7 +383,7 @@ def test_export_snowflake_table_to_s3_no_enclosure(mock_sf_connection): # noqa:

mock_cursor.execute.assert_has_calls(
[
mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' \n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa
mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' \n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n \n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa
]
)

Expand All @@ -410,7 +410,7 @@ def test_export_snowflake_table_to_s3_no_null_if(mock_sf_connection): # noqa: F

mock_cursor.execute.assert_has_calls(
[
mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = NONE\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n \n \n COMPRESSION = NONE\n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa
mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = NONE\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n \n \n COMPRESSION = NONE\n \n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa
]
)

Expand Down Expand Up @@ -472,7 +472,7 @@ def test_export_snowflake_table_to_s3_no_overwrite(mock_sf_connection): # noqa:

mock_cursor.execute.assert_has_calls(
[
mock.call("""\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = '"'\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n )\n OVERWRITE=False\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n """), # noqa
mock.call("""\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = '"'\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n \n )\n OVERWRITE=False\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n """), # noqa
]
)

Expand Down Expand Up @@ -501,7 +501,37 @@ def test_export_snowflake_table_to_s3_with_binary_format(mock_sf_connection): #

mock_cursor.execute.assert_has_calls(
[
mock.call("""\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = '"'\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n BINARY_FORMAT = UTF8\n COMPRESSION = NONE\n )\n OVERWRITE=False\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n """), # noqa
mock.call("""\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = '"'\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n BINARY_FORMAT = UTF8\n COMPRESSION = NONE\n \n )\n OVERWRITE=False\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n """), # noqa
]
)


def test_export_snowflake_table_to_s3_with_encoding(mock_sf_connection):  # noqa: F811
    """Passing ``encoding`` should emit an ENCODING clause in the generated COPY INTO query."""
    cursor = mock_sf_connection.cursor()

    # All export options in one place; splatted into the task call below.
    export_kwargs = dict(
        sf_credentials={},
        sf_database="test_database",
        sf_schema="test_schema",
        sf_table="test_table",
        sf_role="test_role",
        sf_warehouse="test_warehouse",
        sf_storage_integration="test_storage_integration",
        s3_path="s3://edx-test/test/",
        overwrite=False,
        enclosed_by='"',
        escape_unenclosed_field='\\\\',
        null_marker='NULL',
        binary_format='UTF8',
        encoding='UTF8',
    )

    with Flow("test") as flow:
        snowflake.export_snowflake_table_to_s3(**export_kwargs)

    run_state = flow.run()
    assert run_state.is_successful()

    # The exact query text matters: ENCODING = UTF8 must appear inside the FILE_FORMAT clause.
    expected_query = """\n        COPY INTO '{export_path}'\n        FROM test_database.test_schema.test_table\n        STORAGE_INTEGRATION = test_storage_integration\n        FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n        FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = '"'\n        ESCAPE_UNENCLOSED_FIELD = '\\\\'\n        NULL_IF = ( 'NULL' )\n        BINARY_FORMAT = UTF8\n        COMPRESSION = NONE\n        ENCODING = UTF8\n        )\n        OVERWRITE=False\n        SINGLE=False\n        DETAILED_OUTPUT = TRUE\n        MAX_FILE_SIZE = 104857600\n        """.replace(
        "{export_path}", "s3://edx-test/test/test_database-test_schema-test_table/"
    )

    cursor.execute.assert_has_calls([mock.call(expected_query)])  # noqa

Expand Down