migrating to asyncelasticsearch 8
nilbacardit26 committed Jan 23, 2024
1 parent 0996419 commit e6bb81e
Showing 13 changed files with 141 additions and 105 deletions.
9 changes: 7 additions & 2 deletions CHANGELOG.rst
@@ -1,7 +1,12 @@
-7.0.6 (unreleased)
+8.0.0 (unreleased)
 ------------------
 
-- Nothing changed yet.
+- Support for Elasticsearch 8.
+- Changing ES fixture to bitnami version 8. Tests passing for both
+  versions.
+- Removing deprecated doc_type argument from get calls.
+- Dropping support for Elasticsearch 6.x.
+  [nilbacardit26]
 
 
 7.0.5 (2023-12-20)
43 changes: 38 additions & 5 deletions README.rst
@@ -6,15 +6,15 @@ GUILLOTINA_ELASTICSEARCH
 .. image:: https://travis-ci.org/guillotinaweb/guillotina_elasticsearch.svg?branch=master
    :target: https://travis-ci.org/guillotinaweb/guillotina_elasticsearch
 
-Elasticsearch integration for guillotina.
+Elasticsearch integration for guillotina. Supports Elasticsearch 7.x
+and 8.x.
 
 
 Installation
 ------------
 
-`pip install guillotina_elasticsearch` defaults to Elasticsearch 7.x
-support. Pin `aioelasticsearch<0.6.0` to enable support for
-Elasticsearch 6.x.
+`pip install guillotina_elasticsearch` defaults to Elasticsearch 8.x
+support.
 
 
 Configuration
@@ -28,7 +28,7 @@ config.yaml can include elasticsearch section
       index_name_prefix: "guillotina-"
       connection_settings:
         hosts:
-          - "127.0.0.1:9200"
+          - "http://127.0.0.1:9200"
         sniffer_timeout: 0.5
         sniff_on_start: true
       security_query_builder: "guillotina_elasticsearch.queries.build_security_query"
@@ -52,6 +52,21 @@ Example custom `security_query_builder` settings:
         }
     }
 
+Development and testing
+-----------------------
+
+Set up your Python virtual environment for version >=3.8. Tested with
+3.8, 3.9 and 3.10.
+
+.. code-block:: bash
+
+    # Linux
+    pip install -e ".[test]"
+    pytest tests/
+
+By default the tests run an ES fixture with version 8. If you want to
+run the tests against ES version 7, change the image version in
+conftest.py (a sketch follows the conftest.py diff below).
 
 Installation on a site
 ----------------------
@@ -84,6 +99,24 @@ New index and delete requests are performed on both indexes during live migration
 It is also smart about how to migrate, doing a diff on the mapping and only
 reindexing the fields that changed.
 
+Breaking changes in 8.0.0
+-------------------------
+
+In this version, the elasticsearch client library has been upgraded
+from version 7 to version 8. Some changes need to be taken into
+account in existing elasticsearch config files:
+
+- The hosts field in guillotina's configuration file needs to
+  include the scheme: http or https.
+- The sniffer_timeout in guillotina's configuration file cannot be None.
+- The doc_type argument has been removed. Specifying types in requests
+  is no longer supported.
+- The include_type_name parameter is removed.
+
+The elasticsearch field of the config.yaml file is passed directly to
+instantiate AsyncElasticsearch. Its class definition is the same as
+the synchronous client's; to learn how to configure it, see:
+https://elasticsearch-py.readthedocs.io/en/v8.12.0/api/elasticsearch.html#elasticsearch.Elasticsearch
 
 Breaking changes in 2.0
 -----------------------
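A minimal sketch of connection settings that satisfy the new 8.x requirements described above, assuming a local node with security disabled (the host URL and timeout values are placeholders). Since guillotina passes the elasticsearch connection_settings straight to AsyncElasticsearch, the equivalent direct instantiation looks like:

.. code-block:: python

    from elasticsearch import AsyncElasticsearch

    # Mirrors the connection_settings section of config.yaml shown above.
    conn = AsyncElasticsearch(
        hosts=["http://127.0.0.1:9200"],  # the scheme (http/https) is now mandatory
        sniffer_timeout=0.5,  # must be a number; None is no longer accepted
        sniff_on_start=True,
    )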
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-7.0.6.dev0
+8.0.0.dev0
8 changes: 5 additions & 3 deletions guillotina_elasticsearch/commands/vacuum.py
@@ -1,3 +1,4 @@
+from elastic_transport import ObjectApiResponse
 from elasticsearch import AsyncElasticsearch
 from guillotina import task_vars
 from guillotina.commands import Command
@@ -76,7 +77,7 @@ async def iter_batched_es_keys(self):
         indexes = [self.index_name]
         for index_name in indexes:
             try:
-                result = await self.conn.search(
+                result: ObjectApiResponse = await self.conn.search(
                     index=index_name,
                     scroll="15m",
                     size=PAGE_SIZE,
@@ -206,7 +207,7 @@ async def check_orphans(self):
 
         # delete by query for orphaned keys...
         data = await self.conn.delete_by_query(
-            index_name, body={"query": {"terms": {"_id": orphaned}}}
+            index=index_name, body={"query": {"terms": {"_id": orphaned}}}
         )
         if data["deleted"] != len(orphaned):
             logger.warning(
@@ -233,10 +234,11 @@ async def check_missing(self):
         async for batch in self.iter_paged_db_keys([self.container.__uuid__]):
             oids = [r["zoid"] for r in batch]
             try:
-                results = await self.conn.search(
+                results: ObjectApiResponse = await self.conn.search(
                     index=self.index_name,
                     body={"query": {"terms": {"uuid": oids}}},
                     _source=False,
+                    fields=["tid", "parent_uuid"],
                     stored_fields="tid,parent_uuid",
                     size=PAGE_SIZE,
                 )
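The vacuum changes above follow the same pattern as the rest of this commit: the 8.x client wants keyword arguments (index=..., not positional) and returns ObjectApiResponse objects, which still support dict-style access. A minimal sketch of the calling convention (the index name and query are illustrative placeholders):

.. code-block:: python

    from elastic_transport import ObjectApiResponse
    from elasticsearch import AsyncElasticsearch

    async def count_docs(conn: AsyncElasticsearch) -> int:
        # ES 8 clients take keyword arguments only; positional index names are gone.
        resp: ObjectApiResponse = await conn.search(
            index="my-index",  # placeholder index name
            body={"query": {"match_all": {}}},
            size=0,
        )
        # ObjectApiResponse behaves like a mapping, so dict-style access keeps working.
        return resp["hits"]["total"]["value"]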
70 changes: 32 additions & 38 deletions guillotina_elasticsearch/migration.py
@@ -203,7 +203,7 @@ async def create_next_index(self):
         next_index_name = await self.index_manager.start_migration()
         # The index is created in the same transaction the registry is updated
         # to prevent another process from accessing the 'next_index' and not finding it
-        if await self.conn.indices.exists(next_index_name):
+        if await self.conn.indices.exists(index=next_index_name):
             if self.force:
                 # delete and recreate
                 self.response.write("Clearing index")
@@ -215,17 +215,15 @@ async def copy_to_next_index(self):
     async def copy_to_next_index(self):
         real_index_name = await self.index_manager.get_index_name()
         data = await self.conn.reindex(
-            {
-                "source": {"index": real_index_name, "size": 100},
-                "dest": {"index": self.work_index_name},
-            },
-            params={"wait_for_completion": "false"},
+            source={"index": real_index_name, "size": 100},
+            dest={"index": self.work_index_name},
+            wait_for_completion=False,
         )
         self.active_task_id = task_id = data["task"]
         task_completed = False
         while not task_completed:
             await asyncio.sleep(10)
-            data = await self.conn.tasks.get(task_id)
+            data = await self.conn.tasks.get(task_id=task_id)
             task_completed = data["completed"]
             if task_completed:
                 break
@@ -282,12 +280,14 @@ async def calculate_mapping_diff(self):
         all we care about is new fields...
         Missing ones are ignored and we don't care about it.
         """
-        next_mappings = await self.conn.indices.get_mapping(self.work_index_name)
+        next_mappings = await self.conn.indices.get_mapping(index=self.work_index_name)
         next_mappings = next_mappings[self.work_index_name]["mappings"]["properties"]
 
         existing_index_name = await self.index_manager.get_real_index_name()
         try:
-            existing_mappings = await self.conn.indices.get_mapping(existing_index_name)
+            existing_mappings = await self.conn.indices.get_mapping(
+                index=existing_index_name
+            )
         except elasticsearch.exceptions.NotFoundError:
             # allows us to upgrade when no index is present yet
             return next_mappings
@@ -389,7 +389,6 @@ async def index_object(self, ob, full=False):
         await self.attempt_flush()
 
     async def attempt_flush(self):
-
         if self.processed % 500 == 0:
             self.policy.invalidate_cache()
             num, _, _ = gc.get_count()
@@ -573,40 +572,35 @@ async def run_migration(self):
 
         try:
             await self.conn.indices.update_aliases(
-                {
-                    "actions": [
-                        {
-                            "remove": {
-                                "alias": alias_index_name,
-                                "index": existing_index,
-                            }
-                        },
-                        {
-                            "add": {
-                                "alias": alias_index_name,
-                                "index": self.work_index_name,
-                            }
-                        },
-                    ]
-                }
+                actions=[
+                    {
+                        "remove": {
+                            "alias": alias_index_name,
+                            "index": existing_index,
+                        }
+                    },
+                    {
+                        "add": {
+                            "alias": alias_index_name,
+                            "index": self.work_index_name,
+                        }
+                    },
+                ]
             )
         except elasticsearch.exceptions.NotFoundError:
             await self.conn.indices.update_aliases(
-                {
-                    "actions": [
-                        {
-                            "add": {
-                                "alias": alias_index_name,
-                                "index": self.work_index_name,
-                            }
-                        }
-                    ]
-                }
+                actions=[
+                    {
+                        "add": {
+                            "alias": alias_index_name,
+                            "index": self.work_index_name,
+                        }
+                    }
+                ]
             )
 
         try:
-            await self.conn.indices.close(existing_index)
-            await self.conn.indices.delete(existing_index)
+            await self.conn.indices.close(index=existing_index)
+            await self.conn.indices.delete(index=existing_index)
             self.response.write("Old index deleted")
         except elasticsearch.exceptions.NotFoundError:
             pass
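For reference, the reindex flow in copy_to_next_index above reduces to the following pattern under the 8.x API: wait_for_completion becomes a real boolean keyword (no longer the string "false" in params), and tasks.get requires task_id as a keyword. A condensed sketch with placeholder index names:

.. code-block:: python

    import asyncio

    from elasticsearch import AsyncElasticsearch

    async def reindex_in_background(conn: AsyncElasticsearch) -> None:
        # Start a server-side reindex task instead of blocking on the request.
        data = await conn.reindex(
            source={"index": "old-index", "size": 100},  # placeholder names
            dest={"index": "new-index"},
            wait_for_completion=False,  # boolean keyword in 8.x
        )
        task_id = data["task"]
        while True:
            await asyncio.sleep(10)
            task = await conn.tasks.get(task_id=task_id)  # keyword argument required
            if task["completed"]:
                break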
1 change: 0 additions & 1 deletion guillotina_elasticsearch/schema.py
@@ -53,7 +53,6 @@ def _addon_index(ob):
 
 
 def get_mappings(schemas=None, schema_info=False):
-
     if schemas is None:
         schemas = []
         for name, _ in get_utilities_for(IResourceFactory):
15 changes: 5 additions & 10 deletions guillotina_elasticsearch/tests/conftest.py
@@ -1,19 +1,14 @@
 from pytest_docker_fixtures import images
 
 
-image_version = "7.5.1"
+image_version_8 = "8.12.0-debian-11-r2"
+image_version_7 = "7.17.16-debian-11-r3"  # noqa
 
 images.configure(
     "elasticsearch",
-    "docker.elastic.co/elasticsearch/elasticsearch",
-    image_version,
+    name="elasticsearch",
+    full=f"bitnami/elasticsearch:{image_version_8}",
+    max_wait_s=90,
-    env={
-        "xpack.security.enabled": None,  # unset
-        "discovery.type": "single-node",
-        "http.host": "0.0.0.0",
-        "transport.host": "127.0.0.1",
-    },
+    env={"ELASTICSEARCH_ENABLE_SECURITY": "false", "ELASTICSEARCH_HEAP_SIZE": "1g"},
 )


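As noted in the README, the tests default to the Elasticsearch 8 fixture; to exercise Elasticsearch 7 instead, point the fixture at the 7.x image. A minimal sketch of that edit, assuming the bitnami 7.x tag works with the same fixture options:

.. code-block:: python

    # conftest.py, switched to the ES 7 fixture image
    images.configure(
        "elasticsearch",
        name="elasticsearch",
        full=f"bitnami/elasticsearch:{image_version_7}",  # was image_version_8
        max_wait_s=90,
        env={"ELASTICSEARCH_ENABLE_SECURITY": "false", "ELASTICSEARCH_HEAP_SIZE": "1g"},
    )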
5 changes: 2 additions & 3 deletions guillotina_elasticsearch/tests/fixtures.py
@@ -51,12 +51,11 @@ def base_settings_configurator(settings):
         "index": elastic_search_analyzers_normalizers(),
         "connection_settings": {
             "hosts": [
-                "{}:{}".format(
+                "http://{}:{}".format(
                     getattr(elasticsearch, "host", "localhost"),
                     getattr(elasticsearch, "port", "9200"),
                 )
-            ],
-            "sniffer_timeout": None,
+            ]
         },
     }
 
26 changes: 13 additions & 13 deletions guillotina_elasticsearch/tests/test_migration.py
@@ -227,11 +227,13 @@ async def test_updates_index_name(es_requester):
         search = get_utility(ICatalogUtility)
         im = get_adapter(container, IIndexManager)
         existing_index = await im.get_real_index_name()
-        assert await search.get_connection().indices.exists(existing_index)
+        assert await search.get_connection().indices.exists(index=existing_index)
         migrator = Migrator(search, container, force=True)
         await migrator.run_migration()
-        assert not await search.get_connection().indices.exists(existing_index)
-        assert await search.get_connection().indices.exists(migrator.work_index_name)
+        assert not await search.get_connection().indices.exists(index=existing_index)
+        assert await search.get_connection().indices.exists(
+            index=migrator.work_index_name
+        )
         assert await im.get_real_index_name() == migrator.work_index_name
 
 
@@ -358,11 +360,11 @@ async def test_search_works_on_new_docs_during_migration(es_requester):
 
     async def _test():
         result1 = await search.get_connection().get(
-            index=next_index_name, doc_type="_all", id=resp["@uid"]
+            index=next_index_name, id=resp["@uid"]
         )
         assert result1 is not None
         result2 = await search.get_connection().get(
-            index=index_name, doc_type="_all", id=resp["@uid"]
+            index=index_name, id=resp["@uid"]
         )
         assert result2 is not None
 
@@ -397,13 +399,13 @@ async def test_search_works_on_updated_docs_during_migration_when_missing(
 
     async def _test():
         result1 = await search.get_connection().get(
-            index=index_name, doc_type="_all", id=resp["@uid"]
+            index=index_name, id=resp["@uid"]
         )
         assert result1 is not None
         assert result1["_source"]["title"] == "Foobar2"
         with pytest.raises(elasticsearch.exceptions.NotFoundError):
             await search.get_connection().get(
-                index=next_index_name, doc_type="_all", id=resp["@uid"]
+                index=next_index_name, id=resp["@uid"]
             )
 
     await run_with_retries(_test, requester)
@@ -437,12 +439,12 @@ async def test_search_works_on_updated_docs_during_migration_when_present(
 
     async def _test():
         result1 = await search.get_connection().get(
-            index=next_index_name, doc_type="_all", id=resp["@uid"]
+            index=next_index_name, id=resp["@uid"]
         )
         assert result1 is not None
         assert result1["_source"]["title"] == "Foobar2"
         result2 = await search.get_connection().get(
-            index=index_name, doc_type="_all", id=resp["@uid"]
+            index=index_name, id=resp["@uid"]
        )
         assert result2 is not None
         assert result2["_source"]["title"] == "Foobar2"
@@ -469,11 +471,9 @@ async def test_delete_in_both_during_migration(es_requester):
     async def _test():
         with pytest.raises(elasticsearch.exceptions.NotFoundError):
             await search.get_connection().get(
-                index=next_index_name, doc_type="_all", id=resp["@uid"]
+                index=next_index_name, id=resp["@uid"]
             )
         with pytest.raises(elasticsearch.exceptions.NotFoundError):
-            await search.get_connection().get(
-                index=index_name, doc_type="_all", id=resp["@uid"]
-            )
+            await search.get_connection().get(index=index_name, id=resp["@uid"])
 
     await run_with_retries(_test, requester)