Merge pull request #66 from plone/3.3.x-elastic7-to-master
Support Elasticsearch 7.0 and Guillotina 6
masipcat authored Mar 12, 2020
2 parents df5d37c + 8af23db commit 9ea4194
Showing 30 changed files with 340 additions and 261 deletions.
8 changes: 4 additions & 4 deletions .travis.yml
@@ -2,11 +2,11 @@
dist: xenial
language: python
python:
- "3.7"
- "3.7"
sudo: required
env:
- DATABASE=DUMMY
- DATABASE=postgresql
- ES_VERSION=7
- ES_VERSION=7 DATABASE=postgresql

services:
- postgresql
@@ -23,7 +23,7 @@ cache:
- eggs
install:
- pip install flake8 codecov mypy_extensions
- pip install git+https://github.com/plone/guillotina.git@master
- pip install -e .
- pip install -e .[test]
script:
- flake8 guillotina_elasticsearch --config=setup.cfg
60 changes: 58 additions & 2 deletions CHANGELOG.rst
@@ -1,7 +1,63 @@
5.0.1 (unreleased)
6.0.0 (unreleased)
------------------

- Nothing changed yet.
- Support Guillotina 6
[masipcat]

- Support Elasticsearch 7.0
[jordic]

- Make sure to save sub index changes in ES
[vangheem]

- Fix default index settings
[vangheem]

- Pinned aioelasticsearch to <0.6.0
[masipcat]

- Be able to import types
[vangheem]

- Retry conflict errors on delete by query

- Pay attention to trashed objects in pg
- Fix commands using missing attribute `self.request`

- ISecurityInfo can be async

- Fix not iterating over all content indexes in Elasticsearch
[vangheem]

- build_security_query(): changed 'query.bool.filter' to use a list instead of a single object (see the sketch after this diff)
[masipcat]

- Fix release

- Missing pg conn lock with vacuuming
[vangheem]

- Pass request on the index progress when possible

- Fix release

- Do not require request object for vacuuming
[vangheem]

- G5 support
[vangheem]

- Do not close indexes on create/delete
[vangheem]

- Handle another index not found error on vacuum
[vangheem]

- Improve logging
[vangheem]

- Handle index not found error
[vangheem]


5.0.0 (2019-10-21)
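For reference, the build_security_query() entry above changes the shape of the security filter: 'query.bool.filter' is now a list rather than a single object, so callers can append further clauses. A minimal sketch of the two shapes, assuming an illustrative 'access_roles' field (not necessarily the utility's exact output):

    # Before: 'filter' held a single clause object
    old_shape = {
        'query': {'bool': {
            'filter': {'terms': {'access_roles': ['guillotina.Reader']}}
        }}
    }

    # After: 'filter' is a list, so additional clauses can be appended
    new_shape = {
        'query': {'bool': {
            'filter': [{'terms': {'access_roles': ['guillotina.Reader']}}]
        }}
    }
    new_shape['query']['bool']['filter'].append(
        {'term': {'type_name': 'Folder'}})
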
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
5.0.1.dev0
6.0.0.dev0
23 changes: 0 additions & 23 deletions config-opendistro.json

This file was deleted.

81 changes: 38 additions & 43 deletions guillotina_elasticsearch/commands/vacuum.py
@@ -5,7 +5,7 @@
from guillotina.component import get_utility
from guillotina.db import ROOT_ID
from guillotina.db import TRASHED_ID
from guillotina.db.reader import reader
from guillotina.utils import get_object_by_uid
from guillotina.interfaces import ICatalogUtility
from guillotina.tests.utils import get_mocked_request
from guillotina.tests.utils import login
@@ -19,14 +19,18 @@

import aioelasticsearch
import asyncio
import elasticsearch
import json
import logging


logger = logging.getLogger('guillotina_elasticsearch_vacuum')

GET_CONTAINERS = 'select zoid from {objects_table} where parent_id = $1'
SELECT_BY_KEYS = '''SELECT zoid from {objects_table} where zoid = ANY($1)'''
SELECT_BY_KEYS = f'''
SELECT zoid from {{objects_table}}
where zoid = ANY($1) AND parent_id != '{TRASHED_ID}'
'''
GET_CHILDREN_BY_PARENT = """
SELECT zoid, parent_id, tid
FROM {objects_table}
@@ -36,10 +40,10 @@

PAGE_SIZE = 1000

GET_OBS_BY_TID = """
GET_OBS_BY_TID = f"""
SELECT zoid, parent_id, tid
FROM {objects_table}
WHERE of is NULL
FROM {{objects_table}}
WHERE of is NULL and parent_id != '{TRASHED_ID}'
ORDER BY tid ASC, zoid ASC
"""

@@ -95,14 +99,17 @@ async def iter_batched_es_keys(self):
indexes.append(index['index'])

for index_name in indexes:
result = await self.conn.search(
index=index_name,
scroll='15m',
size=PAGE_SIZE,
_source=False,
body={
"sort": ["_doc"]
})
try:
result = await self.conn.search(
index=index_name,
scroll='15m',
size=PAGE_SIZE,
_source=False,
body={
"sort": ["_doc"]
})
except elasticsearch.exceptions.NotFoundError:
continue
yield [r['_id'] for r in result['hits']['hits']], index_name
scroll_id = result['_scroll_id']
while scroll_id:
@@ -161,33 +168,15 @@ async def get_object(self, oid):
if oid in self.cache:
return self.cache[oid]

try:
result = self.txn._manager._hard_cache.get(oid, None)
except AttributeError:
from guillotina.db.transaction import HARD_CACHE # noqa
result = HARD_CACHE.get(oid, None)
if result is None:
result = await self.txn._cache.get(oid=oid)

if result is None:
result = await self.tm._storage.load(self.txn, oid)

obj = reader(result)
obj.__txn__ = self.txn
if result['parent_id']:
obj.__parent__ = await self.get_object(result['parent_id'])
return obj
return await get_object_by_uid(oid)

async def process_missing(self, oid, index_type='missing', folder=False):
# need to fill in parents in order for indexing to work...
logger.warning(f'Index {index_type} {oid}')
try:
obj = await self.get_object(oid)
except KeyError:
except (AttributeError, KeyError, TypeError, ModuleNotFoundError):
logger.warning(f'Could not find {oid}')
return
except (AttributeError, TypeError, ModuleNotFoundError):
logger.warning(f'Could not find {oid}', exc_info=True)
return # object or parent of object was removed, ignore
try:
if folder:
@@ -302,17 +291,23 @@ async def check_missing(self):
async for batch in self.iter_paged_db_keys([self.container.__uuid__]):
oids = [r['zoid'] for r in batch]
indexes = self.get_indexes_for_oids(oids)
results = await self.conn.search(
','.join(indexes), body={
'query': {
'terms': {
'uuid': oids
try:
results = await self.conn.search(
index=','.join(indexes),
body={
'query': {
'terms': {
'uuid': oids
}
}
}
},
_source=False,
stored_fields='tid,parent_uuid',
size=PAGE_SIZE)
},
_source=False,
stored_fields='tid,parent_uuid',
size=PAGE_SIZE)
except elasticsearch.exceptions.NotFoundError:
logger.warning(
f'Error searching index: {indexes}', exc_info=True)
continue

es_batch = {}
for result in results['hits']['hits']:
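For context on the hunks above (the diff view truncates them): the vacuum pages through every document id with the Elasticsearch scroll API, and the change wraps the initial search in a NotFoundError guard so a concurrently deleted (sub)index is skipped rather than aborting the run. A minimal sketch of the pattern, assuming an aioelasticsearch connection; the function name is illustrative, not the module's API:

    import elasticsearch

    PAGE_SIZE = 1000

    async def iter_index_ids(conn, index_name):
        # Page through all document ids in one index via the scroll API
        try:
            result = await conn.search(
                index=index_name, scroll='15m', size=PAGE_SIZE,
                _source=False, body={'sort': ['_doc']})
        except elasticsearch.exceptions.NotFoundError:
            return  # index vanished mid-run: nothing to vacuum here
        while result['hits']['hits']:
            yield [hit['_id'] for hit in result['hits']['hits']]
            result = await conn.scroll(
                scroll_id=result['_scroll_id'], scroll='15m')
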
2 changes: 1 addition & 1 deletion guillotina_elasticsearch/events.py
@@ -35,7 +35,7 @@ class IIndexProgress(Interface):
@implementer(IIndexProgress)
class IndexProgress(object):

def __init__(self, request, context, processed, total, completed=None):
def __init__(self, context, processed, total, completed=None, request=None): # noqa
self.request = request
self.context = context
self.processed = processed
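Note the reordered signature: context now comes first and request is optional, matching the changelog entries 'Do not require request object for vacuuming' and 'Pass request on the index progress when possible'. A sketch of constructing the event under the new signature (the values are illustrative):

    from guillotina_elasticsearch.events import IndexProgress

    # request-less construction, e.g. from the vacuum command
    event = IndexProgress(container, processed=500, total=10000)

    # when a request is available, pass it explicitly by keyword
    event = IndexProgress(
        container, processed=500, total=10000, request=request)
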
7 changes: 7 additions & 0 deletions guillotina_elasticsearch/exceptions.py
@@ -3,3 +3,10 @@

class QueryErrorException(HTTPException):
status_code = 488


class ElasticsearchConflictException(Exception):
def __init__(self, conflicts, resp):
self.conflicts = conflicts
self.response = resp
super().__init__(f"{self.conflicts} on ES request")
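
The new exception carries the conflict count and the raw ES response, supporting the changelog entry 'Retry conflict errors on delete by query'. A hedged sketch of how a caller might raise it from a delete-by-query response and retry; the helpers are illustrative, not the package's API, while conflicts='proceed' and the 'version_conflicts' counter are standard Elasticsearch delete-by-query behavior:

    from guillotina_elasticsearch.exceptions import ElasticsearchConflictException

    async def delete_by_query(conn, index_name, query):
        # with conflicts='proceed', ES reports conflicts instead of failing
        resp = await conn.delete_by_query(
            index=index_name, body=query, conflicts='proceed')
        if resp.get('version_conflicts'):
            raise ElasticsearchConflictException(
                resp['version_conflicts'], resp)
        return resp

    async def delete_by_query_with_retries(conn, index_name, query, attempts=3):
        # retry version conflicts a few times before giving up
        for attempt in range(attempts):
            try:
                return await delete_by_query(conn, index_name, query)
            except ElasticsearchConflictException:
                if attempt == attempts - 1:
                    raise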