Skip to content

Commit

Permalink
feat: Flag to fully disable pubsub and all cross-node syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
stbrody committed Oct 11, 2024
1 parent bcc6585 commit 4cd0434
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 30 deletions.
3 changes: 2 additions & 1 deletion packages/cli/src/ceramic-daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ export function makeCeramicConfig(opts: DaemonConfig): CeramicConfig {
syncOverride: SYNC_OPTIONS_MAP[opts.node.syncOverride],
streamCacheLimit: opts.node.streamCacheLimit,
indexing: opts.indexing,
disablePeerDataSync: opts.ipfs.disablePeerDataSync,
disablePeerDataSync:
opts.ipfs.disablePeerDataSync || process.env.CERAMIC_DISABLE_PEER_DATA_SYNC == 'true',
metrics: opts.metrics,
}
if (opts.stateStore?.mode == StateStoreMode.FS) {
Expand Down
11 changes: 5 additions & 6 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ export class Dispatcher {
const rustCeramic = EnvironmentUtils.useRustCeramic()
this.enableSync = rustCeramic ? false : enableSync

if (!rustCeramic) {
if (this.enableSync) {
const pubsub = new Pubsub(
_ipfs,
topic,
Expand Down Expand Up @@ -178,10 +178,9 @@ export class Dispatcher {
}

async init() {
if (EnvironmentUtils.useRustCeramic()) {
return
if (this.enableSync) {
this.messageBus.subscribe(this.handleMessage.bind(this))
}
this.messageBus.subscribe(this.handleMessage.bind(this))
}

get shutdownSignal(): ShutdownSignal {
Expand Down Expand Up @@ -498,7 +497,7 @@ export class Dispatcher {
* @param tip - Commit CID
*/
publishTip(streamId: StreamID, tip: CID, model?: StreamID): Subscription {
if (process.env.CERAMIC_DISABLE_PUBSUB_UPDATES == 'true' || EnvironmentUtils.useRustCeramic()) {
if (process.env.CERAMIC_DISABLE_PUBSUB_UPDATES == 'true' || !this.enableSync) {
return empty().subscribe()
}

Expand Down Expand Up @@ -621,7 +620,7 @@ export class Dispatcher {
* Gracefully closes the Dispatcher.
*/
async close(): Promise<void> {
if (!EnvironmentUtils.useRustCeramic()) {
if (this.enableSync) {
this.messageBus.unsubscribe()
}
await this.tasks.onIdle()
Expand Down
16 changes: 15 additions & 1 deletion packages/core/src/initialization/stream-loading.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ import { LogSyncer } from '../stream-loading/log-syncer.js'
import { StateManipulator } from '../stream-loading/state-manipulator.js'
import { AnchorValidator } from '../anchor/anchor-service.js'
import { HandlersMap } from '../handlers-map.js'
import { StreamID } from '@ceramicnetwork/streamid'
import { Observable, empty } from 'rxjs'
import type { CID } from 'multiformats/cid'

const noopPubsubQuerier = {
queryNetwork(streamId: StreamID): Observable<CID> {
return empty()
},
}

export function makeStreamLoaderAndUpdater(
logger: DiagnosticsLogger,
Expand All @@ -17,7 +26,12 @@ export function makeStreamLoaderAndUpdater(
streamHandlers: HandlersMap
): [StreamLoader, StreamUpdater] {
const anchorTimestampExtractor = new AnchorTimestampExtractor(logger, dispatcher, anchorValidator)
const tipFetcher = new TipFetcher(dispatcher.messageBus)
if (!dispatcher.messageBus) {
logger.warn("No pubsub querier detected, won't be able to load tips from the network")
}
const tipFetcher = new TipFetcher(
dispatcher.messageBus ? dispatcher.messageBus : noopPubsubQuerier
)
const logSyncer = new LogSyncer(dispatcher)
const stateManipulator = new StateManipulator(logger, streamHandlers, logSyncer, api)
const streamLoader = new StreamLoader(
Expand Down
32 changes: 10 additions & 22 deletions packages/stream-tests/src/__tests__/ceramic_sync_disabled.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ const makeCeramicCore = async (
return core
}

// should pass on v4 when updated from TileDocument

describeIfV3('Cross node syncing disabled', () => {
jest.setTimeout(20000)

Expand Down Expand Up @@ -110,32 +108,22 @@ describeIfV3('Cross node syncing disabled', () => {
it('Stream created and updated on node with peer data sync disabled still loads via other well connected nodes', async () => {
const content0 = { step: 0 }
const content1 = { step: 1 }
const content2 = { step: 2 }
const doc1 = await TileDocument.create(disconnectedCeramic, content0, null, {
anchor: false,
})
await doc1.update(content1, null, { anchor: false })

const doc2 = await TileDocument.load(connectedCeramic, doc1.id)
expect(doc1.content).toEqual(doc2.content)

// Update should also propagate from node with sync disabled to the other node without issue
await doc1.update(content2, null, { anchor: false })

await TestUtils.waitForState(
doc2,
5000,
(state) => state.log.length == 3,
(state) => {
throw new Error(`Sync failed. State: ${StreamUtils.serializeState(state)}`)
}
)

expect(doc1.content).toEqual(content2)
expect(doc1.state.log.length).toEqual(3)

expect(doc2.content).toEqual(content2)
expect(doc2.state.log.length).toEqual(3)
// The disconnected node won't be listening to pubsub so the connected node will only get the
// genesis commit, not the tip from the update.
expect(doc2.content).toEqual(content0)

// Loading at the specific CommitID of the update will work though because the underlying
// commit blocks are still available via bitswap.
const docAtCommit = await TileDocument.load(connectedCeramic, doc1.commitId)
expect(docAtCommit.content).toEqual(content1)
expect(doc1.content).toEqual(docAtCommit.content)
expect(docAtCommit.state.log.length).toEqual(2)
})

it('Updates made on connected node not visible to node with peer data sync disabled', async () => {
Expand Down

0 comments on commit 4cd0434

Please sign in to comment.