diff --git a/.travis.yml b/.travis.yml
index 16d2bed..5a5fd1b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -2,11 +2,11 @@ dist: xenial
 language: python
 python:
-  - "3.7"
+  - "3.7"
 sudo: required
 env:
-  - DATABASE=DUMMY
-  - DATABASE=postgresql
+  - ES_VERSION=7
+  - ES_VERSION=7 DATABASE=postgresql
 services:
   - postgresql
@@ -23,7 +23,7 @@ cache:
     - eggs
 install:
   - pip install flake8 codecov mypy_extensions
-  - pip install git+https://github.com/plone/guillotina.git@master
+  - pip install -e .
   - pip install -e .[test]
 script:
   - flake8 guillotina_elasticsearch --config=setup.cfg
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 91ccd45..c12f9f3 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -1,7 +1,63 @@
-5.0.1 (unreleased)
+6.0.0 (unreleased)
 ------------------
 
-- Nothing changed yet.
+- Support Guillotina 6
+  [masipcat]
+
+- Support elasticsearch 7.0
+  [jordic]
+
+- Make sure to save sub index changes in ES
+  [vangheem]
+
+- Fix default index settings
+  [vangheem]
+
+- Pinned aioelasticsearch to <0.6.0
+  [masipcat]
+
+- Be able to import types
+  [vangheem]
+
+- Retry conflict errors on delete by query
+
+- Pay attention to trashed objects in pg
+
+- Fix commands using missing attribute `self.request`
+
+- ISecurityInfo can be async
+
+- Fix not iterating over all content indexes in elasticsearch
+  [vangheem]
+
+- build_security_query(): changed 'query.bool.filter' to use a list instead of a single object
+  [masipcat]
+
+- Fix release
+
+- Missing pg conn lock with vacuuming
+  [vangheem]
+
+- Pass request on the index progress when possible
+
+- Fix release
+
+- Do not require request object for vacuuming
+  [vangheem]
+
+- G5 support
+  [vangheem]
+
+- Do not close indexes on create/delete
+  [vangheem]
+
+- Handle another index not found error on vacuum
+  [vangheem]
+
+- logging
+  [vangheem]
+
+- Handle index not found error
+  [vangheem]
 
 
 5.0.0 (2019-10-21)
diff --git a/VERSION b/VERSION
index 80f5e86..3f6d5ce 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-5.0.1.dev0
+6.0.0.dev0
diff --git a/config-opendistro.json b/config-opendistro.json
deleted file mode 100644
index 22843b8..0000000
--- a/config-opendistro.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{
-  "port": 8080,
-  "applications": ["guillotina_elasticsearch"],
-  "databases": [{
-    "db": {
-      "storage": "DUMMY"
-    }
-  }],
-  "root_user": {
-    "password": "root"
-  },
-  "elasticsearch": {
-    "bulk_size": 50,
-    "index_name_prefix": "guillotina-",
-    "connection_settings": {
-      "hosts": ["localhost:9200"],
-      "sniffer_timeout": 0,
-      "use_ssl": true,
-      "http_auth": "admin:admin"
-    }
-  }
-}
-
\ No newline at end of file
diff --git a/guillotina_elasticsearch/commands/vacuum.py b/guillotina_elasticsearch/commands/vacuum.py
index 8411881..2f8f0a9 100644
--- a/guillotina_elasticsearch/commands/vacuum.py
+++ b/guillotina_elasticsearch/commands/vacuum.py
@@ -5,7 +5,7 @@
 from guillotina.component import get_utility
 from guillotina.db import ROOT_ID
 from guillotina.db import TRASHED_ID
-from guillotina.db.reader import reader
+from guillotina.utils import get_object_by_uid
 from guillotina.interfaces import ICatalogUtility
 from guillotina.tests.utils import get_mocked_request
 from guillotina.tests.utils import login
@@ -19,6 +19,7 @@
 
 import aioelasticsearch
 import asyncio
+import elasticsearch
 import json
 import logging
 
@@ -26,7 +27,10 @@
 logger = logging.getLogger('guillotina_elasticsearch_vacuum')
 
 GET_CONTAINERS = 'select zoid from {objects_table} where parent_id = $1'
-SELECT_BY_KEYS = '''SELECT zoid from {objects_table} where zoid = ANY($1)'''
+SELECT_BY_KEYS = f'''
+SELECT zoid from {{objects_table}}
+where zoid = ANY($1) AND parent_id != '{TRASHED_ID}'
+'''
 GET_CHILDREN_BY_PARENT = """
 SELECT zoid, parent_id, tid
 FROM {objects_table}
@@ -36,10 +40,10 @@
 PAGE_SIZE = 1000
 
-GET_OBS_BY_TID = """
+GET_OBS_BY_TID = f"""
 SELECT zoid, parent_id, tid
-FROM {objects_table}
-WHERE of is NULL
+FROM {{objects_table}}
+WHERE of is NULL and parent_id != '{TRASHED_ID}'
 ORDER BY tid ASC, zoid ASC
 """
@@ -95,14 +99,17 @@ async def iter_batched_es_keys(self):
             indexes.append(index['index'])
 
         for index_name in indexes:
-            result = await self.conn.search(
-                index=index_name,
-                scroll='15m',
-                size=PAGE_SIZE,
-                _source=False,
-                body={
-                    "sort": ["_doc"]
-                })
+            try:
+                result = await self.conn.search(
+                    index=index_name,
+                    scroll='15m',
+                    size=PAGE_SIZE,
+                    _source=False,
+                    body={
+                        "sort": ["_doc"]
+                    })
+            except elasticsearch.exceptions.NotFoundError:
+                continue
             yield [r['_id'] for r in result['hits']['hits']], index_name
             scroll_id = result['_scroll_id']
             while scroll_id:
@@ -161,33 +168,15 @@ async def get_object(self, oid):
         if oid in self.cache:
             return self.cache[oid]
 
-        try:
-            result = self.txn._manager._hard_cache.get(oid, None)
-        except AttributeError:
-            from guillotina.db.transaction import HARD_CACHE  # noqa
-            result = HARD_CACHE.get(oid, None)
-        if result is None:
-            result = await self.txn._cache.get(oid=oid)
-
-        if result is None:
-            result = await self.tm._storage.load(self.txn, oid)
-
-        obj = reader(result)
-        obj.__txn__ = self.txn
-        if result['parent_id']:
-            obj.__parent__ = await self.get_object(result['parent_id'])
-        return obj
+        return await get_object_by_uid(oid)
 
     async def process_missing(self, oid, index_type='missing', folder=False):
         # need to fill in parents in order for indexing to work...
         logger.warning(f'Index {index_type} {oid}')
         try:
             obj = await self.get_object(oid)
-        except KeyError:
+        except (AttributeError, KeyError, TypeError, ModuleNotFoundError):
             logger.warning(f'Could not find {oid}')
-            return
-        except (AttributeError, TypeError, ModuleNotFoundError):
-            logger.warning(f'Could not find {oid}', exc_info=True)
             return  # object or parent of object was removed, ignore
         try:
             if folder:
@@ -302,17 +291,23 @@ async def check_missing(self):
         async for batch in self.iter_paged_db_keys([self.container.__uuid__]):
             oids = [r['zoid'] for r in batch]
             indexes = self.get_indexes_for_oids(oids)
-            results = await self.conn.search(
-                ','.join(indexes), body={
-                    'query': {
-                        'terms': {
-                            'uuid': oids
+            try:
+                results = await self.conn.search(
+                    index=','.join(indexes),
+                    body={
+                        'query': {
+                            'terms': {
+                                'uuid': oids
+                            }
                         }
-                    }
-                },
-                _source=False,
-                stored_fields='tid,parent_uuid',
-                size=PAGE_SIZE)
+                    },
+                    _source=False,
+                    stored_fields='tid,parent_uuid',
+                    size=PAGE_SIZE)
+            except elasticsearch.exceptions.NotFoundError:
+                logger.warning(
+                    f'Error searching index: {indexes}', exc_info=True)
+                continue
 
             es_batch = {}
             for result in results['hits']['hits']:
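The vacuum now skips trashed rows in SQL and tolerates indexes that disappear mid-scan. A minimal sketch of that guarded scroll pattern, assuming an aioelasticsearch-style `conn`; the function name, index name, and page size are placeholders, not the module's real state:

```python
import elasticsearch

PAGE_SIZE = 1000


async def iter_index_ids(conn, index_name):
    # Scroll one index page by page; a missing index is skipped, not fatal.
    try:
        result = await conn.search(
            index=index_name, scroll='15m', size=PAGE_SIZE,
            _source=False, body={"sort": ["_doc"]})
    except elasticsearch.exceptions.NotFoundError:
        return  # index was deleted while vacuuming
    while result['hits']['hits']:
        yield [r['_id'] for r in result['hits']['hits']]
        result = await conn.scroll(
            scroll_id=result['_scroll_id'], scroll='15m')
```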
diff --git a/guillotina_elasticsearch/events.py b/guillotina_elasticsearch/events.py
index 6d2641c..1c1f88a 100644
--- a/guillotina_elasticsearch/events.py
+++ b/guillotina_elasticsearch/events.py
@@ -35,7 +35,7 @@ class IIndexProgress(Interface):
 
 @implementer(IIndexProgress)
 class IndexProgress(object):
-    def __init__(self, request, context, processed, total, completed=None):
+    def __init__(self, context, processed, total, completed=None, request=None):  # noqa
         self.request = request
         self.context = context
         self.processed = processed
diff --git a/guillotina_elasticsearch/exceptions.py b/guillotina_elasticsearch/exceptions.py
index 7ea6f67..571d47d 100644
--- a/guillotina_elasticsearch/exceptions.py
+++ b/guillotina_elasticsearch/exceptions.py
@@ -3,3 +3,10 @@
 
 class QueryErrorException(HTTPException):
     status_code = 488
+
+
+class ElasticsearchConflictException(Exception):
+    def __init__(self, conflicts, resp):
+        self.conflicts = conflicts
+        self.response = resp
+        super().__init__(f"{self.conflicts} on ES request")
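The new exception is designed to pair with `backoff`, the same pattern utility.py adopts below to retry conflicted delete-by-query calls. A hedged sketch of the intended usage; `do_delete` is a hypothetical stand-in for the actual HTTP call:

```python
import backoff
from guillotina_elasticsearch.exceptions import ElasticsearchConflictException


@backoff.on_exception(
    backoff.constant, ElasticsearchConflictException,
    interval=0.5, max_tries=5)
async def delete_children(do_delete):
    result = await do_delete()  # e.g. a _delete_by_query call
    if result['version_conflicts'] > 0:
        # raising makes backoff retry until conflicts clear or tries run out
        raise ElasticsearchConflictException(
            result['version_conflicts'], result)
    return result
```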
diff --git a/guillotina_elasticsearch/manager.py b/guillotina_elasticsearch/manager.py
index 196603c..6807ab3 100644
--- a/guillotina_elasticsearch/manager.py
+++ b/guillotina_elasticsearch/manager.py
@@ -1,4 +1,5 @@
 from copy import deepcopy
+from guillotina.catalog.index import index_object
 from guillotina import app_settings
 from guillotina import configure
 from guillotina import task_vars
@@ -6,6 +7,7 @@
 from guillotina.component import get_adapter
 from guillotina.component import query_utility
 from guillotina.db.uid import get_short_uid
+from guillotina.db.transaction import Status
 from guillotina.directives import index_field
 from guillotina.exceptions import NoIndexField
 from guillotina.interfaces import IAnnotations
@@ -14,9 +16,10 @@
 from guillotina.interfaces import IObjectAddedEvent
 from guillotina.interfaces import IResource
 from guillotina.transactions import get_transaction
-from guillotina.utils import get_current_request
-from guillotina.utils import resolve_dotted_name
+from guillotina.transactions import transaction
+from guillotina.utils import execute
 from guillotina.utils import get_registry as guillotina_get_registry
+from guillotina.utils import resolve_dotted_name
 from guillotina_elasticsearch.directives import index
 from guillotina_elasticsearch.interfaces import IContentIndex
 from guillotina_elasticsearch.interfaces import IIndexActive
@@ -29,7 +32,6 @@
 
 import logging
 
-
 logger = logging.getLogger('guillotina_elasticsearch')
@@ -64,7 +66,9 @@ async def get_indexes(self):
 
     async def get_index_settings(self):
         index_settings = default_settings()
-        index_settings.update(app_settings.get('index', {}))
+        index_settings.update(
+            app_settings.get('elasticsearch', {}).get("index", {})
+        )
         return index_settings
 
     async def get_mappings(self):
@@ -86,9 +90,17 @@
         try:
             result = registry['el_index_name']
         except KeyError:
-            result = self._generate_new_index_name()
-            registry['el_index_name'] = result
-            registry.register()
+            txn = get_transaction()
+            is_active = txn.status in (Status.ACTIVE, Status.COMMITTING)
+            if is_active:
+                result = self._generate_new_index_name()
+                registry['el_index_name'] = result
+                registry.register()
+            else:
+                async with transaction():
+                    result = self._generate_new_index_name()
+                    registry['el_index_name'] = result
+                    registry.register()
         return result
 
     async def get_real_index_name(self):
@@ -119,8 +131,8 @@ async def finish_migration(self):
         registry = await self.get_registry()
         next_version = registry['el_next_index_version']
         assert next_version is not None
-        await registry.__txn__.refresh(registry)
-
+        txn = get_transaction()
+        await txn.refresh(registry)
         registry['el_index_version'] = next_version
         registry['el_next_index_version'] = None
         registry.register()
@@ -159,19 +171,29 @@ def __init__(self, ob):
         self.object_settings = None
 
     async def get_registry(self, refresh=False):
-        if (refresh and self.object_settings is not None):
+        if refresh and self.object_settings is not None:
             txn = get_transaction()
             await txn.refresh(self.object_settings)
         if self.object_settings is None:
-            annotations_container = IAnnotations(self.context)
-            self.object_settings = await annotations_container.async_get('default')  # noqa
-            if self.object_settings is None:
-                # need to create annotation...
-                self.object_settings = AnnotationData()
-                await annotations_container.async_set(
-                    'default', self.object_settings)
+            txn = get_transaction()
+            is_active = txn.status in (Status.ACTIVE, Status.COMMITTING)
+            if is_active:
+                self.object_settings = await self._get_registry_or_create()
+            else:
+                async with transaction():
+                    self.object_settings = await self._get_registry_or_create()
         return self.object_settings
 
+    async def _get_registry_or_create(self):
+        annotations_container = IAnnotations(self.context)
+        object_settings = await annotations_container.async_get('default')  # noqa
+        if object_settings is None:
+            # need to create annotation...
+            object_settings = AnnotationData()
+            await annotations_container.async_set(
+                'default', object_settings)
+        return object_settings
+
     def _generate_new_index_name(self):
         '''
         index name structure is:
@@ -211,6 +233,11 @@ async def get_schemas(self):
                         index_data.get('schemas') or []])
         return set(schemas)
 
+    async def finish_migration(self):
+        await super().finish_migration()
+        await index_object(self.context, indexes=["elastic_index"],
+                           modified=True)
+
 
 async def _teardown_failed_request_with_index(im):
     utility = query_utility(ICatalogUtility)
@@ -231,7 +258,6 @@ async def init_index(context, subscriber):
 
     index_name = await im.get_index_name()
     real_index_name = await im.get_real_index_name()
-    request = get_current_request()
 
     conn = utility.get_connection()
     await utility.create_index(real_index_name, im)
@@ -245,7 +271,7 @@
         wait_for_status='yellow')
 
     alsoProvides(context, IIndexActive)
-    request.add_future(
+    execute.add_future(
         'cleanup-' + context.uuid,
         _teardown_failed_request_with_index,
         scope='failure', args=[im])
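The manager now guards registry writes behind the transaction status, opening a short-lived transaction when none is active. A minimal standalone sketch of the same guard; the `registry` object and key here are illustrative:

```python
from guillotina.db.transaction import Status
from guillotina.transactions import get_transaction, transaction


async def write_registry(registry, key, value):
    txn = get_transaction()
    if txn is not None and txn.status in (Status.ACTIVE, Status.COMMITTING):
        # a transaction is already in flight; write into it
        registry[key] = value
        registry.register()
    else:
        # no usable transaction bound to this task; open one just for this
        async with transaction():
            registry[key] = value
            registry.register()
```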
diff --git a/guillotina_elasticsearch/migration.py b/guillotina_elasticsearch/migration.py
index 47cd094..c8b4e42 100644
--- a/guillotina_elasticsearch/migration.py
+++ b/guillotina_elasticsearch/migration.py
@@ -13,13 +13,12 @@
 from guillotina.interfaces import IFolder
 from guillotina.interfaces import IResourceFactory
 from guillotina.interfaces import ISecurityInfo
-from guillotina.transactions import managed_transaction
+from guillotina.transactions import transaction
 from guillotina.utils import apply_coroutine
 from guillotina.utils import get_authenticated_user
 from guillotina.utils import get_content_path
-from guillotina.utils import get_current_request
-from guillotina.utils import get_current_transaction
 from guillotina.utils import get_current_container
+from guillotina.utils import get_current_transaction
 from guillotina.utils import get_security_policy
 from guillotina_elasticsearch.events import IndexProgress
 from guillotina_elasticsearch.interfaces import DOC_TYPE
@@ -152,16 +151,12 @@ def __init__(self, utility, context, response=noop_response, force=False,
                 'Can not do a full reindex and a mapping only migration')
         self.mapping_only = mapping_only
 
-        if request is None:
-            self.request = get_current_request()
-        else:
-            self.request = request
-
         self.txn = get_current_transaction()
         if not cache:
             # make sure that we don't cache requests...
             self.txn._cache = DummyCache(self.txn)
+        self.request = request
         self.container = get_current_container()
         self.conn = utility.get_connection()
@@ -171,7 +166,6 @@
         self.index_manager = index_manager
 
         self.user = get_authenticated_user()
-
         self.policy = get_security_policy(self.user)
         self.indexer = Indexer()
 
@@ -197,7 +191,7 @@ def per_sec(self):
         return self.processed / (time.time() - self.index_start_time)
 
     async def create_next_index(self):
-        async with managed_transaction(tm=self.txn.manager) as txn:
+        async with transaction(adopt_parent_txn=True) as txn:
             await txn.refresh(await self.index_manager.get_registry())
             next_index_name = await self.index_manager.start_migration()
             if await self.conn.indices.exists(next_index_name):
@@ -231,7 +225,8 @@ async def copy_to_next_index(self):
                 })) as resp:
             data = await resp.json()
             self.active_task_id = task_id = data['task']
-            while True:
+            task_completed = False
+            while not task_completed:
                 await asyncio.sleep(10)
                 async with conn_es.session.get(
                         join(str(conn_es.base_url), '_tasks', task_id),
@@ -241,7 +236,8 @@
                     if resp.status in (400, 404):
                         break
                     data = await resp.json()
-                    if data['completed']:
+                    task_completed = data['completed']
+                    if task_completed:
                         break
                     status = data["task"]["status"]
                     self.response.write(
@@ -250,16 +246,21 @@
             self.copied_docs = status["created"]
 
             self.active_task_id = None
-            response = data['response']
-            failures = response['failures']
-            if len(failures) > 0:
-                failures = json.dumps(failures, sort_keys=True, indent=4,
-                                      separators=(',', ': '))
-                self.response.write(
-                    f'Reindex encountered failures: {failures}')
+            if task_completed:
+                response = data['response']
+                failures = response['failures']
+                if len(failures) > 0:
+                    failures = json.dumps(
+                        failures, sort_keys=True,
+                        indent=4, separators=(',', ': ')
+                    )
+                    self.response.write(
+                        f'Reindex encountered failures: {failures}')
+                else:
+                    self.response.write(
+                        f'Finished copying to new index: {self.copied_docs}')
             else:
-                self.response.write(
-                    f'Finished copying to new index: {self.copied_docs}')
+                self.response.write(f'Unknown state for task {task_id}')
 
     async def get_all_uids(self):
         self.response.write('Retrieving existing doc ids')
@@ -308,7 +309,6 @@ async def calculate_mapping_diff(self):
         except elasticsearch.exceptions.NotFoundError:
             # allows us to upgrade when no index is present yet
             return next_mappings
-
         existing_mappings = existing_mappings[existing_index_name]['mappings']
         existing_mappings = existing_mappings['properties']
@@ -370,7 +370,7 @@ async def index_object(self, ob, full=False):
             batch_type = 'update'
         if self.reindex_security:
             try:
-                data = ISecurityInfo(ob)()
+                data = await apply_coroutine(ISecurityInfo(ob))
             except TypeError:
                 self.response.write(f'Could not index {ob}')
                 return
@@ -429,8 +429,9 @@ async def attempt_flush(self):
             ))
         if len(self.batch) >= self.bulk_size:
             await notify(IndexProgress(
-                self.request, self.context, self.processed,
-                (len(self.existing) + len(self.missing))
+                self.context, self.processed,
+                (len(self.existing) + len(self.missing)),
+                request=self.request
             ))
             await self.flush()
@@ -537,7 +538,7 @@ async def setup_next_index(self):
     async def cancel_migration(self):
         # canceling the migration, clearing index
         self.response.write('Canceling migration')
-        async with managed_transaction(tm=self.txn.manager):
+        async with transaction(adopt_parent_txn=True):
             await self.index_manager.cancel_migration()
             self.response.write('Next index disabled')
         if self.active_task_id is not None:
@@ -596,8 +597,8 @@ async def run_migration(self):
             async with get_migration_lock(
                     await self.index_manager.get_index_name()):
                 self.response.write('Activating new index')
-
-                await self.index_manager.finish_migration()
+                async with transaction(adopt_parent_txn=True):
+                    await self.index_manager.finish_migration()
                 self.status = 'done'
                 self.response.write(f'''Update alias({alias_index_name}):
@@ -642,7 +643,7 @@ async def run_migration(self):
                 migrator = Migrator(
                     self.utility, ob, response=self.response, force=self.force,
                     log_details=self.log_details,
-                    memory_tracking=self.memory_tracking, request=self.request,
+                    memory_tracking=self.memory_tracking,
                     bulk_size=self.bulk_size, full=self.full,
                     reindex_security=self.reindex_security,
                     mapping_only=self.mapping_only,
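The reindex task is now polled through the ES `_tasks` API until `completed` is reported, and the final response is only read when the task actually finished. A simplified sketch of that loop, assuming an aiohttp `session` and a plain `base_url` string in place of the connection internals used above:

```python
import asyncio
from posixpath import join


async def wait_for_reindex(session, base_url, task_id):
    # Poll the _tasks endpoint until ES reports the task as completed.
    task_completed = False
    data = {}
    while not task_completed:
        await asyncio.sleep(10)
        async with session.get(join(base_url, '_tasks', task_id)) as resp:
            if resp.status in (400, 404):
                break  # task record already cleaned up; nothing to read
            data = await resp.json()
            task_completed = data['completed']
    # only data['response'] from a completed task carries failure details
    return data if task_completed else None
```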
diff --git a/guillotina_elasticsearch/parser.py b/guillotina_elasticsearch/parser.py
index 300439e..045d2c9 100644
--- a/guillotina_elasticsearch/parser.py
+++ b/guillotina_elasticsearch/parser.py
@@ -7,7 +7,6 @@
 from guillotina.interfaces import ISearchParser
 from guillotina_elasticsearch.interfaces import IElasticSearchUtility
 from guillotina_elasticsearch.interfaces import ParsedQueryInfo
-
 import logging
 import typing
 
@@ -16,17 +15,13 @@
 MAX_AGGS = 20
 SEARCH_DATA_FIELDS = [
-    'content_layout',
     'contributors',
     'creation_date',
     'creators',
-    'hidden_navigation',
     'id',
-    'language',
     'modification_date',
     'parent_uuid',
     'path',
-    'review_state',
     'tags',
     'title',
     'type_name',
@@ -188,10 +183,6 @@ def process_query_level(params):
     name='default')
 class Parser(BaseParser):
 
-    def __init__(self, request, context):
-        self.request = request
-        self.context = context
-
     def __call__(self, params: typing.Dict) -> ParsedQueryInfo:
         query_info = super().__call__(params)
diff --git a/guillotina_elasticsearch/py.typed b/guillotina_elasticsearch/py.typed
new file mode 100644
index 0000000..e69de29
diff --git a/guillotina_elasticsearch/queries.py b/guillotina_elasticsearch/queries.py
index 670099f..f1af1c5 100644
--- a/guillotina_elasticsearch/queries.py
+++ b/guillotina_elasticsearch/queries.py
@@ -27,12 +27,14 @@ async def build_security_query(container):
     return {
         'query': {
             'bool': {
-                'filter': {
-                    'bool': {
-                        'should': should_list,
-                        'minimum_should_match': 1
+                'filter': [
+                    {
+                        'bool': {
+                            'should': should_list,
+                            'minimum_should_match': 1
+                        }
                     }
-                }
+                ]
             }
         }
     }
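With `query.bool.filter` now a list, additional filter clauses can be appended alongside the security clause instead of overwriting it. For illustration, with a hypothetical `should_list`:

```python
# Shape of the security query after the change; `should_list` stands in
# for whatever access_users/access_roles clauses were built for the user.
should_list = [{'term': {'access_users': 'root'}}]
query = {
    'query': {
        'bool': {
            'filter': [
                {
                    'bool': {
                        'should': should_list,
                        'minimum_should_match': 1
                    }
                }
            ]
        }
    }
}
# being a list, extra filters can simply be appended by callers:
query['query']['bool']['filter'].append({'term': {'type_name': 'Item'}})
```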
diff --git a/guillotina_elasticsearch/reindex.py b/guillotina_elasticsearch/reindex.py
index 1f32b1e..18569a5 100644
--- a/guillotina_elasticsearch/reindex.py
+++ b/guillotina_elasticsearch/reindex.py
@@ -17,13 +17,13 @@ def __init__(self, *args, **kwargs):
 
     async def reindex(self, obj):
         index_manager = find_index_manager(obj)
-        container = get_current_container()
         if index_manager is None:
+            container = get_current_container()
             index_manager = get_adapter(container, IIndexManager)
         self.work_index_name = await index_manager.get_index_name()
 
         await notify(IndexProgress(
-            self.request, self.context, 0, self.processed))
+            self.context, 0, self.processed))
         await self.process_object(obj)
         await self.flush()
 
         if len(self.sub_indexes) > 0:
@@ -34,9 +34,10 @@
                 self.utility, ob, response=self.response, force=self.force,
                 log_details=self.log_details,
                 memory_tracking=self.memory_tracking,
-                request=self.request, bulk_size=self.bulk_size,
+                bulk_size=self.bulk_size,
                 full=self.full, reindex_security=self.reindex_security,
-                mapping_only=self.mapping_only, index_manager=im)
+                mapping_only=self.mapping_only, index_manager=im,
+                request=self.request)
             reindexer.processed = self.processed
             reindexer.work_index_name = await im.get_index_name()
             await reindexer.process_folder(ob)
@@ -44,6 +45,7 @@
             self.processed = reindexer.processed
 
         await notify(IndexProgress(
-            self.request, self.context, self.processed,
-            self.processed, completed=True
+            self.context, self.processed,
+            self.processed, completed=True,
+            request=self.request
         ))
diff --git a/guillotina_elasticsearch/schema.py b/guillotina_elasticsearch/schema.py
index 49d954d..29f9653 100644
--- a/guillotina_elasticsearch/schema.py
+++ b/guillotina_elasticsearch/schema.py
@@ -2,11 +2,11 @@
 from guillotina.content import get_all_possible_schemas_for_type
 from guillotina.content import IResourceFactory
 from guillotina import app_settings
+from typing import Dict, Any
 
 import guillotina.directives
 
-
-CATALOG_TYPES = {
+CATALOG_TYPES: Dict[str, Any] = {
     'searchabletext': {
         'type': 'text',
         'index': True
@@ -116,10 +116,6 @@ def get_mappings(schemas=None, schema_info=False):
                 schema_field_mappings[index_name] = catalog_info['__schema__']
             mappings[index_name] = field_mapping
-
-    result = {
-        'properties': mappings,
-        'dynamic': False,
+    return {
+        'properties': mappings
     }
-
-    return result
diff --git a/guillotina_elasticsearch/testing.py b/guillotina_elasticsearch/testing.py
index 49af1b2..8b7d6be 100644
--- a/guillotina_elasticsearch/testing.py
+++ b/guillotina_elasticsearch/testing.py
@@ -1,10 +1,9 @@
 from aioelasticsearch import Elasticsearch
-from guillotina import app_settings
+from guillotina import app_settings, task_vars
 from guillotina import configure
 from guillotina.content import Folder
 from guillotina.exceptions import RequestNotFound
 from guillotina.interfaces import IResource
-from guillotina.utils import get_current_request
 from guillotina_elasticsearch.directives import index
 from guillotina_elasticsearch.interfaces import IConnectionFactoryUtility
 from guillotina_elasticsearch.interfaces import IContentIndex
@@ -51,17 +50,16 @@ def __init__(self):
         self._special_conn = None
 
     def get(self, loop=None):
-        container_id = None
+        container = None
         try:
-            request = get_current_request()
-            container_id = getattr(request, '_container_id', None)
+            container = task_vars.container.get()
         except RequestNotFound:
             return super().get(loop)
 
         settings = app_settings.get('elasticsearch', {}).get(
             'connection_settings'
         )
-        if (container_id is None or container_id != 'new_container' or
+        if (container is None or container.id != 'new_container' or
                 'new_container_settings' not in app_settings['elasticsearch']):
             return super().get(loop)
         else:
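testing.py now resolves the active container from `task_vars` rather than from the request object. A minimal sketch of that lookup, assuming guillotina's `task_vars.container` contextvar is bound for the current task; the helper name is illustrative:

```python
from guillotina import task_vars


def current_container_id(default=None):
    # returns the id of the container bound to the current task, if any
    container = task_vars.container.get()
    if container is None:
        return default
    return container.id
```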
diff --git a/guillotina_elasticsearch/tests/_test_flaky.py b/guillotina_elasticsearch/tests/_test_flaky.py
index fd5ee4f..eb9ef6d 100644
--- a/guillotina_elasticsearch/tests/_test_flaky.py
+++ b/guillotina_elasticsearch/tests/_test_flaky.py
@@ -17,7 +17,7 @@ async def _test_new_indexes_are_performed_during_migration(es_requester):
         container, request, txn, tm = await setup_txn_on_container(requester)
 
         search = get_utility(ICatalogUtility)
-        migrator = Migrator(search, container, force=True, request=request)
+        migrator = Migrator(search, container, force=True)
         await migrator.setup_next_index()
         await migrator.copy_to_next_index()
 
@@ -47,7 +47,7 @@ async def _test_new_deletes_are_performed_during_migration(es_requester):
         container, request, txn, tm = await setup_txn_on_container(requester)
 
         search = get_utility(ICatalogUtility)
-        migrator = Migrator(search, container, force=True, request=request)
+        migrator = Migrator(search, container, force=True)
         await migrator.setup_next_index()
         await migrator.copy_to_next_index()
diff --git a/guillotina_elasticsearch/tests/conftest.py b/guillotina_elasticsearch/tests/conftest.py
index 5f604a0..144f705 100644
--- a/guillotina_elasticsearch/tests/conftest.py
+++ b/guillotina_elasticsearch/tests/conftest.py
@@ -3,7 +3,7 @@
 
 images.configure(
     'elasticsearch',
-    'docker.elastic.co/elasticsearch/elasticsearch', '7.1.1',
+    'docker.elastic.co/elasticsearch/elasticsearch', '7.5.1',
     env={
         "xpack.security.enabled": None,  # unset
         "discovery.type": "single-node",
diff --git a/guillotina_elasticsearch/tests/fixtures.py b/guillotina_elasticsearch/tests/fixtures.py
index 46c1920..c14876f 100644
--- a/guillotina_elasticsearch/tests/fixtures.py
+++ b/guillotina_elasticsearch/tests/fixtures.py
@@ -76,9 +76,9 @@ async def __aenter__(self):
 
 @pytest.fixture(scope='function')
-async def es_requester(elasticsearch, guillotina, loop):
+async def es_requester(elasticsearch, guillotina, event_loop):
     # clean up all existing indexes
     es_host = '{}:{}'.format(
         elasticsearch[0], elasticsearch[1])
     await cleanup_es(es_host)
-    return ESRequester(guillotina, loop)
+    return ESRequester(guillotina, event_loop)
diff --git a/guillotina_elasticsearch/tests/test_catalog.py b/guillotina_elasticsearch/tests/test_catalog.py
index 07436c1..73e4519 100644
--- a/guillotina_elasticsearch/tests/test_catalog.py
+++ b/guillotina_elasticsearch/tests/test_catalog.py
@@ -6,6 +6,10 @@
 from guillotina_elasticsearch.interfaces import IIndexManager
 from guillotina_elasticsearch.tests.utils import setup_txn_on_container
 
+import pytest
+
+pytestmark = [pytest.mark.asyncio]
+
 
 async def test_index(es_requester):
     async with es_requester as requester:
diff --git a/guillotina_elasticsearch/tests/test_custom_index.py b/guillotina_elasticsearch/tests/test_custom_index.py
index be5215a..5fe6200 100644
--- a/guillotina_elasticsearch/tests/test_custom_index.py
+++ b/guillotina_elasticsearch/tests/test_custom_index.py
@@ -11,6 +11,9 @@
 import pytest
 
 
+pytestmark = [pytest.mark.asyncio]
+
+
 async def test_create_index(es_requester):
     async with es_requester as requester:
         resp, status = await requester(
@@ -164,6 +167,7 @@ async def _test():
 
 async def test_delete_base_removes_index_from_elastic(es_requester):
     async with es_requester as requester:
+        container, request, txn, tm = await setup_txn_on_container(requester)
         cresp, _ = await requester(
             'POST', '/db/guillotina/',
@@ -210,7 +214,6 @@ async def _test():
         ))
 
 
-@pytest.mark.flaky(reruns=5)
 async def test_delete_parent_gets_cleaned_up(es_requester):
     async with es_requester as requester:
         await requester(
diff --git a/guillotina_elasticsearch/tests/test_indexer.py b/guillotina_elasticsearch/tests/test_indexer.py
index 07b9ac2..c2b9e16 100644
--- a/guillotina_elasticsearch/tests/test_indexer.py
+++ b/guillotina_elasticsearch/tests/test_indexer.py
@@ -3,6 +3,10 @@
 from guillotina_elasticsearch.tests.utils import setup_txn_on_container
 
 import json
+import pytest
+
+
+pytestmark = [pytest.mark.asyncio]
 
 
 async def test_indexer_matches_manual(es_requester):
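With `pytest-aiohttp` and `pytest-rerunfailures` dropped from the test dependencies (see setup.py below), each test module opts in to `pytest-asyncio` once at module level via `pytestmark`. A minimal sketch of the pattern; the request and assertion here are illustrative only:

```python
import json
import pytest

pytestmark = [pytest.mark.asyncio]  # applies to every test in the module


async def test_search_is_empty(es_requester):
    async with es_requester as requester:
        resp, status = await requester(
            'POST', '/db/guillotina/@search', data=json.dumps({}))
        assert status == 200
        assert resp['items_total'] == 0  # illustrative assertion
```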
diff --git a/guillotina_elasticsearch/tests/test_migration.py b/guillotina_elasticsearch/tests/test_migration.py
index bd85fda..4d10ef6 100644
--- a/guillotina_elasticsearch/tests/test_migration.py
+++ b/guillotina_elasticsearch/tests/test_migration.py
@@ -1,8 +1,8 @@
 from guillotina import task_vars
 from guillotina.component import get_adapter
-from guillotina.db.uid import get_short_uid
 from guillotina.component import get_utility
 from guillotina.component import globalregistry as gr
+from guillotina.db.uid import get_short_uid
 from guillotina.event import notify
 from guillotina.events import ObjectRemovedEvent
 from guillotina.interfaces import ICatalogUtility
@@ -24,12 +24,15 @@
 import random
 
 
-@pytest.mark.flaky(reruns=5)
+pytestmark = [pytest.mark.asyncio]
+
+
 async def _test_migrate_while_content_getting_added(es_requester):
     async with es_requester as requester:
         add_count = await add_content(requester)
 
         container, request, txn, tm = await setup_txn_on_container(requester)
+        task_vars.request.set(request)
 
         search = get_utility(ICatalogUtility)
         await search.refresh(container)
@@ -79,7 +82,6 @@ async def test_migrate_get_all_uids(es_requester):
         await tm.abort(txn=txn)
 
 
-@pytest.mark.flaky(reruns=5)
 async def test_removes_orphans(es_requester):
     async with es_requester as requester:
         container, request, txn, tm = await setup_txn_on_container(requester)
@@ -114,7 +116,6 @@ async def _test():
         await run_with_retries(_test, requester)
 
 
-# @pytest.mark.flaky(reruns=5)
 async def test_fixes_missing(es_requester):
     async with es_requester as requester:
         await add_content(requester, 2, 2)
@@ -145,7 +146,7 @@ def write(self, item):
                 responses.append(item)
 
         migrator = Migrator(search, container, force=True,
-                            request=request, response=Writer())
+                            response=Writer())
         await migrator.run_migration()
 
         assert migrator.status == 'done'
@@ -160,13 +161,12 @@ def write(self, item):
         assert old_index_name != await im.get_real_index_name()
 
 
-@pytest.mark.flaky(reruns=5)
 async def test_updates_index_data(es_requester):
     async with es_requester as requester:
         container, request, txn, tm = await setup_txn_on_container(requester)
 
         search = get_utility(ICatalogUtility)
-        migrator = Migrator(search, container, force=True, request=request)
+        migrator = Migrator(search, container, force=True)
         new_index_name = await migrator.create_next_index()
         migrator.work_index_name = new_index_name
 
@@ -203,7 +203,7 @@ async def test_updates_index_data(es_requester):
         await search.refresh(container, new_index_name)
         await asyncio.sleep(1)
         doc = await search.get_connection().get(
-            index=new_index_name, doc_type=DOC_TYPE, id=ob.uuid)
+            index=new_index_name, doc_type=DOC_TYPE, id=ob.__uuid__)
         assert doc['_source']['title'] == 'foobar-new'
 
 
@@ -213,7 +213,7 @@ async def test_calculate_mapping_diff(es_requester):
 
         search = get_utility(ICatalogUtility)
         index_manager = get_adapter(container, IIndexManager)
-        migrator = Migrator(search, container, force=True, request=request)
+        migrator = Migrator(search, container, force=True)
         new_index_name = await index_manager.start_migration()
         migrator.work_index_name = new_index_name
 
@@ -238,7 +238,7 @@ async def test_updates_index_name(es_requester):
         im = get_adapter(container, IIndexManager)
         existing_index = await im.get_real_index_name()
         assert await search.get_connection().indices.exists(existing_index)
-        migrator = Migrator(search, container, force=True, request=request)
+        migrator = Migrator(search, container, force=True)
         await migrator.run_migration()
         assert not await search.get_connection().indices.exists(existing_index)
         assert await search.get_connection().indices.exists(
@@ -257,7 +257,7 @@ async def test_moves_docs_over(es_requester):
         await asyncio.sleep(1)
         current_count = await search.get_doc_count(container)
 
-        migrator = Migrator(search, container, force=True, request=request)
+        migrator = Migrator(search, container, force=True)
         await migrator.run_migration()
 
         im = get_adapter(container, IIndexManager)
@@ -272,7 +272,7 @@ async def test_create_next_index(es_requester):
     async with es_requester as requester:
         container, request, txn, tm = await setup_txn_on_container(requester)
         search = get_utility(ICatalogUtility)
-        migrator = Migrator(search, container, force=True, request=request)
+        migrator = Migrator(search, container, force=True)
         name = await migrator.create_next_index()
         assert name == 'guillotina-db-guillotina_2'
 
@@ -320,7 +320,7 @@ async def test_migrator_emit_events_during_indexing(
         gr.base.adapters.subscribe(
             [IIndexProgress], None, event_handler.subscribe)
         migrator = Reindexer(
-            search, _marker, force=True, request=req, reindex_security=True
+            search, _marker, force=True, reindex_security=True
        )
         migrator.bulk_size = 0
         migrator.batch = {}
@@ -351,7 +351,7 @@ async def test_migrator_emmits_events_on_end(es_requester, event_handler):
         gr.base.adapters.subscribe(
             [IIndexProgress], None, event_handler.subscribe)
         migrator = Reindexer(
-            search, container, force=True, request=req, reindex_security=True
+            search, container, force=True, reindex_security=True
         )
 
         ob = await container.async_get('foobar')
@@ -369,7 +369,7 @@ async def test_search_works_on_new_docs_during_migration(es_requester):
         await add_content(requester, 2)
         container, request, txn, tm = await setup_txn_on_container(requester)
         search = get_utility(ICatalogUtility)
-        migrator = Migrator(search, container, force=True, request=request)
+        migrator = Migrator(search, container, force=True)
         im = get_adapter(container, IIndexManager)
         index_name = await im.get_index_name()
         next_index_name = await migrator.setup_next_index()
@@ -404,7 +404,7 @@ async def test_search_works_on_updated_docs_during_migration_when_missing(es_req
 
         container, request, txn, tm = await setup_txn_on_container(requester)
         search = get_utility(ICatalogUtility)
-        migrator = Migrator(search, container, force=True, request=request)
+        migrator = Migrator(search, container, force=True)
         im = get_adapter(container, IIndexManager)
         index_name = await im.get_index_name()
         next_index_name = await migrator.setup_next_index()
@@ -434,7 +434,7 @@ async def test_search_works_on_updated_docs_during_migration_when_present(es_req
     async with es_requester as requester:
         container, request, txn, tm = await setup_txn_on_container(requester)
         search = get_utility(ICatalogUtility)
-        migrator = Migrator(search, container, force=True, request=request)
+        migrator = Migrator(search, container, force=True)
         im = get_adapter(container, IIndexManager)
         index_name = await im.get_index_name()
         next_index_name = await migrator.setup_next_index()
@@ -462,12 +462,11 @@ async def _test():
         await run_with_retries(_test, requester)
 
 
-@pytest.mark.flaky(reruns=5)
 async def test_delete_in_both_during_migration(es_requester):
     async with es_requester as requester:
         container, request, txn, tm = await setup_txn_on_container(requester)
         search = get_utility(ICatalogUtility)
-        migrator = Migrator(search, container, force=True, request=request)
+        migrator = Migrator(search, container, force=True)
         im = get_adapter(container, IIndexManager)
         index_name = await im.get_index_name()
         next_index_name = await migrator.setup_next_index()
diff --git a/guillotina_elasticsearch/tests/test_parser.py b/guillotina_elasticsearch/tests/test_parser.py
index 097529f..2f164f2 100644
--- a/guillotina_elasticsearch/tests/test_parser.py
+++ b/guillotina_elasticsearch/tests/test_parser.py
@@ -1,8 +1,12 @@
 from guillotina_elasticsearch.parser import Parser
 from guillotina.tests import utils as test_utils
+import pytest
 
 
-async def test_es_field_parser(dummy_guillotina):
+pytestmark = [pytest.mark.asyncio]
+
+
+async def _test_es_field_parser(dummy_guillotina):
     content = test_utils.create_content()
     parser = Parser(None, content)
diff --git a/guillotina_elasticsearch/tests/test_schema.py b/guillotina_elasticsearch/tests/test_schema.py
index d29abcb..51fe459 100644
--- a/guillotina_elasticsearch/tests/test_schema.py
+++ b/guillotina_elasticsearch/tests/test_schema.py
@@ -5,6 +5,8 @@
 
 import pytest
 
+pytestmark = [pytest.mark.asyncio]
+
 
 class IA(Interface):
     index_field('item', field_mapping={'type': 'integer'})
diff --git a/guillotina_elasticsearch/tests/test_search.py b/guillotina_elasticsearch/tests/test_search.py
index c9326d1..4433bb4 100644
--- a/guillotina_elasticsearch/tests/test_search.py
+++ b/guillotina_elasticsearch/tests/test_search.py
@@ -2,6 +2,9 @@
 from guillotina_elasticsearch.tests.utils import setup_txn_on_container
 
 import json
+import pytest
+
+pytestmark = [pytest.mark.asyncio]
 
 
 async def test_indexing_and_search(es_requester):
@@ -30,8 +33,9 @@ async def _test():
                 '/db/guillotina/@search',
                 data=json.dumps({})
             )
-            assert resp['items_count'] == 1
-            assert resp['member'][0]['path'] == '/item1'
+            assert status == 200
+            assert resp['items_total'] == 1
+            assert resp['items'][0]['path'] == '/item1'
 
         await run_with_retries(_test, requester)
 
@@ -44,7 +48,7 @@ async def _test():
                 '/db/guillotina/@search',
                 data=json.dumps({})
             )
-            assert resp['items_count'] == 0
+            assert resp['items_total'] == 0
 
         await run_with_retries(_test, requester)
 
@@ -87,8 +91,8 @@ async def _test():
                 '/db/guillotina/@search',
                 data=json.dumps({})
             )
-            assert resp['items_count'] == 3
-            assert resp['member'][0]['@name']
+            assert resp['items_total'] == 3
+            assert resp['items'][0]['@name']
 
         await run_with_retries(_test, requester)
 
@@ -101,6 +105,6 @@ async def _test():
                 '/db/guillotina/@search',
                 data=json.dumps({})
             )
-            assert resp['items_count'] == 0
+            assert resp['items_total'] == 0
 
         await run_with_retries(_test, requester)
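The `@search` payload renames `items_count`/`member` to `items_total`/`items`, so any client consuming the endpoint needs a matching update. A tiny illustrative helper against the new shape:

```python
def first_path(search_response):
    # search_response is the JSON body returned by @search
    if search_response['items_total'] == 0:
        return None
    return search_response['items'][0]['path']
```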
diff --git a/guillotina_elasticsearch/tests/test_vacuum.py b/guillotina_elasticsearch/tests/test_vacuum.py
index 244379e..db620d0 100644
--- a/guillotina_elasticsearch/tests/test_vacuum.py
+++ b/guillotina_elasticsearch/tests/test_vacuum.py
@@ -14,11 +14,13 @@
 
 import pytest
 
+pytestmark = [pytest.mark.asyncio]
+
+
 DATABASE = os.environ.get('DATABASE', 'DUMMY')
 
 
 @pytest.mark.skipif(DATABASE == 'DUMMY', reason='Not for dummy db')
-@pytest.mark.flaky(reruns=5)
 async def test_adds_missing_elasticsearch_entry(es_requester):
     async with es_requester as requester:
         await add_content(requester)
@@ -59,7 +61,6 @@ async def ___test():
 
 
 @pytest.mark.skipif(DATABASE == 'DUMMY', reason='Not for dummy db')
-@pytest.mark.flaky(reruns=5)
 async def test_updates_out_of_data_es_entries(es_requester):
     async with es_requester as requester:
         await add_content(requester)
@@ -97,7 +98,6 @@ async def _test():
 
 
 @pytest.mark.skipif(DATABASE == 'DUMMY', reason='Not for dummy db')
-@pytest.mark.flaky(reruns=5)
 async def test_removes_orphaned_es_entry(es_requester):
     async with es_requester as requester:
         container, request, txn, tm = await setup_txn_on_container(requester)
@@ -132,7 +132,6 @@ async def __test():
 
 
 @pytest.mark.skipif(DATABASE == 'DUMMY', reason='Not for dummy db')
-@pytest.mark.flaky(reruns=5)
 async def test_vacuum_with_sub_indexes(es_requester):
     async with es_requester as requester:
         await add_content(
@@ -219,7 +218,6 @@ async def ___test():
 
 
 @pytest.mark.skipif(DATABASE == 'DUMMY', reason='Not for dummy db')
-@pytest.mark.flaky(reruns=3)
 async def test_reindexes_moved_content(es_requester):
     async with es_requester as requester:
         resp1, _ = await requester(
@@ -331,7 +329,6 @@ async def __test():
 
 
 @pytest.mark.skipif(DATABASE == 'DUMMY', reason='Not for dummy db')
-# @pytest.mark.flaky(reruns=3)
 async def test_vacuum_with_multiple_containers(es_requester):
     async with es_requester as requester:
diff --git a/guillotina_elasticsearch/tests/utils.py b/guillotina_elasticsearch/tests/utils.py
index cd5b9fa..9f254ad 100644
--- a/guillotina_elasticsearch/tests/utils.py
+++ b/guillotina_elasticsearch/tests/utils.py
@@ -48,8 +48,7 @@ async def setup_txn_on_container(requester, container_id='guillotina'):
     utils.login()
     request = utils.get_mocked_request(db=requester.db)
     task_vars.request.set(request)
-    container = await get_container(
-        requester=requester, container_id=container_id)
+    container = await get_container(requester=requester, container_id=container_id)  # noqa
     tm = task_vars.tm.get()
     txn = await tm.begin()
     return container, request, txn, tm
@@ -94,7 +93,8 @@ async def cleanup_es(es_host, prefix=''):
         try:
             await conn.indices.delete_alias(index, name)
             await conn.indices.delete(index)
-        except elasticsearch.exceptions.AuthorizationException:
+        except (elasticsearch.exceptions.AuthorizationException,
+                elasticsearch.exceptions.NotFoundError):
             pass
     for index in (await conn.cat.indices()).splitlines():
         _, _, index_name = index.split()[:3]
diff --git a/guillotina_elasticsearch/utility.py b/guillotina_elasticsearch/utility.py
index 6a09487..cbbc03f 100644
--- a/guillotina_elasticsearch/utility.py
+++ b/guillotina_elasticsearch/utility.py
@@ -6,7 +6,8 @@
 from guillotina.component import get_adapter
 from guillotina.component import get_utility
 from guillotina.event import notify
-from guillotina.interfaces import IContainer
+from guillotina.exceptions import RequestNotFound
+from guillotina_elasticsearch.exceptions import ElasticsearchConflictException
 from guillotina.interfaces import IFolder
 from guillotina.transactions import get_transaction
 from guillotina.utils import get_content_depth
@@ -16,6 +17,7 @@
 from guillotina.utils import merge_dicts
 from guillotina.utils import navigate_to
 from guillotina.utils import resolve_dotted_name
+from guillotina.utils.misc import get_current_container
 from guillotina_elasticsearch.events import SearchDoneEvent
 from guillotina_elasticsearch.exceptions import QueryErrorException
 from guillotina_elasticsearch.interfaces import DOC_TYPE
@@ -23,7 +25,6 @@
 from guillotina_elasticsearch.interfaces import IElasticSearchUtility  # noqa b/w compat import
 from guillotina_elasticsearch.interfaces import IIndexActive
 from guillotina_elasticsearch.interfaces import IIndexManager
-from guillotina_elasticsearch.interfaces import ParsedQueryInfo
 from guillotina_elasticsearch.utils import find_index_manager
 from guillotina_elasticsearch.utils import format_hit
 from guillotina_elasticsearch.utils import get_content_sub_indexes
@@ -132,14 +133,10 @@ async def initialize_catalog(self, container):
         index_name = await index_manager.get_index_name()
         real_index_name = await index_manager.get_real_index_name()
-
         await self.create_index(real_index_name, index_manager)
         conn = self.get_connection()
         await conn.indices.put_alias(
             name=index_name, index=real_index_name)
-        await conn.indices.close(real_index_name)
-
-        await conn.indices.open(real_index_name)
         await conn.cluster.health(
             wait_for_status='yellow')  # pylint: disable=E1123
@@ -154,7 +151,8 @@ async def create_index(self, real_index_name, index_manager,
         if mappings is None:
             mappings = await index_manager.get_mappings()
         settings = {
-            'settings': settings
+            'settings': settings,
+            'mappings': mappings,
         }
         settings['mappings'] = mappings
         conn = self.get_connection()
@@ -164,14 +162,12 @@ async def _delete_index(self, im):
         index_name = await im.get_index_name()
         real_index_name = await im.get_real_index_name()
         conn = self.get_connection()
-        await safe_es_call(conn.indices.close, real_index_name)
         await safe_es_call(
             conn.indices.delete_alias, real_index_name, index_name)
         await safe_es_call(conn.indices.delete, real_index_name)
         await safe_es_call(conn.indices.delete, index_name)
         migration_index = await im.get_migration_index_name()
         if migration_index:
-            await safe_es_call(conn.indices.close, migration_index)
             await safe_es_call(conn.indices.delete, migration_index)
 
     async def remove_catalog(self, container):
@@ -194,7 +190,7 @@ async def reindex_all_content(
             self, obj, security=False, response=noop_response, request=None):
         from guillotina_elasticsearch.reindex import Reindexer
         reindexer = Reindexer(self, obj, response=response,
-                              reindex_security=security, request=request)
+                              reindex_security=security)
         await reindexer.reindex(obj)
 
     async def _build_security_query(
@@ -239,28 +235,25 @@ def _get_items_from_result(self, container, request, result):
             items.append(data)
         return items
 
-    # async def search(self, container: IContainer, query: ParsedQueryInfo):
-    #     return await self.query(
-    #         container, query['query'],
-    #         size=query['size'], scroll=None, index=None)
-
-    async def search(
-            self, container, query_info: ParsedQueryInfo,
-            size=10, request=None, scroll=None, index=None):
+    async def search_raw(
+            self, container, query,
+            doc_type=None, size=10, request=None, scroll=None, index=None):
         """
-        transform into query...
-        right now, it's just passing through into elasticsearch
+        Search raw query
         """
         if index is None:
             index = await self.get_container_index_name(container)
         t1 = time.time()
         if request is None:
-            request = get_current_request()
-        query = query_info['query']
-        q = await self._build_security_query(
-            container, query, size, scroll)
+            try:
+                request = get_current_request()
+            except RequestNotFound:
+                pass
+
+        q = await self._build_security_query(container, query, size, scroll)
         q['ignore_unavailable'] = True
+        logger.debug("Generated query %s", json.dumps(query))
         conn = self.get_connection()
         result = await conn.search(index=index, **q)
@@ -272,8 +265,8 @@
             raise QueryErrorException(reason=error_message)
         items = self._get_items_from_result(container, request, result)
         final = {
-            'items_count': result['hits']['total']['value'],
-            'member': items
+            'items_total': result['hits']['total']['value'],
+            'items': items
         }
         if 'aggregations' in result:
             final['aggregations'] = result['aggregations']
@@ -298,7 +291,7 @@ async def get_by_uuid(self, container, uuid):
                 }
             }
         }
-        return await self.query(container, query, container)
+        return await self.search_raw(container, query, container)
 
     async def get_by_uuids(self, container, uuids, doc_type=None):
         uuid_query = self._get_type_query(doc_type)
@@ -307,14 +300,14 @@ async def get_by_uuids(self, container, uuids, doc_type=None):
             "terms": {"uuid": uuids}
         })
 
-        return await self.query(container, uuid_query)
+        return await self.search_raw(container, uuid_query)
 
     async def get_object_by_uuid(self, container, uuid):
         result = await self.get_by_uuid(container, uuid)
-        if result['items_count'] == 0 or result['items_count'] > 1:
+        if result['items_total'] == 0 or result['items_total'] > 1:
             raise AttributeError('Not found a unique object')
 
-        path = result['members'][0]['path']
+        path = result['items'][0]['path']
         obj = await navigate_to(container, path)
         return obj
 
@@ -400,9 +393,8 @@ async def unindex_all_children(self, container, resource,
                 b'Removing all children of %s' % content_path.encode('utf-8'))
         # use future here because this can potentially take some
        # time to clean up indexes, etc
-        asyncio.ensure_future(
-            self.call_unindex_all_children(
-                container, index_name, content_path))
+        await self.call_unindex_all_children(
+            container, index_name, content_path)
 
     @backoff.on_exception(
         backoff.constant,
@@ -431,18 +423,29 @@ async def call_unindex_all_children(self, container, index_name,
                 pass
 
         path_query = await self.get_path_query(content_path)
+        await self._delete_by_query(path_query, index_name)
+
+    @backoff.on_exception(
+        backoff.constant, (ElasticsearchConflictException,),
+        interval=0.5, max_tries=5)
+    async def _delete_by_query(self, path_query, index_name):
+        conn = self.get_connection()
         conn_es = await conn.transport.get_connection()
         async with conn_es.session.post(
                 join(conn_es.base_url.human_repr(),
                      index_name, '_delete_by_query'),
                 data=json.dumps(path_query),
                 params={
-                    'ignore_unavailable': 'true'
+                    'ignore_unavailable': 'true',
+                    'conflicts': 'proceed'
                 },
                 headers={
                     'Content-Type': 'application/json'
                 }) as resp:
             result = await resp.json()
+            if result['version_conflicts'] > 0:
+                raise ElasticsearchConflictException(
+                    result['version_conflicts'], resp)
             if 'deleted' in result:
                 logger.debug(f'Deleted {result["deleted"]} children')
                 logger.debug(f'Deleted {json.dumps(path_query)}')
@@ -451,11 +454,11 @@ async def call_unindex_all_children(self, container, index_name,
 
     async def update_by_query(self, query, context=None, indexes=None):
         if indexes is None:
-            request = get_current_request()
-            indexes = await self.get_current_indexes(request.container)
+            container = get_current_container()
+            indexes = await self.get_current_indexes(container)
         if context is not None:
             for index in await get_content_sub_indexes(
-                    request.container, get_content_path(context)):
+                    container, get_content_path(context)):
                 indexes.append(index['index'])
         return await self._update_by_query(query, ','.join(indexes))
 
@@ -591,10 +594,13 @@ async def index(self, container, datas, response=noop_response,
     def _get_current_tid(self):
         # make sure to get current committed tid or we may be one-behind
         # for what was actually used to commit to db
-        txn = get_transaction()
         tid = None
-        if txn:
-            tid = txn._tid
+        try:
+            txn = get_transaction()
+            if txn:
+                tid = txn._tid
+        except RequestNotFound:
+            pass
         return tid
 
     async def update(self, container, datas, response=noop_response,
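`search_raw()` replaces the old `search()` passthrough and returns the renamed response keys. A hedged usage sketch; the container object comes from guillotina at runtime and the query body is illustrative:

```python
from guillotina.component import get_utility
from guillotina.interfaces import ICatalogUtility


async def find_items(container):
    search = get_utility(ICatalogUtility)
    # any raw ES query dict can be passed through; security filters are
    # still applied on top of it by the utility
    result = await search.search_raw(
        container, {'query': {'term': {'type_name': 'Item'}}}, size=10)
    return result['items_total'], result['items']
```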
diff --git a/guillotina_elasticsearch/utils.py b/guillotina_elasticsearch/utils.py
index a344256..b491ea7 100644
--- a/guillotina_elasticsearch/utils.py
+++ b/guillotina_elasticsearch/utils.py
@@ -6,7 +6,7 @@
 from guillotina.content import IResourceFactory
 from guillotina.interfaces import ICatalogUtility
 from guillotina.schema.interfaces import ICollection
-from guillotina.utils import get_current_request
+from guillotina.utils.misc import get_current_container
 from guillotina_elasticsearch.interfaces import IIndexActive
 from guillotina_elasticsearch.interfaces import IIndexManager
 from guillotina_elasticsearch.interfaces import SUB_INDEX_SEPERATOR
@@ -81,6 +81,7 @@ async def get_content_sub_indexes(container, path=None):
     im = get_adapter(container, IIndexManager)
     index_name = await im.get_index_name()
     query = {
+        "size": 50,
         "query": {
             "constant_score": {
                 "filter": {
@@ -106,16 +107,25 @@ async def get_content_sub_indexes(container, path=None):
                 "depth": {"gte": path.count('/') + 1}
             }
         })
-    results = await search.get_connection().search(
+    conn = search.get_connection()
+    q_result = await conn.search(
         index=index_name, _source=False,
-        stored_fields='elastic_index,path', body=query)
-    indexes = []
-    for item in results['hits']['hits']:
-        indexes.append({
+        stored_fields='elastic_index,path', body=query,
+        scroll="1m")
+    indexes = [{
+        'path': item['fields']['path'][0],
+        'oid': item['_id'],
+        'index': item['fields']['elastic_index'][0]
+    } for item in q_result['hits']['hits']]
+
+    if len(q_result["hits"]["hits"]) >= 50:
+        q_result = await conn.scroll(
+            scroll_id=q_result['_scroll_id'], scroll="1m")
+        [indexes.append({
             'path': item['fields']['path'][0],
             'oid': item['_id'],
             'index': item['fields']['elastic_index'][0]
-        })
+        }) for item in q_result['hits']['hits']]
     return indexes
 
@@ -127,13 +137,11 @@ async def get_all_indexes_identifier(container=None, index_manager=None):
         index_name, index_name, SUB_INDEX_SEPERATOR)
 
 
-async def get_index_for(context, container=None, request=None):
+async def get_index_for(context, container=None):
     im = find_index_manager(parent=context)
     if im is None:
         if container is None:
-            if request is None:
-                request = get_current_request()
-            container = request.container
+            container = get_current_container()
         im = get_adapter(container, IIndexManager)
     return await im.get_index_name()
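The sub-index lookup above follows the scroll cursor one extra page when the first page is full (50 hits). A generic drain-the-scroll loop, for comparison, keeps following the cursor until it is exhausted; a sketch, assuming the same aioelasticsearch-style `conn`:

```python
async def scroll_all(conn, index_name, body, page_size=50):
    # Keep following the scroll cursor until a page comes back empty.
    hits = []
    result = await conn.search(
        index=index_name, _source=False,
        stored_fields='elastic_index,path',
        body=dict(body, size=page_size), scroll='1m')
    while result['hits']['hits']:
        hits.extend(result['hits']['hits'])
        result = await conn.scroll(
            scroll_id=result['_scroll_id'], scroll='1m')
    return hits
```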
diff --git a/setup.py b/setup.py
index 667d1c5..107f9b6 100644
--- a/setup.py
+++ b/setup.py
@@ -4,16 +4,13 @@
 
 
 test_requires = [
-    'pytest>=3.6',
-    'docker',
-    'backoff',
-    'psycopg2-binary',
+    'async_asgi_testclient',
+    'pytest>=5.0',
     'pytest-asyncio',
     'coverage',
     'pytest-cov',
-    'pytest-aiohttp',
-    'pytest-rerunfailures',
-    'pytest-docker-fixtures>=1.3.0'
+    'pytest-docker-fixtures>=1.3.0',
+    'pytest-docker-fixtures[pg]'
 ]
 
 
@@ -27,25 +24,25 @@
     long_description=(open('README.rst').read() + '\n' +
                       open('CHANGELOG.rst').read()),
     classifiers=[
-        'Programming Language :: Python :: 3.6',
+        'Programming Language :: Python :: 3.7',
+        'Programming Language :: Python :: 3.8',
         'Topic :: Software Development :: Libraries :: Python Modules',
     ],
-    url='https://pypi.python.org/pypi/guillotina_elasticsearch',
+    url='https://github.com/plone/guillotina_elasticsearch',
     license='GPL version 3',
     setup_requires=[
         'pytest-runner',
     ],
     zip_safe=True,
     include_package_data=True,
+    package_data={"": ["*.txt", "*.rst"], "guillotina_elasticsearch": ["py.typed"]},
     packages=find_packages(exclude=['ez_setup']),
     install_requires=[
-        'guillotina>=5.0.0a3',
+        'guillotina>=6.0.0a16',
         'mypy_extensions',
-        'aioelasticsearch',
-        'ujson',
+        'aioelasticsearch<0.7.0',
         'lru-dict',
         'backoff',
-        'asyncpg'
     ],
     tests_require=test_requires,
     extras_require={