Apply Black formatter to gcp.py #30

Merged · 1 commit · Nov 21, 2023
64 changes: 35 additions & 29 deletions buildstockbatch/gcp/gcp.py
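
As context for the hunks below: Black's rewrites here are mechanical. A minimal sketch of the two most common ones in this diff (quote normalization and trailing commas), using Black's library entry point. The 88-column default line length is an assumption, since the repo's configured value is not shown in this PR, and `pp_env_cfg` is just sample text borrowed from the diff, not executable context:

    import black  # pip install black

    # Sample source mirroring the ResourceRequirements hunk further down.
    src = '''limits = {'memory': f"{pp_env_cfg.get('memory_mib', 4096)}Mi", 'cpu': str(pp_env_cfg.get('cpus', 2))}\n'''

    # format_str() parses the snippet and reprints it in Black style.
    print(black.format_str(src, mode=black.Mode()))

    # Expected output:
    #   limits = {
    #       "memory": f"{pp_env_cfg.get('memory_mib', 4096)}Mi",
    #       "cpu": str(pp_env_cfg.get("cpus", 2)),
    #   }
    # 'cpus' is normalized to double quotes and a trailing comma is added, but
    # 'memory_mib' keeps its single quotes: it sits inside an f-string
    # expression, which Black leaves alone. The same asymmetry appears below.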
@@ -242,13 +242,14 @@ def postprocessing_job_id(self):
 
     @property
     def postprocessing_job_name(self):
-        return f"projects/{self.gcp_project}/locations/{self.region}" \
-            f"/jobs/{self.postprocessing_job_id}"
+        return f"projects/{self.gcp_project}/locations/{self.region}/jobs/{self.postprocessing_job_id}"
 
     @property
     def postprocessing_job_console_url(self):
-        return f"https://console.cloud.google.com/run/jobs/details/{self.region}" \
-            f"/{self.postprocessing_job_id}/executions?project={self.gcp_project}"
+        return (
+            f"https://console.cloud.google.com/run/jobs/details/{self.region}"
+            f"/{self.postprocessing_job_id}/executions?project={self.gcp_project}"
+        )
 
     # todo: aws-shared (see file comment)
     def build_image(self):
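
Both properties above drop backslash line continuations, which Black does not allow: the first return now fits on one line, and the second becomes implicitly concatenated f-strings inside parentheses. One hedge: collapsing the two `postprocessing_job_name` literals into a single f-string goes further than stable Black of this era, whose string joining lived behind --preview, so that line was presumably consolidated by hand.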
@@ -381,7 +382,6 @@ def clean(self):
         delete_job(self.gcp_batch_job_name)
         self.clean_postprocessing_job()
 
-
     def list_jobs(self):
         """
         List existing GCP Batch jobs that match the provided project.
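
This hunk, and the next two, are pure blank-line normalization: Black collapses the stray double blank line between methods down to the single blank line it enforces inside a class body. The extracted diff makes them look like no-ops because the removed line is empty.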
Expand Down Expand Up @@ -887,7 +887,6 @@ def process_results(self, skip_combine=False, use_dask_cluster=True):
if not skip_combine:
self.start_combine_results_job_on_cloud(self.results_dir, do_timeseries=do_timeseries)


@classmethod
def run_combine_results_on_cloud(cls, gcs_bucket, gcs_prefix, results_dir, do_timeseries):
"""This is the function that is run on the cloud to actually perform `combine_results` on
@@ -904,7 +903,6 @@ def run_combine_results_on_cloud(cls, gcs_bucket, gcs_prefix, results_dir, do_timeseries):
         DaskClient()
         postprocessing.combine_results(GCSFileSystem(), results_dir, cfg, do_timeseries=do_timeseries)
 
-
     def start_combine_results_job_on_cloud(self, results_dir, do_timeseries=True):
         """Set up `combine_results` to be run on GCP Cloud Run.
 
@@ -924,7 +922,7 @@ def start_combine_results_job_on_cloud(self, results_dir, do_timeseries=True):
                             resources=run_v2.ResourceRequirements(
                                 limits={
                                     "memory": f"{pp_env_cfg.get('memory_mib', 4096)}Mi",
-                                    "cpu": str(pp_env_cfg.get('cpus', 2)),
+                                    "cpu": str(pp_env_cfg.get("cpus", 2)),
                                 }
                             ),
                             command=["/bin/sh"],
@@ -934,11 +932,11 @@ def start_combine_results_job_on_cloud(self, results_dir, do_timeseries=True):
                                 run_v2.EnvVar(name="GCS_PREFIX", value=self.gcs_prefix),
                                 run_v2.EnvVar(name="GCS_BUCKET", value=self.gcs_bucket),
                                 run_v2.EnvVar(name="RESULTS_DIR", value=results_dir),
-                                run_v2.EnvVar(name="DO_TIMESERIES", value="True" if do_timeseries else "False")
+                                run_v2.EnvVar(name="DO_TIMESERIES", value="True" if do_timeseries else "False"),
                             ],
                         )
                     ],
-                    timeout=f"{60 * 60 * 24}s", # 24h
+                    timeout=f"{60 * 60 * 24}s",  # 24h
                     max_retries=0,
                 )
             )
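
Two easy-to-miss changes in this hunk: the last `EnvVar` gains Black's trailing comma, and the timeout line, which looks identical on both sides of the extracted diff, most plausibly differs only in whitespace (Black puts two spaces before an inline comment). As for the value itself, 60 * 60 * 24 = 86400, so the timeout renders as "86400s", matching the 24h comment.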
@@ -950,33 +948,40 @@ def start_combine_results_job_on_cloud(self, results_dir, do_timeseries=True):
             run_v2.CreateJobRequest(
                 parent=f"projects/{self.gcp_project}/locations/{self.region}",
                 job_id=self.postprocessing_job_id,
-                job=job
+                job=job,
             )
         )
 
         # Start the job!
         try:
             jobs_client.run_job(name=self.postprocessing_job_name)
-            logger.info("Post-processing Cloud Run job started! "
-                        f"See status at: {self.postprocessing_job_console_url}. "
-                        "You will need to run this script with --clean to clean up the GCP "
-                        "environment after post-processing is complete.")
+            logger.info(
+                "Post-processing Cloud Run job started! "
+                f"See status at: {self.postprocessing_job_console_url}. "
+                "You will need to run this script with --clean to clean up the GCP "
+                "environment after post-processing is complete."
+            )
         except:
-            logger.warning("Post-processing Cloud Run job failed to start. You may try starting it "
-                           f"at the console: {self.postprocessing_job_console_url}")
-
+            logger.warning(
+                "Post-processing Cloud Run job failed to start. You may try starting it "
+                f"at the console: {self.postprocessing_job_console_url}"
+            )
 
     def clean_postprocessing_job(self):
         jobs_client = run_v2.JobsClient()
-        logger.info("Cleaning post-processing Cloud Run job with "
-                    f"job_identifier='{self.job_identifier}'; "
-                    f"job name={self.postprocessing_job_name}...")
+        logger.info(
+            "Cleaning post-processing Cloud Run job with "
+            f"job_identifier='{self.job_identifier}'; "
+            f"job name={self.postprocessing_job_name}..."
+        )
         try:
             job = jobs_client.get_job(name=self.postprocessing_job_name)
         except Exception:
-            logger.warning("Post-processing Cloud Run job not found for "
-                           f"job_identifier='{self.job_identifier}' "
-                           f"(postprocessing_job_name='{self.postprocessing_job_name}').")
+            logger.warning(
+                "Post-processing Cloud Run job not found for "
+                f"job_identifier='{self.job_identifier}' "
+                f"(postprocessing_job_name='{self.postprocessing_job_name}')."
+            )
             return
 
         # Ask for confirmation to delete if it is not completed
@@ -993,10 +998,12 @@ def clean_postprocessing_job(self):
         try:
             executions_client.cancel_execution(name=job.latest_created_execution.name)
         except Exception:
-            logger.warning("Failed to cancel execution with"
-                           f" name={job.latest_created_execution.name}.", exc_info=True)
-            logger.warning(f"You may want to try deleting the job via the console:"
-                           f" {self.postprocessing_job_console_url}")
+            logger.warning(
+                f"Failed to cancel execution with name={job.latest_created_execution.name}.", exc_info=True
+            )
+            logger.warning(
+                f"You may want to try deleting the job via the console: {self.postprocessing_job_console_url}"
+            )
             return
 
         # ... The job succeeded or its execution was deleted successfully; it can be deleted
@@ -1006,7 +1013,6 @@ def clean_postprocessing_job(self):
             logger.warning("Failed to delete post-processing Cloud Run job.", exc_info=True)
         logger.info(f"Post-processing Cloud Run job deleted: '{self.postprocessing_job_name}'")
 
-
     def upload_results(self, *args, **kwargs):
         """
         Overrides `BuildStockBatchBase.upload_results()` from base.
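
If the goal is to keep gcp.py formatted this way going forward, the usual guard is Black's check mode: `black --check buildstockbatch/gcp/gcp.py` exits non-zero when the file would be reformatted. The flag is standard Black; whether this repo wires it into CI is not shown in the PR.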