Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve cudf::io::datasource::create(). #17115

Open
wants to merge 1 commit into
base: branch-24.12
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 266 additions & 1 deletion cpp/include/cudf/io/datasource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include <future>
#include <memory>
#include <variant>

namespace CUDF_EXPORT cudf {
//! IO interfaces
Expand All @@ -36,6 +37,212 @@ namespace io {
* @file
*/

/**
* @brief Kind of data source to create when calling
* `cudf::io::datasource::create()`.
*
* @see cudf::io::datasource::create()
* @see cudf::io::datasource_params
*
* N.B. GDS = GPUDirect Storage
*/
enum class datasource_kind {
/**
* @brief Kvikio-based data source (default).
*
* This data source is the default for cuDF, and should be the most performant
* option for most use cases. It supports GDS where possible, falling back to
* multi-threaded host-based reads when GDS is not available.
*
* It supports asynchronous reads, and will use the provided CUDA stream for
* all I/O operations when possible.
*/
KVIKIO = 0,
DEFAULT = KVIKIO,

/**
* @brief Kvikio-based data source that does not attempt to use GDS, instead
* falling back to multi-threaded host-based reads.
*
* It supports asynchronous reads, but does not do any stream synchronization,
* as the reads are all performed on the host.
*/
KVIKIO_COMPAT,

/**
* @brief Kvikio-based data source that will fail if GDS is not available.
* Specifically, `cudf::io::datasource::create()` when called with this kind
* of data source will throw a `cudf::logic_error` if GDS is not available.
*/
KVIKIO_GDS,

/**
* @brief Host-based data source that does not support any device or async
* operations.
*
* All reads are performed via standard POSIX pread() calls. No
* multi-threading or asynchronous operations are supported.
*
* The primary purpose of this datasource type is to be a base class for the
* `O_DIRECT` implementation, which needs to issue pread() calls against a
* file descriptor that *hasn't* been opened with `O_DIRECT` if certain
* constraints aren't met (specifically: when reading the final bytes of a
* file that isn't perfectly aligned to a sector-size boundary).
*
* The time required to service reads from this data source will be affected
* by the presence or absence of the desired data in the Linux page cache.
* Thus, back-to-back runs of the same file will have significantly different
* performance characteristics, depending on whether the data is in the page
* cache or not.
*
* Generally, this data source should be avoided in favor of the `KVIKIO`
* data source, which will be more performant in most cases. Thus, it can
* be used as a baseline for which improved `KVIKIO` performance can be
* empirically measured.
*/
HOST,

/**
* @brief Host-based data source that issues reads against a file descriptor
* opened with `O_DIRECT`, where possible, bypassing the Linux page cache.
*
* This data source will always result in the slowest possible read times,
* as all reads are serviced directly from the underlying device. However,
* it will be consistently slow, and that consistency can be critical when
* benchmarking or profiling changes purporting to improve performance in
* unrelated areas.
*
* Thus, the primary use case for this data source is for benchmarking and
* profiling purposes, where you want to eliminate any runtime variance in
* back-to-back runs that would be caused by the presence or absence of data
* in the host's page cache.
*
* A secondary use case for this data source is when you specifically do not
* want to pollute the host's page cache with the data being read, either
* because it won't be read again soon, or you want to remove the memory
* pressure (or small but non-trivial amount of compute overhead) that would
* otherwise be introduced by servicing I/O through the page cache. In some
* scenarios, this can yield a net performance improvement, despite a higher
* per-read latency.
*
* A real-life example of how this can manifest is when doing very large TPC-H
* or TPC-DS runs, where the data set is orders of magnitude larger than the
* available host memory, e.g. 30TB or 100TB runs on hosts with <= 4TB of RAM.
*
* For certain queries--typically read-heavy, join-heavy ones--doing `O_DIRECT`
* reads can result in a net performance improvement, as the host's page cache
* won't be polluted with data that will never be read again, and compute
* overhead associated with cache thrashing when memory is tight is
* eliminated.
*/
ODIRECT,

/**
* @brief Host-based data source that uses memory mapped files to satisfy
* read requests.
*
* Note that this can result in pathological performance problems in certain
* environments, such as when small reads are done against files residing on
* a network file system (including accelerated file systems like WekaFS).
*/
HOST_MMAP,

/**
* @brief This is a special sentinel value that is used to indicate the
* datasource is not one of the publicly available types above.
*
* N.B. You cannot create a datasource of this kind directly via create().
*/
OTHER,
};

/**
* @brief Parameters for the kvikio data source.
*/
struct kvikio_datasource_params {
/**
* @brief When set, explicitly disables any attempts at using GPUDirect
* Storage, resulting in kvikio falling back to its "compat" mode using
* multi-threaded host-based reads.
*
* Defaults to false.
*
* N.B. Compat mode will still be used if GDS isn't available, regardless
* of the value of this parameter.
*/
bool use_compat_mode{false};

/**
* @brief The threshold at which the data source will switch from using
* host-based reads to device-based (i.e. GPUDirect) reads, if GPUDirect is
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I've used GDS everywhere else--change these GPUDirect occurrences to GDS too, for consistency.

* available.
*
* This parameter should represent the read size where GDS is faster than
* a posix read() plus the overhead of a host-to-device memcpy.
*
* Defaults to 128KB.
*/
size_t device_read_threshold{128 << 10};

/**
* @brief The number of threads in the kvikio thread pool.
*
* This parameter only applies to the kvikio data source when GDS is not
* available and it is in compat mode.
*
* Defaults to 0, which defers the thread pool sizing to kvikio.
*/
uint16_t num_threads{0};

/**
* @brief The size in bytes into which I/O operations will be split.
*
* Defaults to 1MB.
*/
size_t task_size{1 << 20};
};

/**
* @brief Parameters for the `O_DIRECT` data source.
*/
struct odirect_datasource_params {
/**
* @brief The sector size, in bytes, to use for alignment when issuing
* `O_DIRECT` reads. This size dictates the alignment used for three things:
* the file offset, the buffer address, and the buffer size. It *must* be a
* multiple of the underlying device's sector size, which is typically 512
* bytes. A larger size is fine as long as it's a multiple of the device's
* sector size.
*
* Defaults to 4096.
*
* N.B. On Linux, you can determine the sector size of a device with the
* the `blockdev` command, e.g.: `sudo blockdev --getss /dev/sda`.
*/
size_t sector_size{4096};

/**
* @brief The minimum permissible sector size. All sector sizes must be a
* multiple of this value. This is hardcoded to 512 bytes as a simple means
* to catch misconfigurations. The underlying device's sector size may be
* larger, but it will certainly be a multiple of this value.
*/
static constexpr size_t min_sector_size{512};

/**
* @brief Returns true iff the sector size is a multiple of the minimum sector
* size.
*/
[[nodiscard]] bool is_valid_sector_size() const {
return ((sector_size > 0) && ((sector_size % min_sector_size) == 0));
}
};

/**
* @brief Union of parameters for different data sources.
*/
using datasource_params = std::variant<kvikio_datasource_params, odirect_datasource_params>;

/**
* @brief Interface class for providing input data to the readers.
*/
Expand Down Expand Up @@ -92,15 +299,23 @@ class datasource {
* this case, `max_size_estimate` can include padding after the byte range, to include additional
* data that may be needed for processing.
*
* @throws cudf::logic_error if the minimum size estimate is greater than the maximum size estimate
Copy link
Contributor Author

@tpn tpn Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, dodgy rebase! This should be removed.

* @throws cudf::logic_error if `KVIKIO_GDS` is specified as the desired kind of data source,
* and GDS is not available for the file.
*
* @param[in] filepath Path to the file to use
* @param[in] offset Starting byte offset from which data will be read (the default is zero)
* @param[in] max_size_estimate Upper estimate of the data range that will be read (the default is
* zero, which means the whole file after `offset`)
* @param[in] kind Optionally supplies the kind of data source to create
* @param[in] params Optionally supplies parameters for the data source
* @return Constructed datasource object
*/
static std::unique_ptr<datasource> create(std::string const& filepath,
size_t offset = 0,
size_t max_size_estimate = 0);
size_t max_size_estimate = 0,
datasource_kind kind = datasource_kind::DEFAULT,
std::optional<const datasource_params> params = std::nullopt);

/**
* @brief Creates a source from a host memory buffer.
Expand Down Expand Up @@ -291,6 +506,37 @@ class datasource {
*/
[[nodiscard]] virtual bool is_empty() const { return size() == 0; }

/**
* @brief Returns the appropriate size, in bytes, of a read request, given
* the supplied requested size and offset.
*
* The returned size is clamped to ensure it does not exceed the total size
* of the data source, once the requested size and offset are taken into
* account.
*
* @param requested_size[in] Supplies the desired size of the read request,
* in bytes.
*
* @param offset[in] Supplies the offset, in bytes, from the start of the
* data.
*
* @return The size of the read request in bytes. This will be the minimum
* of the requested size and the remaining size of the data source after the
* offset. If the offset is beyond the end of the data source, this will
* return 0.
*/
[[nodiscard]] size_t get_read_size(size_t requested_size, size_t offset) const
{
return std::min(requested_size, size() > offset ? size() - offset : 0);
}

/**
* @brief Returns the kind of data source.
*
* @return The kind of data source.
*/
[[nodiscard]] datasource_kind kind() const { return _kind; }

/**
* @brief Implementation for non owning buffer where datasource holds buffer until destruction.
*/
Expand Down Expand Up @@ -380,6 +626,25 @@ class datasource {
void const* _data_ptr;
size_t _size;
};

protected:
/**
* @brief Constructor for the datasource object.
*
* @param kind The kind of data source
*/
datasource(datasource_kind kind) : _kind(kind) {}

/**
* @brief Sets the kind of data source.
*
* @note This is intended for use by derived classes that need to change the
* kind of data source after construction.
*/
void set_datasource_kind(datasource_kind kind) { _kind = kind; }

private:
datasource_kind _kind{datasource_kind::DEFAULT};
};

/** @} */ // end of group
Expand Down
Loading
Loading