Skip to content

Commit f5e7fae

Browse files
authored
Merge pull request #913 from SilverBull239/examples/textchat-replymessage-demo
Add textchat.py and replymessage.py examples
2 parents e86a495 + b05a6a6 commit f5e7fae

2 files changed

Lines changed: 146 additions & 0 deletions

File tree

examples/replymessage.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"""Reply message demo script.
2+
To run: python examples/replymessage.py
3+
To run with TCP: python examples/replymessage.py --host 192.168.1.5
4+
To run with BLE: python examples/replymessage.py --ble 24:62:AB:DD:DF:3A
5+
"""
6+
7+
import argparse
8+
import time
9+
from typing import Any, Optional, Union
10+
from pubsub import pub
11+
import meshtastic.serial_interface
12+
import meshtastic.tcp_interface
13+
import meshtastic.ble_interface
14+
from meshtastic.mesh_interface import MeshInterface
15+
16+
def onReceive(packet: dict, interface: MeshInterface) -> None: # pylint: disable=unused-argument
17+
"""Reply to every received packet with some info"""
18+
text: Optional[str] = packet.get("decoded", {}).get("text")
19+
if text:
20+
rx_snr: Any = packet.get("rxSnr", "unknown")
21+
hop_limit: Any = packet.get("hopLimit", "unknown")
22+
print(f"message: {text}")
23+
reply: str = f"got msg '{text}' with rxSnr: {rx_snr} and hopLimit: {hop_limit}"
24+
print("Sending reply: ", reply)
25+
interface.sendText(reply)
26+
27+
def onConnection(interface: MeshInterface, topic: Any = pub.AUTO_TOPIC) -> None: # pylint: disable=unused-argument
28+
"""called when we (re)connect to the radio"""
29+
print("Connected. Will auto-reply to all messages while running.")
30+
31+
parser = argparse.ArgumentParser(description="Meshtastic Auto-Reply Feature Demo")
32+
group = parser.add_mutually_exclusive_group()
33+
group.add_argument("--host", help="Connect via TCP to this hostname or IP")
34+
group.add_argument("--ble", help="Connect via BLE to this MAC address")
35+
36+
args = parser.parse_args()
37+
38+
pub.subscribe(onReceive, "meshtastic.receive")
39+
pub.subscribe(onConnection, "meshtastic.connection.established")
40+
41+
iface: Optional[Union[
42+
meshtastic.tcp_interface.TCPInterface,
43+
meshtastic.ble_interface.BLEInterface,
44+
meshtastic.serial_interface.SerialInterface
45+
]] = None
46+
47+
# defaults to serial, use --host for TCP or --ble for Bluetooth
48+
try:
49+
if args.host:
50+
# note: timeout only applies after connection, not during the initial connect attempt
51+
# TCPInterface.myConnect() calls socket.create_connection() without a timeout
52+
iface = meshtastic.tcp_interface.TCPInterface(hostname=args.host, timeout=10)
53+
elif args.ble:
54+
iface = meshtastic.ble_interface.BLEInterface(address=args.ble, timeout=10)
55+
else:
56+
iface = meshtastic.serial_interface.SerialInterface(timeout=10)
57+
except KeyboardInterrupt as exc:
58+
raise SystemExit(0) from exc
59+
except Exception as e:
60+
print(f"Error: Could not connect. {e}")
61+
raise SystemExit(1) from e
62+
63+
try:
64+
while True:
65+
time.sleep(1)
66+
except KeyboardInterrupt:
67+
pass
68+
finally:
69+
try:
70+
if iface:
71+
iface.close()
72+
except AttributeError:
73+
pass

examples/textchat.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"""Simple text chat demo for meshtastic.
2+
To run: python examples/textchat.py
3+
To run with TCP: python examples/textchat.py --host 192.168.1.5
4+
To run with BLE: python examples/textchat.py --ble 24:62:AB:DD:DF:3A
5+
"""
6+
7+
import argparse
8+
from typing import Any, Optional, Union
9+
from pubsub import pub
10+
import meshtastic.serial_interface
11+
import meshtastic.tcp_interface
12+
import meshtastic.ble_interface
13+
from meshtastic.mesh_interface import MeshInterface
14+
15+
def onReceive(packet: dict, interface: MeshInterface) -> None: # pylint: disable=unused-argument
16+
"""called when a packet arrives"""
17+
text: Optional[str] = packet.get("decoded", {}).get("text")
18+
if text:
19+
sender: str = packet.get("fromId", "unknown")
20+
print(f"{sender}: {text}")
21+
22+
def onConnection(interface: MeshInterface, topic: Any = pub.AUTO_TOPIC) -> None: # pylint: disable=unused-argument
23+
"""called when we (re)connect to the radio"""
24+
print("Connected. Type a message and press Enter to send. Ctrl+C to exit.")
25+
26+
parser = argparse.ArgumentParser(description="Meshtastic text chat demo")
27+
group = parser.add_mutually_exclusive_group()
28+
group.add_argument("--host", help="Connect via TCP to this hostname or IP")
29+
group.add_argument("--ble", help="Connect via BLE to this MAC address or device name")
30+
31+
args = parser.parse_args()
32+
33+
pub.subscribe(onReceive, "meshtastic.receive")
34+
pub.subscribe(onConnection, "meshtastic.connection.established")
35+
36+
iface: Optional[Union[
37+
meshtastic.tcp_interface.TCPInterface,
38+
meshtastic.ble_interface.BLEInterface,
39+
meshtastic.serial_interface.SerialInterface
40+
]] = None
41+
42+
# defaults to serial, use --host for TCP or --ble for Bluetooth
43+
try:
44+
if args.host:
45+
# note: timeout only applies after connection, not during the initial connect attempt
46+
# TCPInterface.myConnect() calls socket.create_connection() without a timeout
47+
iface = meshtastic.tcp_interface.TCPInterface(hostname=args.host, timeout=10)
48+
elif args.ble:
49+
iface = meshtastic.ble_interface.BLEInterface(address=args.ble, timeout=10)
50+
else:
51+
iface = meshtastic.serial_interface.SerialInterface(timeout=10)
52+
except KeyboardInterrupt as exc:
53+
raise SystemExit(0) from exc
54+
except Exception as e:
55+
print(f"Error: Could not connect. {e}")
56+
raise SystemExit(1) from e
57+
58+
assert iface is not None
59+
try:
60+
while True:
61+
line = input()
62+
if line:
63+
iface.sendText(line)
64+
except KeyboardInterrupt:
65+
pass
66+
except EOFError:
67+
pass
68+
finally:
69+
try:
70+
if iface:
71+
iface.close()
72+
except AttributeError:
73+
pass

0 commit comments

Comments
 (0)