diff --git a/packages/cli/src/ceramic-daemon.ts b/packages/cli/src/ceramic-daemon.ts index a2789a0f93..46d68939db 100644 --- a/packages/cli/src/ceramic-daemon.ts +++ b/packages/cli/src/ceramic-daemon.ts @@ -90,7 +90,8 @@ export function makeCeramicConfig(opts: DaemonConfig): CeramicConfig { syncOverride: SYNC_OPTIONS_MAP[opts.node.syncOverride], streamCacheLimit: opts.node.streamCacheLimit, indexing: opts.indexing, - disablePeerDataSync: opts.ipfs.disablePeerDataSync, + disablePeerDataSync: + opts.ipfs.disablePeerDataSync || process.env.CERAMIC_DISABLE_PEER_DATA_SYNC == 'true', metrics: opts.metrics, } if (opts.stateStore?.mode == StateStoreMode.FS) { diff --git a/packages/core/src/dispatcher.ts b/packages/core/src/dispatcher.ts index c2cef4518d..6c0f9711b3 100644 --- a/packages/core/src/dispatcher.ts +++ b/packages/core/src/dispatcher.ts @@ -140,7 +140,7 @@ export class Dispatcher { const rustCeramic = EnvironmentUtils.useRustCeramic() this.enableSync = rustCeramic ? false : enableSync - if (!rustCeramic) { + if (this.enableSync) { const pubsub = new Pubsub( _ipfs, topic, @@ -178,10 +178,9 @@ export class Dispatcher { } async init() { - if (EnvironmentUtils.useRustCeramic()) { - return + if (this.enableSync) { + this.messageBus.subscribe(this.handleMessage.bind(this)) } - this.messageBus.subscribe(this.handleMessage.bind(this)) } get shutdownSignal(): ShutdownSignal { @@ -498,7 +497,7 @@ export class Dispatcher { * @param tip - Commit CID */ publishTip(streamId: StreamID, tip: CID, model?: StreamID): Subscription { - if (process.env.CERAMIC_DISABLE_PUBSUB_UPDATES == 'true' || EnvironmentUtils.useRustCeramic()) { + if (!this.enableSync) { return empty().subscribe() } @@ -621,7 +620,7 @@ export class Dispatcher { * Gracefully closes the Dispatcher. */ async close(): Promise { - if (!EnvironmentUtils.useRustCeramic()) { + if (this.enableSync) { this.messageBus.unsubscribe() } await this.tasks.onIdle() diff --git a/packages/core/src/initialization/stream-loading.ts b/packages/core/src/initialization/stream-loading.ts index b45f5bf4d3..2bde8a1049 100644 --- a/packages/core/src/initialization/stream-loading.ts +++ b/packages/core/src/initialization/stream-loading.ts @@ -8,6 +8,15 @@ import { LogSyncer } from '../stream-loading/log-syncer.js' import { StateManipulator } from '../stream-loading/state-manipulator.js' import { AnchorValidator } from '../anchor/anchor-service.js' import { HandlersMap } from '../handlers-map.js' +import { StreamID } from '@ceramicnetwork/streamid' +import { Observable, empty } from 'rxjs' +import type { CID } from 'multiformats/cid' + +const noopPubsubQuerier = { + queryNetwork(streamId: StreamID): Observable { + return empty() + }, +} export function makeStreamLoaderAndUpdater( logger: DiagnosticsLogger, @@ -17,7 +26,12 @@ export function makeStreamLoaderAndUpdater( streamHandlers: HandlersMap ): [StreamLoader, StreamUpdater] { const anchorTimestampExtractor = new AnchorTimestampExtractor(logger, dispatcher, anchorValidator) - const tipFetcher = new TipFetcher(dispatcher.messageBus) + if (!dispatcher.messageBus) { + logger.warn("No pubsub querier detected, won't be able to load tips from the network") + } + const tipFetcher = new TipFetcher( + dispatcher.messageBus ? dispatcher.messageBus : noopPubsubQuerier + ) const logSyncer = new LogSyncer(dispatcher) const stateManipulator = new StateManipulator(logger, streamHandlers, logSyncer, api) const streamLoader = new StreamLoader( diff --git a/packages/stream-tests/src/__tests__/ceramic_sync_disabled.test.ts b/packages/stream-tests/src/__tests__/ceramic_sync_disabled.test.ts index 221adae1e1..e99511ed19 100644 --- a/packages/stream-tests/src/__tests__/ceramic_sync_disabled.test.ts +++ b/packages/stream-tests/src/__tests__/ceramic_sync_disabled.test.ts @@ -35,8 +35,6 @@ const makeCeramicCore = async ( return core } -// should pass on v4 when updated from TileDocument - describeIfV3('Cross node syncing disabled', () => { jest.setTimeout(20000) @@ -110,32 +108,22 @@ describeIfV3('Cross node syncing disabled', () => { it('Stream created and updated on node with peer data sync disabled still loads via other well connected nodes', async () => { const content0 = { step: 0 } const content1 = { step: 1 } - const content2 = { step: 2 } const doc1 = await TileDocument.create(disconnectedCeramic, content0, null, { anchor: false, }) await doc1.update(content1, null, { anchor: false }) const doc2 = await TileDocument.load(connectedCeramic, doc1.id) - expect(doc1.content).toEqual(doc2.content) - - // Update should also propagate from node with sync disabled to the other node without issue - await doc1.update(content2, null, { anchor: false }) - - await TestUtils.waitForState( - doc2, - 5000, - (state) => state.log.length == 3, - (state) => { - throw new Error(`Sync failed. State: ${StreamUtils.serializeState(state)}`) - } - ) - - expect(doc1.content).toEqual(content2) - expect(doc1.state.log.length).toEqual(3) - - expect(doc2.content).toEqual(content2) - expect(doc2.state.log.length).toEqual(3) + // The disconnected node won't be listening to pubsub so the connected node will only get the + // genesis commit, not the tip from the update. + expect(doc2.content).toEqual(content0) + + // Loading at the specific CommitID of the update will work though because the underlying + // commit blocks are still available via bitswap. + const docAtCommit = await TileDocument.load(connectedCeramic, doc1.commitId) + expect(docAtCommit.content).toEqual(content1) + expect(doc1.content).toEqual(docAtCommit.content) + expect(docAtCommit.state.log.length).toEqual(2) }) it('Updates made on connected node not visible to node with peer data sync disabled', async () => {