diff --git a/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js b/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js new file mode 100644 index 0000000000..efdec5962f --- /dev/null +++ b/packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js @@ -0,0 +1,55 @@ +const UnavailableGrpcError = require('@dashevo/grpc-common/lib/server/error/UnavailableGrpcError'); +const ResourceExhaustedGrpcError = require('@dashevo/grpc-common/lib/server/error/ResourceExhaustedGrpcError'); +const RPCError = require('../../rpcServer/RPCError'); + +/** + * @param {jaysonClient} rpcClient + * @return {requestTenderRpc} A function to make RPC requests to Tenderdash. + */ +function requestTenderRpcFactory(rpcClient) { + /** + * @function + * @typedef requestTenderRpc + * @param {string} uri + * @param {Object} [params={}] + * @return {Promise} + */ + async function requestTenderRpc(uri, params = {}) { + let response; + try { + response = await rpcClient.request(uri, params); + } catch (e) { + if (e.code === 'ECONNRESET' || e.message === 'socket hang up') { + throw new UnavailableGrpcError('Tenderdash is not available'); + } + + throw new RPCError( + e.code || -32602, + `Failed to request ${uri}: ${e.message}`, + e, + ); + } + + const { result, error: jsonRpcError } = response; + + if (jsonRpcError) { + if (typeof jsonRpcError.data === 'string') { + if (jsonRpcError.data.includes('too_many_resets')) { + throw new ResourceExhaustedGrpcError('tenderdash is not responding: too many requests'); + } + } + + throw new RPCError( + jsonRpcError.code || -32602, + jsonRpcError.message || 'Internal error', + jsonRpcError.data, + ); + } + + return result; + } + + return requestTenderRpc; +} + +module.exports = requestTenderRpcFactory; diff --git a/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js b/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js index ad77276042..a17d496918 100644 --- a/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js +++ b/packages/dapi/lib/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.js @@ -2,9 +2,10 @@ const { server: { error: { InvalidArgumentGrpcError, - AlreadyExistsGrpcError, ResourceExhaustedGrpcError, UnavailableGrpcError, + AlreadyExistsGrpcError, + InternalGrpcError, }, }, } = require('@dashevo/grpc-common'); @@ -14,15 +15,23 @@ const { BroadcastStateTransitionResponse, }, } = require('@dashevo/dapi-grpc'); + +const crypto = require('crypto'); + const logger = require('../../../logger'); /** * @param {jaysonClient} rpcClient * @param {createGrpcErrorFromDriveResponse} createGrpcErrorFromDriveResponse + * @param {requestTenderRpc} requestTenderRpc * * @returns {broadcastStateTransitionHandler} */ -function broadcastStateTransitionHandlerFactory(rpcClient, createGrpcErrorFromDriveResponse) { +function broadcastStateTransitionHandlerFactory( + rpcClient, + createGrpcErrorFromDriveResponse, + requestTenderRpc, +) { /** * @typedef broadcastStateTransitionHandler * @@ -38,7 +47,9 @@ function broadcastStateTransitionHandlerFactory(rpcClient, createGrpcErrorFromDr throw new InvalidArgumentGrpcError('State Transition is not specified'); } - const tx = Buffer.from(stByteArray) + const stBytes = Buffer.from(stByteArray); + + const tx = stBytes .toString('base64'); let response; @@ -46,7 +57,7 @@ function broadcastStateTransitionHandlerFactory(rpcClient, createGrpcErrorFromDr try { response = await rpcClient.request('broadcast_tx', { tx }); } catch (e) { - if (e.message === 'socket hang up') { + if (e.code === 'ECONNRESET' || e.message === 'socket hang up') { throw new UnavailableGrpcError('Tenderdash is not available'); } @@ -55,15 +66,65 @@ function broadcastStateTransitionHandlerFactory(rpcClient, createGrpcErrorFromDr throw e; } - const { - result, - error: jsonRpcError, - } = response; + const { result, error: jsonRpcError } = response; if (jsonRpcError) { if (typeof jsonRpcError.data === 'string') { if (jsonRpcError.data === 'tx already exists in cache') { - throw new AlreadyExistsGrpcError('state transition already in chain'); + // We need to figure out and report to user why the ST cached + const stHash = crypto.createHash('sha256') + .update(stBytes) + .digest(); + + // TODO: Apply search filter to fetch specific state transition + // Throw an already exist in mempool error if the ST in mempool + const unconfirmedTxsResponse = await requestTenderRpc('unconfirmed_txs', { limit: 100 }); + + if (unconfirmedTxsResponse?.txs?.includes(stBytes.toString('base64'))) { + throw new AlreadyExistsGrpcError('state transition already in mempool'); + } + + // Throw an already exist in chain error if the ST is committed + let txResponse; + try { + txResponse = await requestTenderRpc('tx', { hash: stHash.toString('base64') }); + } catch (e) { + if (typeof e.data !== 'string' || !e.data.includes('not found')) { + throw e; + } + } + + if (txResponse?.tx_result) { + throw new AlreadyExistsGrpcError('state transition already in chain'); + } + + // If the ST not in mempool and not in the state but still in the cache + // it means it was invalidated by CheckTx so we run CheckTx again to provide + // the validation error + const checkTxResponse = await requestTenderRpc('check_tx', { tx }); + + if (checkTxResponse?.code !== 0) { + // Return validation error + throw await createGrpcErrorFromDriveResponse( + checkTxResponse.code, + checkTxResponse.info, + ); + } else { + // CheckTx passes for the ST, it means we have a bug in Drive so ST is passing check + // tx and then removed from the block. The removal from the block doesn't remove ST + // from the cache because it's happening only one proposer and other nodes do not know + // that this ST was processed and keep it in the cache + // The best what we can do is to return an internal error and and log the transaction + logger.warn({ + tx, + }, `State transition ${stHash.toString('hex')} is passing CheckTx but removed from the block by proposal`); + + const error = new Error('State Transition processing error. Please report' + + ' faulty state transition and try to create a new state transition with different' + + ' hash as a workaround.'); + + throw new InternalGrpcError(error); + } } if (jsonRpcError.data.startsWith('Tx too large.')) { diff --git a/packages/dapi/lib/grpcServer/handlers/platform/platformHandlersFactory.js b/packages/dapi/lib/grpcServer/handlers/platform/platformHandlersFactory.js index 641071d357..8288583ab7 100644 --- a/packages/dapi/lib/grpcServer/handlers/platform/platformHandlersFactory.js +++ b/packages/dapi/lib/grpcServer/handlers/platform/platformHandlersFactory.js @@ -55,6 +55,7 @@ const waitForTransactionToBeProvableFactory = require('../../../externalApis/ten const waitForTransactionResult = require('../../../externalApis/tenderdash/waitForTransactionToBeProvable/waitForTransactionResult'); const getExistingTransactionResultFactory = require('../../../externalApis/tenderdash/waitForTransactionToBeProvable/getExistingTransactionResult'); const getConsensusParamsFactory = require('../../../externalApis/tenderdash/getConsensusParamsFactory'); +const requestTenderRpcFactory = require('../../../externalApis/tenderdash/requestTenderRpc'); /** * @param {jaysonClient} rpcClient @@ -73,10 +74,13 @@ function platformHandlersFactory( ) { const wrapInErrorHandler = wrapInErrorHandlerFactory(logger, isProductionEnvironment); + const requestTenderRpc = requestTenderRpcFactory(rpcClient); + // broadcastStateTransition const broadcastStateTransitionHandler = broadcastStateTransitionHandlerFactory( rpcClient, createGrpcErrorFromDriveResponse, + requestTenderRpc, ); const wrappedBroadcastStateTransition = jsonToProtobufHandlerWrapper( diff --git a/packages/dapi/test/unit/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.spec.js b/packages/dapi/test/unit/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.spec.js index 3dc61b2eb4..de1152d015 100644 --- a/packages/dapi/test/unit/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.spec.js +++ b/packages/dapi/test/unit/grpcServer/handlers/platform/broadcastStateTransitionHandlerFactory.spec.js @@ -5,6 +5,7 @@ const { AlreadyExistsGrpcError, UnavailableGrpcError, ResourceExhaustedGrpcError, + InternalGrpcError, }, }, } = require('@dashevo/grpc-common'); @@ -36,6 +37,7 @@ describe('broadcastStateTransitionHandlerFactory', () => { let log; let code; let createGrpcErrorFromDriveResponseMock; + let requestTenderRpcMock; before(async () => { await loadWasmDpp(); @@ -82,11 +84,14 @@ describe('broadcastStateTransitionHandlerFactory', () => { request: this.sinon.stub().resolves(response), }; + requestTenderRpcMock = this.sinon.stub(); + createGrpcErrorFromDriveResponseMock = this.sinon.stub(); broadcastStateTransitionHandler = broadcastStateTransitionHandlerFactory( rpcClientMock, createGrpcErrorFromDriveResponseMock, + requestTenderRpcMock, ); }); @@ -182,13 +187,38 @@ describe('broadcastStateTransitionHandlerFactory', () => { } }); - it('should throw AlreadyExistsGrpcError if transaction was broadcasted twice', async () => { + it('should throw AlreadyExistsGrpcError if transaction in mempool', async () => { + response.error = { + code: -32603, + message: 'Internal error', + data: 'tx already exists in cache', + }; + + requestTenderRpcMock.withArgs('unconfirmed_txs').resolves({ + txs: [stateTransitionFixture.toBuffer().toString('base64')], + }); + + try { + await broadcastStateTransitionHandler(call); + + expect.fail('should throw AlreadyExistsGrpcError'); + } catch (e) { + expect(e).to.be.an.instanceOf(AlreadyExistsGrpcError); + expect(e.getMessage()).to.equal('state transition already in mempool'); + } + }); + + it('should throw AlreadyExistsGrpcError if transaction in chain', async () => { response.error = { code: -32603, message: 'Internal error', data: 'tx already exists in cache', }; + requestTenderRpcMock.withArgs('tx').resolves({ + tx_result: { }, + }); + try { await broadcastStateTransitionHandler(call); @@ -199,6 +229,52 @@ describe('broadcastStateTransitionHandlerFactory', () => { } }); + it('should throw consensus result for invalid transition in cache', async () => { + response.error = { + code: -32603, + message: 'Internal error', + data: 'tx already exists in cache', + }; + + requestTenderRpcMock.withArgs('check_tx').resolves({ + code: 1, + info: 'some info', + }); + + const error = new Error('some error'); + + createGrpcErrorFromDriveResponseMock.resolves(error); + + try { + await broadcastStateTransitionHandler(call); + + expect.fail('should throw consensus error'); + } catch (e) { + expect(e).to.equal(error); + } + }); + + it('should throw internal error for transition in cache that passing check tx', async () => { + response.error = { + code: -32603, + message: 'Internal error', + data: 'tx already exists in cache', + }; + + requestTenderRpcMock.withArgs('check_tx').resolves({ + code: 0, + }); + + try { + await broadcastStateTransitionHandler(call); + + expect.fail('should throw InternalError'); + } catch (e) { + expect(e).to.be.an.instanceOf(InternalGrpcError); + expect(e.getMessage()).to.equal('Internal error'); + } + }); + it('should throw a gRPC error based on drive\'s response', async () => { const message = 'not found'; const metadata = {