Skip to content

Commit

Permalink
Merge pull request #474 from lidofinance/develop
Browse files Browse the repository at this point in the history
Develop to main
  • Loading branch information
itaven authored Sep 13, 2024
2 parents a0e2326 + 2dfa797 commit 12558c8
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 15 deletions.
17 changes: 16 additions & 1 deletion pages/api/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ import {
} from 'utilsApi';
import Metrics from 'utilsApi/metrics';
import { rpcFactory } from 'utilsApi/rpcFactory';
import { METRIC_CONTRACT_ADDRESSES } from 'utilsApi/contractAddressesMetricsMap';
import {
METRIC_CONTRACT_ADDRESSES,
METRIC_CONTRACT_EVENT_ADDRESSES,
} from 'utilsApi/contractAddressesMetricsMap';

const allowedCallAddresses: Record<string, string[]> = Object.entries(
METRIC_CONTRACT_ADDRESSES,
Expand All @@ -27,6 +30,16 @@ const allowedCallAddresses: Record<string, string[]> = Object.entries(
{} as Record<string, string[]>,
);

const allowedLogsAddresses: Record<string, string[]> = Object.entries(
METRIC_CONTRACT_EVENT_ADDRESSES,
).reduce(
(acc, [chainId, addresses]) => {
acc[chainId] = Object.keys(addresses);
return acc;
},
{} as Record<string, string[]>,
);

const rpc = rpcFactory({
fetchRPC: trackedFetchRpcFactory({
registry: Metrics.registry,
Expand Down Expand Up @@ -61,7 +74,9 @@ const rpc = rpcFactory({
'net_version',
],
allowedCallAddresses,
allowedLogsAddresses,
maxBatchCount: config.PROVIDER_MAX_BATCH,
disallowEmptyAddressGetLogs: false,
});

export default wrapNextRequest([
Expand Down
7 changes: 5 additions & 2 deletions shared/hooks/use-allowance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type UseAllowanceProps = {
spender: Address;
};

const onError = (error: unknown) =>
console.warn('[useAllowance] error while watching events', error);

export const useAllowance = ({
token,
account,
Expand Down Expand Up @@ -95,7 +98,6 @@ export const useAllowance = ({
useWatchContractEvent({
abi: Erc20AllowanceAbi,
eventName: 'Approval',
batch: true,
poll: true,
args: useMemo(
() => ({
Expand All @@ -107,12 +109,12 @@ export const useAllowance = ({
address: token,
enabled,
onLogs,
onError,
});

useWatchContractEvent({
abi: Erc20AllowanceAbi,
eventName: 'Transfer',
batch: false,
poll: true,
args: useMemo(
() => ({
Expand All @@ -123,6 +125,7 @@ export const useAllowance = ({
address: token,
enabled,
onLogs,
onError,
});

return allowanceQuery;
Expand Down
12 changes: 12 additions & 0 deletions shared/hooks/use-balance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ type OnLogsFn = WatchContractEventOnLogsFn<
true
>;

const onError = (error: unknown) =>
console.warn(
'[useTokenTransferSubscription] error while watching events',
error,
);

export const useTokenTransferSubscription = () => {
const { address } = useAccount();
const queryClient = useQueryClient();
Expand Down Expand Up @@ -134,6 +140,8 @@ export const useTokenTransferSubscription = () => {
useWatchContractEvent({
abi: Erc20EventsAbi,
eventName: 'Transfer',
batch: true,
poll: true,
args: useMemo(
() => ({
to: address,
Expand All @@ -143,11 +151,14 @@ export const useTokenTransferSubscription = () => {
address: tokens,
enabled: shouldWatch,
onLogs,
onError,
});

useWatchContractEvent({
abi: Erc20EventsAbi,
eventName: 'Transfer',
batch: true,
poll: true,
args: useMemo(
() => ({
from: address,
Expand All @@ -157,6 +168,7 @@ export const useTokenTransferSubscription = () => {
address: tokens,
enabled: shouldWatch,
onLogs,
onError,
});

const subscribe = useCallback(
Expand Down
19 changes: 19 additions & 0 deletions utils/use-web3-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
custom,
Chain,
UnsupportedProviderMethodError,
InvalidParamsRpcError,
} from 'viem';
import type { OnResponseFn } from 'viem/_types/clients/transports/fallback';
import type { Connection } from 'wagmi';
Expand Down Expand Up @@ -56,6 +57,24 @@ const runtimeMutableTransport = (
throw error;
}

if (
requestParams.method === 'eth_getLogs' &&
Array.isArray(requestParams?.params) &&
requestParams.params[0]?.address?.length < 0
) {
const error = new InvalidParamsRpcError(
new Error(`Empty address for eth_getLogs is not supported`),
);
responseFn({
error,
method: requestParams.method,
params: params as unknown[],
transport,
status: 'error',
});
throw error;
}

transport.value?.onResponse(responseFn);
return transport.request(requestParams, options);
},
Expand Down
24 changes: 24 additions & 0 deletions utilsApi/contractAddressesMetricsMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,27 @@ export const METRIC_CONTRACT_ADDRESSES = (
},
{} as Record<CHAINS, Record<`0x${string}`, CONTRACT_NAMES>>,
);

export const METRIC_CONTRACT_EVENT_ADDRESSES = (
config.supportedChains as CHAINS[]
).reduce(
(mapped, chainId) => {
const map = {
[CONTRACT_NAMES.stETH]: getAddressOrNull(
getTokenAddress,
chainId,
TOKENS.STETH,
),
[CONTRACT_NAMES.wstETH]: getAddressOrNull(
getTokenAddress,
chainId,
TOKENS.WSTETH,
),
};
return {
...mapped,
[chainId]: invert(omitBy(map, isNull)),
};
},
{} as Record<CHAINS, Record<`0x${string}`, CONTRACT_NAMES>>,
);
126 changes: 114 additions & 12 deletions utilsApi/rpcFactory.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Readable } from 'node:stream';
import { Readable, Transform } from 'node:stream';
import { ReadableStream } from 'node:stream/web';
import type { NextApiRequest, NextApiResponse } from 'next';
import { Counter, Registry } from 'prom-client';
Expand All @@ -13,18 +13,53 @@ export const DEFAULT_API_ERROR_MESSAGE =

export const HEALTHY_RPC_SERVICES_ARE_OVER = 'Healthy RPC services are over!';

export class UnsupportedChainIdError extends Error {
export class ClientError extends Error {}
export class UnsupportedChainIdError extends ClientError {
constructor(message?: string) {
super(message || 'Unsupported chainId');
}
}

export class UnsupportedHTTPMethodError extends Error {
export class UnsupportedHTTPMethodError extends ClientError {
constructor(message?: string) {
super(message || 'Unsupported HTTP method');
}
}

export class InvalidRequestError extends ClientError {
constructor(message?: string) {
super(message || 'Invalid Request');
}
}

export class SizeTooLargeError extends ClientError {
constructor(message?: string) {
super(message || 'Invalid Request');
}
}

const createSizeLogger = (MAX_SIZE: number) => {
let bytesWritten = 0;
const logSizeStream = new Transform({
transform(chunk, _encoding, callback) {
bytesWritten += chunk.length;
if (bytesWritten > MAX_SIZE) {
// Emit an error if size exceeds MAX_SIZE
return callback(
new SizeTooLargeError(
`Stream size exceeds the maximum limit of ${MAX_SIZE} bytes`,
),
);
}
return callback(null, chunk); // Pass the chunk through
},
flush(callback) {
callback();
},
});
return logSizeStream;
};

export type RPCFactoryParams = {
metrics: {
prefix: string;
Expand All @@ -39,7 +74,10 @@ export type RPCFactoryParams = {
allowedRPCMethods: string[];
// filtration by eth_call to addresses
allowedCallAddresses?: Record<number, string[]>;
allowedLogsAddresses?: Record<number, string[]>;
disallowEmptyAddressGetLogs?: boolean;
maxBatchCount?: number;
maxResponseSize?: number;
};

export const rpcFactory = ({
Expand All @@ -49,7 +87,10 @@ export const rpcFactory = ({
defaultChain,
allowedRPCMethods,
allowedCallAddresses = {},
allowedLogsAddresses = {},
maxBatchCount,
maxResponseSize = 1_000_000, // ~1MB,
disallowEmptyAddressGetLogs = false,
}: RPCFactoryParams) => {
const rpcRequestBlocked = new Counter({
name: prefix + 'rpc_service_request_blocked',
Expand All @@ -67,6 +108,14 @@ export const rpcFactory = ({
{} as Record<string, Set<string>>,
);

const allowedLogsAddressMap = Object.entries(allowedLogsAddresses).reduce(
(acc, [chainId, addresses]) => {
acc[chainId] = new Set(addresses.map((a) => a.toLowerCase()));
return acc;
},
{} as Record<string, Set<string>>,
);

return async (req: NextApiRequest, res: NextApiResponse): Promise<void> => {
try {
// Accept only POST requests
Expand All @@ -89,19 +138,19 @@ export const rpcFactory = ({
typeof maxBatchCount === 'number' &&
requests.length > maxBatchCount
) {
throw new Error(`Too many batched requests`);
throw new InvalidRequestError(`Too many batched requests`);
}

// TODO: consider returning array of validators instead of throwing error right away

// Check if provided methods are allowed
// We throw HTTP error for ANY invalid RPC request out of batch
// because we assume that frontend must not send invalid requests
for (const { method, params } of requests) {
if (typeof method !== 'string') {
throw new Error(`RPC method isn't string`);
throw new InvalidRequestError(`RPC method isn't string`);
}
if (!allowedRPCMethods.includes(method)) {
rpcRequestBlocked.inc();
throw new Error(`RPC method ${method} isn't allowed`);
throw new InvalidRequestError(`RPC method ${method} isn't allowed`);
}
if (method === 'eth_call' && allowedCallAddressMap[chainId]) {
if (
Expand All @@ -113,9 +162,41 @@ export const rpcFactory = ({
!allowedCallAddressMap[chainId].has(params[0].to.toLowerCase())
) {
rpcRequestBlocked.inc();
throw new Error(`Address not allowed for eth_call`);
throw new InvalidRequestError(`Address not allowed for eth_call`);
}
} else
throw new InvalidRequestError(`RPC method eth_call is invalid`);
}
if (
method === 'eth_getLogs' &&
(disallowEmptyAddressGetLogs || allowedLogsAddressMap[chainId])
) {
if (Array.isArray(params) && typeof params[0] === 'object') {
const address = params[0].address;
if (
disallowEmptyAddressGetLogs &&
(!address || (Array.isArray(address) && address.length === 0))
) {
rpcRequestBlocked.inc();
throw new InvalidRequestError(`No empty address on eth_getLogs`);
}
const addresses = Array.isArray(address) ? address : [address];
if (
addresses.some(
(eventAddress) =>
// needs this check before toLowerCase
typeof eventAddress !== 'string' ||
!allowedLogsAddressMap[chainId].has(
eventAddress.toLowerCase(),
),
)
) {
rpcRequestBlocked.inc();
throw new InvalidRequestError(
`Address not allowed for eth_getLogs`,
);
}
} else throw new Error(`RPC method eth_call is invalid`);
} else throw new InvalidRequestError(`Invalid eth_getLogs`);
}
}

Expand All @@ -133,14 +214,35 @@ export const rpcFactory = ({
requested.headers.get('Content-Type') ?? 'application/json',
);
if (requested.body) {
Readable.fromWeb(requested.body as ReadableStream).pipe(res);
const sizeLimit = createSizeLogger(maxResponseSize);
const readableStream = Readable.fromWeb(
requested.body as ReadableStream,
);
readableStream
.pipe(sizeLimit)
.on('error', (error) => {
if (error instanceof SizeTooLargeError) {
console.warn('[rpcFactory] RPC response too large', {
request: JSON.stringify(requests),
});
res.statusCode = 413; // Payload Too Large
res.end(error.message);
} else {
res.statusCode = 500;
res.end(DEFAULT_API_ERROR_MESSAGE);
}
readableStream.destroy();
})
.pipe(res);
} else {
res
.status(requested.status)
.json('There are a problems with RPC provider');
}
} catch (error) {
if (error instanceof Error) {
if (error instanceof ClientError) {
res.status(400).json(error.message ?? DEFAULT_API_ERROR_MESSAGE);
} else if (error instanceof Error) {
// TODO: check if there are errors duplication with iterateUrls
console.error(error.message ?? DEFAULT_API_ERROR_MESSAGE);
res.status(500).json(error.message ?? DEFAULT_API_ERROR_MESSAGE);
Expand Down

0 comments on commit 12558c8

Please sign in to comment.