Skip to content

Commit

Permalink
Implement start saving history the near the head (#235)
Browse files Browse the repository at this point in the history
* Implement add saving history at the near the head

* Get isInReorgThreshold from persisted db values

* Fix tests

---------

Co-authored-by: Dmitry Zakharov <[email protected]>
  • Loading branch information
JonoPrest and DZakh authored Oct 1, 2024
1 parent 2da2a6c commit 8e0f015
Show file tree
Hide file tree
Showing 17 changed files with 293 additions and 98 deletions.
52 changes: 42 additions & 10 deletions codegenerator/cli/templates/dynamic/codegen/src/ContextEnv.res.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ let getUserLogger = (logger): Logs.userLogger => {
logger->Logging.uerrorWithExn(exn, message),
}

let makeEventIdentifier = (eventBatchQueueItem: Types.eventBatchQueueItem): Types.eventIdentifier => {
let makeEventIdentifier = (
eventBatchQueueItem: Types.eventBatchQueueItem,
): Types.eventIdentifier => {
let {event, blockNumber, timestamp} = eventBatchQueueItem
{
chainId: event.chainId,
Expand All @@ -29,7 +31,10 @@ let makeEventIdentifier = (eventBatchQueueItem: Types.eventBatchQueueItem): Type
}

let getEventId = (eventBatchQueueItem: Types.eventBatchQueueItem) => {
EventUtils.packEventIndex(~blockNumber=eventBatchQueueItem.blockNumber, ~logIndex=eventBatchQueueItem.event.logIndex)
EventUtils.packEventIndex(
~blockNumber=eventBatchQueueItem.blockNumber,
~logIndex=eventBatchQueueItem.event.logIndex,
)
}

let make = (~eventBatchQueueItem: Types.eventBatchQueueItem, ~logger) => {
Expand Down Expand Up @@ -86,8 +91,21 @@ let makeDynamicContractRegisterFn = (~contextEnv: t, ~contractName, ~inMemorySto
)
}

let makeWhereLoader = (loadLayer, ~entityMod, ~inMemoryStore, ~fieldName, ~fieldValueSchema, ~logger) => {
Entities.eq: loadLayer->LoadLayer.makeWhereEqLoader(~entityMod, ~fieldName, ~fieldValueSchema, ~inMemoryStore, ~logger)
let makeWhereLoader = (
loadLayer,
~entityMod,
~inMemoryStore,
~fieldName,
~fieldValueSchema,
~logger,
) => {
Entities.eq: loadLayer->LoadLayer.makeWhereEqLoader(
~entityMod,
~fieldName,
~fieldValueSchema,
~inMemoryStore,
~logger,
),
}

let makeEntityHandlerContext = (
Expand All @@ -98,20 +116,22 @@ let makeEntityHandlerContext = (
~logger,
~getKey,
~loadLayer,
~isInReorgThreshold,
): entityHandlerContext<entity> => {
let inMemTable = inMemoryStore->InMemoryStore.getInMemTable(~entityMod)
let shouldRollbackOnReorg = RegisterHandlers.getConfig()->Config.shouldRollbackOnReorg
let shouldSaveHistory =
RegisterHandlers.getConfig()->Config.shouldSaveHistory(~isInReorgThreshold)
{
set: entity => {
inMemTable->InMemoryTable.Entity.set(
Set(entity)->Types.mkEntityUpdate(~eventIdentifier, ~entityId=getKey(entity)),
~shouldRollbackOnReorg,
~shouldSaveHistory,
)
},
deleteUnsafe: entityId => {
inMemTable->InMemoryTable.Entity.set(
Delete->Types.mkEntityUpdate(~eventIdentifier, ~entityId),
~shouldRollbackOnReorg,
~shouldSaveHistory,
)
},
get: loadLayer->LoadLayer.makeLoader(~entityMod, ~logger, ~inMemoryStore),
Expand Down Expand Up @@ -152,7 +172,12 @@ let getLoaderContext = (contextEnv: t, ~inMemoryStore: InMemoryStore.t, ~loadLay
}
}

let getHandlerContext = (context, ~inMemoryStore: InMemoryStore.t, ~loadLayer) => {
let getHandlerContext = (
context,
~inMemoryStore: InMemoryStore.t,
~loadLayer,
~isInReorgThreshold,
) => {
let {eventBatchQueueItem, logger} = context

let eventIdentifier = eventBatchQueueItem->makeEventIdentifier
Expand All @@ -166,6 +191,7 @@ let getHandlerContext = (context, ~inMemoryStore: InMemoryStore.t, ~loadLayer) =
~getKey=entity => entity.id,
~logger,
~loadLayer,
~isInReorgThreshold,
),
{{/each}}
}
Expand All @@ -181,8 +207,14 @@ let getLoaderArgs = (contextEnv, ~inMemoryStore, ~loadLayer) => {
context: contextEnv->getLoaderContext(~inMemoryStore, ~loadLayer),
}

let getHandlerArgs = (contextEnv, ~inMemoryStore, ~loaderReturn, ~loadLayer) => {
let getHandlerArgs = (
contextEnv,
~inMemoryStore,
~loaderReturn,
~loadLayer,
~isInReorgThreshold,
) => {
Types.HandlerTypes.event: contextEnv.eventBatchQueueItem.event,
context: contextEnv->getHandlerContext(~inMemoryStore, ~loadLayer),
context: contextEnv->getHandlerContext(~inMemoryStore, ~loadLayer, ~isInReorgThreshold),
loaderReturn,
}
15 changes: 8 additions & 7 deletions codegenerator/cli/templates/dynamic/codegen/src/IO.res.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ let executeDbFunctionsEntity = (
promises->Promise.all->Promise.thenResolve(_ => ())
}

let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t) => {
let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThreshold) => {
let entityDbExecutionComposer =
RegisterHandlers.getConfig()->Config.shouldRollbackOnReorg
RegisterHandlers.getConfig()->Config.shouldSaveHistory(~isInReorgThreshold)
? executeSetEntityWithHistory
: executeDbFunctionsEntity

Expand Down Expand Up @@ -223,7 +223,8 @@ module RollBack = {

let inMemStore = InMemoryStore.makeWithRollBackEventIdentifier(Some(rollBackEventIdentifier))

let shouldRollbackOnReorg = RegisterHandlers.getConfig()->Config.shouldRollbackOnReorg
//Don't save the rollback diffs to history table
let shouldSaveHistory = false

reorgData->Belt.Array.forEach(e => {
switch e {
Expand All @@ -232,17 +233,17 @@ module RollBack = {
{{#each entities as | entity |}}
| {previousEntity: Some({entity: {{entity.name.capitalized}}(entity), eventIdentifier}), entityId} =>
inMemStore.{{entity.name.uncapitalized}}->InMemoryTable.Entity.set(
Set(entity)->Types.mkEntityUpdate(~eventIdentifier, ~entityId, ~shouldSaveHistory=false),
~shouldRollbackOnReorg,
Set(entity)->Types.mkEntityUpdate(~eventIdentifier, ~entityId, ~shouldSaveHistory),
~shouldSaveHistory,
)
{{/each}}
//Where previousEntity is None,
//delete it with the eventIdentifier of the rollback event
{{#each entities as | entity |}}
| {previousEntity: None, entityType: {{entity.name.capitalized}}, entityId} =>
inMemStore.{{entity.name.uncapitalized}}->InMemoryTable.Entity.set(
Delete->Types.mkEntityUpdate(~eventIdentifier=rollBackEventIdentifier, ~entityId, ~shouldSaveHistory=false),
~shouldRollbackOnReorg,
Delete->Types.mkEntityUpdate(~eventIdentifier=rollBackEventIdentifier, ~entityId, ~shouldSaveHistory),
~shouldSaveHistory,
)
{{/each}}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ module EventFunctions = {
~logger,
~latestProcessedBlocks,
~config,
~isInReorgThreshold=false,
) {
| Ok(_) => ()
| Error(e) => e->ErrorHandling.logAndRaise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ let makeStoreOperatorEntity = (
~entityId,
~eventIdentifier={chainId: -1, blockNumber: -1, blockTimestamp: 0, logIndex: -1},
),
~shouldRollbackOnReorg=RegisterHandlers.getConfig()->Config.shouldRollbackOnReorg,
~shouldSaveHistory=false,
)
updateEntityIndicesMockDb(~mockDbTable=table, ~entity, ~entityId)
Expand Down
7 changes: 7 additions & 0 deletions codegenerator/cli/templates/static/codegen/src/Config.res
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ let shouldRollbackOnReorg = config =>
| _ => false
}

let shouldSaveHistory = (config, ~isInReorgThreshold) =>
switch config.historyConfig {
| {rollbackFlag: RollbackOnReorg} if isInReorgThreshold => true
| {historyFlag: FullHistory} => true
| _ => false
}

let shouldPruneHistory = config =>
switch config.historyConfig {
| {historyFlag: MinHistory} => true
Expand Down
24 changes: 21 additions & 3 deletions codegenerator/cli/templates/static/codegen/src/EventProcessing.res
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ let runEventHandler = (
~latestProcessedBlocks,
~loadLayer,
~config: Config.t,
~isInReorgThreshold,
) => {
open ErrorHandling.ResultPropogateEnv
runAsyncEnv(async () => {
Expand All @@ -230,7 +231,12 @@ let runEventHandler = (
(await runEventLoader(~contextEnv, ~loader, ~inMemoryStore, ~loadLayer))->propogate

switch await handler(
contextEnv->ContextEnv.getHandlerArgs(~loaderReturn, ~inMemoryStore, ~loadLayer),
contextEnv->ContextEnv.getHandlerArgs(
~loaderReturn,
~inMemoryStore,
~loadLayer,
~isInReorgThreshold,
),
) {
| exception exn =>
exn
Expand Down Expand Up @@ -262,6 +268,7 @@ let runHandler = (
~logger,
~loadLayer,
~config,
~isInReorgThreshold,
) => {
switch eventBatchQueueItem.handlerRegister->Types.HandlerTypes.Register.getLoaderHandler {
| Some(loaderHandler) =>
Expand All @@ -272,6 +279,7 @@ let runHandler = (
~logger,
~loadLayer,
~config,
~isInReorgThreshold,
)
| None => Ok(latestProcessedBlocks)->Promise.resolve
}
Expand Down Expand Up @@ -385,6 +393,7 @@ let runHandlers = (
~logger,
~loadLayer,
~config,
~isInReorgThreshold,
) => {
open ErrorHandling.ResultPropogateEnv
let latestProcessedBlocks = ref(latestProcessedBlocks)
Expand All @@ -401,6 +410,7 @@ let runHandlers = (
~latestProcessedBlocks=latestProcessedBlocks.contents,
~loadLayer,
~config,
~isInReorgThreshold,
)
)->propogate
}
Expand Down Expand Up @@ -436,6 +446,7 @@ type batchProcessed = {
let processEventBatch = (
~eventBatch: array<Types.eventBatchQueueItem>,
~inMemoryStore: InMemoryStore.t,
~isInReorgThreshold,
~latestProcessedBlocks: EventsProcessed.t,
~checkContractIsRegistered,
~loadLayer,
Expand Down Expand Up @@ -474,12 +485,19 @@ let processEventBatch = (

let latestProcessedBlocks =
(await eventsBeforeDynamicRegistrations
->runHandlers(~inMemoryStore, ~latestProcessedBlocks, ~logger, ~loadLayer, ~config))
->runHandlers(
~inMemoryStore,
~latestProcessedBlocks,
~logger,
~loadLayer,
~config,
~isInReorgThreshold,
))
->propogate

let elapsedTimeAfterProcess = timeRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis

switch await DbFunctions.sql->IO.executeBatch(~inMemoryStore) {
switch await DbFunctions.sql->IO.executeBatch(~inMemoryStore, ~isInReorgThreshold) {
| exception exn =>
exn->ErrorHandling.make(~msg="Failed writing batch to database", ~logger)->Error->propogate
| () => ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ module Entity = {
let set = (
inMemTable: t<'entity>,
entityUpdate: Types.entityUpdate<'entity>,
~shouldRollbackOnReorg,
~shouldSaveHistory,
) => {
let {entityRow, entityIndices} = switch inMemTable.table->get(entityUpdate.entityId) {
| Some({entityRow: InitialReadFromDb(entity_read), entityIndices}) =>
Expand All @@ -152,7 +152,7 @@ module Entity = {
})
{entityRow, entityIndices}
| Some({entityRow: Updated(previous_values), entityIndices})
if !shouldRollbackOnReorg ||
if !shouldSaveHistory ||
//Rollback initial state cases should not save history
!previous_values.latest.shouldSaveHistory ||
// This prevents two db actions in the same event on the same entity from being recorded to the history table.
Expand Down
14 changes: 14 additions & 0 deletions codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ type chainId = int
type eventId = string
type blockNumberRow = {@as("block_number") blockNumber: int}

module General = {
type existsRes = {exists: bool}

let hasRows = async (sql, ~table: Table.table) => {
let query = `SELECT EXISTS(SELECT 1 FROM public.${table.tableName});`
switch await sql->Postgres.unsafe(query) {
| [{exists}] => exists
| _ => Js.Exn.raiseError("Unexpected result from hasRows query: " ++ query)
}
}
}

module ChainMetadata = {
type chainMetadata = {
@as("chain_id") chainId: int,
Expand Down Expand Up @@ -370,4 +382,6 @@ module EntityHistory = {
})
})
}

let hasRows = () => General.hasRows(sql, ~table=TablesStatic.EntityHistory.table)
}
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,7 @@ let hasProcessedToEndblock = (self: t) => {
}

let hasNoMoreEventsToProcess = (self: t, ~hasArbQueueEvents) => {
!hasArbQueueEvents &&
self.fetchState->PartitionedFetchState.queueSize === 0
!hasArbQueueEvents && self.fetchState->PartitionedFetchState.queueSize === 0
}

/**
Expand Down
Loading

0 comments on commit 8e0f015

Please sign in to comment.