forked from ECE-196/ControlWithPython
-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
101 lines (74 loc) · 2.79 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from __future__ import annotations

import tkinter as tk
import tkinter.ttk as ttk
from functools import wraps
from threading import Thread, Lock
from tkinter.messagebox import showerror

from serial import Serial, SerialException  # type: ignore
from serial.tools.list_ports import comports  # type: ignore
# Single-byte status codes used on the serial link.
# S_OK is not referenced elsewhere in this file; presumably it is the device's
# success acknowledgement -- confirm against the firmware.
S_OK: int = 0xAA
# Reply byte that App.write reports to the user as an invalid command.
S_ERR: int = 0xFF
def detached_callback(f):
    """Decorate *f* so that every call runs on a freshly started thread.

    The returned wrapper forwards its positional and keyword arguments to *f*
    on a new ``Thread`` and returns ``None`` immediately; the result of *f*
    (and any exception it raises) stays inside that thread.
    """

    # wraps() keeps f's __name__/__doc__ on the wrapper, so decorated methods
    # remain identifiable in tracebacks and introspection.
    @wraps(f)
    def wrapper(*args, **kwargs):
        Thread(target=f, args=args, kwargs=kwargs).start()

    return wrapper
class LockedSerial(Serial):
    """Serial port whose ``read``, ``write`` and ``close`` share one lock.

    Only one of those three operations runs at a time on a given instance, so
    callers on different threads cannot interleave them. Each instance owns
    its own lock; separate ports do not block one another.
    """

    _lock: Lock

    def __init__(self, *args, **kwargs):
        """Create the lock, then forward all arguments to ``Serial``."""
        # The lock is created before the base constructor runs so that the
        # overridden methods below are usable even if the base class calls
        # one of them during construction or while cleaning up a failed open.
        self._lock = Lock()
        super().__init__(*args, **kwargs)

    def read(self, size=1) -> bytes:
        """Read up to *size* bytes while holding the lock."""
        with self._lock:
            return super().read(size)

    def write(self, b: bytes, /) -> int | None:
        """Write *b* while holding the lock and return the base class's result."""
        with self._lock:
            # Propagate the value from Serial.write instead of discarding it.
            return super().write(b)

    def close(self):
        """Close the port once no read or write is in progress."""
        with self._lock:
            super().close()
class SerialPortal(tk.Toplevel):
    """Port-selection dialog shown while the main window is hidden.

    Creating the dialog withdraws *parent*; pressing "Connect" asks the parent
    to connect, closes the dialog and shows the parent again.
    """

    def __init__(self, parent: App):
        """Build the dialog on top of *parent* and hide *parent*.

        The option menu is bound to ``parent.port`` and lists the device name
        of every port reported by ``comports()`` at construction time.
        """
        super().__init__(parent)

        self.parent = parent
        self.parent.withdraw()

        # The parent is withdrawn, so closing this dialog through the window
        # manager would leave the application running with nothing on screen.
        # Treat that as a request to quit.
        self.protocol('WM_DELETE_WINDOW', parent.destroy)

        ttk.OptionMenu(self, parent.port, '', *[d.device for d in comports()]).pack()
        ttk.Button(self, text='Connect', command=self.connect, default='active').pack()

    def connect(self):
        """Start the parent's connection, close the dialog, reveal the parent.

        App.connect is dispatched to a worker thread by its own decorator, so
        this handler is not detached as well; the widget calls below then run
        in the Tk callback that invoked the button.
        """
        self.parent.connect()
        self.destroy()
        self.parent.deiconify()
class App(tk.Tk):
    """Main window of the LED blinker.

    Holds the selected port name and the LED state as Tk variables, sends
    single-byte commands over a ``LockedSerial`` connection and reports
    failures in message boxes. The instance is also a context manager whose
    exit closes the serial port.
    """

    # Assigned by connect(); the attribute does not exist until a connection
    # has been opened successfully at least once.
    ser: LockedSerial

    def __init__(self):
        """Build the widgets and open the port-selection dialog."""
        super().__init__()

        self.title("LED Blinker")

        self.port = tk.StringVar()
        self.led = tk.BooleanVar()

        ttk.Checkbutton(self, text='Toggle LED', variable=self.led, command=self.update_led).pack()
        ttk.Button(self, text='Send Invalid', command=self.send_invalid).pack()
        ttk.Button(self, text='Disconnect', command=self.disconnect, default='active').pack()

        SerialPortal(self)

    @detached_callback
    def connect(self):
        """Open the port named by ``self.port`` on a worker thread.

        If the port cannot be opened, an error box is shown and the
        port-selection dialog is presented again.
        """
        try:
            self.ser = LockedSerial(self.port.get())
        except SerialException:
            showerror('Serial Error', 'Could not open the selected port.')
            SerialPortal(self)

    @detached_callback
    def disconnect(self):
        """Close the port (if any) on a worker thread and reopen the dialog."""
        self._close_port()
        SerialPortal(self)

    @detached_callback
    def update_led(self):
        """Send the checkbox state as one byte (0x01 when set, 0x00 when not)."""
        self.write(bytes([self.led.get()]))

    @detached_callback
    def send_invalid(self):
        """Send the byte 0x10."""
        self.write(bytes([0x10]))

    def write(self, b: bytes):
        """Write *b*, read a one-byte reply and report any failure.

        A message box is shown when no connection exists, when the serial
        layer raises ``SerialException``, or when the reply equals ``S_ERR``.
        """
        ser = getattr(self, 'ser', None)
        if ser is None:
            # Never connected: there is nothing to write to.
            showerror('Serial Error', 'Write failed.')
            return

        # Only the serial I/O sits inside the try, so the handler cannot mask
        # an error raised while showing the device-error dialog.
        try:
            ser.write(b)
            status = int.from_bytes(ser.read(), 'big')
        except SerialException:
            showerror('Serial Error', 'Write failed.')
            return

        if status == S_ERR:
            showerror('Device Error', 'The device reported an invalid command.')

    def _close_port(self):
        """Close the serial port if one was ever opened."""
        ser = getattr(self, 'ser', None)
        if ser is not None:
            ser.close()

    def __enter__(self):
        """Return the application itself for use in a ``with`` block."""
        return self

    def __exit__(self, *_):
        """Close the serial port when the ``with`` block ends.

        The port is closed directly rather than through disconnect(), because
        disconnect() also opens a new port-selection dialog, which cannot be
        created once the main window has been destroyed.
        """
        self._close_port()
if __name__ == '__main__':
    # App is its own context manager: its __exit__ hook runs as soon as
    # mainloop() returns or raises.
    with App() as window:
        window.mainloop()