Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create S3 bucket with tags #1608

Merged
merged 4 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions charts/nucliadb_shared/templates/nucliadb.cm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ data:
S3_MAX_POOL_CONNECTIONS: {{ .Values.storage.s3_max_pool_connections | quote }}
S3_REGION_NAME: {{ .Values.storage.s3_region_name }}
S3_BUCKET: {{ .Values.storage.s3_bucket }}
S3_BUCKET_TAGS: {{ toJson .Values.storage.s3_bucket_tags | quote }}
S3_DEADLETTER_BUCKET: {{ .Values.storage.s3_deadletter_bucket }}
S3_INDEXING_BUCKET: {{ .Values.storage.s3_indexing_bucket }}
{{- end }}
Expand Down
1 change: 1 addition & 0 deletions charts/nucliadb_shared/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ storage:
s3_max_pool_connections: 30
s3_region_name: XX
s3_bucket: nucliadb_{kbid}
s3_bucket_tags:
s3_deadletter_bucket: XX
s3_indexing_bucket: XX

Expand Down
1 change: 1 addition & 0 deletions nucliadb/nucliadb/writer/tus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ async def initialize():
endpoint_url=storage_settings.s3_endpoint,
region_name=storage_settings.s3_region_name,
bucket=storage_settings.s3_bucket,
bucket_tags=storage_settings.s3_bucket_tags,
)

storage_manager = S3FileStorageManager(storage_backend)
Expand Down
16 changes: 5 additions & 11 deletions nucliadb/nucliadb/writer/tus/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from nucliadb.writer.tus.dm import FileDataMangaer
from nucliadb.writer.tus.exceptions import CloudFileNotFound
from nucliadb.writer.tus.storage import BlobStore, FileStorageManager
from nucliadb_utils.storages.s3 import bucket_exists, create_bucket

RETRIABLE_EXCEPTIONS = (
botocore.exceptions.ClientError,
Expand Down Expand Up @@ -180,16 +181,7 @@ async def delete_upload(self, uri: str, kbid: str):

class S3BlobStore(BlobStore):
async def check_exists(self, bucket_name: str) -> bool:
exists = True
try:
res = await self._s3aioclient.head_bucket(Bucket=bucket_name)
if res["ResponseMetadata"]["HTTPStatusCode"] == 404:
exists = False
except botocore.exceptions.ClientError as e:
error_code = int(e.response["Error"]["Code"])
if error_code == 404:
exists = False
return exists
return await bucket_exists(self._s3aioclient, bucket_name)

def get_bucket_name(self, kbid: str) -> str:
bucket_name = super().get_bucket_name(kbid)
Expand All @@ -201,7 +193,7 @@ def get_bucket_name(self, kbid: str) -> str:
async def create_bucket(self, bucket):
exists = await self.check_exists(bucket)
if not exists:
await self._s3aioclient.create_bucket(Bucket=bucket)
await create_bucket(self._s3aioclient, bucket, self.bucket_tags)
return exists

async def finalize(self):
Expand All @@ -217,8 +209,10 @@ async def initialize(
endpoint_url,
region_name,
bucket,
bucket_tags: Optional[dict[str, str]] = None,
):
self.bucket = bucket
self.bucket_tags = bucket_tags
self.source = CloudFile.Source.S3

self._exit_stack = AsyncExitStack()
Expand Down
8 changes: 7 additions & 1 deletion nucliadb_utils/nucliadb_utils/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,13 @@ class StorageSettings(BaseSettings):
s3_max_pool_connections: int = 30
s3_endpoint: Optional[str] = None
s3_region_name: Optional[str] = None
s3_bucket: Optional[str] = None
s3_bucket: Optional[str] = Field(
default=None, description="KnowledgeBox S3 bucket name template"
)
s3_bucket_tags: Dict[str, str] = Field(
default={},
description="Map of tags with which S3 buckets will be tagged with: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketTagging.html", # noqa
)

local_files: Optional[str] = Field(
default=None,
Expand Down
62 changes: 45 additions & 17 deletions nucliadb_utils/nucliadb_utils/storages/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ def __init__(
region_name: Optional[str] = None,
max_pool_connections: int = 30,
bucket: Optional[str] = None,
bucket_tags: Optional[dict[str, str]] = None,
):
self.source = CloudFile.S3
self.deadletter_bucket = deadletter_bucket
Expand All @@ -326,6 +327,8 @@ def __init__(
"CreateBucketConfiguration": {"LocationConstraint": self._region_name}
}

self._bucket_tags = bucket_tags

self.opts = dict(
aws_secret_access_key=self._aws_secret_key,
aws_access_key_id=self._aws_access_key,
Expand Down Expand Up @@ -379,27 +382,20 @@ async def iterate_bucket(
yield item

async def create_kb(self, kbid: str):
bucket_name = self.get_bucket_name(kbid)
missing = False
created = False
try:
res = await self._s3aioclient.head_bucket(Bucket=bucket_name)
if res["ResponseMetadata"]["HTTPStatusCode"] == 404:
missing = True
except botocore.exceptions.ClientError as e:
error_code = int(e.response["Error"]["Code"])
if error_code == 404:
missing = True
else:
raise

if missing:
await self._s3aioclient.create_bucket(
Bucket=bucket_name, **self._bucket_creation_options
)
bucket_name = self.get_bucket_name(kbid)
bucket_exists = await self.bucket_exists(bucket_name)
if not bucket_exists:
await self.create_bucket(bucket_name)
created = True
return created

async def bucket_exists(self, bucket_name: str) -> bool:
return await bucket_exists(self._s3aioclient, bucket_name)

async def create_bucket(self, bucket_name: str):
await create_bucket(self._s3aioclient, bucket_name, self._bucket_tags)

async def schedule_delete_kb(self, kbid: str):
bucket_name = self.get_bucket_name(kbid)

Expand Down Expand Up @@ -446,3 +442,35 @@ async def delete_kb(self, kbid: str):
if error_code in (200, 204):
deleted = True
return deleted, conflict


async def bucket_exists(client: AioSession, bucket_name: str) -> bool:
exists = True
try:
res = await client.head_bucket(Bucket=bucket_name)
if res["ResponseMetadata"]["HTTPStatusCode"] == 404:
exists = False
except botocore.exceptions.ClientError as e:
error_code = int(e.response["Error"]["Code"])
if error_code == 404:
exists = False
return exists


async def create_bucket(
client: AioSession, bucket_name: str, bucket_tags: Optional[dict[str, str]] = None
):
# Create the bucket
await client.create_bucket(Bucket=bucket_name)

if bucket_tags is not None and len(bucket_tags) > 0:
# Set bucket tags
await client.put_bucket_tagging(
Bucket=bucket_name,
Tagging={
"TagSet": [
{"Key": tag_key, "Value": tag_value}
for tag_key, tag_value in bucket_tags.items()
]
},
)
3 changes: 2 additions & 1 deletion nucliadb_utils/nucliadb_utils/tests/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def check(self):
try:
response = requests.get(f"http://{self.host}:{self.get_port()}")
return response.status_code == 404
except:
except Exception:
return False


Expand All @@ -68,6 +68,7 @@ async def s3_storage(s3):
use_ssl=False,
region_name=None,
bucket="test-{kbid}",
bucket_tags={"testTag": "test"},
)
await storage.initialize()
MAIN["storage"] = storage
Expand Down
1 change: 1 addition & 0 deletions nucliadb_utils/nucliadb_utils/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ async def get_storage(
region_name=storage_settings.s3_region_name,
max_pool_connections=storage_settings.s3_max_pool_connections,
bucket=storage_settings.s3_bucket,
bucket_tags=storage_settings.s3_bucket_tags,
)
set_utility(Utility.STORAGE, s3util)
await s3util.initialize()
Expand Down
Loading