-
Notifications
You must be signed in to change notification settings - Fork 0
/
AATC_Drone.py
267 lines (203 loc) · 9 KB
/
AATC_Drone.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
import socket,codecs,ast,recvall,sys,AATC_Crypto, AATC_Coordinate
class DroneInterface:
"""Interface with which to allow a Drone to communicate with the server.
Login using DroneCredentials. These can be stored on Drone eg sd card
!!! Will probably not allow flight abortion in air as drone may not have connection !!!
Updating drone status could be run on a different thread if nessesary.
Updating Drone status sends the coordinates and last battery to the server for reference.
Check for flight will get the next flight in the next MaxLookAheadTime seconds eg 1800 for 30min.
This will return a flightId which can be used to obtain the full flight details.
GetFlight and GetFlightWaypoints will get the data for the flight from the respective table.
Coordinates are stored in the form '(x,y,z)' as so must be converted to tuples first.
Mark Flight Complete will tell the server that the drone has sucessfully completed the flight.
Maybe Drone can calculate battery needed and can wait charge if low on battery
Eg calculate trip distance and get range*battery, if not enough, wait and charge.
"""
def __init__(self,Connection):
self._con = Connection
self._Crypto = AATC_Crypto.Crypter(self._con)
self._DroneName = ""
print("Welcome to the AATC Connection interface")
def Send(self,Code,data):
Info = self._Crypto.Encrypt(codecs.encode(str((Code,data))))
self._con.sendall(Info)
def Recv(self): #Returns tuple of Sucess,Message,Data of which data may just be useless for that function
try:
data = self._Crypto.Decrypt(recvall.recvall(self._con))
data = ast.literal_eval(codecs.decode(data))
# Sucess, Message , Data
return data[0],data[1],data[2]
except Exception as e:
print("Socket data receive error",e)
return (False,"Conversion/Transfer Error"+str(e),[])
#############################
def Login(self,DroneID,DronePassword):
self.Send("Login",(DroneID,DronePassword))
Sucess,Message,_ = self.Recv()
return Sucess,Message
#############################
def UpdateDroneStatus(self,LastCoords,LastBattery):
self.Send("UpdateDroneStatus",(LastCoords,LastBattery))
Sucess,Message,_ = self.Recv()
return Sucess,Message
############################
def DroneGetDroneInfo(self):
self.Send("DroneGetDroneInfo",())
Sucess,Message,DroneInfo = self.Recv()
return Sucess,Message,DroneInfo
############################
def CheckForFlight(self,MaxLookAheadTime = 1800):
self.Send("CheckForFlight",(MaxLookAheadTime,))
Sucess,Message,FlightID = self.Recv() #FlightID in form [(FlightID,)]
return Sucess,Message,FlightID
def GetFlight(self,FlightID):
self.Send("GetFlight",(FlightID,))
Sucess,Message,Flight = self.Recv()
return Sucess,Message,Flight
def GetFlightWaypoints(self,FlightID):
self.Send("GetFlightWaypoints",(FlightID,))
Sucess,Message,FlightWaypoints = self.Recv()
return Sucess,Message,FlightWaypoints
def MarkFlightComplete(self,FlightID,Code):
self.Send("MarkFlightComplete",(FlightID,Code))
Sucess,Message,_ = self.Recv()
return Sucess,Message
def Exit(self):
self.Send("Exit",())
Sucess,Message,_ = self.Recv()
self._con.close()
return Sucess,Message
class Flight: #Stores details about a flight
def __init__(self,FlightID,DroneID,StartCoord,EndCoord,StartTime,ETA,Distance):
self._FlightID = FlightID
self._DroneID = DroneID
self._StartCoord = StartCoord
self._EndCoord = EndCoord
self._StartTime = StartTime
self._ETA = ETA
self._Distance = Distance
def Get_FlightID(self):
return self._FlightID
def Get_DroneID(self):
return self._DroneID
def Get_StartCoord(self):
return self._StartCoord
def Get_EndCoord(self):
return self._EndCoord
def Get_StartTime(self):
return self._EndCoord
def Get_ETA(self):
return self._ETA
def Get_Distance(self):
return self._Distance
class Waypoint: #Stores details about a waypoint
def __init__(self,FlightID,WaypointNumber,Coord,ETA):
self._FlightID = FlightID
self._WaypointNumber = WaypointNumber
self._Coord = Coord
self._ETA = ETA
def Get_FlightID(self):
return self._FlightID
def Get_WaypointNumber(self):
return self._WaypointNumber
def Get_Coord(self):
return self._Coord
def Get_ETA(self):
return self._ETA
def ConvertCoordString(CoordString): #Converts a string coordinate to a coordinate object
CoordTuple = ast.literal_eval(CoordString)
Coord = AATC_Coordinate.Coordinate(CoordTuple[0],CoordTuple[1],CoordTuple[2],0,0,0)
return Coord
def GetFlightObject(Message,Data): #Generates the flight object
Data = Data[0] # as Data = [(Stuff,stuff,even more stuff)]
Columns = ast.literal_eval(Message)
FlightIDIndex = Columns.index("FlightID")
DroneIDIndex = Columns.index("DroneID")
StartCoordsIndex = Columns.index("StartCoords")
EndCoordsIndex = Columns.index("EndCoords")
StartTimeIndex = Columns.index("StartTime")
ETAIndex = Columns.index("ETA")
DistanceIndex = Columns.index("Distance")
FlightID = Data[FlightIDIndex]
DroneID = Data[DroneIDIndex]
## StartCoord = ConvertCoordString(Data[StartCoordsIndex])
## EndCoord = ConvertCoordString(Data[EndCoordsIndex])
StartCoord = Waypoint(FlightID,-1,ConvertCoordString(Data[StartCoordsIndex]),-1)
EndCoord = Waypoint(FlightID,-2,ConvertCoordString(Data[EndCoordsIndex]),-1)
StartTime = Data[FlightIDIndex]
ETA = Data[FlightIDIndex]
Distance = Data[DistanceIndex]
FlightObj = Flight(FlightID,DroneID,StartCoord,EndCoord,StartTime,ETA,Distance)
return FlightObj
def GetWaypointObjects(Message,Data): #Generates the waypoint objects
WaypointList = []
Columns = ast.literal_eval(Message)
FlightIDIndex = Columns.index("FlightID")
WaypointNumberIndex = Columns.index("WaypointNumber")
CoordIndex = Columns.index("Coords")
ETAIndex = Columns.index("ETA")
for point in Data:
FlightID = point[FlightIDIndex]
WaypointNumber = point[WaypointNumberIndex]
Coord = ConvertCoordString(point[CoordIndex])
ETA = point[ETAIndex]
waypoint = Waypoint(FlightID,WaypointNumber,Coord,ETA)
WaypointList.append(waypoint)
WaypointList = sorted(WaypointList, key = lambda x:x.Get_WaypointNumber())
## z = []
## for item in WaypointList: #Sorts into waypoint order
## z.append((item.WaypointNumber,item))
## heapq.heapify(z)
## WaypointList = []
## for item in z:
## WaypointList.append(item[1])
return WaypointList
def MakeDroneInfo(DroneMessage,DroneData):
return DroneInformation(DroneMessage,DroneData)
class DroneInformation: #Stores information about the drone to be simulated
def __init__(self,Message,DroneInfo):
DroneInfo = DroneInfo[0]
Message = ast.literal_eval(Message)
ColumnValue = {}
Titles = ["DroneID","UserID","DroneName","DroneType","DroneSpeed","DroneRange","DroneWeight"]
for title in Titles:
ColumnValue[title] = DroneInfo[Message.index(title)]
self._DroneID = ColumnValue["DroneID"]
self._UserID = ColumnValue["UserID"]
self._DroneName = ColumnValue["DroneName"]
self._DroneType = ColumnValue["DroneType"]
self._DroneSpeed = ColumnValue["DroneSpeed"]
self._DroneRange = ColumnValue["DroneRange"]
self._DroneWeight = ColumnValue["DroneWeight"] #Imports the drone information from the string
def Get_DroneID(self):
return self._DroneID
def Get_UserID(self):
return self._UserID
def Get_DroneName(self):
return self._DroneName
def Get_DroneType(self):
return self._DroneType
def Get_DroneSpeed(self):
return self._DroneSpeed
def Get_DroneRange(self):
return self._DroneRange
def Get_DroneWeight(self):
return self._DroneWeight
def Connect(remote_ip,PORT): #Connects to the server
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.connect((remote_ip, PORT))
print("Connected to > "+ remote_ip+":"+str(PORT))
return s
except Exception as e:
print("Error binding port")
print(e)
print("Check address and port is up")
print("Otherwise check server is functional")
print("Exiting...")
sys.exit()
def CreateDroneInterface(IP = "127.0.0.1",Port = 8002): #Creates a drone interface connection to the server
soc = Connect(IP,Port)
D = DroneInterface(soc)
return D