diff --git a/packages/block-brokers/src/trustless-gateway/session.ts b/packages/block-brokers/src/trustless-gateway/session.ts index 9d26d7435..933578acf 100644 --- a/packages/block-brokers/src/trustless-gateway/session.ts +++ b/packages/block-brokers/src/trustless-gateway/session.ts @@ -42,6 +42,7 @@ class TrustlessGatewaySession extends AbstractSession { + this.log('findNewProviders called') yield * findHttpGatewayProviders(cid, this.routing, this.logger, this.allowInsecure, this.allowLocal, options) } diff --git a/packages/block-brokers/test/trustless-gateway-sessions.spec.ts b/packages/block-brokers/test/trustless-gateway-sessions.spec.ts index f2216ceb0..b0220368d 100644 --- a/packages/block-brokers/test/trustless-gateway-sessions.spec.ts +++ b/packages/block-brokers/test/trustless-gateway-sessions.spec.ts @@ -99,13 +99,43 @@ describe('trustless-gateway sessions', () => { ] } - components.routing.findProviders.returns(async function * () { - yield prov + components.routing.findProviders.callsFake(async function * () { yield prov - yield prov - }()) + }) await expect(session.retrieve(cid)).to.eventually.deep.equal(block) expect(queryProviderSpy.callCount).to.equal(1) }) + + it('should ignore duplicate providers when unable to retrieve a block', async () => { + const session = createTrustlessGatewaySession(components, { + allowInsecure: true, + allowLocal: true + }) + + // changed the CID to end in `aa` instead of `aq` + const cid = CID.parse('bafkreiefnkxuhnq3536qo2i2w3tazvifek4mbbzb6zlq3ouhprjce5c3aa') + + const queryProviderSpy = Sinon.spy(session, 'queryProvider') + const findNewProvidersSpy = Sinon.spy(session, 'findNewProviders') + const hasProviderSpy = Sinon.spy(session, 'hasProvider') + + const prov = { + id: await createEd25519PeerId(), + multiaddrs: [ + uriToMultiaddr(process.env.TRUSTLESS_GATEWAY ?? '') + ] + } + + components.routing.findProviders.callsFake(async function * () { + yield prov + }) + + await expect(session.retrieve(cid)).to.eventually.be.rejected() + expect(hasProviderSpy.callCount).to.be.greaterThanOrEqual(2) + expect(hasProviderSpy.getCall(0).returnValue).to.be.false() + expect(hasProviderSpy.getCall(1).returnValue).to.be.true() + expect(findNewProvidersSpy.callCount).to.be.greaterThanOrEqual(2) + expect(queryProviderSpy.callCount).to.equal(1) + }) }) diff --git a/packages/utils/src/abstract-session.ts b/packages/utils/src/abstract-session.ts index 36a215caf..f8c6e961a 100644 --- a/packages/utils/src/abstract-session.ts +++ b/packages/utils/src/abstract-session.ts @@ -32,13 +32,9 @@ export abstract class AbstractSession + findProviderQueue: Queue + queryProviderQueue: Queue constructor (components: AbstractSessionComponents, init: AbstractCreateSessionOptions) { super() @@ -52,7 +48,13 @@ export abstract class AbstractSession = {}): Promise { @@ -64,10 +66,18 @@ export abstract class AbstractSession = pDefer() this.requests.set(cidStr, deferred.promise) + const peerAddedToSessionListener = (event: CustomEvent): void => { + this.log('peer added to session...') + this.addQueryProviderJob(cid, event.detail, options) + } + + // add new session peers to query as they are discovered + this.addEventListener('provider', peerAddedToSessionListener) + if (this.providers.length === 0) { let first = false @@ -82,114 +92,133 @@ export abstract class AbstractSession { + this.log('querying existing provider %o', this.toEvictionKey(provider)) + return this.addQueryProviderJob(cid, provider, options) + })) } - let foundBlock = false + let findProvidersErrored = false + this.findProviderQueue.addEventListener('failure', (evt) => { + this.log.error('error finding new providers for %c', cid, evt.detail.error) - // this queue manages outgoing requests - as new peers are added to the - // session they will be added to the queue so we can request the current - // block from multiple peers as they are discovered - const queue = new Queue({ - concurrency: this.maxProviders - }) - queue.addEventListener('error', () => {}) - queue.addEventListener('failure', (evt) => { - this.log.error('error querying provider %o, evicting from session', evt.detail.job.options.provider, evt.detail.error) - this.evict(evt.detail.job.options.provider) - }) - queue.addEventListener('success', (evt) => { - // peer has sent block, return it to the caller - foundBlock = true - deferred.resolve(evt.detail.result) - }) - queue.addEventListener('idle', () => { - if (foundBlock || options.signal?.aborted === true || this.noNewProviders) { - // we either found the block, the user gave up, or cannot find any more providers - this.log('session aborted') + findProvidersErrored = true + if (['ERR_INSUFFICIENT_PROVIDERS_FOUND'].includes((evt.detail.error as CodeError).code)) { + deferred.reject(evt.detail.error) return } + }) - // find more session peers and retry - Promise.resolve() - .then(async () => { - this.log('no session peers had block for for %c, finding new providers', cid) - - // evict this.minProviders random providers to make room for more - for (let i = 0; i < this.minProviders; i++) { - if (this.providers.length === 0) { - break - } - - const provider = this.providers[Math.floor(Math.random() * this.providers.length)] - this.evict(provider) - } + this.findProviderQueue.addEventListener('idle', () => { + this.log.trace('findProviderQueue idle') + if (options.signal?.aborted === true && !foundBlock) { + deferred.reject(new CodeError(options.signal.reason, 'ABORT_ERR')) + return + } - // find new providers for the CID - await this.findProviders(cid, this.minProviders, options) - - // keep trying until the abort signal fires - this.log('found new providers re-retrieving %c', cid) - this.requests.delete(cidStr) - deferred.resolve(await this.retrieve(cid, options)) - }) - .catch(err => { - this.log.error('could not find new providers for %c', cid, err) - deferred.reject(err) - }) + if (foundBlock || findProvidersErrored || options.signal?.aborted === true) { + return + } + // continuously find new providers while we haven't found the block and signal is not aborted + this.addFindProviderJob(cid, options) }) - const peerAddedToSessionListener = (event: CustomEvent): void => { - queue.add(async () => { - return this.queryProvider(cid, event.detail, options) - }, { - provider: event.detail - }) - .catch(err => { - if (options.signal?.aborted === true) { - // skip logging error if signal was aborted because abort can happen - // on success (e.g. another session found the block) - return - } - - this.log.error('error retrieving session block for %c', cid, err) - }) - } - // add new session peers to query as they are discovered - this.addEventListener('provider', peerAddedToSessionListener) + this.queryProviderQueue.addEventListener('failure', (evt) => { + this.log.error('error querying provider %o, evicting from session', evt.detail.job.options.provider, evt.detail.error) + this.evict(evt.detail.job.options.provider) + }) - // query each session peer directly - Promise.all([...this.providers].map(async (provider) => { - return queue.add(async () => { - return this.queryProvider(cid, provider, options) - }, { - provider - }) - })) - .catch(err => { - if (options.signal?.aborted === true) { - // skip logging error if signal was aborted because abort can happen - // on success (e.g. another session found the block) - return - } + this.queryProviderQueue.addEventListener('success', (event) => { + this.log.trace('queryProviderQueue success') + foundBlock = true + // this.findProviderQueue.clear() + deferred.resolve(event.detail.result) + }) - this.log.error('error retrieving session block for %c', cid, err) - }) + this.queryProviderQueue.addEventListener('idle', () => { + this.log.trace('queryProviderQueue is idle') + if (foundBlock) { + return + } + if (options.signal?.aborted === true) { + // if the signal was aborted, we should reject the request + deferred.reject(options.signal.reason) + return + } + // we're done querying found providers.. if we can't find new providers, we should reject + if (findProvidersErrored) { + deferred.reject(new CodeError('Done querying all found providers and unable to retrieve the block', 'ERR_NO_PROVIDERS_HAD_BLOCK')) + return + } + // otherwise, we're still waiting for more providers to query + this.log('waiting for more providers to query') + // if this.findProviders is not running, start it + if (this.findProviderQueue.running === 0) { + this.addFindProviderJob(cid, options) + } + }) try { + // this.intialPeerSearchComplete = this.findProviders(cid, this.minProviders, options) return await deferred.promise } finally { + this.log('finally block, cleaning up session') this.removeEventListener('provider', peerAddedToSessionListener) - queue.clear() + this.findProviderQueue.clear() + this.queryProviderQueue.clear() this.requests.delete(cidStr) } } + addFindProviderJob(cid: CID, options: AbortOptions): any { + return this.findProviderQueue.add(async () => { + await this.findProviders(cid, this.minProviders, options) + }, { signal: options.signal }) + .catch(err => { + if (options.signal?.aborted === true) { + // skip logging error if signal was aborted because abort can happen + // on success (e.g. another session found the block) + return + } + }) + } + + addQueryProviderJob(cid: CID, provider: Provider, options: AbortOptions): any { + return this.queryProviderQueue.add(async () => { + return this.queryProvider(cid, provider, options) + }, { + provider, + signal: options.signal + }).catch(err => { + if (options.signal?.aborted === true) { + // skip logging error if signal was aborted because abort can happen + // on success (e.g. another session found the block) + return + } + }) + } + evict (provider: Provider): void { + this.log('evicting provider %o', provider) this.evictionFilter.add(this.toEvictionKey(provider)) + this.evictionFilter2.add(this.toEvictionKey(provider).toString()) + this.log('provider added to evictionFilter') const index = this.providers.findIndex(prov => this.equals(prov, provider)) + this.log('index of provider in this.providers: %d', index) if (index === -1) { + this.log('tried to evict provider, but it was not in this.providers') return } @@ -202,19 +231,36 @@ export abstract class AbstractSession this.equals(prov, provider)) != null) { + if (this.providers.some(prov => this.equals(prov, provider))) { + this.log('this.providers already has provider') return true + } else { + this.log('this.providers does not have provider') } // dedupe failed session peers if (this.isEvicted(provider)) { + this.log('provider was previously evicted') + return true + } else { + this.log('provider was not previously evicted') + } + if (this.evictionFilter2.has(this.toEvictionKey(provider).toString())) { + this.log('provider was *actually* previously evicted') return true } return false } + /** + * @param cid - The CID of the block to find providers for + * @param count - The number of providers to find + * @param options - AbortOptions + * @returns + */ private async findProviders (cid: CID, count: number, options: AbortOptions): Promise { + this.log('findProviders called') const deferred: DeferredPromise = pDefer() let found = 0 @@ -225,23 +271,32 @@ export abstract class AbstractSession { }) it('should not make multiple requests to the only found provider', async function () { - this.timeout(1000) const session: Session | null = new Session() const cid = CID.parse('bafybeifaymukvfkyw6xgh4th7tsctiifr4ea2btoznf46y6b2fnvikdczi') @@ -253,6 +252,7 @@ describe('abstract-session', () => { await expect(session.retrieve(cid)).to.eventually.be.rejected() - expect(session.findNewProviders.callCount).to.equal(4) + expect(session.findNewProviders.callCount).to.be.greaterThanOrEqual(2) + expect(session.queryProvider.callCount).to.equal(1) }) })