diff --git a/.eslintrc.js b/.eslintrc.js index c3ccc1d9..de7c40cc 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -21,12 +21,7 @@ module.exports = { DROP: false, }, plugins: ["jest", "node"], - extends: [ - "eslint:recommended", - "plugin:jest/recommended", - "plugin:@sap/cds/recommended", - "prettier", - ], + extends: ["eslint:recommended", "plugin:jest/recommended", "plugin:@sap/cds/recommended", "prettier"], rules: { "no-unused-vars": [ "error", diff --git a/.prettierrc.yml b/.prettierrc.yml new file mode 100644 index 00000000..90af0fd3 --- /dev/null +++ b/.prettierrc.yml @@ -0,0 +1,3 @@ +--- +printWidth: 120 +trailingComma: es5 diff --git a/docs/implement-event/index.md b/docs/implement-event/index.md index 8010abc7..2d1a1df4 100644 --- a/docs/implement-event/index.md +++ b/docs/implement-event/index.md @@ -42,10 +42,7 @@ for each queueEntry. Each queueEntry may have a different status. ```js "use strict"; -const { - EventQueueBaseClass, - EventProcessingStatus, -} = require("@cap-js-community/event-queue"); +const { EventQueueBaseClass, EventProcessingStatus } = require("@cap-js-community/event-queue"); class EventQueueMinimalistic extends EventQueueBaseClass { constructor(context, eventType, eventSubType, config) { @@ -96,10 +93,7 @@ class EventQueueAdvanced extends EventQueueBaseClass { } async checkEventAndGeneratePayload(queueEntry) { - const eventStillValid = await checkEventIsStillValid( - this.tx, - queueEntry.payload - ); + const eventStillValid = await checkEventIsStillValid(this.tx, queueEntry.payload); if (!eventStillValid) { this.logger.info("Event not valid anymore, skipping processing", { eventType: this.__eventType, diff --git a/src/EventQueueError.js b/src/EventQueueError.js index fcec8466..78581903 100644 --- a/src/EventQueueError.js +++ b/src/EventQueueError.js @@ -15,12 +15,10 @@ const ERROR_CODES = { const ERROR_CODES_META = { [ERROR_CODES.WRONG_TX_USAGE]: { - message: - "Usage of this.tx|this.context is not allowed if parallel event processing is enabled", + message: "Usage of this.tx|this.context is not allowed if parallel event processing is enabled", }, [ERROR_CODES.UNKNOWN_EVENT_TYPE]: { - message: - "The event type and subType configuration is not configured! Maintain the combination in the config file.", + message: "The event type and subType configuration is not configured! Maintain the combination in the config file.", }, [ERROR_CODES.NOT_INITIALIZED]: { message: @@ -33,16 +31,13 @@ const ERROR_CODES_META = { message: "disabled reconnect, because we are not running on cloud foundry", }, [ERROR_CODES.MISSING_TABLE_DEFINITION]: { - message: - "Could not find table in csn. Make sure the provided table name is correct and the table is known by CDS.", + message: "Could not find table in csn. Make sure the provided table name is correct and the table is known by CDS.", }, [ERROR_CODES.MISSING_ELEMENT_IN_TABLE]: { - message: - "The provided table doesn't match the required structure. At least the following element is missing.", + message: "The provided table doesn't match the required structure. At least the following element is missing.", }, [ERROR_CODES.TYPE_MISMATCH_TABLE]: { - message: - "At least one field in the provided table doesn't have the expected data type.", + message: "At least one field in the provided table doesn't have the expected data type.", }, }; diff --git a/src/EventQueueProcessorBase.js b/src/EventQueueProcessorBase.js index 6ac0344d..0ea05c12 100644 --- a/src/EventQueueProcessorBase.js +++ b/src/EventQueueProcessorBase.js @@ -34,28 +34,22 @@ class EventQueueProcessorBase { this.__eventSubType = eventSubType; this.__queueEntriesWithPayloadMap = {}; this.__config = config ?? {}; - this.__parallelEventProcessing = - this.__config.parallelEventProcessing ?? - DEFAULT_PARALLEL_EVENT_PROCESSING; + this.__parallelEventProcessing = this.__config.parallelEventProcessing ?? DEFAULT_PARALLEL_EVENT_PROCESSING; if (this.__parallelEventProcessing > LIMIT_PARALLEL_EVENT_PROCESSING) { this.__parallelEventProcessing = LIMIT_PARALLEL_EVENT_PROCESSING; } // NOTE: keep the feature, this might be needed again this.__concurrentEventProcessing = false; this.__startTime = this.__config.startTime ?? new Date(); - this.__retryAttempts = - this.__config.retryAttempts ?? DEFAULT_RETRY_ATTEMPTS; - this.__selectMaxChunkSize = - this.__config.selectMaxChunkSize ?? SELECT_LIMIT_EVENTS_PER_TICK; + this.__retryAttempts = this.__config.retryAttempts ?? DEFAULT_RETRY_ATTEMPTS; + this.__selectMaxChunkSize = this.__config.selectMaxChunkSize ?? SELECT_LIMIT_EVENTS_PER_TICK; this.__selectNextChunk = !!this.__config.checkForNextChunk; this.__keepalivePromises = {}; this.__outdatedCheckEnabled = this.__config.eventOutdatedCheck ?? true; - this.__transactionMode = - this.__config.transactionMode ?? TransactionMode.isolated; + this.__transactionMode = this.__config.transactionMode ?? TransactionMode.isolated; if (this.__config.deleteFinishedEventsAfterDays) { this.__deleteFinishedEventsAfter = - Number.isInteger(this.__config.deleteFinishedEventsAfterDays) && - this.__config.deleteFinishedEventsAfterDays > 0 + Number.isInteger(this.__config.deleteFinishedEventsAfterDays) && this.__config.deleteFinishedEventsAfterDays > 0 ? this.__config.deleteFinishedEventsAfterDays : DEFAULT_DELETE_FINISHED_EVENTS_AFTER; } else { @@ -89,17 +83,11 @@ class EventQueueProcessorBase { } startPerformanceTracerEvents() { - this.__performanceLoggerEvents = new PerformanceTracer( - this.logger, - "Processing events" - ); + this.__performanceLoggerEvents = new PerformanceTracer(this.logger, "Processing events"); } startPerformanceTracerPreprocessing() { - this.__performanceLoggerPreprocessing = new PerformanceTracer( - this.logger, - "Preprocessing events" - ); + this.__performanceLoggerPreprocessing = new PerformanceTracer(this.logger, "Preprocessing events"); } endPerformanceTracerEvents() { @@ -187,10 +175,7 @@ class EventQueueProcessorBase { eventType: this.__eventType, eventSubType: this.__eventSubType, }); - this.#determineAndAddEventStatusToMap( - queueEntry.ID, - EventProcessingStatus.Done - ); + this.#determineAndAddEventStatusToMap(queueEntry.ID, EventProcessingStatus.Done); } /** @@ -201,11 +186,9 @@ class EventQueueProcessorBase { * In this case the events should be clustered together and only one mail should be sent. */ clusterQueueEntries() { - Object.entries(this.__queueEntriesWithPayloadMap).forEach( - ([key, { queueEntry, payload }]) => { - this.addEntryToProcessingMap(key, queueEntry, payload); - } - ); + Object.entries(this.__queueEntriesWithPayloadMap).forEach(([key, { queueEntry, payload }]) => { + this.addEntryToProcessingMap(key, queueEntry, payload); + }); } /** @@ -240,9 +223,7 @@ class EventQueueProcessorBase { */ setEventStatus(queueEntries, queueEntryProcessingStatusTuple) { this.logger.debug("setting event status for entries", { - queueEntryProcessingStatusTuple: JSON.stringify( - queueEntryProcessingStatusTuple - ), + queueEntryProcessingStatusTuple: JSON.stringify(queueEntryProcessingStatusTuple), eventType: this.__eventType, eventSubType: this.__eventSubType, }); @@ -253,11 +234,7 @@ class EventQueueProcessorBase { ); } catch (error) { queueEntries.forEach((queueEntry) => - this.#determineAndAddEventStatusToMap( - queueEntry.ID, - EventProcessingStatus.Error, - statusMap - ) + this.#determineAndAddEventStatusToMap(queueEntry.ID, EventProcessingStatus.Error, statusMap) ); this.logger.error( `The supplied status tuple doesn't have the required structure. Setting all entries to error. Error: ${error.toString()}`, @@ -284,20 +261,12 @@ class EventQueueProcessorBase { } } - #determineAndAddEventStatusToMap( - id, - processingStatus, - statusMap = this.__statusMap - ) { + #determineAndAddEventStatusToMap(id, processingStatus, statusMap = this.__statusMap) { if (!statusMap[id]) { statusMap[id] = processingStatus; return; } - if ( - [EventProcessingStatus.Error, EventProcessingStatus.Exceeded].includes( - statusMap[id] - ) - ) { + if ([EventProcessingStatus.Error, EventProcessingStatus.Exceeded].includes(statusMap[id])) { // NOTE: worst aggregation --> if already error|exceeded keep this state return; } @@ -317,17 +286,9 @@ class EventQueueProcessorBase { } ); queueEntries.forEach((queueEntry) => - this.#determineAndAddEventStatusToMap( - queueEntry.ID, - EventProcessingStatus.Error - ) - ); - return Object.fromEntries( - queueEntries.map((queueEntry) => [ - queueEntry.ID, - EventProcessingStatus.Error, - ]) + this.#determineAndAddEventStatusToMap(queueEntry.ID, EventProcessingStatus.Error) ); + return Object.fromEntries(queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Error])); } /** @@ -335,10 +296,7 @@ class EventQueueProcessorBase { * selected events a status has been submitted. Persisting the status of events is done in a dedicated database tx. * The function accepts no arguments as there are dedicated functions to set the status of events (e.g. setEventStatus) */ - async persistEventStatus( - tx, - { skipChecks, statusMap = this.__statusMap } = {} - ) { + async persistEventStatus(tx, { skipChecks, statusMap = this.__statusMap } = {}) { this.logger.debug("entering persistEventStatus", { eventType: this.__eventType, eventSubType: this.__eventSubType, @@ -349,9 +307,7 @@ class EventQueueProcessorBase { } this.#ensureEveryStatusIsAllowed(statusMap); - const { success, failed, exceeded, invalidAttempts } = Object.entries( - statusMap - ).reduce( + const { success, failed, exceeded, invalidAttempts } = Object.entries(statusMap).reduce( (result, [notificationEntityId, processingStatus]) => { this.__commitedStatusMap[notificationEntityId] = processingStatus; if (processingStatus === EventProcessingStatus.Open) { @@ -403,22 +359,18 @@ class EventQueueProcessorBase { } if (failed.length) { await tx.run( - UPDATE.entity(this.__eventQueueConfig.tableNameEventQueue) - .where("ID IN", failed) - .with({ - status: EventProcessingStatus.Error, - lastAttemptTimestamp: new Date().toISOString(), - }) + UPDATE.entity(this.__eventQueueConfig.tableNameEventQueue).where("ID IN", failed).with({ + status: EventProcessingStatus.Error, + lastAttemptTimestamp: new Date().toISOString(), + }) ); } if (exceeded.length) { await tx.run( - UPDATE.entity(this.__eventQueueConfig.tableNameEventQueue) - .where("ID IN", exceeded) - .with({ - status: EventProcessingStatus.Exceeded, - lastAttemptTimestamp: new Date().toISOString(), - }) + UPDATE.entity(this.__eventQueueConfig.tableNameEventQueue).where("ID IN", exceeded).with({ + status: EventProcessingStatus.Exceeded, + lastAttemptTimestamp: new Date().toISOString(), + }) ); } this.logger.debug("exiting persistEventStatus", { @@ -438,9 +390,7 @@ class EventQueueProcessorBase { "AND subType=", this.eventSubType, "AND lastAttemptTimestamp <=", - new Date( - Date.now() - this.__deleteFinishedEventsAfter * DAYS_TO_MS - ).toISOString() + new Date(Date.now() - this.__deleteFinishedEventsAfter * DAYS_TO_MS).toISOString() ) ); this.logger.debug("Deleted finished events", { @@ -453,24 +403,15 @@ class EventQueueProcessorBase { #ensureEveryQueueEntryHasStatus() { this.__queueEntries.forEach((queueEntry) => { - if ( - queueEntry.ID in this.__statusMap || - queueEntry.ID in this.__commitedStatusMap - ) { + if (queueEntry.ID in this.__statusMap || queueEntry.ID in this.__commitedStatusMap) { return; } - this.logger.error( - "Missing status for selected event entry. Setting status to error", - { - eventType: this.__eventType, - eventSubType: this.__eventSubType, - queueEntry, - } - ); - this.#determineAndAddEventStatusToMap( - queueEntry.ID, - EventProcessingStatus.Error - ); + this.logger.error("Missing status for selected event entry. Setting status to error", { + eventType: this.__eventType, + eventSubType: this.__eventSubType, + queueEntry, + }); + this.#determineAndAddEventStatusToMap(queueEntry.ID, EventProcessingStatus.Error); }); } @@ -487,15 +428,12 @@ class EventQueueProcessorBase { return; } - this.logger.error( - "Not allowed event status returned. Only Open, Done, Error is allowed!", - { - eventType: this.__eventType, - eventSubType: this.__eventSubType, - queueEntryId, - status: statusMap[queueEntryId], - } - ); + this.logger.error("Not allowed event status returned. Only Open, Done, Error is allowed!", { + eventType: this.__eventType, + eventSubType: this.__eventSubType, + queueEntryId, + status: statusMap[queueEntryId], + }); delete statusMap[queueEntryId]; }); } @@ -519,18 +457,12 @@ class EventQueueProcessorBase { } handleErrorDuringClustering(error) { - this.logger.error( - `Error during clustering of events - setting all queue entries to error. Error: ${error}`, - { - eventType: this.__eventType, - eventSubType: this.__eventSubType, - } - ); + this.logger.error(`Error during clustering of events - setting all queue entries to error. Error: ${error}`, { + eventType: this.__eventType, + eventSubType: this.__eventSubType, + }); this.__queueEntries.forEach((queueEntry) => { - this.#determineAndAddEventStatusToMap( - queueEntry.ID, - EventProcessingStatus.Error - ); + this.#determineAndAddEventStatusToMap(queueEntry.ID, EventProcessingStatus.Error); }); } @@ -543,29 +475,15 @@ class EventQueueProcessorBase { eventSubType: this.__eventSubType, } ); - this.#determineAndAddEventStatusToMap( - queueEntry.ID, - EventProcessingStatus.Error - ); + this.#determineAndAddEventStatusToMap(queueEntry.ID, EventProcessingStatus.Error); } - static async handleMissingTypeImplementation( - context, - eventType, - eventSubType - ) { - const baseInstance = new EventQueueProcessorBase( - context, + static async handleMissingTypeImplementation(context, eventType, eventSubType) { + const baseInstance = new EventQueueProcessorBase(context, eventType, eventSubType); + baseInstance.logger.error("No Implementation found in the provided configuration file.", { eventType, - eventSubType - ); - baseInstance.logger.error( - "No Implementation found in the provided configuration file.", - { - eventType, - eventSubType, - } - ); + eventSubType, + }); } /** @@ -577,87 +495,80 @@ class EventQueueProcessorBase { */ async getQueueEntriesAndSetToInProgress() { let result = []; - await executeInNewTransaction( - this.__baseContext, - "eventQueue-getQueueEntriesAndSetToInProgress", - async (tx) => { - const entries = await tx.run( - SELECT.from(this.__eventQueueConfig.tableNameEventQueue) - .forUpdate({ wait: this.__eventQueueConfig.forUpdateTimeout }) - .limit(this.getSelectMaxChunkSize()) - .where( - "type =", - this.__eventType, - "AND subType=", - this.__eventSubType, - "AND ( status =", - EventProcessingStatus.Open, - "OR ( status =", - EventProcessingStatus.Error, - "AND lastAttemptTimestamp <=", - this.__startTime.toISOString(), - ") OR ( status =", - EventProcessingStatus.InProgress, - "AND lastAttemptTimestamp <=", - new Date( - new Date().getTime() - this.__eventQueueConfig.globalTxTimeout - ).toISOString(), - ") )" - ) - .orderBy("createdAt", "ID") - ); - - if (!entries.length) { - this.logger.debug("no entries available for processing", { - eventType: this.__eventType, - eventSubType: this.__eventSubType, - }); - this.__emptyChunkSelected = true; - return; - } - - const { exceededTries, openEvents } = - this.#filterExceededEvents(entries); - if (exceededTries.length) { - this.__eventsWithExceededTries = exceededTries; - } - result = openEvents; - - if (!result.length) { - this.__emptyChunkSelected = true; - return; - } + await executeInNewTransaction(this.__baseContext, "eventQueue-getQueueEntriesAndSetToInProgress", async (tx) => { + const entries = await tx.run( + SELECT.from(this.__eventQueueConfig.tableNameEventQueue) + .forUpdate({ wait: this.__eventQueueConfig.forUpdateTimeout }) + .limit(this.getSelectMaxChunkSize()) + .where( + "type =", + this.__eventType, + "AND subType=", + this.__eventSubType, + "AND ( status =", + EventProcessingStatus.Open, + "OR ( status =", + EventProcessingStatus.Error, + "AND lastAttemptTimestamp <=", + this.__startTime.toISOString(), + ") OR ( status =", + EventProcessingStatus.InProgress, + "AND lastAttemptTimestamp <=", + new Date(new Date().getTime() - this.__eventQueueConfig.globalTxTimeout).toISOString(), + ") )" + ) + .orderBy("createdAt", "ID") + ); - this.logger.info("Selected event queue entries for processing", { - queueEntriesCount: result.length, + if (!entries.length) { + this.logger.debug("no entries available for processing", { eventType: this.__eventType, eventSubType: this.__eventSubType, }); + this.__emptyChunkSelected = true; + return; + } - const isoTimestamp = new Date().toISOString(); - await tx.run( - UPDATE.entity(this.__eventQueueConfig.tableNameEventQueue) - .with({ - status: EventProcessingStatus.InProgress, - lastAttemptTimestamp: isoTimestamp, - attempts: { "+=": 1 }, - }) - .where( - "ID IN", - result.map(({ ID }) => ID) - ) - ); - result.forEach((entry) => { - entry.lastAttemptTimestamp = isoTimestamp; - // NOTE: empty payloads are supported on DB-Level. - // Behaviour of event queue is: null as payload is treated as obsolete/done - // For supporting this convert null to empty string --> "" as payload will be processed normally - if (entry.payload === null) { - entry.payload = ""; - } - }); + const { exceededTries, openEvents } = this.#filterExceededEvents(entries); + if (exceededTries.length) { + this.__eventsWithExceededTries = exceededTries; } - ); + result = openEvents; + + if (!result.length) { + this.__emptyChunkSelected = true; + return; + } + + this.logger.info("Selected event queue entries for processing", { + queueEntriesCount: result.length, + eventType: this.__eventType, + eventSubType: this.__eventSubType, + }); + + const isoTimestamp = new Date().toISOString(); + await tx.run( + UPDATE.entity(this.__eventQueueConfig.tableNameEventQueue) + .with({ + status: EventProcessingStatus.InProgress, + lastAttemptTimestamp: isoTimestamp, + attempts: { "+=": 1 }, + }) + .where( + "ID IN", + result.map(({ ID }) => ID) + ) + ); + result.forEach((entry) => { + entry.lastAttemptTimestamp = isoTimestamp; + // NOTE: empty payloads are supported on DB-Level. + // Behaviour of event queue is: null as payload is treated as obsolete/done + // For supporting this convert null to empty string --> "" as payload will be processed normally + if (entry.payload === null) { + entry.payload = ""; + } + }); + }); this.__queueEntries = result; this.__queueEntriesMap = arrayToFlatMap(result); return result; @@ -688,15 +599,12 @@ class EventQueueProcessorBase { exceededEvents.map(({ ID }) => ID) ) ); - this.logger.error( - "The retry attempts for the following events are exceeded", - { - eventType: this.__eventType, - eventSubType: this.__eventSubType, - retryAttempts: this.__retryAttempts, - queueEntriesIds: exceededEvents.map(({ ID }) => ID), - } - ); + this.logger.error("The retry attempts for the following events are exceeded", { + eventType: this.__eventType, + eventSubType: this.__eventSubType, + retryAttempts: this.__retryAttempts, + queueEntriesIds: exceededEvents.map(({ ID }) => ID), + }); await this.hookForExceededEvents(exceededEvents); } @@ -730,9 +638,7 @@ class EventQueueProcessorBase { return false; } let eventOutdated; - const runningChecks = queueEntries - .map((queueEntry) => this.__keepalivePromises[queueEntry.ID]) - .filter((p) => p); + const runningChecks = queueEntries.map((queueEntry) => this.__keepalivePromises[queueEntry.ID]).filter((p) => p); if (runningChecks.length === queueEntries.length) { const results = await Promise.allSettled(runningChecks); for (const { value } of results) { @@ -745,70 +651,54 @@ class EventQueueProcessorBase { await Promise.allSettled(runningChecks); } const checkAndUpdatePromise = new Promise((resolve) => { - executeInNewTransaction( - this.__baseContext, - "eventProcessing-isOutdatedAndKeepalive", - async (tx) => { - const queueEntriesFresh = await tx.run( - SELECT.from(this.__eventQueueConfig.tableNameEventQueue) - .forUpdate({ wait: this.__eventQueueConfig.forUpdateTimeout }) + executeInNewTransaction(this.__baseContext, "eventProcessing-isOutdatedAndKeepalive", async (tx) => { + const queueEntriesFresh = await tx.run( + SELECT.from(this.__eventQueueConfig.tableNameEventQueue) + .forUpdate({ wait: this.__eventQueueConfig.forUpdateTimeout }) + .where( + "ID IN", + queueEntries.map(({ ID }) => ID) + ) + .columns("ID", "lastAttemptTimestamp") + ); + eventOutdated = queueEntriesFresh.some((queueEntryFresh) => { + const queueEntry = this.__queueEntriesMap[queueEntryFresh.ID]; + return queueEntry?.lastAttemptTimestamp !== queueEntryFresh.lastAttemptTimestamp; + }); + let newTs = new Date().toISOString(); + if (!eventOutdated) { + await tx.run( + UPDATE.entity(this.__eventQueueConfig.tableNameEventQueue) + .set("lastAttemptTimestamp =", newTs) .where( "ID IN", queueEntries.map(({ ID }) => ID) ) - .columns("ID", "lastAttemptTimestamp") ); - eventOutdated = queueEntriesFresh.some((queueEntryFresh) => { - const queueEntry = this.__queueEntriesMap[queueEntryFresh.ID]; - return ( - queueEntry?.lastAttemptTimestamp !== - queueEntryFresh.lastAttemptTimestamp - ); - }); - let newTs = new Date().toISOString(); - if (!eventOutdated) { - await tx.run( - UPDATE.entity(this.__eventQueueConfig.tableNameEventQueue) - .set("lastAttemptTimestamp =", newTs) - .where( - "ID IN", - queueEntries.map(({ ID }) => ID) - ) - ); - } else { - newTs = null; - this.logger.warn( - "event data has been modified. Processing skipped.", - { - eventType: this.__eventType, - eventSubType: this.__eventSubType, - queueEntriesIds: queueEntries.map(({ ID }) => ID), - } - ); - queueEntries.forEach( - ({ ID: queueEntryId }) => - delete this.__queueEntriesMap[queueEntryId] - ); - } - this.__queueEntries = Object.values(this.__queueEntriesMap); - queueEntriesFresh.forEach((queueEntryFresh) => { - if (this.__queueEntriesMap[queueEntryFresh.ID]) { - const queueEntry = this.__queueEntriesMap[queueEntryFresh.ID]; - if (newTs) { - queueEntry.lastAttemptTimestamp = newTs; - } - } - delete this.__keepalivePromises[queueEntryFresh.ID]; + } else { + newTs = null; + this.logger.warn("event data has been modified. Processing skipped.", { + eventType: this.__eventType, + eventSubType: this.__eventSubType, + queueEntriesIds: queueEntries.map(({ ID }) => ID), }); - resolve(eventOutdated); + queueEntries.forEach(({ ID: queueEntryId }) => delete this.__queueEntriesMap[queueEntryId]); } - ); + this.__queueEntries = Object.values(this.__queueEntriesMap); + queueEntriesFresh.forEach((queueEntryFresh) => { + if (this.__queueEntriesMap[queueEntryFresh.ID]) { + const queueEntry = this.__queueEntriesMap[queueEntryFresh.ID]; + if (newTs) { + queueEntry.lastAttemptTimestamp = newTs; + } + } + delete this.__keepalivePromises[queueEntryFresh.ID]; + }); + resolve(eventOutdated); + }); }); - queueEntries.forEach( - (queueEntry) => - (this.__keepalivePromises[queueEntry.ID] = checkAndUpdatePromise) - ); + queueEntries.forEach((queueEntry) => (this.__keepalivePromises[queueEntry.ID] = checkAndUpdatePromise)); return await checkAndUpdatePromise; } @@ -833,15 +723,9 @@ class EventQueueProcessorBase { return; } try { - await distributedLock.releaseLock( - this.context, - [this.eventType, this.eventSubType].join("##") - ); + await distributedLock.releaseLock(this.context, [this.eventType, this.eventSubType].join("##")); } catch (err) { - this.logger.error( - "Releasing distributed lock failed. Error:", - err.toString() - ); + this.logger.error("Releasing distributed lock failed. Error:", err.toString()); } } diff --git a/src/config.js b/src/config.js index e4abdc99..29383397 100644 --- a/src/config.js +++ b/src/config.js @@ -34,9 +34,7 @@ class Config { } hasEventAfterCommitFlag(type, subType) { - return ( - this.__eventMap[[type, subType].join("##")]?.processAfterCommit ?? true - ); + return this.__eventMap[[type, subType].join("##")]?.processAfterCommit ?? true; } _checkRedisIsBound() { diff --git a/src/dbHandler.js b/src/dbHandler.js index 7e0810ff..e63b4e46 100644 --- a/src/dbHandler.js +++ b/src/dbHandler.js @@ -14,10 +14,7 @@ const registerEventQueueDbHandler = (dbService) => { const eventCombinations = Object.keys( data.reduce((result, event) => { const key = [event.type, event.subType].join("##"); - if ( - !configInstance.hasEventAfterCommitFlag(event.type, event.subType) || - eventQueuePublishEvents[key] - ) { + if (!configInstance.hasEventAfterCommitFlag(event.type, event.subType) || eventQueuePublishEvents[key]) { return result; } eventQueuePublishEvents[key] = true; diff --git a/src/initialize.js b/src/initialize.js index 1481919d..367bffa1 100644 --- a/src/initialize.js +++ b/src/initialize.js @@ -56,15 +56,11 @@ const initialize = async ({ ); const logger = cds.log(COMPONENT); - configInstance.fileContent = await readConfigFromFile( - configInstance.configFilePath - ); + configInstance.fileContent = await readConfigFromFile(configInstance.configFilePath); configInstance.calculateIsRedisEnabled(); const dbService = await cds.connect.to("db"); - await (cds.model - ? Promise.resolve() - : new Promise((resolve) => cds.on("serving", resolve))); + await (cds.model ? Promise.resolve() : new Promise((resolve) => cds.on("serving", resolve))); !configInstance.skipCsnCheck && (await csnCheck()); if (configInstance.processEventsAfterPublish) { dbHandler.registerEventQueueDbHandler(dbService); @@ -152,8 +148,7 @@ const checkCustomTable = (baseCsn, customCsn) => { if ( customCsn.elements[columnName].type !== "cds.Association" && - customCsn.elements[columnName].type !== - baseCsn.elements[columnName].type && + customCsn.elements[columnName].type !== baseCsn.elements[columnName].type && columnName === "status" && customCsn.elements[columnName].type !== "cds.Integer" ) { @@ -174,32 +169,18 @@ const mixConfigVarsWithEnv = ( ) => { const configInstance = getConfigInstance(); - configInstance.configFilePath = - configFilePath ?? cds.env.eventQueue?.configFilePath; + configInstance.configFilePath = configFilePath ?? cds.env.eventQueue?.configFilePath; configInstance.registerAsEventProcessor = - registerAsEventProcessor ?? - cds.env.eventQueue?.registerAsEventProcessor ?? - true; + registerAsEventProcessor ?? cds.env.eventQueue?.registerAsEventProcessor ?? true; configInstance.processEventsAfterPublish = - processEventsAfterPublish ?? - cds.env.eventQueue?.processEventsAfterPublish ?? - true; - configInstance.runInterval = - runInterval ?? cds.env.eventQueue?.runInterval ?? 5 * 60 * 1000; + processEventsAfterPublish ?? cds.env.eventQueue?.processEventsAfterPublish ?? true; + configInstance.runInterval = runInterval ?? cds.env.eventQueue?.runInterval ?? 5 * 60 * 1000; configInstance.parallelTenantProcessing = - parallelTenantProcessing ?? - cds.env.eventQueue?.parallelTenantProcessing ?? - 5; + parallelTenantProcessing ?? cds.env.eventQueue?.parallelTenantProcessing ?? 5; configInstance.tableNameEventQueue = - tableNameEventQueue ?? - cds.env.eventQueue?.tableNameEventQueue ?? - BASE_TABLES.EVENT; - configInstance.tableNameEventLock = - tableNameEventLock ?? - cds.env.eventQueue?.tableNameEventLock ?? - BASE_TABLES.LOCK; - configInstance.skipCsnCheck = - skipCsnCheck ?? cds.env.eventQueue?.skipCsnCheck ?? false; + tableNameEventQueue ?? cds.env.eventQueue?.tableNameEventQueue ?? BASE_TABLES.EVENT; + configInstance.tableNameEventLock = tableNameEventLock ?? cds.env.eventQueue?.tableNameEventLock ?? BASE_TABLES.LOCK; + configInstance.skipCsnCheck = skipCsnCheck ?? cds.env.eventQueue?.skipCsnCheck ?? false; }; module.exports = { diff --git a/src/processEventQueue.js b/src/processEventQueue.js index a6fe4342..a19ff50e 100644 --- a/src/processEventQueue.js +++ b/src/processEventQueue.js @@ -9,10 +9,7 @@ const { TransactionMode } = require("./constants"); const { limiter, Funnel } = require("./shared/common"); const EventQueueBase = require("./EventQueueProcessorBase"); -const { - executeInNewTransaction, - TriggerRollback, -} = require("./shared/cdsHelper"); +const { executeInNewTransaction, TriggerRollback } = require("./shared/cdsHelper"); const COMPONENT_NAME = "eventQueue/processEventQueue"; const MAX_EXECUTION_TIME = 5 * 60 * 1000; @@ -22,47 +19,24 @@ const eventQueueRunner = async (context, events) => { const funnel = new Funnel(); await Promise.allSettled( events.map((event) => - funnel.run(event.load, async () => - processEventQueue(context, event.type, event.subType, startTime) - ) + funnel.run(event.load, async () => processEventQueue(context, event.type, event.subType, startTime)) ) ); }; -const processEventQueue = async ( - context, - eventType, - eventSubType, - startTime = new Date() -) => { +const processEventQueue = async (context, eventType, eventSubType, startTime = new Date()) => { let iterationCounter = 0; let shouldContinue = true; let baseInstance; try { let eventTypeInstance; - const eventConfig = getConfigInstance().getEventConfig( - eventType, - eventSubType - ); + const eventConfig = getConfigInstance().getEventConfig(eventType, eventSubType); const [err, EventTypeClass] = resilientRequire(eventConfig?.impl); - if ( - !eventConfig || - err || - !(typeof EventTypeClass.constructor === "function") - ) { - await EventQueueBase.handleMissingTypeImplementation( - context, - eventType, - eventSubType - ); + if (!eventConfig || err || !(typeof EventTypeClass.constructor === "function")) { + await EventQueueBase.handleMissingTypeImplementation(context, eventType, eventSubType); return; } - baseInstance = new EventTypeClass( - context, - eventType, - eventSubType, - eventConfig - ); + baseInstance = new EventTypeClass(context, eventType, eventSubType, eventConfig); const continueProcessing = await baseInstance.handleDistributedLock(); if (!continueProcessing) { return; @@ -70,54 +44,36 @@ const processEventQueue = async ( eventConfig.startTime = startTime; while (shouldContinue) { iterationCounter++; - await executeInNewTransaction( - context, - `eventQueue-pre-processing-${eventType}##${eventSubType}`, - async (tx) => { - eventTypeInstance = new EventTypeClass( - tx.context, - eventType, - eventSubType, - eventConfig - ); - const queueEntries = - await eventTypeInstance.getQueueEntriesAndSetToInProgress(); - eventTypeInstance.startPerformanceTracerPreprocessing(); - for (const queueEntry of queueEntries) { - try { - eventTypeInstance.modifyQueueEntry(queueEntry); - const payload = - await eventTypeInstance.checkEventAndGeneratePayload( - queueEntry - ); - if (payload === null) { - eventTypeInstance.setStatusToDone(queueEntry); - continue; - } - if (payload === undefined) { - eventTypeInstance.handleInvalidPayloadReturned(queueEntry); - continue; - } - eventTypeInstance.addEventWithPayloadForProcessing( - queueEntry, - payload - ); - } catch (err) { - eventTypeInstance.handleErrorDuringProcessing(err, queueEntry); + await executeInNewTransaction(context, `eventQueue-pre-processing-${eventType}##${eventSubType}`, async (tx) => { + eventTypeInstance = new EventTypeClass(tx.context, eventType, eventSubType, eventConfig); + const queueEntries = await eventTypeInstance.getQueueEntriesAndSetToInProgress(); + eventTypeInstance.startPerformanceTracerPreprocessing(); + for (const queueEntry of queueEntries) { + try { + eventTypeInstance.modifyQueueEntry(queueEntry); + const payload = await eventTypeInstance.checkEventAndGeneratePayload(queueEntry); + if (payload === null) { + eventTypeInstance.setStatusToDone(queueEntry); + continue; } + if (payload === undefined) { + eventTypeInstance.handleInvalidPayloadReturned(queueEntry); + continue; + } + eventTypeInstance.addEventWithPayloadForProcessing(queueEntry, payload); + } catch (err) { + eventTypeInstance.handleErrorDuringProcessing(err, queueEntry); } - throw new TriggerRollback(); } - ); + throw new TriggerRollback(); + }); eventTypeInstance.exceededEvents.length && (await executeInNewTransaction( context, `eventQueue-handleExceededEvents-${eventType}##${eventSubType}`, async (tx) => { eventTypeInstance.processEventContext = tx.context; - await eventTypeInstance.handleExceededEvents( - eventTypeInstance.exceededEvents - ); + await eventTypeInstance.handleExceededEvents(eventTypeInstance.exceededEvents); } )); if (!eventTypeInstance) { @@ -125,41 +81,28 @@ const processEventQueue = async ( } eventTypeInstance.endPerformanceTracerPreprocessing(); if (Object.keys(eventTypeInstance.queueEntriesWithPayloadMap).length) { - await executeInNewTransaction( - context, - `eventQueue-processing-${eventType}##${eventSubType}`, - async (tx) => { - eventTypeInstance.processEventContext = tx.context; - try { - eventTypeInstance.clusterQueueEntries(); - await processEventMap(eventTypeInstance); - } catch (err) { - eventTypeInstance.handleErrorDuringClustering(err); - } - if ( - eventTypeInstance.transactionMode !== - TransactionMode.alwaysCommit || - Object.entries(eventTypeInstance.eventProcessingMap).some( - ([key]) => eventTypeInstance.shouldRollbackTransaction(key) - ) - ) { - throw new TriggerRollback(); - } + await executeInNewTransaction(context, `eventQueue-processing-${eventType}##${eventSubType}`, async (tx) => { + eventTypeInstance.processEventContext = tx.context; + try { + eventTypeInstance.clusterQueueEntries(); + await processEventMap(eventTypeInstance); + } catch (err) { + eventTypeInstance.handleErrorDuringClustering(err); } - ); + if ( + eventTypeInstance.transactionMode !== TransactionMode.alwaysCommit || + Object.entries(eventTypeInstance.eventProcessingMap).some(([key]) => + eventTypeInstance.shouldRollbackTransaction(key) + ) + ) { + throw new TriggerRollback(); + } + }); } - await executeInNewTransaction( - context, - `eventQueue-persistStatus-${eventType}##${eventSubType}`, - async (tx) => { - await eventTypeInstance.persistEventStatus(tx); - } - ); - shouldContinue = reevaluateShouldContinue( - eventTypeInstance, - iterationCounter, - startTime - ); + await executeInNewTransaction(context, `eventQueue-persistStatus-${eventType}##${eventSubType}`, async (tx) => { + await eventTypeInstance.persistEventStatus(tx); + }); + shouldContinue = reevaluateShouldContinue(eventTypeInstance, iterationCounter, startTime); if (!shouldContinue) { await executeInNewTransaction( context, @@ -171,26 +114,16 @@ const processEventQueue = async ( } } } catch (err) { - cds - .log(COMPONENT_NAME) - .error( - "Processing event queue failed with unexpected error. Error:", - err, - { - eventType, - eventSubType, - } - ); + cds.log(COMPONENT_NAME).error("Processing event queue failed with unexpected error. Error:", err, { + eventType, + eventSubType, + }); } finally { await baseInstance?.handleReleaseLock(); } }; -const reevaluateShouldContinue = ( - eventTypeInstance, - iterationCounter, - startTime -) => { +const reevaluateShouldContinue = (eventTypeInstance, iterationCounter, startTime) => { if (!eventTypeInstance.getSelectNextChunk()) { return false; // no select next chunk configured for this event } @@ -220,13 +153,7 @@ const processEventMap = async (eventTypeInstance) => { eventTypeInstance.baseContext, `eventQueue-processEvent-${eventTypeInstance.eventType}##${eventTypeInstance.eventSubType}`, async (tx) => { - statusMap = await _processEvent( - eventTypeInstance, - tx.context, - key, - queueEntries, - payload - ); + statusMap = await _processEvent(eventTypeInstance, tx.context, key, queueEntries, payload); if ( eventTypeInstance.statusMapContainsError(statusMap) || eventTypeInstance.shouldRollbackTransaction(key) @@ -247,13 +174,7 @@ const processEventMap = async (eventTypeInstance) => { } ); } else { - await _processEvent( - eventTypeInstance, - eventTypeInstance.context, - key, - queueEntries, - payload - ); + await _processEvent(eventTypeInstance, eventTypeInstance.context, key, queueEntries, payload); } } ).finally(() => { @@ -265,28 +186,15 @@ const processEventMap = async (eventTypeInstance) => { eventTypeInstance.endPerformanceTracerEvents(); }; -const _processEvent = async ( - eventTypeInstance, - processContext, - key, - queueEntries, - payload -) => { +const _processEvent = async (eventTypeInstance, processContext, key, queueEntries, payload) => { try { eventTypeInstance.logStartMessage(queueEntries); - const eventOutdated = await eventTypeInstance.isOutdatedAndKeepalive( - queueEntries - ); + const eventOutdated = await eventTypeInstance.isOutdatedAndKeepalive(queueEntries); if (eventOutdated) { return; } eventTypeInstance.setTxForEventProcessing(key, cds.tx(processContext)); - const statusTuple = await eventTypeInstance.processEvent( - processContext, - key, - queueEntries, - payload - ); + const statusTuple = await eventTypeInstance.processEvent(processContext, key, queueEntries, payload); return eventTypeInstance.setEventStatus(queueEntries, statusTuple); } catch (err) { return eventTypeInstance.handleErrorDuringProcessing(err, queueEntries); diff --git a/src/publishEvent.js b/src/publishEvent.js index c4ee6f61..039b0dd3 100644 --- a/src/publishEvent.js +++ b/src/publishEvent.js @@ -15,9 +15,7 @@ const publishEvent = async (tx, events) => { throw EventQueueError.unknownEventType(type, subType); } } - return await tx.run( - INSERT.into(configInstance.tableNameEventQueue).entries(eventsForProcessing) - ); + return await tx.run(INSERT.into(configInstance.tableNameEventQueue).entries(eventsForProcessing)); }; module.exports = { diff --git a/src/redisPubSub.js b/src/redisPubSub.js index fc14c35f..928f52fe 100644 --- a/src/redisPubSub.js +++ b/src/redisPubSub.js @@ -22,15 +22,11 @@ const initEventQueueRedisSubscribe = () => { const subscribeRedisClient = () => { const errorHandlerCreateClient = (err) => { - cds - .log(COMPONENT_NAME) - .error("error from redis client for pub/sub failed", err); + cds.log(COMPONENT_NAME).error("error from redis client for pub/sub failed", err); subscriberClientPromise = null; setTimeout(subscribeRedisClient, 5 * 1000).unref(); }; - subscriberClientPromise = redis.createClientAndConnect( - errorHandlerCreateClient - ); + subscriberClientPromise = redis.createClientAndConnect(errorHandlerCreateClient); subscriberClientPromise .then((client) => { cds.log(COMPONENT_NAME).info("subscribe redis client connected"); @@ -39,10 +35,7 @@ const subscribeRedisClient = () => { .catch((err) => { cds .log(COMPONENT_NAME) - .error( - "error from redis client for pub/sub failed during startup - trying to reconnect", - err - ); + .error("error from redis client for pub/sub failed during startup - trying to reconnect", err); }); }; @@ -68,9 +61,7 @@ const messageHandlerProcessEvents = async (messageData) => { type, subType, }); - getWorkerPoolInstance().addToQueue(async () => - processEventQueue(context, type, subType) - ); + getWorkerPoolInstance().addToQueue(async () => processEventQueue(context, type, subType)); }; const publishEvent = async (tenantId, type, subType) => { @@ -89,9 +80,7 @@ const publishEvent = async (tenantId, type, subType) => { }; try { if (!publishClient) { - publishClient = await redis.createClientAndConnect( - errorHandlerCreateClient - ); + publishClient = await redis.createClientAndConnect(errorHandlerCreateClient); logger.info("publish redis client connected"); } @@ -108,10 +97,7 @@ const publishEvent = async (tenantId, type, subType) => { type, subType, }); - await publishClient.publish( - MESSAGE_CHANNEL, - JSON.stringify({ tenantId, type, subType }) - ); + await publishClient.publish(MESSAGE_CHANNEL, JSON.stringify({ tenantId, type, subType })); } catch (err) { logger.error(`publish event failed with error: ${err.toString()}`, { tenantId, diff --git a/src/runner.js b/src/runner.js index 641891c5..8ec890c8 100644 --- a/src/runner.js +++ b/src/runner.js @@ -26,17 +26,13 @@ const _scheduleFunction = async (fn) => { const configInstance = eventQueueConfig.getConfigInstance(); const eventsForAutomaticRun = configInstance.events; if (!eventsForAutomaticRun.length) { - LOGGER.warn( - "no events for automatic run are configured - skipping runner registration" - ); + LOGGER.warn("no events for automatic run are configured - skipping runner registration"); return; } const fnWithRunningCheck = () => { if (configInstance.isRunnerDeactivated) { - LOGGER.info( - "runner is deactivated via config variable. Skipping this run." - ); + LOGGER.info("runner is deactivated via config variable. Skipping this run."); return; } return fn(); @@ -45,9 +41,7 @@ const _scheduleFunction = async (fn) => { const offsetDependingOnLastRun = await _calculateOffsetForFirstRun(); LOGGER.info("first event-queue run scheduled", { - firstRunScheduledFor: new Date( - Date.now() + offsetDependingOnLastRun - ).toISOString(), + firstRunScheduledFor: new Date(Date.now() + offsetDependingOnLastRun).toISOString(), }); setTimeout(() => { @@ -72,9 +66,7 @@ const _multiTenancyRedis = async () => { const _multiTenancyDb = async () => { try { - LOGGER.info( - "executing event queue run for single instance and multi tenant" - ); + LOGGER.info("executing event queue run for single instance and multi tenant"); const tenantIds = await cdsHelper.getAllTenantIds(); _executeAllTenants(tenantIds, EVENT_QUEUE_RUN_ID); } catch (err) { @@ -91,13 +83,9 @@ const _executeAllTenants = (tenantIds, runId) => { workerQueueInstance.addToQueue(async () => { try { const tenantContext = new cds.EventContext({ tenant: tenantId }); - const couldAcquireLock = await distributedLock.acquireLock( - tenantContext, - runId, - { - expiryTime: configInstance.runInterval * 0.95, - } - ); + const couldAcquireLock = await distributedLock.acquireLock(tenantContext, runId, { + expiryTime: configInstance.runInterval * 0.95, + }); if (!couldAcquireLock) { return; } @@ -129,48 +117,31 @@ const _executeRunForTenant = async (tenantId, runId) => { }); await eventQueueRunner(context, eventsForAutomaticRun); } catch (err) { - LOGGER.error( - `Couldn't process eventQueue for tenant! Next try after defined interval. Error: ${err}`, - { - tenantId, - redisEnabled: configInstance.redisEnabled, - } - ); + LOGGER.error(`Couldn't process eventQueue for tenant! Next try after defined interval. Error: ${err}`, { + tenantId, + redisEnabled: configInstance.redisEnabled, + }); } }; const _acquireRunId = async (context) => { const configInstance = eventQueueConfig.getConfigInstance(); let runId = uuid.v4(); - const couldSetValue = await distributedLock.setValueWithExpire( - context, - EVENT_QUEUE_RUN_ID, - runId, - { - tenantScoped: false, - expiryTime: configInstance.runInterval * 0.95, - } - ); + const couldSetValue = await distributedLock.setValueWithExpire(context, EVENT_QUEUE_RUN_ID, runId, { + tenantScoped: false, + expiryTime: configInstance.runInterval * 0.95, + }); if (couldSetValue) { - await distributedLock.setValueWithExpire( - context, - EVENT_QUEUE_RUN_TS, - new Date().toISOString(), - { - tenantScoped: false, - expiryTime: configInstance.runInterval, - overrideValue: true, - } - ); + await distributedLock.setValueWithExpire(context, EVENT_QUEUE_RUN_TS, new Date().toISOString(), { + tenantScoped: false, + expiryTime: configInstance.runInterval, + overrideValue: true, + }); } else { - runId = await distributedLock.checkLockExistsAndReturnValue( - context, - EVENT_QUEUE_RUN_ID, - { - tenantScoped: false, - } - ); + runId = await distributedLock.checkLockExistsAndReturnValue(context, EVENT_QUEUE_RUN_ID, { + tenantScoped: false, + }); } return runId; @@ -185,34 +156,24 @@ const _calculateOffsetForFirstRun = async () => { try { if (configInstance.redisEnabled) { const dummyContext = new cds.EventContext({}); - let lastRunTs = await distributedLock.checkLockExistsAndReturnValue( - dummyContext, - EVENT_QUEUE_RUN_TS, - { tenantScoped: false } - ); + let lastRunTs = await distributedLock.checkLockExistsAndReturnValue(dummyContext, EVENT_QUEUE_RUN_TS, { + tenantScoped: false, + }); if (!lastRunTs) { const ts = new Date(now).toISOString(); - const couldSetValue = await distributedLock.setValueWithExpire( - dummyContext, - EVENT_QUEUE_RUN_TS, - ts, - { - tenantScoped: false, - expiryTime: configInstance.runInterval, - } - ); + const couldSetValue = await distributedLock.setValueWithExpire(dummyContext, EVENT_QUEUE_RUN_TS, ts, { + tenantScoped: false, + expiryTime: configInstance.runInterval, + }); if (couldSetValue) { lastRunTs = ts; } else { - lastRunTs = await distributedLock.checkLockExistsAndReturnValue( - dummyContext, - EVENT_QUEUE_RUN_TS, - { tenantScoped: false } - ); + lastRunTs = await distributedLock.checkLockExistsAndReturnValue(dummyContext, EVENT_QUEUE_RUN_TS, { + tenantScoped: false, + }); } } - offsetDependingOnLastRun = - new Date(lastRunTs).getTime() + configInstance.runInterval - now; + offsetDependingOnLastRun = new Date(lastRunTs).getTime() + configInstance.runInterval - now; } } catch (err) { LOGGER.error( diff --git a/src/shared/PerformanceTracer.js b/src/shared/PerformanceTracer.js index bef0948b..392ab9a0 100644 --- a/src/shared/PerformanceTracer.js +++ b/src/shared/PerformanceTracer.js @@ -36,9 +36,7 @@ class PerformanceTracer { //determine, if an options object was provided as first argument if ( typeof args?.[0] === "object" && - (args[0].quantity >= 0 || - args[0].threshold > 0 || - args[0].additionalQuantityThreshold > 0) + (args[0].quantity >= 0 || args[0].threshold > 0 || args[0].additionalQuantityThreshold > 0) ) { options = args.shift(); } diff --git a/src/shared/WorkerQueue.js b/src/shared/WorkerQueue.js index 799f7eb9..91024478 100644 --- a/src/shared/WorkerQueue.js +++ b/src/shared/WorkerQueue.js @@ -32,31 +32,20 @@ class WorkerQueue { this.__runningPromises.push(promise); promise .finally(() => { - this.__runningPromises.splice( - this.__runningPromises.indexOf(promise), - 1 - ); + this.__runningPromises.splice(this.__runningPromises.indexOf(promise), 1); this._checkForNext(); }) .then((...results) => { resolve(...results); }) .catch((err) => { - cds - .log(COMPONENT_NAME) - .error( - "Error happened in WorkQueue. Errors should be caught before! Error:", - err - ); + cds.log(COMPONENT_NAME).error("Error happened in WorkQueue. Errors should be caught before! Error:", err); reject(err); }); } _checkForNext() { - if ( - !this.__queue.length || - this.__runningPromises.length >= this.__concurrencyLimit - ) { + if (!this.__queue.length || this.__runningPromises.length >= this.__concurrencyLimit) { return; } const [cb, resolve, reject] = this.__queue.shift(); diff --git a/src/shared/cdsHelper.js b/src/shared/cdsHelper.js index 6d9913ac..ffe4d476 100644 --- a/src/shared/cdsHelper.js +++ b/src/shared/cdsHelper.js @@ -20,13 +20,7 @@ const COMPONENT_NAME = "eventQueue/cdsHelper"; * @param info {object} Additional information object attached to logging * @returns {Promise} Promise resolving to true if everything worked fine / false if an error occurred */ -async function executeInNewTransaction( - context = {}, - transactionTag, - fn, - args, - { info = {} } = {} -) { +async function executeInNewTransaction(context = {}, transactionTag, fn, args, { info = {} } = {}) { const parameters = Array.isArray(args) ? args : [args]; const logger = cds.log(COMPONENT_NAME); try { @@ -48,10 +42,7 @@ async function executeInNewTransaction( } else { const contextTx = cds.tx(context); const contextTxState = contextTx.ready; - if ( - !contextTxState || - ["committed", "rolled back"].includes(contextTxState) - ) { + if (!contextTxState || ["committed", "rolled back"].includes(contextTxState)) { await cds.tx( { id: context.id, diff --git a/src/shared/common.js b/src/shared/common.js index 8db6bf71..09b135d3 100644 --- a/src/shared/common.js +++ b/src/shared/common.js @@ -56,10 +56,7 @@ class Funnel { } // map function call to promise - const p = - f.constructor.name === "AsyncFunction" - ? f(...args) - : Promise.resolve().then(() => f(...args)); + const p = f.constructor.name === "AsyncFunction" ? f(...args) : Promise.resolve().then(() => f(...args)); // create promise for book keeping const workload = p.finally(() => { @@ -100,9 +97,7 @@ const limiter = async (limit, payloads, iterator) => { returnPromises.push(p); if (limit <= payloads.length) { - const e = p - .catch(() => {}) - .finally(() => runningPromises.splice(runningPromises.indexOf(e), 1)); + const e = p.catch(() => {}).finally(() => runningPromises.splice(runningPromises.indexOf(e), 1)); runningPromises.push(e); if (limit <= runningPromises.length) { await Promise.race(runningPromises); diff --git a/src/shared/distributedLock.js b/src/shared/distributedLock.js index 1d15a732..8e6deafa 100644 --- a/src/shared/distributedLock.js +++ b/src/shared/distributedLock.js @@ -8,10 +8,7 @@ const { getConfigInstance } = require("../config"); const acquireLock = async ( context, key, - { - tenantScoped = true, - expiryTime = config.getConfigInstance().globalTxTimeout, - } = {} + { tenantScoped = true, expiryTime = config.getConfigInstance().globalTxTimeout } = {} ) => { const fullKey = _generateKey(context, tenantScoped, key); if (config.getConfigInstance().redisEnabled) { @@ -25,11 +22,7 @@ const setValueWithExpire = async ( context, key, value, - { - tenantScoped = true, - expiryTime = config.getConfigInstance().globalTxTimeout, - overrideValue = false, - } = {} + { tenantScoped = true, expiryTime = config.getConfigInstance().globalTxTimeout, overrideValue = false } = {} ) => { const fullKey = _generateKey(context, tenantScoped, key); if (config.getConfigInstance().redisEnabled) { @@ -54,11 +47,7 @@ const releaseLock = async (context, key, { tenantScoped = true } = {}) => { } }; -const checkLockExistsAndReturnValue = async ( - context, - key, - { tenantScoped = true } = {} -) => { +const checkLockExistsAndReturnValue = async (context, key, { tenantScoped = true } = {}) => { const fullKey = _generateKey(context, tenantScoped, key); if (config.getConfigInstance().redisEnabled) { return await _checkLockExistsRedis(context, fullKey); @@ -67,12 +56,7 @@ const checkLockExistsAndReturnValue = async ( } }; -const _acquireLockRedis = async ( - context, - fullKey, - expiryTime, - { value = "true", overrideValue = false } = {} -) => { +const _acquireLockRedis = async (context, fullKey, expiryTime, { value = "true", overrideValue = false } = {}) => { const client = await redis.createMainClientAndConnect(); const result = await client.set(fullKey, value, { PX: expiryTime, @@ -89,17 +73,9 @@ const _checkLockExistsRedis = async (context, fullKey) => { const _checkLockExistsDb = async (context, fullKey) => { let result; const configInstance = getConfigInstance(); - await cdsHelper.executeInNewTransaction( - context, - "distributedLock-checkExists", - async (tx) => { - result = await tx.run( - SELECT.one - .from(configInstance.tableNameEventLock) - .where("code =", fullKey) - ); - } - ); + await cdsHelper.executeInNewTransaction(context, "distributedLock-checkExists", async (tx) => { + result = await tx.run(SELECT.one.from(configInstance.tableNameEventLock).where("code =", fullKey)); + }); return result?.value; }; @@ -110,69 +86,49 @@ const _releaseLockRedis = async (context, fullKey) => { const _releaseLockDb = async (context, fullKey) => { const configInstance = getConfigInstance(); - await cdsHelper.executeInNewTransaction( - context, - "distributedLock-release", - async (tx) => { - await tx.run( - DELETE.from(configInstance.tableNameEventLock).where("code =", fullKey) - ); - } - ); + await cdsHelper.executeInNewTransaction(context, "distributedLock-release", async (tx) => { + await tx.run(DELETE.from(configInstance.tableNameEventLock).where("code =", fullKey)); + }); }; -const _acquireLockDB = async ( - context, - fullKey, - expiryTime, - { value = "true", overrideValue = false } = {} -) => { +const _acquireLockDB = async (context, fullKey, expiryTime, { value = "true", overrideValue = false } = {}) => { let result; const configInstance = getConfigInstance(); - await cdsHelper.executeInNewTransaction( - context, - "distributedLock-acquire", - async (tx) => { - try { + await cdsHelper.executeInNewTransaction(context, "distributedLock-acquire", async (tx) => { + try { + await tx.run( + INSERT.into(configInstance.tableNameEventLock).entries({ + code: fullKey, + value, + }) + ); + result = true; + } catch (err) { + let currentEntry; + + if (!overrideValue) { + currentEntry = await tx.run( + SELECT.one + .from(configInstance.tableNameEventLock) + .forUpdate({ wait: config.getConfigInstance().forUpdateTimeout }) + .where("code =", fullKey) + ); + } + if (overrideValue || (currentEntry && new Date(currentEntry.createdAt).getTime() + expiryTime <= Date.now())) { await tx.run( - INSERT.into(configInstance.tableNameEventLock).entries({ - code: fullKey, - value, - }) + UPDATE.entity(configInstance.tableNameEventLock) + .set({ + createdAt: new Date().toISOString(), + value, + }) + .where("code =", currentEntry.code) ); result = true; - } catch (err) { - let currentEntry; - - if (!overrideValue) { - currentEntry = await tx.run( - SELECT.one - .from(configInstance.tableNameEventLock) - .forUpdate({ wait: config.getConfigInstance().forUpdateTimeout }) - .where("code =", fullKey) - ); - } - if ( - overrideValue || - (currentEntry && - new Date(currentEntry.createdAt).getTime() + expiryTime <= - Date.now()) - ) { - await tx.run( - UPDATE.entity(configInstance.tableNameEventLock) - .set({ - createdAt: new Date().toISOString(), - value, - }) - .where("code =", currentEntry.code) - ); - result = true; - } else { - result = false; - } + } else { + result = false; } } - ); + }); return result; }; diff --git a/src/shared/redis.js b/src/shared/redis.js index 524cac0d..d7cbe8ea 100644 --- a/src/shared/redis.js +++ b/src/shared/redis.js @@ -15,9 +15,7 @@ const createMainClientAndConnect = () => { } const errorHandlerCreateClient = (err) => { - cds - .log(COMPONENT_NAME) - .error("error from redis client for pub/sub failed", err); + cds.log(COMPONENT_NAME).error("error from redis client for pub/sub failed", err); subscriberClientPromise = null; setTimeout(createMainClientAndConnect, 5 * 1000).unref(); }; diff --git a/test-integration/integration-main.test.js b/test-integration/integration-main.test.js index 6e4ee5ca..edebad83 100644 --- a/test-integration/integration-main.test.js +++ b/test-integration/integration-main.test.js @@ -22,13 +22,7 @@ describe("integration-main", () => { let loggerMock; beforeAll(async () => { - const configFilePath = path.join( - __dirname, - "..", - "./test", - "asset", - "config.yml" - ); + const configFilePath = path.join(__dirname, "..", "./test", "asset", "config.yml"); await eventQueue.initialize({ configFilePath, processEventsAfterPublish: false, @@ -94,10 +88,7 @@ describe("integration-main", () => { .spyOn(EventQueueTest.prototype, "processEvent") .mockImplementationOnce(async (processContext, key, queueEntries) => { await cds.tx(processContext).run(SELECT.from("sap.eventqueue.Lock")); - return queueEntries.map((queueEntry) => [ - queueEntry.ID, - EventProcessingStatus.Error, - ]); + return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Error]); }); await eventQueue.processEventQueue(context, event.type, event.subType); expect(loggerMock.callsLengths().error).toEqual(0); @@ -109,17 +100,13 @@ describe("integration-main", () => { await cds.tx({}, (tx2) => testHelper.insertEventEntry(tx2)); dbCounts = {}; const event = eventQueue.getConfigInstance().events[0]; - jest - .spyOn(EventQueueTest.prototype, "processEvent") - .mockImplementationOnce(async (processContext) => { - await cds.tx(processContext).run(SELECT.from("sap.eventqueue.Lock")); - throw new Error("error during processing"); - }); + jest.spyOn(EventQueueTest.prototype, "processEvent").mockImplementationOnce(async (processContext) => { + await cds.tx(processContext).run(SELECT.from("sap.eventqueue.Lock")); + throw new Error("error during processing"); + }); await eventQueue.processEventQueue(context, event.type, event.subType); expect(loggerMock.callsLengths().error).toEqual(1); - expect( - loggerMock.calls().error[0][0].includes("error during processing") - ).toBeTruthy(); + expect(loggerMock.calls().error[0][0].includes("error during processing")).toBeTruthy(); await testHelper.selectEventQueueAndExpectError(tx); expect(dbCounts).toMatchSnapshot(); }); @@ -128,16 +115,12 @@ describe("integration-main", () => { await cds.tx({}, (tx2) => testHelper.insertEventEntry(tx2)); dbCounts = {}; const event = eventQueue.getConfigInstance().events[0]; - jest - .spyOn(EventQueueTest.prototype, "clusterQueueEntries") - .mockImplementationOnce(() => { - throw new Error("error during processing"); - }); + jest.spyOn(EventQueueTest.prototype, "clusterQueueEntries").mockImplementationOnce(() => { + throw new Error("error during processing"); + }); await eventQueue.processEventQueue(context, event.type, event.subType); expect(loggerMock.callsLengths().error).toEqual(1); - expect( - loggerMock.calls().error[0][0].includes("error during processing") - ).toBeTruthy(); + expect(loggerMock.calls().error[0][0].includes("error during processing")).toBeTruthy(); expect(loggerMock.calls().error).toMatchSnapshot(); await testHelper.selectEventQueueAndExpectError(tx); expect(dbCounts).toMatchSnapshot(); @@ -147,11 +130,9 @@ describe("integration-main", () => { await cds.tx({}, (tx2) => testHelper.insertEventEntry(tx2)); dbCounts = {}; const event = eventQueue.getConfigInstance().events[0]; - jest - .spyOn(EventQueueTest.prototype, "checkEventAndGeneratePayload") - .mockImplementationOnce(() => { - throw new Error("error during processing"); - }); + jest.spyOn(EventQueueTest.prototype, "checkEventAndGeneratePayload").mockImplementationOnce(() => { + throw new Error("error during processing"); + }); await eventQueue.processEventQueue(context, event.type, event.subType); expect(loggerMock.callsLengths().error).toEqual(1); expect(loggerMock.calls().error).toMatchSnapshot(); @@ -163,11 +144,9 @@ describe("integration-main", () => { await cds.tx({}, (tx2) => testHelper.insertEventEntry(tx2)); dbCounts = {}; const event = eventQueue.getConfigInstance().events[0]; - jest - .spyOn(EventQueueTest.prototype, "modifyQueueEntry") - .mockImplementationOnce(() => { - throw new Error("error during processing"); - }); + jest.spyOn(EventQueueTest.prototype, "modifyQueueEntry").mockImplementationOnce(() => { + throw new Error("error during processing"); + }); await eventQueue.processEventQueue(context, event.type, event.subType); expect(loggerMock.callsLengths().error).toEqual(1); // TODO: should not be an unexpected error @@ -185,11 +164,7 @@ describe("integration-main", () => { }) ); dbCounts = {}; - await eventQueue.processEventQueue( - context, - "TransactionMode", - "alwaysRollback" - ); + await eventQueue.processEventQueue(context, "TransactionMode", "alwaysRollback"); expect(loggerMock.callsLengths().error).toEqual(0); await testHelper.selectEventQueueAndExpectDone(tx, 2); expect(dbCounts).toMatchSnapshot(); @@ -200,10 +175,7 @@ describe("integration-main", () => { const processSpy = jest .spyOn(EventQueueTest.prototype, "processEvent") .mockImplementationOnce((processContext, key, queueEntries) => { - return queueEntries.map((queueEntry) => [ - queueEntry.ID, - EventProcessingStatus.Exceeded, - ]); + return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Exceeded]); }); dbCounts = {}; const event = eventQueue.getConfigInstance().events[0]; @@ -226,10 +198,7 @@ describe("integration-main", () => { const event = eventQueue.getConfigInstance().events[0]; await cds.tx({}, async (tx2) => { await testHelper.insertEventEntry(tx2); - await distributedLock.acquireLock( - tx2.context, - [event.type, event.subType].join("##") - ); + await distributedLock.acquireLock(tx2.context, [event.type, event.subType].join("##")); }); dbCounts = {}; await eventQueue.processEventQueue(context, event.type, event.subType); @@ -241,9 +210,7 @@ describe("integration-main", () => { it("should delete event entries after 30 days", async () => { await cds.tx({}, async (tx2) => { const event = testHelper.getEventEntry(); - event.lastAttemptTimestamp = new Date( - Date.now() - 31 * 24 * 60 * 60 * 1000 - ).toISOString(); + event.lastAttemptTimestamp = new Date(Date.now() - 31 * 24 * 60 * 60 * 1000).toISOString(); event.status = 2; await eventQueue.publishEvent(tx2, event); }); @@ -267,11 +234,7 @@ describe("integration-main", () => { dbCounts = {}; jest .spyOn(EventQueueTest.prototype, "processEvent") - .mockImplementationOnce(async function ( - processContext, - key, - queueEntries - ) { + .mockImplementationOnce(async function (processContext, key, queueEntries) { this.setShouldRollbackTransaction(key); await testHelper.insertEventEntry(cds.tx(processContext), { numberOfEntries: 1, @@ -279,32 +242,18 @@ describe("integration-main", () => { subType: "alwaysRollback", randomGuid: true, }); - return queueEntries.map((queueEntry) => [ - queueEntry.ID, - EventProcessingStatus.Done, - ]); + return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]); }) - .mockImplementationOnce(async function ( - processContext, - key, - queueEntries - ) { + .mockImplementationOnce(async function (processContext, key, queueEntries) { await testHelper.insertEventEntry(cds.tx(processContext), { numberOfEntries: 1, type: "TransactionMode", subType: "alwaysRollback", randomGuid: true, }); - return queueEntries.map((queueEntry) => [ - queueEntry.ID, - EventProcessingStatus.Done, - ]); + return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]); }); - await eventQueue.processEventQueue( - context, - "TransactionMode", - "isolated" - ); + await eventQueue.processEventQueue(context, "TransactionMode", "isolated"); expect(loggerMock.callsLengths().error).toEqual(0); const events = await testHelper.selectEventQueueAndReturn(tx, 3); expect(events).toMatchSnapshot(); @@ -322,11 +271,7 @@ describe("integration-main", () => { dbCounts = {}; jest .spyOn(EventQueueTest.prototype, "processEvent") - .mockImplementationOnce(async function ( - processContext, - key, - queueEntries - ) { + .mockImplementationOnce(async function (processContext, key, queueEntries) { this.setShouldRollbackTransaction(key); await testHelper.insertEventEntry(cds.tx(processContext), { numberOfEntries: 1, @@ -334,16 +279,9 @@ describe("integration-main", () => { subType: "alwaysRollback", randomGuid: true, }); - return queueEntries.map((queueEntry) => [ - queueEntry.ID, - EventProcessingStatus.Done, - ]); + return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]); }) - .mockImplementationOnce(async function ( - processContext, - key, - queueEntries - ) { + .mockImplementationOnce(async function (processContext, key, queueEntries) { this.setShouldRollbackTransaction(key); await testHelper.insertEventEntry(cds.tx(processContext), { numberOfEntries: 1, @@ -351,16 +289,9 @@ describe("integration-main", () => { subType: "alwaysRollback", randomGuid: true, }); - return queueEntries.map((queueEntry) => [ - queueEntry.ID, - EventProcessingStatus.Done, - ]); + return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]); }); - await eventQueue.processEventQueue( - context, - "TransactionMode", - "isolated" - ); + await eventQueue.processEventQueue(context, "TransactionMode", "isolated"); expect(loggerMock.callsLengths().error).toEqual(0); await testHelper.selectEventQueueAndExpectDone(tx, 2); expect(dbCounts).toMatchSnapshot(); @@ -377,26 +308,18 @@ describe("integration-main", () => { }) ); dbCounts = {}; - jest - .spyOn(EventQueueTest.prototype, "processEvent") - .mockImplementationOnce(async (processContext) => { - await testHelper.insertEventEntry(cds.tx(processContext), { - numberOfEntries: 1, - type: "TransactionMode", - subType: "alwaysRollback", - randomGuid: true, - }); - throw new Error("error during processing"); + jest.spyOn(EventQueueTest.prototype, "processEvent").mockImplementationOnce(async (processContext) => { + await testHelper.insertEventEntry(cds.tx(processContext), { + numberOfEntries: 1, + type: "TransactionMode", + subType: "alwaysRollback", + randomGuid: true, }); - await eventQueue.processEventQueue( - context, - "TransactionMode", - "alwaysCommit" - ); + throw new Error("error during processing"); + }); + await eventQueue.processEventQueue(context, "TransactionMode", "alwaysCommit"); expect(loggerMock.callsLengths().error).toEqual(1); - expect( - loggerMock.calls().error[0][0].includes("error during processing") - ).toBeTruthy(); + expect(loggerMock.calls().error[0][0].includes("error during processing")).toBeTruthy(); expect(dbCounts).toMatchSnapshot(); const events = await testHelper.selectEventQueueAndReturn(tx, 3); expect(events).toMatchSnapshot(); @@ -413,11 +336,7 @@ describe("integration-main", () => { dbCounts = {}; jest .spyOn(EventQueueTest.prototype, "processEvent") - .mockImplementationOnce(async function ( - processContext, - key, - queueEntries - ) { + .mockImplementationOnce(async function (processContext, key, queueEntries) { await testHelper.insertEventEntry(cds.tx(processContext), { numberOfEntries: 1, type: "TransactionMode", @@ -425,16 +344,9 @@ describe("integration-main", () => { randomGuid: true, }); this.setShouldRollbackTransaction(key); - return queueEntries.map((queueEntry) => [ - queueEntry.ID, - EventProcessingStatus.Done, - ]); + return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]); }); - await eventQueue.processEventQueue( - context, - "TransactionMode", - "alwaysCommit" - ); + await eventQueue.processEventQueue(context, "TransactionMode", "alwaysCommit"); expect(loggerMock.callsLengths().error).toEqual(0); expect(dbCounts).toMatchSnapshot(); const events = await testHelper.selectEventQueueAndReturn(tx, 1); @@ -452,29 +364,19 @@ describe("integration-main", () => { }) ); dbCounts = {}; - jest - .spyOn(EventQueueTest.prototype, "processEvent") - .mockImplementationOnce(async (processContext) => { - await testHelper.insertEventEntry(cds.tx(processContext), { - numberOfEntries: 1, - type: "TransactionMode", - subType: "alwaysRollback", - randomGuid: true, - }); - throw new Error("error during processing"); + jest.spyOn(EventQueueTest.prototype, "processEvent").mockImplementationOnce(async (processContext) => { + await testHelper.insertEventEntry(cds.tx(processContext), { + numberOfEntries: 1, + type: "TransactionMode", + subType: "alwaysRollback", + randomGuid: true, }); - await eventQueue.processEventQueue( - context, - "TransactionMode", - "alwaysRollback" - ); + throw new Error("error during processing"); + }); + await eventQueue.processEventQueue(context, "TransactionMode", "alwaysRollback"); expect(loggerMock.callsLengths().error).toEqual(1); - expect( - loggerMock.calls().error[0][0].includes("error during processing") - ).toBeTruthy(); - expect( - loggerMock.calls().error[0][0].includes("error during processing") - ).toBeTruthy(); + expect(loggerMock.calls().error[0][0].includes("error during processing")).toBeTruthy(); + expect(loggerMock.calls().error[0][0].includes("error during processing")).toBeTruthy(); expect(dbCounts).toMatchSnapshot(); const result = await testHelper.selectEventQueueAndReturn(tx, 2); expect(result).toMatchSnapshot(); @@ -498,16 +400,9 @@ describe("integration-main", () => { subType: "alwaysRollback", randomGuid: true, }); - return queueEntries.map((queueEntry) => [ - queueEntry.ID, - EventProcessingStatus.Done, - ]); + return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]); }); - await eventQueue.processEventQueue( - context, - "TransactionMode", - "alwaysRollback" - ); + await eventQueue.processEventQueue(context, "TransactionMode", "alwaysRollback"); expect(loggerMock.callsLengths().error).toEqual(0); expect(dbCounts).toMatchSnapshot(); const result = await testHelper.selectEventQueueAndReturn(tx, 1); @@ -518,13 +413,7 @@ describe("integration-main", () => { describe("end-to-end", () => { beforeAll(async () => { eventQueue.getConfigInstance().initialized = false; - const configFilePath = path.join( - __dirname, - "..", - "./test", - "asset", - "config.yml" - ); + const configFilePath = path.join(__dirname, "..", "./test", "asset", "config.yml"); await eventQueue.initialize({ configFilePath, processEventsAfterPublish: true, @@ -543,9 +432,7 @@ describe("integration-main", () => { const waitEntryIsDone = async () => { let startTime = Date.now(); while (true) { - const row = await cds.tx({}, (tx2) => - tx2.run(SELECT.one.from("sap.eventqueue.Event")) - ); + const row = await cds.tx({}, (tx2) => tx2.run(SELECT.one.from("sap.eventqueue.Event"))); dbCounts["BEGIN"]--; dbCounts["COMMIT"]--; dbCounts["READ"]--; diff --git a/test-integration/runner.test.js b/test-integration/runner.test.js index 792aea18..8f4d2dc9 100644 --- a/test-integration/runner.test.js +++ b/test-integration/runner.test.js @@ -13,14 +13,12 @@ const getAllTenantIdsSpy = jest.spyOn(cdsHelper, "getAllTenantIds"); jest.spyOn(cdsHelper, "getSubdomainForTenantId").mockResolvedValue("dummy"); const processEventQueue = require("../src/processEventQueue"); -const eventQueueRunnerSpy = jest - .spyOn(processEventQueue, "eventQueueRunner") - .mockImplementation( - async () => - new Promise((resolve) => { - setTimeout(resolve, 10); - }) - ); +const eventQueueRunnerSpy = jest.spyOn(processEventQueue, "eventQueueRunner").mockImplementation( + async () => + new Promise((resolve) => { + setTimeout(resolve, 10); + }) +); const distributedLock = require("../src/shared/distributedLock"); const eventQueue = require("../src"); @@ -37,13 +35,7 @@ describe("redisRunner", () => { let context, tx, configInstance; beforeAll(async () => { - const configFilePath = path.join( - __dirname, - "..", - "./test", - "asset", - "config.yml" - ); + const configFilePath = path.join(__dirname, "..", "./test", "asset", "config.yml"); await eventQueue.initialize({ configFilePath, processEventsAfterPublish: false, @@ -74,15 +66,9 @@ describe("redisRunner", () => { }); it("redis", async () => { - const setValueWithExpireSpy = jest.spyOn( - distributedLock, - "setValueWithExpire" - ); + const setValueWithExpireSpy = jest.spyOn(distributedLock, "setValueWithExpire"); const acquireLockSpy = jest.spyOn(distributedLock, "acquireLock"); - const checkLockExistsAndReturnValueSpy = jest.spyOn( - distributedLock, - "checkLockExistsAndReturnValue" - ); + const checkLockExistsAndReturnValueSpy = jest.spyOn(distributedLock, "checkLockExistsAndReturnValue"); getAllTenantIdsSpy .mockResolvedValueOnce(tenantIds) .mockResolvedValueOnce(tenantIds) @@ -202,18 +188,14 @@ describe("redisRunner", () => { }); it("acquireRunId should set ts", async () => { - let runTs = await distributedLock.checkLockExistsAndReturnValue( - {}, - runner._.EVENT_QUEUE_RUN_TS, - { tenantScoped: false } - ); + let runTs = await distributedLock.checkLockExistsAndReturnValue({}, runner._.EVENT_QUEUE_RUN_TS, { + tenantScoped: false, + }); expect(runTs).toBeNull(); await runner._._acquireRunId(); - runTs = await distributedLock.checkLockExistsAndReturnValue( - {}, - runner._.EVENT_QUEUE_RUN_TS, - { tenantScoped: false } - ); + runTs = await distributedLock.checkLockExistsAndReturnValue({}, runner._.EVENT_QUEUE_RUN_TS, { + tenantScoped: false, + }); expect(runTs).toBeDefined(); }); @@ -222,13 +204,10 @@ describe("redisRunner", () => { jest.useFakeTimers(); const systemTime = Date.now(); jest.setSystemTime(systemTime); - const runTs = await distributedLock.checkLockExistsAndReturnValue( - {}, - runner._.EVENT_QUEUE_RUN_TS, - { tenantScoped: false } - ); - const expectedTs = - new Date(runTs).getTime() + configInstance.runInterval - systemTime; + const runTs = await distributedLock.checkLockExistsAndReturnValue({}, runner._.EVENT_QUEUE_RUN_TS, { + tenantScoped: false, + }); + const expectedTs = new Date(runTs).getTime() + configInstance.runInterval - systemTime; const result = await runner._._calculateOffsetForFirstRun(); expect(result).toEqual(expectedTs); jest.useRealTimers(); @@ -236,22 +215,14 @@ describe("redisRunner", () => { it("should calculate correct offset - manuel set", async () => { const ts = new Date(Date.now() - 3 * 60 * 1000).toISOString(); - await distributedLock.setValueWithExpire( - {}, - runner._.EVENT_QUEUE_RUN_TS, - ts, - { tenantScoped: false } - ); + await distributedLock.setValueWithExpire({}, runner._.EVENT_QUEUE_RUN_TS, ts, { tenantScoped: false }); jest.useFakeTimers(); const systemTime = Date.now(); jest.setSystemTime(systemTime); - const runTs = await distributedLock.checkLockExistsAndReturnValue( - {}, - runner._.EVENT_QUEUE_RUN_TS, - { tenantScoped: false } - ); - const expectedTs = - new Date(runTs).getTime() + configInstance.runInterval - systemTime; + const runTs = await distributedLock.checkLockExistsAndReturnValue({}, runner._.EVENT_QUEUE_RUN_TS, { + tenantScoped: false, + }); + const expectedTs = new Date(runTs).getTime() + configInstance.runInterval - systemTime; const result = await runner._._calculateOffsetForFirstRun(); expect(result).toEqual(expectedTs); jest.useRealTimers(); diff --git a/test/asset/EventQueueTest.js b/test/asset/EventQueueTest.js index 39cdd82b..7ec987b1 100644 --- a/test/asset/EventQueueTest.js +++ b/test/asset/EventQueueTest.js @@ -10,13 +10,8 @@ class EventQueueTest extends EventQueueBaseClass { // eslint-disable-next-line no-unused-vars async processEvent(processContext, key, queueEntries, payload) { - await this.getTxForEventProcessing(key).run( - SELECT.from("sap.eventqueue.Event") - ); - return queueEntries.map((queueEntry) => [ - queueEntry.ID, - EventProcessingStatus.Done, - ]); + await this.getTxForEventProcessing(key).run(SELECT.from("sap.eventqueue.Event")); + return queueEntries.map((queueEntry) => [queueEntry.ID, EventProcessingStatus.Done]); } async checkEventAndGeneratePayload(queueEntry) { diff --git a/test/baseFunctionality.test.js b/test/baseFunctionality.test.js index 1ed2e445..b379e068 100644 --- a/test/baseFunctionality.test.js +++ b/test/baseFunctionality.test.js @@ -5,10 +5,7 @@ const path = require("path"); const cds = require("@sap/cds/lib"); const cdsHelper = require("../src/shared/cdsHelper"); -const executeInNewTransactionSpy = jest.spyOn( - cdsHelper, - "executeInNewTransaction" -); +const executeInNewTransactionSpy = jest.spyOn(cdsHelper, "executeInNewTransaction"); const eventQueue = require("../src"); const testHelper = require("./helper"); @@ -77,12 +74,7 @@ describe("baseFunctionality", () => { test("should do nothing if no lock is available", async () => { await testHelper.insertEventEntry(tx); - jest - .spyOn( - eventQueue.EventQueueProcessorBase.prototype, - "handleDistributedLock" - ) - .mockResolvedValueOnce(false); + jest.spyOn(eventQueue.EventQueueProcessorBase.prototype, "handleDistributedLock").mockResolvedValueOnce(false); const event = eventQueue.getConfigInstance().events[0]; await eventQueue.processEventQueue(context, event.type, event.subType); expect(loggerMock.callsLengths().error).toEqual(0); @@ -101,10 +93,7 @@ describe("baseFunctionality", () => { test("handle handleDistributedLock fails", async () => { await testHelper.insertEventEntry(tx); jest - .spyOn( - eventQueue.EventQueueProcessorBase.prototype, - "handleDistributedLock" - ) + .spyOn(eventQueue.EventQueueProcessorBase.prototype, "handleDistributedLock") .mockRejectedValueOnce(new Error("lock require failed")); const event = eventQueue.getConfigInstance().events[0]; await eventQueue.processEventQueue(context, event.type, event.subType); @@ -116,10 +105,7 @@ describe("baseFunctionality", () => { test("handle getQueueEntriesAndSetToInProgress fails", async () => { await testHelper.insertEventEntry(tx); jest - .spyOn( - eventQueue.EventQueueProcessorBase.prototype, - "getQueueEntriesAndSetToInProgress" - ) + .spyOn(eventQueue.EventQueueProcessorBase.prototype, "getQueueEntriesAndSetToInProgress") .mockRejectedValueOnce(new Error("db error")); const event = eventQueue.getConfigInstance().events[0]; await eventQueue.processEventQueue(context, event.type, event.subType); @@ -130,11 +116,9 @@ describe("baseFunctionality", () => { test("handle modifyQueueEntry fails", async () => { await testHelper.insertEventEntry(tx); - jest - .spyOn(eventQueue.EventQueueProcessorBase.prototype, "modifyQueueEntry") - .mockImplementationOnce(() => { - throw new Error("syntax error"); - }); + jest.spyOn(eventQueue.EventQueueProcessorBase.prototype, "modifyQueueEntry").mockImplementationOnce(() => { + throw new Error("syntax error"); + }); const event = eventQueue.getConfigInstance().events[0]; await eventQueue.processEventQueue(context, event.type, event.subType); expect(loggerMock.callsLengths().error).toEqual(1); @@ -156,14 +140,9 @@ describe("baseFunctionality", () => { test("handle clusterQueueEntries fails", async () => { await testHelper.insertEventEntry(tx); - jest - .spyOn( - eventQueue.EventQueueProcessorBase.prototype, - "clusterQueueEntries" - ) - .mockImplementationOnce(() => { - throw new Error("syntax error"); - }); + jest.spyOn(eventQueue.EventQueueProcessorBase.prototype, "clusterQueueEntries").mockImplementationOnce(() => { + throw new Error("syntax error"); + }); const event = eventQueue.getConfigInstance().events[0]; await eventQueue.processEventQueue(context, event.type, event.subType); expect(loggerMock.callsLengths().error).toEqual(1); @@ -179,9 +158,7 @@ describe("baseFunctionality", () => { .mockImplementationOnce(async (queueEntry) => { return queueEntry.payload; }); - await tx.run( - INSERT.into("sap.eventqueue.Event").entries(eventQueueEntry) - ); + await tx.run(INSERT.into("sap.eventqueue.Event").entries(eventQueueEntry)); const event = eventQueue.getConfigInstance().events[0]; await eventQueue.processEventQueue(context, event.type, event.subType); expect(loggerMock.callsLengths().error).toEqual(0); @@ -191,14 +168,10 @@ describe("baseFunctionality", () => { test("null as payload should be set to done", async () => { const eventQueueEntry = testHelper.getEventEntry(); eventQueueEntry.payload = undefined; - jest - .spyOn(EventQueueTest.prototype, "checkEventAndGeneratePayload") - .mockImplementationOnce(async () => { - return null; - }); - await tx.run( - INSERT.into("sap.eventqueue.Event").entries(eventQueueEntry) - ); + jest.spyOn(EventQueueTest.prototype, "checkEventAndGeneratePayload").mockImplementationOnce(async () => { + return null; + }); + await tx.run(INSERT.into("sap.eventqueue.Event").entries(eventQueueEntry)); const event = eventQueue.getConfigInstance().events[0]; await eventQueue.processEventQueue(context, event.type, event.subType); expect(loggerMock.callsLengths().error).toEqual(0); @@ -208,14 +181,10 @@ describe("baseFunctionality", () => { test("undefined as payload should be treated as error", async () => { const eventQueueEntry = testHelper.getEventEntry(); eventQueueEntry.payload = undefined; - jest - .spyOn(EventQueueTest.prototype, "checkEventAndGeneratePayload") - .mockImplementationOnce(async () => { - return undefined; - }); - await tx.run( - INSERT.into("sap.eventqueue.Event").entries(eventQueueEntry) - ); + jest.spyOn(EventQueueTest.prototype, "checkEventAndGeneratePayload").mockImplementationOnce(async () => { + return undefined; + }); + await tx.run(INSERT.into("sap.eventqueue.Event").entries(eventQueueEntry)); const event = eventQueue.getConfigInstance().events[0]; await eventQueue.processEventQueue(context, event.type, event.subType); expect(loggerMock.callsLengths().error).toEqual(1); diff --git a/test/distributedLock.test.js b/test/distributedLock.test.js index 5af97ed8..e5540e2c 100644 --- a/test/distributedLock.test.js +++ b/test/distributedLock.test.js @@ -3,10 +3,7 @@ const cds = require("@sap/cds/lib"); const cdsHelper = require("../src/shared/cdsHelper"); -const executeInNewTransactionSpy = jest.spyOn( - cdsHelper, - "executeInNewTransaction" -); +const executeInNewTransactionSpy = jest.spyOn(cdsHelper, "executeInNewTransaction"); const { acquireLock, releaseLock } = require("../src/shared/distributedLock"); const path = require("path"); @@ -55,16 +52,12 @@ describe("distributedLock", () => { it("straight forward - acquire and release", async () => { const lockAcquired = await acquireLock(context, "key"); expect(lockAcquired).toEqual(true); - const afterAcquire = await tx.run( - SELECT.one.from("sap.eventqueue.Lock").where("code LIKE '%key%'") - ); + const afterAcquire = await tx.run(SELECT.one.from("sap.eventqueue.Lock").where("code LIKE '%key%'")); expect(afterAcquire).toBeDefined(); await releaseLock(context, "key"); - const afterRelease = await tx.run( - SELECT.one.from("sap.eventqueue.Lock").where("code LIKE '%key%'") - ); + const afterRelease = await tx.run(SELECT.one.from("sap.eventqueue.Lock").where("code LIKE '%key%'")); expect(afterRelease).toEqual(undefined); }); @@ -78,10 +71,7 @@ describe("distributedLock", () => { it("two concurrent acquire", async () => { const lockAcquiredPromise = acquireLock(context, "key"); const lockAcquiredSecondPromise = acquireLock(context, "key"); - const [lockAcquired, lockAcquiredSecond] = await Promise.all([ - lockAcquiredPromise, - lockAcquiredSecondPromise, - ]); + const [lockAcquired, lockAcquiredSecond] = await Promise.all([lockAcquiredPromise, lockAcquiredSecondPromise]); expect(lockAcquiredSecond).toEqual(!lockAcquired); }); diff --git a/test/helper.js b/test/helper.js index 38efe47d..afe8bc6c 100644 --- a/test/helper.js +++ b/test/helper.js @@ -24,13 +24,7 @@ const getEventEntry = () => { const insertEventEntry = async ( tx, - { - entries, - numberOfEntries = 1, - type = "Notifications", - subType = "Task", - randomGuid = false, - } = {} + { entries, numberOfEntries = 1, type = "Notifications", subType = "Task", randomGuid = false } = {} ) => { if (!entries || entries?.length === 0) { entries = [ @@ -66,16 +60,10 @@ const selectEventQueueAndExpectError = async (tx, expectedLength = 1) => _selectEventQueueAndExpect(tx, EventProcessingStatus.Error, expectedLength); const selectEventQueueAndExpectExceeded = async (tx, expectedLength = 1) => - _selectEventQueueAndExpect( - tx, - EventProcessingStatus.Exceeded, - expectedLength - ); + _selectEventQueueAndExpect(tx, EventProcessingStatus.Exceeded, expectedLength); const selectEventQueueAndReturn = async (tx, expectedLength = 1) => { - const events = await tx.run( - SELECT.from("sap.eventqueue.Event").columns("status", "attempts") - ); + const events = await tx.run(SELECT.from("sap.eventqueue.Event").columns("status", "attempts")); expect(events).toHaveLength(expectedLength); return events; }; diff --git a/test/initialize.test.js b/test/initialize.test.js index 2f02466a..eaf4da5f 100644 --- a/test/initialize.test.js +++ b/test/initialize.test.js @@ -42,9 +42,7 @@ describe("initialize", () => { describe("runner mode registration", () => { test("single tenant", async () => { - const singleTenantSpy = jest - .spyOn(runner, "singleTenant") - .mockReturnValueOnce(); + const singleTenantSpy = jest.spyOn(runner, "singleTenant").mockReturnValueOnce(); await eventQueue.initialize({ configFilePath, processEventsAfterPublish: false, @@ -54,9 +52,7 @@ describe("initialize", () => { test("multi tenancy with db", async () => { cds.requires.multitenancy = {}; - const multiTenancyDbSpy = jest - .spyOn(runner, "multiTenancyDb") - .mockReturnValueOnce(); + const multiTenancyDbSpy = jest.spyOn(runner, "multiTenancyDb").mockReturnValueOnce(); await eventQueue.initialize({ configFilePath, processEventsAfterPublish: false, @@ -66,9 +62,7 @@ describe("initialize", () => { }); test("calling initialize twice should only processed once", async () => { - const singleTenant = jest - .spyOn(runner, "singleTenant") - .mockReturnValueOnce(); + const singleTenant = jest.spyOn(runner, "singleTenant").mockReturnValueOnce(); const p1 = eventQueue.initialize({ configFilePath, processEventsAfterPublish: false, @@ -87,9 +81,7 @@ describe("initialize", () => { "redis-cache": [{ credentials: { hostname: "123" } }], }; eventQueue.getConfigInstance().isOnCF = true; - const multiTenancyRedisSpy = jest - .spyOn(runner, "multiTenancyRedis") - .mockReturnValueOnce(); + const multiTenancyRedisSpy = jest.spyOn(runner, "multiTenancyRedis").mockReturnValueOnce(); await eventQueue.initialize({ configFilePath, processEventsAfterPublish: false, @@ -125,9 +117,7 @@ describe("initialize", () => { expect(configInstance.processEventsAfterPublish).toEqual(true); expect(configInstance.runInterval).toEqual(5 * 60 * 1000); expect(configInstance.parallelTenantProcessing).toEqual(3); - expect(configInstance.tableNameEventQueue).toEqual( - "sap.eventqueue.Event" - ); + expect(configInstance.tableNameEventQueue).toEqual("sap.eventqueue.Event"); expect(configInstance.tableNameEventLock).toEqual("sap.eventqueue.Lock"); expect(configInstance.skipCsnCheck).toEqual(false); }); diff --git a/test/redisPubSub.test.js b/test/redisPubSub.test.js index 9fb3f932..2af6aa3f 100644 --- a/test/redisPubSub.test.js +++ b/test/redisPubSub.test.js @@ -1,10 +1,7 @@ "use strict"; const distributedLock = require("../src/shared/distributedLock"); -const checkLockExistsSpy = jest.spyOn( - distributedLock, - "checkLockExistsAndReturnValue" -); +const checkLockExistsSpy = jest.spyOn(distributedLock, "checkLockExistsAndReturnValue"); const project = __dirname + "/.."; // The project's root folder cds.test(project);