From a40e8dca88660862bdb9f3a0d43a5c8d6e1e0707 Mon Sep 17 00:00:00 2001 From: Ryan Herbst Date: Wed, 9 Aug 2023 15:58:05 -0700 Subject: [PATCH 1/2] Add ensureSingleBuffer in slave class --- include/rogue/interfaces/stream/Slave.h | 14 ++++++++++++ src/rogue/interfaces/stream/Slave.cpp | 29 +++++++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/include/rogue/interfaces/stream/Slave.h b/include/rogue/interfaces/stream/Slave.h index c730e35ef..6af34787e 100644 --- a/include/rogue/interfaces/stream/Slave.h +++ b/include/rogue/interfaces/stream/Slave.h @@ -126,6 +126,20 @@ class Slave : public rogue::interfaces::stream::Pool, */ uint64_t getByteCount(); + //! Ensure frame is a single buffer + /** This method makes sure the passed frame is composed of a single buffer. + * If the reqNew flag is true and the passed frame is not a single buffer, a + * new frame will be requested and the frame data will be copied, with the passed + * frame pointer being updated. The return value will indicate if the frame is a + * single buffer at the end of the process. A frame lock must be held when this + * method is called. + * + * Not exposed to Python + * @param frame Reference to frame pointer (FramePtr) + * @param rewEn Flag to determine if a new frame should be requested + */ + bool ensureSingleBuffer(std::shared_ptr& frame, bool reqEn); + #ifndef NO_PYTHON //! Support << operator in python diff --git a/src/rogue/interfaces/stream/Slave.cpp b/src/rogue/interfaces/stream/Slave.cpp index 68fa6b323..3ea134b9e 100644 --- a/src/rogue/interfaces/stream/Slave.cpp +++ b/src/rogue/interfaces/stream/Slave.cpp @@ -140,6 +140,35 @@ uint64_t ris::Slave::getByteCount() { return (frameBytes_); } +// Ensure passed frame is a single buffer +bool ris::Slave::ensureSingleBuffer(ris::FramePtr& frame, bool reqEn) { + // Frame is a single buffer + if (frame->bufferCount() == 1) + return true; + + else if (!reqEn) + return false; + + else { + uint32_t size = frame->getPayload(); + ris::FramePtr nFrame = acceptReq(size, true); + + if (nFrame->bufferCount() != 1) + return false; + + else { + nFrame->setPayload(size); + + ris::FrameIterator srcIter = frame->begin(); + ris::FrameIterator dstIter = nFrame->begin(); + + ris::copyFrame(srcIter, size, dstIter); + frame = nFrame; + return true; + } + } +} + void ris::Slave::setup_python() { #ifndef NO_PYTHON From f7ee6e9a8084189e603353ce68d308865bda47d5 Mon Sep 17 00:00:00 2001 From: Ryan Herbst Date: Wed, 9 Aug 2023 23:47:39 -0700 Subject: [PATCH 2/2] Add local frame request --- include/rogue/interfaces/stream/Slave.h | 9 +++++++++ src/rogue/hardware/axi/AxiStreamDma.cpp | 2 +- src/rogue/interfaces/stream/Slave.cpp | 7 ++++++- src/rogue/interfaces/stream/TcpCore.cpp | 2 +- src/rogue/protocols/udp/Client.cpp | 4 ++-- src/rogue/protocols/udp/Server.cpp | 4 ++-- 6 files changed, 21 insertions(+), 7 deletions(-) diff --git a/include/rogue/interfaces/stream/Slave.h b/include/rogue/interfaces/stream/Slave.h index 6af34787e..029d44b1e 100644 --- a/include/rogue/interfaces/stream/Slave.h +++ b/include/rogue/interfaces/stream/Slave.h @@ -140,6 +140,15 @@ class Slave : public rogue::interfaces::stream::Pool, */ bool ensureSingleBuffer(std::shared_ptr& frame, bool reqEn); + // Process a local frame request + /* Method to service a local frame request + * + * size Minimum size for requested Frame, larger Frame may be allocated + * zeroCopyEn Flag which indicates if a zero copy mode Frame is allowed. + * Newly allocated Frame pointer (FramePtr) + */ + std::shared_ptr reqLocalFrame(uint32_t size, bool zeroCopyEn); + #ifndef NO_PYTHON //! Support << operator in python diff --git a/src/rogue/hardware/axi/AxiStreamDma.cpp b/src/rogue/hardware/axi/AxiStreamDma.cpp index 702aec446..d7330925a 100644 --- a/src/rogue/hardware/axi/AxiStreamDma.cpp +++ b/src/rogue/hardware/axi/AxiStreamDma.cpp @@ -270,7 +270,7 @@ ris::FramePtr rha::AxiStreamDma::acceptReq(uint32_t size, bool zeroCopyEn) { // Zero copy is disabled. Allocate from memory. if (zeroCopyEn == false || desc_->rawBuff == NULL) { - frame = ris::Pool::acceptReq(size, false); + frame = reqLocalFrame(size, false); } // Allocate zero copy buffers from driver diff --git a/src/rogue/interfaces/stream/Slave.cpp b/src/rogue/interfaces/stream/Slave.cpp index 3ea134b9e..3ba5107c7 100644 --- a/src/rogue/interfaces/stream/Slave.cpp +++ b/src/rogue/interfaces/stream/Slave.cpp @@ -151,7 +151,7 @@ bool ris::Slave::ensureSingleBuffer(ris::FramePtr& frame, bool reqEn) { else { uint32_t size = frame->getPayload(); - ris::FramePtr nFrame = acceptReq(size, true); + ris::FramePtr nFrame = reqLocalFrame(size, true); if (nFrame->bufferCount() != 1) return false; @@ -169,6 +169,11 @@ bool ris::Slave::ensureSingleBuffer(ris::FramePtr& frame, bool reqEn) { } } +// Process a local frame request +ris::FramePtr ris::Slave::reqLocalFrame(uint32_t size, bool zeroCopyEn) { + return ris::Pool::acceptReq(size,zeroCopyEn); +} + void ris::Slave::setup_python() { #ifndef NO_PYTHON diff --git a/src/rogue/interfaces/stream/TcpCore.cpp b/src/rogue/interfaces/stream/TcpCore.cpp index 00523de97..2a5d91b4a 100644 --- a/src/rogue/interfaces/stream/TcpCore.cpp +++ b/src/rogue/interfaces/stream/TcpCore.cpp @@ -271,7 +271,7 @@ void ris::TcpCore::runThread() { size = zmq_msg_size(&(msg[3])); // Generate frame - frame = ris::Pool::acceptReq(size, false); + frame = reqLocalFrame(size, false); frame->setPayload(size); // Copy data diff --git a/src/rogue/protocols/udp/Client.cpp b/src/rogue/protocols/udp/Client.cpp index d8ad96dd3..579079d8a 100644 --- a/src/rogue/protocols/udp/Client.cpp +++ b/src/rogue/protocols/udp/Client.cpp @@ -197,7 +197,7 @@ void rpu::Client::runThread(std::weak_ptr lockPtr) { udpLog_->logThreadId(); // Preallocate frame - frame = ris::Pool::acceptReq(maxPayload(), false); + frame = reqLocalFrame(maxPayload(), false); while (threadEn_) { // Attempt receive @@ -215,7 +215,7 @@ void rpu::Client::runThread(std::weak_ptr lockPtr) { } // Get new frame - frame = ris::Pool::acceptReq(maxPayload(), false); + frame = reqLocalFrame(maxPayload(), false); } else { // Setup fds for select call FD_ZERO(&fds); diff --git a/src/rogue/protocols/udp/Server.cpp b/src/rogue/protocols/udp/Server.cpp index 7aa0826ee..03fec128a 100644 --- a/src/rogue/protocols/udp/Server.cpp +++ b/src/rogue/protocols/udp/Server.cpp @@ -200,7 +200,7 @@ void rpu::Server::runThread(std::weak_ptr lockPtr) { udpLog_->logThreadId(); // Preallocate frame - frame = ris::Pool::acceptReq(maxPayload(), false); + frame = reqLocalFrame(maxPayload(), false); while (threadEn_) { // Attempt receive @@ -219,7 +219,7 @@ void rpu::Server::runThread(std::weak_ptr lockPtr) { } // Get new frame - frame = ris::Pool::acceptReq(maxPayload(), false); + frame = reqLocalFrame(maxPayload(), false); // Lock before updating address if (memcmp(&remAddr_, &tmpAddr, sizeof(remAddr_)) != 0) {