-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbackend.py
More file actions
92 lines (75 loc) · 2.83 KB
/
backend.py
File metadata and controls
92 lines (75 loc) · 2.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# Simple BCI backend for reading data. Does no filtering or classification.
#
# Created........: 28Feb2023 [ollie-d]
# Last Modified..: 16Mar2023 [ollie-d]
import pylsl
import time
import random
import collections
results_out = None
mrkstream_in = None
eeg_in = None
def lsl_mrk_outlet(name):
info = pylsl.StreamInfo(name, 'Markers', 1, 0, pylsl.cf_string, 'ID66666666');
outlet = pylsl.StreamOutlet(info, 1, 1)
print('backend.py created result outlet.')
return outlet
def lsl_inlet(name):
# Resolve all marker streams
inlet = None
tries = 0
info = pylsl.resolve_byprop('name', name)
inlet = pylsl.StreamInlet(info[0], recover=False)
print(f'backend.py has received the {info[0].type()} inlet.')
return inlet
def main():
terminate_backend = False
store_data = False
send_result = False
# Wait for a marker, then start recording EEG data
data = collections.deque() # fast datastructure for appending/popping in either direction
print('main function started')
while True and terminate_backend == False:
# Constantly check for a marker
try:
mrk, t_mrk = mrkstream_in.pull_sample(timeout=0)
eeg, t_eeg = eeg_in.pull_sample(timeout=0)
except pylsl.LostError:
print('Stream lost; terminating backend.')
break
# If we find a marker...
if mrk is not None:
print(f'{mrk[0]}')
# ...either start saving data
if mrk[0] in ('left', 'right'):
store_data = True
data = collections.deque() # reset the buffer
# ...or stop and send result
elif mrk[0] == 'blank':
store_data = False
send_result = True
# ...or terminate the backend
elif mrk[0] == 'die':
store_data = False
send_result = False
terminate_backend = True
# SAVE YOUR DATA HERE IF YOU CARE TO AND IF YOU SAVED YOUR BUFFERS
print('Backend received die command; terminating.')
if store_data and eeg is not None:
data.append(eeg)
elif send_result:
send_result = False
# do processing/classification here
res = ['left', 'right'][random.randint(0, 1)] # inclusive
# Wait 50ms then send a message (to give the task a chance to listen)
time.sleep(0.05)
print('Sent command')
results_out.push_sample([res])
if __name__ == "__main__":
# Initialize our streams
random.seed()
results_out = lsl_mrk_outlet('Result_Stream')
mrkstream_in = lsl_inlet('Task_Markers')
eeg_in = lsl_inlet('ollie_EEG')
# Run out main function
main()