Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

K01 ChargePoint refactorings #831

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 79 additions & 59 deletions include/ocpp/common/message_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,57 @@ bool allowed_to_send_message(const ControlMessage<M>& message, const DateTime& t
return true;
}

template <typename M> class MessageQueueInterface {
public:
virtual ~MessageQueueInterface() {
}
virtual void start() = 0;
virtual void reset_next_message_to_send() = 0;
virtual void get_persisted_messages_from_db(bool ignore_security_event_notifications = false) = 0;

/// \brief pushes a new \p call message onto the message queue
template <class T> void push(Call<T> call, const bool stall_until_accepted = false) {
json call_json = call;
this->push(call_json, stall_until_accepted);
}
virtual void push(const json& message, const bool stall_until_accepted = false) = 0;
template <class T> void push(CallResult<T> call_result) {
json call_result_json = call_result;
this->push_call_result(call_result_json, call_result.uniqueId);
}
virtual void push_call_result(const json& call_result_json, const MessageId& unique_id) = 0;
virtual void push(CallError call_error) = 0;
template <class T> std::future<EnhancedMessage<M>> push_async(Call<T> call) {
auto message = std::make_shared<ControlMessage<M>>(call);
return push_async_internal(message);
}
virtual std::future<EnhancedMessage<M>> push_async_internal(std::shared_ptr<ControlMessage<M>> message) = 0;
virtual EnhancedMessage<M> receive(std::string_view message) = 0;
virtual void reset_in_flight() = 0;
virtual void handle_call_result(EnhancedMessage<M>& enhanced_message) = 0;
virtual void handle_timeout_or_callerror(const std::optional<EnhancedMessage<M>>& enhanced_message_opt) = 0;
virtual void stop() = 0;
virtual void pause() = 0;
virtual void resume(std::chrono::seconds delay_on_reconnect) = 0;
virtual void set_registration_status_accepted() = 0;
virtual bool is_transaction_message_queue_empty() = 0;
virtual bool contains_transaction_messages(const CiString<36> transaction_id) = 0;
virtual bool contains_stop_transaction_message(const int32_t transaction_id) = 0;
virtual void update_transaction_message_attempts(const int transaction_message_attempts) = 0;
virtual void update_transaction_message_retry_interval(const int transaction_message_retry_interval) = 0;
virtual void update_message_timeout(const int timeout) = 0;
virtual MessageId createMessageId() = 0;
virtual void add_stopped_transaction_id(std::string stop_transaction_message_id, int32_t transaction_id) = 0;
virtual void add_meter_value_message_id(const std::string& start_transaction_message_id,
const std::string& meter_value_message_id) = 0;
virtual void notify_start_transaction_handled(const std::string& start_transaction_message_id,
const int32_t transaction_id) = 0;
virtual M string_to_messagetype(const std::string& s) = 0;
virtual std::string messagetype_to_string(M m) = 0;
};

/// \brief contains a message queue that makes sure that OCPPs synchronicity requirements are met
template <typename M> class MessageQueue {
template <typename M> class MessageQueue : public MessageQueueInterface<M> {
private:
MessageQueueConfig<M> config;
std::shared_ptr<ocpp::common::DatabaseHandlerCommon> database_handler;
Expand Down Expand Up @@ -417,7 +466,7 @@ template <typename M> class MessageQueue {
MessageQueue(send_callback, config, {}, databaseHandler) {
}

void start() {
void start() override {
this->worker_thread = std::thread([this]() {
// TODO(kai): implement message timeout
while (this->running) {
Expand Down Expand Up @@ -577,13 +626,13 @@ template <typename M> class MessageQueue {
}

/// \brief Resets next message to send. Can be used in situation when we dont want to reply to a CALL message
void reset_next_message_to_send() {
void reset_next_message_to_send() override {
std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
this->next_message_to_send.reset();
}

/// \brief Gets all persisted messages of normal message queue and persisted message queue from the database
void get_persisted_messages_from_db(bool ignore_security_event_notifications = false) {
void get_persisted_messages_from_db(bool ignore_security_event_notifications = false) override {
std::vector<QueueType> queue_types = {QueueType::Normal, QueueType::Transaction};
// do for Normal and Transaction queue
for (const auto queue_type : queue_types) {
Expand Down Expand Up @@ -627,16 +676,7 @@ template <typename M> class MessageQueue {
}
}

/// \brief pushes a new \p call message onto the message queue
template <class T> void push(Call<T> call, const bool stall_until_accepted = false) {
if (!running) {
return;
}
json call_json = call;
push(call_json, stall_until_accepted);
}

void push(const json& message, const bool stall_until_accepted = false) {
void push(const json& message, const bool stall_until_accepted = false) override {
if (!running) {
return;
}
Expand All @@ -661,16 +701,16 @@ template <typename M> class MessageQueue {
}

/// \brief Sends a new \p call_result message over the websocket
template <class T> void push(CallResult<T> call_result) {
void push_call_result(const json& call_result_json, const MessageId& unique_id) override {
gberardi-pillar marked this conversation as resolved.
Show resolved Hide resolved
if (!running) {
return;
}

this->send_callback(call_result);
this->send_callback(call_result_json);
{
std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
if (next_message_to_send.has_value()) {
if (next_message_to_send.value() == call_result.uniqueId) {
if (next_message_to_send.value() == unique_id) {
next_message_to_send.reset();
}
}
Expand All @@ -680,7 +720,7 @@ template <typename M> class MessageQueue {
}

/// \brief Sends a new \p call_error message over the websocket
void push(CallError call_error) {
void push(CallError call_error) override {
if (!running) {
return;
}
Expand All @@ -700,9 +740,7 @@ template <typename M> class MessageQueue {

/// \brief pushes a new \p call message onto the message queue
/// \returns a future from which the CallResult can be extracted
template <class T> std::future<EnhancedMessage<M>> push_async(Call<T> call) {
auto message = std::make_shared<ControlMessage<M>>(call);

std::future<EnhancedMessage<M>> push_async_internal(std::shared_ptr<ControlMessage<M>> message) override {
if (!running) {
auto enhanced_message = EnhancedMessage<M>();
enhanced_message.offline = true;
Expand Down Expand Up @@ -730,7 +768,7 @@ template <typename M> class MessageQueue {
/// \brief Enhances a received \p json_message with additional meta information, checks if it is a valid CallResult
/// with a corresponding Call message on top of the queue
/// \returns the enhanced message
EnhancedMessage<M> receive(std::string_view message) {
EnhancedMessage<M> receive(std::string_view message) override {
EnhancedMessage<M> enhanced_message;

enhanced_message.message = json::parse(message);
Expand Down Expand Up @@ -783,12 +821,12 @@ template <typename M> class MessageQueue {
return enhanced_message;
}

void reset_in_flight() {
void reset_in_flight() override {
this->in_flight = nullptr;
this->in_flight_timeout_timer.stop();
}

void handle_call_result(EnhancedMessage<M>& enhanced_message) {
void handle_call_result(EnhancedMessage<M>& enhanced_message) override {
if (this->in_flight->uniqueId() == enhanced_message.uniqueId) {
enhanced_message.call_message = this->in_flight->message;
enhanced_message.messageType = this->string_to_messagetype(
Expand Down Expand Up @@ -822,7 +860,7 @@ template <typename M> class MessageQueue {
}

/// \brief Handles a message timeout or a CALLERROR. \p enhanced_message_opt is set only in case of CALLERROR
void handle_timeout_or_callerror(const std::optional<EnhancedMessage<M>>& enhanced_message_opt) {
void handle_timeout_or_callerror(const std::optional<EnhancedMessage<M>>& enhanced_message_opt) override {
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
// We got a timeout iff enhanced_message_opt is empty. Otherwise, enhanced_message_opt contains the CallError.
bool timeout = !enhanced_message_opt.has_value();
Expand Down Expand Up @@ -925,7 +963,7 @@ template <typename M> class MessageQueue {
}

/// \brief Stops the message queue
void stop() {
void stop() override {
EVLOG_debug << "stop()";
// stop the running thread
this->running = false;
Expand All @@ -935,7 +973,7 @@ template <typename M> class MessageQueue {
}

/// \brief Pauses the message queue
void pause() {
void pause() override {
EVLOG_debug << "pause()";
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
this->pause_resume_ctr++;
Expand All @@ -947,7 +985,7 @@ template <typename M> class MessageQueue {
}

/// \brief Resumes the message queue
void resume(std::chrono::seconds delay_on_reconnect) {
void resume(std::chrono::seconds delay_on_reconnect) override {
EVLOG_debug << "resume() called";
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
if (!this->paused) {
Expand All @@ -966,77 +1004,59 @@ template <typename M> class MessageQueue {
}
}

void set_registration_status_accepted() {
void set_registration_status_accepted() override {
{
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
this->is_registration_status_accepted = true;
}
this->cv.notify_all();
}

bool is_transaction_message_queue_empty() {
bool is_transaction_message_queue_empty() override {
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
return this->transaction_message_queue.empty();
}

bool contains_transaction_messages(const CiString<36> transaction_id) {
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
for (const auto control_message : this->transaction_message_queue) {
if (control_message->messageType == v201::MessageType::TransactionEvent) {
v201::TransactionEventRequest req = control_message->message.at(CALL_PAYLOAD);
if (req.transactionInfo.transactionId == transaction_id) {
return true;
}
}
}
gberardi-pillar marked this conversation as resolved.
Show resolved Hide resolved
bool contains_transaction_messages(const CiString<36> transaction_id) override {
return false;
}

bool contains_stop_transaction_message(const int32_t transaction_id) {
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
for (const auto control_message : this->transaction_message_queue) {
if (control_message->messageType == v16::MessageType::StopTransaction) {
v16::StopTransactionRequest req = control_message->message.at(CALL_PAYLOAD);
if (req.transactionId == transaction_id) {
return true;
}
}
}
gberardi-pillar marked this conversation as resolved.
Show resolved Hide resolved
bool contains_stop_transaction_message(const int32_t transaction_id) override {
return false;
}

/// \brief Set transaction_message_attempts to given \p transaction_message_attempts
void update_transaction_message_attempts(const int transaction_message_attempts) {
void update_transaction_message_attempts(const int transaction_message_attempts) override {
this->config.transaction_message_attempts = transaction_message_attempts;
}

/// \brief Set transaction_message_retry_interval to given \p transaction_message_retry_interval in seconds
void update_transaction_message_retry_interval(const int transaction_message_retry_interval) {
void update_transaction_message_retry_interval(const int transaction_message_retry_interval) override {
this->config.transaction_message_retry_interval = transaction_message_retry_interval;
}

/// \brief Set message_timeout to given \p timeout (in seconds)
void update_message_timeout(const int timeout) {
void update_message_timeout(const int timeout) override {
this->config.message_timeout_seconds = timeout;
}

/// \brief Creates a unique message ID
/// \returns the unique message ID
MessageId createMessageId() {
MessageId createMessageId() override {
std::stringstream s;
s << this->uuid_generator();
return MessageId(s.str());
}

/// \brief Adds the given \p transaction_id to the message_id_transaction_id_map using the key \p
/// stop_transaction_message_id
void add_stopped_transaction_id(std::string stop_transaction_message_id, int32_t transaction_id) {
void add_stopped_transaction_id(std::string stop_transaction_message_id, int32_t transaction_id) override {
EVLOG_debug << "adding " << stop_transaction_message_id << " for transaction " << transaction_id;
this->message_id_transaction_id_map[stop_transaction_message_id] = transaction_id;
}

void add_meter_value_message_id(const std::string& start_transaction_message_id,
const std::string& meter_value_message_id) {
const std::string& meter_value_message_id) override {
if (this->start_transaction_mid_meter_values_mid_map.count(start_transaction_message_id)) {
this->start_transaction_mid_meter_values_mid_map.at(start_transaction_message_id)
.push_back(meter_value_message_id);
Expand All @@ -1048,7 +1068,7 @@ template <typename M> class MessageQueue {
}

void notify_start_transaction_handled(const std::string& start_transaction_message_id,
const int32_t transaction_id) {
const int32_t transaction_id) override {
this->cv.notify_one();

// replace transaction id in meter values if start_transaction_message_id is present in map
Expand All @@ -1069,8 +1089,8 @@ template <typename M> class MessageQueue {
this->start_transaction_mid_meter_values_mid_map.erase(start_transaction_message_id);
}

M string_to_messagetype(const std::string& s);
std::string messagetype_to_string(M m);
M string_to_messagetype(const std::string& s) override;
std::string messagetype_to_string(M m) override;
};

} // namespace ocpp
Expand Down
2 changes: 1 addition & 1 deletion include/ocpp/v16/charge_point_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class ChargePointImpl : ocpp::ChargingStationBase {

std::unique_ptr<Websocket> websocket;
Everest::SteadyTimer websocket_timer;
std::unique_ptr<MessageQueue<v16::MessageType>> message_queue;
std::unique_ptr<MessageQueueInterface<v16::MessageType>> message_queue;
std::map<int32_t, std::shared_ptr<Connector>> connectors;
std::unique_ptr<SmartChargingHandler> smart_charging_handler;
int32_t heartbeat_interval;
Expand Down
7 changes: 4 additions & 3 deletions include/ocpp/v201/charge_point.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa
std::unique_ptr<ConnectivityManager> connectivity_manager;

// utility
std::shared_ptr<MessageQueue<v201::MessageType>> message_queue;
std::shared_ptr<MessageQueueInterface<v201::MessageType>> message_queue;
std::shared_ptr<DatabaseHandler> database_handler;

std::map<int32_t, AvailabilityChange> scheduled_change_availability_requests;
Expand Down Expand Up @@ -750,8 +750,9 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa
/// \param callbacks Callbacks that will be registered for ChargePoint
ChargePoint(const std::map<int32_t, int32_t>& evse_connector_structure, std::shared_ptr<DeviceModel> device_model,
std::shared_ptr<DatabaseHandler> database_handler,
std::shared_ptr<MessageQueue<v201::MessageType>> message_queue, const std::string& message_log_path,
const std::shared_ptr<EvseSecurity> evse_security, const Callbacks& callbacks);
std::shared_ptr<MessageQueueInterface<v201::MessageType>> message_queue,
const std::string& message_log_path, const std::shared_ptr<EvseSecurity> evse_security,
const Callbacks& callbacks);

/// \brief Construct a new ChargePoint object
/// \param evse_connector_structure Map that defines the structure of EVSE and connectors of the chargepoint. The
Expand Down
8 changes: 4 additions & 4 deletions include/ocpp/v201/smart_charging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ class SmartChargingHandlerInterface {
class SmartChargingHandler : public SmartChargingHandlerInterface {
private:
EvseManagerInterface& evse_manager;
std::shared_ptr<DeviceModel>& device_model;
DeviceModel& device_model;

std::shared_ptr<ocpp::v201::DatabaseHandler> database_handler;
ocpp::v201::DatabaseHandler& database_handler;

public:
SmartChargingHandler(EvseManagerInterface& evse_manager, std::shared_ptr<DeviceModel>& device_model,
std::shared_ptr<ocpp::v201::DatabaseHandler> database_handler);
SmartChargingHandler(EvseManagerInterface& evse_manager, DeviceModel& device_model,
ocpp::v201::DatabaseHandler& database_handler);

///
/// \brief for the given \p transaction_id removes the associated charging profile.
Expand Down
13 changes: 13 additions & 0 deletions lib/ocpp/v16/message_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,17 @@ template <> std::string MessageQueue<v16::MessageType>::messagetype_to_string(v1
return v16::conversions::messagetype_to_string(m);
}

template <> bool MessageQueue<v16::MessageType>::contains_stop_transaction_message(const int32_t transaction_id) {
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
for (const auto control_message : this->transaction_message_queue) {
if (control_message->messageType == v16::MessageType::StopTransaction) {
v16::StopTransactionRequest req = control_message->message.at(CALL_PAYLOAD);
if (req.transactionId == transaction_id) {
return true;
}
}
}
return false;
}

} // namespace ocpp
4 changes: 2 additions & 2 deletions lib/ocpp/v201/charge_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ static DisplayMessage message_info_to_display_message(const MessageInfo& message

ChargePoint::ChargePoint(const std::map<int32_t, int32_t>& evse_connector_structure,
std::shared_ptr<DeviceModel> device_model, std::shared_ptr<DatabaseHandler> database_handler,
std::shared_ptr<MessageQueue<v201::MessageType>> message_queue,
std::shared_ptr<MessageQueueInterface<v201::MessageType>> message_queue,
const std::string& message_log_path, const std::shared_ptr<EvseSecurity> evse_security,
const Callbacks& callbacks) :
ocpp::ChargingStationBase(evse_security),
Expand Down Expand Up @@ -1132,7 +1132,7 @@ void ChargePoint::initialize(const std::map<int32_t, int32_t>& evse_connector_st
transaction_meter_value_callback, this->callbacks.pause_charging_callback);

this->smart_charging_handler =
std::make_shared<SmartChargingHandler>(*this->evse_manager, this->device_model, this->database_handler);
std::make_shared<SmartChargingHandler>(*this->evse_manager, *this->device_model, *this->database_handler);

this->configure_message_logging_format(message_log_path);
this->monitoring_updater.start_monitoring();
Expand Down
Loading
Loading