From d394612fa3bea925d677b1715f85f632e95738b4 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Mon, 24 Jul 2023 19:59:31 +0800 Subject: [PATCH] CTKD over BR/EDR accepted case --- cases/security_test.py | 160 +++++++++++++++++++++++++++++++++++------ 1 file changed, 138 insertions(+), 22 deletions(-) diff --git a/cases/security_test.py b/cases/security_test.py index 4d23b0c..b81df66 100644 --- a/cases/security_test.py +++ b/cases/security_test.py @@ -18,16 +18,63 @@ import logging from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices -from bumble.hci import HCI_CENTRAL_ROLE, HCI_PERIPHERAL_ROLE +from bumble.hci import HCI_CENTRAL_ROLE, HCI_PERIPHERAL_ROLE, HCI_Write_Default_Link_Policy_Settings_Command from bumble.pairing import PairingDelegate +from bumble.keys import PairingKeys from mobly import base_test, signals, test_runner from mobly.asserts import assert_equal # type: ignore from mobly.asserts import assert_in # type: ignore +from mobly.asserts import assert_is_none # type: ignore from mobly.asserts import assert_is_not_none # type: ignore from mobly.asserts import fail # type: ignore -from pandora.host_pb2 import Connection -from pandora.security_pb2 import LEVEL2, PairingEventAnswer, SecureResponse, WaitSecurityResponse -from typing import Any, Literal, Optional, Tuple, Union +from pandora.host_pb2 import Connection, DataTypes, RESOLVABLE_OR_RANDOM +from pandora.security_pb2 import LEVEL2, LE_LEVEL3, PairingEventAnswer, SecureResponse, WaitSecurityResponse +from typing import Any, Literal, Optional, Tuple, Union, List + +DEFAULT_SMP_KEY_DISTRIBUTION = ( + PairingDelegate.KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY + | PairingDelegate.KeyDistribution.DISTRIBUTE_IDENTITY_KEY + | PairingDelegate.KeyDistribution.DISTRIBUTE_SIGNING_KEY +) + +# Make classic connection task. +async def bredr_connect(initiator: PandoraDevice, acceptor: PandoraDevice) -> Tuple[Connection, Connection]: + init_res, wait_res = await asyncio.gather( + initiator.aio.host.Connect(address=acceptor.address), + acceptor.aio.host.WaitConnection(address=initiator.address), + ) + assert_equal(init_res.result_variant(), 'connection') + assert_equal(wait_res.result_variant(), 'connection') + assert init_res.connection is not None and wait_res.connection is not None + return init_res.connection, wait_res.connection + + +async def le_connect_with_rpa_and_encrypt(central: PandoraDevice, peripheral: PandoraDevice) -> None: + advertisement = peripheral.aio.host.Advertise( + legacy=True, + connectable=True, + own_address_type=RESOLVABLE_OR_RANDOM, + data=DataTypes(manufacturer_specific_data=b'pause cafe'), + ) + + (cen_res, per_res) = await asyncio.gather( + central.aio.host.ConnectLE(own_address_type=RESOLVABLE_OR_RANDOM, public_identity=peripheral.address), + anext(aiter(advertisement)), # pytype: disable=name-error + ) + + advertisement.cancel() + assert_equal(cen_res.result_variant(), 'connection') + cen_per = cen_res.connection + per_cen = per_res.connection + assert cen_per is not None and per_cen is not None + + encryption = peripheral.security.Secure(connection=per_cen, le=LE_LEVEL3) + assert_equal(encryption.result_variant(), 'success') + + await asyncio.gather( + central.aio.host.Disconnect(connection=cen_per), + peripheral.aio.host.WaitDisconnection(connection=per_cen), + ) class SecurityTest(base_test.BaseTestClass): # type: ignore[misc] @@ -75,6 +122,7 @@ def teardown_class(self) -> None: 'rejected', 'disconnect', 'disconnected', + 'accept_ctkd', ), ( 'against_default_io_cap', @@ -97,6 +145,7 @@ async def test_ssp( Literal['rejected'], Literal['disconnect'], Literal['disconnected'], + Literal['accept_ctkd'], ], ref_io_capability: Union[ Literal['against_default_io_cap'], @@ -142,6 +191,10 @@ async def test_ssp( if not isinstance(self.ref, BumblePandoraDevice) and ref_io_capability != 'against_default_io_cap': raise signals.TestSkip('Unable to override IO capability on non Bumble device.') + # CTKD + if variant in ('accept_ctkd') and ref_io_capability not in ('against_display_yes_no'): + raise signals.TestSkip('CTKD cases must be conducted under Security Level 4') + # Factory reset both DUT and REF devices. await asyncio.gather(self.dut.reset(), self.ref.reset()) @@ -154,27 +207,32 @@ async def test_ssp( 'against_display_yes_no': PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT, }[ref_io_capability] self.ref.server_config.io_capability = io_capability + self.ref.server_config.smp_local_initiator_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION + self.ref.server_config.smp_local_responder_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION + # Allow role switch + await self.ref.device.send_command(HCI_Write_Default_Link_Policy_Settings_Command(default_link_policy_settings=0x01), check_result=True) # type: ignore + + if isinstance(self.dut, BumblePandoraDevice): + self.dut.server_config.smp_local_initiator_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION + self.dut.server_config.smp_local_responder_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION + # Allow role switch + await self.dut.device.send_command(HCI_Write_Default_Link_Policy_Settings_Command(default_link_policy_settings=0x01), check_result=True) # type: ignore # Pandora connection tokens ref_dut, dut_ref = None, None + # Bumble connection + ref_dut_bumble = None + dut_ref_bumble = None + # CTKD async task + ctkd_task = None # Connection/pairing task. async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]: nonlocal ref_dut nonlocal dut_ref - - # Make classic connection task. - async def bredr_connect( - initiator: PandoraDevice, acceptor: PandoraDevice - ) -> Tuple[Connection, Connection]: - init_res, wait_res = await asyncio.gather( - initiator.aio.host.Connect(address=acceptor.address), - acceptor.aio.host.WaitConnection(address=initiator.address), - ) - assert_equal(init_res.result_variant(), 'connection') - assert_equal(wait_res.result_variant(), 'connection') - assert init_res.connection is not None and wait_res.connection is not None - return init_res.connection, wait_res.connection + nonlocal ref_dut_bumble + nonlocal dut_ref_bumble + nonlocal ctkd_task # Make classic connection. if connect == 'incoming_connection': @@ -182,20 +240,46 @@ async def bredr_connect( else: dut_ref, ref_dut = await bredr_connect(self.dut, self.ref) + # Retrieve Bumble connection + if isinstance(self.dut, BumblePandoraDevice): + dut_ref_bumble = self.dut.device.lookup_connection(int.from_bytes(dut_ref.cookie.value, 'big')) # type: ignore # Role switch. if isinstance(self.ref, BumblePandoraDevice): - ref_dut_raw = self.ref.device.lookup_connection(int.from_bytes(ref_dut.cookie.value, 'big')) - if ref_dut_raw is not None: + ref_dut_bumble = self.ref.device.lookup_connection(int.from_bytes(ref_dut.cookie.value, 'big')) + if ref_dut_bumble is not None: role = { 'against_central': HCI_CENTRAL_ROLE, 'against_peripheral': HCI_PERIPHERAL_ROLE, }[ref_role] - if ref_dut_raw.role != role: + if ref_dut_bumble.role != role: self.ref.log.info( f"Role switch to: {'`CENTRAL`' if role == HCI_CENTRAL_ROLE else '`PERIPHERAL`'}" ) - await ref_dut_raw.switch_role(role) + await ref_dut_bumble.switch_role(role) + + async def wait_ctkd_keys() -> List[PairingKeys]: + futures: List[asyncio.Future[PairingKeys]] = [] + if ref_dut_bumble is not None: + ref_dut_fut = asyncio.get_event_loop().create_future() + futures.append(ref_dut_fut) + + def on_pairing(keys: PairingKeys): + ref_dut_fut.set_result(keys) + + ref_dut_bumble.on('pairing', on_pairing) + if ref_dut_bumble is not None: + dut_ref_fut = asyncio.get_event_loop().create_future() + futures.append(dut_ref_fut) + + def on_pairing(keys: PairingKeys): + dut_ref_fut.set_result(keys) + + ref_dut_bumble.on('pairing', on_pairing) + + return await asyncio.gather(*futures) + + ctkd_task = asyncio.create_task(wait_ctkd_keys()) # Pairing. if pair == 'incoming_pairing': @@ -215,7 +299,7 @@ async def bredr_connect( # Start connection/pairing. connect_and_pair_task = asyncio.create_task(connect_and_pair()) - shall_pass = variant == 'accept' + shall_pass = variant == 'accept' or 'ctkd' in variant try: dut_pairing_fut = asyncio.create_task(anext(dut_pairing)) ref_pairing_fut = asyncio.create_task(anext(ref_pairing)) @@ -339,6 +423,38 @@ def on_done(_: Any) -> None: dut_pairing.cancel() ref_pairing.cancel() + if 'ctkd' not in variant: + return + + ctkd_shall_pass = variant == 'accept_ctkd' + + if variant == 'accept_ctkd': + + async def ctkd_over_bredr(): + if ref_role == 'against_central': + if ref_dut_bumble is not None: + await ref_dut_bumble.pair() + else: + if dut_ref_bumble is not None: + await dut_ref_bumble.pair() + assert ctkd_task is not None + await ctkd_task + + await ctkd_over_bredr() + else: + fail("Unsupported variant " + variant) + + if ctkd_shall_pass: + if ref_dut_bumble is not None: + ref_dut_keys = await self.ref.device.keystore.get(str(ref_dut_bumble.peer_address)) # type: ignore + assert_is_not_none(ref_dut_keys.ltk) # type: ignore + assert_is_not_none(ref_dut_keys.irk) # type: ignore + assert_is_not_none(ref_dut_keys.csrk) # type: ignore + + # Address resolving doesn't work for now + # TODO: Fix Pandora/Bumble/RootCanal address resolving + # await le_connect_with_rpa_and_encrypt(self.dut, self.ref) + if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG)