diff --git a/examples/store.py b/examples/store.py index 5b09fd0f..9b5b645b 100644 --- a/examples/store.py +++ b/examples/store.py @@ -1,6 +1,6 @@ import asyncio from pathlib import Path -from typing import Optional +from typing import Optional, Literal import click from aleph_message.models import StoreMessage @@ -27,7 +27,13 @@ async def print_output_hash(message: StoreMessage, status: MessageStatus): ) -async def do_upload(account, engine, channel, filename=None, file_hash=None): +async def do_upload( + account: Account, + engine: Literal["STORAGE", "IPFS"], + channel: Optional[str] = None, + filename: Optional[str] = None, + file_hash: Optional[str] = None, +): async with AuthenticatedAlephClient( account=account, api_server=settings.API_HOST ) as session: @@ -43,7 +49,7 @@ async def do_upload(account, engine, channel, filename=None, file_hash=None): message, status = await session.create_store( file_content=content, channel=channel, - storage_engine=engine.lower(), + storage_engine=StorageEnum(engine.lower()), ) except IOError: print("File not accessible") @@ -53,42 +59,12 @@ async def do_upload(account, engine, channel, filename=None, file_hash=None): message, status = await session.create_store( file_hash=file_hash, channel=channel, - storage_engine=engine.lower(), + storage_engine=StorageEnum(engine.lower()), ) await print_output_hash(message, status) -async def do_upload_with_message( - account: Account, - engine: StorageEnum, - channel: str, - filename: Optional[str] = None, - item_hash: Optional[str] = None, -): - async with AuthenticatedAlephClient( - account=account, api_server=settings.API_HOST - ) as session: - print(filename, account.get_address()) - if filename: - try: - p = Path(filename) - content = p.read_bytes() - if len(content) > 1000 * MiB and engine == "STORAGE": - print("File too big for native STORAGE engine") - return - message = await session.create_store_with_message( - file_content=content, - channel=channel, - storage_engine=engine.lower(), - file_hash=item_hash, - ) - return message - except Exception as e: - print("File not accessible") - raise - - @click.command() @click.argument( "filename", @@ -111,7 +87,12 @@ async def do_upload_with_message( default="TEST", help="Channel to write in (default: TEST)", ) -def main(filename, pkey=None, storage_engine="IPFS", channel="TEST"): +def main( + filename, + pkey=None, + storage_engine: Literal["IPFS", "STORAGE"] = "IPFS", + channel="TEST", +): """Uploads or store FILENAME. If FILENAME is an IPFS multihash and IPFS is selected as an engine (default), don't try to upload, just pin it to the network. diff --git a/src/aleph/sdk/client.py b/src/aleph/sdk/client.py index c549c013..2de73168 100644 --- a/src/aleph/sdk/client.py +++ b/src/aleph/sdk/client.py @@ -1012,30 +1012,42 @@ async def storage_push_file(self, file_content) -> str: resp.raise_for_status() return (await resp.json()).get("hash") - async def storage_push_file_with_message(self, file_content, content) -> str: + async def _storage_push_file_with_message( + self, + file_content: bytes, + store_content: StoreContent, + channel: Optional[str] = None, + sync: bool = False, + ) -> Tuple[StoreMessage, MessageStatus]: """Push a file to the storage service.""" data = aiohttp.FormData() - data.add_field("file", file_content) - message_dict = await self._prepare_aleph_message_dict( + + # Prepare the STORE message + message = await self._prepare_aleph_message( message_type=MessageType.store, - content=content, - channel="test", - storage_engine=StorageEnum.storage, + content=store_content.dict(exclude_none=True), + channel=channel, ) metadata = { - "message": message_dict, + "message": message.dict(exclude_none=True), "file_size": len(file_content), - "sync": True, + "sync": sync, } data.add_field( "metadata", json.dumps(metadata), content_type="application/json" ) + # Add the file + data.add_field("file", file_content) + url = "/api/v0/storage/add_file" logger.debug(f"Posting file on {url}") async with self.http_session.post(url, data=data) as resp: resp.raise_for_status() - return await resp.json() + message_status = ( + MessageStatus.PENDING if resp.status == 202 else MessageStatus.PROCESSED + ) + return message, message_status @staticmethod def _log_publication_status(publication_status: Mapping[str, Any]): @@ -1236,6 +1248,37 @@ async def create_aggregate( sync=sync, ) + async def _upload_file_native( + self, + address: str, + file_content: bytes, + guess_mime_type: bool = False, + ref: Optional[str] = None, + extra_fields: Optional[dict] = None, + channel: Optional[str] = None, + sync: bool = False, + ) -> Tuple[StoreMessage, MessageStatus]: + file_hash = hashlib.sha256(file_content).hexdigest() + if magic and guess_mime_type: + mime_type = magic.from_buffer(file_content, mime=True) + else: + mime_type = None + + store_content = StoreContent( + address=address, + ref=ref, + item_type=StorageEnum.storage, + item_hash=file_hash, + mime_type=mime_type, + **extra_fields, + ) + return await self._storage_push_file_with_message( + file_content=file_content, + store_content=store_content, + channel=channel, + sync=sync, + ) + async def create_store( self, address: Optional[str] = None, @@ -1279,8 +1322,19 @@ async def create_store( file_content = Path(file_path).read_bytes() if storage_engine == StorageEnum.storage: - file_hash = await self.storage_push_file(file_content=file_content) + # Upload the file and message all at once using authenticated upload. + return await self._upload_file_native( + address=address, + file_content=file_content, + guess_mime_type=guess_mime_type, + ref=ref, + extra_fields=extra_fields, + channel=channel, + sync=sync, + ) elif storage_engine == StorageEnum.ipfs: + # We do not support authenticated upload for IPFS yet. Use the legacy method + # of uploading the file first then publishing the message using POST /messages. file_hash = await self.ipfs_push_file(file_content=file_content) else: raise ValueError(f"Unknown storage engine: '{storage_engine}'") @@ -1314,72 +1368,6 @@ async def create_store( sync=sync, ) - async def create_store_with_message( - self, - address: Optional[str] = None, - file_content: Optional[bytes] = None, - file_path: Optional[Union[str, Path]] = None, - file_hash: Optional[str] = None, - guess_mime_type: bool = False, - ref: Optional[str] = None, - storage_engine: StorageEnum = StorageEnum.storage, - extra_fields: Optional[dict] = None, - channel: Optional[str] = None, - sync: bool = False, - ) -> str: - """ - Create a STORE message to store a file on the Aleph network. - - Can be passed either a file path, an IPFS hash or the file's content as raw bytes. - - :param address: Address to display as the author of the message (Default: account.get_address()) - :param file_content: Byte stream of the file to store (Default: None) - :param file_path: Path to the file to store (Default: None) - :param file_hash: Hash of the file to store (Default: None) - :param guess_mime_type: Guess the MIME type of the file (Default: False) - :param ref: Reference to a previous message (Default: None) - :param storage_engine: Storage engine to use (Default: "storage") - :param extra_fields: Extra fields to add to the STORE message (Default: None) - :param channel: Channel to post the message to (Default: "TEST") - :param sync: If true, waits for the message to be processed by the API server (Default: False) - """ - address = address or settings.ADDRESS_TO_USE or self.account.get_address() - - extra_fields = extra_fields or {} - if file_content is None: - if file_path is None: - raise ValueError( - "Please specify at least a file_content, a file_hash or a file_path" - ) - else: - file_content = Path(file_path).read_bytes() - - if storage_engine == StorageEnum.storage: - if ref: - extra_fields["ref"] = ref - content = { - "address": address, - "item_type": storage_engine, - "item_hash": file_hash, - "time": time.time(), - } - if extra_fields is not None: - content.update(extra_fields) - if magic is None: - pass - elif file_content and guess_mime_type and ("mime_type" not in extra_fields): - extra_fields["mime_type"] = magic.from_buffer(file_content, mime=True) - status = await self.storage_push_file_with_message( - file_content=file_content, content=content - ) - return status - elif storage_engine == StorageEnum.ipfs: - raise ValueError( - f"Storage not compatible : {storage_engine} on upload with message" - ) - else: - raise ValueError(f"Unknown storage engine: '{storage_engine}'") - async def create_program( self, program_ref: str, @@ -1540,6 +1528,7 @@ async def _prepare_aleph_message( allow_inlining: bool = True, storage_engine: StorageEnum = StorageEnum.storage, ) -> AlephMessage: + message_dict: Dict[str, Any] = { "sender": self.account.get_address(), "chain": self.account.CHAIN, @@ -1571,45 +1560,6 @@ async def _prepare_aleph_message( message_dict = await self.account.sign_message(message_dict) return parse_message(message_dict) - async def _prepare_aleph_message_dict( - self, - message_type: MessageType, - content: Dict[str, Any], - channel: Optional[str], - allow_inlining: bool = True, - storage_engine: StorageEnum = StorageEnum.storage, - ) -> dict: - message_dict: Dict[str, Any] = { - "sender": self.account.get_address(), - "chain": self.account.CHAIN, - "type": "STORE", - "content": content, - "time": time.time(), - "channel": channel, - } - - item_content: str = json.dumps(content, separators=(",", ":")) - - if allow_inlining and (len(item_content) < settings.MAX_INLINE_SIZE): - message_dict["item_content"] = item_content - message_dict["item_hash"] = self.compute_sha256(item_content) - message_dict["item_type"] = ItemType.inline - else: - if storage_engine == StorageEnum.ipfs: - message_dict["item_hash"] = await self.ipfs_push( - content=content, - ) - message_dict["item_type"] = ItemType.ipfs - else: # storage - assert storage_engine == StorageEnum.storage - message_dict["item_hash"] = await self.storage_push( - content=content, - ) - message_dict["item_type"] = ItemType.storage - - message_dict = await self.account.sign_message(message_dict) - return message_dict - async def submit( self, content: Dict[str, Any], @@ -1626,6 +1576,5 @@ async def submit( allow_inlining=allow_inlining, storage_engine=storage_engine, ) - message message_status = await self._broadcast(message=message, sync=sync) return message, message_status diff --git a/tests/unit/test_upload.py b/tests/unit/test_upload.py index 35a50542..a284675b 100644 --- a/tests/unit/test_upload.py +++ b/tests/unit/test_upload.py @@ -1,13 +1,12 @@ import hashlib -import json -from pathlib import Path -from tempfile import NamedTemporaryFile + import pytest +from aleph_message.status import MessageStatus -from aleph.sdk import AlephClient +from aleph.sdk import AuthenticatedAlephClient from aleph.sdk.chains.common import get_fallback_private_key from aleph.sdk.chains.ethereum import ETHAccount -from examples.store import do_upload_with_message +from aleph.sdk.types import StorageEnum @pytest.mark.asyncio @@ -15,24 +14,21 @@ async def test_upload_with_message(): pkey = get_fallback_private_key() account = ETHAccount(private_key=pkey) - content = "Test Py Aleph upload\n" - content_bytes = content.encode("utf-8") - - with NamedTemporaryFile(mode="w", delete=False) as temp_file: - temp_file.write(content) - - file_name = Path(temp_file.name) - actual_item_hash = hashlib.sha256( - content.encode() - ).hexdigest() # Calculate the hash of the content - - test = await do_upload_with_message( - account=account, - engine="STORAGE", - channel="Test", - filename=file_name, - item_hash=actual_item_hash, - ) - async with AlephClient(api_server="http://0.0.0.0:4024") as client: - file_content = await client.download_file(test["hash"]) - assert file_content == content_bytes + content = b"Test pyaleph upload\n" + file_hash = hashlib.sha256(content).hexdigest() + + async with AuthenticatedAlephClient( + account=account, api_server="http://0.0.0.0:8000" + ) as client: + message, status = await client.create_store( + address=account.get_address(), + file_content=content, + storage_engine=StorageEnum.storage, + sync=True, + ) + + assert status == MessageStatus.PROCESSED + assert message.content.item_hash == file_hash + + server_content = await client.download_file(file_hash=file_hash) + assert server_content == content