From 91668d4f5bdb557f67c1684c12879604dcfc3391 Mon Sep 17 00:00:00 2001 From: 1yam Date: Fri, 9 Aug 2024 11:08:41 +0200 Subject: [PATCH] Feature: Could not creating Superfluid flows from sdk Solution: Install and import superfluid.py from PyPI. Add helper methods on EthAccount --- pyproject.toml | 5 +- src/aleph/sdk/chains/ethereum.py | 98 +++++++++++++++++- src/aleph/sdk/conf.py | 4 + src/aleph/sdk/connectors/superfluid.py | 124 +++++++++++++++++++++++ tests/unit/test_superfluid.py | 133 +++++++++++++++++++++++++ 5 files changed, 361 insertions(+), 3 deletions(-) create mode 100644 src/aleph/sdk/connectors/superfluid.py create mode 100644 tests/unit/test_superfluid.py diff --git a/pyproject.toml b/pyproject.toml index fbe94434..4c41904a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,10 @@ dependencies = [ "jwcrypto==1.5.6", "python-magic", "typing_extensions", - "aioresponses>=0.7.6" + "aioresponses>=0.7.6", + "superfluid~=0.2.1", + "eth_typing==4.3.1", + ] [project.optional-dependencies] diff --git a/src/aleph/sdk/chains/ethereum.py b/src/aleph/sdk/chains/ethereum.py index b0fa5fbe..294f47da 100644 --- a/src/aleph/sdk/chains/ethereum.py +++ b/src/aleph/sdk/chains/ethereum.py @@ -1,24 +1,78 @@ +from decimal import Decimal from pathlib import Path -from typing import Optional, Union +from typing import Awaitable, Dict, Optional, Set, Union +from aleph_message.models import Chain from eth_account import Account from eth_account.messages import encode_defunct from eth_account.signers.local import LocalAccount from eth_keys.exceptions import BadSignature as EthBadSignatureError +from superfluid import Web3FlowInfo +from ..conf import settings +from ..connectors.superfluid import Superfluid from ..exceptions import BadSignatureError from ..utils import bytes_from_hex from .common import BaseAccount, get_fallback_private_key, get_public_key +CHAINS_WITH_SUPERTOKEN: Set[Chain] = {Chain.AVAX} +CHAIN_IDS: Dict[Chain, int] = { + Chain.AVAX: settings.AVAX_CHAIN_ID, +} + + +def get_rpc_for_chain(chain: Chain): + """Returns the RPC to use for a given Ethereum based blockchain""" + if not chain: + return None + + if chain == Chain.AVAX: + return settings.AVAX_RPC + else: + raise ValueError(f"Unknown RPC for chain {chain}") + + +def get_chain_id_for_chain(chain: Chain): + """Returns the chain ID of a given Ethereum based blockchain""" + if not chain: + return None + + if chain in CHAIN_IDS: + return CHAIN_IDS[chain] + else: + raise ValueError(f"Unknown RPC for chain {chain}") + class ETHAccount(BaseAccount): + """Interact with an Ethereum address or key pair""" + CHAIN = "ETH" CURVE = "secp256k1" _account: LocalAccount + chain: Optional[Chain] + superfluid_connector: Optional[Superfluid] - def __init__(self, private_key: bytes): + def __init__( + self, + private_key: bytes, + chain: Optional[Chain] = None, + rpc: Optional[str] = None, + chain_id: Optional[int] = None, + ): self.private_key = private_key self._account = Account.from_key(self.private_key) + self.chain = chain + rpc = rpc or get_rpc_for_chain(chain) + chain_id = chain_id or get_chain_id_for_chain(chain) + self.superfluid_connector = ( + Superfluid( + rpc=rpc, + chain_id=chain_id, + account=self._account, + ) + if chain in CHAINS_WITH_SUPERTOKEN + else None + ) async def sign_raw(self, buffer: bytes) -> bytes: """Sign a raw buffer.""" @@ -37,6 +91,46 @@ def from_mnemonic(mnemonic: str) -> "ETHAccount": Account.enable_unaudited_hdwallet_features() return ETHAccount(private_key=Account.from_mnemonic(mnemonic=mnemonic).key) + def create_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]: + """Creat a Superfluid flow between this account and the receiver address.""" + if not self.superfluid_connector: + raise ValueError("Superfluid connector is required to create a flow") + return self.superfluid_connector.create_flow( + sender=self.get_address(), receiver=receiver, flow=flow + ) + + def get_flow(self, receiver: str) -> Awaitable[Web3FlowInfo]: + """Get the Superfluid flow between this account and the receiver address.""" + if not self.superfluid_connector: + raise ValueError("Superfluid connector is required to get a flow") + return self.superfluid_connector.get_flow( + sender=self.get_address(), receiver=receiver + ) + + def update_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]: + """Update the Superfluid flow between this account and the receiver address.""" + if not self.superfluid_connector: + raise ValueError("Superfluid connector is required to update a flow") + return self.superfluid_connector.update_flow( + sender=self.get_address(), receiver=receiver, flow=flow + ) + + def delete_flow(self, receiver: str) -> Awaitable[str]: + """Delete the Superfluid flow between this account and the receiver address.""" + if not self.superfluid_connector: + raise ValueError("Superfluid connector is required to delete a flow") + return self.superfluid_connector.delete_flow( + sender=self.get_address(), receiver=receiver + ) + + def update_superfluid_connector(self, rpc: str, chain_id: int): + """Update the Superfluid connector after initialisation.""" + self.superfluid_connector = Superfluid( + rpc=rpc, + chain_id=chain_id, + account=self._account, + ) + def get_fallback_account(path: Optional[Path] = None) -> ETHAccount: return ETHAccount(private_key=get_fallback_private_key(path=path)) diff --git a/src/aleph/sdk/conf.py b/src/aleph/sdk/conf.py index 318536e4..70378088 100644 --- a/src/aleph/sdk/conf.py +++ b/src/aleph/sdk/conf.py @@ -38,6 +38,10 @@ class Settings(BaseSettings): CODE_USES_SQUASHFS: bool = which("mksquashfs") is not None # True if command exists + AVAX_RPC: str = "https://api.avax.network/ext/bc/C/rpc" + AVAX_CHAIN_ID: int = 43114 + AVAX_ALEPH_SUPER_TOKEN = "0xc0Fbc4967259786C743361a5885ef49380473dCF" # mainnet + # Dns resolver DNS_IPFS_DOMAIN = "ipfs.public.aleph.sh" DNS_PROGRAM_DOMAIN = "program.public.aleph.sh" diff --git a/src/aleph/sdk/connectors/superfluid.py b/src/aleph/sdk/connectors/superfluid.py new file mode 100644 index 00000000..2c0b9fb6 --- /dev/null +++ b/src/aleph/sdk/connectors/superfluid.py @@ -0,0 +1,124 @@ +from __future__ import annotations + +import asyncio +from decimal import Decimal +from typing import TYPE_CHECKING, Optional + +from eth_utils import to_normalized_address, to_wei +from superfluid import CFA_V1, Operation, Web3FlowInfo +from web3 import Web3 +from web3.types import TxParams + +from aleph.sdk.conf import settings + +if TYPE_CHECKING: + from aleph.sdk.chains.ethereum import LocalAccount + + +async def sign_and_send_transaction( + account: LocalAccount, tx_params: TxParams, rpc: str +) -> str: + """ + Sign and broadcast a transaction using the provided ETHAccount + + @param tx_params - Transaction parameters + @param rpc - RPC URL + @returns - str - The transaction hash + """ + web3 = Web3(Web3.HTTPProvider(rpc)) + + def sign_and_send(): + signed_txn = account.sign_transaction(tx_params) + transaction_hash = web3.eth.send_raw_transaction(signed_txn.rawTransaction) + return transaction_hash.hex() + + # Sending a transaction is done over HTTP(S) and implemented using a blocking + # API in `web3.eth`. This runs it in a non-blocking asyncio executor. + loop = asyncio.get_running_loop() + transaction_hash = await loop.run_in_executor(None, sign_and_send) + return transaction_hash + + +async def execute_operation_with_account( + account: LocalAccount, operation: Operation +) -> str: + """ + Execute an operation using the provided ETHAccount + + @param operation - Operation instance from the library + @returns - str - The transaction hash + @returns - str - The transaction hash + """ + populated_transaction = operation._get_populated_transaction_request( + operation.rpc, account.key + ) + transaction_hash = await sign_and_send_transaction( + account, populated_transaction, operation.rpc + ) + return transaction_hash + + +class Superfluid: + """ + Wrapper around the Superfluid APIs in order to CRUD Superfluid flows between two accounts. + """ + + account: Optional[LocalAccount] + + def __init__( + self, + rpc=settings.AVAX_RPC, + chain_id=settings.AVAX_CHAIN_ID, + account: Optional[LocalAccount] = None, + ): + self.cfaV1Instance = CFA_V1(rpc, chain_id) + self.account = account + + async def create_flow(self, sender: str, receiver: str, flow: Decimal) -> str: + """Create a Superfluid flow between two addresses.""" + if not self.account: + raise ValueError("An account is required to create a flow") + return await execute_operation_with_account( + account=self.account, + operation=self.cfaV1Instance.create_flow( + sender=to_normalized_address(sender), + receiver=to_normalized_address(receiver), + super_token=settings.AVAX_ALEPH_SUPER_TOKEN, + flow_rate=to_wei(Decimal(flow), "ether"), + ), + ) + + async def get_flow(self, sender: str, receiver: str) -> Web3FlowInfo: + """Fetch information about the Superfluid flow between two addresses.""" + return self.cfaV1Instance.get_flow( + sender=to_normalized_address(sender), + receiver=to_normalized_address(receiver), + super_token=settings.AVAX_ALEPH_SUPER_TOKEN, + ) + + async def delete_flow(self, sender: str, receiver: str) -> str: + """Delete the Supefluid flow between two addresses.""" + if not self.account: + raise ValueError("An account is required to delete a flow") + return await execute_operation_with_account( + account=self.account, + operation=self.cfaV1Instance.delete_flow( + sender=to_normalized_address(sender), + receiver=to_normalized_address(receiver), + super_token=settings.AVAX_ALEPH_SUPER_TOKEN, + ), + ) + + async def update_flow(self, sender: str, receiver: str, flow: Decimal) -> str: + """Update the flow of a Superfluid flow between two addresses.""" + if not self.account: + raise ValueError("An account is required to update a flow") + return await execute_operation_with_account( + account=self.account, + operation=self.cfaV1Instance.update_flow( + sender=to_normalized_address(sender), + receiver=to_normalized_address(receiver), + super_token=settings.AVAX_ALEPH_SUPER_TOKEN, + flow_rate=to_wei(Decimal(flow), "ether"), + ), + ) diff --git a/tests/unit/test_superfluid.py b/tests/unit/test_superfluid.py new file mode 100644 index 00000000..92c83f2c --- /dev/null +++ b/tests/unit/test_superfluid.py @@ -0,0 +1,133 @@ +import random +from decimal import Decimal +from unittest import mock +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from aleph_message.models import Chain +from eth_utils import to_checksum_address +from superfluid import Operation, Web3FlowInfo + +from aleph.sdk.chains.ethereum import ETHAccount +from aleph.sdk.conf import settings + + +def generate_fake_eth_address(): + return to_checksum_address( + "0x" + "".join([random.choice("0123456789abcdef") for _ in range(40)]) + ) + + +@pytest.fixture +def mock_superfluid(): + with patch("aleph.sdk.connectors.superfluid.CFA_V1") as MockCFA_V1: + yield MockCFA_V1.return_value + + +@pytest.fixture +def eth_account(mock_superfluid): + private_key = b"\x01" * 32 + return ETHAccount( + private_key, + chain=Chain.AVAX, + rpc=settings.AVAX_RPC, + chain_id=settings.AVAX_CHAIN_ID, + ) + + +@pytest.mark.asyncio +async def test_initialization(eth_account): + assert eth_account.superfluid_connector is not None + + +@pytest.mark.asyncio +async def test_create_flow(eth_account, mock_superfluid): + mock_operation = AsyncMock(spec=Operation) + mock_superfluid.create_flow.return_value = mock_operation + + sender = eth_account.get_address() + receiver = generate_fake_eth_address() + flow = Decimal("10.0") + + with patch( + "aleph.sdk.connectors.superfluid.execute_operation_with_account", + return_value="0xTransactionHash", + ) as mock_execute: + tx_hash = await eth_account.create_flow(receiver, flow) + assert tx_hash == "0xTransactionHash" + mock_execute.assert_called_once_with( + account=eth_account._account, operation=mock_operation + ) + mock_superfluid.create_flow.assert_called_once_with( + sender=sender.lower(), + receiver=receiver.lower(), + super_token=settings.AVAX_ALEPH_SUPER_TOKEN, + flow_rate=mock.ANY, + ) + + +@pytest.mark.asyncio +async def test_delete_flow(eth_account, mock_superfluid): + mock_operation = AsyncMock(spec=Operation) + mock_superfluid.delete_flow.return_value = mock_operation + + sender = eth_account.get_address() + receiver = generate_fake_eth_address() + + with patch( + "aleph.sdk.connectors.superfluid.execute_operation_with_account", + return_value="0xTransactionHash", + ) as mock_execute: + tx_hash = await eth_account.delete_flow(receiver) + assert tx_hash == "0xTransactionHash" + mock_execute.assert_called_once_with( + account=eth_account._account, operation=mock_operation + ) + mock_superfluid.delete_flow.assert_called_once_with( + sender=sender.lower(), + receiver=receiver.lower(), + super_token=settings.AVAX_ALEPH_SUPER_TOKEN, + ) + + +@pytest.mark.asyncio +async def test_update_flow(eth_account, mock_superfluid): + mock_operation = AsyncMock(spec=Operation) + mock_superfluid.update_flow.return_value = mock_operation + + sender = eth_account.get_address() + receiver = generate_fake_eth_address() + flow = Decimal(15.0) + + with patch( + "aleph.sdk.connectors.superfluid.execute_operation_with_account", + return_value="0xTransactionHash", + ) as mock_execute: + tx_hash = await eth_account.update_flow(receiver, flow) + assert tx_hash == "0xTransactionHash" + mock_execute.assert_called_once_with( + account=eth_account._account, operation=mock_operation + ) + mock_superfluid.update_flow.assert_called_once_with( + sender=sender.lower(), + receiver=receiver.lower(), + super_token=settings.AVAX_ALEPH_SUPER_TOKEN, + flow_rate=mock.ANY, + ) + + +@pytest.mark.asyncio +async def test_get_flow(eth_account, mock_superfluid): + mock_flow_info = MagicMock(spec=Web3FlowInfo) + mock_superfluid.get_flow.return_value = mock_flow_info + + sender = eth_account.get_address() + receiver = generate_fake_eth_address() + + flow_info = await eth_account.get_flow(receiver) + assert flow_info == mock_flow_info + mock_superfluid.get_flow.assert_called_once_with( + sender=sender.lower(), + receiver=receiver.lower(), + super_token=settings.AVAX_ALEPH_SUPER_TOKEN, + )