From 5f6a75f0669d88c2f4d1acf3d94749de2de54af2 Mon Sep 17 00:00:00 2001 From: Ferran Llamas Date: Fri, 1 Dec 2023 19:39:27 +0100 Subject: [PATCH] Run script --- nucliadb_performance/export_import.py | 88 +++++++++++++++++++-------- nucliadb_performance/run.sh | 44 ++++++++++++++ 2 files changed, 107 insertions(+), 25 deletions(-) create mode 100755 nucliadb_performance/run.sh diff --git a/nucliadb_performance/export_import.py b/nucliadb_performance/export_import.py index 4d350a4605..56c5caf9c9 100644 --- a/nucliadb_performance/export_import.py +++ b/nucliadb_performance/export_import.py @@ -17,29 +17,48 @@ class NucliaDB: writer: NucliaSDK -API = "http://localhost:8080/api" -CHUNK_SIZE = 1024 * 1025 * 5 +LOCAL_API = "http://localhost:8080/api" +CLUSTER_API = "http://{service}.nucliadb.svc.cluster.local:8080/api" + +LOCAL_NUCLIADB = True +if LOCAL_NUCLIADB: + READER_API = WRITER_API = LOCAL_API +else: + READER_API = CLUSTER_API.format(service="reader") + WRITER_API = CLUSTER_API.format(service="writer") + +MB = 1024 * 1024 +CHUNK_SIZE = 10 * MB ndb = NucliaDB( - reader=NucliaSDK(url=API, headers={"X-Nucliadb-Roles": "READER"}), - writer=NucliaSDK(url=API, headers={"X-Nucliadb-Roles": "WRITER;MANAGER"}), + reader=NucliaSDK(url=READER_API, headers={"X-Nucliadb-Roles": "READER"}), + writer=NucliaSDK(url=WRITER_API, headers={"X-Nucliadb-Roles": "WRITER;MANAGER"}), ) -def get_or_create_kb(ndb, slug, release_channel=None) -> str: +def get_kb(ndb, slug_or_kbid) -> str: try: - kbid = ndb.reader.get_knowledge_box_by_slug(slug=slug).uuid + kbid = ndb.reader.get_knowledge_box_by_slug(slug=slug_or_kbid).uuid + except NotFoundError: + ndb.reader.get_knowledge_box(kbid=slug_or_kbid) + kbid = slug_or_kbid + return kbid + + +def get_or_create_kb(ndb, slug_or_kbid, release_channel=None) -> str: + try: + kbid = get_kb(ndb, slug_or_kbid) except NotFoundError: kbid = ndb.writer.create_knowledge_box( - slug=slug, release_channel=release_channel + slug=slug_or_kbid, 
release_channel=release_channel ).uuid return kbid -def import_kb(*, uri, slug, release_channel=None): - kbid = get_or_create_kb(ndb, slug, release_channel=release_channel) - print(f"Importing from {uri} to kb={slug}") +def import_kb(*, uri, kb, release_channel=None): + kbid = get_or_create_kb(ndb, kb, release_channel=release_channel) + print(f"Importing from {uri} to kb={kb}") import_id = ndb.writer.start_import( kbid=kbid, content=read_import_stream(uri) @@ -51,11 +70,11 @@ def import_kb(*, uri, slug, release_channel=None): print(f"Import finished!") -def export_kb(*, uri, slug): - kbid = ndb.reader.get_knowledge_box_by_slug(slug=slug).uuid +def export_kb(*, uri, kb): + kbid = get_kb(ndb, kb) export_id = ndb.writer.start_export(kbid=kbid).export_id - print(f"Starting export for {slug}. Export id: {export_id}") + print(f"Starting export for {kb}. Export id: {export_id}") wait_until_finished(ndb, kbid, "export", export_id) print(f"Downloading export at {uri}") @@ -76,11 +95,14 @@ def get_status(ndb, kbid, task_type, task_id): def wait_until_finished(ndb, kbid, task_type, task_id, wait_interval=2): status = get_status(ndb, kbid, task_type, task_id) - while status.status != "finished": - print(f"Status: {status.status} {status.processed}/{status.total}") - assert status.status != "error" - time.sleep(wait_interval) - status = get_status(ndb, kbid, task_type, task_id) + with tqdm( + total=status.total, desc=f"Waiting for {task_type} {task_id} to finish" + ) as progress_bar: + while status.status != "finished": + progress_bar.update(status.processed - progress_bar.n) + assert status.status != "error" + time.sleep(wait_interval) + status = get_status(ndb, kbid, task_type, task_id) def save_export_stream(uri, export_generator): @@ -115,9 +137,13 @@ def read_import_stream(uri): unit="iB", unit_scale=True, ) - if uri.startswith("http"): + path_for_uri = get_export_file_from_url(uri) + path_for_uri_exists = os.path.exists(path_for_uri) + if uri.startswith("http") and not 
path_for_uri_exists: stream = read_from_url else: + if path_for_uri_exists: + uri = path_for_uri stream = read_from_file tqdm_kwargs["total"] = os.path.getsize(uri) for chunk in progressify(stream(uri), **tqdm_kwargs): @@ -134,10 +160,22 @@ def read_from_file(path): def read_from_url(uri): - with requests.get(uri, stream=True) as response: - response.raise_for_status() - for chunk in response.iter_content(chunk_size=CHUNK_SIZE): - yield chunk + """ + Read from a URL using requests, but also save the read chunks to disk. + """ + path_for_uri = get_export_file_from_url(uri) + with open(path_for_uri, mode="wb") as f: + with requests.get(uri, stream=True) as response: + response.raise_for_status() + for chunk in response.iter_content(chunk_size=CHUNK_SIZE): + yield chunk + # Save to disk too + f.write(chunk) + + +def get_export_file_from_url(uri): + export_name = uri.split("/")[-1] + return f"./{export_name}" def progressify(func, **tqdm_kwargs): @@ -165,10 +203,10 @@ def parse_arguments(): def main(): args = parse_arguments() if args.action == "export": - export_kb(uri=args.uri, slug=args.kb) + export_kb(uri=args.uri, kb=args.kb) elif args.action == "import": release_channel = ReleaseChannel(args.release_channel) - import_kb(uri=args.uri, slug=args.kb, release_channel=release_channel) + import_kb(uri=args.uri, kb=args.kb, release_channel=release_channel) else: raise ValueError(f"Unknown action {args.action}") diff --git a/nucliadb_performance/run.sh b/nucliadb_performance/run.sh new file mode 100755 index 0000000000..26a03427d5 --- /dev/null +++ b/nucliadb_performance/run.sh @@ -0,0 +1,44 @@ +#!/bin/bash +KB=${1:-small} + +echo "Installing requirements..." +python3 -m venv performance_env +source performance_env/bin/activate +python3 -m pip install -q --upgrade pip wheel +pip install -q -e . +pip install -q -e ../nucliadb + +echo "Starting NucliaDB..." +DEBUG=true nucliadb & +NDB_PID=$! +echo "NucliaDB PID: $NDB_PID" + +echo "Waiting for NucliaDB to start..." 
+while : ; do
+    sleep 1
+    status_code=`curl -s -o /dev/null -w "%{http_code}" http://localhost:8080/api/v1/health/ready`
+    if [ "$status_code" -eq "200" ]; then
+        sleep 2
+        echo "NucliaDB is ready!"
+        break
+    fi
+done
+
+echo "Importing data..."
+python export_import.py --action=import --kb=$KB --uri=https://storage.googleapis.com/nucliadb_indexer/nucliadb_performance/exports/$KB.export
+
+echo "Running performance tests..."
+make test-standalone-search kb_slug=$KB max_workers=4 ramp_up=2 duration_s=60
+
+# Show results
+cat standalone.json
+
+echo "Stopping NucliaDB at ${NDB_PID}..."
+kill $NDB_PID
+
+echo "Cleaning up..."
+deactivate
+# rm -rf ./performance_env
+rm ./cache.data.db
+rm -rf ./data
+# rm -rf ./logs
\ No newline at end of file