Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Widgets and runviewer parser for DummyIntermediateDevice #53

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 189 additions & 20 deletions labscript_devices/DummyIntermediateDevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,67 @@
# This file represents a dummy labscript device for purposes of testing BLACS
# and labscript. The device is a Intermediate Device, and can be attached to
# a pseudoclock in labscript in order to test the pseudoclock behaviour
# without needing a real Intermediate Device.
#
# without needing a real Intermediate Device.
#
# You can attach an arbitrary number of outputs to this device, however we
# currently only support outputs of type AnalogOut and DigitalOut. I would be
# currently only support outputs of type AnalogOut and DigitalOut. It would be
# easy to extend this is anyone needed further functionality.


from labscript_devices import labscript_device, BLACS_tab, BLACS_worker
from labscript import IntermediateDevice, DigitalOut, AnalogOut, config
from labscript_devices import (
BLACS_tab,
runviewer_parser,
)
from labscript import (
IntermediateDevice,
DigitalOut,
AnalogOut,
config,
set_passed_properties,
)
from labscript_devices.NI_DAQmx.utils import split_conn_AO, split_conn_DO
import numpy as np
import labscript_utils.h5_lock # noqa: F401
import h5py

from blacs.device_base_class import DeviceTab
from blacs.tab_base_classes import Worker


class DummyIntermediateDevice(IntermediateDevice):

description = 'Dummy IntermediateDevice'
clock_limit = 1e6

# If this is updated, then you need to update generate_code to support whatever types you add
# If this is updated, then you need to update generate_code to support whatever
# types you add
allowed_children = [DigitalOut, AnalogOut]

def __init__(self, name, parent_device, BLACS_connection='dummy_connection', **kwargs):
self.BLACS_connection = BLACS_connection
@set_passed_properties(
property_names={
"connection_table_properties": [
"AO_range",
"num_AO",
"ports",
"clock_limit",
],
"device_properties": [],
}
)
def __init__(
self,
name,
parent_device=None,
AO_range=[-10.0, 10.0],
num_AO=4,
ports={'port0': {'num_lines': 32, 'supports_buffered': True}},
clock_limit=1e6,
**kwargs,
):
self.AO_range = AO_range
self.num_AO = num_AO
self.ports = ports if ports is not None else {}
self.clock_limit = clock_limit
self.BLACS_connection = 'dummy_connection'
IntermediateDevice.__init__(self, name, parent_device, **kwargs)

def generate_code(self, hdf5_file):
Expand All @@ -54,43 +94,172 @@ def generate_code(self, hdf5_file):
device_dtype = np.int8
elif isinstance(device, AnalogOut):
device_dtype = np.float64
dtypes.append((device.name, device_dtype))
dtypes.append((device.connection, device_dtype))

# create dataset
out_table = np.zeros(len(times), dtype=dtypes)
for device in self.child_devices:
out_table[device.name][:] = device.raw_output
out_table[device.connection][:] = device.raw_output

group.create_dataset('OUTPUTS', compression=config.compression, data=out_table)


from blacs.device_base_class import DeviceTab
from blacs.tab_base_classes import Worker

@BLACS_tab
class DummyIntermediateDeviceTab(DeviceTab):
def initialise_GUI(self):
self.create_worker("main_worker",DummyIntermediateDeviceWorker,{})
# Get capabilities from connection table properties:
connection_table = self.settings['connection_table']
properties = connection_table.find_by_name(self.device_name).properties

num_AO = properties['num_AO']
# num_DO = properties['num_DO']
ports = properties['ports']

AO_base_units = 'V'
if num_AO > 0:
AO_base_min, AO_base_max = properties['AO_range']
else:
AO_base_min, AO_base_max = None, None
AO_base_step = 0.1
AO_base_decimals = 3

# Create output objects:
AO_prop = {}
for i in range(num_AO):
AO_prop['ao%d' % i] = {
'base_unit': AO_base_units,
'min': AO_base_min,
'max': AO_base_max,
'step': AO_base_step,
'decimals': AO_base_decimals,
}

DO_proplist = []
DO_hardware_names = []
for port_num in range(len(ports)):
port_str = 'port%d' % port_num
port_props = {}
for line in range(ports[port_str]['num_lines']):
hardware_name = 'port%d/line%d' % (port_num, line)
port_props[hardware_name] = {}
DO_hardware_names.append(hardware_name)
DO_proplist.append((port_str, port_props))

# Create the output objects
self.create_analog_outputs(AO_prop)

# Create widgets for outputs defined so far (i.e. analog outputs only)
_, AO_widgets, _ = self.auto_create_widgets()

# now create the digital output objects one port at a time
for _, DO_prop in DO_proplist:
self.create_digital_outputs(DO_prop)

# Manually create the digital output widgets so they are grouped separately
DO_widgets_by_port = {}
for port_str, DO_prop in DO_proplist:
DO_widgets_by_port[port_str] = self.create_digital_widgets(DO_prop)

# Auto place the widgets in the UI, specifying sort keys for ordering them:
widget_list = [("Analog outputs", AO_widgets, split_conn_AO)]
for port_num in range(len(ports)):
port_str = 'port%d' % port_num
DO_widgets = DO_widgets_by_port[port_str]
name = "Digital outputs: %s" % port_str
if ports[port_str]['supports_buffered']:
name += ' (buffered)'
else:
name += ' (static)'
widget_list.append((name, DO_widgets, split_conn_DO))
self.auto_place_widgets(*widget_list)

# Create and set the primary worker
self.create_worker(
"main_worker",
DummyIntermediateDeviceWorker,
{
'Vmin': AO_base_min,
'Vmax': AO_base_max,
'num_AO': num_AO,
'ports': ports,
'DO_hardware_names': DO_hardware_names,
},
)
self.primary_worker = "main_worker"

# Set the capabilities of this device
self.supports_remote_value_check(False)
self.supports_smart_programming(False)


class DummyIntermediateDeviceWorker(Worker):
def init(self):
pass

def get_output_table(self, h5file, device_name):
"""Return the OUTPUT table from the file, or None if it does not exist."""
with h5py.File(h5file, 'r') as hdf5_file:
group = hdf5_file['devices'][device_name]
try:
return group['OUTPUTS'][:]
except KeyError:
return None

def program_manual(self, front_panel_values):
return front_panel_values
return front_panel_values

def transition_to_buffered(self, device_name, h5file, initial_values, fresh):
return initial_values
# Get the data to be programmed into the output tasks:
outputs = self.get_output_table(h5file, device_name)

# Collect the final values of the outputs
final_values = dict(zip(outputs.dtype.names, outputs[-1]))

def transition_to_manual(self,abort = False):
return final_values

def transition_to_manual(self, abort=False):
return True

def abort_transition_to_buffered(self):
return self.transition_to_manual(True)

def abort_buffered(self):
return self.transition_to_manual(True)

def shutdown(self):
pass
pass


@runviewer_parser
class DummyIntermediateDeviceParser(object):
def __init__(self, path, device):
self.path = path
self.name = device.name
self.device = device

def get_traces(self, add_trace, clock=None):
times, clock_value = clock[0], clock[1]

clock_indices = np.where((clock_value[1:] - clock_value[:-1]) == 1)[0] + 1
# If initial clock value is 1, then this counts as a rising edge (clock should
# be 0 before experiment) but this is not picked up by the above code. So we
# insert it!
if clock_value[0] == 1:
clock_indices = np.insert(clock_indices, 0, 0)
clock_ticks = times[clock_indices]

# Get the output table from the experiment shot file
with h5py.File(self.path, 'r') as hdf5_file:
outputs = hdf5_file[f"devices/{self.name}/OUTPUTS"][:]

traces = {}

for channel in outputs.dtype.names:
traces[channel] = (clock_ticks, outputs[channel])

for channel_name, channel in self.device.child_list.items():
if channel.parent_port in traces:
trace = traces[channel.parent_port]
add_trace(channel_name, trace, self.name, channel.parent_port)

return {}