diff --git a/include/CommonAPI/Event.hpp b/include/CommonAPI/Event.hpp index 5fbec00..aa29e15 100644 --- a/include/CommonAPI/Event.hpp +++ b/include/CommonAPI/Event.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -34,7 +35,7 @@ class Event { typedef std::set SubscriptionsSet; typedef std::function ErrorListener; typedef std::tuple Listeners; - typedef std::map ListenersMap; + typedef std::map> ListenersMap; /** * \brief Constructor @@ -93,177 +94,101 @@ class Event { ListenersMap subscriptions_; Subscription nextSubscription_; - ListenersMap pendingSubscriptions_; - SubscriptionsSet pendingUnsubscriptions_; - - std::mutex notificationMutex_; std::mutex subscriptionMutex_; }; template typename Event::Subscription Event::subscribe(Listener listener, ErrorListener errorListener) { + std::shared_ptr listeners = std::make_shared(std::move(listener), std::move(errorListener)); Subscription subscription; bool isFirstListener; - Listeners listeners; - - subscriptionMutex_.lock(); - subscription = nextSubscription_++; - isFirstListener = (0 == pendingSubscriptions_.size()) && (pendingUnsubscriptions_.size() == subscriptions_.size()); - listener = std::move(listener); - listeners = std::make_tuple(listener, std::move(errorListener)); - pendingSubscriptions_[subscription] = std::move(listeners); - subscriptionMutex_.unlock(); + { + std::lock_guard guard(subscriptionMutex_); + isFirstListener = subscriptions_.empty(); + subscriptions_[nextSubscription_] = listeners; + subscription = nextSubscription_++; + } if (isFirstListener) - onFirstListenerAdded(listener); - onListenerAdded(listener, subscription); + onFirstListenerAdded(std::get<0>(*listeners)); + onListenerAdded(std::get<0>(*listeners), subscription); return subscription; } template void Event::unsubscribe(const Subscription subscription) { - bool isLastListener(false); - bool hasUnsubscribed(false); - Listener listener; - - subscriptionMutex_.lock(); - auto listenerIterator = subscriptions_.find(subscription); - if (subscriptions_.end() != listenerIterator) { - if (pendingUnsubscriptions_.end() == pendingUnsubscriptions_.find(subscription)) { - if (0 == pendingSubscriptions_.erase(subscription)) { - pendingUnsubscriptions_.insert(subscription); - listener = std::get<0>(listenerIterator->second); - hasUnsubscribed = true; - } - isLastListener = (pendingUnsubscriptions_.size() == subscriptions_.size()); - } - } - else { - listenerIterator = pendingSubscriptions_.find(subscription); - if (pendingSubscriptions_.end() != listenerIterator) { - listener = std::get<0>(listenerIterator->second); - if (0 != pendingSubscriptions_.erase(subscription)) { - isLastListener = (pendingUnsubscriptions_.size() == subscriptions_.size()); - hasUnsubscribed = true; - } + bool isLastListener = false; + bool hasUnsubscribed = false; + std::shared_ptr listeners; + { + std::lock_guard guard(subscriptionMutex_); + auto listenerIterator = subscriptions_.find(subscription); + if (subscriptions_.end() != listenerIterator) { + listeners = listenerIterator->second; + subscriptions_.erase(listenerIterator); + hasUnsubscribed = true; + isLastListener = subscriptions_.empty(); } } - isLastListener = isLastListener && (0 == pendingSubscriptions_.size()); - subscriptionMutex_.unlock(); if (hasUnsubscribed) { - onListenerRemoved(listener, subscription); + onListenerRemoved(std::get<0>(*listeners), subscription); if (isLastListener) { - onLastListenerRemoved(listener); + onLastListenerRemoved(std::get<0>(*listeners)); } } } template void Event::notifyListeners(const Arguments_&... eventArguments) { - subscriptionMutex_.lock(); - notificationMutex_.lock(); - for (auto iterator = pendingUnsubscriptions_.begin(); - iterator != pendingUnsubscriptions_.end(); - iterator++) { - subscriptions_.erase(*iterator); - } - pendingUnsubscriptions_.clear(); - - for (auto iterator = pendingSubscriptions_.begin(); - iterator != pendingSubscriptions_.end(); - iterator++) { - subscriptions_.insert(*iterator); + std::vector> listeners; + { + std::lock_guard guard(subscriptionMutex_); + listeners.reserve(subscriptions_.size()); + for (const auto& element : subscriptions_) { + listeners.emplace_back(element.second); + } } - pendingSubscriptions_.clear(); - subscriptionMutex_.unlock(); - for (auto iterator = subscriptions_.begin(); iterator != subscriptions_.end(); iterator++) { - (std::get<0>(iterator->second))(eventArguments...); + for (const auto& element : listeners) { + (std::get<0>(*element))(eventArguments...); } - - notificationMutex_.unlock(); } template void Event::notifySpecificListener(const Subscription subscription, const Arguments_&... eventArguments) { - subscriptionMutex_.lock(); - notificationMutex_.lock(); - for (auto iterator = pendingUnsubscriptions_.begin(); - iterator != pendingUnsubscriptions_.end(); - iterator++) { - subscriptions_.erase(*iterator); - } - pendingUnsubscriptions_.clear(); - - for (auto iterator = pendingSubscriptions_.begin(); - iterator != pendingSubscriptions_.end(); - iterator++) { - - subscriptions_.insert(*iterator); - } - pendingSubscriptions_.clear(); - - - subscriptionMutex_.unlock(); - for (auto iterator = subscriptions_.begin(); iterator != subscriptions_.end(); iterator++) { - if (subscription == iterator->first) { - (std::get<0>(iterator->second))(eventArguments...); + std::shared_ptr listeners; + { + std::lock_guard guard(subscriptionMutex_); + auto iterator = subscriptions_.find(subscription); + if (iterator != subscriptions_.end()) { + listeners = iterator->second; } } - notificationMutex_.unlock(); + if (listeners) { + (std::get<0>(*listeners))(eventArguments...); + } } template void Event::notifySpecificError(const Subscription subscription, const CallStatus status) { - - subscriptionMutex_.lock(); - notificationMutex_.lock(); - for (auto iterator = pendingUnsubscriptions_.begin(); - iterator != pendingUnsubscriptions_.end(); - iterator++) { - subscriptions_.erase(*iterator); - } - pendingUnsubscriptions_.clear(); - - for (auto iterator = pendingSubscriptions_.begin(); - iterator != pendingSubscriptions_.end(); - iterator++) { - subscriptions_.insert(*iterator); - } - pendingSubscriptions_.clear(); - - subscriptionMutex_.unlock(); - for (auto iterator = subscriptions_.begin(); iterator != subscriptions_.end(); iterator++) { - if (subscription == iterator->first) { - ErrorListener listener = std::get<1>(iterator->second); - if (listener) { - listener(status); - } + std::shared_ptr listeners; + { + std::lock_guard guard(subscriptionMutex_); + auto iterator = subscriptions_.find(subscription); + if (iterator != subscriptions_.end()) { + listeners = iterator->second; } } - notificationMutex_.unlock(); + if (listeners) { + (std::get<1>(*listeners))(status); + } if (status != CommonAPI::CallStatus::SUCCESS) { - subscriptionMutex_.lock(); - auto listenerIterator = subscriptions_.find(subscription); - if (subscriptions_.end() != listenerIterator) { - if (pendingUnsubscriptions_.end() == pendingUnsubscriptions_.find(subscription)) { - if (0 == pendingSubscriptions_.erase(subscription)) { - pendingUnsubscriptions_.insert(subscription); - } - } - } - else { - listenerIterator = pendingSubscriptions_.find(subscription); - if (pendingSubscriptions_.end() != listenerIterator) { - pendingSubscriptions_.erase(subscription); - } - } - subscriptionMutex_.unlock(); + unsubscribe(subscription); } }