-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
f1424bd
commit fced444
Showing
2 changed files
with
173 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
""" | ||
This script is collection of functions show examples of | ||
how to use xipppy to interact with non-implantable front ends | ||
connected to the NIP. | ||
""" | ||
|
||
import xipppy as xp | ||
|
||
FS_CLK = 30000 | ||
|
||
|
||
def print_nip_info(): | ||
""" | ||
xipppy informational functions. | ||
""" | ||
print('NIP serial number= ', xp.get_nip_serial()) | ||
print('NIP exec version number= ', xp.get_nipexec_version()) | ||
print('Connected front end version= ', xp.get_fe_version(0)) | ||
|
||
|
||
def print_min_elapsed(): | ||
""" | ||
This is an example shows definition of xipppy.time(). | ||
""" | ||
elapsed_time_seconds = xp.time() / FS_CLK | ||
elapsed_time_minutes = elapsed_time_seconds / 60 | ||
print("Time elapsed after NIP start up: {:.2f} min ". | ||
format(elapsed_time_minutes)) | ||
|
||
|
||
def get_fe_first_electrode(front_end_type): | ||
""" | ||
This function returns first electrode (index 0) of connected front_end. | ||
:param front_end_type: supported front end type | ||
:return: index 0 electrode of provided front end. None if not connected. | ||
""" | ||
if (len(xp.list_elec(front_end_type)) > 0): | ||
return xp.list_elec(front_end_type)[0] | ||
else: | ||
return None | ||
|
||
|
||
def fe_electrodes_and_streams(front_end_types): | ||
""" | ||
xipppy.list_elec() and xipppy.get_fe_streams() usage: | ||
This example prints electrode numbers and type of supported streams for | ||
each front end that connected to the NIP. | ||
:param front_end_types: list of front ends types | ||
""" | ||
|
||
all_elecs = [] | ||
fe_elecs = [] | ||
for fe_type in front_end_types: | ||
all_elecs.append(xp.list_elec(fe_type)) | ||
if len(all_elecs[-1]) > 0: | ||
fe_elecs.append(all_elecs[-1][0]) | ||
print('{:s} electrode numbers:'.format(fe_type)) | ||
print(all_elecs[-1]) | ||
print('{:s} streams:'.format(fe_type)) | ||
print(xp.get_fe_streams(fe_elecs[-1])) | ||
else: | ||
fe_elecs.append(None) | ||
|
||
|
||
def print_enabled_streams(front_end_types): | ||
""" | ||
xipppy.get_fe(), xipppy.signal() usage: | ||
This example shows available streams for each connected front end. | ||
:param front_end_types: list of frond end types | ||
""" | ||
streams = ['raw', 'lfp', 'hi-res', 'spk', 'stim'] | ||
|
||
print('Enabled streams:') | ||
for i, fe_type in enumerate(front_end_types): | ||
fe_electrode = get_fe_first_electrode(fe_type) | ||
if fe_electrode is not None: | ||
print('{:s} FE(s) (indices {:s}):'.format(fe_type, str( | ||
xp.get_fe(fe_electrode)))) | ||
for stream in streams: | ||
stream_is_active = [] | ||
try: | ||
stream_is_active.append( | ||
str(xp.signal(fe_electrode, stream))) | ||
except xp.exception as e: | ||
stream_is_active.append('N/A') | ||
print( | ||
'{:s}:\t{:s}'.format(stream, ', '.join(stream_is_active))) | ||
|
||
|
||
if __name__ == '__main__': | ||
with xp.xipppy_open(): | ||
front_end_types = ['stim', 'micro', 'nano', 'surf', 'EMG', 'analog'] | ||
# NIP and FE info | ||
print_nip_info() | ||
# Elapsed time | ||
print_min_elapsed() | ||
# Streaming types | ||
fe_electrodes_and_streams(front_end_types) | ||
# Fe type streams: | ||
print_enabled_streams(front_end_types) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
""" | ||
This script is an example of how to get and process spike data using xipppy. | ||
""" | ||
import time | ||
|
||
import matplotlib.pyplot as plt | ||
import numpy as np | ||
from numpy import empty | ||
|
||
import xipppy as xp | ||
|
||
SPK_SAMPLES = 52 | ||
FS_CLK = 30000 | ||
SPK_BUFFER_SIZE = 100 | ||
|
||
|
||
def _unit_color(class_id): | ||
return { | ||
0: 'b', | ||
1: 'm', | ||
2: 'y', | ||
3: 'g', | ||
4: 'r' | ||
}[class_id] | ||
|
||
|
||
def plot_spike_average(elec): | ||
""" | ||
This example plots last 100 spikes received by front end, calculate the | ||
average and plots it. | ||
""" | ||
plt.figure(1) | ||
plt.xlabel('Time(ms)') | ||
plt.ylabel('uV') | ||
spk_count = 1 | ||
spk_avg = empty([0, SPK_SAMPLES]) | ||
spk_buffer = empty([SPK_BUFFER_SIZE, SPK_SAMPLES]) | ||
spk_t = np.arange(0, 52000 / FS_CLK, 1000 / FS_CLK, dtype=np.float32) | ||
spk_buffer[:] = np.nan | ||
|
||
# Note: data starts collecting in a buffer *after* xipppy is instantiated. | ||
# Wait for 1 sec = 1000 samples/1000 Hz. | ||
# The data are concatenated into one array, the reason for time.sleep(1) | ||
# below. | ||
time.sleep(1) | ||
|
||
while plt.fignum_exists(1): | ||
_, seg_data = xp.spk_data(elec) | ||
for p in seg_data: | ||
if spk_count < SPK_BUFFER_SIZE: | ||
plt.title('Spike count: %s' % (spk_count)) | ||
plt.plot(spk_t, p.wf, color=_unit_color(p.class_id), | ||
linewidth=0.3) | ||
spk_buffer[spk_count] = p.wf | ||
spk_avg = np.nanmean(spk_buffer, axis=0) | ||
plt.pause(0.05) | ||
spk_count += 1 | ||
else: | ||
plt.title('Electrode %s - Last %s spikes average' | ||
% (elec, SPK_BUFFER_SIZE)) | ||
plt.plot(spk_t, spk_avg, color='k', linewidth=4.0) | ||
plt.pause(0.1) | ||
plt.show() | ||
break | ||
|
||
|
||
if __name__ == '__main__': | ||
with xp.xipppy_open(): | ||
plot_spike_average(0) |