Skip to content

Commit e2fbda7

Browse files
authored
Implement zigpy TX power API (#698)
* Implement zigpy TX power API * Fix typo in command name * Bump zigpy * Add tests
1 parent 2b79ae6 commit e2fbda7

File tree

9 files changed

+240
-2
lines changed

9 files changed

+240
-2
lines changed

bellows/ezsp/__init__.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@
2020
from bellows.exception import EzspError, InvalidCommandError, InvalidCommandPayload
2121
from bellows.ezsp import xncp
2222
from bellows.ezsp.config import DEFAULT_CONFIG, RuntimeConfig, ValueConfig
23-
from bellows.ezsp.xncp import FirmwareFeatures, FlowControlType, GetRouteTableEntryRsp
23+
from bellows.ezsp.xncp import (
24+
FirmwareFeatures,
25+
FlowControlType,
26+
GetRouteTableEntryRsp,
27+
GetTxPowerInfoRsp,
28+
)
2429
import bellows.types as t
2530
import bellows.uart
2631

@@ -842,3 +847,8 @@ async def xncp_set_route_table_entry(
842847
cost=cost,
843848
)
844849
)
850+
851+
async def xncp_get_tx_power_info(self, country_code: str) -> GetTxPowerInfoRsp:
852+
"""Get maximum and recommended TX power for a country (ISO 3166-1 alpha-2)."""
853+
code = country_code.upper().encode("ascii")
854+
return await self.send_xncp_frame(xncp.GetTxPowerInfoReq(country_code=code))

bellows/ezsp/xncp.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class XncpCommandId(t.enum16):
4242
GET_CHIP_INFO_REQ = 0x0005
4343
SET_ROUTE_TABLE_ENTRY_REQ = 0x0006
4444
GET_ROUTE_TABLE_ENTRY_REQ = 0x0007
45+
GET_TX_POWER_INFO_REQ = 0x0008
4546

4647
GET_SUPPORTED_FEATURES_RSP = GET_SUPPORTED_FEATURES_REQ | 0x8000
4748
SET_SOURCE_ROUTE_RSP = SET_SOURCE_ROUTE_REQ | 0x8000
@@ -51,6 +52,7 @@ class XncpCommandId(t.enum16):
5152
GET_CHIP_INFO_RSP = GET_CHIP_INFO_REQ | 0x8000
5253
SET_ROUTE_TABLE_ENTRY_RSP = SET_ROUTE_TABLE_ENTRY_REQ | 0x8000
5354
GET_ROUTE_TABLE_ENTRY_RSP = GET_ROUTE_TABLE_ENTRY_REQ | 0x8000
55+
GET_TX_POWER_INFO_RSP = GET_TX_POWER_INFO_REQ | 0x8000
5456

5557
UNKNOWN = 0xFFFF
5658

@@ -118,6 +120,9 @@ class FirmwareFeatures(t.bitmap32):
118120
# Route table entries can be set
119121
RESTORE_ROUTE_TABLE = 1 << 6
120122

123+
# Recommended and maximum TX power can be queried by country code
124+
TX_POWER_INFO = 1 << 7
125+
121126

122127
class XncpCommandPayload(t.Struct):
123128
pass
@@ -217,6 +222,17 @@ class GetRouteTableEntryRsp(XncpCommandPayload):
217222
cost: t.uint8_t
218223

219224

225+
@register_command(XncpCommandId.GET_TX_POWER_INFO_REQ)
226+
class GetTxPowerInfoReq(XncpCommandPayload):
227+
country_code: Bytes
228+
229+
230+
@register_command(XncpCommandId.GET_TX_POWER_INFO_RSP)
231+
class GetTxPowerInfoRsp(XncpCommandPayload):
232+
recommended_power_dbm: t.int8s
233+
max_power_dbm: t.int8s
234+
235+
220236
@register_command(XncpCommandId.UNKNOWN)
221237
class Unknown(XncpCommandPayload):
222238
pass

bellows/types/struct.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,18 @@ class NV3StackTrustCenterToken(EzspStruct):
376376
key: named.KeyData
377377

378378

379+
class NV3StackNodeData(EzspStruct):
380+
"""NV3 stack node data token value."""
381+
382+
panId: named.EmberPanId
383+
radioTxPower: basic.int8s
384+
radioFreqChannel: basic.uint8_t
385+
stackProfile: basic.uint8_t # Always 0x02
386+
nodeType: named.EmberNodeType
387+
zigbeeNodeId: named.EmberNodeId
388+
extendedPanId: named.ExtendedPanId
389+
390+
379391
class EmberKeyStruct(EzspStruct):
380392
# A structure containing a key and its associated data.
381393
# A bitmask indicating the presence of data within the various fields

bellows/zigbee/application.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,34 @@ async def _get_board_info(self) -> tuple[str, str, str] | tuple[None, None, None
153153

154154
return None, None, None
155155

156+
async def _get_recommended_tx_power(self, country: str) -> float:
157+
"""Get firmware recommended TX power for the given country."""
158+
if FirmwareFeatures.TX_POWER_INFO not in self._ezsp._xncp_features:
159+
return await super()._get_recommended_tx_power(country)
160+
161+
tx_power_info = await self._ezsp.xncp_get_tx_power_info(country)
162+
return tx_power_info.recommended_power_dbm
163+
164+
async def _get_maximum_tx_power(self, country: str) -> float:
165+
"""Get firmware maximum TX power for the given country."""
166+
if FirmwareFeatures.TX_POWER_INFO not in self._ezsp._xncp_features:
167+
return await super()._get_maximum_tx_power(country)
168+
169+
tx_power_info = await self._ezsp.xncp_get_tx_power_info(country)
170+
return tx_power_info.max_power_dbm
171+
172+
async def _set_tx_power(self, tx_power: float) -> float | None:
173+
"""Set TX power (if supported by the radio), returning the actual TX power."""
174+
actual_power = int(tx_power)
175+
await self._ezsp.setRadioPower(power=actual_power)
176+
177+
# We intentionally do not reset after changing the TX power. Instead, we just
178+
# persist the changes to NVRAM (if necessary), they will be reloaded on next
179+
# boot.
180+
await repairs.update_tx_power(self._ezsp, tx_power=actual_power)
181+
182+
return float(actual_power)
183+
156184
async def connect(self) -> None:
157185
self._ezsp = bellows.ezsp.EZSP(self.config[zigpy.config.CONF_DEVICE], self)
158186

bellows/zigbee/repairs.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,42 @@ async def fix_invalid_tclk_partner_ieee(ezsp: EZSP) -> bool:
4949
assert t.sl_Status.from_ember_status(status) == t.sl_Status.OK
5050

5151
return True
52+
53+
54+
async def update_tx_power(ezsp: EZSP, tx_power: int) -> bool:
55+
"""Persist transmit power in NVRAM."""
56+
57+
try:
58+
rsp = await ezsp.getTokenData(token=t.NV3KeyId.NVM3KEY_STACK_NODE_DATA, index=0)
59+
assert t.sl_Status.from_ember_status(rsp.status) == t.sl_Status.OK
60+
except (InvalidCommandError, AttributeError, AssertionError):
61+
LOGGER.warning("NV3 interface not available in this firmware, please upgrade!")
62+
return False
63+
64+
token, remaining = t.NV3StackNodeData.deserialize(rsp.value)
65+
assert not remaining
66+
67+
# No point in writing to NVRAM if the TX power is correct
68+
if token.radioTxPower == tx_power:
69+
return False
70+
71+
status, node_type, nwk_params = await ezsp.getNetworkParameters()
72+
assert t.sl_Status.from_ember_status(status) == t.sl_Status.OK
73+
74+
# Sanity check
75+
assert token.panId == nwk_params.panId
76+
assert token.radioFreqChannel == nwk_params.radioChannel
77+
assert token.stackProfile == 0x02
78+
assert token.nodeType == node_type
79+
assert token.extendedPanId == nwk_params.extendedPanId
80+
81+
(status,) = await ezsp.setTokenData(
82+
token=t.NV3KeyId.NVM3KEY_STACK_NODE_DATA,
83+
index=0,
84+
token_data=token.replace(radioTxPower=tx_power).serialize(),
85+
)
86+
assert t.sl_Status.from_ember_status(status) == t.sl_Status.OK
87+
88+
LOGGER.debug("Persisted TX power %d to NVRAM", tx_power)
89+
90+
return True

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ dependencies = [
1717
"click",
1818
"click-log>=0.2.1",
1919
"voluptuous",
20-
"zigpy>=0.85.0",
20+
"zigpy>=0.87.0",
2121
]
2222

2323
[tool.setuptools.packages.find]

tests/test_application.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
FlowControlType,
2323
GetChipInfoRsp,
2424
GetRouteTableEntryRsp,
25+
GetTxPowerInfoRsp,
2526
)
2627
import bellows.types
2728
import bellows.types as t
@@ -2639,3 +2640,46 @@ async def test_migration_failure_eui64_overwrite_confirmation(
26392640
assert app._ezsp.write_custom_eui64.mock_calls == [
26402641
call(t.EUI64.convert("aa:aa:aa:aa:aa:aa:aa:aa"), burn_into_userdata=True)
26412642
]
2643+
2644+
2645+
async def test_tx_power_with_xncp_feature(app: ControllerApplication) -> None:
2646+
"""Test TX power methods with XNCP TX_POWER_INFO feature."""
2647+
app._ezsp._xncp_features |= FirmwareFeatures.TX_POWER_INFO
2648+
app._ezsp.xncp_get_tx_power_info = AsyncMock(
2649+
return_value=GetTxPowerInfoRsp(recommended_power_dbm=10, max_power_dbm=20)
2650+
)
2651+
2652+
assert await app.get_recommended_tx_power("US") == 10.0
2653+
assert await app.get_maximum_tx_power("US") == 20.0
2654+
2655+
2656+
async def test_tx_power_without_xncp_feature(app: ControllerApplication) -> None:
2657+
"""Test TX power methods fall back to parent class without XNCP feature."""
2658+
app._ezsp._xncp_features = FirmwareFeatures.NONE
2659+
app._ezsp.xncp_get_tx_power_info = AsyncMock()
2660+
2661+
app_cls = zigpy.application.ControllerApplication
2662+
2663+
ezsp_rec_tx_power = await app.get_recommended_tx_power("US")
2664+
base_rec_tx_power = await app_cls.get_recommended_tx_power(app, "US")
2665+
assert ezsp_rec_tx_power == base_rec_tx_power
2666+
2667+
ezsp_max_tx_power = await app.get_maximum_tx_power("US")
2668+
base_max_tx_power = await app_cls.get_maximum_tx_power(app, "US")
2669+
assert ezsp_max_tx_power == base_max_tx_power
2670+
2671+
assert len(app._ezsp.xncp_get_tx_power_info.mock_calls) == 0
2672+
2673+
2674+
async def test_set_tx_power(app: ControllerApplication) -> None:
2675+
"""Test set_tx_power with float-to-int conversion and NVRAM persistence."""
2676+
app._ezsp.setRadioPower = AsyncMock()
2677+
2678+
with patch(
2679+
"bellows.zigbee.repairs.update_tx_power", return_value=True
2680+
) as mock_update:
2681+
result = await app.set_tx_power(12.7)
2682+
2683+
assert result == 12.0
2684+
assert app._ezsp.setRadioPower.mock_calls == [call(power=12)]
2685+
assert mock_update.mock_calls == [call(app._ezsp, tx_power=12)]

tests/test_xncp.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,3 +291,26 @@ async def test_xncp_route_table_operations(ezsp_f: EZSP) -> None:
291291
).serialize()
292292
)
293293
]
294+
295+
296+
async def test_xncp_get_tx_power_info(ezsp_f: EZSP) -> None:
297+
"""Test XNCP get_tx_power_info."""
298+
ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock(
299+
return_value=[
300+
t.EmberStatus.SUCCESS,
301+
xncp.XncpCommand.from_payload(
302+
xncp.GetTxPowerInfoRsp(recommended_power_dbm=10, max_power_dbm=20)
303+
).serialize(),
304+
]
305+
)
306+
307+
rsp = await ezsp_f.xncp_get_tx_power_info("us")
308+
assert rsp.recommended_power_dbm == 10
309+
assert rsp.max_power_dbm == 20
310+
assert customFrame.mock_calls == [
311+
call(
312+
xncp.XncpCommand.from_payload(
313+
xncp.GetTxPowerInfoReq(country_code=b"US")
314+
).serialize()
315+
)
316+
]

tests/test_zigbee_repairs.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,3 +174,69 @@ async def test_fix_invalid_tclk_all_versions(
174174
]
175175
else:
176176
assert "NV3 interface not available in this firmware" in caplog.text
177+
178+
179+
async def test_update_tx_power(ezsp_f: EZSP, caplog) -> None:
180+
"""Test update_tx_power behavior in various scenarios."""
181+
token_data = t.NV3StackNodeData(
182+
panId=t.EmberPanId(0x1234),
183+
radioTxPower=t.int8s(5),
184+
radioFreqChannel=t.uint8_t(15),
185+
stackProfile=t.uint8_t(0x02),
186+
nodeType=t.EmberNodeType.COORDINATOR,
187+
zigbeeNodeId=t.EmberNodeId(0x0000),
188+
extendedPanId=t.ExtendedPanId.convert("AA:BB:CC:DD:EE:FF:00:11"),
189+
)
190+
191+
# Test 1: NV3 interface unavailable
192+
ezsp_f.getTokenData = AsyncMock(side_effect=InvalidCommandError())
193+
with caplog.at_level(logging.WARNING):
194+
assert await repairs.update_tx_power(ezsp_f, tx_power=10) is False
195+
assert "NV3 interface not available in this firmware" in caplog.text
196+
197+
# Test 2: TX power already correct (no write needed)
198+
ezsp_f.getTokenData = AsyncMock(
199+
return_value=GetTokenDataRsp(
200+
status=t.EmberStatus.SUCCESS,
201+
value=token_data.replace(radioTxPower=t.int8s(10)).serialize(),
202+
)
203+
)
204+
ezsp_f.setTokenData = AsyncMock()
205+
ezsp_f.getNetworkParameters = AsyncMock()
206+
assert await repairs.update_tx_power(ezsp_f, tx_power=10) is False
207+
assert len(ezsp_f.setTokenData.mock_calls) == 0
208+
assert len(ezsp_f.getNetworkParameters.mock_calls) == 0
209+
210+
# Test 3: Successful TX power update
211+
ezsp_f.getTokenData = AsyncMock(
212+
return_value=GetTokenDataRsp(
213+
status=t.EmberStatus.SUCCESS,
214+
value=token_data.serialize(),
215+
)
216+
)
217+
ezsp_f.getNetworkParameters = AsyncMock(
218+
return_value=[
219+
t.EmberStatus.SUCCESS,
220+
t.EmberNodeType.COORDINATOR,
221+
t.EmberNetworkParameters(
222+
panId=t.EmberPanId(0x1234),
223+
extendedPanId=t.ExtendedPanId.convert("AA:BB:CC:DD:EE:FF:00:11"),
224+
radioChannel=t.uint8_t(15),
225+
radioTxPower=t.int8s(5),
226+
joinMethod=t.EmberJoinMethod.USE_MAC_ASSOCIATION,
227+
nwkManagerId=t.EmberNodeId(0x0000),
228+
nwkUpdateId=t.uint8_t(0),
229+
channels=t.Channels.ALL_CHANNELS,
230+
),
231+
]
232+
)
233+
ezsp_f.setTokenData = AsyncMock(return_value=[t.EmberStatus.SUCCESS])
234+
235+
assert await repairs.update_tx_power(ezsp_f, tx_power=15) is True
236+
assert ezsp_f.setTokenData.mock_calls == [
237+
call(
238+
token=t.NV3KeyId.NVM3KEY_STACK_NODE_DATA,
239+
index=0,
240+
token_data=token_data.replace(radioTxPower=t.int8s(15)).serialize(),
241+
)
242+
]

0 commit comments

Comments
 (0)