Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch 2 #1

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cal4at3
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#calibrate 4 channels + 13 to 3L/min


import serial
import time

ser = serial.Serial('com3')
ser.boudrate = 9600
ser.flush()

print("Calibrating")
setFlow 0:3;1:3;2:3;3:3;4:3;13:3;
print("Calibration Complete")
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
# olfactometer
Set of python scripts used to automate olfacotmeter tasks at SWRI
Set of python scripts used to automate olfacotmeter tasks at SWRI.
All experiments will start and end by emitting clean air for 1 second, so that no cross-contamination may occur between experiments

USAGE

DEPENDENCIES
- pyserial: helps python interact with serial ports. can be installed with "pip install pyserial" on the computer that will use the olfactometer.
- time: helps add pauses between opening valves when needed. As far as I am aware, this module comes with any default python installation.
SCRIPTS
20 changes: 20 additions & 0 deletions example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import serial
import time
ser = serial.Serial('com4') #defines the port that will be used (how do we know that?)
ser.boudrate = 9600 #sets the speed at which data will be transferred to ARDUINO (default?)
ser.flush() # flushes the port buffer
input("Press any key to start\n") #wait
print("Now we are going to generate an odour on channel 1")
print('The odour "duration" of 200ms is controlled by Sniff-0 ')
input("Press any key to generate the odour\n")
ser.write(b'setchannel 1\r')
ser.write(b'openvalvetimed 200\r')
print("Now the clean air channel is going to be opened")
print("Clean air will be delivered for 1000ms")
input("Press any key to open clean air\n")
ser.write(b'setchannel 0\r')
ser.write(b'setvalve 1\r')
time.sleep(1)
ser.write(b'setvalve 0\r')
print("End")
ser.close()
10 changes: 10 additions & 0 deletions experiments_yaml/clean.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
parameters:
port: "com3"
constant_flow_channel_id: 0
constant_flow_rate: 2
calibration: no

clean:
seconds: 20
channels:
13: 1
23 changes: 23 additions & 0 deletions experiments_yaml/expt1.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
parameters:
port: "com3"
constant_flow_channel_id: 0
constant_flow_rate: 2
calibration: no

clean:
seconds: 20
channels:
13: 1

test1:
seconds: 20
channels:
1: 1
2: 1
3: 1






158 changes: 158 additions & 0 deletions olfactometer_yaml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import serial
import argparse
import yaml
import sys
import time

def parse():
parser = argparse.ArgumentParser()
parser.add_argument('-i',"--input",help="Name of the yaml file with the experiment",default=None)
args = parser.parse_args()
return args

def set_parameters(yml):
if list(yml.keys())[0]!="parameters": # In python 3, dict_keys object is not subscriptable
print ("First key in input file is not parameters. Exiting.")
sys.exit()
else:
port=yml["parameters"]["port"]
constant_flow_rate=yml["parameters"]["constant_flow_rate"]
constant_flow_id=yml["parameters"]["constant_flow_channel_id"]
calibration=yml["parameters"]["calibration"]
print("Olfactometer will be runing from port ", port)
print("Constant flow will come from channel ", constant_flow_id)
print("Constant flow will be set at ", constant_flow_rate," SPLM")
if calibration==True:
print("Channels will be calibrated before starting each experiment.")
else:
print("Calibration will NOT be done for any channels. Make sure it is already done.")
z=input("Press enter to continue or ctrl+C to exit.")
return port, constant_flow_rate, constant_flow_id, calibration

def check_expts(yml):
if yml["parameters"]["calibration"]==False:
print("Calibration has been done externally. Displayed flow rate values may not be true")
for key in list(yml.keys())[1:]: # first key is ALWAYS the parameters
print("Checking step:",key, "...")
print("Run time:",yml[key]["seconds"],"seconds")
for channel in list(yml[key]["channels"].keys()):
print("Channel",channel,"will run at",yml[key]["channels"][channel],"SPLM")
return

def run_calibration(ser,channel,flow):
if ser is None:
print("Cannot verify calibration in emulator mode. Assuming it was successful.")
return True

cmd_str="setverbose 1"
ser_exec(cmd)
cmd_str="setflow "+str(channel_id)+":"+str(flow_i)+";"
ser.exec(cmd)

readback=ser_listen(ser)
if readback is None:
return False

if ("Result" in readback):
print (readback)
readback=ser_listen(ser)
if readback is None:
return False
if ("*OK" in readback):
print('Calibration completed successfully')
return True

def run_expt(yml,ser, constant_flow_rate, constant_flow_id, calibration):
# 0) Calibrate the constant flow channel if required
if calibration is True:
print("calibrating constant flow")
outcome=run_calibration(ser,constant_flow_id,constant_flow_rate)
print(outcome)
if outcome is False:
print("constant flow calibration failed. Exiting.")
return

# 1) Open constant flow
cmd_str="setchannel "+str(constant_flow_id)
ser_exec(ser,cmd_str)

cmd_str="setvalve 1"
ser_exec(ser,cmd_str)

# 2) Run experiments
for key in list(yml.keys())[1:]: # first key is ALWAYS the parameters
z=input("Press Enter to start: "+key)
timeopenvalve=yml[key]["seconds"] #This is the time this experiment will run for

# 2.1) Calibrate flows of odorant channels if required
if calibration is True:
print("calibrating odorant channels")
for channel in list(yml[key]["channels"].keys()):
flow=yml[key]["channels"][channel]
outcome=run_calibration(ser,channel,flow)
print(channel,flow,outcome)
if outcome is False:
print("Calibration of channel",channel,"failed. Exiting.")
return

# 2.2) Open the odorant channels (timed)
time_ms=timeopenvalve*1000
cmd_str="openmultivalvetimed "+str(time_ms)+" "+";".join(map(str,list(yml[key]["channels"].keys())))
ser_exec(ser,cmd_str)

# 2.3) Once all commands are submitted, wait for the execution time + 10 seconds to catch up.
time.sleep(timeopenvalve+10)


# 3) Close the constant flow
cmd_str="setchannel "+str(constant_flow_id)
ser_exec(ser,cmd_str)

cmd_str="setvalve 0"
ser_exec(ser,cmd_str)
return

def ser_exec(ser,cmd_str):
print("WRITING TO SERIAL: ", cmd_str)
if ser is not None:
cmd_bytes=(cmd_str+"\r").encode()
ser.write(cmd_bytes)
return

def ser_listen(ser):
readback=None
waiting_time=0
readback = ser.readline().decode('utf-8')
while(readback is None):
readback = ser.readline().decode('utf-8')
time.sleep(1)
waiting_time+=1
if waiting_time>60:
print("instrument is taking too long to reply. Aborting.")
break
return readback

if __name__=="__main__":
args=parse()
with open(args.input, 'r') as file:
try:
yml = yaml.safe_load(file)
print(yml)
except Exception as error:
print("The input file has formatting errors.")
print(error)
sys.exit()
else:
port, constant_flow_rate, constant_flow_id, calibration=set_parameters(yml)
try:
ser = serial.Serial(port)
except:
print("Serial Port ", port, " cannot be reached.")
print("Running in emulator mode.")
ser=None
z=input("Press enter to continue or CTRL+C to break execution.")
check_expts(yml)
run_expt(yml,ser, constant_flow_rate, constant_flow_id, calibration)