Skip to content

Commit

Permalink
Correctly calculate time when measuring hs + colortemp (#2624)
Browse files Browse the repository at this point in the history
* fix: correctly calculate time when measuring hs + colortemp

* fix: remove debugging code

* fix: docblock
  • Loading branch information
bramstroker authored Oct 26, 2024
1 parent 075152c commit f4db6c9
Showing 1 changed file with 110 additions and 40 deletions.
150 changes: 110 additions & 40 deletions utils/measure/runner/light.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,42 +80,64 @@ def get_export_directory(self) -> str:
return f"{self.light_info.model_id}"

def run(self, answers: dict[str, Any], export_directory: str) -> RunnerResult | None:
[self.run_color_mode(answers, export_directory, color_mode) for color_mode in self.color_modes]
measurements_to_run = [self.prepare_measurements_for_color_mode(export_directory, color_mode) for color_mode in self.color_modes]

all_variations: list[Variation] = []
for measurement in measurements_to_run:
all_variations.extend(measurement.variations)
left_variations = all_variations.copy()

[self.run_color_mode(answers, measurement_info, all_variations, left_variations) for measurement_info in measurements_to_run]

return RunnerResult(
model_json_data={"calculation_strategy": "lut"},
)

def prepare_measurements_for_color_mode(self, export_directory: str, color_mode: ColorMode) -> MeasurementRunInput:
"""Fetch all variations for the given color mode and prepare the measurement session."""

csv_file_path = f"{export_directory}/{color_mode.value}.csv"

resume_at = None
if self.should_resume(csv_file_path):
resume_at = self.get_resume_variation(csv_file_path, color_mode)

variations = list(self.get_variations(color_mode, resume_at))
return MeasurementRunInput(
color_mode=color_mode,
csv_file=csv_file_path,
variations=variations,
is_resuming=bool(resume_at),
)

def run_color_mode(
self,
answers: dict[str, Any],
export_directory: str,
color_mode: ColorMode,
measurement_info: MeasurementRunInput,
all_variations: list[Variation],
left_variations: list[Variation],
) -> None:
"""Run the measurement session for lights"""
csv_file_path = f"{export_directory}/{color_mode.value}.csv"

resume_at = None
color_mode = measurement_info.color_mode
file_write_mode = "w"
write_header_row = True
if self.should_resume(csv_file_path):
if measurement_info.is_resuming:
_LOGGER.info("Resuming measurements")
resume_at = self.get_resume_variation(csv_file_path, color_mode)
file_write_mode = "a"
write_header_row = False

variations = list(self.get_variations(color_mode, resume_at))
num_variations = len(variations)
variations = measurement_info.variations

_LOGGER.info(
"Starting measurements. Estimated duration: %s",
self.calculate_time_left(variations, variations[0]),
self.calculate_time_left(all_variations, left_variations),
)

with open(csv_file_path, file_write_mode, newline="") as csv_file:
with open(measurement_info.csv_file, file_write_mode, newline="") as csv_file:
csv_writer = CsvWriter(csv_file, color_mode, write_header_row)

if resume_at is None:
if measurement_info.is_resuming is None:
self.light_controller.change_light_state(ColorMode.BRIGHTNESS, on=False)

# Initially wait longer so the smartplug can settle
Expand All @@ -129,8 +151,8 @@ def run_color_mode(
previous_variation = None
for count, variation in enumerate(variations):
if count % 10 == 0:
time_left = self.calculate_time_left(variations, variation, count)
progress_percentage = round(count / num_variations * 100)
time_left = self.calculate_time_left(all_variations, left_variations, variation)
progress_percentage = ((len(all_variations) - len(left_variations)) / len(all_variations)) * 100
_LOGGER.info(
"Progress: %d%%, Estimated time left: %s",
progress_percentage,
Expand Down Expand Up @@ -176,18 +198,19 @@ def run_color_mode(
return
_LOGGER.info("Measured power: %.2f", power)
csv_writer.write_measurement(variation, power)
left_variations.remove(variation)

csv_file.close()
_LOGGER.info(
"Hooray! measurements finished. Exported CSV file %s",
csv_file_path,
measurement_info.csv_file,
)

self.light_controller.change_light_state(ColorMode.BRIGHTNESS, on=False)
_LOGGER.info("Turning off the light")

if bool(answers.get("gzip", True)):
self.gzip_csv(csv_file_path)
self.gzip_csv(measurement_info.csv_file)

def get_dummy_load_value(self) -> float:
"""Get the previously measured dummy load value"""
Expand Down Expand Up @@ -293,14 +316,17 @@ def inclusive_range(start: int, end: int, step: int) -> Iterator[int]:
i += step
yield end

@staticmethod
def calculate_time_left(
variations: list[Variation],
self,
all_variations: list[Variation],
left_variations: list[Variation],
current_variation: Variation | None = None,
progress: int = 0,
) -> str:
"""Try to guess the remaining time left. This will not account for measuring errors / retries obviously"""
num_variations_left = len(variations) - progress
num_variations_left = len(left_variations)
num_variations = len(all_variations)
progress = num_variations - num_variations_left
current_color_mode = self.get_color_mode(current_variation)

# Account estimated seconds for the light_controller and power_meter to process
estimated_step_delay = 0.15
Expand All @@ -312,34 +338,70 @@ def calculate_time_left(
if config.SAMPLE_COUNT > 1:
time_left += num_variations_left * config.SAMPLE_COUNT * (config.SLEEP_TIME_SAMPLE + estimated_step_delay)

if isinstance(current_variation, HsVariation):
sat_steps_left = (
round(
(config.MAX_BRIGHTNESS - current_variation.bri) / config.HS_BRI_STEPS,
)
- 1
)
time_left += sat_steps_left * config.SLEEP_TIME_SAT
hue_steps_left = round(
config.MAX_HUE / config.HS_HUE_STEPS * sat_steps_left,
color_mode_time_calculation = {
ColorMode.HS: self.calculate_hs_time_left,
ColorMode.COLOR_TEMP: self.calculate_ct_time_left,
ColorMode.BRIGHTNESS: lambda _: 0,
}

time_left += color_mode_time_calculation[current_color_mode](current_variation)

# Add timings for color modes which needs to be fully measured
left_color_modes = {self.get_color_mode(variation) for variation in left_variations}

time_left += sum(color_mode_time_calculation[mode](None) for mode in left_color_modes if mode not in current_color_mode)

return self.format_time_left(time_left)

@staticmethod
def get_color_mode(variation: Variation) -> ColorMode:
"""Get the color mode of the variation"""
if isinstance(variation, HsVariation):
return ColorMode.HS
if isinstance(variation, ColorTempVariation):
return ColorMode.COLOR_TEMP
return ColorMode.BRIGHTNESS

@staticmethod
def calculate_hs_time_left(current_variation: HsVariation | None) -> float:
"""Calculate the time left for the HS color mode."""
brightness = current_variation.bri if current_variation else config.MIN_BRIGHTNESS
sat_steps_left = (
round(
(config.MAX_BRIGHTNESS - brightness) / config.HS_BRI_STEPS,
)
time_left += hue_steps_left * config.SLEEP_TIME_HUE
- 1
)
time_left = sat_steps_left * config.SLEEP_TIME_SAT
hue_steps_left = round(
config.MAX_HUE / config.HS_HUE_STEPS * sat_steps_left,
)
time_left += hue_steps_left * config.SLEEP_TIME_HUE
return time_left

if isinstance(current_variation, ColorTempVariation):
ct_steps_left = (
round(
(config.MAX_BRIGHTNESS - current_variation.bri) / config.CT_BRI_STEPS,
)
- 1
@staticmethod
def calculate_ct_time_left(current_variation: ColorTempVariation | None) -> float:
"""Calculate the time left for the HS color mode."""
brightness = current_variation.bri if current_variation else config.MIN_BRIGHTNESS
ct_steps_left = (
round(
(config.MAX_BRIGHTNESS - brightness) / config.CT_BRI_STEPS,
)
time_left += ct_steps_left * config.SLEEP_TIME_CT
- 1
)
return ct_steps_left * config.SLEEP_TIME_CT

@staticmethod
def format_time_left(time_left: float) -> str:
"""Format the time left in a human readable format"""
if time_left < 0:
time_left = 0
if time_left > 3600:
formatted_time = f"{round(time_left / 3600, 1)}h"
elif time_left > 60:
formatted_time = f"{round(time_left / 60, 1)}m"
else:
formatted_time = f"{time_left}s"
formatted_time = f"{round(time_left, 1)}s"

return formatted_time

Expand Down Expand Up @@ -427,7 +489,7 @@ def should_resume(csv_file_path: str) -> bool:
should_resume = config.RESUME
if should_resume is None:
return inquirer.confirm(
message="CSV File already exists. Do you want to resume measurements?",
message=f"CSV File {csv_file_path} already exists. Do you want to resume measurements?",
default=True,
)
return should_resume
Expand Down Expand Up @@ -609,6 +671,14 @@ def is_ct_changed(self, other_variation: ColorTempVariation) -> bool:
return self.ct != other_variation.ct


@dataclass(frozen=True)
class MeasurementRunInput:
color_mode: ColorMode
csv_file: str
variations: list[Variation]
is_resuming: bool


class CsvWriter:
def __init__(
self,
Expand Down

0 comments on commit f4db6c9

Please sign in to comment.