Skip to content

Commit

Permalink
[NHUB-537] Various core changes for Newshub Agenda async
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkLark86 committed Nov 6, 2024
1 parent 3ab2188 commit 11c6b99
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 22 deletions.
2 changes: 2 additions & 0 deletions content_api/items/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
class CVItem:
qcode: fields.Keyword
name: fields.Keyword
scheme: fields.Keyword | None = None
schema: fields.Keyword | None = None


Expand Down Expand Up @@ -110,6 +111,7 @@ class ContentType(str, Enum):

class ContentAPIItem(ResourceModel, ModelWithVersions):
id: Annotated[str, Field(alias="_id")]
guid: str | None = None
associations: ContentAssociation | None = None
anpa_category: list[CVItem] = Field(default_factory=list)
body_html: fields.HTML | None = None
Expand Down
6 changes: 3 additions & 3 deletions superdesk/core/elastic/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,10 @@ def _format_doc(self, hit: Dict[str, Any]):

if hit.get("inner_hits"):
doc["_inner_hits"] = {}
for key, value in hit["innter_hits"].items():
doc["inner_hits"][key] = []
for key, value in hit["inner_hits"].items():
doc["_inner_hits"][key] = []
for item in value.get("hits", {}).get("hits", []):
doc["_inner_hits"][key].append(item)
doc["_inner_hits"][key].append(item.get("_source", {}))

return doc

Expand Down
4 changes: 4 additions & 0 deletions superdesk/core/elastic/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ def _get_field_type_from_json_schema(
return type_schema["elastic_mapping"]

properties: Dict[str, Any] = {}

if not type_schema.get("properties"):
if "enum" in type_schema:
return {"type": "keyword"}
for type_field, type_props in type_schema["properties"].items():
type_field_type = _get_field_type_from_json_schema(schema, type_props)
if type_field_type is not None:
Expand Down
10 changes: 8 additions & 2 deletions superdesk/core/elastic/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from elasticsearch import AsyncElasticsearch, Elasticsearch, JSONSerializer, TransportError
from elasticsearch.exceptions import NotFoundError, RequestError

from superdesk.core.errors import ElasticNotConfiguredForResource

from .mapping import get_elastic_mapping_from_model
from .common import ElasticResourceConfig, ElasticClientConfig, generate_index_name
from .sync_client import ElasticResourceClient
Expand Down Expand Up @@ -131,10 +133,14 @@ def get_client_async(self, resource_name) -> ElasticResourceAsyncClient:
:raises KeyError: If the resource is not registered for use with Elasticsearch
"""

resource_client = self._resource_async_clients[resource_name]
try:
resource_client = self._resource_async_clients[resource_name]
except KeyError:
raise ElasticNotConfiguredForResource(resource_name)

config = self.app.resources.get_config(resource_name)
if config.elastic is None:
raise KeyError(f"Elasticsearch not enabled on resource '{resource_name}'")
raise ElasticNotConfiguredForResource(resource_name)
config_prefix = config.elastic.prefix

if not self._elastic_async_connections.get(config_prefix):
Expand Down
3 changes: 3 additions & 0 deletions superdesk/core/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class ElasticNotConfiguredForResource(KeyError):
def __init__(self, resource_name: str):
super().__init__(f"Elasticsearch not enabled on resource '{resource_name}'")
4 changes: 4 additions & 0 deletions superdesk/core/resources/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ def keyword_mapping() -> WithJsonSchema:
return Field(json_schema_extra={"elastic_mapping": {"type": "keyword"}})


def dynamic_mapping() -> WithJsonSchema:
return Field(json_schema_extra={"elastic_mapping": {"type": "object", "dynamic": True}})


def mapping_disabled(data_type: str) -> WithJsonSchema:
return Field(json_schema_extra={"elastic_mapping": {"type": data_type, "enabled": False}})

Expand Down
48 changes: 35 additions & 13 deletions superdesk/core/resources/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from motor.motor_asyncio import AsyncIOMotorCursor

from superdesk.core.types import SearchRequest, SortListParam, SortParam, ProjectedFieldArg
from superdesk.core.errors import ElasticNotConfiguredForResource
from superdesk.flask import g
from superdesk.utc import utcnow
from superdesk.cache import cache
Expand Down Expand Up @@ -163,7 +164,7 @@ async def find_one_raw(
try:
if not search_request.use_mongo:
item = await self.elastic.find_one(search_request)
except KeyError:
except ElasticNotConfiguredForResource:
pass

if search_request.use_mongo or item is None:
Expand Down Expand Up @@ -242,14 +243,22 @@ async def find_by_id_raw(
item_id = ObjectId(item_id) if self.id_uses_objectid() else item_id
try:
item = await self.elastic.find_by_id(item_id)
except KeyError:
except ElasticNotConfiguredForResource:
item = await self.mongo_async.find_one({"_id": item_id})

if item is None:
return None

return item if version is None else await self.get_item_version(item, version)

async def find_by_ids(self, ids: list[str | ObjectId]) -> list[ResourceModelType]:
cursor = await self.search({ID_FIELD: {"$in": ids}}, use_mongo=True)
return await cursor.to_list()

async def find_by_ids_raw(self, ids: list[str | ObjectId]) -> list[dict[str, Any]]:
cursor = await self.search({ID_FIELD: {"$in": ids}}, use_mongo=True)
return await cursor.to_list_raw()

async def search(self, lookup: Dict[str, Any], use_mongo=False) -> ResourceCursorAsync[ResourceModelType]:
"""Search the resource using the provided ``lookup``
Expand All @@ -264,7 +273,7 @@ async def search(self, lookup: Dict[str, Any], use_mongo=False) -> ResourceCurso
if not use_mongo:
response = await self.elastic.search(lookup)
return ElasticsearchResourceCursorAsync(cast(Type[ResourceModelType], self.config.data_class), response)
except KeyError:
except ElasticNotConfiguredForResource:
pass

response = self.mongo_async.find(lookup)
Expand Down Expand Up @@ -339,12 +348,12 @@ async def create(self, _docs: Sequence[ResourceModelType | dict[str, Any]]) -> L
:raises Pydantic.ValidationError: If any of the docs provided are not valid
"""

docs = self._convert_dicts_to_model(_docs)
docs = await self._convert_dicts_to_model(_docs)
await self.on_create(docs)

ids: List[str] = []

for doc in docs:
for index, doc in enumerate(docs):
await self.validate_create(doc)
versioned_model = get_versioned_model(doc)
if versioned_model is not None:
Expand All @@ -355,9 +364,20 @@ async def create(self, _docs: Sequence[ResourceModelType | dict[str, Any]]) -> L
doc.etag = doc_dict["_etag"] = self.generate_etag(doc_dict, self.config.etag_ignore_fields)
response = await self.mongo_async.insert_one(doc_dict)
ids.append(response.inserted_id)

# Update the provided docs, so the `_id` and `_etag` get applied to the supplied dicts
doc.id = response.inserted_id
docs_entry = _docs[index]
if isinstance(docs_entry, dict):
docs_entry.update(
dict(
_id=response.inserted_id,
_etag=doc.etag,
)
)
try:
await self.elastic.insert([doc_dict])
except KeyError:
except ElasticNotConfiguredForResource:
pass

if self.config.versioning:
Expand Down Expand Up @@ -430,7 +450,7 @@ async def update(self, item_id: Union[str, ObjectId], updates: Dict[str, Any], e
response = await self.mongo_async.update_one({"_id": item_id}, {"$set": updates_dict})
try:
await self.elastic.update(item_id, updates_dict)
except KeyError:
except ElasticNotConfiguredForResource:
pass

if self.config.versioning:
Expand Down Expand Up @@ -467,7 +487,7 @@ async def delete(self, doc: ResourceModelType, etag: str | None = None):
await self.mongo_async.delete_one({"_id": doc.id})
try:
await self.elastic.remove(doc.id)
except KeyError:
except ElasticNotConfiguredForResource:
pass
await self.on_deleted(doc)

Expand All @@ -492,7 +512,7 @@ async def delete_many(self, lookup: Dict[str, Any]) -> List[str]:

try:
await self.elastic.remove(doc.id)
except KeyError:
except ElasticNotConfiguredForResource:
pass

await self.on_deleted(doc)
Expand Down Expand Up @@ -612,7 +632,7 @@ async def find(
return ElasticsearchResourceCursorAsync(
cast(Type[ResourceModelType], self.config.data_class), cursor.hits
)
except KeyError:
except ElasticNotConfiguredForResource:
pass

return await self._mongo_find(search_request)
Expand All @@ -631,7 +651,7 @@ async def count(self, lookup: dict[str, Any] | None = None, use_mongo: bool = Fa
try:
if not use_mongo:
return await self.elastic.count(lookup)
except KeyError:
except ElasticNotConfiguredForResource:
pass

return await self.mongo_async.count_documents(lookup or {})
Expand Down Expand Up @@ -690,7 +710,9 @@ def _convert_req_to_mongo_sort(self, sort: SortParam | None) -> SortListParam:

return client_sort

def _convert_dicts_to_model(self, docs: Sequence[ResourceModelType | dict[str, Any]]) -> List[ResourceModelType]:
async def _convert_dicts_to_model(
self, docs: Sequence[ResourceModelType | dict[str, Any]]
) -> List[ResourceModelType]:
return [self.get_model_instance_from_dict(doc) if isinstance(doc, dict) else doc for doc in docs]

def validate_etag(self, original: ResourceModelType, etag: str | None) -> None:
Expand Down Expand Up @@ -808,7 +830,7 @@ async def system_update(self, item_id: ObjectId | str, updates: dict[str, Any])
await self.mongo_async.update_one({"_id": item_id}, {"$set": updates})
try:
await self.elastic.update(item_id, updates)
except KeyError:
except ElasticNotConfiguredForResource:
pass


Expand Down
8 changes: 4 additions & 4 deletions superdesk/core/resources/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from superdesk.errors import SuperdeskApiError


SYSTEM_FIELDS = ["_id", "_type", "_resource", "_etag"]
SYSTEM_FIELDS = {"_id", "_type", "_resource", "_etag"}


def get_projection_from_request(req: SearchRequest) -> tuple[bool, list[str]] | tuple[None, None]:
Expand All @@ -38,14 +38,14 @@ def get_projection_from_request(req: SearchRequest) -> tuple[bool, list[str]] |
if not projection_data:
# No projection will be used
return None, None
elif isinstance(projection_data, list):
elif isinstance(projection_data, (list, set)):
# Projection: include these fields only
return True, list(set(projection_data + SYSTEM_FIELDS))
return True, list(set(projection_data) | SYSTEM_FIELDS)
elif isinstance(projection_data, dict):
if next(iter(projection_data.values()), None) in [True, 1]:
# Projection: include these fields only
return True, list(
set([field for field, value in projection_data.items() if value is True or value == 1] + SYSTEM_FIELDS)
set([field for field, value in projection_data.items() if value is True or value == 1]) | SYSTEM_FIELDS
)
else:
# Projection: exclude these fields
Expand Down
4 changes: 4 additions & 0 deletions superdesk/core/types/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class ESQuery:
query: ESBoolQuery = Field(default_factory=ESBoolQuery)
post_filter: ESBoolQuery = Field(default_factory=ESBoolQuery)
aggs: dict[str, Any] = Field(default_factory=dict)
exclude_fields: list[str] = Field(default_factory=list)

def generate_query_dict(self, query: dict[str, Any] | None = None) -> dict[str, Any]:
if query is None:
Expand Down Expand Up @@ -121,6 +122,9 @@ def generate_query_dict(self, query: dict[str, Any] | None = None) -> dict[str,
if self.post_filter.filter:
query["post_filter"]["bool"]["filter"] = self.post_filter.filter

if self.exclude_fields:
query.setdefault("_source", {}).setdefault("excludes", []).extend(self.exclude_fields)

return query


Expand Down
3 changes: 3 additions & 0 deletions superdesk/core/types/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,6 @@ class RestGetResponse(TypedDict, total=False):

#: Response metadata
_meta: RestResponseMeta

#: Elasticsearch aggregations result
_aggregations: dict[str, Any]
14 changes: 14 additions & 0 deletions superdesk/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,17 @@ def get_conf(key: str, default: bool | str) -> bool | str:
"timestamp": t.isoformat(),
"identifier": hints["id"],
}


def str_to_date(value: str | None) -> datetime | None:
"""Convert a string to a datetime instance"""

date_format: str = get_app_config("DATE_FORMAT") or "%Y-%m-%dT%H:%M:%S+0000"
return datetime.strptime(value, date_format) if value else None


def date_to_str(value: datetime | None) -> str | None:
"""Convert a datetime instance to a string"""

date_format: str = get_app_config("DATE_FORMAT") or "%Y-%m-%dT%H:%M:%S+0000"
return datetime.strftime(value, date_format) if value else None

0 comments on commit 11c6b99

Please sign in to comment.