Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix potential deadlocks when building proxies/registering stubs in callbacks from events #5

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 52 additions & 127 deletions include/CommonAPI/Event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <functional>
#include <mutex>
#include <map>
#include <memory>
#include <set>
#include <tuple>

Expand All @@ -34,7 +35,7 @@ class Event {
typedef std::set<Subscription> SubscriptionsSet;
typedef std::function<void(const CallStatus)> ErrorListener;
typedef std::tuple<Listener, ErrorListener> Listeners;
typedef std::map<Subscription, Listeners> ListenersMap;
typedef std::map<Subscription, std::shared_ptr<Listeners>> ListenersMap;

/**
* \brief Constructor
Expand Down Expand Up @@ -93,177 +94,101 @@ class Event {
ListenersMap subscriptions_;
Subscription nextSubscription_;

ListenersMap pendingSubscriptions_;
SubscriptionsSet pendingUnsubscriptions_;

std::mutex notificationMutex_;
std::mutex subscriptionMutex_;
};

template<typename ... Arguments_>
typename Event<Arguments_...>::Subscription Event<Arguments_...>::subscribe(Listener listener, ErrorListener errorListener) {
std::shared_ptr<Listeners> listeners = std::make_shared<Listeners>(std::move(listener), std::move(errorListener));
Subscription subscription;
bool isFirstListener;
Listeners listeners;

subscriptionMutex_.lock();
subscription = nextSubscription_++;
isFirstListener = (0 == pendingSubscriptions_.size()) && (pendingUnsubscriptions_.size() == subscriptions_.size());
listener = std::move(listener);
listeners = std::make_tuple(listener, std::move(errorListener));
pendingSubscriptions_[subscription] = std::move(listeners);
subscriptionMutex_.unlock();
{
std::lock_guard<std::mutex> guard(subscriptionMutex_);
isFirstListener = subscriptions_.empty();
subscriptions_[nextSubscription_] = listeners;
subscription = nextSubscription_++;
}

if (isFirstListener)
onFirstListenerAdded(listener);
onListenerAdded(listener, subscription);
onFirstListenerAdded(std::get<0>(*listeners));
onListenerAdded(std::get<0>(*listeners), subscription);

return subscription;
}

template<typename ... Arguments_>
void Event<Arguments_...>::unsubscribe(const Subscription subscription) {
bool isLastListener(false);
bool hasUnsubscribed(false);
Listener listener;

subscriptionMutex_.lock();
auto listenerIterator = subscriptions_.find(subscription);
if (subscriptions_.end() != listenerIterator) {
if (pendingUnsubscriptions_.end() == pendingUnsubscriptions_.find(subscription)) {
if (0 == pendingSubscriptions_.erase(subscription)) {
pendingUnsubscriptions_.insert(subscription);
listener = std::get<0>(listenerIterator->second);
hasUnsubscribed = true;
}
isLastListener = (pendingUnsubscriptions_.size() == subscriptions_.size());
}
}
else {
listenerIterator = pendingSubscriptions_.find(subscription);
if (pendingSubscriptions_.end() != listenerIterator) {
listener = std::get<0>(listenerIterator->second);
if (0 != pendingSubscriptions_.erase(subscription)) {
isLastListener = (pendingUnsubscriptions_.size() == subscriptions_.size());
hasUnsubscribed = true;
}
bool isLastListener = false;
bool hasUnsubscribed = false;
std::shared_ptr<Listeners> listeners;
{
std::lock_guard<std::mutex> guard(subscriptionMutex_);
auto listenerIterator = subscriptions_.find(subscription);
if (subscriptions_.end() != listenerIterator) {
listeners = listenerIterator->second;
subscriptions_.erase(listenerIterator);
hasUnsubscribed = true;
isLastListener = subscriptions_.empty();
}
}
isLastListener = isLastListener && (0 == pendingSubscriptions_.size());
subscriptionMutex_.unlock();

if (hasUnsubscribed) {
onListenerRemoved(listener, subscription);
onListenerRemoved(std::get<0>(*listeners), subscription);
if (isLastListener) {
onLastListenerRemoved(listener);
onLastListenerRemoved(std::get<0>(*listeners));
}
}
}

template<typename ... Arguments_>
void Event<Arguments_...>::notifyListeners(const Arguments_&... eventArguments) {
subscriptionMutex_.lock();
notificationMutex_.lock();
for (auto iterator = pendingUnsubscriptions_.begin();
iterator != pendingUnsubscriptions_.end();
iterator++) {
subscriptions_.erase(*iterator);
}
pendingUnsubscriptions_.clear();

for (auto iterator = pendingSubscriptions_.begin();
iterator != pendingSubscriptions_.end();
iterator++) {
subscriptions_.insert(*iterator);
std::vector<std::shared_ptr<Listeners>> listeners;
{
std::lock_guard<std::mutex> guard(subscriptionMutex_);
listeners.reserve(subscriptions_.size());
for (const auto& element : subscriptions_) {
listeners.emplace_back(element.second);
}
}
pendingSubscriptions_.clear();

subscriptionMutex_.unlock();
for (auto iterator = subscriptions_.begin(); iterator != subscriptions_.end(); iterator++) {
(std::get<0>(iterator->second))(eventArguments...);
for (const auto& element : listeners) {
(std::get<0>(*element))(eventArguments...);
}

notificationMutex_.unlock();
}

template<typename ... Arguments_>
void Event<Arguments_...>::notifySpecificListener(const Subscription subscription, const Arguments_&... eventArguments) {
subscriptionMutex_.lock();
notificationMutex_.lock();
for (auto iterator = pendingUnsubscriptions_.begin();
iterator != pendingUnsubscriptions_.end();
iterator++) {
subscriptions_.erase(*iterator);
}
pendingUnsubscriptions_.clear();

for (auto iterator = pendingSubscriptions_.begin();
iterator != pendingSubscriptions_.end();
iterator++) {

subscriptions_.insert(*iterator);
}
pendingSubscriptions_.clear();


subscriptionMutex_.unlock();
for (auto iterator = subscriptions_.begin(); iterator != subscriptions_.end(); iterator++) {
if (subscription == iterator->first) {
(std::get<0>(iterator->second))(eventArguments...);
std::shared_ptr<Listeners> listeners;
{
std::lock_guard<std::mutex> guard(subscriptionMutex_);
auto iterator = subscriptions_.find(subscription);
if (iterator != subscriptions_.end()) {
listeners = iterator->second;
}
}

notificationMutex_.unlock();
if (listeners) {
(std::get<0>(*listeners))(eventArguments...);
}
}

template<typename ... Arguments_>
void Event<Arguments_...>::notifySpecificError(const Subscription subscription, const CallStatus status) {

subscriptionMutex_.lock();
notificationMutex_.lock();
for (auto iterator = pendingUnsubscriptions_.begin();
iterator != pendingUnsubscriptions_.end();
iterator++) {
subscriptions_.erase(*iterator);
}
pendingUnsubscriptions_.clear();

for (auto iterator = pendingSubscriptions_.begin();
iterator != pendingSubscriptions_.end();
iterator++) {
subscriptions_.insert(*iterator);
}
pendingSubscriptions_.clear();

subscriptionMutex_.unlock();
for (auto iterator = subscriptions_.begin(); iterator != subscriptions_.end(); iterator++) {
if (subscription == iterator->first) {
ErrorListener listener = std::get<1>(iterator->second);
if (listener) {
listener(status);
}
std::shared_ptr<Listeners> listeners;
{
std::lock_guard<std::mutex> guard(subscriptionMutex_);
auto iterator = subscriptions_.find(subscription);
if (iterator != subscriptions_.end()) {
listeners = iterator->second;
}
}

notificationMutex_.unlock();
if (listeners) {
(std::get<1>(*listeners))(status);
}

if (status != CommonAPI::CallStatus::SUCCESS) {
subscriptionMutex_.lock();
auto listenerIterator = subscriptions_.find(subscription);
if (subscriptions_.end() != listenerIterator) {
if (pendingUnsubscriptions_.end() == pendingUnsubscriptions_.find(subscription)) {
if (0 == pendingSubscriptions_.erase(subscription)) {
pendingUnsubscriptions_.insert(subscription);
}
}
}
else {
listenerIterator = pendingSubscriptions_.find(subscription);
if (pendingSubscriptions_.end() != listenerIterator) {
pendingSubscriptions_.erase(subscription);
}
}
subscriptionMutex_.unlock();
unsubscribe(subscription);
}
}

Expand Down