Skip to content

--json Outputs JSON objects for --nodes, --request-telemetry/position #577

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions meshtastic/__main__.py
Original file line number Diff line number Diff line change
@@ -253,7 +253,7 @@ def onConnected(interface):
args = mt_config.args

# do not print this line if we are exporting the config
if not args.export_config:
if not (args.export_config or args.json):
print("Connected to radio")

if args.setlat or args.setlon or args.setalt:
@@ -445,17 +445,24 @@ def onConnected(interface):
else:
channelIndex = mt_config.channel_index or 0
if checkChannel(interface, channelIndex):
print(f"Sending telemetry request to {args.dest} on channelIndex:{channelIndex} (this could take a while)")
interface.sendTelemetry(destinationId=args.dest, wantResponse=True, channelIndex=channelIndex)
if not args.json:
print(f"Sending telemetry request to {args.dest} "
f"on channelIndex:{channelIndex} (this could take a while)")
interface.sendTelemetry(
destinationId=args.dest, wantResponse=True, channelIndex=channelIndex, jsonResponse=args.json
)

if args.request_position:
if args.dest == BROADCAST_ADDR:
meshtastic.util.our_exit("Warning: Must use a destination node ID.")
else:
channelIndex = mt_config.channel_index or 0
if checkChannel(interface, channelIndex):
print(f"Sending position request to {args.dest} on channelIndex:{channelIndex} (this could take a while)")
interface.sendPosition(destinationId=args.dest, wantResponse=True, channelIndex=channelIndex)
if not args.json:
print(f"Sending position request to {args.dest} "
f"on channelIndex:{channelIndex} (this could take a while)")
interface.sendPosition(destinationId=args.dest, wantResponse=True, channelIndex=channelIndex,
jsonResponse=args.json)

if args.gpio_wrb or args.gpio_rd or args.gpio_watch:
if args.dest == BROADCAST_ADDR:
@@ -819,9 +826,13 @@ def setSimpleConfig(modem_preset):
if args.nodes:
closeNow = True
if args.dest != BROADCAST_ADDR:
print("Showing node list of a remote node is not supported.")
if args.json:
print("[]")
else:
print("Showing node list of a remote node is not supported.")
return
interface.showNodes()
interface.showNodes(jsonResponse=args.json)


if args.qr or args.qr_all:
closeNow = True
@@ -866,7 +877,10 @@ def setSimpleConfig(modem_preset):
interface.close() # after running command then exit

except Exception as ex:
print(f"Aborting due to: {ex}")
if args.json:
print("")
else:
print(f"Aborting due to: {ex}")
interface.close() # close the connection now, so that our app exits
sys.exit(1)

@@ -1461,6 +1475,12 @@ def initParser():
"--debug", help="Show API library debug log messages", action="store_true"
)

group.add_argument(
"--json",
help="Output JSON objects for --nodes, --request-telemetry, --request-position",
action="store_true"
)

group.add_argument(
"--test",
help="Run stress test against all connected Meshtastic devices",
190 changes: 134 additions & 56 deletions meshtastic/mesh_interface.py
Original file line number Diff line number Diff line change
@@ -138,7 +138,7 @@ def showInfo(self, file=sys.stdout) -> str: # pylint: disable=W0613
print(infos)
return infos

def showNodes(self, includeSelf: bool=True, file=sys.stdout) -> str: # pylint: disable=W0613
def showNodes(self, includeSelf: bool=True, jsonResponse: bool=False) -> List: # pylint: disable=W0613
"""Show table summary of nodes in mesh"""

def formatFloat(value, precision=2, unit="") -> Optional[str]:
@@ -224,9 +224,13 @@ def getTimeAgo(ts) -> Optional[str]:
for i, row in enumerate(rows):
row["N"] = i + 1

table = tabulate(rows, headers="keys", missingval="N/A", tablefmt="fancy_grid")
print(table)
return table
if jsonResponse:
print(json.dumps(rows, indent=2))
else:
table = tabulate(rows, headers="keys", missingval="N/A", tablefmt="fancy_grid")
print(table)

Comment on lines +227 to +232
Copy link
Preview

Copilot AI May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The showNodes function prints either a table or JSON but always returns the rows list. Consider separating the logic for generating node data from the presentation logic to maintain consistency and better testability.

Copilot uses AI. Check for mistakes.

return rows

def getNode(self, nodeId: str, requestChannels: bool=True) -> meshtastic.node.Node:
"""Return a node object which contains device settings and channel info"""
@@ -353,6 +357,7 @@ def sendPosition(
wantAck: bool=False,
wantResponse: bool=False,
channelIndex: int=0,
jsonResponse: bool=False,
):
"""
Send a position packet to some other node (normally a broadcast)
@@ -384,7 +389,10 @@ def sendPosition(
logging.debug(f"p.time:{p.time}")

if wantResponse:
onResponse = self.onResponsePosition
if jsonResponse:
onResponse = self.onResponsePositionJson
else:
onResponse = self.onResponsePosition
else:
onResponse = None

@@ -401,33 +409,69 @@ def sendPosition(
self.waitForPosition()
return d

def onResponsePosition(self, p):
"""on response for position"""
if p["decoded"]["portnum"] == 'POSITION_APP':
self._acknowledgment.receivedPosition = True
position = mesh_pb2.Position()
position.ParseFromString(p["decoded"]["payload"])

ret = "Position received: "
if position.latitude_i != 0 and position.longitude_i != 0:
ret += f"({position.latitude_i * 10**-7}, {position.longitude_i * 10**-7})"
else:
ret += "(unknown)"
if position.altitude != 0:
ret += f" {position.altitude}m"

if position.precision_bits not in [0,32]:
ret += f" precision:{position.precision_bits}"
elif position.precision_bits == 32:
ret += " full precision"
elif position.precision_bits == 0:
ret += " position disabled"
def onResponsePosition(self,
destinationId: Union[int, str] = BROADCAST_ADDR,
jsonResponse: bool = False
):

print(ret)
def responsePosition(p):
"""on response for position"""
if p["decoded"]["portnum"] == 'POSITION_APP':
self._acknowledgment.receivedPosition = True
position = mesh_pb2.Position()
position.ParseFromString(p["decoded"]["payload"])

elif p["decoded"]["portnum"] == 'ROUTING_APP':
if p["decoded"]["routing"]["errorReason"] == 'NO_RESPONSE':
our_exit("No response from node. At least firmware 2.1.22 is required on the destination node.")
if jsonResponse:
self.printJsonPosition(position, destinationId)
else:
self.printPosition(position)

elif p["decoded"]["portnum"] == 'ROUTING_APP':
if p["decoded"]["routing"]["errorReason"] == 'NO_RESPONSE':
if jsonResponse:
our_exit("{}")
else:
our_exit("No response from node. At least firmware 2.1.22 is required on the destination node.")
return responsePosition

@staticmethod
def printPosition(position: mesh_pb2.Position):
ret = "Position received: "
if position.latitude_i != 0 and position.longitude_i != 0:
ret += f"({position.latitude_i * 10**-7}, {position.longitude_i * 10**-7})"
else:
ret += "(unknown)"
if position.altitude != 0:
ret += f" {position.altitude}m"

if position.precision_bits not in [0,32]:
ret += f" precision:{position.precision_bits}"
elif position.precision_bits == 32:
ret += " full precision"
elif position.precision_bits == 0:
ret += " position disabled"

print(ret)

@staticmethod
def printJsonPosition(position: mesh_pb2.Position,
destinationId: Union[int, str] = BROADCAST_ADDR):
json_output = {
"node_id": destinationId
}
json_output["latitude"] = position.latitude_i * 10**-7
json_output["longitude"] = position.longitude_i * 10**-7
json_output["altitude"] = position.altitude

json_output["precision"] = position.precision_bits
json_output["full"] = False
json_output["enabled"] = True
if position.precision_bits == 32:
json_output["full"] = True
elif position.precision_bits == 0:
json_output["enabled"] = False

print(json.dumps(json_output, indent=2))

def sendTraceRoute(self, dest: Union[int, str], hopLimit: int, channelIndex: int=0):
"""Send the trace route"""
@@ -460,7 +504,11 @@ def onResponseTraceRoute(self, p):

self._acknowledgment.receivedTraceRoute = True

def sendTelemetry(self, destinationId: Union[int,str]=BROADCAST_ADDR, wantResponse: bool=False, channelIndex: int=0):
def sendTelemetry(self,
destinationId: Union[int, str] = BROADCAST_ADDR,
wantResponse: bool = False,
channelIndex: int = 0,
jsonResponse: bool = False):
"""Send telemetry and optionally ask for a response"""
r = telemetry_pb2.Telemetry()

@@ -482,10 +530,9 @@ def sendTelemetry(self, destinationId: Union[int,str]=BROADCAST_ADDR, wantRespon
if air_util_tx is not None:
r.device_metrics.air_util_tx = air_util_tx

onResponse = None
if wantResponse:
onResponse = self.onResponseTelemetry
else:
onResponse = None
onResponse = self.onResponseTelemetry(destinationId, jsonResponse)

self.sendData(
r,
@@ -498,28 +545,59 @@ def sendTelemetry(self, destinationId: Union[int,str]=BROADCAST_ADDR, wantRespon
if wantResponse:
self.waitForTelemetry()

def onResponseTelemetry(self, p):
"""on response for telemetry"""
if p["decoded"]["portnum"] == 'TELEMETRY_APP':
self._acknowledgment.receivedTelemetry = True
telemetry = telemetry_pb2.Telemetry()
telemetry.ParseFromString(p["decoded"]["payload"])

print("Telemetry received:")
if telemetry.device_metrics.battery_level is not None:
print(f"Battery level: {telemetry.device_metrics.battery_level:.2f}%")
if telemetry.device_metrics.voltage is not None:
print(f"Voltage: {telemetry.device_metrics.voltage:.2f} V")
if telemetry.device_metrics.channel_utilization is not None:
print(
f"Total channel utilization: {telemetry.device_metrics.channel_utilization:.2f}%"
)
if telemetry.device_metrics.air_util_tx is not None:
print(f"Transmit air utilization: {telemetry.device_metrics.air_util_tx:.2f}%")

elif p["decoded"]["portnum"] == 'ROUTING_APP':
if p["decoded"]["routing"]["errorReason"] == 'NO_RESPONSE':
our_exit("No response from node. At least firmware 2.1.22 is required on the destination node.")
def onResponseTelemetry(self,
destinationId: Union[int, str] = BROADCAST_ADDR,
jsonResponse: bool = False
):
def responseTelemetry(p):
"""on response for telemetry"""
if p["decoded"]["portnum"] == 'TELEMETRY_APP':
self._acknowledgment.receivedTelemetry = True
telemetry = telemetry_pb2.Telemetry()
telemetry.ParseFromString(p["decoded"]["payload"])

if jsonResponse:
self.printJsonTelemetry(telemetry, destinationId)
else:
self.printTelemetry(telemetry)

elif p["decoded"]["portnum"] == 'ROUTING_APP':
if p["decoded"]["routing"]["errorReason"] == 'NO_RESPONSE':
if jsonResponse:
our_exit("{}")
else:
our_exit("No response from node. At least firmware 2.1.22 is required on the destination node.")
return responseTelemetry

@staticmethod
def printTelemetry(telemetry: telemetry_pb2.Telemetry):
print("Telemetry received:")
if telemetry.device_metrics.battery_level is not None:
print(f"Battery level: {telemetry.device_metrics.battery_level:.2f}%")
if telemetry.device_metrics.voltage is not None:
print(f"Voltage: {telemetry.device_metrics.voltage:.2f} V")
if telemetry.device_metrics.channel_utilization is not None:
print(
f"Total channel utilization: {telemetry.device_metrics.channel_utilization:.2f}%"
)
if telemetry.device_metrics.air_util_tx is not None:
print(f"Transmit air utilization: {telemetry.device_metrics.air_util_tx:.2f}%")

@staticmethod
def printJsonTelemetry(telemetry: telemetry_pb2.Telemetry,
destinationId: Union[int, str] = BROADCAST_ADDR):
json_output = {
"node_id": destinationId
}
if telemetry.device_metrics.battery_level is not None:
json_output["batteryLevel"] = telemetry.device_metrics.battery_level
if telemetry.device_metrics.voltage is not None:
json_output["voltage"] = telemetry.device_metrics.voltage
if telemetry.device_metrics.channel_utilization is not None:
json_output["channel_utilization"] = telemetry.device_metrics.channel_utilization
if telemetry.device_metrics.air_util_tx is not None:
json_output["air_util_tx"] = telemetry.device_metrics.air_util_tx
print(json.dumps(json_output, indent=2))

def _addResponseHandler(self, requestId: int, callback: Callable):
self.responseHandlers[requestId] = ResponseHandler(callback)
29 changes: 26 additions & 3 deletions meshtastic/tests/test_main.py
Original file line number Diff line number Diff line change
@@ -408,15 +408,38 @@ def test_main_nodes(capsys):

iface = MagicMock(autospec=SerialInterface)

def mock_showNodes():
print("inside mocked showNodes")
def mock_showNodes(jsonResponse: bool = False):
print(f"inside mocked showNodes {jsonResponse}")
assert not jsonResponse

iface.showNodes.side_effect = mock_showNodes
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r"Connected to radio", out, re.MULTILINE)
assert re.search(r"inside mocked showNodes", out, re.MULTILINE)
assert re.search(r"inside mocked showNodes False", out, re.MULTILINE)
assert err == ""
mo.assert_called()


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_main_nodes_json(capsys):
"""Test --nodes --json"""
sys.argv = ["", "--nodes", "--json"]
mt_config.args = sys.argv

iface = MagicMock(autospec=SerialInterface)

def mock_showNodes(jsonResponse: bool = False):
print(f"inside mocked showNodes {jsonResponse}")
assert jsonResponse

iface.showNodes.side_effect = mock_showNodes
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r"inside mocked showNodes True", out, re.MULTILINE)
assert err == ""
mo.assert_called()