Commit

Implement Throttler for frequent cleanup & ui db functions (#318)
* Add debouncer module with test

* Move Pino bindings and Utils to envio package

* Use Pino logger in debouncer

* Update test to use logger

* Update name from delayMillis to intervalMillis

* Use debouncer instead of lazy writer for Benchmark module

* Move chainMetaData updates to using a debouncer

* Add debouncing for db cleanup functions

* Fix debouncer and add more tests

* Improve debouncer and add test

* Remove unused Debouncer function

* Add rescript schema to package.json.tmpl

* Rename debouncer to throttler everywhere

* Add resi file with function docs

* Remove promise from timeout
JonoPrest authored Nov 6, 2024
1 parent 62f77a5 commit b4b2894
Showing 14 changed files with 285 additions and 105 deletions.
3 changes: 2 additions & 1 deletion codegenerator/cli/npm/envio/package.json
@@ -18,6 +18,7 @@
},
"homepage": "https://envio.dev",
"dependencies": {
"rescript": "11.1.3"
"rescript": "11.1.3",
"rescript-schema": "8.2.0"
}
}
3 changes: 2 additions & 1 deletion codegenerator/cli/npm/envio/package.json.tmpl
@@ -29,7 +29,8 @@
"envio-darwin-arm64": "${version}"
},
"dependencies": {
"rescript": "11.1.3"
"rescript": "11.1.3",
"rescript-schema": "8.2.0"
},
"files": [
"bin.js",
4 changes: 3 additions & 1 deletion codegenerator/cli/npm/envio/rescript.json
@@ -11,5 +11,7 @@
"package-specs": {
"module": "commonjs",
"in-source": true
}
},
"bs-dependencies": ["rescript-schema"],
"bsc-flags": ["-open RescriptSchema"]
}
55 changes: 55 additions & 0 deletions codegenerator/cli/npm/envio/src/Throttler.res
@@ -0,0 +1,55 @@
type t = {
mutable lastRunTimeMillis: float,
mutable isRunning: bool,
mutable isAwaitingInterval: bool,
mutable scheduled: option<unit => promise<unit>>,
intervalMillis: float,
logger: Pino.t,
}

let make = (~intervalMillis: int, ~logger) => {
lastRunTimeMillis: 0.,
isRunning: false,
isAwaitingInterval: false,
scheduled: None,
intervalMillis: intervalMillis->Belt.Int.toFloat,
logger,
}

let rec startInternal = async (throttler: t) => {
switch throttler {
| {scheduled: Some(fn), isRunning: false, isAwaitingInterval: false} =>
let timeSinceLastRun = Js.Date.now() -. throttler.lastRunTimeMillis

//Only execute if we are past the interval
if timeSinceLastRun >= throttler.intervalMillis {
throttler.isRunning = true
throttler.scheduled = None
throttler.lastRunTimeMillis = Js.Date.now()

switch await fn() {
| exception exn =>
throttler.logger->Pino.errorExn(
Pino.createPinoMessageWithError("Scheduled action failed in throttler", exn),
)
| _ => ()
}
throttler.isRunning = false

await throttler->startInternal
} else {
//Store isAwaitingInterval in state so that timers don't continuously get created
throttler.isAwaitingInterval = true
let _ = Js.Global.setTimeout(() => {
throttler.isAwaitingInterval = false
throttler->startInternal->ignore
}, Belt.Int.fromFloat(throttler.intervalMillis -. timeSinceLastRun))
}
| _ => ()
}
}

let schedule = (throttler: t, fn) => {
throttler.scheduled = Some(fn)
throttler->startInternal->ignore
}
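
To make the control flow above concrete, here is an illustrative trace (an example based on the code, not part of the commit), assuming intervalMillis=500 and scheduled functions that resolve quickly:

* schedule(fn1) at t=0 ms: lastRunTimeMillis is still 0, so fn1 runs immediately and the run time is recorded.
* schedule(fn2) at t=100 ms: only 100 ms have elapsed, so isAwaitingInterval is set and a single timer is armed for the remaining 400 ms.
* schedule(fn3) at t=300 ms: the throttler is already awaiting the interval, so the call only overwrites the pending function; no extra timer is created.
* At roughly t=500 ms the timer fires and fn3 runs; fn2 never executes.

So at most one execution happens per interval, and it always uses the most recently scheduled function.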
26 changes: 26 additions & 0 deletions codegenerator/cli/npm/envio/src/Throttler.resi
@@ -0,0 +1,26 @@
/**
Throttles a scheduled function to run at a minimum given interval

Does NOT queue scheduled functions but rather overwrites them
on each schedule call.
*/
type t

/**
Creates a throttler that throttles scheduled functions to run at a minimum
given interval in milliseconds.

Does NOT queue scheduled functions but rather overwrites them

The logger will be used to log any errors that occur in the scheduled
functions.
*/
let make: (~intervalMillis: int, ~logger: Pino.t) => t

/**
Schedules a function to be run on a throttler, overwriting any
previously scheduled functions. Should only be used for functions
that do not need to be executed if there is a more up to date scheduled
function available.
*/
let schedule: (t, unit => promise<unit>) => unit
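
For illustration only (this snippet is not part of the commit): a caller builds one throttler and repeatedly schedules work on it, mirroring how Benchmark.res and GlobalState.res use the API in the diffs below. It assumes the envio modules Throttler and Logging are in scope; saveSnapshot and the logger context string are hypothetical placeholders, while Throttler.make, Throttler.schedule, and Logging.createChild are used as they appear in this commit.

// Throttle a frequently triggered async write to at most one run per 500 ms.
let throttler = Throttler.make(
  ~intervalMillis=500,
  ~logger=Logging.createChild(~params={"context": "Example snapshot writer"}),
)

let requestSnapshotWrite = () =>
  // Each call overwrites any pending function; only the latest one runs once
  // the interval has elapsed.
  throttler->Throttler.schedule(async () => {
    await saveSnapshot() // hypothetical async write returning promise<unit>
  })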
File renamed without changes.
31 changes: 0 additions & 31 deletions codegenerator/cli/templates/static/codegen/src/AsyncTaskQueue.res

This file was deleted.

43 changes: 5 additions & 38 deletions codegenerator/cli/templates/static/codegen/src/Benchmark.res
@@ -120,44 +120,11 @@ module Data = {
}
}

module LazyWriter = {
let isWriting = ref(false)
let scheduledWriteFn: ref<option<unit => promise<unit>>> = ref(None)
let lastRunTimeMillis = ref(0.)

let rec start = async () => {
switch (scheduledWriteFn.contents, isWriting.contents) {
| (Some(fn), false) =>
isWriting := true
scheduledWriteFn := None
lastRunTimeMillis := Js.Date.now()

switch await fn() {
| exception exn => Logging.errorWithExn(exn, "Failed to write benchmark cache file")
| _ => ()
}
isWriting := false
await start()
| _ => ()
}
}

let schedule = (~intervalMillis=500, fn) => {
scheduledWriteFn := Some(fn)
if !isWriting.contents {
let timeSinceLastRun = Js.Date.now() -. lastRunTimeMillis.contents
if timeSinceLastRun >= intervalMillis->Belt.Int.toFloat {
start()->ignore
} else {
let _ = Js.Global.setTimeout(() => {
start()->ignore
}, intervalMillis - timeSinceLastRun->Belt.Float.toInt)
}
}
}
}

let data = Data.make()
let throttler = Throttler.make(
~intervalMillis=500,
~logger=Logging.createChild(~params={"context": "Benchmarking framework"}),
)
let cacheFileName = "BenchmarkCache.json"
let cacheFilePath = NodeJsLocal.Path.join(NodeJsLocal.Path.__dirname, cacheFileName)

@@ -166,7 +133,7 @@ let saveToCacheFile = data => {
let json = data->S.serializeToJsonStringOrRaiseWith(Data.schema)
NodeJsLocal.Fs.Promises.writeFile(~filepath=cacheFilePath, ~content=json)
}
LazyWriter.schedule(write)
throttler->Throttler.schedule(write)
}

let readFromCacheFile = async () => {
11 changes: 11 additions & 0 deletions codegenerator/cli/templates/static/codegen/src/Env.res
@@ -103,5 +103,16 @@ module Configurable = {
}
}

module ThrottleWrites = {
let chainMetadataIntervalMillis =
envSafe->EnvSafe.get("ENVIO_THROTTLE_CHAIN_METADATA_INTERVAL_MILLIS", S.int, ~devFallback=500)
let pruneStaleDataIntervalMillis =
envSafe->EnvSafe.get(
"ENVIO_THROTTLE_PRUNE_STALE_DATA_INTERVAL_MILLIS",
S.int,
~devFallback=10_000,
)
}

// You need to close the envSafe after you're done with it so that it immediately tells you about your misconfigured environment on startup.
envSafe->EnvSafe.close
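
For reference (a note on the configuration above, not part of the diff): the two environment variables let a deployment tune the throttle intervals, and when they are unset the devFallback values apply in development, i.e. 500 ms for chain metadata writes and 10,000 ms for pruning stale data.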
@@ -3,6 +3,33 @@ open Belt
type chain = ChainMap.Chain.t
type rollbackState = NoRollback | RollingBack(chain) | RollbackInMemStore(InMemoryStore.t)

module WriteThrottlers = {
type t = {chainMetaData: Throttler.t, pruneStaleData: ChainMap.t<Throttler.t>}
let make = (~config: Config.t): t => {
let chainMetaData = {
let intervalMillis = Env.ThrottleWrites.chainMetadataIntervalMillis
let logger = Logging.createChild(
~params={
"context": "Throttler for chain metadata writes",
"intervalMillis": intervalMillis,
},
)
Throttler.make(~intervalMillis, ~logger)
}
let pruneStaleData = config.chainMap->ChainMap.map(cfg => {
let intervalMillis = Env.ThrottleWrites.pruneStaleDataIntervalMillis
let logger = Logging.createChild(
~params={
"context": "Throttler for pruning stale endblock and entity history data",
"intervalMillis": intervalMillis,
"chain": cfg.chain,
},
)
Throttler.make(~intervalMillis, ~logger)
})
{chainMetaData, pruneStaleData}
}
}
type t = {
config: Config.t,
chainManager: ChainManager.t,
@@ -11,7 +38,7 @@
maxBatchSize: int,
maxPerChainQueueSize: int,
indexerStartTime: Js.Date.t,
asyncTaskQueue: AsyncTaskQueue.t,
writeThrottlers: WriteThrottlers.t,
loadLayer: LoadLayer.t,
//Initialized as 0, increments, when rollbacks occur to invalidate
//responses based on the wrong stateId
@@ -29,7 +56,7 @@ let make = (~config, ~chainManager, ~loadLayer) => {
},
indexerStartTime: Js.Date.make(),
rollbackState: NoRollback,
asyncTaskQueue: AsyncTaskQueue.make(),
writeThrottlers: WriteThrottlers.make(~config),
loadLayer,
id: 0,
}
@@ -94,7 +121,7 @@ let updateChainFetcherCurrentBlockHeight = (chainFetcher: ChainFetcher.t, ~curre
}
}

let updateChainMetadataTable = async (cm: ChainManager.t, ~asyncTaskQueue: AsyncTaskQueue.t) => {
let updateChainMetadataTable = async (cm: ChainManager.t, ~throttler: Throttler.t) => {
let chainMetadataArray: array<DbFunctions.ChainMetadata.chainMetadata> =
cm.chainFetchers
->ChainMap.values
@@ -121,7 +148,7 @@ let updateChainMetadataTable = (cm: ChainManager.t, ~asyncTaskQueue: Async
chainMetadata
})
//Don't await this set, it can happen in its own time
await asyncTaskQueue->AsyncTaskQueue.add(() =>
throttler->Throttler.schedule(() =>
DbFunctions.ChainMetadata.batchSetChainMetadataRow(~chainMetadataArray)
)
}
@@ -846,32 +873,10 @@ let injectedTaskReducer = (
nextEndOfBlockRangeScannedData,
}) =>
let timeRef = Hrtime.makeTimer()
await DbFunctions.sql->Postgres.beginSql(sql => {
[
DbFunctions.EndOfBlockRangeScannedData.setEndOfBlockRangeScannedData(
sql,
nextEndOfBlockRangeScannedData,
),
DbFunctions.EndOfBlockRangeScannedData.deleteStaleEndOfBlockRangeScannedDataForChain(
sql,
~chainId=chain->ChainMap.Chain.toChainId,
~blockTimestampThreshold,
~blockNumberThreshold,
),
]->Array.concat(
//only prune history if we are not saving full history
state.config->Config.shouldPruneHistory
? [
DbFunctions.EntityHistory.deleteAllEntityHistoryOnChainBeforeThreshold(
sql,
~chainId=chain->ChainMap.Chain.toChainId,
~blockNumberThreshold,
~blockTimestampThreshold,
),
]
: [],
)
})
await DbFunctions.sql->DbFunctions.EndOfBlockRangeScannedData.setEndOfBlockRangeScannedData(
nextEndOfBlockRangeScannedData,
)

if Env.saveBenchmarkData {
let elapsedTimeMillis = Hrtime.timeSince(timeRef)->Hrtime.toMillis->Hrtime.intFromMillis
Benchmark.addSummaryData(
Expand All @@ -880,14 +885,47 @@ let injectedTaskReducer = (
~value=elapsedTimeMillis->Belt.Int.toFloat,
)
}

//These prune functions are scheduled on a throttler: if a more recent prune call arrives
//before the pending one executes, it simply replaces it
let runPruneFunctions = async () => {
let timeRef = Hrtime.makeTimer()
await DbFunctions.sql->DbFunctions.EndOfBlockRangeScannedData.deleteStaleEndOfBlockRangeScannedDataForChain(
~chainId=chain->ChainMap.Chain.toChainId,
~blockTimestampThreshold,
~blockNumberThreshold,
)

if state.config->Config.shouldPruneHistory {
await DbFunctions.sql->DbFunctions.EntityHistory.deleteAllEntityHistoryOnChainBeforeThreshold(
~chainId=chain->ChainMap.Chain.toChainId,
~blockNumberThreshold,
~blockTimestampThreshold,
)
}

if Env.saveBenchmarkData {
let elapsedTimeMillis = Hrtime.timeSince(timeRef)->Hrtime.toMillis->Hrtime.intFromMillis
Benchmark.addSummaryData(
~group="Other",
~label=`Chain ${chain->ChainMap.Chain.toString} PruneStaleData (ms)`,
~value=elapsedTimeMillis->Belt.Int.toFloat,
)
}
}

let throttler = state.writeThrottlers.pruneStaleData->ChainMap.get(chain)
throttler->Throttler.schedule(runPruneFunctions)

| UpdateChainMetaDataAndCheckForExit(shouldExit) =>
let {chainManager, asyncTaskQueue} = state
let {chainManager, writeThrottlers} = state
switch shouldExit {
| ExitWithSuccess =>
updateChainMetadataTable(chainManager, ~asyncTaskQueue)
updateChainMetadataTable(chainManager, ~throttler=writeThrottlers.chainMetaData)
->Promise.thenResolve(_ => dispatchAction(SuccessExit))
->ignore
| NoExit => updateChainMetadataTable(chainManager, ~asyncTaskQueue)->ignore
| NoExit =>
updateChainMetadataTable(chainManager, ~throttler=writeThrottlers.chainMetaData)->ignore
}
| NextQuery(chainCheck) =>
let fetchForChain = checkAndFetchForChain(
1 change: 1 addition & 0 deletions scenarios/erc20_multichain_factory/pnpm-lock.yaml

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions scenarios/test_codegen/pnpm-lock.yaml

Some generated files are not rendered by default.
