From 2e8391d42e0f1d158a94b32fce92ca1b53c31ecf Mon Sep 17 00:00:00 2001 From: Oliver Klemenz Date: Fri, 27 Sep 2024 17:29:09 +0200 Subject: [PATCH 1/8] cloud events (wip) --- CHANGELOG.md | 3 + README.md | 8 +- src/format/base.js | 49 ++++++++++- src/format/cloudevent.js | 79 ++++++++++++++++++ src/format/pcp.js | 35 +++----- src/index.js | 36 +++++--- src/socket/base.js | 15 ++-- src/socket/socket.io.js | 10 ++- src/socket/ws.js | 12 +-- test/_env/srv/cloudevent.cds | 73 ++++++++++++++++ test/_env/srv/handlers/cloudevent.js | 48 +++++++++++ test/_env/srv/handlers/pcp.js | 11 +++ test/_env/srv/handlers/todo.js | 2 +- test/_env/srv/index.cds | 1 + test/_env/srv/pcp.cds | 7 ++ test/_env/srv/todo.cds | 2 +- test/_env/util/socket.io.js | 3 +- test/_env/util/ws.js | 6 +- test/socketio/fns_socket.io.test.js | 4 +- test/socketio/pcp_socket.io.test.js | 12 +++ test/ws/cloudevent_ws.test.js | 120 +++++++++++++++++++++++++++ test/ws/fns_ws.test.js | 4 +- test/ws/pcp_ws.test.js | 12 +++ test/ws/redis_ws.test.js | 5 +- 24 files changed, 492 insertions(+), 65 deletions(-) create mode 100644 src/format/cloudevent.js create mode 100644 test/_env/srv/cloudevent.cds create mode 100644 test/_env/srv/handlers/cloudevent.js create mode 100644 test/ws/cloudevent_ws.test.js diff --git a/CHANGELOG.md b/CHANGELOG.md index a0458fb..d60593f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Add support for cloud events - Include or exclude defined list of users - Support exclusion of event contexts - Overrule path of websocket event via `@websocket.path` or `@ws.path` for non-websocket services @@ -16,10 +17,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Optimization of client determination for kind `ws` - Allow empty PCP message in event definition - Improve documentation and examples +- Provide event headers to formatter ### Fixed - Ignore not modeled PCP fields in payload serialization +- Fix annotations value derivation for non-websocket service events - Fix annotations `wsCurrentUserInclude`, `currentUserInclude`, `wsCurrentUserExclude`, `currentUserExclude` ## Version 1.2.0 - 2024-09-04 diff --git a/README.md b/README.md index 569f947..6ffd8f9 100644 --- a/README.md +++ b/README.md @@ -221,10 +221,10 @@ It abstracts from the concrete websocket implementation by exposing the followin - `context: Object`: CDS context object for the websocket server socket - `on(event: String, callback: Function)`: Register websocket event - `async emit(event: String, data: Object)`: Emit websocket event with data -- `async broadcast(event: String, data: Object, user: {include: String[], exclude: String[]}?, context: : {include: String[], exclude: String[]}?, identifier: {include: String[], exclude: String[]}?)`: - Broadcast websocket event (except to sender) by optionally restrict to users, contexts or identifiers -- `async broadcastAll(event: String, data: Object, user: {include: String[], exclude: String[]}?, context: : {include: String[], exclude: String[]}?, identifier: {include: String[], exclude: String[]}?)`: - Broadcast websocket event (including to sender) by optionally restrict to users, contexts or identifiers +- `async broadcast(event: String, data: Object, user: {include: String[], exclude: String[]}?, context: : {include: String[], exclude: String[]}?, identifier: {include: String[], exclude: String[]}?, headers: Object?)`: + Broadcast websocket event (except to sender) by optionally restrict to users, contexts or identifiers and optionally providing headers +- `async broadcastAll(event: String, data: Object, user: {include: String[], exclude: String[]}?, context: : {include: String[], exclude: String[]}?, identifier: {include: String[], exclude: String[]}?, headers: Object?)`: + Broadcast websocket event (including to sender) by optionally restrict to users, contexts or identifiers and optionally providing headers - `async enter(context: String)`: Enter a context - `async exit(context: String)`: Exit a context - `async disconnect()`: Disconnect server socket diff --git a/src/format/base.js b/src/format/base.js index d25867f..7a96b64 100644 --- a/src/format/base.js +++ b/src/format/base.js @@ -21,9 +21,56 @@ class BaseFormat { * Compose the event and internal data (JSON) into a formatted string * @param {String} event Event name * @param {Object} data Event internal data + * @param {Object} headers Event headers * @returns {String|Object} Formatted string or a JSON object (for kind `socket.io` only) */ - compose(event, data) {} + compose(event, data, headers) {} + + /** + * Derive value from event data via annotations + * @param {String} event Event definition + * @param {Object} data Event data + * @param {Object} headers Event headers + * @param {[String]} headersNames Header names + * @param {[String]} annotationNames Annotations names + * @param {*} [fallback] Fallback value + * @returns {*} Derived value + */ + deriveValue(event, data, headers, { headerNames, annotationNames, fallback }) { + const eventDefinition = this.service.events()[event]; + if (eventDefinition) { + if (headers) { + for (const header of headerNames || []) { + if (headers[header] !== undefined) { + return headers[header]; + } + } + } + for (const annotation of annotationNames || []) { + if (eventDefinition[annotation] !== undefined) { + return eventDefinition[annotation]; + } + } + if (data) { + const eventElements = Object.values(eventDefinition?.elements || {}); + if (eventElements.length > 0) { + for (const annotation of annotationNames) { + const eventElement = eventElements.find((element) => { + return element[annotation]; + }); + if (eventElement) { + const elementValue = data[eventElement.name]; + if (elementValue !== undefined) { + delete data[eventElement.name]; + return elementValue; + } + } + } + } + } + } + return fallback; + } } module.exports = BaseFormat; diff --git a/src/format/cloudevent.js b/src/format/cloudevent.js new file mode 100644 index 0000000..1ac0a45 --- /dev/null +++ b/src/format/cloudevent.js @@ -0,0 +1,79 @@ +"use strict"; + +const cds = require("@sap/cds"); + +const BaseFormat = require("./base"); + +const LOG = cds.log("/websocket/cloudevent"); + +class CloudEventFormat extends BaseFormat { + constructor(service, origin) { + super(service, origin); + } + + parse(data) { + try { + const cloudEvent = data?.constructor === Object ? data : JSON.parse(data); + // TODO: Map to CAP operation + + } catch (err) { + LOG?.error(err); + } + LOG?.warn("Error parsing cloud-event format", data); + return { + event: undefined, + data: {}, + }; + } + + compose(event, data, headers) { + const cloudEvent = { + specversion: "1.0", + type: `${this.service.name}.${event}`, + source: this.service.name, + subject: null, + id: cds.utils.uuid(), + time: new Date().toISOString(), + datacontenttype: "application/json", + data, + }; + const annotations = this.collectAnnotations(event); + for (const annotation of annotations) { + const value = this.deriveValue(event, data, headers, { + headerValues: [`cloudevent-${annotation}`, `cloudevent_${annotation}`, `cloudevent.${annotation}`, `cloudevent${annotation}`, annotation], + annotationValues: [`@websocket.cloudevent.${annotation}`, `@ws.cloudevent.${annotation}`], + }); + if (value !== undefined) { + cloudEvent[annotation] = value; + } + } + return this.origin === "json" ? cloudEvent : JSON.stringify(cloudEvent); + } + + collectAnnotations(event) { + const annotations = new Set(); + const eventDefinition = this.service.events()[event]; + for (const annotation in eventDefinition) { + if (annotation.startsWith("@websocket.cloudevent.")) { + annotations.add(annotation.substring("@websocket.cloudevent.".length)); + } + if (annotation.startsWith("@ws.cloudevent.")) { + annotations.add(annotation.substring("@ws.cloudevent.".length)); + } + } + const eventElements = Object.values(eventDefinition?.elements || {}); + for (const element of eventElements) { + for (const annotation in element) { + if (annotation.startsWith("@websocket.cloudevent.")) { + annotations.add(annotation.substring("@websocket.cloudevent.".length)); + } + if (annotation.startsWith("@ws.cloudevent.")) { + annotations.add(annotation.substring("@ws.cloudevent.".length)); + } + } + } + return annotations; + } +} + +module.exports = CloudEventFormat; diff --git a/src/format/pcp.js b/src/format/pcp.js index 7db576f..908f928 100644 --- a/src/format/pcp.js +++ b/src/format/pcp.js @@ -52,35 +52,22 @@ class PCPFormat extends BaseFormat { }; } - compose(event, data) { + compose(event, data, headers) { const eventDefinition = this.service.events()[event]; - const eventElements = Object.values(eventDefinition?.elements || {}); - const messageElement = eventElements.find((element) => { - return element["@websocket.pcp.message"] || element["@ws.pcp.message"]; + const pcpMessage = this.deriveValue(event, data, headers, { + headerNames: ["pcp-message", "pcp_message", "pcp.message", "pcpmessage"], + annotationNames: ["@websocket.pcp.message", "@ws.pcp.message"], + fallback: event, }); - const actionElement = eventElements.find((element) => { - return element["@websocket.pcp.action"] || element["@ws.pcp.action"]; + const pcpAction = this.deriveValue(event, data, headers, { + headerNames: ["pcp-action", "pcp_action", "pcp.action", "pcpaction"], + annotationNames: ["@websocket.pcp.action", "@ws.pcp.action"], + fallback: MESSAGE, }); - const message = - eventDefinition?.["@websocket.pcp.message"] ?? - eventDefinition?.["@ws.pcp.message"] ?? - data[messageElement?.name] ?? - event; - if (data[messageElement?.name]) { - delete data[messageElement?.name]; - } - const pcpAction = - eventDefinition?.["@websocket.pcp.action"] ?? - eventDefinition?.["@ws.pcp.action"] ?? - data[actionElement?.name] ?? - MESSAGE; - if (data[actionElement?.name]) { - delete data[actionElement?.name]; - } const pcpEvent = eventDefinition?.["@websocket.pcp.event"] || eventDefinition?.["@ws.pcp.event"] ? event : undefined; - const pcpFields = serializePcpFields(data, typeof message, pcpAction, pcpEvent, eventDefinition?.elements); - return pcpFields + message; + const pcpFields = serializePcpFields(data, typeof pcpMessage, pcpAction, pcpEvent, eventDefinition?.elements); + return pcpFields + pcpMessage; } } diff --git a/src/index.js b/src/index.js index 7728c1c..140c033 100644 --- a/src/index.js +++ b/src/index.js @@ -65,26 +65,25 @@ function serveWebSocketServer(options) { }; }), operations: () => { - return []; + return interableObject(); }, entities: () => { - return []; + return interableObject(); + }, + _events: interableObject(), + events: function() { + return this._events; }, - events: [], on: service.on.bind(service), tx: service.tx.bind(service), }; - eventServices[service.name].events.push(definition); + eventServices[service.name]._events[serviceLocalName(service, definition.name)] = definition; } } } for (const name in eventServices) { const eventService = eventServices[name]; - const events = eventService.events; - if (events.length > 0) { - eventService.events = () => { - return events; - }; + if (Object.keys(eventService.events()).length > 0) { serveWebSocketService(socketServer, eventService, options); } } @@ -151,6 +150,7 @@ function bindServiceEvents(socketServer, service, path) { const user = deriveUser(event, req.data, req.headers, req); const context = deriveContext(event, req.data, req.headers); const identifier = deriveIdentifier(event, req.data, req.headers); + const headers = req.headers?.websocket || req.headers?.ws; path = normalizeEventPath(event["@websocket.path"] || event["@ws.path"] || path); await socketServer.broadcast({ service, @@ -161,6 +161,7 @@ function bindServiceEvents(socketServer, service, path) { user, context, identifier, + headers, socket: null, }); } catch (err) { @@ -171,7 +172,7 @@ function bindServiceEvents(socketServer, service, path) { } function bindServiceDefaults(socket, service) { - if (service.operations(WebSocketAction.Disconnect).length) { + if (service.operations[WebSocketAction.Disconnect]) { socket.onDisconnect(async (reason) => { await processEvent(socket, service, WebSocketAction.Disconnect, { reason }); }); @@ -182,7 +183,7 @@ function bindServiceDefaults(socket, service) { } else { await socket.exit(data?.context); } - if (service.operations(WebSocketAction.Context).length) { + if (service.operations[WebSocketAction.Context]) { await processEvent(socket, service, WebSocketAction.Context, data, callback); } else { callback && (await callback()); @@ -242,7 +243,7 @@ function bindServiceEntities(socket, service) { } async function emitConnect(socket, service) { - if (service.operations(WebSocketAction.Connect).length) { + if (service.operations[WebSocketAction.Connect]) { await processEvent(socket, service, WebSocketAction.Connect); } } @@ -674,6 +675,17 @@ function serviceLocalName(service, name) { return name; } +function interableObject(object) { + return { + ...object, + [Symbol.iterator]: function* () { + for (const event in this) { + yield this[event]; + } + }, + }; +} + function isServedViaWebsocket(service) { if (!service) { return false; diff --git a/src/socket/base.js b/src/socket/base.js index 68de6ec..6836d38 100644 --- a/src/socket/base.js +++ b/src/socket/base.js @@ -86,7 +86,7 @@ class SocketServer { /** * Broadcast websocket event to all sockets except to sender with options * @param {String} event Event - * @param {Object} data Data + * @param {Object} data Event data * @param {Object} [user] Users to be included/excluded, undefined: no restriction * @param {[String]} [user.include] Users to be included, undefined: no restriction * @param {[String]} [user.exclude] Users to be excluded, undefined: no restriction @@ -96,15 +96,16 @@ class SocketServer { * @param {Object} [identifier] Unique consumer-provided socket client identifiers to be included/excluded, undefined: no restriction * @param {[String]} [identifier.include] Client identifiers to be included, undefined: no restriction * @param {[String]} [identifier.exclude] Client identifiers to be excluded, undefined: no restriction + * @param {Object} [headers] Event headers * @returns {Promise} Promise when broadcasting completed */ - broadcast: async (event, data, user, context, identifier) => { + broadcast: async (event, data, user, context, identifier, headers) => { return Promise.resolve(); }, /** * Broadcast websocket event to all sockets with options * @param {String} event Event - * @param {Object} data Data + * @param {Object} data Event data * @param {Object} [user] Users to be included/excluded, undefined: no restriction * @param {[String]} [user.include] Users to be included, undefined: no restriction * @param {[String]} [user.exclude] Users to be excluded, undefined: no restriction @@ -114,9 +115,10 @@ class SocketServer { * @param {Object} [identifier] Unique consumer-provided socket client identifiers to be included/excluded, undefined: no restriction * @param {[String]} [identifier.include] Client identifiers to be included, undefined: no restriction * @param {[String]} [identifier.exclude] Client identifiers to be excluded, undefined: no restriction + * @param {Object} [headers] Event headers * @returns {Promise} Promise when broadcasting completed */ - broadcastAll: async (event, data, user, context, identifier) => { + broadcastAll: async (event, data, user, context, identifier, headers) => { return Promise.resolve(); }, /** @@ -153,7 +155,7 @@ class SocketServer { * @param {String} service Service definition * @param {String} [path] Service path, e.g. "/path" (relative to websocket server path), undefined: default service path * @param {String} event Event name or event message JSON content (no additional parameters provided (incl. 'data', except 'local')) - * @param {Object} [data] Data object + * @param {Object} [data] Event data * @param {String} [tenant] Tenant for isolation * @param {Object} [user] Users to be included/excluded, undefined: no restriction * @param {[String]} [user.include] Users to be included, undefined: no restriction @@ -164,11 +166,12 @@ class SocketServer { * @param {Object} [identifier] Unique consumer-provided socket client identifiers to be included/excluded, undefined: no restriction * @param {[String]} [identifier.include] Client identifiers to be included, undefined: no restriction * @param {[String]} [identifier.exclude] Client identifiers to be excluded, undefined: no restriction + * @param {Object} [headers] Event headers * @param {Object} [socket] Broadcast client socket to be excluded, undefined: no exclusion * @param {boolean} [local] Broadcast only locally (i.e. not via adapter), default: falsy * @returns {Promise} Promise when broadcasting completed */ - async broadcast({ service, path, event, data, tenant, user, context, identifier, socket, local }) {} + async broadcast({ service, path, event, data, tenant, user, context, identifier, headers, socket, local }) {} /** * Handle HTTP request response diff --git a/src/socket/socket.io.js b/src/socket/socket.io.js index 57fb3f9..c5c133f 100644 --- a/src/socket/socket.io.js +++ b/src/socket/socket.io.js @@ -70,7 +70,7 @@ class SocketIOServer extends SocketServer { emit: async (event, data) => { await socket.emit(event, format.compose(event, data)); }, - broadcast: async (event, data, user, context, identifier) => { + broadcast: async (event, data, user, context, identifier, headers) => { await this.broadcast({ service, path, @@ -80,10 +80,11 @@ class SocketIOServer extends SocketServer { user, context, identifier, + headers, socket, }); }, - broadcastAll: async (event, data, user, context, identifier) => { + broadcastAll: async (event, data, user, context, identifier, headers) => { await this.broadcast({ service, path, @@ -93,6 +94,7 @@ class SocketIOServer extends SocketServer { user, context, identifier, + headers, socket: null, }); }, @@ -161,7 +163,7 @@ class SocketIOServer extends SocketServer { }); } - async broadcast({ service, path, event, data, tenant, user, context, identifier, socket }) { + async broadcast({ service, path, event, data, tenant, user, context, identifier, headers, socket }) { path = path || this.defaultPath(service); tenant = tenant || socket?.context.tenant; let to = socket?.broadcast || this.io.of(path); @@ -222,7 +224,7 @@ class SocketIOServer extends SocketServer { } } const format = this.format(service, event, "json"); - to.emit(event, format.compose(event, data)); + to.emit(event, format.compose(event, data, headers)); } respond(socket, statusCode, body) { diff --git a/src/socket/ws.js b/src/socket/ws.js index 73fe378..27da908 100644 --- a/src/socket/ws.js +++ b/src/socket/ws.js @@ -73,7 +73,7 @@ class SocketWSServer extends SocketServer { emit: async (event, data) => { await ws.send(format.compose(event, data)); }, - broadcast: async (event, data, user, context, identifier) => { + broadcast: async (event, data, user, context, identifier, headers) => { await this.broadcast({ service, path, @@ -83,10 +83,11 @@ class SocketWSServer extends SocketServer { user, context, identifier, + headers, socket: ws, }); }, - broadcastAll: async (event, data, user, context, identifier) => { + broadcastAll: async (event, data, user, context, identifier, headers) => { await this.broadcast({ service, path, @@ -96,6 +97,7 @@ class SocketWSServer extends SocketServer { user, context, identifier, + headers, socket: null, }); }, @@ -127,7 +129,7 @@ class SocketWSServer extends SocketServer { }; } - async broadcast({ service, path, event, data, tenant, user, context, identifier, socket, local }) { + async broadcast({ service, path, event, data, tenant, user, context, identifier, headers, socket, local }) { const eventMessage = event; const isEventMessage = !data; if (isEventMessage) { @@ -179,7 +181,7 @@ class SocketWSServer extends SocketServer { } if (clients.size > 0) { const format = this.format(service, event); - const clientMessage = format.compose(event, data); + const clientMessage = format.compose(event, data, headers); for (const client of clients) { if (client !== socket && client.readyState === WebSocket.OPEN) { await client.send(clientMessage); @@ -189,7 +191,7 @@ class SocketWSServer extends SocketServer { if (!local) { const adapterMessage = isEventMessage ? eventMessage - : JSON.stringify({ event, data, tenant, user, context, identifier }); + : JSON.stringify({ event, data, tenant, user, context, identifier, headers }); await this.adapter?.emit(service, path, adapterMessage); } } diff --git a/test/_env/srv/cloudevent.cds b/test/_env/srv/cloudevent.cds new file mode 100644 index 0000000..ec9ea1c --- /dev/null +++ b/test/_env/srv/cloudevent.cds @@ -0,0 +1,73 @@ +@ws +@ws.format: 'cloudevent' +@path : 'cloudevent' +service CloudEventService { + + @ws.cloudevent.type: 'com.example.someevent' + action sendCloudEvent( + @ws.cloudevent.specversion specversion : String, + @ws.cloudevent.type type : String, + @ws.cloudevent.source source : String, + @ws.cloudevent.subject subject : String, + @ws.cloudevent.id id : String, + @ws.cloudevent.time time : String, + @ws.cloudevent.comexampleextension1 comexampleextension1 : String, + @ws.cloudevent.comexampleothervalue comexampleothervalue : String, + @ws.cloudevent.datacontenttype datacontenttype : String, + appinfoA : String, + appinfoB : Integer, + appinfoC : Boolean) returns Boolean; + + event cloudEvent1 { + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; + } + + @ws.cloudevent.specversion : '1.1' + @ws.cloudevent.type : 'com.example.someevent' + @ws.cloudevent.source : '/mycontext' + @ws.cloudevent.subject : 'example' + @ws.cloudevent.id : 'C234-1234-1234' + @ws.cloudevent.time : '2018-04-05T17:31:00Z' + @ws.cloudevent.comexampleextension1: 'value' + @ws.cloudevent.comexampleothervalue: 5 + @ws.cloudevent.datacontenttype : 'application/cloudevents+json' + event cloudEvent2 { + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; + } + + event cloudEvent3 { + @ws.cloudevent.specversion + specversion : String; + + @ws.cloudevent.type + type : String; + + @ws.cloudevent.source + source : String; + + @ws.cloudevent.subject + subject : String; + + @ws.cloudevent.id + id : String; + + @ws.cloudevent.time + time : String; + + @ws.cloudevent.comexampleextension1 + extension1 : String; + + @ws.cloudevent.comexampleothervalue + othervalue : String; + + @ws.cloudevent.datacontenttype + datacontenttype : String; + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; + } +} diff --git a/test/_env/srv/handlers/cloudevent.js b/test/_env/srv/handlers/cloudevent.js new file mode 100644 index 0000000..6eb0a10 --- /dev/null +++ b/test/_env/srv/handlers/cloudevent.js @@ -0,0 +1,48 @@ +"use strict"; + +module.exports = (srv) => { + srv.on("sendCloudEvent", async (req) => { + await srv.emit("cloudEvent1", { + appinfoA: req.data.appinfoA || "abc", + appinfoB: req.data.appinfoB || 123, + appinfoC: req.data.appinfoC || true, + }); + await srv.emit("cloudEvent2", { + appinfoA: req.data.appinfoA || "abc", + appinfoB: req.data.appinfoB || 123, + appinfoC: req.data.appinfoC || true, + }); + await srv.emit("cloudEvent3", { + specversion: "1.1", + type: "com.example.someevent", + source: "/mycontext", + subject: "example", + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + extension1: "value", + othervalue: 5, + datacontenttype: "application/cloudevents+json", + appinfoA: req.data.appinfoA || "abc", + appinfoB: req.data.appinfoB || 123, + appinfoC: req.data.appinfoC || true, + }); + await srv.emit("cloudEvent4", { + appinfoA: req.data.appinfoA || "abc", + appinfoB: req.data.appinfoB || 123, + appinfoC: req.data.appinfoC || true, + }, { + ws: { + specversion: "1.1", + type: "com.example.someevent", + source: "/mycontext", + subject: "example", + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + extension1: "value", + othervalue: 5, + datacontenttype: "application/cloudevents+json", + } + }); + return true; + }); +}; diff --git a/test/_env/srv/handlers/pcp.js b/test/_env/srv/handlers/pcp.js index cea2015..c2c4d14 100644 --- a/test/_env/srv/handlers/pcp.js +++ b/test/_env/srv/handlers/pcp.js @@ -17,6 +17,17 @@ module.exports = (srv) => { field1: req.data.field1 || "value1", field2: req.data.field2 || "value2", }); + await srv.emit("notification4", { + message: "no body!", + action: "MESSAGE", + field1: req.data.field1 || "value1", + field2: req.data.field2 || "value2", + }, { + ws: { + pcpaction: "ABC", + pcpmessage: "Header", + }, + }); return true; }); }; diff --git a/test/_env/srv/handlers/todo.js b/test/_env/srv/handlers/todo.js index 6baec4e..a73887b 100644 --- a/test/_env/srv/handlers/todo.js +++ b/test/_env/srv/handlers/todo.js @@ -12,7 +12,7 @@ module.exports = class TodoService extends cds.ApplicationService { const ID = context.params?.[0]?.ID; if (ID && context.target === Todo && ["CREATE", "UPDATE", "DELETE"].includes(context.event)) { await this.emit("refresh", { ID }); - await this.emit("notify", { text: ID }); + await this.emit("notify", { text: "4711" }); const service = await cds.connect.to("TodoWSService"); await service.emit("refresh", { ID }); } diff --git a/test/_env/srv/index.cds b/test/_env/srv/index.cds index df0a98c..13df20a 100644 --- a/test/_env/srv/index.cds +++ b/test/_env/srv/index.cds @@ -1,4 +1,5 @@ using from './chat'; +using from './cloudevent'; using from './fns'; using from './main'; using from './odata'; diff --git a/test/_env/srv/pcp.cds b/test/_env/srv/pcp.cds index 65a8ed7..af11f8b 100644 --- a/test/_env/srv/pcp.cds +++ b/test/_env/srv/pcp.cds @@ -30,4 +30,11 @@ service PCPService { field1: String; field2: String; } + + @ws.pcp.event + event notification4 { + action: String; + field1: String; + field2: String; + } } \ No newline at end of file diff --git a/test/_env/srv/todo.cds b/test/_env/srv/todo.cds index e83e853..53273ff 100644 --- a/test/_env/srv/todo.cds +++ b/test/_env/srv/todo.cds @@ -26,8 +26,8 @@ service TodoService { @ws @ws.pcp.event @ws.pcp.message: '' - @ws.path : 'fns-websocket' @ws.format : 'pcp' + @ws.path : 'fns-websocket' event notify { text : String }; diff --git a/test/_env/util/socket.io.js b/test/_env/util/socket.io.js index 23bf9d2..344f235 100644 --- a/test/_env/util/socket.io.js +++ b/test/_env/util/socket.io.js @@ -5,12 +5,13 @@ const ioc = require("socket.io-client"); const auth = require("./auth"); -async function connect(service, options = {}) { +async function connect(service, options = {}, headers) { const port = cds.app.server.address().port; const socket = ioc(`http://localhost:${port}/${service}${options?.id ? `?id=${options?.id}` : ""}`, { path: "/ws", extraHeaders: { authorization: options?.authorization || auth.alice, + ...headers }, }); cds.io.of(service).once("connection", (serverSocket) => { diff --git a/test/_env/util/ws.js b/test/_env/util/ws.js index e030336..5b1bce8 100644 --- a/test/_env/util/ws.js +++ b/test/_env/util/ws.js @@ -5,11 +5,13 @@ const WebSocket = require("ws"); const auth = require("./auth"); -async function connect(service, options = {}) { +async function connect(service, options = {}, headers = {}, protoocls) { const port = cds.app.server.address().port; - const socket = new WebSocket(`ws://localhost:${port}` + service, { + protoocls ??= []; + const socket = new WebSocket(`ws://localhost:${port}` + service, protoocls, { headers: { authorization: options?.authorization || auth.alice, + ...headers }, }); cds.wss.once("connection", async (serverSocket) => { diff --git a/test/socketio/fns_socket.io.test.js b/test/socketio/fns_socket.io.test.js index 36a88b4..216fef9 100644 --- a/test/socketio/fns_socket.io.test.js +++ b/test/socketio/fns_socket.io.test.js @@ -52,8 +52,10 @@ describe("Fns", () => { expect(waitResult).toMatchObject({ ID }); const waitFnsResult = await waitNotifyPromise; expect(waitFnsResult).toEqual(`pcp-action:MESSAGE +pcp-event:notify pcp-body-type:text +text:4711 -notify`); +`); }); }); diff --git a/test/socketio/pcp_socket.io.test.js b/test/socketio/pcp_socket.io.test.js index f4de538..2d39a10 100644 --- a/test/socketio/pcp_socket.io.test.js +++ b/test/socketio/pcp_socket.io.test.js @@ -42,6 +42,15 @@ field2:value2 `; +const pcpMessage4 = `pcp-action:ABC +pcp-event:notification4 +pcp-body-type:text +action:MESSAGE +field1:value1 +field2:value2 + +Header`; + describe("PCP", () => { let socket; @@ -57,6 +66,7 @@ describe("PCP", () => { const waitNotification1Promise = waitForEvent(socket, "notification1"); const waitNotification2Promise = waitForEvent(socket, "notification2"); const waitNotification3Promise = waitForEvent(socket, "notification3"); + const waitNotification4Promise = waitForEvent(socket, "notification4"); const result = await emitEvent(socket, "sendNotification", pcpMessage); expect(result).toBe(true); const waitResult1 = await waitNotification1Promise; @@ -65,6 +75,8 @@ describe("PCP", () => { expect(waitResult2).toEqual(pcpMessage2); const waitResult3 = await waitNotification3Promise; expect(waitResult3).toEqual(pcpMessage3); + const waitResult4 = await waitNotification4Promise; + expect(waitResult4).toEqual(pcpMessage4); }); test("PCP format error", async () => { diff --git a/test/ws/cloudevent_ws.test.js b/test/ws/cloudevent_ws.test.js new file mode 100644 index 0000000..912a933 --- /dev/null +++ b/test/ws/cloudevent_ws.test.js @@ -0,0 +1,120 @@ +"use strict"; + +const cds = require("@sap/cds"); + +const { connect, disconnect, emitMessage, waitForMessage } = require("../_env/util/ws"); + +cds.test(__dirname + "/../_env"); + +const cloudEvent1Message = JSON.stringify({ + specversion: "1.0", + type: "com.example.someevent", + source: "/mycontext", + subject: null, + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/json", + data: { + appinfoA: "abc", + appinfoB: 123, + appinfoC: true, + }, +}); + +const cloudEvent2Message = JSON.stringify({ + specversion: "1.0", + type: "com.example.someevent", + source: "/mycontext", + subject: null, + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/json", + data: { + appinfoA: "abc", + appinfoB: 123, + appinfoC: true, + }, +}); + +const cloudEvent3Message = JSON.stringify({ + specversion: "1.0", + type: "com.example.someevent", + source: "/mycontext", + subject: null, + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/json", + data: { + appinfoA: "abc", + appinfoB: 123, + appinfoC: true, + }, +}); + +const cloudEvent4Message = JSON.stringify({ + specversion: "1.0", + type: "com.example.someevent", + source: "/mycontext", + subject: null, + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/json", + data: { + appinfoA: "abc", + appinfoB: 123, + appinfoC: true, + }, +}); + +describe("CloudEvent", () => { + let socket; + + beforeAll(async () => { + socket = await connect( + "/ws/cloudevent", + {}, + { + "Sec-WebSocket-Protocol": "cloudevents.json", + }, + ["cloudevents.json"], + ); + }); + + afterAll(async () => { + await disconnect(socket); + }); + + test("Event Cloud Protocol", async () => { + expect(socket._protocol).toEqual("cloudevents.json"); + }); + + test.skip("Event Cloud Event", async () => { + const waitCloudEvent1Promise = waitForMessage(socket, "cloudEvent1"); + const waitCloudEvent2Promise = waitForMessage(socket, "cloudEvent2"); + const waitCloudEvent3Promise = waitForMessage(socket, "cloudEvent3"); + const waitCloudEvent4Promise = waitForMessage(socket, "cloudEvent4"); + const result = await emitMessage(socket, cloudEvent1Message); + expect(result).toBeNull(); + const waitResult1 = await waitCloudEvent1Promise; + expect(waitResult1).toEqual(cloudEvent1Message); + const waitResult2 = await waitCloudEvent2Promise; + expect(waitResult2).toEqual(cloudEvent2Message); + const waitResult3 = await waitCloudEvent3Promise; + expect(waitResult3).toEqual(cloudEvent3Message); + const waitResult4 = await waitCloudEvent4Promise; + expect(waitResult4).toEqual(cloudEvent3Message); + }); + + test("Cloud event format error", async () => { + const result = await emitMessage(socket, "This is not a Cloud Event message!"); + expect(result).toEqual(null); + }); +}); diff --git a/test/ws/fns_ws.test.js b/test/ws/fns_ws.test.js index 5b39b5d..eb1500a 100644 --- a/test/ws/fns_ws.test.js +++ b/test/ws/fns_ws.test.js @@ -47,8 +47,10 @@ describe("Fns", () => { expect(waitResult).toMatchObject({ ID }); const waitFnsResult = await waitNotifyPromise; expect(waitFnsResult).toEqual(`pcp-action:MESSAGE +pcp-event:notify pcp-body-type:text +text:4711 -notify`); +`); }); }); diff --git a/test/ws/pcp_ws.test.js b/test/ws/pcp_ws.test.js index 8efc978..9204fe3 100644 --- a/test/ws/pcp_ws.test.js +++ b/test/ws/pcp_ws.test.js @@ -37,6 +37,15 @@ field2:value2 `; +const pcpMessage4 = `pcp-action:ABC +pcp-event:notification4 +pcp-body-type:text +action:MESSAGE +field1:value1 +field2:value2 + +Header`; + describe("PCP", () => { let socket; @@ -52,6 +61,7 @@ describe("PCP", () => { const waitNotification1Promise = waitForMessage(socket, "notification1"); const waitNotification2Promise = waitForMessage(socket, "notification2"); const waitNotification3Promise = waitForMessage(socket, "notification3"); + const waitNotification4Promise = waitForMessage(socket, "notification4"); const result = await emitMessage(socket, pcpMessage); expect(result).toBeNull(); const waitResult1 = await waitNotification1Promise; @@ -60,6 +70,8 @@ describe("PCP", () => { expect(waitResult2).toEqual(pcpMessage2); const waitResult3 = await waitNotification3Promise; expect(waitResult3).toEqual(pcpMessage3); + const waitResult4 = await waitNotification4Promise; + expect(waitResult4).toEqual(pcpMessage4); }); test("PCP format error", async () => { diff --git a/test/ws/redis_ws.test.js b/test/ws/redis_ws.test.js index 255f189..15cfd26 100644 --- a/test/ws/redis_ws.test.js +++ b/test/ws/redis_ws.test.js @@ -52,8 +52,9 @@ describe("Redis", () => { expect(redis.client.connect).toHaveBeenCalledWith(); expect(redis.client.on).toHaveBeenNthCalledWith(1, "error", expect.any(Function)); expect(redis.client.subscribe).toHaveBeenNthCalledWith(1, "websocket/chat", expect.any(Function)); - expect(redis.client.subscribe).toHaveBeenNthCalledWith(2, "websocket/fns-websocket", expect.any(Function)); - expect(redis.client.subscribe).toHaveBeenNthCalledWith(3, "websocket/main", expect.any(Function)); + expect(redis.client.subscribe).toHaveBeenNthCalledWith(2, "websocket/cloudevent", expect.any(Function)); + expect(redis.client.subscribe).toHaveBeenNthCalledWith(3, "websocket/fns-websocket", expect.any(Function)); + expect(redis.client.subscribe).toHaveBeenNthCalledWith(4, "websocket/main", expect.any(Function)); expect(redis.client.publish).toHaveBeenCalledWith( "websocket/chat", `{"event":"received","data":{"text":"test","user":"alice"},"tenant":"t1"}`, From 9d053a145b36a6af722893a844016a41876588aa Mon Sep 17 00:00:00 2001 From: Oliver Klemenz Date: Fri, 27 Sep 2024 17:37:20 +0200 Subject: [PATCH 2/8] cloud events (wip) --- src/format/base.js | 13 +++ src/format/cloudevent.js | 27 ++++- src/format/pcp.js | 17 +-- test/socketio/cloudevent_socket.io.test.js | 125 +++++++++++++++++++++ test/ws/cloudevent_ws.test.js | 4 +- 5 files changed, 166 insertions(+), 20 deletions(-) create mode 100644 test/socketio/cloudevent_socket.io.test.js diff --git a/src/format/base.js b/src/format/base.js index 7a96b64..184ff54 100644 --- a/src/format/base.js +++ b/src/format/base.js @@ -71,6 +71,19 @@ class BaseFormat { } return fallback; } + + localName(name) { + return name.startsWith(`${this.service.name}.`) ? name.substring(this.service.name.length + 1) : name; + }; + + stringValue(value) { + if (value instanceof Date) { + return value.toISOString(); + } else if (value instanceof Object) { + return JSON.stringify(value); + } + return String(value); + } } module.exports = BaseFormat; diff --git a/src/format/cloudevent.js b/src/format/cloudevent.js index 1ac0a45..d1fd172 100644 --- a/src/format/cloudevent.js +++ b/src/format/cloudevent.js @@ -14,8 +14,23 @@ class CloudEventFormat extends BaseFormat { parse(data) { try { const cloudEvent = data?.constructor === Object ? data : JSON.parse(data); - // TODO: Map to CAP operation - + const result = {}; + const operation = Object.values(this.service.operations || {}).find((operation) => { + return ( + operation["@websocket.cloudevent.type"] === cloudEvent.type || + operation["@ws.cloudevent.type"] === cloudEvent.type || + operation.name === cloudEvent.type + ); + }); + if (operation) { + for (const param of operation.params) { + // TODO: Parse into data + } + return { + event: this.localName(operation.name), + data: result, + }; + } } catch (err) { LOG?.error(err); } @@ -40,7 +55,13 @@ class CloudEventFormat extends BaseFormat { const annotations = this.collectAnnotations(event); for (const annotation of annotations) { const value = this.deriveValue(event, data, headers, { - headerValues: [`cloudevent-${annotation}`, `cloudevent_${annotation}`, `cloudevent.${annotation}`, `cloudevent${annotation}`, annotation], + headerValues: [ + `cloudevent-${annotation}`, + `cloudevent_${annotation}`, + `cloudevent.${annotation}`, + `cloudevent${annotation}`, + annotation, + ], annotationValues: [`@websocket.cloudevent.${annotation}`, `@ws.cloudevent.${annotation}`], }); if (value !== undefined) { diff --git a/src/format/pcp.js b/src/format/pcp.js index 908f928..d10db94 100644 --- a/src/format/pcp.js +++ b/src/format/pcp.js @@ -40,7 +40,7 @@ class PCPFormat extends BaseFormat { } } return { - event: localName(operation.name, this.service), + event: this.localName(operation.name), data: result, }; } @@ -81,7 +81,7 @@ const serializePcpFields = (pcpFields, messageType, pcpAction, pcpEvent, element let serialized = ""; if (pcpFields && typeof pcpFields === "object") { for (const fieldName in pcpFields) { - const fieldValue = stringValue(pcpFields[fieldName]); + const fieldValue = this.stringValue(pcpFields[fieldName]); if (fieldValue && fieldName.indexOf("pcp-") !== 0 && elements?.[fieldName]) { serialized += escape(fieldName) + ":" + escape(fieldValue) + "\n"; } @@ -126,17 +126,4 @@ const unescape = (escaped) => { .join("\u0008"); }; -const localName = (name, service) => { - return name.startsWith(`${service.name}.`) ? name.substring(service.name.length + 1) : name; -}; - -function stringValue(value) { - if (value instanceof Date) { - return value.toISOString(); - } else if (value instanceof Object) { - return JSON.stringify(value); - } - return String(value); -} - module.exports = PCPFormat; diff --git a/test/socketio/cloudevent_socket.io.test.js b/test/socketio/cloudevent_socket.io.test.js new file mode 100644 index 0000000..c521477 --- /dev/null +++ b/test/socketio/cloudevent_socket.io.test.js @@ -0,0 +1,125 @@ +"use strict"; + +const cds = require("@sap/cds"); + +const { connect, disconnect, emitEvent, emitMessage, waitForEvent } = require("../_env/util/socket.io"); + +cds.test(__dirname + "/../_env"); + +cds.env.websocket = { + kind: "socket.io", + impl: null, +}; + +const cloudEvent1Message = JSON.stringify({ + specversion: "1.0", + type: "com.example.someevent", + source: "/mycontext", + subject: null, + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/json", + data: { + appinfoA: "abc", + appinfoB: 123, + appinfoC: true, + }, +}); + +const cloudEvent2Message = JSON.stringify({ + specversion: "1.0", + type: "com.example.someevent", + source: "/mycontext", + subject: null, + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/json", + data: { + appinfoA: "abc", + appinfoB: 123, + appinfoC: true, + }, +}); + +const cloudEvent3Message = JSON.stringify({ + specversion: "1.0", + type: "com.example.someevent", + source: "/mycontext", + subject: null, + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/json", + data: { + appinfoA: "abc", + appinfoB: 123, + appinfoC: true, + }, +}); + +const cloudEvent4Message = JSON.stringify({ + specversion: "1.0", + type: "com.example.someevent", + source: "/mycontext", + subject: null, + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/json", + data: { + appinfoA: "abc", + appinfoB: 123, + appinfoC: true, + }, +}); + +describe("CloudEvent", () => { + let socket; + + beforeAll(async () => { + socket = await connect( + "/ws/cloudevent", + {}, + { + "Sec-WebSocket-Protocol": "cloudevents.json", + }, + ["cloudevents.json"], + ); + }); + + afterAll(async () => { + await disconnect(socket); + }); + + test("Cloud Event Protocol", async () => { + expect(socket._protocol).toEqual("cloudevents.json"); + }); + + test("Cloud event", async () => { + const waitCloudEvent1Promise = waitForEvent(socket, "cloudEvent1"); + const waitCloudEvent2Promise = waitForEvent(socket, "cloudEvent2"); + const waitCloudEvent3Promise = waitForEvent(socket, "cloudEvent3"); + const waitCloudEvent4Promise = waitForEvent(socket, "cloudEvent4"); + const result = await emitEvent(socket, "sendCloudEvent", cloudEvent1Message); + expect(result).toBe(true); + const waitResult1 = await waitCloudEvent1Promise; + expect(waitResult1).toEqual(cloudEvent1Message); + const waitResult2 = await waitCloudEvent2Promise; + expect(waitResult2).toEqual(cloudEvent2Message); + const waitResult3 = await waitCloudEvent3Promise; + expect(waitResult3).toEqual(cloudEvent3Message); + const waitResult4 = await waitCloudEvent4Promise; + expect(waitResult4).toEqual(cloudEvent4Message); + }); + + test("Cloud event format error", async () => { + const result = await emitMessage(socket, "This is not a Cloud Event message!"); + expect(result).toEqual(null); + }); +}); diff --git a/test/ws/cloudevent_ws.test.js b/test/ws/cloudevent_ws.test.js index 912a933..c761a10 100644 --- a/test/ws/cloudevent_ws.test.js +++ b/test/ws/cloudevent_ws.test.js @@ -96,7 +96,7 @@ describe("CloudEvent", () => { expect(socket._protocol).toEqual("cloudevents.json"); }); - test.skip("Event Cloud Event", async () => { + test.skip("Cloud event", async () => { const waitCloudEvent1Promise = waitForMessage(socket, "cloudEvent1"); const waitCloudEvent2Promise = waitForMessage(socket, "cloudEvent2"); const waitCloudEvent3Promise = waitForMessage(socket, "cloudEvent3"); @@ -110,7 +110,7 @@ describe("CloudEvent", () => { const waitResult3 = await waitCloudEvent3Promise; expect(waitResult3).toEqual(cloudEvent3Message); const waitResult4 = await waitCloudEvent4Promise; - expect(waitResult4).toEqual(cloudEvent3Message); + expect(waitResult4).toEqual(cloudEvent4Message); }); test("Cloud event format error", async () => { From c9ae55a607f19fbef79686aa7931a3253bd215e0 Mon Sep 17 00:00:00 2001 From: Oliver Klemenz Date: Mon, 30 Sep 2024 14:24:08 +0200 Subject: [PATCH 3/8] Cloud Events --- CHANGELOG.md | 3 +- README.md | 121 +++++++++- jest.config.js | 2 +- src/format/base.js | 61 +---- src/format/cloudevent.js | 100 +++----- src/format/generic.js | 252 +++++++++++++++++++++ src/format/pcp.js | 38 +++- src/index.js | 5 +- test/_env/srv/cloudevent.cds | 72 ++++-- test/_env/srv/handlers/cloudevent.js | 84 ++++--- test/_env/srv/handlers/pcp.js | 25 +- test/_env/srv/pcp.cds | 4 +- test/_env/srv/todo.cds | 4 +- test/_env/util/socket.io.js | 2 +- test/_env/util/ws.js | 5 +- test/socketio/cloudevent_socket.io.test.js | 125 +++++----- test/ws/cloudevent_ws.test.js | 127 ++++++----- 17 files changed, 717 insertions(+), 313 deletions(-) create mode 100644 src/format/generic.js diff --git a/CHANGELOG.md b/CHANGELOG.md index d60593f..4148b57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,11 +9,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added -- Add support for cloud events +- Add support for Cloud Events - Include or exclude defined list of users - Support exclusion of event contexts - Overrule path of websocket event via `@websocket.path` or `@ws.path` for non-websocket services - Overrule format of websocket event via `@websocket.format` or `@ws.format` for non-websocket services +- Ignore event elements or operation parameters with `@websocket.ignore` or `@ws.ignore` - Optimization of client determination for kind `ws` - Allow empty PCP message in event definition - Improve documentation and examples diff --git a/README.md b/README.md index 6ffd8f9..b7dca16 100644 --- a/README.md +++ b/README.md @@ -377,7 +377,7 @@ Furthermore, also additional equivalent annotations alternatives are available: **Examples:** -**Entity Level:** +**Event Level:** ```cds @websocket.user: 'includeCurrent' @@ -434,7 +434,7 @@ Valid annotation values are: **Examples:** -**Entity Level:** +**Event Level:** ```cds @websocket.user.exclude: 'ABC' @@ -498,7 +498,7 @@ Valid annotation values are: **Examples:** -**Entity Level:** +**Event Level:** ```cds @websocket.context: 'ABC' @@ -642,7 +642,7 @@ Valid annotation values are: **Examples:** -**Entity Level:** +**Event Level:** ```cds @websocket.identifier.include: 'ABC' @@ -765,6 +765,10 @@ specification. All event annotation values (static or dynamic) and header values emit according to their kind. Values of all headers and annotations of same semantic type are unified for single and array values. +### Ignore Elements + +To ignore elements during event emit, the annotation `@websocket.ignore` or `@ws.ignore` is available on event element level. + ### WebSocket Format Per default the CDS websocket format is `json`, as CDS internally works with JSON objects. @@ -820,6 +824,109 @@ To configure the PCP message format the following annotations are available: - `@websocket.pcp.action, @ws.pcp.action: Boolean`: Expose the string value of the annotated event element as `pcp-action` field in the PCP message. Default `MESSAGE`. +#### Cloud Events + +CDS WebSocket module supports the Cloud Events specification out-of-the-box according to +[WebSockets Protocol Binding for CloudEvents](https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/websockets-protocol-binding.md). + +A Cloud Event message has the following structure: + +```json +{ + "specversion": "1.0", + "type": "com.example.someevent", + "source": "/mycontext", + "subject": null, + "id": "C234-1234-1234", + "time": "2018-04-05T17:31:00Z", + "comexampleextension1": "value", + "comexampleothervalue": 5, + "datacontenttype": "application/json", + "data": { + "appinfoA": "abc", + "appinfoB": 123, + "appinfoC": true + } +} +``` + +##### Modeling Cloud Event + +```cds +event cloudEvent { + specversion : String; + type : String; + source : String; + subject : String; + id : String; + time : String; + comexampleextension1 : String; + comexampleothervalue : String; + datacontenttype : String; + data: { + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; + } +} +``` + +##### Mapping Cloud Event + +**Examples:** + +**Event Level:** + +```cds +@ws.cloudevent.specversion : '1.1' +@ws.cloudevent.type : 'com.example.someevent' +@ws.cloudevent.source : '/mycontext' +@ws.cloudevent.subject : 'example' +@ws.cloudevent.id : 'C234-1234-1234' +@ws.cloudevent.time : '2018-04-05T17:31:00Z' +@ws.cloudevent.comexampleextension1: 'value' +@ws.cloudevent.comexampleothervalue: 5 +@ws.cloudevent.datacontenttype : 'application/cloudevents+json' +event cloudEvent2 { + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; +} +``` + +Event is published only via cloud event sub-protocol, with the specified static cloud event attributes. + +**Event Element Level:** + +```cds +event cloudEvent3 { + @ws.cloudevent.specversion + specversion : String + @ws.cloudevent.type + type : String + @ws.cloudevent.source + source : String + @ws.cloudevent.subject + subject : String + @ws.cloudevent.id + id : String + @ws.cloudevent.time + time : String + @ws.cloudevent.comexampleextension1 + extension1 : String + @ws.cloudevent.comexampleothervalue + othervalue : String + @ws.cloudevent.datacontenttype + datacontenttype : String; + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; +} +``` + +Event is published only via cloud event sub-protocol, with the specified dynamic cloud event attributes derived from +CDS event elements. + #### Custom Format A custom websocket format implementation can be provided via a path relative to the project root @@ -835,6 +942,12 @@ In addition, it can implement the following functions (optional): - **constructor(service)**: Setup instance with service definition on creation +#### Generic Format + +Additionally, a custom formatter can be based on the generic implementation `format/generic.js` providing a name. +CDS annotations and header values are then derived from format name based on wildcard annotations +`@websocket..` or `@ws..`. + ### Connect & Disconnect Every time a server socket is connected via websocket client, the CDS service is notified by calling the corresponding diff --git a/jest.config.js b/jest.config.js index 18d9ac3..a026fed 100644 --- a/jest.config.js +++ b/jest.config.js @@ -12,7 +12,7 @@ module.exports = { coverageThreshold: { global: { branches: 80, - functions: 90, + functions: 95, lines: 90, statements: 90, }, diff --git a/src/format/base.js b/src/format/base.js index 184ff54..5fbd14f 100644 --- a/src/format/base.js +++ b/src/format/base.js @@ -13,7 +13,7 @@ class BaseFormat { /** * Parse the event data into internal data (JSON), i.e. `{ event, data }` * @param {String|Object} data Event data - * @returns {event: String, data: Object} Parsed data + * @returns [{event: String, data: Object}] Parsed data */ parse(data) {} @@ -25,65 +25,6 @@ class BaseFormat { * @returns {String|Object} Formatted string or a JSON object (for kind `socket.io` only) */ compose(event, data, headers) {} - - /** - * Derive value from event data via annotations - * @param {String} event Event definition - * @param {Object} data Event data - * @param {Object} headers Event headers - * @param {[String]} headersNames Header names - * @param {[String]} annotationNames Annotations names - * @param {*} [fallback] Fallback value - * @returns {*} Derived value - */ - deriveValue(event, data, headers, { headerNames, annotationNames, fallback }) { - const eventDefinition = this.service.events()[event]; - if (eventDefinition) { - if (headers) { - for (const header of headerNames || []) { - if (headers[header] !== undefined) { - return headers[header]; - } - } - } - for (const annotation of annotationNames || []) { - if (eventDefinition[annotation] !== undefined) { - return eventDefinition[annotation]; - } - } - if (data) { - const eventElements = Object.values(eventDefinition?.elements || {}); - if (eventElements.length > 0) { - for (const annotation of annotationNames) { - const eventElement = eventElements.find((element) => { - return element[annotation]; - }); - if (eventElement) { - const elementValue = data[eventElement.name]; - if (elementValue !== undefined) { - delete data[eventElement.name]; - return elementValue; - } - } - } - } - } - } - return fallback; - } - - localName(name) { - return name.startsWith(`${this.service.name}.`) ? name.substring(this.service.name.length + 1) : name; - }; - - stringValue(value) { - if (value instanceof Date) { - return value.toISOString(); - } else if (value instanceof Object) { - return JSON.stringify(value); - } - return String(value); - } } module.exports = BaseFormat; diff --git a/src/format/cloudevent.js b/src/format/cloudevent.js index d1fd172..a78d398 100644 --- a/src/format/cloudevent.js +++ b/src/format/cloudevent.js @@ -2,47 +2,29 @@ const cds = require("@sap/cds"); -const BaseFormat = require("./base"); +const GenericFormat = require("./generic"); -const LOG = cds.log("/websocket/cloudevent"); - -class CloudEventFormat extends BaseFormat { +class CloudEventFormat extends GenericFormat { constructor(service, origin) { - super(service, origin); + super(service, origin, "cloudevent", "type"); } parse(data) { - try { - const cloudEvent = data?.constructor === Object ? data : JSON.parse(data); - const result = {}; - const operation = Object.values(this.service.operations || {}).find((operation) => { - return ( - operation["@websocket.cloudevent.type"] === cloudEvent.type || - operation["@ws.cloudevent.type"] === cloudEvent.type || - operation.name === cloudEvent.type - ); - }); - if (operation) { - for (const param of operation.params) { - // TODO: Parse into data - } - return { - event: this.localName(operation.name), - data: result, - }; - } - } catch (err) { - LOG?.error(err); + data = this.deserialize(data); + const operation = this.determineOperation(data); + if (typeof data?.data === "object" && !operation?.params?.data) { + const ceData = data.data; + delete data.data; + data = { + ...data, + ...ceData, + }; } - LOG?.warn("Error parsing cloud-event format", data); - return { - event: undefined, - data: {}, - }; + return super.parse(data); } compose(event, data, headers) { - const cloudEvent = { + let cloudEvent = { specversion: "1.0", type: `${this.service.name}.${event}`, source: this.service.name, @@ -50,50 +32,22 @@ class CloudEventFormat extends BaseFormat { id: cds.utils.uuid(), time: new Date().toISOString(), datacontenttype: "application/json", - data, + data: {}, }; - const annotations = this.collectAnnotations(event); - for (const annotation of annotations) { - const value = this.deriveValue(event, data, headers, { - headerValues: [ - `cloudevent-${annotation}`, - `cloudevent_${annotation}`, - `cloudevent.${annotation}`, - `cloudevent${annotation}`, - annotation, - ], - annotationValues: [`@websocket.cloudevent.${annotation}`, `@ws.cloudevent.${annotation}`], - }); - if (value !== undefined) { - cloudEvent[annotation] = value; - } - } - return this.origin === "json" ? cloudEvent : JSON.stringify(cloudEvent); - } - - collectAnnotations(event) { - const annotations = new Set(); const eventDefinition = this.service.events()[event]; - for (const annotation in eventDefinition) { - if (annotation.startsWith("@websocket.cloudevent.")) { - annotations.add(annotation.substring("@websocket.cloudevent.".length)); - } - if (annotation.startsWith("@ws.cloudevent.")) { - annotations.add(annotation.substring("@ws.cloudevent.".length)); - } + if (eventDefinition?.elements?.data && data.data) { + cloudEvent = { + ...data, + }; + } else { + cloudEvent.data = data; } - const eventElements = Object.values(eventDefinition?.elements || {}); - for (const element of eventElements) { - for (const annotation in element) { - if (annotation.startsWith("@websocket.cloudevent.")) { - annotations.add(annotation.substring("@websocket.cloudevent.".length)); - } - if (annotation.startsWith("@ws.cloudevent.")) { - annotations.add(annotation.substring("@ws.cloudevent.".length)); - } - } - } - return annotations; + const result = super.compose(event, data, headers); + cloudEvent = { + ...cloudEvent, + ...result, + }; + return this.serialize(cloudEvent); } } diff --git a/src/format/generic.js b/src/format/generic.js new file mode 100644 index 0000000..39960cb --- /dev/null +++ b/src/format/generic.js @@ -0,0 +1,252 @@ +"use strict"; + +const cds = require("@sap/cds"); + +const BaseFormat = require("./base"); + +class GenericFormat extends BaseFormat { + constructor(service, origin, name, identifier) { + super(service, origin); + this.name = name; + this.identifier = identifier || "name"; + this.LOG = cds.log(`/websocket/${name}`); + } + + parse(data) { + data = this.deserialize(data); + const operation = this.determineOperation(data); + if (operation) { + const annotations = this.collectAnnotations(operation.name); + // Ignore identifier annotation + annotations.delete("name"); + let mappedData = {}; + if (annotations.size > 0) { + this.mapValues(operation.name, data, mappedData, annotations); + } else { + mappedData = data; + } + const result = {}; + for (const param of operation.params) { + if (mappedData[param.name] !== undefined) { + result[param.name] = mappedData[param.name]; + } + } + return { + event: this.localName(operation.name), + data: result, + }; + } + this.LOG?.error(`Operation could not be determined`, data); + return { + event: undefined, + data: {}, + }; + } + + compose(event, data, headers) { + const result = {}; + const annotations = this.collectAnnotations(event); + for (const header in headers) { + annotations.add(header); + } + for (const annotation of annotations) { + const value = this.deriveValue(event, data, headers, { + headerNames: [ + `${this.name}-${annotation}`, + `${this.name}_${annotation}`, + `${this.name}.${annotation}`, + `${this.name}${annotation}`, + annotation, + ], + annotationNames: [`@websocket.${this.name}.${annotation}`, `@ws.${this.name}.${annotation}`], + }); + if (value !== undefined) { + result[annotation] = value; + } + } + return result; + } + + /** + * Determine operation based on event data + * @param {Object} data Event data + * @returns {Object} Service Operation + */ + determineOperation(data) { + if (!data) { + return; + } + return Object.values(this.service.operations || {}).find((operation) => { + return ( + (operation[`@websocket.${this.name}.name`] && + operation[`@websocket.${this.name}.name`] === data[this.identifier]) || + (operation[`@ws.${this.name}.name`] && operation[`@ws.${this.name}.name`] === data[this.identifier]) || + operation.name === data[this.identifier] + ); + }); + } + + /** + * Collect annotations for an CDS definition (event, operation) and CDS definition elements (elements, params) + * @param name Service definition name (event, operation) + * @returns {Set} Set of annotations + */ + collectAnnotations(name) { + const annotations = new Set(); + const definition = this.service.events()[name] || this.service.operations()[this.localName(name)]; + for (const annotation in definition) { + if (annotation.startsWith(`@websocket.${this.name}.`)) { + annotations.add(annotation.substring(`@websocket.${this.name}.`.length)); + } + if (annotation.startsWith(`@ws.${this.name}.`)) { + annotations.add(annotation.substring(`@ws.${this.name}.`.length)); + } + } + const elements = Object.values(definition?.elements || definition?.params || {}); + for (const element of elements) { + for (const annotation in element) { + if (annotation.startsWith(`@websocket.${this.name}.`)) { + annotations.add(annotation.substring(`@websocket.${this.name}.`.length)); + } + if (annotation.startsWith(`@ws.${this.name}.`)) { + annotations.add(annotation.substring(`@ws.${this.name}.`.length)); + } + } + } + return annotations; + } + + /** + * Derive value from data, headers and fallback using header names and annotation names + * @param {String} name Definition name (event, operation) + * @param {Object} data Data + * @param {Object} [headers] Header data + * @param {[String]} [headerNames] Header names to derive value from + * @param {[String]} [annotationNames] Annotation names to derived values from + * @param {*} [fallback] Fallback value + * @returns {*} Derived value + */ + deriveValue(name, data, headers, { headerNames, annotationNames, fallback }) { + if (headers) { + for (const header of headerNames || []) { + if (headers[header] !== undefined) { + return headers[header]; + } + } + } + const definition = this.service.events()[name] || this.service.operations()[this.localName(name)]; + if (definition) { + for (const annotation of annotationNames || []) { + if (definition[annotation] !== undefined) { + return definition[annotation]; + } + } + if (data) { + const elements = Object.values(definition?.elements || definition?.params || {}); + for (const annotation of annotationNames || []) { + const element = elements.find((element) => { + return element[annotation] && !(element["@websocket.ignore"] || element["@ws.ignore"]); + }); + if (element) { + const elementValue = data[element.name]; + if (elementValue !== undefined) { + delete data[element.name]; + return elementValue; + } + } + } + } + } + return fallback; + } + + /** + * Derive annotation value from datausing annotation names + * @param {String} name Definition name (event, operation) + * @param {Object} data Data + * @param {Object} mappedData Data to be mapped into + * @param {[String]} [localAnnotationNames] Local annotation names to be mapped + * @returns {*} Derived value + */ + mapValues(name, data, mappedData, localAnnotationNames) { + data ??= {}; + const definition = this.service.events()[name] || this.service.operations()[this.localName(name)]; + if (definition) { + for (const localAnnotation of localAnnotationNames || []) { + for (const annotation of [ + `@websocket.${this.name}.${localAnnotation}`, + `@ws.${this.name}.${localAnnotation}`, + ]) { + if (definition[annotation] !== undefined) { + data[localAnnotation] = definition[annotation]; + } + } + } + const elements = Object.values(definition?.elements || definition?.params || {}); + for (const element of elements) { + if (element["@websocket.ignore"] || element["@ws.ignore"]) { + continue; + } + let mapped = false; + for (const localAnnotation of localAnnotationNames || []) { + if (mapped) { + break; + } + for (const annotation of [ + `@websocket.${this.name}.${localAnnotation}`, + `@ws.${this.name}.${localAnnotation}`, + ]) { + if (!element[annotation]) { + continue; + } + if (data[localAnnotation] !== undefined) { + mappedData[element.name] = data[localAnnotation]; + mapped = true; + break; + } + } + } + if (!mapped) { + mappedData[element.name] = data[element.name]; + } + } + } + } + + /** + * Get local name of a definition name (event, operation) + * @param {String} name Service definition name + * @returns {String} Local name of the definition + */ + localName(name) { + return name.startsWith(`${this.service.name}.`) ? name.substring(this.service.name.length + 1) : name; + } + + /** + * Deserialize data + * @param {String|Object} data Data + * @returns {Object} Deserialized data + */ + deserialize(data) { + if (data === undefined) { + return; + } + try { + return data?.constructor === Object ? data : JSON.parse(data); + } catch (err) { + this.LOG?.error(err); + this.LOG?.error(`Error parsing ${this.name} format`, data); + } + } + + /** + * Serialize data based on format origin + * @param {String|Object} data Data + * @returns {String|Object} Serialized data + */ + serialize(data) { + return this.origin === "json" ? data : JSON.stringify(data); + } +} + +module.exports = GenericFormat; diff --git a/src/format/pcp.js b/src/format/pcp.js index d10db94..8c17534 100644 --- a/src/format/pcp.js +++ b/src/format/pcp.js @@ -2,7 +2,7 @@ const cds = require("@sap/cds"); -const BaseFormat = require("./base"); +const GenericFormat = require("./generic"); const DESERIALIZE_REGEX = /((?:[^:\\]|(?:\\.))+):((?:[^:\\\n]|(?:\\.))*)/; const MESSAGE = "MESSAGE"; @@ -10,7 +10,7 @@ const SEPARATOR = "\n\n"; const LOG = cds.log("/websocket/pcp"); -class PCPFormat extends BaseFormat { +class PCPFormat extends GenericFormat { constructor(service, origin) { super(service, origin); } @@ -23,18 +23,23 @@ class PCPFormat extends BaseFormat { } if (splitPos !== -1) { const result = {}; + const message = data.substring(splitPos + SEPARATOR.length); const pcpFields = extractPcpFields(data.substring(0, splitPos)); const operation = Object.values(this.service.operations || {}).find((operation) => { return ( - operation["@websocket.pcp.action"] === (pcpFields["pcp-action"] || MESSAGE) || - operation["@ws.pcp.action"] === (pcpFields["pcp-action"] || MESSAGE) || + (operation["@websocket.pcp.action"] && + operation["@websocket.pcp.action"] === (pcpFields["pcp-action"] || MESSAGE)) || + (operation["@ws.pcp.action"] && operation["@ws.pcp.action"] === (pcpFields["pcp-action"] || MESSAGE)) || operation.name === (pcpFields["pcp-action"] || MESSAGE) ); }); if (operation) { for (const param of operation.params) { + if (param["@websocket.ignore"] || param["@ws.ignore"]) { + continue; + } if (param["@websocket.pcp.message"] || param["@ws.pcp.message"]) { - result[param.name] = data.substring(splitPos + SEPARATOR.length); + result[param.name] = message; } else if (pcpFields[param.name] !== undefined) { result[param.name] = pcpFields[param.name]; } @@ -45,7 +50,7 @@ class PCPFormat extends BaseFormat { }; } } - LOG?.warn("Error parsing pcp format", data); + LOG?.error("Error parsing pcp format", data); return { event: undefined, data: {}, @@ -81,9 +86,15 @@ const serializePcpFields = (pcpFields, messageType, pcpAction, pcpEvent, element let serialized = ""; if (pcpFields && typeof pcpFields === "object") { for (const fieldName in pcpFields) { - const fieldValue = this.stringValue(pcpFields[fieldName]); - if (fieldValue && fieldName.indexOf("pcp-") !== 0 && elements?.[fieldName]) { - serialized += escape(fieldName) + ":" + escape(fieldValue) + "\n"; + const fieldValue = stringValue(pcpFields[fieldName]); + const element = elements?.[fieldName]; + if (element) { + if (element["@websocket.ignore"] || element["@ws.ignore"]) { + continue; + } + if (fieldValue && fieldName.indexOf("pcp-") !== 0) { + serialized += escape(fieldName) + ":" + escape(fieldValue) + "\n"; + } } } } @@ -126,4 +137,13 @@ const unescape = (escaped) => { .join("\u0008"); }; +const stringValue = (value) => { + if (value instanceof Date) { + return value.toISOString(); + } else if (value instanceof Object) { + return JSON.stringify(value); + } + return String(value); +}; + module.exports = PCPFormat; diff --git a/src/index.js b/src/index.js index 140c033..dd3837e 100644 --- a/src/index.js +++ b/src/index.js @@ -71,7 +71,7 @@ function serveWebSocketServer(options) { return interableObject(); }, _events: interableObject(), - events: function() { + events: function () { return this._events; }, on: service.on.bind(service), @@ -553,6 +553,9 @@ function deriveValues( if (event.elements) { for (const name in event.elements) { const element = event.elements[name]; + if (element["@websocket.ignore"] || element["@ws.ignore"]) { + continue; + } const annotationValue = element[annotationName]; if (annotationExcludeValues?.includes(annotationValue)) { continue; diff --git a/test/_env/srv/cloudevent.cds b/test/_env/srv/cloudevent.cds index ec9ea1c..fc15521 100644 --- a/test/_env/srv/cloudevent.cds +++ b/test/_env/srv/cloudevent.cds @@ -3,20 +3,33 @@ @path : 'cloudevent' service CloudEventService { - @ws.cloudevent.type: 'com.example.someevent' - action sendCloudEvent( - @ws.cloudevent.specversion specversion : String, - @ws.cloudevent.type type : String, - @ws.cloudevent.source source : String, - @ws.cloudevent.subject subject : String, - @ws.cloudevent.id id : String, - @ws.cloudevent.time time : String, - @ws.cloudevent.comexampleextension1 comexampleextension1 : String, - @ws.cloudevent.comexampleothervalue comexampleothervalue : String, - @ws.cloudevent.datacontenttype datacontenttype : String, + type CloudEventDataType : { + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; + @ws.ignore + appinfoD : String; + }; + + @ws.cloudevent.name: 'com.example.someevent.model' + action sendCloudEventModel( specversion : String, type : String, source : String, subject : String, id : String, time : String, comexampleextension1 : String, comexampleothervalue : Integer, datacontenttype : String, data: CloudEventDataType) returns Boolean; + + @ws.cloudevent.name: 'com.example.someevent.map' + @ws.cloudevent.subject: 'cloud-example' + action sendCloudEventMap( + @ws.cloudevent.specversion _specversion : String, + @ws.cloudevent.type _type : String, + @ws.cloudevent.source _source : String, + @ws.cloudevent.subject _subject : String, + @ws.cloudevent.id _id : String, + @ws.cloudevent.time _time : String, + @ws.cloudevent.comexampleextension1 _comexampleextension1 : String, + @ws.cloudevent.comexampleothervalue _comexampleothervalue : Integer, + @ws.cloudevent.datacontenttype _datacontenttype : String, appinfoA : String, appinfoB : Integer, - appinfoC : Boolean) returns Boolean; + appinfoC : Boolean, + @ws.ignore appinfoD : String) returns Boolean; event cloudEvent1 { appinfoA : String; @@ -24,8 +37,8 @@ service CloudEventService { appinfoC : Boolean; } - @ws.cloudevent.specversion : '1.1' - @ws.cloudevent.type : 'com.example.someevent' + @websocket.cloudevent.specversion : '1.1' + @ws.cloudevent.type : 'com.example.someevent.cloudEvent2' @ws.cloudevent.source : '/mycontext' @ws.cloudevent.subject : 'example' @ws.cloudevent.id : 'C234-1234-1234' @@ -40,7 +53,9 @@ service CloudEventService { } event cloudEvent3 { - @ws.cloudevent.specversion + @websocket.ignore + skipValue : String; + @websocket.cloudevent.specversion specversion : String; @ws.cloudevent.type @@ -62,7 +77,7 @@ service CloudEventService { extension1 : String; @ws.cloudevent.comexampleothervalue - othervalue : String; + othervalue : Integer; @ws.cloudevent.datacontenttype datacontenttype : String; @@ -70,4 +85,27 @@ service CloudEventService { appinfoB : Integer; appinfoC : Boolean; } -} + + event cloudEvent4 { + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; + } + + event cloudEvent5 { + specversion : String; + type : String; + source : String; + subject : String; + id : String; + time : String; + comexampleextension1 : String; + comexampleothervalue : Integer; + datacontenttype : String; + data: { + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; + } + } +} \ No newline at end of file diff --git a/test/_env/srv/handlers/cloudevent.js b/test/_env/srv/handlers/cloudevent.js index 6eb0a10..07a1e6a 100644 --- a/test/_env/srv/handlers/cloudevent.js +++ b/test/_env/srv/handlers/cloudevent.js @@ -1,47 +1,75 @@ "use strict"; module.exports = (srv) => { - srv.on("sendCloudEvent", async (req) => { + srv.on(["sendCloudEventModel", "sendCloudEventMap"], async (req) => { + const appinfoA = (req.data.appinfoA ?? req.data.data?.appinfoA ?? "abc") + "d"; + const appinfoB = (req.data.appinfoB ?? req.data.data?.appinfoB ?? 123) + 1111; + const appinfoC = (req.data.appinfoC ?? req.data.data?.appinfoC ?? true) !== true; + await srv.emit("cloudEvent1", { - appinfoA: req.data.appinfoA || "abc", - appinfoB: req.data.appinfoB || 123, - appinfoC: req.data.appinfoC || true, + appinfoA, + appinfoB, + appinfoC, }); + await srv.emit("cloudEvent2", { - appinfoA: req.data.appinfoA || "abc", - appinfoB: req.data.appinfoB || 123, - appinfoC: req.data.appinfoC || true, + appinfoA, + appinfoB, + appinfoC, }); + await srv.emit("cloudEvent3", { specversion: "1.1", - type: "com.example.someevent", + type: "com.example.someevent.cloudEvent3", source: "/mycontext", - subject: "example", + subject: req.data._subject || "example", id: "C234-1234-1234", time: "2018-04-05T17:31:00Z", extension1: "value", othervalue: 5, datacontenttype: "application/cloudevents+json", - appinfoA: req.data.appinfoA || "abc", - appinfoB: req.data.appinfoB || 123, - appinfoC: req.data.appinfoC || true, + appinfoA, + appinfoB, + appinfoC, }); - await srv.emit("cloudEvent4", { - appinfoA: req.data.appinfoA || "abc", - appinfoB: req.data.appinfoB || 123, - appinfoC: req.data.appinfoC || true, - }, { - ws: { - specversion: "1.1", - type: "com.example.someevent", - source: "/mycontext", - subject: "example", - id: "C234-1234-1234", - time: "2018-04-05T17:31:00Z", - extension1: "value", - othervalue: 5, - datacontenttype: "application/cloudevents+json", - } + + await srv.emit( + "cloudEvent4", + { + appinfoA, + appinfoB, + appinfoC, + }, + { + ws: { + specversion: "1.1", + type: "com.example.someevent.cloudEvent4", + source: "/mycontext", + subject: req.data._subject || "example", + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/cloudevents+json", + }, + }, + ); + + await srv.emit("cloudEvent5", { + specversion: "1.1", + type: "com.example.someevent.cloudEvent5", + source: "/mycontext", + subject: req.data._subject || "example", + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/cloudevents+json", + data: { + appinfoA, + appinfoB, + appinfoC, + }, }); return true; }); diff --git a/test/_env/srv/handlers/pcp.js b/test/_env/srv/handlers/pcp.js index c2c4d14..4caf125 100644 --- a/test/_env/srv/handlers/pcp.js +++ b/test/_env/srv/handlers/pcp.js @@ -17,17 +17,22 @@ module.exports = (srv) => { field1: req.data.field1 || "value1", field2: req.data.field2 || "value2", }); - await srv.emit("notification4", { - message: "no body!", - action: "MESSAGE", - field1: req.data.field1 || "value1", - field2: req.data.field2 || "value2", - }, { - ws: { - pcpaction: "ABC", - pcpmessage: "Header", + await srv.emit( + "notification4", + { + message: "no body!", + action: "MESSAGE", + field1: req.data.field1 || "value1", + field2: req.data.field2 || "value2", + field3: "ignore", }, - }); + { + ws: { + pcpaction: "ABC", + pcpmessage: "Header", + }, + }, + ); return true; }); }; diff --git a/test/_env/srv/pcp.cds b/test/_env/srv/pcp.cds index af11f8b..80aa58e 100644 --- a/test/_env/srv/pcp.cds +++ b/test/_env/srv/pcp.cds @@ -4,7 +4,7 @@ service PCPService { @ws.pcp.action: 'MESSAGE' - action sendNotification(@ws.pcp.message message: String, field1: String, field2: String, ![pcp-action]: String) returns Boolean; + action sendNotification(@ws.pcp.message message: String, field1: String, field2: String, @ws.ignore field3: String, ![pcp-action]: String) returns Boolean; @ws.pcp.event @ws.pcp.message: 'this is the body!' @@ -36,5 +36,7 @@ service PCPService { action: String; field1: String; field2: String; + @ws.ignore + field3: String; } } \ No newline at end of file diff --git a/test/_env/srv/todo.cds b/test/_env/srv/todo.cds index 53273ff..6bb088e 100644 --- a/test/_env/srv/todo.cds +++ b/test/_env/srv/todo.cds @@ -20,7 +20,7 @@ service TodoService { @ws event refresh { - ID : String + ID : String; }; @ws @@ -29,6 +29,6 @@ service TodoService { @ws.format : 'pcp' @ws.path : 'fns-websocket' event notify { - text : String + text : String; }; } diff --git a/test/_env/util/socket.io.js b/test/_env/util/socket.io.js index 344f235..6d156b3 100644 --- a/test/_env/util/socket.io.js +++ b/test/_env/util/socket.io.js @@ -11,7 +11,7 @@ async function connect(service, options = {}, headers) { path: "/ws", extraHeaders: { authorization: options?.authorization || auth.alice, - ...headers + ...headers, }, }); cds.io.of(service).once("connection", (serverSocket) => { diff --git a/test/_env/util/ws.js b/test/_env/util/ws.js index 5b1bce8..3d0b631 100644 --- a/test/_env/util/ws.js +++ b/test/_env/util/ws.js @@ -11,7 +11,7 @@ async function connect(service, options = {}, headers = {}, protoocls) { const socket = new WebSocket(`ws://localhost:${port}` + service, protoocls, { headers: { authorization: options?.authorization || auth.alice, - ...headers + ...headers, }, }); cds.wss.once("connection", async (serverSocket) => { @@ -82,12 +82,13 @@ async function waitForNoEvent(socket, event, timeout = 100) { }); } -async function waitForMessage(socket, event, cb) { +async function waitForMessage(socket, event, cb, parse) { _initListeners(socket); return new Promise((resolve) => { socket._listeners.push((message) => { message = message.toString(); if (message.includes(event)) { + message = parse ? JSON.parse(message) : message; resolve(message); cb && cb(message); } diff --git a/test/socketio/cloudevent_socket.io.test.js b/test/socketio/cloudevent_socket.io.test.js index c521477..9e12540 100644 --- a/test/socketio/cloudevent_socket.io.test.js +++ b/test/socketio/cloudevent_socket.io.test.js @@ -11,7 +11,7 @@ cds.env.websocket = { impl: null, }; -const cloudEvent1Message = JSON.stringify({ +const cloudEvent = { specversion: "1.0", type: "com.example.someevent", source: "/mycontext", @@ -26,70 +26,69 @@ const cloudEvent1Message = JSON.stringify({ appinfoB: 123, appinfoC: true, }, -}); +}; -const cloudEvent2Message = JSON.stringify({ - specversion: "1.0", - type: "com.example.someevent", - source: "/mycontext", - subject: null, - id: "C234-1234-1234", - time: "2018-04-05T17:31:00Z", - comexampleextension1: "value", - comexampleothervalue: 5, - datacontenttype: "application/json", - data: { - appinfoA: "abc", - appinfoB: 123, - appinfoC: true, - }, -}); +const cloudEventModel = { + ...cloudEvent, + type: "com.example.someevent.model", +}; + +const cloudEventMap = { + ...cloudEvent, + type: "com.example.someevent.map", +}; -const cloudEvent3Message = JSON.stringify({ +const cloudEvent1 = { specversion: "1.0", - type: "com.example.someevent", - source: "/mycontext", + type: "CloudEventService.cloudEvent1", + source: "CloudEventService", subject: null, - id: "C234-1234-1234", - time: "2018-04-05T17:31:00Z", - comexampleextension1: "value", - comexampleothervalue: 5, + id: expect.any(String), + time: expect.any(String), datacontenttype: "application/json", data: { - appinfoA: "abc", - appinfoB: 123, - appinfoC: true, + appinfoA: "abcd", + appinfoB: 1234, + appinfoC: false, }, -}); +}; -const cloudEvent4Message = JSON.stringify({ - specversion: "1.0", - type: "com.example.someevent", +const cloudEvent2 = { + ...cloudEvent1, + specversion: "1.1", + type: "com.example.someevent.cloudEvent2", source: "/mycontext", - subject: null, - id: "C234-1234-1234", - time: "2018-04-05T17:31:00Z", + subject: "example", comexampleextension1: "value", comexampleothervalue: 5, - datacontenttype: "application/json", - data: { - appinfoA: "abc", - appinfoB: 123, - appinfoC: true, - }, -}); + datacontenttype: "application/cloudevents+json", +}; + +const cloudEvent3 = { + ...cloudEvent2, + type: "com.example.someevent.cloudEvent3", +}; + +const cloudEvent4 = { + ...cloudEvent3, + type: "com.example.someevent.cloudEvent4", +}; + +const cloudEvent5 = { + ...cloudEvent4, + type: "com.example.someevent.cloudEvent5", +}; describe("CloudEvent", () => { let socket; beforeAll(async () => { socket = await connect( - "/ws/cloudevent", + "cloudevent", {}, { "Sec-WebSocket-Protocol": "cloudevents.json", }, - ["cloudevents.json"], ); }); @@ -98,24 +97,48 @@ describe("CloudEvent", () => { }); test("Cloud Event Protocol", async () => { - expect(socket._protocol).toEqual("cloudevents.json"); + expect(socket._opts.extraHeaders["Sec-WebSocket-Protocol"]).toEqual("cloudevents.json"); + expect(socket.serverSocket.handshake.headers["sec-websocket-protocol"]).toEqual("cloudevents.json"); + }); + + test("Cloud event (modeling)", async () => { + const waitCloudEvent1Promise = waitForEvent(socket, "cloudEvent1"); + const waitCloudEvent2Promise = waitForEvent(socket, "cloudEvent2"); + const waitCloudEvent3Promise = waitForEvent(socket, "cloudEvent3"); + const waitCloudEvent4Promise = waitForEvent(socket, "cloudEvent4"); + const waitCloudEvent5Promise = waitForEvent(socket, "cloudEvent5"); + const result = await emitEvent(socket, "sendCloudEventModel", cloudEventModel); + expect(result).toBe(true); + const waitResult1 = await waitCloudEvent1Promise; + expect(waitResult1).toEqual(cloudEvent1); + const waitResult2 = await waitCloudEvent2Promise; + expect(waitResult2).toEqual(cloudEvent2); + const waitResult3 = await waitCloudEvent3Promise; + expect(waitResult3).toEqual(cloudEvent3); + const waitResult4 = await waitCloudEvent4Promise; + expect(waitResult4).toEqual(cloudEvent4); + const waitResult5 = await waitCloudEvent5Promise; + expect(waitResult5).toEqual(cloudEvent5); }); - test("Cloud event", async () => { + test("Cloud event (mapping)", async () => { const waitCloudEvent1Promise = waitForEvent(socket, "cloudEvent1"); const waitCloudEvent2Promise = waitForEvent(socket, "cloudEvent2"); const waitCloudEvent3Promise = waitForEvent(socket, "cloudEvent3"); const waitCloudEvent4Promise = waitForEvent(socket, "cloudEvent4"); - const result = await emitEvent(socket, "sendCloudEvent", cloudEvent1Message); + const waitCloudEvent5Promise = waitForEvent(socket, "cloudEvent5"); + const result = await emitEvent(socket, "sendCloudEventMap", cloudEventMap); expect(result).toBe(true); const waitResult1 = await waitCloudEvent1Promise; - expect(waitResult1).toEqual(cloudEvent1Message); + expect(waitResult1).toEqual(cloudEvent1); const waitResult2 = await waitCloudEvent2Promise; - expect(waitResult2).toEqual(cloudEvent2Message); + expect(waitResult2).toEqual(cloudEvent2); const waitResult3 = await waitCloudEvent3Promise; - expect(waitResult3).toEqual(cloudEvent3Message); + expect(waitResult3).toEqual({ ...cloudEvent3, subject: "cloud-example" }); const waitResult4 = await waitCloudEvent4Promise; - expect(waitResult4).toEqual(cloudEvent4Message); + expect(waitResult4).toEqual({ ...cloudEvent4, subject: "cloud-example" }); + const waitResult5 = await waitCloudEvent5Promise; + expect(waitResult5).toEqual({ ...cloudEvent5, subject: "cloud-example" }); }); test("Cloud event format error", async () => { diff --git a/test/ws/cloudevent_ws.test.js b/test/ws/cloudevent_ws.test.js index c761a10..3aaf0d3 100644 --- a/test/ws/cloudevent_ws.test.js +++ b/test/ws/cloudevent_ws.test.js @@ -6,7 +6,7 @@ const { connect, disconnect, emitMessage, waitForMessage } = require("../_env/ut cds.test(__dirname + "/../_env"); -const cloudEvent1Message = JSON.stringify({ +const cloudEvent = { specversion: "1.0", type: "com.example.someevent", source: "/mycontext", @@ -21,58 +21,58 @@ const cloudEvent1Message = JSON.stringify({ appinfoB: 123, appinfoC: true, }, -}); +}; -const cloudEvent2Message = JSON.stringify({ - specversion: "1.0", - type: "com.example.someevent", - source: "/mycontext", - subject: null, - id: "C234-1234-1234", - time: "2018-04-05T17:31:00Z", - comexampleextension1: "value", - comexampleothervalue: 5, - datacontenttype: "application/json", - data: { - appinfoA: "abc", - appinfoB: 123, - appinfoC: true, - }, -}); +const cloudEventModel = { + ...cloudEvent, + type: "com.example.someevent.model", +}; -const cloudEvent3Message = JSON.stringify({ +const cloudEventMap = { + ...cloudEvent, + type: "com.example.someevent.map", +}; + +const cloudEvent1 = { specversion: "1.0", - type: "com.example.someevent", - source: "/mycontext", + type: "CloudEventService.cloudEvent1", + source: "CloudEventService", subject: null, - id: "C234-1234-1234", - time: "2018-04-05T17:31:00Z", - comexampleextension1: "value", - comexampleothervalue: 5, + id: expect.any(String), + time: expect.any(String), datacontenttype: "application/json", data: { - appinfoA: "abc", - appinfoB: 123, - appinfoC: true, + appinfoA: "abcd", + appinfoB: 1234, + appinfoC: false, }, -}); +}; -const cloudEvent4Message = JSON.stringify({ - specversion: "1.0", - type: "com.example.someevent", +const cloudEvent2 = { + ...cloudEvent1, + specversion: "1.1", + type: "com.example.someevent.cloudEvent2", source: "/mycontext", - subject: null, - id: "C234-1234-1234", - time: "2018-04-05T17:31:00Z", + subject: "example", comexampleextension1: "value", comexampleothervalue: 5, - datacontenttype: "application/json", - data: { - appinfoA: "abc", - appinfoB: 123, - appinfoC: true, - }, -}); + datacontenttype: "application/cloudevents+json", +}; + +const cloudEvent3 = { + ...cloudEvent2, + type: "com.example.someevent.cloudEvent3", +}; + +const cloudEvent4 = { + ...cloudEvent3, + type: "com.example.someevent.cloudEvent4", +}; + +const cloudEvent5 = { + ...cloudEvent4, + type: "com.example.someevent.cloudEvent5", +}; describe("CloudEvent", () => { let socket; @@ -96,21 +96,44 @@ describe("CloudEvent", () => { expect(socket._protocol).toEqual("cloudevents.json"); }); - test.skip("Cloud event", async () => { - const waitCloudEvent1Promise = waitForMessage(socket, "cloudEvent1"); - const waitCloudEvent2Promise = waitForMessage(socket, "cloudEvent2"); - const waitCloudEvent3Promise = waitForMessage(socket, "cloudEvent3"); - const waitCloudEvent4Promise = waitForMessage(socket, "cloudEvent4"); - const result = await emitMessage(socket, cloudEvent1Message); + test("Cloud event (modeling)", async () => { + const waitCloudEvent1Promise = waitForMessage(socket, "cloudEvent1", null, true); + const waitCloudEvent2Promise = waitForMessage(socket, "cloudEvent2", null, true); + const waitCloudEvent3Promise = waitForMessage(socket, "cloudEvent3", null, true); + const waitCloudEvent4Promise = waitForMessage(socket, "cloudEvent4", null, true); + const waitCloudEvent5Promise = waitForMessage(socket, "cloudEvent5", null, true); + const result = await emitMessage(socket, JSON.stringify(cloudEventModel)); + expect(result).toBeNull(); + const waitResult1 = await waitCloudEvent1Promise; + expect(waitResult1).toEqual(cloudEvent1); + const waitResult2 = await waitCloudEvent2Promise; + expect(waitResult2).toEqual(cloudEvent2); + const waitResult3 = await waitCloudEvent3Promise; + expect(waitResult3).toEqual(cloudEvent3); + const waitResult4 = await waitCloudEvent4Promise; + expect(waitResult4).toEqual(cloudEvent4); + const waitResult5 = await waitCloudEvent5Promise; + expect(waitResult5).toEqual(cloudEvent5); + }); + + test("Cloud event (mapping)", async () => { + const waitCloudEvent1Promise = waitForMessage(socket, "cloudEvent1", null, true); + const waitCloudEvent2Promise = waitForMessage(socket, "cloudEvent2", null, true); + const waitCloudEvent3Promise = waitForMessage(socket, "cloudEvent3", null, true); + const waitCloudEvent4Promise = waitForMessage(socket, "cloudEvent4", null, true); + const waitCloudEvent5Promise = waitForMessage(socket, "cloudEvent5", null, true); + const result = await emitMessage(socket, JSON.stringify(cloudEventMap)); expect(result).toBeNull(); const waitResult1 = await waitCloudEvent1Promise; - expect(waitResult1).toEqual(cloudEvent1Message); + expect(waitResult1).toEqual(cloudEvent1); const waitResult2 = await waitCloudEvent2Promise; - expect(waitResult2).toEqual(cloudEvent2Message); + expect(waitResult2).toEqual(cloudEvent2); const waitResult3 = await waitCloudEvent3Promise; - expect(waitResult3).toEqual(cloudEvent3Message); + expect(waitResult3).toEqual({ ...cloudEvent3, subject: "cloud-example" }); const waitResult4 = await waitCloudEvent4Promise; - expect(waitResult4).toEqual(cloudEvent4Message); + expect(waitResult4).toEqual({ ...cloudEvent4, subject: "cloud-example" }); + const waitResult5 = await waitCloudEvent5Promise; + expect(waitResult5).toEqual({ ...cloudEvent5, subject: "cloud-example" }); }); test("Cloud event format error", async () => { From 210c8d63953780f99674303d6dee2626bbe8e682 Mon Sep 17 00:00:00 2001 From: Oliver Klemenz Date: Mon, 30 Sep 2024 14:35:31 +0200 Subject: [PATCH 4/8] Coverage --- src/format/generic.js | 54 ++++++++++++++++++++++++------------------- src/format/pcp.js | 2 +- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/src/format/generic.js b/src/format/generic.js index 39960cb..7fcb36e 100644 --- a/src/format/generic.js +++ b/src/format/generic.js @@ -17,7 +17,7 @@ class GenericFormat extends BaseFormat { const operation = this.determineOperation(data); if (operation) { const annotations = this.collectAnnotations(operation.name); - // Ignore identifier annotation + // Ignore name annotation corresponding to the identifier annotations.delete("name"); let mappedData = {}; if (annotations.size > 0) { @@ -76,7 +76,7 @@ class GenericFormat extends BaseFormat { if (!data) { return; } - return Object.values(this.service.operations || {}).find((operation) => { + return Object.values(this.service.operations).find((operation) => { return ( (operation[`@websocket.${this.name}.name`] && operation[`@websocket.${this.name}.name`] === data[this.identifier]) || @@ -102,14 +102,16 @@ class GenericFormat extends BaseFormat { annotations.add(annotation.substring(`@ws.${this.name}.`.length)); } } - const elements = Object.values(definition?.elements || definition?.params || {}); - for (const element of elements) { - for (const annotation in element) { - if (annotation.startsWith(`@websocket.${this.name}.`)) { - annotations.add(annotation.substring(`@websocket.${this.name}.`.length)); - } - if (annotation.startsWith(`@ws.${this.name}.`)) { - annotations.add(annotation.substring(`@ws.${this.name}.`.length)); + if (definition?.elements || definition?.params) { + const elements = Object.values(definition?.elements || definition?.params); + for (const element of elements) { + for (const annotation in element) { + if (annotation.startsWith(`@websocket.${this.name}.`)) { + annotations.add(annotation.substring(`@websocket.${this.name}.`.length)); + } + if (annotation.startsWith(`@ws.${this.name}.`)) { + annotations.add(annotation.substring(`@ws.${this.name}.`.length)); + } } } } @@ -127,8 +129,8 @@ class GenericFormat extends BaseFormat { * @returns {*} Derived value */ deriveValue(name, data, headers, { headerNames, annotationNames, fallback }) { - if (headers) { - for (const header of headerNames || []) { + if (headers && headerNames) { + for (const header of headerNames) { if (headers[header] !== undefined) { return headers[header]; } @@ -136,22 +138,26 @@ class GenericFormat extends BaseFormat { } const definition = this.service.events()[name] || this.service.operations()[this.localName(name)]; if (definition) { - for (const annotation of annotationNames || []) { - if (definition[annotation] !== undefined) { - return definition[annotation]; + if (annotationNames) { + for (const annotation of annotationNames) { + if (definition[annotation] !== undefined) { + return definition[annotation]; + } } } if (data) { const elements = Object.values(definition?.elements || definition?.params || {}); - for (const annotation of annotationNames || []) { - const element = elements.find((element) => { - return element[annotation] && !(element["@websocket.ignore"] || element["@ws.ignore"]); - }); - if (element) { - const elementValue = data[element.name]; - if (elementValue !== undefined) { - delete data[element.name]; - return elementValue; + for (const annotation of annotationNames) { + if (annotationNames) { + const element = elements.find((element) => { + return element[annotation] && !(element["@websocket.ignore"] || element["@ws.ignore"]); + }); + if (element) { + const elementValue = data[element.name]; + if (elementValue !== undefined) { + delete data[element.name]; + return elementValue; + } } } } diff --git a/src/format/pcp.js b/src/format/pcp.js index 8c17534..7bfff6f 100644 --- a/src/format/pcp.js +++ b/src/format/pcp.js @@ -25,7 +25,7 @@ class PCPFormat extends GenericFormat { const result = {}; const message = data.substring(splitPos + SEPARATOR.length); const pcpFields = extractPcpFields(data.substring(0, splitPos)); - const operation = Object.values(this.service.operations || {}).find((operation) => { + const operation = Object.values(this.service.operations).find((operation) => { return ( (operation["@websocket.pcp.action"] && operation["@websocket.pcp.action"] === (pcpFields["pcp-action"] || MESSAGE)) || From da363a50d82d9813f01513fffdb09d16031c562b Mon Sep 17 00:00:00 2001 From: Oliver Klemenz Date: Mon, 30 Sep 2024 17:57:52 +0200 Subject: [PATCH 5/8] Docs --- README.md | 105 ++++++++++++++++++++++++++++++--- src/format/generic.js | 8 +-- src/index.js | 3 +- test/_env/srv/handlers/chat.js | 6 +- test/ws/redis_ws.test.js | 2 +- 5 files changed, 109 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index b7dca16..cb8251f 100644 --- a/README.md +++ b/README.md @@ -765,6 +765,25 @@ specification. All event annotation values (static or dynamic) and header values emit according to their kind. Values of all headers and annotations of same semantic type are unified for single and array values. +#### Format Headers + +In addition to the above event emit headers, format specific event headers can be specified in the `websocket` or `ws` section +during event emit. + +```js +await srv.emit("customEvent", { ... }, { + ws: { + a: 1 + }, + websocket: { + b: "c" + } +}); +``` + +These headers are made available to the format `compose(event, data, headers)` function, to be included in the +composed WebSocket message, if applicable (e.g. format: `pcp`, `cloudevent`). + ### Ignore Elements To ignore elements during event emit, the annotation `@websocket.ignore` or `@ws.ignore` is available on event element level. @@ -850,8 +869,15 @@ A Cloud Event message has the following structure: } ``` +To create a cloud-event compatible CDS event, either the event is modeled as CDS service event according to the specification +or a CDS event is mapped via annotations to a cloud-event compatible event. + ##### Modeling Cloud Event +Cloud event can be explicitly modelled as CDS event, matching the specification of cloud event attributes. + +**Example:** + ```cds event cloudEvent { specversion : String; @@ -871,14 +897,61 @@ event cloudEvent { } ``` +The CDS event `cloudEvent` is explicitly modeled according to the cloud-event specification. +The event data is passed inbound and outbound in the exact same representation as JSON object, as specified. +No additional annotation is necessary to be defined. + ##### Mapping Cloud Event +CDS events can also be mapped to cloud-event compatible events via headers and CDS annotations. The implementation is based on the +`generic` formatter (see section below), that allows to map CDS events to cloud-event compatible events based on +cloud event specific headers and wildcard annotations, starting with `@websocket.cloudevent.` or `@ws.cloudevent.` +to match the cloud-event specific attributes. + +The provided header values in the `websocket` or `ws` section are mapped to the cloud event attributes generically, if available. + +**Example:** + +```js +await srv.emit("cloudEvent", + { + appinfoA, + appinfoB, + appinfoC, + }, + { + ws: { + specversion: "1.0", + type: "com.example.someevent.cloudEvent4", + source: "/mycontext", + subject: req.data._subject || "example", + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/json", + }, + }, +); +``` + +Subsequently, the following annotations are respected: + +- **Event level**: + - `@websocket.cloudevent.: ` + - Type: `any` (according to Cloud Event JSON format) + - Provide static cloud event attribute value, according to cloud event specification +- **Event element level**: + - `@websocket.cloudevent.` + - Type: `Boolean` + - Value from event data for the annotated element is used as dynamic cloud event attribute value, according to cloud event attribute specification + **Examples:** **Event Level:** ```cds -@ws.cloudevent.specversion : '1.1' +@ws.cloudevent.specversion : '1.0' @ws.cloudevent.type : 'com.example.someevent' @ws.cloudevent.source : '/mycontext' @ws.cloudevent.subject : 'example' @@ -886,7 +959,7 @@ event cloudEvent { @ws.cloudevent.time : '2018-04-05T17:31:00Z' @ws.cloudevent.comexampleextension1: 'value' @ws.cloudevent.comexampleothervalue: 5 -@ws.cloudevent.datacontenttype : 'application/cloudevents+json' +@ws.cloudevent.datacontenttype : 'application/json' event cloudEvent2 { appinfoA : String; appinfoB : Integer; @@ -894,7 +967,8 @@ event cloudEvent2 { } ``` -Event is published only via cloud event sub-protocol, with the specified static cloud event attributes. +Event is published via cloud event sub-protocol, with the specified static cloud event attributes. The event data is +consumed as cloud event data section. **Event Element Level:** @@ -924,8 +998,11 @@ event cloudEvent3 { } ``` -Event is published only via cloud event sub-protocol, with the specified dynamic cloud event attributes derived from -CDS event elements. +Event is published via cloud event sub-protocol, with the specified dynamic cloud event attributes derived from +CDS event elements. Annotated elements are consumed as cloud event attributes, non-annotated elements are consumed as +cloud event data section. + +Static and dynamic annotations can be combined. Dynamnic values are overwritten by static values, if defined. #### Custom Format @@ -944,9 +1021,21 @@ In addition, it can implement the following functions (optional): #### Generic Format -Additionally, a custom formatter can be based on the generic implementation `format/generic.js` providing a name. -CDS annotations and header values are then derived from format name based on wildcard annotations -`@websocket..` or `@ws..`. +Additionally, a custom formatter can be based on the generic implementation `format/generic.js` providing a name and identifier. +Values are derived via CDS annotations based on wildcard annotations +`@websocket..` or `@ws..` using the formatter name. +In addition, provided header values in the `websocket` or `ws` section are also used to derived values from. + +The following generic implementation specifics are included: + +- **parse:** Data is parsed generically + - Parsing is based on formatter specific wildcard annotations on operation level (static) or operation parameter level (dynamic), if available. + - CDS operation (action or function) is derived from generic annotation `@websocket..name` or `@ws..name`. + - Operation identification is based on the formatter identifier (default `name`) on event data, that can be specified per formatter. + - Data is passed further as-is, in case no CDS annotations are present for format +- **compose:** Data is composed generically + - First data is composed based on headers, if available + - Subsequently, formatter specific wildcard annotations on event level (static) or event element level (dynamic) are processed ### Connect & Disconnect diff --git a/src/format/generic.js b/src/format/generic.js index 7fcb36e..1c3a74f 100644 --- a/src/format/generic.js +++ b/src/format/generic.js @@ -171,14 +171,14 @@ class GenericFormat extends BaseFormat { * @param {String} name Definition name (event, operation) * @param {Object} data Data * @param {Object} mappedData Data to be mapped into - * @param {[String]} [localAnnotationNames] Local annotation names to be mapped + * @param {[String]} [localAnnotations] Local annotation names to be mapped * @returns {*} Derived value */ - mapValues(name, data, mappedData, localAnnotationNames) { + mapValues(name, data, mappedData, localAnnotations) { data ??= {}; const definition = this.service.events()[name] || this.service.operations()[this.localName(name)]; if (definition) { - for (const localAnnotation of localAnnotationNames || []) { + for (const localAnnotation of localAnnotations || []) { for (const annotation of [ `@websocket.${this.name}.${localAnnotation}`, `@ws.${this.name}.${localAnnotation}`, @@ -194,7 +194,7 @@ class GenericFormat extends BaseFormat { continue; } let mapped = false; - for (const localAnnotation of localAnnotationNames || []) { + for (const localAnnotation of localAnnotations || []) { if (mapped) { break; } diff --git a/src/index.js b/src/index.js index dd3837e..fa3facb 100644 --- a/src/index.js +++ b/src/index.js @@ -150,7 +150,8 @@ function bindServiceEvents(socketServer, service, path) { const user = deriveUser(event, req.data, req.headers, req); const context = deriveContext(event, req.data, req.headers); const identifier = deriveIdentifier(event, req.data, req.headers); - const headers = req.headers?.websocket || req.headers?.ws; + const headers = + req.headers?.websocket || req.headers?.ws ? { ...req.headers?.websocket, ...req.headers?.ws } : undefined; path = normalizeEventPath(event["@websocket.path"] || event["@ws.path"] || path); await socketServer.broadcast({ service, diff --git a/test/_env/srv/handlers/chat.js b/test/_env/srv/handlers/chat.js index ffacc72..4a694dd 100644 --- a/test/_env/srv/handlers/chat.js +++ b/test/_env/srv/handlers/chat.js @@ -3,7 +3,11 @@ module.exports = (srv) => { srv.on("message", async (req) => { req.data.user = req.user.id; - await srv.emit("received", req.data); + await srv.emit("received", req.data, { + ws: { + header: "value", + }, + }); return req.data.text; }); }; diff --git a/test/ws/redis_ws.test.js b/test/ws/redis_ws.test.js index 15cfd26..aa44414 100644 --- a/test/ws/redis_ws.test.js +++ b/test/ws/redis_ws.test.js @@ -57,7 +57,7 @@ describe("Redis", () => { expect(redis.client.subscribe).toHaveBeenNthCalledWith(4, "websocket/main", expect.any(Function)); expect(redis.client.publish).toHaveBeenCalledWith( "websocket/chat", - `{"event":"received","data":{"text":"test","user":"alice"},"tenant":"t1"}`, + '{"event":"received","data":{"text":"test","user":"alice"},"tenant":"t1","headers":{"header":"value"}}', ); // Duplicated because Redis mock publishes to same client (not done for real Redis) From 93e263f6f929a5319d8f02bb2218c5b56e09ae3f Mon Sep 17 00:00:00 2001 From: Oliver Klemenz Date: Mon, 30 Sep 2024 17:59:53 +0200 Subject: [PATCH 6/8] Docs --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index cb8251f..d5d193c 100644 --- a/README.md +++ b/README.md @@ -913,7 +913,8 @@ The provided header values in the `websocket` or `ws` section are mapped to the **Example:** ```js -await srv.emit("cloudEvent", +await srv.emit( + "cloudEvent", { appinfoA, appinfoB, From b01df4ee77d59da8046f487d1344c7b658f9a440 Mon Sep 17 00:00:00 2001 From: Oliver Klemenz Date: Tue, 1 Oct 2024 08:21:28 +0200 Subject: [PATCH 7/8] Refactoring --- README.md | 10 ++++- src/format/generic.js | 64 +++++++++++++++++++--------- src/format/pcp.js | 8 +++- test/_env/srv/handlers/cloudevent.js | 8 ++-- 4 files changed, 64 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index d5d193c..855e647 100644 --- a/README.md +++ b/README.md @@ -773,7 +773,10 @@ during event emit. ```js await srv.emit("customEvent", { ... }, { ws: { - a: 1 + a: 1, + cloudevent: { + e: true + } }, websocket: { b: "c" @@ -782,7 +785,8 @@ await srv.emit("customEvent", { ... }, { ``` These headers are made available to the format `compose(event, data, headers)` function, to be included in the -composed WebSocket message, if applicable (e.g. format: `pcp`, `cloudevent`). +composed WebSocket message, if applicable (e.g. format: `pcp`, `cloudevent`). Format specific headers can also be defined +in formatter named subsection, e.g. `ws.cloudevent.e: true` (for format `cloudevent`), to avoid conflicts. ### Ignore Elements @@ -1026,6 +1030,8 @@ Additionally, a custom formatter can be based on the generic implementation `for Values are derived via CDS annotations based on wildcard annotations `@websocket..` or `@ws..` using the formatter name. In addition, provided header values in the `websocket` or `ws` section are also used to derived values from. +Format specific headers can also be defined in formatter named subsection, e.g. `ws.cloudevent.e: true` (for format `cloudevent`), +to avoid conflicts. The following generic implementation specifics are included: diff --git a/src/format/generic.js b/src/format/generic.js index 1c3a74f..ab40a23 100644 --- a/src/format/generic.js +++ b/src/format/generic.js @@ -47,17 +47,43 @@ class GenericFormat extends BaseFormat { const result = {}; const annotations = this.collectAnnotations(event); for (const header in headers) { - annotations.add(header); - } - for (const annotation of annotations) { - const value = this.deriveValue(event, data, headers, { + if (header === this.name && typeof headers[header] === "object") { + continue; + } + const value = this.deriveValue(event, { + headers, headerNames: [ - `${this.name}-${annotation}`, - `${this.name}_${annotation}`, - `${this.name}.${annotation}`, - `${this.name}${annotation}`, - annotation, + `${this.name}-${header}`, + `${this.name}_${header}`, + `${this.name}.${header}`, + `${this.name}${header}`, + header, ], + }); + if (value !== undefined) { + result[header] = value; + } + } + if (headers?.[this.name]) { + for (const header in headers?.[this.name]) { + const value = this.deriveValue(event, { + headers: headers?.[this.name], + headerNames: [ + `${this.name}-${header}`, + `${this.name}_${header}`, + `${this.name}.${header}`, + `${this.name}${header}`, + header, + ], + }); + if (value !== undefined) { + result[header] = value; + } + } + } + for (const annotation of annotations) { + const value = this.deriveValue(event, { + data, annotationNames: [`@websocket.${this.name}.${annotation}`, `@ws.${this.name}.${annotation}`], }); if (value !== undefined) { @@ -121,14 +147,14 @@ class GenericFormat extends BaseFormat { /** * Derive value from data, headers and fallback using header names and annotation names * @param {String} name Definition name (event, operation) - * @param {Object} data Data * @param {Object} [headers] Header data * @param {[String]} [headerNames] Header names to derive value from + * @param {Object} [data] Data * @param {[String]} [annotationNames] Annotation names to derived values from * @param {*} [fallback] Fallback value * @returns {*} Derived value */ - deriveValue(name, data, headers, { headerNames, annotationNames, fallback }) { + deriveValue(name, { headers, headerNames, data, annotationNames, fallback }) { if (headers && headerNames) { for (const header of headerNames) { if (headers[header] !== undefined) { @@ -136,16 +162,16 @@ class GenericFormat extends BaseFormat { } } } - const definition = this.service.events()[name] || this.service.operations()[this.localName(name)]; - if (definition) { - if (annotationNames) { - for (const annotation of annotationNames) { - if (definition[annotation] !== undefined) { - return definition[annotation]; + if (data && annotationNames) { + const definition = this.service.events()[name] || this.service.operations()[this.localName(name)]; + if (definition) { + if (annotationNames) { + for (const annotation of annotationNames) { + if (definition[annotation] !== undefined) { + return definition[annotation]; + } } } - } - if (data) { const elements = Object.values(definition?.elements || definition?.params || {}); for (const annotation of annotationNames) { if (annotationNames) { diff --git a/src/format/pcp.js b/src/format/pcp.js index 7bfff6f..90d921a 100644 --- a/src/format/pcp.js +++ b/src/format/pcp.js @@ -59,13 +59,17 @@ class PCPFormat extends GenericFormat { compose(event, data, headers) { const eventDefinition = this.service.events()[event]; - const pcpMessage = this.deriveValue(event, data, headers, { + const pcpMessage = this.deriveValue(event, { + headers, headerNames: ["pcp-message", "pcp_message", "pcp.message", "pcpmessage"], + data, annotationNames: ["@websocket.pcp.message", "@ws.pcp.message"], fallback: event, }); - const pcpAction = this.deriveValue(event, data, headers, { + const pcpAction = this.deriveValue(event, { + headers, headerNames: ["pcp-action", "pcp_action", "pcp.action", "pcpaction"], + data, annotationNames: ["@websocket.pcp.action", "@ws.pcp.action"], fallback: MESSAGE, }); diff --git a/test/_env/srv/handlers/cloudevent.js b/test/_env/srv/handlers/cloudevent.js index 07a1e6a..7b3b243 100644 --- a/test/_env/srv/handlers/cloudevent.js +++ b/test/_env/srv/handlers/cloudevent.js @@ -48,9 +48,11 @@ module.exports = (srv) => { subject: req.data._subject || "example", id: "C234-1234-1234", time: "2018-04-05T17:31:00Z", - comexampleextension1: "value", - comexampleothervalue: 5, - datacontenttype: "application/cloudevents+json", + cloudevent: { + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/cloudevents+json", + }, }, }, ); From 5a4c11aa3742131ba73ebb40a541242f3b216d77 Mon Sep 17 00:00:00 2001 From: Oliver Klemenz Date: Tue, 1 Oct 2024 08:38:03 +0200 Subject: [PATCH 8/8] Docs --- README.md | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 855e647..5344ec2 100644 --- a/README.md +++ b/README.md @@ -788,9 +788,10 @@ These headers are made available to the format `compose(event, data, headers)` f composed WebSocket message, if applicable (e.g. format: `pcp`, `cloudevent`). Format specific headers can also be defined in formatter named subsection, e.g. `ws.cloudevent.e: true` (for format `cloudevent`), to avoid conflicts. -### Ignore Elements +### Ignore Definitions -To ignore elements during event emit, the annotation `@websocket.ignore` or `@ws.ignore` is available on event element level. +To ignore elements and parameters during event processing, the annotation `@websocket.ignore` or `@ws.ignore` is available +on event element and operation parameter level. The annotation can be used to exclude elements and parameters from WebSocket event. ### WebSocket Format @@ -873,8 +874,8 @@ A Cloud Event message has the following structure: } ``` -To create a cloud-event compatible CDS event, either the event is modeled as CDS service event according to the specification -or a CDS event is mapped via annotations to a cloud-event compatible event. +To create a Cloud Event compatible CDS event, either the event is modeled as CDS service event according to the specification +or a CDS event is mapped via annotations to a Cloud Event compatible event. ##### Modeling Cloud Event @@ -901,16 +902,16 @@ event cloudEvent { } ``` -The CDS event `cloudEvent` is explicitly modeled according to the cloud-event specification. +The CDS event `cloudEvent` is explicitly modeled according to the Cloud Event specification. The event data is passed inbound and outbound in the exact same representation as JSON object, as specified. -No additional annotation is necessary to be defined. +No additional annotations are necessary to be defined. ##### Mapping Cloud Event -CDS events can also be mapped to cloud-event compatible events via headers and CDS annotations. The implementation is based on the -`generic` formatter (see section below), that allows to map CDS events to cloud-event compatible events based on +CDS events can also be mapped to Cloud Event compatible events via headers and CDS annotations. The implementation is based on the +`generic` formatter (see section below), that allows to map CDS events to Cloud Event compatible events based on cloud event specific headers and wildcard annotations, starting with `@websocket.cloudevent.` or `@ws.cloudevent.` -to match the cloud-event specific attributes. +to match the Cloud Event specific attributes. The provided header values in the `websocket` or `ws` section are mapped to the cloud event attributes generically, if available. @@ -972,8 +973,8 @@ event cloudEvent2 { } ``` -Event is published via cloud event sub-protocol, with the specified static cloud event attributes. The event data is -consumed as cloud event data section. +Event is published via cloud event sub-protocol, with the specified static cloud event attributes. +The CDS event data is consumed as cloud event data section. **Event Element Level:** @@ -1007,7 +1008,7 @@ Event is published via cloud event sub-protocol, with the specified dynamic clou CDS event elements. Annotated elements are consumed as cloud event attributes, non-annotated elements are consumed as cloud event data section. -Static and dynamic annotations can be combined. Dynamnic values are overwritten by static values, if defined. +Static and dynamic annotations can be combined. Static values have precedence over dynamic values, if defined. #### Custom Format @@ -1017,18 +1018,19 @@ in `@websocket.format` resp. `@ws.format` annotation (e.g. `@ws.format: './forma The custom format class needs to implement the following functions: - **parse(data)**: Parse the event data into internal data (JSON), i.e. `{ event, data }` -- **compose(event, data)**: Compose the event and internal data (JSON) into a formatted string. For kind `socket.io`, it - can also be a JSON object. +- **compose(event, data, headers)**: Compose the internal event data (JSON) and event headers into a formatted string. + For kind `socket.io`, it can also be a JSON object. In addition, it can implement the following functions (optional): -- **constructor(service)**: Setup instance with service definition on creation +- **constructor(service, origin)**: Setup instance with service definition and origin format on creation #### Generic Format -Additionally, a custom formatter can be based on the generic implementation `format/generic.js` providing a name and identifier. +Additionally, a custom formatter can be based on the generic implementation `src/format/generic.js` providing a name and identifier. Values are derived via CDS annotations based on wildcard annotations `@websocket..` or `@ws..` using the formatter name. + In addition, provided header values in the `websocket` or `ws` section are also used to derived values from. Format specific headers can also be defined in formatter named subsection, e.g. `ws.cloudevent.e: true` (for format `cloudevent`), to avoid conflicts. @@ -1041,7 +1043,7 @@ The following generic implementation specifics are included: - Operation identification is based on the formatter identifier (default `name`) on event data, that can be specified per formatter. - Data is passed further as-is, in case no CDS annotations are present for format - **compose:** Data is composed generically - - First data is composed based on headers, if available + - First data is composed based on headers, if available (see section Format Headers) - Subsequently, formatter specific wildcard annotations on event level (static) or event element level (dynamic) are processed ### Connect & Disconnect @@ -1054,8 +1056,8 @@ service operation: #### Approuter -Authorization in provided in production by approuter component (e.g. via XSUAA auth). -Valid UAA bindings for approuter and backend are necessary, so that the authorization flow is working. +Authorization in provided in production by Approuter component (e.g. via XSUAA auth). +Valid UAA bindings for Approuter and backend are necessary, so that the authorization flow is working. Locally, the following default environment files need to exist: - `test/_env/default-env.json`