Skip to content

Commit

Permalink
fix: Clear out AnchorRequestStore for requests that were anchored sev…
Browse files Browse the repository at this point in the history
…eral commits ago (#3055)
  • Loading branch information
stbrody authored Dec 7, 2023
1 parent a79d36d commit 85f8a79
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 5 deletions.
7 changes: 7 additions & 0 deletions packages/common/src/utils/stream-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,4 +385,11 @@ export class StreamUtils {

return true
}

/**
* Returns whether the given StreamState contains the given commit CID in its log
*/
static stateContainsCommit(state: StreamState, commit: CID): boolean {
return state.log.find((logEntry) => logEntry.cid.equals(commit)) != null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@ import { jest } from '@jest/globals'
import { createCeramic } from '../../__tests__/create-ceramic.js'
import { InMemoryAnchorService } from '../../anchor/memory/in-memory-anchor-service.js'
import { Observable, Subject } from 'rxjs'
import { AnchorEvent, AnchorStatus, IpfsApi, SyncOptions, TestUtils } from '@ceramicnetwork/common'
import {
AnchorEvent,
AnchorStatus,
GenesisCommit,
IpfsApi,
SyncOptions,
TestUtils,
} from '@ceramicnetwork/common'
import { TileDocument } from '@ceramicnetwork/stream-tile'
import tmp from 'tmp-promise'
import { createIPFS } from '@ceramicnetwork/ipfs-daemon'
Expand All @@ -23,7 +30,7 @@ async function getPendingAnchorStreamIDs(
}

describe('resumeRunningStatesFromAnchorRequestStore(...) method', () => {
jest.setTimeout(10000)
jest.setTimeout(30 * 1000)

let ipfs: IpfsApi
let stateStoreDirectoryName: string
Expand Down Expand Up @@ -155,7 +162,7 @@ describe('resumeRunningStatesFromAnchorRequestStore(...) method', () => {
await newCeramic.close()
})

test('Cleans up entries from store for already anchored tips', async () => {
test('Cleans up entries from store for current anchored tip', async () => {
const ceramic = await createCeramic(ipfs, {
stateStoreDirectory: stateStoreDirectoryName,
})
Expand Down Expand Up @@ -202,4 +209,58 @@ describe('resumeRunningStatesFromAnchorRequestStore(...) method', () => {

await ceramic.close()
})

test('Cleans up entries from store for historical anchored tips', async () => {
const ceramic = await createCeramic(ipfs, {
stateStoreDirectory: stateStoreDirectoryName,
})

const stream = await TileDocument.create(ceramic, { step: 0 })
await TestUtils.anchorUpdate(ceramic, stream)
await stream.update({ step: 1 })
await TestUtils.anchorUpdate(ceramic, stream)
await stream.update({ step: 2 })
await TestUtils.anchorUpdate(ceramic, stream)
expect(stream.state.anchorStatus).toEqual(AnchorStatus.ANCHORED)
expect(stream.state.log).toHaveLength(6)

// Wait for AnchorRequestStore to be cleared out
await TestUtils.waitForConditionOrTimeout(async () => {
const remaining = await getPendingAnchorStreamIDs(ceramic.repository.anchorRequestStore)
return remaining.length == 0
})

// Now make it seem like there was a lingering entry in the AnchorRequestStore for a commit in
// the middle of the log
await ceramic.repository.anchorRequestStore.save(stream.id, {
cid: stream.state.log[2].cid,
timestamp: Date.now(),
genesis: {} as GenesisCommit,
})

await expect(
getPendingAnchorStreamIDs(ceramic.repository.anchorRequestStore)
).resolves.toHaveLength(1)

// Resume polling for the entry we just added to the AnchorResumingService
const anchorResumingService = new AnchorResumingService(
ceramic.loggerProvider.getDiagnosticsLogger()
)
// Clear out the cache to ensure Ceramic needs to go to the StateStore to load the stream,
// which is what triggers the resume logic for pending anchors.
ceramic.repository.inmemory.delete(stream.id.toString())
await anchorResumingService.resumeRunningStatesFromAnchorRequestStore(ceramic.repository)

// The node should detect that the entry is already anchored and clean it up.
await TestUtils.waitForConditionOrTimeout(async () => {
const remaining = await getPendingAnchorStreamIDs(ceramic.repository.anchorRequestStore)
return remaining.length == 0
})

await expect(
getPendingAnchorStreamIDs(ceramic.repository.anchorRequestStore)
).resolves.toHaveLength(0)

await ceramic.close()
})
})
39 changes: 37 additions & 2 deletions packages/core/src/state-management/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ export class Repository {
this._registerRunningState(runningState)
const storedRequest = await this.anchorRequestStore.load(streamId)
if (storedRequest !== null && this.anchorService) {
this._confirmAnchorResponse(runningState, storedRequest.cid)
await this._confirmAnchorResponse(runningState, storedRequest.cid)
}
return runningState
} else {
Expand Down Expand Up @@ -568,10 +568,45 @@ export class Repository {
return this._processAnchorResponse(state$, anchorStatus$)
}

/**
* If, when checking an entry in the AnchorRequestStore, we notice that the commit CID from the
* stored request has in fact already been anchored, we need to clean up the entry from the store
* so it doesn't stick around forever. We need to do that from within the ExecutionQueue, however,
* or we risk deleting a valid entry for a newer request on the same Stream from the store.
*/
private async _cleanUpStaleAnchorRequestStore(state$: RunningState, commit: CID): Promise<void> {
return this.executionQ.forStream(state$.id).run(async () => {
const request = await this.anchorRequestStore.load(state$.id)
if (!request.cid.equals(commit)) {
// We've already moved on to a newer request, don't accidentally delete the new request
return
}

// Even if we may have already checked this condition before calling into this function,
// we need to check it again under the execution queue as the state may have changed in the
// meantime otherwise.
if (
state$.state.anchorStatus == AnchorStatus.ANCHORED &&
StreamUtils.stateContainsCommit(state$.state, commit)
) {
// The commit was already anchored. Clean up the AnchorRequestStore entry for this CID.
await this.anchorRequestStore.remove(state$.id)
}
})
}

/**
* Restart polling and handle response for a previously submitted anchor request
*/
private _confirmAnchorResponse(state$: RunningState, cid: CID): Subscription {
private async _confirmAnchorResponse(state$: RunningState, cid: CID): Promise<Subscription> {
if (
state$.state.anchorStatus == AnchorStatus.ANCHORED &&
StreamUtils.stateContainsCommit(state$.state, cid)
) {
// The commit was already anchored. Clean up the AnchorRequestStore entry for this CID.
await this._cleanUpStaleAnchorRequestStore(state$, cid)
}

const anchorStatus$ = this.anchorService.pollForAnchorResponse(state$.id, cid)
return this._processAnchorResponse(state$, anchorStatus$)
}
Expand Down

1 comment on commit 85f8a79

@ukstv
Copy link
Contributor

@ukstv ukstv commented on 85f8a79 Dec 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe, this case is handled in https://github.com/ceramicnetwork/js-ceramic/pull/2971/files#diff-b5b74d53a433eb9a51e83a35f62b1c7d4ed22f3134f5086b7c4cadda66f91a4eR704 Look for "Anchor COMPLETED for non tip should remove the request from the store" if GH interface behaves badly.

Please sign in to comment.