diff --git a/cpp/include/cudf/io/datasource.hpp b/cpp/include/cudf/io/datasource.hpp index 7d2cc4ad493..1f0c560b775 100644 --- a/cpp/include/cudf/io/datasource.hpp +++ b/cpp/include/cudf/io/datasource.hpp @@ -25,6 +25,7 @@ #include #include +#include namespace CUDF_EXPORT cudf { //! IO interfaces @@ -36,6 +37,212 @@ namespace io { * @file */ +/** + * @brief Kind of data source to create when calling + * `cudf::io::datasource::create()`. + * + * @see cudf::io::datasource::create() + * @see cudf::io::datasource_params + * + * N.B. GDS = GPUDirect Storage + */ +enum class datasource_kind { + /** + * @brief Kvikio-based data source (default). + * + * This data source is the default for cuDF, and should be the most performant + * option for most use cases. It supports GDS where possible, falling back to + * multi-threaded host-based reads when GDS is not available. + * + * It supports asynchronous reads, and will use the provided CUDA stream for + * all I/O operations when possible. + */ + KVIKIO = 0, + DEFAULT = KVIKIO, + + /** + * @brief Kvikio-based data source that does not attempt to use GDS, instead + * falling back to multi-threaded host-based reads. + * + * It supports asynchronous reads, but does not do any stream synchronization, + * as the reads are all performed on the host. + */ + KVIKIO_COMPAT, + + /** + * @brief Kvikio-based data source that will fail if GDS is not available. + * Specifically, `cudf::io::datasource::create()` when called with this kind + * of data source will throw a `cudf::logic_error` if GDS is not available. + */ + KVIKIO_GDS, + + /** + * @brief Host-based data source that does not support any device or async + * operations. + * + * All reads are performed via standard POSIX pread() calls. No + * multi-threading or asynchronous operations are supported. + * + * The primary purpose of this datasource type is to be a base class for the + * `O_DIRECT` implementation, which needs to issue pread() calls against a + * file descriptor that *hasn't* been opened with `O_DIRECT` if certain + * constraints aren't met (specifically: when reading the final bytes of a + * file that isn't perfectly aligned to a sector-size boundary). + * + * The time required to service reads from this data source will be affected + * by the presence or absence of the desired data in the Linux page cache. + * Thus, back-to-back runs of the same file will have significantly different + * performance characteristics, depending on whether the data is in the page + * cache or not. + * + * Generally, this data source should be avoided in favor of the `KVIKIO` + * data source, which will be more performant in most cases. Thus, it can + * be used as a baseline for which improved `KVIKIO` performance can be + * empirically measured. + */ + HOST, + + /** + * @brief Host-based data source that issues reads against a file descriptor + * opened with `O_DIRECT`, where possible, bypassing the Linux page cache. + * + * This data source will always result in the slowest possible read times, + * as all reads are serviced directly from the underlying device. However, + * it will be consistently slow, and that consistency can be critical when + * benchmarking or profiling changes purporting to improve performance in + * unrelated areas. + * + * Thus, the primary use case for this data source is for benchmarking and + * profiling purposes, where you want to eliminate any runtime variance in + * back-to-back runs that would be caused by the presence or absence of data + * in the host's page cache. + * + * A secondary use case for this data source is when you specifically do not + * want to pollute the host's page cache with the data being read, either + * because it won't be read again soon, or you want to remove the memory + * pressure (or small but non-trivial amount of compute overhead) that would + * otherwise be introduced by servicing I/O through the page cache. In some + * scenarios, this can yield a net performance improvement, despite a higher + * per-read latency. + * + * A real-life example of how this can manifest is when doing very large TPC-H + * or TPC-DS runs, where the data set is orders of magnitude larger than the + * available host memory, e.g. 30TB or 100TB runs on hosts with <= 4TB of RAM. + * + * For certain queries--typically read-heavy, join-heavy ones--doing `O_DIRECT` + * reads can result in a net performance improvement, as the host's page cache + * won't be polluted with data that will never be read again, and compute + * overhead associated with cache thrashing when memory is tight is + * eliminated. + */ + ODIRECT, + + /** + * @brief Host-based data source that uses memory mapped files to satisfy + * read requests. + * + * Note that this can result in pathological performance problems in certain + * environments, such as when small reads are done against files residing on + * a network file system (including accelerated file systems like WekaFS). + */ + HOST_MMAP, + + /** + * @brief This is a special sentinel value that is used to indicate the + * datasource is not one of the publicly available types above. + * + * N.B. You cannot create a datasource of this kind directly via create(). + */ + OTHER, +}; + +/** + * @brief Parameters for the kvikio data source. + */ +struct kvikio_datasource_params { + /** + * @brief When set, explicitly disables any attempts at using GPUDirect + * Storage, resulting in kvikio falling back to its "compat" mode using + * multi-threaded host-based reads. + * + * Defaults to false. + * + * N.B. Compat mode will still be used if GDS isn't available, regardless + * of the value of this parameter. + */ + bool use_compat_mode{false}; + + /** + * @brief The threshold at which the data source will switch from using + * host-based reads to device-based (i.e. GPUDirect) reads, if GPUDirect is + * available. + * + * This parameter should represent the read size where GDS is faster than + * a posix read() plus the overhead of a host-to-device memcpy. + * + * Defaults to 128KB. + */ + size_t device_read_threshold{128 << 10}; + + /** + * @brief The number of threads in the kvikio thread pool. + * + * This parameter only applies to the kvikio data source when GDS is not + * available and it is in compat mode. + * + * Defaults to 0, which defers the thread pool sizing to kvikio. + */ + uint16_t num_threads{0}; + + /** + * @brief The size in bytes into which I/O operations will be split. + * + * Defaults to 1MB. + */ + size_t task_size{1 << 20}; +}; + +/** + * @brief Parameters for the `O_DIRECT` data source. + */ +struct odirect_datasource_params { + /** + * @brief The sector size, in bytes, to use for alignment when issuing + * `O_DIRECT` reads. This size dictates the alignment used for three things: + * the file offset, the buffer address, and the buffer size. It *must* be a + * multiple of the underlying device's sector size, which is typically 512 + * bytes. A larger size is fine as long as it's a multiple of the device's + * sector size. + * + * Defaults to 4096. + * + * N.B. On Linux, you can determine the sector size of a device with the + * the `blockdev` command, e.g.: `sudo blockdev --getss /dev/sda`. + */ + size_t sector_size{4096}; + + /** + * @brief The minimum permissible sector size. All sector sizes must be a + * multiple of this value. This is hardcoded to 512 bytes as a simple means + * to catch misconfigurations. The underlying device's sector size may be + * larger, but it will certainly be a multiple of this value. + */ + static constexpr size_t min_sector_size{512}; + + /** + * @brief Returns true iff the sector size is a multiple of the minimum sector + * size. + */ + [[nodiscard]] bool is_valid_sector_size() const { + return ((sector_size > 0) && ((sector_size % min_sector_size) == 0)); + } +}; + +/** + * @brief Union of parameters for different data sources. + */ +using datasource_params = std::variant; + /** * @brief Interface class for providing input data to the readers. */ @@ -92,15 +299,23 @@ class datasource { * this case, `max_size_estimate` can include padding after the byte range, to include additional * data that may be needed for processing. * + * @throws cudf::logic_error if the minimum size estimate is greater than the maximum size estimate + * @throws cudf::logic_error if `KVIKIO_GDS` is specified as the desired kind of data source, + * and GDS is not available for the file. + * * @param[in] filepath Path to the file to use * @param[in] offset Starting byte offset from which data will be read (the default is zero) * @param[in] max_size_estimate Upper estimate of the data range that will be read (the default is * zero, which means the whole file after `offset`) + * @param[in] kind Optionally supplies the kind of data source to create + * @param[in] params Optionally supplies parameters for the data source * @return Constructed datasource object */ static std::unique_ptr create(std::string const& filepath, size_t offset = 0, - size_t max_size_estimate = 0); + size_t max_size_estimate = 0, + datasource_kind kind = datasource_kind::DEFAULT, + std::optional params = std::nullopt); /** * @brief Creates a source from a host memory buffer. @@ -291,6 +506,37 @@ class datasource { */ [[nodiscard]] virtual bool is_empty() const { return size() == 0; } + /** + * @brief Returns the appropriate size, in bytes, of a read request, given + * the supplied requested size and offset. + * + * The returned size is clamped to ensure it does not exceed the total size + * of the data source, once the requested size and offset are taken into + * account. + * + * @param requested_size[in] Supplies the desired size of the read request, + * in bytes. + * + * @param offset[in] Supplies the offset, in bytes, from the start of the + * data. + * + * @return The size of the read request in bytes. This will be the minimum + * of the requested size and the remaining size of the data source after the + * offset. If the offset is beyond the end of the data source, this will + * return 0. + */ + [[nodiscard]] size_t get_read_size(size_t requested_size, size_t offset) const + { + return std::min(requested_size, size() > offset ? size() - offset : 0); + } + + /** + * @brief Returns the kind of data source. + * + * @return The kind of data source. + */ + [[nodiscard]] datasource_kind kind() const { return _kind; } + /** * @brief Implementation for non owning buffer where datasource holds buffer until destruction. */ @@ -380,6 +626,25 @@ class datasource { void const* _data_ptr; size_t _size; }; + + protected: + /** + * @brief Constructor for the datasource object. + * + * @param kind The kind of data source + */ + datasource(datasource_kind kind) : _kind(kind) {} + + /** + * @brief Sets the kind of data source. + * + * @note This is intended for use by derived classes that need to change the + * kind of data source after construction. + */ + void set_datasource_kind(datasource_kind kind) { _kind = kind; } + + private: + datasource_kind _kind{datasource_kind::DEFAULT}; }; /** @} */ // end of group diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index 2daaecadca6..6b3aeba898a 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -17,6 +17,7 @@ #include "file_io_utilities.hpp" #include "getenv_or.hpp" +#include #include #include #include @@ -30,6 +31,7 @@ #include #include +#include #include #include @@ -39,12 +41,492 @@ namespace cudf { namespace io { namespace { +/** + * @brief Helper routine for determining if a given address is aligned to the + * specified alignment. + + * @param ptr Supplies the address to check. + * @param alignment Supplies the alignment to check against. + * + * @return True iff ptr is aligned to alignment, false otherwise. + */ +static inline bool is_aligned(void const* ptr, std::uintptr_t alignment) +{ + // N.B. Stolen from io/comp/nvcomp_adapter.cpp. + return (reinterpret_cast(ptr) % alignment) == 0; +} + +/** + * @brief Helper class to encapsulate an aligned host buffer allocated via + * posix_memalign(). + */ +class aligned_buffer : public datasource::buffer { + public: + /** + * @brief Construct an aligned buffer of the specified size and alignment. + * + * @param size Supplies the size of the buffer to allocate. + * @param alignment Supplies the desired address alignment of the underlying + * buffer. + */ + aligned_buffer(size_t size, std::uintptr_t alignment) : _size(size), _alignment(alignment) + { + if (posix_memalign(reinterpret_cast(&_data), static_cast(alignment), size) != + 0) { + CUDF_LOG_ERROR("posix_memalign(size={}, alignment={}) failed: {} ({})", + size, + alignment, + errno, + strerror(errno)); + CUDF_FAIL("Failed to allocate aligned buffer"); + } + } + + ~aligned_buffer() override + { + if (_data != nullptr) { + free(_data); + _data = nullptr; + } + } + + constexpr aligned_buffer() noexcept = default; + + /** + * @brief Move constructor. + * + * @param other Supplies the aligned buffer to move from. + */ + aligned_buffer(aligned_buffer&& other) noexcept + : _data(other._data), _size(other._size), _alignment(other._alignment) + { + other._data = nullptr; + other._size = 0; + other._alignment = 0; + } + + /** + * @brief Swap the contents of this aligned buffer with another. + * + * @param other Supplies the other aligned buffer with which to swap contents. + */ + void swap(aligned_buffer& other) noexcept + { + std::swap(_data, other._data); + std::swap(_size, other._size); + std::swap(_alignment, other._alignment); + } + + aligned_buffer& operator=(aligned_buffer&& other) noexcept + { + if (this != &other) { + // Use a temporary to ensure we don't leave this object in an + // inconsistent state if the move assignment fails. + aligned_buffer tmp(std::move(other)); + swap(tmp); + } + return *this; + } + + // Delete copy constructor and assignment operator. + aligned_buffer(aligned_buffer const&) = delete; + aligned_buffer& operator=(aligned_buffer const&) = delete; + + // Base class overrides + [[nodiscard]] size_t size() const override { return _size; } + [[nodiscard]] uint8_t const* data() const override { return _data; } + + // Additional methods + [[nodiscard]] uint8_t* mutable_data() { return _data; } + [[nodiscard]] std::uintptr_t alignment() const { return _alignment; } + + private: + uint8_t* _data{nullptr}; ///< Pointer to the aligned buffer + size_t _size{0}; ///< Size of the aligned buffer + size_t _alignment{0}; ///< Alignment of the buffer +}; + +/** + * @brief Helper function to safely check the ssize_t return value from read() + * against a size_t read_size. + * + * @param bytes_read Supplies the return value from a read(). Negative values + * are assumed to indicate an error. + * + * @param read_size Supplies the expected number of bytes to have been read. + * + * @return True iff bytes_read is non-negative and equal to read_size, false + * otherwise. + */ +static inline bool check_read(ssize_t bytes_read, size_t read_size) +{ + return (bytes_read >= 0) && (static_cast(bytes_read) == read_size); +} + +/** + * @brief Helper macro for wrapping a check_read() call in a CUDF_EXPECTS(). + * + * @param bytes_read Supplies the return value from a read(). + * + * @param read_size Supplies the expected number of bytes to have been read. + */ +#define CUDF_EXPECTS_READ_SUCCESS(bytes_read, read_size) \ + CUDF_EXPECTS(check_read(bytes_read, read_size), "read failed") + +/** + * @brief Host-based data source that issues standard POSIX file I/O calls. + */ +class host_source : public datasource { + public: + host_source(std::string const& filepath) : datasource(datasource_kind::HOST), _filepath(filepath) + { + // Open the file, then obtain its size by way of fstat(). + _fd = open(filepath.c_str(), O_RDONLY); + if (_fd < 0) { + CUDF_LOG_ERROR("Cannot open file {}: {}: {}", filepath, errno, strerror(errno)); + CUDF_FAIL("Cannot open file"); + } + + // File descriptor is valid; now obtain the file size. + struct stat statbuf; + if (fstat(_fd, &statbuf) < 0) { + CUDF_LOG_ERROR("Cannot stat file {}: {}: {}", filepath, errno, strerror(errno)); + CUDF_FAIL("Cannot stat file"); + } + _size = statbuf.st_size; + } + + ~host_source() override + { + if (_fd >= 0) { + if (::close(_fd) < 0) { + CUDF_LOG_ERROR("Cannot close file {}: {}: {}", _filepath, errno, strerror(errno)); + } + } + _fd = -1; + } + + [[nodiscard]] std::unique_ptr host_read(size_t offset, size_t size) + { + // Clamp length to available data + auto const read_size = get_read_size(size, offset); + + std::vector v(read_size); + auto const bytes_read = host_read(_fd, offset, read_size, v.data()); + CUDF_EXPECTS_READ_SUCCESS(bytes_read, read_size); + return buffer::create(std::move(v)); + } + + [[nodiscard]] size_t host_read(size_t offset, size_t size, uint8_t* dst) + { + // Clamp length to available data + auto const read_size = get_read_size(size, offset); + auto const bytes_read = host_read(_fd, offset, read_size, dst); + CUDF_EXPECTS_READ_SUCCESS(bytes_read, read_size); + return read_size; + } + + [[nodiscard]] size_t size() const override { return _size; } + + protected: + [[nodiscard]] const std::string& filepath() const { return _filepath; } + + /** + * @brief Reads a range of bytes from a file descriptor into the supplied + * buffer. + * + * @param fd Supplies the file descriptor from which to read. + * @param offset Supplies the offset within the file to begin reading. + * @param read_size Supplies the number of bytes to read. This should be the + * clamped read size value obtained from an earlier call to get_read_size(). + * @param dst Supplies the buffer into which to read the data. This buffer + * should be at least read_size bytes in length. + * + * @return The number of bytes read on success. An exception is thrown on + * error. + */ + [[nodiscard]] size_t host_read(int fd, size_t offset, size_t read_size, uint8_t* dst) + { + ssize_t bytes_remaining = read_size; + size_t current_offset = offset; + auto buf = reinterpret_cast(dst); + ssize_t bytes_read; + size_t total_bytes_read = 0; + + while (bytes_remaining > 0) { + // Retry the pread() if interrupted by a signal. + do { + bytes_read = pread(fd, buf, bytes_remaining, current_offset); + } while (bytes_read < 0 && errno == EINTR); + + if (bytes_read == 0) { + // We're at EOF; we should never hit this because get_read_size() clamps + // our size to the underlying datasource size, meaning we'll never issue + // a read past EOF. + CUDF_LOG_ERROR( + "Encountered unexpected EOF reading {} byte{} at offset {} " + "from {}: {}, {}", + bytes_remaining, + (bytes_remaining == 1) ? "" : "s", + current_offset, + filepath(), + errno, + strerror(errno)); + CUDF_FAIL("Unexpected EOF reading file"); + } + + if (bytes_read < 0) { + CUDF_LOG_ERROR("Failed to read {} byte{} at offset {} from file {}: {}, {}", + bytes_remaining, + (bytes_remaining == 1) ? "" : "s", + current_offset, + filepath(), + errno, + strerror(errno)); + CUDF_FAIL("Cannot read from file"); + } + + // Update the buffer pointer, counters, offsets, and remaining byte count. + total_bytes_read += static_cast(bytes_read); + bytes_remaining -= bytes_read; + current_offset += bytes_read; + buf += bytes_read; + + // Invariant check: bytes_remaining should always be non-negative. + CUDF_EXPECTS(bytes_remaining >= 0, "Invariant check failed: bytes_remaining >= 0"); + } + + return total_bytes_read; + } + + private: + std::string _filepath; ///< The path to the file + int _fd{-1}; ///< File descriptor + size_t _size{0}; ///< Size of the file, in bytes. +}; + +/** + * @brief O_DIRECT-based data source derived from host_source. + */ +class odirect_source : public host_source { + public: + odirect_source(std::string const& filepath, odirect_datasource_params const& params) + : host_source(filepath), _params(params) + { + // Verify the caller provided something sane for the sector size. + if (!params.is_valid_sector_size()) { + CUDF_LOG_ERROR("Invalid sector size: {}", params.sector_size); + CUDF_FAIL("Invalid sector size"); + } + _sector_size = _params.sector_size; + + set_datasource_kind(datasource_kind::ODIRECT); + + // Open the file with O_DIRECT. + _fd_o_direct = open(filepath.c_str(), O_RDONLY | O_DIRECT); + if (_fd_o_direct < 0) { + CUDF_LOG_ERROR("Cannot open file {}: {}: {}", filepath, errno, strerror(errno)); + CUDF_FAIL("Cannot open file"); + } + } + + ~odirect_source() override + { + if (_fd_o_direct >= 0) { + if (::close(_fd_o_direct) < 0) { + CUDF_LOG_ERROR("Cannot close file {}: {}: {}", filepath(), errno, strerror(errno)); + } + } + _fd_o_direct = -1; + } + + [[nodiscard]] size_t host_read(size_t offset, size_t size, uint8_t* dst) override + { + // Clamp length to available data + auto const read_size = get_read_size(size, offset); + + bool use_o_direct = false; + + // In order to read from a file descriptor opened with O_DIRECT, the + // following three elements must all be aligned to the sector size: + // + // 1. The offset at which to start reading. + // 2. The number of bytes to read. + // 3. The buffer into which the data is read. + // + // If all three conditions are met, we can use O_DIRECT to read the data. + // As a caller will rarely pass us offsets and sizes that are perfectly + // aligned to the sector size, we typically have to massage the read + // parameters first prior to issuing the read. + // + // The exception to this rule is when the caller has requested a read + // of the final bytes of the file, such that an aligned-up sector size + // read would exceed the file size. In this case, we fall back to a + // normal pread() call against a non-O_DIRECT file descriptor (by way + // of simply deferring to the `host_source` base class's `host_read()`). + + // Calculate the sector-aligned sizes for offset and read size. We round + // down for offset, which means we need to track the bytes to skip at the + // beginning of the read buffer. + size_t aligned_offset = util::round_down_safe(offset, _sector_size); + size_t bytes_to_skip = offset - aligned_offset; + + // For the read size, we add an additional sector size to the read size, + // and then round that value up to the nearest sector size. This is done + // to ensure that we always read enough data to cover the requested read + // size. As we're adding an extra sector size and rounding up, we need + // to track the bytes to ignore at the end of the read buffer. + size_t aligned_read_size = util::round_up_safe(read_size + _sector_size, _sector_size); + size_t bytes_to_ignore = aligned_read_size - read_size; + + // We can use O_DIRECT as long as the final aligned read size from the + // aligned offset does not exceed the file size. + if ((aligned_offset + aligned_read_size) <= this->size()) { use_o_direct = true; } + + if (!use_o_direct) { + // We can't use O_DIRECT for this read, so we fall back to a normal + // pread() call against a non-O_DIRECT file descriptor. Note that we + // use the original offset and read size, not the aligned values. + return host_source::host_read(offset, read_size, dst); + } + + // If we get here, we're going to use O_DIRECT for the read, which means + // the buffer we read into needs to be sector-aligned. If the caller has + // already provided a sector-aligned buffer, as well as sector-aligned + // offsets and read size (i.e. bytes_to_skip and bytes_to_ignore are zero), + // we can use the caller's buffer as-is. + uint8_t* buf; + aligned_buffer aligned_buf; + const bool use_caller_buffer = + ((bytes_to_skip == 0) && (bytes_to_ignore == 0) && is_aligned(dst, _sector_size)); + + if (use_caller_buffer) { + buf = dst; + } else { + // Allocate an aligned buffer to read into. + aligned_buf = aligned_buffer(aligned_read_size, _sector_size); + buf = aligned_buf.mutable_data(); + } + + // We can now issue the read against our O_DIRECT file descriptor using + // the base class's `host_read()` implementation. + auto const total_bytes_read = + host_source::host_read(_fd_o_direct, aligned_offset, aligned_read_size, buf); + + // We can't do the usual `CUDF_EXPECTS_READ_SUCCESS(bytes_read, read_size)` + // post-read check here as we probably read more data than originally + // requested, due to the sector alignment. Determine the actual bytes + // read by subtracting the bytes to skip and ignore from the total bytes + // read. + size_t actual_bytes_read = total_bytes_read - bytes_to_skip - bytes_to_ignore; + CUDF_EXPECTS_READ_SUCCESS(actual_bytes_read, read_size); + + // Fast-path exit for the case where the caller provided sector-aligned + // values for everything. + if (use_caller_buffer) { return actual_bytes_read; } + + // Invariant check: the number of readable bytes left in the aligned + // buffer after accounting for the bytes to skip should be equal to or + // greater than the read size (which should be the allocated size of the + // caller's buffer). + auto const remaining_bytes = aligned_read_size - bytes_to_skip; + if (remaining_bytes < read_size) { + CUDF_LOG_ERROR("Invariant check failed: remaining_bytes ({}) >= read_size ({})", + remaining_bytes, + read_size); + CUDF_FAIL("Invariant check failed: remaining_bytes >= read_size"); + } + + // We can now safely copy the requested data from the aligned buffer to + // the caller's buffer, skipping the bytes at the beginning and ignoring + // the bytes at the end (by way of the read size possibly being less than + // the aligned buffer size). + std::memcpy(dst, buf + bytes_to_skip, read_size); + + // Finally, return the actual bytes we read back to the caller. + return actual_bytes_read; + } + + private: + odirect_datasource_params _params; ///< O_DIRECT parameters + int _fd_o_direct{-1}; ///< O_DIRECT file descriptor + size_t _sector_size; ///< Sector size for O_DIRECT I/O +}; + +/** + * @brief Kvikio-based datasource. + */ +class kvikio_source : public host_source { + public: + kvikio_source(std::string const& filepath, kvikio_datasource_params const& params) + : host_source(filepath), _params(params), _kvikio_file(filepath) + { + datasource_kind kind = + (params.use_compat_mode) ? datasource_kind::KVIKIO_COMPAT : datasource_kind::KVIKIO_GDS; + set_datasource_kind(kind); + } + + [[nodiscard]] size_t host_read(size_t offset, size_t size, uint8_t* dst) override + { + auto const read_size = get_read_size(size, offset); + + auto future = + _kvikio_file.pread(dst, read_size, offset, _params.task_size, _params.device_read_threshold); + return future.get(); + } + + [[nodiscard]] std::unique_ptr host_read(size_t offset, size_t size) override + { + auto const read_size = get_read_size(size, offset); + + std::vector v(read_size); + auto future = _kvikio_file.pread( + v.data(), read_size, offset, _params.task_size, _params.device_read_threshold); + future.get(); + return buffer::create(std::move(v)); + } + + [[nodiscard]] bool supports_device_read() const override { return true; } + + [[nodiscard]] bool is_device_read_preferred(size_t size) const override + { + return size >= _params.device_read_threshold; + } + + [[nodiscard]] std::unique_ptr device_read( + size_t offset, size_t size, rmm::cuda_stream_view stream) override + { + auto const read_size = get_read_size(size, offset); + rmm::device_buffer out_data(read_size, stream); + auto dst = reinterpret_cast(out_data.data()); + size_t bytes_read = device_read_async(offset, size, dst, stream).get(); + out_data.resize(bytes_read, stream); + return datasource::buffer::create(std::move(out_data)); + } + + [[nodiscard]] std::future device_read_async(size_t offset, + size_t size, + uint8_t* dst, + rmm::cuda_stream_view stream) override + { + return _kvikio_file.pread(dst, size, offset, _params.task_size, _params.device_read_threshold); + } + + [[nodiscard]] size_t size() const override { return _kvikio_file.nbytes(); } + + private: + std::string _filepath; ///< The path to the file + kvikio_datasource_params _params; ///< Kvikio parameters + kvikio::FileHandle _kvikio_file; ///< Kvikio file handle +}; + /** * @brief Base class for file input. Only implements direct device reads. */ class file_source : public datasource { public: - explicit file_source(char const* filepath) : _file(filepath, O_RDONLY) + explicit file_source(char const* filepath) + : datasource(datasource_kind::KVIKIO), _file(filepath, O_RDONLY) { detail::force_init_cuda_context(); if (cufile_integration::is_kvikio_enabled()) { @@ -61,10 +543,11 @@ class file_source : public datasource { lseek(_file.desc(), offset, SEEK_SET); // Clamp length to available data - ssize_t const read_size = std::min(size, _file.size() - offset); + auto const read_size = get_read_size(size, offset); std::vector v(read_size); - CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed"); + auto const bytes_read = read(_file.desc(), v.data(), read_size); + CUDF_EXPECTS_READ_SUCCESS(bytes_read, read_size); return buffer::create(std::move(v)); } @@ -73,10 +556,9 @@ class file_source : public datasource { lseek(_file.desc(), offset, SEEK_SET); // Clamp length to available data - auto const read_size = std::min(size, _file.size() - offset); - - CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast(read_size), - "read failed"); + auto const read_size = get_read_size(size, offset); + auto const bytes_read = read(_file.desc(), dst, read_size); + CUDF_EXPECTS_READ_SUCCESS(bytes_read, read_size); return read_size; } @@ -100,7 +582,7 @@ class file_source : public datasource { { CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); - auto const read_size = std::min(size, _file.size() - offset); + auto const read_size = get_read_size(size, offset); if (!_kvikio_file.closed()) { return _kvikio_file.pread(dst, read_size, offset); } return _cufile_in->read_async(offset, read_size, dst, stream); } @@ -146,6 +628,7 @@ class memory_mapped_source : public file_source { explicit memory_mapped_source(char const* filepath, size_t offset, size_t max_size_estimate) : file_source(filepath) { + set_datasource_kind(datasource_kind::HOST_MMAP); if (_file.size() != 0) { // Memory mapping is not exclusive, so we can include the whole region we expect to read map(_file.desc(), offset, max_size_estimate); @@ -160,7 +643,7 @@ class memory_mapped_source : public file_source { std::unique_ptr host_read(size_t offset, size_t size) override { // Clamp length to available data - auto const read_size = std::min(size, +_file.size() - offset); + auto const read_size = get_read_size(size, offset); // If the requested range is outside of the mapped region, read from the file if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) { @@ -184,7 +667,7 @@ class memory_mapped_source : public file_source { size_t host_read(size_t offset, size_t size, uint8_t* dst) override { // Clamp length to available data - auto const read_size = std::min(size, +_file.size() - offset); + auto const read_size = get_read_size(size, offset); // If the requested range is outside of the mapped region, read from the file if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) { @@ -239,13 +722,14 @@ class memory_mapped_source : public file_source { */ class device_buffer_source final : public datasource { public: - explicit device_buffer_source(cudf::device_span d_buffer) : _d_buffer{d_buffer} + explicit device_buffer_source(cudf::device_span d_buffer) + : datasource(datasource_kind::OTHER), _d_buffer{d_buffer} { } size_t host_read(size_t offset, size_t size, uint8_t* dst) override { - auto const count = std::min(size, this->size() - offset); + auto const count = get_read_size(size, offset); auto const stream = cudf::get_default_stream(); CUDF_CUDA_TRY( cudaMemcpyAsync(dst, _d_buffer.data() + offset, count, cudaMemcpyDefault, stream.value())); @@ -255,7 +739,7 @@ class device_buffer_source final : public datasource { std::unique_ptr host_read(size_t offset, size_t size) override { - auto const count = std::min(size, this->size() - offset); + auto const count = get_read_size(size, offset); auto const stream = cudf::get_default_stream(); auto h_data = cudf::detail::make_host_vector_async( cudf::device_span{_d_buffer.data() + offset, count}, stream); @@ -270,7 +754,7 @@ class device_buffer_source final : public datasource { uint8_t* dst, rmm::cuda_stream_view stream) override { - auto const count = std::min(size, this->size() - offset); + auto const count = get_read_size(size, offset); CUDF_CUDA_TRY( cudaMemcpyAsync(dst, _d_buffer.data() + offset, count, cudaMemcpyDefault, stream.value())); return std::async(std::launch::deferred, [count] { return count; }); @@ -301,7 +785,10 @@ class device_buffer_source final : public datasource { // zero-copy host buffer source class host_buffer_source final : public datasource { public: - explicit host_buffer_source(cudf::host_span h_buffer) : _h_buffer{h_buffer} {} + explicit host_buffer_source(cudf::host_span h_buffer) + : datasource(datasource_kind::OTHER), _h_buffer{h_buffer} + { + } size_t host_read(size_t offset, size_t size, uint8_t* dst) override { @@ -334,7 +821,10 @@ class host_buffer_source final : public datasource { */ class user_datasource_wrapper : public datasource { public: - explicit user_datasource_wrapper(datasource* const source) : source(source) {} + explicit user_datasource_wrapper(datasource* const source) + : datasource(datasource_kind::OTHER), source(source) + { + } size_t host_read(size_t offset, size_t size, uint8_t* dst) override { @@ -391,7 +881,9 @@ class user_datasource_wrapper : public datasource { std::unique_ptr datasource::create(std::string const& filepath, size_t offset, - size_t max_size_estimate) + size_t max_size_estimate, + datasource_kind kind, + std::optional params) { auto const use_memory_mapping = [] { auto const policy = getenv_or("LIBCUDF_MMAP_ENABLED", std::string{"ON"}); @@ -402,11 +894,65 @@ std::unique_ptr datasource::create(std::string const& filepath, CUDF_FAIL("Invalid LIBCUDF_MMAP_ENABLED value: " + policy); }(); - if (use_memory_mapping) { - return std::make_unique(filepath.c_str(), offset, max_size_estimate); - } else { - // `file_source` reads the file directly, without memory mapping - return std::make_unique(filepath.c_str()); + if (use_memory_mapping) { kind = datasource_kind::HOST_MMAP; } + + switch (kind) { + case datasource_kind::KVIKIO: + case datasource_kind::KVIKIO_COMPAT: + case datasource_kind::KVIKIO_GDS: { + kvikio_datasource_params new_params; + if (params) { + if (auto kvikio_params = std::get_if(¶ms.value())) { + // Copy the user-provided parameters into our local variable. + new_params = *kvikio_params; + } else { + throw cudf::logic_error("Invalid parameters for KVIKIO-based datasource."); + } + } + if (kind == datasource_kind::KVIKIO_COMPAT) { + // Forcibly-set the compatibility mode to true, regardless of what may + // already be present in the params. The `kind` parameter has requested + // `KVIKIO_COMPAT`, and that takes precedence over the `use_compat_mode` + // parameter in the `kvikio_datasource_params`. + new_params.use_compat_mode = true; + } else if (kind == datasource_kind::KVIKIO_GDS) { + // GDS is unique in that we are expected to throw a cudf::runtime_error + // if GDS is not available. The first chance we have to do this is + // here, by way of fencing against CUFILE_FOUND. +#ifndef CUFILE_FOUND + CUDF_FAIL("GDS is not available because cuFile is not enabled."); +#endif + // The next check is done against the `is_gds_enabled()` function in + // `cufile_integration`. If GDS is not enabled, we balk here too. + CUDF_EXPECTS(cufile_integration::is_gds_enabled(), "cuFile reports GDS is not available."); + // Forcibly-set the compatibility mode to false, regardless of what may + // already be present in the params. The `kind` parameter has requested + // `KVIKIO_GDS`, and that takes precedence over the `use_compat_mode` + // parameter in the `kvikio_datasource_params`. + new_params.use_compat_mode = false; + } else { + CUDF_EXPECTS(kind == datasource_kind::KVIKIO, + "Invariant check failed: kind != datasource_kind::KVIKIO"); + // We don't need to do any special handling for `KVIKIO` here. + } + return std::make_unique(filepath.c_str(), new_params); + } + case datasource_kind::HOST: return std::make_unique(filepath); + case datasource_kind::ODIRECT: { + odirect_datasource_params new_params; + if (params) { + if (auto odirect_params = std::get_if(¶ms.value())) { + // Copy the user-provided parameters into our local variable. + new_params = *odirect_params; + } else { + throw cudf::logic_error("Invalid parameters for O_DIRECT-based datasource."); + } + } + return std::make_unique(filepath.c_str(), new_params); + } + case datasource_kind::HOST_MMAP: + return std::make_unique(filepath.c_str(), offset, max_size_estimate); + default: CUDF_FAIL("Unsupported datasource kind"); } }