Skip to content

Commit

Permalink
fix(dapi): invalid state transition failed with already in chain error (
Browse files Browse the repository at this point in the history
  • Loading branch information
shumkov authored Oct 24, 2024
1 parent 24de235 commit e0e152f
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 10 deletions.
55 changes: 55 additions & 0 deletions packages/dapi/lib/externalApis/tenderdash/requestTenderRpc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
const UnavailableGrpcError = require('@dashevo/grpc-common/lib/server/error/UnavailableGrpcError');
const ResourceExhaustedGrpcError = require('@dashevo/grpc-common/lib/server/error/ResourceExhaustedGrpcError');
const RPCError = require('../../rpcServer/RPCError');

/**
* @param {jaysonClient} rpcClient
* @return {requestTenderRpc} A function to make RPC requests to Tenderdash.
*/
function requestTenderRpcFactory(rpcClient) {
/**
* @function
* @typedef requestTenderRpc
* @param {string} uri
* @param {Object} [params={}]
* @return {Promise<Object>}
*/
async function requestTenderRpc(uri, params = {}) {
let response;
try {
response = await rpcClient.request(uri, params);
} catch (e) {
if (e.code === 'ECONNRESET' || e.message === 'socket hang up') {
throw new UnavailableGrpcError('Tenderdash is not available');
}

throw new RPCError(
e.code || -32602,
`Failed to request ${uri}: ${e.message}`,
e,
);
}

const { result, error: jsonRpcError } = response;

if (jsonRpcError) {
if (typeof jsonRpcError.data === 'string') {
if (jsonRpcError.data.includes('too_many_resets')) {
throw new ResourceExhaustedGrpcError('tenderdash is not responding: too many requests');
}
}

throw new RPCError(
jsonRpcError.code || -32602,
jsonRpcError.message || 'Internal error',
jsonRpcError.data,
);
}

return result;
}

return requestTenderRpc;
}

module.exports = requestTenderRpcFactory;
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ const {
server: {
error: {
InvalidArgumentGrpcError,
AlreadyExistsGrpcError,
ResourceExhaustedGrpcError,
UnavailableGrpcError,
AlreadyExistsGrpcError,
InternalGrpcError,
},
},
} = require('@dashevo/grpc-common');
Expand All @@ -14,15 +15,23 @@ const {
BroadcastStateTransitionResponse,
},
} = require('@dashevo/dapi-grpc');

const crypto = require('crypto');

const logger = require('../../../logger');

/**
* @param {jaysonClient} rpcClient
* @param {createGrpcErrorFromDriveResponse} createGrpcErrorFromDriveResponse
* @param {requestTenderRpc} requestTenderRpc
*
* @returns {broadcastStateTransitionHandler}
*/
function broadcastStateTransitionHandlerFactory(rpcClient, createGrpcErrorFromDriveResponse) {
function broadcastStateTransitionHandlerFactory(
rpcClient,
createGrpcErrorFromDriveResponse,
requestTenderRpc,
) {
/**
* @typedef broadcastStateTransitionHandler
*
Expand All @@ -38,15 +47,17 @@ function broadcastStateTransitionHandlerFactory(rpcClient, createGrpcErrorFromDr
throw new InvalidArgumentGrpcError('State Transition is not specified');
}

const tx = Buffer.from(stByteArray)
const stBytes = Buffer.from(stByteArray);

const tx = stBytes
.toString('base64');

let response;

try {
response = await rpcClient.request('broadcast_tx', { tx });
} catch (e) {
if (e.message === 'socket hang up') {
if (e.code === 'ECONNRESET' || e.message === 'socket hang up') {
throw new UnavailableGrpcError('Tenderdash is not available');
}

Expand All @@ -55,15 +66,65 @@ function broadcastStateTransitionHandlerFactory(rpcClient, createGrpcErrorFromDr
throw e;
}

const {
result,
error: jsonRpcError,
} = response;
const { result, error: jsonRpcError } = response;

if (jsonRpcError) {
if (typeof jsonRpcError.data === 'string') {
if (jsonRpcError.data === 'tx already exists in cache') {
throw new AlreadyExistsGrpcError('state transition already in chain');
// We need to figure out and report to user why the ST cached
const stHash = crypto.createHash('sha256')
.update(stBytes)
.digest();

// TODO: Apply search filter to fetch specific state transition
// Throw an already exist in mempool error if the ST in mempool
const unconfirmedTxsResponse = await requestTenderRpc('unconfirmed_txs', { limit: 100 });

if (unconfirmedTxsResponse?.txs?.includes(stBytes.toString('base64'))) {
throw new AlreadyExistsGrpcError('state transition already in mempool');
}

// Throw an already exist in chain error if the ST is committed
let txResponse;
try {
txResponse = await requestTenderRpc('tx', { hash: stHash.toString('base64') });
} catch (e) {
if (typeof e.data !== 'string' || !e.data.includes('not found')) {
throw e;
}
}

if (txResponse?.tx_result) {
throw new AlreadyExistsGrpcError('state transition already in chain');
}

// If the ST not in mempool and not in the state but still in the cache
// it means it was invalidated by CheckTx so we run CheckTx again to provide
// the validation error
const checkTxResponse = await requestTenderRpc('check_tx', { tx });

if (checkTxResponse?.code !== 0) {
// Return validation error
throw await createGrpcErrorFromDriveResponse(
checkTxResponse.code,
checkTxResponse.info,
);
} else {
// CheckTx passes for the ST, it means we have a bug in Drive so ST is passing check
// tx and then removed from the block. The removal from the block doesn't remove ST
// from the cache because it's happening only one proposer and other nodes do not know
// that this ST was processed and keep it in the cache
// The best what we can do is to return an internal error and and log the transaction
logger.warn({
tx,
}, `State transition ${stHash.toString('hex')} is passing CheckTx but removed from the block by proposal`);

const error = new Error('State Transition processing error. Please report'
+ ' faulty state transition and try to create a new state transition with different'
+ ' hash as a workaround.');

throw new InternalGrpcError(error);
}
}

if (jsonRpcError.data.startsWith('Tx too large.')) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const waitForTransactionToBeProvableFactory = require('../../../externalApis/ten
const waitForTransactionResult = require('../../../externalApis/tenderdash/waitForTransactionToBeProvable/waitForTransactionResult');
const getExistingTransactionResultFactory = require('../../../externalApis/tenderdash/waitForTransactionToBeProvable/getExistingTransactionResult');
const getConsensusParamsFactory = require('../../../externalApis/tenderdash/getConsensusParamsFactory');
const requestTenderRpcFactory = require('../../../externalApis/tenderdash/requestTenderRpc');

/**
* @param {jaysonClient} rpcClient
Expand All @@ -73,10 +74,13 @@ function platformHandlersFactory(
) {
const wrapInErrorHandler = wrapInErrorHandlerFactory(logger, isProductionEnvironment);

const requestTenderRpc = requestTenderRpcFactory(rpcClient);

// broadcastStateTransition
const broadcastStateTransitionHandler = broadcastStateTransitionHandlerFactory(
rpcClient,
createGrpcErrorFromDriveResponse,
requestTenderRpc,
);

const wrappedBroadcastStateTransition = jsonToProtobufHandlerWrapper(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const {
AlreadyExistsGrpcError,
UnavailableGrpcError,
ResourceExhaustedGrpcError,
InternalGrpcError,
},
},
} = require('@dashevo/grpc-common');
Expand Down Expand Up @@ -36,6 +37,7 @@ describe('broadcastStateTransitionHandlerFactory', () => {
let log;
let code;
let createGrpcErrorFromDriveResponseMock;
let requestTenderRpcMock;

before(async () => {
await loadWasmDpp();
Expand Down Expand Up @@ -82,11 +84,14 @@ describe('broadcastStateTransitionHandlerFactory', () => {
request: this.sinon.stub().resolves(response),
};

requestTenderRpcMock = this.sinon.stub();

createGrpcErrorFromDriveResponseMock = this.sinon.stub();

broadcastStateTransitionHandler = broadcastStateTransitionHandlerFactory(
rpcClientMock,
createGrpcErrorFromDriveResponseMock,
requestTenderRpcMock,
);
});

Expand Down Expand Up @@ -182,13 +187,38 @@ describe('broadcastStateTransitionHandlerFactory', () => {
}
});

it('should throw AlreadyExistsGrpcError if transaction was broadcasted twice', async () => {
it('should throw AlreadyExistsGrpcError if transaction in mempool', async () => {
response.error = {
code: -32603,
message: 'Internal error',
data: 'tx already exists in cache',
};

requestTenderRpcMock.withArgs('unconfirmed_txs').resolves({
txs: [stateTransitionFixture.toBuffer().toString('base64')],
});

try {
await broadcastStateTransitionHandler(call);

expect.fail('should throw AlreadyExistsGrpcError');
} catch (e) {
expect(e).to.be.an.instanceOf(AlreadyExistsGrpcError);
expect(e.getMessage()).to.equal('state transition already in mempool');
}
});

it('should throw AlreadyExistsGrpcError if transaction in chain', async () => {
response.error = {
code: -32603,
message: 'Internal error',
data: 'tx already exists in cache',
};

requestTenderRpcMock.withArgs('tx').resolves({
tx_result: { },
});

try {
await broadcastStateTransitionHandler(call);

Expand All @@ -199,6 +229,52 @@ describe('broadcastStateTransitionHandlerFactory', () => {
}
});

it('should throw consensus result for invalid transition in cache', async () => {
response.error = {
code: -32603,
message: 'Internal error',
data: 'tx already exists in cache',
};

requestTenderRpcMock.withArgs('check_tx').resolves({
code: 1,
info: 'some info',
});

const error = new Error('some error');

createGrpcErrorFromDriveResponseMock.resolves(error);

try {
await broadcastStateTransitionHandler(call);

expect.fail('should throw consensus error');
} catch (e) {
expect(e).to.equal(error);
}
});

it('should throw internal error for transition in cache that passing check tx', async () => {
response.error = {
code: -32603,
message: 'Internal error',
data: 'tx already exists in cache',
};

requestTenderRpcMock.withArgs('check_tx').resolves({
code: 0,
});

try {
await broadcastStateTransitionHandler(call);

expect.fail('should throw InternalError');
} catch (e) {
expect(e).to.be.an.instanceOf(InternalGrpcError);
expect(e.getMessage()).to.equal('Internal error');
}
});

it('should throw a gRPC error based on drive\'s response', async () => {
const message = 'not found';
const metadata = {
Expand Down

0 comments on commit e0e152f

Please sign in to comment.