Skip to content

Commit

Permalink
🚧 fix(wip): obviously
Browse files Browse the repository at this point in the history
  • Loading branch information
mxchinegod committed Mar 15, 2024
1 parent 717f39c commit d36ba44
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 88 deletions.
6 changes: 3 additions & 3 deletions examples/0_embedded_cluster.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
"\u001b[93m 🧠 ⣻: Server is ready\n",
"\u001b[0m\n",
"\u001b[92m🌊 SUCCESS: milvus server started\u001b[0m\n",
"\u001b[96m☕️ WAIT: creating prism with embedded cluster\u001b[0m\n",
"\u001b[96m☕️ WAIT: creating magnet with embedded cluster\u001b[0m\n",
"\u001b[93m🚨 WARN: Stream my_stream not found, creating\u001b[0m\n",
"\u001b[92m🌊 SUCCESS: created `my_stream` with default category `magnet`\u001b[0m\n",
"\u001b[93m🚨 WARN: KV bucket my_kv not found, creating\u001b[0m\n",
Expand All @@ -80,8 +80,8 @@
"from magnet.base import EmbeddedMagnet\n",
"cluster = EmbeddedMagnet()\n",
"cluster.start()\n",
"prism = cluster.create_prism()\n",
"await prism.align()\n",
"magnet = cluster.create_magnet()\n",
"await magnet.align()\n",
"cluster.stop()\n",
"cluster.cleanup()"
]
Expand Down
18 changes: 9 additions & 9 deletions examples/0_prism.ipynb → examples/0_magnet.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
"\u001b[34m 💜 ⣽: Server is ready\n",
"\u001b[0m\n",
"\u001b[92m🌊 SUCCESS: milvus server started\u001b[0m\n",
"\u001b[96m☕️ WAIT: creating prism with embedded cluster\u001b[0m\n",
"\u001b[96m☕️ WAIT: creating magnet with embedded cluster\u001b[0m\n",
"\u001b[93m🚨 WARN: Stream my_stream not found, creating\u001b[0m\n",
"\u001b[92m🌊 SUCCESS: created `my_stream` with default category `magnet`\u001b[0m\n",
"\u001b[93m🚨 WARN: KV bucket my_kv not found, creating\u001b[0m\n",
Expand All @@ -84,13 +84,13 @@
}
],
"source": [
"from magnet.base import Prism\n",
"from magnet.base import Magnet\n",
"from magnet.base import EmbeddedMagnet\n",
"\n",
"cluster = EmbeddedMagnet()\n",
"cluster.start()\n",
"prism = cluster.create_prism()\n",
"await prism.align()\n",
"magnet = cluster.create_magnet()\n",
"await magnet.align()\n",
"\n",
"config = {\n",
" \"host\": \"127.0.0.1\",\n",
Expand Down Expand Up @@ -120,8 +120,8 @@
" }\n",
"}\n",
"\n",
"prism = Prism(config)\n",
"await prism.align()"
"magnet = Magnet(config)\n",
"await magnet.align()"
]
},
{
Expand All @@ -143,7 +143,7 @@
"source": [
"from magnet.ic.field import Charge\n",
"\n",
"field = Charge(prism)\n",
"field = Charge(magnet)\n",
"await field.on()"
]
},
Expand Down Expand Up @@ -229,7 +229,7 @@
],
"source": [
"from magnet.ize.memory import Memory\n",
"mem = Memory(prism)\n",
"mem = Memory(magnet)\n",
"await mem.on(create=True)\n"
]
},
Expand Down Expand Up @@ -271,7 +271,7 @@
"\n",
"from magnet.ic.field import Resonator\n",
"\n",
"reso = Resonator(prism)\n",
"reso = Resonator(magnet)\n",
"\n",
"async def handle_payload(payload, msg):\n",
" await mem.index(payload, msg, v=True)\n",
Expand Down
10 changes: 5 additions & 5 deletions magnet/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
}
}

class Prism:
class Magnet:
def __init__(self, config: PrismConfig | dict = None):
try:
if isinstance(config, dict):
Expand Down Expand Up @@ -198,10 +198,10 @@ def stop(self):
except Exception as e:
_f("warn", f"embedded milvus can't be stopped\n{e}")

def create_prism(self):
_f('wait', 'creating prism with embedded cluster')
prism = Prism(auto_config)
return prism
def create_magnet(self):
_f('wait', 'creating magnet with embedded cluster')
magnet = Magnet(auto_config)
return magnet

def cleanup(self):
self.client.images.prune()
Expand Down
108 changes: 54 additions & 54 deletions magnet/ic/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dataclasses import asdict
from tabulate import tabulate

from magnet.base import Prism
from magnet.base import Magnet
from magnet.utils.globals import _f
from magnet.utils.data_classes import *

Expand All @@ -17,12 +17,12 @@
utc_timestamp = utc_time.timestamp()

class Charge:
def __init__(self, prism: Prism):
self.prism = prism
def __init__(self, magnet: Magnet):
self.magnet = magnet

async def list_streams(self):
try:
streams = await self.prism.js.streams_info()
streams = await self.magnet.js.streams_info()
remote_streams = [x.config.name for x in streams]
remote_subjects = [x.config.subjects for x in streams]
data = zip(remote_streams, remote_subjects)
Expand All @@ -32,13 +32,13 @@ async def list_streams(self):
# Loop through each stream and its subjects
for stream, subjects in zip(remote_streams, remote_subjects):
# Check if the current stream name or any of the subjects match the config variables
match_stream_name = stream == self.prism.config.stream_name
match_subject = self.prism.config.category in subjects
match_stream_name = stream == self.magnet.config.stream_name
match_subject = self.magnet.config.category in subjects

# If there's a match, format the stream name and subjects with ANSI green color and a magnet emoji
if match_stream_name or match_subject:
formatted_stream = f"\033[92m{stream} \U0001F9F2\033[0m" # Green and magnet emoji for stream name
formatted_subjects = [f"\033[92m{subject} \U0001F9F2\033[0m" if subject == self.prism.config.category else subject for subject in subjects]
formatted_subjects = [f"\033[92m{subject} \U0001F9F2\033[0m" if subject == self.magnet.config.category else subject for subject in subjects]
else:
formatted_stream = stream
formatted_subjects = subjects
Expand All @@ -51,41 +51,41 @@ async def list_streams(self):

_f("info", f'\n{table}')
except TimeoutError:
return _f('fatal', f'could not connect to {self.prism.config.host}')
return _f('fatal', f'could not connect to {self.magnet.config.host}')
except Exception as e:
return _f('fatal', e)

async def on(self):
try:
streams = await self.prism.js.streams_info()
streams = await self.magnet.js.streams_info()
remote_streams = [x.config.name for x in streams]
remote_subjects = [x.config.subjects for x in streams]
if self.prism.config.stream_name not in remote_streams:
return _f('fatal', f'{self.prism.config.stream_name} not found, initialize with `Prism.align()` first')
elif self.prism.config.category not in sum(remote_subjects, []):
if self.prism.config.category not in sum([x.config.subjects for x in streams if x.config.name == self.prism.config.stream_name], []):
if self.magnet.config.stream_name not in remote_streams:
return _f('fatal', f'{self.magnet.config.stream_name} not found, initialize with `Magnet.align()` first')
elif self.magnet.config.category not in sum(remote_subjects, []):
if self.magnet.config.category not in sum([x.config.subjects for x in streams if x.config.name == self.magnet.config.stream_name], []):
try:
subjects = sum(
[x.config.subjects for x in streams if x.config.name == self.prism.config.stream_name], [])
subjects.append(self.prism.config.category)
await self.prism.js.update_stream(StreamConfig(
name=self.prism.config.stream_name
[x.config.subjects for x in streams if x.config.name == self.magnet.config.stream_name], [])
subjects.append(self.magnet.config.category)
await self.magnet.js.update_stream(StreamConfig(
name=self.magnet.config.stream_name
, subjects=subjects
))
_f("success", f'created [{self.prism.config.category}] on\n🛰️ stream: {self.prism.config.stream_name}')
_f("success", f'created [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}')
except ServerError as e:
_f('fatal', f"couldn't create {self.prism.config.stream_name} on {self.prism.config.host}, ensure your `category` is set")
_f('fatal', f"couldn't create {self.magnet.config.stream_name} on {self.magnet.config.host}, ensure your `category` is set")
except TimeoutError:
return _f('fatal', f'could not connect to {self.prism.config.host}')
_f("success", f'ready [{self.prism.config.category}] on\n🛰️ stream: {self.prism.config.stream_name}')
return _f('fatal', f'could not connect to {self.magnet.config.host}')
_f("success", f'ready [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}')

async def off(self):
"""
Disconnects from the NATS server and prints a warning message.
"""
await self.prism.nc.drain()
await self.prism.nc.close()
_f('warn', f'disconnected from {self.prism.config.host}')
await self.magnet.nc.drain()
await self.magnet.nc.close()
_f('warn', f'disconnected from {self.magnet.config.host}')

async def pulse(self, payload: Payload | GeneratedPayload | EmbeddingPayload | JobParams = None, v=False):
"""
Expand All @@ -101,17 +101,17 @@ async def pulse(self, payload: Payload | GeneratedPayload | EmbeddingPayload | J
return _f('fatal', f'invalid object, more info:\n{e} in [Payload, GeneratedPayload, EmbeddingPayload, JobParams]')
try:
_hash = x.xxh64(bytes_).hexdigest()
msg = await self.prism.js.publish(
self.prism.config.category, bytes_, headers={
msg = await self.magnet.js.publish(
self.magnet.config.category, bytes_, headers={
"Nats-Msg-Id": _hash
}
)
_f('success', f'pulsed to {self.prism.config.category} on {self.prism.config.stream_name}') if v else None
_f('success', f'pulsed to {self.magnet.config.category} on {self.magnet.config.stream_name}') if v else None
_ts = datetime.datetime.now(datetime.timezone.utc)
msg.ts = _ts
return msg
except Exception as e:
return _f('fatal', f'could not pulse data to {self.prism.config.host}\n{e}')
return _f('fatal', f'could not pulse data to {self.magnet.config.host}\n{e}')

async def excite(self, job: dict = {}):
"""
Expand All @@ -126,13 +126,13 @@ async def excite(self, job: dict = {}):
_f('fatal', f'invalid JSON\n{e}')
try:
_hash = x.xxh64(bytes_).hexdigest()
await self.prism.js.publish(
self.prism.config.category, bytes_, headers={
await self.magnet.js.publish(
self.magnet.config.category, bytes_, headers={
"Nats-Msg-Id": _hash
}
)
except Exception as e:
_f('fatal', f'could not send data to {self.prism.config.host}\n{e}')
_f('fatal', f'could not send data to {self.magnet.config.host}\n{e}')

async def emp(self, name=None):
"""
Expand All @@ -141,9 +141,9 @@ async def emp(self, name=None):
Args:
name (str, optional): The name of the stream to delete. Defaults to None.
"""
if name and name == self.prism.config.stream_name:
await self.prism.js.delete_stream(name=self.prism.config.stream_name)
_f('warn', f'{self.prism.config.stream_name} stream deleted')
if name and name == self.magnet.config.stream_name:
await self.magnet.js.delete_stream(name=self.magnet.config.stream_name)
_f('warn', f'{self.magnet.config.stream_name} stream deleted')
else:
_f('fatal', "name doesn't match the stream or stream doesn't exist")

Expand All @@ -154,21 +154,21 @@ async def reset(self, name=None):
Args:
name (str, optional): The name of the category to purge. Defaults to None.
"""
if name and name == self.prism.config.category:
await self.js.purge_stream(name=self.prism.config.stream_name, subject=self.prism.config.category)
_f('warn', f'{self.prism.config.category} category deleted')
if name and name == self.magnet.config.category:
await self.js.purge_stream(name=self.magnet.config.stream_name, subject=self.magnet.config.category)
_f('warn', f'{self.magnet.config.category} category deleted')
else:
_f('fatal', "name doesn't match the stream category or category doesn't exist")

class Resonator:
def __init__(self, prism: Prism):
def __init__(self, magnet: Magnet):
"""
Initializes the `Resonator` class with the NATS server address.
Args:
server (str): The address of the NATS server.
"""
self.prism = prism
self.magnet = magnet

async def on(self, job: bool = None, local: bool = False, bandwidth: int = 1000):
"""
Expand All @@ -193,16 +193,16 @@ async def on(self, job: bool = None, local: bool = False, bandwidth: int = 1000)
, max_ack_pending=bandwidth
, ack_wait=3600
)
_f('wait', f'connecting to {self.prism.config.host}')
_f('wait', f'connecting to {self.magnet.config.host}')
try:
self.sub = await self.prism.js.pull_subscribe(
durable=self.prism.config.session
, subject=self.prism.config.category
, stream=self.prism.config.stream_name
self.sub = await self.magnet.js.pull_subscribe(
durable=self.magnet.config.session
, subject=self.magnet.config.category
, stream=self.magnet.config.stream_name
, config=self.consumer_config
)
_f('info',
f'joined worker queue: {self.prism.config.session} as {self.node}')
f'joined worker queue: {self.magnet.config.session} as {self.node}')
except Exception as e:
return _f('fatal', e)

Expand All @@ -212,7 +212,7 @@ async def listen(self, cb=print, job_n: int = None, generic: bool = False, verbo
except: return _f('fatal', 'no subscriber initialized')
if job_n:
_f("info",
f'consuming {job_n} from [{self.prism.config.category}] on\n🛰️ stream: {self.prism.config.stream_name}\n🧲 session: "{self.prism.session}"')
f'consuming {job_n} from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.session}"')
try:
msgs = await self.sub.fetch(batch=job_n, timeout=60)
payloads = [msg.data if generic else Payload(
Expand All @@ -226,12 +226,12 @@ async def listen(self, cb=print, job_n: int = None, generic: bool = False, verbo
_f('fatal', e)
except ValueError as e:
_f('warn',
f'{self.prism.config.session} reached the end of {self.prism.config.category}, {self.prism.config.name}')
f'{self.magnet.config.session} reached the end of {self.magnet.config.category}, {self.magnet.config.name}')
except Exception as e:
_f('warn', "no more data")
else:
_f("info",
f'consuming delta from [{self.prism.config.category}] on\n🛰️ stream: {self.prism.config.stream_name}\n🧲 session: "{self.prism.config.session}"')
f'consuming delta from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.config.session}"')
while True:
try:
msgs = await self.sub.fetch(batch=1, timeout=60)
Expand All @@ -242,7 +242,7 @@ async def listen(self, cb=print, job_n: int = None, generic: bool = False, verbo
try:
await cb(payload, msgs[0])
except Exception as e:
_f("warn", f'retrying connection to {self.prism.config.host}\n{e}')
_f("warn", f'retrying connection to {self.magnet.config.host}\n{e}')
_f("info", "this can also be a problem with your callback")
except Exception as e:
_f('warn', f'no more data') if "nats: timeout" in str(e) else _f('fatal', e)
Expand All @@ -262,7 +262,7 @@ async def worker(self, cb=print):
Exception: If there is an error in consuming the message or processing the callback function.
"""
_f("info",
f'processing jobs from [{self.prism.config.category}] on\n🛰️ stream: {self.prism.config.stream_name}\n🧲 session: "{self.prism.session}"')
f'processing jobs from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.session}"')
try:
msg = await self.sub.next_msg(timeout=60)
payload = JobParams(**json.loads(msg.data))
Expand All @@ -283,7 +283,7 @@ async def info(self):
:param session: A string representing the session name of the consumer. If not provided, information about all consumers in the stream will be retrieved.
:return: None
"""
jsm = await self.prism.js.consumer_info(stream=self.prism.config.stream_name, consumer=self.prism.session)
jsm = await self.magnet.js.consumer_info(stream=self.magnet.config.stream_name, consumer=self.magnet.session)
_f('info', json.dumps(jsm.config.__dict__, indent=2))

async def off(self):
Expand All @@ -292,8 +292,8 @@ async def off(self):
:return: None
"""
await self.prism.js.sub.unsubscribe()
_f('warn', f'unsubscribed from {self.prism.config.stream_name}')
await self.magnet.js.sub.unsubscribe()
_f('warn', f'unsubscribed from {self.magnet.config.stream_name}')
await self.nc.drain()
_f('warn', f'safe to disconnect from {self.prism.config.host}')
_f('warn', f'safe to disconnect from {self.magnet.config.host}')

6 changes: 3 additions & 3 deletions magnet/ize/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from magnet.utils.index.milvus import *
from magnet.utils.data_classes import EmbeddingPayload

from magnet.ic.field import Charge, Prism
from magnet.ic.field import Charge, Magnet

from typing import Optional

Expand All @@ -23,8 +23,8 @@ class Memory:
db (MilvusDB): An instance of the MilvusDB class from the magnet.utils.milvus module, used for connecting to the Milvus database.
"""

def __init__(self, prism: Prism = None):
self.config = prism.config
def __init__(self, magnet: Magnet = None):
self.config = magnet.config
self._model = None

async def on(self, create: bool = False, initialize: bool = False):
Expand Down
Loading

0 comments on commit d36ba44

Please sign in to comment.