diff --git a/lantz/drivers/newport/842PE.py b/lantz/drivers/newport/842PE.py new file mode 100644 index 0000000..52ba2b2 --- /dev/null +++ b/lantz/drivers/newport/842PE.py @@ -0,0 +1,680 @@ +# -*- coding: utf-8 -*- +""" + lantz.drivers.newport.842PE + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Newport 842-PE powermeter + + :copyright: 2013 by Lantz Authors, see AUTHORS for more details. + :license: BSD, see LICENSE for more details. +""" +# Cheched working with following heads: 818-UV, 818P, 918D-UV +# This device also has a binary mode, witch is not implemented here. + +#TODO sometime it craches, leaving crap in the buffer +# should clear buffer at initialisation + +import time +import warnings + +from lantz import Action, Feat, Q_ +from lantz.serial import SerialDriver +from lantz.errors import InstrumentError + +nanometer = Q_(1, 'nm') +watt = Q_(1, 'W') +joule = Q_(1, 'J') +second = Q_(1, 's') +hertz = Q_(1, 'Hz') + + +class N842PE(SerialDriver): + """Newport 842-PE powermeter + """ + + ENCODING = 'ascii' + + # This string is returned by the device when the action/setter is done + # It is needeed because when there is an error it looks like this: + # 'Error XXX: YYY\r\nACK\r\n' + # So if we just detect a RECV_TERMINATION the next query will be polluted + # To use a command/setter, we then should use send_command() instead of + # query() + RECV_DONE = 'ACK\r\n' + + RECV_TERMINATION = '\r\n' + SEND_TERMINATION = '\r\n' + + BAUDRATE = 115200 + BYTESIZE = 8 + PARITY = 'none' + STOPBITS = 1 + + TIMEOUT = 10 + + #These are the whole scales the controller can reach + #No every head can whitstand all of them + Scales = ('auto', '1p', '3p', '10p', '30p', '100p', '300p', '1n', + '3n', '10n', '30n', '100n', '300n', '1u', '3u', '10u', + '30u', '100u', '300u', '1m', '3m', '10m', '30m', + '100m', '300m', '1', '3', '10', '30', '100', '300', + '1k', '3k', '10k', '30k', '100k', '300k', '1M', '3M', + '10M', '30M', '100M', '300M',) + + #TODO: is this functin complete? + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.initialize() + self.DeviceStatus = self.status # Read the device status + + #These are the whole scales the head can reach + self.ScalesHead = self.available_scales() + + def available_scales(self): + """ + Returns valid scales for the current head + """ + #check limits of the head + if self.energy_mode: #Joules + max_scale = self.DeviceStatus['ScaleMaxEnergy'] + min_scale = self.DeviceStatus['ScaleMinEnergy'] + else: + max_scale = self.DeviceStatus['ScaleMaxPower'] + min_scale = self.DeviceStatus['ScaleMinPower'] + + out = list(self.Scales[min_scale:max_scale+1]) + out.insert(0,self.Scales[0]) # 'auto' + return tuple(out) + + #Display commands + + display = Feat(values={'real-time': 0, 'histogram': 1, 'statistic': 2, + 'needle': 3, 'lineplot': 4}) + + @display.setter + def display(self, value): + """Set the device’s on-screen display mode. (sdu) + """ + self.send_command('*sdu {}'.format(value)) + + @Feat(values=set(Scales)) + def scale(self): + """Get the display range. (ssa) + """ + out = self.DeviceStatus['CurrentScale'] + return self.Scales[out] + + #TODO Is there is a cleaner way to do this? + #The problem is that the controler accepts a range of scales but the head has a narrower range + #This range (head'd one) is only known after initialisation + @scale.setter + def scale(self, value): + """Set the display of the current data into a specific range. (ssa) + """ + #Prevent setting the controler to a scale not compatible with the head + if value in self.ScalesHead: + self.send_command('*ssa {}'.format(value)) + self.DeviceStatus = self.status # update cached device status + + @Action(values=set(('decrease','increase'))) + #An helper function to increase or decrease the scale + def change_scale(self, value): + """Increase or decrease the scale. + """ + current_scale = self.DeviceStatus['CurrentScale'] + #check limits + if self.energy_mode: #Joules + max_scale = self.DeviceStatus['ScaleMaxEnergy'] + min_scale = self.DeviceStatus['ScaleMinEnergy'] + else: + max_scale = self.DeviceStatus['ScaleMaxPower'] + min_scale = self.DeviceStatus['ScaleMinPower'] + #increase or decrease the scale + mod = 1 if (value == 'increase') else -1 + new_scale = current_scale + mod + #cannot go furter limits + if (new_scale > max_scale) or (new_scale < min_scale): + new_scale = current_scale + #actually change the scale + self.scale = self.Scales[new_scale] + + dBm = Feat(values={False: 0, True: 1}) + + @dBm.setter + def dBm(self, value): + """Set the on-screen display unit to dBm. (dbu) + """ + if not('Photodiode' in self.DeviceStatus['HeadType']): + warnings.warn('Cannot put dBm without a Photodiode Detector') + else: # dBm can only be set with Photodiode Detectors + self.send_command('*dbu {}'.format(value)) + + @Feat(values={False: 0, True: 1}) + def high_resolution(self): + """Get the high resolution digits on the on-screen reading. (shl) + """ + return self.DeviceStatus['HighResolutionDisplay'] + + @high_resolution.setter + def high_resolution(self, value): + """Set the high resolution digits on the on-screen reading. (shl) + """ + self.send_command('*shl {}'.format(value)) + self.DeviceStatus = self.status # update cached device status + + #MEASUREMENT COMMANDS - DATA ACQUISITION + + @Feat() + def current_value(self): + """Get the value that is currently being displayed on the device’s + screen. The value is displayed in Watts or in Joules (not in dBm). + (cvu) + """ + out = float(self.parse_query('*cvu', + format='Current Value: {}')) + if self.energy_mode: # joules + return Q_(out, joule) + else: # watts + return Q_(out, watt) + + @Feat(values={False: 'New Data Not Available', True: 'New Data Available'}) + def value_available(self): + """Check whether a new value is available from + the device. Though optional, its use is recommended when used with + single pulse operations. (nvu) + """ + return self.query('*nvu') + + @Feat() + def get_statistics(self): + """Get all the statistics data, provided that the device has previously + been set into statistic mode. (vsu) + """ + if self.energy_mode: #Joules + out = self.parse_query('*vsu', format = 'Current Value: {[CurrentValue]:f}\t' # joule + 'Maximum: {[Maximum]:f}\t' # joule + 'Minimum: {[Minimum]:f}\t' # joule + 'Average: {[Average]:f}\t' # joule + 'Standard Deviation: {[StandardDeviation]:f}\t' # joule + 'RMS stability: {[RMSstability]:f}%\t' # % + 'PTP stability: {[PTPstability]:f}%\t' # % + 'Pulse Number: {[PulseNumber]:d}\t' # integer + 'Total Pulses: {[TotalPulses]:d}\t' # integer + 'Average Power: {[AveragePower]:f}\t' #joule + 'Rep Rate: {[RepRate]:f}\t' # Hz + 'Uncorrected Value: {[UncorrectedValue]:f}') # joule + + # transform the list of dictionaries into a single dictionary + statistics = reshape_list_dict_in_dict(out) + + #units + statistics['CurrentValue'] = Q_(statistics['CurrentValue'], joule) + statistics['Maximum'] = Q_(statistics['Maximum'], joule) + statistics['Minimum'] = Q_(statistics['Minimum'], joule) + statistics['StandardDeviation'] = Q_(statistics['StandardDeviation'], joule) + statistics['AveragePower'] = Q_(statistics['AveragePower'], joule) + statistics['RepRate'] = Q_(statistics['RepRate'], hertz) + statistics['UncorrectedValue'] = Q_(statistics['UncorrectedValue'], joule) + + else: #Watt + out = self.parse_query('*vsu', format='Current Value: {[CurrentValue]:f}\t' # watt + 'Maximum: {[Maximum]:f}\t' # watt + 'Minimum: {[Minimum]:f}\t' # watt + 'Average: {[Average]:f}\t' # watt + 'Standard Deviation: {[StandardDeviation]:f}\t' # watt + 'RMS stability: {[RMSstability]:f}%\t' # % + 'PTP stability: {[PTPstability]:f}%\t' # % + 'Time: {[Time]:d}\t' # second + 'Acquisition Time: {[AcquisitionTime]:d}\t' # second + 'Uncorrected Value: {[UncorrectedValue]:f}') # watt + + # transform the list of dictionaries into a single dictionary + statistics = reshape_list_dict_in_dict(out) + + #units + statistics['CurrentValue'] = Q_(statistics['CurrentValue'], watt) + statistics['Maximum'] = Q_(statistics['Maximum'], watt) + statistics['Minimum'] = Q_(statistics['Minimum'], watt) + statistics['StandardDeviation'] = Q_(statistics['StandardDeviation'], watt) + statistics['Time'] = Q_(statistics['Time'], second) + statistics['AcquisitionTime'] = Q_(statistics['AcquisitionTime'], second) + statistics['UncorrectedValue'] = Q_(statistics['UncorrectedValue'], watt) + + return statistics + + logging = Feat(values={'stop': 0, 'start raw': 1, 'start saving': 2, + 'save both': 3}) + + @logging.setter + def logging(self, value): + """set logging data on the 842-PE meter’s EEPROM. (log) + """ + self.send_command('*log {}'.format(value)) + + #Buggy + @Feat() + def get_log(self): + """Retrieve a logged file from the device. (fdl) + """ + #return self.send_command('*fdl {}'.format(value)) + #return self.send_command('*fdl 0') + self.send('*fdl 0') + time.sleep(1) + return self.recv(termination='chr(26)\n') + + warnings.warn('This function does not work!') + + #Buggy + @Feat() + def get_data(self): + """Get data according to the data sampling setting. The maximum + transfer speed is 200Hz. (cau) + """ + #return self.send_command('*cau') + warnings.warn('This function does not work!') + + #MEASUREMENT COMMANDS - SETUP + + + @Feat(units='nm') + def wavelength(self): + """Get the wavelength being used on the + detector. (swa) + """ + return self.DeviceStatus['ActiveWavelength'] + + #TODO is there is a cleaner way to do this? + #I don't know if/how we can use limits, because it depends on the head we are using and only known after initialisation + #TODO value should be a int + @wavelength.setter + def wavelength(self, value): + """Set the wavelength being used on the + detector. (swa) + """ + #Prevent setting to a wavekength outside of the head range + if (value >= self.DeviceStatus['MinWavelengthIndex']) and (value <= self.DeviceStatus['MaxWavelengthIndex']): + self.send_command('*swa {}'.format(value)) + self.DeviceStatus = self.status # update cached device status + + @Feat(values={False: 0, True: 1}) + def attenuator(self): + """Get if whether the detector is using an external attenuator or not. + (atu) + """ + return self.DeviceStatus['Attenuator'] + + @attenuator.setter + def attenuator(self, value): + """Set if whether the detector is using an external attenuator or not. + This is used to adjust the processing of the meter with the readings of + the detector. (atu) + """ + self.send_command('*atu {}'.format(value)) + self.DeviceStatus = self.status # update cached device status + + @Feat() + def multiplier1(self): + """Get the value of multiplier1. (smu) + """ + return self.DeviceStatus['Multiplier1'] + + @multiplier1.setter + def multiplier1(self, multiplier): + """Set the value of multiplier1. (smu) + """ + self.send_command('*smu 1 {}'.format(multiplier)) + self.DeviceStatus = self.status # update cached device status + + @Feat() + def multiplier2(self): + """Get the value of multiplier2. (smu) + """ + return self.DeviceStatus['Multiplier2'] + + @multiplier2.setter + def multiplier2(self, multiplier): + """Set the value of multiplier2. (smu) + """ + self.send_command('*smu 2 {}'.format(multiplier)) + self.DeviceStatus = self.status # update cached device status + + @Feat() + def offset1(self): + """Get the value of offset1. (sou) + """ + return self.DeviceStatus['Offset1'] + + @offset1.setter + def offset1(self, offset): + """Set the value of offset1. (sou) + """ + self.send_command('*sou 1 {}'.format(offset)) + self.DeviceStatus = self.status # update cached device status + + @Feat() + def offset2(self): + """Get the value of offset2. (sou) + """ + return self.DeviceStatus['Offset2'] + + @offset2.setter + def offset2(self, offset): + """Set the value of offset2. (sou) + """ + self.send_command('*sou 2 {}'.format(offset)) + self.DeviceStatus = self.status # update cached device status + + SampleRateUnit = {'Seconds': '0', 'Minutes': '1', 'Hours': '2', + 'Days': '3', '% of pulses': '4'} + SamplePeriodUnit = {'Seconds': '0', 'Minutes': '1', 'Hours': '2', + 'Days': '3', 'Weeks': '4', 'Pulses': '5'} + TotalDurationUnit = {'Period': '0', 'Seconds': '1', 'Minutes': '2', + 'Hours': '3', 'Days': '4', 'Weeks': '5', + 'Continuous': '6', 'Points': '7'} + TimeStampEnabled = {False: 0, True: 1} + + #TODO units ? + @Feat(values=(None, SampleRateUnit, None, SamplePeriodUnit, None, + TotalDurationUnit, TimeStampEnabled,)) + def sampling(self): + """Get the current data sampling settings. (rds) + """ + out = self.parse_query('*rds', format='Sample Rate: {} / {}\t' + 'Sample Period: {} / {}\t' + 'Total Duration: {} {}\t' + 'Time Stamp: {}') + #For out[i] where i is odd, the device returns human readable units + #('Seconds', etc) (i.e. the keys of the values's dictionnaries). So + #we need to convert them to 'numbers' (i.e. values of the keys). + #For example, the device returns 'Seconds' instead of '0' for out[1] + out[0] = int(out[0]) + out[1] = self.SampleRateUnit[out[1]] + out[2] = int(out[2]) + out[3] = self.SamplePeriodUnit[out[3]] + out[4] = int(out[4]) + if out[5] == 'Continous': # The is a typo in returned value from the + out[5] = 'Continuous' # device! + out[5] = self.TotalDurationUnit[out[5]] + out[6] = 1 if (out[6] == 'ON') else 0 + return out + + @sampling.setter + def sampling(self, value): + """Set the data sampling parameters for the logging and statistics + environments. (dsu) + """ + # Using '% of pulses' as SampleRateUnit + # or 'Pulses' as SamplePeriodUnit + # is not valid in Power Mode (and will create an error) + if not self.energy_mode: # we are in Power Mode + if (value[1] == '4') or (value[3] == '5'): + warnings.warn('Setting not available with Power Mode') + return + self.send_command('*dsu {} {} {} {} {} {} {}'.format(*value)) + + @Feat(limits=(1,100)) + def trigger(self): + """Get the internal trigger level when using the device in energy + reading mode. (tla) + """ + return self.DeviceStatus['TrigLevel'] + + @trigger.setter + def trigger(self, value): + """Set the internal trigger level when using the device in energy + reading mode. (tla) + """ + if self.energy_mode: #Energy mode + self.send_command('*tla {}'.format(value)) + self.DeviceStatus = self.status # update cached device status + else: #Watt mode. This command is not valid + pass + #TODO should raise an error as it is not valid to set the trigger in watt mode + + #MEASUREMENT COMMANDS - MEASUREMENT CONTROL + + set_statistics = Feat(values={'disable': 0, 'enable': 1, 'reset': 2}) + + @set_statistics.setter + def set_statistics(self, value=True): + """Used start, stop and reset the statistics calculating process on the + data currently being acquisitioned. (esu) + """ + self.send_command('*esu {}'.format(value)) + + @Feat(values={False: 0, True: 1}) + def energy_mode(self): + return self.DeviceStatus['EnergyMode'] + + @energy_mode.setter + def energy_mode(self, value): + """Toggle the Energy mode when using a high power detector. (sca) + """ + if not('Photodiode' in self.DeviceStatus['HeadType']): + self.send_command('*sca {}'.format(value)) + self.DeviceStatus = self.status # update cached device status + self.ScalesHead = self.available_scales() + else: # energy_mode cannot be activated with photodiodes + warnings.warn('Cannot put energy_mode with a Photodiode') + + @Feat(values={False: 0, True: 1}) + def anticipation(self): + """Get whether the anticipation processing is active when the device is + reading from a wattmeter or not. (eaa) + """ + return self.DeviceStatus['Anticipation'] + + @anticipation.setter + def anticipation(self, value): + """Enable or disable the anticipation processing when the device is + reading from a wattmeter. (eaa) + """ + self.send_command('*eaa {}'.format(value)) + self.DeviceStatus = self.status # update cached device status + + set_zero = Feat(values={'off': 0, 'on': 1, 'undo': 2}) + + @set_zero.setter + def set_zero(self, value): + """Set the instrument zero mode. (eoa) + + off: not active + on: current value is substracted from all future measurements the + moment the command is issued. + undo: + """ + self.send_command('*eoa {}'.format(value)) + + #INSTRUMENT AND DETECTOR INFORMATION COMMANDS + + @Feat() + def idn(self): + """Get the device to get information about the firmware version and the + device type. (ver) + """ + return self.query('*ver') + + # The device is a bit buggy for this one as it return the info with 3 \n\r. + # So the hack here: we have to read it 3 times to read everything + # In addition, it seems to never return the last field (dBm). + @Feat() + def status(self): + """Query the current device status. (sta) + It stores it in self.Device Status for later retrival by other query + function. + """ + DeviceStatus = {} + out = self.parse_query('*sta', format='Head Type: {[HeadType]}\t' # WattMeter, Photodiode, others? + 'Head Version: {[HeadVersion]}\t' # Is that really the head version? + 'Head Serial Number: {[HeadSerialNumber]}\t{[HeadName]}\t' + 'Calibration Sensitivity: {[CalibrationSensitivity]:f} V/W\t' # unit V/W + 'Default WaveLength: {[CalibrationWavelength]} nm\t' # unit nm, seems non-sense for photodiodes. + 'Active Sensitivity: {[ActiveSensitivity]:f} V/W\t' # unit V/W + 'Active WaveLength: {[ActiveWavelength]:f} nm\t' # unit nm + 'Scale Min Power : {[ScaleMinPower]:d}\t' + 'Scale Max Power: {[ScaleMaxPower]:d}\t' + 'Scale Min Energy : {[ScaleMinEnergy]}\t' # is N/A for photodiodes + 'Scale Max Energy: {[ScaleMaxEnergy]}\t' # is N/A for photodiodes + 'Current Scale: {[CurrentScale]:d}\t' + 'Energy Mode: {[EnergyMode]}\t' + 'Anticipation: {[Anticipation]}\t' + 'Trig Level: {[TrigLevel]:f}%\t' # unit % + 'Zero Offset: {[Zero Offset]:f}\t' + 'Multiplier #1: {[Multiplier1]:f}\t' + 'Offset #1: {[Offset1]:f}\t' + 'Multiplier #2: {[Multiplier2]:f}\t' + 'Offset #2: {[Offset2]:f}\t' + 'Currently Logging data: {[CurrentlyLoggingData]}\t' + 'Analog Output: {[AnalogOutput]}\t' + 'Resolution: {[Resolution]:f}\t' + 'Currently Calculating Stats: {[CurrentlyCalculatingStats]}\t' + 'High Resolution Display: {[HighResolutionDisplay]}\t' + #Putting nex two fields as float will produce an error with some heads + 'Min Wavelength index: {[MinWavelengthIndex]:d}\t' # For photodiodes, it's the minimum wavelength in nm + 'Max Wavelength index: {[MaxWavelengthIndex]:d}\t' # For photodiodes, it's the minimum wavelength in nm + 'Upper Bound: {[UpperBound]:f}\t' + 'Lower Bound: {[LowerBound]:f}\t' + 'Reference Value: {[ReferenceValue]:f}\t' + 'P/F Status: {[P/FStatus]}\t' # N/A, .... (TODO: To be tested) + 'Threshold: {[Threshold]}') # On, Off (TODO: To be tested) + # add remaining parameters (see description of this bug just before the function declaration + out.append(self.parse_query('', format='Attenuator: {[Attenuator]}')) + out.append(self.parse_query('', format='AutoScale: {[AutoScale]}')) + + # transform the list of dictionaries in a single dictionary + DeviceStatus = reshape_list_dict_in_dict(out) + + # units + DeviceStatus['ActiveWavelength'] = Q_(DeviceStatus['ActiveWavelength'], + nanometer) + DeviceStatus['CalibrationWavelength'] = Q_(DeviceStatus['CalibrationWavelength'], nanometer) + + DeviceStatus['EnergyMode'] = 1 if (DeviceStatus['EnergyMode'] == 'On') else 0 + DeviceStatus['Anticipation'] = 1 if (DeviceStatus['Anticipation'] == 'On') else 0 + DeviceStatus['CurrentlyLoggingData'] = 1 if (DeviceStatus['CurrentlyLoggingData'] == 'Yes') else 0 + DeviceStatus['AnalogOutput'] = 1 if (DeviceStatus['AnalogOutput'] == 'On') else 0 + DeviceStatus['CurrentlyCalculatingStats'] = 1 if (DeviceStatus['CurrentlyCalculatingStats'] == 'Yes') else 0 + DeviceStatus['HighResolutionDisplay'] = 1 if (DeviceStatus['HighResolutionDisplay'] == 'On') else 0 + DeviceStatus['Attenuator'] = 1 if (DeviceStatus['Attenuator'] == 'On') else 0 # On, Off or N/A. N/A is when there is not + DeviceStatus['AutoScale'] = 1 if (DeviceStatus['AutoScale'] == 'On') else 0 + + if DeviceStatus['HeadType'] == 'WattMeter': # WattMeters have a valid ScaleMinEnergy and ScaleMaxEnergy + # We cannot use self.energy_mode in the test condition because it is not + # defined at instrument initialisation, so it creates an error + DeviceStatus['ScaleMinEnergy'] = int(DeviceStatus['ScaleMinEnergy']) + DeviceStatus['ScaleMaxEnergy'] = int(DeviceStatus['ScaleMaxEnergy']) + + if DeviceStatus['EnergyMode'] == 1: # energy mode (joules) + DeviceStatus['Resolution'] = Q_(DeviceStatus['Resolution'], joule) + else: # power mode (watt) + DeviceStatus['Resolution'] = Q_(DeviceStatus['Resolution'], watt) + + return DeviceStatus + + @Feat() + def battery(self): + """Get the device’s remaining battery power. (bat) + """ + return self.query('*bat') + + clock = Feat() + + @clock.setter + def clock(self, t): + """Set the time and date of the monitor's + internal clock. (clk) + + t is a time.struct_time + """ + s = time.strftime('*clk %d %m %Y %I %M %S ', t) + pm = '1' if time.strftime('%p', t) == 'PM' else '0' # PM -> 1, AM -> 0 + s += pm + self.send_command(s) + + #INSTRUMENT CONTROL COMMANDS + + backlight = Feat(values={False: 0, True: 1}) + + @backlight.setter + def backlight(self, value=True): + """Set the backlight of the device display on. (blk) + """ + self.send_command('*bkl {}'.format(value)) + + @Feat(values={False: 0, True: 1}) + def analog_port(self): + """Get whether the output of the current value on the analog port of + the meter is enabled or not. (ano) + """ + return self.DeviceStatus['AnalogOutput'] + + @analog_port.setter + def analog_port(self, value): + """Enable or disable the output of the current value on the analog port + of the meter. (ano) + """ + self.send_command('*ano {}'.format(value)) + self.DeviceStatus = self.status # update cached device status + + #COMMUNICATIONS COMMANDS + + @Action(values=set((2400, 9600, 14400, 19200, 38400, 155200))) + def baud_rate(self, value=True): + """Set the current baud rate of the serial port of the device. (brs) + """ + self.send_command('*brs {}'.format(value)) + + def query(self, command, send_args=(None, None), + recv_args=(None, None)): + answer = super().query(command, send_args=send_args, + recv_args=recv_args) + if answer == 'ERROR': + raise InstrumentError + return answer + + def send_command(self, s): + """Send a command (@Action or setter to the device) and wait for + RECV_DONE. + """ + self.send(s) + self.recv(termination=self.RECV_DONE) # wait for RECV_DONE + + #Helper functions + def print_device_status(self): + #Print dictionary (for debug) + for key in sorted(self.DeviceStatus.keys()): + print ("%s \t %s" % (key, self.DeviceStatus[key])) + +def reshape_list_dict_in_dict(list_of_dict): + # transform a list of dictionaries in a single dictionary + out = {} + for i in list_of_dict: + out.update(i) + return out + +if __name__ == '__main__': + import argparse + import lantz.log + + parser = argparse.ArgumentParser(description='Newport 842-PE powermeter') + parser.add_argument('-i', '--interactive', action='store_true', + default=False, help='Show interactive GUI') + parser.add_argument('-p', '--port', type=str, default='17', + help='Serial port to connect to') + parser.add_argument('-t', '--test', action='store_true', + default=False, help='Test the device') + + args = parser.parse_args() + lantz.log.log_to_screen(lantz.log.DEBUG) + with N842PE(args.port) as inst: + if args.interactive: + from lantz.ui.qtwidgets import start_test_app + start_test_app(inst) + elif args.test: + import test842PE + test842PE.test_instrument(inst) + else: + inst.energy_mode = False + time.sleep(2) + inst.energy_mode = True + diff --git a/lantz/drivers/newport/__init__.py b/lantz/drivers/newport/__init__.py new file mode 100644 index 0000000..0c278c3 --- /dev/null +++ b/lantz/drivers/newport/__init__.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +""" + lantz.drivers.newport + ~~~~~~~~~~~~~~~~~~~ + + :company: Newport + :description: + :website: http://www.newport.com + + ---- + + :copyright: 2013 by Lantz Authors, see AUTHORS for more details. + :license: BSD, see LICENSE for more details. +""" + +from .842PE import N842PE + +__all__ = ['N842PE',] + diff --git a/lantz/drivers/newport/test842PE.py b/lantz/drivers/newport/test842PE.py new file mode 100644 index 0000000..17686ff --- /dev/null +++ b/lantz/drivers/newport/test842PE.py @@ -0,0 +1,336 @@ +# -*- coding: utf-8 -*- +""" + lantz.drivers.newport.842PE + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Newport 842-PE powermeter + + :copyright: 2013 by Lantz Authors, see AUTHORS for more details. + :license: BSD, see LICENSE for more details. +""" +# Test functionnalities of the powermeter +import time + +def test_instrument(inst): + print('Testing instrument...') + + if False: + print("test energy_mode") + test = (False, True,) + for i in test: + print('SET : %s' % (i,)) + inst.energy_mode = i + print('Get : %s' % (inst.energy_mode,)) + time.sleep(1) + print("---") + + print("test everything else against energy_mode") + test = (False, True,) + for i in test: + print("-----------------------------------------") + print('test with energy mode = %s' % (i,)) + inst.energy_mode = i + print('Energy mode : %s' % (inst.energy_mode,)) + time.sleep(1) + if inst.energy_mode != i: + pass # Cannot be in Energy Mode with this detector/head + else: + print("---") + + print("Set auto scale") + inst.scale = 'auto' + time.sleep(1) + + print("Instrument status:") + inst.print_device_status() + print(inst.status) + print("-----") + + if False: + print("Instrument status:") + #print(inst.status) # can also work (but un-ordered) + inst.print_device_status() + + if False: + #test display type. + #'statistic' and 'lineplot' doesn't seem to produce anything + test = ('real-time', 'histogram', 'statistic', 'needle', 'lineplot',) + for i in test: + time.sleep(2) + print('SET Display type: %s' % (i,)) + inst.display = i + time.sleep(.2) + inst.display = 'real-time' + + if False: + #test display scale + print("Available scales") + print(inst.ScalesHead) + print("Test over all scales") + for i in inst.Scales: + time.sleep(.5) + print("---") + print('SET Display scale: %s' % (i,)) + inst.scale = i + print('GET : %s' % (inst.scale,)) + + if False: + #check limits + if inst.energy_mode: #Joules + max_scale = inst.DeviceStatus['ScaleMaxEnergy'] + min_scale = inst.DeviceStatus['ScaleMinEnergy'] + else: + max_scale = inst.DeviceStatus['ScaleMaxPower'] + min_scale = inst.DeviceStatus['ScaleMinPower'] + start_scale = int((min_scale + max_scale)/2) + #Put the head to its mid range scale + print("Put scale to mid range") + start_scale_str = inst.Scales[start_scale] + inst.scale = start_scale_str + + print("Increase scale many times") + for i in range(int((max_scale-min_scale)/2)+3): + inst.change_scale('increase') + print('New scale : %s' % (inst.scale,)) + time.sleep(.5) + + print("Decrease scale many times") + for i in range((max_scale-min_scale)+3): + inst.change_scale('decrease') + print('New scale : %s' % (inst.scale,)) + time.sleep(.5) + + if False: + #test dBm mode + print("Testing dBm mode") + test = (True, False,) + for i in test: + print("---") + print('SET : %s' % (i,)) + inst.dBm = i + time.sleep(1) + + if False: + print("test high_resolution mode") + test = (True, False,) + for i in test: + print("---") + print('SET : %s' % (i,)) + inst.high_resolution = i + print('Get : %s' % (inst.high_resolution,)) + time.sleep(1) + + if False: + print("Testing getter current_value") + print("Current Value:") + print(inst.current_value) + + if False: + print("Testing getter value_available") + print("Value Available:") + print(inst.value_available) + + if False: + print("Testing getter get_statistics") + print("Statistics:") + print(inst.get_statistics) + + + if False: + print("test logging (setter)") + test = ('start raw', 'stop', 'start saving', 'stop', 'save both', 'stop') + for i in test: + print('SET : %s' % (i,)) + inst.logging = i + time.sleep(1) + print("---") + + + if True: + print("test get_log") + print("set sampling") + # Set a single data acquisition run for 5 seconds with timestamps at 10 hertz. + sampling_settings = (2, 'Seconds', 1, 'Seconds', 1, 'Period', True) + inst.sampling = sampling_settings + print("start acquisition") + inst.logging = 'save both' + print("wait...") + time.sleep(2) + inst.get_log(0) + + # #Buggy + # @Feat() + # def get_log(inst, value): + + # #Buggy + # @Feat() + # def get_data(inst): + + if False: + print("test wavelength") + test = (500, 700, 900, 1100, 1300, 300, 100, -100,) + for i in test: + print('SET : %s' % (i * nanometer,)) + inst.wavelength = i * nanometer + print('Get : %s' % (inst.wavelength,)) + time.sleep(1) + print("---") + + if False: + print("test attenuator mode") + test = (True, False,) + for i in test: + print('SET : %s' % (i,)) + inst.attenuator = i + print('Get : %s' % (inst.attenuator,)) + time.sleep(1) + print("---") + + #TODO sometime, we run on Timeouts + import numpy as np + if False: + print("test multiplier1") + for j in range(10): + i = (np.random.random_sample()-.5) * 4 + print('SET : %s' % (i,)) + inst.multiplier1 = i + print('Get : %s' % (inst.multiplier1,)) + time.sleep(1) + print("---") + + import numpy as np + if False: + print("test multiplier2") + for j in range(10): + i = (np.random.random_sample()-.5) * 4 + print('SET : %s' % (i,)) + inst.multiplier2 = i + print('Get : %s' % (inst.multiplier2,)) + time.sleep(1) + print("---") + + import numpy as np + if False: + print("test offset1") + for j in range(10): + i = (np.random.random_sample()-.5) * 4 + print('SET : %s' % (i,)) + inst.offset1 = i + print('Get : %s' % (inst.offset1,)) + time.sleep(1) + print("---") + + import numpy as np + if False: + print("test offset2") + for j in range(10): + i = (np.random.random_sample()-.5) * 4 + print('SET : %s' % (i,)) + inst.offset2 = i + print('Get : %s' % (inst.offset2,)) + time.sleep(1) + print("---") + + if False: + from random import choice + import numpy as np + print("test sampling") + for j in range(1000): + sample_rate = np.random.random_integers(1,999) + sample_rate_unit = choice(['Seconds', 'Minutes', 'Hours', 'Days', '% of pulses']) + sample_period = np.random.random_integers(1,100) + sample_period_unit = choice(['Seconds', 'Minutes', 'Hours', 'Days', 'Weeks', 'Pulses']) + total_duration = np.random.random_integers(1,100) + total_duration_unit = choice(['Period', 'Seconds', 'Minutes', 'Hours', 'Days', 'Weeks', 'Continuous', 'Points']) + timestamp = choice([False, True]) + sampling_settings = (sample_rate, sample_rate_unit, sample_period, sample_period_unit, total_duration, total_duration_unit, timestamp) + #sampling_settings = (10, 'Seconds', 10, 'Seconds', 1, 'Period', True) + + print('SET : RATE: %s %s, PERIOD %s %s, TOTAL DURATION: %s %s, TIMESTAMP %s' % sampling_settings ) + inst.sampling = sampling_settings + print('GET : : %s %s, %s %s, : %s %s, %s' % inst.sampling) + #time.sleep(1) + print("---") + + + + if False: + print("test trigger") + test = (2,5,10,34,45.45,60.1,60.2,60.3,60.4,60.51,67,99,100,1,) + for i in test: + print('SET : %s' % (i,)) + inst.trigger = i + print('Get : %s' % (inst.trigger,)) + time.sleep(1) + print("---") + + #TODO : No error, but how to test it really? + if False: + print("test set_statistics") + test = ('disable', 'enable', 'reset') + for i in test: + print('SET : %s' % (i,)) + set_statistics = i + time.sleep(1) + print("---") + + if False: + print("test anticipation mode") + test = (True, False,) + for i in test: + print('SET : %s' % (i,)) + inst.anticipation = i + print('Get : %s' % (inst.anticipation,)) + time.sleep(1) + print("---") + + if False: + print("test set_zero") + test = ('off', 'on', 'undo') + for i in test: + print('SET : %s' % (i,)) + inst.set_zero = i + time.sleep(1) + print("---") + + if False: + print("test idn") + print(inst.idn) + time.sleep(1) + print("---") + + if False: + print("test status") + print(inst.status) + time.sleep(1) + print("---") + + if False: + print("test battery") + print(inst.battery) + time.sleep(1) + print("---") + + if False: + print("test clock (setter)") + inst.clock = time.localtime() + + if False: + print("test backlight mode") + test = (True, False,) + for i in test: + print('SET : %s' % (i,)) + inst.backlight = i + time.sleep(1) + print("---") + + if False: + print("test analog_port mode") + test = (True, False,) + for i in test: + print('SET : %s' % (i,)) + inst.analog_port = i + print('Get : %s' % (inst.analog_port,)) + time.sleep(1) + print("---") +