-
Notifications
You must be signed in to change notification settings - Fork 2
/
mmmComm.m
290 lines (261 loc) · 11 KB
/
mmmComm.m
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
classdef mmmComm < handle
%mmmComm
%
% Based directly on srv.StimulusControl. Adds Alyx handling.
%
% Interface to, and info about a remote rig setup
% This interface is used by expServer and mc to communicate with
% one another. The data are sent over TCP/IP through a Java WebSocket
% (net.entropy_mill.websocket). This object can be used to send
% arbitrary data over the network. It is used by expServer to send and
% receive parameter structures and status updates in the form of
% strings.
%
% NB: This class replaces SRV.REMOTERIG. See also SRV.SERVICE,
% IO.WSJCOMMUNICATOR, EUI.MCONTROL
%
% Part of Rigbox
% 2013-06 CB created
% 2018-01 NS extended
properties
Uri % WebSocket address of the remote rig; mmmComm.create stores it as 'ws://<address>:<port>'
Services = {} % List of remote services that are to be interfaced with during an experiment. Should be a list of string ids or hostnames
SelectedServices % Logical array the size of Services, indicating which services to use (set by eui.MControl)
Name % The name of the remote rig, usually the host name
ExpPreDelay = 0 % The delay in seconds before the experiment is to start, set by mc
ExpPostDelay = 0 % The delay in seconds after the experiment has ended before data are saved, listeners deleted, etc. Set by mc
ResponseTimeout = 15 % Time in seconds to wait for a response from the remote host before throwing an error
end
properties (Dependent = true)
% Current status of the rig: 'disconnected' when there is no open socket,
% otherwise the first element of the rig's reply to a 'status' request.
% NOTE(review): earlier documentation listed 'idle' and 'active' as the
% connected values, but get.ExpRunnning tests for 'running' - confirm the
% actual set of values against the server implementation
Status
ExpRunnning %Reference of currently running experiment; empty when not connected or when the rig does not report 'running'
end
properties (Transient, SetAccess = protected, Hidden)
Socket % Java WebSocket client (net.entropy_mill.websocket.Client) created by webSocket; empty when torn down
hSocket%handle to java socket, used to set and clear its callbacks
NextMsgId = 0 % Counter from which exchange derives the id of each outgoing message
Responses %Map from message IDs to responses; holds nil while a reply is awaited, and is [] after disconnect
LogTimes = zeros(10000,2) % Only referenced by the commented-out timing code in onWSReceived
LogCount = 0 % Only referenced by the commented-out timing code in onWSReceived
end
properties (Transient, Hidden)
% Property to store rig specific Alyx token. It is sent to the remote host
% with the 'run', 'quit' and 'updateAlyxInstance' messages
AlyxInstance = []
end
properties (Constant)
DefaultPort = 2014 % Port that create appends to a URI that does not end in a port number
end
events
Connected % Notified by onWSOpened
Disconnected % Notified by onWSClosed, before the socket is torn down
ExpStarting % Notified on a 'status' message of type 'starting'
ExpStarted % Notified on a 'status' message of type 'update' whose arguments start with 'event', 'experimentInit'
ExpStopped % Notified on a 'status' message of type 'completed' or 'expException'
ExpUpdate % Notified on 'signals' messages and on 'status' messages of type 'update'
AlyxRequest % Notified on an 'AlyxRequest' message, after AlyxInstance has been sent back to the remote host
AlyxSend % Notified on an 'AlyxSend' message; the event carries the first element of the message data
end
methods (Static)
function s = create(name, uri)
  %CREATE Build an mmmComm object for a named remote rig
  %   S = mmmComm.create(NAME) uses NAME as the rig's address as well as
  %   its name.
  %   S = mmmComm.create(NAME, URI) uses URI as the address. 'ws://' is
  %   prepended when URI does not start with it, and the class DefaultPort
  %   is appended when URI does not end in ':<digits>'.
  if nargin < 2
    uri = name;
  end
  s = mmmComm;
  s.Name = name;
  % add the default protocol prefix when none was given
  hasScheme = ~isempty(regexp(uri, '^ws://', 'once'));
  if ~hasScheme
    uri = ['ws://' uri];
  end
  % add the default port suffix when none was given
  hasPort = ~isempty(regexp(uri, '^ws://.+:\d+$', 'once'));
  if ~hasPort
    uri = sprintf('%s:%i', uri, s.DefaultPort);
  end
  s.Uri = uri;
end
end
methods
function s = char(obj)
  %CHAR Character representation of the object: the rig's Name property
  rigName = obj.Name;
  s = rigName;
end
function value = get.Status(obj)
  % Returns 'disconnected' when there is no open socket; otherwise asks the
  % remote host for its status and returns the first element of the reply
  if connected(obj)
    reply = exchange(obj, {'status'});
    value = reply{1};
  else
    value = 'disconnected';
  end
end
function value = get.ExpRunnning(obj)
  % Returns the second element of the remote 'status' reply when the first
  % element is 'running'; returns [] when not connected or not running
  value = []; % empty means no experiment running (or none known)
  if ~connected(obj)
    return
  end
  reply = exchange(obj, {'status'});
  if strcmp(reply{1}, 'running')
    value = reply{2};
  end
end
function quitExperiment(obj, immediately)
  %quitExperiment Send a 'quit' request to the remote host
  %   quitExperiment(OBJ) sends the request with IMMEDIATELY = false.
  %   quitExperiment(OBJ, IMMEDIATELY) forwards the given flag. The stored
  %   AlyxInstance is sent along with the request. Throws if the remote
  %   host replies with 'fail'.
  if nargin < 2
    immediately = false;
  end
  request = {'quit', immediately, obj.AlyxInstance};
  reply = exchange(obj, request);
  errorOnFail(obj, reply);
end
function startExperiment(obj, expRef)
  %startExperiment Send a 'run' request for EXPREF to the remote host
  %   Asserts that the experiment reference exists (dat.expExists), then
  %   sends it together with the pre/post delays and the stored
  %   AlyxInstance. Throws if the remote host replies with 'fail'.
  assert(dat.expExists(expRef), 'Experiment ref ''%s'' does not exist', expRef);
  request = {'run', expRef, obj.ExpPreDelay, obj.ExpPostDelay, obj.AlyxInstance};
  reply = exchange(obj, request);
  errorOnFail(obj, reply);
end
function connect(obj, block)
  %connect Open the WebSocket connection to the remote rig
  %   connect(OBJ) starts connecting and returns straight away.
  %   connect(OBJ, true) additionally waits until the socket is open or
  %   ResponseTimeout seconds have elapsed, and throws
  %   'Could not connect to ...' if the socket is not open by then.
  %   Does nothing when the socket is already open.
  if nargin < 2
    block = false;
  end
  if connected(obj)
    return
  end
  obj.Responses = containers.Map;
  if isempty(obj.Socket)
    [obj.Socket, obj.hSocket] = webSocket(obj);
  end
  obj.Socket.connect();
  if block
    %wait until connected (or timeout elapsed)
    timeoutMs = 1000*obj.ResponseTimeout;
    t = systime;
    % pause() lets the socket callbacks run. onWSClosed calls disconnect,
    % which clears obj.Socket, so the socket must be re-checked for empty
    % on every pass instead of being dereferenced unconditionally.
    while ~isempty(obj.Socket) && ~obj.Socket.isOpen() &&...
        (systime - t < timeoutMs)
      pause(20e-3);
    end
    % pause(0.6); %bug: need to wait before allow messages to be sent
    % connected() copes with an emptied socket, so a failed attempt reports
    % the intended message rather than an indexing error
    assert(connected(obj),...
      'Could not connect to ''%s''', obj.Uri);
  end
end
function disconnect(obj)
% Close the WebSocket, if one exists, and release everything tied to it:
% the callbacks are cleared, the socket handle is deleted, and hSocket,
% Responses and Socket are reset to []. Does nothing when Socket is empty.
if ~isempty(obj.Socket)% && obj.Socket.isOpen()
obj.Socket.close();
pause(15e-3); % pause briefly to let any evoked callbacks run
% NOTE(review): onWSClosed itself calls disconnect, so if the close
% callback runs during the pause above this method is re-entered while
% obj.Socket is still set - confirm that running the teardown below a
% second time (with hSocket already reset to []) is harmless
set(obj.hSocket, 'BinaryReceivedCallback', [],...
'ClosedCallback', [], 'OpenedCallback', []);% clear callbacks
delete(obj.hSocket);% delete the handle
obj.hSocket = [];
obj.Responses = []; % any stored or awaited responses are discarded
obj.Socket = [];
end
end
function delete(obj)
  %delete Destructor: tears down the socket connection via disconnect
  obj.disconnect();
end
end
methods %(Access = protected)
function b = connected(obj)
  %connected True when a socket object exists and reports itself open
  hasSocket = ~isempty(obj.Socket);
  % short-circuit: isOpen is only queried when a socket exists
  b = hasSocket && obj.Socket.isOpen();
end
function [sock, hSock] = webSocket(obj)
  %webSocket Create a WebSocket client for obj.Uri and hook up its callbacks
  %   SOCK is the Java client object; HSOCK is its MATLAB handle with
  %   callback properties. The client is only constructed here: connect
  %   opens the connection.
  sock = net.entropy_mill.websocket.Client(obj.Uri); % cb-tools\java\net\entropy_mill\websocket\Client.class
  hSock = handle(sock, 'CallbackProperties');
  % route the socket events to this object's handlers
  set(hSock, 'BinaryReceivedCallback', @obj.onWSReceived);
  set(hSock, 'ClosedCallback', @obj.onWSClosed);
  set(hSock, 'OpenedCallback', @obj.onWSOpened);
end
function onWSReceived(obj, ~, eventArgs)
  %onWSReceived Callback for binary messages arriving on the socket
  %   The message is deserialized into a packet with fields 'id' and
  %   'data'. If the id is a key in Responses, the data are stored there as
  %   the reply to an earlier exchange. Otherwise the packet is treated as
  %   a notification and routed to the matching event according to its id
  %   ('signals', 'status', 'AlyxRequest' or 'AlyxSend'). Packets with any
  %   other id are ignored.
  packet = hlp_deserialize(typecast(eventArgs.getMessage(), 'uint8'));
  id = packet.id;
  data = packet.data;
  if isKey(obj.Responses, id)
    % response to a previous call: hand it to whoever is waiting
    obj.Responses(id) = data;
    return
  end
  % route notification & misc messages
  switch id
    case 'signals'
      % fprintf('% i signal updates received\n', numel(data));
      notify(obj, 'ExpUpdate', srv.ExpEvent('signals', [], data));
    case 'status'
      type = data{1};
      switch type
        case 'starting'
          %experiment about to start
          ref = data{2};
          notify(obj, 'ExpStarting', srv.ExpEvent('starting', ref));
        case 'completed'
          %experiment stopped without any exceptions
          ref = data{2};
          notify(obj, 'ExpStopped', srv.ExpEvent('completed', ref));
        case 'expException'
          %experiment stopped with an exception
          ref = data{2}; err = data{3};
          notify(obj, 'ExpStopped', srv.ExpEvent('exception', ref, err));
        case 'update'
          ref = data{2}; args = data(3:end);
          % An update may carry fewer than two extra arguments. Check the
          % length first so that indexing args cannot throw inside the
          % callback and swallow the ExpUpdate notification below.
          isExpInit = numel(args) >= 2 && strcmp(args{1}, 'event') &&...
            strcmp(args{2}, 'experimentInit');
          if isExpInit
            notify(obj, 'ExpStarted', srv.ExpEvent('started', ref));
          end
          notify(obj, 'ExpUpdate', srv.ExpEvent('update', ref, args));
          % if numel(args) > 0 && strcmpi(args{1}, 'inputSensorPos')
          %   trec = GetSecs;
          %   tsent = args{3};
          %   obj.LogCount = obj.LogCount + 1;
          %   obj.LogTimes(obj.LogCount,:) = [trec tsent];
          % end
      end
    case 'AlyxRequest'
      % expServer requested the AlyxInstance
      ref = data; % expRef
      % send AlyxInstance to experiment server
      r = obj.exchange({'updateAlyxInstance', obj.AlyxInstance});
      obj.errorOnFail(r);
      % notify listeners of request for AlyxInstance
      notify(obj, 'AlyxRequest', srv.ExpEvent('AlyxRequest', ref));
    case 'AlyxSend'
      ai = data{1};
      notify(obj, 'AlyxSend', srv.ExpEvent('AlyxSend', [], ai));
  end
end
function onWSOpened(obj, ~, ~)
  %onWSOpened Socket 'opened' callback: announces the Connected event
  % disp('connected');
  obj.notify('Connected');
end
function onWSClosed(obj, ~, ~)
  %onWSClosed Socket 'closed' callback
  %   Announces the Disconnected event first, then tears the socket down
  %   via disconnect.
  % disp('disconnected');
  obj.notify('Disconnected');
  obj.disconnect();
end
function send(obj, id, data)
  %send Serialize a packet with fields 'id' and 'data' and send it
  %   No reply is awaited; see exchange for request/response messaging.
  % The values are wrapped in cells so that struct() stores them as given
  % and always builds a scalar struct, even when id or data is a cell array
  packet = struct('id', {id}, 'data', {data});
  obj.Socket.send(hlp_serialize(packet));
end
function response = exchange(obj, message)
  %exchange Send MESSAGE to the remote host and wait for its response
  %   A fresh message id is generated from NextMsgId, a placeholder is
  %   registered in Responses under that id, the message is sent, and the
  %   call blocks in waitForMessage until the response arrives.
  %
  %   Throws if there is no open connection, if sending fails, or if
  %   waitForMessage gives up.

  % Fail early with a clear message. Without a connection Responses is not
  % a map and the socket cannot send, which would otherwise surface as an
  % unrelated indexing error.
  assert(connected(obj),...
    'Cannot send message: not connected to ''%s''', obj.Uri);
  id = num2str(obj.NextMsgId);
  obj.NextMsgId = obj.NextMsgId + 1;
  obj.Responses(id) = nil; % empty place holder means awaiting
  % send the command to make the function call
  try
    send(obj, id, message);
  catch ex
    % nothing was sent, so no reply can arrive: drop the place holder
    % rather than leaving it in the map
    if isa(obj.Responses, 'containers.Map') && isKey(obj.Responses, id)
      remove(obj.Responses, id);
    end
    rethrow(ex);
  end
  % wait for response
  response = waitForMessage(obj, id);
end
function msg = waitForMessage(obj, id)
  %waitForMessage Block until the response stored under ID is available
  %   Polls Responses(ID) until it no longer holds the nil place holder or
  %   until ResponseTimeout seconds have passed. The entry is removed from
  %   the map in both cases.
  %
  %   Throws 'Timed out waiting for message ...' when no response arrived
  %   in time, and a separate error when the response slot no longer exists
  %   (for example because disconnect reset Responses during the wait).
  timeoutMs = 1000*obj.ResponseTimeout; % systime differences are compared in ms here
  t = systime;
  while true
    % pause() below lets socket callbacks run, and disconnect replaces
    % Responses with []. Check the slot on every pass so that this case
    % gives a meaningful error instead of an indexing failure.
    assert(isa(obj.Responses, 'containers.Map') && isKey(obj.Responses, id),...
      'No pending response slot for message with id ''%s'' (connection closed?)', id);
    msg = obj.Responses(id);
    if ~isNil(msg) || (systime - t >= timeoutMs)
      break
    end
    pause(1e-3);
  end
  % Remove the place holder before checking for a timeout. Otherwise a
  % timed-out id would stay in the map and a late reply would be stored
  % under it with nobody left to collect it.
  remove(obj.Responses, id);
  assert(~isNil(msg), 'Timed out waiting for message with id ''%s''', id);
end
function errorOnFail(obj, r) %#ok<INUSL>
  %errorOnFail Throw an error if the response R reports a failure
  %   R counts as a failure when it is a non-empty cell array whose first
  %   element is 'fail'. The third element, when present and non-empty, is
  %   used as the error message; otherwise a generic message is thrown.
  %   Any other R returns silently.
  if ~iscell(r) || isempty(r) || ~strcmp(r{1}, 'fail')
    return
  end
  if numel(r) >= 3 && ~isempty(r{3})
    error(r{3});
  end
  % error() with an empty message does nothing, so a failure without a
  % message must be raised explicitly or it would be treated as success
  error('Remote host reported a failure without an error message');
end
end
end