Skip to content

Commit

Permalink
Fix: Restructure and remove the sync wrapper
Browse files Browse the repository at this point in the history
This reorganizes the code and removes the wrapper to call async code synchronously since it added a lot of hacky code.
  • Loading branch information
hoh committed Oct 10, 2023
1 parent 3d7504d commit 2e9878f
Show file tree
Hide file tree
Showing 25 changed files with 320 additions and 639 deletions.
4 changes: 2 additions & 2 deletions examples/httpgateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from aleph.sdk.chains.common import get_fallback_private_key
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.client import AuthenticatedAlephClient
from aleph.sdk.client import AuthenticatedAlephHttpClient

app = web.Application()
routes = web.RouteTableDef()
Expand All @@ -32,7 +32,7 @@ async def source_post(request):
return web.json_response(
{"status": "error", "message": "unauthorized secret"}
)
async with AuthenticatedAlephClient(
async with AuthenticatedAlephHttpClient(
account=app["account"], api_server="https://api2.aleph.im"
) as session:
message, _status = await session.create_post(
Expand Down
6 changes: 3 additions & 3 deletions examples/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from aleph_message.status import MessageStatus

from aleph.sdk.chains.ethereum import get_fallback_account
from aleph.sdk.client import AuthenticatedAlephClient, AuthenticatedUserSessionSync
from aleph.sdk.client import AuthenticatedAlephClientSync, AuthenticatedAlephHttpClient
from aleph.sdk.conf import settings


Expand Down Expand Up @@ -54,7 +54,7 @@ def get_cpu_cores():


def send_metrics(
session: AuthenticatedUserSessionSync, metrics
session: AuthenticatedAlephClientSync, metrics
) -> Tuple[AlephMessage, MessageStatus]:
return session.create_aggregate(key="metrics", content=metrics, channel="SYSINFO")

Expand All @@ -70,7 +70,7 @@ def collect_metrics():

def main():
account = get_fallback_account()
with AuthenticatedAlephClient(
with AuthenticatedAlephHttpClient(
account=account, api_server=settings.API_HOST
) as session:
while True:
Expand Down
6 changes: 3 additions & 3 deletions examples/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from aleph.sdk.chains.common import get_fallback_private_key
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.client import AuthenticatedAlephClient
from aleph.sdk.client import AuthenticatedAlephHttpClient
from aleph.sdk.conf import settings


Expand All @@ -27,7 +27,7 @@ def get_input_data(value):


def send_metrics(account, metrics):
with AuthenticatedAlephClient(
with AuthenticatedAlephHttpClient(
account=account, api_server=settings.API_HOST
) as session:
return session.create_aggregate(
Expand Down Expand Up @@ -100,7 +100,7 @@ async def gateway(
if not userdata["received"]:
await client.reconnect()

async with AuthenticatedAlephClient(
async with AuthenticatedAlephHttpClient(
account=account, api_server=settings.API_HOST
) as session:
for key, value in state.items():
Expand Down
4 changes: 2 additions & 2 deletions examples/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from aleph.sdk.chains.common import get_fallback_private_key
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.client import AuthenticatedAlephClient
from aleph.sdk.client import AuthenticatedAlephHttpClient
from aleph.sdk.conf import settings

DEFAULT_SERVER = "https://api2.aleph.im"
Expand All @@ -23,7 +23,7 @@ async def print_output_hash(message: StoreMessage, status: MessageStatus):


async def do_upload(account, engine, channel, filename=None, file_hash=None):
async with AuthenticatedAlephClient(
async with AuthenticatedAlephHttpClient(
account=account, api_server=settings.API_HOST
) as session:
print(filename, account.get_address())
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from pkg_resources import DistributionNotFound, get_distribution

from aleph.sdk.client import AlephClient, AuthenticatedAlephClient
from aleph.sdk.client import AlephHttpClient, AuthenticatedAlephHttpClient

try:
# Change here if project is renamed and does not equal the package name
Expand All @@ -11,4 +11,4 @@
finally:
del get_distribution, DistributionNotFound

__all__ = ["AlephClient", "AuthenticatedAlephClient"]
__all__ = ["AlephHttpClient", "AuthenticatedAlephHttpClient"]
12 changes: 5 additions & 7 deletions src/aleph/sdk/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from .authenticated import AuthenticatedAlephClient, AuthenticatedUserSessionSync
from .base import BaseAlephClient, BaseAuthenticatedAlephClient
from .client import AlephClient, UserSessionSync
from .abstract import AlephClient, AuthenticatedAlephClient
from .authenticated_http import AuthenticatedAlephHttpClient
from .http import AlephHttpClient

__all__ = [
"BaseAlephClient",
"BaseAuthenticatedAlephClient",
"AlephClient",
"AuthenticatedAlephClient",
"UserSessionSync",
"AuthenticatedUserSessionSync",
"AlephHttpClient",
"AuthenticatedAlephHttpClient",
]
63 changes: 59 additions & 4 deletions src/aleph/sdk/client/base.py → src/aleph/sdk/client/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
from aleph_message.models.execution.program import Encoding
from aleph_message.status import MessageStatus

from ..models.message import MessageFilter
from ..models.post import PostFilter, PostsResponse
from ..query.filters import MessageFilter, PostFilter
from ..query.responses import PostsResponse
from ..types import GenericMessage, StorageEnum
from ..utils import Writable

DEFAULT_PAGE_SIZE = 200


class BaseAlephClient(ABC):
class AlephClient(ABC):
@abstractmethod
async def fetch_aggregate(self, address: str, key: str) -> Dict[str, Dict]:
"""
Expand Down Expand Up @@ -110,6 +111,44 @@ async def download_file(
"""
pass

async def download_file_ipfs(
self,
file_hash: str,
) -> bytes:
"""
Get a file from the ipfs storage engine as raw bytes.
Warning: Downloading large files can be slow.
:param file_hash: The hash of the file to retrieve.
"""
raise NotImplementedError()

async def download_file_ipfs_to_buffer(
self,
file_hash: str,
output_buffer: Writable[bytes],
) -> None:
"""
Download a file from the storage engine and write it to the specified output buffer.
:param file_hash: The hash of the file to retrieve.
:param output_buffer: The binary output buffer to write the file data to.
"""
raise NotImplementedError()

async def download_file_to_buffer(
self,
file_hash: str,
output_buffer: Writable[bytes],
) -> None:
"""
Download a file from the storage engine and write it to the specified output buffer.
:param file_hash: The hash of the file to retrieve.
:param output_buffer: Writable binary buffer. The file will be written to this buffer.
"""
raise NotImplementedError()

@abstractmethod
async def get_messages(
self,
Expand Down Expand Up @@ -180,7 +219,7 @@ def watch_messages(
pass


class BaseAuthenticatedAlephClient(BaseAlephClient):
class AuthenticatedAlephClient(AlephClient):
@abstractmethod
async def create_post(
self,
Expand Down Expand Up @@ -350,3 +389,19 @@ async def submit(
:param sync: If true, waits for the message to be processed by the API server (Default: False)
"""
pass

async def ipfs_push(self, content: Mapping) -> str:
"""
Push a file to IPFS.
:param content: Content of the file to push
"""
raise NotImplementedError()

async def storage_push(self, content: Mapping) -> str:
"""
Push arbitrary content as JSON to the storage service.
:param content: The dict-like content to upload
"""
raise NotImplementedError()
Loading

0 comments on commit 2e9878f

Please sign in to comment.