Skip to content

Commit

Permalink
Add DateTimeField to cache DB models; Add create_instance and adj…
Browse files Browse the repository at this point in the history
…ust `create_program` methods on `LightNode`
  • Loading branch information
MHHukiewitz committed Nov 27, 2023
1 parent 6f286fc commit b77a775
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 12 deletions.
57 changes: 57 additions & 0 deletions src/aleph/sdk/client/light_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ async def create_program(
vcpus: Optional[int] = None,
timeout_seconds: Optional[float] = None,
persistent: bool = False,
allow_amend: bool = False,
internet: bool = True,
aleph_api: bool = True,
encoding: Encoding = Encoding.zip,
volumes: Optional[List[Mapping]] = None,
subscriptions: Optional[List[Mapping]] = None,
Expand All @@ -340,6 +343,9 @@ async def create_program(
vcpus=vcpus,
timeout_seconds=timeout_seconds,
persistent=persistent,
allow_amend=allow_amend,
internet=internet,
aleph_api=aleph_api,
encoding=encoding,
volumes=volumes,
subscriptions=subscriptions,
Expand All @@ -350,6 +356,57 @@ async def create_program(
asyncio.create_task(self.delete_if_rejected(resp.item_hash))
return resp, status

async def create_instance(
self,
rootfs: str,
rootfs_size: int,
rootfs_name: str,
environment_variables: Optional[Mapping[str, str]] = None,
storage_engine: StorageEnum = StorageEnum.storage,
channel: Optional[str] = None,
address: Optional[str] = None,
sync: bool = False,
memory: Optional[int] = None,
vcpus: Optional[int] = None,
timeout_seconds: Optional[float] = None,
allow_amend: bool = False,
internet: bool = True,
aleph_api: bool = True,
encoding: Encoding = Encoding.zip,
volumes: Optional[List[Mapping]] = None,
volume_persistence: str = "host",
ssh_keys: Optional[List[str]] = None,
metadata: Optional[Mapping[str, Any]] = None,
) -> Tuple[AlephMessage, MessageStatus]:
self.check_validity(
MessageType.instance, address, channel, dict(metadata) if metadata else None
)
resp, status = await self.session.create_instance(
rootfs=rootfs,
rootfs_size=rootfs_size,
rootfs_name=rootfs_name,
environment_variables=environment_variables,
storage_engine=storage_engine,
channel=channel,
address=address,
sync=sync,
memory=memory,
vcpus=vcpus,
timeout_seconds=timeout_seconds,
allow_amend=allow_amend,
internet=internet,
aleph_api=aleph_api,
encoding=encoding,
volumes=volumes,
volume_persistence=volume_persistence,
ssh_keys=ssh_keys,
metadata=metadata,
)
if status in [MessageStatus.PENDING, MessageStatus.PROCESSED]:
self.add(resp)
asyncio.create_task(self.delete_if_rejected(resp.item_hash))
return resp, status

async def forget(
self,
hashes: List[str],
Expand Down
10 changes: 6 additions & 4 deletions src/aleph/sdk/client/message_cache.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import datetime
import logging
import typing
from datetime import datetime
from pathlib import Path
from typing import (
AsyncIterable,
Expand Down Expand Up @@ -230,7 +230,7 @@ def _handle_amends(self, amend_messages: List[PostMessage]):
self.missing_posts[ItemHash(amend.content.ref)] = amend
continue

if datetime.fromtimestamp(amend.time) < original_post.last_updated:
if amend.time < original_post.last_updated:
continue

original_post.item_hash = amend.item_hash
Expand All @@ -239,7 +239,7 @@ def _handle_amends(self, amend_messages: List[PostMessage]):
original_post.original_type = amend.content.type
original_post.address = amend.sender
original_post.channel = amend.channel
original_post.last_updated = datetime.fromtimestamp(amend.time)
original_post.last_updated = amend.time
post_data.append(model_to_dict(original_post))
with self.db.atomic():
PostDBModel.insert_many(post_data).on_conflict_replace().execute()
Expand All @@ -254,7 +254,9 @@ def _handle_aggregates(self, aggregate_messages):
if not existing_aggregate:
aggregate_data.append(aggregate_to_model(aggregate))
continue
data = model_to_dict(existing_aggregate)
existing_aggregate.time = datetime.datetime.fromisoformat(
existing_aggregate.time
)
if aggregate.time > existing_aggregate.time:
existing_aggregate.content.update(aggregate.content.content)
existing_aggregate.time = aggregate.time
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/sdk/db/aggregate.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Dict

from aleph_message.models import AggregateMessage
from peewee import CharField, FloatField, Model
from peewee import CharField, DateTimeField, Model
from playhouse.sqlite_ext import JSONField

from .common import pydantic_json_dumps
Expand All @@ -17,7 +17,7 @@ class AggregateDBModel(Model):
key = CharField()
channel = CharField(null=True)
content = JSONField(json_dumps=pydantic_json_dumps, null=True)
time = FloatField()
time = DateTimeField()


def aggregate_to_model(message: AggregateMessage) -> Dict:
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/sdk/db/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from aleph_message import parse_message
from aleph_message.models import AlephMessage, MessageConfirmation
from peewee import BooleanField, CharField, FloatField, IntegerField, Model
from peewee import BooleanField, CharField, DateTimeField, IntegerField, Model
from playhouse.shortcuts import model_to_dict
from playhouse.sqlite_ext import JSONField

Expand All @@ -26,7 +26,7 @@ class MessageDBModel(Model):
confirmed = BooleanField(null=True)
signature = CharField(null=True)
size = IntegerField(null=True)
time = FloatField()
time = DateTimeField()
item_type = CharField(7)
item_content = CharField(null=True)
hash_type = CharField(6, null=True)
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/sdk/db/post.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Dict, Iterable

from aleph_message.models import MessageConfirmation, PostMessage
from peewee import BooleanField, CharField, FloatField, IntegerField, Model
from peewee import BooleanField, CharField, DateTimeField, IntegerField, Model
from playhouse.shortcuts import model_to_dict
from playhouse.sqlite_ext import JSONField

Expand Down Expand Up @@ -29,7 +29,7 @@ class PostDBModel(Model):
confirmed = BooleanField()
signature = CharField()
size = IntegerField(null=True)
time = FloatField()
time = DateTimeField()
item_type = CharField(7)
item_content = CharField(null=True)
content = JSONField(json_dumps=pydantic_json_dumps)
Expand Down
3 changes: 2 additions & 1 deletion src/aleph/sdk/query/responses.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from aleph_message.models import (
Expand Down Expand Up @@ -35,7 +36,7 @@ class Post(BaseModel):
description="Cryptographic signature of the message by the sender"
)
size: int = Field(description="Size of the post")
time: float = Field(description="Timestamp of the post")
time: datetime = Field(description="Timestamp of the post")
confirmations: List[MessageConfirmation] = Field(
description="Number of confirmations"
)
Expand Down
5 changes: 4 additions & 1 deletion tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ async def __aenter__(self):
async def __aexit__(self, exc_type, exc_val, exc_tb):
...

async def raise_for_status(self):
def raise_for_status(self):
...

async def close(self):
...

@property
Expand Down

0 comments on commit b77a775

Please sign in to comment.