diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d4d2a34d..bf6519f05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ This changelog is based on [Keep a Changelog](https://keepachangelog.com/en/1.1. ### Added - Added `docs/sdk_developers/pylance.md`, a new guide explaining how to set up and use **Pylance** in VS Code for validating imports, file references, and methods before review. (#713) +- feat: TokenAirdropClaim Transaction, examples (with signing required and not), unit and integration tests (#201) - docs: Add Google-style docstrings to `TokenId` class and its methods in `token_id.py`. - added Google-style docstrings to the `TransactionRecord` class including all dataclass fields, `__repr__`, `_from_proto()` & `_to_proto()` methods. - Standardized docstrings, improved error handling, and updated type hinting (`str | None` to `Optional[str]`) for the `FileId` class (#652). diff --git a/examples/token_airdrop_claim_signature_not_required_auto.py b/examples/token_airdrop_claim_signature_not_required_auto.py new file mode 100644 index 000000000..5e9128872 --- /dev/null +++ b/examples/token_airdrop_claim_signature_not_required_auto.py @@ -0,0 +1,313 @@ +""" +Hedera Token Airdrop Example Script + +This script demonstrates and end-to-end example for an account to automatically (no user action required) claim a set of airdrops. + +Unique configurations of this account: +- 10 auto-association slots. +- Does not require a signature to claim the airdrop. +The Hedera network will auto-associate the token and claim it on airdrop. + +This script demonstrates: +- Setting up a Hedera client +- Creating fungible and NFT tokens +- Creating a receiver account with unique configurations +- Performing token airdrops to the receiver +- Checking balances for verification purposes. + +Run this script using: +uv run examples/token_airdrop_pending_claim_signature_not_required_auto_airdrop.py +python examples/token_airdrop_pending_claim_signature_not_required_auto_airdrop.py +""" +import os +import sys +from typing import Iterable +from dotenv import load_dotenv +from hiero_sdk_python import ( + Client, + Network, + AccountId, + PrivateKey, + AccountCreateTransaction, + TokenCreateTransaction, + TokenMintTransaction, + TokenAirdropTransaction, + TokenType, + SupplyType, + NftId, + CryptoGetAccountBalanceQuery, + ResponseCode, + Hbar, + TokenId, + TokenNftInfoQuery +) + +load_dotenv() + +def setup_client(): + network_name = os.getenv("NETWORK", "testnet") + + # Validate environment variables + if not os.getenv("OPERATOR_ID") or not os.getenv("OPERATOR_KEY"): + print("❌ Missing OPERATOR_ID or OPERATOR_KEY in .env file.") + sys.exit(1) + + try: + network = Network(network_name) + client = Client(network) + + operator_id = AccountId.from_string(os.getenv("OPERATOR_ID", '')) + operator_key = PrivateKey.from_string(os.getenv("OPERATOR_KEY", '')) + client.set_operator(operator_id, operator_key) + + except Exception as e: + raise ConnectionError(f"Error initializing client: {e}") + + print(f"✅ Connected to Hedera {network_name} network as operator: {operator_id}") + return client, operator_id, operator_key + +def create_receiver( + client: Client, + signature_required: bool =False, + max_auto_assoc: int =10 + ): + receiver_key = PrivateKey.generate() + receiver_public_key = receiver_key.public_key() + + try: + receipt = ( + AccountCreateTransaction() + .set_key(receiver_public_key) + .set_initial_balance(Hbar(1)) + .set_receiver_signature_required(signature_required) + .set_max_automatic_token_associations(max_auto_assoc) + .freeze_with(client) + .sign(receiver_key) + .execute(client) + ) + if receipt.status != ResponseCode.SUCCESS: + status_message = ResponseCode(receipt.status).name + raise RuntimeError(f"❌ Receiver account creation failed: {status_message}") + + receiver_id = receipt.account_id + print( + f"✅ Receiver account {receiver_id} created " + f"(auto-assoc={max_auto_assoc}, sig_required={signature_required})" + ) + return receiver_id, receiver_key + except Exception as e: + raise RuntimeError(f"❌ Error creating receiver account: {e}") from e + + +def create_fungible_token( + client: Client, + operator_id: AccountId, + operator_key: PrivateKey, + name: str ="My Fungible Token", + symbol: str ="MFT", + initial_supply: int =50, + max_supply: int = 1000, + ): + try: + receipt = ( + TokenCreateTransaction() + .set_token_name(name) + .set_token_symbol(symbol) + .set_initial_supply(initial_supply) + .set_token_type(TokenType.FUNGIBLE_COMMON) + .set_supply_type(SupplyType.FINITE) + .set_max_supply(max_supply) + .set_treasury_account_id(operator_id) + .freeze_with(client) + .sign(operator_key) + .execute(client) + ) + token_id = receipt.token_id + if receipt.status != ResponseCode.SUCCESS: + status_message = ResponseCode(receipt.status).name + raise RuntimeError(f"❌ Fungible token creation failed: {status_message}") + + print(f"✅ Fungible token created: {token_id}") + return token_id + except Exception as e: + raise RuntimeError(f"❌ Error creating fungible token: {e}") from e + + +def create_nft_token( + client: Client, + operator_id: AccountId, + operator_key: PrivateKey, + name: str ="My NFT Token", + symbol: str ="MNT", + max_supply: int = 100 + ): + try: + receipt = ( + TokenCreateTransaction() + .set_token_name(name) + .set_token_symbol(symbol) + .set_initial_supply(0) + .set_token_type(TokenType.NON_FUNGIBLE_UNIQUE) + .set_supply_type(SupplyType.FINITE) + .set_max_supply(max_supply) + .set_treasury_account_id(operator_id) + .set_supply_key(operator_key) + .freeze_with(client) + .sign(operator_key) + .execute(client) + ) + token_id = receipt.token_id + if receipt.status != ResponseCode.SUCCESS: + status_message = ResponseCode(receipt.status).name + raise RuntimeError(f"❌ NFT token creation failed: {status_message}") + + print(f"✅ NFT token created: {token_id}") + return token_id + except Exception as e: + raise RuntimeError(f"❌ Error creating NFT token: {e}") from e + + +def mint_nft_token( + client: Client, + operator_key: PrivateKey, + nft_token_id: TokenId, + ): + try: + receipt = ( + TokenMintTransaction() + .set_token_id(nft_token_id) + .set_metadata([b"NFT Metadata Example"]) + .freeze_with(client) + .sign(operator_key) + .execute(client) + ) + total_supply = receipt._receipt_proto.newTotalSupply + serial = receipt.serial_numbers[0] + nft_id = NftId(nft_token_id, serial) + if receipt.status != ResponseCode.SUCCESS: + status_message = ResponseCode(receipt.status).name + raise RuntimeError(f"❌ NFT token mint failed: {status_message}") + + print(f"✅ NFT {nft_token_id} serial {serial} minted with NFT id of {nft_id}. Total NFT supply is {total_supply} ") + return nft_id + except Exception as e: + raise RuntimeError(f"❌ Error minting NFT token: {e}") from e +def log_balances( + client: Client, + operator_id: AccountId, + receiver_id: AccountId, + fungible_ids: Iterable[TokenId], + nft_ids: Iterable[NftId], + prefix: str = "" +): + print(f"\n===== {prefix} Balances =====") + + try: + operator_balance = CryptoGetAccountBalanceQuery().set_account_id(operator_id).execute(client) + receiver_balance = CryptoGetAccountBalanceQuery().set_account_id(receiver_id).execute(client) + except Exception as e: + print(f"❌ Failed to fetch balances: {e}") + return + + def log_fungible(account_id: AccountId, balances: dict, token_ids: Iterable[TokenId]): + print(" Fungible tokens:") + for token_id in token_ids: + print(f" {token_id}: {balances.get(token_id, 0)}") + + def log_nfts(account_id: AccountId, nft_ids: Iterable[NftId]): + print(" NFTs:") + owned = [] + for nft_id in nft_ids: + try: + info = TokenNftInfoQuery().set_nft_id(nft_id).execute(client) + if info.account_id == account_id: + owned.append(str(nft_id)) + except Exception as e: + print(f" ⚠️ Error fetching NFT {nft_id}: {e}") + if owned: + for nft in owned: + print(f" {nft}") + else: + print(" (none)") + + print(f"\nSender ({operator_id}):") + log_fungible(operator_id, dict(operator_balance.token_balances), fungible_ids) + log_nfts(operator_id, nft_ids) + + print(f"\nReceiver ({receiver_id}):") + log_fungible(receiver_id, dict(receiver_balance.token_balances), fungible_ids) + log_nfts(receiver_id, nft_ids) + + print("=============================================\n") + +def perform_airdrop( + client: Client, + operator_id: AccountId, + operator_key: PrivateKey, + receiver_id: AccountId, + fungible_ids: Iterable[TokenId], + nft_ids: Iterable[NftId], + ft_amount: int = 100 + ): + + try: + tx = TokenAirdropTransaction() + + for fungible_id in fungible_ids: + tx.add_token_transfer(fungible_id, operator_id, -ft_amount) + tx.add_token_transfer(fungible_id, receiver_id, ft_amount) + print(f"📤 Transferring {ft_amount} of fungible token {fungible_id} from {operator_id} → {receiver_id}") + + for nft_id in nft_ids: + tx.add_nft_transfer(nft_id, operator_id, receiver_id) + print(f"🎨 Transferring NFT {nft_id} from {operator_id} → {receiver_id}") + + print("\n⏳ Submitting airdrop transaction...") + receipt = tx.freeze_with(client).sign(operator_key).execute(client) + + if receipt.status != ResponseCode.SUCCESS: + status_message = ResponseCode(receipt.status).name + raise RuntimeError(f"Airdrop transaction failed with status: {status_message}") + + print(f"✅ Airdrop executed successfully! Transaction ID: {receipt.transaction_id}") + + except Exception as e: + print(f"❌ Airdrop failed: {e}") + raise RuntimeError("Airdrop execution failed") from e + +def main(): + # Set up client and return client, operator_id, operator_key + client, operator_id, operator_key = setup_client() + + # Create and return a fungible token to airdrop + print("Create 50 fungible tokens and 1 NFT to airdrop") + fungible_id = create_fungible_token(client, operator_id, operator_key, name="My Fungible Token", symbol="123", initial_supply=50, max_supply = 2000) + + # Create and return an nft token to airdrop + nft_token_id = create_nft_token(client, operator_id, operator_key, name="My NFT Token", symbol = "MNFT", max_supply=1000) + + # Mint and return an nft to airdrop + nft_serial = mint_nft_token(client, operator_key, nft_token_id) + + # Create a receiver that will test no signature is required to claim the auto-airdrop + # Ensure false for signature required + # Assume 10 max association slots + # Return the receiver id and receiver private key + print("Creating the account that will automatically receive the airdropped tokens") + receiver_id, receiver_key = create_receiver(client, False, 10) + + # Check pre-airdrop balances + print("\n🔍 Verifying sender has tokens to airdrop and receiver neither:") + log_balances(client, operator_id, receiver_id, [fungible_id], [nft_serial], prefix="Before airdrop") + + # Initiate airdrop of 20 fungible tokens and 1 nft token id + perform_airdrop(client, operator_id, operator_key, receiver_id, [fungible_id], [nft_serial], 20) + + print("\n🔍 Verifying receiver has received airdrop contents automatically and sender has sent:") + log_balances(client, operator_id, receiver_id, [fungible_id], [nft_serial], prefix="After airdrop") + + print("✅ Auto-association successful: Receiver accepted airdropped tokens without pre-association.") + print("✅ Airdrop successful: Receiver accepted new fungible tokens without pre-association.") + +if __name__ == "__main__": + main() diff --git a/examples/token_airdrop_claim_signature_required.py b/examples/token_airdrop_claim_signature_required.py new file mode 100644 index 000000000..f732fa15d --- /dev/null +++ b/examples/token_airdrop_claim_signature_required.py @@ -0,0 +1,484 @@ +""" +Hedera Token Airdrop Example Script + +This script demonstrates and end-to-end example for an account to claim a set of airdrops. + +Unique configurations of this account: +- 0 auto-association slots. +- Has no tokens associated before the claiming of the airdrop. +- Requires a signature to claim the airdrop. +Token airdrop claim will work despite no associations as the Hedera network will complete that step. + +This script demonstrates: +- Setting up a Hedera client +- Creating fungible and NFT tokens +- Creating a receiver account with signature required and 0 auto-association slots +- Performing token airdrops to the receiver +- Fetching and claiming pending airdrops +- Checking balances and token association statuses for verification purposes. + +Run this script using: +uv run examples/token_airdrop_pending_claim_signature_required.py +python examples/token_airdrop_pending_claim_signature_required.py +""" + +import os +import sys +from typing import Iterable, List, Dict +from dotenv import load_dotenv + +from hiero_sdk_python import ( + Client, + Network, + AccountId, + PrivateKey, + AccountCreateTransaction, + TokenCreateTransaction, + TokenMintTransaction, + TokenAirdropTransaction, + TokenType, + SupplyType, + NftId, + CryptoGetAccountBalanceQuery, + ResponseCode, + Hbar, + TokenId, + TokenNftInfoQuery, + TokenClaimAirdropTransaction, + PendingAirdropId, + TransactionRecordQuery, + TransactionRecord, + TransactionId +) + +load_dotenv() + +def setup_client(): + network_name = os.getenv("NETWORK", "testnet") + + # Validate environment variables + if not os.getenv("OPERATOR_ID") or not os.getenv("OPERATOR_KEY"): + print("❌ Missing OPERATOR_ID or OPERATOR_KEY in .env file.") + sys.exit(1) + + try: + network = Network(network_name) + client = Client(network) + + operator_id = AccountId.from_string(os.getenv("OPERATOR_ID", '')) + operator_key = PrivateKey.from_string(os.getenv("OPERATOR_KEY", '')) + client.set_operator(operator_id, operator_key) + + except Exception as e: + raise ConnectionError(f'Error initializing client: {e}') from e + + print(f"✅ Connected to Hedera {network_name} network as operator: {operator_id}") + return client, operator_id, operator_key + +def create_receiver( + client: Client, + signature_required: bool =True, + max_auto_assoc: int = 0 + ): + receiver_key = PrivateKey.generate() + receiver_public_key = receiver_key.public_key() + + try: + receipt = ( + AccountCreateTransaction() + .set_key(receiver_public_key) + .set_initial_balance(Hbar(1)) + .set_receiver_signature_required(signature_required) + .set_max_automatic_token_associations(max_auto_assoc) + .freeze_with(client) + .sign(receiver_key) + .execute(client) + ) + if receipt.status != ResponseCode.SUCCESS: + status_message = ResponseCode(receipt.status).name + raise RuntimeError(f"❌ Receiver account creation failed: {status_message}") + + receiver_id = receipt.account_id + print( + f"✅ Receiver account {receiver_id} created " + f"(auto-assoc={max_auto_assoc}, sig_required={signature_required})" + ) + return receiver_id, receiver_key + except Exception as e: + raise RuntimeError(f"❌ Error creating receiver account: {e}") from e + +def create_fungible_token( + client: Client, + operator_id: AccountId, + operator_key: PrivateKey, + name: str ="My Fungible Token", + symbol: str ="MFT", + initial_supply: int =50, + max_supply: int = 1000, + ): + try: + receipt = ( + TokenCreateTransaction() + .set_token_name(name) + .set_token_symbol(symbol) + .set_initial_supply(initial_supply) + .set_token_type(TokenType.FUNGIBLE_COMMON) + .set_supply_type(SupplyType.FINITE) + .set_max_supply(max_supply) + .set_treasury_account_id(operator_id) + .freeze_with(client) + .sign(operator_key) + .execute(client) + ) + token_id = receipt.token_id + if receipt.status != ResponseCode.SUCCESS: + status_message = ResponseCode(receipt.status).name + raise RuntimeError(f"❌ Fungible token creation failed: {status_message}") + + print(f"✅ Fungible token created: {token_id}") + return token_id + except Exception as e: + raise RuntimeError(f"❌ Error creating fungible token: {e}") from e + +def create_nft_token( + client: Client, + operator_id: AccountId, + operator_key: PrivateKey, + name: str ="My NFT Token", + symbol: str ="MNT", + max_supply: int = 100 + ): + try: + receipt = ( + TokenCreateTransaction() + .set_token_name(name) + .set_token_symbol(symbol) + .set_initial_supply(0) + .set_token_type(TokenType.NON_FUNGIBLE_UNIQUE) + .set_supply_type(SupplyType.FINITE) + .set_max_supply(max_supply) + .set_treasury_account_id(operator_id) + .set_supply_key(operator_key) + .freeze_with(client) + .sign(operator_key) + .execute(client) + ) + token_id = receipt.token_id + if receipt.status != ResponseCode.SUCCESS: + status_message = ResponseCode(receipt.status).name + raise RuntimeError(f"❌ NFT token creation failed: {status_message}") + + print(f"✅ NFT token created: {token_id}") + return token_id + except Exception as e: + raise RuntimeError(f"❌ Error creating NFT token: {e}") from e + +def mint_nft_token( + client: Client, + operator_key: PrivateKey, + nft_token_id: TokenId, + ): + try: + receipt = ( + TokenMintTransaction() + .set_token_id(nft_token_id) + .set_metadata([b"NFT Metadata Example"]) + .freeze_with(client) + .sign(operator_key) + .execute(client) + ) + total_supply = receipt._receipt_proto.newTotalSupply + serial = receipt.serial_numbers[0] + nft_id = NftId(nft_token_id, serial) + if receipt.status != ResponseCode.SUCCESS: + status_message = ResponseCode(receipt.status).name + raise RuntimeError(f"❌ NFT token mint failed: {status_message}") + + print(f"✅ NFT {nft_token_id} serial {serial} minted with NFT id of {nft_id}.") + print(f" Total NFT supply is {total_supply}") + return nft_id + except Exception as e: + raise RuntimeError(f"❌ Error minting NFT token: {e}") from e + +def get_token_association_status( + client: Client, + receiver_id: AccountId, + token_ids: List[TokenId] + ) -> Dict[TokenId, bool]: + try: + # Query the receiver's balance, which includes token associations + balance = CryptoGetAccountBalanceQuery()\ + .set_account_id(receiver_id)\ + .execute(client) + + associated_tokens = set(balance.token_balances.keys()) + association_status = {token_id: token_id in associated_tokens for token_id in token_ids} + + print(f"✅ Association status for account {receiver_id}:") + for tid, associated in association_status.items(): + print(f" Token {tid}: {'Associated' if associated else 'Not Associated'}") + + return association_status + + except Exception as e: + print(f"❌ Failed to fetch token associations for account {receiver_id}: {e}") + return {token_id: False for token_id in token_ids} +def log_fungible_balances(account_id: AccountId, balances: dict, token_ids: Iterable[TokenId]): + print(" Fungible tokens:") + for token_id in token_ids: + amount = balances.get(token_id, 0) + print(f" {token_id}: {amount}") + + +def log_nft_balances(client: Client, account_id: AccountId, nft_ids: Iterable[NftId]): + print(" NFTs:") + owned_nfts = [] + for nft_id in nft_ids: + try: + info = TokenNftInfoQuery().set_nft_id(nft_id).execute(client) + if info.account_id == account_id: + owned_nfts.append(str(nft_id)) + except Exception as e: + print(f" ⚠️ Error fetching NFT {nft_id}: {e}") + + if owned_nfts: + for nft in owned_nfts: + print(f" {nft}") + else: + print(" (none)") + + +def log_balances( + client: Client, + operator_id: AccountId, + receiver_id: AccountId, + fungible_ids: Iterable[TokenId], + nft_ids: Iterable[NftId], + prefix: str = "" +): + print(f"\n===== {prefix} Balances =====") + + try: + operator_balance = CryptoGetAccountBalanceQuery().set_account_id(operator_id).execute(client) + receiver_balance = CryptoGetAccountBalanceQuery().set_account_id(receiver_id).execute(client) + except Exception as e: + print(f"❌ Failed to fetch balances: {e}") + return + + operator_balances = dict(operator_balance.token_balances) + receiver_balances = dict(receiver_balance.token_balances) + + # ------------------------------ + # SENDER BALANCES + # ------------------------------ + print(f"\nSender ({operator_id}):") + log_fungible_balances(operator_id, operator_balances, fungible_ids) + log_nft_balances(client, operator_id, nft_ids) + + # ------------------------------ + # RECEIVER BALANCES + # ------------------------------ + print(f"\nReceiver ({receiver_id}):") + log_fungible_balances(receiver_id, receiver_balances, fungible_ids) + log_nft_balances(client, receiver_id, nft_ids) + + print("=============================================\n") + +def perform_airdrop( + client: Client, + operator_id: AccountId, + operator_key: PrivateKey, + receiver_id: AccountId, + fungible_ids: Iterable[TokenId], + nft_ids: Iterable[NftId], + ft_amount: int = 100 + ): + + try: + tx = TokenAirdropTransaction() + + for fungible_id in fungible_ids: + tx.add_token_transfer(fungible_id, operator_id, -ft_amount) + tx.add_token_transfer(fungible_id, receiver_id, ft_amount) + print(f"📤 Transferring {ft_amount} of fungible token {fungible_id}") + print(f" from {operator_id} → {receiver_id}") + + for nft_id in nft_ids: + tx.add_nft_transfer(nft_id, operator_id, receiver_id) + print(f"🎨 Transferring NFT {nft_id} from {operator_id} → {receiver_id}") + + print("\n⏳ Submitting airdrop transaction...") + receipt = tx.freeze_with(client).sign(operator_key).execute(client) + + if receipt.status != ResponseCode.SUCCESS: + status_message = ResponseCode(receipt.status).name + raise RuntimeError(f"Airdrop transaction failed with status: {status_message}") + + transaction_id = receipt.transaction_id + print(f"✅ Airdrop executed successfully! Transaction ID: {transaction_id}") + + return transaction_id + except Exception as e: + print(f"❌ Airdrop failed: {e}") + raise RuntimeError("Airdrop execution failed") from e + +def fetch_pending_airdrops( + client: Client, + transaction_id: TransactionId + ) -> List[PendingAirdropId]: + """ + Retrieve all pending airdrop IDs generated by a specific transaction. + + Executes a `TransactionRecordQuery` to inspect the transaction record and + extract any newly created `PendingAirdropId` objects from the + `new_pending_airdrops` field. + """ + try: + record: TransactionRecord = TransactionRecordQuery(transaction_id).execute(client) + pending_airdrops = record.new_pending_airdrops # List of PendingAirdropRecord + + pending_airdrop_ids = [p.pending_airdrop_id for p in pending_airdrops] + + print(f"✅ Found {len(pending_airdrop_ids)} pending airdrops") + for pid in pending_airdrop_ids: + print(" →", pid) + + return pending_airdrop_ids + + except Exception as e: + print(f"❌ Failed to fetch pending airdrops for transaction {transaction_id}: {e}") + return [] + +def claim_airdrop( + client: Client, + receiver_key: PrivateKey, + pending_airdrops: List[PendingAirdropId] + ): + """ + Claims one or more pending airdrops on behalf of the receiver. + + This function builds and executes a TokenClaimAirdropTransaction, which + must be signed by the receiver. It uses `get_pending_airdrop_ids()` to + safely retrieve and display the list of pending airdrop IDs before execution + """ + try: + tx = ( + TokenClaimAirdropTransaction() + .add_pending_airdrop_ids(pending_airdrops) + .freeze_with(client) + .sign(receiver_key) # Signing with receiver is required + ) + print(f"{tx}") + + receipt = tx.execute(client) + + if receipt.status != ResponseCode.SUCCESS: + status_message = ResponseCode(receipt.status).name + raise RuntimeError(f"❌ Airdrop claim failed: {status_message}") + + print(f"✅ airdrop claimed") + return receipt + except Exception as e: + raise RuntimeError(f"❌ Error claiming airdrop: {e}") from e + +def main(): + # Set up client and return client, operator_id, operator_key + client, operator_id, operator_key = setup_client() + + # Create and return a fungible token to airdrop + print("Create 50 fungible tokens and 1 NFT to airdrop") + fungible_id = create_fungible_token( + client, + operator_id, + operator_key, + name="My Fungible Token", + symbol="123", + initial_supply=50, + max_supply = 2000 + ) + + # Create and return an nft token to airdrop + nft_token_id = create_nft_token( + client, + operator_id, + operator_key, + name="My NFT Token", + symbol = "MNFT", + max_supply=1000 + ) + + # Mint and return an nft to airdrop + nft_serial = mint_nft_token( + client, + operator_key, + nft_token_id + ) + + # Create a receiver that will require signature to claim the airdrop + # Ensure true for signature required (for the receiver) + # 0 max association slots + # Return the receiver id and receiver private key + print("Creating the account that will receive the airdropped tokens on signing") + receiver_id, receiver_key = create_receiver( + client, + True, + 0 + ) + + # Verify this receiver does NOT have any of the fungible or NFT tokens associated + # Claim airdrop will work regardless + token_ids_to_check = [fungible_id, nft_token_id] + association_status = get_token_association_status( + client, + receiver_id, + token_ids_to_check + ) + print(association_status) + + # Check pre-airdrop balances + print("\n🔍 Verifying sender has tokens to airdrop and receiver neither:") + log_balances( + client, + operator_id, + receiver_id, + [fungible_id], + [nft_serial], + prefix="Before airdrop" + ) + + # Initiate airdrop of 20 fungible tokens and 1 nft token id + transaction_id = perform_airdrop( + client, + operator_id, + operator_key, + receiver_id, + [fungible_id], + [nft_serial], + 20 + ) + + print("\n🔍 Verifying no balance change as airdrop is not yet claimed:") + log_balances(client,operator_id,receiver_id,[fungible_id],[nft_serial],prefix="After airdrop") + + # Get a list of pending airdrops + pending_airdrop_ids = fetch_pending_airdrops(client, transaction_id) + print(pending_airdrop_ids) + + # Claim this list of pending airdrops + # Note that we are signing with the receiver key which is required as was set to True + # Claiming will work even without association and available auto-association slots + # This is because the signing itself causes the Hedera network to associate the tokens. + print("Claiming airdrop:") + claim_airdrop(client,receiver_key,pending_airdrop_ids) #Pass the receiver key which is needed to sign + + # Check airdrop has resulted in transfers + print("\n🔍 Verifying balances have now changed after claim:") + log_balances(client,operator_id,receiver_id,[fungible_id],[nft_serial],prefix="After claim") + + # Check Hedera network has associated these tokens behind the scenes + token_ids_to_check = [fungible_id, nft_token_id] + association_status = get_token_association_status(client,receiver_id,token_ids_to_check) + print(association_status) + +if __name__ == "__main__": + main() diff --git a/src/hiero_sdk_python/__init__.py b/src/hiero_sdk_python/__init__.py index b908328ea..875f171a9 100644 --- a/src/hiero_sdk_python/__init__.py +++ b/src/hiero_sdk_python/__init__.py @@ -49,6 +49,7 @@ from .tokens.hbar_transfer import HbarTransfer from .tokens.token_unpause_transaction import TokenUnpauseTransaction from .tokens.token_pause_transaction import TokenPauseTransaction +from .tokens.token_airdrop_claim import TokenClaimAirdropTransaction # Transaction from .transaction.transaction import Transaction @@ -181,6 +182,7 @@ "TokenUpdateTransaction", "TokenAirdropTransaction", "TokenCancelAirdropTransaction", + "TokenClaimAirdropTransaction", "PendingAirdropId", "PendingAirdropRecord", "TokenType", diff --git a/src/hiero_sdk_python/account/account_create_transaction.py b/src/hiero_sdk_python/account/account_create_transaction.py index c6f391ec4..c22dbf7b1 100644 --- a/src/hiero_sdk_python/account/account_create_transaction.py +++ b/src/hiero_sdk_python/account/account_create_transaction.py @@ -31,6 +31,7 @@ def __init__( receiver_signature_required: Optional[bool] = None, auto_renew_period: Optional[Duration] = AUTO_RENEW_PERIOD, memo: Optional[str] = None, + max_automatic_token_associations: Optional[int] = 0 ) -> None: """ Initializes a new AccountCreateTransaction instance with default values @@ -49,6 +50,7 @@ def __init__( self.receiver_signature_required: Optional[bool] = receiver_signature_required self.auto_renew_period: Optional[Duration] = auto_renew_period self.account_memo: Optional[str] = memo + self.max_automatic_token_associations: Optional[int] = max_automatic_token_associations self._default_transaction_fee = DEFAULT_TRANSACTION_FEE def set_key(self, key: PublicKey) -> "AccountCreateTransaction": @@ -130,6 +132,14 @@ def set_account_memo(self, memo: str) -> "AccountCreateTransaction": self.account_memo = memo return self + def set_max_automatic_token_associations(self, max_assoc: int) -> "AccountCreateTransaction": + """Sets the maximum number of automatic token associations for the account.""" + self._require_not_frozen() + if max_assoc < 0: + raise ValueError("max_automatic_token_associations must be a non-negative integer.") + self.max_automatic_token_associations = max_assoc + return self + def _build_proto_body(self): """ Returns the protobuf body for the account create transaction. @@ -157,6 +167,7 @@ def _build_proto_body(self): receiverSigRequired=self.receiver_signature_required, autoRenewPeriod=duration_pb2.Duration(seconds=self.auto_renew_period.seconds), memo=self.account_memo, + max_automatic_token_associations=self.max_automatic_token_associations ) def build_transaction_body(self) -> transaction_pb2.TransactionBody: diff --git a/src/hiero_sdk_python/tokens/token_airdrop_claim.py b/src/hiero_sdk_python/tokens/token_airdrop_claim.py new file mode 100644 index 000000000..554e4baa8 --- /dev/null +++ b/src/hiero_sdk_python/tokens/token_airdrop_claim.py @@ -0,0 +1,210 @@ +""" +Defines TokenClaimAirdropTransaction for claiming 1–10 unique pending airdrops +using Hedera's TokenClaimAirdropTransactionBody. + +Validations enforced: +- 1 ≤ pending_airdrop_ids ≤ 10 +- No duplicate PendingAirdropId entries +""" + +from typing import Optional, List, Any + +from hiero_sdk_python.transaction.transaction import Transaction +from hiero_sdk_python.tokens.token_airdrop_pending_id import PendingAirdropId +from hiero_sdk_python.hapi.services.token_claim_airdrop_pb2 import ( # pylint: disable=no-name-in-module + TokenClaimAirdropTransactionBody, +) +from hiero_sdk_python.hapi.services import transaction_pb2 +from hiero_sdk_python.channels import _Channel +from hiero_sdk_python.executable import _Method + + +class TokenClaimAirdropTransaction(Transaction): + """Claim 1–10 unique pending airdrops via TokenClaimAirdropTransactionBody. + TokenClaimAirdropTransaction is only required if the receiver account has required signing to claim airdrops. + This transaction MUST be signed by the receiver for each PendingAirdropId to claim. + """ + + MAX_IDS: int = 10 + MIN_IDS: int = 1 + + def __init__( + self, + pending_airdrop_ids: Optional[List[PendingAirdropId]] = None + ) -> None: + """Initialize the TokenClaimAirdropTransaction. + + Args: + pending_airdrop_ids: Optional list of pending airdrop IDs. + """ + super().__init__() + self._pending_airdrop_ids: List[PendingAirdropId] = list(pending_airdrop_ids or []) + + def _validate_all(self, ids: List[PendingAirdropId]) -> None: + """Validate a candidate list of pending airdrop IDs. + + Ensures the list contains no more than ``MAX_IDS`` entries and has no + duplicates. + + Args: + ids: Pending-airdrop IDs to validate. + + Raises: + ValueError: If more than ``MAX_IDS`` IDs are provided or duplicates + are detected. + """ + n = len(ids) + if n > self.MAX_IDS: + raise ValueError(f"Up to {self.MAX_IDS} airdrops can be claimed at once (got {n}).") + # Don't enforce MIN here—only enforce at build/serialize time + if len(set(ids)) != n: + raise ValueError("Duplicate airdrop IDs are not allowed.") + + def _validate_final(self) -> None: + """Validate the transaction's final pending-airdrop ID list. + + Checks that at least ``MIN_IDS`` IDs are present, and re-runs the global + validations (max count and no duplicates). + + Raises: + ValueError: If fewer than ``MIN_IDS`` IDs are present or if the + subsequent global validations fail. + """ + # Called right before build/serialize as pending airdrops are currenly optional + n = len(self._pending_airdrop_ids) + if n < self.MIN_IDS: + raise ValueError(f"You must claim at least {self.MIN_IDS} airdrop (got {n}).") + self._validate_all(self._pending_airdrop_ids) + + def add_pending_airdrop_id( + self, + pending_airdrop_id: PendingAirdropId + ) -> "TokenClaimAirdropTransaction": + """Append a single PendingAirdropId. + + Args: + pending_airdrop_id: The pending airdrop ID to add. + + Returns: + TokenClaimAirdropTransaction: self (for chaining). + """ + return self.add_pending_airdrop_ids([pending_airdrop_id]) + + def add_pending_airdrop_ids( + self, + pending_airdrop_ids: List[PendingAirdropId] + ) -> "TokenClaimAirdropTransaction": + """Add many pending airdrop IDs. + + Args: + pending_airdrop_ids: Additional list of pending airdrop IDs. + + Returns: + TokenClaimAirdropTransaction: self (for chaining). + + Raises: + ValueError: If the resulting list exceeds ``MAX_IDS`` or contains duplicates. + """ + self._require_not_frozen() + candidate = self._pending_airdrop_ids + list(pending_airdrop_ids) # extend list + self._validate_all(candidate) # enforce MAX and no dups + self._pending_airdrop_ids = candidate + return self + + def _pending_airdrop_ids_to_proto(self) -> List[Any]: + """Convert the current list of PendingAirdropId to protobuf messages. + + Returns: + List[Any]: The protobuf representations of the pending airdrop IDs. + """ + return [ + airdrop._to_proto() # type: ignore[reportPrivateUsage] # pylint: disable=protected-access + for airdrop in self._pending_airdrop_ids + ] + + @classmethod + def _from_proto( + cls, + proto: TokenClaimAirdropTransactionBody + ) -> "TokenClaimAirdropTransaction": + """Construct a TokenClaimAirdropTransaction from a TokenClaimAirdropTransactionBody. + + Args: + proto: The protobuf message to read from. + + Returns: + TokenClaimAirdropTransaction: A new transaction instance loaded from the protobuf. + + Raises: + ValueError: If the decoded IDs violate validation rules. + """ + pending_airdrops = [ + PendingAirdropId._from_proto(airdrop) # type: ignore[reportPrivateUsage] # pylint: disable=protected-access + for airdrop in proto.pending_airdrops + ] + inst = cls(pending_airdrop_ids=pending_airdrops) + inst._validate_all(inst._pending_airdrop_ids) # enforce max and no duplicates immediately + + return inst + + def build_transaction_body(self) -> transaction_pb2.TransactionBody: # pylint: disable=no-member + """Build the TransactionBody for this claim. + + Returns: + transaction_body_pb2.TransactionBody: + A TransactionBody with TokenClaimAirdrop populated. + + Raises: + ValueError: If validation fails. + """ + self._validate_final() + + pending_airdrop_claim_body = TokenClaimAirdropTransactionBody( + pending_airdrops=self._pending_airdrop_ids_to_proto() + ) + transaction_body: transaction_pb2.TransactionBody = self.build_base_transaction_body() # pylint: disable=no-member + transaction_body.tokenClaimAirdrop.CopyFrom(pending_airdrop_claim_body) + return transaction_body + + def _get_method(self, channel: _Channel) -> _Method: + """ + Returns the gRPC method used to claim pending token airdrops. + + Args: + channel: The channel with service stubs. + + Returns: + _Method: Wraps the gRPC method for TokenClaimAirdrop. + """ + return _Method( + transaction_func=channel.token.claimAirdrop, + query_func=None + ) + + def get_pending_airdrop_ids(self) -> List[PendingAirdropId]: + """Returns a copy of the list of pending airdrop IDs currently stored inside TokenClaimAirdropTransaction object""" + return list(self._pending_airdrop_ids) + + def __repr__(self) -> str: + """Developer-friendly representation with class name and pending IDs.""" + return ( + f"{self.__class__.__name__}(" + f"pending_airdrop_ids={self._pending_airdrop_ids!r})" + ) + + def __str__(self) -> str: + """Human-readable summary showing each pending airdrop on its own line""" + if not self._pending_airdrop_ids: + return "No pending airdrops in this transaction." + + lines = [ + f" → {aid}" + for aid in self._pending_airdrop_ids + ] + + ids_block = "\n".join(lines) + summary = ( + f"Pending Airdrops to claim:\n" + f"{ids_block}\n" + ) + return summary \ No newline at end of file diff --git a/src/hiero_sdk_python/tokens/token_airdrop_pending_id.py b/src/hiero_sdk_python/tokens/token_airdrop_pending_id.py index 036c8ed2c..2798bda7b 100644 --- a/src/hiero_sdk_python/tokens/token_airdrop_pending_id.py +++ b/src/hiero_sdk_python/tokens/token_airdrop_pending_id.py @@ -1,3 +1,18 @@ +""" +PendingAirdropId module. + + +Defines the PendingAirdropId class used to uniquely identify a specific pending token airdrop. + + +A PendingAirdropId acts as a reference to a single unclaimed token or NFT transfer, +typically created by an airdrop transaction. It is required when constructing a +TokenClaimAirdropTransaction to finalize and receive the associated asset. + + +This class supports safe construction, validation, and conversion to/from protobuf +for use within the Hiero SDK. +""" from typing import Optional from hiero_sdk_python.account.account_id import AccountId from hiero_sdk_python.hapi.services import basic_types_pb2 @@ -21,7 +36,7 @@ def __init__(self, sender_id: AccountId, receiver_id: AccountId, token_id: Optio """ if (token_id is None) == (nft_id is None): raise ValueError("Exactly one of 'token_id' or 'nft_id' must be required.") - + self.sender_id = sender_id self.receiver_id = receiver_id self.token_id = token_id @@ -39,12 +54,13 @@ def _from_proto(cls, proto: basic_types_pb2.PendingAirdropId) -> "PendingAirdrop Returns: PendingAirdropId: A new PendingAirdropId instance populated with data from the protobuf message. """ + fungible_token_type = None - if (proto.HasField("fungible_token_type")): + if proto.HasField("fungible_token_type"): fungible_token_type = TokenId._from_proto(proto.fungible_token_type) non_fungible_token = None - if (proto.HasField("non_fungible_token")): + if proto.HasField("non_fungible_token"): non_fungible_token = NftId._from_proto(proto.non_fungible_token) return cls( @@ -53,7 +69,7 @@ def _from_proto(cls, proto: basic_types_pb2.PendingAirdropId) -> "PendingAirdrop token_id=fungible_token_type, nft_id=non_fungible_token ) - + def _to_proto(self) -> basic_types_pb2.PendingAirdropId: """ Converts this PendingAirdropId instance to its protobuf representation. @@ -62,11 +78,11 @@ def _to_proto(self) -> basic_types_pb2.PendingAirdropId: basic_types_pb2.PendingAirdropId: The protobuf representation of the PendingAirdropId. """ fungible_token_type = None - if (self.token_id): + if self.token_id: fungible_token_type = self.token_id._to_proto() non_fungible_token = None - if (self.nft_id): + if self.nft_id: non_fungible_token = self.nft_id._to_proto() return basic_types_pb2.PendingAirdropId( @@ -75,9 +91,43 @@ def _to_proto(self) -> basic_types_pb2.PendingAirdropId: fungible_token_type=fungible_token_type, non_fungible_token=non_fungible_token ) - - def __str__(self): + + def __str__(self) -> str: + """Readable summary""" + if self.nft_id: + return ( + f"PendingAirdropId(" + f"sender_id={self.sender_id}, receiver_id={self.receiver_id}, " + f"nft_id={self.nft_id})" + ) + return ( + f"PendingAirdropId(" + f"sender_id={self.sender_id}, receiver_id={self.receiver_id}, " + f"token_id={self.token_id}, nft_id=None)" + ) + + def __repr__(self) -> str: + """Developer-friendly representation.""" + return ( + f"PendingAirdropId(" + f"sender_id={self.sender_id}, " + f"receiver_id={self.receiver_id}, " + f"token_id={self.token_id}, " + f"nft_id={self.nft_id})" + ) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, PendingAirdropId): + return NotImplemented + return ( + self.sender_id == other.sender_id and + self.receiver_id == other.receiver_id and + self.token_id == other.token_id and + self.nft_id == other.nft_id + ) + + def __hash__(self) -> int: """ - Returns a string representation of this PendingAirdropId instance. + Returns a hash value so this object can be used in sets and as dictionary keys. """ - return f'PendingAirdropId(sender_id={self.sender_id}, receiver_id={self.receiver_id}, token_id={self.token_id}, nft_id={self.nft_id})' \ No newline at end of file + return hash((self.sender_id, self.receiver_id, self.token_id, self.nft_id)) \ No newline at end of file diff --git a/tests/integration/token_airdrop_transaction_claim_e2e.py b/tests/integration/token_airdrop_transaction_claim_e2e.py new file mode 100644 index 000000000..c5da21254 --- /dev/null +++ b/tests/integration/token_airdrop_transaction_claim_e2e.py @@ -0,0 +1,206 @@ +import pytest +from hiero_sdk_python.response_code import ResponseCode +from hiero_sdk_python.crypto.private_key import PrivateKey +from hiero_sdk_python.account.account_update_transaction import AccountUpdateTransaction +from hiero_sdk_python.tokens.token_associate_transaction import TokenAssociateTransaction +from hiero_sdk_python.tokens.token_airdrop_transaction import TokenAirdropTransaction +from hiero_sdk_python.tokens.token_transfer import TokenTransfer +from hiero_sdk_python.tokens.token_airdrop_pending_id import PendingAirdropId +from hiero_sdk_python.tokens.token_airdrop_claim import TokenClaimAirdropTransaction +from hiero_sdk_python.account.account_id import AccountId +from hiero_sdk_python.tokens.token_id import TokenId +from hiero_sdk_python.query.transaction_record_query import TransactionRecordQuery +from tests.integration.utils_for_test import env, create_fungible_token, create_nft_token + +pytestmark = pytest.mark.integration + +# ====================== +# --- Inline Helpers --- +# ====================== + +def set_receiver_signature_required(env, account_id: AccountId, account_key: PrivateKey, required: bool): + receipt = ( + AccountUpdateTransaction() + .set_account_id(account_id) + .set_receiver_signature_required(required) + .freeze_with(env.client) + .sign(account_key) + .execute(env.client) + ) + assert receipt.status == ResponseCode.SUCCESS, f"Account update failed: {receipt.status}" + +def associate_token(env, account_id: AccountId, account_key: PrivateKey, token_id: TokenId): + receipt = ( + TokenAssociateTransaction() + .set_account_id(account_id) + .add_token_id(token_id) + .freeze_with(env.client) + .sign(account_key) + .execute(env.client) + ) + assert receipt.status == ResponseCode.SUCCESS, f"Association failed: {receipt.status}" + +def submit_airdrop(env, receiver_id: AccountId, token_id: TokenId, amount=1): + tx = TokenAirdropTransaction( + token_transfers=[ + TokenTransfer(token_id, env.operator_id, -amount), + TokenTransfer(token_id, receiver_id, amount), + ] + ).freeze_with(env.client).sign(env.operator_key).execute(env.client) + assert tx.status == ResponseCode.SUCCESS, f"Airdrop failed: {tx.status}" + + record = TransactionRecordQuery(tx.transaction_id).execute(env.client) + assert record is not None + return record + +def has_immediate_credit(record, token_id: TokenId, account_id: AccountId, amount=1): + token_map = record.token_transfers.get(token_id, {}) + return token_map.get(account_id, 0) == amount + +def has_new_pending(record): + return bool(record.new_pending_airdrops) + +def extract_pending_ids(record): + ids = [] + for item in record.new_pending_airdrops: + # If it's already a PendingAirdropId object, just append + if isinstance(item, PendingAirdropId): + ids.append(item) + else: + # Attempt to extract the protobuf field + pid_proto = getattr(item, "pending_airdrop_id", None) + if pid_proto is None and hasattr(item, "_to_proto"): + pid_proto = item._to_proto().pending_airdrop_id + + if pid_proto is None: + raise AssertionError(f"Cannot extract pending_airdrop_id from {type(item)}") + + ids.append(PendingAirdropId._from_proto(pid_proto)) + return ids + +def claim_pending(env, pending_ids, receiver_key): + tx = TokenClaimAirdropTransaction().add_pending_airdrop_ids(pending_ids) + tx.freeze_with(env.client).sign(receiver_key) + return tx.execute(env.client) + +# ====================== +# --- Integration Tests --- +# ====================== + +def test_immediate_airdrop_if_associated_and_no_sig_required(env): + receiver = env.create_account(initial_hbar=2.0) + token_id = create_fungible_token(env) + + set_receiver_signature_required(env, receiver.id, receiver.key, False) + associate_token(env, receiver.id, receiver.key, token_id) + + record = submit_airdrop(env, receiver.id, token_id) + assert not has_new_pending(record) + assert has_immediate_credit(record, token_id, receiver.id) + +def test_pending_airdrop_if_unassociated_and_no_sig_required(env): + receiver = env.create_account(initial_hbar=2.0) + token_id = create_fungible_token(env) + + set_receiver_signature_required(env, receiver.id, receiver.key, False) + record = submit_airdrop(env, receiver.id, token_id) + assert has_new_pending(record) + assert not has_immediate_credit(record, token_id, receiver.id) + +def test_pending_airdrop_if_sig_required_even_if_associated(env): + receiver = env.create_account(initial_hbar=2.0) + token_id = create_fungible_token(env) + + set_receiver_signature_required(env, receiver.id, receiver.key, True) + associate_token(env, receiver.id, receiver.key, token_id) + record = submit_airdrop(env, receiver.id, token_id) + assert has_new_pending(record) + assert not has_immediate_credit(record, token_id, receiver.id) + +def test_claim_fungible_pending(env): + receiver = env.create_account(initial_hbar=2.0) + token_id = create_fungible_token(env) + + record = submit_airdrop(env, receiver.id, token_id) + ids = extract_pending_ids(record) + receipt = claim_pending(env, ids, receiver.key) + assert ResponseCode(receipt.status) == ResponseCode.SUCCESS + + claim_record = TransactionRecordQuery(receipt.transaction_id).execute(env.client) + assert has_immediate_credit(claim_record, token_id, receiver.id) + +def test_claim_multiple_pendings(env): + receiver = env.create_account(initial_hbar=2.0) + token_id = create_fungible_token(env) + + rec1 = submit_airdrop(env, receiver.id, token_id) + rec2 = submit_airdrop(env, receiver.id, token_id) + + ids = extract_pending_ids(rec1) + extract_pending_ids(rec2) + seen = set() + unique_ids = [] + for pid in ids: + key = (pid.token_id, pid.nft_id) + if key not in seen: + seen.add(key) + unique_ids.append(pid) + + receipt = claim_pending(env, unique_ids, receiver.key) + assert ResponseCode(receipt.status) == ResponseCode.SUCCESS + + +def test_claim_fails_without_signature(env): + receiver = env.create_account(initial_hbar=2.0) + token_id = create_fungible_token(env) + + record = submit_airdrop(env, receiver.id, token_id) + ids = extract_pending_ids(record) + + tx = TokenClaimAirdropTransaction().add_pending_airdrop_ids(ids).freeze_with(env.client) + # Execute without signing + receipt = tx.execute(env.client) + + # The status should indicate failure + assert receipt.status != ResponseCode.SUCCESS + + +def test_cannot_claim_duplicate_ids(env): + receiver = env.create_account(initial_hbar=2.0) + token_id = create_fungible_token(env) + + record = submit_airdrop(env, receiver.id, token_id) + pid = extract_pending_ids(record)[0] + + tx = TokenClaimAirdropTransaction() + with pytest.raises(ValueError): + tx.add_pending_airdrop_ids([pid, pid]) + +def test_cannot_claim_same_pending_twice(env): + receiver = env.create_account(initial_hbar=2.0) + token_id = create_fungible_token(env) + + record = submit_airdrop(env, receiver.id, token_id) + ids = extract_pending_ids(record) + + # First claim should succeed + first = claim_pending(env, ids, receiver.key) + assert ResponseCode(first.status) == ResponseCode.SUCCESS + + # Second claim should fail (already claimed) + second = claim_pending(env, ids, receiver.key) + assert ResponseCode(second.status) != ResponseCode.SUCCESS + +def test_claim_max_ids(env): + receiver = env.create_account(initial_hbar=2.0) + + tokens = [create_fungible_token(env) for _ in range(TokenClaimAirdropTransaction.MAX_IDS)] + + pending_ids = [] + for token_id in tokens: + record = submit_airdrop(env, receiver.id, token_id) + pending_ids.extend(extract_pending_ids(record)) + + assert len(pending_ids) == TokenClaimAirdropTransaction.MAX_IDS + + receipt = claim_pending(env, pending_ids, receiver.key) + assert ResponseCode(receipt.status) == ResponseCode.SUCCESS diff --git a/tests/unit/test_token_airdrop_claim.py b/tests/unit/test_token_airdrop_claim.py new file mode 100644 index 000000000..3b51f7fcd --- /dev/null +++ b/tests/unit/test_token_airdrop_claim.py @@ -0,0 +1,293 @@ +import pytest + +from hiero_sdk_python.hapi.services import timestamp_pb2 +from hiero_sdk_python.hapi.services import transaction_pb2 +from hiero_sdk_python.hapi.services.token_claim_airdrop_pb2 import ( # pylint: disable=no-name-in-module + TokenClaimAirdropTransactionBody, +) +from hiero_sdk_python.transaction.transaction_id import TransactionId +from hiero_sdk_python.account.account_id import AccountId +from hiero_sdk_python.tokens.nft_id import NftId +from hiero_sdk_python.tokens.token_id import TokenId +from hiero_sdk_python.tokens.token_airdrop_claim import TokenClaimAirdropTransaction +from hiero_sdk_python.tokens.token_airdrop_pending_id import PendingAirdropId + +pytestmark = pytest.mark.unit + +def _make_fungible_pending(sender: AccountId, receiver: AccountId, num: int) -> PendingAirdropId: + return PendingAirdropId(sender, receiver, TokenId(0, 0, num), None) + +def _make_nft_pending(sender: AccountId, receiver: AccountId, num: int, serial: int) -> PendingAirdropId: + return PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, num), serial)) + +def test_add_pending_airdrop_id(): + """Test adding one pending fungible airdrop id using chaining method""" + sender = AccountId(0, 0, 1001) + receiver = AccountId(0, 0, 1002) + + pending_airdrop_fungible_1 = _make_fungible_pending(sender, receiver, 1000) + + tx_claim = TokenClaimAirdropTransaction() + chained = tx_claim.add_pending_airdrop_id(pending_airdrop_fungible_1) + assert chained is tx_claim # chaining should return same instance + + ids = tx_claim.get_pending_airdrop_ids() + assert isinstance(ids, list) + assert len(ids) == 1 + assert ids[0] == pending_airdrop_fungible_1 + +def test_add_pending_airdrop_id_nft(): + """Test adding one pending NFT airdrop id using chaining method""" + sender = AccountId(0, 0, 2001) + receiver = AccountId(0, 0, 2002) + + pending_airdrop_nft_1 = _make_nft_pending(sender, receiver, 2000, 1) + + tx_claim = TokenClaimAirdropTransaction() + chained = tx_claim.add_pending_airdrop_id(pending_airdrop_nft_1) + assert chained is tx_claim # chaining should return same instance + + ids = tx_claim.get_pending_airdrop_ids() + assert isinstance(ids, list) + assert len(ids) == 1 + assert ids[0] == pending_airdrop_nft_1 + +def test_add_pending_airdrop_ids_mixed_fungible_and_nft(): + """Claim one fungible and one NFT pending airdrop in a single transaction.""" + sender = AccountId(0, 0, 3001) + receiver = AccountId(0, 0, 3002) + + fungible = _make_fungible_pending(sender, receiver, 3000) # token num=3000 + nft = _make_nft_pending(sender, receiver, 4000, 1) # token num=4000, serial=1 + + tx_claim = TokenClaimAirdropTransaction() + tx_claim.add_pending_airdrop_id(fungible).add_pending_airdrop_id(nft) + + ids = tx_claim.get_pending_airdrop_ids() + assert isinstance(ids, list) + assert len(ids) == 2 + + # Order should be preserved: [fungible, nft] + assert ids[0] == fungible + assert ids[1] == nft + +def test_add_pending_airdrop_ids_multiple_mixed_dynamic(): + """Test adding several fungible + NFT pending airdrop IDs built dynamically.""" + sender = AccountId(0, 0, 6201) + receiver = AccountId(0, 0, 6202) + + pending_ids = [] + # Add fungible IDs + for token_num in (6200, 6201): + pending_ids.append(PendingAirdropId(sender, receiver, TokenId(0, 0, token_num), None)) + # Add NFT IDs + for serial in (1, 2): + pending_ids.append(PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, 7200), serial))) + + tx_claim = TokenClaimAirdropTransaction() + tx_claim.add_pending_airdrop_ids(pending_ids) + + ids = tx_claim.get_pending_airdrop_ids() + assert ids == pending_ids + +def test_cannot_exceed_max_airdrops(): + """ Tests that 10 airdrops is fine but anything more not""" + sender = AccountId(0, 0, 8001) + receiver = AccountId(0, 0, 8002) + tx = TokenClaimAirdropTransaction() + + items = [PendingAirdropId(sender, receiver, TokenId(0, 0, 8000 + i), None) + for i in range(tx.MAX_IDS)] + tx.add_pending_airdrop_ids(items) + assert len(tx.get_pending_airdrop_ids()) == tx.MAX_IDS + + with pytest.raises(ValueError): + tx.add_pending_airdrop_id(PendingAirdropId(sender, receiver, TokenId(0, 0, 9999), None)) #This would be 11 + +def test_add_batch_overflow_is_atomic(): + sender_account = AccountId(0, 0, 9001) + receiver_account = AccountId(0, 0, 9002) + transaction_claim = TokenClaimAirdropTransaction() + + # Fill to exactly MAX_IDS - 1 + initial_ids = [ + PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9000 + i), None) + for i in range(transaction_claim.MAX_IDS - 1) + ] + transaction_claim.add_pending_airdrop_ids(initial_ids) + + overflow_batch = [ + PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9990), None), + PendingAirdropId(sender_account, receiver_account, None, NftId(TokenId(0, 0, 9991), 1)), + ] + + before_ids = transaction_claim.get_pending_airdrop_ids() + with pytest.raises(ValueError): + transaction_claim.add_pending_airdrop_ids(overflow_batch) + after_ids = transaction_claim.get_pending_airdrop_ids() + + assert after_ids == before_ids + +def test_min_ids_enforced_on_build_hits_validation(): + """ Tests that at least one airdrop is required to claim""" + transaction_claim = TokenClaimAirdropTransaction() + transaction_claim.transaction_id = TransactionId(AccountId(0, 0, 9999), timestamp_pb2.Timestamp(seconds=1)) + transaction_claim.node_account_id = AccountId(0, 0, 3) + + with pytest.raises(ValueError): + transaction_claim.build_transaction_body() + +def test_rejects_duplicate_fungible(): + sender = AccountId(0, 0, 8101) + receiver = AccountId(0, 0, 8102) + + f1 = PendingAirdropId(sender, receiver, TokenId(0, 0, 8100), None) + f2 = PendingAirdropId(sender, receiver, TokenId(0, 0, 8100), None) # duplicate + + tx = TokenClaimAirdropTransaction().add_pending_airdrop_id(f1) + + with pytest.raises(ValueError): + tx.add_pending_airdrop_ids([f2]) + + # List should remain unchanged because it should deduplicate + ids = tx.get_pending_airdrop_ids() + assert ids == [f1] + +def test_rejects_duplicate_nft(): + sender = AccountId(0, 0, 8201) + receiver = AccountId(0, 0, 8202) + + n1 = PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, 8200), 1)) + n2 = PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, 8200), 1)) # duplicate + + tx = TokenClaimAirdropTransaction().add_pending_airdrop_id(n1) + + with pytest.raises(ValueError): + tx.add_pending_airdrop_ids([n2]) + + # List should remain unchanged because it should deduplicate + ids = tx.get_pending_airdrop_ids() + assert ids == [n1] + +def test_build_transaction_body_populates_proto(): + sender = AccountId(0, 0, 8401) + receiver = AccountId(0, 0, 8402) + + fungible_airdrop = PendingAirdropId(sender, receiver, TokenId(0, 0, 8400), None) + nft_airdrop = PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, 8405), 3)) + + tx_claim = TokenClaimAirdropTransaction().add_pending_airdrop_ids( + [fungible_airdrop, nft_airdrop] + ) + + # Satisfy base preconditions: set transaction_id and node_account_id + tx_claim.transaction_id = TransactionId( + sender, timestamp_pb2.Timestamp(seconds=1, nanos=0) + ) + tx_claim.node_account_id = AccountId(0, 0, 3) # dummy node account + + body: transaction_pb2.TransactionBody = tx_claim.build_transaction_body() + + claim = body.tokenClaimAirdrop + assert isinstance(claim, TokenClaimAirdropTransactionBody) + assert len(claim.pending_airdrops) == 2 + + expected = [a._to_proto().SerializeToString() for a in [fungible_airdrop, nft_airdrop]] + actual = [a.SerializeToString() for a in claim.pending_airdrops] + assert actual == expected + +def test_from_proto_round_trip(): + sender_account = AccountId(0, 0, 9041) + receiver_account = AccountId(0, 0, 9042) + original_ids = [ + PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9040), None), + PendingAirdropId(sender_account, receiver_account, None, NftId(TokenId(0, 0, 9045), 7)), + ] + proto_body = TokenClaimAirdropTransactionBody(pending_airdrops=[i._to_proto() for i in original_ids]) + + rebuilt = TokenClaimAirdropTransaction._from_proto(proto_body) # pylint: disable=protected-access + assert rebuilt.get_pending_airdrop_ids() == original_ids + +def test_get_pending_airdrop_ids_returns_copy(): + sender_account = AccountId(0, 0, 9021) + receiver_account = AccountId(0, 0, 9022) + airdrop_id = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9020), None) + + transaction_claim = TokenClaimAirdropTransaction().add_pending_airdrop_id(airdrop_id) + snapshot = transaction_claim.get_pending_airdrop_ids() + snapshot.append(PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9999), None)) + + assert transaction_claim.get_pending_airdrop_ids() == [airdrop_id] # unchanged + +def test_order_preserved_across_batched_adds(): + sender_account = AccountId(0, 0, 9031) + receiver_account = AccountId(0, 0, 9032) + + id_a = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9030), None) + id_b = PendingAirdropId(sender_account, receiver_account, None, NftId(TokenId(0, 0, 9035), 1)) + id_c = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9031), None) + id_d = PendingAirdropId(sender_account, receiver_account, None, NftId(TokenId(0, 0, 9035), 2)) + + transaction_claim = TokenClaimAirdropTransaction() + transaction_claim.add_pending_airdrop_ids([id_a, id_b]).add_pending_airdrop_ids([id_c]).add_pending_airdrop_ids([id_d]) + + assert transaction_claim.get_pending_airdrop_ids() == [id_a, id_b, id_c, id_d] + +def test_add_empty_list_is_noop(): + sender_account = AccountId(0, 0, 9071) + receiver_account = AccountId(0, 0, 9072) + first_id = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9070), None) + + transaction_claim = TokenClaimAirdropTransaction().add_pending_airdrop_id(first_id) + transaction_claim.add_pending_airdrop_ids([]) + + assert transaction_claim.get_pending_airdrop_ids() == [first_id] + +def test_from_proto_rejects_too_many(): + sender_account = AccountId(0, 0, 9051) + receiver_account = AccountId(0, 0, 9052) + too_many = [PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9050 + i), None) + for i in range(TokenClaimAirdropTransaction.MAX_IDS + 1)] + body = TokenClaimAirdropTransactionBody(pending_airdrops=[x._to_proto() for x in too_many]) + + with pytest.raises(ValueError): + TokenClaimAirdropTransaction._from_proto(body) # pylint: disable=protected-access + +def test_from_proto_rejects_duplicates(): + sender_account = AccountId(0, 0, 9061) + receiver_account = AccountId(0, 0, 9062) + duplicate = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9060), None) + body = TokenClaimAirdropTransactionBody(pending_airdrops=[duplicate._to_proto(), duplicate._to_proto()]) + + with pytest.raises(ValueError): + TokenClaimAirdropTransaction._from_proto(body) # pylint: disable=protected-access + +def test_reject_pending_airdrop_with_both_token_and_nft(): + """A PendingAirdropId must not have both token_id and nft_id at the same time""" + sender = AccountId(0, 0, 9111) + receiver = AccountId(0, 0, 9112) + + token_id = TokenId(0, 0, 5001) + nft_id = NftId(TokenId(0, 0, 5002), 1) + + # Expect ValueError because both token_id and nft_id are provided + with pytest.raises(ValueError, match="Exactly one of 'token_id' or 'nft_id' must be required."): + PendingAirdropId(sender, receiver, token_id, nft_id) + +def test_from_proto_with_invalid_pending_airdrop(): + """_from_proto should raise if proto contains a PendingAirdropId with neither token_id nor nft_id""" + sender = AccountId(0, 0, 9111) + receiver = AccountId(0, 0, 9112) + + # Build an invalid PendingAirdropId (both token_id and nft_id are None) + with pytest.raises(ValueError): + PendingAirdropId(sender, receiver, token_id=None, nft_id=None) + +def test_str_and_repr(): + sender = AccountId(0, 0, 1) + receiver = AccountId(0, 0, 2) + tx = TokenClaimAirdropTransaction() + assert str(tx) == "No pending airdrops in this transaction." + tx.add_pending_airdrop_id(PendingAirdropId(sender, receiver, TokenId(0,0,10), None)) + assert "Pending Airdrops to claim:" in str(tx) + assert repr(tx).startswith("TokenClaimAirdropTransaction(") \ No newline at end of file