Improve syncback concurrency limit.
Summary:
There's obviously more work to be done here, but in the meantime this commit
removes the global limit on the number of syncback actions that can be tried
concurrently. Instead, it bounds the number of concurrent syncback greenlets
for each (account_id, action_type) pair.

The reason for this is that previously, if, say, archiving was broken for a
particular account but a bunch of archive actions were outstanding for it,
the retrying workers could fill the global pool and the entire syncback
service could grind to a halt. Not ideal.
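
To make the new scheme concrete, here is a minimal, self-contained sketch of
the same pattern: one semaphore per (account_id, action_type) key, created
lazily in a defaultdict. The run_action function and the sample keys are
hypothetical stand-ins, and modern gevent spells the import gevent.lock
rather than the gevent.coros path used in the diff below.

from collections import defaultdict

import gevent
from gevent.lock import BoundedSemaphore  # gevent.coros in 2014-era gevent

CONCURRENCY_LIMIT = 3

# One semaphore per (account_id, action_type), created on first use. Each
# key admits at most CONCURRENCY_LIMIT greenlets at a time, so a wedged
# action type on one account only backs up its own bucket.
semaphore_map = defaultdict(lambda: BoundedSemaphore(CONCURRENCY_LIMIT))


def run_action(account_id, action_type, n):
    # Hypothetical stand-in for a syncback action.
    with semaphore_map[(account_id, action_type)]:
        gevent.sleep(0.1)  # stand-in for the remote API call
        print(account_id, action_type, n, 'done')


# Ten 'archive' actions for account 1 run at most three at a time, while a
# 'save_draft' action for account 2 proceeds immediately.
greenlets = [gevent.spawn(run_action, 1, 'archive', n) for n in range(10)]
greenlets.append(gevent.spawn(run_action, 2, 'save_draft', 0))
gevent.joinall(greenlets)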

Smaller associated changes:
 * Replace the SyncbackWorker greenlet class with a plain function.
 * Hoist sleeping-before-retrying out of the database session scope (see
   the sketch after this list).
 * Remove nested database session in delete_draft.
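
The sleep-hoisting item deserves a sketch: before this change the worker
slept through the whole retry interval inside session_scope, pinning a
database session for 30 seconds per failed action. A schematic of the
reordering, with a stub session_scope standing in for
inbox.models.session.session_scope and a caller-supplied func:

from contextlib import contextmanager

import gevent


@contextmanager
def session_scope():
    # Stub for inbox.models.session.session_scope: yields a session and
    # returns its connection to the pool on exit.
    try:
        yield object()
    finally:
        pass  # connection released here


def worker(func, retry_interval=30):
    # After the refactor the session scope sits inside the try block, so
    # the connection is released before the retry sleep instead of being
    # held for retry_interval seconds, as happened when the sleep ran
    # inside the scope.
    try:
        with session_scope() as db_session:
            func(db_session)
    except Exception:
        gevent.sleep(retry_interval)  # no session held while sleeping
        raise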

Test Plan: Run some syncback actions.

Reviewers: charles

Reviewed By: charles

Differential Revision: https://review.inboxapp.com/D440
emfree authored and Charles Gruenwald committed Sep 9, 2014
1 parent 51e8251 commit d8e9927
Showing 2 changed files with 52 additions and 63 deletions.
31 changes: 14 additions & 17 deletions inbox/actions/__init__.py

@@ -31,7 +31,6 @@
 from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound
 
 from inbox.models import Account, Message, Thread
-from inbox.models.session import session_scope
 from inbox.models.action_log import schedule_action
 from inbox.sendmail.base import (generate_attachments, get_sendmail_client,
                                  SendMailException)
@@ -134,23 +133,21 @@ def save_draft(account_id, message_id, db_session):
 def delete_draft(account_id, draft_id, db_session, args):
     """ Delete a draft from the remote backend. """
     inbox_uid = args.get('inbox_uid')
+    account = db_session.query(Account).get(account_id)
 
-    with session_scope(ignore_soft_deletes=False) as db_session:
-        account = db_session.query(Account).get(account_id)
-
-        # Non-Inbox created draft, therefore standard delete
-        if inbox_uid is None:
-            draft = db_session.query(Message).get(draft_id)
-            remote_delete = \
-                module_registry[account.provider].remote_delete
-            remote_delete(account, account.drafts_folder.name,
-                          draft.thread_id, db_session)
-        # Inbox created draft, therefore use X-INBOX header
-        else:
-            remote_delete_draft = \
-                module_registry[account.provider].remote_delete_draft
-            remote_delete_draft(account, account.drafts_folder.name,
-                                inbox_uid, db_session)
+    # Non-Inbox created draft, therefore standard delete
+    if inbox_uid is None:
+        draft = db_session.query(Message).get(draft_id)
+        remote_delete = \
+            module_registry[account.provider].remote_delete
+        remote_delete(account, account.drafts_folder.name,
+                      draft.thread_id, db_session)
+    # Inbox created draft, therefore use X-INBOX header
+    else:
+        remote_delete_draft = \
+            module_registry[account.provider].remote_delete_draft
+        remote_delete_draft(account, account.drafts_folder.name,
+                            inbox_uid, db_session)
 
 
 def send_directly(account_id, draft_id, db_session):
84 changes: 38 additions & 46 deletions inbox/transactions/actions.py

@@ -5,7 +5,9 @@
 * Make this more robust across multiple machines. If you started two instances
   talking to the same database backend things could go really badly.
 """
+from collections import defaultdict
 import gevent
+from gevent.coros import BoundedSemaphore
 from sqlalchemy import asc
 
 from inbox.util.concurrency import retry_with_logging
@@ -43,14 +45,18 @@
 }
 
 
+CONCURRENCY_LIMIT = 3
+
+
 class SyncbackService(gevent.Greenlet):
     """Asynchronously consumes the action log and executes syncback actions."""
 
-    def __init__(self, poll_interval=1, chunk_size=100, max_pool_size=22):
+    def __init__(self, poll_interval=1, chunk_size=100):
+        semaphore_factory = lambda: BoundedSemaphore(CONCURRENCY_LIMIT)
+        self.semaphore_map = defaultdict(semaphore_factory)
         self.keep_running = True
         self.running = False
         self.log = logger.new(component='syncback')
-        self.worker_pool = gevent.pool.Pool(max_pool_size)
         self.poll_interval = poll_interval
         self.chunk_size = chunk_size
         self._scheduled_actions = set()
@@ -72,15 +78,15 @@ def _process_log(self):
                 namespace = db_session.query(Namespace). \
                     get(log_entry.namespace_id)
                 self._scheduled_actions.add(log_entry.id)
-                worker = SyncbackWorker(action_function, log_entry.id,
-                                        log_entry.record_id,
-                                        namespace.account_id,
-                                        syncback_service=self,
-                                        extra_args=log_entry.extra_args)
                 self.log.info('delegating action',
                               action_id=log_entry.id,
                               msg=log_entry.action)
-                self.worker_pool.start(worker)
+                semaphore = self.semaphore_map[(namespace.account_id,
+                                                log_entry.action)]
+                gevent.spawn(syncback_worker, semaphore, action_function,
+                             log_entry.id, log_entry.record_id,
+                             namespace.account_id, syncback_service=self,
+                             extra_args=log_entry.extra_args)
 
     def remove_from_schedule(self, log_entry_id):
         self._scheduled_actions.discard(log_entry_id)
@@ -117,46 +123,32 @@ def stop(self):
         gevent.sleep()
 
 
-class SyncbackWorker(gevent.Greenlet):
-    """A greenlet spawned to execute a single syncback action."""
-    def __init__(self, func, action_log_id, record_id, account_id,
-                 syncback_service, retry_interval=30, extra_args=None):
-        self.func = func
-        self.action_log_id = action_log_id
-        self.record_id = record_id
-        self.account_id = account_id
-        self.syncback_service = syncback_service
-        self.retry_interval = retry_interval
-        self.extra_args = extra_args
-
-        self.log = logger.new(record_id=record_id, action_log_id=action_log_id,
-                              action=self.func, account_id=self.account_id,
-                              extra_args=extra_args)
-        gevent.Greenlet.__init__(self)
-
-    def _run(self):
-        # Not ignoring soft-deleted objects here because if you, say, delete a
-        # draft, we still need to access the object to delete it on the remote.
-        with session_scope(ignore_soft_deletes=False) as db_session:
+def syncback_worker(semaphore, func, action_log_id, record_id, account_id,
+                    syncback_service, retry_interval=30, extra_args=None):
+        with semaphore:
+            log = logger.new(record_id=record_id, action_log_id=action_log_id,
+                             action=func, account_id=account_id,
+                             extra_args=extra_args)
+            # Not ignoring soft-deleted objects here because if you, say,
+            # delete a draft, we still need to access the object to delete it
+            # on the remote.
            try:
-                if self.extra_args:
-                    self.func(self.account_id, self.record_id, db_session,
-                              self.extra_args)
-                else:
-                    self.func(self.account_id, self.record_id, db_session)
+                with session_scope(ignore_soft_deletes=False) as db_session:
+                    if extra_args:
+                        func(account_id, record_id, db_session, extra_args)
+                    else:
+                        func(account_id, record_id, db_session)
+                    action_log_entry = db_session.query(ActionLog).get(
+                        action_log_id)
+                    action_log_entry.executed = True
+                    db_session.commit()
+                    log.info('syncback action completed',
+                             action_id=action_log_id)
+                    syncback_service.remove_from_schedule(action_log_id)
            except Exception:
-                log_uncaught_errors(self.log)
+                log_uncaught_errors(log)
                # Wait for a bit, then remove the log id from the scheduled set
                # so that it can be retried.
-                gevent.sleep(self.retry_interval)
-                self.syncback_service.remove_from_schedule(self.action_log_id)
+                gevent.sleep(retry_interval)
+                syncback_service.remove_from_schedule(action_log_id)
                raise
-            else:
-                action_log_entry = db_session.query(ActionLog).get(
-                    self.action_log_id)
-                action_log_entry.executed = True
-                db_session.commit()
-
-                self.log.info('syncback action completed',
-                              action_id=self.action_log_id)
-                self.syncback_service.remove_from_schedule(self.action_log_id)
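
A property of the new shape worth noting: the semaphore is acquired inside
the worker, not by the scheduler, so the gevent.spawn call in _process_log
returns immediately and the polling loop keeps draining the action log even
when a bucket is saturated. A small demo of that non-blocking hand-off, with
a toy work function:

import gevent
from gevent.lock import BoundedSemaphore  # gevent.coros in 2014-era gevent

semaphore = BoundedSemaphore(3)


def work(n):
    with semaphore:      # workers queue here, not the scheduler
        gevent.sleep(1)  # stand-in for a slow remote call
        print('finished', n)


# All ten spawns return instantly even though only three greenlets can hold
# the semaphore at once.
greenlets = [gevent.spawn(work, n) for n in range(10)]
print('scheduler loop continues immediately')
gevent.joinall(greenlets)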
