diff --git a/sol_usb/gateware/architecture/car.py b/sol_usb/gateware/architecture/car.py index e1563ef73..27a283b68 100644 --- a/sol_usb/gateware/architecture/car.py +++ b/sol_usb/gateware/architecture/car.py @@ -13,7 +13,6 @@ ClockDomain, ClockSignal, Elaboratable, Instance, Module, ResetSignal, Signal ) -from ..test import SolGatewareTestCase, sync_test_case from ..utils.cdc import stretch_strobe_signal @@ -103,41 +102,6 @@ def elaborate(self, platform): return m - - -class PHYResetControllerTest(SolGatewareTestCase): - FRAGMENT_UNDER_TEST = PHYResetController - - def initialize_signals(self): - yield self.dut.trigger.eq(0) - - @sync_test_case - def test_power_on_reset(self): - - # - # After power-on, the PHY should remain in reset for a while. - # - yield - self.assertEqual((yield self.dut.phy_reset), 1) - - yield from self.advance_cycles(30) - self.assertEqual((yield self.dut.phy_reset), 1) - - yield from self.advance_cycles(60) - self.assertEqual((yield self.dut.phy_reset), 1) - - # - # Then, after the relevant reset time, it should resume being unasserted. - # - yield from self.advance_cycles(31) - self.assertEqual((yield self.dut.phy_reset), 0) - self.assertEqual((yield self.dut.phy_stop), 1) - - yield from self.advance_cycles(120) - self.assertEqual((yield self.dut.phy_stop), 0) - - - class SolDomainGenerator(Elaboratable, metaclass = ABCMeta): ''' Helper that generates the clock domains used in a SOL board. diff --git a/sol_usb/gateware/memory.py b/sol_usb/gateware/memory.py index 92509534c..dc93cd949 100644 --- a/sol_usb/gateware/memory.py +++ b/sol_usb/gateware/memory.py @@ -8,13 +8,9 @@ This module contains definitions of memory units that work well for USB applications. ''' -import unittest - from torii import Elaboratable, Memory, Module, Signal from torii.hdl.xfrm import DomainRenamer -from .test import SolGatewareTestCase, sync_test_case - class TransactionalizedFIFO(Elaboratable): ''' @@ -226,128 +222,3 @@ def elaborate(self, platform): m = DomainRenamer({'sync': self.domain})(m) return m - - - -class TransactionalizedFIFOTest(SolGatewareTestCase): - FRAGMENT_UNDER_TEST = TransactionalizedFIFO - FRAGMENT_ARGUMENTS = {'width': 8, 'depth': 16} - - def initialize_signals(self): - yield self.dut.write_en.eq(0) - - @sync_test_case - def test_simple_fill(self): - dut = self.dut - - # Our FIFO should start off empty; and with a full depth of free space. - self.assertEqual((yield dut.empty), 1) - self.assertEqual((yield dut.full), 0) - self.assertEqual((yield dut.space_available), 16) - - # If we add a byte to the queue... - yield dut.write_data.eq(0xAA) - yield from self.pulse(dut.write_en) - - # ... we should have less space available ... - self.assertEqual((yield dut.space_available), 15) - - # ... but we still should be 'empty', as we won't have data to read until we commit. - self.assertEqual((yield dut.empty), 1) - - # Once we _commit_ our write, we should suddenly have data to read. - yield from self.pulse(dut.write_commit) - self.assertEqual((yield dut.empty), 0) - - # If we read a byte, we should see the FIFO become empty... - yield from self.pulse(dut.read_en) - self.assertEqual((yield dut.empty), 1) - - # ... but we shouldn't see more space become available until we commit the read. - self.assertEqual((yield dut.space_available), 15) - yield from self.pulse(dut.read_commit) - self.assertEqual((yield dut.space_available), 16) - - # If we write 16 more bytes of data... - yield dut.write_en.eq(1) - for i in range(16): - yield dut.write_data.eq(i) - yield - yield dut.write_en.eq(0) - - # ... our buffer should be full, but also empty. - # This paradox exists as we've filled our buffer with uncommitted data. - yield - self.assertEqual((yield dut.full), 1) - self.assertEqual((yield dut.empty), 1) - - # Once we _commit_ our data, it should suddenly stop being empty. - yield from self.pulse(dut.write_commit) - self.assertEqual((yield dut.empty), 0) - - # Reading a byte _without committing_ shouldn't change anything about empty/full/space-available... - yield from self.pulse(dut.read_en) - self.assertEqual((yield dut.empty), 0) - self.assertEqual((yield dut.full), 1) - self.assertEqual((yield dut.space_available), 0) - - # ... but committing should increase our space available by one, and make our buffer no longer full. - yield from self.pulse(dut.read_commit) - self.assertEqual((yield dut.empty), 0) - self.assertEqual((yield dut.full), 0) - self.assertEqual((yield dut.space_available), 1) - - # Reading/committing another byte should increment our space available. - yield from self.pulse(dut.read_en) - yield from self.pulse(dut.read_commit) - self.assertEqual((yield dut.space_available), 2) - - # Writing data into the buffer should then fill it back up again... - yield dut.write_en.eq(1) - for i in range(2): - yield dut.write_data.eq(i) - yield - yield dut.write_en.eq(0) - - # ... meaning it will again be full, and have no space remaining. - yield - self.assertEqual((yield dut.full), 1) - self.assertEqual((yield dut.space_available), 0) - - # If we _discard_ this data, we should go back to having two bytes available. - yield from self.pulse(dut.write_discard) - self.assertEqual((yield dut.full), 0) - self.assertEqual((yield dut.space_available), 2) - - # If we read the data that's remaining in the fifo... - yield dut.read_en.eq(1) - for i in range(2, 16): - yield - self.assertEqual((yield dut.read_data), i) - yield dut.read_en.eq(0) - - # ... our buffer should again be empty. - yield - self.assertEqual((yield dut.empty), 1) - self.assertEqual((yield dut.space_available), 2) - - # If we _discard_ our current read, we should then see our buffer no longer empty... - yield from self.pulse(dut.read_discard) - self.assertEqual((yield dut.empty), 0) - - # and we should be able to read the same data again. - yield dut.read_en.eq(1) - for i in range(2, 16): - yield - self.assertEqual((yield dut.read_data), i) - yield dut.read_en.eq(0) - - # On committing this, we should see a buffer that is no longer full, and is really empty. - yield from self.pulse(dut.read_commit) - self.assertEqual((yield dut.empty), 1) - self.assertEqual((yield dut.full), 0) - self.assertEqual((yield dut.space_available), 16) - - -if __name__ == '__main__': - unittest.main() diff --git a/sol_usb/gateware/utils/cdc.py b/sol_usb/gateware/utils/cdc.py index 44544a31c..07fb28cda 100644 --- a/sol_usb/gateware/utils/cdc.py +++ b/sol_usb/gateware/utils/cdc.py @@ -7,17 +7,13 @@ ''' Helpers for clock domain crossings. ''' -import unittest -import warnings -from unittest import TestCase + from torii import Module, Record, Signal -from torii.hdl.rec import DIR_FANIN, DIR_FANOUT +from torii.hdl.rec import DIR_FANOUT from torii.lib.cdc import FFSynchronizer from torii.lib.io import Pin -from ..test import SolGatewareTestCase, sync_test_case - def synchronize(m, signal, *, output = None, o_domain = 'sync', stages = 2): ''' @@ -88,36 +84,6 @@ def create_synchronizer(signal, output): return output - -class SynchronizedTest(TestCase): - - def test_signal(self): - m = Module() - synchronize(m, Signal()) - - def test_directional_record(self): - m = Module() - - record = Record([ - ('sig_in', 1, DIR_FANIN), - ('sig_out', 1, DIR_FANOUT) - ]) - synchronize(m, record) - - def test_nested_record(self): - m = Module() - - record = Record([ - ('sig_in', 1, DIR_FANIN), - ('sig_out', 1, DIR_FANOUT), - ('nested', [ - ('subsig_in', 1, DIR_FANIN), - ('subsig_out', 1, DIR_FANOUT), - ]) - ]) - synchronize(m, record) - - def stretch_strobe_signal(m, strobe, *, to_cycles, output = None, domain = None, allow_delay = False): ''' Stretches a given strobe to the given number of cycles. @@ -169,54 +135,3 @@ def stretch_strobe_signal(m, strobe, *, to_cycles, output = None, domain = None, m.d.comb += output.eq(strobe | (delayed_strobe != 0)) return output - - -class StrobeStretcherTest(SolGatewareTestCase): - ''' Test case for our strobe stretcher function. ''' - - - def instantiate_dut(self): - m = Module() - - # Create a module that only has our stretched strobe signal. - m.strobe_in = Signal() - m.stretched_strobe = stretch_strobe_signal(m, m.strobe_in, to_cycles = 2) - - return m - - - def initialize_signals(self): - yield self.dut.strobe_in.eq(0) - - - @sync_test_case - def test_stretch(self): - - # Ensure our stretched strobe stays 0 until it sees an input. - yield - self.assertEqual((yield self.dut.stretched_strobe), 0) - yield - self.assertEqual((yield self.dut.stretched_strobe), 0) - - # Apply our strobe, and validate that we immediately see a '1'... - yield self.dut.strobe_in.eq(1) - yield - self.assertEqual((yield self.dut.stretched_strobe), 1) - - # ... ensure that 1 lasts for a second cycle ... - yield self.dut.strobe_in.eq(0) - yield - self.assertEqual((yield self.dut.stretched_strobe), 1) - - # ... and then returns to 0. - yield - self.assertEqual((yield self.dut.stretched_strobe), 0) - - yield - self.assertEqual((yield self.dut.stretched_strobe), 0) - - - -if __name__ == '__main__': - warnings.filterwarnings('error') - unittest.main() diff --git a/tests/architecture/__init__.py b/tests/architecture/__init__.py new file mode 100644 index 000000000..dde388e0c --- /dev/null +++ b/tests/architecture/__init__.py @@ -0,0 +1 @@ +# SPDX-License-Identifier: BSD-3-Clause diff --git a/tests/architecture/test_car.py b/tests/architecture/test_car.py new file mode 100644 index 000000000..2be12fa89 --- /dev/null +++ b/tests/architecture/test_car.py @@ -0,0 +1,36 @@ +# SPDX-License-Identifier: BSD-3-Clause + +from sol_usb.gateware.test import SolGatewareTestCase, sync_test_case + +from sol_usb.gateware.architecture.car import PHYResetController + +class PHYResetControllerTest(SolGatewareTestCase): + FRAGMENT_UNDER_TEST = PHYResetController + + def initialize_signals(self): + yield self.dut.trigger.eq(0) + + @sync_test_case + def test_power_on_reset(self): + + # + # After power-on, the PHY should remain in reset for a while. + # + yield + self.assertEqual((yield self.dut.phy_reset), 1) + + yield from self.advance_cycles(30) + self.assertEqual((yield self.dut.phy_reset), 1) + + yield from self.advance_cycles(60) + self.assertEqual((yield self.dut.phy_reset), 1) + + # + # Then, after the relevant reset time, it should resume being unasserted. + # + yield from self.advance_cycles(31) + self.assertEqual((yield self.dut.phy_reset), 0) + self.assertEqual((yield self.dut.phy_stop), 1) + + yield from self.advance_cycles(120) + self.assertEqual((yield self.dut.phy_stop), 0) diff --git a/tests/test_memory.py b/tests/test_memory.py new file mode 100644 index 000000000..a984b7656 --- /dev/null +++ b/tests/test_memory.py @@ -0,0 +1,123 @@ +# SPDX-License-Identifier: BSD-3-Clause + +from sol_usb.gateware.memory import TransactionalizedFIFO +from sol_usb.gateware.test import SolGatewareTestCase, sync_test_case + +class TransactionalizedFIFOTest(SolGatewareTestCase): + FRAGMENT_UNDER_TEST = TransactionalizedFIFO + FRAGMENT_ARGUMENTS = {'width': 8, 'depth': 16} + + def initialize_signals(self): + yield self.dut.write_en.eq(0) + + @sync_test_case + def test_simple_fill(self): + dut = self.dut + + # Our FIFO should start off empty; and with a full depth of free space. + self.assertEqual((yield dut.empty), 1) + self.assertEqual((yield dut.full), 0) + self.assertEqual((yield dut.space_available), 16) + + # If we add a byte to the queue... + yield dut.write_data.eq(0xAA) + yield from self.pulse(dut.write_en) + + # ... we should have less space available ... + self.assertEqual((yield dut.space_available), 15) + + # ... but we still should be 'empty', as we won't have data to read until we commit. + self.assertEqual((yield dut.empty), 1) + + # Once we _commit_ our write, we should suddenly have data to read. + yield from self.pulse(dut.write_commit) + self.assertEqual((yield dut.empty), 0) + + # If we read a byte, we should see the FIFO become empty... + yield from self.pulse(dut.read_en) + self.assertEqual((yield dut.empty), 1) + + # ... but we shouldn't see more space become available until we commit the read. + self.assertEqual((yield dut.space_available), 15) + yield from self.pulse(dut.read_commit) + self.assertEqual((yield dut.space_available), 16) + + # If we write 16 more bytes of data... + yield dut.write_en.eq(1) + for i in range(16): + yield dut.write_data.eq(i) + yield + yield dut.write_en.eq(0) + + # ... our buffer should be full, but also empty. + # This paradox exists as we've filled our buffer with uncommitted data. + yield + self.assertEqual((yield dut.full), 1) + self.assertEqual((yield dut.empty), 1) + + # Once we _commit_ our data, it should suddenly stop being empty. + yield from self.pulse(dut.write_commit) + self.assertEqual((yield dut.empty), 0) + + # Reading a byte _without committing_ shouldn't change anything about empty/full/space-available... + yield from self.pulse(dut.read_en) + self.assertEqual((yield dut.empty), 0) + self.assertEqual((yield dut.full), 1) + self.assertEqual((yield dut.space_available), 0) + + # ... but committing should increase our space available by one, and make our buffer no longer full. + yield from self.pulse(dut.read_commit) + self.assertEqual((yield dut.empty), 0) + self.assertEqual((yield dut.full), 0) + self.assertEqual((yield dut.space_available), 1) + + # Reading/committing another byte should increment our space available. + yield from self.pulse(dut.read_en) + yield from self.pulse(dut.read_commit) + self.assertEqual((yield dut.space_available), 2) + + # Writing data into the buffer should then fill it back up again... + yield dut.write_en.eq(1) + for i in range(2): + yield dut.write_data.eq(i) + yield + yield dut.write_en.eq(0) + + # ... meaning it will again be full, and have no space remaining. + yield + self.assertEqual((yield dut.full), 1) + self.assertEqual((yield dut.space_available), 0) + + # If we _discard_ this data, we should go back to having two bytes available. + yield from self.pulse(dut.write_discard) + self.assertEqual((yield dut.full), 0) + self.assertEqual((yield dut.space_available), 2) + + # If we read the data that's remaining in the fifo... + yield dut.read_en.eq(1) + for i in range(2, 16): + yield + self.assertEqual((yield dut.read_data), i) + yield dut.read_en.eq(0) + + # ... our buffer should again be empty. + yield + self.assertEqual((yield dut.empty), 1) + self.assertEqual((yield dut.space_available), 2) + + # If we _discard_ our current read, we should then see our buffer no longer empty... + yield from self.pulse(dut.read_discard) + self.assertEqual((yield dut.empty), 0) + + # and we should be able to read the same data again. + yield dut.read_en.eq(1) + for i in range(2, 16): + yield + self.assertEqual((yield dut.read_data), i) + yield dut.read_en.eq(0) + + # On committing this, we should see a buffer that is no longer full, and is really empty. + yield from self.pulse(dut.read_commit) + self.assertEqual((yield dut.empty), 1) + self.assertEqual((yield dut.full), 0) + self.assertEqual((yield dut.space_available), 16) diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py new file mode 100644 index 000000000..dde388e0c --- /dev/null +++ b/tests/utils/__init__.py @@ -0,0 +1 @@ +# SPDX-License-Identifier: BSD-3-Clause diff --git a/tests/utils/test_cdc.py b/tests/utils/test_cdc.py new file mode 100644 index 000000000..ba2935f62 --- /dev/null +++ b/tests/utils/test_cdc.py @@ -0,0 +1,83 @@ +# SPDX-License-Identifier: BSD-3-Clause + +from unittest import TestCase + +from torii import Module, Record, Signal +from torii.hdl.rec import DIR_FANIN, DIR_FANOUT + +from sol_usb.gateware.test import SolGatewareTestCase, sync_test_case +from sol_usb.gateware.utils.cdc import stretch_strobe_signal, synchronize + +class StrobeStretcherTest(SolGatewareTestCase): + ''' Test case for our strobe stretcher function. ''' + + + def instantiate_dut(self): + m = Module() + + # Create a module that only has our stretched strobe signal. + m.strobe_in = Signal() + m.stretched_strobe = stretch_strobe_signal(m, m.strobe_in, to_cycles = 2) + + return m + + + def initialize_signals(self): + yield self.dut.strobe_in.eq(0) + + + @sync_test_case + def test_stretch(self): + + # Ensure our stretched strobe stays 0 until it sees an input. + yield + self.assertEqual((yield self.dut.stretched_strobe), 0) + yield + self.assertEqual((yield self.dut.stretched_strobe), 0) + + # Apply our strobe, and validate that we immediately see a '1'... + yield self.dut.strobe_in.eq(1) + yield + self.assertEqual((yield self.dut.stretched_strobe), 1) + + # ... ensure that 1 lasts for a second cycle ... + yield self.dut.strobe_in.eq(0) + yield + self.assertEqual((yield self.dut.stretched_strobe), 1) + + # ... and then returns to 0. + yield + self.assertEqual((yield self.dut.stretched_strobe), 0) + + yield + self.assertEqual((yield self.dut.stretched_strobe), 0) + + + +class SynchronizedTest(TestCase): + + def test_signal(self): + m = Module() + synchronize(m, Signal()) + + def test_directional_record(self): + m = Module() + + record = Record([ + ('sig_in', 1, DIR_FANIN), + ('sig_out', 1, DIR_FANOUT) + ]) + synchronize(m, record) + + def test_nested_record(self): + m = Module() + + record = Record([ + ('sig_in', 1, DIR_FANIN), + ('sig_out', 1, DIR_FANOUT), + ('nested', [ + ('subsig_in', 1, DIR_FANIN), + ('subsig_out', 1, DIR_FANOUT), + ]) + ]) + synchronize(m, record)