Skip to content

Commit

Permalink
multiple fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
odesenfans committed Sep 8, 2023
1 parent 8027f98 commit 9d1536d
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 177 deletions.
51 changes: 16 additions & 35 deletions examples/store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from pathlib import Path
from typing import Optional
from typing import Optional, Literal

import click
from aleph_message.models import StoreMessage
Expand All @@ -27,7 +27,13 @@ async def print_output_hash(message: StoreMessage, status: MessageStatus):
)


async def do_upload(account, engine, channel, filename=None, file_hash=None):
async def do_upload(
account: Account,
engine: Literal["STORAGE", "IPFS"],
channel: Optional[str] = None,
filename: Optional[str] = None,
file_hash: Optional[str] = None,
):
async with AuthenticatedAlephClient(
account=account, api_server=settings.API_HOST
) as session:
Expand All @@ -43,7 +49,7 @@ async def do_upload(account, engine, channel, filename=None, file_hash=None):
message, status = await session.create_store(
file_content=content,
channel=channel,
storage_engine=engine.lower(),
storage_engine=StorageEnum(engine.lower()),
)
except IOError:
print("File not accessible")
Expand All @@ -53,42 +59,12 @@ async def do_upload(account, engine, channel, filename=None, file_hash=None):
message, status = await session.create_store(
file_hash=file_hash,
channel=channel,
storage_engine=engine.lower(),
storage_engine=StorageEnum(engine.lower()),
)

await print_output_hash(message, status)


async def do_upload_with_message(
account: Account,
engine: StorageEnum,
channel: str,
filename: Optional[str] = None,
item_hash: Optional[str] = None,
):
async with AuthenticatedAlephClient(
account=account, api_server=settings.API_HOST
) as session:
print(filename, account.get_address())
if filename:
try:
p = Path(filename)
content = p.read_bytes()
if len(content) > 1000 * MiB and engine == "STORAGE":
print("File too big for native STORAGE engine")
return
message = await session.create_store_with_message(
file_content=content,
channel=channel,
storage_engine=engine.lower(),
file_hash=item_hash,
)
return message
except Exception as e:
print("File not accessible")
raise


@click.command()
@click.argument(
"filename",
Expand All @@ -111,7 +87,12 @@ async def do_upload_with_message(
default="TEST",
help="Channel to write in (default: TEST)",
)
def main(filename, pkey=None, storage_engine="IPFS", channel="TEST"):
def main(
filename,
pkey=None,
storage_engine: Literal["IPFS", "STORAGE"] = "IPFS",
channel="TEST",
):
"""Uploads or store FILENAME.
If FILENAME is an IPFS multihash and IPFS is selected as an engine (default), don't try to upload, just pin it to the network.
Expand Down
181 changes: 65 additions & 116 deletions src/aleph/sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1012,30 +1012,42 @@ async def storage_push_file(self, file_content) -> str:
resp.raise_for_status()
return (await resp.json()).get("hash")

async def storage_push_file_with_message(self, file_content, content) -> str:
async def _storage_push_file_with_message(
self,
file_content: bytes,
store_content: StoreContent,
channel: Optional[str] = None,
sync: bool = False,
) -> Tuple[StoreMessage, MessageStatus]:
"""Push a file to the storage service."""
data = aiohttp.FormData()
data.add_field("file", file_content)
message_dict = await self._prepare_aleph_message_dict(

# Prepare the STORE message
message = await self._prepare_aleph_message(
message_type=MessageType.store,
content=content,
channel="test",
storage_engine=StorageEnum.storage,
content=store_content.dict(exclude_none=True),
channel=channel,
)
metadata = {
"message": message_dict,
"message": message.dict(exclude_none=True),
"file_size": len(file_content),
"sync": True,
"sync": sync,
}
data.add_field(
"metadata", json.dumps(metadata), content_type="application/json"
)
# Add the file
data.add_field("file", file_content)

url = "/api/v0/storage/add_file"
logger.debug(f"Posting file on {url}")

async with self.http_session.post(url, data=data) as resp:
resp.raise_for_status()
return await resp.json()
message_status = (
MessageStatus.PENDING if resp.status == 202 else MessageStatus.PROCESSED
)
return message, message_status

@staticmethod
def _log_publication_status(publication_status: Mapping[str, Any]):
Expand Down Expand Up @@ -1236,6 +1248,37 @@ async def create_aggregate(
sync=sync,
)

async def _upload_file_native(
self,
address: str,
file_content: bytes,
guess_mime_type: bool = False,
ref: Optional[str] = None,
extra_fields: Optional[dict] = None,
channel: Optional[str] = None,
sync: bool = False,
) -> Tuple[StoreMessage, MessageStatus]:
file_hash = hashlib.sha256(file_content).hexdigest()
if magic and guess_mime_type:
mime_type = magic.from_buffer(file_content, mime=True)
else:
mime_type = None

store_content = StoreContent(
address=address,
ref=ref,
item_type=StorageEnum.storage,
item_hash=file_hash,
mime_type=mime_type,
**extra_fields,
)
return await self._storage_push_file_with_message(
file_content=file_content,
store_content=store_content,
channel=channel,
sync=sync,
)

async def create_store(
self,
address: Optional[str] = None,
Expand Down Expand Up @@ -1279,8 +1322,19 @@ async def create_store(
file_content = Path(file_path).read_bytes()

if storage_engine == StorageEnum.storage:
file_hash = await self.storage_push_file(file_content=file_content)
# Upload the file and message all at once using authenticated upload.
return await self._upload_file_native(
address=address,
file_content=file_content,
guess_mime_type=guess_mime_type,
ref=ref,
extra_fields=extra_fields,
channel=channel,
sync=sync,
)
elif storage_engine == StorageEnum.ipfs:
# We do not support authenticated upload for IPFS yet. Use the legacy method
# of uploading the file first then publishing the message using POST /messages.
file_hash = await self.ipfs_push_file(file_content=file_content)
else:
raise ValueError(f"Unknown storage engine: '{storage_engine}'")
Expand Down Expand Up @@ -1314,72 +1368,6 @@ async def create_store(
sync=sync,
)

async def create_store_with_message(
self,
address: Optional[str] = None,
file_content: Optional[bytes] = None,
file_path: Optional[Union[str, Path]] = None,
file_hash: Optional[str] = None,
guess_mime_type: bool = False,
ref: Optional[str] = None,
storage_engine: StorageEnum = StorageEnum.storage,
extra_fields: Optional[dict] = None,
channel: Optional[str] = None,
sync: bool = False,
) -> str:
"""
Create a STORE message to store a file on the Aleph network.
Can be passed either a file path, an IPFS hash or the file's content as raw bytes.
:param address: Address to display as the author of the message (Default: account.get_address())
:param file_content: Byte stream of the file to store (Default: None)
:param file_path: Path to the file to store (Default: None)
:param file_hash: Hash of the file to store (Default: None)
:param guess_mime_type: Guess the MIME type of the file (Default: False)
:param ref: Reference to a previous message (Default: None)
:param storage_engine: Storage engine to use (Default: "storage")
:param extra_fields: Extra fields to add to the STORE message (Default: None)
:param channel: Channel to post the message to (Default: "TEST")
:param sync: If true, waits for the message to be processed by the API server (Default: False)
"""
address = address or settings.ADDRESS_TO_USE or self.account.get_address()

extra_fields = extra_fields or {}
if file_content is None:
if file_path is None:
raise ValueError(
"Please specify at least a file_content, a file_hash or a file_path"
)
else:
file_content = Path(file_path).read_bytes()

if storage_engine == StorageEnum.storage:
if ref:
extra_fields["ref"] = ref
content = {
"address": address,
"item_type": storage_engine,
"item_hash": file_hash,
"time": time.time(),
}
if extra_fields is not None:
content.update(extra_fields)
if magic is None:
pass
elif file_content and guess_mime_type and ("mime_type" not in extra_fields):
extra_fields["mime_type"] = magic.from_buffer(file_content, mime=True)
status = await self.storage_push_file_with_message(
file_content=file_content, content=content
)
return status
elif storage_engine == StorageEnum.ipfs:
raise ValueError(
f"Storage not compatible : {storage_engine} on upload with message"
)
else:
raise ValueError(f"Unknown storage engine: '{storage_engine}'")

async def create_program(
self,
program_ref: str,
Expand Down Expand Up @@ -1540,6 +1528,7 @@ async def _prepare_aleph_message(
allow_inlining: bool = True,
storage_engine: StorageEnum = StorageEnum.storage,
) -> AlephMessage:

message_dict: Dict[str, Any] = {
"sender": self.account.get_address(),
"chain": self.account.CHAIN,
Expand Down Expand Up @@ -1571,45 +1560,6 @@ async def _prepare_aleph_message(
message_dict = await self.account.sign_message(message_dict)
return parse_message(message_dict)

async def _prepare_aleph_message_dict(
self,
message_type: MessageType,
content: Dict[str, Any],
channel: Optional[str],
allow_inlining: bool = True,
storage_engine: StorageEnum = StorageEnum.storage,
) -> dict:
message_dict: Dict[str, Any] = {
"sender": self.account.get_address(),
"chain": self.account.CHAIN,
"type": "STORE",
"content": content,
"time": time.time(),
"channel": channel,
}

item_content: str = json.dumps(content, separators=(",", ":"))

if allow_inlining and (len(item_content) < settings.MAX_INLINE_SIZE):
message_dict["item_content"] = item_content
message_dict["item_hash"] = self.compute_sha256(item_content)
message_dict["item_type"] = ItemType.inline
else:
if storage_engine == StorageEnum.ipfs:
message_dict["item_hash"] = await self.ipfs_push(
content=content,
)
message_dict["item_type"] = ItemType.ipfs
else: # storage
assert storage_engine == StorageEnum.storage
message_dict["item_hash"] = await self.storage_push(
content=content,
)
message_dict["item_type"] = ItemType.storage

message_dict = await self.account.sign_message(message_dict)
return message_dict

async def submit(
self,
content: Dict[str, Any],
Expand All @@ -1626,6 +1576,5 @@ async def submit(
allow_inlining=allow_inlining,
storage_engine=storage_engine,
)
message
message_status = await self._broadcast(message=message, sync=sync)
return message, message_status
48 changes: 22 additions & 26 deletions tests/unit/test_upload.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,34 @@
import hashlib
import json
from pathlib import Path
from tempfile import NamedTemporaryFile

import pytest
from aleph_message.status import MessageStatus

from aleph.sdk import AlephClient
from aleph.sdk import AuthenticatedAlephClient
from aleph.sdk.chains.common import get_fallback_private_key
from aleph.sdk.chains.ethereum import ETHAccount
from examples.store import do_upload_with_message
from aleph.sdk.types import StorageEnum


@pytest.mark.asyncio
async def test_upload_with_message():
pkey = get_fallback_private_key()
account = ETHAccount(private_key=pkey)

content = "Test Py Aleph upload\n"
content_bytes = content.encode("utf-8")

with NamedTemporaryFile(mode="w", delete=False) as temp_file:
temp_file.write(content)

file_name = Path(temp_file.name)
actual_item_hash = hashlib.sha256(
content.encode()
).hexdigest() # Calculate the hash of the content

test = await do_upload_with_message(
account=account,
engine="STORAGE",
channel="Test",
filename=file_name,
item_hash=actual_item_hash,
)
async with AlephClient(api_server="http://0.0.0.0:4024") as client:
file_content = await client.download_file(test["hash"])
assert file_content == content_bytes
content = b"Test pyaleph upload\n"
file_hash = hashlib.sha256(content).hexdigest()

async with AuthenticatedAlephClient(
account=account, api_server="http://0.0.0.0:8000"
) as client:
message, status = await client.create_store(
address=account.get_address(),
file_content=content,
storage_engine=StorageEnum.storage,
sync=True,
)

assert status == MessageStatus.PROCESSED
assert message.content.item_hash == file_hash

server_content = await client.download_file(file_hash=file_hash)
assert server_content == content

0 comments on commit 9d1536d

Please sign in to comment.