From 0dca982c8ac23b32d89eb4ca6f09d5f1e0015568 Mon Sep 17 00:00:00 2001 From: Dan Cross Date: Mon, 14 Aug 2023 20:32:58 +0000 Subject: [PATCH] Fix #750 by maintaining the Rx tail pointer At its root, #750 is about a race condition between the DMA engine and the STM32 Ethernet, and the host CPU. We can prevent this race by properly maintaining the Rx tail pointer. Some background information to contextual this change: The device and host participate in a simple protocol to drive a state machine: the host gives descriptors to the device that specify buffers to DMA incoming Ethernet frames into. The host gives a descriptor to the device by setting the ownership bit and setting the tail pointer: this driver handles the former properly, but does not handle the latter correctly. The device consumes these descriptors as frames arrive and it DMAs them host memory. In turn, the device lets the host know that it is done with a descriptor by doing two things: 1. Resetting the ownership bit in the descriptor, and 2. Sending an interrupt to the host. Internally, the device _also_ increments its "head" pointer in the Rx ring (modulo the size of the ring). If the device hits the end of the ring (as specified by the value in the Rx tail pointer register), it goes into a "Suspended" state and stops DMAing packets to the host; it resumes when the Rx tail pointer register is written again (note: writing a value that is the same as the head value on the _first_ write doesn't seem to start the device). Both device and host maintain (or, perhaps more precisely, should maintain) two pointers into the ring: a producer pointer, and a consumer pointer. One may think of this as the host producing available descriptors that are consumed by the device, and the device producing received frames that are consumed by the host. The tail pointer sets a demarkation point between the host and device: descriptors before the tail pointer are owned by the device, those after are owned by the host. The ownership bit is used to communicate to the host that the device is done with a given descriptor. Put another way, the tail pointer represents the host's producer pointer; the host's consumer pointer is simply the next descriptor in the ring after the last one that it processed. The device's consumer pointer is whatever the host set the tail pointer to, and it's producer pointer is its head pointer. If the device encounters a situation where _its_ head and tail pointer are equal, the ring is full, and the DMA engine enters the suspended state until the tail pointer is rewritten. An invariant that both sides of the protocol must maintain is that host software ensures that it's producer pointer (that is, what it sets the tail pointer to) is always equal to or (ie, the ring is full), or greater than (ie, there is space available in the ring) to the devices's producer pointer (that is, the devices's "head" pointer). Of course, we talk about an ordering relationship here, but all of this is happening modulo the size of the ring, but that's a detail. Note that there are two cases here: 1. The first time you write to the tail pointer to set things in motion at the beginning of time, and 2. Every other time. For the first time, this change writes the address of the last valid descriptor in the ring into the tail pointer register (not one beyond the last!), as we have observed there is no non-racy way to provide the full ring to the device immediately after reset: writing the beginning of the ring to the tail pointer does not start the peripheral, and writing just beyond the end of it open us up to a race against updating the first descriptor (until the first time the tail pointer is reset). After consuming and resetting a descriptor on the host, we write the address of the next descriptor in the ring to the tail pointer, indicating that the just-processed descriptor is available to the device. As we do this for every received frame we do not starve the ring. In this scenario, the device follows the host around the ring; the problem with the first write is solved since, after the first frame reception, the device's head pointer will be somewhere beyond the first descriptor in the ring. Thus, writing the tail pointer to the address of (say) the 0'th element is fine, since that won't be equal to the head pointer on the peripheral. Similarly, all available descriptors can be consumed by the device. Finally, this allows for some simplification with respect to barriers: since the tail pointer indicates which descriptors are owned by the device and which by the host, the host can batch updates to the descriptors and do a single flush at the end of its processing, write before writing to the tail pointer. This was already done in `rx_notify`. A similar change for the Tx ring has also been sent. I have validated that, whereas I could reproduce this issue in a demo branch prior to this change, I cannot reproduce it with this change. --- drv/stm32h7-eth/src/lib.rs | 13 ++++++++----- drv/stm32h7-eth/src/ring.rs | 24 ++++++++++++++---------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/drv/stm32h7-eth/src/lib.rs b/drv/stm32h7-eth/src/lib.rs index 79e8f1a371..0be985597d 100644 --- a/drv/stm32h7-eth/src/lib.rs +++ b/drv/stm32h7-eth/src/lib.rs @@ -13,6 +13,7 @@ #![no_std] use core::convert::TryFrom; +use core::sync::atomic::{self, Ordering}; #[cfg(feature = "h743")] use stm32h7::stm32h743 as device; @@ -157,8 +158,10 @@ impl Ethernet { // above. dma.dmactx_dtpr .write(|w| unsafe { w.tdt().bits(tx_ring.tail_ptr() as u32 >> 2) }); - dma.dmacrx_dtpr - .write(|w| unsafe { w.rdt().bits(rx_ring.tail_ptr() as u32 >> 2) }); + atomic::fence(Ordering::Release); + dma.dmacrx_dtpr.write(|w| unsafe { + w.rdt().bits(rx_ring.first_tail_ptr() as u32 >> 2) + }); // Central DMA config: @@ -318,11 +321,11 @@ impl Ethernet { fn rx_notify(&self) { // We have dequeued a packet! The hardware might not realize there is // room in the RX queue now. Poke it. - core::sync::atomic::fence(core::sync::atomic::Ordering::Release); + atomic::fence(Ordering::Release); // Poke the tail pointer so the hardware knows to recheck (dropping two - // bottom bits because svd2rust) + // bottom bits because svd2rust). self.dma.dmacrx_dtpr.write(|w| unsafe { - w.rdt().bits(self.rx_ring.tail_ptr() as u32 >> 2) + w.rdt().bits(self.rx_ring.next_tail_ptr() as u32 >> 2) }); } diff --git a/drv/stm32h7-eth/src/ring.rs b/drv/stm32h7-eth/src/ring.rs index 733e6a5fd4..a2125377cb 100644 --- a/drv/stm32h7-eth/src/ring.rs +++ b/drv/stm32h7-eth/src/ring.rs @@ -448,12 +448,18 @@ impl RxRing { self.storage.as_ptr() } - /// Returns a pointer to the byte just past the end of the `RxDesc` ring. - /// This too gets loaded into the DMA controller, so that it knows what - /// section of the ring is initialized and can be read. (The answer is "all - /// of it.") - pub fn tail_ptr(&self) -> *const RxDesc { - self.storage.as_ptr_range().end + /// Returns a pointer to the last valid descriptor in the ring. + /// We load this into the DMA controller when we first start operation so + /// that it knows what section of the ring is initialized and can be read. + /// The answer is "all of it," but we can't exactly specify that. + pub fn first_tail_ptr(&self) -> *const RxDesc { + self.base_ptr().wrapping_add(self.len() - 1) + } + + /// Returns a pointer to the "next" descriptor in the ring. We load this + /// into the device so that the DMA engine knows what descriptors are free. + pub fn next_tail_ptr(&self) -> *const RxDesc { + self.base_ptr().wrapping_add(self.next.get()) } /// Returns the count of entries in the descriptor ring / buffers in the @@ -468,12 +474,10 @@ impl RxRing { fn set_descriptor(d: &RxDesc, buffer: *mut [u8; BUFSZ]) { d.rdes[0].store(buffer as u32, Ordering::Relaxed); d.rdes[1].store(0, Ordering::Relaxed); - d.rdes[2].store(0, Ordering::Release); - // See hubris#750 for why we need Ordering::Release and this delay - cortex_m::asm::delay(16); + d.rdes[2].store(0, Ordering::Relaxed); let rdes3 = 1 << RDES3_OWN_BIT | 1 << RDES3_IOC_BIT | 1 << RDES3_BUF1_VALID_BIT; - d.rdes[3].store(rdes3, Ordering::Release); // <-- release + d.rdes[3].store(rdes3, Ordering::Relaxed); } }