Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MessageCache and LightNode #59

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*.pot
__pycache__/*
.cache/*
cache/**/*
.*.swp
*/.ipynb_checkpoints/*

Expand Down
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ install_requires =
eciespy>=0.3.13; python_version>="3.11"
typing_extensions
typer
aleph-message==0.4.1
aleph-message~=0.4.3
eth_account>=0.4.0
# Required to fix a dependency issue with parsimonious and Python3.11
eth_abi==4.0.0b2; python_version>="3.11"
python-magic
peewee
# The usage of test_requires is discouraged, see `Dependency Management` docs
# tests_require = pytest; pytest-cov
# Require a specific Python version, e.g. Python 2.7 or >= 3.4
Expand Down
23 changes: 21 additions & 2 deletions src/aleph/sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from pkg_resources import DistributionNotFound, get_distribution

from aleph.sdk.client import AlephHttpClient, AuthenticatedAlephHttpClient
from aleph.sdk.client import AlephHttpClient, AuthenticatedAlephHttpClient, LightNode
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current discussion with @hoh : we might want to use another name for this class


try:
# Change here if project is renamed and does not equal the package name
Expand All @@ -11,4 +11,23 @@
finally:
del get_distribution, DistributionNotFound

__all__ = ["AlephHttpClient", "AuthenticatedAlephHttpClient"]
__all__ = ["AlephHttpClient", "AuthenticatedAlephHttpClient", "LightNode"]


def __getattr__(name):
if name == "AlephClient":
raise ImportError(
"AlephClient has been turned into an abstract class. Please use `AlephHttpClient` instead."
)
elif name == "AuthenticatedAlephClient":
raise ImportError(
"AuthenticatedAlephClient has been turned into an abstract class. Please use `AuthenticatedAlephHttpClient` instead."
)
elif name == "synchronous":
raise ImportError(
"The 'aleph.sdk.synchronous' type is deprecated and has been removed from the aleph SDK. Please use `aleph.sdk.client.AlephHttpClient` instead."
)
elif name == "asynchronous":
raise ImportError(
"The 'aleph.sdk.asynchronous' type is deprecated and has been removed from the aleph SDK. Please use `aleph.sdk.client.AlephHttpClient` instead."
)
5 changes: 5 additions & 0 deletions src/aleph/sdk/chains/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ def get_address(self) -> str:
def get_public_key(self) -> str:
return "0x" + get_public_key(private_key=self._account.key).hex()

@staticmethod
def from_mnemonic(mnemonic: str) -> "ETHAccount":
Account.enable_unaudited_hdwallet_features()
return ETHAccount(private_key=Account.from_mnemonic(mnemonic=mnemonic).key)


def get_fallback_account(path: Optional[Path] = None) -> ETHAccount:
return ETHAccount(private_key=get_fallback_private_key(path=path))
Expand Down
4 changes: 4 additions & 0 deletions src/aleph/sdk/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from .abstract import AlephClient, AuthenticatedAlephClient
from .authenticated_http import AuthenticatedAlephHttpClient
from .http import AlephHttpClient
from .light_node import LightNode
from .message_cache import MessageCache

__all__ = [
"AlephClient",
"AuthenticatedAlephClient",
"AlephHttpClient",
"AuthenticatedAlephHttpClient",
"MessageCache",
"LightNode",
]
46 changes: 31 additions & 15 deletions src/aleph/sdk/client/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
AlephMessage,
MessagesResponse,
MessageType,
Payment,
PostMessage,
)
from aleph_message.models.execution.program import Encoding
Expand All @@ -42,7 +43,7 @@ async def fetch_aggregate(self, address: str, key: str) -> Dict[str, Dict]:
:param address: Address of the owner of the aggregate
:param key: Key of the aggregate
"""
pass
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")

@abstractmethod
async def fetch_aggregates(
Expand All @@ -54,7 +55,7 @@ async def fetch_aggregates(
:param address: Address of the owner of the aggregate
:param keys: Keys of the aggregates to fetch (Default: all items)
"""
pass
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")

@abstractmethod
async def get_posts(
Expand All @@ -74,7 +75,7 @@ async def get_posts(
:param ignore_invalid_messages: Ignore invalid messages (Default: True)
:param invalid_messages_log_level: Log level to use for invalid messages (Default: logging.NOTSET)
"""
pass
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")

async def get_posts_iterator(
self,
Expand Down Expand Up @@ -109,7 +110,7 @@ async def download_file(

:param file_hash: The hash of the file to retrieve.
"""
pass
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")

async def download_file_ipfs(
self,
Expand Down Expand Up @@ -167,7 +168,7 @@ async def get_messages(
:param ignore_invalid_messages: Ignore invalid messages (Default: True)
:param invalid_messages_log_level: Log level to use for invalid messages (Default: logging.NOTSET)
"""
pass
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")

async def get_messages_iterator(
self,
Expand Down Expand Up @@ -202,7 +203,7 @@ async def get_message(
:param item_hash: Hash of the message to fetch
:param message_type: Type of message to fetch
"""
pass
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")

@abstractmethod
def watch_messages(
Expand All @@ -214,7 +215,7 @@ def watch_messages(

:param message_filter: Filter to apply to the messages
"""
pass
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")


class AuthenticatedAlephClient(AlephClient):
Expand Down Expand Up @@ -242,7 +243,9 @@ async def create_post(
:param storage_engine: An optional storage engine to use for the message, if not inlined (Default: "storage")
:param sync: If true, waits for the message to be processed by the API server (Default: False)
"""
pass
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
)

@abstractmethod
async def create_aggregate(
Expand All @@ -264,7 +267,9 @@ async def create_aggregate(
:param inline: Whether to write content inside the message (Default: True)
:param sync: If true, waits for the message to be processed by the API server (Default: False)
"""
pass
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
)

@abstractmethod
async def create_store(
Expand Down Expand Up @@ -296,7 +301,9 @@ async def create_store(
:param channel: Channel to post the message to (Default: "TEST")
:param sync: If true, waits for the message to be processed by the API server (Default: False)
"""
pass
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
)

@abstractmethod
async def create_program(
Expand Down Expand Up @@ -344,14 +351,17 @@ async def create_program(
:param subscriptions: Patterns of aleph.im messages to forward to the program's event receiver
:param metadata: Metadata to attach to the message
"""
pass
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
)

@abstractmethod
async def create_instance(
self,
rootfs: str,
rootfs_size: int,
rootfs_name: str,
payment: Optional[Payment] = None,
environment_variables: Optional[Mapping[str, str]] = None,
storage_engine: StorageEnum = StorageEnum.storage,
channel: Optional[str] = None,
Expand All @@ -363,7 +373,6 @@ async def create_instance(
allow_amend: bool = False,
internet: bool = True,
aleph_api: bool = True,
encoding: Encoding = Encoding.zip,
volumes: Optional[List[Mapping]] = None,
volume_persistence: str = "host",
ssh_keys: Optional[List[str]] = None,
Expand All @@ -375,6 +384,7 @@ async def create_instance(
:param rootfs: Root filesystem to use
:param rootfs_size: Size of root filesystem
:param rootfs_name: Name of root filesystem
:param payment: Payment method used to pay for the instance
:param environment_variables: Environment variables to pass to the program
:param storage_engine: Storage engine to use (Default: "storage")
:param channel: Channel to use (Default: "TEST")
Expand All @@ -392,7 +402,9 @@ async def create_instance(
:param ssh_keys: SSH keys to authorize access to the VM
:param metadata: Metadata to attach to the message
"""
pass
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
)

@abstractmethod
async def forget(
Expand All @@ -417,7 +429,9 @@ async def forget(
:param address: Address to use (Default: account.get_address())
:param sync: If true, waits for the message to be processed by the API server (Default: False)
"""
pass
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
)

@abstractmethod
async def submit(
Expand All @@ -442,7 +456,9 @@ async def submit(
:param sync: If true, waits for the message to be processed by the API server (Default: False)
:param raise_on_rejected: Whether to raise an exception if the message is rejected (Default: True)
"""
pass
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
)

async def ipfs_push(self, content: Mapping) -> str:
"""
Expand Down
11 changes: 9 additions & 2 deletions src/aleph/sdk/client/authenticated_http.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import hashlib
import json
import logging
import ssl
import time
from pathlib import Path
from typing import Any, Dict, List, Mapping, NoReturn, Optional, Tuple, Union
Expand All @@ -11,6 +12,7 @@
AggregateContent,
AggregateMessage,
AlephMessage,
Chain,
ForgetContent,
ForgetMessage,
InstanceContent,
Expand All @@ -24,7 +26,7 @@
StoreContent,
StoreMessage,
)
from aleph_message.models.execution.base import Encoding
from aleph_message.models.execution.base import Encoding, Payment, PaymentType
from aleph_message.models.execution.environment import (
FunctionEnvironment,
MachineResources,
Expand Down Expand Up @@ -72,12 +74,14 @@ def __init__(
api_unix_socket: Optional[str] = None,
allow_unix_sockets: bool = True,
timeout: Optional[aiohttp.ClientTimeout] = None,
ssl_context: Optional[ssl.SSLContext] = None,
):
super().__init__(
api_server=api_server,
api_unix_socket=api_unix_socket,
allow_unix_sockets=allow_unix_sockets,
timeout=timeout,
ssl_context=ssl_context,
)
self.account = account

Expand Down Expand Up @@ -504,6 +508,7 @@ async def create_instance(
rootfs: str,
rootfs_size: int,
rootfs_name: str,
payment: Optional[Payment] = None,
environment_variables: Optional[Mapping[str, str]] = None,
storage_engine: StorageEnum = StorageEnum.storage,
channel: Optional[str] = None,
Expand All @@ -515,7 +520,6 @@ async def create_instance(
allow_amend: bool = False,
internet: bool = True,
aleph_api: bool = True,
encoding: Encoding = Encoding.zip,
volumes: Optional[List[Mapping]] = None,
volume_persistence: str = "host",
ssh_keys: Optional[List[str]] = None,
Expand All @@ -528,6 +532,8 @@ async def create_instance(
vcpus = vcpus or settings.DEFAULT_VM_VCPUS
timeout_seconds = timeout_seconds or settings.DEFAULT_VM_TIMEOUT

payment = payment or Payment(chain=Chain.ETH, type=PaymentType.hold)

content = InstanceContent(
address=address,
allow_amend=allow_amend,
Expand Down Expand Up @@ -561,6 +567,7 @@ async def create_instance(
time=time.time(),
authorized_keys=ssh_keys,
metadata=metadata,
payment=payment,
)
message, status, response = await self.submit(
content=content.dict(exclude_none=True),
Expand Down
17 changes: 15 additions & 2 deletions src/aleph/sdk/client/http.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import json
import logging
import ssl
from io import BytesIO
from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Type
from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Type, Union

import aiohttp
from aleph_message import parse_message
from aleph_message.models import AlephMessage, ItemHash, ItemType
from aleph_message.status import MessageStatus
from pydantic import ValidationError

from ..conf import settings
Expand Down Expand Up @@ -35,6 +37,7 @@ def __init__(
api_unix_socket: Optional[str] = None,
allow_unix_sockets: bool = True,
timeout: Optional[aiohttp.ClientTimeout] = None,
ssl_context: Optional[ssl.SSLContext] = None,
):
"""AlephClient can use HTTP(S) or HTTP over Unix sockets.
Unix sockets are used when running inside a virtual machine,
Expand All @@ -44,8 +47,11 @@ def __init__(
if not self.api_server:
raise ValueError("Missing API host")

connector: Union[aiohttp.BaseConnector, None]
unix_socket_path = api_unix_socket or settings.API_UNIX_SOCKET
if unix_socket_path and allow_unix_sockets:
if ssl_context:
connector = aiohttp.TCPConnector(ssl=ssl_context)
elif unix_socket_path and allow_unix_sockets:
check_unix_socket_valid(unix_socket_path)
connector = aiohttp.UnixConnector(path=unix_socket_path)
else:
Expand Down Expand Up @@ -178,6 +184,8 @@ async def download_file_to_buffer(
)
else:
raise FileTooLarge(f"The file from {file_hash} is too large")
else:
response.raise_for_status()

async def download_file_ipfs_to_buffer(
self,
Expand Down Expand Up @@ -343,6 +351,11 @@ async def get_message_error(
"details": message_raw["details"],
}

async def get_message_status(self, item_hash: str) -> MessageStatus:
async with self.http_session.get(f"/api/v0/messages/{item_hash}") as resp:
resp.raise_for_status()
return MessageStatus((await resp.json())["status"])

async def watch_messages(
self,
message_filter: Optional[MessageFilter] = None,
Expand Down
Loading
Loading