diff --git a/pippin/classifiers/classifier.py b/pippin/classifiers/classifier.py
index f4498b40..d06206c3 100644
--- a/pippin/classifiers/classifier.py
+++ b/pippin/classifiers/classifier.py
@@ -363,7 +363,7 @@ def get_num_ranseed(sim_tasks, lcfit_tasks):
                         # deps.append(t)
                         extra = t.get_unique_name()
-                        assert t.__class__ == cls, f"Model {clas_name} with class {cls} has model {model} with class {t.__class__}, they should match!"
+                        assert isinstance(t, cls), f"Model {clas_name} with class {cls} has model {model} with class {t.__class__}, they should match!"
 
                     indexes = get_num_ranseed(sim_deps, fit_deps)
                     for i in range(indexes):
@@ -381,8 +381,6 @@ def get_num_ranseed(sim_tasks, lcfit_tasks):
                     for i in range(indexes):
                         num = i + 1 if indexes > 1 else None
                         clas_output_dir = _get_clas_output_dir(base_output_dir, stage_number, sim_name, fit_name, clas_name, index=num)
-                        print(clas_output_dir)
-                        print(deps)
                         cc = cls(clas_name, clas_output_dir, config, deps, mode, options, index=i)
                         Task.logger.info(
                             f"Creating classification task {name} with {cc.num_jobs} jobs, for LC fit {fit_name} on simulation {sim_name} and index {i}"
diff --git a/pippin/classifiers/scone.py b/pippin/classifiers/scone.py
index 23c95b82..395e6d89 100644
--- a/pippin/classifiers/scone.py
+++ b/pippin/classifiers/scone.py
@@ -1,16 +1,28 @@
-import shutil
-import subprocess
+# Created Mar 2024 by R.Kessler and H.Qu
+# Refactor pippin interface to scone to accept and modify
+# a scone-input file.
+
+import os, sys, shutil, subprocess, yaml, re, time
 from pathlib import Path
-import yaml
 import pandas as pd
-import re
 import numpy as np
-import time
 
 from pippin.classifiers.classifier import Classifier
 from pippin.config import get_config, get_output_loc, mkdirs, get_data_loc, merge_dict
 from pippin.task import Task
+
+# =========================================
+
+SCONE_SHELL_SCRIPT = "run.py"  # top-level script under $SCONE_DIR
+
+KEYLIST_SCONE_INPUT = [ 'init_env_train', 'init_env_heatmaps',
+                        'prescale_heatmaps', 'nevt_select_heatmaps',
+                        'batch_size', 'categorical', 'class_balanced',
+                        'num_epochs', 'num_mjd_bins', 'num_wavelength_bins',
+                        'mode', 'trained_model', 'prob_column_name' ]
+
+# ==========================================
 
 class SconeClassifier(Classifier):
     """ convolutional neural network-based SN photometric classifier
     for details, see https://arxiv.org/abs/2106.04370, https://arxiv.org/abs/2111.05539, https://arxiv.org/abs/2207.09440
@@ -33,7 +45,8 @@ class SconeClassifier(Classifier):
         NUM_EPOCHS: 400
         IA_FRACTION: 0.5
         MODEL: /path/to/trained/model
-        SIM_FRACTION: 1 # fraction of sims to use for training
+        SIM_FRACTION: 1 # fraction of sims to use for training (to become obsolete)
+        PRESCALE_HEATMAPS: 1 # divide sample by PRESCALE for heatmaps and training
         SCONE_CPU_BATCH_FILE: /path/to/sbatch/template/for/scone
         SCONE_GPU_BATCH_FILE: /path/to/sbatch/template/for/scone
         BATCH_REPLACE: {}
@@ -45,99 +58,57 @@ class SconeClassifier(Classifier):
 
     """
 
+    def __new__(cls, name, output_dir, config, dependencies, mode, options, index=0, model_name=None):
+        # XXX DEPRECATION
+        # If no BASE file is present, run legacy version of Scone
+        # Avoid recursive nonsense by making sure the type of `cls` is SconeClassifier
+        if cls == SconeClassifier and config.get('BASE') is None:
+            # Have to import later because SconeClassifier must exist prior to importing SconeLegacyClassifier
+            from pippin.classifiers.scone_legacy import SconeLegacyClassifier
+            cls = SconeLegacyClassifier
+        return super().__new__(cls)
+
     def __init__(self, name, output_dir, config, dependencies, mode, options, index=0, model_name=None):
         super().__init__(name, output_dir, config, dependencies, mode, options, index=index, model_name=model_name)
         self.global_config = get_config()
         self.options = options
 
-        self.gpu = self.options.get("GPU", False)
+        # - - - - - - -
+        # special checks to help users cope with some changes
+        if mode == 'predict' and 'MODEL' in options:
+            self.options['TRAINED_MODEL'] = self.options['MODEL']
+
+        self.gpu = self.options.get("GPU", False)
         self.init_env_heatmaps = self.global_config["SCONE"]["init_env_cpu"]
-        self.init_env = self.global_config["SCONE"]["init_env_cpu"] if not self.gpu else self.global_config["SCONE"]["init_env_gpu"]
+        self.init_env = self.global_config["SCONE"]["init_env_cpu"] if not self.gpu else self.global_config["SCONE"]["init_env_gpu"]
         self.path_to_classifier = self.global_config["SCONE"]["location"]
+        self.combine_mask = "COMBINE_MASK" in config
 
-        output_path_obj = Path(self.output_dir)
+        self.select_lcfit = self.options.get("OPTIONAL_MASK_FIT", None)  # RK May 3 2024
+        scone_input_file = config.get('BASE')  # refactor by passing scone input file to pippin
+        if scone_input_file is not None:
+            scone_input_file = get_data_loc(scone_input_file)
+        self.scone_input_file = scone_input_file
+
+        output_path_obj = Path(self.output_dir)
         heatmaps_path_obj = output_path_obj / "heatmaps"
+
         self.job_base_name = output_path_obj.parents[1].name + "__" + output_path_obj.name
 
-        self.batch_replace = self.options.get("BATCH_REPLACE", self.global_config.get("BATCH_REPLACE", {}))
-        self.slurm = """{sbatch_header}
-        {task_setup}"""
+        self.batch_replace = self.options.get("BATCH_REPLACE",
+                                              self.global_config.get("BATCH_REPLACE", {}))
 
-        self.config_path = str(output_path_obj / "model_config.yml")
-        self.logfile = str(output_path_obj / "output.log")
-        self.model_sbatch_job_path = str(output_path_obj / "job.slurm")
-
-        self.heatmaps_path = str(heatmaps_path_obj)
         self.heatmaps_done_file = str(heatmaps_path_obj / "done.txt")
-        self.heatmaps_sbatch_header_path = str(heatmaps_path_obj / "sbatch_header.sh")
-        self.heatmaps_log_path = str(heatmaps_path_obj / f"create_heatmaps__{Path(self.config_path).name.split('.')[0]}.log")
 
         remake_heatmaps = self.options.get("REMAKE_HEATMAPS", False)
         self.keep_heatmaps = not remake_heatmaps
 
-    def make_sbatch_header(self, option_name, header_dict, use_gpu=False):
-        sbatch_header_template = self.options.get(option_name)
-        sbatch_header = self.sbatch_gpu_header if use_gpu else self.sbatch_cpu_header
-
-        if sbatch_header_template is not None:
-            self.logger.debug(f"batch file found at {sbatch_header_template}")
-            with open(get_data_loc(sbatch_header_template), 'r') as f:
-                sbatch_header = f.read()
-
-        sbatch_header = self.clean_header(sbatch_header)
-
-        header_dict = merge_dict(header_dict, self.batch_replace)
-        return self._update_header(sbatch_header, header_dict)
-
-    def make_heatmaps_sbatch_header(self):
-        self.logger.info("heatmaps not created, creating now")
-        shutil.rmtree(self.output_dir, ignore_errors=True)
-        mkdirs(self.heatmaps_path)
-
-        # TODO: if externally specified batchfile exists, have to parse desired logfile path from it
-        header_dict = {
-            "REPLACE_LOGFILE": self.heatmaps_log_path,
-            "REPLACE_WALLTIME": "12:00:00", #TODO: change to scale with # of heatmaps expected
-            "REPLACE_MEM": self.options.get("HEATMAPS_MEM", "32GB"),
-        }
-        heatmaps_sbatch_header = self.make_sbatch_header("HEATMAPS_BATCH_FILE", header_dict)
-
-        with open(self.heatmaps_sbatch_header_path, "w+") as f:
-            f.write(heatmaps_sbatch_header)
-
-    def make_model_sbatch_script(self):
-        header_dict = {
-            "REPLACE_NAME": self.job_base_name,
-            "REPLACE_LOGFILE": str(Path(self.output_dir) / "output.log"),
-            "REPLACE_MEM": self.options.get("MODEL_MEM", "64GB"),
-            "REPLACE_WALLTIME": "4:00:00" if self.gpu else "12:00:00", # 4h is max for gpu
-        }
-        model_sbatch_header = self.make_sbatch_header("MODEL_BATCH_FILE", header_dict, use_gpu=self.gpu)
-
-        setup_dict = {
-            "init_env": self.init_env,
-            "path_to_classifier": self.path_to_classifier,
-            "heatmaps_path": self.heatmaps_path,
-            "config_path": self.config_path,
-            "done_file": self.done_file,
-        }
-
-        format_dict = {
-            "sbatch_header": model_sbatch_header,
-            "task_setup": self.update_setup(setup_dict, self.task_setup['scone'])
-        }
-
-        self.logger.info(f"Running SCONE model, slurm job written to {self.model_sbatch_job_path}")
-        slurm_script = self.slurm.format(**format_dict)
-
-        with open(self.model_sbatch_job_path, "w") as f:
-            f.write(slurm_script)
-
-        return slurm_script
+        return
 
     def classify(self, mode):
+        self.logger.info(f"============ Prepare refactored SCONE with mode = {mode} =============")
         failed = False
         if Path(self.done_file).exists():
             self.logger.debug(f"Found done file at {self.done_file}")
@@ -145,119 +116,182 @@ def classify(self, mode):
             if "SUCCESS" not in f.read().upper():
                 failed = True
 
-        heatmaps_created = self._heatmap_creation_success() and self.keep_heatmaps
+        scone_input_file = self.scone_input_file
 
+        # - - - -
         sim_deps = self.get_simulation_dependency()
         sim_dirs = [sim_dep.output["photometry_dirs"][self.index] for sim_dep in sim_deps]
 
-        lcdata_paths = [path for path in self._get_lcdata_paths(sim_dirs) if "PHOT.FITS" in path]
-        metadata_paths = [path.replace("PHOT.FITS", "HEAD.FITS") for path in lcdata_paths]
+        # prepare scone input lines needed to create hash,
+        # but don't create scone input file yet.
+        scone_input_lines = self.prepare_scone_input_lines(sim_dirs,mode)
 
-        str_config = self._make_config(metadata_paths, lcdata_paths, mode, heatmaps_created)
-        new_hash = self.get_hash_from_string(str_config)
+        str_config = ' '.join(scone_input_lines)
+        new_hash = self.get_hash_from_string(str_config)
 
         if self._check_regenerate(new_hash) or failed:
-            self.logger.debug("Regenerating")
+            self.logger.debug("Regenerating scone")
         else:
-            self.logger.info("Hash check passed, not rerunning")
+            self.logger.info("scone hash check passed, not rerunning")
            self.should_be_done()
            return True
 
-        if not heatmaps_created:
-            # this deletes the whole directory tree, don't write anything before this
-            self.make_heatmaps_sbatch_header()
-
+        # later, perhaps check to preserve heatmaps ??
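# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the commit): the hash check above
# fingerprints the fully rendered scone input, so any change to the BASE
# file, the sim photometry dirs, or the pippin OPTS forces a rerun.
# Assuming get_hash_from_string() wraps a standard digest -- an assumption,
# not pippin's confirmed implementation -- the idea is roughly:
#
#   import hashlib
#   def get_hash_from_string(s: str) -> str:
#       return hashlib.md5(s.encode("utf-8")).hexdigest()
#
#   new_hash = get_hash_from_string(" ".join(scone_input_lines))
#   # rerun only if new_hash differs from the previously saved hash
# ---------------------------------------------------------------------------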
+        if os.path.exists(self.output_dir):
+            shutil.rmtree(self.output_dir)
+        os.makedirs(self.output_dir)
+
+        # write scone input file, and beware that the name of the scone
+        # input file is updated
+        scone_input_base = os.path.basename(self.scone_input_file)
+        self.scone_input_file = self.output_dir + '/' + 'PIP_' + scone_input_base
+        with open(self.scone_input_file,"wt") as i:
+            for line in scone_input_lines:
+                i.write(f"{line}\n")
+
         self.save_new_hash(new_hash)
-        with open(self.config_path, "w+") as cfgfile:
-            cfgfile.write(str_config)
-
-        slurm_script = self.make_model_sbatch_script()
-
-        # TODO: nersc needs `module load esslurm` to sbatch gpu jobs, maybe make
-        # this shell command to a file so diff systems can define their own
-        file_to_run = 'run.py'
-        if self.options.get("REFACTORED", False):
-            file_to_run = 'run_refactor.py'
-        elif self.options.get("LEGACY", False):
-            file_to_run = 'run_legacy.py'
-        path = Path(self.path_to_classifier) / file_to_run
-        path = path if path.exists() else Path(self.path_to_classifier) / 'run.py'
-        cmd = f"python {str(path)} --config_path {self.config_path}"
-        subprocess.run([cmd], shell=True)
+
+        path = Path(self.path_to_classifier) / SCONE_SHELL_SCRIPT
+        path = path if path.exists() else Path(self.path_to_classifier) / SCONE_SHELL_SCRIPT
+        cmd = f"python {str(path)} " \
+              f"--config_path {self.scone_input_file} "
+        # f"--sbatch_job_name {self.job_base_name} "
+        self.logger.info(f"Running command: {cmd}")
+        subprocess.run([cmd], shell=True)
 
         return True
 
+    def prepare_scone_input_lines(self, sim_dirs, mode ):
+
+        # Created Apr 2024 by R.Kessler
+        # Read base scone input and make a few modifications such as
+        # the sim data dirs, and other substitutions defined in pippin input.
+        # Method returns list of lines for modified scone-config input file.
+        # Original comments and input layout are preserved.
+
+        config_lines = []
+        scone_input_file = self.scone_input_file
+        options_local = self.options.copy()  # make local copy
+
+        # set local mode as if it were an override key in pippin input file
+        options_local['MODE'] = mode
+
+        if mode == 'predict' :
+            options_local['PROB_COLUMN_NAME'] = self.get_prob_column_name()
+
+        # - - - -
+        flag_remove_line = False
+
+        with open(scone_input_file, 'r') as i:
+            inp_config = i.read().split('\n')
+
+        key_replace_dict = {}
+        key_remove_list = [ 'input_data_paths:' , 'snid_select_files:',
+                            'sbatch_job_name:' ]
+
+        for line_in in inp_config:
+            line_out = line_in
+            wdlist = line_in.split()
+            nwd = len(wdlist)
+            if nwd == 0 :
+                flag_remove_line = False
+            else:
+                if wdlist[0] == 'output_path:' :
+                    line_out = line_in.replace(wdlist[1],self.output_dir)
+
+                # goofy logic to remove original input_data_paths
+                if flag_remove_line and wdlist[0] != '-' :
+                    flag_remove_line = False
+                if wdlist[0] in key_remove_list:
+                    flag_remove_line = True
+
+                # check all possible scone keys that can be overwritten/added
+                for key in KEYLIST_SCONE_INPUT:
+                    if wdlist[0] == key + ':' :
+                        key_pippin = key.upper()
+                        if key_pippin in options_local:
+                            key_replace_dict[key_pippin] = True
+                            val_replace = options_local[key_pippin]
+                            line_out = line_in.replace(wdlist[1],str(val_replace))
+
+                # remove prescale for predict mode
+                if mode == 'predict' and 'prescale' in wdlist[0]:
+                    line_out = f"# WARNING: {wdlist[0]} removed for {mode} mode."
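# ---------------------------------------------------------------------------
# Worked illustration of the override step above (hypothetical values, not
# from the commit): if the base scone input contains the line
#     batch_size: 32
# and the pippin OPTS block contains BATCH_SIZE: 64, then wdlist[0]
# ('batch_size:') matches an entry in KEYLIST_SCONE_INPUT, wdlist[1] ('32')
# is replaced, and the emitted line becomes
#     batch_size: 64
# In predict mode, a line such as 'prescale_heatmaps: 10' is instead
# replaced by the WARNING comment generated just above.
# ---------------------------------------------------------------------------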
+
+            if not flag_remove_line :
+                config_lines.append(line_out)
+
+        # - - - - - - - - - -
+        # add extra info from pippin
+        config_lines.append(f"")
+        config_lines.append(f"# ======================================= ")
+        config_lines.append(f"# keys added by pippin\n ")
+
+        # pass sbatch_job_name via config since there are other sbatch config
+        # keys already. Could also pass via command line arg --sbatch_job_name.
+        config_lines.append(f"sbatch_job_name: {self.job_base_name}\n")
+
+        config_lines.append(f"input_data_paths:")
+        for sim_dir in sim_dirs:
+            resolved_dir = os.path.realpath(sim_dir)
+            config_lines.append(f" - {resolved_dir}")
+
+        # add pippin-specified keys that were not in the original scone input
+        for key_pippin in options_local:
+            key = key_pippin.lower()
+            if key_pippin not in key_replace_dict and key in KEYLIST_SCONE_INPUT:
+                val = options_local[key_pippin]
+                line = f"{key}: {val}"
+                config_lines.append(f"")
+                config_lines.append(f"{line}")
+
+        # check option to select events passing LCFIT
+
+        if self.select_lcfit:
+            config_lines.append(f'')
+            config_lines.append(f'# Train on events passing LCFIT')
+            config_lines.append('snid_select_files:')
+            lcfit_deps = self.get_fit_dependency()
+            #self.logger.info(f"\n xxx lcfit_deps = \n{lcfit_deps}\n")
+            for tmp_dict in lcfit_deps:
+                fitres_dir = tmp_dict['fitres_dirs'][self.index]
+                fitopt_base_file = tmp_dict['fitopt_map']['DEFAULT']
+                fitres_file = f"{fitres_dir}/{fitopt_base_file}"
+                config_lines.append(f" - {fitres_file}")
+
+        return config_lines
+
+
+    #def get_optional_requirements(config):
+    #    # Created May 3 2024 by R.Kessler and P.Armstrong
+    #    if config.get("SELECT_LCFIT", False):
+    #        return False, True    # wait for LCFIT task
+    #    return False, False       # no optional LCFIT task
+
     def predict(self):
         return self.classify("predict")
 
     def train(self):
         return self.classify("train")
 
-    def _get_types(self):
-        types = {}
-        for t in self.get_simulation_dependency():
-            for k, v in t.output['types'].items():
-                if k not in types:
-                    types[k] = v
-        return types
-
-    def _make_config(self, metadata_paths, lcdata_paths, mode, heatmaps_created):
-        config = {}
-
-        # environment configuration
-        config["init_env_heatmaps"] = self.init_env_heatmaps
-        config["init_env"] = self.init_env
-
-        # info for heatmap creation
-        if not heatmaps_created:
-            config["sbatch_header_path"] = self.heatmaps_sbatch_header_path
-
-        config["heatmaps_donefile"] = self.heatmaps_done_file
-        config["heatmaps_logfile"] = self.heatmaps_log_path
-        config["sim_fraction"] = self.options.get("SIM_FRACTION", 1) # 1/sim_fraction % of simulated SNe will be used for the model
-        config["heatmaps_path"] = self.heatmaps_path
-        config["model_sbatch_job_path"] = self.model_sbatch_job_path
-        config["num_wavelength_bins"] = self.options.get("NUM_WAVELENGTH_BINS", 32)
-        config["num_mjd_bins"] = self.options.get("NUM_MJD_BINS", 180)
-        config["metadata_paths"] = metadata_paths
-        config["lcdata_paths"] = lcdata_paths
-
-        # info for classification model
-        config["categorical"] = self.options.get("CATEGORICAL", False)
-        config["num_epochs"] = self.options.get("NUM_EPOCHS", 400) # TODO: replace num epochs with autostop: stop training when slope plateaus?
-        config["batch_size"] = self.options.get("BATCH_SIZE", 32) # TODO: replace with percentage of total size?
-        config["Ia_fraction"] = self.options.get("IA_FRACTION", 0.5)
-        config["output_path"] = self.output_dir
-        config["trained_model"] = self.options.get("MODEL", None)
-        config["kcor_file"] = self.options.get("KCOR_FILE", None)
-        config["mode"] = mode
-        config["job_base_name"] = self.job_base_name
-        config["class_balanced"] = (mode == "train")
-
-        types = self._get_types()
-        if types is not None:
-            types = {int(k): v for k, v in types.items()} # sometimes the keys are strings, sometimes ints
-            self.logger.info(f"input types from sim found, types set to {types}")
-        config["sn_type_id_to_name"] = types
-
-        return yaml.dump(config)
-
     def _check_completion(self, squeue):
         if Path(self.done_file).exists():
-            self.logger.debug(f"Found done file at {self.done_file}")
+            self.logger.debug(f"Found scone done file at {self.done_file}")
             with open(self.done_file) as f:
                 if "SUCCESS" not in f.read().upper():
                     return Task.FINISHED_FAILURE
 
             pred_path = str(Path(self.output_dir) / "predictions.csv")
-            predictions = pd.read_csv(pred_path)
-            if "pred_labels" in predictions.columns:
-                predictions = predictions[["snid", "pred_labels"]] # make sure snid is the first col
-                predictions = predictions.rename(columns={"pred_labels": self.get_prob_column_name()})
-                predictions.to_csv(pred_path, index=False)
-            self.logger.info(f"Predictions file can be found at {pred_path}")
+            #predictions = pd.read_csv(pred_path)
+            #if "pred_labels" in predictions.columns:
+            #    predictions = predictions[["snid", "pred_labels"]] # make sure snid is the first col
+            #    predictions = predictions.rename(columns={"pred_labels": self.get_prob_column_name()})
+            #    predictions.to_csv(pred_path, index=False)
+            #self.logger.info(f"Predictions file can be found at {pred_path}")
             self.output.update({"model_filename": self.options.get("MODEL", str(Path(self.output_dir) / "trained_model")), "predictions_filename": pred_path})
+
             return Task.FINISHED_SUCCESS
         return self.check_for_job(squeue, self.job_base_name)
 
@@ -269,28 +303,6 @@ def _heatmap_creation_success(self):
                 return False
         return Path(self.heatmaps_path).exists() and (Path(self.heatmaps_path) / "done.log").exists()
 
-    def num_jobs_in_queue(self):
-        print("rerun num jobs in queue")
-        squeue = [i.strip() for i in subprocess.check_output(f"squeue -h -u $USER -o '%.200j'", shell=True, text=True).splitlines()]
-        self.logger.debug(f"{squeue}")
-        return self.check_for_job(squeue, self.job_base_name)
-
-    @staticmethod
-    def _get_lcdata_paths(sim_dirs):
-        lcdata_paths = [str(f.resolve()) for sim_dir in sim_dirs for f in Path(sim_dir).iterdir() if "PHOT" in f.name]
-        return lcdata_paths
-
-    @staticmethod
-    def _update_header(header, header_dict):
-        for key, value in header_dict.items():
-            if key in header:
-                header = header.replace(key, str(value))
-        append_list = header_dict.get("APPEND")
-        if append_list is not None:
-            lines = header.split('\n')
-            lines += append_list
-            header = '\n'.join(lines)
-        return header
 
     @staticmethod
     def get_requirements(options):
diff --git a/pippin/classifiers/scone_legacy.py b/pippin/classifiers/scone_legacy.py
new file mode 100644
index 00000000..35d8a153
--- /dev/null
+++ b/pippin/classifiers/scone_legacy.py
@@ -0,0 +1,292 @@
+import shutil
+import subprocess
+from pathlib import Path
+import yaml
+import pandas as pd
+import re
+import numpy as np
+import time
+
+from pippin.classifiers.scone import SconeClassifier
+from pippin.config import get_config, get_output_loc, mkdirs, get_data_loc, merge_dict
+from pippin.task import Task
+
+class SconeLegacyClassifier(SconeClassifier):
+    """ convolutional neural network-based SN photometric classifier
+    for details, see https://arxiv.org/abs/2106.04370, https://arxiv.org/abs/2111.05539, https://arxiv.org/abs/2207.09440
+
+    CONFIGURATION:
+    ==============
+    CLASSIFICATION:
+      label:
+        CLASSIFIER: SconeClassifier
+        MASK: TEST # partial match on sim and classifier
+        MASK_SIM: TEST # partial match on sim name
+        MASK_FIT: TEST # partial match on lcfit name
+        MODE: train/predict
+        OPTS:
+          GPU: True
+          CATEGORICAL: False
+          NUM_WAVELENGTH_BINS: 32
+          NUM_MJD_BINS: 180
+          REMAKE_HEATMAPS: False
+          NUM_EPOCHS: 400
+          IA_FRACTION: 0.5
+          MODEL: /path/to/trained/model
+          SIM_FRACTION: 1 # fraction of sims to use for training
+          SCONE_CPU_BATCH_FILE: /path/to/sbatch/template/for/scone
+          SCONE_GPU_BATCH_FILE: /path/to/sbatch/template/for/scone
+          BATCH_REPLACE: {}
+
+    OUTPUTS:
+    ========
+    predictions.csv: list of snids and associated predictions
+    training_history.csv: training history output from keras
+
+    """
+
+    def __init__(self, name, output_dir, config, dependencies, mode, options, index=0, model_name=None):
+        super().__init__(name, output_dir, config, dependencies, mode, options, index=index, model_name=model_name)
+        self.logger.warning(f'Using Legacy Scone version, pass a Scone input file via `BASE: /path/to/input.yml` to use the latest Scone version.')
+        self.global_config = get_config()
+        self.options = options
+
+        self.gpu = self.options.get("GPU", False)
+        self.init_env_heatmaps = self.global_config["SCONE"]["init_env_cpu"]
+        self.init_env = self.global_config["SCONE"]["init_env_cpu"] if not self.gpu else self.global_config["SCONE"]["init_env_gpu"]
+        self.path_to_classifier = self.global_config["SCONE"]["location"]
+        self.combine_mask = "COMBINE_MASK" in config
+
+        output_path_obj = Path(self.output_dir)
+        heatmaps_path_obj = output_path_obj / "heatmaps"
+
+        self.job_base_name = output_path_obj.parents[1].name + "__" + output_path_obj.name
+
+        self.batch_replace = self.options.get("BATCH_REPLACE", self.global_config.get("BATCH_REPLACE", {}))
+        self.slurm = """{sbatch_header}
+        {task_setup}"""
+
+        self.config_path = str(output_path_obj / "model_config.yml")
+        self.logfile = str(output_path_obj / "output.log")
+        self.model_sbatch_job_path = str(output_path_obj / "job.slurm")
+
+        self.heatmaps_path = str(heatmaps_path_obj)
+        self.heatmaps_done_file = str(heatmaps_path_obj / "done.txt")
+        self.heatmaps_sbatch_header_path = str(heatmaps_path_obj / "sbatch_header.sh")
+        self.heatmaps_log_path = str(heatmaps_path_obj / f"create_heatmaps__{Path(self.config_path).name.split('.')[0]}.log")
+
+        remake_heatmaps = self.options.get("REMAKE_HEATMAPS", False)
+        self.keep_heatmaps = not remake_heatmaps
+
+    def make_sbatch_header(self, option_name, header_dict, use_gpu=False):
+        sbatch_header_template = self.options.get(option_name)
+        sbatch_header = self.sbatch_gpu_header if use_gpu else self.sbatch_cpu_header
+
+        if sbatch_header_template is not None:
+            self.logger.debug(f"batch file found at {sbatch_header_template}")
+            with open(get_data_loc(sbatch_header_template), 'r') as f:
+                sbatch_header = f.read()
+
+        sbatch_header = self.clean_header(sbatch_header)
+
+        header_dict = merge_dict(header_dict, self.batch_replace)
+        return self._update_header(sbatch_header, header_dict)
+
+    def make_heatmaps_sbatch_header(self):
+        self.logger.info("heatmaps not created, creating now")
+        shutil.rmtree(self.output_dir, ignore_errors=True)
+        mkdirs(self.heatmaps_path)
+
+        # TODO: if externally specified batchfile exists, have to parse desired logfile path from it
+        header_dict = {
+            "REPLACE_LOGFILE": self.heatmaps_log_path,
+            "REPLACE_WALLTIME": "12:00:00", #TODO: change to scale with # of heatmaps expected
+            "REPLACE_MEM": self.options.get("HEATMAPS_MEM", "32GB"),
+        }
+        heatmaps_sbatch_header = self.make_sbatch_header("HEATMAPS_BATCH_FILE", header_dict)
+
+        with open(self.heatmaps_sbatch_header_path, "w+") as f:
+            f.write(heatmaps_sbatch_header)
+
+    def make_model_sbatch_script(self):
+        header_dict = {
+            "REPLACE_NAME": self.job_base_name,
+            "REPLACE_LOGFILE": str(Path(self.output_dir) / "output.log"),
+            "REPLACE_MEM": self.options.get("MODEL_MEM", "64GB"),
+            "REPLACE_WALLTIME": "4:00:00" if self.gpu else "12:00:00", # 4h is max for gpu
+        }
+        model_sbatch_header = self.make_sbatch_header("MODEL_BATCH_FILE", header_dict, use_gpu=self.gpu)
+
+        setup_dict = {
+            "init_env": self.init_env,
+            "path_to_classifier": self.path_to_classifier,
+            "heatmaps_path": self.heatmaps_path,
+            "config_path": self.config_path,
+            "done_file": self.done_file,
+        }
+
+        format_dict = {
+            "sbatch_header": model_sbatch_header,
+            "task_setup": self.update_setup(setup_dict, self.task_setup['scone'])
+        }
+
+        self.logger.info(f"Running SCONE model, slurm job written to {self.model_sbatch_job_path}")
+        slurm_script = self.slurm.format(**format_dict)
+
+        with open(self.model_sbatch_job_path, "w") as f:
+            f.write(slurm_script)
+
+        return slurm_script
+
+    def classify(self, mode):
+        failed = False
+        if Path(self.done_file).exists():
+            self.logger.debug(f"Found done file at {self.done_file}")
+            with open(self.done_file) as f:
+                if "SUCCESS" not in f.read().upper():
+                    failed = True
+
+        heatmaps_created = self._heatmap_creation_success() and self.keep_heatmaps
+
+        sim_deps = self.get_simulation_dependency()
+        sim_dirs = [sim_dep.output["photometry_dirs"][self.index] for sim_dep in sim_deps]
+
+        lcdata_paths = [path for path in self._get_lcdata_paths(sim_dirs) if "PHOT.FITS" in path]
+        metadata_paths = [path.replace("PHOT.FITS", "HEAD.FITS") for path in lcdata_paths]
+
+        str_config = self._make_config(metadata_paths, lcdata_paths, mode, heatmaps_created)
+        new_hash = self.get_hash_from_string(str_config)
+
+        if self._check_regenerate(new_hash) or failed:
+            self.logger.debug("Regenerating")
+        else:
+            self.logger.info("Hash check passed, not rerunning")
+            self.should_be_done()
+            return True
+
+        if not heatmaps_created:
+            # this deletes the whole directory tree, don't write anything before this
+            self.make_heatmaps_sbatch_header()
+
+        self.save_new_hash(new_hash)
+        with open(self.config_path, "w+") as cfgfile:
+            cfgfile.write(str_config)
+
+        slurm_script = self.make_model_sbatch_script()
+
+        # TODO: nersc needs `module load esslurm` to sbatch gpu jobs, maybe make
+        # this shell command to a file so diff systems can define their own
+        file_to_run = 'run_legacy.py'
+        path = Path(self.path_to_classifier) / file_to_run
+        cmd = f"python {str(path)} --config_path {self.config_path}"
+        subprocess.run([cmd], shell=True)
+        self.logger.info(f"Running command: {cmd}")
+
+        return True
+
+    def predict(self):
+        return self.classify("predict")
+
+    def train(self):
+        return self.classify("train")
+
+    def _get_types(self):
+        types = {}
+        for t in self.get_simulation_dependency():
+            for k, v in t.output['types'].items():
+                if k not in types:
+                    types[k] = v
+        return types
+
+    def _make_config(self, metadata_paths, lcdata_paths, mode, heatmaps_created):
+        config = {}
+
+        # environment configuration
+        config["init_env_heatmaps"] = self.init_env_heatmaps
+        config["init_env"] = self.init_env
+
+        # info for heatmap creation
+        if not heatmaps_created:
+            config["sbatch_header_path"] = self.heatmaps_sbatch_header_path
+
+        config["heatmaps_donefile"] = self.heatmaps_done_file
+        config["heatmaps_logfile"] = self.heatmaps_log_path
+        config["sim_fraction"] = self.options.get("SIM_FRACTION", 1) # 1/sim_fraction % of simulated SNe will be used for the model
+        config["heatmaps_path"] = self.heatmaps_path
+        config["model_sbatch_job_path"] = self.model_sbatch_job_path
+        config["num_wavelength_bins"] = self.options.get("NUM_WAVELENGTH_BINS", 32)
+        config["num_mjd_bins"] = self.options.get("NUM_MJD_BINS", 180)
+        config["metadata_paths"] = metadata_paths
+        config["lcdata_paths"] = lcdata_paths
+
+        # info for classification model
+        config["categorical"] = self.options.get("CATEGORICAL", False)
+        config["num_epochs"] = self.options.get("NUM_EPOCHS", 400) # TODO: replace num epochs with autostop: stop training when slope plateaus?
+        config["batch_size"] = self.options.get("BATCH_SIZE", 32) # TODO: replace with percentage of total size?
+        config["Ia_fraction"] = self.options.get("IA_FRACTION", 0.5)
+        config["output_path"] = self.output_dir
+        config["trained_model"] = self.options.get("MODEL", None)
+        config["kcor_file"] = self.options.get("KCOR_FILE", None)
+        config["mode"] = mode
+        config["job_base_name"] = self.job_base_name
+        config["class_balanced"] = (mode == "train")
+
+        types = self._get_types()
+        if types is not None:
+            types = {int(k): v for k, v in types.items()} # sometimes the keys are strings, sometimes ints
+            self.logger.info(f"input types from sim found, types set to {types}")
+        config["sn_type_id_to_name"] = types
+
+        return yaml.dump(config)
+
+    def _check_completion(self, squeue):
+        if Path(self.done_file).exists():
+            self.logger.debug(f"Found done file at {self.done_file}")
+            with open(self.done_file) as f:
+                if "SUCCESS" not in f.read().upper():
+                    return Task.FINISHED_FAILURE
+
+            pred_path = str(Path(self.output_dir) / "predictions.csv")
+            predictions = pd.read_csv(pred_path)
+            if "pred_labels" in predictions.columns:
+                predictions = predictions[["snid", "pred_labels"]] # make sure snid is the first col
+                predictions = predictions.rename(columns={"pred_labels": self.get_prob_column_name()})
+                predictions.to_csv(pred_path, index=False)
+            self.logger.info(f"Predictions file can be found at {pred_path}")
+            self.output.update({"model_filename": self.options.get("MODEL", str(Path(self.output_dir) / "trained_model")), "predictions_filename": pred_path})
+            return Task.FINISHED_SUCCESS
+        return self.check_for_job(squeue, self.job_base_name)
+
+    def _heatmap_creation_success(self):
+        if not Path(self.heatmaps_done_file).exists():
+            return False
+        with open(self.heatmaps_done_file, "r") as donefile:
+            if "CREATE HEATMAPS FAILURE" in donefile.read():
+                return False
+        return Path(self.heatmaps_path).exists() and (Path(self.heatmaps_path) / "done.log").exists()
+
+    def num_jobs_in_queue(self):
+        squeue = [i.strip() for i in subprocess.check_output(f"squeue -h -u $USER -o '%.200j'", shell=True, text=True).splitlines()]
+        self.logger.debug(f"{squeue}")
+        return self.check_for_job(squeue, self.job_base_name)
+
+    @staticmethod
+    def _get_lcdata_paths(sim_dirs):
+        lcdata_paths = [str(f.resolve()) for sim_dir in sim_dirs for f in Path(sim_dir).iterdir() if "PHOT" in f.name]
+        return lcdata_paths
+
+    @staticmethod
+    def _update_header(header, header_dict):
+        for key, value in header_dict.items():
+            if key in header:
+                header = header.replace(key, str(value))
+        append_list = header_dict.get("APPEND")
+        if append_list is not None:
+            lines = header.split('\n')
+            lines += append_list
+            header = '\n'.join(lines)
+        return header
+
+    @staticmethod
+    def get_requirements(options):
+        return True, False
diff --git a/tests/config_files/cfg_dev.yml b/tests/config_files/cfg_dev.yml
index 6f4221da..99a4fe1a 100644
--- a/tests/config_files/cfg_dev.yml
+++ b/tests/config_files/cfg_dev.yml
@@ -26,6 +26,11 @@ DataSkimmer:
   conda_env: snn_gpu
   location: $PRODUCTS/utilities/dataskim
 
+SCONE:
+  init_env_cpu: source activate scone_cpu_tf2.6
+  init_env_gpu: source activate scone_gpu_tf2.6
+  location: $SCONE_DIR
+
 CosmoMC:
   location: $PRODUCTS/CosmoMC/v03/CosmoMC-master
   static_loc: cosmomc_static_chains
diff --git a/tests/config_files/valid_classify_scone.yml b/tests/config_files/valid_classify_scone.yml
new file mode 100644
index 00000000..cf8a4888
--- /dev/null
+++ b/tests/config_files/valid_classify_scone.yml
@@ -0,0 +1,48 @@
+SIM:
+  EXAMPLESIM:
+    IA_G10_DES3YR:
+      BASE: surveys/sdss/sims_ia/sn_ia_g10_sdss_3yr.input
+    II:
+      BASE: surveys/sdss/sims_cc/sn_ii_templates.input
+    Ibc:
+      BASE: surveys/sdss/sims_cc/sn_ibc_templates.input
+    GLOBAL:
+      NGEN_UNIT: 1
+      RANSEED_REPEAT: 10 12345
+      SOLID_ANGLE: 10
+
+LCFIT:
+  D:
+    BASE: surveys/des/lcfit_nml/des_5yr.nml
+
+CLASSIFICATION:
+
+  LEGACY_SCONE_TRAIN:
+    CLASSIFIER: SconeClassifier
+    MODE: train
+    OPTS:
+      OPTIONAL_MASK_FIT: "D"
+      NUM_EPOCHS: 400
+
+  LEGACY_SCONE_PREDICT:
+    CLASSIFIER: SconeClassifier
+    MODE: predict
+    OPTS:
+      OPTIONAL_MASK_FIT: "D"
+      MODEL: 'LEGACY_SCONE_TRAIN'
+
+  SCONE_TRAIN:
+    CLASSIFIER: SconeClassifier
+    MODE: train
+    BASE: "/path/to/base/file"
+    OPTS:
+      OPTIONAL_MASK_FIT: "D"
+      NUM_EPOCHS: 400
+
+  SCONE_PREDICT:
+    CLASSIFIER: SconeClassifier
+    MODE: predict
+    BASE: "/path/to/base/file"
+    OPTS:
+      OPTIONAL_MASK_FIT: "D"
+      MODEL: 'SCONE_TRAIN'
diff --git a/tests/test_valid_config.py b/tests/test_valid_config.py
index 88033ab1..05bdaaf2 100644
--- a/tests/test_valid_config.py
+++ b/tests/test_valid_config.py
@@ -7,6 +7,8 @@ from pippin.snana_fit import SNANALightCurveFit
 from pippin.classifiers.fitprob import FitProbClassifier
 from pippin.classifiers.perfect import PerfectClassifier
+from pippin.classifiers.scone import SconeClassifier
+from pippin.classifiers.scone_legacy import SconeLegacyClassifier
 from pippin.aggregator import Aggregator
 from pippin.merge import Merger
 from pippin.biascor import BiasCor
@@ -150,6 +152,58 @@ def test_classifier_sim_with_opt_lcfit_config_valid():
     assert isinstance(deps[0], SNANASimulation)
     assert isinstance(deps[1], SNANALightCurveFit)
 
+def test_classifier_scone_valid():
+    manager = get_manager(yaml="tests/config_files/valid_classify_scone.yml", check=True)
+    tasks = manager.tasks
+
+    # 1 Sim, 1 LCFit, 4 Scone
+    assert len(tasks) == 6
+    assert isinstance(tasks[0], SNANASimulation)
+    assert isinstance(tasks[1], SNANALightCurveFit)
+    for task in tasks[2:]:
+        # isinstance => Class or Subclass
+        assert isinstance(task, SconeClassifier)
+
+    tests = [
+        {
+            'task': tasks[2],
+            'cls': SconeLegacyClassifier,
+            'attr': {
+                'name': 'LEGACY_SCONE_TRAIN',
+                'scone_input_file': None
+            }
+        },
+        {
+            'task': tasks[3],
+            'cls': SconeLegacyClassifier,
+            'attr': {
+                'name': 'LEGACY_SCONE_PREDICT',
+                'scone_input_file': None
+            }
+        },
+        {
+            'task': tasks[4],
+            'cls': SconeClassifier,
+            'attr': {
+                'name': 'SCONE_TRAIN',
+            }
+        },
+        {
+            'task': tasks[5],
+            'cls': SconeClassifier,
+            'attr': {
+                'name': 'SCONE_PREDICT',
+            }
+        }
+    ]
+
+    for test in tests:
+        task = test['task']
+        assert type(task) is test['cls']
+        for (attr, val) in test['attr'].items():
+            assert hasattr(task, attr)
+            assert getattr(task, attr) == val
+
 
 def test_agg_config_valid():
     # This shouldn't raise an error