Skip to content

Commit

Permalink
Merge pull request #2004 from dandi/embargoed-asset-admin-access
Browse files Browse the repository at this point in the history
Fix admin access to embargoed asset blobs
  • Loading branch information
jjnesbitt authored Aug 16, 2024
2 parents 7bad420 + 71b9bc8 commit 4cebe93
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 17 deletions.
60 changes: 60 additions & 0 deletions dandiapi/api/tests/test_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,66 @@ def test_asset_rest_retrieve_no_sha256(api_client, version, asset):
)


@pytest.mark.django_db()
def test_asset_rest_retrieve_embargoed_admin(
api_client,
draft_version_factory,
draft_asset_factory,
admin_user,
storage,
monkeypatch,
):
monkeypatch.setattr(AssetBlob.blob.field, 'storage', storage)

api_client.force_authenticate(user=admin_user)
version = draft_version_factory(dandiset__embargo_status=Dandiset.EmbargoStatus.EMBARGOED)
ds = version.dandiset

# Create an extra asset so that there are multiple assets to filter down
asset = draft_asset_factory(blob__embargoed=True)
version.assets.add(asset)

# Asset View
r = api_client.get(f'/api/assets/{asset.asset_id}/')
assert r.status_code == 200

# Nested Asset View
r = api_client.get(
f'/api/dandisets/{ds.identifier}/versions/{version.version}/assets/{asset.asset_id}/'
)
assert r.status_code == 200


@pytest.mark.django_db()
def test_asset_rest_download_embargoed_admin(
api_client,
draft_version_factory,
draft_asset_factory,
admin_user,
storage,
monkeypatch,
):
monkeypatch.setattr(AssetBlob.blob.field, 'storage', storage)

api_client.force_authenticate(user=admin_user)
version = draft_version_factory(dandiset__embargo_status=Dandiset.EmbargoStatus.EMBARGOED)
ds = version.dandiset

# Create an extra asset so that there are multiple assets to filter down
asset = draft_asset_factory(blob__embargoed=True)
version.assets.add(asset)

# Asset View
r = api_client.get(f'/api/assets/{asset.asset_id}/download/')
assert r.status_code == 302

# Nested Asset View
r = api_client.get(
f'/api/dandisets/{ds.identifier}/versions/{version.version}/assets/{asset.asset_id}/download/'
)
assert r.status_code == 302


@pytest.mark.django_db()
def test_asset_rest_info(api_client, version, asset):
version.assets.add(asset)
Expand Down
43 changes: 26 additions & 17 deletions dandiapi/api/views/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,32 @@ def raise_if_unauthorized(self):
# We need to check the dandiset to see if it's embargoed, and if so whether or not the
# user has ownership
asset_id = self.kwargs.get('asset_id')
if asset_id is not None:
asset = get_object_or_404(Asset.objects.select_related('blob'), asset_id=asset_id)
# TODO: When EmbargoedZarrArchive is implemented, check that as well
if asset.blob and asset.blob.embargoed:
if not self.request.user.is_authenticated:
# Clients must be authenticated to access it
raise NotAuthenticated

# User must be an owner on any of the dandisets this asset belongs to
asset_dandisets = Dandiset.objects.filter(versions__in=asset.versions.all())
asset_dandisets_owned_by_user = DandisetUserObjectPermission.objects.filter(
content_object__in=asset_dandisets,
user=self.request.user,
permission__codename='owner',
)
if not asset_dandisets_owned_by_user.exists():
raise PermissionDenied
if asset_id is None:
return

asset = get_object_or_404(Asset.objects.select_related('blob'), asset_id=asset_id)

# TODO: When EmbargoedZarrArchive is implemented, check that as well
if not (asset.blob and asset.blob.embargoed):
return

# Clients must be authenticated to access it
if not self.request.user.is_authenticated:
raise NotAuthenticated

# Admins are allowed to access any embargoed asset blob
if self.request.user.is_superuser:
return

# User must be an owner on any of the dandisets this asset belongs to
asset_dandisets = Dandiset.objects.filter(versions__in=asset.versions.all())
asset_dandisets_owned_by_user = DandisetUserObjectPermission.objects.filter(
content_object__in=asset_dandisets,
user=self.request.user,
permission__codename='owner',
)
if not asset_dandisets_owned_by_user.exists():
raise PermissionDenied

def get_queryset(self):
self.raise_if_unauthorized()
Expand Down

0 comments on commit 4cebe93

Please sign in to comment.