From d05f49efd7e2331ede26efa0a5988fec79a891c5 Mon Sep 17 00:00:00 2001 From: emiliyank Date: Wed, 3 Dec 2025 16:42:01 +0200 Subject: [PATCH] Allow PublicKey for TokenUpdateKeys in TokenUpdateTransaction Signed-off-by: emiliyank --- CHANGELOG.md | 2 + .../consensus/topic_create_transaction.py | 36 ++-- .../tokens/token_create_transaction.py | 64 ++---- .../tokens/token_update_transaction.py | 82 ++++---- src/hiero_sdk_python/utils/key_utils.py | 49 +++++ .../token_update_transaction_e2e_test.py | 194 ++++++++++++++++++ tests/unit/test_key_utils.py | 91 ++++++++ tests/unit/test_token_create_transaction.py | 65 +----- tests/unit/test_token_update_transaction.py | 191 +++++++++++++++++ 9 files changed, 609 insertions(+), 165 deletions(-) create mode 100644 src/hiero_sdk_python/utils/key_utils.py create mode 100644 tests/unit/test_key_utils.py diff --git a/CHANGELOG.md b/CHANGELOG.md index f42ac0f0e..3e9ae07dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,9 @@ This changelog is based on [Keep a Changelog](https://keepachangelog.com/en/1.1. - Add examples/tokens/token_create_transaction_pause_key.py example demonstrating token pause/unpause behavior and pause key usage (#833) - Added `docs/sdk_developers/training/transaction_lifecycle.md` to explain the typical lifecycle of executing a transaction using the Hedera Python SDK. - Add inactivity bot workflow to unassign stale issue assignees (#952) + ### Changed +- Allow `PublicKey` for `TokenUpdateKeys` in `TokenUpdateTransaction`, enabling non-custodial workflows where operators can build transactions using only public keys (#934). ### Fixed - Fixed inactivity bot workflow not checking out repository before running (#964) diff --git a/src/hiero_sdk_python/consensus/topic_create_transaction.py b/src/hiero_sdk_python/consensus/topic_create_transaction.py index 8aa9c17f6..e62ab17f6 100644 --- a/src/hiero_sdk_python/consensus/topic_create_transaction.py +++ b/src/hiero_sdk_python/consensus/topic_create_transaction.py @@ -21,10 +21,7 @@ from hiero_sdk_python.channels import _Channel from hiero_sdk_python.executable import _Method from hiero_sdk_python.account.account_id import AccountId -from hiero_sdk_python.crypto.private_key import PrivateKey -from hiero_sdk_python.crypto.public_key import PublicKey - -Key = Union[PrivateKey, PublicKey] +from hiero_sdk_python.utils.key_utils import Key, key_to_proto class TopicCreateTransaction(Transaction): @@ -174,25 +171,16 @@ def set_fee_exempt_keys(self, keys: List[Key]) -> "TopicCreateTransaction": self.fee_exempt_keys = keys return self - def _to_proto_key(self, key: Optional[Key]) -> Optional[basic_types_pb2.Key]: + def _to_proto_key(self, key: Optional[Key]): """ - Helper method to convert a key (PrivateKey or PublicKey) to protobuf Key format. + Backwards-compatible wrapper around `key_to_proto` for converting SDK keys + (PrivateKey or PublicKey) to protobuf `Key` messages. - Args: - key (Optional[Key]): The key to convert (PrivateKey or PublicKey), or None - - Returns: - basic_types_pb2.Key (Optional): The protobuf key or None if key is None + This exists so that existing unit tests and any external callers that rely + on `_to_proto_key` continue to work after centralizing the logic in + `hiero_sdk_python.utils.key_utils.key_to_proto`. """ - if not key: - return None - - # If it's a PrivateKey, get the public key first, then convert to proto - if isinstance(key, PrivateKey): - return key.public_key()._to_proto() - - # If it's a PublicKey, convert directly to proto - return key._to_proto() + return key_to_proto(key) def _build_proto_body(self) -> consensus_create_topic_pb2.ConsensusCreateTopicTransactionBody: """ @@ -202,8 +190,8 @@ def _build_proto_body(self) -> consensus_create_topic_pb2.ConsensusCreateTopicTr ConsensusCreateTopicTransactionBody: The protobuf body for this transaction. """ return consensus_create_topic_pb2.ConsensusCreateTopicTransactionBody( - adminKey=self._to_proto_key(self.admin_key), - submitKey=self._to_proto_key(self.submit_key), + adminKey=key_to_proto(self.admin_key), + submitKey=key_to_proto(self.submit_key), autoRenewPeriod=( self.auto_renew_period._to_proto() if self.auto_renew_period is not None @@ -214,8 +202,8 @@ def _build_proto_body(self) -> consensus_create_topic_pb2.ConsensusCreateTopicTr else None), memo=self.memo, custom_fees=[custom_fee._to_topic_fee_proto() for custom_fee in self.custom_fees], - fee_schedule_key=self._to_proto_key(self.fee_schedule_key), - fee_exempt_key_list=[self._to_proto_key(key) for key in self.fee_exempt_keys], + fee_schedule_key=key_to_proto(self.fee_schedule_key), + fee_exempt_key_list=[key_to_proto(key) for key in self.fee_exempt_keys], ) def build_transaction_body(self) -> transaction_pb2.TransactionBody: diff --git a/src/hiero_sdk_python/tokens/token_create_transaction.py b/src/hiero_sdk_python/tokens/token_create_transaction.py index b00c130cf..c8873d93a 100644 --- a/src/hiero_sdk_python/tokens/token_create_transaction.py +++ b/src/hiero_sdk_python/tokens/token_create_transaction.py @@ -26,15 +26,12 @@ from hiero_sdk_python.tokens.token_type import TokenType from hiero_sdk_python.tokens.supply_type import SupplyType from hiero_sdk_python.account.account_id import AccountId -from hiero_sdk_python.crypto.private_key import PrivateKey -from hiero_sdk_python.crypto.public_key import PublicKey from hiero_sdk_python.tokens.custom_fee import CustomFee +from hiero_sdk_python.utils.key_utils import Key, key_to_proto AUTO_RENEW_PERIOD = Duration(7890000) # around 90 days in seconds DEFAULT_TRANSACTION_FEE = 3_000_000_000 -Key = Union[PrivateKey, PublicKey] - @dataclass class TokenParams: """ @@ -446,38 +443,6 @@ def set_metadata(self, metadata: bytes | str) -> "TokenCreateTransaction": self._token_params.metadata = metadata return self - def _to_proto_key(self, key: Optional[Key]) -> Optional[basic_types_pb2.Key]: - """ - Helper method to convert a PrivateKey or PublicKey to the protobuf Key format. - - This ensures only public keys are serialized: - - If a PublicKey is provided, it is used directly. - - If a PrivateKey is provided, its corresponding public key is extracted and used. - - Args: - key (Key, Optional): The PrivateKey or PublicKey to convert. - - Returns: - basic_types_pb2.Key (Optional): The protobuf key, or None. - - Raises: - TypeError: If the provided key is not a PrivateKey, PublicKey, or None. - """ - if not key: - return None - - # If it's a PrivateKey, get its public key first - if isinstance(key, PrivateKey): - return key.public_key()._to_proto() - - # If it's already a PublicKey, just convert it - if isinstance(key, PublicKey): - return key._to_proto() - - # Safety net: This will fail if a non-key is passed - raise TypeError("Key must be of type PrivateKey or PublicKey") - - def freeze_with(self, client) -> "TokenCreateTransaction": """ Freeze the transaction with the given client. @@ -501,6 +466,17 @@ def freeze_with(self, client) -> "TokenCreateTransaction": return super().freeze_with(client) + def _to_proto_key(self, key: Optional[Key]): + """ + Backwards-compatible wrapper around `key_to_proto` for converting SDK keys + (PrivateKey or PublicKey) to protobuf `Key` messages. + + This exists so that existing unit tests and any external callers that rely + on `_to_proto_key` continue to work after centralizing the logic in + `hiero_sdk_python.utils.key_utils.key_to_proto`. + """ + return key_to_proto(key) + def _build_proto_body(self) -> token_create_pb2.TokenCreateTransactionBody: """ Returns the protobuf body for the token create transaction. @@ -518,14 +494,14 @@ def _build_proto_body(self) -> token_create_pb2.TokenCreateTransactionBody: TokenCreateValidator._validate_token_freeze_status(self._keys, self._token_params) # Convert keys - admin_key_proto = self._to_proto_key(self._keys.admin_key) - supply_key_proto = self._to_proto_key(self._keys.supply_key) - freeze_key_proto = self._to_proto_key(self._keys.freeze_key) - wipe_key_proto = self._to_proto_key(self._keys.wipe_key) - metadata_key_proto = self._to_proto_key(self._keys.metadata_key) - pause_key_proto = self._to_proto_key(self._keys.pause_key) - kyc_key_proto = self._to_proto_key(self._keys.kyc_key) - fee_schedules_key_proto = self._to_proto_key(self._keys.fee_schedule_key); + admin_key_proto = key_to_proto(self._keys.admin_key) + supply_key_proto = key_to_proto(self._keys.supply_key) + freeze_key_proto = key_to_proto(self._keys.freeze_key) + wipe_key_proto = key_to_proto(self._keys.wipe_key) + metadata_key_proto = key_to_proto(self._keys.metadata_key) + pause_key_proto = key_to_proto(self._keys.pause_key) + kyc_key_proto = key_to_proto(self._keys.kyc_key) + fee_schedules_key_proto = key_to_proto(self._keys.fee_schedule_key) # Resolve enum values with defaults token_type_value = ( diff --git a/src/hiero_sdk_python/tokens/token_update_transaction.py b/src/hiero_sdk_python/tokens/token_update_transaction.py index 97bae372d..e0b867123 100644 --- a/src/hiero_sdk_python/tokens/token_update_transaction.py +++ b/src/hiero_sdk_python/tokens/token_update_transaction.py @@ -10,7 +10,6 @@ from google.protobuf.wrappers_pb2 import (BytesValue, StringValue) from hiero_sdk_python.Duration import Duration -from hiero_sdk_python.crypto.private_key import PrivateKey from hiero_sdk_python.hbar import Hbar from hiero_sdk_python.timestamp import Timestamp from hiero_sdk_python.tokens.token_id import TokenId @@ -23,6 +22,7 @@ SchedulableTransactionBody, ) from hiero_sdk_python.hapi.services import token_update_pb2, transaction_pb2 +from hiero_sdk_python.utils.key_utils import Key, key_to_proto @dataclass class TokenUpdateParams: @@ -59,14 +59,14 @@ class TokenUpdateKeys: metadata_key: The new metadata key for the token. pause_key: The new pause key for the token. """ - admin_key: Optional[PrivateKey] = None - supply_key: Optional[PrivateKey] = None - freeze_key: Optional[PrivateKey] = None - wipe_key: Optional[PrivateKey] = None - metadata_key: Optional[PrivateKey] = None - pause_key: Optional[PrivateKey] = None - kyc_key: Optional[PrivateKey] = None - fee_schedule_key: Optional[PrivateKey] = None + admin_key: Optional[Key] = None + supply_key: Optional[Key] = None + freeze_key: Optional[Key] = None + wipe_key: Optional[Key] = None + metadata_key: Optional[Key] = None + pause_key: Optional[Key] = None + kyc_key: Optional[Key] = None + fee_schedule_key: Optional[Key] = None class TokenUpdateTransaction(Transaction): @@ -124,14 +124,14 @@ def __init__( # Initialize keys attributes keys: TokenUpdateKeys = token_keys or TokenUpdateKeys() - self.admin_key: Optional[PrivateKey] = keys.admin_key - self.freeze_key: Optional[PrivateKey] = keys.freeze_key - self.wipe_key: Optional[PrivateKey] = keys.wipe_key - self.supply_key: Optional[PrivateKey] = keys.supply_key - self.pause_key: Optional[PrivateKey] = keys.pause_key - self.metadata_key: Optional[PrivateKey] = keys.metadata_key - self.kyc_key: Optional[PrivateKey] = keys.kyc_key - self.fee_schedule_key: Optional[PrivateKey] = keys.fee_schedule_key + self.admin_key: Optional[Key] = keys.admin_key + self.freeze_key: Optional[Key] = keys.freeze_key + self.wipe_key: Optional[Key] = keys.wipe_key + self.supply_key: Optional[Key] = keys.supply_key + self.pause_key: Optional[Key] = keys.pause_key + self.metadata_key: Optional[Key] = keys.metadata_key + self.kyc_key: Optional[Key] = keys.kyc_key + self.fee_schedule_key: Optional[Key] = keys.fee_schedule_key self.token_key_verification_mode: TokenKeyValidation = token_key_verification_mode @@ -284,13 +284,13 @@ def set_expiration_time(self, expiration_time: Timestamp) -> "TokenUpdateTransac def set_admin_key( self, - admin_key: PrivateKey + admin_key: Key ) -> "TokenUpdateTransaction": """ Sets the new admin key for the token. Args: - admin_key (PrivateKey): The new admin key to set. + admin_key (Key): The new admin key to set (PrivateKey or PublicKey). Returns: TokenUpdateTransaction: This transaction instance. @@ -301,13 +301,13 @@ def set_admin_key( def set_freeze_key( self, - freeze_key: PrivateKey + freeze_key: Key ) -> "TokenUpdateTransaction": """ Sets the new freeze key for the token. Args: - freeze_key (PrivateKey): The new freeze key to set. + freeze_key (Key): The new freeze key to set (PrivateKey or PublicKey). Returns: TokenUpdateTransaction: This transaction instance. @@ -318,13 +318,13 @@ def set_freeze_key( def set_wipe_key( self, - wipe_key: PrivateKey + wipe_key: Key ) -> "TokenUpdateTransaction": """ Sets the new wipe key for the token. Args: - wipe_key (PrivateKey): The new wipe key to set. + wipe_key (Key): The new wipe key to set (PrivateKey or PublicKey). Returns: TokenUpdateTransaction: This transaction instance. @@ -335,13 +335,13 @@ def set_wipe_key( def set_supply_key( self, - supply_key: PrivateKey + supply_key: Key ) -> "TokenUpdateTransaction": """ Sets the new supply key for the token. Args: - supply_key (PrivateKey): The new supply key to set. + supply_key (Key): The new supply key to set (PrivateKey or PublicKey). Returns: TokenUpdateTransaction: This transaction instance. @@ -352,13 +352,13 @@ def set_supply_key( def set_pause_key( self, - pause_key: PrivateKey + pause_key: Key ) -> "TokenUpdateTransaction": """ Sets the new pause key for the token. Args: - pause_key (PrivateKey): The new pause key to set. + pause_key (Key): The new pause key to set (PrivateKey or PublicKey). Returns: TokenUpdateTransaction: This transaction instance. @@ -369,13 +369,13 @@ def set_pause_key( def set_metadata_key( self, - metadata_key: PrivateKey + metadata_key: Key ) -> "TokenUpdateTransaction": """ Sets the new metadata key for the token. Args: - metadata_key (PrivateKey): The new metadata key to set. + metadata_key (Key): The new metadata key to set (PrivateKey or PublicKey). Returns: TokenUpdateTransaction: This transaction instance. @@ -384,12 +384,12 @@ def set_metadata_key( self.metadata_key = metadata_key return self - def set_kyc_key(self, kyc_key: PrivateKey) -> "TokenUpdateTransaction": + def set_kyc_key(self, kyc_key: Key) -> "TokenUpdateTransaction": """ Sets the kyc key for the token Args: - kyc_key (Private Key): The new kyc_key to set. + kyc_key (Key): The new kyc_key to set (PrivateKey or PublicKey). Returns: TokenUpdateTransaction: This transaction instance. @@ -398,12 +398,12 @@ def set_kyc_key(self, kyc_key: PrivateKey) -> "TokenUpdateTransaction": self.kyc_key = kyc_key return self - def set_fee_schedule_key(self, fee_schedule_key: PrivateKey) -> "TokenUpdateTransaction": + def set_fee_schedule_key(self, fee_schedule_key: Key) -> "TokenUpdateTransaction": """ Sets the fee schedule key for the token Args: - fee_schedule_key (Private Key): The new fee_schedule_key to set. + fee_schedule_key (Key): The new fee_schedule_key to set (PrivateKey or PublicKey). Returns: TokenUpdateTransaction: This transaction instance. @@ -507,18 +507,18 @@ def _set_keys_to_proto( Sets the keys to the protobuf transaction body. """ if self.admin_key: - token_update_body.adminKey.CopyFrom(self.admin_key.public_key()._to_proto()) + token_update_body.adminKey.CopyFrom(key_to_proto(self.admin_key)) if self.freeze_key: - token_update_body.freezeKey.CopyFrom(self.freeze_key.public_key()._to_proto()) + token_update_body.freezeKey.CopyFrom(key_to_proto(self.freeze_key)) if self.wipe_key: - token_update_body.wipeKey.CopyFrom(self.wipe_key.public_key()._to_proto()) + token_update_body.wipeKey.CopyFrom(key_to_proto(self.wipe_key)) if self.supply_key: - token_update_body.supplyKey.CopyFrom(self.supply_key.public_key()._to_proto()) + token_update_body.supplyKey.CopyFrom(key_to_proto(self.supply_key)) if self.metadata_key: - token_update_body.metadata_key.CopyFrom(self.metadata_key.public_key()._to_proto()) + token_update_body.metadata_key.CopyFrom(key_to_proto(self.metadata_key)) if self.pause_key: - token_update_body.pause_key.CopyFrom(self.pause_key.public_key()._to_proto()) + token_update_body.pause_key.CopyFrom(key_to_proto(self.pause_key)) if self.kyc_key: - token_update_body.kycKey.CopyFrom(self.kyc_key.public_key()._to_proto()) + token_update_body.kycKey.CopyFrom(key_to_proto(self.kyc_key)) if self.fee_schedule_key: - token_update_body.fee_schedule_key.CopyFrom(self.fee_schedule_key.public_key()._to_proto()) + token_update_body.fee_schedule_key.CopyFrom(key_to_proto(self.fee_schedule_key)) diff --git a/src/hiero_sdk_python/utils/key_utils.py b/src/hiero_sdk_python/utils/key_utils.py new file mode 100644 index 000000000..e1b038c22 --- /dev/null +++ b/src/hiero_sdk_python/utils/key_utils.py @@ -0,0 +1,49 @@ +""" +hiero_sdk_python.utils.key_utils +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Utility functions and type definitions for working with cryptographic keys. +""" + +from typing import Optional, Union + +from hiero_sdk_python.crypto.private_key import PrivateKey +from hiero_sdk_python.crypto.public_key import PublicKey +from hiero_sdk_python.hapi.services import basic_types_pb2 + +# Type alias for keys that can be either PrivateKey or PublicKey +Key = Union[PrivateKey, PublicKey] + + +def key_to_proto(key: Optional[Key]) -> Optional[basic_types_pb2.Key]: + """ + Helper function to convert a key (PrivateKey or PublicKey) to protobuf Key format. + + This function handles the conversion of SDK key types to protobuf format: + - If a PrivateKey is provided, its corresponding public key is extracted and converted. + - If a PublicKey is provided, it is converted directly to protobuf. + - If None is provided, None is returned. + + Args: + key (Optional[Key]): The key to convert (PrivateKey or PublicKey), or None + + Returns: + basic_types_pb2.Key (Optional): The protobuf key or None if key is None + + Raises: + TypeError: If the provided key is not a PrivateKey, PublicKey, or None. + """ + if not key: + return None + + # If it's a PrivateKey, get the public key first, then convert to proto + if isinstance(key, PrivateKey): + return key.public_key()._to_proto() + + # If it's a PublicKey, convert directly to proto + if isinstance(key, PublicKey): + return key._to_proto() + + # Safety net: This will fail if a non-key is passed + raise TypeError("Key must be of type PrivateKey or PublicKey") + diff --git a/tests/integration/token_update_transaction_e2e_test.py b/tests/integration/token_update_transaction_e2e_test.py index 635c98349..a79791be4 100644 --- a/tests/integration/token_update_transaction_e2e_test.py +++ b/tests/integration/token_update_transaction_e2e_test.py @@ -3,6 +3,7 @@ from hiero_sdk_python.Duration import Duration from hiero_sdk_python.crypto.private_key import PrivateKey +from hiero_sdk_python.crypto.public_key import PublicKey from hiero_sdk_python.hbar import Hbar from hiero_sdk_python.response_code import ResponseCode from hiero_sdk_python.timestamp import Timestamp @@ -15,6 +16,7 @@ from hiero_sdk_python.tokens.token_update_transaction import TokenUpdateTransaction from hiero_sdk_python.account.account_create_transaction import AccountCreateTransaction from hiero_sdk_python.tokens.token_mint_transaction import TokenMintTransaction +from hiero_sdk_python.transaction.transaction import Transaction from tests.integration.utils_for_test import IntegrationTestEnv, create_fungible_token, create_nft_token private_key = PrivateKey.generate() @@ -916,3 +918,195 @@ def test_integation_token_update_fee_schedule_key_nft(): assert token_info.custom_fees[0].fee_collector_account_id == env.client.operator_account_id finally: env.close() + + +@pytest.mark.integration +def test_integration_token_update_with_public_key(): + """Test updating token keys with PublicKey directly (not PrivateKey).""" + env = IntegrationTestEnv() + + try: + token_id = create_fungible_token(env) + + # Generate a new key pair for the freeze key + freeze_private_key = PrivateKey.generate_ed25519() + freeze_public_key = freeze_private_key.public_key() + + # Update token with PublicKey + receipt = ( + TokenUpdateTransaction() + .set_token_id(token_id) + .set_token_name("UpdatedWithPublicKey") + .set_freeze_key(freeze_public_key) # Using PublicKey directly + .freeze_with(env.client) + .execute(env.client) + ) + + assert receipt.status == ResponseCode.SUCCESS, f"Token update transaction failed with status: {ResponseCode.get_name(receipt.status)}" + + # Query the token and verify the freeze key matches the public key + info = TokenInfoQuery().set_token_id(token_id).execute(env.client) + + assert info.name == "UpdatedWithPublicKey", "Token name failed to update" + assert info.freeze_key is not None + + # info.freeze_key is already a PublicKey object, so we can compare directly + assert info.freeze_key.to_bytes_raw() == freeze_public_key.to_bytes_raw(), "Freeze key on network should match the PublicKey used in transaction" + finally: + env.close() + + +@pytest.mark.integration +def test_integration_token_update_non_custodial_workflow(): + """ + Test the non-custodial workflow where: + 1. Operator builds a TokenUpdateTransaction using only a PublicKey + 2. Operator gets the transaction bytes + 3. User (with the PrivateKey) signs the bytes + 4. Operator executes the signed transaction + """ + env = IntegrationTestEnv() + + try: + token_id = create_fungible_token(env) + + # 1. SETUP: Create a new key pair for the "user" + user_private_key = PrivateKey.generate_ed25519() + user_public_key = user_private_key.public_key() + + # ================================================================= + # STEP 1 & 2: OPERATOR (CLIENT) BUILDS THE TRANSACTION + # ================================================================= + + tx = ( + TokenUpdateTransaction() + .set_token_id(token_id) + .set_token_name("NonCustodialToken") + .set_freeze_key(user_public_key) # <-- Using PublicKey! + .freeze_with(env.client) + ) + + tx_bytes = tx.to_bytes() + + # ================================================================= + # STEP 3: USER (SIGNER) SIGNS THE TRANSACTION + # ================================================================= + + tx_from_bytes = Transaction.from_bytes(tx_bytes) + tx_from_bytes.sign(user_private_key) + + # ================================================================= + # STEP 4: OPERATOR (CLIENT) EXECUTES THE SIGNED TX + # ================================================================= + + receipt = tx_from_bytes.execute(env.client) + + assert receipt.status == ResponseCode.SUCCESS, f"Token update failed with status: {ResponseCode.get_name(receipt.status)}" + + # PROOF: Query the token and check if the freeze key matches + token_info = TokenInfoQuery(token_id=token_id).execute(env.client) + + assert token_info.freeze_key is not None + + # This is the STRONG assertion: + # Compare the bytes of the key from the network + # with the bytes of the key we originally used. + # token_info.freeze_key is already a PublicKey object, so we can compare directly + assert token_info.freeze_key.to_bytes_raw() == user_public_key.to_bytes_raw(), "Freeze key on network should match the PublicKey used in transaction" + assert token_info.name == "NonCustodialToken", "Token name should be updated" + finally: + env.close() + + +@pytest.mark.integration +def test_integration_token_update_with_ecdsa_public_key(): + """Test updating token keys with ECDSA PublicKey.""" + env = IntegrationTestEnv() + + try: + token_id = create_fungible_token(env) + + # Generate a new ECDSA key pair for the admin key + admin_private_key = PrivateKey.generate_ecdsa() + admin_public_key = admin_private_key.public_key() + + # Update token with ECDSA PublicKey + # Note: When updating admin_key, we need to sign with the new admin key + receipt = ( + TokenUpdateTransaction() + .set_token_id(token_id) + .set_token_name("UpdatedWithECDSA") + .set_admin_key(admin_public_key) # Using ECDSA PublicKey + .freeze_with(env.client) + .sign(admin_private_key) # Sign with the new admin key + .execute(env.client) + ) + + assert receipt.status == ResponseCode.SUCCESS, f"Token update transaction failed with status: {ResponseCode.get_name(receipt.status)}" + + # Query the token and verify the admin key matches the public key + token_info = TokenInfoQuery(token_id=token_id).execute(env.client) + assert token_info is not None + assert token_info.admin_key is not None + + # token_info.admin_key is already a PublicKey object, so we can compare directly + assert token_info.admin_key.to_bytes_raw() == admin_public_key.to_bytes_raw(), "Admin key on network should match the ECDSA PublicKey used in transaction" + assert token_info.name == "UpdatedWithECDSA", "Token name should be updated" + finally: + env.close() + + +@pytest.mark.integration +def test_integration_token_update_with_mixed_key_types(): + """Test updating token with mixed PrivateKey and PublicKey types.""" + env = IntegrationTestEnv() + + try: + token_id = create_fungible_token(env) + + # Generate keys - mix of PrivateKey and PublicKey + admin_private_key = PrivateKey.generate_ed25519() + freeze_public_key = PrivateKey.generate_ed25519().public_key() + wipe_private_key = PrivateKey.generate_ecdsa() + supply_public_key = PrivateKey.generate_ecdsa().public_key() + + # Update token with mixed key types + # Note: When updating admin_key, we need to sign with the new admin key + receipt = ( + TokenUpdateTransaction() + .set_token_id(token_id) + .set_token_name("MixedKeysToken") + .set_admin_key(admin_private_key) # PrivateKey + .set_freeze_key(freeze_public_key) # PublicKey + .set_wipe_key(wipe_private_key) # PrivateKey + .set_supply_key(supply_public_key) # PublicKey + .freeze_with(env.client) + .sign(admin_private_key) # Sign with the new admin key + .execute(env.client) + ) + + assert receipt.status == ResponseCode.SUCCESS, f"Token update transaction failed with status: {ResponseCode.get_name(receipt.status)}" + + # Query the token and verify all keys are correctly set + token_info = TokenInfoQuery(token_id=token_id).execute(env.client) + + assert token_info.name == "MixedKeysToken", "Token name should be updated" + assert token_info.admin_key is not None + assert token_info.freeze_key is not None + assert token_info.wipe_key is not None + assert token_info.supply_key is not None + + # token_info keys are already PublicKey objects, so we can compare directly + # Verify admin key (from PrivateKey) + assert token_info.admin_key.to_bytes_raw() == admin_private_key.public_key().to_bytes_raw() + + # Verify freeze key (from PublicKey) + assert token_info.freeze_key.to_bytes_raw() == freeze_public_key.to_bytes_raw() + + # Verify wipe key (from PrivateKey) + assert token_info.wipe_key.to_bytes_raw() == wipe_private_key.public_key().to_bytes_raw() + + # Verify supply key (from PublicKey) + assert token_info.supply_key.to_bytes_raw() == supply_public_key.to_bytes_raw() + finally: + env.close() diff --git a/tests/unit/test_key_utils.py b/tests/unit/test_key_utils.py new file mode 100644 index 000000000..821dbcec8 --- /dev/null +++ b/tests/unit/test_key_utils.py @@ -0,0 +1,91 @@ +"""Tests for the key_utils module.""" + +import pytest + +from hiero_sdk_python.crypto.private_key import PrivateKey +from hiero_sdk_python.crypto.public_key import PublicKey +from hiero_sdk_python.hapi.services import basic_types_pb2 +from hiero_sdk_python.utils.key_utils import Key, key_to_proto + +pytestmark = pytest.mark.unit + + +def test_key_to_proto_with_ed25519_public_key(): + """Tests key_to_proto with an Ed25519 PublicKey.""" + private_key = PrivateKey.generate_ed25519() + public_key = private_key.public_key() + + expected_proto = public_key._to_proto() + result_proto = key_to_proto(public_key) + + assert result_proto == expected_proto + assert isinstance(result_proto, basic_types_pb2.Key) + + +def test_key_to_proto_with_ecdsa_public_key(): + """Tests key_to_proto with an ECDSA PublicKey.""" + private_key = PrivateKey.generate_ecdsa() + public_key = private_key.public_key() + + expected_proto = public_key._to_proto() + result_proto = key_to_proto(public_key) + + assert result_proto == expected_proto + assert isinstance(result_proto, basic_types_pb2.Key) + + +def test_key_to_proto_with_ed25519_private_key(): + """Tests key_to_proto with an Ed25519 PrivateKey (extracts public key).""" + private_key = PrivateKey.generate_ed25519() + public_key = private_key.public_key() + + # We expect the *public key's* proto, even though we passed a private key + expected_proto = public_key._to_proto() + + # Call the function with the PrivateKey + result_proto = key_to_proto(private_key) + + # Assert it correctly converted it to the public key proto + assert result_proto == expected_proto + assert isinstance(result_proto, basic_types_pb2.Key) + + +def test_key_to_proto_with_ecdsa_private_key(): + """Tests key_to_proto with an ECDSA PrivateKey (extracts public key).""" + private_key = PrivateKey.generate_ecdsa() + public_key = private_key.public_key() + + expected_proto = public_key._to_proto() + result_proto = key_to_proto(private_key) + + assert result_proto == expected_proto + assert isinstance(result_proto, basic_types_pb2.Key) + + +def test_key_to_proto_with_none(): + """Tests key_to_proto with None.""" + result = key_to_proto(None) + assert result is None + + +def test_key_to_proto_with_invalid_string_raises_error(): + """Tests key_to_proto raises TypeError with invalid input.""" + with pytest.raises(TypeError) as e: + key_to_proto("this is not a key") + + assert "Key must be of type PrivateKey or PublicKey" in str(e.value) + + +def test_key_type_alias(): + """Tests that the Key type alias works correctly.""" + private_key = PrivateKey.generate_ed25519() + public_key = private_key.public_key() + + # Test that both PrivateKey and PublicKey can be assigned to Key type + key1: Key = private_key + key2: Key = public_key + + # Both should work with key_to_proto + assert key_to_proto(key1) is not None + assert key_to_proto(key2) is not None + diff --git a/tests/unit/test_token_create_transaction.py b/tests/unit/test_token_create_transaction.py index b141683fb..b8e679b71 100644 --- a/tests/unit/test_token_create_transaction.py +++ b/tests/unit/test_token_create_transaction.py @@ -1187,71 +1187,24 @@ def test_token_info_query_structure(): print("✅ TokenInfoQuery structure test passed") -# --- Tests for _to_proto_key (Proof of Safety) --- +# --- Tests for _to_proto_key (backward compatibility wrapper) --- +# Note: Core functionality tests for key_to_proto are in test_key_utils.py -def test_to_proto_key_with_ed25519_public_key(): - """Tests _to_proto_key with an Ed25519 PublicKey (New Happy Path).""" +def test_to_proto_key_wrapper_still_works(): + """Tests that _to_proto_key wrapper method still works for backward compatibility.""" tx = TokenCreateTransaction() private_key = PrivateKey.generate_ed25519() public_key = private_key.public_key() - expected_proto = public_key._to_proto() + # Test that the wrapper method still exists and works result_proto = tx._to_proto_key(public_key) - - assert result_proto == expected_proto - assert isinstance(result_proto, basic_types_pb2.Key) - -def test_to_proto_key_with_ecdsa_public_key(): - """Tests _to_proto_key with an ECDSA PublicKey (New Happy Path).""" - tx = TokenCreateTransaction() - private_key = PrivateKey.generate_ecdsa() - public_key = private_key.public_key() - - expected_proto = public_key._to_proto() - result_proto = tx._to_proto_key(public_key) - - assert result_proto == expected_proto - assert isinstance(result_proto, basic_types_pb2.Key) - -def test_to_proto_key_with_ed25519_private_key(): - """Tests _to_proto_key with an Ed25519 PrivateKey (Backward-Compatibility).""" - tx = TokenCreateTransaction() - private_key = PrivateKey.generate_ed25519() - public_key = private_key.public_key() - - # We expect the *public key's* proto, even though we passed a private key - expected_proto = public_key._to_proto() - - # Call the function with the PrivateKey - result_proto = tx._to_proto_key(private_key) - - # Assert it correctly converted it to the public key proto - assert result_proto == expected_proto + assert result_proto is not None assert isinstance(result_proto, basic_types_pb2.Key) - -def test_to_proto_key_with_ecdsa_private_key(): - """Tests _to_proto_key with an ECDSA PrivateKey (Backward-Compatibility).""" - tx = TokenCreateTransaction() - private_key = PrivateKey.generate_ecdsa() - public_key = private_key.public_key() - expected_proto = public_key._to_proto() - result_proto = tx._to_proto_key(private_key) - - assert result_proto == expected_proto - assert isinstance(result_proto, basic_types_pb2.Key) - -def test_to_proto_key_with_none(): - """Tests the _to_proto_key function with None (Non-Happy Path).""" - tx = TokenCreateTransaction() - result = tx._to_proto_key(None) - assert result is None - -def test_to_proto_key_with_invalid_string_raises_error(): - """Tests the _to_proto_key safety net with a string (Non-Happy Path).""" - tx = TokenCreateTransaction() + # Test with None + assert tx._to_proto_key(None) is None + # Test error handling with pytest.raises(TypeError) as e: tx._to_proto_key("this is not a key") - assert "Key must be of type PrivateKey or PublicKey" in str(e.value) diff --git a/tests/unit/test_token_update_transaction.py b/tests/unit/test_token_update_transaction.py index 6573ac6d6..9e2ff7781 100644 --- a/tests/unit/test_token_update_transaction.py +++ b/tests/unit/test_token_update_transaction.py @@ -12,6 +12,8 @@ from hiero_sdk_python.timestamp import Timestamp from hiero_sdk_python.tokens.token_update_transaction import TokenUpdateKeys, TokenUpdateParams, TokenUpdateTransaction from hiero_sdk_python.hapi.services import response_header_pb2, response_pb2, transaction_get_receipt_pb2 +from hiero_sdk_python.crypto.private_key import PrivateKey +from hiero_sdk_python.crypto.public_key import PublicKey from tests.unit.mock_server import mock_hedera_servers pytestmark = pytest.mark.unit @@ -256,3 +258,192 @@ def test_build_scheduled_body(mock_account_ids, private_key, new_token_data): assert schedulable_body.tokenUpdate.autoRenewAccount == operator_id._to_proto() assert schedulable_body.tokenUpdate.treasury == operator_id._to_proto() assert schedulable_body.tokenUpdate.adminKey.HasField("ed25519") + + +# Helper functions for key generation and verification +def create_key(key_type, use_private): + """ + Create a key based on type and whether to use private or public. + + Args: + key_type: "ed25519" or "ecdsa" + use_private: True for PrivateKey, False for PublicKey + + Returns: + The created key (PrivateKey or PublicKey) + """ + if key_type == "ed25519": + private_key = PrivateKey.generate_ed25519() + else: # ecdsa + private_key = PrivateKey.generate_ecdsa() + + return private_key if use_private else private_key.public_key() + + +def get_expected_public_key(key): + """ + Get the public key from either PrivateKey or PublicKey. + + Args: + key: PrivateKey or PublicKey + + Returns: + PublicKey + """ + return key if isinstance(key, PublicKey) else key.public_key() + + +def verify_key_in_proto(proto_key, expected_public_key, key_type): + """ + Verify the proto key matches expected public key. + + Args: + proto_key: The proto key from the transaction body + expected_public_key: The expected PublicKey + key_type: "ed25519" or "ecdsa" + """ + if key_type == "ed25519": + assert proto_key.ed25519 == expected_public_key.to_bytes_raw() + else: # ecdsa + assert proto_key.HasField("ECDSA_secp256k1") + assert proto_key.ECDSA_secp256k1 == expected_public_key.to_bytes_raw() + + +# Tests for PrivateKey and PublicKey support (ED25519 and ECDSA) +@pytest.mark.parametrize("key_type,use_private", [ + ("ed25519", True), + ("ed25519", False), + ("ecdsa", True), + ("ecdsa", False), +]) +@pytest.mark.parametrize("field_name,setter_name,proto_path", [ + ("admin_key", "set_admin_key", "adminKey"), + ("freeze_key", "set_freeze_key", "freezeKey"), + ("wipe_key", "set_wipe_key", "wipeKey"), + ("supply_key", "set_supply_key", "supplyKey"), + ("metadata_key", "set_metadata_key", "metadata_key"), + ("pause_key", "set_pause_key", "pause_key"), + ("kyc_key", "set_kyc_key", "kycKey"), + ("fee_schedule_key", "set_fee_schedule_key", "fee_schedule_key"), +]) +def test_single_key_fields(mock_account_ids, key_type, use_private, field_name, setter_name, proto_path): + """Test single key fields with different key types (PrivateKey and PublicKey).""" + operator_id, _, node_account_id, token_id, _ = mock_account_ids + + # Create the key + key = create_key(key_type, use_private) + expected_public_key = get_expected_public_key(key) + + # Create transaction and set the key + tx = TokenUpdateTransaction() + tx.set_token_id(token_id) + getattr(tx, setter_name)(key) + tx.operator_account_id = operator_id + tx.node_account_id = node_account_id + + # Build transaction body + transaction_body = tx.build_transaction_body() + + # Get the proto key from the transaction body + proto_key = getattr(transaction_body.tokenUpdate, proto_path) + + # Verify the proto key matches the expected public key + verify_key_in_proto(proto_key, expected_public_key, key_type) + + +@pytest.mark.parametrize("key_type,use_private", [ + ("ed25519", True), + ("ed25519", False), + ("ecdsa", True), + ("ecdsa", False), +]) +def test_constructor_with_public_key(mock_account_ids, key_type, use_private, new_token_data): + """Test constructor with PublicKey in TokenUpdateKeys.""" + operator_id, _, _, token_id, _ = mock_account_ids + + admin_key = create_key(key_type, use_private) + freeze_key = create_key(key_type, use_private) + expected_admin_public = get_expected_public_key(admin_key) + expected_freeze_public = get_expected_public_key(freeze_key) + + token_keys = TokenUpdateKeys( + admin_key=admin_key, + freeze_key=freeze_key, + ) + + update_tx = TokenUpdateTransaction( + token_id=token_id, + token_keys=token_keys, + ) + + assert update_tx.admin_key == admin_key + assert update_tx.freeze_key == freeze_key + + # Verify keys are correctly stored + update_tx.operator_account_id = operator_id + update_tx.node_account_id = operator_id # Using operator_id as node_account_id for test + transaction_body = update_tx.build_transaction_body() + + verify_key_in_proto(transaction_body.tokenUpdate.adminKey, expected_admin_public, key_type) + verify_key_in_proto(transaction_body.tokenUpdate.freezeKey, expected_freeze_public, key_type) + + +def test_mixed_key_types_in_constructor(mock_account_ids): + """Test constructor with mixed PrivateKey and PublicKey types.""" + operator_id, _, _, token_id, _ = mock_account_ids + + ed25519_private = PrivateKey.generate_ed25519() + ed25519_public = PrivateKey.generate_ed25519().public_key() + ecdsa_private = PrivateKey.generate_ecdsa() + ecdsa_public = PrivateKey.generate_ecdsa().public_key() + + token_keys = TokenUpdateKeys( + admin_key=ed25519_private, + freeze_key=ed25519_public, + wipe_key=ecdsa_private, + supply_key=ecdsa_public, + ) + + tx = TokenUpdateTransaction( + token_id=token_id, + token_keys=token_keys, + ) + tx.operator_account_id = operator_id + tx.node_account_id = operator_id + + transaction_body = tx.build_transaction_body() + + # Verify all keys are correctly converted + assert transaction_body.tokenUpdate.adminKey.ed25519 == ed25519_private.public_key().to_bytes_raw() + assert transaction_body.tokenUpdate.freezeKey.ed25519 == ed25519_public.to_bytes_raw() + assert transaction_body.tokenUpdate.wipeKey.HasField("ECDSA_secp256k1") + assert transaction_body.tokenUpdate.supplyKey.HasField("ECDSA_secp256k1") + + +@pytest.mark.parametrize("key_type,use_private", [ + ("ed25519", True), + ("ed25519", False), + ("ecdsa", True), + ("ecdsa", False), +]) +def test_build_transaction_body_with_keys(mock_account_ids, key_type, use_private, new_token_data): + """Test building transaction body with keys (both PrivateKey and PublicKey).""" + operator_id, _, node_account_id, token_id, _ = mock_account_ids + + admin_key = create_key(key_type, use_private) + freeze_key = create_key(key_type, use_private) + expected_admin_public = get_expected_public_key(admin_key) + expected_freeze_public = get_expected_public_key(freeze_key) + + update_tx = TokenUpdateTransaction(token_id=token_id) + update_tx.set_admin_key(admin_key) + update_tx.set_freeze_key(freeze_key) + update_tx.set_token_name(new_token_data["name"]) + update_tx.operator_account_id = operator_id + update_tx.node_account_id = node_account_id + + transaction_body = update_tx.build_transaction_body() + + assert transaction_body.tokenUpdate.name == new_token_data["name"] + verify_key_in_proto(transaction_body.tokenUpdate.adminKey, expected_admin_public, key_type) + verify_key_in_proto(transaction_body.tokenUpdate.freezeKey, expected_freeze_public, key_type)