diff --git a/edx_prefectutils/__init__.py b/edx_prefectutils/__init__.py index 3c02ce8..d6f20c2 100644 --- a/edx_prefectutils/__init__.py +++ b/edx_prefectutils/__init__.py @@ -2,4 +2,4 @@ Top-level package for edx-prefectutils. """ -__version__ = '2.3.6' +__version__ = '2.3.7' diff --git a/edx_prefectutils/snowflake.py b/edx_prefectutils/snowflake.py index 0e4b379..c409e5a 100644 --- a/edx_prefectutils/snowflake.py +++ b/edx_prefectutils/snowflake.py @@ -430,6 +430,7 @@ def export_snowflake_table_to_s3( escape_unenclosed_field: str = None, null_marker: str = None, binary_format: str = None, + encoding: str = None, overwrite: bool = True, single: bool = False, generate_manifest: bool = False, @@ -459,6 +460,7 @@ def export_snowflake_table_to_s3( null_marker (str, optional): String used to convert SQL NULL. Defaults to None which lets snowflake use its default '\\N' binary_format (str, optional): String to define encoding for binary output + encoding (str, optional): String to define file encoding overwrite (bool, optional): Whether to overwrite existing data in S3. Defaults to `TRUE`. single (bool, optional): Whether to generate a single file in S3. Defaults to `FALSE`. The maximum file size for a single file defaults to 16MB, although that default can be updated by adding a MAX_FILE_SIZE @@ -498,6 +500,9 @@ def export_snowflake_table_to_s3( binary_format_clause = '' if binary_format is None \ else "BINARY_FORMAT = {binary_format}".format( binary_format=binary_format) + encoding_clause = '' if encoding is None \ + else "ENCODING = {encoding}".format( + encoding=encoding) query = """ COPY INTO '{export_path}' @@ -509,6 +514,7 @@ def export_snowflake_table_to_s3( {null_if_clause} {binary_format_clause} COMPRESSION = NONE + {encoding_clause} ) OVERWRITE={overwrite} SINGLE={single} @@ -523,6 +529,7 @@ def export_snowflake_table_to_s3( escape_clause=escape_clause, null_if_clause=null_if_clause, binary_format_clause=binary_format_clause, + encoding_clause=encoding_clause, overwrite=overwrite, single=single, max_file_size=EXPORT_MAX_FILESIZE, diff --git a/tests/test_snowflake.py b/tests/test_snowflake.py index 9af501b..a9f0642 100755 --- a/tests/test_snowflake.py +++ b/tests/test_snowflake.py @@ -327,7 +327,7 @@ def test_export_snowflake_table_to_s3_overwrite(mock_sf_connection): # noqa: F8 mock_cursor.execute.assert_has_calls( [ - mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = NONE\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa + mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = NONE\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n \n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa ] ) @@ -356,7 +356,7 @@ def test_export_snowflake_table_to_s3_no_escape(mock_sf_connection): # noqa: F8 mock_cursor.execute.assert_has_calls( [ - mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = NONE\n \n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa + mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = NONE\n \n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n \n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa ] ) @@ -383,7 +383,7 @@ def test_export_snowflake_table_to_s3_no_enclosure(mock_sf_connection): # noqa: mock_cursor.execute.assert_has_calls( [ - mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' \n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa + mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' \n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n \n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa ] ) @@ -410,7 +410,7 @@ def test_export_snowflake_table_to_s3_no_null_if(mock_sf_connection): # noqa: F mock_cursor.execute.assert_has_calls( [ - mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = NONE\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n \n \n COMPRESSION = NONE\n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa + mock.call("\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = NONE\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n \n \n COMPRESSION = NONE\n \n )\n OVERWRITE=True\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n "), # noqa ] ) @@ -472,7 +472,7 @@ def test_export_snowflake_table_to_s3_no_overwrite(mock_sf_connection): # noqa: mock_cursor.execute.assert_has_calls( [ - mock.call("""\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = '"'\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n )\n OVERWRITE=False\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n """), # noqa + mock.call("""\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = '"'\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n \n COMPRESSION = NONE\n \n )\n OVERWRITE=False\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n """), # noqa ] ) @@ -501,7 +501,37 @@ def test_export_snowflake_table_to_s3_with_binary_format(mock_sf_connection): # mock_cursor.execute.assert_has_calls( [ - mock.call("""\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = '"'\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n BINARY_FORMAT = UTF8\n COMPRESSION = NONE\n )\n OVERWRITE=False\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n """), # noqa + mock.call("""\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = '"'\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n BINARY_FORMAT = UTF8\n COMPRESSION = NONE\n \n )\n OVERWRITE=False\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n """), # noqa + ] + ) + + +def test_export_snowflake_table_to_s3_with_encoding(mock_sf_connection): # noqa: F811 + mock_cursor = mock_sf_connection.cursor() + + with Flow("test") as f: + snowflake.export_snowflake_table_to_s3( + sf_credentials={}, + sf_database="test_database", + sf_schema="test_schema", + sf_table="test_table", + sf_role="test_role", + sf_warehouse="test_warehouse", + sf_storage_integration="test_storage_integration", + s3_path="s3://edx-test/test/", + overwrite=False, + enclosed_by='"', + escape_unenclosed_field='\\\\', + null_marker='NULL', + binary_format='UTF8', + encoding='UTF8', + ) + state = f.run() + assert state.is_successful() + + mock_cursor.execute.assert_has_calls( + [ + mock.call("""\n COPY INTO 's3://edx-test/test/test_database-test_schema-test_table/'\n FROM test_database.test_schema.test_table\n STORAGE_INTEGRATION = test_storage_integration\n FILE_FORMAT = ( TYPE = CSV EMPTY_FIELD_AS_NULL = FALSE\n FIELD_DELIMITER = ',' FIELD_OPTIONALLY_ENCLOSED_BY = '"'\n ESCAPE_UNENCLOSED_FIELD = '\\\\'\n NULL_IF = ( 'NULL' )\n BINARY_FORMAT = UTF8\n COMPRESSION = NONE\n ENCODING = UTF8\n )\n OVERWRITE=False\n SINGLE=False\n DETAILED_OUTPUT = TRUE\n MAX_FILE_SIZE = 104857600\n """), # noqa ] )