diff --git a/lantz/drivers/acton/sp2150i.py b/lantz/drivers/acton/sp2150i.py new file mode 100644 index 0000000..6f2847e --- /dev/null +++ b/lantz/drivers/acton/sp2150i.py @@ -0,0 +1,185 @@ +from lantz import Feat, DictFeat, Action +from lantz.messagebased import MessageBasedDriver + +from pyvisa import constants + +from numpy import abs, ceil + +from time import sleep + + +class SP2150i(MessageBasedDriver): + """ + Implements controls for the Princeton Instruments Acton Series SP2150i + Monochromater over the internal USB virtual serial port. + + Communication with the device is a little finnicky, so if you run into + problems, I would suggest adding more buffer clears to avoid garbage + accumulating in the buffer and messing up your commands. + + Author: Peter Mintun (pmintun@uchicago.edu) + Date: 1/21/2016 + """ + DEFAULTS = {'ASRL': {'write_termination': '\r', + 'read_termination': 'ok\r\n', + 'baud_rate': 9600, + 'data_bits': 8, + 'parity': constants.Parity.none, + 'stop_bits': constants.StopBits.one, + 'encoding': 'latin-1', + 'timeout': 2000}} + + max_speed = 100 + wavelength_min = 380 + wavelength_max = 520 + + num_gratings = 8 + + def initialize(self): + """ + """ + super().initialize() + self.clear_buffer() + + def clear_buffer(self): + """ + This function sends an empty query just to clear any junk from the read + buffer...This could probably be done more elegantly...but it works, for + now at least. + """ + return self.query('') + + @Feat(limits=(wavelength_min, wavelength_max)) + def nm(self): + """ + Returns current wavelength of monochromater. + """ + self.clear_buffer() + read = self.query('?NM') + wavelength = read.replace('nm', '') + return float(wavelength) + + @nm.setter + def nm(self, wavelength): + """ + Sets output to specified wavelength, traveling at the current scan + rate. + """ + curr_wavelength = self.nm + scan_rate = self.scan_speed + delta_lambda = abs(curr_wavelength - wavelength) + if delta_lambda > 0.1: + scan_time = ceil(delta_lambda / scan_rate * 60) + 2 # need seconds + self.clear_buffer() + print('Scanning from {}nm to {}nm, will take {}sec'.format( + curr_wavelength, wavelength, scan_time)) + return self.write_message_wait('{0:.1f} NM'.format(wavelength), + scan_time) + else: + return + + + @Feat(limits=(0, max_speed)) + def scan_speed(self): + """ + Get scan rate in nm/min. + """ + self.clear_buffer() + read = self.query('?NM/MIN') + speed = read.replace('nm/min', '') + return float(speed) + + @scan_speed.setter + def scan_speed(self, speed): + """ + Sets current scan speed in nm/min. + """ + return self.write_message_wait('{0:.1f} NM/MIN'.format(speed), 0.1) + + @Feat(limits=(1, 2, 1)) + def grating(self): + """ + Returns the current grating position + """ + response = self.query('?GRATING') + return int(response) + + @grating.setter + def grating(self, grating_num): + """ + Changes the current grating to be the one in slot grating_num. + """ + print('Warning: Untested code! No other gratings were installed.') + print('Changing grating, please wait 20 seconds...') + return self.write_message_wait('{} GRATING'.format(grating_num), 20) + + @Feat(limits=(1, 3, 1)) + def turret(self): + """ + Returns the selected turret number. + """ + response = self.query('?TURRET') + return int(response.replace(' ok', '')) + + @turret.setter + def turret(self, turr_set): + """ + Selects the parameters for the grating on turret turr_set + """ + print('Warning: untested. No other turrets were installed.') + return self.write_message_wait('{} TURRET'.format(turr_set), 1) + + @Feat() + def turret_spacing(self): + """ + Returns the groove spacing of the grating for each turret. + """ + print('Warning: This command does\'t do anything?') + return self.write_message_wait('?TURRETS', 0.5) + + @Feat() + def grating_settings(self): + """ + Returns the groove spacing and blaze wavelength of grating positions + for all possible gratings. + """ + gratings = [] + self.write('?GRATINGS') + self.read() + for i in range(0, self.num_gratings): + gratings.append(self.read()) + self.read() + return gratings + + def write_message_wait(self, message, wait_time): + self.write(message) + sleep(wait_time) + return self.read() + +if __name__ == '__main__': + with SP2150i('ASRL4::INSTR') as inst: + print('== Monochromater Wavelength ==') + print('Wavelength:{}nm'.format(inst.nm)) + print('Scan rate:{}nm/min'.format(inst.scan_speed)) + + print('== Monochromater Information ==') + print('Selected turret:{}'.format(inst.turret)) + print('Selected grating:{}'.format(inst.grating)) + # inst.turret = 3 + # inst.grating = 2 + print('Selected turret:{}'.format(inst.turret)) + print('Selected grating:{}'.format(inst.grating)) + print('Turret Spacing:{}'.format(inst.turret_spacing)) + print('Gratings:{}'.format(inst.grating_settings)) + + for i in range(1, 20): + inst.scan_speed = 100.0 + print('Scan rate:{}nm/min'.format(inst.scan_speed)) + print('Wavelength:{}nm'.format(inst.nm)) + inst.nm = 400.0 + print('Wavelength:{}nm'.format(inst.nm)) + inst.scan_speed = 50.0 + print('Scan rate:{}nm/min'.format(inst.scan_speed)) + inst.nm = 500.0 + print('Wavelength:{}nm'.format(inst.nm)) + print('Cycle {} complete.'.format(i)) diff --git a/lantz/drivers/agilent/__init__.py b/lantz/drivers/agilent/__init__.py new file mode 100644 index 0000000..779fbc7 --- /dev/null +++ b/lantz/drivers/agilent/__init__.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +""" + lantz.drivers.aeroflex + ~~~~~~~~~~~~~~~~~~~~~~ + + :company: Aeroflex + :description: Test and measurement equipment and microelectronic solutions. + :website: http://www.aeroflex.com/ + + ---- + + :copyright: 2015 by Lantz Authors, see AUTHORS for more details. + :license: BSD, see LICENSE for more details. +""" + +from .n51xx import N51xx +from .ag33220A import Ag33220A + +__all__ = ['N51xx', 'Ag33220A'] diff --git a/lantz/drivers/agilent/ag33220A.py b/lantz/drivers/agilent/ag33220A.py new file mode 100644 index 0000000..4bcefc7 --- /dev/null +++ b/lantz/drivers/agilent/ag33220A.py @@ -0,0 +1,201 @@ +from lantz.messagebased import MessageBasedDriver +from lantz import Feat, DictFeat, Action + +#from lantz import Q_ + +import socket +import warnings + +class Ag33220A(MessageBasedDriver): + """ + Lantz driver for interfacing with Aiglent 33220A function generator. + + Includes testing code, which should work out of the box assuming you give + it the correct IP address. + + Author: P. Mintun + Date: 10/21/2016 + Version: 0.1 + """ + + DEFAULTS = { + 'COMMON': { + 'write_termination': '\n', + 'read_termination': '\n', + } + } + + function_dict = { + 'sine': 'SIN', + 'square': 'SQU', + 'ramp': 'RAMP', + 'pulse': 'PULS', + 'noise': 'NOIS', + 'DC': 'DC', + 'user': 'USER' + } + + @Feat() + def idn(self): + """ + Identifiies the instrument. + """ + return self.query('*IDN?') + + @Feat(values=function_dict) + def function(self): + """ + Returns the selected output function. + """ + return self.query('FUNC?') + + @function.setter + def function(self, func_type): + """ + Sets the output function. + """ + return self.write('FUNC {}'.format(func_type)) + + @Feat() + def frequency(self): + """ + Queries the output frequency of the function generator. + """ + return float(self.query('FREQ?')) + + @frequency.setter + def frequency(self, Hz): + """ + Sets the output frequency of the function generator (in Hz). + """ + return self.write('FREQ {}'.format(Hz)) + + @Feat(values={'on': 1, 'off': 0}) + def output_toggle(self): + """ + Returns whether or not the output is on or off. + """ + return int(self.query('OUTP?')) + + @output_toggle.setter + def output_toggle(self, on_off): + """ + Sets the output to be either on, or off. + """ + return self.write('OUTP {}'.format(on_off)) + + @Feat(limits=(0.01,10)) + def voltage(self): + """ + Returns the amplitude voltage setting. + """ + return float(self.query('VOLT?')) + + @voltage.setter + def voltage(self, volts): + """ + Sets the amplitude voltage setting to volts. + """ + return self.write('VOLT {}'.format(volts)) + + @Feat() + def voltage_offset(self): + """ + Returns the DC offset voltage, in volts. + """ + return float(self.query('VOLT:OFFS?')) + + @voltage_offset.setter + def voltage_offset(self, V_dc): + """ + Sets the DC offset voltage to be V_dc, in volts. + """ + return self.write('VOLT:OFFS {}'.format(V_dc)) + + # special commands for pulse mode + @Feat() + def pulse_width(self): + """ + Returns the pulse width in nanoseconds. + """ + return float(self.query('FUNC:PULS:WIDT?')) + + @pulse_width.setter + def pulse_width(self, sec): + """ + Sets the pulse width in seconds. + """ + return self.write('FUNC:PULS:WIDT {}'.format(sec)) + + # special functions for square waves + @Feat() + def square_duty_cycle(self): + """ + Returns the duty cycle of a square wave output, in percent. + """ + return float(self.query('FUNC:SQU:DCYC?')) + + @square_duty_cycle.setter + def square_duty_cycle(self, duty_cycle): + """ + Sets the square wave duty cycle to be duty_cycle. + """ + return self.write('FUNC:SQU:DCYC {}'.format(duty_cycle)) + + +if __name__ == '__main__': + address = 'xxx.yyy.zzz.aaa' + inst_num = 'instYY' + + print('Need to set IP address and inst_num!') + + with Ag33220A('TCPIP0::{}::{}::INSTR'.format(address, inst_num)) as inst: + print('Identification: {}'.format(inst.idn)) + + print('Function: {}'.format(inst.function)) + inst.function = 'sine' + print('Function: {}'.format(inst.function)) + inst.function = 'pulse' + print('Function: {}'.format(inst.function)) + inst.function = 'square' + print('Function: {}'.format(inst.function)) + + + print('Frequency:{}Hz'.format(inst.frequency)) + inst.frequency = 1000.0 + print('Frequency:{}Hz'.format(inst.frequency)) + inst.frequency = 20000.0 + print('Frequency:{}Hz'.format(inst.frequency)) + + + print('Output:{}'.format(inst.output_toggle)) + inst.output_toggle = 'off' + print('Output:{}'.format(inst.output_toggle)) + inst.output_toggle = 'on' + print('Output:{}'.format(inst.output_toggle)) + + print('Amplitude voltage:{}V'.format(inst.voltage)) + inst.voltage = 2.5 + print('Amplitude voltage:{}V'.format(inst.voltage)) + inst.voltage = 5.0 + print('Amplitude voltage:{}V'.format(inst.voltage)) + + print('Offset voltage:{}'.format(inst.voltage_offset)) + inst.voltage_offset = 2.1 + print('Offset voltage:{}'.format(inst.voltage_offset)) + inst.voltage_offset = 2.5 + print('Offset voltage:{}'.format(inst.voltage_offset)) + + inst.function = 'pulse' + print('Pulse width:{}s'.format(inst.pulse_width)) + inst.pulse_width = 50e-9 + print('Pulse width:{}s'.format(inst.pulse_width)) + inst.pulse_width = 20e-9 + print('Pulse width:{}s'.format(inst.pulse_width)) + + inst.function = 'square' + print('Duty cycle:{}'.format(inst.square_duty_cycle)) + inst.square_duty_cycle = 25.0 + print('Duty cycle:{}'.format(inst.square_duty_cycle)) + inst.square_duty_cycle = 50.0 + print('Duty cycle:{}'.format(inst.square_duty_cycle)) diff --git a/lantz/drivers/agilent/n51xx.py b/lantz/drivers/agilent/n51xx.py new file mode 100644 index 0000000..9ff3447 --- /dev/null +++ b/lantz/drivers/agilent/n51xx.py @@ -0,0 +1,156 @@ +from lantz.messagebased import MessageBasedDriver +from lantz import Feat, DictFeat, Action + +#from lantz import Q_ + +import socket +import warnings + +# class MontanaWarning(Warning): +# """ +# """ +# def __init__(self): +# super().__init__() + +class N51xx(MessageBasedDriver): + """ + Lantz driver for interfacing with AR SG6000 vector signal generator. + + Includes testing code, which should work out of the box assuming you give + it the correct IP address. + + Author: P. Mintun + Date: 8/31/2016 + Version: 0.1 + """ + + DEFAULTS = { + 'COMMON': { + 'write_termination': '\n', + 'read_termination': '\n', + } + } + + freq_limits = (1e5,6e9) + power_limits = (-200,0) + + + @Feat() + def idn(self): + """ + Identifiies the instrument. + """ + return self.query('*IDN?') + + + @Feat(values={True: 1, False: 0}) + def rf_toggle(self): + """ + enable or disable RF output + """ + return float(self.query('OUTP:STAT?')) + + @rf_toggle.setter + def rf_toggle(self,value): + self.write('OUTP:STAT {}'.format(value)) + + @Feat(units='Hz', limits=freq_limits) + def frequency(self): + """ + RF output frequency, in Hz. + """ + return self.query('SOUR:FREQ:CW?') + + @frequency.setter + def frequency(self, frequency): + """ + RF output frequency, in Hz. + """ + return self.write('SOUR:FREQ:CW {}Hz'.format(frequency)) + + @Feat(limits=power_limits) + def power(self): + """ + RF output power, in dBm. + """ + return self.query('SOUR:POW?') + + @power.setter + def power(self, value): + """ + Sets RF output power, in dBm. + """ + return self.write('SOUR:POW {}DBM'.format(value)) + + @Feat(units='radians',limits=(-3.14,3.14)) + def phase(self): + """ + Returns RF signal carrier phase in radians + """ + return self.query('SOUR:PHAS?') + + @phase.setter + def phase(self, radians): + """ + Sets RF signal carrier phase in degrees + """ + return self.write('SOUR:PHAS {}RAD'.format(radians)) + + @Feat(limits=(-200,30)) + def power_limit(self): + """ + Returns the user set output limit of the signal generator, in dBm. + """ + print('For some reason, this command doesn\'t seem to work...') + return self.write('SOUR:POW:USER:MAX?') + + @power_limit.setter + def power_limit(self, max_dBm): + """ + Sets the maximum output of the signal generator in dBm. + """ + self.write('SOUR:POW:USER:ENAB 0') + return self.write('SOUR:POW:USER:MAX {}'.format(max_dBm)) + + + + + + +if __name__ == '__main__': + address = '192.168.1.102' + inst_num = 'inst0' + + with N51xx('TCPIP0::{}::{}::INSTR'.format(address, inst_num)) as inst: + print('Identification: {}'.format(inst.idn)) + + inst.power_limit = 0.0 + print('Power limit:{}dBm'.format(inst.power_limit)) + + print('RF output settings') + print('RF on?:{}'.format(inst.rf_toggle)) + print('Output frequency: {}'.format(inst.frequency)) + inst.frequency = 1e6 + print('Output frequency: {}'.format(inst.frequency)) + inst.frequency = 2.87e9 + print('Output frequency: {}'.format(inst.frequency)) + + print('Output power: {}dBm'.format(inst.power)) + inst.power = -10.0 + print('Output power: {}dBm'.format(inst.power)) + inst.power = 0.0 + print('Output power: {}dBm'.format(inst.power)) + inst.power = -10.0 + print('Output power: {}dBm'.format(inst.power)) + + print('Phase:{}'.format(inst.phase)) + inst.phase = 3.14159/2.0 + print('Phase:{}'.format(inst.phase)) + inst.phase = 0.0 + print('Phase:{}'.format(inst.phase)) + + + inst.rf_toggle = 1 + print('RF on?:{}'.format(inst.rf_toggle)) + inst.rf_toggle = 0 + print('RF on?:{}'.format(inst.rf_toggle)) diff --git a/lantz/drivers/cobrite/cobritedx1.py b/lantz/drivers/cobrite/cobritedx1.py new file mode 100644 index 0000000..a680296 --- /dev/null +++ b/lantz/drivers/cobrite/cobritedx1.py @@ -0,0 +1,393 @@ +from lantz import Feat, DictFeat, Action +from lantz.messagebased import MessageBasedDriver +from pyvisa import constants + +from time import sleep + + +class CoBriteDX1(MessageBasedDriver): + """ + + """ + + # Set these parameters from the datasheet of your model. + channels = ['1,1,1'] # can add additional channels here! + + freq_min = 191.3 # THz + freq_max = 196.25 # THz + + min_power = 6 # dBm + max_power = 15.5 # dBm + offset_lim = 12.0 # GHz + + sbs_lims = (0, 1, 0.1) + + THz_to_nm = 299792 # 1 THz in nm + lambda_min = THz_to_nm/freq_max + lambda_max = THz_to_nm/freq_min + + on_off = {'off': 0, 'on': 1} + yes_no = {'no': 0, 'yes': 1} + + DEFAULTS = {'ASRL': {'write_termination': ';', + 'read_termination': ';', + 'baud_rate': 115200, + 'data_bits': 8, + 'parity': constants.Parity.none, + 'stop_bits': constants.StopBits.one, + 'encoding': 'utf-8', + 'timeout': 2000}} + + def initialize(self): + """ + """ + super().initialize() + sleep(1) + self.echo = 'off' # need to do this to drop command echoing + + @Feat(values=on_off) + def echo(self): + """ + Checks to see if ECHO mode is enabled. Note that the code in this + driver assumes that the command echo is disabled. + """ + return int(self.query(':SYS:ECHO?')) + + @echo.setter + def echo(self, status): + """ + Sets echo mode to be controlled by on_off. + """ + print(':SYS:ECHO {}'.format(status)) + return self.query(':SYS:ECHO {}'.format(status)) + + @Feat() + def idn(self): + """ + Returns the laser firmware and hardware information. + """ + return self.query('*idn?') + + @Feat() + def layout(self): + """ + Returns the configuration of the attached system. + + This will be in format: + SYSTEM , , + ,...\n , , \n + , \n... + """ + return self.query(':SYS:LAY?') + + @DictFeat(keys=channels, limits=(lambda_min, lambda_max)) + def wavelength(self, channel): + """ + Returns the current laser wavelength, in nm. + """ + result = self.query(':SOUR:WAV? {}'.format(channel)) + return float(result.replace(channel + ',', '')) + + @wavelength.setter + def wavelength(self, channel, lambd): + """ + Sets the current laser wavelength to lambda (nm). + """ + delay = 1 + value = self.query(':SOUR:WAV {}, {:.4f}'.format(channel, lambd)) + while not self.operation_complete: + sleep(delay) + #self.read() + return value + + @DictFeat(keys=channels, limits=(lambda_min, lambda_max)) + def wavelength_lim(self, channel): + """ + Returns wavelength limits of the laser at channel in nm. + """ + result = self.query(':SOUR:WAV:LIM? {}'.format(channel)) + return self.split_floats(channel, result) + + @DictFeat(keys=channels, limits=(freq_min, freq_max)) + def frequency(self, channel): + """ + Returns the frequency of the laser at channel in THz. + """ + return float(self.query(':SOUR:FREQ? {}'.format(channel))) + + @frequency.setter + def frequency(self, channel, THz): + """ + Sets the frequency of the laser at channel to THz. + """ + delay = 1 + value = self.write(':SOUR:FREQ {},{:.4f}'.format(channel, THz)) + while not self.operation_complete: + sleep(delay) + self.read() + return value + + @DictFeat(keys=channels, limits=(freq_min, freq_max)) + def frequency_lim(self, channel): + """ + Return the frequency limits of the laser at channel in THz. + """ + result = self.query(':SOUR:FREQ:LIM? {}'.format(channel)) + return self.split_floats(channel, result) + + @DictFeat(keys=channels, limits=(0, offset_lim)) + def offset(self, channel): + """ + Returns the offset of the laser at channel in GHz. + """ + return float(self.query(':SOUR:OFF? {}'.format(channel))) + + @offset.setter + def offset(self, channel, GHz): + """ + Sets the offset of the laser at channel to GHz. + """ + delay = 1 + value = self.write(':SOUR:OFF {},{:.4f}'.format(channel, GHz)) + while not self.operation_complete: + sleep(delay) + self.read() + return value + + @DictFeat(keys=channels, limits=(0, offset_lim)) + def offset_lim(self, channel): + """ + Returns the fine tuning limits of the laser at channel in GHz. + """ + return float(self.query(':SOUR:OFF:LIM? {}'.format(channel))) + + @DictFeat(keys=channels, limits=(min_power, max_power)) + def power(self, channel): + """ + Returns the laser power. + """ + return float(self.query(':SOUR:POW? {}'.format(channel))) + + @power.setter + def power(self, channel, dBm): + """ + Sets the power of the laser at channel to dBm. + """ + delay = 1 + value = self.write(':SOUR:POW {},{:.4f}'.format(channel, dBm)) + while not self.operation_complete: + sleep(delay) + self.read() + return value + + @DictFeat(keys=channels, limits=(0, max_power)) + def power_lim(self, channel): + """ + Returns maximum power of the laser at channel in dBm. + """ + result = self.query(':SOUR:FREQ:LIM? {}'.format(channel)) + return self.split_floats(channel, result) + + @DictFeat(keys=channels) + def limits(self, channel): + """ + Returns all limits of the laser at channel in form: + [Min freq (THz), Max freq(THz), Tune range (GHz), Min pow (dBm), + Max power (dBm)] + """ + result = self.query(':SOUR:LIM? {}'.format(channel)) + return self.split_floats(channel, result) + + @DictFeat(keys=channels, values=on_off) + def state(self, channel): + """ + Returns the current state of the laser at channel. + """ + return int(self.query(':SOUR:STAT? {}'.format(channel))) + + @state.setter + def state(self, channel, on_status): + """ + Sets the state of the laser at channel to on_status + """ + delay = 1 + value = self.write(':SOUR:STAT {},{}'.format(channel, on_status)) + while not self.operation_complete: + sleep(delay) + self.read() + return value + + @DictFeat(keys=channels) + def configuration(self, channel): + """ + Returns the current laser configuration in format: + [Freq (THz), Fine tuning (GHz), Power (dBm), On/off (0/1), Busy (0/1), + Dither (0/1)] + """ + result = self.query(':SOUR:CONF? {}'.format(channel)).split(',') + return (list(map(float, result[0:3])) + list(map(int, result[3:6]))) + + @DictFeat(keys=channels, values=yes_no) + def busy(self, channel): + """ + Query state of a device. + If 1, the laser is currently tuning and not settled. + """ + return int(self.query(':SOUR:BUSY? {}'.format(channel))) + + @DictFeat(keys=channels) + def monitor(self, channel): + """ + Returns monitor information from laser in format: + [LD chip temp (deg C), LD base temp (deg C), LD chip current (mA), + TEC current (mA)] + """ + result = self.query(':SOUR:MON? {}'.format(channel)) + return self.split_floats(channel, result) + + @DictFeat(keys=channels, values=on_off) + def dither(self, channel): + """ + Queries whether or not the laser at channel is on or off. + """ + return int(self.query(':SOUR:DIT? {}'.format(channel))) + + @dither.setter + def dither(self, channel, dither_status): + """ + Sets the dither tone of the laser at channel to be dither_status. + """ + return self.query(':SOUR:DIT {},{}'.format(channel, dither_status)) + + @DictFeat(keys=channels, values=on_off) + def sbs(self, channel): + """ + Queries whether or not the stimulated Brillouin scattering (SBS) + supression is on or off. + """ + return int(self.query(':SOUR:SBS:STAT? {}'.format(channel))) + + @sbs.setter + def sbs(self, channel, sbs_status): + """ + Sets the stimulated Brillouin scattering (SBS) supression to be + sbs_status. + """ + return self.query(':SOUR:SBS:STAT {},{}'.format(channel, sbs_status)) + + @Feat(limits=sbs_lims) + def sbs_freq(self): + """ + Queries the stimulated Brillouin scattering (SBS) supression amplitude + for all lasers in GHz. + """ + return float(self.query(':SOUR:SBS:FREQ?')) + + @sbs_freq.setter + def sbs_freq(self, GHz): + """ + Sets the stimulated Brillouin scattering (SBS) supression amplitude + for all lasers to GHz. + """ + return self.query(':SOUR:SBS:FREQ {}'.format(GHz)) + + @Feat() + def operation_complete(self): + """ + Operation complete query, checks to see if all lasers are settled. + """ + if self.query('*opc?') == '1': + return True + else: + return False + + @Action() + def save_curr_state(self, channel): + """ + Permanently saves the current laser port state for channel, which will + be loaded again after a power on/off cycle or reset. Autostart must be + enabled for this to work. + """ + if channel in self.channels: + return self.query(':SYS:SCSTAT'.format(channel)) + else: + print('Invalid channel.') + return 0 + + @Feat(values=on_off) + def autostart(self): + """ + Check to see if laser autostart is enabled. + """ + return int(self.query(':SYS:AUTOSTA?')) + + @autostart.setter + def autostart(self, autostart_status): + """ + Sets the laser autostart to be autostart_status. + """ + return self.query(':SYS:AUTOSTA'.format(autostart_status)) + + def split_floats(self, channel, result): + """ + Helper function for returning tuples returned during communication. + """ + ret_vals = [] + for val in result.replace(channel + ',', '').split(','): + ret_vals.append(float(val)) + return ret_vals + +if __name__ == '__main__': + with CoBriteDX1.via_serial(8) as inst: + inst.echo = 'off' + print('== System Information ==') + print('IDN:{}'.format(inst.idn)) + print('Layout:{}'.format(inst.layout)) + print('Autostart:{}'.format(inst.autostart)) + + print('== System Control Demo ==') + + for chan in inst.channels: + print('Laser at channel: {}'.format(chan)) + print('== Laser Parameters ==') + print('Wavelength: {} nm'.format(inst.wavelength[chan])) + print('Frequency: {} THz'.format(inst.frequency[chan])) + print('Power: {} dBm'.format(inst.power[chan])) + print('Offset: {} GHz'.format(inst.offset[chan])) + print('State: {}'.format(inst.state[chan])) + print('Busy?: {}'.format(inst.busy[chan])) + print('Configuration: {}'.format(inst.configuration[chan])) + print('Dither?: {}'.format(inst.dither[chan])) + print('SBS?: {}'.format(inst.sbs[chan])) + print('SBS Frequency: {} GHz'.format(inst.sbs_freq)) + + print('== Laser limits ==') + print('Wavelength limits: {} nm'.format(inst.wavelength_lim[chan])) + print('Frequency limits: {} THz'.format(inst.frequency_lim[chan])) + print('Offset limit: {} GHz'.format(inst.offset_lim[chan])) + print('Power limit: {} dBm'.format(inst.power_lim[chan])) + print('All limits: {}'.format(inst.limits[chan])) + + print('== Laser Control Demo ==') + #print('Saving current state...') + #inst.save_curr_state(channel=chan) + + inst.frequency[chan] = 195.3 + print('Configuration: {}'.format(inst.configuration[chan])) + inst.wavelength[chan] = 1530.1234 + print('Configuration: {}'.format(inst.configuration[chan])) + inst.frequency[chan] = 191.3 + inst.offset[chan] = 6.5 + inst.state[chan] = 'on' + print('Configuration: {}'.format(inst.configuration[chan])) + print('Monitor: {}'.format(inst.monitor[chan])) + inst.offset[chan] = 12 + inst.power[chan] = 6.1 + print('Configuration: {}'.format(inst.configuration[chan])) + print('Busy:{}'.format(inst.busy[chan])) + inst.power[chan] = 15.5 + print('Configuration: {}'.format(inst.configuration[chan])) + inst.offset[chan] = 0.0 + print('Configuration: {}'.format(inst.configuration[chan])) + inst.state[chan] = 'off' + print('Configuration: {}'.format(inst.configuration[chan])) diff --git a/lantz/drivers/hinds/hindspem90.py b/lantz/drivers/hinds/hindspem90.py new file mode 100644 index 0000000..db5e401 --- /dev/null +++ b/lantz/drivers/hinds/hindspem90.py @@ -0,0 +1,170 @@ +from lantz import Feat, DictFeat, Action +from lantz.messagebased import MessageBasedDriver +from pyvisa import constants + +from numpy import ceil + +from time import sleep + +from lantz.log import log_to_screen, DEBUG + + +class HindsPEM90(MessageBasedDriver): + """ + + """ + + # Set paramters here + off_on = {'off': 1, 'on': 0} + waves_limits = (0.0, 19999.9) + retardation_limits = (0.0, 1.0) + + DEFAULTS = {'ASRL': {'write_termination': '\r', + 'read_termination': '\r\n', + 'baud_rate': 9600, + 'data_bits': 8, + 'parity': constants.Parity.none, + 'stop_bits': constants.StopBits.one, + 'encoding': 'utf-8', + 'timeout': 3000}} + + def initialize(self): + """ + """ + super().initialize() + self.reset() + self.echo = 'off' + + @Feat(values=off_on) + def echo(self): + """ + Checks to see if ECHO mode is enabled. Note that the code in this + driver assumes that the command echo is disabled. + """ + + print('Can\'t read this, can only set.') + return 0 + + @echo.setter + def echo(self, status): + """ + Sets echo mode to be controlled by on_off. + """ + self.clear_buffer() + result = self.write('E:{}'.format(status)) + self.read() + return result + + + @Feat(limits=waves_limits) + def wavelength(self): + """ + Reads out current wavelength in nm + """ + self.clear_buffer() + self.write('W') + self.read() + return float(self.read())/10.0 + + @wavelength.setter + def wavelength(self, nm): + """ + Sets wavelength in nm. + """ + result = self.write('W:{0:0>6.0f}'.format(nm*10.0)) + self.read() + return result + + @Feat(limits=retardation_limits) + def retardation(self): + """ + Reads out current retardation in wave units + """ + self.clear_buffer() + self.write('R') + self.read() + return float(self.read())/1000.0 + + @retardation.setter + def retardation(self, wave_units): + """ + Sets retardation in wave units. + """ + return self.write('R:{0:04.0f}'.format(ceil(wave_units*1000))) + + @Feat() + def frequency(self): + """ + Reads out the reference frequency in hertz + """ + self.clear_buffer() + self.write('F') + self.read() + return float(self.read()) + + @Feat() + def frequency2(self): + """ + Reads out the reference frequency2 in hertz + """ + self.clear_buffer() + self.query('2F') + return float(self.read()) + + @Feat(values=off_on) + def inhibitor(self): + """ + Returns 0 for the retardation inhibitor + """ + return 0 + + @inhibitor.setter + def inhibitor(self, status): + """ + Sets the mode to be controlled by on_off. + """ + print('I:{}'.format(status)) + return self.query('I:{}'.format(status)) + + @Action() + def reset(self): + """ + Resets PEM-90 to default factory settings. + """ + print('Resetting PEM90...') + self.clear_buffer() + return self.write('Z') + + def clear_buffer(self): + """ + Clears the buffer of PEM-90. + """ + while True: + try: + self.read() + except: + print('Timeout?') + return + + + +if __name__ == '__main__': + print('Hi') + log_to_screen(DEBUG) + with HindsPEM90.via_serial(12) as inst: + for a in range(0, 100): + print('Frequency:{}Hz'.format(inst.frequency)) + print('Wavelength:{}nm'.format(inst.wavelength)) + print('Retardation:{}'.format(inst.retardation)) + + inst.wavelength = 500.0 + print('Wavelength:{}nm'.format(inst.wavelength)) + + inst.retardation = 0.5 + print('Retardation:{}'.format(inst.retardation)) + + inst.wavelength = 400.0 + print('Wavelength:{}nm'.format(inst.wavelength)) + + inst.retardation = 0.25 + print('Retardation:{}'.format(inst.retardation)) diff --git a/lantz/drivers/kepco/bop.py b/lantz/drivers/kepco/bop.py new file mode 100644 index 0000000..ab5d8f1 --- /dev/null +++ b/lantz/drivers/kepco/bop.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +""" + lantz.drivers.kepco.bop + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Implementation of Kepco BOP power supply + Author: Kevin Miao + Date: 1/18/2016 +""" + +from lantz import Action, Feat +from lantz.messagebased import MessageBasedDriver + +class BOP(MessageBasedDriver): + + @Feat() + def idn(self): + return self.query('*IDN?') + + @Action(values={'curr', 'volt'}) + def mode(self, value): + """ + sets operation mode + curr - constant current mode + volt - constant voltage mode + """ + return self.write('FUNC:MODE {}'.format(value)) + + @Feat(units='A') + def current(self): + return self.query('MEAS:CURR?') + + @current.setter + def current(self, value): + self.write('CURR {:1.1f}'.format(value)) + + @Feat(units='V') + def voltage(self): + return self.query('MEAS:VOLT?') + + @voltage.setter + def voltage(self, value): + self.write('VOLT {:1.1f}'.format(value)) diff --git a/lantz/drivers/lakeshore/lakeshore332.py b/lantz/drivers/lakeshore/lakeshore332.py new file mode 100644 index 0000000..71ef8af --- /dev/null +++ b/lantz/drivers/lakeshore/lakeshore332.py @@ -0,0 +1,280 @@ +# Lakeshore 332 Temperature Controller Driver +# Peter Mintun + +# This file is a driver for the Lakeshore 332 series temperature controller. + +# Some sort of license information goes here. + +from lantz import Feat, Action, DictFeat +from lantz.messagebased import MessageBasedDriver +from time import sleep + + +class Lakeshore332(MessageBasedDriver): + """ + Lakeshore 332 Temperature controlller. + + This class, based off of the Lantz MessageBasedDriver class, implements a + set of basic controls for the Lakeshore 332 series temperature controller. + It essentially a port of a nice driver written for QtLab by Reinier Heeres. + + Full documentation of the device is available at: + http://www.lakeshore.com/ObsoleteAndResearchDocs/332_Manual.pdf + """ + + # These defaults assume that you have set the IEEE Term setting to: Lf Cr + DEFAULTS = {'COMMON': {'write_termination': '\n', + 'read_termination': ''}} + + GPIB_name = None + GPIB_address = -1 + + channels = ['a', 'b'] + loops = ['1', '2'] + heater_range_vals = {'off': 0, 'low': 1, 'medium': 2, 'high': 3} + heater_status_vals = {'no error': 0, 'open load': 1, 'short': 2} + controller_modes = {'local': 0, 'remote': 1, 'remote, local lockout': 2} + cmodes = {'manual PID': 1, 'zone': 2, 'open loop': 3, 'AutoTune PID': 4, + 'AutoTune PI': 5, 'AutoTune P': 6} + + T_min = 0 + T_max = 350 + + T_min_set = 1.8 + T_max_set = 350 + + _verbose = True + + @Feat() + def idn(self): + """ + Returns the instrument identification. + """ + print('getting IDN') + return self.query('*IDN?') + + @Action() + def reset(self): + """ + Resets the Lakeshore 332 temperature controller. + """ + self.write('*RST') + + @DictFeat(limits=(T_min, T_max), keys=channels) + def kelvin_meas(self, channel): + """ + Returns measured temperature reading from specified channel in Kelvin. + """ + return float(self.query('KRDG?{}'.format(channel))) + + @DictFeat(keys=channels) + def sensor(self, channel): + """ + Returns sensor reading from specified channel. + """ + return float(self.query('SRDG?{}'.format(channel))) + + @Feat(values=heater_status_vals) + def heater_status(self): + """ + Returns the heater status. + """ + return int(self.query('HTRST?')) + + @Feat(values=heater_range_vals) + def heater_range(self): + """ + Queries the instrument, prints a message describing the current heater + range setting, then returns the heater range value. + """ + return int(self.query('RANGE?')) + + @heater_range.setter + def heater_range(self, heater_setting): + """ + Sets heater range to heater_setting. + + heater_setting must be an integer between 0 and 3 inclusive. + """ + return self.write('RANGE {}'.format(heater_setting)) + + @Feat() + def heater_output_1(self): + """ + Returns Loop 1 heater output in percent (%). + """ + return float(self.query('HTR?')) + + @Feat() + def heater_output_2(self): + """ + Returns Loop 2 heater output in percent (%). + """ + return float(self.query('AOUT?')) + + @Feat(values=controller_modes) + def mode(self): + """ + Reads the mode setting of the controller. + """ + return int(self.query('MODE?')) + + @mode.setter + def mode(self, mode): + """ + Sets controller mode, valid mode inputs are: + local (0) + remote (1) + remote, local lockout (2) + """ + return self.query('MODE{}'.format(mode)) + + @DictFeat(keys=loops) + def pid(self, loop): + """ + Get parameters for PID loop. + """ + return self.query('PID?{}'.format(loop)) + + @pid.setter + def pid(self, loop, pid): + """ + Get parameters for PID loop + """ + p = pid[0] + i = pid[1] + d = pid[2] + return self.query('PID{},{},{},{}'.format(loop, p, i, d)) + + @DictFeat(limits=(T_min_set, T_max_set), keys=loops) + def setpoint(self, channel): + """ + Return the temperature controller setpoint. + """ + return float(self.query('SETP?{}'.format(channel))) + + @setpoint.setter + def setpoint(self, loop, T_set): + """ + Sets the setpoint of channel channel to value value + """ + self.query('SETP{},{}'.format(loop, T_set)) + sleep(0.05) + return + + @DictFeat(limits=(0, 100), keys=loops) + def mout(self, loop): + """ + Returns loop manual heater power output. + """ + return self.query('MOUT?{}'.format(loop)) + + @mout.setter + def mout(self, loop, percent): + """ + Sets loop manual heater power output in percent. + """ + return self.query('MOUT{},{}'.format(loop, percent)) + + @DictFeat(values=cmodes, keys=loops) + def cmode(self, loop): + """ + Returns the control mode according to the following table. + 'manual PID' (1) + 'zone'(2) + 'open loop' (3) + 'AutoTune PID' (4) + 'AutoTune PI' (5) + 'AutoTune P' (6) + """ + return int(self.query('CMODE?{}'.format(loop))) + + @cmode.setter + def cmode(self, loop, value): + """ + Sets the control mode according to the following table. + 'manual PID' (1) + 'zone'(2) + 'open loop' (3) + 'AutoTune PID' (4) + 'AutoTune PI' (5) + 'AutoTune P' (6) + """ + return self.query('CMODE{},{}'.format(loop, value)) + + +if __name__ == '__main__': + with Lakeshore332('GPIB0::16::INSTR') as inst: + print('The instrument identification is ' + inst.idn) + + print('resetting...') + inst.reset + print('reset.') + + # Testing mode switching functionality + print('The current mode is ' + inst.mode + '.') + inst.mode = 'remote, local lockout' + print('Now the mode is ' + inst.mode + '.') + inst.mode = 'remote' + print('Now the mode is ' + inst.mode + '.') + + # Testing Kelvin read functionality + print('Current temperature on channel a is ' + + str(inst.kelvin_meas['a']) + ' Kelvin') + print('Current temperature on channel b is ' + + str(inst.kelvin_meas['b']) + ' Kelvin') + + # Testing sensor reading functionality + print('Sensor reading on channel a is ' + str(inst.sensor['a'])) + print('Sensor reading on channel b is ' + str(inst.sensor['b'])) + + # Testing heater status + print('Heater status is ' + str(inst.heater_status)) + + # Testing heater range + print('Heater range is ' + str(inst.heater_range)) + inst.heater_range = 'low' + print('Heater range is ' + str(inst.heater_range)) + inst.heater_range = 'off' + print('Heater range is ' + str(inst.heater_range)) + + # Testing heater output + print('Loop 1 heater output ' + str(inst.heater_output_1) + '%') + print('Loop 2 heater output ' + str(inst.heater_output_2) + '%') + + # Testing manual output + print('Loop 1 manual output ' + str(inst.mout['1'])) + print('Loop 2 manual output ' + str(inst.mout['2'])) + inst.mout['1'] = 50 + inst.mout['2'] = 50 + print('Loop 1 manual output ' + str(inst.mout['1'])) + print('Loop 2 manual output ' + str(inst.mout['2'])) + inst.mout['1'] = 0 + inst.mout['2'] = 0 + print('Loop 1 manual output ' + str(inst.mout['1'])) + print('Loop 2 manual output ' + str(inst.mout['2'])) + + # Testing cmode + print('Loop 1 Command Mode: ' + str(inst.cmode['1'])) + inst.cmode['1'] = 'open loop' + print('Loop 1 Command Mode: ' + str(inst.cmode['1'])) + inst.cmode['1'] = 'AutoTune P' + print('Loop 1 Command Mode: ' + str(inst.cmode['1'])) + print('Loop 2 Command Mode: ' + str(inst.cmode['2'])) + + # Testing setpoint + inst.setpoint['1'] = 25 + print('Loop 1 setpoint is ' + str(inst.setpoint['1'])) + inst.setpoint['1'] = 50 + print('Loop 1 setpoint is ' + str(inst.setpoint['1'])) + inst.setpoint['1'] = 300 + print('Loop 1 setpoint is ' + str(inst.setpoint['1'])) + inst.setpoint['2'] = 300 + print('Loop 2 setpoint is ' + str(inst.setpoint['2'])) + + # Testing PID + inst.pid['1'] = list([10.0, 10.0, 10.0]) + print('Loop 1 PID parameters:' + str(inst.pid['1'])) + inst.pid['1'] = list([50.0, 20.0, 1.0]) + print('Loop 1 PID parameters:' + str(inst.pid['1'])) + print('Loop 2 PID parameters:' + str(inst.pid['2'])) diff --git a/lantz/drivers/montana/cryostation.py b/lantz/drivers/montana/cryostation.py new file mode 100644 index 0000000..50c0894 --- /dev/null +++ b/lantz/drivers/montana/cryostation.py @@ -0,0 +1,255 @@ +from lantz.driver import Driver +from lantz import Feat, DictFeat, Action + +from lantz import Q_ + +import socket +import warnings + +# class MontanaWarning(Warning): +# """ +# """ +# def __init__(self): +# super().__init__() + +class Cryostation(Driver): + """ + Lantz driver for interfacing with Montana instruments cryostat. + + Includes testing code, which should work out of the box assuming you give + it the correct IP address. + + Author: P. Mintun + Date: 8/3/2016 + Version: 0.1 + """ + + + def __init__(self, address, port=7773, timeout=20.0): + super().__init__() + self.address = address + self.port = port + + def initialize(self): + """ + Initialize function for Cryostation communication port, uses Python + sockets library to open socket communication with Cryostation. + """ + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect((self.address, self.port)) + self.socket.settimeout(2) + + def finalize(self): + """ + Closes socket communication with Cryostation software. + """ + self.socket.close() + + @Feat() + def alarm_state(self): + """ + Returns true or false, indicating the presence (T) or absence (F) of + a system error. + """ + error = self.send_and_recv('GAS') + if (error == 'T'): + + warnings.warn('Montana in alarm state.') + return True + + return False + + @Feat(units='millitorr') + def chamber_pressure(self): + """ + Returns the chamber pressure in mTorr, or -0.1 if the pressure is + unavailable. + """ + return float(self.send_and_recv('GCP', raise_warning=True)) + + @Feat(units='kelvin') + def platform_temperature(self): + """ + Returns the current platform temperature in K, or -0.100 if the current + temperature is unavailable. + """ + return float(self.send_and_recv('GPT', raise_warning=True)) + + @Feat(units='kelvin') + def platform_stability(self): + """ + Returns the platform stability in K, or -0.100 if unavailable. + """ + return float(self.send_and_recv('GPS')) + + @Feat(units='watts') + def platform_heater_pow(self): + """ + Returns the current platform heater power in W, or -0.100 if + unavailable. + """ + return float(self.send_and_recv('GPHP', raise_warning=True)) + + @Feat(units='kelvin') + def stage_1_temperature(self): + """ + Returns the current stage 1 temperature in K, or -0.100 if the current + temperature is unavailable. + """ + return float(self.send_and_recv('GS1T', raise_warning=True)) + + @Feat(units='watts') + def stage_1_heater_pow(self): + """ + Returns the current stage 1 heater power in W, or -0.100 if + unavailable. + """ + return float(self.send_and_recv('GS1HP', raise_warning=True)) + + @Feat(units='kelvin') + def stage_2_temperature(self): + """ + Returns the current stage 2 temperature in K, or -0.100 if the current + temperature is unavailable. + """ + return float(self.send_and_recv('GS2T', raise_warning=True)) + + @Feat(units='kelvin') + def sample_stability(self): + """ + Returns the sample stability in K, or -0.100 if unavailable. + """ + return float(self.send_and_recv('GSS')) + + @Feat(units='kelvin') + def sample_temperature(self): + """ + Returns the sample temperature in K, or -0.100 if unavailable. + """ + return float(self.send_and_recv('GST', raise_warning=True)) + + @Feat(units='kelvin') + def temp_set_point(self): + """ + Returns the temperature setpoint of the Cryostation software. + """ + return float(self.send_and_recv('GTSP', raise_warning=True)) + + @temp_set_point.setter + def temp_set_point(self, setpoint_kelvin): + """ + Sets the temperature setpoint of the Cryostation software. + """ + return self.send_and_recv('STSP{0:.2f}'.format(setpoint_kelvin)) + + @Feat(units='kelvin') + def user_temperature(self): + """ + Returns the user thermometer temperature in K, or -0.100 if unavailable. + """ + return float(self.send_and_recv('GUT')) + + @Feat(units='kelvin') + def user_stability(self): + """ + Returns the user thermometer stability in K, or -0.100 if unavailable. + """ + return float(self.send_and_recv('GUS')) + + @Action() + def start_cool_down(self): + """ + Initializes cooldown to temperature setpoint. + """ + return self.send_and_recv('SCD') + + @Action() + def start_standby(self): + """ + Puts the system into standby mode (see manual for details). + """ + return self.send_and_recv('SSB') + + @Feat() + def SToP(self): + """ + Returns the status of the last command. + + If OK, the system cannot stop at this time. + """ + return self.send_and_recv('STP') + + @Action() + def start_warm_up(self): + """ + Starts the system warmup to room temperature. + """ + return self.send_and_recv('SWU') + + def send_and_recv(self, message, raise_warning=False): + """ + Params: + message = command to be sent to Cryostation + return_bytes = number of bytes to be stripped off return string + """ + buffer_size = 1024 + m1 = '0{}{}'.format(str(len(message)), message) + + self.socket.send(m1.encode()) + data = str() + received = self.socket.recv(buffer_size) + if received: + data = received.decode() + + if ((data[2:] == '-0.100') & raise_warning): + + warnings.warn('Unable to return parameter from command {}.'.format(message)) + return 0 + + return data[2:] + +def main(): + + address = '192.168.1.100' + port = 7773 + + kelvin = Q_(1, 'kelvin') + + warnings.simplefilter('always', UserWarning) + + + with Cryostation(address, port) as inst: + print('Alarm state: {}'.format(inst.alarm_state)) + print('Chamber pressure: {}'.format(inst.chamber_pressure)) + + print('Temperature and stabiity metrics') + + print('Platform temperature: {}'.format(inst.platform_temperature)) + print('Platform stability: {}'.format(inst.platform_stability)) + print('Stage 1 temperature: {}'.format(inst.stage_1_temperature)) + print('Stage 2 temperature: {}'.format(inst.stage_2_temperature)) + + print('Sample temperature :{}'.format(inst.sample_temperature)) + print('Sample stabiity: {}'.format(inst.sample_stability)) + + print('Heater metrics') + print('Platform heater power: {}'.format(inst.platform_heater_pow)) + print('Stage 1 heater power: {}'.format(inst.stage_1_heater_pow)) + + print('Testing temperature set point...') + print('System set point: {}'.format(inst.temp_set_point)) + inst.temp_set_point = 20.0 * kelvin + print('System set point: {}'.format(inst.temp_set_point)) + inst.temp_set_point = 3.2 * kelvin + print('System set point: {}'.format(inst.temp_set_point)) + + print('Test system actions') + print('Starting cooldown?: {}'.format(inst.start_cool_down())) + print('Starting standby?: {}'.format(inst.start_standby())) + print('Starting warmup?: {}'.format(inst.start_warm_up())) + + + + +if __name__ == '__main__': + main() diff --git a/lantz/drivers/newport/fsm300.py b/lantz/drivers/newport/fsm300.py new file mode 100644 index 0000000..79eeb86 --- /dev/null +++ b/lantz/drivers/newport/fsm300.py @@ -0,0 +1,198 @@ +# -*- coding: utf-8 -*- +""" + lantz.drivers.newport.fsm300 + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Implementation of FSM300 using NI DAQ controller + + Author: Kevin Miao + Date: 9/27/2016 +""" + +from lantz import Driver +from lantz.driver import Feat, DictFeat, Action +from lantz.drivers.ni.daqmx import AnalogOutputTask, VoltageOutputChannel + +from lantz import Q_ + +import time + +import numpy as np + +def enforce_point_units(point, units='um'): + x, y = point + if not isinstance(x, Q_): + x = Q_(x, 'um') + if not isinstance(y, Q_): + y = Q_(y, 'um') + point = x, y + return point + + +class FSM300(Driver): + + def __init__(self, x_ao_ch, y_ao_ch, + ao_smooth_rate=Q_('10 kHz'), ao_smooth_steps=Q_('1000 1/V'), + limits=((Q_(-10, 'V'), Q_(10, 'V')), (Q_(-10, 'V'), Q_(10, 'V'))), + cal=(Q_(9.5768, 'um/V'), Q_(7.1759, 'um/V'))): + x_limits_mag = tuple(float(val / Q_('1 V')) for val in limits[0]) + y_limits_mag = tuple(float(val / Q_('1 V')) for val in limits[1]) + self.task = AnalogOutputTask('fsm300') + VoltageOutputChannel(x_ao_ch, name='fsm_x', min_max=x_limits_mag, units='volts', task=self.task) + VoltageOutputChannel(y_ao_ch, name='fsm_y', min_max=y_limits_mag, units='volts', task=self.task) + self.ao_smooth_rate = ao_smooth_rate + self.ao_smooth_steps = ao_smooth_steps + self.cal = cal + + self._position = (Q_('0 um'), Q_('0 um')) + + super().__init__() + + return + + def ao_smooth_func(self, init_point, final_point): + init_x, init_y = init_point + final_x, final_y = final_point + + init_x_voltage, final_x_voltage = init_x / self.cal[0], final_x / self.cal[0] + init_y_voltage, final_y_voltage = init_y / self.cal[1], final_y / self.cal[1] + diff_x_voltage = final_x_voltage - init_x_voltage + diff_y_voltage = final_y_voltage - init_y_voltage + + diff_voltage = max(abs(diff_x_voltage), abs(diff_y_voltage)) + steps = int(np.ceil(diff_voltage * self.ao_smooth_steps)) + init = np.array([val.to('V').magnitude for val in [init_x_voltage, init_y_voltage]]) + diff = np.array([val.to('V').magnitude for val in [diff_x_voltage, diff_y_voltage]]) + + versine_steps = (1.0 - np.cos(np.linspace(0.0, np.pi, steps))) / 2.0 + + step_voltages = np.outer(np.ones(steps), init) + np.outer(versine_steps, diff) + return step_voltages + + def ao_linear_func(self, init_point, final_point, steps): + init_x, init_y = init_point + final_x, final_y = final_point + + init_x_voltage, final_x_voltage = init_x / self.cal[0], final_x / self.cal[0] + init_y_voltage, final_y_voltage = init_y / self.cal[1], final_y / self.cal[1] + diff_x_voltage = final_x_voltage - init_x_voltage + diff_y_voltage = final_y_voltage - init_y_voltage + + diff_voltage = max(abs(diff_x_voltage), abs(diff_y_voltage)) + init = np.array([val.to('V').magnitude for val in [init_x_voltage, init_y_voltage]]) + diff = np.array([val.to('V').magnitude for val in [diff_x_voltage, diff_y_voltage]]) + + linear_steps = np.linspace(0.0, 1.0, steps) + + step_voltages = np.outer(np.ones(steps), init) + np.outer(linear_steps, diff) + return step_voltages + + @Feat() + def abs_position(self): + return self._position + + + @abs_position.setter + def abs_position(self, point): + point = enforce_point_units(point) + step_voltages = self.ao_smooth_func(self._position, point) + if step_voltages.size: + steps = step_voltages.shape[0] + clock_config = { + 'source': 'OnboardClock', + 'rate': self.ao_smooth_rate.to('Hz').magnitude, + 'sample_mode': 'finite', + 'samples_per_channel': steps, + } + self.task.configure_timing_sample_clock(**clock_config) + task_config = { + 'data': step_voltages, + 'auto_start': False, + 'timeout': 0, + 'group_by': 'scan', + } + self.task.write(**task_config) + self.task.start() + time.sleep((steps / self.ao_smooth_rate).to('s').magnitude) + self.task.stop() + self._position = point + + @Action() + def line_scan(self, init_point, final_point, steps, acq_task, acq_rate=Q_('10 kHz'), pts_per_pos=100): + init_point = enforce_point_units(init_point) + final_point = enforce_point_units(final_point) + step_voltages = self.ao_linear_func(init_point, final_point, steps) + step_voltages = np.repeat(step_voltages, pts_per_pos, axis=0) + + if acq_task.IO_TYPE == 'CI': + + # add extra sample for taking diff + clock_config = { + #'source': '/Dev1/ao/SampleClock', + 'rate': acq_rate.to('Hz').magnitude, + 'sample_mode': 'finite', + 'samples_per_channel': len(step_voltages) + 1, + } + self.task.configure_timing_sample_clock(**clock_config) + clock_config = { + 'source': '/Dev1/ao/SampleClock', + 'rate': acq_rate.to('Hz').magnitude, + 'sample_mode': 'finite', + 'samples_per_channel': len(step_voltages) + 1, + } + acq_task.configure_timing_sample_clock(**clock_config) + task_config = { + 'data': step_voltages, + 'auto_start': False, + 'timeout': 0, + 'group_by': 'scan', + } + acq_task.arm_start_trigger_source = '/Dev1/ao/StartTrigger' + acq_task.arm_start_trigger_type = 'digital_edge' + acq_task.start() + + self.task.write(**task_config) + + self.task.start() + time.sleep(len(step_voltages)/acq_rate.to('Hz').magnitude) + #acq_task.arm_start_trigger_source = 'Dev1/PFI15' + #acq_task.arm_start_trigger_type = 'digital_edge' + + scanned = acq_task.read(samples_per_channel=len(step_voltages)+1, + timeout=(len(step_voltages) + 1)/acq_rate.to('Hz').magnitude) + + #delta_cts = np.insert(np.diff(scanned), scanned[0], 0) + delta_cts = np.diff(scanned) + rate = delta_cts * acq_rate.to('Hz').magnitude + acq_task.stop() + self.task.stop() + nb_chan = scanned.shape[0] + + # TODO: check to make sure that this reshapes the way it should + return rate.reshape((pts_per_pos,steps)) + + else: + + clock_config = { + 'source': 'OnboardClock', + 'rate': acq_rate.to('Hz').magnitude, + 'sample_mode': 'finite', + 'samples_per_channel': len(step_voltages), + } + self.task.configure_timing_sample_clock(**clock_config) + acq_task.configure_timing_sample_clock(**clock_config) + task_config = { + 'data': step_voltages, + 'auto_start': False, + 'timeout': 0, + 'group_by': 'scan', + } + self.task.write(**task_config) + self.task.configure_trigger_digital_edge_start('ai/StartTrigger') + self.task.start() + acq_task.start() + scanned = acq_task.read(samples_per_channel=len(step_voltages)) + acq_task.stop() + self.task.stop() + nb_chan = scanned.shape[0] + return scanned.reshape((nb_chan,steps,pts_per_pos)).mean(axis=2) diff --git a/lantz/drivers/newport/newport69907.py b/lantz/drivers/newport/newport69907.py new file mode 100644 index 0000000..a4b2539 --- /dev/null +++ b/lantz/drivers/newport/newport69907.py @@ -0,0 +1,315 @@ +from lantz import Feat, Action +from lantz.messagebased import MessageBasedDriver +from time import sleep +from pyvisa import constants + + +class Newport69907(MessageBasedDriver): + """ + This class implements serial control of a Newport69907 arc lamp power + supply. + """ + DEFAULTS = {'ASRL': {'write_termination': '\r', + 'read_termination': '\r', + 'baud_rate': 9600, + 'data_bits': 8, + 'parity': constants.Parity.none, + 'stop_bits': constants.StopBits.one, + 'encoding': 'latin-1', + 'timeout': 10000}} + + max_lamp_amps = 12 # amps + max_lamp_power = 100 # watts + + def parse_status_register(self, status_register): + """ + Parses the status registry codes bit by bit into meaningful statement. + These error bits correspond to whether or not the corresponding LEDs + on the front panel are illuminated. + + Bit 7 - Lamp on + Bit 6 - Ext + Bit 5 - Power/Current mode + Bit 4 - Cal mode + Bit 3 - Fault + Bit 2 - Communication + Bit 1 - Limit + Bit 0 - Interlock + """ + status_code = int(('0x' + status_register.replace('STB', '')), 16) + status = [] + # print('Status code:{}'.format(status_code)) + if status_code > 128: + status.append('Lamp on') + status_code -= 128 + if status_code > 64: + status.append('Ext') + status_code -= 64 + if status_code > 32: + status.append('Power Mode') + status_code -= 32 + else: + status.append('Current Mode') + if status_code > 16: + status.append('Cal Mode') + status_code -= 16 + if status_code > 8: + status.append('Fault') + status_code -= 8 + if status_code > 4: + status.append('Comm') + status_code -= 4 + if status_code > 2: + status.append('Limit') + status_code -= 2 + if status_code == 1: + status.append('Interlock') + return status + + def parse_error_register(self, error_register): + """ + Parses the status registry codes bit by bit into meaningful statement. + + Bit 7 - Power on + Bit 6 - User request + Bit 5 - Command error + Bit 4 - Execution error + Bit 3 - Device dependent error + Bit 2 - Query error + Bit 1 - Request control + Bit 0 - Operation complete + """ + # convert hex string into integer + err_code = int(('0x' + error_register.replace('ESR', '')), 16) + errors = [] + # print('error code:{}'.format(err_code)) + if err_code > 128: + errors.append('Power on') + err_code -= 128 + if err_code > 64: + errors.append('User request') + err_code -= 64 + if err_code > 32: + errors.append('Command error') + err_code -= 32 + if err_code > 16: + errors.append('Execution error') + err_code -= 16 + if err_code > 8: + errors.append('Device dependent error.') + err_code -= 8 + if err_code > 4: + errors.append('Query error.') + err_code -= 4 + if err_code > 2: + errors.append('Request control') + err_code -= 2 + # if err_code == 1: + # no error + return errors + + @Feat() + def idn(self): + """ + Return power supply model number + """ + return self.query('IDN?') + + @Feat() + def status(self): + """ + Returns status information of the instrument. + """ + status = self.query('STB?') + return self.parse_status_register(status) + + @Feat() + def error_status(self): + """ + Returns the instrument error status. + """ + error_register = self.query('ESR?') + return self.parse_error_register(error_register) + + @Feat() + def amps(self): + """ + Returns output amperage displayed on the front panel. + """ + return float(self.query('AMPS?')) + + @Feat() + def volts(self): + """ + Return output voltage displayed on front panel. + """ + return float(self.query('VOLTS?')) + + @Feat() + def watts(self): + """ + Returns output wattage displayed on front panel. + """ + return int(self.query('WATTS?')) + + @Feat() + def lamp_hrs(self): + """ + Returns the number of hours on the current lamp. + """ + return self.query('LAMP HRS?') + + @Feat() + def amp_preset(self): + """ + Returns the lamp amperage preset value. + """ + return float(self.query('A-PRESET?')) + + @amp_preset.setter + def amp_preset(self, preset_val): + """ + Sets the lamp amperage preset value to preset_val. + """ + error_status = self.query('A-PRESET={0:.1f}'.format(preset_val)) + return self.parse_error_register(error_status) + + @Feat() + def power_preset(self): + """ + Returns the lamp power preset value (in watts). + """ + return int(self.query('P-PRESET?')) + + @power_preset.setter + def power_preset(self, power_preset): + """ + Sets the lamp powr preset value (in watts). + """ + error_status = self.query('P-PRESET={0:04d}'.format(int(power_preset))) + return self.parse_error_register(error_status) + + @Feat(limits=(0, max_lamp_amps)) + def amp_lim(self): + """ + Return current lamp amperage limit. + """ + return float(self.query('A-LIM?')) + + @amp_lim.setter + def amp_lim(self, max_amps): + """ + Sets current amperage limit to max_amps + """ + error_register = self.query('A-LIM={0:.1f}'.format(max_amps)) + return self.parse_error_register(error_register) + + @Feat(limits=(0, max_lamp_power)) + def power_lim(self): + """ + Return current power limit. + """ + return int(self.query('P-LIM?')) + + @power_lim.setter + def power_lim(self, max_power): + """ + Sets the power limit of the lamp to max_power. + """ + error_register = self.query('P-LIM={0:04d}'.format(int(max_power))) + return self.parse_error_register(error_register) + + @Action() + def start_lamp(self): + """ + Turns on the arc lamp. + """ + error_register = self.query('START') + return self.parse_error_register(error_register) + + @Action() + def stop_lamp(self): + """ + Turns off the arc lamp. + """ + error_register = self.query('STOP') + return self.parse_error_register(error_register) + + @Action() + def reset(self): + """ + Returns the arc lamp to factory default settings. + """ + error_register = self.query('RST') + return self.parse_error_register(error_register) + + +if __name__ == '__main__': + with Newport69907('ASRL9::INSTR') as inst: + print('== Instrument Information ==') + print('Serial number:{}'.format(inst.idn)) + print('Status:{}'.format(inst.status)) + print('Errors:{}'.format(inst.error_status)) + + print('== Current lamp status ==') + print('Amps:{}A'.format(inst.amps)) + print('Volts:{}V'.format(inst.volts)) + print('Watts:{}W'.format(inst.watts)) + print('Lamp hours:{}hrs'.format(inst.lamp_hrs)) + + print('== Lamp Settings ==') + print('Current preset:{}A'.format(inst.amp_preset)) + print('Power preset:{}W'.format(inst.power_preset)) + print('Current limit:{}A'.format(inst.amp_lim)) + print('Power limit:{}W'.format(inst.power_lim)) + + print('== Lamp Control ==') + print('Starting lamp...') + inst.start_lamp() + sleep(5) + print('Amps:{}A'.format(inst.amps)) + print('Volts:{}V'.format(inst.volts)) + print('Watts:{}W'.format(inst.watts)) + print('Stopping lamp...') + inst.stop_lamp() + + print('== Changing Lamp Settings ==') + inst.amp_lim = 12.0 + inst.amp_preset = 7.5 + inst.power_lim = 98 + inst.power_preset = 95 + + print('Current preset:{}A'.format(inst.amp_preset)) + print('Power preset:{}W'.format(inst.power_preset)) + print('Current limit:{}A'.format(inst.amp_lim)) + print('Power limit:{}W'.format(inst.power_lim)) + + inst.amp_lim = 10.0 + inst.amp_preset = 5.5 + inst.power_lim = 80 + inst.power_preset = 75 + + print('Current preset:{}A'.format(inst.amp_preset)) + print('Power preset:{}W'.format(inst.power_preset)) + print('Current limit:{}A'.format(inst.amp_lim)) + print('Power limit:{}W'.format(inst.power_lim)) + + print('Starting lamp...') + inst.start_lamp() + sleep(5) + print('Amps:{}A'.format(inst.amps)) + print('Volts:{}V'.format(inst.volts)) + print('Watts:{}W'.format(inst.watts)) + print('Stopping lamp...') + inst.stop_lamp() + sleep(1) + + inst.amp_lim = 12.0 + inst.amp_preset = 7.5 + inst.power_lim = 98 + inst.power_preset = 95 + + print('Current preset:{}A'.format(inst.amp_preset)) + print('Power preset:{}W'.format(inst.power_preset)) + print('Current limit:{}A'.format(inst.amp_lim)) + print('Power limit:{}W'.format(inst.power_lim)) diff --git a/lantz/drivers/ni/Newport_FSM.py b/lantz/drivers/ni/Newport_FSM.py new file mode 100644 index 0000000..db67c23 --- /dev/null +++ b/lantz/drivers/ni/Newport_FSM.py @@ -0,0 +1,486 @@ +# Demo code to control Newport FSM, based off of original by David J. Christle +# Original code here: +# https://github.com/dchristle/qtlab/blob/master/instrument_plugins/Newport_FSM.py + +from lantz.drivers.ni.daqmx import System +from lantz.drivers.ni.daqmx import AnalogOutputTask, VoltageOutputChannel +from lantz.drivers.ni.daqmx import AnalogInputTask, VoltageInputChannel #CounterInputTask, CountEdgesChannel, DigitalOutputChannel, DigitalOutputTask + +from lantz import Feat, DictFeat, Action + +import numpy as np + +from numpy import abs, ceil, pi, linspace, cos, ones + +from time import sleep + +from ctypes import c_uint32 + + +class Newport_FSM(System): + """ + Class for controlling Newport FSM using National Instruments DAQ. + """ + + def initialize(self): + """ + Creates AO tasks for controlling FSM. + """ + self.task = AnalogOutputTask('FSM') + + self.dev_name = 'dev1/' + + self.fsm_dimensions = { + 'x': {'micron_per_volt': 9.5768, + 'min_v': -10.0, + 'max_v': +10.0, + 'default': 0.0, + 'origin': 0.0, + 'ao_channel': 'ao0'}, + 'y': {'micron_per_volt': 7.1759, + 'min_v': -10.0, + 'max_v': +10.0, + 'default': 0.0, + 'origin': 0.0, + 'ao_channel': 'ao1'} + } + + self.ao_smooth_rate = 50000.0 # Hz + self.ao_smooth_steps_per_volt = 1000.0 # steps/V + #self.ao_smooth_steps_per_volt = 200.0 # steps/V + self.count_task = None + + + for dim in self.fsm_dimensions: + chan_name = dim + phys_chan = self.dev_name + self.fsm_dimensions[dim]['ao_channel'] + V_min = self.fsm_dimensions[dim]['min_v'] + V_max = self.fsm_dimensions[dim]['max_v'] + + chan = VoltageOutputChannel(phys_chan, name=chan_name, + min_max=(V_min, V_max), units='volts', + task=self.task) + + + + def finalize(self): + """ + Stops AO tasks for controlling FSM. + """ + #self.task.stop() + + @Feat() + def abs_position_pair(self): + x_loc = abs_position_single['x'] + y_loc = abs_position_single['y'] + print('FSM at ({}um,{}um)'.format(x_loc, y_loc)) + return [x_loc, y_loc] + + @abs_position_pair.setter + def abs_position_pair(self, xy_pos): + """ + Sets absolute position of scanning mirror, as a micron pair. + """ + abs_position_single['x'] = xy_pos[0] + abs_position_single['y'] = xy_pos[1] + print('Set FSM to ({}um,{}um).'.format(xy_pos[0], xy_pos[1])) + return 0 + + @DictFeat() + def abs_position_single(self, dimension): + """ + Returns absolute position of scanning mirror along dimension. + """ + return 0 + + @abs_position_single.setter + def abs_position_single(self, dimension, pos): + """ + Sets absolute position of scanning mirror along dimension to be pos. + """ + return 0 + + def convert_V_to_um(self, dim, V): + """ + Returns micron position corresponding to channel voltage. + """ + um = V * self.fsm_dimensions[dim]['micron_per_volt'] + ( + self.fsm_dimensions[dim]['origin']) + return um + + def convert_um_to_V(self, um, dim): + """ + Returns voltage corresponding to micron position. + """ + V = (um - self.fsm_dimensions[dim]['origin']) / ( + self.fsm_dimensions[dim]['micron_per_volt']) + return V + + def set_V_to_zero(self): + """ + Sets output voltages to 0. + """ + self.abs_position_pair = (self.convert_V_to_um(0, 'x'), + self.convert_V_to_um(0, 'y')) + + def ao_smooth(self, x_init, x_final, channel): + """ + Smooths output of DAQ to avoid hysteresis w/ moving FSM mirror. + + Copy of dchristle's algorithm. (I know what you're trying to do, and + just stop.) + """ + + v_data = self.AO_smooth_func(x_init, x_final, channel) + n_steps = v_data.size + + samp_arr = np.empty(n_steps * 2, dtype=float) + + if channel == 'x': + samp_arr[0::2] = v_data + samp_arr[1::2] = np.zeros(n_steps) + else: + samp_arr[0::2] = np.zeros(n_steps) + samp_arr[1::2] = v_data + + + + print(samp_arr) + print(n_steps) + + #samp_arr = 1.5 + + + self.task.configure_timing_sample_clock(source='OnboardClock', rate=self.ao_smooth_rate, + sample_mode='finite', + samples_per_channel=int(n_steps)) + + self.task.write(data=samp_arr, auto_start=False, timeout=0, group_by='scan') + + + try: + self.task.start() + print('Started FSM Scan') + print('Expected scan time:{} sec'.format(n_steps*1.0/self.ao_smooth_rate)) + print('Number of scan points:{}'.format(n_steps)) + sleep(n_steps*1.0/self.ao_smooth_rate) + self.task.stop() + + except: + print('Failed in counter start phase...') + + + def AO_smooth_func(self, x_init, x_final, channel): + """ + :param x_min: minimum x coordinate (in microns) for scan + :param x_max: maximum x coordinate (in microns) for scan + :param channel: channel to scan over (i.e. x or y) + + :return: array of voltages to write + """ + + v_init = self.convert_um_to_V(x_init, channel) + v_final = self.convert_um_to_V(x_final, channel) + + n_steps = int(ceil(abs(v_final - v_init) * self.ao_smooth_steps_per_volt)) + + v_data = v_init * ones(n_steps) + (v_final - v_init) * (1.0 - cos( + linspace(0.0, pi, n_steps))) / 2.0 + + return v_data + + + def fsm_line_generator(self, x_min, x_max, y_min, y_max, freq, input_voltage_chan): + """ + Scan FSM by writing an array of points over analog out + + Args: + :param ctrchan: (string) device channel for counter + :param x_min: (float) minimum x position (in microns) for scan + :param x_max: (float) maximum x position (in microns) for scan + :param y_min: (float) minimum y position (in microns) for scan + :param y_max: (float) maximum y position (in microns) for scan + :param freq: (float) frequency for FSM scan + + :param input_voltage_chan: (str) device channel for analog input voltage to read (i.e. 'dev1/ai0') + + :return: returns a tuple containing two float arrays. + The first array contains the x-y positions used for the scan. + The second array contains the counts corresponding to each position. + """ + v_data, nx_steps, ny_steps = self.generate_2D_scan_array(x_min, x_max, y_min, y_max) + + v_chunks = np.array_split(v_data, ny_steps) + + counts = np.zeros(nx_steps) + + if self.count_task is None: + self.count_task = AnalogInputTask('voltage') + counter_channel = VoltageInputChannel(input_voltage_chan) + self.count_task.add_channel(counter_channel) + + sample_clock = 'OnboardClock' + self.task.configure_timing_sample_clock(source=sample_clock, rate=freq, + sample_mode='finite', + samples_per_channel=nx_steps) + + self.count_task.configure_timing_sample_clock(source=sample_clock, rate=freq, + sample_mode='finite', + samples_per_channel=nx_steps) + trigger_src = 'ai/StartTrigger' + self.task.configure_trigger_digital_edge_start(trigger_src) + + + for chunk in v_chunks: + # Write the array of voltages that will be used to control the DAQ here. + self.task.write(chunk, auto_start=False, timeout=0, group_by='scan') + + # Synchronize tasks to both start on the AnalogInput start trigger + + # This arms the trigger, thereby starting the AnalogOutputTask (FSM) + # simultaneously with the AnalogInputTask + self.task.start() + + + scan_time = nx_steps*3.0/freq + # Executing the read task executes the two tasks synchronously + t_extra = 0.0 # can add grace period, probably unnecessary + counts = self.count_task.read(samples_per_channel=nx_steps, timeout=scan_time + t_extra) + + yield np.ndarray.flatten(counts) + + self.task.stop() + self.count_task.stop() + + return + + + def generate_2D_scan_array(self, x_min, x_max, y_min, y_max): + """ + Generates array of points to write to analog output for x, y channels + + Args: + :param ctrchan: (string) device channel for counter + :param x_min: (float) minimum x position (in microns) for scan + :param x_max: (float) maximum x position (in microns) for scan + :param y_min: (float) minimum y position (in microns) for scan + :param y_max: (float) maximum y position (in microns) for scan + + """ + + x_voltages = self.AO_smooth_func(x_min, x_max, 'x') + nx_steps = x_voltages.size + y_voltages = self.AO_smooth_func(y_min, y_max, 'y') + ny_steps = y_voltages.size + + x_pts = self.convert_V_to_um('x', x_voltages) + y_pts = self.convert_V_to_um('y', y_voltages) + + + # Create a 2D array of tuples for the xy points in the scan. + # Note that the AO smooth procedure means these points do not fall on a perfect grid + # That's why I've calculated them here and returned them to be available to the user. + + # Note: change counts data type to be int, if using edge count APD + count_tuple = np.dtype([('x', 'float'), ('y', 'float'), ('counts', 'float')]) + xy_pts = np.zeros([nx_steps, ny_steps], dtype=count_tuple) + + # This avoids a nested for loop (if you have large array of points, O(N^2)) + # I assumed that numpy does both of these operations relatively fast... + xy_pts['x'] = np.tile(x_pts, ny_steps).reshape(nx_steps, ny_steps) + xy_pts['y'] = np.repeat(y_pts, nx_steps).reshape(nx_steps, ny_steps) + + n_steps = (nx_steps + 1) * ny_steps + v_data = np.zeros(2*n_steps) + + # Avoids piezo hysteresis by ensuring they always take data scanning in the same direction + hyst_offset = 0.2 #volts, your MMV with this particular choice + x_voltages = np.hstack([x_voltages[0] - hyst_offset, x_voltages]) + + v_data[0::2] = np.tile(x_voltages, ny_steps) #scan in x repeatedly (x_min to x_max, x_min to x_max, ...) + v_data[1::2] = np.repeat(y_voltages, nx_steps + 1) #scan in y slowly (y_min, y_min, ..., y_min+1, y_min+1, ..., y_max) + + + return v_data, nx_steps, ny_steps + + + def FSM_2D_counts(self, x_min, x_max, y_min, y_max, freq, input_voltage_chan): + """ + Scan FSM by writing an array of points over analog out + + Args: + :param ctrchan: (string) device channel for counter + :param x_min: (float) minimum x position (in microns) for scan + :param x_max: (float) maximum x position (in microns) for scan + :param y_min: (float) minimum y position (in microns) for scan + :param y_max: (float) maximum y position (in microns) for scan + :param freq: (float) frequency for FSM scan + + :param input_voltage_chan: (str) device channel for analog input voltage to read (i.e. 'dev1/ai0') + + :return: returns a tuple containing two float arrays. + The first array contains the x-y positions used for the scan. + The second array contains the counts corresponding to each position. + """ + + x_voltages = self.AO_smooth_func(x_min, x_max, 'x') + nx_steps = x_voltages.size + y_voltages = self.AO_smooth_func(y_min, y_max, 'y') + ny_steps = y_voltages.size + + x_pts = self.convert_V_to_um('x', x_voltages) + y_pts = self.convert_V_to_um('y', y_voltages) + + + # Create a 2D array of tuples for the xy points in the scan. + # Note that the AO smooth procedure means these points do not fall on a perfect grid + # That's why I've calculated them here and returned them to be available to the user. + + # Note: change counts data type to be int, if using edge count APD + count_tuple = np.dtype([('x', 'float'), ('y', 'float'), ('counts', 'float')]) + xy_pts = np.zeros([nx_steps, ny_steps], dtype=count_tuple) + + # This avoids a nested for loop (if you have large array of points, O(N^2)) + # I assumed that numpy does both of these operations relatively fast... + xy_pts['x'] = np.tile(x_pts, ny_steps).reshape(nx_steps, ny_steps) + xy_pts['y'] = np.repeat(y_pts, nx_steps).reshape(nx_steps, ny_steps) + + n_steps = (nx_steps + 1) * ny_steps + v_data = np.zeros(2*n_steps) + + # Avoids piezo hysteresis by ensuring they always take data scanning in the same direction + hyst_offset = 0.2 #volts, your MMV with this particular choice + x_voltages = np.hstack([x_voltages[0] - hyst_offset, x_voltages]) + + v_data[0::2] = np.tile(x_voltages, ny_steps) #scan in x repeatedly (x_min to x_max, x_min to x_max, ...) + v_data[1::2] = np.repeat(y_voltages, nx_steps + 1) #scan in y slowly (y_min, y_min, ..., y_min+1, y_min+1, ..., y_max) + + counts = np.zeros(n_steps) + + self.count_task = AnalogInputTask('voltage') + counter_channel = VoltageInputChannel(input_voltage_chan) + self.count_task.add_channel(counter_channel) + + + # Synchronize tasks to both read at a rate, which is controlled by the internal DAQ clock. + sample_clock = 'OnboardClock' + self.task.configure_timing_sample_clock(source=sample_clock, rate=freq, + sample_mode='finite', + samples_per_channel=n_steps) + + self.count_task.configure_timing_sample_clock(source=sample_clock, rate=freq, + sample_mode='finite', + samples_per_channel=n_steps) + + # Write the array of voltages that will be used to control the DAQ here. + self.task.write(v_data, auto_start=False, timeout=0, group_by='scan') + + # Synchronize tasks to both start on the AnalogInput start trigger + trigger_src = 'ai/StartTrigger' + self.task.configure_trigger_digital_edge_start(trigger_src) + + # This arms the trigger, thereby starting the AnalogOutputTask (FSM) simultaneously + # with the AnalogInputTask + self.task.start() + + print('Tasks configured.') + + + try: + print('Starting FSM Scan...') + scan_time = n_steps*1.0/freq + print('Expected scan time:{} sec'.format(scan_time)) + print('Number of scan points:{}'.format(n_steps)) + # Executing the read task executes the two tasks synchronously + t_extra = 1.0 # adds 1 second grace period, probably unnecessary + counts = self.count_task.read(samples_per_channel=n_steps, timeout=scan_time + t_extra) + + # Stop both tasks + self.task.stop() + self.count_task.stop() + + except: + print('FSM: 2D Scan Failed in counter start phase.') + + counts = counts.reshape([nx_steps + 1, ny_steps]) + + xy_pts['counts'] = counts[1:,:] #drop first column + + print(counts.shape) + print(xy_pts.shape) + + # This then returns a FSM 2D scan image as a tuple (x_coord, y_coord, counts) + return counts + + + + # Burst mode code below + + # Write the array of voltages that will be used to control the DAQ here. + self.task.write(v_data, auto_start=False, timeout=0, group_by='scan') + + # Synchronize tasks to both start on the AnalogInput start trigger + trigger_src = 'ai/StartTrigger' + self.task.configure_trigger_digital_edge_start(trigger_src) + + # This arms the trigger, thereby starting the AnalogOutputTask (FSM) simultaneously + # with the AnalogInputTask + self.task.start() + + print('Tasks configured.') + + + try: + print('Starting FSM Scan...') + scan_time = n_steps*1.0/freq + print('Expected scan time:{} sec'.format(scan_time)) + print('Number of scan points:{}'.format(n_steps)) + # Executing the read task executes the two tasks synchronously + t_extra = 1.0 # adds 1 second grace period, probably unnecessary + counts = self.count_task.read(samples_per_channel=n_steps, timeout=scan_time + t_extra) + + # Stop both tasks + self.task.stop() + self.count_task.stop() + + except: + print('FSM: 2D Scan Failed in counter start phase.') + + counts = counts.reshape([nx_steps + 1, ny_steps]) + + xy_pts['counts'] = counts[1:,:] #drop first column + + print(counts.shape) + print(xy_pts.shape) + + # This then returns a FSM 2D scan image as a tuple (x_coord, y_coord, counts) + return counts + + + + + + + +if __name__ == '__main__': + with Newport_FSM() as inst: + print('testing!') + print('Sweep from -5.0 to 5.0') + #inst.ao_smooth(-5.0, 5.0, 'x') + # print('Sweep from 5.0 to -25.0') + # inst.ao_smooth(5.0, -25.0, 'x') + # sleep(1) + # print('Sweep from -25.0 to 25.0') + # inst.ao_smooth(-25.0, 25.0, 'x') + # sleep(1) + # inst.ao_smooth(-5.0, 5.0, 'y') + # sleep(1) + # inst.ao_smooth(5.0, -25.0, 'y') + # sleep(1) + # inst.ao_smooth(-25.0, 25.0, 'y') + # sleep(1) + + print('testing 2D count and scan...') + inst.FSM_2D_counts(-10.0, 10.0, -10.0, 10.0, 50000.0, 'dev1/ai0') + + print('derp derp derp derp!') diff --git a/lantz/drivers/ni/__init__.py b/lantz/drivers/ni/__init__.py index d981148..9d25891 100644 --- a/lantz/drivers/ni/__init__.py +++ b/lantz/drivers/ni/__init__.py @@ -4,7 +4,7 @@ ~~~~~~~~~~~~~~~~ :company: National Instruments - :description: + :description: :website: http://www.ni.com/ ---- @@ -13,5 +13,8 @@ :license: BSD, see LICENSE for more details. """ -from . import daqmx - +#__all__ = ['daqmx'] +#import lantz.drivers.ni.daqmx +#import daqmx +#from . import daqmx +# diff --git a/lantz/drivers/ni/daqmx/__init__.py b/lantz/drivers/ni/daqmx/__init__.py index a749b22..d44629d 100644 --- a/lantz/drivers/ni/daqmx/__init__.py +++ b/lantz/drivers/ni/daqmx/__init__.py @@ -14,7 +14,7 @@ :company: National Instruments - :description: + :description: :website: http://www.ni.com/ ---- @@ -23,9 +23,8 @@ :license: BSD, see LICENSE for more details. """ +__all__ = ['base', 'channels', 'tasks', 'constants'] from .base import System, Task, Channel, Device from .channels import * from .tasks import * from .constants import Constants, Types - - diff --git a/lantz/drivers/ni/daqmx/base.py b/lantz/drivers/ni/daqmx/base.py index 50be24f..b3dc79b 100644 --- a/lantz/drivers/ni/daqmx/base.py +++ b/lantz/drivers/ni/daqmx/base.py @@ -1,5 +1,4 @@ -# -*- coding: utf-8 -*- -""" +""" lantz.drivers.ni.daqmx.base ~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -151,10 +150,10 @@ def __init__(self, *args, **kwargs): @Feat(read_once=True) def version(self): - """Version of installed NI-DAQ library. + """Version of installed NI-DAQ library. """ - err, major = self.lib.GetSysNIDAQMajorVersion(RetValue('u32')) - err, minor = self.lib.GetSysNIDAQMinorVersion(RetValue('u32')) + err, major = self.lib.GetSysNIDAQMajorVersion(RetValue('u32')) + err, minor = self.lib.GetSysNIDAQMinorVersion(RetValue('u32')) return major, minor def _device_names(self): @@ -377,12 +376,12 @@ def typed_task(cls, io_type): return cls._REGISTRY[io_type] def _load_task(self, name): - err, self.__task_handle = self.lib.LoadTask(name, RetValue('u32')) + err, self.__task_handle = self.lib.LoadTask(name, RetValue('u64')) self.name = name self.log_debug('Loaded task with {} ({})'.format(self.name, self.__task_handle)) def _create_task(self, name): - err, self.__task_handle = self.lib.CreateTask(name, RetValue('u32')) + err, self.__task_handle = self.lib.CreateTask(name, RetValue('u64')) err, self.name = self.lib.GetTaskName(*RetStr(default_buf_size)) self.log_debug('Created task with {} ({})'.format(self.name, self.__task_handle)) @@ -422,6 +421,13 @@ def _channel_names(self): names = tuple(n.strip() for n in buf.split(',') if n.strip()) return names + def number_of_channels(self): + """Returns the number of virtual channels in the task. + """ + err, buf = self.lib.GetTaskChannels(*RetStr(default_buf_size)) + names = tuple(n.strip() for n in buf.split(',') if n.strip()) + return len(names) + def _device_names(self): """Return a tuple with the names of all devices in the task. """ @@ -501,7 +507,6 @@ def start(self): repeatedly. Starting and stopping a task repeatedly reduces the performance of the application. """ - self.lib.StartTask() @Action() @@ -849,9 +854,10 @@ def configure_timing_sample_clock(self, source='on_board_clock', rate=1, active_ """ if source == 'on_board_clock': source = None - self.samples_per_channel = samples_per_channel self.sample_mode = sample_mode - self.lib.CfgSampClkTiming(source, float64(rate), active_edge, sample_mode, samples_per_channel) + + + self.lib.CfgSampClkTiming(source, rate, active_edge, sample_mode, samples_per_channel) def configure_timing_burst_handshaking_export_clock(self, *args, **kws): """ @@ -951,7 +957,13 @@ def configure_trigger_digital_edge_start(self, source, edge='rising'): acquiring or generating samples: rising or falling edge(s). """ - self.lib.CfgDigEdgeStartTrig(self, source, edge) + if edge == Constants.Val_Falling: + edge_val = int(Constants.Val_Falling) + else: + edge_val = int(Constants.Val_Rising) + + + self.lib.CfgDigEdgeStartTrig(source, edge_val) @Action(values=(str, str, _WHEN_MATCH)) def configure_trigger_digital_pattern_start(self, source, pattern, match=True): @@ -975,7 +987,7 @@ def configure_trigger_digital_pattern_start(self, source, pattern, match=True): Specifies the conditions under which the trigger occurs: pattern matches or not. """ - self.lib.CfgDigPatternStartTrig(self, source, pattern, match) + self.lib.CfgDigPatternStartTrig(source, pattern, match) def configure_trigger_disable_start(self): """ @@ -1028,6 +1040,18 @@ def configure_analog_edge_reference_trigger(self, source, slope='rising',level=1 self.lib.CfgAnlgEdgeRefTrig(source, slope, level, pre_trigger_samps) + @Action() + def send_software_trigger(self, triggerID=Constants.Val_AdvanceTrigger): + """ + + :param triggerID: (int32) Specifies which software trigger to generate. + + DAQmx_Val_AdvanceTrigger Generate the advance trigger + + """ + + self.lib.SendSoftwareTrigger(triggerID) + @Feat(values=(str, _WHEN_WINDOW, None, None, None)) def configure_analog_window_reference_trigger(self, source, when='entering',top=1.0, bottom=1.0, pre_trigger_samps=0): @@ -1147,6 +1171,20 @@ def configure_digital_pattern_reference_trigger(self, source, pattern, match=Tru self.lib.CfgDigPatternRefTrig(self, source, pattern, match, pre_trigger_samps) + @Action() + def configure_output_buffer(self, samples_per_channel=1000): + """ + + :params: taskHandle (TaskHandle): task to configure + :params: num_samps_per_chan (uInt32): number of samples to output per channel. + Zero indicates no buffer should be allocated. Use a buffer size of 0 to perform + a hardware-timed operation without using a buffer. + + :return: status (int32) - error code returned by the function in the event of an + error or warning + """ + return self.lib.CfgOutputBuffer(self, samples_per_channel) + @Action() def disable_reference_trigger(self): """ @@ -1318,7 +1356,6 @@ def arm_start_trigger_source(self): err, value = self.lib.GetDigEdgeArmStartTrigSrc(RetStr(default_buf_size)) return value - @arm_start_trigger_source.setter @arm_start_trigger_source.setter def arm_start_trigger_source(self, source): source = str (source) diff --git a/lantz/drivers/ni/daqmx/channels.py b/lantz/drivers/ni/daqmx/channels.py index 3a19073..fa9c080 100644 --- a/lantz/drivers/ni/daqmx/channels.py +++ b/lantz/drivers/ni/daqmx/channels.py @@ -73,11 +73,11 @@ class VoltageInputChannel(Channel): def __init__(self, phys_channel, name='', terminal='default', min_max=(-10., 10.), units='volts', task=None): + terminal_val = self.terminal_map[terminal] + if not name: name = ''#phys_channel - terminal_val = self.terminal_map[terminal] - if units != 'volts': custom_scale_name = units units = Constants.Val_FromCustomScale @@ -100,19 +100,27 @@ class VoltageOutputChannel(Channel): CHANNEL_TYPE = 'AO' - def __init__(self, phys_channel, channel_name='', terminal='default', min_max=(-1, -1), units='volts'): + CREATE_FUN = 'CreateAOVoltageChan' - terminal_val = self.terminal_map[terminal] + def __init__(self, phys_channel, name='', min_max=(-10., 10.), + units='volts', task=None): + + if not name: + name = '' # phys_channel if units != 'volts': custom_scale_name = units - units = Constants.FROM_CUSTOM_SCALE + units = Constants.Val_FromCustomScale else: custom_scale_name = None - units = Constants.VOLTS + units = Constants.Val_Volts + + self._create_args = (phys_channel, name, min_max[0], min_max[1], units, + custom_scale_name) + + super().__init__(task=task, name=name) + - err = self.lib.CreateAOVoltageChan(phys_channel, channel_name, - min_max[0], min_max[1], units, custom_scale_name) # Not implemented: # DAQmxCreateAIAccelChan, DAQmxCreateAICurrentChan, DAQmxCreateAIFreqVoltageChan, @@ -168,17 +176,26 @@ class DigitalInputChannel(Channel): 'all_lines' - One channel for all lines """ + CHANNEL_TYPE = 'DI' + + CREATE_FUN = 'CreateDIChan' + + def __init__(self, lines, name='', group_by='line'): if group_by == 'line': - grouping_val = Constants.ChanPerLine + grouping_val = Constants.Val_ChanPerLine self.one_channel_for_all_lines = False else: - grouping_val = Constants.ChanForAllLines + grouping_val = Constants.Val_ChanForAllLines self.one_channel_for_all_lines = True - self.lib.CreateDIChan(lines, name, grouping_val) + self._create_args = (lines, name, grouping_val) + + super().__init__()#task=task, name=name) + + #self.lib.CreateDIChan(lines, name, grouping_val) class DigitalOutputChannel(Channel): @@ -195,16 +212,22 @@ class DigitalOutputChannel(Channel): See DigitalInputChannel """ + CHANNEL_TYPE = 'DO' + + CREATE_FUN = 'CreateDOChan' + def __init__(self, lines, name='', group_by='line'): if group_by == 'line': - grouping_val = Constants.ChanPerLine + grouping_val = Constants.Val_ChanPerLine self.one_channel_for_all_lines = False else: - grouping_val = Constants.ChanForAllLines + grouping_val = Constants.Val_ChanForAllLines self.one_channel_for_all_lines = True - self.lib.CreateDOChan(lines, name, grouping_val) + self._create_args = (lines, name, grouping_val) + + super().__init__()#task=task, name=name) class CountEdgesChannel(Channel): @@ -266,20 +289,24 @@ class CountEdgesChannel(Channel): CHANNEL_TYPE = 'CI' + CREATE_FUN = 'CreateCICountEdgesChan' + - def __init__ (self, counter, name="", edge='rising', init=0, direction='up'): + def __init__ (self, phys_counter, name="", edge='rising', init=0, direction='up', task=None): if edge == 'rising': - edge_val = Constants.RISING + edge_val = Constants.Val_Rising else: - edge_val = Constants.FALLING + edge_val = Constants.Val_Falling if direction == 'up': - direction_val = Constants.COUNT_UP + direction_val = Constants.Val_CountUp else: - direction_val = Constants.COUNT_DOWN + direction_val = Constants.Val_CountDown - self.lib.CreateCICountEdgesChan(counter, name, edge_val, direction_val) + self._create_args = (phys_counter, name, edge_val, init, direction_val) + + super().__init__(task=task, name=name) class LinearEncoderChannel(Channel): @@ -500,37 +527,117 @@ class MeasureFrequencyChannel(Channel): None. """ + CREATE_FUN = 'CreateCIFreqChan' + + def __init__(self, counter, name='', min_val=1e2, max_val=1e3, units="hertz", edge="rising", method="low_freq", - meas_time=1.0, divisor=1, custom_scale_name=None): + meas_time=1.0, divisor=1, custom_scale_name=None, + task=None): + self.data_type = float + name='test' + + assert divisor > 0 if method == 'low_freq': - meas_meth_val = Constants.LOW_FREQ1_CTR + meas_meth_val = Constants.Val_LowFreq1Ctr elif method == 'high_freq': - meas_meth_val = Constants.HIGH_FREQ2_CTR + meas_meth_val = Constants.Val_HighFreq2Ctr elif method == 'large_range': - meas_meth_val = Constants.LARGE_RANGE2_CTR + meas_meth_val = Constants.Val_LargeRng2Ctr if units != ('hertz', 'ticks'): custom_scale_name = units - units = Constants.FROM_CUSTOM_SCALE + units_val = Constants.Val_FromCustomScale else: custom_scale_name = None if units == 'hertz': - units = Constants.HZ + units_val = Constants.Val_Hz else: - units = Contstants.TICKS + units_val = Constants.Val_Ticks + + if edge == 'rising': + edge_val = Constants.Val_Rising + else: + edge_val = Constants.Val_Falling - self.lib.CreateCIFreqChan(counter, name, min_max[0], min_max[1], - units_val, edge_val, meas_meth_val, - meas_time, divisor, custom_scale_name) + self._create_args = (counter, name, min_val, max_val, units_val, + edge_val, meas_meth_val, meas_time, divisor, + custom_scale_name) + + super().__init__(task=task, name=name) +class CounterOutTicksChannel(Channel): + """ + Class for CounterOutputTicks Channel + + :class: + """ + + CREATE_FUN = 'DAQmxCreateCOPulseChanTicks' + + + + def __init__(self, counter, name='', source_terminal='', idle_state='high', init_delay=1, low_ticks=10, high_ticks=10, + task=None): + """ + Create channel(s) to generate digital pulses defined by the number of timebase ticks that the pulse is at a + high state and the number of timebase ticks that the pulse is at a low state and also adds the channel to the + task you specify with taskHandle. The pulses appear on the default output terminal of the counter unless you + select a different output terminal. + + + Args: + taskHandle: TaskHandle to which to add the channels that this function creates. + + counter: Name of the counter to use to create virtual channels. You can specify a list or range of physical + channels. + + nameToAssignToChannel: (string) name(s) to assign to the created virtual channel(s). If you do not specify + a name, NI-DAQmx uses the physical channel name as the virtual channel name. If you specify your own names + for nameToAssignToChannel, you must use the names when you refer to these channels in other NI-DAQmx + functions. If you create multiple virtual channels with one call to this function, you can specify a list + of names separated by commas. If you provide fewer names than the number of virtual channels you create, + NI-DAQmx automatically assigns names to the virtual channels. + + sourceTerminal: (string) terminal to which you connect an external timebase. You also can specify a source + terminal by using a terminal name. + + idleState: (int32) resting state of the output terminal. + Value Description + DAQmx_Val_High High state. + DAQmx_Val_Low Low state. + + initialDelay: (int32) number of timebase ticks to wait before generating the first pulse. + + lowTicks: (int32) The number of timebase ticks that the pulse is low. + + highTicks: (int32) number of timebase ticks that the pulse is high. + + Returns: + + status: The error code returned by the function in the event of an error or warning. A value of 0 + indicates success. A positive value indicates a warning. A negative value indicates an error. + + """ + + if idle_state == 'high': + idle_state_val = Constants.Val_High + else: + idle_state_val == 'low' + + + + self._create_args = (counter, name, source_terminal, idle_state_val, init_delay, low_ticks, high_ticks) + + + super().__init__(name=name, task=task) def create_channel_frequency(self, counter, name="", units='hertz', idle_state='low', delay=0.0, freq=1.0, duty_cycle=0.5): @@ -739,3 +846,21 @@ def create_channel_time(self, counter, name="", units="seconds", idle_state='low idle_state_val = self._get_map_value('idle_state', idle_state_map, idle_state) return CALL('CreateCOPulseChanTime', self, counter, name, units_val, idle_state_val, float64 (delay), float64(low_time), float64(high_time))==0 + + + +class CounterOutPulseChannel(Channel): + """ + Counter output pulse channel + """ + + def __init__(self): + """ + + + + Returns + -------- + + status: int32 + """ diff --git a/lantz/drivers/ni/daqmx/tasks.py b/lantz/drivers/ni/daqmx/tasks.py index eddfc73..a9a14c1 100644 --- a/lantz/drivers/ni/daqmx/tasks.py +++ b/lantz/drivers/ni/daqmx/tasks.py @@ -127,18 +127,22 @@ def read(self, samples_per_channel=None, timeout=10.0, group_by='channel'): number_of_channels = self.number_of_channels() if group_by == Constants.Val_GroupByScanNumber: - data = np.zeros((samples_per_channel, number_of_channels), dtype=np.float64) + data = np.zeros((samples_per_channel, number_of_channels), + dtype=np.float64) else: - data = np.zeros((number_of_channels, samples_per_channel), dtype=np.float64) + data = np.zeros((number_of_channels, samples_per_channel), + dtype=np.float64) - err, data, count = self.lib.ReadAnalogF64(samples_per_channel, timeout, group_by, - data.ctypes.data, data.size, RetValue('i32'), None) + err, count = self.lib.ReadAnalogF64(samples_per_channel, timeout, + group_by, data.ctypes.data, + data.size, RetValue('i32'), + None) if samples_per_channel < count: if group_by == 'scan': return data[:count] else: - return data[:,:count] + return data[:, :count] return data @@ -149,9 +153,11 @@ class AnalogOutputTask(Task): CHANNEL_TYPE = 'AO' + @Action(units=(None, None, 'seconds', None), values=(None, None, None, _GROUP_BY)) def write(self, data, auto_start=True, timeout=10.0, group_by='scan'): - """Write multiple floating-point samples or a scalar to a task + """ + Write multiple floating-point samples or a scalar to a task that contains one or more analog output channels. Note: If you configured timing for your task, your write is @@ -198,9 +204,10 @@ def write(self, data, auto_start=True, timeout=10.0, group_by='scan'): data = np.asarray(data, dtype = np.float64) + number_of_channels = self.number_of_channels() - if data.ndims == 1: + if data.ndim == 1: if number_of_channels == 1: samples_per_channel = data.shape[0] shape = (samples_per_channel, 1) @@ -218,7 +225,11 @@ def write(self, data, auto_start=True, timeout=10.0, group_by='scan'): else: samples_per_channel = data.shape[-1] - err, count = self.lib.WriteAnalogF64(samples_per_channel, auto_start, + #print(data) + + samps_per_channel = int(samples_per_channel) + + err, count = self.lib.WriteAnalogF64(samps_per_channel, auto_start, timeout, group_by, data.ctypes.data, RetValue('i32'), None) @@ -226,6 +237,7 @@ def write(self, data, auto_start=True, timeout=10.0, group_by='scan'): return count + class DigitalTask(Task): @Action(units=(None, 'seconds', None), values=(None, None, _GROUP_BY)) @@ -335,7 +347,8 @@ class DigitalInputTask(DigitalTask): """Exposes NI-DAQmx digital input task to Python. """ - CHANNEL_TYPE = 'DI' + IO_TYPE = 'DI' + class DigitalOutputTask(DigitalTask): @@ -344,6 +357,8 @@ class DigitalOutputTask(DigitalTask): CHANNEL_TYPE = 'DO' + + @Action(units=(None, None, 'seconds', None), values=(None, {True, False}, None, _GROUP_BY)) def write(self, data, auto_start=True, timeout=10.0, group_by='scan'): """ @@ -396,14 +411,16 @@ def write(self, data, auto_start=True, timeout=10.0, group_by='scan'): 'group_by_scan_number' - Group by scan number (interleaved). """ - number_of_channels = self.get_number_of_channels() + number_of_channels = self.number_of_channels() if np.isscalar(data): data = np.array([data]*number_of_channels, dtype = np.uint8) else: data = np.asarray(data, dtype = np.uint8) - if data.ndims == 1: + print(data) + + if data.ndim == 1: if number_of_channels == 1: samples_per_channel = data.shape[0] shape = (samples_per_channel, 1) @@ -422,8 +439,8 @@ def write(self, data, auto_start=True, timeout=10.0, group_by='scan'): samples_per_channel = data.shape[-1] err, count = self.lib.WriteDigitalLines(samples_per_channel, - bool32(auto_start), - float64(timeout), group_by, + auto_start, + timeout, group_by, data.ctypes.data, RetValue('u32'), None) return count @@ -434,7 +451,8 @@ class CounterInputTask(Task): """Exposes NI-DAQmx counter input task to Python. """ - CHANNEL_TYPE = 'CI' + IO_TYPE = 'CI' + def read_scalar(self, timeout=10.0): """Read a single floating-point sample from a counter task. Use @@ -503,14 +521,17 @@ def read(self, samples_per_channel=None, timeout=10.0): :return: The array of samples read. """ + if samples_per_channel is None: samples_per_channel = self.samples_per_channel_available() data = np.zeros((samples_per_channel,),dtype=np.int32) - err, count = self.lib.ReadCounterU32(samples_per_channel, float64(timeout), + + err, count = self.lib.ReadCounterU32(samples_per_channel, float(timeout), data.ctypes.data, data.size, RetValue('i32'), None) + return data[:count] @@ -523,3 +544,4 @@ class CounterOutputTask(Task): Task.register_class(AnalogInputTask) +Task.register_class(DigitalInputTask) diff --git a/lantz/drivers/signalrecovery/signalrecovery7265.py b/lantz/drivers/signalrecovery/signalrecovery7265.py new file mode 100644 index 0000000..8a3c0a1 --- /dev/null +++ b/lantz/drivers/signalrecovery/signalrecovery7265.py @@ -0,0 +1,349 @@ +from lantz import Feat, DictFeat, Action +from lantz.messagebased import MessageBasedDriver + +from collections import OrderedDict + +from time import sleep + + +class SignalRecovery7265(MessageBasedDriver): + """Signal Recovery 7265 + DSP Lock-in Amplifier + + Author: P. Mintun + Version: 0.0.1 + Date: 11/13/2015 + + "Stay safe and happy locking in..." + + This driver assumes that the COMM settings of the 7265 are set up as + follows: + + ADDRESS = N + TERMINATOR = [EOI] + TECH ECHO = DISABLED + + SDC=ADF 1 ENABLED + """ + + DEFAULTS = {'COMMON': {'write_termination': '', + 'read_termination': ''}} + + INT_EXT_REF = OrderedDict([ + ('int', 0), + ('rear ext', 1), + ('ext', 2) + ]) + + TIME_CONSTANTS = OrderedDict([ + (0, 10e-6), + (1, 20e-6), + (2, 40e-6), + (3, 80e-6), + (4, 160e-6), + (5, 320e-6), + (6, 640e-6), + (7, 5e-3), + (8, 10e-3), + (9, 20e-3), + (10, 50e-3), + (11, 100e-3), + (12, 0.2), + (13, 0.5), + (14, 1), + (15, 2), + (16, 5), + (17, 10), + (18, 20), + (19, 50), + (20, 100), + (21, 200), + (22, 500), + (23, 1e3), + (24, 2e3), + (25, 5e3), + (26, 10e3), + (27, 20e3), + (28, 50e3), + (29, 100) + ]) + + AC_GAINS = OrderedDict([ + ('0dB', 0), + ('10dB', 1), + ('20dB', 2), + ('30dB', 3), + ('40dB', 4), + ('50dB', 5), + ('60dB', 6), + ('70dB', 7), + ('80dB', 8), + ('90dB', 9), + ]) + + SENSITIVITIES = OrderedDict([ + ('2e-9V', 1), + ('5e-9V', 2), + ('1e-8V', 3), + ('2e-8V', 4), + ('5e-8V', 5), + ('1e-7V', 6), + ('2e-7V', 7), + ('5e-7V', 8), + ('1e-6V', 9), + ('2e-6V', 10), + ('5e-6V', 11), + ('1e-5V', 12), + ('2e-5V', 13), + ('5e-5V', 14), + ('1e-4V', 15), + ('2e-4V', 16), + ('5e-4V', 17), + ('1e-3V', 18), + ('2e-3V', 19), + ('5e-3V', 20), + ('1e-2V', 21), + ('2e-2V', 22), + ('5e-2V', 23), + ('1e-1V', 24), + ('2e-1V', 25), + ('5e-1V', 26), + ('1V', 27), + ]) + + @Feat() + def idn(self): + """ + Returns current instrument identification. + """ + return self.query('ID') + + @Feat() + def x(self): + """ + Read x value from lockin. + """ + read = self.query('X.') + return float(read.replace('\x00', '')) + + @Feat() + def y(self): + """ + Read y value from lockin. + """ + read = self.query('Y.') + return float(read.replace('\x00', '')) + + @Feat() + def xy(self): + """ + Read x and y values from lockin simultaneously. + """ + read = self.query('XY.') + return [float(x) for x in read.replace('\x00', '').split(',')] + + @Feat() + def magnitude(self): + """ + Read signal magnitude from lockin. + """ + read = self.query('MAG.') + return float(read.replace('\x00', '')) + + @Feat() + def phase(self): + """ + Read signal phase from lockin. + """ + read = self.query('PHA.') + return float(read.replace('\x00', '')) + + @Feat() + def mag_phase(self): + """ + Read signal magnitude and phase from lockin. + """ + read = self.query('MP.') + return [float(x) for x in read.replace('\x00', '').split(',')] + + @Action() + def autophase(self): + """ + Adds an offset to the phase of the lockin to minimize the y-channel + signal and maximize the x-channel signal. + """ + return self.write('AQN') + + @Feat(limits=(0, 29, 1)) + def time_constant_integer(self): + """ + Read current lockin time constant mode setting + """ + return int(self.query('TC')) + + @time_constant_integer.setter + def time_constant_integer(self, integer): + """ + Set lockin time constant. + """ + return self.write('TC{}'.format(integer)) + + @Feat(values=TIME_CONSTANTS) + def time_constant(self): + """ + Returns current time constant setting (in seconds). + """ + return float(self.query('TC.')) + + @time_constant.setter + def time_constant(self, time_const): + """ + Not implemented, not a built in functionality. + """ + print('Error: invalid operation, cannot directly set TC') + return 0 + + @Feat() + def frequency(self): + """ + Read current signal frequency. + """ + read = self.query('FRQ.') + return float(read.replace('\x00', '')) + + @Feat(limits=(0, 250e3)) + def oscillator_freq(self): + """ + Read internal oscillator frequency. + """ + return float(self.query('OF.')) + + @oscillator_freq.setter + def oscillator_freq(self, frequency): + """ + Set internal oscillator frequency. + """ + return self.write('OF.{}'.format(frequency)) + + @Feat(values=AC_GAINS) + def gain(self): + """ + Read current AC gain (dB). + """ + return int(self.query('ACGAIN')) + + @gain.setter + def gain(self, gain_value): + """ + Set current AC gain (dB) + """ + return self.write('ACGAIN{}'.format(gain_value)) + + @Feat(values=SENSITIVITIES) + def sensitivity(self): + """ + Gets sensitivity according to the SENSITIVITIES table. + """ + return int(self.query('SEN')) + + # @Feat(values=SENSITIVITIES) + # def sensitivity_int(self): + # """ + # Returns integer value of sensitivity as described by SENSITIVITIES. + # """ + # return int(self.query('SEN')) + + @sensitivity.setter + def sensitivity(self, sen_it): + """ + Sets value of sensitivity as described by SENSITIVITIES. + """ + return self.write('SEN{}'.format(sen_it)) + + @Action() + def autosensitivity(self): + """ + Runs an autosensitivity operation. + + The instrument adjusts its full-scale sensitivity so that the magnitude + output lies between 30-90 percent of full-scale + """ + a = self.write('AS') + sleep(7.5) # wait for operation to complete + return a + + @Action() + def autogain(self): + """ + Set current AC gain to automatic. + """ + return self.write('AUTOMATIC1') + + @Feat(values=INT_EXT_REF) + def int_ext_ref(self): + """ + Check if lockin is internally or externally referenced + """ + return int(self.query('IE')) + + @int_ext_ref.setter + def int_ext_ref(self, value): + """ + Set lockin to be internal or external reference + """ + return self.write('IE {}'.format(value)) + +if __name__ == '__main__': + with SignalRecovery7265.via_gpib(7) as inst: + print('The instrument identification is ' + inst.idn) + + print('Testing signal readings') + print('Signal X: {}V'.format(inst.x)) + print('Signal Y: {}V'.format(inst.y)) + print('Signal magnitude: {}V'.format(inst.magnitude)) + print('Signal phase: {}degrees'.format(inst.phase)) + + print('Testing full quadrature readings') + print('X,Y: {}V'.format(list(inst.xy))) + print('Magnitude, Phase: {}'.format(list(inst.mag_phase))) + inst.autophase + sleep(2) + print('Magnitude, Phase: {}'.format(list(inst.mag_phase))) + + print('Testing frequency code') + print('Ref f: {}Hz'.format(inst.frequency)) + inst.oscillator_freq = 137.0 + print('Ref f: {}Hz'.format(inst.frequency)) + inst.oscillator_freq = 17 + print('Ref f: {}Hz'.format(inst.frequency)) + # + print('Internal External Reference check') + print('Internal/ext reference: {}'.format(inst.int_ext_ref)) + inst.int_ext_ref = 'ext' + print('Internal/ext reference: {}'.format(inst.int_ext_ref)) + inst.int_ext_ref = 'int' + print('Internal/ext reference: {}'.format(inst.int_ext_ref)) + + print('Time constant check') + print('Int TC: {}'.format(inst.time_constant_integer)) + print('TC (sec): {}s'.format(inst.time_constant)) + inst.time_constant_integer = 15 + print('Int TC: {}'.format(inst.time_constant_integer)) + print('TC (sec): {}s'.format(inst.time_constant)) + inst.time_constant_integer = 10 + print('Int TC: {}'.format(inst.time_constant_integer)) + print('TC (sec): {}s'.format(inst.time_constant)) + + print('AC Gain Check') + print('AC Gain: {}'.format(inst.gain)) + inst.gain = '30dB' + print('AC Gain: {}'.format(inst.gain)) + inst.autogain + print('AC Gain: {}'.format(inst.gain)) + + print('Sensitivity Check') + print('Sen: {}'.format(inst.sensitivity)) + inst.sensitivity = '2e-8V' + print('Sen: {}'.format(inst.sensitivity)) + inst.autosensitivity() + print('Sen: {}'.format(inst.sensitivity)) diff --git a/lantz/drivers/thorlabs/ITC4020.py b/lantz/drivers/thorlabs/ITC4020.py new file mode 100644 index 0000000..bac16a4 --- /dev/null +++ b/lantz/drivers/thorlabs/ITC4020.py @@ -0,0 +1,113 @@ + + +from lantz import Feat, DictFeat, Action +from lantz.errors import InstrumentError +from lantz.messagebased import MessageBasedDriver + +from time import sleep + +class ITC4020(MessageBasedDriver): + + + DEFAULTS = {'COMMON': {'write_termination': '\n', + 'read_termination': '\n'}} + + COM_delay = 0.2 + + def write(self, *args, **kwargs): + super(ITC4020, self).write(*args, **kwargs) + sleep(self.COM_delay) + + @Feat(read_once=True) + def idn(self): + return self.query('*IDN?') + + @Feat + def key_locked(self): + return bool(int(self.query("OUTP:PROT:KEYL:TRIP?"))) + + @Feat + def temperature(self): + return float(self.query("MEAS:SCAL:TEMP?")) + + @Feat + def temperature_setpoint(self): + return float(self.query("SOUR2:TEMP?")) + + @Feat + def LD_current_setpoint(self): + return float(self.query("SOUR:CURR?")) + + @Feat(limits=(0,1)) + def LD_current(self): + return float(self.query("MEAS:CURR?")) + + @LD_current.setter + def LD_current(self, value): + inst.write("SOUR:CURR {:.5f}".format(value)) + + @Feat + def output_state(self): + return bool(int(self.query("OUTP:STAT?"))) + + @Feat + def PD_power(self): + return float(self.query("MEAS:POW2?")) + + @Action() + def read_error_queue(self): + no_error = '+0,"No error"' + error = inst.query("SYST:ERR:NEXT?") + while(error != no_error): + print(error) + error = inst.query("SYST:ERR:NEXT?") + + @Action() + def turn_on_seq(self, temp_error=0.05, current_error=0.005): + if self.output_state: + print("Laser is already ON!") + return + + #Turn ON sequence: + # 1. TEC ON + # 2. Wait for temperature == set_temperature + # 3. LD ON + # 4. Wait for current == set_current + + # 1. TEC ON + self.write("OUTP2:STAT ON") + + # 2. Wait + setpoint = self.temperature_setpoint + while(abs(setpoint-self.temperature)>temp_error):pass + + + # 3. LD ON + self.write("OUTP1:STAT ON") + + # 4. Wait + setpoint = self.LD_current_setpoint + while(abs(setpoint-self.LD_current)>current_error):pass + + @Action() + def turn_off_seq(self, current_error=0.005): + #Turn OFF sequence: + # 1. LD OFF + # 2. Wait for current == 0 + # 3. TEC OFF + + # 1. LD OFF + self.write("OUTP1:STAT OFF") + + # 2. Wait + while(abs(self.LD_current)>current_error):pass + + # 1. TEC OFF + self.write("OUTP2:STAT OFF") + + + +if __name__ == '__main__': + inst = ITC4020('USB0::0x1313::0x804A::M00336070::INSTR') + inst.initialize() + print(inst.query('*IDN?')) diff --git a/lantz/log.py b/lantz/log.py index aec47e3..7eaf20a 100644 --- a/lantz/log.py +++ b/lantz/log.py @@ -23,6 +23,7 @@ from socketserver import (ThreadingUDPServer, DatagramRequestHandler, ThreadingTCPServer, StreamRequestHandler) +from colorama import Fore, Back, Style from stringparser import Parser diff --git a/setup.py b/setup.py index 2091f50..509508a 100644 --- a/setup.py +++ b/setup.py @@ -62,6 +62,7 @@ def read(filename): 'lantz.utils', 'lantz.drivers'] + ['lantz.drivers.' + company for company in companies] + + ['lantz.drivers.ni.daqmx'] + ['lantz.drivers.legacy.' + company for company in legacy_companies], test_suite='lantz.testsuite.testsuite', install_requires=['pint>=0.6',