Skip to content

Commit

Permalink
Add LE extended scan/advertise test case
Browse files Browse the repository at this point in the history
Run isort on le_host_test.py.
  • Loading branch information
BenjaminLawson committed Mar 29, 2024
1 parent 8de9f8d commit bbca2fd
Showing 1 changed file with 87 additions and 0 deletions.
87 changes: 87 additions & 0 deletions avatar/cases/le_host_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@
from mobly.asserts import assert_is_not_none # type: ignore
from mobly.asserts import assert_true # type: ignore
from mobly.asserts import explicit_pass # type: ignore
from pandora.host_pb2 import PRIMARY_1M
from pandora.host_pb2 import PRIMARY_CODED
from pandora.host_pb2 import PUBLIC
from pandora.host_pb2 import RANDOM
from pandora.host_pb2 import SECONDARY_1M
from pandora.host_pb2 import SECONDARY_CODED
from pandora.host_pb2 import Connection
from pandora.host_pb2 import DataTypes
from pandora.host_pb2 import OwnAddressType
from pandora.host_pb2 import PrimaryPhy
from typing import Any, Dict, Literal, Optional, Union


Expand All @@ -58,6 +63,8 @@ class LeHostTest(base_test.BaseTestClass): # type: ignore[misc]
dut: PandoraDevice
ref: PandoraDevice

scan_timeout: float

def setup_class(self) -> None:
self.devices = PandoraDevices(self)
self.dut, self.ref, *_ = self.devices
Expand Down Expand Up @@ -139,6 +146,86 @@ def test_scan(
scan.cancel()
advertise.cancel()

@avatar.parameterized(
*itertools.product(
# The advertisement cannot be both connectable and scannable.
('connectable', 'non_connectable', 'non_connectable_scannable'),
('directed', 'undirected'),
# Bumble does not send multiple HCI commands, so it must also fit in
# 1 HCI command (max length 251 minus overhead).
(0, 150),
(PRIMARY_1M, PRIMARY_CODED),
),
) # type: ignore[misc]
def test_extended_scan(
self,
connectable_scannable: Union[
Literal['connectable'], Literal['non_connectable'], Literal['non_connectable_scannable']
],
directed: Union[Literal['directed'], Literal['undirected']],
data_len: int,
primary_phy: PrimaryPhy,
) -> None:
'''
Advertise from the REF device with the specified extended advertising
event properties. Use the manufacturer specific data to pad the advertising data to the
desired length. The scan response data must always be provided when
scannable.
'''
man_specific_data_length = max(0, data_len - 5) # Flags (3) + LV (2)
man_specific_data = bytes([random.randint(1, 255) for _ in range(man_specific_data_length)])
data = DataTypes(manufacturer_specific_data=man_specific_data) if data_len > 0 else None
scan_response_data = None
# Extended advertisements with advertising data cannot also have
# scan response data.
if connectable_scannable == 'non_connectable_scannable':
scan_response_data = data
data = None

is_connectable = True if connectable_scannable == 'connectable' else False
target = self.dut.address if directed == 'directed' else None

# For a better test, make the secondary phy the same as the primary to
# avoid the scan just scanning the 1M advertisement when the primary
# phy is CODED.
secondary_phy = SECONDARY_1M
if primary_phy == PRIMARY_CODED:
secondary_phy = SECONDARY_CODED

advertise = self.ref.host.Advertise(
legacy=False,
connectable=is_connectable,
data=data, # type: ignore[arg-type]
scan_response_data=scan_response_data, # type: ignore[arg-type]
public=target,
own_address_type=PUBLIC,
primary_phy=primary_phy,
secondary_phy=secondary_phy,
)

scan = self.dut.host.Scan(
legacy=False,
passive=False,
timeout=self.scan_timeout,
phys=[primary_phy],
)
report = next((x for x in scan if x.public == self.ref.address))
try:
report = next((x for x in scan if x.public == self.ref.address))

# TODO: scannable is not set by the android server
# TODO: direct_address is not set by the android server
assert_false(report.legacy, msg='expected extended advertising report')
assert_equal(report.connectable, is_connectable)
assert_equal(report.data.manufacturer_specific_data, man_specific_data)
assert_false(report.truncated, msg='expected non-truncated advertising report')
assert_equal(report.primary_phy, primary_phy)
except grpc.aio.AioRpcError as e:
raise e
finally:
scan.cancel()
advertise.cancel()

@avatar.parameterized(
(dict(incomplete_service_class_uuids16=["183A", "181F"]),),
(dict(incomplete_service_class_uuids32=["FFFF183A", "FFFF181F"]),),
Expand Down

0 comments on commit bbca2fd

Please sign in to comment.