Skip to content

Commit eaf56f8

Browse files
committed
chore: claim unit test
Signed-off-by: exploreriii <[email protected]>
1 parent f2dd8df commit eaf56f8

File tree

1 file changed

+293
-0
lines changed

1 file changed

+293
-0
lines changed
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
import pytest
2+
3+
from hiero_sdk_python.hapi.services import timestamp_pb2
4+
from hiero_sdk_python.hapi.services import transaction_pb2
5+
from hiero_sdk_python.hapi.services.token_claim_airdrop_pb2 import ( # pylint: disable=no-name-in-module
6+
TokenClaimAirdropTransactionBody,
7+
)
8+
from hiero_sdk_python.transaction.transaction_id import TransactionId
9+
from hiero_sdk_python.account.account_id import AccountId
10+
from hiero_sdk_python.tokens.nft_id import NftId
11+
from hiero_sdk_python.tokens.token_id import TokenId
12+
from hiero_sdk_python.tokens.token_airdrop_claim import TokenClaimAirdropTransaction
13+
from hiero_sdk_python.tokens.token_airdrop_pending_id import PendingAirdropId
14+
15+
pytestmark = pytest.mark.unit
16+
17+
def _make_fungible_pending(sender: AccountId, receiver: AccountId, num: int) -> PendingAirdropId:
18+
return PendingAirdropId(sender, receiver, TokenId(0, 0, num), None)
19+
20+
def _make_nft_pending(sender: AccountId, receiver: AccountId, num: int, serial: int) -> PendingAirdropId:
21+
return PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, num), serial))
22+
23+
def test_add_pending_airdrop_id():
24+
"""Test adding one pending fungible airdrop id using chaining method"""
25+
sender = AccountId(0, 0, 1001)
26+
receiver = AccountId(0, 0, 1002)
27+
28+
pending_airdrop_fungible_1 = _make_fungible_pending(sender, receiver, 1000)
29+
30+
tx_claim = TokenClaimAirdropTransaction()
31+
chained = tx_claim.add_pending_airdrop_id(pending_airdrop_fungible_1)
32+
assert chained is tx_claim # chaining should return same instance
33+
34+
ids = tx_claim.get_pending_airdrop_ids()
35+
assert isinstance(ids, list)
36+
assert len(ids) == 1
37+
assert ids[0] == pending_airdrop_fungible_1
38+
39+
def test_add_pending_airdrop_id_nft():
40+
"""Test adding one pending NFT airdrop id using chaining method"""
41+
sender = AccountId(0, 0, 2001)
42+
receiver = AccountId(0, 0, 2002)
43+
44+
pending_airdrop_nft_1 = _make_nft_pending(sender, receiver, 2000, 1)
45+
46+
tx_claim = TokenClaimAirdropTransaction()
47+
chained = tx_claim.add_pending_airdrop_id(pending_airdrop_nft_1)
48+
assert chained is tx_claim # chaining should return same instance
49+
50+
ids = tx_claim.get_pending_airdrop_ids()
51+
assert isinstance(ids, list)
52+
assert len(ids) == 1
53+
assert ids[0] == pending_airdrop_nft_1
54+
55+
def test_add_pending_airdrop_ids_mixed_fungible_and_nft():
56+
"""Claim one fungible and one NFT pending airdrop in a single transaction."""
57+
sender = AccountId(0, 0, 3001)
58+
receiver = AccountId(0, 0, 3002)
59+
60+
fungible = _make_fungible_pending(sender, receiver, 3000) # token num=3000
61+
nft = _make_nft_pending(sender, receiver, 4000, 1) # token num=4000, serial=1
62+
63+
tx_claim = TokenClaimAirdropTransaction()
64+
tx_claim.add_pending_airdrop_id(fungible).add_pending_airdrop_id(nft)
65+
66+
ids = tx_claim.get_pending_airdrop_ids()
67+
assert isinstance(ids, list)
68+
assert len(ids) == 2
69+
70+
# Order should be preserved: [fungible, nft]
71+
assert ids[0] == fungible
72+
assert ids[1] == nft
73+
74+
def test_add_pending_airdrop_ids_multiple_mixed_dynamic():
75+
"""Test adding several fungible + NFT pending airdrop IDs built dynamically."""
76+
sender = AccountId(0, 0, 6201)
77+
receiver = AccountId(0, 0, 6202)
78+
79+
pending_ids = []
80+
# Add fungible IDs
81+
for token_num in (6200, 6201):
82+
pending_ids.append(PendingAirdropId(sender, receiver, TokenId(0, 0, token_num), None))
83+
# Add NFT IDs
84+
for serial in (1, 2):
85+
pending_ids.append(PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, 7200), serial)))
86+
87+
tx_claim = TokenClaimAirdropTransaction()
88+
tx_claim.add_pending_airdrop_ids(pending_ids)
89+
90+
ids = tx_claim.get_pending_airdrop_ids()
91+
assert ids == pending_ids
92+
93+
def test_cannot_exceed_max_airdrops():
94+
""" Tests that 10 airdrops is fine but anything more not"""
95+
sender = AccountId(0, 0, 8001)
96+
receiver = AccountId(0, 0, 8002)
97+
tx = TokenClaimAirdropTransaction()
98+
99+
items = [PendingAirdropId(sender, receiver, TokenId(0, 0, 8000 + i), None)
100+
for i in range(tx.MAX_IDS)]
101+
tx.add_pending_airdrop_ids(items)
102+
assert len(tx.get_pending_airdrop_ids()) == tx.MAX_IDS
103+
104+
with pytest.raises(ValueError):
105+
tx.add_pending_airdrop_id(PendingAirdropId(sender, receiver, TokenId(0, 0, 9999), None)) #This would be 11
106+
107+
def test_add_batch_overflow_is_atomic():
108+
sender_account = AccountId(0, 0, 9001)
109+
receiver_account = AccountId(0, 0, 9002)
110+
transaction_claim = TokenClaimAirdropTransaction()
111+
112+
# Fill to exactly MAX_IDS - 1
113+
initial_ids = [
114+
PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9000 + i), None)
115+
for i in range(transaction_claim.MAX_IDS - 1)
116+
]
117+
transaction_claim.add_pending_airdrop_ids(initial_ids)
118+
119+
overflow_batch = [
120+
PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9990), None),
121+
PendingAirdropId(sender_account, receiver_account, None, NftId(TokenId(0, 0, 9991), 1)),
122+
]
123+
124+
before_ids = transaction_claim.get_pending_airdrop_ids()
125+
with pytest.raises(ValueError):
126+
transaction_claim.add_pending_airdrop_ids(overflow_batch)
127+
after_ids = transaction_claim.get_pending_airdrop_ids()
128+
129+
assert after_ids == before_ids
130+
131+
def test_min_ids_enforced_on_build_hits_validation():
132+
""" Tests that at least one airdrop is required to claim"""
133+
transaction_claim = TokenClaimAirdropTransaction()
134+
transaction_claim.transaction_id = TransactionId(AccountId(0, 0, 9999), timestamp_pb2.Timestamp(seconds=1))
135+
transaction_claim.node_account_id = AccountId(0, 0, 3)
136+
137+
with pytest.raises(ValueError):
138+
transaction_claim.build_transaction_body()
139+
140+
def test_rejects_duplicate_fungible():
141+
sender = AccountId(0, 0, 8101)
142+
receiver = AccountId(0, 0, 8102)
143+
144+
f1 = PendingAirdropId(sender, receiver, TokenId(0, 0, 8100), None)
145+
f2 = PendingAirdropId(sender, receiver, TokenId(0, 0, 8100), None) # duplicate
146+
147+
tx = TokenClaimAirdropTransaction().add_pending_airdrop_id(f1)
148+
149+
with pytest.raises(ValueError):
150+
tx.add_pending_airdrop_ids([f2])
151+
152+
# List should remain unchanged because it should deduplicate
153+
ids = tx.get_pending_airdrop_ids()
154+
assert ids == [f1]
155+
156+
def test_rejects_duplicate_nft():
157+
sender = AccountId(0, 0, 8201)
158+
receiver = AccountId(0, 0, 8202)
159+
160+
n1 = PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, 8200), 1))
161+
n2 = PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, 8200), 1)) # duplicate
162+
163+
tx = TokenClaimAirdropTransaction().add_pending_airdrop_id(n1)
164+
165+
with pytest.raises(ValueError):
166+
tx.add_pending_airdrop_ids([n2])
167+
168+
# List should remain unchanged because it should deduplicate
169+
ids = tx.get_pending_airdrop_ids()
170+
assert ids == [n1]
171+
172+
def test_build_transaction_body_populates_proto():
173+
sender = AccountId(0, 0, 8401)
174+
receiver = AccountId(0, 0, 8402)
175+
176+
fungible_airdrop = PendingAirdropId(sender, receiver, TokenId(0, 0, 8400), None)
177+
nft_airdrop = PendingAirdropId(sender, receiver, None, NftId(TokenId(0, 0, 8405), 3))
178+
179+
tx_claim = TokenClaimAirdropTransaction().add_pending_airdrop_ids(
180+
[fungible_airdrop, nft_airdrop]
181+
)
182+
183+
# Satisfy base preconditions: set transaction_id and node_account_id
184+
tx_claim.transaction_id = TransactionId(
185+
sender, timestamp_pb2.Timestamp(seconds=1, nanos=0)
186+
)
187+
tx_claim.node_account_id = AccountId(0, 0, 3) # dummy node account
188+
189+
body: transaction_pb2.TransactionBody = tx_claim.build_transaction_body()
190+
191+
claim = body.tokenClaimAirdrop
192+
assert isinstance(claim, TokenClaimAirdropTransactionBody)
193+
assert len(claim.pending_airdrops) == 2
194+
195+
expected = [a._to_proto().SerializeToString() for a in [fungible_airdrop, nft_airdrop]]
196+
actual = [a.SerializeToString() for a in claim.pending_airdrops]
197+
assert actual == expected
198+
199+
def test_from_proto_round_trip():
200+
sender_account = AccountId(0, 0, 9041)
201+
receiver_account = AccountId(0, 0, 9042)
202+
original_ids = [
203+
PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9040), None),
204+
PendingAirdropId(sender_account, receiver_account, None, NftId(TokenId(0, 0, 9045), 7)),
205+
]
206+
proto_body = TokenClaimAirdropTransactionBody(pending_airdrops=[i._to_proto() for i in original_ids])
207+
208+
rebuilt = TokenClaimAirdropTransaction._from_proto(proto_body) # pylint: disable=protected-access
209+
assert rebuilt.get_pending_airdrop_ids() == original_ids
210+
211+
def test_get_pending_airdrop_ids_returns_copy():
212+
sender_account = AccountId(0, 0, 9021)
213+
receiver_account = AccountId(0, 0, 9022)
214+
airdrop_id = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9020), None)
215+
216+
transaction_claim = TokenClaimAirdropTransaction().add_pending_airdrop_id(airdrop_id)
217+
snapshot = transaction_claim.get_pending_airdrop_ids()
218+
snapshot.append(PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9999), None))
219+
220+
assert transaction_claim.get_pending_airdrop_ids() == [airdrop_id] # unchanged
221+
222+
def test_order_preserved_across_batched_adds():
223+
sender_account = AccountId(0, 0, 9031)
224+
receiver_account = AccountId(0, 0, 9032)
225+
226+
id_a = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9030), None)
227+
id_b = PendingAirdropId(sender_account, receiver_account, None, NftId(TokenId(0, 0, 9035), 1))
228+
id_c = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9031), None)
229+
id_d = PendingAirdropId(sender_account, receiver_account, None, NftId(TokenId(0, 0, 9035), 2))
230+
231+
transaction_claim = TokenClaimAirdropTransaction()
232+
transaction_claim.add_pending_airdrop_ids([id_a, id_b]).add_pending_airdrop_ids([id_c]).add_pending_airdrop_ids([id_d])
233+
234+
assert transaction_claim.get_pending_airdrop_ids() == [id_a, id_b, id_c, id_d]
235+
236+
def test_add_empty_list_is_noop():
237+
sender_account = AccountId(0, 0, 9071)
238+
receiver_account = AccountId(0, 0, 9072)
239+
first_id = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9070), None)
240+
241+
transaction_claim = TokenClaimAirdropTransaction().add_pending_airdrop_id(first_id)
242+
transaction_claim.add_pending_airdrop_ids([])
243+
244+
assert transaction_claim.get_pending_airdrop_ids() == [first_id]
245+
246+
def test_from_proto_rejects_too_many():
247+
sender_account = AccountId(0, 0, 9051)
248+
receiver_account = AccountId(0, 0, 9052)
249+
too_many = [PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9050 + i), None)
250+
for i in range(TokenClaimAirdropTransaction.MAX_IDS + 1)]
251+
body = TokenClaimAirdropTransactionBody(pending_airdrops=[x._to_proto() for x in too_many])
252+
253+
with pytest.raises(ValueError):
254+
TokenClaimAirdropTransaction._from_proto(body) # pylint: disable=protected-access
255+
256+
def test_from_proto_rejects_duplicates():
257+
sender_account = AccountId(0, 0, 9061)
258+
receiver_account = AccountId(0, 0, 9062)
259+
duplicate = PendingAirdropId(sender_account, receiver_account, TokenId(0, 0, 9060), None)
260+
body = TokenClaimAirdropTransactionBody(pending_airdrops=[duplicate._to_proto(), duplicate._to_proto()])
261+
262+
with pytest.raises(ValueError):
263+
TokenClaimAirdropTransaction._from_proto(body) # pylint: disable=protected-access
264+
265+
def test_reject_pending_airdrop_with_both_token_and_nft():
266+
"""A PendingAirdropId must not have both token_id and nft_id at the same time"""
267+
sender = AccountId(0, 0, 9111)
268+
receiver = AccountId(0, 0, 9112)
269+
270+
token_id = TokenId(0, 0, 5001)
271+
nft_id = NftId(TokenId(0, 0, 5002), 1)
272+
273+
# Expect ValueError because both token_id and nft_id are provided
274+
with pytest.raises(ValueError, match="Exactly one of 'token_id' or 'nft_id' must be required."):
275+
PendingAirdropId(sender, receiver, token_id, nft_id)
276+
277+
def test_from_proto_with_invalid_pending_airdrop():
278+
"""_from_proto should raise if proto contains a PendingAirdropId with neither token_id nor nft_id"""
279+
sender = AccountId(0, 0, 9111)
280+
receiver = AccountId(0, 0, 9112)
281+
282+
# Build an invalid PendingAirdropId (both token_id and nft_id are None)
283+
with pytest.raises(ValueError):
284+
PendingAirdropId(sender, receiver, token_id=None, nft_id=None)
285+
286+
def test_str_and_repr():
287+
sender = AccountId(0, 0, 1)
288+
receiver = AccountId(0, 0, 2)
289+
tx = TokenClaimAirdropTransaction()
290+
assert str(tx) == "No pending airdrops in this transaction."
291+
tx.add_pending_airdrop_id(PendingAirdropId(sender, receiver, TokenId(0,0,10), None))
292+
assert "Pending Airdrops to claim:" in str(tx)
293+
assert repr(tx).startswith("TokenClaimAirdropTransaction(")

0 commit comments

Comments
 (0)