Use prep_batches in gcp.py #33

Closed
wants to merge 1 commit
2 changes: 1 addition & 1 deletion buildstockbatch/aws/aws.py
@@ -1595,7 +1595,7 @@ def run_batch(self):
self._weather_dir = tmp_weather_dir
tmppath = pathlib.Path(tmpdir)

-array_size, unique_epws = self.prep_batches(tmppath)
+array_size, unique_epws, _ = self.prep_batches(tmppath)

logger.debug("Uploading files to S3")
upload_directory_to_s3(
5 changes: 3 additions & 2 deletions buildstockbatch/cloud/docker_base.py
@@ -70,11 +70,12 @@ def prep_batches(self, tmppath):

:param tmppath: Path to a temporary directory where files should be collected before uploading.

-:returns: (job_count, unique_epws), where
+:returns: (job_count, unique_epws, n_sims), where
job_count: The number of jobs the samples were split into.
unique_epws: A dictionary mapping from the hash of weather files to a list of filenames
with that hashed value. Only the first in each list is written to tmppath, so
this can be used to recreate the other files later.
+n_sims: The total number of simulations to run.
"""
# Generate buildstock.csv
buildstock_csv_filename = self.sampler.run_sampling()
@@ -182,4 +183,4 @@ def prep_batches(self, tmppath):

os.makedirs(tmppath / "results" / "simulation_output")

-return (job_count, unique_epws)
+return (job_count, unique_epws, n_sims)
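With this change, prep_batches returns a three-tuple, so every caller unpacks an extra value. A minimal sketch of the two call patterns introduced in this PR (the stub below stands in for the real method; its hard-coded values are illustrative only):

```python
from typing import Dict, List, Tuple


def prep_batches(tmppath: str) -> Tuple[int, Dict[str, List[str]], int]:
    """Stub with illustrative values standing in for the real sampling/batching work."""
    job_count = 3                                               # number of job files written under tmppath/jobs
    unique_epws = {"a1b2c3": ["G0100010.epw", "G0100030.epw"]}  # weather-file hash -> filenames
    n_sims = 10                                                 # datapoints * (upgrades + 1)
    return job_count, unique_epws, n_sims


# GCP-style caller keeps the simulation count for later use.
task_count, unique_epws, n_sims = prep_batches("/tmp/bsb")

# AWS-style caller only needs the array size and weather files, so it discards n_sims.
array_size, unique_epws, _ = prep_batches("/tmp/bsb")
```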
107 changes: 2 additions & 105 deletions buildstockbatch/gcp/gcp.py
@@ -25,12 +25,9 @@
from joblib import Parallel, delayed
import json
import io
-import itertools
import logging
-import math
import os
import pathlib
-import random
import re
import shutil
import subprocess
@@ -50,11 +47,8 @@
from buildstockbatch.cloud.docker_base import DockerBatchBase
from buildstockbatch.exc import ValidationError
from buildstockbatch.utils import (
-calc_hash_for_file,
-compress_file,
get_project_configuration,
log_error_details,
-read_csv,
)


@@ -134,6 +128,7 @@ class GcpBatch(DockerBatchBase):
/___/(_/_(_(/_(_/_(___)(__(_)(__/ |_/___/(_/(_(__(__/ /_ / (___/(___/ /
Executing BuildStock projects with grace since 2018
"""
+MAX_JOB_COUNT = 100000

def __init__(self, project_filename, job_identifier=None):
"""
@@ -419,111 +414,13 @@ def run_batch(self):
"""
gcp_cfg = self.cfg["gcp"]

-# Step 1: Run sampling and split up buildings into batches.
-buildstock_csv_filename = self.sampler.run_sampling()
-
-# Step 2: Compress and upload weather data and any other required files to GCP
-# todo: aws-shared (see file comment)
logger.info("Collecting and uploading input files")
with tempfile.TemporaryDirectory(prefix="bsb_") as tmpdir, tempfile.TemporaryDirectory(
prefix="bsb_"
) as tmp_weather_dir: # noqa: E501
self._weather_dir = tmp_weather_dir
-self._get_weather_files()
tmppath = pathlib.Path(tmpdir)
logger.debug("Creating assets tarfile")
with tarfile.open(tmppath / "assets.tar.gz", "x:gz") as tar_f:
project_path = pathlib.Path(self.project_dir)
buildstock_path = pathlib.Path(self.buildstock_dir)
tar_f.add(buildstock_path / "measures", "measures")
if os.path.exists(buildstock_path / "resources/hpxml-measures"):
tar_f.add(buildstock_path / "resources/hpxml-measures", "resources/hpxml-measures")
tar_f.add(buildstock_path / "resources", "lib/resources")
tar_f.add(project_path / "housing_characteristics", "lib/housing_characteristics")

# Weather files
weather_path = tmppath / "weather"
os.makedirs(weather_path)

# Determine the unique weather files
epw_filenames = list(filter(lambda x: x.endswith(".epw"), os.listdir(self.weather_dir)))
logger.debug("Calculating hashes for weather files")
epw_hashes = Parallel(n_jobs=-1, verbose=5)(
delayed(calc_hash_for_file)(pathlib.Path(self.weather_dir) / epw_filename)
for epw_filename in epw_filenames
)
unique_epws = collections.defaultdict(list)
for epw_filename, epw_hash in zip(epw_filenames, epw_hashes):
unique_epws[epw_hash].append(epw_filename)

# Compress unique weather files
logger.debug("Compressing weather files")
Parallel(n_jobs=-1, verbose=5)(
delayed(compress_file)(pathlib.Path(self.weather_dir) / x[0], str(weather_path / x[0]) + ".gz")
for x in list(unique_epws.values())
)

logger.debug("Writing project configuration for upload")
with open(tmppath / "config.json", "wt", encoding="utf-8") as f:
json.dump(self.cfg, f)

# Collect simulations to queue
df = read_csv(buildstock_csv_filename, index_col=0, dtype=str)
self.validate_buildstock_csv(self.project_filename, df)
building_ids = df.index.tolist()
n_datapoints = len(building_ids)
n_sims = n_datapoints * (len(self.cfg.get("upgrades", [])) + 1)
logger.debug("Total number of simulations = {}".format(n_sims))

# GCP Batch allows up to 100,000 tasks, but limit to 10,000 here for consistency with AWS implementation.
if self.batch_array_size <= 10000:
max_array_size = self.batch_array_size
else:
max_array_size = 10000
n_sims_per_job = math.ceil(n_sims / max_array_size)
n_sims_per_job = max(n_sims_per_job, 2)
logger.debug("Number of simulations per array job = {}".format(n_sims_per_job))

# Create list of (building ID, upgrade to apply) pairs for all simulations to run.
baseline_sims = zip(building_ids, itertools.repeat(None))
upgrade_sims = itertools.product(building_ids, range(len(self.cfg.get("upgrades", []))))
all_sims = list(itertools.chain(baseline_sims, upgrade_sims))
random.shuffle(all_sims)
all_sims_iter = iter(all_sims)

os.makedirs(tmppath / "jobs")

# Write each batch of simulations to a file.
logger.info("Creating batches of jobs")
for i in itertools.count(0):
batch = list(itertools.islice(all_sims_iter, n_sims_per_job))
if not batch:
break
job_json_filename = tmppath / "jobs" / "job{:05d}.json".format(i)
with open(job_json_filename, "w") as f:
json.dump(
{
"job_num": i,
"n_datapoints": n_datapoints,
"batch": batch,
},
f,
indent=4,
)
task_count = i
logger.debug("Task count = {}".format(task_count))

# Compress job jsons
jobs_dir = tmppath / "jobs"
logger.debug("Compressing job jsons using gz")
tick = time.time()
with tarfile.open(tmppath / "jobs.tar.gz", "w:gz") as tf:
tf.add(jobs_dir, arcname="jobs")
tick = time.time() - tick
logger.debug("Done compressing job jsons using gz {:.1f} seconds".format(tick))
shutil.rmtree(jobs_dir)

os.makedirs(tmppath / "results" / "simulation_output")
+task_count, unique_epws, n_sims = self.prep_batches(tmppath)

logger.debug(f"Uploading files to GCS bucket: {self.gcs_bucket}")
# TODO: Consider creating a unique directory each time a job is run,
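The sampling, weather-file, and job-splitting logic deleted above now runs inside prep_batches. For reference, a standalone sketch of the batching arithmetic that moved (input values are illustrative; the real code derives them from buildstock.csv and the project configuration):

```python
import math

n_datapoints = 5        # sampled buildings (illustrative)
n_upgrades = 1          # upgrade scenarios in the project config (illustrative)

# Each building runs once as-is plus once per upgrade.
n_sims = n_datapoints * (n_upgrades + 1)                      # 10 simulations

# Cap the task array size (the removed code hard-coded 10,000 for AWS parity),
# then pack at least 2 simulations into each array job.
batch_array_size = 3                                          # illustrative config value
max_array_size = min(batch_array_size, 10000)
n_sims_per_job = max(math.ceil(n_sims / max_array_size), 2)   # ceil(10 / 3) = 4

job_count = math.ceil(n_sims / n_sims_per_job)                # 3 jobs of 4, 4, and 2 simulations
print(n_sims, n_sims_per_job, job_count)                      # 10 4 3
```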
3 changes: 2 additions & 1 deletion buildstockbatch/test/test_docker_base.py
@@ -33,7 +33,7 @@ def test_prep_batches(basic_residential_project_file, mocker):
dbb._weather_dir = tmp_weather_dir
tmppath = pathlib.Path(tmpdir)

-job_count, unique_epws = dbb.prep_batches(tmppath)
+job_count, unique_epws, n_sims = dbb.prep_batches(tmppath)
sampler_mock.run_sampling.assert_called_once()

# Of the three test weather files, two are identical
@@ -45,6 +45,7 @@
# Three job files should be created, with 10 total simulations, split
# into batches of 4, 4, and 2 simulations.
assert job_count == 3
+assert n_sims == 10
jobs_file_path = tmppath / "jobs.tar.gz"
with tarfile.open(jobs_file_path, "r") as tar_f:
all_job_files = ["jobs", "jobs/job00000.json", "jobs/job00001.json", "jobs/job00002.json"]
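The 4/4/2 split behind these assertions comes from the islice-based chunking that moved out of gcp.py; a self-contained sketch of it (the simulation IDs and the batch size of 4 are illustrative):

```python
import itertools

all_sims = [(f"bldg{i:04d}", None) for i in range(10)]   # 10 simulations to queue
n_sims_per_job = 4

all_sims_iter = iter(all_sims)
for job_num in itertools.count(0):
    batch = list(itertools.islice(all_sims_iter, n_sims_per_job))
    if not batch:
        break
    # Each batch would be dumped to jobs/job{job_num:05d}.json and bundled into jobs.tar.gz.
    print("job{:05d}.json".format(job_num), len(batch))
# Prints job00000.json 4, job00001.json 4, job00002.json 2 -> job_count == 3
```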