-
Notifications
You must be signed in to change notification settings - Fork 0
/
tracker.py
111 lines (101 loc) · 3.41 KB
/
tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import json
import urllib.request
from bs4 import BeautifulSoup
import time
import sqlite3
import cet_bus
from cet_bus.haversine import haversine
from cet_bus.geo import Point
class Arrival:
    """Record of a bus being detected at a stop.

    Two arrivals compare equal (and hash equal) when they refer to the same
    ``stop_id``; the coordinates, bus id and timestamp are ignored for
    identity. This lets sets of arrivals be diffed per stop.
    """

    def __init__(self, lat, lon, bus_id, stop_id, received):
        self.lat = lat
        self.lon = lon
        self.bus_id = bus_id
        self.stop_id = stop_id
        self.received = received

    def __eq__(self, other):
        # Defer to the other operand instead of raising AttributeError when
        # compared against something that is not an Arrival.
        if not isinstance(other, Arrival):
            return NotImplemented
        return self.stop_id == other.stop_id

    def __hash__(self):
        # Must stay consistent with __eq__: identity is the stop id only.
        return hash(self.stop_id)

    def __repr__(self):
        # Readable form for log output such as printed sets of arrivals.
        return (
            f'{type(self).__name__}(lat={self.lat!r}, lon={self.lon!r}, '
            f'bus_id={self.bus_id!r}, stop_id={self.stop_id!r}, '
            f'received={self.received!r})'
        )
class BusTracker:
    """Follow a single bus and work out which stops it has newly reached.

    After each call to ``update`` the attribute ``new_stops`` holds the
    arrivals that were not present on the previous update, ``latest_stops``
    holds every stop currently within range, and ``latest_position`` holds
    the raw (latitude, longitude) pair from the last usable message.
    """

    # Start is the stop id at which the bus restarts its route
    # End is the stop id at which the bus turns around and flips its direction id
    def __init__(self, stops, route_id, start, end):
        self.latest_position = None
        self.stops = stops
        self.route_id = route_id
        self.start = start
        self.end = end
        self.direction_id = 0
        # Threshold compared against haversine(); units are whatever that
        # helper returns -- presumably metres, TODO confirm.
        self.bus_stop_radius = 50
        self.new_stops = set()
        self.latest_stops = set()

    def update(self, new_bus):
        """Process one position message for this bus.

        ``new_bus`` is a mapping read for the keys 'latitude', 'longitude',
        'bus' and 'received'. A message whose coordinates are missing, null
        or not parseable as floats is ignored, and ``new_stops`` is cleared
        so that callers reading it afterwards do not see (and re-log) the
        arrivals from the previous update.
        """
        try:
            lat = float(new_bus['latitude'])
            lon = float(new_bus['longitude'])
        except (KeyError, TypeError, ValueError):
            # No lat/long, null lat/long, or empty/garbled lat/long.
            self.new_stops = set()
            return
        bus_pos = Point(lat, lon)

        current_stops = set()
        for stop in self.stops:
            stop_pos = Point(float(stop['stop_lat']), float(stop['stop_lon']))
            if haversine(stop_pos, bus_pos) >= self.bus_stop_radius:
                continue
            stop_id = stop['stop_id']
            # Reaching the start or end stop resets/flips the direction
            # before the direction filter below is applied.
            if stop_id == self.start:
                self.direction_id = 0
            if stop_id == self.end:
                self.direction_id = 1
            # Only count stops that belong to the direction being travelled.
            if self.direction_id == int(stop['direction_id']):
                current_stops.add(Arrival(
                    stop_pos.x,
                    stop_pos.y,
                    new_bus['bus'],
                    stop_id,
                    new_bus['received']))

        # latest_stops is always a set when managed by this class; the None
        # guard is kept in case outside code resets the attribute.
        if self.latest_stops is None:
            self.new_stops = set()
        else:
            self.new_stops = current_stops - self.latest_stops
        self.latest_stops = set(current_stops)
        self.latest_position = (new_bus['latitude'], new_bus['longitude'])
class TransitSystemTracker:
    """Maintain one tracker per bus and forward positions and arrivals.

    ``get_bus_data`` is called with no arguments on every ``update`` and
    must return an iterable of bus records (mappings with at least 'bus'
    and 'Route'). ``stops_info`` and ``routes`` are indexed by route id;
    each ``routes`` entry is read for ``.start`` and ``.end``.
    ``log_arrival`` and ``log_position`` are callbacks invoked with an
    arrival and a bus record respectively.

    ``trackers`` maps bus id (as str) to its tracker, or to ``None`` for a
    bus whose route has no usable data.
    """

    def __init__(self, get_bus_data, stops_info, routes, log_arrival, log_position):
        self.trackers = {}
        self.get_bus_data = get_bus_data
        self.stops_info = stops_info
        self.routes = routes
        self.log_arrival = log_arrival
        self.log_position = log_position

    def update(self):
        """Fetch the current bus records and process each one.

        A bus seen for the first time only gets a tracker registered.
        On later sightings its position is logged, its tracker is updated,
        and every newly reached stop is passed to ``log_arrival``. Buses
        without a usable tracker are skipped.
        """
        for bus in self.get_bus_data():
            bus_id = str(bus['bus'])
            route_id = str(bus['Route'])

            if bus_id not in self.trackers:
                self.add_bus_tracker(bus_id, route_id)
                continue
            if self.trackers[bus_id] is None:
                continue

            self.update_bus_route(bus_id, route_id)
            # Re-read: a route change may have replaced the tracker, or set
            # it to None when the new route has no start/end data.
            tracker = self.trackers[bus_id]
            if tracker is None:
                continue

            self.log_position(bus)
            tracker.update(bus)
            news = tracker.new_stops
            if news:
                print(f'arrival: {news}')
                for arrival in news:
                    self.log_arrival(arrival)

    def add_bus_tracker(self, bus_id, route_id):
        """Create (or replace) the tracker for ``bus_id`` on ``route_id``.

        When the route has no stop list or no start/end entry, a message is
        printed and the bus is recorded as ``None`` so it is skipped.
        """
        try:
            stops_for_bus = self.stops_info[route_id]
        except (LookupError, TypeError):
            print(f'No stop data for route {route_id}; skipping')
            self.trackers[bus_id] = None
            return
        try:
            route = self.routes[route_id]
            start = route.start
            end = route.end
        except (LookupError, AttributeError, TypeError):
            # Narrower than a bare except so KeyboardInterrupt/SystemExit
            # are no longer swallowed here.
            print(f'No start/end data for route {route_id}; skipping')
            self.trackers[bus_id] = None
            return
        self.trackers[bus_id] = BusTracker(stops_for_bus, route_id, start, end)

    # Re-initialize the tracker for the given bus if the route changes
    def update_bus_route(self, bus_id, route_id):
        """Rebuild the tracker for ``bus_id`` if it is on a different route.

        A bus currently recorded as ``None`` is also re-initialized, since
        there is no stored route to compare against.
        """
        tracker = self.trackers[bus_id]
        if tracker is None or tracker.route_id != route_id:
            self.add_bus_tracker(bus_id, route_id)