From fced444ea6130232350d9dd85bb862392097abd7 Mon Sep 17 00:00:00 2001 From: Richard Koehler Date: Fri, 23 Apr 2021 07:47:17 +0200 Subject: [PATCH] Add ripple example scripts --- icn_ripple/front_end_configs.py | 104 ++++++++++++++++++++++++++++++++ icn_ripple/spike_average.py | 69 +++++++++++++++++++++ 2 files changed, 173 insertions(+) create mode 100644 icn_ripple/front_end_configs.py create mode 100644 icn_ripple/spike_average.py diff --git a/icn_ripple/front_end_configs.py b/icn_ripple/front_end_configs.py new file mode 100644 index 0000000..4b35f51 --- /dev/null +++ b/icn_ripple/front_end_configs.py @@ -0,0 +1,104 @@ +""" +This script is collection of functions show examples of +how to use xipppy to interact with non-implantable front ends +connected to the NIP. +""" + +import xipppy as xp + +FS_CLK = 30000 + + +def print_nip_info(): + """ + xipppy informational functions. + """ + print('NIP serial number= ', xp.get_nip_serial()) + print('NIP exec version number= ', xp.get_nipexec_version()) + print('Connected front end version= ', xp.get_fe_version(0)) + + +def print_min_elapsed(): + """ + This is an example shows definition of xipppy.time(). + """ + elapsed_time_seconds = xp.time() / FS_CLK + elapsed_time_minutes = elapsed_time_seconds / 60 + print("Time elapsed after NIP start up: {:.2f} min ". + format(elapsed_time_minutes)) + + +def get_fe_first_electrode(front_end_type): + """ + This function returns first electrode (index 0) of connected front_end. + + :param front_end_type: supported front end type + :return: index 0 electrode of provided front end. None if not connected. + """ + if (len(xp.list_elec(front_end_type)) > 0): + return xp.list_elec(front_end_type)[0] + else: + return None + + +def fe_electrodes_and_streams(front_end_types): + """ + xipppy.list_elec() and xipppy.get_fe_streams() usage: + + This example prints electrode numbers and type of supported streams for + each front end that connected to the NIP. + + :param front_end_types: list of front ends types + """ + + all_elecs = [] + fe_elecs = [] + for fe_type in front_end_types: + all_elecs.append(xp.list_elec(fe_type)) + if len(all_elecs[-1]) > 0: + fe_elecs.append(all_elecs[-1][0]) + print('{:s} electrode numbers:'.format(fe_type)) + print(all_elecs[-1]) + print('{:s} streams:'.format(fe_type)) + print(xp.get_fe_streams(fe_elecs[-1])) + else: + fe_elecs.append(None) + + +def print_enabled_streams(front_end_types): + """ + xipppy.get_fe(), xipppy.signal() usage: + This example shows available streams for each connected front end. + + :param front_end_types: list of frond end types + """ + streams = ['raw', 'lfp', 'hi-res', 'spk', 'stim'] + + print('Enabled streams:') + for i, fe_type in enumerate(front_end_types): + fe_electrode = get_fe_first_electrode(fe_type) + if fe_electrode is not None: + print('{:s} FE(s) (indices {:s}):'.format(fe_type, str( + xp.get_fe(fe_electrode)))) + for stream in streams: + stream_is_active = [] + try: + stream_is_active.append( + str(xp.signal(fe_electrode, stream))) + except xp.exception as e: + stream_is_active.append('N/A') + print( + '{:s}:\t{:s}'.format(stream, ', '.join(stream_is_active))) + + +if __name__ == '__main__': + with xp.xipppy_open(): + front_end_types = ['stim', 'micro', 'nano', 'surf', 'EMG', 'analog'] + # NIP and FE info + print_nip_info() + # Elapsed time + print_min_elapsed() + # Streaming types + fe_electrodes_and_streams(front_end_types) + # Fe type streams: + print_enabled_streams(front_end_types) \ No newline at end of file diff --git a/icn_ripple/spike_average.py b/icn_ripple/spike_average.py new file mode 100644 index 0000000..7b2adba --- /dev/null +++ b/icn_ripple/spike_average.py @@ -0,0 +1,69 @@ +""" +This script is an example of how to get and process spike data using xipppy. +""" +import time + +import matplotlib.pyplot as plt +import numpy as np +from numpy import empty + +import xipppy as xp + +SPK_SAMPLES = 52 +FS_CLK = 30000 +SPK_BUFFER_SIZE = 100 + + +def _unit_color(class_id): + return { + 0: 'b', + 1: 'm', + 2: 'y', + 3: 'g', + 4: 'r' + }[class_id] + + +def plot_spike_average(elec): + """ + This example plots last 100 spikes received by front end, calculate the + average and plots it. + """ + plt.figure(1) + plt.xlabel('Time(ms)') + plt.ylabel('uV') + spk_count = 1 + spk_avg = empty([0, SPK_SAMPLES]) + spk_buffer = empty([SPK_BUFFER_SIZE, SPK_SAMPLES]) + spk_t = np.arange(0, 52000 / FS_CLK, 1000 / FS_CLK, dtype=np.float32) + spk_buffer[:] = np.nan + + # Note: data starts collecting in a buffer *after* xipppy is instantiated. + # Wait for 1 sec = 1000 samples/1000 Hz. + # The data are concatenated into one array, the reason for time.sleep(1) + # below. + time.sleep(1) + + while plt.fignum_exists(1): + _, seg_data = xp.spk_data(elec) + for p in seg_data: + if spk_count < SPK_BUFFER_SIZE: + plt.title('Spike count: %s' % (spk_count)) + plt.plot(spk_t, p.wf, color=_unit_color(p.class_id), + linewidth=0.3) + spk_buffer[spk_count] = p.wf + spk_avg = np.nanmean(spk_buffer, axis=0) + plt.pause(0.05) + spk_count += 1 + else: + plt.title('Electrode %s - Last %s spikes average' + % (elec, SPK_BUFFER_SIZE)) + plt.plot(spk_t, spk_avg, color='k', linewidth=4.0) + plt.pause(0.1) + plt.show() + break + + +if __name__ == '__main__': + with xp.xipppy_open(): + plot_spike_average(0)