From efb3e08db9548dc327f496364d21333840854309 Mon Sep 17 00:00:00 2001 From: Ferran Llamas Date: Mon, 27 Nov 2023 10:40:49 +0100 Subject: [PATCH 1/4] Create s3 buckets with tags --- .../templates/nucliadb.cm.yaml | 1 + charts/nucliadb_shared/values.yaml | 1 + nucliadb/nucliadb/writer/tus/__init__.py | 1 + nucliadb/nucliadb/writer/tus/s3.py | 16 ++--- nucliadb_utils/nucliadb_utils/settings.py | 8 ++- nucliadb_utils/nucliadb_utils/storages/s3.py | 59 +++++++++++++++---- nucliadb_utils/nucliadb_utils/tests/s3.py | 3 +- nucliadb_utils/nucliadb_utils/utilities.py | 1 + 8 files changed, 67 insertions(+), 23 deletions(-) diff --git a/charts/nucliadb_shared/templates/nucliadb.cm.yaml b/charts/nucliadb_shared/templates/nucliadb.cm.yaml index 1110cd27ed..38f3b6b69e 100644 --- a/charts/nucliadb_shared/templates/nucliadb.cm.yaml +++ b/charts/nucliadb_shared/templates/nucliadb.cm.yaml @@ -36,6 +36,7 @@ data: S3_MAX_POOL_CONNECTIONS: {{ .Values.storage.s3_max_pool_connections | quote }} S3_REGION_NAME: {{ .Values.storage.s3_region_name }} S3_BUCKET: {{ .Values.storage.s3_bucket }} + S3_BUCKET_TAGS: {{ toJson .Values.storage.s3_bucket_tags | quote }} S3_DEADLETTER_BUCKET: {{ .Values.storage.s3_deadletter_bucket }} S3_INDEXING_BUCKET: {{ .Values.storage.s3_indexing_bucket }} {{- end }} diff --git a/charts/nucliadb_shared/values.yaml b/charts/nucliadb_shared/values.yaml index 06361c5ffb..45547cb371 100644 --- a/charts/nucliadb_shared/values.yaml +++ b/charts/nucliadb_shared/values.yaml @@ -32,6 +32,7 @@ storage: s3_max_pool_connections: 30 s3_region_name: XX s3_bucket: nucliadb_{kbid} + s3_bucket_tags: s3_deadletter_bucket: XX s3_indexing_bucket: XX diff --git a/nucliadb/nucliadb/writer/tus/__init__.py b/nucliadb/nucliadb/writer/tus/__init__.py index 4682fc436a..b4dafbea4c 100644 --- a/nucliadb/nucliadb/writer/tus/__init__.py +++ b/nucliadb/nucliadb/writer/tus/__init__.py @@ -76,6 +76,7 @@ async def initialize(): endpoint_url=storage_settings.s3_endpoint, region_name=storage_settings.s3_region_name, bucket=storage_settings.s3_bucket, + bucket_tags=storage_settings.s3_bucket_tags, ) storage_manager = S3FileStorageManager(storage_backend) diff --git a/nucliadb/nucliadb/writer/tus/s3.py b/nucliadb/nucliadb/writer/tus/s3.py index d0d54cb4ae..be51fa2eb7 100644 --- a/nucliadb/nucliadb/writer/tus/s3.py +++ b/nucliadb/nucliadb/writer/tus/s3.py @@ -34,6 +34,7 @@ from nucliadb.writer.tus.dm import FileDataMangaer from nucliadb.writer.tus.exceptions import CloudFileNotFound from nucliadb.writer.tus.storage import BlobStore, FileStorageManager +from nucliadb_utils.storages.s3 import bucket_exists, create_bucket RETRIABLE_EXCEPTIONS = ( botocore.exceptions.ClientError, @@ -180,16 +181,7 @@ async def delete_upload(self, uri: str, kbid: str): class S3BlobStore(BlobStore): async def check_exists(self, bucket_name: str) -> bool: - exists = True - try: - res = await self._s3aioclient.head_bucket(Bucket=bucket_name) - if res["ResponseMetadata"]["HTTPStatusCode"] == 404: - exists = False - except botocore.exceptions.ClientError as e: - error_code = int(e.response["Error"]["Code"]) - if error_code == 404: - exists = False - return exists + return await bucket_exists(self._s3aioclient, bucket_name) def get_bucket_name(self, kbid: str) -> str: bucket_name = super().get_bucket_name(kbid) @@ -201,7 +193,7 @@ def get_bucket_name(self, kbid: str) -> str: async def create_bucket(self, bucket): exists = await self.check_exists(bucket) if not exists: - await self._s3aioclient.create_bucket(Bucket=bucket) + await create_bucket(self._s3aioclient, bucket, self.bucket_tags) return exists async def finalize(self): @@ -217,8 +209,10 @@ async def initialize( endpoint_url, region_name, bucket, + bucket_tags: Optional[dict[str, str]] = None, ): self.bucket = bucket + self.bucket_tags = bucket_tags self.source = CloudFile.Source.S3 self._exit_stack = AsyncExitStack() diff --git a/nucliadb_utils/nucliadb_utils/settings.py b/nucliadb_utils/nucliadb_utils/settings.py index ad3a69ecf0..8c70993959 100644 --- a/nucliadb_utils/nucliadb_utils/settings.py +++ b/nucliadb_utils/nucliadb_utils/settings.py @@ -97,7 +97,13 @@ class StorageSettings(BaseSettings): s3_max_pool_connections: int = 30 s3_endpoint: Optional[str] = None s3_region_name: Optional[str] = None - s3_bucket: Optional[str] = None + s3_bucket: Optional[str] = Field( + default=None, description="KnowledgeBox S3 bucket name template" + ) + s3_bucket_tags: Dict[str, str] = Field( + default={}, + description="Map of tags with which S3 buckets will be tagged with: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketTagging.html", # noqa + ) local_files: Optional[str] = Field( default=None, diff --git a/nucliadb_utils/nucliadb_utils/storages/s3.py b/nucliadb_utils/nucliadb_utils/storages/s3.py index 316bb92da6..70b9a03e0c 100644 --- a/nucliadb_utils/nucliadb_utils/storages/s3.py +++ b/nucliadb_utils/nucliadb_utils/storages/s3.py @@ -312,6 +312,7 @@ def __init__( region_name: Optional[str] = None, max_pool_connections: int = 30, bucket: Optional[str] = None, + bucket_tags: Optional[dict[str, str]] = None, ): self.source = CloudFile.S3 self.deadletter_bucket = deadletter_bucket @@ -326,6 +327,8 @@ def __init__( "CreateBucketConfiguration": {"LocationConstraint": self._region_name} } + self._bucket_tags = bucket_tags + self.opts = dict( aws_secret_access_key=self._aws_secret_key, aws_access_key_id=self._aws_access_key, @@ -379,26 +382,30 @@ async def iterate_bucket( yield item async def create_kb(self, kbid: str): - bucket_name = self.get_bucket_name(kbid) - missing = False created = False + bucket_name = self.get_bucket_name(kbid) + bucket_exists = await self.bucket_exists(bucket_name) + if not bucket_exists: + await self.create_bucket(bucket_name) + created = True + return created + + async def bucket_exists(self, bucket_name: str): try: res = await self._s3aioclient.head_bucket(Bucket=bucket_name) if res["ResponseMetadata"]["HTTPStatusCode"] == 404: - missing = True + return False + else: + return True except botocore.exceptions.ClientError as e: error_code = int(e.response["Error"]["Code"]) if error_code == 404: - missing = True + return False else: raise - if missing: - await self._s3aioclient.create_bucket( - Bucket=bucket_name, **self._bucket_creation_options - ) - created = True - return created + async def create_bucket(self, bucket_name: str): + await create_bucket(self._s3aioclient, bucket_name, self._bucket_tags) async def schedule_delete_kb(self, kbid: str): bucket_name = self.get_bucket_name(kbid) @@ -446,3 +453,35 @@ async def delete_kb(self, kbid: str): if error_code in (200, 204): deleted = True return deleted, conflict + + +async def bucket_exists(client: AioSession, bucket_name: str) -> bool: + exists = True + try: + res = await client.head_bucket(Bucket=bucket_name) + if res["ResponseMetadata"]["HTTPStatusCode"] == 404: + exists = False + except botocore.exceptions.ClientError as e: + error_code = int(e.response["Error"]["Code"]) + if error_code == 404: + exists = False + return exists + + +async def create_bucket( + client: AioSession, bucket_name: str, bucket_tags: Optional[dict[str, str]] = None +): + # Create the bucket + await client.create_bucket(Bucket=bucket_name) + + if bucket_tags is not None and len(bucket_tags) > 0: + # Set bucket tags + await client.put_bucket_tagging( + Bucket=bucket_name, + Tagging={ + "TagSet": [ + {"Key": tag_key, "Value": tag_value} + for tag_key, tag_value in bucket_tags.items() + ] + }, + ) diff --git a/nucliadb_utils/nucliadb_utils/tests/s3.py b/nucliadb_utils/nucliadb_utils/tests/s3.py index 781c01a421..0c6d636076 100644 --- a/nucliadb_utils/nucliadb_utils/tests/s3.py +++ b/nucliadb_utils/nucliadb_utils/tests/s3.py @@ -43,7 +43,7 @@ def check(self): try: response = requests.get(f"http://{self.host}:{self.get_port()}") return response.status_code == 404 - except: + except Exception: return False @@ -68,6 +68,7 @@ async def s3_storage(s3): use_ssl=False, region_name=None, bucket="test-{kbid}", + bucket_tags={"testTag": "test"}, ) await storage.initialize() MAIN["storage"] = storage diff --git a/nucliadb_utils/nucliadb_utils/utilities.py b/nucliadb_utils/nucliadb_utils/utilities.py index 2f257a8011..56be85fd78 100644 --- a/nucliadb_utils/nucliadb_utils/utilities.py +++ b/nucliadb_utils/nucliadb_utils/utilities.py @@ -111,6 +111,7 @@ async def get_storage( region_name=storage_settings.s3_region_name, max_pool_connections=storage_settings.s3_max_pool_connections, bucket=storage_settings.s3_bucket, + bucket_tags=storage_settings.s3_bucket_tags, ) set_utility(Utility.STORAGE, s3util) await s3util.initialize() From ec75a75d69ae004ff2f6475a2923a617eef33678 Mon Sep 17 00:00:00 2001 From: Ferran Llamas Date: Mon, 27 Nov 2023 10:42:48 +0100 Subject: [PATCH 2/4] cleanup dup code --- nucliadb_utils/nucliadb_utils/storages/s3.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/nucliadb_utils/nucliadb_utils/storages/s3.py b/nucliadb_utils/nucliadb_utils/storages/s3.py index 70b9a03e0c..39299ad2e8 100644 --- a/nucliadb_utils/nucliadb_utils/storages/s3.py +++ b/nucliadb_utils/nucliadb_utils/storages/s3.py @@ -390,19 +390,8 @@ async def create_kb(self, kbid: str): created = True return created - async def bucket_exists(self, bucket_name: str): - try: - res = await self._s3aioclient.head_bucket(Bucket=bucket_name) - if res["ResponseMetadata"]["HTTPStatusCode"] == 404: - return False - else: - return True - except botocore.exceptions.ClientError as e: - error_code = int(e.response["Error"]["Code"]) - if error_code == 404: - return False - else: - raise + async def bucket_exists(self, bucket_name: str) -> bool: + return await bucket_exists(self._s3aioclient, bucket_name) async def create_bucket(self, bucket_name: str): await create_bucket(self._s3aioclient, bucket_name, self._bucket_tags) From 65f7906d5bdf7528aa423890cd0d94b834422aeb Mon Sep 17 00:00:00 2001 From: Ferran Llamas Date: Mon, 27 Nov 2023 10:47:18 +0100 Subject: [PATCH 3/4] Extend tus tests --- nucliadb/nucliadb/writer/tests/test_tus.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nucliadb/nucliadb/writer/tests/test_tus.py b/nucliadb/nucliadb/writer/tests/test_tus.py index 9897f75bd9..db11c280db 100644 --- a/nucliadb/nucliadb/writer/tests/test_tus.py +++ b/nucliadb/nucliadb/writer/tests/test_tus.py @@ -48,6 +48,7 @@ async def s3_storage_tus(s3): ssl=False, region_name=None, bucket="test_{kbid}", + bucket_tags={"testTag": "test"}, ) yield storage await storage.finalize() From 041787789b815ea15629c6fcb4b394cb20dbdc1e Mon Sep 17 00:00:00 2001 From: Ferran Llamas Date: Mon, 27 Nov 2023 11:01:50 +0100 Subject: [PATCH 4/4] fix serialization of summarize response --- nucliadb/nucliadb/search/predict.py | 2 +- nucliadb/nucliadb/search/tests/unit/test_predict.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nucliadb/nucliadb/search/predict.py b/nucliadb/nucliadb/search/predict.py index 992a3235fd..fdf14daa0a 100644 --- a/nucliadb/nucliadb/search/predict.py +++ b/nucliadb/nucliadb/search/predict.py @@ -476,7 +476,7 @@ async def summarize(self, kbid: str, item: SummarizeModel) -> SummarizedResponse ) await self.check_response(resp, expected_status=200) data = await resp.json() - return SummarizedResponse.parse_raw(data) + return SummarizedResponse.parse_obj(data) def get_answer_generator(response: aiohttp.ClientResponse): diff --git a/nucliadb/nucliadb/search/tests/unit/test_predict.py b/nucliadb/nucliadb/search/tests/unit/test_predict.py index 9d397c3235..87660f1f7e 100644 --- a/nucliadb/nucliadb/search/tests/unit/test_predict.py +++ b/nucliadb/nucliadb/search/tests/unit/test_predict.py @@ -418,7 +418,7 @@ async def test_summarize(): resources={"r1": SummarizedResource(summary="resource summary", tokens=10)} ) pe.session = get_mocked_session( - "POST", 200, json=summarized.json(), context_manager=False + "POST", 200, json=summarized.dict(), context_manager=False ) item = SummarizeModel(