Skip to content

Commit

Permalink
CTKD over BR/EDR accepted case
Browse files Browse the repository at this point in the history
  • Loading branch information
zxzxwu committed Aug 1, 2023
1 parent bd11b55 commit 14aa606
Showing 1 changed file with 137 additions and 22 deletions.
159 changes: 137 additions & 22 deletions cases/security_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,63 @@
import logging

from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
from bumble.hci import HCI_CENTRAL_ROLE, HCI_PERIPHERAL_ROLE
from bumble.hci import HCI_CENTRAL_ROLE, HCI_PERIPHERAL_ROLE, HCI_Write_Default_Link_Policy_Settings_Command
from bumble.pairing import PairingDelegate
from bumble.keys import PairingKeys
from mobly import base_test, signals, test_runner
from mobly.asserts import assert_equal # type: ignore
from mobly.asserts import assert_in # type: ignore
from mobly.asserts import assert_is_none # type: ignore
from mobly.asserts import assert_is_not_none # type: ignore
from mobly.asserts import fail # type: ignore
from pandora.host_pb2 import Connection
from pandora.security_pb2 import LEVEL2, PairingEventAnswer, SecureResponse, WaitSecurityResponse
from typing import Any, Literal, Optional, Tuple, Union
from pandora.host_pb2 import Connection, DataTypes, RESOLVABLE_OR_RANDOM
from pandora.security_pb2 import LEVEL2, LE_LEVEL3, PairingEventAnswer, SecureResponse, WaitSecurityResponse
from typing import Any, Literal, Optional, Tuple, Union, List

DEFAULT_SMP_KEY_DISTRIBUTION = (
PairingDelegate.KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY
| PairingDelegate.KeyDistribution.DISTRIBUTE_IDENTITY_KEY
| PairingDelegate.KeyDistribution.DISTRIBUTE_SIGNING_KEY
)

# Make classic connection task.
async def bredr_connect(initiator: PandoraDevice, acceptor: PandoraDevice) -> Tuple[Connection, Connection]:
init_res, wait_res = await asyncio.gather(
initiator.aio.host.Connect(address=acceptor.address),
acceptor.aio.host.WaitConnection(address=initiator.address),
)
assert_equal(init_res.result_variant(), 'connection')
assert_equal(wait_res.result_variant(), 'connection')
assert init_res.connection is not None and wait_res.connection is not None
return init_res.connection, wait_res.connection


async def le_connect_with_rpa_and_encrypt(central: PandoraDevice, peripheral: PandoraDevice) -> None:
advertisement = peripheral.aio.host.Advertise(
legacy=True,
connectable=True,
own_address_type=RESOLVABLE_OR_RANDOM,
data=DataTypes(manufacturer_specific_data=b'pause cafe'),
)

(cen_res, per_res) = await asyncio.gather(
central.aio.host.ConnectLE(own_address_type=RESOLVABLE_OR_RANDOM, public_identity=peripheral.address),
anext(aiter(advertisement)), # pytype: disable=name-error
)

advertisement.cancel()
assert_equal(cen_res.result_variant(), 'connection')
cen_per = cen_res.connection
per_cen = per_res.connection
assert cen_per is not None and per_cen is not None

encryption = peripheral.security.Secure(connection=per_cen, le=LE_LEVEL3)
assert_equal(encryption.result_variant(), 'success')

await asyncio.gather(
central.aio.host.Disconnect(connection=cen_per),
peripheral.aio.host.WaitDisconnection(connection=per_cen),
)


class SecurityTest(base_test.BaseTestClass): # type: ignore[misc]
Expand Down Expand Up @@ -75,6 +122,7 @@ def teardown_class(self) -> None:
'rejected',
'disconnect',
'disconnected',
'accept_ctkd',
),
(
'against_default_io_cap',
Expand All @@ -97,6 +145,7 @@ async def test_ssp(
Literal['rejected'],
Literal['disconnect'],
Literal['disconnected'],
Literal['accept_ctkd'],
],
ref_io_capability: Union[
Literal['against_default_io_cap'],
Expand Down Expand Up @@ -142,6 +191,10 @@ async def test_ssp(
if not isinstance(self.ref, BumblePandoraDevice) and ref_io_capability != 'against_default_io_cap':
raise signals.TestSkip('Unable to override IO capability on non Bumble device.')

# CTKD
if variant in ('accept_ctkd') and ref_io_capability not in ('against_display_yes_no'):
raise signals.TestSkip('CTKD cases must be conducted under Security Level 4')

# Factory reset both DUT and REF devices.
await asyncio.gather(self.dut.reset(), self.ref.reset())

Expand All @@ -154,48 +207,78 @@ async def test_ssp(
'against_display_yes_no': PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
}[ref_io_capability]
self.ref.server_config.io_capability = io_capability
self.ref.server_config.smp_local_initiator_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION
self.ref.server_config.smp_local_responder_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION
# Allow role switch
await self.ref.device.send_command(HCI_Write_Default_Link_Policy_Settings_Command(default_link_policy_settings=0x01), check_result=True) # type: ignore

if isinstance(self.dut, BumblePandoraDevice):
self.dut.server_config.smp_local_initiator_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION
self.dut.server_config.smp_local_responder_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION
# Allow role switch
await self.dut.device.send_command(HCI_Write_Default_Link_Policy_Settings_Command(default_link_policy_settings=0x01), check_result=True) # type: ignore

# Pandora connection tokens
ref_dut, dut_ref = None, None
# Bumble connection
ref_dut_bumble = None
dut_ref_bumble = None
# CTKD async task
ctkd_task = None
need_ctkd = 'ctkd' in variant

# Connection/pairing task.
async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
nonlocal ref_dut
nonlocal dut_ref

# Make classic connection task.
async def bredr_connect(
initiator: PandoraDevice, acceptor: PandoraDevice
) -> Tuple[Connection, Connection]:
init_res, wait_res = await asyncio.gather(
initiator.aio.host.Connect(address=acceptor.address),
acceptor.aio.host.WaitConnection(address=initiator.address),
)
assert_equal(init_res.result_variant(), 'connection')
assert_equal(wait_res.result_variant(), 'connection')
assert init_res.connection is not None and wait_res.connection is not None
return init_res.connection, wait_res.connection
nonlocal ref_dut_bumble
nonlocal dut_ref_bumble
nonlocal ctkd_task

# Make classic connection.
if connect == 'incoming_connection':
ref_dut, dut_ref = await bredr_connect(self.ref, self.dut)
else:
dut_ref, ref_dut = await bredr_connect(self.dut, self.ref)

# Retrieve Bumble connection
if isinstance(self.dut, BumblePandoraDevice):
dut_ref_bumble = self.dut.device.lookup_connection(int.from_bytes(dut_ref.cookie.value, 'big')) # type: ignore
# Role switch.
if isinstance(self.ref, BumblePandoraDevice):
ref_dut_raw = self.ref.device.lookup_connection(int.from_bytes(ref_dut.cookie.value, 'big'))
if ref_dut_raw is not None:
ref_dut_bumble = self.ref.device.lookup_connection(int.from_bytes(ref_dut.cookie.value, 'big'))
if ref_dut_bumble is not None:
role = {
'against_central': HCI_CENTRAL_ROLE,
'against_peripheral': HCI_PERIPHERAL_ROLE,
}[ref_role]

if ref_dut_raw.role != role:
if ref_dut_bumble.role != role:
self.ref.log.info(
f"Role switch to: {'`CENTRAL`' if role == HCI_CENTRAL_ROLE else '`PERIPHERAL`'}"
)
await ref_dut_raw.switch_role(role)
await ref_dut_bumble.switch_role(role)

async def wait_ctkd_keys() -> List[PairingKeys]:
futures: List[asyncio.Future[PairingKeys]] = []
if ref_dut_bumble is not None:
ref_dut_fut = asyncio.get_event_loop().create_future()
futures.append(ref_dut_fut)
def on_pairing(keys: PairingKeys) -> None:
ref_dut_fut.set_result(keys)
ref_dut_bumble.on('pairing', on_pairing)
if dut_ref_bumble is not None:
dut_ref_fut = asyncio.get_event_loop().create_future()
futures.append(dut_ref_fut)

def on_pairing(keys: PairingKeys) -> None:
dut_ref_fut.set_result(keys)
dut_ref_bumble.on('pairing', on_pairing)

return await asyncio.gather(*futures)

if need_ctkd:
ctkd_task = asyncio.create_task(wait_ctkd_keys())

# Pairing.
if pair == 'incoming_pairing':
Expand All @@ -215,7 +298,7 @@ async def bredr_connect(
# Start connection/pairing.
connect_and_pair_task = asyncio.create_task(connect_and_pair())

shall_pass = variant == 'accept'
shall_pass = variant == 'accept' or 'ctkd' in variant
try:
dut_pairing_fut = asyncio.create_task(anext(dut_pairing))
ref_pairing_fut = asyncio.create_task(anext(ref_pairing))
Expand Down Expand Up @@ -339,6 +422,38 @@ def on_done(_: Any) -> None:
dut_pairing.cancel()
ref_pairing.cancel()

if not need_ctkd:
return

ctkd_shall_pass = variant == 'accept_ctkd'

if variant == 'accept_ctkd':

async def ctkd_over_bredr() -> None:
if ref_role == 'against_central':
if ref_dut_bumble is not None:
await ref_dut_bumble.pair()
else:
if dut_ref_bumble is not None:
await dut_ref_bumble.pair()
assert ctkd_task is not None
await ctkd_task

await ctkd_over_bredr()
else:
fail("Unsupported variant " + variant)

if ctkd_shall_pass:
if ref_dut_bumble is not None:
ref_dut_keys = await self.ref.device.keystore.get(str(ref_dut_bumble.peer_address)) # type: ignore
assert_is_not_none(ref_dut_keys.ltk) # type: ignore
assert_is_not_none(ref_dut_keys.irk) # type: ignore
assert_is_not_none(ref_dut_keys.csrk) # type: ignore

# Address resolving doesn't work for now
# TODO: Fix Pandora/Bumble/RootCanal address resolving
# await le_connect_with_rpa_and_encrypt(self.dut, self.ref)


if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
Expand Down

0 comments on commit 14aa606

Please sign in to comment.