Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of message dispatching #864

Merged
merged 3 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions doc/message_dispatching.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Message Dispatching Class Diagram

Check notice on line 1 in doc/message_dispatching.md

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

doc/message_dispatching.md#L1

Expected: [None]; Actual: # Message Dispatching Class Diagram

```mermaid
classDiagram
class MessageDispatcherInterface {
+dispatch_call(const json& call, bool triggered = false)
+dispatch_call_async(const json& call, bool triggered = false): std::future~EnhancedMessage~T~~
+dispatch_call_result(const json& call_result)
+dispatch_call_error(const json& call_error)
}

class v16_MessageDispatcher {
- MessageQueue& message_queue
- ChargePointConfiguration& configuration
- RegistrationStatus& registration_status
}

class v201_MessageDispatcher {
- MessageQueue& message_queue
- DeviceModel& device_model
- ConnectivityManager& connectivity_manager
- RegistrationStatusEnum& registration_status
}

class v201_DataTransferInterface {
+data_transfer_req(request: DataTransferRequest): std::optional~DataTransferResponse~
+handle_data_transfer_req(call: Call~DataTransferRequest~)
}

class v201_DataTransfer {
-MessageDispatcherInterface &message_dispatcher
-std::optional~function~ data_transfer_callback
}

class v201_ChargePoint {
std::unique_ptr~MessageDispatcherInterface~ message_dispatcher
std::unique_ptr~v201_DataTransferInterface~ data_transfer
}

class v16_ChargePoint {
std::unique_ptr~MessageDispatcherInterface~ message_dispatcher
}

MessageDispatcherInterface <|-- v16_MessageDispatcher
MessageDispatcherInterface <|-- v201_MessageDispatcher
v201_DataTransferInterface <|-- v201_DataTransfer
MessageDispatcherInterface *-- v201_DataTransfer
MessageDispatcherInterface *-- v201_ChargePoint
v201_DataTransferInterface *-- v201_ChargePoint
MessageDispatcherInterface *-- v16_ChargePoint
```
3 changes: 3 additions & 0 deletions include/ocpp/common/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#pragma once

#include <chrono>
#include <cstdint>

namespace ocpp {
Expand All @@ -22,4 +23,6 @@ constexpr float NO_LIMIT_SPECIFIED = -1.0;
constexpr std::int32_t NO_START_PERIOD = -1;
constexpr std::int32_t EVSEID_NOT_SET = -1;

constexpr std::chrono::seconds DEFAULT_WAIT_FOR_FUTURE_TIMEOUT = std::chrono::seconds(60);

} // namespace ocpp
39 changes: 39 additions & 0 deletions include/ocpp/common/message_dispatcher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Pionix GmbH and Contributors to EVerest

#pragma once

#include <ocpp/common/message_queue.hpp>

namespace ocpp {

/// \brief Interface for dispatching OCPP messages that shall be send over the websocket. This interface defines
/// dispatching of Call, CallResult and CallError messages.
/// \tparam T Type specifies the OCPP protocol version
template <typename T> class MessageDispatcherInterface {

public:
virtual ~MessageDispatcherInterface(){};

/// \brief Dispatches a Call message.
/// \param call the OCPP Call message.
/// \param triggered indicates if the call was triggered by a TriggerMessage. Default is false.
virtual void dispatch_call(const json& call, bool triggered = false) = 0;

/// \brief Dispatches a Call message asynchronously.
/// \param call the OCPP Call message.
/// \param triggered indicates if the call was triggered by a TriggerMessage. Default is false.
/// \return std::future<ocpp::EnhancedMessage<T>> Future object containing the enhanced message
/// result of type T.
virtual std::future<ocpp::EnhancedMessage<T>> dispatch_call_async(const json& call, bool triggered = false) = 0;

/// \brief Dispatches a CallResult message.
/// \param call_result the OCPP CallResult message.
virtual void dispatch_call_result(const json& call_result) = 0;

/// \brief Dispatches a CallError message.
/// \param call_result the OCPP CallError message.
virtual void dispatch_call_error(const json& call_error) = 0;
};

} // namespace ocpp
20 changes: 5 additions & 15 deletions include/ocpp/common/message_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -630,16 +630,7 @@ template <typename M> class MessageQueue {
}
}

/// \brief pushes a new \p call message onto the message queue
template <class T> void push(Call<T> call, const bool stall_until_accepted = false) {
if (!running) {
return;
}
json call_json = call;
push(call_json, stall_until_accepted);
}

void push(const json& message, const bool stall_until_accepted = false) {
void push_call(const json& message, const bool stall_until_accepted = false) {
if (!running) {
return;
}
Expand All @@ -664,16 +655,15 @@ template <typename M> class MessageQueue {
}

/// \brief Sends a new \p call_result message over the websocket
template <class T> void push(CallResult<T> call_result) {
void push_call_result(const json& call_result) {
if (!running) {
return;
}

this->send_callback(call_result);
{
std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
if (next_message_to_send.has_value()) {
if (next_message_to_send.value() == call_result.uniqueId) {
if (next_message_to_send.value() == call_result.at(MESSAGE_ID)) {
next_message_to_send.reset();
}
}
Expand All @@ -683,7 +673,7 @@ template <typename M> class MessageQueue {
}

/// \brief Sends a new \p call_error message over the websocket
void push(CallError call_error) {
void push_call_error(CallError call_error) {
if (!running) {
return;
}
Expand All @@ -703,7 +693,7 @@ template <typename M> class MessageQueue {

/// \brief pushes a new \p call message onto the message queue
/// \returns a future from which the CallResult can be extracted
template <class T> std::future<EnhancedMessage<M>> push_async(Call<T> call) {
std::future<EnhancedMessage<M>> push_call_async(const json& call) {
auto message = std::make_shared<ControlMessage<M>>(call);

if (!running) {
Expand Down
10 changes: 4 additions & 6 deletions include/ocpp/v16/charge_point_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

#include <ocpp/common/aligned_timer.hpp>
#include <ocpp/common/charging_station_base.hpp>
#include <ocpp/common/message_dispatcher.hpp>
#include <ocpp/common/message_queue.hpp>
#include <ocpp/common/schemas.hpp>
#include <ocpp/common/types.hpp>
#include <ocpp/common/websocket/websocket.hpp>
#include <ocpp/v16/charge_point_configuration.hpp>
#include <ocpp/v16/connector.hpp>
#include <ocpp/v16/database_handler.hpp>
#include <ocpp/v16/message_dispatcher.hpp>
#include <ocpp/v16/messages/Authorize.hpp>
#include <ocpp/v16/messages/BootNotification.hpp>
#include <ocpp/v16/messages/CancelReservation.hpp>
Expand Down Expand Up @@ -87,14 +89,15 @@ class ChargePointImpl : ocpp::ChargingStationBase {
BootReasonEnum bootreason;
ChargePointConnectionState connection_state;
bool boot_notification_callerror;
RegistrationStatus registration_status;
std::atomic<RegistrationStatus> registration_status;
DiagnosticsStatus diagnostics_status;
FirmwareStatus firmware_status;
bool firmware_update_is_pending = false;
UploadLogStatusEnumType log_status;
std::string message_log_path;

std::unique_ptr<Websocket> websocket;
std::unique_ptr<ocpp::MessageDispatcherInterface<MessageType>> message_dispatcher;
Everest::SteadyTimer websocket_timer;
std::unique_ptr<MessageQueue<v16::MessageType>> message_queue;
std::map<int32_t, std::shared_ptr<Connector>> connectors;
Expand Down Expand Up @@ -205,11 +208,6 @@ class ChargePointImpl : ocpp::ChargingStationBase {
std::unique_ptr<ocpp::MessageQueue<v16::MessageType>> create_message_queue();
void message_callback(const std::string& message);
void handle_message(const EnhancedMessage<v16::MessageType>& message);
template <class T> bool send(Call<T> call, bool initiated_by_trigger_message = false);
template <class T>
std::future<EnhancedMessage<v16::MessageType>> send_async(Call<T> call, bool initiated_by_trigger_message = false);
template <class T> bool send(CallResult<T> call_result);
bool send(CallError call_error);
void heartbeat(bool initiated_by_trigger_message = false);
void boot_notification(bool initiated_by_trigger_message = false);
void clock_aligned_meter_values_sample();
Expand Down
30 changes: 30 additions & 0 deletions include/ocpp/v16/message_dispatcher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Pionix GmbH and Contributors to EVerest

#pragma once

#include <ocpp/common/message_dispatcher.hpp>
#include <ocpp/v16/charge_point_configuration.hpp>

namespace ocpp {
namespace v16 {

class MessageDispatcher : public MessageDispatcherInterface<MessageType> {

public:
MessageDispatcher(ocpp::MessageQueue<MessageType>& message_queue, ChargePointConfiguration& configuration,
std::atomic<RegistrationStatus>& registration_status) :
message_queue(message_queue), configuration(configuration), registration_status(registration_status){};
void dispatch_call(const json& call, bool triggered = false) override;
std::future<ocpp::EnhancedMessage<MessageType>> dispatch_call_async(const json& call, bool triggered) override;
void dispatch_call_result(const json& call_result) override;
void dispatch_call_error(const json& call_error) override;

private:
ocpp::MessageQueue<MessageType>& message_queue;
ChargePointConfiguration& configuration;
std::atomic<RegistrationStatus>& registration_status;
};

} // namespace v16
} // namespace ocpp
17 changes: 6 additions & 11 deletions include/ocpp/v201/charge_point.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <memory>
#include <set>

#include <ocpp/common/message_dispatcher.hpp>

#include <ocpp/common/charging_station_base.hpp>

#include <ocpp/v201/average_meter_values.hpp>
Expand Down Expand Up @@ -373,6 +375,8 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa
std::unique_ptr<EvseManager> evse_manager;
std::unique_ptr<ConnectivityManager> connectivity_manager;

std::unique_ptr<MessageDispatcherInterface<MessageType>> message_dispatcher;

// utility
std::shared_ptr<MessageQueue<v201::MessageType>> message_queue;
std::shared_ptr<DatabaseHandler> database_handler;
Expand Down Expand Up @@ -401,7 +405,7 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa
std::atomic_bool stop_auth_cache_cleanup_handler;

// states
RegistrationStatusEnum registration_status;
std::atomic<RegistrationStatusEnum> registration_status;
FirmwareStatusEnum firmware_status;
// The request ID in the last firmware update status received
std::optional<int32_t> firmware_status_id;
Expand Down Expand Up @@ -453,8 +457,6 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa
/// \brief optional delay to resumption of message queue after reconnecting to the CSMS
std::chrono::seconds message_queue_resume_delay = std::chrono::seconds(0);

bool send(CallError call_error);

// internal helper functions
void initialize(const std::map<int32_t, int32_t>& evse_connector_structure, const std::string& message_log_path);
void init_certificate_expiration_check_timers();
Expand Down Expand Up @@ -748,20 +750,13 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa
// Functional Block P: DataTransfer
void handle_data_transfer_req(Call<DataTransferRequest> call);

// general message handling
template <class T> bool send(ocpp::Call<T> call, const bool initiated_by_trigger_message = false);

template <class T> std::future<EnhancedMessage<v201::MessageType>> send_async(ocpp::Call<T> call);

template <class T> bool send(ocpp::CallResult<T> call_result);

// Generates async sending callbacks
template <class RequestType, class ResponseType>
std::function<ResponseType(RequestType)> send_callback(MessageType expected_response_message_type) {
return [this, expected_response_message_type](auto request) {
MessageId message_id = MessageId(to_string(this->uuid_generator()));
const auto enhanced_response =
this->send_async<RequestType>(ocpp::Call<RequestType>(request, message_id)).get();
this->message_dispatcher->dispatch_call_async(ocpp::Call<RequestType>(request, message_id)).get();
if (enhanced_response.messageType != expected_response_message_type) {
throw UnexpectedMessageTypeFromCSMS(
std::string("Got unexpected message type from CSMS, expected: ") +
Expand Down
31 changes: 31 additions & 0 deletions include/ocpp/v201/message_dispatcher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Pionix GmbH and Contributors to EVerest

#pragma once

#include <ocpp/common/message_dispatcher.hpp>
#include <ocpp/v201/connectivity_manager.hpp>
#include <ocpp/v201/device_model.hpp>

namespace ocpp {
namespace v201 {

class MessageDispatcher : public MessageDispatcherInterface<MessageType> {

public:
MessageDispatcher(ocpp::MessageQueue<MessageType>& message_queue, DeviceModel& device_model,
std::atomic<RegistrationStatusEnum>& registration_status) :
message_queue(message_queue), device_model(device_model), registration_status(registration_status){};
void dispatch_call(const json& call, bool triggered = false) override;
std::future<ocpp::EnhancedMessage<MessageType>> dispatch_call_async(const json& call, bool triggered) override;
void dispatch_call_result(const json& call_result) override;
void dispatch_call_error(const json& call_error) override;

private:
ocpp::MessageQueue<MessageType>& message_queue;
DeviceModel& device_model;
std::atomic<RegistrationStatusEnum>& registration_status;
};

} // namespace v201
} // namespace ocpp
2 changes: 2 additions & 0 deletions lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ if(LIBOCPP_ENABLE_V16)
ocpp/v16/charge_point.cpp
ocpp/v16/database_handler.cpp
ocpp/v16/charge_point_impl.cpp
ocpp/v16/message_dispatcher.cpp
ocpp/v16/smart_charging.cpp
ocpp/v16/charge_point_configuration.cpp
ocpp/v16/charge_point_state_machine.cpp
Expand Down Expand Up @@ -80,6 +81,7 @@ if(LIBOCPP_ENABLE_V201)
ocpp/v201/utils.cpp
ocpp/v201/component_state_manager.cpp
ocpp/v201/connectivity_manager.cpp
ocpp/v201/message_dispatcher.cpp
)
add_subdirectory(ocpp/v201/messages)
endif()
Expand Down
Loading