-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
137 lines (113 loc) · 4.29 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from wiener_linien import WienerLinien, DepartureInfos, Departure
import argparse
from enum import Enum
from typing import List, Tuple
from datetime import datetime, timedelta
import asyncio
import board # type: ignore
import busio # type: ignore
import adafruit_character_lcd.character_lcd_rgb_i2c as character_lcd # type: ignore
def replace_umlaute(s: str) -> str:
s = s.replace("Ä", "Ae")
s = s.replace("Ö", "Oe")
s = s.replace("Ü", "Ue")
s = s.replace("ä", "ae")
s = s.replace("ß", "ss")
s = s.replace("ö", "oe")
s = s.replace("ü", "ue")
return s
class UpdateSpeed(Enum):
slow = 30
fast = 5
def next(self) -> "UpdateSpeed":
if self == UpdateSpeed.fast:
return UpdateSpeed.slow
else:
return UpdateSpeed.fast
update_speed = UpdateSpeed.slow
class LCD:
def __init__(self):
lcd_columns = 16
lcd_rows = 2
i2c = busio.I2C(board.SCL, board.SDA)
self.lcd = character_lcd.Character_LCD_RGB_I2C(i2c, lcd_columns, lcd_rows)
self.lcd.clear()
self.lcd.color = [100, 0, 0]
self.departures: List[Tuple[str, List[Departure]]] = []
self.current_station = 0
self.request_update = False
def update_departures(self, departures: DepartureInfos) -> None:
message_before = self.station_string()
self.departures = sorted(departures.items())
message_after = self.station_string()
if message_before != message_after:
self.request_update = True
def station_string(self) -> str:
if self.departures:
name, countdowns = self.departures[self.current_station % len(self.departures)]
countdown_str = " ".join(str(value) for value in countdowns)[:20]
fast_mode = "*** " if update_speed == UpdateSpeed.fast else ""
return fast_mode + replace_umlaute(f"""{name}\n{countdown_str:>16}""")
return ""
async def show_departures(self) -> None:
while True:
if self.request_update:
message = self.station_string()
print("set message")
self.lcd.clear()
self.lcd.message = message
self.request_update = False
await asyncio.sleep(0.1)
async def handle_buttons(self):
while True:
if self.lcd.down_button:
print("Down")
self.current_station += 1
self.request_update = True
await asyncio.sleep(0.5)
if self.lcd.up_button:
print("Up")
self.current_station -= 1
self.request_update = True
await asyncio.sleep(0.5)
if self.lcd.left_button:
print("Left")
self.lcd.move_left()
await asyncio.sleep(0.5)
if self.lcd.right_button:
print("Right")
self.lcd.move_right()
await asyncio.sleep(0.5)
if self.lcd.select_button:
print("Select")
global update_speed
update_speed = update_speed.next()
print(f"Speed = {update_speed.name}")
self.request_update = True
await asyncio.sleep(0.5)
await asyncio.sleep(0.1)
async def realtime_data_loop(RBL_numbers: List[int], lcd: LCD) -> None:
global update_speed
wiener_linien = WienerLinien()
last_update = datetime.now() - timedelta(seconds=1000)
while True:
if (datetime.now() - last_update).seconds > update_speed.value:
data = wiener_linien.get_departures(RBL_numbers)
print(data)
lcd.update_departures(data)
last_update = datetime.now()
await asyncio.sleep(0.5)
async def _main(RBL_numbers: List[int]) -> None:
lcd = LCD()
await asyncio.gather(
realtime_data_loop(RBL_numbers, lcd), lcd.show_departures(), lcd.handle_buttons()
)
def main() -> None:
parser = argparse.ArgumentParser(
description="Display real time information of Wiener Linien station on a Raspberry Pi"
)
parser.add_argument("RBL", type=int, nargs="+", help="RBL numbers for the stations")
args = parser.parse_args()
asyncio.run(_main(args.RBL))
if __name__ == "__main__":
main()