Skip to content

Commit

Permalink
Fix support for absolute service paths
Browse files Browse the repository at this point in the history
  • Loading branch information
oklemenz2 committed Oct 17, 2024
1 parent 73e9fee commit 12d2b8e
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 67 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Support for http conform headers (`x-ws` and `x-websocket`)
- Revise error handling for websocket events
- Fix for operations without parameters
- Fix support for absolute service paths

## Version 1.3.0 - 2024-10-07

Expand Down
8 changes: 6 additions & 2 deletions src/adapter/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class RedisAdapter extends BaseAdapter {
return;
}
try {
const channel = this.prefix + path;
const channel = this.getChannel(path);
await this.client.subscribe(channel, async (message, messageChannel) => {
try {
if (messageChannel === channel) {
Expand All @@ -43,12 +43,16 @@ class RedisAdapter extends BaseAdapter {
return;
}
try {
const channel = this.prefix + path;
const channel = this.getChannel(path);
await this.client.publish(channel, message);
} catch (err) {
LOG?.error(err);
}
}

getChannel(path) {
return `${this.prefix}/${path}`;
}
}

module.exports = RedisAdapter;
104 changes: 51 additions & 53 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,49 +43,46 @@ function serveWebSocketServer(options) {
}
}
// Websockets events
if (cds.env.protocols.websocket || cds.env.protocols.ws) {
const eventServices = {};
for (const name in cds.model.definitions) {
const definition = cds.model.definitions[name];
if (definition.kind === "event" && (definition["@websocket"] || definition["@ws"])) {
const service = cds.services[definition._service?.name];
if (service && !isServedViaWebsocket(service)) {
eventServices[service.name] ??= eventServices[service.name] || {
name: service.name,
definition: service.definition,
endpoints: service.endpoints.map((endpoint) => {
const protocol =
cds.env.protocols[endpoint.kind] ||
(endpoint.kind === "odata" ? cds.env.protocols["odata-v4"] : null);
return {
kind: "websocket",
path:
(cds.env.protocols.websocket?.path || cds.env.protocols.ws?.path) +
normalizeServicePath(service.path, protocol.path),
};
}),
operations: () => {
return interableObject();
},
entities: () => {
return interableObject();
},
_events: interableObject(),
events: function () {
return this._events;
},
on: service.on.bind(service),
tx: service.tx.bind(service),
};
eventServices[service.name]._events[serviceLocalName(service, definition.name)] = definition;
}
const eventServices = {};
for (const name in cds.model.definitions) {
const definition = cds.model.definitions[name];
if (definition.kind === "event" && (definition["@websocket"] || definition["@ws"])) {
const service = cds.services[definition._service?.name];
if (service && !isServedViaWebsocket(service)) {
eventServices[service.name] ??= eventServices[service.name] || {
name: service.name,
definition: service.definition,
endpoints: service.endpoints.map((endpoint) => {
const protocol =
cds.env.protocols[endpoint.kind] ||
(endpoint.kind === "odata" ? cds.env.protocols["odata-v4"] : null);
let path = normalizeServicePath(service.path, protocol.path);
if (!path.startsWith("/")) {
path = (cds.env.protocols?.websocket?.path || cds.env.protocols?.ws?.path || "/ws") + "/" + path;
}
return { kind: "websocket", path };
}),
operations: () => {
return interableObject();
},
entities: () => {
return interableObject();
},
_events: interableObject(),
events: function () {
return this._events;
},
on: service.on.bind(service),
tx: service.tx.bind(service),
};
eventServices[service.name]._events[serviceLocalName(service, definition.name)] = definition;
}
}
for (const name in eventServices) {
const eventService = eventServices[name];
if (Object.keys(eventService.events()).length > 0) {
serveWebSocketService(socketServer, eventService, options);
}
}
for (const name in eventServices) {
const eventService = eventServices[name];
if (Object.keys(eventService.events()).length > 0) {
serveWebSocketService(socketServer, eventService, options);
}
}
LOG?.info("using websocket", { kind: cds.env.websocket.kind, adapter: socketServer.adapterActive });
Expand All @@ -112,8 +109,8 @@ async function initWebSocketServer(server, path) {
}

function normalizeServicePath(servicePath, protocolPath) {
if (servicePath.startsWith(protocolPath)) {
return servicePath.substring(protocolPath.length);
if (servicePath.startsWith(`${protocolPath}/`)) {
return servicePath.substring(`${protocolPath}/`.length);
}
return servicePath;
}
Expand Down Expand Up @@ -152,11 +149,11 @@ function bindServiceEvents(socketServer, service, path) {
const user = deriveUser(event, req.data, headers, req);
const context = deriveContext(event, req.data, headers);
const identifier = deriveIdentifier(event, req.data, headers);
const eventHeaders = headers?.websocket || headers?.ws ? { ...headers?.websocket, ...headers?.ws } : undefined;
path = normalizeEventPath(event["@websocket.path"] || event["@ws.path"] || path);
const eventHeaders = deriveEventHeaders(headers);
const eventPath = derivePath(event, path);
await socketServer.broadcast({
service,
path,
path: eventPath,
event: localEventName,
data: req.data,
tenant: req.tenant,
Expand Down Expand Up @@ -702,6 +699,14 @@ function deriveHeaders(headers, format) {
return headers;
}

function deriveEventHeaders(headers) {
return headers?.websocket || headers?.ws ? { ...headers?.websocket, ...headers?.ws } : undefined;
}

function derivePath(event, path) {
return event["@websocket.path"] || event["@ws.path"] || path;
}

function getDeepEntityColumns(entity) {
const columns = [];
for (const element of Object.values(entity.elements)) {
Expand All @@ -719,13 +724,6 @@ function getDeepEntityColumns(entity) {
return columns;
}

function normalizeEventPath(path) {
if (!path) {
return path;
}
return path.startsWith("/") ? path : `/${path}`;
}

function serviceLocalName(service, name) {
const servicePrefix = `${service.name}.`;
if (name.startsWith(servicePrefix)) {
Expand Down
9 changes: 9 additions & 0 deletions src/socket/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,15 @@ class SocketServer {
return require(impl);
}

/**
* Return service path including protocol prefix or absolute service path (if already absolute)
* @param {String} path path
* @returns {String} Service path
*/
servicePath(path) {
return path.startsWith("/") ? path : `${this.path}/${path}`;
}

/**
* Return format instance for service
* @param {Object} service Service definition
Expand Down
6 changes: 2 additions & 4 deletions src/socket/socket.io.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ class SocketIOServer extends SocketServer {
}

service(service, path, connected) {
const servicePath = `${this.path}${path}`;
const io = this.applyMiddlewares(this.io.of(servicePath));
const io = this.applyMiddlewares(this.io.of(this.servicePath(path)));
const format = this.format(service, undefined, "json");
io.on("connection", async (socket) => {
try {
Expand Down Expand Up @@ -175,8 +174,7 @@ class SocketIOServer extends SocketServer {
try {
path = path || this.defaultPath(service);
tenant = tenant || socket?.context.tenant;
const servicePath = `${this.path}${path}`;
let to = socket?.broadcast || this.io.of(servicePath);
let to = socket?.broadcast || this.io.of(this.servicePath(path));
if (context?.include?.length && identifier?.include?.length) {
for (const contextInclude of context.include) {
for (const identifierInclude of identifier.include) {
Expand Down
6 changes: 2 additions & 4 deletions src/socket/ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ class SocketWSServer extends SocketServer {

service(service, path, connected) {
this.adapter?.on(service, path);
const servicePath = `${this.path}${path}`;
const format = this.format(service);
this.services[servicePath] = (ws, request) => {
this.services[this.servicePath(path)] = (ws, request) => {
this.onInit(ws, request);
DEBUG?.("Initialized");
ws.on("close", () => {
Expand Down Expand Up @@ -154,8 +153,7 @@ class SocketWSServer extends SocketServer {
}
path = path || this.defaultPath(service);
tenant = tenant || socket?.context.tenant;
const servicePath = `${this.path}${path}`;
const serviceClients = this.fetchClients(tenant, servicePath);
const serviceClients = this.fetchClients(tenant, this.servicePath(path));
const clients = new Set(serviceClients.all);
if (user?.include?.length) {
this.keepEntriesFromSet(clients, this.collectFromMap(serviceClients.users, user?.include));
Expand Down
2 changes: 1 addition & 1 deletion test/socketio/facade_socket.io.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe("Facade", () => {
const facade = socket.serverSocket.facade;
expect(facade).toBeDefined();
expect(facade.service).toEqual(expect.any(Object));
expect(facade.path).toEqual("/chat");
expect(facade.path).toEqual("chat");
expect(facade.socket).toBeDefined();
const context = facade.context;
expect(context).toBeDefined();
Expand Down
3 changes: 2 additions & 1 deletion test/socketio/protocols_socket.io.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ describe("Protocols", () => {
});

test.each(protocols)("Protocol - %p", async (protocol) => {
const socket = await connect(`/ws/${protocol}`);
const path = (protocol.includes("absolute") ? "/" : "/ws/") + protocol;
const socket = await connect(path);
const waitProtocol = waitForEvent(socket, "test");
await emitEvent(socket, "trigger", { text: protocol });
const waitResult = await waitProtocol;
Expand Down
2 changes: 1 addition & 1 deletion test/ws/facade_ws.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe("Facade", () => {
const facade = socket.serverSocket.facade;
expect(facade).toBeDefined();
expect(facade.service).toEqual(expect.any(Object));
expect(facade.path).toEqual("/chat");
expect(facade.path).toEqual("chat");
expect(facade.socket).toBeDefined();
const context = facade.context;
expect(context).toBeDefined();
Expand Down
3 changes: 2 additions & 1 deletion test/ws/protocols_ws.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ describe("Protocols", () => {
});

test.each(protocols)("Protocol - %p", async (protocol) => {
const socket = await connect("/ws/" + protocol);
const path = (protocol.includes("absolute") ? "/" : "/ws/") + protocol;
const socket = await connect(path);
const waitProtocol = waitForEvent(socket, "test");
await emitEvent(socket, "trigger", { text: protocol });
const waitResult = await waitProtocol;
Expand Down

0 comments on commit 12d2b8e

Please sign in to comment.