Skip to content

Commit

Permalink
feat: recon resubscribes to interests on startup (#3262)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephhuynh18 authored Jul 18, 2024
1 parent 59acfaa commit 0682179
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 4 deletions.
1 change: 1 addition & 0 deletions packages/core/src/__tests__/create-dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export async function createDispatcher(ipfs: IpfsApi, pubsubTopic: string): Prom
} as unknown as AnchorRequestStore
const index = {
init: () => Promise.resolve(),
indexedModels: () => [],
} as unknown as IndexApi
repository.setDeps({
pinStore,
Expand Down
17 changes: 17 additions & 0 deletions packages/core/src/__tests__/recon.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { jest } from '@jest/globals'
import { CARFactory, type CAR } from 'cartonne'
import { toArray, take, lastValueFrom, firstValueFrom, race, timer } from 'rxjs'
import { CommonTestUtils } from '@ceramicnetwork/common-test-utils'
import { BaseTestUtils as TestUtils } from '@ceramicnetwork/base-test-utils'

const RECON_URL = 'http://example.com'
const LOGGER = new LoggerProvider().getDiagnosticsLogger()
Expand Down Expand Up @@ -80,6 +81,22 @@ describe('ReconApi', () => {
await firstValueFrom(race(reconApi, timer(1000)))
expect(mockSendRequest).toHaveBeenCalledTimes(1)
})

test('should register interests on init', async () => {
const mockSendRequest = jest.fn(() => Promise.resolve())
const reconApi = new ReconApi(
{ enabled: true, url: RECON_URL, feedEnabled: false },
LOGGER,
mockSendRequest
)
const fakeInterest0 = TestUtils.randomStreamID()
const fakeInterest1 = TestUtils.randomStreamID()
await reconApi.init('testInitialCursor', [fakeInterest0, fakeInterest1])
expect(mockSendRequest).toHaveBeenCalledTimes(3)
expect(mockSendRequest.mock.calls[1][0]).toContain(fakeInterest0.toString())
expect(mockSendRequest.mock.calls[2][0]).toContain(fakeInterest1.toString())
reconApi.stop()
})
})

describe('registerInterest', () => {
Expand Down
8 changes: 6 additions & 2 deletions packages/core/src/recon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export interface ReconEventFeedResponse {
* Recon API Interface
*/
export interface IReconApi extends Observable<ReconEventFeedResponse> {
init(initialCursor?: string): Promise<void>
init(initialCursor?: string, initialInterests?: Array<StreamID>): Promise<void>
registerInterest(model: StreamID): Promise<void>
put(car: CAR, opts?: AbortOptions): Promise<void>
enabled: boolean
Expand Down Expand Up @@ -96,7 +96,7 @@ export class ReconApi extends Observable<ReconEventFeedResponse> implements IRec
* @param initialCursor
* @returns
*/
async init(initialCursor = ''): Promise<void> {
async init(initialCursor = '', initialInterests: Array<StreamID> = []): Promise<void> {
if (this.#initialized) {
return
}
Expand All @@ -110,6 +110,10 @@ export class ReconApi extends Observable<ReconEventFeedResponse> implements IRec
this.#url = await this.#config.url
await this.registerInterest(Model.MODEL)

for (const interest of initialInterests) {
await this.registerInterest(interest)
}

if (this.#config.feedEnabled) {
this.#eventsSubscription = this.createSubscription(initialCursor).subscribe(this.#feed$)
}
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/state-management/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ export class Repository {
const cursor = (await reconStore.exists(RECON_STORE_CURSOR_KEY))
? await reconStore.get(RECON_STORE_CURSOR_KEY)
: '0'
await this.recon.init(cursor)

const interests = this.index.indexedModels().map((modelData) => modelData.streamID)
await this.recon.init(cursor, interests)
this.reconEventFeedSubscription = this.recon
.pipe(concatMap(this.handleReconEvents.bind(this)))
.subscribe()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { CeramicDaemon, DaemonConfig } from '@ceramicnetwork/cli'
import { CeramicClient } from '@ceramicnetwork/http-client'
import { Model, ModelDefinition } from '@ceramicnetwork/stream-model'
import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils'
import tmp from 'tmp-promise'

const CONTENT0 = { myData: 0 }
const CONTENT1 = { myData: 1 }
Expand Down Expand Up @@ -508,14 +509,24 @@ describe('ModelInstanceDocument API multi-node tests', () => {
let ceramic1: Ceramic
let model: Model
let midMetadata: ModelInstanceDocumentMetadataArgs
let ceramic1StateStore: string

beforeAll(async () => {
ipfs0 = await createIPFS()
ipfs1 = await createIPFS()
await swarmConnect(ipfs0, ipfs1)

ceramic0 = await createCeramic(ipfs0)
ceramic1 = await createCeramic(ipfs1)
ceramic1StateStore = await tmp.tmpName()
ceramic1 = await createCeramic(ipfs1, {
indexing: {
db: `sqlite://${ceramic1StateStore}/ceramic.sqlite`,
allowQueriesBeforeHistoricalSync: false,
disableComposedb: false,
enableHistoricalSync: false,
},
stateStoreDirectory: ceramic1StateStore,
})

model = await Model.create(ceramic0, MODEL_DEFINITION)
midMetadata = { model: model.id }
Expand Down Expand Up @@ -593,4 +604,33 @@ describe('ModelInstanceDocument API multi-node tests', () => {
expect(loaded.state.log.length).toEqual(3)
expect(JSON.stringify(loaded.state)).toEqual(JSON.stringify(doc.state))
})

test('can load doc after restart', async () => {
//restart ceramic1
await ceramic1.close()
ceramic1 = await createCeramic(ipfs1, {
indexing: {
db: `sqlite://${ceramic1StateStore}/ceramic.sqlite`,
allowQueriesBeforeHistoricalSync: false,
disableComposedb: false,
enableHistoricalSync: false,
},
stateStoreDirectory: ceramic1StateStore,
})

const doc = await ModelInstanceDocument.create(ceramic0, CONTENT0, midMetadata)
if (EnvironmentUtils.useRustCeramic())
await TestUtils.waitForEvent(ceramic1.repository.recon, doc.tip)

const loaded = await ModelInstanceDocument.load(ceramic1, doc.id)

const docState = doc.state
const loadedState = loaded.state
expect(docState.anchorStatus).toEqual(AnchorStatus.PENDING)
expect(loadedState.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED)
delete docState.anchorStatus
delete loadedState.anchorStatus
expect(loadedState.log.length).toEqual(1)
expect(JSON.stringify(loadedState)).toEqual(JSON.stringify(docState))
})
})

0 comments on commit 0682179

Please sign in to comment.