diff --git a/sdk/keyvault/azure-keyvault-secrets/CHANGELOG.md b/sdk/keyvault/azure-keyvault-secrets/CHANGELOG.md index be470f9ef2c6..d6bd2c7c6f8b 100644 --- a/sdk/keyvault/azure-keyvault-secrets/CHANGELOG.md +++ b/sdk/keyvault/azure-keyvault-secrets/CHANGELOG.md @@ -10,6 +10,14 @@ ### Other Changes +- Refactored package structure to use generation-focused design with `_patch.py` patching layers + - Public API remains unchanged and fully backward compatible + - Internal structure now matches `azure-keyvault-securitydomain` pattern + - Models moved to `models/` subdirectory with patch-based imports + - Client implementation moved to root `_patch.py` + - Created `_internal/` directory for utilities + - Updated `__init__.py` files to use patch-based import pattern + ## 4.10.0 (2025-06-16) ### Features Added diff --git a/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/__init__.py b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/__init__.py index ec1b5aaa0651..6955950d67da 100644 --- a/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/__init__.py +++ b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/__init__.py @@ -2,18 +2,39 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. # ------------------------------------ -from ._models import DeletedSecret, KeyVaultSecret, KeyVaultSecretIdentifier, SecretProperties -from ._shared.client_base import ApiVersion -from ._client import SecretClient +# pylint: disable=wrong-import-position + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from ._patch import * # pylint: disable=unused-wildcard-import + +from ._version import VERSION + +__version__ = VERSION + +try: + from ._patch import __all__ as _patch_all + from ._patch import * # type: ignore +except ImportError: + _patch_all = [] +from ._patch import patch_sdk as _patch_sdk + +from .models import ( # type: ignore + DeletedSecret, + KeyVaultSecret, + KeyVaultSecretIdentifier, + SecretProperties, +) __all__ = [ "ApiVersion", "SecretClient", + "DeletedSecret", "KeyVaultSecret", "KeyVaultSecretIdentifier", "SecretProperties", - "DeletedSecret" ] +__all__.extend([p for p in _patch_all if p not in __all__]) # pyright: ignore -from ._version import VERSION -__version__ = VERSION +_patch_sdk() diff --git a/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/_internal/__init__.py b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/_internal/__init__.py new file mode 100644 index 000000000000..994b28714f85 --- /dev/null +++ b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/_internal/__init__.py @@ -0,0 +1,28 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +"""Internal utilities for azure-keyvault-secrets.""" + +from .._shared.challenge_auth_policy import ChallengeAuthPolicy +from .._shared._polling import DeleteRecoverPollingMethod, KeyVaultOperationPoller +from .._shared import parse_key_vault_id + +__all__ = [ + "ChallengeAuthPolicy", + "DeleteRecoverPollingMethod", + "KeyVaultOperationPoller", + "parse_key_vault_id", +] + +try: + from .._shared.async_challenge_auth_policy import AsyncChallengeAuthPolicy + from .._shared._polling_async import AsyncKeyVaultOperationPoller, AsyncDeleteRecoverPollingMethod + + __all__.extend([ + "AsyncChallengeAuthPolicy", + "AsyncKeyVaultOperationPoller", + "AsyncDeleteRecoverPollingMethod", + ]) +except (SyntaxError, ImportError): + pass diff --git a/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/_operations/__init__.py b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/_operations/__init__.py new file mode 100644 index 000000000000..eeab7e3523db --- /dev/null +++ b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/_operations/__init__.py @@ -0,0 +1,19 @@ +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------- +# pylint: disable=wrong-import-position + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from ._patch import * # pylint: disable=unused-wildcard-import + +from ._patch import __all__ as _patch_all +from ._patch import * +from ._patch import patch_sdk as _patch_sdk + +__all__ = [] +__all__.extend([p for p in _patch_all if p not in __all__]) # pyright: ignore +_patch_sdk() diff --git a/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/_operations/_patch.py b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/_operations/_patch.py new file mode 100644 index 000000000000..8bcb627aa475 --- /dev/null +++ b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/_operations/_patch.py @@ -0,0 +1,21 @@ +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------- +"""Customize generated code here. + +Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize +""" +from typing import List + +__all__: List[str] = [] # Add all objects you want publicly available to users at this package level + + +def patch_sdk(): + """Do not remove from this file. + + `patch_sdk` is a last resort escape hatch that allows you to do customizations + you can't accomplish using the techniques described in + https://aka.ms/azsdk/python/dpcodegen/python/customize + """ diff --git a/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/_patch.py b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/_patch.py new file mode 100644 index 000000000000..6e7b06a2ea0d --- /dev/null +++ b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/_patch.py @@ -0,0 +1,633 @@ +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------- +"""Customize generated code here. + +Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize +""" +from copy import deepcopy +from datetime import datetime +from enum import Enum +from functools import partial +from typing import Any, cast, Dict, List, Optional +from urllib.parse import urlparse + +from azure.core import CaseInsensitiveEnumMeta +from azure.core.credentials import TokenCredential +from azure.core.paging import ItemPaged +from azure.core.pipeline.policies import HttpLoggingPolicy +from azure.core.polling import LROPoller +from azure.core.rest import HttpRequest, HttpResponse +from azure.core.tracing.decorator import distributed_trace + +from ._generated import KeyVaultClient as _KeyVaultClient +from ._generated import models as _models +from ._generated._utils.serialization import Serializer +from ._internal import ChallengeAuthPolicy, DeleteRecoverPollingMethod, KeyVaultOperationPoller +from ._sdk_moniker import SDK_MONIKER +from .models import DeletedSecret, KeyVaultSecret, SecretProperties + +__all__: List[str] = [ + "SecretClient", + "ApiVersion", +] + + +class ApiVersion(str, Enum, metaclass=CaseInsensitiveEnumMeta): + """Key Vault API versions supported by this package""" + + #: this is the default version + V7_6 = "7.6" + V7_5 = "7.5" + V7_4 = "7.4" + V7_3 = "7.3" + V7_2 = "7.2" + V7_1 = "7.1" + V7_0 = "7.0" + V2016_10_01 = "2016-10-01" + + +DEFAULT_VERSION = ApiVersion.V7_6 + +_SERIALIZER = Serializer() +_SERIALIZER.client_side_validation = False + + +def _format_api_version(request: HttpRequest, api_version: str) -> HttpRequest: + """Returns a request copy that includes an api-version query parameter if one wasn't originally present. + + :param request: The HTTP request being sent. + :type request: ~azure.core.rest.HttpRequest + :param str api_version: The service API version that the request should include. + + :returns: A copy of the request that includes an api-version query parameter. + :rtype: ~azure.core.rest.HttpRequest + """ + request_copy = deepcopy(request) + params = {"api-version": api_version} # By default, we want to use the client's API version + query = urlparse(request_copy.url).query + + if query: + request_copy.url = request_copy.url.partition("?")[0] + existing_params = {p[0]: p[-1] for p in [p.partition("=") for p in query.split("&")]} + params.update(existing_params) # If an api-version was provided, this will overwrite our default + + # Reconstruct the query parameters onto the URL + query_params = [] + for k, v in params.items(): + query_params.append("{}={}".format(k, v)) + query = "?" + "&".join(query_params) + request_copy.url = request_copy.url + query + return request_copy + + +class SecretClient: + """A high-level interface for managing a vault's secrets. + + :param str vault_url: URL of the vault the client will access. This is also called the vault's "DNS Name". + You should validate that this URL references a valid Key Vault resource. See https://aka.ms/azsdk/blog/vault-uri + for details. + :param credential: An object which can provide an access token for the vault, such as a credential from + :mod:`azure.identity` + :type credential: ~azure.core.credentials.TokenCredential + + :keyword api_version: Version of the service API to use. Defaults to the most recent. + :paramtype api_version: ~azure.keyvault.secrets.ApiVersion or str + :keyword bool verify_challenge_resource: Whether to verify the authentication challenge resource matches the Key + Vault domain. Defaults to True. + + Example: + .. literalinclude:: ../tests/test_samples_secrets.py + :start-after: [START create_secret_client] + :end-before: [END create_secret_client] + :language: python + :caption: Create a new ``SecretClient`` + :dedent: 4 + """ + + # pylint:disable=protected-access + + def __init__(self, vault_url: str, credential: TokenCredential, **kwargs: Any) -> None: + if not credential: + raise ValueError( + "credential should be an object supporting the TokenCredential protocol, " + "such as a credential from azure-identity" + ) + if not vault_url: + raise ValueError("vault_url must be the URL of an Azure Key Vault") + + try: + self.api_version = kwargs.pop("api_version", DEFAULT_VERSION) + # If API version was provided as an enum value, need to make a plain string for 3.11 compatibility + if hasattr(self.api_version, "value"): + self.api_version = self.api_version.value + self._vault_url = vault_url.strip(" /") + + http_logging_policy = HttpLoggingPolicy(**kwargs) + http_logging_policy.allowed_header_names.update( + {"x-ms-keyvault-network-info", "x-ms-keyvault-region", "x-ms-keyvault-service-version"} + ) + + verify_challenge = kwargs.pop("verify_challenge_resource", True) + self._client = _KeyVaultClient( + credential=credential, + vault_base_url=self._vault_url, + api_version=self.api_version, + authentication_policy=ChallengeAuthPolicy(credential, verify_challenge_resource=verify_challenge), + sdk_moniker=SDK_MONIKER, + http_logging_policy=http_logging_policy, + **kwargs, + ) + self._models = _models + except ValueError as exc: + # Ignore pyright error that comes from not identifying ApiVersion as an iterable enum + raise NotImplementedError( + f"This package doesn't support API version '{self.api_version}'. " + + "Supported versions: " + + f"{', '.join(v.value for v in ApiVersion)}" # pyright: ignore[reportGeneralTypeIssues] + ) from exc + + @property + def vault_url(self) -> str: + return self._vault_url + + @distributed_trace + def get_secret(self, name: str, version: Optional[str] = None, **kwargs: Any) -> KeyVaultSecret: + """Get a secret. Requires the secrets/get permission. + + :param str name: The name of the secret + :param str version: (optional) Version of the secret to get. If unspecified, gets the latest version. + + :returns: The fetched secret. + :rtype: ~azure.keyvault.secrets.KeyVaultSecret + + :raises ~azure.core.exceptions.ResourceNotFoundError or ~azure.core.exceptions.HttpResponseError: + the former if the secret doesn't exist; the latter for other errors + + Example: + .. literalinclude:: ../tests/test_samples_secrets.py + :start-after: [START get_secret] + :end-before: [END get_secret] + :language: python + :caption: Get a secret + :dedent: 8 + """ + bundle = self._client.get_secret( + secret_name=name, + secret_version=version or "", + **kwargs + ) + return KeyVaultSecret._from_secret_bundle(bundle) + + @distributed_trace + def set_secret( + self, + name: str, + value: str, + *, + enabled: Optional[bool] = None, + tags: Optional[Dict[str, str]] = None, + content_type: Optional[str] = None, + not_before: Optional[datetime] = None, + expires_on: Optional[datetime] = None, + **kwargs: Any, + ) -> KeyVaultSecret: + """Set a secret value. If `name` is in use, create a new version of the secret. If not, create a new secret. + + Requires secrets/set permission. + + :param str name: The name of the secret + :param str value: The value of the secret + + :keyword bool enabled: Whether the secret is enabled for use. + :keyword tags: Application specific metadata in the form of key-value pairs. + :paramtype tags: Dict[str, str] or None + :keyword str content_type: An arbitrary string indicating the type of the secret, e.g. 'password' + :keyword ~datetime.datetime not_before: Not before date of the secret in UTC + :keyword ~datetime.datetime expires_on: Expiry date of the secret in UTC + + :returns: The created or updated secret. + :rtype: ~azure.keyvault.secrets.KeyVaultSecret + + :raises ~azure.core.exceptions.HttpResponseError: + + Example: + .. literalinclude:: ../tests/test_samples_secrets.py + :start-after: [START set_secret] + :end-before: [END set_secret] + :language: python + :caption: Set a secret's value + :dedent: 8 + + """ + if enabled is not None or not_before is not None or expires_on is not None: + attributes = self._models.SecretAttributes( + enabled=enabled, not_before=not_before, expires=expires_on + ) + else: + attributes = None + + parameters = self._models.SecretSetParameters( + value=value, + tags=tags, + content_type=content_type, + secret_attributes=attributes + ) + + bundle = self._client.set_secret( + secret_name=name, + parameters=parameters, + **kwargs + ) + return KeyVaultSecret._from_secret_bundle(bundle) + + @distributed_trace + def update_secret_properties( + self, + name: str, + version: Optional[str] = None, + *, + enabled: Optional[bool] = None, + tags: Optional[Dict[str, str]] = None, + content_type: Optional[str] = None, + not_before: Optional[datetime] = None, + expires_on: Optional[datetime] = None, + **kwargs: Any, + ) -> SecretProperties: + """Update properties of a secret other than its value. Requires secrets/set permission. + + This method updates properties of the secret, such as whether it's enabled, but can't change the secret's + value. Use :func:`set_secret` to change the secret's value. + + :param str name: Name of the secret + :param str version: (optional) Version of the secret to update. If unspecified, the latest version is updated. + + :keyword bool enabled: Whether the secret is enabled for use. + :keyword tags: Application specific metadata in the form of key-value pairs. + :paramtype tags: Dict[str, str] or None + :keyword str content_type: An arbitrary string indicating the type of the secret, e.g. 'password' + :keyword ~datetime.datetime not_before: Not before date of the secret in UTC + :keyword ~datetime.datetime expires_on: Expiry date of the secret in UTC + + :returns: The updated secret properties. + :rtype: ~azure.keyvault.secrets.SecretProperties + + :raises ~azure.core.exceptions.ResourceNotFoundError or ~azure.core.exceptions.HttpResponseError: + the former if the secret doesn't exist; the latter for other errors + + Example: + .. literalinclude:: ../tests/test_samples_secrets.py + :start-after: [START update_secret] + :end-before: [END update_secret] + :language: python + :caption: Update a secret's attributes + :dedent: 8 + + """ + if enabled is not None or not_before is not None or expires_on is not None: + attributes = self._models.SecretAttributes( + enabled=enabled, not_before=not_before, expires=expires_on + ) + else: + attributes = None + + parameters = self._models.SecretUpdateParameters( + content_type=content_type, + secret_attributes=attributes, + tags=tags, + ) + + bundle = self._client.update_secret( + name, + secret_version=version or "", + parameters=parameters, + **kwargs + ) + return SecretProperties._from_secret_bundle(bundle) # pylint: disable=protected-access + + @distributed_trace + def list_properties_of_secrets(self, **kwargs: Any) -> ItemPaged[SecretProperties]: + """List identifiers and attributes of all secrets in the vault. Requires secrets/list permission. + + List items don't include secret values. Use :func:`get_secret` to get a secret's value. + + :returns: An iterator of secrets, excluding their values + :rtype: ~azure.core.paging.ItemPaged[~azure.keyvault.secrets.SecretProperties] + + Example: + .. literalinclude:: ../tests/test_samples_secrets.py + :start-after: [START list_secrets] + :end-before: [END list_secrets] + :language: python + :caption: List all secrets + :dedent: 8 + + """ + return self._client.get_secrets( + maxresults=kwargs.pop("max_page_size", None), + cls=lambda objs: [SecretProperties._from_secret_item(x) for x in objs], + **kwargs + ) + + @distributed_trace + def list_properties_of_secret_versions(self, name: str, **kwargs: Any) -> ItemPaged[SecretProperties]: + """List properties of all versions of a secret, excluding their values. Requires secrets/list permission. + + List items don't include secret values. Use :func:`get_secret` to get a secret's value. + + :param str name: Name of the secret + + :returns: An iterator of secrets, excluding their values + :rtype: ~azure.core.paging.ItemPaged[~azure.keyvault.secrets.SecretProperties] + + Example: + .. literalinclude:: ../tests/test_samples_secrets.py + :start-after: [START list_properties_of_secret_versions] + :end-before: [END list_properties_of_secret_versions] + :language: python + :caption: List all versions of a secret + :dedent: 8 + + """ + return self._client.get_secret_versions( + name, + maxresults=kwargs.pop("max_page_size", None), + cls=lambda objs: [SecretProperties._from_secret_item(x) for x in objs], + **kwargs + ) + + @distributed_trace + def backup_secret(self, name: str, **kwargs: Any) -> bytes: + """Back up a secret in a protected form useable only by Azure Key Vault. Requires secrets/backup permission. + + :param str name: Name of the secret to back up + + :returns: The backup result, in a protected bytes format that can only be used by Azure Key Vault. + :rtype: bytes + + :raises ~azure.core.exceptions.ResourceNotFoundError or ~azure.core.exceptions.HttpResponseError: + the former if the secret doesn't exist; the latter for other errors + + Example: + .. literalinclude:: ../tests/test_samples_secrets.py + :start-after: [START backup_secret] + :end-before: [END backup_secret] + :language: python + :caption: Back up a secret + :dedent: 8 + + """ + backup_result = self._client.backup_secret(name, **kwargs) + return cast(bytes, backup_result.value) + + @distributed_trace + def restore_secret_backup(self, backup: bytes, **kwargs: Any) -> SecretProperties: + """Restore a backed up secret. Requires the secrets/restore permission. + + :param bytes backup: A secret backup as returned by :func:`backup_secret` + + :returns: The restored secret + :rtype: ~azure.keyvault.secrets.SecretProperties + + :raises ~azure.core.exceptions.ResourceExistsError or ~azure.core.exceptions.HttpResponseError: + the former if the secret's name is already in use; the latter for other errors + + Example: + .. literalinclude:: ../tests/test_samples_secrets.py + :start-after: [START restore_secret_backup] + :end-before: [END restore_secret_backup] + :language: python + :caption: Restore a backed up secret + :dedent: 8 + + """ + bundle = self._client.restore_secret( + parameters=self._models.SecretRestoreParameters(secret_bundle_backup=backup), + **kwargs + ) + return SecretProperties._from_secret_bundle(bundle) + + @distributed_trace + def begin_delete_secret(self, name: str, **kwargs: Any) -> LROPoller[DeletedSecret]: # pylint:disable=bad-option-value,delete-operation-wrong-return-type + """Delete all versions of a secret. Requires secrets/delete permission. + + When this method returns Key Vault has begun deleting the secret. Deletion may take several seconds in a vault + with soft-delete enabled. This method therefore returns a poller enabling you to wait for deletion to complete. + + :param str name: Name of the secret to delete. + + :returns: A poller for the delete operation. The poller's `result` method returns the + :class:`~azure.keyvault.secrets.DeletedSecret` without waiting for deletion to complete. If the vault has + soft-delete enabled and you want to permanently delete the secret with :func:`purge_deleted_secret`, call + the poller's `wait` method first. It will block until the deletion is complete. The `wait` method requires + secrets/get permission. + :rtype: ~azure.core.polling.LROPoller[~azure.keyvault.secrets.DeletedSecret] + + :raises ~azure.core.exceptions.ResourceNotFoundError or ~azure.core.exceptions.HttpResponseError: + the former if the secret doesn't exist; the latter for other errors + + Example: + .. literalinclude:: ../tests/test_samples_secrets.py + :start-after: [START delete_secret] + :end-before: [END delete_secret] + :language: python + :caption: Delete a secret + :dedent: 8 + + """ + polling_interval = kwargs.pop("_polling_interval", None) + if polling_interval is None: + polling_interval = 2 + # Ignore pyright warning about return type not being iterable because we use `cls` to return a tuple + pipeline_response, deleted_secret_bundle = self._client.delete_secret( + secret_name=name, + cls=lambda pipeline_response, deserialized, _: (pipeline_response, deserialized), + **kwargs, + ) # pyright: ignore[reportGeneralTypeIssues] + deleted_secret = DeletedSecret._from_deleted_secret_bundle(deleted_secret_bundle) + + command = partial(self.get_deleted_secret, name=name, **kwargs) + polling_method = DeleteRecoverPollingMethod( + # no recovery ID means soft-delete is disabled, in which case we initialize the poller as finished + finished=deleted_secret.recovery_id is None, + pipeline_response=pipeline_response, + command=command, + final_resource=deleted_secret, + interval=polling_interval, + ) + return KeyVaultOperationPoller(polling_method) + + @distributed_trace + def get_deleted_secret(self, name: str, **kwargs: Any) -> DeletedSecret: + """Get a deleted secret. Possible only in vaults with soft-delete enabled. Requires secrets/get permission. + + :param str name: Name of the deleted secret + + :returns: The deleted secret. + :rtype: ~azure.keyvault.secrets.DeletedSecret + + :raises ~azure.core.exceptions.ResourceNotFoundError or ~azure.core.exceptions.HttpResponseError: + the former if the deleted secret doesn't exist; the latter for other errors + + Example: + .. literalinclude:: ../tests/test_samples_secrets.py + :start-after: [START get_deleted_secret] + :end-before: [END get_deleted_secret] + :language: python + :caption: Get a deleted secret + :dedent: 8 + + """ + bundle = self._client.get_deleted_secret(name, **kwargs) + return DeletedSecret._from_deleted_secret_bundle(bundle) + + @distributed_trace + def list_deleted_secrets(self, **kwargs: Any) -> ItemPaged[DeletedSecret]: + """Lists all deleted secrets. Possible only in vaults with soft-delete enabled. + + Requires secrets/list permission. + + :returns: An iterator of deleted secrets, excluding their values + :rtype: ~azure.core.paging.ItemPaged[~azure.keyvault.secrets.DeletedSecret] + + Example: + .. literalinclude:: ../tests/test_samples_secrets.py + :start-after: [START list_deleted_secrets] + :end-before: [END list_deleted_secrets] + :language: python + :caption: List deleted secrets + :dedent: 8 + + """ + return self._client.get_deleted_secrets( + maxresults=kwargs.pop("max_page_size", None), + cls=lambda objs: [DeletedSecret._from_deleted_secret_item(x) for x in objs], + **kwargs + ) + + @distributed_trace + def purge_deleted_secret(self, name: str, **kwargs: Any) -> None: + """Permanently deletes a deleted secret. Possible only in vaults with soft-delete enabled. + + Performs an irreversible deletion of the specified secret, without possibility for recovery. The operation is + not available if the :py:attr:`~azure.keyvault.secrets.SecretProperties.recovery_level` does not specify + 'Purgeable'. This method is only necessary for purging a secret before its + :py:attr:`~azure.keyvault.secrets.DeletedSecret.scheduled_purge_date`. + + Requires secrets/purge permission. + + :param str name: Name of the deleted secret to purge + + :returns: None + + :raises ~azure.core.exceptions.HttpResponseError: + + Example: + .. code-block:: python + + # if the vault has soft-delete enabled, purge permanently deletes the secret + # (with soft-delete disabled, begin_delete_secret is permanent) + secret_client.purge_deleted_secret("secret-name") + + """ + self._client.purge_deleted_secret(name, **kwargs) + + @distributed_trace + def begin_recover_deleted_secret(self, name: str, **kwargs: Any) -> LROPoller[SecretProperties]: + """Recover a deleted secret to its latest version. Possible only in a vault with soft-delete enabled. + + Requires the secrets/recover permission. If the vault does not have soft-delete enabled, + :func:`begin_delete_secret` is permanent, and this method will return an error. Attempting to recover a + non-deleted secret will also return an error. When this method returns Key Vault has begun recovering the + secret. Recovery may take several seconds. This method therefore returns a poller enabling you to wait for + recovery to complete. Waiting is only necessary when you want to use the recovered secret in another operation + immediately. + + :param str name: Name of the deleted secret to recover + + :returns: A poller for the recovery operation. The poller's `result` method returns the recovered secret's + :class:`~azure.keyvault.secrets.SecretProperties` without waiting for recovery to complete. If you want to + use the recovered secret immediately, call the poller's `wait` method, which blocks until the secret is + ready to use. The `wait` method requires secrets/get permission. + :rtype: ~azure.core.polling.LROPoller[~azure.keyvault.secrets.SecretProperties] + + :raises ~azure.core.exceptions.HttpResponseError: + + Example: + .. literalinclude:: ../tests/test_samples_secrets.py + :start-after: [START recover_deleted_secret] + :end-before: [END recover_deleted_secret] + :language: python + :caption: Recover a deleted secret + :dedent: 8 + + """ + polling_interval = kwargs.pop("_polling_interval", None) + if polling_interval is None: + polling_interval = 2 + # Ignore pyright warning about return type not being iterable because we use `cls` to return a tuple + pipeline_response, recovered_secret_bundle = self._client.recover_deleted_secret( + secret_name=name, + cls=lambda pipeline_response, deserialized, _: (pipeline_response, deserialized), + **kwargs, + ) # pyright: ignore[reportGeneralTypeIssues] + recovered_secret = SecretProperties._from_secret_bundle(recovered_secret_bundle) + + command = partial(self.get_secret, name=name, **kwargs) + polling_method = DeleteRecoverPollingMethod( + finished=False, + pipeline_response=pipeline_response, + command=command, + final_resource=recovered_secret, + interval=polling_interval, + ) + return KeyVaultOperationPoller(polling_method) + + def __enter__(self) -> "SecretClient": + self._client.__enter__() + return self + + def __exit__(self, *args: Any) -> None: + self._client.__exit__(*args) + + def close(self) -> None: + """Close sockets opened by the client. + + Calling this method is unnecessary when using the client as a context manager. + """ + self._client.close() + + @distributed_trace + def send_request(self, request: HttpRequest, *, stream: bool = False, **kwargs: Any) -> HttpResponse: + """Runs a network request using the client's existing pipeline. + + The request URL can be relative to the vault URL. The service API version used for the request is the same as + the client's unless otherwise specified. This method does not raise if the response is an error; to raise an + exception, call `raise_for_status()` on the returned response object. For more information about how to send + custom requests with this method, see https://aka.ms/azsdk/dpcodegen/python/send_request. + + :param request: The network request you want to make. + :type request: ~azure.core.rest.HttpRequest + + :keyword bool stream: Whether the response payload will be streamed. Defaults to False. + + :return: The response of your network call. Does not do error handling on your response. + :rtype: ~azure.core.rest.HttpResponse + """ + request_copy = _format_api_version(request, self.api_version) + path_format_arguments = { + "vaultBaseUrl": _SERIALIZER.url("vault_base_url", self._vault_url, "str", skip_quote=True), + } + request_copy.url = self._client._client.format_url(request_copy.url, **path_format_arguments) + return self._client._client.send_request(request_copy, stream=stream, **kwargs) + + +def patch_sdk(): + """Do not remove from this file. + + `patch_sdk` is a last resort escape hatch that allows you to do customizations + you can't accomplish using the techniques described in + https://aka.ms/azsdk/python/dpcodegen/python/customize + """ diff --git a/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/aio/__init__.py b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/aio/__init__.py index 44967833f2df..e37b2ecd0e1d 100644 --- a/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/aio/__init__.py +++ b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/aio/__init__.py @@ -1,7 +1,27 @@ -# ------------------------------------ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. -# ------------------------------------ -from ._client import SecretClient +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------- +# pylint: disable=wrong-import-position -__all__ = ["SecretClient"] +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from ._patch import * # pylint: disable=unused-wildcard-import + +from ._client import SecretClient # type: ignore + +try: + from ._patch import __all__ as _patch_all + from ._patch import * +except ImportError: + _patch_all = [] +from ._patch import patch_sdk as _patch_sdk + +__all__ = [ + "SecretClient", +] +__all__.extend([p for p in _patch_all if p not in __all__]) # pyright: ignore + +_patch_sdk() diff --git a/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/aio/_patch.py b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/aio/_patch.py new file mode 100644 index 000000000000..8bcb627aa475 --- /dev/null +++ b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/aio/_patch.py @@ -0,0 +1,21 @@ +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------- +"""Customize generated code here. + +Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize +""" +from typing import List + +__all__: List[str] = [] # Add all objects you want publicly available to users at this package level + + +def patch_sdk(): + """Do not remove from this file. + + `patch_sdk` is a last resort escape hatch that allows you to do customizations + you can't accomplish using the techniques described in + https://aka.ms/azsdk/python/dpcodegen/python/customize + """ diff --git a/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/models/__init__.py b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/models/__init__.py new file mode 100644 index 000000000000..eeab7e3523db --- /dev/null +++ b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/models/__init__.py @@ -0,0 +1,19 @@ +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------- +# pylint: disable=wrong-import-position + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from ._patch import * # pylint: disable=unused-wildcard-import + +from ._patch import __all__ as _patch_all +from ._patch import * +from ._patch import patch_sdk as _patch_sdk + +__all__ = [] +__all__.extend([p for p in _patch_all if p not in __all__]) # pyright: ignore +_patch_sdk() diff --git a/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/models/_patch.py b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/models/_patch.py new file mode 100644 index 000000000000..ee4251ba34f0 --- /dev/null +++ b/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets/models/_patch.py @@ -0,0 +1,407 @@ +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------- +"""Customize generated code here. + +Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize +""" +from datetime import datetime +from typing import Any, Dict, List, Optional, Union + +from .._generated import models as _models +from .._internal import parse_key_vault_id + +__all__: List[str] = [ + "DeletedSecret", + "KeyVaultSecret", + "KeyVaultSecretIdentifier", + "SecretProperties", +] + + +class SecretProperties(object): + """A secret's ID and attributes.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + self._attributes: Optional[_models.SecretAttributes] = args[0] if args else kwargs.get("attributes", None) + self._id: Optional[str] = args[1] if len(args) > 1 else kwargs.get("vault_id", None) + self._vault_id = KeyVaultSecretIdentifier(self._id) if self._id else None + self._content_type = kwargs.get("content_type", None) + self._key_id = kwargs.get("key_id", None) + self._managed = kwargs.get("managed", None) + self._tags = kwargs.get("tags", None) + + def __repr__(self) -> str: + return f""[:1024] + + @classmethod + def _from_secret_bundle( + cls, secret_bundle: Union[_models.DeletedSecretBundle, _models.SecretBundle] + ) -> "SecretProperties": + return cls( + secret_bundle.attributes, + secret_bundle.id, + content_type=secret_bundle.content_type, + key_id=secret_bundle.kid, + managed=secret_bundle.managed, + tags=secret_bundle.tags, + ) + + @classmethod + def _from_secret_item(cls, secret_item: Union[_models.DeletedSecretItem, _models.SecretItem]) -> "SecretProperties": + return cls( + secret_item.attributes, + secret_item.id, + content_type=secret_item.content_type, + managed=secret_item.managed, + tags=secret_item.tags, + ) + + @property + def content_type(self) -> Optional[str]: + """An arbitrary string indicating the type of the secret. + + :returns: The content type of the secret. + :rtype: str or None + """ + return self._content_type + + @property + def id(self) -> Optional[str]: + """The secret's ID. + + :returns: The secret's ID. + :rtype: str or None + """ + return self._id + + @property + def key_id(self) -> Optional[str]: + """If this secret backs a certificate, this property is the identifier of the corresponding key. + + :returns: The ID of the key backing the certificate that's backed by this secret. If the secret isn't backing a + certificate, this is None. + :rtype: str or None + """ + return self._key_id + + @property + def enabled(self) -> Optional[bool]: + """Whether the secret is enabled for use. + + :returns: True if the secret is enabled for use; False otherwise. + :rtype: bool or None + """ + return self._attributes.enabled if self._attributes else None + + @property + def not_before(self) -> Optional[datetime]: + """The time before which the secret cannot be used, in UTC. + + :returns: The time before which the secret cannot be used, in UTC. + :rtype: ~datetime.datetime or None + """ + return self._attributes.not_before if self._attributes else None + + @property + def expires_on(self) -> Optional[datetime]: + """When the secret expires, in UTC. + + :returns: When the secret expires, in UTC. + :rtype: ~datetime.datetime or None + """ + return self._attributes.expires if self._attributes else None + + @property + def created_on(self) -> Optional[datetime]: + """When the secret was created, in UTC. + + :returns: When the secret was created, in UTC. + :rtype: ~datetime.datetime or None + """ + return self._attributes.created if self._attributes else None + + @property + def updated_on(self) -> Optional[datetime]: + """When the secret was last updated, in UTC. + + :returns: When the secret was last updated, in UTC. + :rtype: ~datetime.datetime or None + """ + return self._attributes.updated if self._attributes else None + + @property + def recoverable_days(self) -> Optional[int]: + """The number of days the key is retained before being deleted from a soft-delete enabled Key Vault. + + :returns: The number of days the key is retained before being deleted from a soft-delete enabled Key Vault. + :rtype: int or None + """ + # recoverable_days was added in 7.1-preview + if self._attributes and hasattr(self._attributes, "recoverable_days"): + return self._attributes.recoverable_days + return None + + @property + def recovery_level(self) -> Optional[str]: + """The vault's deletion recovery level for secrets. + + :returns: The vault's deletion recovery level for secrets. + :rtype: str or None + """ + return self._attributes.recovery_level if self._attributes else None + + @property + def vault_url(self) -> Optional[str]: + """URL of the vault containing the secret. + + :returns: URL of the vault containing the secret. + :rtype: str or None + """ + return self._vault_id.vault_url if self._vault_id else None + + @property + def name(self) -> Optional[str]: + """The secret's name. + + :returns: The secret's name. + :rtype: str or None + """ + return self._vault_id.name if self._vault_id else None + + @property + def version(self) -> Optional[str]: + """The secret's version. + + :returns: The secret's version. + :rtype: str or None + """ + return self._vault_id.version if self._vault_id else None + + @property + def tags(self) -> Optional[Dict[str, str]]: + """Application specific metadata in the form of key-value pairs. + + :returns: A dictionary of tags attached to this secret. + :rtype: dict or None + """ + return self._tags + + @property + def managed(self) -> Optional[bool]: + """Whether the secret's lifetime is managed by Key Vault. If the secret backs a certificate, this will be true. + + :returns: True if the secret's lifetime is managed by Key Vault; False otherwise. + :rtype: bool or None + """ + return self._managed + + +class KeyVaultSecret(object): + """All of a secret's properties, and its value. + + :param properties: The secret's properties. + :type properties: ~azure.keyvault.secrets.SecretProperties + :param value: The value of the secret. + :type value: str or None + """ + + def __init__(self, properties: SecretProperties, value: Optional[str]) -> None: + self._properties = properties + self._value = value + + def __repr__(self) -> str: + return f""[:1024] + + @classmethod + def _from_secret_bundle(cls, secret_bundle: _models.SecretBundle) -> "KeyVaultSecret": + return cls( + properties=SecretProperties._from_secret_bundle(secret_bundle), # pylint: disable=protected-access + value=secret_bundle.value, + ) + + @property + def name(self) -> Optional[str]: + """The secret's name. + + :returns: The secret's name. + :rtype: str or None + """ + return self._properties.name + + @property + def id(self) -> Optional[str]: + """The secret's ID. + + :returns: The secret's ID. + :rtype: str or None + """ + return self._properties.id + + @property + def properties(self) -> SecretProperties: + """The secret's properties. + + :returns: The secret's properties. + :rtype: ~azure.keyvault.secrets.SecretProperties + """ + return self._properties + + @property + def value(self) -> Optional[str]: + """The secret's value. + + :returns: The secret's value. + :rtype: str or None + """ + return self._value + + +class KeyVaultSecretIdentifier(object): + """Information about a KeyVaultSecret parsed from a secret ID. + + :param str source_id: the full original identifier of a secret + + :raises ValueError: if the secret ID is improperly formatted + + Example: + .. literalinclude:: ../tests/test_parse_id.py + :start-after: [START parse_key_vault_secret_id] + :end-before: [END parse_key_vault_secret_id] + :language: python + :caption: Parse a secret's ID + :dedent: 8 + """ + + def __init__(self, source_id: str) -> None: + self._resource_id = parse_key_vault_id(source_id) + + @property + def source_id(self) -> str: + return self._resource_id.source_id + + @property + def vault_url(self) -> str: + return self._resource_id.vault_url + + @property + def name(self) -> str: + return self._resource_id.name + + @property + def version(self) -> Optional[str]: + return self._resource_id.version + + +class DeletedSecret(object): + """A deleted secret's properties and information about its deletion. + + If soft-delete is enabled, returns information about its recovery as well. + + :param properties: The deleted secret's properties. + :type properties: ~azure.keyvault.secrets.SecretProperties + :param deleted_date: When the secret was deleted, in UTC. + :type deleted_date: ~datetime.datetime or None + :param recovery_id: An identifier used to recover the deleted secret. + :type recovery_id: str or None + :param scheduled_purge_date: When the secret is scheduled to be purged by Key Vault, in UTC. + :type scheduled_purge_date: ~datetime.datetime or None + """ + + def __init__( + self, + properties: SecretProperties, + deleted_date: Optional[datetime] = None, + recovery_id: Optional[str] = None, + scheduled_purge_date: Optional[datetime] = None, + ) -> None: + self._properties = properties + self._deleted_date = deleted_date + self._recovery_id = recovery_id + self._scheduled_purge_date = scheduled_purge_date + + def __repr__(self) -> str: + return f""[:1024] + + @classmethod + def _from_deleted_secret_bundle(cls, deleted_secret_bundle: _models.DeletedSecretBundle) -> "DeletedSecret": + return cls( + properties=SecretProperties._from_secret_bundle(deleted_secret_bundle), # pylint: disable=protected-access + deleted_date=deleted_secret_bundle.deleted_date, + recovery_id=deleted_secret_bundle.recovery_id, + scheduled_purge_date=deleted_secret_bundle.scheduled_purge_date, + ) + + @classmethod + def _from_deleted_secret_item(cls, deleted_secret_item: _models.DeletedSecretItem) -> "DeletedSecret": + return cls( + properties=SecretProperties._from_secret_item(deleted_secret_item), # pylint: disable=protected-access + deleted_date=deleted_secret_item.deleted_date, + recovery_id=deleted_secret_item.recovery_id, + scheduled_purge_date=deleted_secret_item.scheduled_purge_date, + ) + + @property + def name(self) -> Optional[str]: + """The secret's name. + + :returns: The secret's name. + :rtype: str or None + """ + return self._properties.name + + @property + def id(self) -> Optional[str]: + """The secret's ID. + + :returns: The secret's ID. + :rtype: str or None + """ + return self._properties.id + + @property + def properties(self) -> SecretProperties: + """The properties of the deleted secret. + + :returns: The properties of the deleted secret. + :rtype: ~azure.keyvault.secrets.SecretProperties + """ + return self._properties + + @property + def deleted_date(self) -> Optional[datetime]: + """When the secret was deleted, in UTC. + + :returns: When the secret was deleted, in UTC. + :rtype: ~datetime.datetime or None + """ + return self._deleted_date + + @property + def recovery_id(self) -> Optional[str]: + """An identifier used to recover the deleted secret. + + :returns: An identifier used to recover the deleted secret. + :rtype: str or None + """ + return self._recovery_id + + @property + def scheduled_purge_date(self) -> Optional[datetime]: + """When the secret is scheduled to be purged by Key Vault, in UTC. + + :returns: When the secret is scheduled to be purged by Key Vault, in UTC. + :rtype: ~datetime.datetime or None + """ + return self._scheduled_purge_date + + +def patch_sdk(): + """Do not remove from this file. + + `patch_sdk` is a last resort escape hatch that allows you to do customizations + you can't accomplish using the techniques described in + https://aka.ms/azsdk/python/dpcodegen/python/customize + """