Skip to content

Commit

Permalink
linting: fix/clean-ups for pylance type checking
Browse files Browse the repository at this point in the history
  • Loading branch information
agittins committed Apr 20, 2024
1 parent ab95993 commit 8564d5d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 27 deletions.
58 changes: 33 additions & 25 deletions custom_components/bermuda/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def update_advertisement(
# There's no API for this, so we somewhat sneakily are accessing
# what is intended to be a protected dict.
# pylint: disable-next=protected-access
stamps = scandata.scanner._discovered_device_timestamps
stamps = scandata.scanner._discovered_device_timestamps # type: ignore

# In this dict all MAC address keys are upper-cased
uppermac = device_address.upper()
Expand Down Expand Up @@ -445,7 +445,9 @@ def calculate_data(self):
self.hist_distance_by_interval.insert(0, self.rssi_distance_raw)
del self.hist_distance_by_interval[1:]

elif (new_stamp is None) and self.stamp < MONOTONIC_TIME() - DISTANCE_TIMEOUT:
elif new_stamp is None and (
self.stamp is None or self.stamp < MONOTONIC_TIME() - DISTANCE_TIMEOUT
):
# DEVICE IS AWAY!
# Last distance reading is stale, mark device distance as unknown.
self.rssi_distance = None
Expand Down Expand Up @@ -545,7 +547,7 @@ def calculate_data(self):
movavg = local_min

# The average is only helpful if it's lower than the actual reading.
if movavg < self.rssi_distance_raw:
if self.rssi_distance_raw is None or movavg < self.rssi_distance_raw:
self.rssi_distance = movavg
else:
self.rssi_distance = self.rssi_distance_raw
Expand Down Expand Up @@ -660,19 +662,19 @@ def update_scanner(
self.scanners[format_mac(scanner_device.address)].update_advertisement(
self.address,
discoveryinfo, # the entire BluetoothScannerDevice struct
scanner_device.area_id,
scanner_device.area_id or "area_not_defined",
self.options,
)
else:
self.scanners[format_mac(scanner_device.address)] = BermudaDeviceScanner(
self.address,
discoveryinfo, # the entire BluetoothScannerDevice struct
scanner_device.area_id,
scanner_device.area_id or "area_not_defined",
self.options,
)
device_scanner = self.scanners[format_mac(scanner_device.address)]
# Let's see if we should update our last_seen based on this...
if self.last_seen < device_scanner.stamp:
if device_scanner.stamp is not None and self.last_seen < device_scanner.stamp:
self.last_seen = device_scanner.stamp

def to_dict(self):
Expand Down Expand Up @@ -753,7 +755,7 @@ def __init__(
def handle_state_changes(ev: Event):
"""Watch for new mac addresses on private ble devices and act."""
if ev.event_type == EVENT_STATE_CHANGED:
event_entity = ev.data.get("entity_id")
event_entity = ev.data.get("entity_id", "invalid_event_entity")
if event_entity in self.pb_state_sources:
# It's a state change of entity we are tracking.
new_state = ev.data.get("new_state")
Expand All @@ -778,9 +780,9 @@ def handle_state_changes(ev: Event):
# Flag that we need new pb checks, and work them out:
self._do_private_device_init = True
self.configure_beacons()
# If no sensors have yet been configured, the coordinator won't be getting
# polled for fresh data. Since we have found something, we should get it to
# do that.
# If no sensors have yet been configured, the coordinator
# won't be getting polled for fresh data. Since we have
# found something, we should get it to do that.
self.hass.add_job(
self.async_config_entry_first_refresh()
)
Expand Down Expand Up @@ -1011,11 +1013,11 @@ async def _async_update_data(self):

device.beacon_type = BEACON_IBEACON_SOURCE
device.beacon_uuid = man_data[2:18].hex().lower()
device.beacon_major = int.from_bytes(
man_data[18:20], byteorder="big"
device.beacon_major = str(
int.from_bytes(man_data[18:20], byteorder="big")
)
device.beacon_minor = int.from_bytes(
man_data[20:22], byteorder="big"
device.beacon_minor = str(
int.from_bytes(man_data[20:22], byteorder="big")
)
device.beacon_power = int.from_bytes(
[man_data[22]], signed=True
Expand All @@ -1025,7 +1027,7 @@ async def _async_update_data(self):
# UniversallyUniqueIDentifier is not even unique
# locally, so we need to make one :-)

device.beacon_unique_id = f"{device.beacon_uuid}_{device.beacon_major}_{device.beacon_minor}"
device.beacon_unique_id = f"{device.beacon_uuid}_{device.beacon_major}_{device.beacon_minor}" # pylint: disable=line-too-long
# Note: it's possible that a device sends multiple
# beacons. We are only going to process the latest
# one in any single update cycle, so we ignore that
Expand Down Expand Up @@ -1214,6 +1216,7 @@ def prune_devices(self):
# DOS-attack.
sorted_addresses = sorted([(v, k) for k, v in prunable_stamps.items()])
_LOGGER.info("Having to prune %s extra devices to make quota.", prune_quota)
# pylint: disable-next=unused-variable
for stamp, address in sorted_addresses[:prune_quota]:
prune_list.append(address)

Expand Down Expand Up @@ -1273,12 +1276,10 @@ def configure_beacons(self):
source_device = self._get_or_create_device(pb_address)
# Override name and prefname, as the user will (hopefully)
# chosen something nice for their private ble device name.
source_device.name = (
pb_device.name_by_user or pb_device.name
)
source_device.prefname = (
pb_device.name_by_user or pb_device.name
source_device.name = getattr(
pb_device, "name_by_user", getattr(pb_device, "name")
)
source_device.prefname = source_device.name
source_device.beacon_type = BEACON_PRIVATE_BLE_SOURCE

# As of 2024.4.0b4 Private_ble appends _device_tracker to the
Expand Down Expand Up @@ -1310,7 +1311,10 @@ def configure_beacons(self):

# Iterate through each device to see if it has an advert for our target id
for device in self.devices.values():
if device.beacon_type in [BEACON_IBEACON_SOURCE, BEACON_PRIVATE_BLE_SOURCE]:
if (
device.beacon_type in [BEACON_IBEACON_SOURCE, BEACON_PRIVATE_BLE_SOURCE]
and device.beacon_unique_id is not None
):
# We found an advert for our beacon of interest...
if (
device.beacon_unique_id not in freshest_beacon_sources # first-find
Expand Down Expand Up @@ -1447,14 +1451,18 @@ def _refresh_area_by_min_distance(self, device: BermudaDevice):
# reading was old enough that our algo decides it's "away".
if (
scanner.rssi_distance is not None
and scanner.rssi_distance < self.options.get(CONF_MAX_RADIUS)
and scanner.rssi_distance
< self.options.get(CONF_MAX_RADIUS, DEFAULT_MAX_RADIUS)
):
# It's inside max_radius...
if closest_scanner is None:
# no encumbent, we win!
closest_scanner = scanner
else:
if scanner.rssi_distance < closest_scanner.rssi_distance:
if (
closest_scanner.rssi_distance is not None
and scanner.rssi_distance < closest_scanner.rssi_distance
):
# We're closer than the last-closest, we win!
closest_scanner = scanner

Expand All @@ -1464,7 +1472,7 @@ def _refresh_area_by_min_distance(self, device: BermudaDevice):
device.area_id = closest_scanner.area_id
areas = self.area_reg.async_get_area(device.area_id)
if hasattr(areas, "name"):
device.area_name = areas.name
device.area_name = getattr(areas, "name", "invalid_area")
else:
# Wasn't a single area entry. Let's freak out, but not in a spammy way.
_LOGGER_SPAM_LESS.warning(
Expand Down Expand Up @@ -1536,7 +1544,7 @@ def _refresh_scanners(
else:
_LOGGER_SPAM_LESS.warning(
f"no_area_on_update{scandev.name}",
"No area name or no area id while updating scanner %s, area_id %s",
"No area name or no area id updating scanner %s, area_id %s",
scandev.name,
areas,
)
Expand Down
6 changes: 4 additions & 2 deletions custom_components/bermuda/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@

CONF_MAX_VELOCITY, DEFAULT_MAX_VELOCITY = "max_velocity", 3
DOCS[CONF_MAX_VELOCITY] = (
"In metres per second - ignore readings that imply movement away faster than this limit. 3m/s (10km/h) is good." # fmt: skip
"In metres per second - ignore readings that imply movement away faster than",
"this limit. 3m/s (10km/h) is good." # fmt: skip
)

CONF_DEVTRACK_TIMEOUT, DEFAULT_DEVTRACK_TIMEOUT = "devtracker_nothome_timeout", 30
Expand All @@ -131,7 +132,8 @@

CONF_UPDATE_INTERVAL, DEFAULT_UPDATE_INTERVAL = "update_interval", 10
DOCS[CONF_UPDATE_INTERVAL] = (
"Maximum time between sensor updates in seconds. Smaller intervals means more data, bigger database." # fmt: skip
"Maximum time between sensor updates in seconds. Smaller intervals",
"means more data, bigger database." # fmt: skip
)

CONF_SMOOTHING_SAMPLES, DEFAULT_SMOOTHING_SAMPLES = "smoothing_samples", 20
Expand Down

0 comments on commit 8564d5d

Please sign in to comment.