filedb can update
gracesong312 committed Oct 3, 2023
1 parent f08dbd0 commit f5b4b6a
Showing 2 changed files with 23 additions and 10 deletions.
7 changes: 7 additions & 0 deletions src/pygama/flow/data_loader.py
@@ -1176,6 +1176,7 @@ def explode_evt_cols(el: pd.DataFrame, tier_table: Table):
table_length = len(f_entries)

log.debug(f"will load new columns {field_mask}")
# print(f"will load new columns {field_mask}")

# loop through each table in entry list and
# loop through each level we're asked to load from
@@ -1188,8 +1189,12 @@ def explode_evt_cols(el: pd.DataFrame, tier_table: Table):
# loop over tiers in the level
for tier in self.tiers[level]:
if tb not in col_tiers[file]["tables"][tier]:
# print(f"Cannot find table {tb} in col_tiers tier {tier}")
continue

# print(
# f"...for stream '{self.filedb.get_table_name(tier, tb)}' (at {level} level)"
# )
log.debug(
f"...for stream '{self.filedb.get_table_name(tier, tb)}' (at {level} level)"
)
@@ -1584,10 +1589,12 @@ def get_tiers_for_col(
for tier in self.tiers[level]:
col_tiers[file]["tables"][tier] = []
tier_col_idx = self.filedb.df.loc[file, f"{tier}_col_idx"]
# print(f"{tier}_col_idx: {tier_col_idx}")
if tier_col_idx is not None:
# Loop over tables
for i in range(len(tier_col_idx)):
col_idx = self.filedb.df.loc[file, f"{tier}_col_idx"][i]
# print(f"col_idx: {col_idx}")
if col_idx in col_inds:
col_tiers[file]["tables"][tier].append(
self.filedb.df.loc[file, f"{tier}_tables"][i]
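The new debug output in data_loader.py goes through the module-level logger rather than the commented-out print calls. A minimal sketch of how one might surface those messages, assuming the module creates its logger with the usual logging.getLogger(__name__) convention:

import logging

# send log records to the console and lower the threshold for the data loader module
logging.basicConfig()
logging.getLogger("pygama.flow.data_loader").setLevel(logging.DEBUG)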
26 changes: 16 additions & 10 deletions src/pygama/flow/file_db.py
@@ -149,6 +149,7 @@ def __init__(self, config: str | dict | list[str], scan: bool = True) -> None:
names = list(np.unique(names))
names += [f"{tier}_file" for tier in self.tiers] # the generated file names
names += [f"{tier}_size" for tier in self.tiers] # file sizes

names += ["file_status"] # bonus columns

self.df = pd.DataFrame(columns=names)
@@ -271,10 +272,13 @@ def scan_files(self, dirs: list[str] = None) -> None:

# fill the main DataFrame
self.df = pd.concat([self.df, temp_df])

# convert cols to numeric dtypes where possible
for col in self.df.columns:
self.df[col] = pd.to_numeric(self.df[col], errors="ignore")

# Remove duplicates in the 'sortby' column
self.df.drop_duplicates(subset=[self.sortby], inplace=True, ignore_index=True)

# sort rows according to timestamps
utils.inplace_sort(self.df, self.sortby)
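The scan_files() change above appends the newly scanned files and then deduplicates on the sortby key before sorting. A toy pandas sketch of that dedup-then-sort step (the column name and values are made up, and utils.inplace_sort is approximated here with sort_values):

import pandas as pd

df = pd.DataFrame({
    "timestamp": ["20230102T000000Z", "20230101T000000Z", "20230101T000000Z"],
    "raw_size": [20, 10, 10],
})

# keep only the first row for each timestamp and rebuild a clean index
df.drop_duplicates(subset=["timestamp"], inplace=True, ignore_index=True)

# stand-in for utils.inplace_sort: order the remaining rows by the sort key
df.sort_values("timestamp", inplace=True, ignore_index=True)

print(df)  # two rows remain, ordered by timestamp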
@@ -358,16 +362,15 @@ def scan_tables_columns(
"""
log.info("getting table column names")

if self.columns is not None:
if not override:
log.warning(
"LH5 tables/columns names already set, if you want to perform the scan anyway, set override=True"
)
return
else:
log.warning("overwriting existing LH5 tables/columns names")
if override:
log.warning("overwriting existing LH5 tables/columns names")

def update_tables_cols(row, tier: str, utc_cache: dict = None) -> pd.Series:
# Skip this row if it already has values and override is false
if set([f"{tier}_tables", f"{tier}_col_idx"]).issubset(self.df.columns):
if not override and not row[[f"{tier}_tables", f"{tier}_col_idx"]].isnull().any():
return row[[f"{tier}_tables", f"{tier}_col_idx"]]

fpath = os.path.join(
self.data_dir,
self.tier_dirs[tier].lstrip("/"),
@@ -448,7 +451,10 @@ def update_tables_cols(row, tier: str, utc_cache: dict = None) -> pd.Series:
utc_cache[this_dir] = series
return series

columns = []
if self.columns is None:
columns = []
else:
columns = self.columns

# set up a cache to provide a fast option if all files in each directory
# are expected to all have the same cols
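Taken together, these file_db.py changes let an existing FileDB be updated in place: scan_files() can be called again to pick up new files without creating duplicate rows, and scan_tables_columns() now fills table/column information only for rows that are still missing it unless override=True. A minimal usage sketch (the config file and data directory are hypothetical, and FileDB is assumed importable from pygama.flow):

from pygama.flow import FileDB

fdb = FileDB("filedb_config.json")         # initial scan of the configured tier directories
fdb.scan_files(dirs=["/data/new_cycle"])   # hypothetical directory holding newly produced files
fdb.scan_tables_columns()                  # only rows still missing table/column info are scanned
# fdb.scan_tables_columns(override=True)   # force a full re-scan of every row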
