forked from jeroennijhof/LoRaWAN
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtexter.py
232 lines (202 loc) · 6.97 KB
/
texter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
#!/usr/bin/env python3
import sys
import argparse
from time import sleep
from SX127x.LoRa import *
from SX127x.LoRaArgumentParser import LoRaArgumentParser
from SX127x.board_config_ada import BOARD
import LoRaWAN
from LoRaWAN.MHDR import MHDR
from random import randrange
import reset_ada
# Import the SSD1306 module.
import adafruit_ssd1306
from digitalio import DigitalInOut, Direction, Pull
import board
import busio
import RPi.GPIO as GPIO
import helium
import keys
import frame
TIMEOUT = 50
# Button A
btnA = DigitalInOut(board.D5)
btnA.direction = Direction.INPUT
btnA.pull = Pull.UP
# Button B
btnB = DigitalInOut(board.D6)
btnB.direction = Direction.INPUT
btnB.pull = Pull.UP
# Button C
btnC = DigitalInOut(board.D12)
btnC.direction = Direction.INPUT
btnC.pull = Pull.UP
BOARD.setup()
parser = LoRaArgumentParser("LoRaWAN sender")
# Create the I2C interface.
i2c = busio.I2C(board.SCL, board.SDA)
# 128x32 OLED Display
reset_pin = DigitalInOut(board.D4)
display = adafruit_ssd1306.SSD1306_I2C(128, 32, i2c, reset=reset_pin)
# Clear the display.
display.fill(0)
display.text('Helium Texter', 7, 7, 1)
display.show()
width = display.width
height = display.height
class LoRaWANotaa(LoRa):
def __init__(self, verbose = False):
super(LoRaWANotaa, self).__init__(verbose)
self.ack = False
self.msg_index = 0
self.msgs = ['On my way.','LOL','Yes.','No.','Maybe.','Call me.',':)',':(']
self.waiting = False
self.timeout = 0
def on_rx_done(self):
self.clear_irq_flags(RxDone=1)
payload = self.read_payload(nocheck=True)
print("Raw payload: {}".format(payload))
lorawan = LoRaWAN.new(keys.nwskey, keys.appskey)
lorawan.read(payload)
decoded = "".join(list(map(chr, lorawan.get_payload())))
print("Decoded: {}".format(decoded))
print("\n")
if lorawan.get_mhdr().get_mtype() == MHDR.UNCONF_DATA_DOWN:
print("Unconfirmed data down.")
downlink = decoded
res = lorawan.mac_payload.get_fhdr().get_fctrl()
if 0x20 & res != 0: # Check Ack bit.
print("Server ack")
if len(downlink) == 0:
downlink = "Server ack"
elif lorawan.get_mhdr().get_mtype() == MHDR.CONF_DATA_DOWN:
print("Confirmed data down.")
self.ack = True
downlink = decoded
elif lorawan.get_mhdr().get_mtype() == MHDR.CONF_DATA_UP:
print("Confirmed data up.")
downlink = decoded
else:
print("Other packet.")
downlink = ''
self.set_mode(MODE.STDBY)
s = ''
s += " pkt_snr_value %f\n" % self.get_pkt_snr_value()
s += " pkt_rssi_value %d\n" % self.get_pkt_rssi_value()
s += " rssi_value %d\n" % self.get_rssi_value()
s += " msg: %s" % downlink
print(s)
self.update("Rx: {}".format(downlink))
self.waiting = False
self.timeout = 0
def increment(self):
self.tx_counter += 1
data_file = open("frame.py", "w")
data_file.write(
'frame = {}\n'.format(self.tx_counter))
data_file.close()
def tx(self, msg="", conf=False):
if len(msg) > 0:
self.update("Sending message...")
else:
self.update("Checking for msgs...")
if conf:
data = MHDR.CONF_DATA_UP
print('Sending confirmed data up.')
else:
data = MHDR.UNCONF_DATA_UP
print('Sending unconfirmed data up.')
self.increment()
lorawan = LoRaWAN.new(keys.nwskey, keys.appskey)
if self.ack:
print('Sending with Ack {}'.format(self.tx_counter))
lorawan.create(data, {'devaddr': keys.devaddr, 'fcnt': self.tx_counter, 'data': list(map(ord, msg)), 'ack':True})
self.ack = False
else:
print('Sending without Ack {}'.format(self.tx_counter))
lorawan.create(data, {'devaddr': keys.devaddr, 'fcnt': self.tx_counter, 'data': list(map(ord, msg))})
print("tx: {}".format(lorawan.to_raw()))
self.write_payload(lorawan.to_raw())
self.set_mode(MODE.TX)
def update(self,msg=''):
display.fill(0)
display.text('Outgoing msg: {}'.format(self.msgs[self.msg_index]), 0, 0, 1)
display.text(msg, 0, 12, 1)
height = 24
display.text('Change', 0, height, 1)
display.text('Send', 50, height, 1)
display.text('Get', 96, height, 1)
display.show()
def start(self):
self.update()
while True:
sleep(.1)
if not btnA.value: # Select message.
self.msg_index += 1
if self.msg_index >= len(self.msgs):
self.msg_index = 0
self.update()
if not btnB.value: # Send message.
self.setup_tx()
msg = self.msgs[self.msg_index]
self.tx(msg)
self.timeout = TIMEOUT
if not btnC.value: # Check for incoming messages.
self.waiting = True
self.timeout = TIMEOUT
self.setup_tx()
self.tx()
if self.timeout > 0:
self.timeout -= 1
if self.timeout == 0:
if self.waiting:
self.update("No msg received.")
self.waiting = False
else:
self.update()
def set_frame(self,frame):
self.tx_counter = frame
def setup_tx(self):
self.clear_irq_flags(RxDone=1)
self.set_mode(MODE.SLEEP)
self.set_dio_mapping([1,0,0,0,0,0])
self.set_freq(helium.UPFREQ)
self.set_bw(7)
self.set_spreading_factor(7)
self.set_pa_config(max_power=0x0F, output_power=0x0E)
self.set_sync_word(0x34)
self.set_rx_crc(True)
self.set_invert_iq(0)
assert(self.get_agc_auto_on() == 1)
def on_tx_done(self):
self.clear_irq_flags(TxDone=1)
self.set_mode(MODE.SLEEP)
self.set_dio_mapping([0,0,0,0,0,0])
self.set_freq(helium.DOWNFREQ)
self.set_bw(9)
self.set_spreading_factor(7)
self.set_pa_config(pa_select=1)
self.set_sync_word(0x34)
self.set_rx_crc(True)
self.set_invert_iq(1)
self.reset_ptr_rx()
self.set_mode(MODE.RXCONT)
def init(frame):
lora = LoRaWANotaa(False)
lora.set_frame(frame)
try:
print("Sending LoRaWAN tx\n")
lora.start()
except KeyboardInterrupt:
sys.stdout.flush()
print("\nKeyboardInterrupt")
finally:
sys.stdout.flush()
lora.set_mode(MODE.SLEEP)
BOARD.teardown()
def main():
global msg
msg = 'Test'
init(frame.frame)
if __name__ == "__main__":
main()