Skip to content

Commit

Permalink
Feature: Could not creating Superfluid flows from sdk
Browse files Browse the repository at this point in the history
Solution:
Install and import superfluid.py from PyPI.
Add helper methods on EthAccount
  • Loading branch information
1yam authored and hoh committed Aug 20, 2024
1 parent 04622be commit fcb3730
Show file tree
Hide file tree
Showing 5 changed files with 361 additions and 3 deletions.
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ dependencies = [
"jwcrypto==1.5.6",
"python-magic",
"typing_extensions",
"aioresponses>=0.7.6"
"aioresponses>=0.7.6",
"superfluid~=0.2.1",
"eth_typing==4.3.1",

]

[project.optional-dependencies]
Expand Down
98 changes: 96 additions & 2 deletions src/aleph/sdk/chains/ethereum.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,78 @@
from decimal import Decimal
from pathlib import Path
from typing import Optional, Union
from typing import Awaitable, Dict, Optional, Set, Union

from aleph_message.models import Chain
from eth_account import Account
from eth_account.messages import encode_defunct
from eth_account.signers.local import LocalAccount
from eth_keys.exceptions import BadSignature as EthBadSignatureError
from superfluid import Web3FlowInfo

from ..conf import settings
from ..connectors.superfluid import Superfluid
from ..exceptions import BadSignatureError
from ..utils import bytes_from_hex
from .common import BaseAccount, get_fallback_private_key, get_public_key

CHAINS_WITH_SUPERTOKEN: Set[Chain] = {Chain.AVAX}
CHAIN_IDS: Dict[Chain, int] = {
Chain.AVAX: settings.AVAX_CHAIN_ID,
}


def get_rpc_for_chain(chain: Chain):
"""Returns the RPC to use for a given Ethereum based blockchain"""
if not chain:
return None

if chain == Chain.AVAX:
return settings.AVAX_RPC
else:
raise ValueError(f"Unknown RPC for chain {chain}")


def get_chain_id_for_chain(chain: Chain):
"""Returns the chain ID of a given Ethereum based blockchain"""
if not chain:
return None

if chain in CHAIN_IDS:
return CHAIN_IDS[chain]
else:
raise ValueError(f"Unknown RPC for chain {chain}")


class ETHAccount(BaseAccount):
"""Interact with an Ethereum address or key pair"""

CHAIN = "ETH"
CURVE = "secp256k1"
_account: LocalAccount
chain: Optional[Chain]
superfluid_connector: Optional[Superfluid]

def __init__(self, private_key: bytes):
def __init__(
self,
private_key: bytes,
chain: Optional[Chain] = None,
rpc: Optional[str] = None,
chain_id: Optional[int] = None,
):
self.private_key = private_key
self._account = Account.from_key(self.private_key)
self.chain = chain
rpc = rpc or get_rpc_for_chain(chain)
chain_id = chain_id or get_chain_id_for_chain(chain)
self.superfluid_connector = (
Superfluid(
rpc=rpc,
chain_id=chain_id,
account=self._account,
)
if chain in CHAINS_WITH_SUPERTOKEN
else None
)

async def sign_raw(self, buffer: bytes) -> bytes:
"""Sign a raw buffer."""
Expand All @@ -37,6 +91,46 @@ def from_mnemonic(mnemonic: str) -> "ETHAccount":
Account.enable_unaudited_hdwallet_features()
return ETHAccount(private_key=Account.from_mnemonic(mnemonic=mnemonic).key)

def create_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]:
"""Creat a Superfluid flow between this account and the receiver address."""
if not self.superfluid_connector:
raise ValueError("Superfluid connector is required to create a flow")
return self.superfluid_connector.create_flow(
sender=self.get_address(), receiver=receiver, flow=flow
)

def get_flow(self, receiver: str) -> Awaitable[Web3FlowInfo]:
"""Get the Superfluid flow between this account and the receiver address."""
if not self.superfluid_connector:
raise ValueError("Superfluid connector is required to get a flow")
return self.superfluid_connector.get_flow(
sender=self.get_address(), receiver=receiver
)

def update_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]:
"""Update the Superfluid flow between this account and the receiver address."""
if not self.superfluid_connector:
raise ValueError("Superfluid connector is required to update a flow")
return self.superfluid_connector.update_flow(
sender=self.get_address(), receiver=receiver, flow=flow
)

def delete_flow(self, receiver: str) -> Awaitable[str]:
"""Delete the Superfluid flow between this account and the receiver address."""
if not self.superfluid_connector:
raise ValueError("Superfluid connector is required to delete a flow")
return self.superfluid_connector.delete_flow(
sender=self.get_address(), receiver=receiver
)

def update_superfluid_connector(self, rpc: str, chain_id: int):
"""Update the Superfluid connector after initialisation."""
self.superfluid_connector = Superfluid(
rpc=rpc,
chain_id=chain_id,
account=self._account,
)


def get_fallback_account(path: Optional[Path] = None) -> ETHAccount:
return ETHAccount(private_key=get_fallback_private_key(path=path))
Expand Down
4 changes: 4 additions & 0 deletions src/aleph/sdk/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class Settings(BaseSettings):

CODE_USES_SQUASHFS: bool = which("mksquashfs") is not None # True if command exists

AVAX_RPC: str = "https://api.avax.network/ext/bc/C/rpc"
AVAX_CHAIN_ID: int = 43114
AVAX_ALEPH_SUPER_TOKEN = "0xc0Fbc4967259786C743361a5885ef49380473dCF" # mainnet

# Dns resolver
DNS_IPFS_DOMAIN = "ipfs.public.aleph.sh"
DNS_PROGRAM_DOMAIN = "program.public.aleph.sh"
Expand Down
124 changes: 124 additions & 0 deletions src/aleph/sdk/connectors/superfluid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from __future__ import annotations

import asyncio
from decimal import Decimal
from typing import TYPE_CHECKING, Optional

from eth_utils import to_normalized_address, to_wei
from superfluid import CFA_V1, Operation, Web3FlowInfo
from web3 import Web3
from web3.types import TxParams

from aleph.sdk.conf import settings

if TYPE_CHECKING:
from aleph.sdk.chains.ethereum import LocalAccount


async def sign_and_send_transaction(
account: LocalAccount, tx_params: TxParams, rpc: str
) -> str:
"""
Sign and broadcast a transaction using the provided ETHAccount
@param tx_params - Transaction parameters
@param rpc - RPC URL
@returns - str - The transaction hash
"""
web3 = Web3(Web3.HTTPProvider(rpc))

def sign_and_send():
signed_txn = account.sign_transaction(tx_params)
transaction_hash = web3.eth.send_raw_transaction(signed_txn.rawTransaction)
return transaction_hash.hex()

# Sending a transaction is done over HTTP(S) and implemented using a blocking
# API in `web3.eth`. This runs it in a non-blocking asyncio executor.
loop = asyncio.get_running_loop()
transaction_hash = await loop.run_in_executor(None, sign_and_send)
return transaction_hash


async def execute_operation_with_account(
account: LocalAccount, operation: Operation
) -> str:
"""
Execute an operation using the provided ETHAccount
@param operation - Operation instance from the library
@returns - str - The transaction hash
@returns - str - The transaction hash
"""
populated_transaction = operation._get_populated_transaction_request(
operation.rpc, account.key
)
transaction_hash = await sign_and_send_transaction(
account, populated_transaction, operation.rpc
)
return transaction_hash


class Superfluid:
"""
Wrapper around the Superfluid APIs in order to CRUD Superfluid flows between two accounts.
"""

account: Optional[LocalAccount]

def __init__(
self,
rpc=settings.AVAX_RPC,
chain_id=settings.AVAX_CHAIN_ID,
account: Optional[LocalAccount] = None,
):
self.cfaV1Instance = CFA_V1(rpc, chain_id)
self.account = account

async def create_flow(self, sender: str, receiver: str, flow: Decimal) -> str:
"""Create a Superfluid flow between two addresses."""
if not self.account:
raise ValueError("An account is required to create a flow")
return await execute_operation_with_account(
account=self.account,
operation=self.cfaV1Instance.create_flow(
sender=to_normalized_address(sender),
receiver=to_normalized_address(receiver),
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
flow_rate=to_wei(Decimal(flow), "ether"),
),
)

async def get_flow(self, sender: str, receiver: str) -> Web3FlowInfo:
"""Fetch information about the Superfluid flow between two addresses."""
return self.cfaV1Instance.get_flow(
sender=to_normalized_address(sender),
receiver=to_normalized_address(receiver),
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
)

async def delete_flow(self, sender: str, receiver: str) -> str:
"""Delete the Supefluid flow between two addresses."""
if not self.account:
raise ValueError("An account is required to delete a flow")
return await execute_operation_with_account(
account=self.account,
operation=self.cfaV1Instance.delete_flow(
sender=to_normalized_address(sender),
receiver=to_normalized_address(receiver),
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
),
)

async def update_flow(self, sender: str, receiver: str, flow: Decimal) -> str:
"""Update the flow of a Superfluid flow between two addresses."""
if not self.account:
raise ValueError("An account is required to update a flow")
return await execute_operation_with_account(
account=self.account,
operation=self.cfaV1Instance.update_flow(
sender=to_normalized_address(sender),
receiver=to_normalized_address(receiver),
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
flow_rate=to_wei(Decimal(flow), "ether"),
),
)
Loading

0 comments on commit fcb3730

Please sign in to comment.