-
Notifications
You must be signed in to change notification settings - Fork 0
/
ghs.da
352 lines (291 loc) · 14.6 KB
/
ghs.da
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
import sys
import random
import enum
from collections import deque
import time
from tools import EdgeState, NodeState, Observer, genGraphEdges
# Sentinel "infinite" edge weight. It is the initial value of a node's best
# known outgoing weight (bestwt), the starting bound of the minimum search in
# findMinBasicEdge(), and the value whose exchange between the two core nodes
# signals termination. It must be larger than every real edge weight
# (main() draws weights from range(1, 3000)).
INF = 999999
class Agent(process):
def setup(neighs, weights, name, namesDict, obs):
self.state = NodeState.sleep
self.level = 0
self.stach = {} #dictionary <nextNode, EdgeState>
# queue for the delayed <connect> messages
# (to be re-processed if the level increases)
self.connectMessQueue = deque()
# queue for the delayed <test> messages
# (to be re-processed after the receipt of <initiate>)
self.testMessQueue = deque()
# queue for the delayed <report> messages
# (to be re-processed if the node's state changes)
self.reportMessQueue = deque()
for n in neighs: self.stach[n]=EdgeState.basic
self.father = None
self.fragName = name
self.name = name
self.testch = None
self.bestch = None
self.rec = 0 #num of records received from sons
self.bestwt = INF
self.namesDict = namesDict #dict to associates nodes' to their ID
def run():
# 1. INITIALIZATION
# At the beginning, this node's fragment consists only of this node.
# In order to expand the fragment, the node connects to another fragment on its lowest-weight outgoing edge.
minWeightEdge = min(weights, key=weights.get)
self.stach[minWeightEdge] = EdgeState.branch
self.state = NodeState.found
send(('connect',self.level), to=minWeightEdge)
output("Node ", self.name, "sent <connect> to node ", self.namesDict[minWeightEdge])
#await termination of the algorithm to end the process
await(self.state == NodeState.finished)
def cleanConnectsQueue(q):
#removes from the queue all the <connect> messages arrived from q
for i in range(len(self.connectMessQueue)):
m = self.connectMessQueue.popleft()
if m[1] != q: self.connectMessQueue.append(m)
def checkConnectsQueue():
# tries to process delayed <connect> messages;
# returns True if at least one message in the queue is consumed
queueChanged = False
for i in range(len(self.connectMessQueue)):
m = self.connectMessQueue.popleft()
if processConnectMess(m[0], m[1]): queueChanged = True
return queueChanged
def processConnectMess(Lq, q):
#called at the reception of a <connect> message
if Lq < self.level:
# combine with RULE A: the <connect> message comes from a smaller fragment (lower level)
# hence, that fragment must join this node's fragment: an <initiate> message is sent
# and the other fragment's nodes will acquire our level, name and state
self.stach[q] = EdgeState.branch
send(('initiate', self.level, self.fragName, self.state), to=q)
output("Node ", self.name, "sent <initiateA> to node ", self.namesDict[q])
#once this fragment merges with q's one, we discard the previous <connect> msgs from the same fragment
cleanConnectsQueue(q)
return True #message is consumed
elif self.stach[q]==EdgeState.basic:
#process the message later, after the edge has been explored (this node sends <connect> to q)
# or the level has increased
self.connectMessQueue.append((Lq, q))
output("Node ", self.name, "has delayed <connect> message from ", self.namesDict[q])
return False #message is not consumed
else:
#RULE B: the two fragments are of the same level, they reciprocally exchange the following <initiate> message
send(('initiate', self.level+1, self.weights[q], NodeState.find), to=q)
output("Node ", self.name, "sent <initiateB> to node ", self.namesDict[q])
return True
def receive(msg=('connect', Lq), from_=q):
output("Node ", self.name, "received <connect> from node ", self.namesDict[q])
#2. RECEIPT OF <CONNECT> MESSAGE
processConnectMess(Lq, q)
def findMinBasicEdge():
# returns the basic edge with the minimum weight. Used in test() procedure
minw = INF
candidate = None
for destNode, stc in self.stach.items():
if stc==EdgeState.basic and self.weights[destNode] < minw:
minw = self.weights[destNode]
candidate = destNode
return candidate
def countBranchEdges():
# returns the number of 'branch' edges that don't lead to the node's father
# Used in procedure report()
num = 0
for destNode, stc in self.stach.items():
if (stc == EdgeState.branch and destNode != self.father): num+=1
return num
def names(l):
# Takes as input a list of nodes' objects, returns a list of the correspondig IDs.
# Used for pretty printing in the receive('initiate') function, when the node is
# informing the subtree about the new fragment's infos
toReturn = []
for a in l: toReturn.append(self.namesDict[a])
return toReturn
def processTestMsg(Lq, FNq, q):
# procedure to process a <test> message
if Lq > self.level:
# process is delayed because this node and q could be in the same fragment,
# but this node may still not know that
self.testMessQueue.append((Lq,FNq,q))
output("Node ", self.name, "has delayed <test> message from ", self.namesDict[q])
return False #message is not consumed
elif FNq == self.fragName:
#received from node in the same fragment
if self.stach[q]==EdgeState.basic: self.stach[q]=EdgeState.reject #reject edges in the same fragment
if self.testch != q: # send <reject> only if we didn't send <test> before (avoids redundancy)
send('reject', to=q)
output("Node ", self.name, "sent <reject> to node ", self.namesDict[q])
else: test() # received <test> msg (from node in same fragment) on the same channel that we were testing:
# continue search on other edges
return True #message is consumed
else:
#q has found the best local outgoing edge
send('accept', to=q)
output("Node ", self.name, "sent <accept> to node ", self.namesDict[q])
return True #message is consumed
def checkTestMessQueue():
# tries to process delayed <test> messages;
# returns True if at least one message in the queue is consumed
queueChanged = False
for i in range(len(self.testMessQueue)):
mess = self.testMessQueue.popleft()
Lq = mess[0]
FNq = mess[1]
q = mess[2]
if self.processTestMsg(Lq, FNq, q): queueChanged = True
return queueChanged
def changeroot():
#10. Procedure called when the fragment has found its lowest-weight outgoing edge and the <report> edge has reached
# the core node on whose side this edge is located. The procedure sends a <changeroot> message that is forwarded
# through the framment's nodes on their respective 'bestch' towards the leaf incident to the lowest-weight outgoing edge.
if self.stach[self.bestch] == EdgeState.branch:
send('changeroot', to=self.bestch)
output("Node ", self.name, "sent <changeroot> to node ", self.namesDict[self.bestch])
else:
# when the <changeroot> message reaches the leaf incident to the lowest-weight outgoing edge, this node
# sends a <connect> on the edge to start unifying with the new fragment.
send (('connect', self.level), to=self.bestch)
self.stach[self.bestch] = EdgeState.branch
output("Node ", self.name, "sent <connect> to node ", self.namesDict[self.bestch], " in changeroot process.")
checkConnectsQueue()
def processReportFromFather(w):
# the only nodes that receive reports from their fathers are the core nodes
# (each one is father of the other one)
if self.state == NodeState.find:
# this node is still waiting for its sons' reports: delay the message
self.reportMessQueue.append(w)
output("Node ", self.name, "has delayed <report> message ")
return False #message not processed
elif w > self.bestwt:
# this is the core node on the side of the leaf incident to the
# lowest-weight outgoing edge: start changeroot()
changeroot()
return True #message is processed
elif (w == self.bestwt and w == INF):
# the fragment's search of the lowest weight outgoing edge has terminated without
# finding any other possible edge: algorithm has terminated
output("FINISHED")
send('complete', to=obs) #inform the observer process (only to plot the SPT)
return True
def checkReportsQueue():
#process delayed reports, after node state changes to 'found'
queueChanged = False
for i in range(len(self.reportMessQueue)):
w = self.reportMessQueue.popleft()
if processReportFromFather(w): queueChanged= True
return queueChanged
def report():
#8. REPORT TO FATHER THAT A LOWEST WEIGHT OUTGOING EDGE HAS BEEN FOUND
output("Node ", self.name, " called report() procedure")
if(self.rec == self.countBranchEdges() and (self.testch is None)):
# report only if I have received from all my sons and if i have finished my local search
self.state = NodeState.found
send(('report', self.bestwt), to=self.father)
output("Node ", self.name, "sent <report> to node ", self.namesDict[self.father], " (father)")
checkReportsQueue() #since state has changes to 'found', process the delayed reports
def test():
#4. TEST PROCEDURE: local search for the lowest-weight outgoing edge
self.testch = findMinBasicEdge()
if self.testch is None:
# if there aren't any other edges to explore
report()
else:
# else, explore best edge
send(('test', self.level, self.fragName), to=self.testch)
output("Node ", self.name, "sent <test> to node ", self.namesDict[self.testch])
def checkAllQueues():
someQueueChanged = True
while(someQueueChanged):
someQueueChanged = False
if checkTestMessQueue(): someQueueChanged = True
if checkConnectsQueue(): someQueueChanged = True
if checkReportsQueue(): someQueueChanged = True
def receive(msg=('initiate', Lq, FNq, Sq), from_=q):
#3. RECEIPT OF <INITIATE> MESSAGE: union of 2 fragments
output("Node ", self.name, "received <initiate> from node ", self.namesDict[q])
self.level = Lq
self.fragName = FNq
self.state= Sq
self.father = q
self.bestch = None
self.bestwt = INF
dests = [n for n in neighs if (n != q and self.stach[n]==EdgeState.branch)]
send(('initiate', Lq, FNq, Sq), to=dests) #send to entire subtree
output("Node ", self.name, "sent <initiateA> to neighbors ", names(dests))
if self.state == NodeState.find:
# new fragment gets involved in the lowest-weight outgoing edge search
self.rec = 0
test()
#### when the level of the node changes, re-process delayed messages
checkAllQueues()
def receive(msg=('test', Lq, FNq), from_=q):
#5. Receipt of <test> message
output("Node ", self.name, "[FN: ",self.fragName, "]", " received <test> from node ", self.namesDict[q], " with FN= ", FNq)
self.processTestMsg(Lq, FNq, q)
def receive(msg=('accept'), from_=q):
#6. Answer to <test> message, q is in another fragment: we found the local best edge.
output("Node ", self.name, "received <accept> from node ", self.namesDict[q])
self.testch = None
if self.weights[q] < self.bestwt:
self.bestwt = self.weights[q]
self.bestch = q
report()
def receive(msg=('reject'), from_=q):
#7 Answer to <test> message, q is in the same fragment.
output("Node ", self.name, "received <reject> from node ", self.namesDict[q])
if self.stach[q]==EdgeState.basic:
self.stach[q]=EdgeState.reject #exclude edge from SPT
checkConnectsQueue() #<connect> msgs coming from the rejected edge will be removed from the queue
test() #continue search
def receive(msg=('report', w), from_=q):
#9. Receive report
output("Node ", self.name, "received <report> from node ", self.namesDict[q])
if q != self.father:
#reply at <initiate> message
if(w < self.bestwt):
self.bestwt = w
self.bestch = q
self.rec += 1
report()
else:
#pq is the core edge
processReportFromFather(w)
def receive(msg=('changeroot'), from_=q):
# Process <changeroot> message
output("Node ", self.name, "received <changeroot> from node ", self.namesDict[q])
changeroot()
def receive(msg=('obsQuery')):
# Receive query from Observer and answer with local data useful to bul, the complete SPT
send(('answer', self.stach, weights), to=obs)
self.state = NodeState.finished
def main():
    # Builds a random weighted graph, creates one Agent per node plus an
    # Observer, wires every agent to its neighbours and starts the run.
    NUM_NODES = 20
    agents = list(new(Agent, num=NUM_NODES))

    # maps each agent process to its index (used by agents for pretty printing)
    nodesDic = {}
    for k in range(len(agents)):
        nodesDic[agents[k]] = k

    # generate a random graph; every edge gets a distinct random weight
    # (random.sample draws without replacement)
    topo = genGraphEdges(NUM_NODES)
    weights = random.sample(range(1, 3000), len(topo))

    # create and start the Observer process, which is handed the agents, the
    # topology, the weights and the name mapping
    obs = new(Observer)
    setup(obs, (agents, topo, weights, nodesDic))
    start(obs)

    # Build every node's neighbour list and weight table in a single pass
    # over the edges instead of rescanning the whole edge list per node.
    # Per-node neighbour order still follows the order of edges in topo.
    neighs_of = []
    weights_of = []
    for k in range(len(agents)):
        neighs_of.append([])
        weights_of.append({})
    for edge, wt in zip(topo, weights):
        u = edge[0]
        v = edge[1]
        neighs_of[u].append(agents[v])
        weights_of[u][agents[v]] = wt
        if v != u:
            # a self-loop is recorded only once
            neighs_of[v].append(agents[u])
            weights_of[v][agents[u]] = wt

    # setup the agents; the node index doubles as the node's name
    for k in range(len(agents)):
        setup(agents[k], args=(neighs_of[k], weights_of[k], k, nodesDic, obs))

    # start the algorithm
    start(agents)