improve read_object speed when passing idx #35

Merged
merged 12 commits on Nov 9, 2023
99 changes: 89 additions & 10 deletions src/lgdo/lh5_store.py
@@ -169,13 +169,22 @@ def read_object(
start_row: int = 0,
n_rows: int = sys.maxsize,
idx: np.ndarray | list | tuple | list[np.ndarray | list | tuple] = None,
use_h5idx: bool = False,
field_mask: dict[str, bool] | list[str] | tuple[str] = None,
obj_buf: LGDO = None,
obj_buf_start: int = 0,
decompress: bool = True,
) -> tuple[LGDO, int]:
"""Read LH5 object data from a file.

Use the ``idx`` parameter to read out particular rows of the data. The ``use_h5idx`` flag
controls whether *only* those rows are read from disk or whether the rows are indexed after
reading the entire object. Reading individual rows can be orders of magnitude slower than
reading the whole object and then indexing the desired rows. The default behavior
(``use_h5idx=False``) is to use slightly more memory for a much faster read. See
`legend-pydataobj #29 <https://github.com/legend-exp/legend-pydataobj/issues/29>`_
for additional information.
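
A minimal usage sketch (the file and dataset names here are hypothetical,
and an ``LH5Store`` instance is assumed)::

    store = LH5Store()
    # read a column, apply a cut and keep the selected row indices
    energy, _ = store.read_object("geds/raw/energy", "data.lh5")
    idx = np.where(energy.nda > 100)[0]
    # default: read the whole waveform object, then index in memory (fast)
    wfs, n = store.read_object("geds/raw/waveform", "data.lh5", idx=idx)
    # or pass idx straight to h5py (smaller memory footprint, can be much slower)
    wfs, n = store.read_object(
        "geds/raw/waveform", "data.lh5", idx=idx, use_h5idx=True
    )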

Parameters
----------
name
@@ -192,16 +201,27 @@ def read_object(
actual number of rows read will be returned as one of the return
values (see below).
idx
For NumPy-style "fancy indexing" for the read. Used to read out
rows that pass some selection criteria. Only selection along the first
axis is supported, so tuple arguments must be one-tuples. If `n_rows`
is not false, `idx` will be truncated to `n_rows` before reading. To use
with a list of files, can pass in a list of `idx`'s (one for each
file) or use a long contiguous list (e.g. built from a previous
For NumPy-style "fancy indexing" for the read, to select only some
rows, e.g. after applying cuts to particular columns.
Only selection along the first axis is supported, so tuple arguments
must be one-tuples. If `n_rows` is not false, `idx` will be truncated to
`n_rows` before reading. To use with a list of files, can pass in a list of
`idx`'s (one for each file) or use a long contiguous list (e.g. built from a previous
identical read). If used in conjunction with `start_row` and `n_rows`,
will be sliced to obey those constraints, where `n_rows` is
interpreted as the (max) number of *selected* values (in `idx`) to be
read out.
read out. Note that the ``use_h5idx`` parameter controls how the index
selection is performed; the default behavior (``use_h5idx=False``)
prioritizes speed at the cost of a small memory overhead.
use_h5idx
``True`` will pass the ``idx`` parameter directly to the underlying
``h5py`` call such that only the selected rows are read into memory,
which conserves memory at the cost of speed. The speed penalty can be
significant for larger files (1-2 orders of magnitude longer read times).
``False`` (default) will read the entire object into memory before
performing the indexing. The default is much faster but requires additional
memory, though a relatively small amount in the typical use case. It is
recommended to leave this parameter at its default.
field_mask
For tables and structs, determines which fields get written out.
Only applies to immediate fields of the requested objects. If a dict
@@ -223,6 +243,7 @@ def read_object(
after reading. The option has no effect on data encoded with HDF5
built-in filters, which is always decompressed upstream by HDF5.


Returns
-------
(object, n_rows_read)
@@ -236,6 +257,14 @@ def read_object(
if not isinstance(lh5_file, (str, h5py.File)):
lh5_file = list(lh5_file)
n_rows_read = 0

# to know whether we are reading a list of files.
# this is part of the fix for reading data by idx
# (see https://github.com/legend-exp/legend-pydataobj/issues/29):
# while looping over files we only make a copy of the data when it is
# absolutely necessary, i.e. when the read returns a view of the data
self.in_file_loop = True

for i, h5f in enumerate(lh5_file):
if isinstance(idx, list) and len(idx) > 0 and not np.isscalar(idx[0]):
# a list of lists: must be one per file
@@ -255,22 +284,32 @@ read_object(
else:
idx_i = None
n_rows_i = n_rows - n_rows_read

# last file in the list (this also covers a list of length 1): clear the
# flag so the final read does not force an unnecessary copy
if i == (len(lh5_file) - 1):
self.in_file_loop = False

obj_buf, n_rows_read_i = self.read_object(
name,
lh5_file[i],
start_row=start_row,
n_rows=n_rows_i,
idx=idx_i,
use_h5idx=use_h5idx,
field_mask=field_mask,
obj_buf=obj_buf,
obj_buf_start=obj_buf_start,
decompress=decompress,
)

n_rows_read += n_rows_read_i
if n_rows_read >= n_rows or obj_buf is None:
return obj_buf, n_rows_read
start_row = 0
obj_buf_start += n_rows_read_i

self.in_file_loop = False

return obj_buf, n_rows_read

# get the file from the store
@@ -358,6 +397,7 @@ def read_object(
start_row=start_row,
n_rows=n_rows,
idx=idx,
use_h5idx=use_h5idx,
decompress=decompress,
)
# modify datatype in attrs if a field_mask was used
@@ -404,6 +444,7 @@ def read_object(
start_row=start_row,
n_rows=n_rows,
idx=idx,
use_h5idx=use_h5idx,
obj_buf=fld_buf,
obj_buf_start=obj_buf_start,
decompress=decompress,
@@ -497,6 +538,7 @@ def read_object(
start_row=start_row,
n_rows=n_rows,
idx=idx,
use_h5idx=use_h5idx,
obj_buf=None if decompress else decoded_size_buf,
obj_buf_start=0 if decompress else obj_buf_start,
)
@@ -508,6 +550,7 @@ def read_object(
start_row=start_row,
n_rows=n_rows,
idx=idx,
use_h5idx=use_h5idx,
obj_buf=None if decompress else encoded_data_buf,
obj_buf_start=0 if decompress else obj_buf_start,
)
@@ -573,6 +616,7 @@ def read_object(
start_row=start_row,
n_rows=n_rows,
idx=idx,
use_h5idx=use_h5idx,
obj_buf=cumulen_buf,
obj_buf_start=obj_buf_start,
)
@@ -597,6 +641,7 @@ def read_object(
start_row=start_row,
n_rows=n_rows,
idx=idx2,
use_h5idx=use_h5idx,
)
fd_starts = fd_starts.nda # we just need the nda
if fd_start is None:
@@ -679,6 +724,7 @@ def read_object(
start_row=fd_start,
n_rows=fd_n_rows,
idx=fd_idx,
use_h5idx=use_h5idx,
obj_buf=fd_buf,
obj_buf_start=fd_buf_start,
)
@@ -722,9 +768,22 @@ def read_object(
if n_rows_to_read > n_rows:
n_rows_to_read = n_rows

# if idx is passed, check if we can make it a slice instead (faster)
change_idx_to_slice = False

# prepare the selection for the read. Use idx if available
if idx is not None:
source_sel = idx
# check if idx is empty and convert to slice instead
if len(idx[0]) == 0:
source_sel = np.s_[0:0]
change_idx_to_slice = True
# check if idx is contiguous and increasing
# if so, convert it to a slice instead (faster)
elif np.all(np.diff(idx[0]) == 1):
source_sel = np.s_[idx[0][0] : idx[0][-1] + 1]
change_idx_to_slice = True
else:
source_sel = idx
else:
source_sel = np.s_[start_row : start_row + n_rows_to_read]
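
# as an illustration of the conversion above (comment only, not executed):
# idx = (np.array([2, 3, 4, 5]),) is contiguous and increasing, so it becomes
# source_sel = np.s_[2:6] and h5py can perform a single contiguous (hyperslab)
# read instead of a much slower point selection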

@@ -734,14 +793,34 @@ def read_object(
if len(obj_buf) < buf_size:
obj_buf.resize(buf_size)
dest_sel = np.s_[obj_buf_start:buf_size]
h5f[name].read_direct(obj_buf.nda, source_sel, dest_sel)

# this is required to make the read of multiple files faster
# until a better solution is found.
if change_idx_to_slice or idx is None or use_h5idx:
h5f[name].read_direct(obj_buf.nda, source_sel, dest_sel)
else:
# it is faster to read the whole object and then do fancy indexing
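# (a single bulk read followed by in-memory fancy indexing avoids h5py's
# per-element point-selection overhead, which can dominate for large idx
# arrays; see the issue linked above)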
obj_buf.nda[dest_sel] = h5f[name][...][source_sel]

nda = obj_buf.nda
else:
if n_rows == 0:
tmp_shape = (0,) + h5f[name].shape[1:]
nda = np.empty(tmp_shape, h5f[name].dtype)
else:
nda = h5f[name][source_sel]
if change_idx_to_slice or idx is None or use_h5idx:
nda = h5f[name][source_sel]
else:
# it is faster to read the whole object and then do fancy indexing
nda = h5f[name][...][source_sel]

# if reading a list of files recursively, this array is given to obj_buf
# on the first file read. obj_buf needs to be resized and therefore
# must hold the data itself (not a view of the data):
# a view can be returned by the source_sel indexing and cannot be
# resized by ndarray.resize().
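# (for example, arr[0:5] is a view that does not own its data, and calling
# .resize() on it raises a ValueError)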
if hasattr(self, "in_file_loop") and self.in_file_loop:
nda = np.copy(nda)

# special handling for bools
# (C and Julia store bools as uint8, so cast to bool)