diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index c688eb3..013d320 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -1,7 +1,12 @@
-7.0.6 (unreleased)
+8.0.0 (unreleased)
 ------------------
 
-- Nothing changed yet.
+- Support for Elasticsearch 8.
+- Changing ES fixture to bitnami version 8. Tests passing for both
+  versions.
+- Removing deprecated doc_type argument from get calls.
+- Dropping support for Elasticsearch 6.x.
+  [nilbacardit26]
 
 
 7.0.5 (2023-12-20)
diff --git a/README.rst b/README.rst
index 0398643..5a46d61 100644
--- a/README.rst
+++ b/README.rst
@@ -6,15 +6,15 @@ GUILLOTINA_ELASTICSEARCH
 .. image:: https://travis-ci.org/guillotinaweb/guillotina_elasticsearch.svg?branch=master
    :target: https://travis-ci.org/guillotinaweb/guillotina_elasticsearch
 
-Elasticsearch integration for guillotina.
+Elasticsearch integration for guillotina. Supports Elasticsearch 7.x
+and 8.x.
 
 
 Installation
 ------------
 
-`pip install guillotina_elasticsearch` defaults to Elasticsearch 7.x
-support. Pin `aioelasticsearch<0.6.0` to enable support for
-Elasticsearch 6.x.
+`pip install guillotina_elasticsearch` defaults to Elasticsearch 8.x
+support.
 
 
 Configuration
@@ -28,7 +28,7 @@ config.yaml can include elasticsearch section
       index_name_prefix: "guillotina-"
       connection_settings:
         hosts:
-          - "127.0.0.1:9200"
+          - "http://127.0.0.1:9200"
         sniffer_timeout: 0.5
         sniff_on_start: true
       security_query_builder: "guillotina_elasticsearch.queries.build_security_query"
@@ -52,6 +52,21 @@ Example custom `security_query_builder` settings:
         }
     }
 
+Development and testing
+-----------------------
+Set up your Python virtual environment for version >=3.8. Tested with
+3.8, 3.9 and 3.10.
+
+.. code-block:: bash
+
+    # Linux
+    pip install -e ".[test]"
+    pytest tests/
+
+By default the tests run an ES fixture with version 8. If you
+want to run the tests for ES version 7, change the image version in
+conftest.py.
+
 
 Installation on a site
 ----------------------
@@ -84,6 +99,24 @@ New index and delete requests are performed on both indexes during live migratio
 It is also smart about how to migrate, doing a diff on the mapping and
 only reindexing the fields that changed.
 
+Breaking changes in 8.0.0
+-------------------------
+
+In this version, the elasticsearch client library has been upgraded
+from version 7 to version 8. Some changes need to be taken into
+account in the settings of old elasticsearch config files:
+
+- The hosts field in guillotina's configuration file needs to
+  include the scheme: http or https.
+- The sniffer_timeout in guillotina's configuration file cannot be None.
+- The doc_type argument has been removed. Specifying types in requests is no longer supported.
+- The include_type_name parameter has been removed.
+
+The elasticsearch field of the config.yaml file is passed directly to
+instantiate AsyncElasticsearch. The class definition is the same as
+the synchronous one; to see how to configure your ES, take a look at:
+https://elasticsearch-py.readthedocs.io/en/v8.12.0/api/elasticsearch.html#elasticsearch.Elasticsearch
+
 Breaking changes in 2.0
 -----------------------
 
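The ``connection_settings`` section above is handed straight to ``AsyncElasticsearch``, so the README's YAML amounts to the following direct instantiation. A minimal sketch, assuming a local unsecured node; the host URL and timeout values are illustrative only:

.. code-block:: python

    import asyncio

    from elasticsearch import AsyncElasticsearch


    async def main() -> None:
        # Mirrors the README's connection_settings; in 8.x the scheme
        # ("http://" or "https://") is mandatory in every host entry,
        # and sniffer_timeout must be a number rather than None.
        conn = AsyncElasticsearch(
            hosts=["http://127.0.0.1:9200"],
            sniffer_timeout=0.5,
            sniff_on_start=True,
        )
        print(await conn.info())
        await conn.close()


    asyncio.run(main())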
diff --git a/VERSION b/VERSION
index 9dfdb33..6b2f21a 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-7.0.6.dev0
+8.0.0.dev0
diff --git a/guillotina_elasticsearch/commands/vacuum.py b/guillotina_elasticsearch/commands/vacuum.py
index 656603f..291c1e5 100644
--- a/guillotina_elasticsearch/commands/vacuum.py
+++ b/guillotina_elasticsearch/commands/vacuum.py
@@ -1,3 +1,4 @@
+from elastic_transport import ObjectApiResponse
 from elasticsearch import AsyncElasticsearch
 from guillotina import task_vars
 from guillotina.commands import Command
@@ -76,7 +77,7 @@ async def iter_batched_es_keys(self):
         indexes = [self.index_name]
         for index_name in indexes:
             try:
-                result = await self.conn.search(
+                result: ObjectApiResponse = await self.conn.search(
                     index=index_name,
                     scroll="15m",
                     size=PAGE_SIZE,
@@ -206,7 +207,7 @@ async def check_orphans(self):
             # delete by query for orphaned keys...
             data = await self.conn.delete_by_query(
-                index_name, body={"query": {"terms": {"_id": orphaned}}}
+                index=index_name, body={"query": {"terms": {"_id": orphaned}}}
             )
             if data["deleted"] != len(orphaned):
                 logger.warning(
@@ -233,10 +234,11 @@ async def check_missing(self):
         async for batch in self.iter_paged_db_keys([self.container.__uuid__]):
             oids = [r["zoid"] for r in batch]
             try:
-                results = await self.conn.search(
+                results: ObjectApiResponse = await self.conn.search(
                     index=self.index_name,
                     body={"query": {"terms": {"uuid": oids}}},
                     _source=False,
+                    fields=["tid", "parent_uuid"],
                     stored_fields="tid,parent_uuid",
                     size=PAGE_SIZE,
                 )
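Two client-level changes of the 8.x upgrade surface throughout this file: every API argument is keyword-only (hence ``index=index_name``), and calls return an ``ObjectApiResponse`` instead of a plain dict. A short sketch of what that means in practice; the index name and query are illustrative:

.. code-block:: python

    import asyncio

    from elasticsearch import AsyncElasticsearch


    async def main() -> None:
        conn = AsyncElasticsearch(hosts=["http://127.0.0.1:9200"])
        # The 7.x positional form conn.search("my-index", {...}) now raises
        # a TypeError; everything must be passed by keyword.
        result = await conn.search(index="my-index", body={"query": {"match_all": {}}})
        print(result["hits"]["total"])  # dict-style access still works
        print(type(result.body))  # .body exposes the raw payload as a plain dict
        await conn.close()


    asyncio.run(main())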
""" - next_mappings = await self.conn.indices.get_mapping(self.work_index_name) + next_mappings = await self.conn.indices.get_mapping(index=self.work_index_name) next_mappings = next_mappings[self.work_index_name]["mappings"]["properties"] existing_index_name = await self.index_manager.get_real_index_name() try: - existing_mappings = await self.conn.indices.get_mapping(existing_index_name) + existing_mappings = await self.conn.indices.get_mapping( + index=existing_index_name + ) except elasticsearch.exceptions.NotFoundError: # allows us to upgrade when no index is present yet return next_mappings @@ -389,7 +389,6 @@ async def index_object(self, ob, full=False): await self.attempt_flush() async def attempt_flush(self): - if self.processed % 500 == 0: self.policy.invalidate_cache() num, _, _ = gc.get_count() @@ -573,40 +572,35 @@ async def run_migration(self): try: await self.conn.indices.update_aliases( - { - "actions": [ - { - "remove": { - "alias": alias_index_name, - "index": existing_index, - } - }, - { - "add": { - "alias": alias_index_name, - "index": self.work_index_name, - } - }, - ] - } + actions=[ + { + "remove": { + "alias": alias_index_name, + "index": existing_index, + } + }, + { + "add": { + "alias": alias_index_name, + "index": self.work_index_name, + } + }, + ] ) except elasticsearch.exceptions.NotFoundError: await self.conn.indices.update_aliases( - { - "actions": [ - { - "add": { - "alias": alias_index_name, - "index": self.work_index_name, - } + actions=[ + { + "add": { + "alias": alias_index_name, + "index": self.work_index_name, } - ] - } + } + ] ) - try: - await self.conn.indices.close(existing_index) - await self.conn.indices.delete(existing_index) + await self.conn.indices.close(index=existing_index) + await self.conn.indices.delete(index=existing_index) self.response.write("Old index deleted") except elasticsearch.exceptions.NotFoundError: pass diff --git a/guillotina_elasticsearch/schema.py b/guillotina_elasticsearch/schema.py index 80d51e1..ff2c916 100644 --- a/guillotina_elasticsearch/schema.py +++ b/guillotina_elasticsearch/schema.py @@ -53,7 +53,6 @@ def _addon_index(ob): def get_mappings(schemas=None, schema_info=False): - if schemas is None: schemas = [] for name, _ in get_utilities_for(IResourceFactory): diff --git a/guillotina_elasticsearch/tests/conftest.py b/guillotina_elasticsearch/tests/conftest.py index 3cbccf1..ecda8fd 100644 --- a/guillotina_elasticsearch/tests/conftest.py +++ b/guillotina_elasticsearch/tests/conftest.py @@ -1,19 +1,14 @@ from pytest_docker_fixtures import images -image_version = "7.5.1" +image_version_8 = "8.12.0-debian-11-r2" +image_version_7 = "7.17.16-debian-11-r3" # noqa images.configure( - "elasticsearch", - "docker.elastic.co/elasticsearch/elasticsearch", - image_version, + name="elasticsearch", + full=f"bitnami/elasticsearch:{image_version_8}", max_wait_s=90, - env={ - "xpack.security.enabled": None, # unset - "discovery.type": "single-node", - "http.host": "0.0.0.0", - "transport.host": "127.0.0.1", - }, + env={"ELASTICSEARCH_ENABLE_SECURITY": "false", "ELASTICSEARCH_HEAP_SIZE": "1g"}, ) diff --git a/guillotina_elasticsearch/tests/fixtures.py b/guillotina_elasticsearch/tests/fixtures.py index 281e62a..27897e4 100644 --- a/guillotina_elasticsearch/tests/fixtures.py +++ b/guillotina_elasticsearch/tests/fixtures.py @@ -51,12 +51,11 @@ def base_settings_configurator(settings): "index": elastic_search_analyzers_normalizers(), "connection_settings": { "hosts": [ - "{}:{}".format( + "http://{}:{}".format( getattr(elasticsearch, 
"host", "localhost"), getattr(elasticsearch, "port", "9200"), ) - ], - "sniffer_timeout": None, + ] }, } diff --git a/guillotina_elasticsearch/tests/test_migration.py b/guillotina_elasticsearch/tests/test_migration.py index 4129242..3caaded 100644 --- a/guillotina_elasticsearch/tests/test_migration.py +++ b/guillotina_elasticsearch/tests/test_migration.py @@ -227,11 +227,13 @@ async def test_updates_index_name(es_requester): search = get_utility(ICatalogUtility) im = get_adapter(container, IIndexManager) existing_index = await im.get_real_index_name() - assert await search.get_connection().indices.exists(existing_index) + assert await search.get_connection().indices.exists(index=existing_index) migrator = Migrator(search, container, force=True) await migrator.run_migration() - assert not await search.get_connection().indices.exists(existing_index) - assert await search.get_connection().indices.exists(migrator.work_index_name) + assert not await search.get_connection().indices.exists(index=existing_index) + assert await search.get_connection().indices.exists( + index=migrator.work_index_name + ) assert await im.get_real_index_name() == migrator.work_index_name @@ -358,11 +360,11 @@ async def test_search_works_on_new_docs_during_migration(es_requester): async def _test(): result1 = await search.get_connection().get( - index=next_index_name, doc_type="_all", id=resp["@uid"] + index=next_index_name, id=resp["@uid"] ) assert result1 is not None result2 = await search.get_connection().get( - index=index_name, doc_type="_all", id=resp["@uid"] + index=index_name, id=resp["@uid"] ) assert result2 is not None @@ -397,13 +399,13 @@ async def test_search_works_on_updated_docs_during_migration_when_missing( async def _test(): result1 = await search.get_connection().get( - index=index_name, doc_type="_all", id=resp["@uid"] + index=index_name, id=resp["@uid"] ) assert result1 is not None assert result1["_source"]["title"] == "Foobar2" with pytest.raises(elasticsearch.exceptions.NotFoundError): await search.get_connection().get( - index=next_index_name, doc_type="_all", id=resp["@uid"] + index=next_index_name, id=resp["@uid"] ) await run_with_retries(_test, requester) @@ -437,12 +439,12 @@ async def test_search_works_on_updated_docs_during_migration_when_present( async def _test(): result1 = await search.get_connection().get( - index=next_index_name, doc_type="_all", id=resp["@uid"] + index=next_index_name, id=resp["@uid"] ) assert result1 is not None assert result1["_source"]["title"] == "Foobar2" result2 = await search.get_connection().get( - index=index_name, doc_type="_all", id=resp["@uid"] + index=index_name, id=resp["@uid"] ) assert result2 is not None assert result2["_source"]["title"] == "Foobar2" @@ -469,11 +471,9 @@ async def test_delete_in_both_during_migration(es_requester): async def _test(): with pytest.raises(elasticsearch.exceptions.NotFoundError): await search.get_connection().get( - index=next_index_name, doc_type="_all", id=resp["@uid"] + index=next_index_name, id=resp["@uid"] ) with pytest.raises(elasticsearch.exceptions.NotFoundError): - await search.get_connection().get( - index=index_name, doc_type="_all", id=resp["@uid"] - ) + await search.get_connection().get(index=index_name, id=resp["@uid"]) await run_with_retries(_test, requester) diff --git a/guillotina_elasticsearch/tests/test_vacuum.py b/guillotina_elasticsearch/tests/test_vacuum.py index d2c14ac..c963fc0 100644 --- a/guillotina_elasticsearch/tests/test_vacuum.py +++ b/guillotina_elasticsearch/tests/test_vacuum.py @@ -152,7 
diff --git a/guillotina_elasticsearch/tests/test_vacuum.py b/guillotina_elasticsearch/tests/test_vacuum.py
index d2c14ac..c963fc0 100644
--- a/guillotina_elasticsearch/tests/test_vacuum.py
+++ b/guillotina_elasticsearch/tests/test_vacuum.py
@@ -152,7 +152,7 @@ async def test_reindexes_moved_content(es_requester):
         async def _test():
             assert await search.get_doc_count(container) == 3
             result = await search.get_connection().get(
-                index=index_name, doc_type="_all", id=resp3["@uid"]
+                index=index_name, id=resp3["@uid"]
             )
             assert result is not None
 
@@ -183,14 +183,12 @@ async def _test():
         async def _test():
             result = await search.get_connection().get(
                 index=index_name,
-                doc_type="_all",
                 id=resp3["@uid"],
                 stored_fields="path",
             )
             assert result["fields"]["path"] == ["/moved-foobar/foobar/foobar"]
             result = await search.get_connection().get(
                 index=index_name,
-                doc_type="_all",
                 id=resp1["@uid"],
                 stored_fields="path,parent_uuid",
             )
@@ -213,14 +211,12 @@ async def _test():
         async def __test():
             result = await search.get_connection().get(
                 index=index_name,
-                doc_type="_all",
                 id=resp3["@uid"],
                 stored_fields="path,parent_uuid",
             )
             assert result["fields"]["path"] == ["/foobar/foobar/foobar"]
             result = await search.get_connection().get(
                 index=index_name,
-                doc_type="_all",
                 id=resp1["@uid"],
                 stored_fields="path,parent_uuid",
             )
@@ -237,7 +233,6 @@ async def __test():
 @pytest.mark.skipif(DATABASE == "DUMMY", reason="Not for dummy db")
 async def test_vacuum_with_multiple_containers(es_requester):
     async with es_requester as requester:
-
         # create another container, force to iterate differently
         _, status = await requester(
             "POST", "/db", data=json.dumps({"@type": "Container", "id": "foobar"})
diff --git a/guillotina_elasticsearch/tests/utils.py b/guillotina_elasticsearch/tests/utils.py
index 69c180c..99175c4 100644
--- a/guillotina_elasticsearch/tests/utils.py
+++ b/guillotina_elasticsearch/tests/utils.py
@@ -90,8 +90,8 @@ async def cleanup_es(es_host, prefix=""):
                 continue
             if name.startswith(prefix):
                 try:
-                    await conn.indices.delete_alias(index, name)
-                    await conn.indices.delete(index)
+                    await conn.indices.delete_alias(index=index, name=name)
+                    await conn.indices.delete(index=index)
                 except (
                     elasticsearch.exceptions.AuthorizationException,
                     elasticsearch.exceptions.NotFoundError,
@@ -104,6 +104,6 @@ async def cleanup_es(es_host, prefix=""):
             continue
         if index_name.startswith(prefix):
             try:
-                await conn.indices.delete(index_name)
+                await conn.indices.delete(index=index_name)
             except elasticsearch.exceptions.AuthorizationException:
                 pass
- "Could not check current es version. " "Only 6.x and 7.x are supported" + "Could not check current es version. " "Only 7.x and 8.x are supported" ) return es_version = info["version"]["number"] - - if not es_version.startswith("7"): + major_number_release = es_version.split(".")[0] + if major_number_release not in ("7", "8"): raise Exception(f"ES cluster version not supported: {es_version}") + if major_number_release == "8": + # If in version 8, we need to enable indices.id_field_data.enabled + await self.enable_id_field_data_access() async def initialize_catalog(self, container): if not self.enabled: @@ -150,7 +157,7 @@ async def create_index( self, real_index_name, index_manager, settings=None, mappings=None ): if ":" in real_index_name: - raise Exception(f"Ivalid character ':' in index name: {real_index_name}") + raise Exception(f"Invalid character ':' in index name: {real_index_name}") if settings is None: settings = await index_manager.get_index_settings() @@ -160,18 +167,20 @@ async def create_index( settings = {"settings": settings, "mappings": mappings} conn = self.get_connection() - await conn.indices.create(real_index_name, settings) + await conn.indices.create(index=real_index_name, body=settings) async def _delete_index(self, im): index_name = await im.get_index_name() real_index_name = await im.get_real_index_name() conn = self.get_connection() - await safe_es_call(conn.indices.delete_alias, real_index_name, index_name) - await safe_es_call(conn.indices.delete, real_index_name) - await safe_es_call(conn.indices.delete, index_name) + await safe_es_call( + conn.indices.delete_alias, index=real_index_name, name=index_name + ) + await safe_es_call(conn.indices.delete, index=real_index_name) + await safe_es_call(conn.indices.delete, index=index_name) migration_index = await im.get_migration_index_name() if migration_index: - await safe_es_call(conn.indices.delete, migration_index) + await safe_es_call(conn.indices.delete, index=migration_index) async def remove_catalog(self, container): if not self.enabled: @@ -272,8 +281,10 @@ async def search_raw( logger.debug("Generated query %s", json.dumps(query)) conn = self.get_connection() - - result = await conn.search(index=index, **q) + if "size" in q["body"] and "size" in q: + # ValueError: Received multiple values for 'size', specify parameters using either body or parameters, not both. 
diff --git a/setup.py b/setup.py
index 4915938..312ad5e 100644
--- a/setup.py
+++ b/setup.py
@@ -11,7 +11,7 @@
     "pytest-cov",
     "pytest-docker-fixtures[pg]>=1.3.0",
     "prometheus-client>=0.9.0",  # TODO: remove
-    "docker>=5.0.0,<6.0.0"
+    "docker>=6.0.0,<=6.1.3"
 ]
 
 
@@ -37,7 +37,7 @@
     packages=find_packages(exclude=["ez_setup"]),
     install_requires=[
         "guillotina>=6.0.0a16",
-        "elasticsearch[async]>=7.8.0,<8.0.0",
+        "elasticsearch[async]>=8.0.0,<=8.12.0",
         "mypy_extensions",
         "lru-dict",
         "backoff"