Skip to content

Commit

Permalink
Add Listener::isListening
Browse files Browse the repository at this point in the history
Requested feature in #48
  • Loading branch information
gelldur committed Jun 17, 2023
1 parent b30f874 commit ed4b80d
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 3 deletions.
2 changes: 1 addition & 1 deletion lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set(CPACK_GENERATOR "" CACHE STRING "Set packages CPack should build e.g. ZIP;TG

# BUILD_SHARED_LIBS can controll build type!
project(EventBus
VERSION 3.0.3
VERSION 3.0.4
LANGUAGES CXX
)

Expand Down
12 changes: 12 additions & 0 deletions lib/src/dexode/EventBus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,18 @@ eventbus::stream::EventStream* EventBus::obtainStream(
}
}

eventbus::stream::EventStream* EventBus::streamForEvent(
eventbus::internal::event_id_t eventID) const
{
std::lock_guard writeGuard{_mutexStreams};
auto* found = findStreamUnsafe(eventID);
if(found != nullptr)
{
return found;
}
return nullptr;
}

bool EventBus::postponeEvent(eventbus::PostponeHelper& postponeCall)
{
auto* eventStream = obtainStream(postponeCall.eventID, postponeCall.createStreamCallback);
Expand Down
3 changes: 3 additions & 0 deletions lib/src/dexode/EventBus.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class EventBus : public dexode::eventbus::Bus
std::size_t processLimit(std::size_t limit);

protected:
eventbus::stream::EventStream* streamForEvent(
eventbus::internal::event_id_t eventID) const override;

eventbus::stream::EventStream* obtainStream(
eventbus::internal::event_id_t eventID,
eventbus::CreateStreamCallback createStreamCallback);
Expand Down
3 changes: 3 additions & 0 deletions lib/src/dexode/eventbus/Bus.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ class Bus
virtual void unlistenAll(std::uint32_t listenerID) = 0;
virtual void unlisten(std::uint32_t listenerID, internal::event_id_t eventID) = 0;

virtual eventbus::stream::EventStream* streamForEvent(
eventbus::internal::event_id_t eventID) const = 0;

private:
std::atomic<std::uint32_t> _lastID{0};

Expand Down
13 changes: 13 additions & 0 deletions lib/src/dexode/eventbus/Listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,19 @@ class Listener
return _bus;
}

template <typename Event>
[[nodiscard]] bool isListening() const
{
static_assert(internal::validateEvent<Event>(), "Invalid event");
if(_bus == nullptr)
{
throw std::runtime_error{"bus is null"};
}
return internal::ListenerAttorney<Bus>::isListening(*_bus
, _id
, internal::event_id<Event>());
}

private:
std::uint32_t _id = 0;
std::shared_ptr<Bus> _bus = nullptr;
Expand Down
13 changes: 13 additions & 0 deletions lib/src/dexode/eventbus/internal/ListenerAttorney.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <functional>

#include "dexode/eventbus/internal/event_id.hpp"
#include "dexode/eventbus/stream/EventStream.hpp"

namespace dexode::eventbus
{
Expand Down Expand Up @@ -49,6 +50,18 @@ class ListenerAttorney
{
bus.unlisten(listenerID, eventID);
}

static constexpr bool isListening(EventBus_t& bus,
const std::uint32_t listenerID,
const event_id_t eventID)
{
const eventbus::stream::EventStream* stream = bus.streamForEvent(eventID);
if(stream != nullptr)
{
return stream->hasListener(listenerID);
}
return false;
}
};

} // namespace dexode::eventbus::internal
6 changes: 6 additions & 0 deletions lib/src/dexode/eventbus/stream/EventStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class EventStream

virtual bool addListener(std::uint32_t listenerID, std::any callback) = 0;
virtual bool removeListener(std::uint32_t listenerID) = 0;

[[nodiscard]] virtual bool hasListener(std::uint32_t listenerID) const = 0;
};

class NoopEventStream : public EventStream
Expand All @@ -37,6 +39,10 @@ class NoopEventStream : public EventStream
{
throw std::runtime_error{"Noop"};
}
[[nodiscard]] bool hasListener(std::uint32_t listenerID) const override
{
throw std::runtime_error{"Noop"};
}
};

} // namespace dexode::eventbus::stream
11 changes: 9 additions & 2 deletions lib/src/dexode/eventbus/stream/ProtectedEventStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ class ProtectedEventStream : public EventStream
return not _queue.empty();
}

[[nodiscard]] bool hasListener(std::uint32_t listenerID) const override
{
std::shared_lock readGuard{_mutexCallbacks};
auto found = std::find(_listenerIDs.begin(), _listenerIDs.end(), listenerID);
return found != _listenerIDs.end();
}

private:
std::vector<std::uint32_t> _listenerIDs;
std::vector<Event> _queue;
Expand All @@ -106,8 +113,8 @@ class ProtectedEventStream : public EventStream
std::atomic<bool> _isProcessing{false};
std::vector<std::pair<std::uint32_t, Callback>> _waiting;

std::shared_mutex _mutexEvent;
std::shared_mutex _mutexCallbacks;
mutable std::shared_mutex _mutexEvent;
mutable std::shared_mutex _mutexCallbacks;

void flushWaitingOnes()
{
Expand Down
34 changes: 34 additions & 0 deletions test/unit/src/dexode/eventbus/test/SuiteListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,38 @@ TEST_CASE("Should not allow for mistake with move ctor", "[EventBus][Listener]")
REQUIRE(TestClazz::counter == 1);
}


TEST_CASE("Should allow to check if listener is already listening", "[EventBus][Listener]")
{
// Related to Github Issue: https://github.com/gelldur/EventBus/issues/48
EventBus bus;
int callCount = 0;
auto listener = Listener::createNotOwning(bus);

CHECK_FALSE(listener.isListening<event::Value>());

bus.postpone(event::Value{3});
REQUIRE(bus.process() == 1);
REQUIRE(callCount == 0); // not listening

listener.listen<event::Value>([&](const event::Value& event)
{
REQUIRE(event.value == 2);
++callCount;
});
CHECK(listener.isListening<event::Value>());

bus.postpone(event::Value{2});
REQUIRE(bus.process() == 1);
REQUIRE(callCount == 1);

CHECK(listener.isListening<event::Value>());
listener.unlisten<event::Value>();
CHECK_FALSE(listener.isListening<event::Value>());

bus.postpone(event::Value{1});
REQUIRE(bus.process() == 1);
REQUIRE(callCount == 1);
}

} // namespace dexode::eventbus::test

0 comments on commit ed4b80d

Please sign in to comment.