diff --git a/buildstockbatch/aws/aws.py b/buildstockbatch/aws/aws.py
index 55f95a2c..2e55d279 100644
--- a/buildstockbatch/aws/aws.py
+++ b/buildstockbatch/aws/aws.py
@@ -1595,7 +1595,7 @@ def run_batch(self):
             self._weather_dir = tmp_weather_dir
 
             tmppath = pathlib.Path(tmpdir)
-            array_size, unique_epws = self.prep_batches(tmppath)
+            array_size, unique_epws, _ = self.prep_batches(tmppath)
 
             logger.debug("Uploading files to S3")
             upload_directory_to_s3(
diff --git a/buildstockbatch/cloud/docker_base.py b/buildstockbatch/cloud/docker_base.py
index 2551c45f..6bbbbadb 100644
--- a/buildstockbatch/cloud/docker_base.py
+++ b/buildstockbatch/cloud/docker_base.py
@@ -70,11 +70,12 @@ def prep_batches(self, tmppath):
 
         :param tmppath: Path to a temporary directory where files should be collected before uploading.
 
-        :returns: (job_count, unique_epws), where
+        :returns: (job_count, unique_epws, n_sims), where
             job_count: The number of jobs the samples were split into.
             unique_epws: A dictionary mapping from the hash of weather files to a list of filenames
                 with that hashed value. Only the first in each list is written to tmppath, so this
                 can be used to recreate the other files later.
+            n_sims: The total number of simulations to run.
         """
         # Generate buildstock.csv
         buildstock_csv_filename = self.sampler.run_sampling()
@@ -182,4 +183,4 @@ def prep_batches(self, tmppath):
 
         os.makedirs(tmppath / "results" / "simulation_output")
 
-        return (job_count, unique_epws)
+        return (job_count, unique_epws, n_sims)
diff --git a/buildstockbatch/gcp/gcp.py b/buildstockbatch/gcp/gcp.py
index a6aa626d..bc10bc85 100644
--- a/buildstockbatch/gcp/gcp.py
+++ b/buildstockbatch/gcp/gcp.py
@@ -25,12 +25,9 @@
 from joblib import Parallel, delayed
 import json
 import io
-import itertools
 import logging
-import math
 import os
 import pathlib
-import random
 import re
 import shutil
 import subprocess
@@ -50,11 +47,8 @@
 
 from buildstockbatch.cloud.docker_base import DockerBatchBase
 from buildstockbatch.exc import ValidationError
 from buildstockbatch.utils import (
-    calc_hash_for_file,
-    compress_file,
     get_project_configuration,
     log_error_details,
-    read_csv,
 )
 
@@ -134,6 +128,7 @@ class GcpBatch(DockerBatchBase):
     /___/(_/_(_(/_(_/_(___)(__(_)(__/ |_/___/(_/(_(__(__/ /_
                  / (___/(___/             /
       Executing BuildStock projects with grace since 2018
 """
+    MAX_JOB_COUNT = 100000
 
     def __init__(self, project_filename, job_identifier=None):
         """
@@ -419,111 +414,13 @@ def run_batch(self):
         """
         gcp_cfg = self.cfg["gcp"]
 
-        # Step 1: Run sampling and split up buildings into batches.
-        buildstock_csv_filename = self.sampler.run_sampling()
-
-        # Step 2: Compress and upload weather data and any other required files to GCP
-        # todo: aws-shared (see file comment)
-        logger.info("Collecting and uploading input files")
         with tempfile.TemporaryDirectory(prefix="bsb_") as tmpdir, tempfile.TemporaryDirectory(
             prefix="bsb_"
         ) as tmp_weather_dir:  # noqa: E501
             self._weather_dir = tmp_weather_dir
-            self._get_weather_files()
 
             tmppath = pathlib.Path(tmpdir)
-            logger.debug("Creating assets tarfile")
-            with tarfile.open(tmppath / "assets.tar.gz", "x:gz") as tar_f:
-                project_path = pathlib.Path(self.project_dir)
-                buildstock_path = pathlib.Path(self.buildstock_dir)
-                tar_f.add(buildstock_path / "measures", "measures")
-                if os.path.exists(buildstock_path / "resources/hpxml-measures"):
-                    tar_f.add(buildstock_path / "resources/hpxml-measures", "resources/hpxml-measures")
-                tar_f.add(buildstock_path / "resources", "lib/resources")
-                tar_f.add(project_path / "housing_characteristics", "lib/housing_characteristics")
-
-            # Weather files
-            weather_path = tmppath / "weather"
-            os.makedirs(weather_path)
-
-            # Determine the unique weather files
-            epw_filenames = list(filter(lambda x: x.endswith(".epw"), os.listdir(self.weather_dir)))
-            logger.debug("Calculating hashes for weather files")
-            epw_hashes = Parallel(n_jobs=-1, verbose=5)(
-                delayed(calc_hash_for_file)(pathlib.Path(self.weather_dir) / epw_filename)
-                for epw_filename in epw_filenames
-            )
-            unique_epws = collections.defaultdict(list)
-            for epw_filename, epw_hash in zip(epw_filenames, epw_hashes):
-                unique_epws[epw_hash].append(epw_filename)
-
-            # Compress unique weather files
-            logger.debug("Compressing weather files")
-            Parallel(n_jobs=-1, verbose=5)(
-                delayed(compress_file)(pathlib.Path(self.weather_dir) / x[0], str(weather_path / x[0]) + ".gz")
-                for x in list(unique_epws.values())
-            )
-
-            logger.debug("Writing project configuration for upload")
-            with open(tmppath / "config.json", "wt", encoding="utf-8") as f:
-                json.dump(self.cfg, f)
-
-            # Collect simulations to queue
-            df = read_csv(buildstock_csv_filename, index_col=0, dtype=str)
-            self.validate_buildstock_csv(self.project_filename, df)
-            building_ids = df.index.tolist()
-            n_datapoints = len(building_ids)
-            n_sims = n_datapoints * (len(self.cfg.get("upgrades", [])) + 1)
-            logger.debug("Total number of simulations = {}".format(n_sims))
-
-            # GCP Batch allows up to 100,000 tasks, but limit to 10,000 here for consistency with AWS implementation.
-            if self.batch_array_size <= 10000:
-                max_array_size = self.batch_array_size
-            else:
-                max_array_size = 10000
-            n_sims_per_job = math.ceil(n_sims / max_array_size)
-            n_sims_per_job = max(n_sims_per_job, 2)
-            logger.debug("Number of simulations per array job = {}".format(n_sims_per_job))
-
-            # Create list of (building ID, upgrade to apply) pairs for all simulations to run.
-            baseline_sims = zip(building_ids, itertools.repeat(None))
-            upgrade_sims = itertools.product(building_ids, range(len(self.cfg.get("upgrades", []))))
-            all_sims = list(itertools.chain(baseline_sims, upgrade_sims))
-            random.shuffle(all_sims)
-            all_sims_iter = iter(all_sims)
-
-            os.makedirs(tmppath / "jobs")
-
-            # Write each batch of simulations to a file.
-            logger.info("Creating batches of jobs")
-            for i in itertools.count(0):
-                batch = list(itertools.islice(all_sims_iter, n_sims_per_job))
-                if not batch:
-                    break
-                job_json_filename = tmppath / "jobs" / "job{:05d}.json".format(i)
-                with open(job_json_filename, "w") as f:
-                    json.dump(
-                        {
-                            "job_num": i,
-                            "n_datapoints": n_datapoints,
-                            "batch": batch,
-                        },
-                        f,
-                        indent=4,
-                    )
-            task_count = i
-            logger.debug("Task count = {}".format(task_count))
-
-            # Compress job jsons
-            jobs_dir = tmppath / "jobs"
-            logger.debug("Compressing job jsons using gz")
-            tick = time.time()
-            with tarfile.open(tmppath / "jobs.tar.gz", "w:gz") as tf:
-                tf.add(jobs_dir, arcname="jobs")
-            tick = time.time() - tick
-            logger.debug("Done compressing job jsons using gz {:.1f} seconds".format(tick))
-            shutil.rmtree(jobs_dir)
-
-            os.makedirs(tmppath / "results" / "simulation_output")
+            task_count, unique_epws, n_sims = self.prep_batches(tmppath)
 
             logger.debug(f"Uploading files to GCS bucket: {self.gcs_bucket}")
             # TODO: Consider creating a unique directory each time a job is run,
diff --git a/buildstockbatch/test/test_docker_base.py b/buildstockbatch/test/test_docker_base.py
index 28863ea8..fd7d42c8 100644
--- a/buildstockbatch/test/test_docker_base.py
+++ b/buildstockbatch/test/test_docker_base.py
@@ -33,7 +33,7 @@ def test_prep_batches(basic_residential_project_file, mocker):
         dbb._weather_dir = tmp_weather_dir
 
         tmppath = pathlib.Path(tmpdir)
-        job_count, unique_epws = dbb.prep_batches(tmppath)
+        job_count, unique_epws, n_sims = dbb.prep_batches(tmppath)
         sampler_mock.run_sampling.assert_called_once()
 
         # Of the three test weather files, two are identical
@@ -45,6 +45,7 @@
         # Three job files should be created, with 10 total simulations, split
         # into batches of 4, 4, and 2 simulations.
         assert job_count == 3
+        assert n_sims == 10
 
         jobs_file_path = tmppath / "jobs.tar.gz"
         with tarfile.open(jobs_file_path, "r") as tar_f:
             all_job_files = ["jobs", "jobs/job00000.json", "jobs/job00001.json", "jobs/job00002.json"]