From b4b2894ad90ebe67d7e9be6761e0ffe0d66d72f5 Mon Sep 17 00:00:00 2001 From: Jono Prest <65739024+JonoPrest@users.noreply.github.com> Date: Wed, 6 Nov 2024 10:04:52 +0200 Subject: [PATCH] Implement Throttler for frequent cleanup & ui db functions (#318) * Add debouncer module with test * Move Pino bindings and Utils to envio package * Use Pino logger in debouncer * Update test to use logger * Update name from delayMillis to intervalMillis * Use debouncer instead of lazy writer for Benchmark module * Move chainMetaData updates to using a debouncer * Add debouncing for db cleanup functions * Fix debouncer and add more tests * Improve debouncer and add test * Remove unused Debouncer function * Add rescript schema to package.json.tmpl * Rename debouncer to throttler everywhere * Add resi file with function docs * Remove promise from timeout --- codegenerator/cli/npm/envio/package.json | 3 +- codegenerator/cli/npm/envio/package.json.tmpl | 3 +- codegenerator/cli/npm/envio/rescript.json | 4 +- codegenerator/cli/npm/envio/src/Throttler.res | 55 +++++++++ .../cli/npm/envio/src/Throttler.resi | 26 +++++ .../codegen => npm/envio}/src/Utils.res | 0 .../envio}/src/bindings/Pino.res | 0 .../static/codegen/src/AsyncTaskQueue.res | 31 ----- .../static/codegen/src/Benchmark.res | 43 +------ .../cli/templates/static/codegen/src/Env.res | 11 ++ .../codegen/src/globalState/GlobalState.res | 104 +++++++++++------ .../erc20_multichain_factory/pnpm-lock.yaml | 1 + scenarios/test_codegen/pnpm-lock.yaml | 1 + .../test/lib_tests/Throttler_test.res | 108 ++++++++++++++++++ 14 files changed, 285 insertions(+), 105 deletions(-) create mode 100644 codegenerator/cli/npm/envio/src/Throttler.res create mode 100644 codegenerator/cli/npm/envio/src/Throttler.resi rename codegenerator/cli/{templates/static/codegen => npm/envio}/src/Utils.res (100%) rename codegenerator/cli/{templates/static/codegen => npm/envio}/src/bindings/Pino.res (100%) delete mode 100644 codegenerator/cli/templates/static/codegen/src/AsyncTaskQueue.res create mode 100644 scenarios/test_codegen/test/lib_tests/Throttler_test.res diff --git a/codegenerator/cli/npm/envio/package.json b/codegenerator/cli/npm/envio/package.json index ffa5c8f85..8d9657cec 100644 --- a/codegenerator/cli/npm/envio/package.json +++ b/codegenerator/cli/npm/envio/package.json @@ -18,6 +18,7 @@ }, "homepage": "https://envio.dev", "dependencies": { - "rescript": "11.1.3" + "rescript": "11.1.3", + "rescript-schema": "8.2.0" } } diff --git a/codegenerator/cli/npm/envio/package.json.tmpl b/codegenerator/cli/npm/envio/package.json.tmpl index 7eee40e59..30ff99375 100644 --- a/codegenerator/cli/npm/envio/package.json.tmpl +++ b/codegenerator/cli/npm/envio/package.json.tmpl @@ -29,7 +29,8 @@ "envio-darwin-arm64": "${version}" }, "dependencies": { - "rescript": "11.1.3" + "rescript": "11.1.3", + "rescript-schema": "8.2.0" }, "files": [ "bin.js", diff --git a/codegenerator/cli/npm/envio/rescript.json b/codegenerator/cli/npm/envio/rescript.json index 4bd7eeef9..4d4bad4b5 100644 --- a/codegenerator/cli/npm/envio/rescript.json +++ b/codegenerator/cli/npm/envio/rescript.json @@ -11,5 +11,7 @@ "package-specs": { "module": "commonjs", "in-source": true - } + }, + "bs-dependencies": ["rescript-schema"], + "bsc-flags": ["-open RescriptSchema"] } diff --git a/codegenerator/cli/npm/envio/src/Throttler.res b/codegenerator/cli/npm/envio/src/Throttler.res new file mode 100644 index 000000000..420fd6348 --- /dev/null +++ b/codegenerator/cli/npm/envio/src/Throttler.res @@ -0,0 +1,55 @@ +type t = { + mutable lastRunTimeMillis: float, + mutable isRunning: bool, + mutable isAwaitingInterval: bool, + mutable scheduled: option promise>, + intervalMillis: float, + logger: Pino.t, +} + +let make = (~intervalMillis: int, ~logger) => { + lastRunTimeMillis: 0., + isRunning: false, + isAwaitingInterval: false, + scheduled: None, + intervalMillis: intervalMillis->Belt.Int.toFloat, + logger, +} + +let rec startInternal = async (throttler: t) => { + switch throttler { + | {scheduled: Some(fn), isRunning: false, isAwaitingInterval: false} => + let timeSinceLastRun = Js.Date.now() -. throttler.lastRunTimeMillis + + //Only execute if we are passed the interval + if timeSinceLastRun >= throttler.intervalMillis { + throttler.isRunning = true + throttler.scheduled = None + throttler.lastRunTimeMillis = Js.Date.now() + + switch await fn() { + | exception exn => + throttler.logger->Pino.errorExn( + Pino.createPinoMessageWithError("Scheduled action failed in throttler", exn), + ) + | _ => () + } + throttler.isRunning = false + + await throttler->startInternal + } else { + //Store isAwaitingInterval in state so that timers don't continuously get created + throttler.isAwaitingInterval = true + let _ = Js.Global.setTimeout(() => { + throttler.isAwaitingInterval = false + throttler->startInternal->ignore + }, Belt.Int.fromFloat(throttler.intervalMillis -. timeSinceLastRun)) + } + | _ => () + } +} + +let schedule = (throttler: t, fn) => { + throttler.scheduled = Some(fn) + throttler->startInternal->ignore +} diff --git a/codegenerator/cli/npm/envio/src/Throttler.resi b/codegenerator/cli/npm/envio/src/Throttler.resi new file mode 100644 index 000000000..e72de1b46 --- /dev/null +++ b/codegenerator/cli/npm/envio/src/Throttler.resi @@ -0,0 +1,26 @@ +/** +Throttles a scheduled function to run at a minimum given interval + +Does NOT queue scheduled functions but rather overwrites them +on each schedule call. +*/ +type t + +/** +Creates a throttler that throttles scheduled functions to run at a minimum +given interval in milliseconds. + +Does NOT queue scheduled functions but rather overwrites them + +The logger will be used to log any errors that occur in the scheduled +functions. +*/ +let make: (~intervalMillis: int, ~logger: Pino.t) => t + +/** +Schedules a function to be run on a throttler, overwriting any +previously scheduled functions. Should only be used for functions +that do not need to be executed if there is a more up to date scheduled +function available. +*/ +let schedule: (t, unit => promise) => unit diff --git a/codegenerator/cli/templates/static/codegen/src/Utils.res b/codegenerator/cli/npm/envio/src/Utils.res similarity index 100% rename from codegenerator/cli/templates/static/codegen/src/Utils.res rename to codegenerator/cli/npm/envio/src/Utils.res diff --git a/codegenerator/cli/templates/static/codegen/src/bindings/Pino.res b/codegenerator/cli/npm/envio/src/bindings/Pino.res similarity index 100% rename from codegenerator/cli/templates/static/codegen/src/bindings/Pino.res rename to codegenerator/cli/npm/envio/src/bindings/Pino.res diff --git a/codegenerator/cli/templates/static/codegen/src/AsyncTaskQueue.res b/codegenerator/cli/templates/static/codegen/src/AsyncTaskQueue.res deleted file mode 100644 index 447186a63..000000000 --- a/codegenerator/cli/templates/static/codegen/src/AsyncTaskQueue.res +++ /dev/null @@ -1,31 +0,0 @@ -/** -Used for managing sequential execution of async tasks - -Currently only implemented with concurrency level of 1 -*/ -type t = { - queue: SDSL.Queue.t promise>, - mutable isProcessing: bool, -} - -let make = (): t => {queue: SDSL.Queue.make(), isProcessing: false} - -let processQueue = async (~logger=?, self) => { - if !self.isProcessing { - self.isProcessing = true - while self.isProcessing { - switch self.queue->SDSL.Queue.pop { - | Some(fn) => await fn->Time.retryAsyncWithExponentialBackOff(~logger) - | None => self.isProcessing = false - } - } - } -} - -let add = (~logger=?, self, fn) => { - Promise.make((res, _) => { - let wrappedFn = () => fn()->Promise.thenResolve(() => res(. ())) - let _size = self.queue->SDSL.Queue.push(wrappedFn) - let _ = self->processQueue(~logger?) - }) -} diff --git a/codegenerator/cli/templates/static/codegen/src/Benchmark.res b/codegenerator/cli/templates/static/codegen/src/Benchmark.res index 50a57d105..270d6df29 100644 --- a/codegenerator/cli/templates/static/codegen/src/Benchmark.res +++ b/codegenerator/cli/templates/static/codegen/src/Benchmark.res @@ -120,44 +120,11 @@ module Data = { } } -module LazyWriter = { - let isWriting = ref(false) - let scheduledWriteFn: ref promise>> = ref(None) - let lastRunTimeMillis = ref(0.) - - let rec start = async () => { - switch (scheduledWriteFn.contents, isWriting.contents) { - | (Some(fn), false) => - isWriting := true - scheduledWriteFn := None - lastRunTimeMillis := Js.Date.now() - - switch await fn() { - | exception exn => Logging.errorWithExn(exn, "Failed to write benchmark cache file") - | _ => () - } - isWriting := false - await start() - | _ => () - } - } - - let schedule = (~intervalMillis=500, fn) => { - scheduledWriteFn := Some(fn) - if !isWriting.contents { - let timeSinceLastRun = Js.Date.now() -. lastRunTimeMillis.contents - if timeSinceLastRun >= intervalMillis->Belt.Int.toFloat { - start()->ignore - } else { - let _ = Js.Global.setTimeout(() => { - start()->ignore - }, intervalMillis - timeSinceLastRun->Belt.Float.toInt) - } - } - } -} - let data = Data.make() +let throttler = Throttler.make( + ~intervalMillis=500, + ~logger=Logging.createChild(~params={"context": "Benchmarking framework"}), +) let cacheFileName = "BenchmarkCache.json" let cacheFilePath = NodeJsLocal.Path.join(NodeJsLocal.Path.__dirname, cacheFileName) @@ -166,7 +133,7 @@ let saveToCacheFile = data => { let json = data->S.serializeToJsonStringOrRaiseWith(Data.schema) NodeJsLocal.Fs.Promises.writeFile(~filepath=cacheFilePath, ~content=json) } - LazyWriter.schedule(write) + throttler->Throttler.schedule(write) } let readFromCacheFile = async () => { diff --git a/codegenerator/cli/templates/static/codegen/src/Env.res b/codegenerator/cli/templates/static/codegen/src/Env.res index d433ffe4c..157fe41c8 100644 --- a/codegenerator/cli/templates/static/codegen/src/Env.res +++ b/codegenerator/cli/templates/static/codegen/src/Env.res @@ -103,5 +103,16 @@ module Configurable = { } } +module ThrottleWrites = { + let chainMetadataIntervalMillis = + envSafe->EnvSafe.get("ENVIO_THROTTLE_CHAIN_METADATA_INTERVAL_MILLIS", S.int, ~devFallback=500) + let pruneStaleDataIntervalMillis = + envSafe->EnvSafe.get( + "ENVIO_THROTTLE_PRUNE_STALE_DATA_INTERVAL_MILLIS", + S.int, + ~devFallback=10_000, + ) +} + // You need to close the envSafe after you're done with it so that it immediately tells you about your misconfigured environment on startup. envSafe->EnvSafe.close diff --git a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res index d03146aaa..23cd2cb89 100644 --- a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res +++ b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res @@ -3,6 +3,33 @@ open Belt type chain = ChainMap.Chain.t type rollbackState = NoRollback | RollingBack(chain) | RollbackInMemStore(InMemoryStore.t) +module WriteThrottlers = { + type t = {chainMetaData: Throttler.t, pruneStaleData: ChainMap.t} + let make = (~config: Config.t): t => { + let chainMetaData = { + let intervalMillis = Env.ThrottleWrites.chainMetadataIntervalMillis + let logger = Logging.createChild( + ~params={ + "context": "Throttler for chain metadata writes", + "intervalMillis": intervalMillis, + }, + ) + Throttler.make(~intervalMillis, ~logger) + } + let pruneStaleData = config.chainMap->ChainMap.map(cfg => { + let intervalMillis = Env.ThrottleWrites.pruneStaleDataIntervalMillis + let logger = Logging.createChild( + ~params={ + "context": "Throttler for pruning stale endblock and entity history data", + "intervalMillis": intervalMillis, + "chain": cfg.chain, + }, + ) + Throttler.make(~intervalMillis, ~logger) + }) + {chainMetaData, pruneStaleData} + } +} type t = { config: Config.t, chainManager: ChainManager.t, @@ -11,7 +38,7 @@ type t = { maxBatchSize: int, maxPerChainQueueSize: int, indexerStartTime: Js.Date.t, - asyncTaskQueue: AsyncTaskQueue.t, + writeThrottlers: WriteThrottlers.t, loadLayer: LoadLayer.t, //Initialized as 0, increments, when rollbacks occur to invalidate //responses based on the wrong stateId @@ -29,7 +56,7 @@ let make = (~config, ~chainManager, ~loadLayer) => { }, indexerStartTime: Js.Date.make(), rollbackState: NoRollback, - asyncTaskQueue: AsyncTaskQueue.make(), + writeThrottlers: WriteThrottlers.make(~config), loadLayer, id: 0, } @@ -94,7 +121,7 @@ let updateChainFetcherCurrentBlockHeight = (chainFetcher: ChainFetcher.t, ~curre } } -let updateChainMetadataTable = async (cm: ChainManager.t, ~asyncTaskQueue: AsyncTaskQueue.t) => { +let updateChainMetadataTable = async (cm: ChainManager.t, ~throttler: Throttler.t) => { let chainMetadataArray: array = cm.chainFetchers ->ChainMap.values @@ -121,7 +148,7 @@ let updateChainMetadataTable = async (cm: ChainManager.t, ~asyncTaskQueue: Async chainMetadata }) //Don't await this set, it can happen in its own time - await asyncTaskQueue->AsyncTaskQueue.add(() => + throttler->Throttler.schedule(() => DbFunctions.ChainMetadata.batchSetChainMetadataRow(~chainMetadataArray) ) } @@ -846,32 +873,10 @@ let injectedTaskReducer = ( nextEndOfBlockRangeScannedData, }) => let timeRef = Hrtime.makeTimer() - await DbFunctions.sql->Postgres.beginSql(sql => { - [ - DbFunctions.EndOfBlockRangeScannedData.setEndOfBlockRangeScannedData( - sql, - nextEndOfBlockRangeScannedData, - ), - DbFunctions.EndOfBlockRangeScannedData.deleteStaleEndOfBlockRangeScannedDataForChain( - sql, - ~chainId=chain->ChainMap.Chain.toChainId, - ~blockTimestampThreshold, - ~blockNumberThreshold, - ), - ]->Array.concat( - //only prune history if we are not saving full history - state.config->Config.shouldPruneHistory - ? [ - DbFunctions.EntityHistory.deleteAllEntityHistoryOnChainBeforeThreshold( - sql, - ~chainId=chain->ChainMap.Chain.toChainId, - ~blockNumberThreshold, - ~blockTimestampThreshold, - ), - ] - : [], - ) - }) + await DbFunctions.sql->DbFunctions.EndOfBlockRangeScannedData.setEndOfBlockRangeScannedData( + nextEndOfBlockRangeScannedData, + ) + if Env.saveBenchmarkData { let elapsedTimeMillis = Hrtime.timeSince(timeRef)->Hrtime.toMillis->Hrtime.intFromMillis Benchmark.addSummaryData( @@ -880,14 +885,47 @@ let injectedTaskReducer = ( ~value=elapsedTimeMillis->Belt.Int.toFloat, ) } + + //These prune functions can be scheduled and throttled if a more recent prune function gets called + //before the current one is executed + let runPruneFunctions = async () => { + let timeRef = Hrtime.makeTimer() + await DbFunctions.sql->DbFunctions.EndOfBlockRangeScannedData.deleteStaleEndOfBlockRangeScannedDataForChain( + ~chainId=chain->ChainMap.Chain.toChainId, + ~blockTimestampThreshold, + ~blockNumberThreshold, + ) + + if state.config->Config.shouldPruneHistory { + await DbFunctions.sql->DbFunctions.EntityHistory.deleteAllEntityHistoryOnChainBeforeThreshold( + ~chainId=chain->ChainMap.Chain.toChainId, + ~blockNumberThreshold, + ~blockTimestampThreshold, + ) + } + + if Env.saveBenchmarkData { + let elapsedTimeMillis = Hrtime.timeSince(timeRef)->Hrtime.toMillis->Hrtime.intFromMillis + Benchmark.addSummaryData( + ~group="Other", + ~label=`Chain ${chain->ChainMap.Chain.toString} PruneStaleData (ms)`, + ~value=elapsedTimeMillis->Belt.Int.toFloat, + ) + } + } + + let throttler = state.writeThrottlers.pruneStaleData->ChainMap.get(chain) + throttler->Throttler.schedule(runPruneFunctions) + | UpdateChainMetaDataAndCheckForExit(shouldExit) => - let {chainManager, asyncTaskQueue} = state + let {chainManager, writeThrottlers} = state switch shouldExit { | ExitWithSuccess => - updateChainMetadataTable(chainManager, ~asyncTaskQueue) + updateChainMetadataTable(chainManager, ~throttler=writeThrottlers.chainMetaData) ->Promise.thenResolve(_ => dispatchAction(SuccessExit)) ->ignore - | NoExit => updateChainMetadataTable(chainManager, ~asyncTaskQueue)->ignore + | NoExit => + updateChainMetadataTable(chainManager, ~throttler=writeThrottlers.chainMetaData)->ignore } | NextQuery(chainCheck) => let fetchForChain = checkAndFetchForChain( diff --git a/scenarios/erc20_multichain_factory/pnpm-lock.yaml b/scenarios/erc20_multichain_factory/pnpm-lock.yaml index 4cf1662c1..c050a4a5e 100644 --- a/scenarios/erc20_multichain_factory/pnpm-lock.yaml +++ b/scenarios/erc20_multichain_factory/pnpm-lock.yaml @@ -1683,6 +1683,7 @@ snapshots: envio@file:../../codegenerator/cli/npm/envio: dependencies: rescript: 11.1.3 + rescript-schema: 8.2.0(rescript@11.1.3) es-define-property@1.0.0: dependencies: diff --git a/scenarios/test_codegen/pnpm-lock.yaml b/scenarios/test_codegen/pnpm-lock.yaml index a08b87d02..4473ed923 100644 --- a/scenarios/test_codegen/pnpm-lock.yaml +++ b/scenarios/test_codegen/pnpm-lock.yaml @@ -5209,6 +5209,7 @@ snapshots: envio@file:../../codegenerator/cli/npm/envio: dependencies: rescript: 11.1.3 + rescript-schema: 8.2.0(rescript@11.1.3) error-ex@1.3.2: dependencies: diff --git a/scenarios/test_codegen/test/lib_tests/Throttler_test.res b/scenarios/test_codegen/test/lib_tests/Throttler_test.res new file mode 100644 index 000000000..284d0d27a --- /dev/null +++ b/scenarios/test_codegen/test/lib_tests/Throttler_test.res @@ -0,0 +1,108 @@ +open RescriptMocha + +describe("Throttler", () => { + Async.it("Schedules and throttles functions as expected", async () => { + let throttler = Throttler.make(~intervalMillis=10, ~logger=Logging.logger) + let actionsCalled = [] + + throttler->Throttler.schedule(async () => actionsCalled->Js.Array2.push(1)->ignore) + throttler->Throttler.schedule( + async () => { + actionsCalled->Js.Array2.push(2)->ignore + Assert.fail("Should have throttled 2nd scheduled fn in favour of following") + }, + ) + throttler->Throttler.schedule(async () => actionsCalled->Js.Array2.push(3)->ignore) + + Assert.deepEqual(actionsCalled, [1], ~message="Should have immediately called scheduled fn") + await Time.resolvePromiseAfterDelay(~delayMilliseconds=11) + Assert.deepEqual( + actionsCalled, + [1, 3], + ~message="Should have called latest scheduled fn after delay", + ) + }) + + Async.it("Does not continuously increase schedule time", async () => { + let throttler = Throttler.make(~intervalMillis=20, ~logger=Logging.logger) + let actionsCalled = [] + throttler->Throttler.schedule(async () => actionsCalled->Js.Array2.push(1)->ignore) + await Time.resolvePromiseAfterDelay(~delayMilliseconds=10) + throttler->Throttler.schedule(async () => actionsCalled->Js.Array2.push(2)->ignore) + Assert.deepEqual(actionsCalled, [1], ~message="Scheduler should still be waiting for interval") + await Time.resolvePromiseAfterDelay(~delayMilliseconds=11) + Assert.deepEqual( + actionsCalled, + [1, 2], + ~message="Scheduler should have been called straight after the initial interval", + ) + }) + + Async.it("Does not run until previous task is finished", async () => { + let throttler = Throttler.make(~intervalMillis=10, ~logger=Logging.logger) + let actionsCalled = [] + throttler->Throttler.schedule( + async () => { + await Time.resolvePromiseAfterDelay(~delayMilliseconds=13) + actionsCalled->Js.Array2.push(1)->ignore + }, + ) + + throttler->Throttler.schedule( + async () => { + actionsCalled->Js.Array2.push(2)->ignore + }, + ) + + Assert.deepEqual(actionsCalled, [], ~message="First task is still busy") + + await Time.resolvePromiseAfterDelay(~delayMilliseconds=11) + Assert.deepEqual( + actionsCalled, + [], + ~message="Second task has not executed even though passed interval", + ) + + await Time.resolvePromiseAfterDelay(~delayMilliseconds=5) + + Assert.deepEqual( + actionsCalled, + [1, 2], + ~message="Should have finished task one and execute task two immediately", + ) + }) + + Async.it( + "Does not immediately execute after a task has finished if below the interval", + async () => { + let throttler = Throttler.make(~intervalMillis=10, ~logger=Logging.logger) + let actionsCalled = [] + throttler->Throttler.schedule( + async () => { + await Time.resolvePromiseAfterDelay(~delayMilliseconds=5) + actionsCalled->Js.Array2.push(1)->ignore + }, + ) + throttler->Throttler.schedule( + async () => { + actionsCalled->Js.Array2.push(2)->ignore + }, + ) + + Assert.deepEqual(actionsCalled, [], ~message="First task is still busy") + await Time.resolvePromiseAfterDelay(~delayMilliseconds=6) + Assert.deepEqual( + actionsCalled, + [1], + ~message="First action finished, second action waiting for interval", + ) + + await Time.resolvePromiseAfterDelay(~delayMilliseconds=5) + Assert.deepEqual( + actionsCalled, + [1, 2], + ~message="Second action should have been called after the interval as passed", + ) + }, + ) +})