Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "python",
"TagPrefix": "python/storage/azure-storage-blob",
"Tag": "python/storage/azure-storage-blob_63518ebc38"
"Tag": "python/storage/azure-storage-blob_16c5acad24"
}
223 changes: 179 additions & 44 deletions sdk/storage/azure-storage-blob/tests/test_cpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,49 +686,184 @@ def test_snapshot_blob(self, **kwargs):
assert blob_snapshot is not None
self._teardown(bsc)

# @BlobPreparer()
# @recorded_by_proxy
# def test_append_block_with_rekeying(self, **kwargs):
# storage_account_name = kwargs.pop("storage_account_name")
# storage_account_key = kwargs.pop("storage_account_key")
#
# # Arrange
# bsc = BlobServiceClient(self.account_url(storage_account_name, "blob"), credential=storage_account_key)
# self._setup(bsc)
#
# source_blob_client = bsc.get_blob_client(self.container_name, self.get_resource_name("sourceblob"))
# source_blob_client.create_append_blob(cpk=TEST_ENCRYPTION_KEY)
# source_blob_sas = self.generate_sas(
# generate_blob_sas,
# source_blob_client.account_name,
# source_blob_client.container_name,
# source_blob_client.blob_name,
# snapshot=source_blob_client.snapshot,
# account_key=source_blob_client.credential.account_key,
# permission=BlobSasPermissions(read=True),
# expiry=datetime.utcnow() + timedelta(hours=1)
# )
# source_blob_url = source_blob_client.url + "?" + source_blob_sas
# source_blob_client.upload_blob(self.byte_data, blob_type=BlobType.AppendBlob, cpk=TEST_ENCRYPTION_KEY)
#
# destination_blob_client = self._create_append_blob(bsc, cpk=NEW_TEST_ENCRYPTION_KEY)
#
# # Act
# append_blob_prop = destination_blob_client.append_block_from_url(
# source_blob_url,
# source_offset=0,
# source_length=10 * 1024,
# cpk=NEW_TEST_ENCRYPTION_KEY,
# source_cpk=TEST_ENCRYPTION_KEY
# )
#
# # Assert
# assert append_blob_prop is not None
# assert append_blob_prop['etag'] is not None
# assert append_blob_prop['last_modified'] is not None
# assert append_blob_prop['request_server_encrypted']
# assert append_blob_prop['encryption_key_sha256'] == NEW_TEST_ENCRYPTION_KEY.key_hash
#
# self._teardown(bsc)
@BlobPreparer()
@recorded_by_proxy
def test_append_block_from_url_with_rekeying(self, **kwargs):
storage_account_name = kwargs.pop("storage_account_name")
storage_account_key = kwargs.pop("storage_account_key")

# Arrange
bsc = BlobServiceClient(self.account_url(storage_account_name, "blob"), credential=storage_account_key)
self._setup(bsc)

source_blob_client = bsc.get_blob_client(self.container_name, self.get_resource_name("sourceblob"))
source_blob_client.upload_blob(self.byte_data, blob_type=BlobType.APPENDBLOB, cpk=TEST_ENCRYPTION_KEY)
source_blob_sas = self.generate_sas(
generate_blob_sas,
source_blob_client.account_name,
source_blob_client.container_name,
source_blob_client.blob_name,
account_key=source_blob_client.credential.account_key,
permission=BlobSasPermissions(read=True),
expiry=datetime.utcnow() + timedelta(hours=1)
)
source_blob_url = source_blob_client.url + "?" + source_blob_sas

destination_blob_client = self._create_append_blob(bsc, cpk=NEW_TEST_ENCRYPTION_KEY)

# Act
props = destination_blob_client.append_block_from_url(
source_blob_url,
source_offset=0,
source_length=len(self.byte_data),
cpk=NEW_TEST_ENCRYPTION_KEY,
source_cpk=TEST_ENCRYPTION_KEY
)

# Assert
assert props is not None
assert props['etag'] is not None
assert props['last_modified'] is not None
assert props['request_server_encrypted']

if self.is_live:
assert props['encryption_key_sha256'] == NEW_TEST_ENCRYPTION_KEY.key_hash

self._teardown(bsc)

@BlobPreparer()
@recorded_by_proxy
def test_upload_blob_from_url_with_rekeying(self, **kwargs):
storage_account_name = kwargs.pop("storage_account_name")
storage_account_key = kwargs.pop("storage_account_key")

# Arrange
bsc = BlobServiceClient(self.account_url(storage_account_name, "blob"), credential=storage_account_key)
self._setup(bsc)

source_blob_client = bsc.get_blob_client(self.container_name, self.get_resource_name("sourceblob"))
source_blob_client.upload_blob(self.byte_data, cpk=TEST_ENCRYPTION_KEY)
source_blob_sas = self.generate_sas(
generate_blob_sas,
source_blob_client.account_name,
source_blob_client.container_name,
source_blob_client.blob_name,
account_key=source_blob_client.credential.account_key,
permission=BlobSasPermissions(read=True),
expiry=datetime.utcnow() + timedelta(hours=1)
)
source_blob_url = source_blob_client.url + "?" + source_blob_sas

destination_blob_client, _ = self._create_block_blob(bsc, cpk=NEW_TEST_ENCRYPTION_KEY)

# Act
props = destination_blob_client.upload_blob_from_url(
source_blob_url,
overwrite=True,
cpk=NEW_TEST_ENCRYPTION_KEY,
source_cpk=TEST_ENCRYPTION_KEY
)

# Assert
assert props is not None
assert props['etag'] is not None
assert props['last_modified'] is not None
assert props['request_server_encrypted']

if self.is_live:
assert props['encryption_key_sha256'] == NEW_TEST_ENCRYPTION_KEY.key_hash

self._teardown(bsc)

@BlobPreparer()
@recorded_by_proxy
def test_stage_block_from_url_with_rekeying(self, **kwargs):
storage_account_name = kwargs.pop("storage_account_name")
storage_account_key = kwargs.pop("storage_account_key")

# Arrange
bsc = BlobServiceClient(self.account_url(storage_account_name, "blob"), credential=storage_account_key)
self._setup(bsc)

source_blob_client = bsc.get_blob_client(self.container_name, self.get_resource_name("sourceblob"))
source_blob_client.upload_blob(self.byte_data, cpk=TEST_ENCRYPTION_KEY)
source_blob_sas = self.generate_sas(
generate_blob_sas,
source_blob_client.account_name,
source_blob_client.container_name,
source_blob_client.blob_name,
account_key=source_blob_client.credential.account_key,
permission=BlobSasPermissions(read=True),
expiry=datetime.utcnow() + timedelta(hours=1)
)
source_blob_url = source_blob_client.url + "?" + source_blob_sas

destination_blob_client, _ = self._create_block_blob(bsc, cpk=NEW_TEST_ENCRYPTION_KEY)

# Act
block_id = '1'
props = destination_blob_client.stage_block_from_url(
block_id,
source_blob_url,
source_offset=0,
source_length=len(self.byte_data),
cpk=NEW_TEST_ENCRYPTION_KEY,
source_cpk=TEST_ENCRYPTION_KEY
)

# Assert
assert props is not None
assert props['request_server_encrypted']

if self.is_live:
assert props['encryption_key_sha256'] == NEW_TEST_ENCRYPTION_KEY.key_hash

self._teardown(bsc)

@BlobPreparer()
@recorded_by_proxy
def test_upload_pages_from_url_with_rekeying(self, **kwargs):
storage_account_name = kwargs.pop("storage_account_name")
storage_account_key = kwargs.pop("storage_account_key")

# Arrange
bsc = BlobServiceClient(self.account_url(storage_account_name, "blob"), credential=storage_account_key)
self._setup(bsc)

source_blob_client = bsc.get_blob_client(self.container_name, self.get_resource_name("sourceblob"))
source_blob_client.upload_blob(self.byte_data, blob_type=BlobType.PAGEBLOB, cpk=TEST_ENCRYPTION_KEY)
source_blob_sas = self.generate_sas(
generate_blob_sas,
source_blob_client.account_name,
source_blob_client.container_name,
source_blob_client.blob_name,
account_key=source_blob_client.credential.account_key,
permission=BlobSasPermissions(read=True),
expiry=datetime.utcnow() + timedelta(hours=1)
)
source_blob_url = source_blob_client.url + "?" + source_blob_sas

destination_blob_client = self._create_page_blob(bsc, cpk=NEW_TEST_ENCRYPTION_KEY)

# Act
props = destination_blob_client.upload_pages_from_url(
source_blob_url,
offset=0,
length=len(self.byte_data),
source_offset=0,
cpk=NEW_TEST_ENCRYPTION_KEY,
source_cpk=TEST_ENCRYPTION_KEY
)

# Assert
assert props is not None
assert props['etag'] is not None
assert props['last_modified'] is not None
assert props['request_server_encrypted']

if self.is_live:
assert props['encryption_key_sha256'] == NEW_TEST_ENCRYPTION_KEY.key_hash

self._teardown(bsc)

# ------------------------------------------------------------------------------
Loading
Loading