Skip to content

Commit

Permalink
Merge pull request #7 from Prismadic/6-featperf-add-nats-functionality
Browse files Browse the repository at this point in the history
🧲 feat(release): Basic NATS functionality w/ persistent streams
  • Loading branch information
mxchinegod authored Dec 18, 2023
2 parents 4aa5b92 + 7613464 commit b01317b
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 303 deletions.
27 changes: 6 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,39 +28,24 @@ python3 setup.py install
``` python
from magnet.filings import Processor
source_data_file = "./raw/kb_export_clean.parquet" # your text documents data
filings = Processor()
filings.load(source_data_file)
await filings.process('./data/filings.parquet','clean','file', nlp=False)
```

<small>*(yes, this is all it takes to initialize a project!)*</small>

## 😥 compute requirements

_minimum_ requirements for ~6000 documents from a knowledge base:

1. RAM
   - 32GB RAM
2. GPU
- can choose to store your embeddings in VRAM
- 4x 16GB VRAM (*for finetuning with research efficiency*)
- otherwise helpful with embedding your data & scoring/ranking (speeds below)

#### ⏱️ "Ready, Set, Go!"

Generally speaking, the size of your documents and the quality of them will impact these times.
For example, the larger datasets listed are curated with much more attention to quality — so in addition to being larger overall, the individual documents within them are also larger.

🚧

## 👏 features

- Apple silicon first class citizen
- so long as your initial data has columns for article text and ids, `magnet` can do the rest
- sequential distributed processing with NATS
- finetune highly performant expert models from 0-1 in very little time
- upload to S3
- ideal cyberpunk vision of LLM power users in vectorspace

## goals

- [ ] add [mlx](https://github.com/ml-explore/mlx) support
- [x] finish `README.md`
- [x] add [NATS](https://nats.io) for distributed processing
- [ ] `deepspeed` integration for model parallelism on multiple GPUs

## bad code
Expand Down
233 changes: 72 additions & 161 deletions example.ipynb

Large diffs are not rendered by default.

24 changes: 13 additions & 11 deletions magnet/filings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
import os
from .utils import _f, Utils
from tqdm import tqdm

class Processor:
def __init__(self):
def __init__(self, field=None):
self.df = None
self.utils = Utils()

self.field = field

def save(self, filename: str = None, raw: pd.DataFrame = None):
try:
file_extension = os.path.splitext(filename)[-1]
Expand Down Expand Up @@ -46,9 +48,10 @@ def load(self, raw: str | pd.DataFrame = None):
_f("fatal", "data type not in [csv, json, xlsx, parquet, pd.DataFrame]")
except Exception as e:
_f("fatal", e)

def export_as_sentences(self, path: str = None, text_column: str = "clean", id_column: str = 'id', splitter: any = None, nlp=True):
async def process(self, path: str = None, text_column: str = "clean", id_column: str = 'id', splitter: any = None, nlp=True):
self.df = self.df.dropna()
if self.field:
await self.field.on()
if self.df is not None:
try:
_f("wait", f"get coffee or tea - {len(self.df)} processing...")
Expand All @@ -61,10 +64,14 @@ def export_as_sentences(self, path: str = None, text_column: str = "clean", id_c
str(s) for s in sentence_splitter(self.utils.normalize_text(x), nlp=nlp)
]
)
for i in range(len(self.df)):
pbar = tqdm(range(len(self.df)))
for i in pbar:
for s in self.df['sentences'].iloc[i]:
a = self.df[id_column].iloc[i]
all_sentences.append((a, s))
if self.field:
await self.field.pulse(bytes(s, 'utf8'))
pbar.set_description(s[0:10])
knowledge_base['sentences'] = [x[1] for x in all_sentences]
knowledge_base['id'] = [x[0] for x in all_sentences]
self.df = knowledge_base
Expand All @@ -74,7 +81,6 @@ def export_as_sentences(self, path: str = None, text_column: str = "clean", id_c
_f("fatal", e)
else:
return _f("fatal", "no data loaded!")

def bge_sentence_splitter(self, data, window_size=250, overlap=25, nlp=True):
if nlp:
self.utils.nlp.max_length = len(data) + 100
Expand Down Expand Up @@ -115,9 +121,6 @@ def bge_sentence_splitter(self, data, window_size=250, overlap=25, nlp=True):
new_sentences.append(chunk)
start_char_idx += (window_size - overlap)
return new_sentences



def mistral_sentence_splitter(self, data, window_size=768, overlap=76, nlp=True):
if nlp:
self.utils.nlp.max_length = len(data) + 100
Expand Down Expand Up @@ -156,5 +159,4 @@ def mistral_sentence_splitter(self, data, window_size=768, overlap=76, nlp=True)
chunk = data[start_char_idx:end_char_idx]
new_sentences.append(chunk)
start_char_idx += (window_size - overlap)
return new_sentences

return new_sentences
1 change: 0 additions & 1 deletion magnet/ic/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
from .ic import ic
16 changes: 0 additions & 16 deletions magnet/ic/current.py

This file was deleted.

53 changes: 53 additions & 0 deletions magnet/ic/field.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import nats
from magnet.utils import _f

class Charge:
    """Publisher side of a NATS JetStream connection.

    Connects to a NATS server, idempotently creates a persistent stream
    bound to a subject ("frequency"), and publishes ("pulses") byte
    packets onto it.
    """

    def __init__(self, server):
        # Hostname/IP of the NATS server; port 4222 is assumed in on().
        self.server = server

    async def on(self, frequency: str = 'default', stream: str = 'default'):
        """Connect to the server and ensure `stream` persists `frequency`.

        :param frequency: NATS subject to publish on.
        :param stream: JetStream stream name that persists the subject.
        """
        self.frequency = frequency
        self.stream = stream
        try:
            nc = await nats.connect(f'nats://{self.server}:4222')
            self.nc = nc
            self.js = self.nc.jetstream()
            # add_stream is idempotent for an existing identical stream.
            await self.js.add_stream(name=self.stream, subjects=[self.frequency])
            self.sub = await self.js.pull_subscribe(self.frequency, 'magnet-charge')
            _f("success", f'connected to {self.server}')
        except TimeoutError:
            _f('fatal', f'could not connect to {self.server}')

    async def off(self):
        """Unsubscribe from the stream and drain the connection."""
        # BUG FIX: unsubscribe() is a method of the subscription object,
        # not the JetStream context — self.js.unsubscribe() raised
        # AttributeError at runtime.
        await self.sub.unsubscribe()
        _f('warn', f'unsubscribed from {self.stream}')
        await self.nc.drain()
        _f('warn', f'disconnected from {self.server}')

    async def pulse(self, packet):
        """Publish one packet (bytes) to the stream's subject."""
        try:
            await self.js.publish(self.frequency, packet)
        except Exception as e:
            print(e)
            _f('fatal', f'could not send data to {self.server}')

    async def emp(self):
        """Delete the persistent stream (an "electromagnetic pulse")."""
        await self.js.delete_stream(name=self.stream)
        _f('success', f'{self.frequency} stream deleted')

class Resonator:
    """Consumer side of a NATS JetStream connection.

    Pull-subscribes to a stream's subject and forwards every received
    message to a callback.
    """

    def __init__(self, server):
        # Hostname/IP of the NATS server; port 4222 is assumed in on().
        self.server = server

    async def on(self, frequency: str = 'default', stream: str = 'default', cb=print):
        """Connect and consume messages from `stream` forever.

        :param frequency: NATS subject to subscribe to.
        :param stream: JetStream stream name backing the subject.
        :param cb: callable invoked with each received message.
        """
        self.frequency = frequency
        self.stream = stream
        self.nc = await nats.connect(f'nats://{self.server}:4222')
        self.js = self.nc.jetstream()
        self.sub = await self.js.pull_subscribe(self.frequency, stream=self.stream)
        while True:
            try:
                msgs = await self.sub.fetch(batch=10, timeout=60)
            except nats.errors.TimeoutError:
                # BUG FIX: fetch() raises TimeoutError when no messages
                # arrive within `timeout`; previously this crashed the
                # consume loop after 60s of silence. Keep polling instead.
                continue
            for msg in msgs:
                # NOTE(review): messages are not acked here, so JetStream
                # will redeliver them unless `cb` acks — confirm intended.
                cb(msg)

    async def off(self):
        """Unsubscribe from the stream and drain the connection."""
        # BUG FIX: unsubscribe() is a method of the subscription object,
        # not the JetStream context.
        await self.sub.unsubscribe()
        _f('warn', f'unsubscribed from {self.stream}')
        await self.nc.drain()
        _f('warn', f'disconnected from {self.server}')
1 change: 0 additions & 1 deletion magnet/ize/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
from .ize import ize
1 change: 0 additions & 1 deletion magnet/ron/__init__.py

This file was deleted.

91 changes: 0 additions & 91 deletions magnet/ron/tune.py

This file was deleted.

0 comments on commit b01317b

Please sign in to comment.