diff --git a/CHANGELOG.md b/CHANGELOG.md index a0458fb..4148b57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,17 +9,21 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Add support for Cloud Events - Include or exclude defined list of users - Support exclusion of event contexts - Overrule path of websocket event via `@websocket.path` or `@ws.path` for non-websocket services - Overrule format of websocket event via `@websocket.format` or `@ws.format` for non-websocket services +- Ignore event elements or operation parameters with `@websocket.ignore` or `@ws.ignore` - Optimization of client determination for kind `ws` - Allow empty PCP message in event definition - Improve documentation and examples +- Provide event headers to formatter ### Fixed - Ignore not modeled PCP fields in payload serialization +- Fix annotations value derivation for non-websocket service events - Fix annotations `wsCurrentUserInclude`, `currentUserInclude`, `wsCurrentUserExclude`, `currentUserExclude` ## Version 1.2.0 - 2024-09-04 diff --git a/README.md b/README.md index 569f947..5344ec2 100644 --- a/README.md +++ b/README.md @@ -221,10 +221,10 @@ It abstracts from the concrete websocket implementation by exposing the followin - `context: Object`: CDS context object for the websocket server socket - `on(event: String, callback: Function)`: Register websocket event - `async emit(event: String, data: Object)`: Emit websocket event with data -- `async broadcast(event: String, data: Object, user: {include: String[], exclude: String[]}?, context: : {include: String[], exclude: String[]}?, identifier: {include: String[], exclude: String[]}?)`: - Broadcast websocket event (except to sender) by optionally restrict to users, contexts or identifiers -- `async broadcastAll(event: String, data: Object, user: {include: String[], exclude: String[]}?, context: : {include: String[], exclude: String[]}?, identifier: {include: String[], exclude: String[]}?)`: - Broadcast websocket event (including to sender) by optionally restrict to users, contexts or identifiers +- `async broadcast(event: String, data: Object, user: {include: String[], exclude: String[]}?, context: : {include: String[], exclude: String[]}?, identifier: {include: String[], exclude: String[]}?, headers: Object?)`: + Broadcast websocket event (except to sender) by optionally restrict to users, contexts or identifiers and optionally providing headers +- `async broadcastAll(event: String, data: Object, user: {include: String[], exclude: String[]}?, context: : {include: String[], exclude: String[]}?, identifier: {include: String[], exclude: String[]}?, headers: Object?)`: + Broadcast websocket event (including to sender) by optionally restrict to users, contexts or identifiers and optionally providing headers - `async enter(context: String)`: Enter a context - `async exit(context: String)`: Exit a context - `async disconnect()`: Disconnect server socket @@ -377,7 +377,7 @@ Furthermore, also additional equivalent annotations alternatives are available: **Examples:** -**Entity Level:** +**Event Level:** ```cds @websocket.user: 'includeCurrent' @@ -434,7 +434,7 @@ Valid annotation values are: **Examples:** -**Entity Level:** +**Event Level:** ```cds @websocket.user.exclude: 'ABC' @@ -498,7 +498,7 @@ Valid annotation values are: **Examples:** -**Entity Level:** +**Event Level:** ```cds @websocket.context: 'ABC' @@ -642,7 +642,7 @@ Valid annotation values are: **Examples:** -**Entity Level:** +**Event Level:** ```cds @websocket.identifier.include: 'ABC' @@ -765,6 +765,34 @@ specification. All event annotation values (static or dynamic) and header values emit according to their kind. Values of all headers and annotations of same semantic type are unified for single and array values. +#### Format Headers + +In addition to the above event emit headers, format specific event headers can be specified in the `websocket` or `ws` section +during event emit. + +```js +await srv.emit("customEvent", { ... }, { + ws: { + a: 1, + cloudevent: { + e: true + } + }, + websocket: { + b: "c" + } +}); +``` + +These headers are made available to the format `compose(event, data, headers)` function, to be included in the +composed WebSocket message, if applicable (e.g. format: `pcp`, `cloudevent`). Format specific headers can also be defined +in formatter named subsection, e.g. `ws.cloudevent.e: true` (for format `cloudevent`), to avoid conflicts. + +### Ignore Definitions + +To ignore elements and parameters during event processing, the annotation `@websocket.ignore` or `@ws.ignore` is available +on event element and operation parameter level. The annotation can be used to exclude elements and parameters from WebSocket event. + ### WebSocket Format Per default the CDS websocket format is `json`, as CDS internally works with JSON objects. @@ -820,6 +848,168 @@ To configure the PCP message format the following annotations are available: - `@websocket.pcp.action, @ws.pcp.action: Boolean`: Expose the string value of the annotated event element as `pcp-action` field in the PCP message. Default `MESSAGE`. +#### Cloud Events + +CDS WebSocket module supports the Cloud Events specification out-of-the-box according to +[WebSockets Protocol Binding for CloudEvents](https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/websockets-protocol-binding.md). + +A Cloud Event message has the following structure: + +```json +{ + "specversion": "1.0", + "type": "com.example.someevent", + "source": "/mycontext", + "subject": null, + "id": "C234-1234-1234", + "time": "2018-04-05T17:31:00Z", + "comexampleextension1": "value", + "comexampleothervalue": 5, + "datacontenttype": "application/json", + "data": { + "appinfoA": "abc", + "appinfoB": 123, + "appinfoC": true + } +} +``` + +To create a Cloud Event compatible CDS event, either the event is modeled as CDS service event according to the specification +or a CDS event is mapped via annotations to a Cloud Event compatible event. + +##### Modeling Cloud Event + +Cloud event can be explicitly modelled as CDS event, matching the specification of cloud event attributes. + +**Example:** + +```cds +event cloudEvent { + specversion : String; + type : String; + source : String; + subject : String; + id : String; + time : String; + comexampleextension1 : String; + comexampleothervalue : String; + datacontenttype : String; + data: { + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; + } +} +``` + +The CDS event `cloudEvent` is explicitly modeled according to the Cloud Event specification. +The event data is passed inbound and outbound in the exact same representation as JSON object, as specified. +No additional annotations are necessary to be defined. + +##### Mapping Cloud Event + +CDS events can also be mapped to Cloud Event compatible events via headers and CDS annotations. The implementation is based on the +`generic` formatter (see section below), that allows to map CDS events to Cloud Event compatible events based on +cloud event specific headers and wildcard annotations, starting with `@websocket.cloudevent.` or `@ws.cloudevent.` +to match the Cloud Event specific attributes. + +The provided header values in the `websocket` or `ws` section are mapped to the cloud event attributes generically, if available. + +**Example:** + +```js +await srv.emit( + "cloudEvent", + { + appinfoA, + appinfoB, + appinfoC, + }, + { + ws: { + specversion: "1.0", + type: "com.example.someevent.cloudEvent4", + source: "/mycontext", + subject: req.data._subject || "example", + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/json", + }, + }, +); +``` + +Subsequently, the following annotations are respected: + +- **Event level**: + - `@websocket.cloudevent.: ` + - Type: `any` (according to Cloud Event JSON format) + - Provide static cloud event attribute value, according to cloud event specification +- **Event element level**: + - `@websocket.cloudevent.` + - Type: `Boolean` + - Value from event data for the annotated element is used as dynamic cloud event attribute value, according to cloud event attribute specification + +**Examples:** + +**Event Level:** + +```cds +@ws.cloudevent.specversion : '1.0' +@ws.cloudevent.type : 'com.example.someevent' +@ws.cloudevent.source : '/mycontext' +@ws.cloudevent.subject : 'example' +@ws.cloudevent.id : 'C234-1234-1234' +@ws.cloudevent.time : '2018-04-05T17:31:00Z' +@ws.cloudevent.comexampleextension1: 'value' +@ws.cloudevent.comexampleothervalue: 5 +@ws.cloudevent.datacontenttype : 'application/json' +event cloudEvent2 { + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; +} +``` + +Event is published via cloud event sub-protocol, with the specified static cloud event attributes. +The CDS event data is consumed as cloud event data section. + +**Event Element Level:** + +```cds +event cloudEvent3 { + @ws.cloudevent.specversion + specversion : String + @ws.cloudevent.type + type : String + @ws.cloudevent.source + source : String + @ws.cloudevent.subject + subject : String + @ws.cloudevent.id + id : String + @ws.cloudevent.time + time : String + @ws.cloudevent.comexampleextension1 + extension1 : String + @ws.cloudevent.comexampleothervalue + othervalue : String + @ws.cloudevent.datacontenttype + datacontenttype : String; + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; +} +``` + +Event is published via cloud event sub-protocol, with the specified dynamic cloud event attributes derived from +CDS event elements. Annotated elements are consumed as cloud event attributes, non-annotated elements are consumed as +cloud event data section. + +Static and dynamic annotations can be combined. Static values have precedence over dynamic values, if defined. + #### Custom Format A custom websocket format implementation can be provided via a path relative to the project root @@ -828,12 +1018,33 @@ in `@websocket.format` resp. `@ws.format` annotation (e.g. `@ws.format: './forma The custom format class needs to implement the following functions: - **parse(data)**: Parse the event data into internal data (JSON), i.e. `{ event, data }` -- **compose(event, data)**: Compose the event and internal data (JSON) into a formatted string. For kind `socket.io`, it - can also be a JSON object. +- **compose(event, data, headers)**: Compose the internal event data (JSON) and event headers into a formatted string. + For kind `socket.io`, it can also be a JSON object. In addition, it can implement the following functions (optional): -- **constructor(service)**: Setup instance with service definition on creation +- **constructor(service, origin)**: Setup instance with service definition and origin format on creation + +#### Generic Format + +Additionally, a custom formatter can be based on the generic implementation `src/format/generic.js` providing a name and identifier. +Values are derived via CDS annotations based on wildcard annotations +`@websocket..` or `@ws..` using the formatter name. + +In addition, provided header values in the `websocket` or `ws` section are also used to derived values from. +Format specific headers can also be defined in formatter named subsection, e.g. `ws.cloudevent.e: true` (for format `cloudevent`), +to avoid conflicts. + +The following generic implementation specifics are included: + +- **parse:** Data is parsed generically + - Parsing is based on formatter specific wildcard annotations on operation level (static) or operation parameter level (dynamic), if available. + - CDS operation (action or function) is derived from generic annotation `@websocket..name` or `@ws..name`. + - Operation identification is based on the formatter identifier (default `name`) on event data, that can be specified per formatter. + - Data is passed further as-is, in case no CDS annotations are present for format +- **compose:** Data is composed generically + - First data is composed based on headers, if available (see section Format Headers) + - Subsequently, formatter specific wildcard annotations on event level (static) or event element level (dynamic) are processed ### Connect & Disconnect @@ -845,8 +1056,8 @@ service operation: #### Approuter -Authorization in provided in production by approuter component (e.g. via XSUAA auth). -Valid UAA bindings for approuter and backend are necessary, so that the authorization flow is working. +Authorization in provided in production by Approuter component (e.g. via XSUAA auth). +Valid UAA bindings for Approuter and backend are necessary, so that the authorization flow is working. Locally, the following default environment files need to exist: - `test/_env/default-env.json` diff --git a/jest.config.js b/jest.config.js index 18d9ac3..a026fed 100644 --- a/jest.config.js +++ b/jest.config.js @@ -12,7 +12,7 @@ module.exports = { coverageThreshold: { global: { branches: 80, - functions: 90, + functions: 95, lines: 90, statements: 90, }, diff --git a/src/format/base.js b/src/format/base.js index d25867f..5fbd14f 100644 --- a/src/format/base.js +++ b/src/format/base.js @@ -13,7 +13,7 @@ class BaseFormat { /** * Parse the event data into internal data (JSON), i.e. `{ event, data }` * @param {String|Object} data Event data - * @returns {event: String, data: Object} Parsed data + * @returns [{event: String, data: Object}] Parsed data */ parse(data) {} @@ -21,9 +21,10 @@ class BaseFormat { * Compose the event and internal data (JSON) into a formatted string * @param {String} event Event name * @param {Object} data Event internal data + * @param {Object} headers Event headers * @returns {String|Object} Formatted string or a JSON object (for kind `socket.io` only) */ - compose(event, data) {} + compose(event, data, headers) {} } module.exports = BaseFormat; diff --git a/src/format/cloudevent.js b/src/format/cloudevent.js new file mode 100644 index 0000000..a78d398 --- /dev/null +++ b/src/format/cloudevent.js @@ -0,0 +1,54 @@ +"use strict"; + +const cds = require("@sap/cds"); + +const GenericFormat = require("./generic"); + +class CloudEventFormat extends GenericFormat { + constructor(service, origin) { + super(service, origin, "cloudevent", "type"); + } + + parse(data) { + data = this.deserialize(data); + const operation = this.determineOperation(data); + if (typeof data?.data === "object" && !operation?.params?.data) { + const ceData = data.data; + delete data.data; + data = { + ...data, + ...ceData, + }; + } + return super.parse(data); + } + + compose(event, data, headers) { + let cloudEvent = { + specversion: "1.0", + type: `${this.service.name}.${event}`, + source: this.service.name, + subject: null, + id: cds.utils.uuid(), + time: new Date().toISOString(), + datacontenttype: "application/json", + data: {}, + }; + const eventDefinition = this.service.events()[event]; + if (eventDefinition?.elements?.data && data.data) { + cloudEvent = { + ...data, + }; + } else { + cloudEvent.data = data; + } + const result = super.compose(event, data, headers); + cloudEvent = { + ...cloudEvent, + ...result, + }; + return this.serialize(cloudEvent); + } +} + +module.exports = CloudEventFormat; diff --git a/src/format/generic.js b/src/format/generic.js new file mode 100644 index 0000000..ab40a23 --- /dev/null +++ b/src/format/generic.js @@ -0,0 +1,284 @@ +"use strict"; + +const cds = require("@sap/cds"); + +const BaseFormat = require("./base"); + +class GenericFormat extends BaseFormat { + constructor(service, origin, name, identifier) { + super(service, origin); + this.name = name; + this.identifier = identifier || "name"; + this.LOG = cds.log(`/websocket/${name}`); + } + + parse(data) { + data = this.deserialize(data); + const operation = this.determineOperation(data); + if (operation) { + const annotations = this.collectAnnotations(operation.name); + // Ignore name annotation corresponding to the identifier + annotations.delete("name"); + let mappedData = {}; + if (annotations.size > 0) { + this.mapValues(operation.name, data, mappedData, annotations); + } else { + mappedData = data; + } + const result = {}; + for (const param of operation.params) { + if (mappedData[param.name] !== undefined) { + result[param.name] = mappedData[param.name]; + } + } + return { + event: this.localName(operation.name), + data: result, + }; + } + this.LOG?.error(`Operation could not be determined`, data); + return { + event: undefined, + data: {}, + }; + } + + compose(event, data, headers) { + const result = {}; + const annotations = this.collectAnnotations(event); + for (const header in headers) { + if (header === this.name && typeof headers[header] === "object") { + continue; + } + const value = this.deriveValue(event, { + headers, + headerNames: [ + `${this.name}-${header}`, + `${this.name}_${header}`, + `${this.name}.${header}`, + `${this.name}${header}`, + header, + ], + }); + if (value !== undefined) { + result[header] = value; + } + } + if (headers?.[this.name]) { + for (const header in headers?.[this.name]) { + const value = this.deriveValue(event, { + headers: headers?.[this.name], + headerNames: [ + `${this.name}-${header}`, + `${this.name}_${header}`, + `${this.name}.${header}`, + `${this.name}${header}`, + header, + ], + }); + if (value !== undefined) { + result[header] = value; + } + } + } + for (const annotation of annotations) { + const value = this.deriveValue(event, { + data, + annotationNames: [`@websocket.${this.name}.${annotation}`, `@ws.${this.name}.${annotation}`], + }); + if (value !== undefined) { + result[annotation] = value; + } + } + return result; + } + + /** + * Determine operation based on event data + * @param {Object} data Event data + * @returns {Object} Service Operation + */ + determineOperation(data) { + if (!data) { + return; + } + return Object.values(this.service.operations).find((operation) => { + return ( + (operation[`@websocket.${this.name}.name`] && + operation[`@websocket.${this.name}.name`] === data[this.identifier]) || + (operation[`@ws.${this.name}.name`] && operation[`@ws.${this.name}.name`] === data[this.identifier]) || + operation.name === data[this.identifier] + ); + }); + } + + /** + * Collect annotations for an CDS definition (event, operation) and CDS definition elements (elements, params) + * @param name Service definition name (event, operation) + * @returns {Set} Set of annotations + */ + collectAnnotations(name) { + const annotations = new Set(); + const definition = this.service.events()[name] || this.service.operations()[this.localName(name)]; + for (const annotation in definition) { + if (annotation.startsWith(`@websocket.${this.name}.`)) { + annotations.add(annotation.substring(`@websocket.${this.name}.`.length)); + } + if (annotation.startsWith(`@ws.${this.name}.`)) { + annotations.add(annotation.substring(`@ws.${this.name}.`.length)); + } + } + if (definition?.elements || definition?.params) { + const elements = Object.values(definition?.elements || definition?.params); + for (const element of elements) { + for (const annotation in element) { + if (annotation.startsWith(`@websocket.${this.name}.`)) { + annotations.add(annotation.substring(`@websocket.${this.name}.`.length)); + } + if (annotation.startsWith(`@ws.${this.name}.`)) { + annotations.add(annotation.substring(`@ws.${this.name}.`.length)); + } + } + } + } + return annotations; + } + + /** + * Derive value from data, headers and fallback using header names and annotation names + * @param {String} name Definition name (event, operation) + * @param {Object} [headers] Header data + * @param {[String]} [headerNames] Header names to derive value from + * @param {Object} [data] Data + * @param {[String]} [annotationNames] Annotation names to derived values from + * @param {*} [fallback] Fallback value + * @returns {*} Derived value + */ + deriveValue(name, { headers, headerNames, data, annotationNames, fallback }) { + if (headers && headerNames) { + for (const header of headerNames) { + if (headers[header] !== undefined) { + return headers[header]; + } + } + } + if (data && annotationNames) { + const definition = this.service.events()[name] || this.service.operations()[this.localName(name)]; + if (definition) { + if (annotationNames) { + for (const annotation of annotationNames) { + if (definition[annotation] !== undefined) { + return definition[annotation]; + } + } + } + const elements = Object.values(definition?.elements || definition?.params || {}); + for (const annotation of annotationNames) { + if (annotationNames) { + const element = elements.find((element) => { + return element[annotation] && !(element["@websocket.ignore"] || element["@ws.ignore"]); + }); + if (element) { + const elementValue = data[element.name]; + if (elementValue !== undefined) { + delete data[element.name]; + return elementValue; + } + } + } + } + } + } + return fallback; + } + + /** + * Derive annotation value from datausing annotation names + * @param {String} name Definition name (event, operation) + * @param {Object} data Data + * @param {Object} mappedData Data to be mapped into + * @param {[String]} [localAnnotations] Local annotation names to be mapped + * @returns {*} Derived value + */ + mapValues(name, data, mappedData, localAnnotations) { + data ??= {}; + const definition = this.service.events()[name] || this.service.operations()[this.localName(name)]; + if (definition) { + for (const localAnnotation of localAnnotations || []) { + for (const annotation of [ + `@websocket.${this.name}.${localAnnotation}`, + `@ws.${this.name}.${localAnnotation}`, + ]) { + if (definition[annotation] !== undefined) { + data[localAnnotation] = definition[annotation]; + } + } + } + const elements = Object.values(definition?.elements || definition?.params || {}); + for (const element of elements) { + if (element["@websocket.ignore"] || element["@ws.ignore"]) { + continue; + } + let mapped = false; + for (const localAnnotation of localAnnotations || []) { + if (mapped) { + break; + } + for (const annotation of [ + `@websocket.${this.name}.${localAnnotation}`, + `@ws.${this.name}.${localAnnotation}`, + ]) { + if (!element[annotation]) { + continue; + } + if (data[localAnnotation] !== undefined) { + mappedData[element.name] = data[localAnnotation]; + mapped = true; + break; + } + } + } + if (!mapped) { + mappedData[element.name] = data[element.name]; + } + } + } + } + + /** + * Get local name of a definition name (event, operation) + * @param {String} name Service definition name + * @returns {String} Local name of the definition + */ + localName(name) { + return name.startsWith(`${this.service.name}.`) ? name.substring(this.service.name.length + 1) : name; + } + + /** + * Deserialize data + * @param {String|Object} data Data + * @returns {Object} Deserialized data + */ + deserialize(data) { + if (data === undefined) { + return; + } + try { + return data?.constructor === Object ? data : JSON.parse(data); + } catch (err) { + this.LOG?.error(err); + this.LOG?.error(`Error parsing ${this.name} format`, data); + } + } + + /** + * Serialize data based on format origin + * @param {String|Object} data Data + * @returns {String|Object} Serialized data + */ + serialize(data) { + return this.origin === "json" ? data : JSON.stringify(data); + } +} + +module.exports = GenericFormat; diff --git a/src/format/pcp.js b/src/format/pcp.js index 7db576f..90d921a 100644 --- a/src/format/pcp.js +++ b/src/format/pcp.js @@ -2,7 +2,7 @@ const cds = require("@sap/cds"); -const BaseFormat = require("./base"); +const GenericFormat = require("./generic"); const DESERIALIZE_REGEX = /((?:[^:\\]|(?:\\.))+):((?:[^:\\\n]|(?:\\.))*)/; const MESSAGE = "MESSAGE"; @@ -10,7 +10,7 @@ const SEPARATOR = "\n\n"; const LOG = cds.log("/websocket/pcp"); -class PCPFormat extends BaseFormat { +class PCPFormat extends GenericFormat { constructor(service, origin) { super(service, origin); } @@ -23,64 +23,60 @@ class PCPFormat extends BaseFormat { } if (splitPos !== -1) { const result = {}; + const message = data.substring(splitPos + SEPARATOR.length); const pcpFields = extractPcpFields(data.substring(0, splitPos)); - const operation = Object.values(this.service.operations || {}).find((operation) => { + const operation = Object.values(this.service.operations).find((operation) => { return ( - operation["@websocket.pcp.action"] === (pcpFields["pcp-action"] || MESSAGE) || - operation["@ws.pcp.action"] === (pcpFields["pcp-action"] || MESSAGE) || + (operation["@websocket.pcp.action"] && + operation["@websocket.pcp.action"] === (pcpFields["pcp-action"] || MESSAGE)) || + (operation["@ws.pcp.action"] && operation["@ws.pcp.action"] === (pcpFields["pcp-action"] || MESSAGE)) || operation.name === (pcpFields["pcp-action"] || MESSAGE) ); }); if (operation) { for (const param of operation.params) { + if (param["@websocket.ignore"] || param["@ws.ignore"]) { + continue; + } if (param["@websocket.pcp.message"] || param["@ws.pcp.message"]) { - result[param.name] = data.substring(splitPos + SEPARATOR.length); + result[param.name] = message; } else if (pcpFields[param.name] !== undefined) { result[param.name] = pcpFields[param.name]; } } return { - event: localName(operation.name, this.service), + event: this.localName(operation.name), data: result, }; } } - LOG?.warn("Error parsing pcp format", data); + LOG?.error("Error parsing pcp format", data); return { event: undefined, data: {}, }; } - compose(event, data) { + compose(event, data, headers) { const eventDefinition = this.service.events()[event]; - const eventElements = Object.values(eventDefinition?.elements || {}); - const messageElement = eventElements.find((element) => { - return element["@websocket.pcp.message"] || element["@ws.pcp.message"]; + const pcpMessage = this.deriveValue(event, { + headers, + headerNames: ["pcp-message", "pcp_message", "pcp.message", "pcpmessage"], + data, + annotationNames: ["@websocket.pcp.message", "@ws.pcp.message"], + fallback: event, }); - const actionElement = eventElements.find((element) => { - return element["@websocket.pcp.action"] || element["@ws.pcp.action"]; + const pcpAction = this.deriveValue(event, { + headers, + headerNames: ["pcp-action", "pcp_action", "pcp.action", "pcpaction"], + data, + annotationNames: ["@websocket.pcp.action", "@ws.pcp.action"], + fallback: MESSAGE, }); - const message = - eventDefinition?.["@websocket.pcp.message"] ?? - eventDefinition?.["@ws.pcp.message"] ?? - data[messageElement?.name] ?? - event; - if (data[messageElement?.name]) { - delete data[messageElement?.name]; - } - const pcpAction = - eventDefinition?.["@websocket.pcp.action"] ?? - eventDefinition?.["@ws.pcp.action"] ?? - data[actionElement?.name] ?? - MESSAGE; - if (data[actionElement?.name]) { - delete data[actionElement?.name]; - } const pcpEvent = eventDefinition?.["@websocket.pcp.event"] || eventDefinition?.["@ws.pcp.event"] ? event : undefined; - const pcpFields = serializePcpFields(data, typeof message, pcpAction, pcpEvent, eventDefinition?.elements); - return pcpFields + message; + const pcpFields = serializePcpFields(data, typeof pcpMessage, pcpAction, pcpEvent, eventDefinition?.elements); + return pcpFields + pcpMessage; } } @@ -95,8 +91,14 @@ const serializePcpFields = (pcpFields, messageType, pcpAction, pcpEvent, element if (pcpFields && typeof pcpFields === "object") { for (const fieldName in pcpFields) { const fieldValue = stringValue(pcpFields[fieldName]); - if (fieldValue && fieldName.indexOf("pcp-") !== 0 && elements?.[fieldName]) { - serialized += escape(fieldName) + ":" + escape(fieldValue) + "\n"; + const element = elements?.[fieldName]; + if (element) { + if (element["@websocket.ignore"] || element["@ws.ignore"]) { + continue; + } + if (fieldValue && fieldName.indexOf("pcp-") !== 0) { + serialized += escape(fieldName) + ":" + escape(fieldValue) + "\n"; + } } } } @@ -139,17 +141,13 @@ const unescape = (escaped) => { .join("\u0008"); }; -const localName = (name, service) => { - return name.startsWith(`${service.name}.`) ? name.substring(service.name.length + 1) : name; -}; - -function stringValue(value) { +const stringValue = (value) => { if (value instanceof Date) { return value.toISOString(); } else if (value instanceof Object) { return JSON.stringify(value); } return String(value); -} +}; module.exports = PCPFormat; diff --git a/src/index.js b/src/index.js index 7728c1c..fa3facb 100644 --- a/src/index.js +++ b/src/index.js @@ -65,26 +65,25 @@ function serveWebSocketServer(options) { }; }), operations: () => { - return []; + return interableObject(); }, entities: () => { - return []; + return interableObject(); + }, + _events: interableObject(), + events: function () { + return this._events; }, - events: [], on: service.on.bind(service), tx: service.tx.bind(service), }; - eventServices[service.name].events.push(definition); + eventServices[service.name]._events[serviceLocalName(service, definition.name)] = definition; } } } for (const name in eventServices) { const eventService = eventServices[name]; - const events = eventService.events; - if (events.length > 0) { - eventService.events = () => { - return events; - }; + if (Object.keys(eventService.events()).length > 0) { serveWebSocketService(socketServer, eventService, options); } } @@ -151,6 +150,8 @@ function bindServiceEvents(socketServer, service, path) { const user = deriveUser(event, req.data, req.headers, req); const context = deriveContext(event, req.data, req.headers); const identifier = deriveIdentifier(event, req.data, req.headers); + const headers = + req.headers?.websocket || req.headers?.ws ? { ...req.headers?.websocket, ...req.headers?.ws } : undefined; path = normalizeEventPath(event["@websocket.path"] || event["@ws.path"] || path); await socketServer.broadcast({ service, @@ -161,6 +162,7 @@ function bindServiceEvents(socketServer, service, path) { user, context, identifier, + headers, socket: null, }); } catch (err) { @@ -171,7 +173,7 @@ function bindServiceEvents(socketServer, service, path) { } function bindServiceDefaults(socket, service) { - if (service.operations(WebSocketAction.Disconnect).length) { + if (service.operations[WebSocketAction.Disconnect]) { socket.onDisconnect(async (reason) => { await processEvent(socket, service, WebSocketAction.Disconnect, { reason }); }); @@ -182,7 +184,7 @@ function bindServiceDefaults(socket, service) { } else { await socket.exit(data?.context); } - if (service.operations(WebSocketAction.Context).length) { + if (service.operations[WebSocketAction.Context]) { await processEvent(socket, service, WebSocketAction.Context, data, callback); } else { callback && (await callback()); @@ -242,7 +244,7 @@ function bindServiceEntities(socket, service) { } async function emitConnect(socket, service) { - if (service.operations(WebSocketAction.Connect).length) { + if (service.operations[WebSocketAction.Connect]) { await processEvent(socket, service, WebSocketAction.Connect); } } @@ -552,6 +554,9 @@ function deriveValues( if (event.elements) { for (const name in event.elements) { const element = event.elements[name]; + if (element["@websocket.ignore"] || element["@ws.ignore"]) { + continue; + } const annotationValue = element[annotationName]; if (annotationExcludeValues?.includes(annotationValue)) { continue; @@ -674,6 +679,17 @@ function serviceLocalName(service, name) { return name; } +function interableObject(object) { + return { + ...object, + [Symbol.iterator]: function* () { + for (const event in this) { + yield this[event]; + } + }, + }; +} + function isServedViaWebsocket(service) { if (!service) { return false; diff --git a/src/socket/base.js b/src/socket/base.js index 68de6ec..6836d38 100644 --- a/src/socket/base.js +++ b/src/socket/base.js @@ -86,7 +86,7 @@ class SocketServer { /** * Broadcast websocket event to all sockets except to sender with options * @param {String} event Event - * @param {Object} data Data + * @param {Object} data Event data * @param {Object} [user] Users to be included/excluded, undefined: no restriction * @param {[String]} [user.include] Users to be included, undefined: no restriction * @param {[String]} [user.exclude] Users to be excluded, undefined: no restriction @@ -96,15 +96,16 @@ class SocketServer { * @param {Object} [identifier] Unique consumer-provided socket client identifiers to be included/excluded, undefined: no restriction * @param {[String]} [identifier.include] Client identifiers to be included, undefined: no restriction * @param {[String]} [identifier.exclude] Client identifiers to be excluded, undefined: no restriction + * @param {Object} [headers] Event headers * @returns {Promise} Promise when broadcasting completed */ - broadcast: async (event, data, user, context, identifier) => { + broadcast: async (event, data, user, context, identifier, headers) => { return Promise.resolve(); }, /** * Broadcast websocket event to all sockets with options * @param {String} event Event - * @param {Object} data Data + * @param {Object} data Event data * @param {Object} [user] Users to be included/excluded, undefined: no restriction * @param {[String]} [user.include] Users to be included, undefined: no restriction * @param {[String]} [user.exclude] Users to be excluded, undefined: no restriction @@ -114,9 +115,10 @@ class SocketServer { * @param {Object} [identifier] Unique consumer-provided socket client identifiers to be included/excluded, undefined: no restriction * @param {[String]} [identifier.include] Client identifiers to be included, undefined: no restriction * @param {[String]} [identifier.exclude] Client identifiers to be excluded, undefined: no restriction + * @param {Object} [headers] Event headers * @returns {Promise} Promise when broadcasting completed */ - broadcastAll: async (event, data, user, context, identifier) => { + broadcastAll: async (event, data, user, context, identifier, headers) => { return Promise.resolve(); }, /** @@ -153,7 +155,7 @@ class SocketServer { * @param {String} service Service definition * @param {String} [path] Service path, e.g. "/path" (relative to websocket server path), undefined: default service path * @param {String} event Event name or event message JSON content (no additional parameters provided (incl. 'data', except 'local')) - * @param {Object} [data] Data object + * @param {Object} [data] Event data * @param {String} [tenant] Tenant for isolation * @param {Object} [user] Users to be included/excluded, undefined: no restriction * @param {[String]} [user.include] Users to be included, undefined: no restriction @@ -164,11 +166,12 @@ class SocketServer { * @param {Object} [identifier] Unique consumer-provided socket client identifiers to be included/excluded, undefined: no restriction * @param {[String]} [identifier.include] Client identifiers to be included, undefined: no restriction * @param {[String]} [identifier.exclude] Client identifiers to be excluded, undefined: no restriction + * @param {Object} [headers] Event headers * @param {Object} [socket] Broadcast client socket to be excluded, undefined: no exclusion * @param {boolean} [local] Broadcast only locally (i.e. not via adapter), default: falsy * @returns {Promise} Promise when broadcasting completed */ - async broadcast({ service, path, event, data, tenant, user, context, identifier, socket, local }) {} + async broadcast({ service, path, event, data, tenant, user, context, identifier, headers, socket, local }) {} /** * Handle HTTP request response diff --git a/src/socket/socket.io.js b/src/socket/socket.io.js index 57fb3f9..c5c133f 100644 --- a/src/socket/socket.io.js +++ b/src/socket/socket.io.js @@ -70,7 +70,7 @@ class SocketIOServer extends SocketServer { emit: async (event, data) => { await socket.emit(event, format.compose(event, data)); }, - broadcast: async (event, data, user, context, identifier) => { + broadcast: async (event, data, user, context, identifier, headers) => { await this.broadcast({ service, path, @@ -80,10 +80,11 @@ class SocketIOServer extends SocketServer { user, context, identifier, + headers, socket, }); }, - broadcastAll: async (event, data, user, context, identifier) => { + broadcastAll: async (event, data, user, context, identifier, headers) => { await this.broadcast({ service, path, @@ -93,6 +94,7 @@ class SocketIOServer extends SocketServer { user, context, identifier, + headers, socket: null, }); }, @@ -161,7 +163,7 @@ class SocketIOServer extends SocketServer { }); } - async broadcast({ service, path, event, data, tenant, user, context, identifier, socket }) { + async broadcast({ service, path, event, data, tenant, user, context, identifier, headers, socket }) { path = path || this.defaultPath(service); tenant = tenant || socket?.context.tenant; let to = socket?.broadcast || this.io.of(path); @@ -222,7 +224,7 @@ class SocketIOServer extends SocketServer { } } const format = this.format(service, event, "json"); - to.emit(event, format.compose(event, data)); + to.emit(event, format.compose(event, data, headers)); } respond(socket, statusCode, body) { diff --git a/src/socket/ws.js b/src/socket/ws.js index 73fe378..27da908 100644 --- a/src/socket/ws.js +++ b/src/socket/ws.js @@ -73,7 +73,7 @@ class SocketWSServer extends SocketServer { emit: async (event, data) => { await ws.send(format.compose(event, data)); }, - broadcast: async (event, data, user, context, identifier) => { + broadcast: async (event, data, user, context, identifier, headers) => { await this.broadcast({ service, path, @@ -83,10 +83,11 @@ class SocketWSServer extends SocketServer { user, context, identifier, + headers, socket: ws, }); }, - broadcastAll: async (event, data, user, context, identifier) => { + broadcastAll: async (event, data, user, context, identifier, headers) => { await this.broadcast({ service, path, @@ -96,6 +97,7 @@ class SocketWSServer extends SocketServer { user, context, identifier, + headers, socket: null, }); }, @@ -127,7 +129,7 @@ class SocketWSServer extends SocketServer { }; } - async broadcast({ service, path, event, data, tenant, user, context, identifier, socket, local }) { + async broadcast({ service, path, event, data, tenant, user, context, identifier, headers, socket, local }) { const eventMessage = event; const isEventMessage = !data; if (isEventMessage) { @@ -179,7 +181,7 @@ class SocketWSServer extends SocketServer { } if (clients.size > 0) { const format = this.format(service, event); - const clientMessage = format.compose(event, data); + const clientMessage = format.compose(event, data, headers); for (const client of clients) { if (client !== socket && client.readyState === WebSocket.OPEN) { await client.send(clientMessage); @@ -189,7 +191,7 @@ class SocketWSServer extends SocketServer { if (!local) { const adapterMessage = isEventMessage ? eventMessage - : JSON.stringify({ event, data, tenant, user, context, identifier }); + : JSON.stringify({ event, data, tenant, user, context, identifier, headers }); await this.adapter?.emit(service, path, adapterMessage); } } diff --git a/test/_env/srv/cloudevent.cds b/test/_env/srv/cloudevent.cds new file mode 100644 index 0000000..fc15521 --- /dev/null +++ b/test/_env/srv/cloudevent.cds @@ -0,0 +1,111 @@ +@ws +@ws.format: 'cloudevent' +@path : 'cloudevent' +service CloudEventService { + + type CloudEventDataType : { + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; + @ws.ignore + appinfoD : String; + }; + + @ws.cloudevent.name: 'com.example.someevent.model' + action sendCloudEventModel( specversion : String, type : String, source : String, subject : String, id : String, time : String, comexampleextension1 : String, comexampleothervalue : Integer, datacontenttype : String, data: CloudEventDataType) returns Boolean; + + @ws.cloudevent.name: 'com.example.someevent.map' + @ws.cloudevent.subject: 'cloud-example' + action sendCloudEventMap( + @ws.cloudevent.specversion _specversion : String, + @ws.cloudevent.type _type : String, + @ws.cloudevent.source _source : String, + @ws.cloudevent.subject _subject : String, + @ws.cloudevent.id _id : String, + @ws.cloudevent.time _time : String, + @ws.cloudevent.comexampleextension1 _comexampleextension1 : String, + @ws.cloudevent.comexampleothervalue _comexampleothervalue : Integer, + @ws.cloudevent.datacontenttype _datacontenttype : String, + appinfoA : String, + appinfoB : Integer, + appinfoC : Boolean, + @ws.ignore appinfoD : String) returns Boolean; + + event cloudEvent1 { + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; + } + + @websocket.cloudevent.specversion : '1.1' + @ws.cloudevent.type : 'com.example.someevent.cloudEvent2' + @ws.cloudevent.source : '/mycontext' + @ws.cloudevent.subject : 'example' + @ws.cloudevent.id : 'C234-1234-1234' + @ws.cloudevent.time : '2018-04-05T17:31:00Z' + @ws.cloudevent.comexampleextension1: 'value' + @ws.cloudevent.comexampleothervalue: 5 + @ws.cloudevent.datacontenttype : 'application/cloudevents+json' + event cloudEvent2 { + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; + } + + event cloudEvent3 { + @websocket.ignore + skipValue : String; + @websocket.cloudevent.specversion + specversion : String; + + @ws.cloudevent.type + type : String; + + @ws.cloudevent.source + source : String; + + @ws.cloudevent.subject + subject : String; + + @ws.cloudevent.id + id : String; + + @ws.cloudevent.time + time : String; + + @ws.cloudevent.comexampleextension1 + extension1 : String; + + @ws.cloudevent.comexampleothervalue + othervalue : Integer; + + @ws.cloudevent.datacontenttype + datacontenttype : String; + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; + } + + event cloudEvent4 { + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; + } + + event cloudEvent5 { + specversion : String; + type : String; + source : String; + subject : String; + id : String; + time : String; + comexampleextension1 : String; + comexampleothervalue : Integer; + datacontenttype : String; + data: { + appinfoA : String; + appinfoB : Integer; + appinfoC : Boolean; + } + } +} \ No newline at end of file diff --git a/test/_env/srv/handlers/chat.js b/test/_env/srv/handlers/chat.js index ffacc72..4a694dd 100644 --- a/test/_env/srv/handlers/chat.js +++ b/test/_env/srv/handlers/chat.js @@ -3,7 +3,11 @@ module.exports = (srv) => { srv.on("message", async (req) => { req.data.user = req.user.id; - await srv.emit("received", req.data); + await srv.emit("received", req.data, { + ws: { + header: "value", + }, + }); return req.data.text; }); }; diff --git a/test/_env/srv/handlers/cloudevent.js b/test/_env/srv/handlers/cloudevent.js new file mode 100644 index 0000000..7b3b243 --- /dev/null +++ b/test/_env/srv/handlers/cloudevent.js @@ -0,0 +1,78 @@ +"use strict"; + +module.exports = (srv) => { + srv.on(["sendCloudEventModel", "sendCloudEventMap"], async (req) => { + const appinfoA = (req.data.appinfoA ?? req.data.data?.appinfoA ?? "abc") + "d"; + const appinfoB = (req.data.appinfoB ?? req.data.data?.appinfoB ?? 123) + 1111; + const appinfoC = (req.data.appinfoC ?? req.data.data?.appinfoC ?? true) !== true; + + await srv.emit("cloudEvent1", { + appinfoA, + appinfoB, + appinfoC, + }); + + await srv.emit("cloudEvent2", { + appinfoA, + appinfoB, + appinfoC, + }); + + await srv.emit("cloudEvent3", { + specversion: "1.1", + type: "com.example.someevent.cloudEvent3", + source: "/mycontext", + subject: req.data._subject || "example", + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + extension1: "value", + othervalue: 5, + datacontenttype: "application/cloudevents+json", + appinfoA, + appinfoB, + appinfoC, + }); + + await srv.emit( + "cloudEvent4", + { + appinfoA, + appinfoB, + appinfoC, + }, + { + ws: { + specversion: "1.1", + type: "com.example.someevent.cloudEvent4", + source: "/mycontext", + subject: req.data._subject || "example", + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + cloudevent: { + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/cloudevents+json", + }, + }, + }, + ); + + await srv.emit("cloudEvent5", { + specversion: "1.1", + type: "com.example.someevent.cloudEvent5", + source: "/mycontext", + subject: req.data._subject || "example", + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/cloudevents+json", + data: { + appinfoA, + appinfoB, + appinfoC, + }, + }); + return true; + }); +}; diff --git a/test/_env/srv/handlers/pcp.js b/test/_env/srv/handlers/pcp.js index cea2015..4caf125 100644 --- a/test/_env/srv/handlers/pcp.js +++ b/test/_env/srv/handlers/pcp.js @@ -17,6 +17,22 @@ module.exports = (srv) => { field1: req.data.field1 || "value1", field2: req.data.field2 || "value2", }); + await srv.emit( + "notification4", + { + message: "no body!", + action: "MESSAGE", + field1: req.data.field1 || "value1", + field2: req.data.field2 || "value2", + field3: "ignore", + }, + { + ws: { + pcpaction: "ABC", + pcpmessage: "Header", + }, + }, + ); return true; }); }; diff --git a/test/_env/srv/handlers/todo.js b/test/_env/srv/handlers/todo.js index 6baec4e..a73887b 100644 --- a/test/_env/srv/handlers/todo.js +++ b/test/_env/srv/handlers/todo.js @@ -12,7 +12,7 @@ module.exports = class TodoService extends cds.ApplicationService { const ID = context.params?.[0]?.ID; if (ID && context.target === Todo && ["CREATE", "UPDATE", "DELETE"].includes(context.event)) { await this.emit("refresh", { ID }); - await this.emit("notify", { text: ID }); + await this.emit("notify", { text: "4711" }); const service = await cds.connect.to("TodoWSService"); await service.emit("refresh", { ID }); } diff --git a/test/_env/srv/index.cds b/test/_env/srv/index.cds index df0a98c..13df20a 100644 --- a/test/_env/srv/index.cds +++ b/test/_env/srv/index.cds @@ -1,4 +1,5 @@ using from './chat'; +using from './cloudevent'; using from './fns'; using from './main'; using from './odata'; diff --git a/test/_env/srv/pcp.cds b/test/_env/srv/pcp.cds index 65a8ed7..80aa58e 100644 --- a/test/_env/srv/pcp.cds +++ b/test/_env/srv/pcp.cds @@ -4,7 +4,7 @@ service PCPService { @ws.pcp.action: 'MESSAGE' - action sendNotification(@ws.pcp.message message: String, field1: String, field2: String, ![pcp-action]: String) returns Boolean; + action sendNotification(@ws.pcp.message message: String, field1: String, field2: String, @ws.ignore field3: String, ![pcp-action]: String) returns Boolean; @ws.pcp.event @ws.pcp.message: 'this is the body!' @@ -30,4 +30,13 @@ service PCPService { field1: String; field2: String; } + + @ws.pcp.event + event notification4 { + action: String; + field1: String; + field2: String; + @ws.ignore + field3: String; + } } \ No newline at end of file diff --git a/test/_env/srv/todo.cds b/test/_env/srv/todo.cds index e83e853..6bb088e 100644 --- a/test/_env/srv/todo.cds +++ b/test/_env/srv/todo.cds @@ -20,15 +20,15 @@ service TodoService { @ws event refresh { - ID : String + ID : String; }; @ws @ws.pcp.event @ws.pcp.message: '' - @ws.path : 'fns-websocket' @ws.format : 'pcp' + @ws.path : 'fns-websocket' event notify { - text : String + text : String; }; } diff --git a/test/_env/util/socket.io.js b/test/_env/util/socket.io.js index 23bf9d2..6d156b3 100644 --- a/test/_env/util/socket.io.js +++ b/test/_env/util/socket.io.js @@ -5,12 +5,13 @@ const ioc = require("socket.io-client"); const auth = require("./auth"); -async function connect(service, options = {}) { +async function connect(service, options = {}, headers) { const port = cds.app.server.address().port; const socket = ioc(`http://localhost:${port}/${service}${options?.id ? `?id=${options?.id}` : ""}`, { path: "/ws", extraHeaders: { authorization: options?.authorization || auth.alice, + ...headers, }, }); cds.io.of(service).once("connection", (serverSocket) => { diff --git a/test/_env/util/ws.js b/test/_env/util/ws.js index e030336..3d0b631 100644 --- a/test/_env/util/ws.js +++ b/test/_env/util/ws.js @@ -5,11 +5,13 @@ const WebSocket = require("ws"); const auth = require("./auth"); -async function connect(service, options = {}) { +async function connect(service, options = {}, headers = {}, protoocls) { const port = cds.app.server.address().port; - const socket = new WebSocket(`ws://localhost:${port}` + service, { + protoocls ??= []; + const socket = new WebSocket(`ws://localhost:${port}` + service, protoocls, { headers: { authorization: options?.authorization || auth.alice, + ...headers, }, }); cds.wss.once("connection", async (serverSocket) => { @@ -80,12 +82,13 @@ async function waitForNoEvent(socket, event, timeout = 100) { }); } -async function waitForMessage(socket, event, cb) { +async function waitForMessage(socket, event, cb, parse) { _initListeners(socket); return new Promise((resolve) => { socket._listeners.push((message) => { message = message.toString(); if (message.includes(event)) { + message = parse ? JSON.parse(message) : message; resolve(message); cb && cb(message); } diff --git a/test/socketio/cloudevent_socket.io.test.js b/test/socketio/cloudevent_socket.io.test.js new file mode 100644 index 0000000..9e12540 --- /dev/null +++ b/test/socketio/cloudevent_socket.io.test.js @@ -0,0 +1,148 @@ +"use strict"; + +const cds = require("@sap/cds"); + +const { connect, disconnect, emitEvent, emitMessage, waitForEvent } = require("../_env/util/socket.io"); + +cds.test(__dirname + "/../_env"); + +cds.env.websocket = { + kind: "socket.io", + impl: null, +}; + +const cloudEvent = { + specversion: "1.0", + type: "com.example.someevent", + source: "/mycontext", + subject: null, + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/json", + data: { + appinfoA: "abc", + appinfoB: 123, + appinfoC: true, + }, +}; + +const cloudEventModel = { + ...cloudEvent, + type: "com.example.someevent.model", +}; + +const cloudEventMap = { + ...cloudEvent, + type: "com.example.someevent.map", +}; + +const cloudEvent1 = { + specversion: "1.0", + type: "CloudEventService.cloudEvent1", + source: "CloudEventService", + subject: null, + id: expect.any(String), + time: expect.any(String), + datacontenttype: "application/json", + data: { + appinfoA: "abcd", + appinfoB: 1234, + appinfoC: false, + }, +}; + +const cloudEvent2 = { + ...cloudEvent1, + specversion: "1.1", + type: "com.example.someevent.cloudEvent2", + source: "/mycontext", + subject: "example", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/cloudevents+json", +}; + +const cloudEvent3 = { + ...cloudEvent2, + type: "com.example.someevent.cloudEvent3", +}; + +const cloudEvent4 = { + ...cloudEvent3, + type: "com.example.someevent.cloudEvent4", +}; + +const cloudEvent5 = { + ...cloudEvent4, + type: "com.example.someevent.cloudEvent5", +}; + +describe("CloudEvent", () => { + let socket; + + beforeAll(async () => { + socket = await connect( + "cloudevent", + {}, + { + "Sec-WebSocket-Protocol": "cloudevents.json", + }, + ); + }); + + afterAll(async () => { + await disconnect(socket); + }); + + test("Cloud Event Protocol", async () => { + expect(socket._opts.extraHeaders["Sec-WebSocket-Protocol"]).toEqual("cloudevents.json"); + expect(socket.serverSocket.handshake.headers["sec-websocket-protocol"]).toEqual("cloudevents.json"); + }); + + test("Cloud event (modeling)", async () => { + const waitCloudEvent1Promise = waitForEvent(socket, "cloudEvent1"); + const waitCloudEvent2Promise = waitForEvent(socket, "cloudEvent2"); + const waitCloudEvent3Promise = waitForEvent(socket, "cloudEvent3"); + const waitCloudEvent4Promise = waitForEvent(socket, "cloudEvent4"); + const waitCloudEvent5Promise = waitForEvent(socket, "cloudEvent5"); + const result = await emitEvent(socket, "sendCloudEventModel", cloudEventModel); + expect(result).toBe(true); + const waitResult1 = await waitCloudEvent1Promise; + expect(waitResult1).toEqual(cloudEvent1); + const waitResult2 = await waitCloudEvent2Promise; + expect(waitResult2).toEqual(cloudEvent2); + const waitResult3 = await waitCloudEvent3Promise; + expect(waitResult3).toEqual(cloudEvent3); + const waitResult4 = await waitCloudEvent4Promise; + expect(waitResult4).toEqual(cloudEvent4); + const waitResult5 = await waitCloudEvent5Promise; + expect(waitResult5).toEqual(cloudEvent5); + }); + + test("Cloud event (mapping)", async () => { + const waitCloudEvent1Promise = waitForEvent(socket, "cloudEvent1"); + const waitCloudEvent2Promise = waitForEvent(socket, "cloudEvent2"); + const waitCloudEvent3Promise = waitForEvent(socket, "cloudEvent3"); + const waitCloudEvent4Promise = waitForEvent(socket, "cloudEvent4"); + const waitCloudEvent5Promise = waitForEvent(socket, "cloudEvent5"); + const result = await emitEvent(socket, "sendCloudEventMap", cloudEventMap); + expect(result).toBe(true); + const waitResult1 = await waitCloudEvent1Promise; + expect(waitResult1).toEqual(cloudEvent1); + const waitResult2 = await waitCloudEvent2Promise; + expect(waitResult2).toEqual(cloudEvent2); + const waitResult3 = await waitCloudEvent3Promise; + expect(waitResult3).toEqual({ ...cloudEvent3, subject: "cloud-example" }); + const waitResult4 = await waitCloudEvent4Promise; + expect(waitResult4).toEqual({ ...cloudEvent4, subject: "cloud-example" }); + const waitResult5 = await waitCloudEvent5Promise; + expect(waitResult5).toEqual({ ...cloudEvent5, subject: "cloud-example" }); + }); + + test("Cloud event format error", async () => { + const result = await emitMessage(socket, "This is not a Cloud Event message!"); + expect(result).toEqual(null); + }); +}); diff --git a/test/socketio/fns_socket.io.test.js b/test/socketio/fns_socket.io.test.js index 36a88b4..216fef9 100644 --- a/test/socketio/fns_socket.io.test.js +++ b/test/socketio/fns_socket.io.test.js @@ -52,8 +52,10 @@ describe("Fns", () => { expect(waitResult).toMatchObject({ ID }); const waitFnsResult = await waitNotifyPromise; expect(waitFnsResult).toEqual(`pcp-action:MESSAGE +pcp-event:notify pcp-body-type:text +text:4711 -notify`); +`); }); }); diff --git a/test/socketio/pcp_socket.io.test.js b/test/socketio/pcp_socket.io.test.js index f4de538..2d39a10 100644 --- a/test/socketio/pcp_socket.io.test.js +++ b/test/socketio/pcp_socket.io.test.js @@ -42,6 +42,15 @@ field2:value2 `; +const pcpMessage4 = `pcp-action:ABC +pcp-event:notification4 +pcp-body-type:text +action:MESSAGE +field1:value1 +field2:value2 + +Header`; + describe("PCP", () => { let socket; @@ -57,6 +66,7 @@ describe("PCP", () => { const waitNotification1Promise = waitForEvent(socket, "notification1"); const waitNotification2Promise = waitForEvent(socket, "notification2"); const waitNotification3Promise = waitForEvent(socket, "notification3"); + const waitNotification4Promise = waitForEvent(socket, "notification4"); const result = await emitEvent(socket, "sendNotification", pcpMessage); expect(result).toBe(true); const waitResult1 = await waitNotification1Promise; @@ -65,6 +75,8 @@ describe("PCP", () => { expect(waitResult2).toEqual(pcpMessage2); const waitResult3 = await waitNotification3Promise; expect(waitResult3).toEqual(pcpMessage3); + const waitResult4 = await waitNotification4Promise; + expect(waitResult4).toEqual(pcpMessage4); }); test("PCP format error", async () => { diff --git a/test/ws/cloudevent_ws.test.js b/test/ws/cloudevent_ws.test.js new file mode 100644 index 0000000..3aaf0d3 --- /dev/null +++ b/test/ws/cloudevent_ws.test.js @@ -0,0 +1,143 @@ +"use strict"; + +const cds = require("@sap/cds"); + +const { connect, disconnect, emitMessage, waitForMessage } = require("../_env/util/ws"); + +cds.test(__dirname + "/../_env"); + +const cloudEvent = { + specversion: "1.0", + type: "com.example.someevent", + source: "/mycontext", + subject: null, + id: "C234-1234-1234", + time: "2018-04-05T17:31:00Z", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/json", + data: { + appinfoA: "abc", + appinfoB: 123, + appinfoC: true, + }, +}; + +const cloudEventModel = { + ...cloudEvent, + type: "com.example.someevent.model", +}; + +const cloudEventMap = { + ...cloudEvent, + type: "com.example.someevent.map", +}; + +const cloudEvent1 = { + specversion: "1.0", + type: "CloudEventService.cloudEvent1", + source: "CloudEventService", + subject: null, + id: expect.any(String), + time: expect.any(String), + datacontenttype: "application/json", + data: { + appinfoA: "abcd", + appinfoB: 1234, + appinfoC: false, + }, +}; + +const cloudEvent2 = { + ...cloudEvent1, + specversion: "1.1", + type: "com.example.someevent.cloudEvent2", + source: "/mycontext", + subject: "example", + comexampleextension1: "value", + comexampleothervalue: 5, + datacontenttype: "application/cloudevents+json", +}; + +const cloudEvent3 = { + ...cloudEvent2, + type: "com.example.someevent.cloudEvent3", +}; + +const cloudEvent4 = { + ...cloudEvent3, + type: "com.example.someevent.cloudEvent4", +}; + +const cloudEvent5 = { + ...cloudEvent4, + type: "com.example.someevent.cloudEvent5", +}; + +describe("CloudEvent", () => { + let socket; + + beforeAll(async () => { + socket = await connect( + "/ws/cloudevent", + {}, + { + "Sec-WebSocket-Protocol": "cloudevents.json", + }, + ["cloudevents.json"], + ); + }); + + afterAll(async () => { + await disconnect(socket); + }); + + test("Event Cloud Protocol", async () => { + expect(socket._protocol).toEqual("cloudevents.json"); + }); + + test("Cloud event (modeling)", async () => { + const waitCloudEvent1Promise = waitForMessage(socket, "cloudEvent1", null, true); + const waitCloudEvent2Promise = waitForMessage(socket, "cloudEvent2", null, true); + const waitCloudEvent3Promise = waitForMessage(socket, "cloudEvent3", null, true); + const waitCloudEvent4Promise = waitForMessage(socket, "cloudEvent4", null, true); + const waitCloudEvent5Promise = waitForMessage(socket, "cloudEvent5", null, true); + const result = await emitMessage(socket, JSON.stringify(cloudEventModel)); + expect(result).toBeNull(); + const waitResult1 = await waitCloudEvent1Promise; + expect(waitResult1).toEqual(cloudEvent1); + const waitResult2 = await waitCloudEvent2Promise; + expect(waitResult2).toEqual(cloudEvent2); + const waitResult3 = await waitCloudEvent3Promise; + expect(waitResult3).toEqual(cloudEvent3); + const waitResult4 = await waitCloudEvent4Promise; + expect(waitResult4).toEqual(cloudEvent4); + const waitResult5 = await waitCloudEvent5Promise; + expect(waitResult5).toEqual(cloudEvent5); + }); + + test("Cloud event (mapping)", async () => { + const waitCloudEvent1Promise = waitForMessage(socket, "cloudEvent1", null, true); + const waitCloudEvent2Promise = waitForMessage(socket, "cloudEvent2", null, true); + const waitCloudEvent3Promise = waitForMessage(socket, "cloudEvent3", null, true); + const waitCloudEvent4Promise = waitForMessage(socket, "cloudEvent4", null, true); + const waitCloudEvent5Promise = waitForMessage(socket, "cloudEvent5", null, true); + const result = await emitMessage(socket, JSON.stringify(cloudEventMap)); + expect(result).toBeNull(); + const waitResult1 = await waitCloudEvent1Promise; + expect(waitResult1).toEqual(cloudEvent1); + const waitResult2 = await waitCloudEvent2Promise; + expect(waitResult2).toEqual(cloudEvent2); + const waitResult3 = await waitCloudEvent3Promise; + expect(waitResult3).toEqual({ ...cloudEvent3, subject: "cloud-example" }); + const waitResult4 = await waitCloudEvent4Promise; + expect(waitResult4).toEqual({ ...cloudEvent4, subject: "cloud-example" }); + const waitResult5 = await waitCloudEvent5Promise; + expect(waitResult5).toEqual({ ...cloudEvent5, subject: "cloud-example" }); + }); + + test("Cloud event format error", async () => { + const result = await emitMessage(socket, "This is not a Cloud Event message!"); + expect(result).toEqual(null); + }); +}); diff --git a/test/ws/fns_ws.test.js b/test/ws/fns_ws.test.js index 5b39b5d..eb1500a 100644 --- a/test/ws/fns_ws.test.js +++ b/test/ws/fns_ws.test.js @@ -47,8 +47,10 @@ describe("Fns", () => { expect(waitResult).toMatchObject({ ID }); const waitFnsResult = await waitNotifyPromise; expect(waitFnsResult).toEqual(`pcp-action:MESSAGE +pcp-event:notify pcp-body-type:text +text:4711 -notify`); +`); }); }); diff --git a/test/ws/pcp_ws.test.js b/test/ws/pcp_ws.test.js index 8efc978..9204fe3 100644 --- a/test/ws/pcp_ws.test.js +++ b/test/ws/pcp_ws.test.js @@ -37,6 +37,15 @@ field2:value2 `; +const pcpMessage4 = `pcp-action:ABC +pcp-event:notification4 +pcp-body-type:text +action:MESSAGE +field1:value1 +field2:value2 + +Header`; + describe("PCP", () => { let socket; @@ -52,6 +61,7 @@ describe("PCP", () => { const waitNotification1Promise = waitForMessage(socket, "notification1"); const waitNotification2Promise = waitForMessage(socket, "notification2"); const waitNotification3Promise = waitForMessage(socket, "notification3"); + const waitNotification4Promise = waitForMessage(socket, "notification4"); const result = await emitMessage(socket, pcpMessage); expect(result).toBeNull(); const waitResult1 = await waitNotification1Promise; @@ -60,6 +70,8 @@ describe("PCP", () => { expect(waitResult2).toEqual(pcpMessage2); const waitResult3 = await waitNotification3Promise; expect(waitResult3).toEqual(pcpMessage3); + const waitResult4 = await waitNotification4Promise; + expect(waitResult4).toEqual(pcpMessage4); }); test("PCP format error", async () => { diff --git a/test/ws/redis_ws.test.js b/test/ws/redis_ws.test.js index 255f189..aa44414 100644 --- a/test/ws/redis_ws.test.js +++ b/test/ws/redis_ws.test.js @@ -52,11 +52,12 @@ describe("Redis", () => { expect(redis.client.connect).toHaveBeenCalledWith(); expect(redis.client.on).toHaveBeenNthCalledWith(1, "error", expect.any(Function)); expect(redis.client.subscribe).toHaveBeenNthCalledWith(1, "websocket/chat", expect.any(Function)); - expect(redis.client.subscribe).toHaveBeenNthCalledWith(2, "websocket/fns-websocket", expect.any(Function)); - expect(redis.client.subscribe).toHaveBeenNthCalledWith(3, "websocket/main", expect.any(Function)); + expect(redis.client.subscribe).toHaveBeenNthCalledWith(2, "websocket/cloudevent", expect.any(Function)); + expect(redis.client.subscribe).toHaveBeenNthCalledWith(3, "websocket/fns-websocket", expect.any(Function)); + expect(redis.client.subscribe).toHaveBeenNthCalledWith(4, "websocket/main", expect.any(Function)); expect(redis.client.publish).toHaveBeenCalledWith( "websocket/chat", - `{"event":"received","data":{"text":"test","user":"alice"},"tenant":"t1"}`, + '{"event":"received","data":{"text":"test","user":"alice"},"tenant":"t1","headers":{"header":"value"}}', ); // Duplicated because Redis mock publishes to same client (not done for real Redis)