Skip to content

Commit

Permalink
LE CTKD cases
Browse files Browse the repository at this point in the history
  • Loading branch information
zxzxwu committed Aug 14, 2023
1 parent ad9bf1b commit 89d95de
Showing 1 changed file with 45 additions and 2 deletions.
47 changes: 45 additions & 2 deletions cases/le_security_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
import logging

from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
from bumble.pairing import PairingDelegate
from avatar.common import make_bredr_connection
from bumble.pairing import PairingConfig, PairingDelegate
from mobly import base_test, signals, test_runner
from mobly.asserts import assert_equal # type: ignore
from mobly.asserts import assert_in # type: ignore
from mobly.asserts import assert_is_not_none # type: ignore
from mobly.asserts import fail # type: ignore
from pandora.host_pb2 import PUBLIC, RANDOM, Connection, DataTypes, OwnAddressType
from pandora.security_pb2 import LE_LEVEL3, PairingEventAnswer, SecureResponse, WaitSecurityResponse
from pandora.security_pb2 import LEVEL2, LE_LEVEL3, PairingEventAnswer, SecureResponse, WaitSecurityResponse
from typing import Any, Literal, Optional, Tuple, Union


Expand Down Expand Up @@ -74,6 +75,10 @@ def teardown_class(self) -> None:
'against_display_yes_no',
'against_both_display_and_keyboard',
),
(
'ltk_irk_csrk',
'ltk_irk_csrk_lk',
),
)
) # type: ignore[misc]
@avatar.asynchronous
Expand All @@ -84,6 +89,7 @@ async def test_le_pairing(
ref_address_type_name: Union[Literal['against_random'], Literal['against_public']],
variant: Union[
Literal['accept'],
Literal['accept_ctkd'],
Literal['reject'],
Literal['rejected'],
Literal['disconnect'],
Expand All @@ -97,6 +103,10 @@ async def test_le_pairing(
Literal['against_display_yes_no'],
Literal['against_both_display_and_keyboard'],
],
key_distribution: Union[
Literal['ltk_irk_csrk'],
Literal['ltk_irk_csrk_lk'],
],
) -> None:

if self.dut.name == 'android' and connect == 'outgoing_connection' and pair == 'incoming_pairing':
Expand Down Expand Up @@ -125,6 +135,9 @@ async def test_le_pairing(

if not isinstance(self.ref, BumblePandoraDevice) and ref_io_capability != 'against_default_io_cap':
raise signals.TestSkip('Unable to override IO capability on non Bumble device.')

if 'lk' in key_distribution and ref_io_capability == 'against_no_output_no_input':
raise signals.TestSkip('CTKD requires Security Level 4')

# Factory reset both DUT and REF devices.
await asyncio.gather(self.dut.reset(), self.ref.reset())
Expand All @@ -139,6 +152,30 @@ async def test_le_pairing(
'against_both_display_and_keyboard': PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
}[ref_io_capability]
self.ref.server_config.io_capability = io_capability
bumble_key_distribution = sum(
{
'ltk': PairingDelegate.KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY,
'irk': PairingDelegate.KeyDistribution.DISTRIBUTE_IDENTITY_KEY,
'csrk': PairingDelegate.KeyDistribution.DISTRIBUTE_SIGNING_KEY,
'lk': PairingDelegate.KeyDistribution.DISTRIBUTE_LINK_KEY,
}[x]
for x in key_distribution.split('_')
)
assert bumble_key_distribution
self.ref.server_config.smp_local_initiator_key_distribution = bumble_key_distribution
self.ref.server_config.smp_local_responder_key_distribution = bumble_key_distribution
self.ref.server_config.identity_address_type = PairingConfig.AddressType.PUBLIC

if isinstance(self.dut, BumblePandoraDevice):
ALL_KEYS = (
PairingDelegate.KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY
| PairingDelegate.KeyDistribution.DISTRIBUTE_IDENTITY_KEY
| PairingDelegate.KeyDistribution.DISTRIBUTE_SIGNING_KEY
| PairingDelegate.KeyDistribution.DISTRIBUTE_LINK_KEY
)
self.dut.server_config.smp_local_initiator_key_distribution = ALL_KEYS
self.dut.server_config.smp_local_responder_key_distribution = ALL_KEYS
self.dut.server_config.identity_address_type = PairingConfig.AddressType.PUBLIC

dut_address_type = RANDOM
ref_address_type = {
Expand Down Expand Up @@ -329,6 +366,12 @@ def on_done(_: Any) -> None:
if shall_pass:
assert_equal(secure.result_variant(), 'success')
assert_equal(wait_security.result_variant(), 'success')
if 'lk' in key_distribution:
# Make a Classic connection
ref_dut_classic, _dut_ref_clsssic = await make_bredr_connection(self.ref, self.dut)
# Try to encrypt Classic connection
ref_dut_secure = await self.ref.aio.security.Secure(ref_dut_classic, classic=LEVEL2)
assert_equal(ref_dut_secure.result_variant(), 'success')
else:
assert_in(
secure.result_variant(),
Expand Down

0 comments on commit 89d95de

Please sign in to comment.