diff --git a/src/pygama/flow/data_loader.py b/src/pygama/flow/data_loader.py
index f1034c2f3..23810874f 100644
--- a/src/pygama/flow/data_loader.py
+++ b/src/pygama/flow/data_loader.py
@@ -1176,6 +1176,7 @@ def explode_evt_cols(el: pd.DataFrame, tier_table: Table):
         table_length = len(f_entries)
 
         log.debug(f"will load new columns {field_mask}")
+        # print(f"will load new columns {field_mask}")
 
         # loop through each table in entry list and
         # loop through each level we're asked to load from
@@ -1188,8 +1189,12 @@ def explode_evt_cols(el: pd.DataFrame, tier_table: Table):
             # loop over tiers in the level
             for tier in self.tiers[level]:
                 if tb not in col_tiers[file]["tables"][tier]:
+                    # print(f"Cannot find table {tb} in col_tiers tier {tier}")
                     continue
 
+                # print(
+                #     f"...for stream '{self.filedb.get_table_name(tier, tb)}' (at {level} level)"
+                # )
                 log.debug(
                     f"...for stream '{self.filedb.get_table_name(tier, tb)}' (at {level} level)"
                 )
@@ -1584,10 +1589,12 @@ def get_tiers_for_col(
             for tier in self.tiers[level]:
                 col_tiers[file]["tables"][tier] = []
                 tier_col_idx = self.filedb.df.loc[file, f"{tier}_col_idx"]
+                # print(f"{tier}_col_idx: {tier_col_idx}")
                 if tier_col_idx is not None:
                     # Loop over tables
                     for i in range(len(tier_col_idx)):
                         col_idx = self.filedb.df.loc[file, f"{tier}_col_idx"][i]
+                        # print(f"col_idx: {col_idx}")
                         if col_idx in col_inds:
                             col_tiers[file]["tables"][tier].append(
                                 self.filedb.df.loc[file, f"{tier}_tables"][i]
diff --git a/src/pygama/flow/file_db.py b/src/pygama/flow/file_db.py
index c64e6b786..4683737c0 100644
--- a/src/pygama/flow/file_db.py
+++ b/src/pygama/flow/file_db.py
@@ -149,6 +149,7 @@ def __init__(self, config: str | dict | list[str], scan: bool = True) -> None:
         names = list(np.unique(names))
         names += [f"{tier}_file" for tier in self.tiers]  # the generated file names
         names += [f"{tier}_size" for tier in self.tiers]  # file sizes
+        names += ["file_status"]  # bonus columns
 
         self.df = pd.DataFrame(columns=names)
 
@@ -271,10 +272,13 @@ def scan_files(self, dirs: list[str] = None) -> None:
 
         # fill the main DataFrame
         self.df = pd.concat([self.df, temp_df])
-
+
         # convert cols to numeric dtypes where possible
         for col in self.df.columns:
             self.df[col] = pd.to_numeric(self.df[col], errors="ignore")
+
+        # Remove duplicates in the 'sortby' column
+        self.df.drop_duplicates(subset=[self.sortby], inplace=True, ignore_index=True)
 
         # sort rows according to timestamps
         utils.inplace_sort(self.df, self.sortby)
@@ -358,16 +362,15 @@ def scan_tables_columns(
         """
        log.info("getting table column names")
 
-        if self.columns is not None:
-            if not override:
-                log.warning(
-                    "LH5 tables/columns names already set, if you want to perform the scan anyway, set override=True"
-                )
-                return
-            else:
-                log.warning("overwriting existing LH5 tables/columns names")
+        if override:
+            log.warning("overwriting existing LH5 tables/columns names")
 
         def update_tables_cols(row, tier: str, utc_cache: dict = None) -> pd.Series:
+            # Skip this row if it already has values and override is false
+            if set([f"{tier}_tables", f"{tier}_col_idx"]).issubset(self.df.columns):
+                if not override and not row[[f"{tier}_tables", f"{tier}_col_idx"]].isnull().any():
+                    return row[[f"{tier}_tables", f"{tier}_col_idx"]]
+
             fpath = os.path.join(
                 self.data_dir,
                 self.tier_dirs[tier].lstrip("/"),
@@ -448,7 +451,10 @@ def update_tables_cols(row, tier: str, utc_cache: dict = None) -> pd.Series:
                 utc_cache[this_dir] = series
                 return series
 
-        columns = []
+        if self.columns is None:
+            columns = []
+        else:
+            columns = self.columns
 
         # set up a cache to provide a fast option if all files in each directory
         # are expected to all have the same cols
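
For reference, a minimal sketch of the per-row skip logic that the `scan_tables_columns` hunk introduces: a row whose `{tier}_tables`/`{tier}_col_idx` cells are already filled is returned unchanged unless `override` is set, so a rescan no longer wipes existing entries. The toy DataFrame, the single `raw` tier, and the dummy return values below are illustrative assumptions rather than pygama API; only the guard mirrors the patched code.

```python
import pandas as pd

# Toy stand-in for FileDB.df with a single "raw" tier (hypothetical data;
# the real FileDB carries many more columns per tier).
df = pd.DataFrame(
    {
        "raw_file": ["f0.lh5", "f1.lh5"],
        "raw_tables": [[1, 7], None],   # row 0 was scanned before, row 1 was not
        "raw_col_idx": [[0, 0], None],
    }
)


def update_tables_cols(row: pd.Series, tier: str, override: bool = False) -> pd.Series:
    cols = [f"{tier}_tables", f"{tier}_col_idx"]
    # Guard mirroring the patch: keep existing values unless override is requested
    if set(cols).issubset(df.columns):
        if not override and not row[cols].isnull().any():
            return row[cols]
    # The real method would open the tier's LH5 file here and list its tables;
    # dummy values stand in for that result.
    return pd.Series({cols[0]: [99], cols[1]: [1]})


updated = df.apply(update_tables_cols, axis=1, tier="raw")
df["raw_tables"] = updated["raw_tables"]    # row 0 kept as-is, row 1 filled in
df["raw_col_idx"] = updated["raw_col_idx"]
print(df)
```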