Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added communication library #283

Merged
merged 16 commits into from
Mar 26, 2024
3 changes: 3 additions & 0 deletions modules.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,8 @@
"tabs": [
"physics_2d"
]
},
"communication": {
"tabs": []
}
}
7 changes: 6 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,12 @@
"ace-builds": "^1.25.1",
"classnames": "^2.3.1",
"dayjs": "^1.10.4",
"events": "^3.3.0",
"gl-matrix": "^3.3.0",
"js-slang": "^1.0.48",
"lodash": "^4.17.21",
"mqtt": "^4.3.7",
"os": "^0.1.2",
"patch-package": "^6.5.1",
"phaser": "^3.54.0",
"plotly.js-dist": "^2.17.1",
Expand All @@ -120,7 +123,9 @@
"save-file": "^2.3.1",
"source-academy-utils": "^1.0.0",
"source-academy-wabt": "^1.0.4",
"tslib": "^2.3.1"
"tslib": "^2.3.1",
"uniqid": "^5.4.0",
"url": "^0.11.3"
},
"jest": {
"projects": [
Expand Down
200 changes: 200 additions & 0 deletions src/bundles/communication/Communications.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
import context from 'js-slang/context';
import { MultiUserController } from './MultiUserController';
import { GlobalStateController } from './GlobalStateController';
import { RpcController } from './RpcController';

class CommunicationModuleState {
multiUser: MultiUserController;
globalState: GlobalStateController | null = null;
rpc: RpcController | null = null;

constructor(address: string, port: number, user: string, password: string) {
const multiUser = new MultiUserController();
multiUser.setupController(address, port, user, password);
this.multiUser = multiUser;
}
}

/**
* Initializes connection with MQTT broker.
* Currently only supports WebSocket.
*
* @param address Address of broker.
* @param port WebSocket port number for broker.
* @param user Username of account, use empty string if none.
* @param password Password of account, use empty string if none.
*/
export function initCommunications(
address: string,
port: number,
user: string,
password: string,
) {
if (getModuleState() instanceof CommunicationModuleState) {
return;
}
const newModuleState = new CommunicationModuleState(
address,
port,
user,
password,
);
context.moduleContexts.communication.state = newModuleState;
}

function getModuleState() {
return context.moduleContexts.communication.state;
}

// Loop

let interval: number | undefined;

/**
* Keeps the program running so that messages can come in.
*/
export function keepRunning() {
interval = window.setInterval(() => {}, 20000);
}

/**
* Removes interval that keeps the program running.
*/
export function stopRunning() {
if (interval !== undefined) {
window.clearInterval(interval);
interval = undefined;
}
}

// Global State

/**
* Initializes global state.
*
* @param topicHeader MQTT topic to use for global state.
* @param callback Callback to receive updates of global state.
*/
export function initGlobalState(
8kdesign marked this conversation as resolved.
Show resolved Hide resolved
topicHeader: string,
callback: (state: any) => void,
) {
const moduleState = getModuleState();
if (moduleState instanceof CommunicationModuleState) {
if (moduleState.globalState instanceof GlobalStateController) {
return;
}
moduleState.globalState = new GlobalStateController(
topicHeader,
moduleState.multiUser,
callback,
);
return;
}
throw new Error('Error: Communication module not initialized.');
}

/**
* Obtains the current global state.
*
* @returns Current global state.
*/
export function getGlobalState() {
const moduleState = getModuleState();
if (moduleState instanceof CommunicationModuleState) {
return moduleState.globalState?.globalState;
}
throw new Error('Error: Communication module not initialized.');
}

/**
* Broadcasts the new states to all devices.
* Has ability to modify only part of the JSON state.
*
* @param path Path within the json state.
* @param updatedState Replacement value at specified path.
*/
export function updateGlobalState(path: string, updatedState: any) {
const moduleState = getModuleState();
if (moduleState instanceof CommunicationModuleState) {
moduleState.globalState?.updateGlobalState(path, updatedState);
return;
}
throw new Error('Error: Communication module not initialized.');
}

// Rpc

/**
* Initializes RPC.
*
* @param topicHeader MQTT topic to use for rpc.
* @param userId Identifier for this user, set undefined to generate a random ID.
*/
export function initRpc(topicHeader: string, userId?: string) {
const moduleState = getModuleState();
if (moduleState instanceof CommunicationModuleState) {
moduleState.rpc = new RpcController(
topicHeader,
moduleState.multiUser,
userId,
);
return;
}
throw new Error('Error: Communication module not initialized.');
}

/**
* Obtains the user's ID.
*
* @returns String for user ID.
*/
export function getUserId(): string {
const moduleState = getModuleState();
if (moduleState instanceof CommunicationModuleState) {
let userId = moduleState.rpc?.getUserId();
if (userId) {
return userId;
}
throw new Error('Error: UserID not found.');
}
throw new Error('Error: Communication module not initialized.');
}

/**
* Exposes the specified function to other users.
* Other users can use "callFunction" to call this function.
*
* @param name Identifier for the function.
* @param func Function to call when request received.
*/
export function expose(name: string, func: (...args: any[]) => any) {
const moduleState = getModuleState();
if (moduleState instanceof CommunicationModuleState) {
moduleState.rpc?.expose(name, func);
return;
}
throw new Error('Error: Communication module not initialized.');
}

/**
* Calls a function exposed by another user.
*
* @param receiver Identifier for the user whose function we want to call.
* @param name Identifier for function to call.
* @param args Array of arguments to pass into the function.
* @param callback Callback with return value.
*/
export function callFunction(
receiver: string,
name: string,
args: any[],
callback: (args: any[]) => void,
) {
const moduleState = getModuleState();
if (moduleState instanceof CommunicationModuleState) {
moduleState.rpc?.callFunction(receiver, name, args, callback);
return;
}
throw new Error('Error: Communication module not initialized.');
}
125 changes: 125 additions & 0 deletions src/bundles/communication/GlobalStateController.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { type MultiUserController } from './MultiUserController';

/**
* Controller for maintaining a global state across all devices.
* Depends on MQTT implementation in MultiUserController.
*
* @param topicHeader Identifier for all global state messages, must not include '/'.
* @param multiUser Instance of multi user controller.
* @param callback Callback called when the global state changes.
*/
export class GlobalStateController {
private topicHeader: string;
private multiUser: MultiUserController;
private callback: (state: any) => void;
globalState: any;
8kdesign marked this conversation as resolved.
Show resolved Hide resolved

constructor(
topicHeader: string,
multiUser: MultiUserController,
callback: (state: any) => void,
) {
this.topicHeader = topicHeader;
this.multiUser = multiUser;
this.callback = callback;
this.setupGlobalState();
}

/**
* Sets up callback for global state messages.
* Parses received message and stores it as global state.
*/
private setupGlobalState() {
if (this.topicHeader.length <= 0) return;
this.multiUser.addMessageCallback(this.topicHeader, (topic, message) => {
const shortenedTopic = topic.substring(
this.topicHeader.length,
topic.length,
);
this.parseGlobalStateMessage(shortenedTopic, message);
});
}

/**
* Parses the message received via MQTT and updates the global state.
*
* @param shortenedTopic Path of JSON branch.
* @param message New value to set.
*/
public parseGlobalStateMessage(shortenedTopic: string, message: string) {
let preSplitTopic = shortenedTopic.trim();
if (preSplitTopic.length === 0) {
try {
this.setGlobalState(JSON.parse(message));
} catch {
this.setGlobalState(undefined);
}
return;
}
if (!preSplitTopic.startsWith('/')) {
preSplitTopic = `/${preSplitTopic}`;
}
const splitTopic = preSplitTopic.split('/');
try {
let newGlobalState = { ...this.globalState };
if (
this.globalState instanceof Array ||

Check warning on line 66 in src/bundles/communication/GlobalStateController.ts

View workflow job for this annotation

GitHub Actions / Verify all tests pass and build success

'||' should be placed at the beginning of the line
typeof this.globalState === 'string'
) {
newGlobalState = {};
}
let currentJson = newGlobalState;
for (let i = 1; i < splitTopic.length - 1; i++) {
const subTopic = splitTopic[i];
if (
!(currentJson[subTopic] instanceof Object) ||

Check warning on line 75 in src/bundles/communication/GlobalStateController.ts

View workflow job for this annotation

GitHub Actions / Verify all tests pass and build success

'||' should be placed at the beginning of the line
currentJson[subTopic] instanceof Array ||

Check warning on line 76 in src/bundles/communication/GlobalStateController.ts

View workflow job for this annotation

GitHub Actions / Verify all tests pass and build success

'||' should be placed at the beginning of the line
typeof currentJson[subTopic] === 'string'
) {
currentJson[subTopic] = {};
}
currentJson = currentJson[subTopic];
}
if (message === undefined || message.length === 0) {
delete currentJson[splitTopic[splitTopic.length - 1]];
} else {
const jsonMessage = JSON.parse(message);
currentJson[splitTopic[splitTopic.length - 1]] = jsonMessage;
}
this.setGlobalState(newGlobalState);
} catch (error) {
console.log('Failed to parse message', error);
}
}

/**
* Sets the new global state and calls the callback to notify changes.
*
* @param newState New state received.
*/
private setGlobalState(newState: any) {
this.globalState = newState;
this.callback(newState);
}

/**
* Broadcasts the new states to all devices.
* Has ability to modify only part of the JSON state.
*
* @param path Path within the json state.
* @param updatedState Replacement value at specified path.
*/
public updateGlobalState(path: string, updatedState: any) {
if (this.topicHeader.length === 0) return;
let topic = this.topicHeader;
if (path.length !== 0 && !path.startsWith('/')) {
topic += '/';
}
topic += path;
this.multiUser.controller?.publish(
topic,
JSON.stringify(updatedState),
false,
);
}
}
Loading
Loading