Commit: Run script

lferran committed Dec 1, 2023
1 parent 0730d57 commit 5f6a75f
Showing 2 changed files with 107 additions and 25 deletions.
88 changes: 63 additions & 25 deletions nucliadb_performance/export_import.py
@@ -17,29 +17,48 @@ class NucliaDB:
     writer: NucliaSDK
 
 
-API = "http://localhost:8080/api"
-CHUNK_SIZE = 1024 * 1025 * 5
+LOCAL_API = "http://localhost:8080/api"
+CLUSTER_API = "http://{service}.nucliadb.svc.cluster.local:8080/api"
+
+LOCAL_NUCLIADB = True
+if LOCAL_NUCLIADB:
+    READER_API = WRITER_API = LOCAL_API
+else:
+    READER_API = CLUSTER_API.format(service="reader")
+    WRITER_API = CLUSTER_API.format(service="writer")
+
+MB = 1024 * 1024
+CHUNK_SIZE = 10 * MB
 
 
 ndb = NucliaDB(
-    reader=NucliaSDK(url=API, headers={"X-Nucliadb-Roles": "READER"}),
-    writer=NucliaSDK(url=API, headers={"X-Nucliadb-Roles": "WRITER;MANAGER"}),
+    reader=NucliaSDK(url=READER_API, headers={"X-Nucliadb-Roles": "READER"}),
+    writer=NucliaSDK(url=WRITER_API, headers={"X-Nucliadb-Roles": "WRITER;MANAGER"}),
 )
 
 
-def get_or_create_kb(ndb, slug, release_channel=None) -> str:
+def get_kb(ndb, slug_or_kbid) -> str:
     try:
-        kbid = ndb.reader.get_knowledge_box_by_slug(slug=slug).uuid
+        kbid = ndb.reader.get_knowledge_box_by_slug(slug=slug_or_kbid).uuid
+    except NotFoundError:
+        ndb.reader.get_knowledge_box(kbid=slug_or_kbid)
+        kbid = slug_or_kbid
+    return kbid
+
+
+def get_or_create_kb(ndb, slug_or_kbid, release_channel=None) -> str:
+    try:
+        kbid = get_kb(ndb, slug_or_kbid)
     except NotFoundError:
         kbid = ndb.writer.create_knowledge_box(
-            slug=slug, release_channel=release_channel
+            slug=slug_or_kbid, release_channel=release_channel
         ).uuid
     return kbid
 
 
-def import_kb(*, uri, slug, release_channel=None):
-    kbid = get_or_create_kb(ndb, slug, release_channel=release_channel)
-    print(f"Importing from {uri} to kb={slug}")
+def import_kb(*, uri, kb, release_channel=None):
+    kbid = get_or_create_kb(ndb, kb, release_channel=release_channel)
+    print(f"Importing from {uri} to kb={kb}")
 
     import_id = ndb.writer.start_import(
         kbid=kbid, content=read_import_stream(uri)
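This hunk replaces the single API constant with a LOCAL_NUCLIADB toggle that picks between a standalone localhost instance and the in-cluster reader/writer services. A minimal sketch of driving the same choice from an environment variable rather than an in-file flag; the NUCLIADB_LOCAL variable name is an assumption, not something this commit defines:

    import os

    # Hypothetical env-var variant of the LOCAL_NUCLIADB flag above.
    LOCAL_NUCLIADB = os.environ.get("NUCLIADB_LOCAL", "true").lower() == "true"
    if LOCAL_NUCLIADB:
        READER_API = WRITER_API = "http://localhost:8080/api"
    else:
        READER_API = "http://reader.nucliadb.svc.cluster.local:8080/api"
        WRITER_API = "http://writer.nucliadb.svc.cluster.local:8080/api"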
@@ -51,11 +70,11 @@ def import_kb(*, uri, slug, release_channel=None):
print(f"Import finished!")


def export_kb(*, uri, slug):
kbid = ndb.reader.get_knowledge_box_by_slug(slug=slug).uuid
def export_kb(*, uri, kb):
kbid = get_kb(ndb, kb)
export_id = ndb.writer.start_export(kbid=kbid).export_id

print(f"Starting export for {slug}. Export id: {export_id}")
print(f"Starting export for {kb}. Export id: {export_id}")
wait_until_finished(ndb, kbid, "export", export_id)

print(f"Downloading export at {uri}")
@@ -76,11 +95,14 @@ def get_status(ndb, kbid, task_type, task_id):

 def wait_until_finished(ndb, kbid, task_type, task_id, wait_interval=2):
     status = get_status(ndb, kbid, task_type, task_id)
-    while status.status != "finished":
-        print(f"Status: {status.status} {status.processed}/{status.total}")
-        assert status.status != "error"
-        time.sleep(wait_interval)
-        status = get_status(ndb, kbid, task_type, task_id)
+    with tqdm(
+        total=status.total, desc=f"Waiting for {task_type} {task_id} to finish"
+    ) as progress_bar:
+        while status.status != "finished":
+            progress_bar.update(status.processed - progress_bar.n)
+            assert status.status != "error"
+            time.sleep(wait_interval)
+            status = get_status(ndb, kbid, task_type, task_id)
 
 
 def save_export_stream(uri, export_generator):
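tqdm's update() takes an increment, while get_status reports an absolute processed count, hence the status.processed - progress_bar.n delta above. The same idiom in isolation, with made-up counts standing in for the status API:

    from tqdm import tqdm

    with tqdm(total=100, desc="processing") as bar:
        for processed in (10, 45, 100):    # absolute counts, as a status API reports them
            bar.update(processed - bar.n)  # bar.n is tqdm's running total so far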
@@ -115,9 +137,13 @@ def read_import_stream(uri):
unit="iB",
unit_scale=True,
)
if uri.startswith("http"):
path_for_uri = get_export_file_from_url(uri)
path_for_uri_exists = os.path.exists(path_for_uri)
if uri.startswith("http") and not path_for_uri_exists:
stream = read_from_url
else:
if path_for_uri_exists:
uri = path_for_uri
stream = read_from_file
tqdm_kwargs["total"] = os.path.getsize(uri)
for chunk in progressify(stream(uri), **tqdm_kwargs):
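read_import_stream now checks for a previously downloaded copy before touching the network: when ./<export_name> already exists, the URI is rewritten to the local path and streamed from disk. A sketch of the decision for the export URL that run.sh uses, assuming the helpers above are in scope:

    import os

    uri = "https://storage.googleapis.com/nucliadb_indexer/nucliadb_performance/exports/small.export"
    path = get_export_file_from_url(uri)  # -> "./small.export"

    # First run: no cached copy yet, so read_from_url downloads (and caches) it.
    # Later runs: the cached copy exists, so read_from_file streams it from disk.
    stream = read_from_file if os.path.exists(path) else read_from_url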
@@ -134,10 +160,22 @@ def read_from_file(path):


 def read_from_url(uri):
-    with requests.get(uri, stream=True) as response:
-        response.raise_for_status()
-        for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
-            yield chunk
+    """
+    Read from a URL using requests, but also save the read chunks to disk.
+    """
+    path_for_uri = get_export_file_from_url(uri)
+    with open(path_for_uri, mode="wb") as f:
+        with requests.get(uri, stream=True) as response:
+            response.raise_for_status()
+            for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
+                yield chunk
+                # Save to disk too
+                f.write(chunk)
+
+
+def get_export_file_from_url(uri):
+    export_name = uri.split("/")[-1]
+    return f"./{export_name}"
 
 
 def progressify(func, **tqdm_kwargs):
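The rewritten read_from_url tees the download: each chunk is yielded to the import and mirrored into ./<export_name>, so the first import doubles as the cache write. The pattern in generic form, as a sketch (tee_download is an illustrative name, not part of this commit):

    import requests

    def tee_download(url, path, chunk_size=10 * 1024 * 1024):
        """Yield chunks from url while mirroring them into path."""
        with open(path, mode="wb") as f:
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                for chunk in response.iter_content(chunk_size=chunk_size):
                    f.write(chunk)  # mirror to disk as we stream
                    yield chunk

One trade-off worth noting: if the consumer stops iterating early, the mirrored file is left truncated, and a later run would pick up that incomplete copy via the cache check in read_import_stream.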
Expand Down Expand Up @@ -165,10 +203,10 @@ def parse_arguments():
 def main():
     args = parse_arguments()
     if args.action == "export":
-        export_kb(uri=args.uri, slug=args.kb)
+        export_kb(uri=args.uri, kb=args.kb)
     elif args.action == "import":
         release_channel = ReleaseChannel(args.release_channel)
-        import_kb(uri=args.uri, slug=args.kb, release_channel=release_channel)
+        import_kb(uri=args.uri, kb=args.kb, release_channel=release_channel)
     else:
         raise ValueError(f"Unknown action {args.action}")
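Since main passes args.kb straight into the get_kb-based helpers, the --kb flag accepts either a slug or a kbid. A sketch of the two call styles, assuming export_import.py is importable; the uuid is made up:

    from export_import import export_kb

    # --kb as a slug:
    export_kb(uri="./small.export", kb="small")
    # --kb as a kbid (hypothetical uuid):
    export_kb(uri="./small.export", kb="0fb4c9e2-91b0-4c9e-8db4-2f3a1c7a9b10")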

44 changes: 44 additions & 0 deletions nucliadb_performance/run.sh
@@ -0,0 +1,44 @@
+#!/bin/bash
+KB=${1:-small}
+
+echo "Installing requirements..."
+python3 -m venv performance_env
+source performance_env/bin/activate
+python3 -m pip install -q --upgrade pip wheel
+pip install -q -e .
+pip install -q -e ../nucliadb
+
+echo "Starting NucliaDB..."
+DEBUG=true nucliadb &
+NDB_PID=$!
+echo "NucliaDB PID: $NDB_PID"
+
+echo "Waiting for NucliaDB to start..."
+while : ; do
+    sleep 1
+    status_code=`curl -s -o /dev/null -w "%{http_code}" http://localhost:8080/api/v1/health/ready`
+    if [ "$status_code" -eq "200" ]; then
+        sleep 2
+        echo "NucliaDB is ready!"
+        break
+    fi
+done
+
+echo "Importing data..."
+python export_import.py --action=import --kb=$KB --uri=https://storage.googleapis.com/nucliadb_indexer/nucliadb_performance/exports/$KB.export
+
+echo "Running performance tests..."
+make test-standalone-search kb_slug=$KB max_workers=4 ramp_up=2 duration_s=60
+
+# Show results
+cat standalone.json
+
+echo "Stopping NucliaDB at ${NDB_PID}..."
+kill $NDB_PID
+
+echo "Cleaning up..."
+source deactivate
+# rm -rf ./performance_env
+rm ./cache.data.db
+rm -rf ./data
+# rm -rf ./logs
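run.sh takes the export name as its only argument (defaulting to small), boots a standalone NucliaDB, and polls /api/v1/health/ready until it answers 200 before importing and running the search benchmark. The same readiness wait expressed in Python, as a sketch; the 120-second cap is an assumption, since the bash loop above waits indefinitely:

    import time

    import requests

    def wait_for_ready(url="http://localhost:8080/api/v1/health/ready", timeout=120):
        """Poll the readiness endpoint until it returns 200, like the curl loop above."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                if requests.get(url, timeout=2).status_code == 200:
                    return
            except requests.ConnectionError:
                pass  # server not accepting connections yet
            time.sleep(1)
        raise TimeoutError(f"NucliaDB not ready after {timeout}s")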
