Skip to content

Commit

Permalink
🧲 feat(wip): JSON NATS payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
mxchinegod committed Dec 19, 2023
1 parent 2d682d6 commit a038878
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
3 changes: 2 additions & 1 deletion magnet/filings.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,12 @@ async def process(self, path: str = None, text_column: str = "clean", id_column:
a = self.df[id_column].iloc[i]
all_sentences.append((a, s))
if self.field:
await self.field.pulse(bytes(s, 'utf8'))
await self.field.pulse(s, a)
pbar.set_description(s[0:10])
knowledge_base['sentences'] = [x[1] for x in all_sentences]
knowledge_base['id'] = [x[0] for x in all_sentences]
self.df = knowledge_base
_f('wait', f'saving to {path}')
self.save(path, self.df)
return
except Exception as e:
Expand Down
23 changes: 19 additions & 4 deletions magnet/ic/field.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import nats
import nats, json
from magnet.utils import _f
from dataclasses import asdict, dataclass

@dataclass
class Payload:
text: str
document: str

class Charge:
def __init__(self, server):
Expand All @@ -22,9 +28,14 @@ async def off(self):
_f('warn', f'unsubscribed from {self.stream}')
await self.nc.drain()
_f('warn', f'disconnected from {self.server}')
async def pulse(self, packet):
async def pulse(self, packet, document_id):
try:
payload = Payload(text=packet,document=document_id)
bytes_ = json.dumps(asdict(payload)).encode()
except:
_f('fatal', 'invalid JSON')
try:
await self.js.publish(self.frequency, packet)
await self.js.publish(self.frequency, bytes_)
except Exception as e:
print(e)
_f('fatal', f'could not send data to {self.server}')
Expand All @@ -45,7 +56,11 @@ async def on(self, frequency: str = 'default', stream: str = 'default', cb=print
while True:
msgs = await self.sub.fetch(batch=10, timeout=60)
for msg in msgs:
cb(msg)
try:
payload = Payload(**json.loads(msg.data))
cb(payload)
except json.decoder.JSONDecodeError:
_f('fatal','invalid JSON')
async def off(self):
await self.js.unsubscribe()
_f('warn', f'unsubscribed from {self.stream}')
Expand Down

0 comments on commit a038878

Please sign in to comment.