Skip to content

Commit

Permalink
fix: 🐛 Fixed contract events subscription fromBlock management
Browse files Browse the repository at this point in the history
  • Loading branch information
kostysh committed Mar 8, 2024
1 parent cff4095 commit 3c68a31
Showing 1 changed file with 35 additions and 26 deletions.
61 changes: 35 additions & 26 deletions packages/contracts-manger/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -705,10 +705,10 @@ export class ProtocolContracts<
* @param {(logs: GetFilterLogsReturnType<TAbi>) => void} onLogs Callback to execute when logs are received.
* @param {bigint} [fromBlock] The block number from which to start listening for events.
* @param {number} [pollInterval=1000] The interval in milliseconds at which to poll for new events.
* @returns {Promise<() => void>} A promise that resolves to an unsubscribe function.
* @returns {() => void} A promise that resolves to an unsubscribe function.
* @private
*/
private async subscribeToEvents<
private subscribeToEvents<
// eslint-disable-next-line @typescript-eslint/no-redundant-type-constituents
const TAbi extends Abi | readonly unknown[] = Abi,
TEventName extends string | undefined = undefined,
Expand All @@ -717,38 +717,47 @@ export class ProtocolContracts<
address: Address,
eventName: InferEventName<TAbi, TEventName>,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onLogs: (logs: any) => void,
fromBlock?: bigint,
onLogs: (logs: any, maxBlockNumber: bigint) => void,
fromBlock: bigint = 0n,
pollInterval: number = 1000,
): Promise<() => void> {
let blockNumber = await this.publicClient.getBlockNumber();
let isUnsubscribed = false;
): () => void {
let timeoutId: NodeJS.Timeout;

// Adjust starting block number if fromBlock is provided and valid
if (fromBlock && fromBlock < blockNumber) {
blockNumber = fromBlock;
}
let isUnsubscribed = false;

// Function to fetch and process logs
const getLogs = async () => {
if (isUnsubscribed) return;

const blockNumber = await this.publicClient.getBlockNumber();

// Adjust starting block number if fromBlock is provided
if (fromBlock > blockNumber) {
fromBlock = blockNumber;
}

const filter = await this.publicClient.createContractEventFilter({
abi,
address,
fromBlock: blockNumber,
fromBlock,
strict: true,
eventName: eventName,
} as unknown as FilterOptions);

const logs = await this.publicClient.getFilterLogs({ filter });

if (logs.length > 0) {
onLogs(logs);
// Update the block number to the next after the last log's block
const bn = logs[logs.length - 1].blockNumber;
blockNumber = (bn !== null ? bn : BigInt(0)) + BigInt(1);
const maxBlockNumber = logs.reduce(
(max, log) =>
log.blockNumber !== null && log.blockNumber > max
? log.blockNumber
: max,
0n,
);

if (maxBlockNumber > fromBlock) {
fromBlock = maxBlockNumber + 1n;
onLogs(logs, maxBlockNumber);
}
}

if (!isUnsubscribed) {
Expand Down Expand Up @@ -776,14 +785,14 @@ export class ProtocolContracts<
* @param {(logs: GetFilterLogsReturnType<typeof marketABI>) => void} onLogs Callback for when logs are received.
* @param {bigint} [fromBlock] Starting block number for listening for events.
* @param {number} [pollInterval=1000] Polling interval in milliseconds.
* @returns {Promise<() => void>} Unsubscribe function.
* @returns {() => void} Unsubscribe function.
*/
async subscribeMarket<TEventName extends string | undefined = undefined>(
subscribeMarket<TEventName extends string | undefined = undefined>(
eventName: InferEventName<typeof marketABI, TEventName>,
onLogs: (logs: GetFilterLogsReturnType<typeof marketABI>) => void,
fromBlock?: bigint,
pollInterval: number = 1000,
): Promise<() => void> {
): () => void {
return this.subscribeToEvents(
marketABI,
this.contracts['market'].address,
Expand All @@ -802,14 +811,14 @@ export class ProtocolContracts<
* @param {(logs: GetFilterLogsReturnType<typeof entitiesRegistryABI>) => void} onLogs Callback for when logs are received.
* @param {bigint} [fromBlock] Starting block number for listening for events.
* @param {number} [pollInterval=1000] Polling interval in milliseconds.
* @returns {Promise<() => void>} Unsubscribe function.
* @returns {() => void} Unsubscribe function.
*/
async subscribeEntities<TEventName extends string | undefined = undefined>(
subscribeEntities<TEventName extends string | undefined = undefined>(
eventName: InferEventName<typeof entitiesRegistryABI, TEventName>,
onLogs: (logs: GetFilterLogsReturnType<typeof entitiesRegistryABI>) => void,
fromBlock?: bigint,
pollInterval: number = 1000,
): Promise<() => void> {
): () => void {
return this.subscribeToEvents(
entitiesRegistryABI,
this.contracts['entities'].address,
Expand All @@ -828,14 +837,14 @@ export class ProtocolContracts<
* @param {(logs: GetFilterLogsReturnType<typeof configABI>) => void} onLogs Callback for when logs are received.
* @param {bigint} [fromBlock] Starting block number for listening for events.
* @param {number} [pollInterval=1000] Polling interval in milliseconds.
* @returns {Promise<() => void>} Unsubscribe function.
* @returns {() => void} Unsubscribe function.
*/
async subscribeConfig<TEventName extends string | undefined = undefined>(
subscribeConfig<TEventName extends string | undefined = undefined>(
eventName: InferEventName<typeof configABI, TEventName>,
onLogs: (logs: GetFilterLogsReturnType<typeof configABI>) => void,
fromBlock?: bigint,
pollInterval: number = 1000,
): Promise<() => void> {
): () => void {
return this.subscribeToEvents(
configABI,
this.contracts['config'].address,
Expand Down

0 comments on commit 3c68a31

Please sign in to comment.