diff --git a/nucliadb/src/nucliadb/ingest/orm/resource.py b/nucliadb/src/nucliadb/ingest/orm/resource.py index da2b22ad8e..d735afb4aa 100644 --- a/nucliadb/src/nucliadb/ingest/orm/resource.py +++ b/nucliadb/src/nucliadb/ingest/orm/resource.py @@ -36,8 +36,9 @@ from nucliadb.ingest.fields.text import Text from nucliadb.ingest.orm.brain import FilePagePositions, ResourceBrain from nucliadb.ingest.orm.metrics import processor_observer +from nucliadb_models import content_types from nucliadb_models.common import CloudLink -from nucliadb_models.writer import GENERIC_MIME_TYPE +from nucliadb_models.content_types import GENERIC_MIME_TYPE from nucliadb_protos import utils_pb2, writer_pb2 from nucliadb_protos.resources_pb2 import AllFieldIDs as PBAllFieldIDs from nucliadb_protos.resources_pb2 import ( @@ -1231,8 +1232,17 @@ def maybe_update_basic_icon(basic: PBBasic, mimetype: Optional[str]) -> bool: if basic.icon not in (None, "", "application/octet-stream", GENERIC_MIME_TYPE): # Icon already set or detected return False + if not mimetype: return False + + if not content_types.valid(mimetype): + logger.warning( + "Invalid mimetype. Skipping icon update.", + extra={"mimetype": mimetype, "rid": basic.uuid, "slug": basic.slug}, + ) + return False + basic.icon = mimetype return True diff --git a/nucliadb/src/nucliadb/writer/api/v1/upload.py b/nucliadb/src/nucliadb/writer/api/v1/upload.py index 0b013b17f7..196ab8140a 100644 --- a/nucliadb/src/nucliadb/writer/api/v1/upload.py +++ b/nucliadb/src/nucliadb/writer/api/v1/upload.py @@ -18,7 +18,6 @@ # along with this program. If not, see . # import base64 -import mimetypes import pickle import uuid from datetime import datetime @@ -61,6 +60,7 @@ from nucliadb.writer.tus.storage import FileStorageManager from nucliadb.writer.tus.utils import parse_tus_metadata from nucliadb.writer.utilities import get_processing +from nucliadb_models import content_types from nucliadb_models.resource import NucliaDBRoles from nucliadb_models.utils import FieldIdString from nucliadb_models.writer import CreateResourcePayload, ResourceFileUploaded @@ -251,8 +251,15 @@ async def _tus_post( request_content_type = None if item is None: request_content_type = request.headers.get("content-type") - if not request_content_type: - request_content_type = guess_content_type(metadata["filename"]) + if request_content_type is None: + request_content_type = content_types.guess(metadata["filename"]) or "application/octet-stream" + + if request_content_type is not None and not content_types.valid(request_content_type): + raise HTTPException( + status_code=415, + detail=f"Unsupported content type: {request_content_type}", + ) + metadata.setdefault("content_type", request_content_type) metadata["implies_resource_creation"] = implies_resource_creation @@ -530,10 +537,18 @@ async def _tus_patch( if isinstance(item_payload, str): item_payload = item_payload.encode() creation_payload = pickle.loads(base64.b64decode(item_payload)) + + content_type = dm.get("metadata", {}).get("content_type") + if content_type is not None and not content_types.valid(content_type): + return HTTPClientError( + status_code=415, + detail=f"Unsupported content type: {content_type}", + ) + try: seqid = await store_file_on_nuclia_db( size=dm.get("size"), - content_type=dm.get("metadata", {}).get("content_type"), + content_type=content_type, override_resource_title=dm.get("metadata", {}).get("implies_resource_creation", False), filename=dm.get("metadata", {}).get("filename"), password=dm.get("metadata", {}).get("password"), @@ -702,8 +717,14 @@ async def _upload( # - content-type set by the user in the upload request header takes precedence. # - if not set, we will try to guess it from the filename and default to a generic binary content type otherwise content_type = request.headers.get("content-type") - if not content_type: - content_type = guess_content_type(filename) + if content_type is None: + content_type = content_types.guess(filename) or "application/octet-stream" + + if not content_types.valid(content_type): + raise HTTPException( + status_code=415, + detail=f"Unsupported content type: {content_type}", + ) metadata = {"content_type": content_type, "filename": filename} @@ -814,7 +835,6 @@ async def store_file_on_nuclia_db( item: Optional[CreateResourcePayload] = None, ) -> Optional[int]: # File is on NucliaDB Storage at path - partitioning = get_partitioning() processing = get_processing() storage = await get_storage(service_name=SERVICE_NAME) @@ -920,9 +940,3 @@ def maybe_b64decode(some_string: str) -> str: except ValueError: # not b64encoded return some_string - - -def guess_content_type(filename: str) -> str: - default = "application/octet-stream" - guessed, _ = mimetypes.guess_type(filename) - return guessed or default diff --git a/nucliadb/src/nucliadb/writer/resource/basic.py b/nucliadb/src/nucliadb/writer/resource/basic.py index b98b78dcee..c2b0c3f78f 100644 --- a/nucliadb/src/nucliadb/writer/resource/basic.py +++ b/nucliadb/src/nucliadb/writer/resource/basic.py @@ -24,6 +24,7 @@ from nucliadb.ingest.orm.utils import set_title from nucliadb.ingest.processing import PushPayload from nucliadb_models.common import FIELD_TYPES_MAP_REVERSE +from nucliadb_models.content_types import GENERIC_MIME_TYPE from nucliadb_models.file import FileField from nucliadb_models.link import LinkField from nucliadb_models.metadata import ( @@ -34,7 +35,6 @@ ) from nucliadb_models.text import TEXT_FORMAT_TO_MIMETYPE, PushTextFormat, Text from nucliadb_models.writer import ( - GENERIC_MIME_TYPE, ComingResourcePayload, CreateResourcePayload, UpdateResourcePayload, diff --git a/nucliadb/src/nucliadb/writer/resource/field.py b/nucliadb/src/nucliadb/writer/resource/field.py index bd4a3e59ff..b58144d784 100644 --- a/nucliadb/src/nucliadb/writer/resource/field.py +++ b/nucliadb/src/nucliadb/writer/resource/field.py @@ -29,9 +29,9 @@ from nucliadb.writer import SERVICE_NAME from nucliadb.writer.utilities import get_processing from nucliadb_models.common import FIELD_TYPES_MAP, FieldTypeName +from nucliadb_models.content_types import GENERIC_MIME_TYPE from nucliadb_models.conversation import PushConversation from nucliadb_models.writer import ( - GENERIC_MIME_TYPE, CreateResourcePayload, UpdateResourcePayload, ) diff --git a/nucliadb/tests/ingest/unit/orm/test_resource.py b/nucliadb/tests/ingest/unit/orm/test_resource.py index dc948d2894..f83cdcc8f4 100644 --- a/nucliadb/tests/ingest/unit/orm/test_resource.py +++ b/nucliadb/tests/ingest/unit/orm/test_resource.py @@ -141,12 +141,16 @@ def test_get_text_field_mimetype(text_format, mimetype): (Basic(), "text/html", True), (Basic(icon=""), "text/html", True), (Basic(icon="application/octet-stream"), "text/html", True), + # Invalid icon should not be updated + (Basic(), "invalid", False), ], ) def test_maybe_update_basic_icon(basic, icon, updated): assert maybe_update_basic_icon(basic, icon) == updated if updated: assert basic.icon == icon + else: + assert basic.icon != icon class Transaction: diff --git a/nucliadb/tests/standalone/integration/test_upload_download.py b/nucliadb/tests/standalone/integration/test_upload_download.py index e6359de8cd..0c73a4c7f0 100644 --- a/nucliadb/tests/standalone/integration/test_upload_download.py +++ b/nucliadb/tests/standalone/integration/test_upload_download.py @@ -65,9 +65,11 @@ async def test_file_tus_upload_and_download( knowledgebox_one, ): language = "ca" - filename = "image.jpg" + filename = "image.jpeg" md5 = "7af0916dba8b70e29d99e72941923529" - content_type = "image/jpg" + # aitable is a custom content type suffix to indicate + # that the file must be processed with the ai tables feature... + content_type = "image/jpeg+aitable" # Create a resource kb_path = f"/{KB_PREFIX}/{knowledgebox_one}" @@ -99,7 +101,7 @@ async def test_file_tus_upload_and_download( "upload-defer-length": "1", }, ) - assert resp.status_code == 201 + assert resp.status_code == 201, resp.json() # Get the URL to upload the file to url = resp.headers["location"] @@ -220,3 +222,209 @@ async def test_tus_upload_handles_unknown_upload_ids( assert resp.status_code == 404 error_detail = resp.json().get("detail") assert error_detail == "Resumable URI not found for upload_id: foobarid" + + +@pytest.mark.asyncio +async def test_content_type_validation( + local_storage_settings, + configure_redis_dm, + nucliadb_writer, + nucliadb_reader, + knowledgebox_one, +): + language = "ca" + filename = "image.jpg" + md5 = "7af0916dba8b70e29d99e72941923529" + + # Create a resource + kb_path = f"/{KB_PREFIX}/{knowledgebox_one}" + resp = await nucliadb_writer.post( + f"{kb_path}/{RESOURCES_PREFIX}", + json={ + "slug": "resource1", + "title": "Resource 1", + }, + ) + assert resp.status_code == 201 + resource = resp.json().get("uuid") + + # Start TUS upload + url = f"{kb_path}/{RESOURCE_PREFIX}/{resource}/file/field1/{TUSUPLOAD}" + upload_metadata = ",".join( + [ + f"filename {header_encode(filename)}", + f"language {header_encode(language)}", + f"md5 {header_encode(md5)}", + ] + ) + resp = await nucliadb_writer.post( + url, + headers={ + "tus-resumable": "1.0.0", + "upload-metadata": upload_metadata, + "content-type": "invalid-content-type", + "upload-defer-length": "1", + }, + ) + assert resp.status_code == 415 + error_detail = resp.json().get("detail") + assert error_detail == "Unsupported content type: invalid-content-type" + + +@pytest.mark.parametrize( + "content_type", + [ + "application/epub+zip", + "application/font-woff", + "application/generic", + "application/java-archive", + "application/java-vm", + "application/json", + "application/mp4", + "application/msword", + "application/octet-stream", + "application/pdf", + "application/pdf+aitable", + "application/postscript", + "application/rls-services+xml", + "application/rtf", + "application/stf-link", + "application/toml", + "application/vnd.jgraph.mxfile", + "application/vnd.lotus-organizer", + "application/vnd.ms-excel.sheet.macroenabled.12", + "application/vnd.ms-excel", + "application/vnd.ms-excel+aitable", + "application/vnd.ms-outlook", + "application/vnd.ms-powerpoint", + "application/vnd.ms-project", + "application/vnd.ms-word.document.macroenabled.12", + "application/vnd.oasis.opendocument.presentation", + "application/vnd.oasis.opendocument.text", + "application/vnd.openxmlformats-officedocument.presentationml.presentation", + "application/vnd.openxmlformats-officedocument.presentationml.presentation+aitable", + "application/vnd.openxmlformats-officedocument.presentationml.slideshow", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet+aitable", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document+aitable", + "application/vnd.openxmlformats-officedocument.wordprocessingml.template", + "application/vnd.rar", + "application/x-mobipocket-ebook", + "application/x-ms-shortcut", + "application/x-msdownload", + "application/x-ndjson", + "application/x-openscad", + "application/x-sql", + "application/x-zip-compressed", + "application/xml", + "application/zip", + "application/zstd", + "audio/aac", + "audio/mp4", + "audio/mpeg", + "audio/vnd.dlna.adts", + "audio/wav", + "audio/x-m4a", + "image/avif", + "image/gif", + "image/heic", + "image/jpeg", + "image/jpeg+aitable", + "image/png", + "image/png+aitable", + "image/svg+xml", + "image/tiff", + "image/vnd.djvu", + "image/vnd.dwg", + "image/webp", + "model/stl", + "text/calendar", + "text/css", + "text/csv", + "text/csv+aitable", + "text/html", + "text/javascript", + "text/jsx", + "text/markdown", + "text/plain", + "text/rtf", + "text/rtf+aitable", + "text/x-java-source", + "text/x-log", + "text/x-python", + "text/xml", + "text/yaml", + "video/mp4", + "video/mp4+aitable", + "video/quicktime", + "video/webm", + "video/x-m4v", + "video/x-ms-wmv", + "video/YouTube", + "multipart/form-data", + ], +) +def test_valid_content_types(content_type): + from nucliadb_models import content_types + + assert content_types.valid(content_type) + + +@pytest.mark.parametrize( + "content_type", + [ + "multipart/form-data;boundary=--------------------------472719318099714047986957", + ], +) +def test_invalid_content_types(content_type): + from nucliadb_models import content_types + + assert not content_types.valid(content_type) + + +@pytest.mark.parametrize( + "filename,content_type", + [ + # Text files + ("foo.txt", "text/plain"), + ("foo.docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"), + ("foo.pdf", "application/pdf"), + ("foo.json", "application/json"), + # Spreadsheets + ("foo.xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"), + ("foo.csv", "text/csv"), + # Presentations + ("foo.pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"), + # Images + ("image.jpg", "image/jpeg"), + ("image.jpeg", "image/jpeg"), + ("image.png", "image/png"), + ("image.tiff", "image/tiff"), + ("image.gif", "image/gif"), + # Videos + ("video.mp4", "video/mp4"), + ("video.webm", "video/webm"), + ("video.avi", "video/x-msvideo"), + ("video.mpeg", "video/mpeg"), + # Audio + ("audio.mp3", "audio/mpeg"), + ("audio.wav", "audio/x-wav"), + # Web data + ("data.html", "text/html"), + ("data.xml", "application/xml"), + # Archive files + ("archive.zip", "application/zip"), + ("archive.rar", "application/x-rar-compressed"), + ("archive.tar", "application/x-tar"), + ("archive.tar.gz", "application/x-tar"), + # Invalid content types + ("foobar", None), + ("someuuidwithoutextension", None), + ("", None), + ], +) +def test_guess_content_type(filename, content_type): + from nucliadb_models import content_types + + assert content_types.guess(filename) == content_type diff --git a/nucliadb/tests/writer/test_fields.py b/nucliadb/tests/writer/test_fields.py index 4ca7cb6977..b330c08fa3 100644 --- a/nucliadb/tests/writer/test_fields.py +++ b/nucliadb/tests/writer/test_fields.py @@ -54,7 +54,7 @@ "content": { "text": "Hi people!", "format": "PLAIN", - "files": [load_file_as_FileB64_payload("/assets/image001.jpg", "image/jpg")], + "files": [load_file_as_FileB64_payload("/assets/image001.jpg", "image/jpeg")], }, "ident": "message_id_001", } @@ -64,7 +64,7 @@ TEST_FILE_PAYLOAD = { "language": "en", "password": "xxxxxx", - "file": load_file_as_FileB64_payload("/assets/image001.jpg", "image/jpg"), + "file": load_file_as_FileB64_payload("/assets/image001.jpg", "image/jpeg"), } TEST_EXTERNAL_FILE_PAYLOAD = { @@ -82,7 +82,7 @@ "content": { "text": "Hi people!", "format": "PLAIN", - "attachments": [load_file_as_FileB64_payload("/assets/image001.jpg", "image/jpg")], + "attachments": [load_file_as_FileB64_payload("/assets/image001.jpg", "image/jpeg")], }, "ident": "message_id_001", } diff --git a/nucliadb/tests/writer/test_files.py b/nucliadb/tests/writer/test_files.py index 0009f65804..ea2bb0ed19 100644 --- a/nucliadb/tests/writer/test_files.py +++ b/nucliadb/tests/writer/test_files.py @@ -85,7 +85,7 @@ async def test_knowledgebox_file_tus_upload_root(writer_api, knowledgebox_writer headers={ "tus-resumable": "1.0.0", "upload-metadata": f"filename {filename},language {language},md5 {md5}", - "content-type": "image/jpg", + "content-type": "image/jpeg", "upload-defer-length": "1", }, ) @@ -135,7 +135,7 @@ async def test_knowledgebox_file_tus_upload_root(writer_api, knowledgebox_writer field = path.split("/")[-1] rid = path.split("/")[-3] assert writer.uuid == rid - assert writer.basic.icon == "image/jpg" + assert writer.basic.icon == "image/jpeg" assert writer.basic.title == "image.jpg" assert writer.files[field].language == "ca" assert writer.files[field].file.size == len(raw_bytes) @@ -156,7 +156,7 @@ async def test_knowledgebox_file_tus_upload_root(writer_api, knowledgebox_writer headers={ "tus-resumable": "1.0.0", "upload-metadata": f"filename {filename},language {language},md5 {md5}", - "content-type": "image/jpg", + "content-type": "image/jpeg", "upload-defer-length": "1", }, ) @@ -174,7 +174,7 @@ async def test_knowledgebox_file_upload_root( f"/{KB_PREFIX}/{knowledgebox_writer}/{UPLOAD}", content=f.read(), headers={ - "content-type": "image/jpg", + "content-type": "image/jpeg", "X-MD5": "7af0916dba8b70e29d99e72941923529", }, ) @@ -193,7 +193,7 @@ async def test_knowledgebox_file_upload_root( field = body["field_id"] rid = body["uuid"] assert writer.uuid == rid - assert writer.basic.icon == "image/jpg" + assert writer.basic.icon == "image/jpeg" assert writer.files[field].file.size == 30472 storage = await get_storage() @@ -210,7 +210,7 @@ async def test_knowledgebox_file_upload_root( f"/{KB_PREFIX}/{knowledgebox_writer}/{UPLOAD}", content=f.read(), headers={ - "content-type": "image/jpg", + "content-type": "image/jpeg", "X-MD5": "7af0916dba8b70e29d99e72941923529", }, ) @@ -232,7 +232,7 @@ async def test_knowledgebox_file_upload_root_headers( "X-FILENAME": filename, "X-LANGUAGE": "ca", "X-MD5": "7af0916dba8b70e29d99e72941923529", - "content-type": "image/jpg", + "content-type": "image/jpeg", }, ) assert resp.status_code == 201 @@ -250,7 +250,7 @@ async def test_knowledgebox_file_upload_root_headers( field = body["field_id"] rid = body["uuid"] assert writer.uuid == rid - assert writer.basic.icon == "image/jpg" + assert writer.basic.icon == "image/jpeg" assert writer.basic.title == "image.jpg" assert writer.files[field].language == "ca" assert writer.files[field].file.size == 30472 @@ -275,7 +275,7 @@ async def test_knowledgebox_file_tus_upload_field(writer_api, knowledgebox_write headers={ "tus-resumable": "1.0.0", "upload-metadata": f"filename {filename},language {language},md5 {md5}", - "content-type": "image/jpg", + "content-type": "image/jpeg", "upload-defer-length": "1", }, ) @@ -287,7 +287,7 @@ async def test_knowledgebox_file_tus_upload_field(writer_api, knowledgebox_write headers={ "tus-resumable": "1.0.0", "upload-metadata": f"filename {filename},language {language},md5 {md5}", - "content-type": "image/jpg", + "content-type": "image/jpeg", "upload-defer-length": "1", }, ) @@ -338,7 +338,7 @@ async def test_knowledgebox_file_tus_upload_field(writer_api, knowledgebox_write field = path.split("/")[-1] rid = path.split("/")[-3] assert writer.uuid == rid - assert writer.basic.icon == "image/jpg" + assert writer.basic.icon == "image/jpeg" assert writer.basic.title == "" assert writer.files[field].language == "ca" assert writer.files[field].file.size == len(raw_bytes) @@ -366,7 +366,7 @@ async def test_knowledgebox_file_upload_field_headers(writer_api, knowledgebox_w "X-FILENAME": encoded_filename, "X-LANGUAGE": "ca", "X-MD5": "7af0916dba8b70e29d99e72941923529", - "content-type": "image/jpg", + "content-type": "image/jpeg", }, ) assert resp.status_code == 201 @@ -383,7 +383,7 @@ async def test_knowledgebox_file_upload_field_headers(writer_api, knowledgebox_w field = body["field_id"] rid = body["uuid"] assert writer.uuid == rid - assert writer.basic.icon == "image/jpg" + assert writer.basic.icon == "image/jpeg" assert writer.basic.title == "" assert writer.files[field].language == "ca" assert writer.files[field].file.size == 30472 @@ -409,7 +409,7 @@ async def test_knowledgebox_file_upload_field_sync(writer_api, knowledgebox_writ "X-FILENAME": filename, "X-LANGUAGE": "ca", "X-MD5": "7af0916dba8b70e29d99e72941923529", - "content-type": "image/jpg", + "content-type": "image/jpeg", }, ) assert resp.status_code == 201 @@ -437,7 +437,7 @@ async def test_file_tus_upload_field_by_slug(writer_api, knowledgebox_writer, re headers = { "tus-resumable": "1.0.0", "upload-metadata": f"filename {filename},language {language},md5 {md5}", - "content-type": "image/jpg", + "content-type": "image/jpeg", "upload-defer-length": "1", } @@ -500,7 +500,7 @@ async def test_file_tus_upload_field_by_slug(writer_api, knowledgebox_writer, re field = path.split("/")[-1] rid = path.split("/")[-3] assert writer.uuid == rid - assert writer.basic.icon == "image/jpg" + assert writer.basic.icon == "image/jpeg" assert writer.basic.title == "" assert writer.files[field].language == "ca" assert writer.files[field].file.size == len(raw_bytes) @@ -526,7 +526,7 @@ async def test_file_tus_upload_urls_field_by_resource_id(writer_api, knowledgebo headers = { "tus-resumable": "1.0.0", "upload-metadata": f"filename {filename},language {language},md5 {md5}", - "content-type": "image/jpg", + "content-type": "image/jpeg", "upload-defer-length": "1", } @@ -562,7 +562,7 @@ async def test_multiple_tus_file_upload_tries(writer_api, knowledgebox_writer, r async with writer_api(roles=[NucliaDBRoles.WRITER]) as client: headers = { "tus-resumable": "1.0.0", - "content-type": "image/jpg", + "content-type": "image/jpeg", "upload-defer-length": "1", } @@ -633,7 +633,7 @@ async def test_file_upload_by_slug(writer_api, knowledgebox_writer): content=f.read(), headers={ "X-FILENAME": filename, - "content-type": "image/jpg", + "content-type": "image/jpeg", "X-MD5": "7af0916dba8b70e29d99e72941923529", }, ) @@ -653,7 +653,7 @@ async def test_file_upload_by_slug(writer_api, knowledgebox_writer): rid = body["uuid"] assert writer.uuid == rid - assert writer.basic.icon == "image/jpg" + assert writer.basic.icon == "image/jpeg" assert writer.files[field].file.size == 30472 assert writer.files[field].file.filename == filename @@ -683,7 +683,7 @@ async def test_tus_validates_intermediate_chunks_length(writer_api, knowledgebox headers={ "tus-resumable": "1.0.0", "upload-metadata": f"filename {filename},language {language},md5 {md5}", - "content-type": "image/jpg", + "content-type": "image/jpeg", "upload-defer-length": "1", }, ) diff --git a/nucliadb/tests/writer/unit/api/v1/test_upload.py b/nucliadb/tests/writer/unit/api/v1/test_upload.py index 857976a1b4..bb845e4933 100644 --- a/nucliadb/tests/writer/unit/api/v1/test_upload.py +++ b/nucliadb/tests/writer/unit/api/v1/test_upload.py @@ -24,7 +24,6 @@ from nucliadb.ingest.processing import ProcessingInfo, Source from nucliadb.writer.api.v1.upload import ( - guess_content_type, store_file_on_nuclia_db, validate_field_upload, ) @@ -87,19 +86,6 @@ async def test_store_file_on_nucliadb_does_not_store_passwords( assert not writer_bm.files[field].password -@pytest.mark.parametrize( - "filename,content_type", - [ - ("foo.png", "image/png"), - ("foo.pdf", "application/pdf"), - ("someuuidwithoutextension", "application/octet-stream"), - ("", "application/octet-stream"), - ], -) -def test_guess_content_type(filename, content_type): - assert guess_content_type(filename) == content_type - - @pytest.mark.parametrize( "rid,field,md5,exists,result", [ diff --git a/nucliadb_models/src/nucliadb_models/common.py b/nucliadb_models/src/nucliadb_models/common.py index 9eeeadacb4..4749c8f49a 100644 --- a/nucliadb_models/src/nucliadb_models/common.py +++ b/nucliadb_models/src/nucliadb_models/common.py @@ -23,9 +23,10 @@ from enum import Enum from typing import Any, Dict, List, Optional -from pydantic import BaseModel, Field, field_serializer, model_validator +from pydantic import BaseModel, Field, field_serializer, field_validator, model_validator from typing_extensions import Self +from nucliadb_models import content_types from nucliadb_protos import resources_pb2 FIELD_TYPE_CHAR_MAP = { @@ -92,10 +93,11 @@ class File(BaseModel): @model_validator(mode="after") def _check_internal_file_fields(self) -> Self: + if not content_types.valid(self.content_type): + raise ValueError(f"Unsupported content type: {self.content_type}") if self.uri: # Externally hosted file return self - if self.filename is None: raise ValueError(f"'filename' field is required") if self.payload is None: @@ -107,6 +109,7 @@ def _check_internal_file_fields(self) -> Self: self.md5 = result.hexdigest() except Exception: raise ValueError("MD5 could not be computed") + return self @property @@ -120,6 +123,12 @@ class FileB64(BaseModel): payload: str md5: str + @field_validator("content_type") + def check_content_type(cls, v): + if not content_types.valid(v): + raise ValueError(f"Unsupported content type: {v}") + return v + class CloudFile(BaseModel): uri: Optional[str] = None diff --git a/nucliadb_models/src/nucliadb_models/content_types.py b/nucliadb_models/src/nucliadb_models/content_types.py new file mode 100644 index 0000000000..00d0af2b85 --- /dev/null +++ b/nucliadb_models/src/nucliadb_models/content_types.py @@ -0,0 +1,95 @@ +# Copyright (C) 2021 Bosutech XXI S.L. +# +# nucliadb is offered under the AGPL v3.0 and as commercial software. +# For commercial licensing, contact us at info@nuclia.com. +# +# AGPL: +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +import mimetypes +from typing import Optional + +GENERIC_MIME_TYPE = "application/generic" + +NUCLIA_CUSTOM_CONTENT_TYPES = { + GENERIC_MIME_TYPE, + "application/stf-link", + "application/conversation", +} + +EXTRA_VALID_CONTENT_TYPES = { + "application/font-woff", + "application/toml", + "application/vnd.jgraph.mxfile", + "application/vnd.ms-excel.sheet.macroenabled.12", + "application/vnd.ms-outlook", + "application/vnd.rar", + "application/x-aportisdoc", + "application/x-archive", + "application/x-git", + "application/x-gzip", + "application/x-iwork-pages-sffpages", + "application/x-mach-binary", + "application/x-mobipocket-ebook", + "application/x-msdownload", + "application/x-ndjson", + "application/x-openscad", + "application/x-zip-compressed", + "application/zstd", + "audio/vnd.dlna.adts", + "audio/wav", + "audio/x-m4a", + "model/stl", + "multipart/form-data", + "text/jsx", + "text/markdown", + "text/mdx", + "text/rtf", + "text/x-c++", + "text/x-java-source", + "text/x-log", + "text/x-python-script", + "text/yaml", + "video/x-m4v", + "video/YouTube", +} | NUCLIA_CUSTOM_CONTENT_TYPES + + +def guess(filename: str) -> Optional[str]: + """ + Guess the content type of a file based on its filename. + Returns None if the content type could not be guessed. + >>> guess("example.jpg") + 'image/jpeg' + >>> guess("example") + None + """ + guessed, _ = mimetypes.guess_type(filename, strict=False) + return guessed + + +def valid(content_type: str) -> bool: + """ + Check if a content type is valid. + >>> valid("image/jpeg") + True + >>> valid("invalid") + False + """ + # The AI tables feature has been implemented via a custom mimetype suffix. + # Keep this until we have a better solution to handle this. + content_type = content_type.split("+aitable")[0] + in_standard = mimetypes.guess_extension(content_type, strict=False) is not None + return in_standard or content_type in EXTRA_VALID_CONTENT_TYPES diff --git a/nucliadb_models/src/nucliadb_models/writer.py b/nucliadb_models/src/nucliadb_models/writer.py index b57eb7e3a6..2d9232952d 100644 --- a/nucliadb_models/src/nucliadb_models/writer.py +++ b/nucliadb_models/src/nucliadb_models/writer.py @@ -22,6 +22,7 @@ from pydantic import BaseModel, Field, field_validator +from nucliadb_models import content_types from nucliadb_models.conversation import InputConversationField from nucliadb_models.file import FileField from nucliadb_models.link import LinkField @@ -37,8 +38,6 @@ from nucliadb_models.text import TextField from nucliadb_models.utils import FieldIdPattern, FieldIdString, SlugString -GENERIC_MIME_TYPE = "application/generic" - class FieldDefaults: title = Field(None, title="Title") @@ -104,13 +103,8 @@ class CreateResourcePayload(BaseModel): def icon_check(cls, v): if v is None: return v - - if "/" not in v: - raise ValueError("Icon should be a MIME string") - - if len(v.split("/")) != 2: - raise ValueError("Icon needs two parts of MIME string") - + if not content_types.valid(v): + raise ValueError(f"Icon is not a valid MIME string: {v}") return v @field_validator("extra")