diff --git a/inbox/actions/__init__.py b/inbox/actions/__init__.py index f739a270a..ee740e82a 100644 --- a/inbox/actions/__init__.py +++ b/inbox/actions/__init__.py @@ -31,7 +31,6 @@ from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound from inbox.models import Account, Message, Thread -from inbox.models.session import session_scope from inbox.models.action_log import schedule_action from inbox.sendmail.base import (generate_attachments, get_sendmail_client, SendMailException) @@ -134,23 +133,21 @@ def save_draft(account_id, message_id, db_session): def delete_draft(account_id, draft_id, db_session, args): """ Delete a draft from the remote backend. """ inbox_uid = args.get('inbox_uid') + account = db_session.query(Account).get(account_id) - with session_scope(ignore_soft_deletes=False) as db_session: - account = db_session.query(Account).get(account_id) - - # Non-Inbox created draft, therefore standard delete - if inbox_uid is None: - draft = db_session.query(Message).get(draft_id) - remote_delete = \ - module_registry[account.provider].remote_delete - remote_delete(account, account.drafts_folder.name, - draft.thread_id, db_session) - # Inbox created draft, therefore use X-INBOX header - else: - remote_delete_draft = \ - module_registry[account.provider].remote_delete_draft - remote_delete_draft(account, account.drafts_folder.name, - inbox_uid, db_session) + # Non-Inbox created draft, therefore standard delete + if inbox_uid is None: + draft = db_session.query(Message).get(draft_id) + remote_delete = \ + module_registry[account.provider].remote_delete + remote_delete(account, account.drafts_folder.name, + draft.thread_id, db_session) + # Inbox created draft, therefore use X-INBOX header + else: + remote_delete_draft = \ + module_registry[account.provider].remote_delete_draft + remote_delete_draft(account, account.drafts_folder.name, + inbox_uid, db_session) def send_directly(account_id, draft_id, db_session): diff --git a/inbox/transactions/actions.py b/inbox/transactions/actions.py index ce32b22ad..d0e87da24 100644 --- a/inbox/transactions/actions.py +++ b/inbox/transactions/actions.py @@ -5,7 +5,9 @@ * Make this more robust across multiple machines. If you started two instances talking to the same database backend things could go really badly. """ +from collections import defaultdict import gevent +from gevent.coros import BoundedSemaphore from sqlalchemy import asc from inbox.util.concurrency import retry_with_logging @@ -43,14 +45,18 @@ } +CONCURRENCY_LIMIT = 3 + + class SyncbackService(gevent.Greenlet): """Asynchronously consumes the action log and executes syncback actions.""" - def __init__(self, poll_interval=1, chunk_size=100, max_pool_size=22): + def __init__(self, poll_interval=1, chunk_size=100): + semaphore_factory = lambda: BoundedSemaphore(CONCURRENCY_LIMIT) + self.semaphore_map = defaultdict(semaphore_factory) self.keep_running = True self.running = False self.log = logger.new(component='syncback') - self.worker_pool = gevent.pool.Pool(max_pool_size) self.poll_interval = poll_interval self.chunk_size = chunk_size self._scheduled_actions = set() @@ -72,15 +78,15 @@ def _process_log(self): namespace = db_session.query(Namespace). \ get(log_entry.namespace_id) self._scheduled_actions.add(log_entry.id) - worker = SyncbackWorker(action_function, log_entry.id, - log_entry.record_id, - namespace.account_id, - syncback_service=self, - extra_args=log_entry.extra_args) self.log.info('delegating action', action_id=log_entry.id, msg=log_entry.action) - self.worker_pool.start(worker) + semaphore = self.semaphore_map[(namespace.account_id, + log_entry.action)] + gevent.spawn(syncback_worker, semaphore, action_function, + log_entry.id, log_entry.record_id, + namespace.account_id, syncback_service=self, + extra_args=log_entry.extra_args) def remove_from_schedule(self, log_entry_id): self._scheduled_actions.discard(log_entry_id) @@ -117,46 +123,32 @@ def stop(self): gevent.sleep() -class SyncbackWorker(gevent.Greenlet): - """A greenlet spawned to execute a single syncback action.""" - def __init__(self, func, action_log_id, record_id, account_id, - syncback_service, retry_interval=30, extra_args=None): - self.func = func - self.action_log_id = action_log_id - self.record_id = record_id - self.account_id = account_id - self.syncback_service = syncback_service - self.retry_interval = retry_interval - self.extra_args = extra_args - - self.log = logger.new(record_id=record_id, action_log_id=action_log_id, - action=self.func, account_id=self.account_id, - extra_args=extra_args) - gevent.Greenlet.__init__(self) - - def _run(self): - # Not ignoring soft-deleted objects here because if you, say, delete a - # draft, we still need to access the object to delete it on the remote. - with session_scope(ignore_soft_deletes=False) as db_session: +def syncback_worker(semaphore, func, action_log_id, record_id, account_id, + syncback_service, retry_interval=30, extra_args=None): + with semaphore: + log = logger.new(record_id=record_id, action_log_id=action_log_id, + action=func, account_id=account_id, + extra_args=extra_args) + # Not ignoring soft-deleted objects here because if you, say, + # delete a draft, we still need to access the object to delete it + # on the remote. try: - if self.extra_args: - self.func(self.account_id, self.record_id, db_session, - self.extra_args) - else: - self.func(self.account_id, self.record_id, db_session) + with session_scope(ignore_soft_deletes=False) as db_session: + if extra_args: + func(account_id, record_id, db_session, extra_args) + else: + func(account_id, record_id, db_session) + action_log_entry = db_session.query(ActionLog).get( + action_log_id) + action_log_entry.executed = True + db_session.commit() + log.info('syncback action completed', + action_id=action_log_id) + syncback_service.remove_from_schedule(action_log_id) except Exception: - log_uncaught_errors(self.log) + log_uncaught_errors(log) # Wait for a bit, then remove the log id from the scheduled set # so that it can be retried. - gevent.sleep(self.retry_interval) - self.syncback_service.remove_from_schedule(self.action_log_id) + gevent.sleep(retry_interval) + syncback_service.remove_from_schedule(action_log_id) raise - else: - action_log_entry = db_session.query(ActionLog).get( - self.action_log_id) - action_log_entry.executed = True - db_session.commit() - - self.log.info('syncback action completed', - action_id=self.action_log_id) - self.syncback_service.remove_from_schedule(self.action_log_id)