Skip to content

Commit

Permalink
Use API token and load metadata with multithreading
Browse files Browse the repository at this point in the history
  • Loading branch information
luiztauffer committed Aug 16, 2023
1 parent b25a003 commit e161965
Show file tree
Hide file tree
Showing 8 changed files with 4,328 additions and 1,088 deletions.
10 changes: 6 additions & 4 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ services:
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
AWS_BATCH_JOB_QUEUE: ${AWS_BATCH_JOB_QUEUE}
AWS_BATCH_JOB_DEFINITION: ${AWS_BATCH_JOB_DEFINITION}
DANDI_API_TOKEN: ${DANDI_API_TOKEN}
volumes:
- ./rest:/app
depends_on:
- database

worker:
  # Build from a local Dockerfile was replaced by pulling the published image;
  # previous configuration kept for reference:
  # build:
  #   context: containers
  #   dockerfile: Dockerfile.combined
  # image: si-sorting-worker
  image: ghcr.io/catalystneuro/si-sorting-worker:latest
  container_name: si-sorting-worker
  ports:
    - "5000:5000"
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ services:
AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
DANDI_API_TOKEN: ${DANDI_API_TOKEN}
volumes:
- ./rest:/app
depends_on:
Expand Down
46 changes: 28 additions & 18 deletions rest/clients/dandi.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import concurrent.futures
from dandi.dandiapi import DandiAPIClient
from dandischema.models import Dandiset
from pynwb import NWBHDF5IO, NWBFile
Expand All @@ -11,14 +12,15 @@

class DandiClient:

def __init__(self, token: str = None):
    """
    Initialize DandiClient object, to interact with DANDI API.

    Args:
        token (str, optional): DANDI API token used to authenticate
            requests. Defaults to None (anonymous access).
    """
    # Cache remote HTTP reads locally so repeated NWB access avoids re-downloading.
    self.fs = CachingFileSystem(
        fs=fsspec.filesystem("http"),
        cache_storage="data/nwb-cache",  # Local folder for the cache
    )
    self.token = token


def get_all_dandisets_labels(self) -> List[str]:
Expand Down Expand Up @@ -55,30 +57,38 @@ def get_all_dandisets_metadata_from_file(self) -> List:
all_metadata = json.load(f)
return all_metadata


def get_all_dandisets_metadata_from_dandi(self) -> dict:
    """
    Get metadata for all dandisets, directly from DANDI.
    Dandisets are fetched concurrently with a thread pool; only those that
    pass the NWB + ecephys filter (see process_dandiset) are kept.

    Returns:
        dict: Mapping of dandiset id (e.g. "000003") to its raw metadata.
    """
    with DandiAPIClient(token=self.token) as client:
        all_metadata = dict()
        dandisets_list = list(client.get_dandisets())
        total_dandisets = len(dandisets_list)
        n_processed = 0
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(self.process_dandiset, dandiset) for dandiset in dandisets_list]
            for future in concurrent.futures.as_completed(futures):
                n_processed += 1
                metadata = future.result()
                if metadata:
                    # Key by the bare dandiset id: "DANDI:000003/draft" -> "000003"
                    all_metadata[metadata["id"].split(":")[-1].split("/")[0].strip()] = metadata
                # Report how many dandisets were handled, not how many were kept.
                print(f"Processed {n_processed} of {total_dandisets} dandisets.")
    return all_metadata


def process_dandiset(self, dandiset):
    """
    Fetch raw metadata for a single dandiset and filter it.

    Args:
        dandiset: A remote dandiset handle from the DANDI API client,
            exposing get_raw_metadata().

    Returns:
        dict or None: The raw metadata if the dandiset contains NWB files
        with ecephys data, otherwise None (also None on retrieval errors).
    """
    try:
        metadata = dandiset.get_raw_metadata()
        if self.has_nwb(metadata) and self.has_ecephys(metadata):
            return metadata
    except Exception:
        # Best-effort: a dandiset whose metadata fails to load is skipped,
        # but never swallow SystemExit/KeyboardInterrupt (bare except did).
        pass
    return None


def get_dandiset_metadata(self, dandiset_id: str) -> dict:
    """
    Get metadata for a dandiset.

    Args:
        dandiset_id (str): Dandiset ID.

    Returns:
        dict: Raw metadata for the dandiset's draft version.
    """
    with DandiAPIClient(token=self.token) as client:
        dandiset = client.get_dandiset(dandiset_id=dandiset_id, version_id="draft")
        return dandiset.get_raw_metadata()

def list_dandiset_files(self, dandiset_id: str) -> List[str]:
    """
    List NWB files in a dandiset.

    Args:
        dandiset_id (str): Dandiset ID.

    Returns:
        List[str]: Paths of the .nwb files in the dandiset's draft version.
    """
    with DandiAPIClient(token=self.token) as client:
        dandiset = client.get_dandiset(dandiset_id=dandiset_id, version_id="draft")
        # Hoist asset.dict() so it is evaluated once per asset, not twice.
        paths = (asset.dict().get("path") for asset in dandiset.get_assets())
        return [p for p in paths if p.endswith(".nwb")]

def get_file_url(self, dandiset_id: str, file_path: str) -> str:
    """
    Get S3 URL of a file in a dandiset.

    Args:
        dandiset_id (str): Dandiset ID.
        file_path (str): Path of the file within the dandiset.

    Returns:
        str: S3 URL of the file (redirects followed, query string stripped).
    """
    with DandiAPIClient(token=self.token) as client:
        asset = client.get_dandiset(dandiset_id, "draft").get_asset_by_path(file_path)
        return asset.get_content_url(follow_redirects=1, strip_query=True)

Expand All @@ -201,7 +211,7 @@ def has_nwb(self, metadata: Dandiset) -> bool:
if data_standard:
return any(x.get("identifier", "") == "RRID:SCR_015242" for x in data_standard)
return False


def has_ecephys(self, metadata: Dandiset) -> bool:
"""
Expand Down
1 change: 1 addition & 0 deletions rest/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ class Settings:
# AWS credentials and region, read from the environment (None when unset).
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", None)
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", None)
AWS_DEFAULT_REGION = os.environ.get("AWS_DEFAULT_REGION", None)
# DANDI API token for authenticated DANDI requests (None when unset).
DANDI_API_TOKEN = os.environ.get("DANDI_API_TOKEN", None)
# NOTE(review): purpose not visible from this file; value is None when unset.
DANDI_VAR = os.environ.get("DANDI_VAR")

AWS_BATCH_JOB_QUEUE = os.environ.get("AWS_BATCH_JOB_QUEUE", None)
Expand Down
Loading

0 comments on commit e161965

Please sign in to comment.