Skip to content

Commit

Permalink
feat: 🎸 Added subscribeEntities and subscribeConfig methods
Browse files Browse the repository at this point in the history
  • Loading branch information
kostysh committed Feb 15, 2024
1 parent d246558 commit 9341b87
Showing 1 changed file with 126 additions and 34 deletions.
160 changes: 126 additions & 34 deletions packages/contracts-manger/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {
Address,
Hash,
Abi,
Account,
InferFunctionName,
GetFunctionArgs,
Expand All @@ -17,12 +16,15 @@ import {
zeroAddress,
GetFilterLogsReturnType,
InferEventName,
Abi,
CreateContractEventFilterParameters,
} from 'viem';
import { stringify } from 'superjson';
import {
marketABI,
erc20_18ABI,
entitiesRegistryABI,
configABI,
kinds,
} from '@windingtree/contracts';
import {
Expand Down Expand Up @@ -52,6 +54,11 @@ export interface ProtocolContractsOptions {
walletClient?: WalletClient;
}

/**
* Generic filter options type.
*/
type FilterOptions = CreateContractEventFilterParameters<Abi, string>;

/**
* Common API of the protocol smart contracts set
*
Expand Down Expand Up @@ -688,69 +695,154 @@ export class ProtocolContracts<
}

/**
* Subscribes to specific events emitted by the market smart contract.
* Subscribes to events from a specified smart contract.
*
* @param eventName - The name of the event to listen for.
* @param onLogs - Callback function to handle the event logs.
* @param fromBlock - (Optional) The starting block number for listening to events.
* @param pollInterval - (Optional) Interval in milliseconds for polling new events.
* @returns A function to unsubscribe from the event.
* @template TEventName - Generic type parameter for event name.
* @template TAbi The ABI type of the contract.
* @template TEventName The name of the event to subscribe to.
* @param {TAbi} abi The ABI of the contract to subscribe to.
* @param {Address} address The address of the contract.
* @param {InferEventName<TAbi, TEventName>} eventName The name of the event.
* @param {(logs: GetFilterLogsReturnType<TAbi>) => void} onLogs Callback to execute when logs are received.
* @param {bigint} [fromBlock] The block number from which to start listening for events.
* @param {number} [pollInterval=1000] The interval in milliseconds at which to poll for new events.
* @returns {Promise<() => void>} A promise that resolves to an unsubscribe function.
* @private
*/
async subscribeMarket<TEventName extends string | undefined = undefined>(
eventName: InferEventName<typeof marketABI, TEventName>,
onLogs: (logs: GetFilterLogsReturnType<typeof marketABI>) => void,
private async subscribeToEvents<
// eslint-disable-next-line @typescript-eslint/no-redundant-type-constituents
const TAbi extends Abi | readonly unknown[] = Abi,
TEventName extends string | undefined = undefined,
>(
abi: TAbi,
address: Address,
eventName: InferEventName<TAbi, TEventName>,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onLogs: (logs: any) => void,
fromBlock?: bigint,
pollInterval = 1000,
pollInterval: number = 1000,
): Promise<() => void> {
let blockNumber = await this.publicClient.getBlockNumber();
let isUnsubscribed = false;
let timeoutId: NodeJS.Timeout;

// Use the specified fromBlock or the current block number
// Adjust starting block number if fromBlock is provided and valid
if (fromBlock && fromBlock < blockNumber) {
blockNumber = fromBlock;
}

// Function to fetch and process logs
const getLogs = async () => {
if (isUnsubscribed) return; // Stop if unsubscribed
if (isUnsubscribed) return;

// Create an event filter
const filter = await this.publicClient.createContractEventFilter({
abi: marketABI,
address: this.contracts['market'].address,
eventName,
abi,
address,
fromBlock: blockNumber,
strict: true,
});
eventName: eventName,
} as unknown as FilterOptions);

// Retrieve logs based on the filter
const logs = await this.publicClient.getFilterLogs({ filter });

// Process logs and update the block number
if (logs.length > 0) {
const maxBlockNumber = logs.reduce(
(max, log) => (log.blockNumber > max ? log.blockNumber : max),
BigInt(0),
);
blockNumber = maxBlockNumber + BigInt(1);
onLogs(logs);
// Update the block number to the next after the last log's block
const bn = logs[logs.length - 1].blockNumber;
blockNumber = (bn !== null ? bn : BigInt(0)) + BigInt(1);
}

// Schedule the next call
timeoutId = setTimeout(() => {
getLogs().catch(logger.error);
}, pollInterval);
if (!isUnsubscribed) {
timeoutId = setTimeout(() => {
getLogs().catch(logger.error);
}, pollInterval);
}
};

// Initial call to start the polling process
// Initial call to start polling
getLogs().catch(logger.error);

// Return the unsubscribe function
// Return unsubscribe function
return () => {
isUnsubscribed = true; // Set the flag to stop further polling
clearTimeout(timeoutId); // Clear the timeout to stop scheduled calls
isUnsubscribed = true;
clearTimeout(timeoutId);
};
}

/**
* Subscribes to market contract events.
*
* @template TEventName Type of event name.
* @param {InferEventName<typeof marketABI, TEventName>} eventName The event name to subscribe to.
* @param {(logs: GetFilterLogsReturnType<typeof marketABI>) => void} onLogs Callback for when logs are received.
* @param {bigint} [fromBlock] Starting block number for listening for events.
* @param {number} [pollInterval=1000] Polling interval in milliseconds.
* @returns {Promise<() => void>} Unsubscribe function.
*/
async subscribeMarket<TEventName extends string | undefined = undefined>(
eventName: InferEventName<typeof marketABI, TEventName>,
onLogs: (logs: GetFilterLogsReturnType<typeof marketABI>) => void,
fromBlock?: bigint,
pollInterval: number = 1000,
): Promise<() => void> {
return this.subscribeToEvents(
marketABI,
this.contracts['market'].address,
eventName,
onLogs,
fromBlock,
pollInterval,
);
}

/**
* Subscribes to entities contract events.
*
* @template TEventName Type of event name.
* @param {InferEventName<typeof entitiesRegistryABI, TEventName>} eventName The event name to subscribe to.
* @param {(logs: GetFilterLogsReturnType<typeof entitiesRegistryABI>) => void} onLogs Callback for when logs are received.
* @param {bigint} [fromBlock] Starting block number for listening for events.
* @param {number} [pollInterval=1000] Polling interval in milliseconds.
* @returns {Promise<() => void>} Unsubscribe function.
*/
async subscribeEntities<TEventName extends string | undefined = undefined>(
eventName: InferEventName<typeof entitiesRegistryABI, TEventName>,
onLogs: (logs: GetFilterLogsReturnType<typeof entitiesRegistryABI>) => void,
fromBlock?: bigint,
pollInterval: number = 1000,
): Promise<() => void> {
return this.subscribeToEvents(
entitiesRegistryABI,
this.contracts['entities'].address,
eventName,
onLogs,
fromBlock,
pollInterval,
);
}

/**
* Subscribes to config contract events.
*
* @template TEventName Type of event name.
* @param {InferEventName<typeof configABI, TEventName>} eventName The event name to subscribe to.
* @param {(logs: GetFilterLogsReturnType<typeof configABI>) => void} onLogs Callback for when logs are received.
* @param {bigint} [fromBlock] Starting block number for listening for events.
* @param {number} [pollInterval=1000] Polling interval in milliseconds.
* @returns {Promise<() => void>} Unsubscribe function.
*/
async subscribeConfig<TEventName extends string | undefined = undefined>(
eventName: InferEventName<typeof configABI, TEventName>,
onLogs: (logs: GetFilterLogsReturnType<typeof configABI>) => void,
fromBlock?: bigint,
pollInterval: number = 1000,
): Promise<() => void> {
return this.subscribeToEvents(
configABI,
this.contracts['config'].address,
eventName,
onLogs,
fromBlock,
pollInterval,
);
}
}

0 comments on commit 9341b87

Please sign in to comment.