Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Superfluid integrations #144

Merged
merged 1 commit into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ dependencies = [
"jwcrypto==1.5.6",
"python-magic",
"typing_extensions",
"aioresponses>=0.7.6"
"aioresponses>=0.7.6",
"superfluid~=0.2.1",
"eth_typing==4.3.1",

]

[project.optional-dependencies]
Expand Down
98 changes: 96 additions & 2 deletions src/aleph/sdk/chains/ethereum.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,78 @@
from decimal import Decimal
from pathlib import Path
from typing import Optional, Union
from typing import Awaitable, Dict, Optional, Set, Union

from aleph_message.models import Chain
from eth_account import Account
from eth_account.messages import encode_defunct
from eth_account.signers.local import LocalAccount
from eth_keys.exceptions import BadSignature as EthBadSignatureError
from superfluid import Web3FlowInfo

from ..conf import settings
from ..connectors.superfluid import Superfluid
from ..exceptions import BadSignatureError
from ..utils import bytes_from_hex
from .common import BaseAccount, get_fallback_private_key, get_public_key

CHAINS_WITH_SUPERTOKEN: Set[Chain] = {Chain.AVAX}
CHAIN_IDS: Dict[Chain, int] = {
Chain.AVAX: settings.AVAX_CHAIN_ID,
}


def get_rpc_for_chain(chain: Chain):
"""Returns the RPC to use for a given Ethereum based blockchain"""
if not chain:
return None

if chain == Chain.AVAX:
return settings.AVAX_RPC
else:
raise ValueError(f"Unknown RPC for chain {chain}")


def get_chain_id_for_chain(chain: Chain):
"""Returns the chain ID of a given Ethereum based blockchain"""
if not chain:
return None

if chain in CHAIN_IDS:
return CHAIN_IDS[chain]
else:
raise ValueError(f"Unknown RPC for chain {chain}")


class ETHAccount(BaseAccount):
"""Interact with an Ethereum address or key pair"""

CHAIN = "ETH"
CURVE = "secp256k1"
_account: LocalAccount
chain: Optional[Chain]
superfluid_connector: Optional[Superfluid]

def __init__(self, private_key: bytes):
def __init__(
self,
private_key: bytes,
chain: Optional[Chain] = None,
rpc: Optional[str] = None,
chain_id: Optional[int] = None,
):
self.private_key = private_key
self._account = Account.from_key(self.private_key)
self.chain = chain
rpc = rpc or get_rpc_for_chain(chain)
chain_id = chain_id or get_chain_id_for_chain(chain)
self.superfluid_connector = (
Superfluid(
rpc=rpc,
chain_id=chain_id,
account=self._account,
)
if chain in CHAINS_WITH_SUPERTOKEN
else None
)

async def sign_raw(self, buffer: bytes) -> bytes:
"""Sign a raw buffer."""
Expand All @@ -37,6 +91,46 @@ def from_mnemonic(mnemonic: str) -> "ETHAccount":
Account.enable_unaudited_hdwallet_features()
return ETHAccount(private_key=Account.from_mnemonic(mnemonic=mnemonic).key)

def create_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]:
"""Creat a Superfluid flow between this account and the receiver address."""
if not self.superfluid_connector:
raise ValueError("Superfluid connector is required to create a flow")
return self.superfluid_connector.create_flow(
sender=self.get_address(), receiver=receiver, flow=flow
)

def get_flow(self, receiver: str) -> Awaitable[Web3FlowInfo]:
"""Get the Superfluid flow between this account and the receiver address."""
if not self.superfluid_connector:
raise ValueError("Superfluid connector is required to get a flow")
return self.superfluid_connector.get_flow(
sender=self.get_address(), receiver=receiver
)

def update_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]:
"""Update the Superfluid flow between this account and the receiver address."""
if not self.superfluid_connector:
raise ValueError("Superfluid connector is required to update a flow")
return self.superfluid_connector.update_flow(
sender=self.get_address(), receiver=receiver, flow=flow
)

def delete_flow(self, receiver: str) -> Awaitable[str]:
"""Delete the Superfluid flow between this account and the receiver address."""
if not self.superfluid_connector:
raise ValueError("Superfluid connector is required to delete a flow")
return self.superfluid_connector.delete_flow(
sender=self.get_address(), receiver=receiver
)

def update_superfluid_connector(self, rpc: str, chain_id: int):
"""Update the Superfluid connector after initialisation."""
self.superfluid_connector = Superfluid(
rpc=rpc,
chain_id=chain_id,
account=self._account,
)


def get_fallback_account(path: Optional[Path] = None) -> ETHAccount:
return ETHAccount(private_key=get_fallback_private_key(path=path))
Expand Down
4 changes: 4 additions & 0 deletions src/aleph/sdk/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class Settings(BaseSettings):

CODE_USES_SQUASHFS: bool = which("mksquashfs") is not None # True if command exists

AVAX_RPC: str = "https://api.avax.network/ext/bc/C/rpc"
AVAX_CHAIN_ID: int = 43114
AVAX_ALEPH_SUPER_TOKEN = "0xc0Fbc4967259786C743361a5885ef49380473dCF" # mainnet

# Dns resolver
DNS_IPFS_DOMAIN = "ipfs.public.aleph.sh"
DNS_PROGRAM_DOMAIN = "program.public.aleph.sh"
Expand Down
124 changes: 124 additions & 0 deletions src/aleph/sdk/connectors/superfluid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from __future__ import annotations

import asyncio
from decimal import Decimal
from typing import TYPE_CHECKING, Optional

from eth_utils import to_normalized_address, to_wei
from superfluid import CFA_V1, Operation, Web3FlowInfo
from web3 import Web3
from web3.types import TxParams

from aleph.sdk.conf import settings

if TYPE_CHECKING:
from aleph.sdk.chains.ethereum import LocalAccount


async def sign_and_send_transaction(
account: LocalAccount, tx_params: TxParams, rpc: str
) -> str:
"""
Sign and broadcast a transaction using the provided ETHAccount

@param tx_params - Transaction parameters
@param rpc - RPC URL
@returns - str - The transaction hash
"""
web3 = Web3(Web3.HTTPProvider(rpc))

def sign_and_send():
signed_txn = account.sign_transaction(tx_params)
transaction_hash = web3.eth.send_raw_transaction(signed_txn.rawTransaction)
return transaction_hash.hex()

# Sending a transaction is done over HTTP(S) and implemented using a blocking
# API in `web3.eth`. This runs it in a non-blocking asyncio executor.
loop = asyncio.get_running_loop()
transaction_hash = await loop.run_in_executor(None, sign_and_send)
return transaction_hash


async def execute_operation_with_account(
account: LocalAccount, operation: Operation
) -> str:
"""
Execute an operation using the provided ETHAccount

@param operation - Operation instance from the library
@returns - str - The transaction hash
@returns - str - The transaction hash
"""
populated_transaction = operation._get_populated_transaction_request(
operation.rpc, account.key
)
transaction_hash = await sign_and_send_transaction(
account, populated_transaction, operation.rpc
)
return transaction_hash


class Superfluid:
"""
Wrapper around the Superfluid APIs in order to CRUD Superfluid flows between two accounts.
"""

account: Optional[LocalAccount]

def __init__(
self,
rpc=settings.AVAX_RPC,
chain_id=settings.AVAX_CHAIN_ID,
account: Optional[LocalAccount] = None,
):
self.cfaV1Instance = CFA_V1(rpc, chain_id)
self.account = account

async def create_flow(self, sender: str, receiver: str, flow: Decimal) -> str:
"""Create a Superfluid flow between two addresses."""
if not self.account:
raise ValueError("An account is required to create a flow")
return await execute_operation_with_account(
account=self.account,
operation=self.cfaV1Instance.create_flow(
sender=to_normalized_address(sender),
receiver=to_normalized_address(receiver),
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
flow_rate=to_wei(Decimal(flow), "ether"),
),
)

async def get_flow(self, sender: str, receiver: str) -> Web3FlowInfo:
"""Fetch information about the Superfluid flow between two addresses."""
return self.cfaV1Instance.get_flow(
sender=to_normalized_address(sender),
receiver=to_normalized_address(receiver),
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
)

async def delete_flow(self, sender: str, receiver: str) -> str:
"""Delete the Supefluid flow between two addresses."""
if not self.account:
raise ValueError("An account is required to delete a flow")
return await execute_operation_with_account(
account=self.account,
operation=self.cfaV1Instance.delete_flow(
sender=to_normalized_address(sender),
receiver=to_normalized_address(receiver),
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
),
)

async def update_flow(self, sender: str, receiver: str, flow: Decimal) -> str:
"""Update the flow of a Superfluid flow between two addresses."""
if not self.account:
raise ValueError("An account is required to update a flow")
return await execute_operation_with_account(
account=self.account,
operation=self.cfaV1Instance.update_flow(
sender=to_normalized_address(sender),
receiver=to_normalized_address(receiver),
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
flow_rate=to_wei(Decimal(flow), "ether"),
),
)
Loading
Loading