Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TEST ONLY/DON'T MERGE] Tensorflow-IO DAOS Plugin #1696

Open
wants to merge 90 commits into
base: master
Choose a base branch
from

Conversation

yongtang
Copy link
Member

No description provided.

Omar Marzouk and others added 30 commits September 6, 2021 12:44
Resolve "Skeleton for DFS Plugin"

See merge request parallel-programming/bs-daos-tensorflow-io!1
Resolve "Parse DFS Path"

See merge request parallel-programming/bs-daos-tensorflow-io!2
Resolve "DAOS Library Integration"

See merge request parallel-programming/bs-daos-tensorflow-io!3
Resolve "Plugin Skeleton"

See merge request parallel-programming/bs-daos-tensorflow-io!4
Resolve "Filesystem Operations"

See merge request parallel-programming/bs-daos-tensorflow-io!5
Resolve "Directory Operations"

See merge request parallel-programming/bs-daos-tensorflow-io!6
Resolve "File Operations"

See merge request parallel-programming/bs-daos-tensorflow-io!7
Resolve "Refactor of DFS Plugin + Class"

See merge request parallel-programming/bs-daos-tensorflow-io!9
Resolve "Writable File Ops"

See merge request parallel-programming/bs-daos-tensorflow-io!8
Resolve "Random Access File Ops"

See merge request parallel-programming/bs-daos-tensorflow-io!10
Omar Marzouk and others added 26 commits January 30, 2022 11:00
Resolve "Read-Ahead Buffering"

See merge request parallel-programming/bs-daos-tensorflow-io!17
General:
    Asserts were added and enabled after each DAOS event-related call in order
    to track down internal race conditions in the DAOS client code, see
    DAOS-10601.

    The DAOS_FILE structure documented behavior for the 'offset' field, but
    most of that behavior didn't need to be implemented.  Field 'offset' was
    removed while fixing the Append() and Tell() functions, leaving only a
    single field 'file' in DAOS_FILE, so the DAOS_FILE struct was removed as
    well.

    Typos in error messages were corrected.

File dfs_utils.cc:

In DFS::Setup():
   Code after the Connect() call replaced the detailed error status set in
   Connect() with a generic TF_NOT_FOUND error with no accompanying message.
   This cost me several days of debugging to realize that a problem was not
   some object not being found, but rather was a connection failure to an
   unhealthy container.  The TF_NOT_FOUND has been removed, allowing the more
   detailed error messages in Connect() to be reported.

In ReadBuffer::ReadBuffer()
    By setting buffer_offset to ULONG_MAX, an uninitialized buffer will never
    be matched by CacheHit(), removing the need for a separate 'initialized'
    variable.  The valid variable is no longer needed as well, more on that
    below.

In ReadBuffer::~ReadBuffer()
    daos_event_fini(() cannot be called on an event that is still in flight,
    it fails without doing anything, daos_event_test() must wait for any prior
    event to complete, otherwise the event delete that follows
    daos_event_fini() could then cause corruption of the event queue.  Call the
    reworked WaitEvent() (see below) first to ensure that daos_event_fini()
    will clean up the event before it is deleted.

In ReadBuffer::FinalizeEvent()
    The same problem exists here as in ~ReadBuffer(), daos_event_fini() can't
    be called on an event that is still in flight.  However, FinalizeEvent()
    isn't actually needed, a call to dfs_file->buffers.clear() in Cleanup()
    accomplishes the same thing using the ~ReadBuffer code, so FinalizeEvent
    was removed.

ReadBuffer::WaitEvent()
    There is a need for a WaitEvent() function in several places to wait for
    any outstanding event to complete, but this routine manipulates 'valid',
    so it can't be used anywhere else.  Removed the 'valid' code so that this
    routine can become a void and be called in multiple places.

ReadBuffer::AbortEvent()
    daos_event_abort() doesn't actually contain any logic to ask the server to
    abort an in-flight dfs_read() request.  In addition it is buggy, internal
    DAOS asserts were hit due to daos_event_abort() calls during I/O testing.
    The code was changed to instead use WaitEvent() to simply wait for a prior
    read to complete before issuing a new one, and AbortEvent() was removed.

ReadBuffer::ReadAsync()
    Both daos_event_fini() and daos_event_init() must be called on a
    daos_event_t structure before the event can be reused for another
    dfs_read() call.  These have been added.  The AbortEvent() call was
    replaced with a call to WaitEvent().  The code was changed to save the
    errno from a failed dfs_read() call in the event's ev_error field so
    that the error will be detected, and so a user cannot accidentally read
    trash data after a failed dfs_read() call.

ReadBuffer::ReadSync()
    This function is no longer used, see below.

ReadBuffer::CopyData()
    The WaitEvent() call ensures that the thread blocks until any in-flight
    read request is done.  The event->ev_error field is used to detect I/O
    failure either at the time the dfs_read() is issued or in the reply, so
    the valid flag is no longer needed.

ReadBuffer::CopyFromCache()
    The TF_RandomAccessFile read() function allows for int64_t-sized reads, so
    change the return value here to int64_t.  If an I/O error occurred, then
    return -1 so that the caller function Read() can easily tell when there
    has been an I/O error.  Provide a detailed error message so that the user
    can tell what caused the error.

File dfs_filesystem.cc:

In DFSRandomAccessFile constructor:
    Added an assert() on the event queue creation.

In Cleanup():
    Replaced FinalizeEvent() code with a dfs_file->buffers.clear() call.
    Add asserts on dfs function calls.

In df_dfs_filesystem::Read():
    The loop "for (auto& read_buf : dfs_file->buffers)" was missing a break
    statement, so CacheHit was called 256 times for each curr_offset value.
    A break was added.

    Support was added for detecting a read error and returning -1.

    Since Read() is now a while loop, there is no reason to specially use
    ReadSync() for the first buffer.  Code changed to use ReadAsync() for
    all readahead, CopyFromCache() will block until the first buffer's I/O
    is complete.  ReadSync is now unused, and is removed.

    I could not determine a reason for the WaitEvent loop:
        if (curr_offset >= dfs_file->file_size)
    because I/O requests will never be be started beyond EOF.  The loop
    was removed.

In DFSWritableFile:
   The Append() function had to make a dfs_get_size() call for each append
   to a file, adding a second round trip to the server for each append.  This
   is very expensive.  Member functions were added to cache the file size
   and update it locally as Append() operations are done..  Since the
   tensorflow API only allows one writer, local caching is allowable.  Should
   there be an I/O error, the actual size of the file becomes unknown, the
   new member functions take that into account and call dfs_get_size() in
   those situations to reestablish the correct size of the file.

In Append():
   The dfs_file->daos_file.offset field was not updated after an Append()
   operation completed successfully, so a subsequent Tell() call would return
   the size of the file before the last Append(), not after, the reported size
   was incorrect.  The code was changed to update the cached file size after
   successful Append() operations.

In RenameFile():
    Similar to the Setup() case, the detailed error statuses in Connect() were
    being hidden by a genereric TF_NOT_FOUND error.  The generic error was
    removed.

Signed-off-by: Kevan Rehm <[email protected]>
Currently if DAOS libraries are not installed on a node, the
libtensorflow_io_plugins.so will fail to load due to unsatisfied
externals, and all modular filesystems are then unusable, not
just DFS.  This PR changes the DFS plugin to dynamically load
the DAOS libraries so that the DFS filesystem is available if
DAOS is installed, but the other modular filesystems are still
available if DAOS is not installed.

The checks for the DAOS libraries and the daos_init() call are
now done at filesystem registration time, not as part of each
function call in the filesystem API.  If the libraries are not
installed then the DFS filesystem will not be not registered,
and no calls into DFS functions will ever occur.  In this case
tensorflow will just report
    "File system scheme 'dfs' not implemented"
when a "dfs://" path is used.

A number of separate functions existed each of which was only
called once as part of DFS destruction, these were combined into
the DFS destructor for simplicity.  Similar recombinations were
done to simplify DFS construction.

Signed-off-by: Kevan Rehm <[email protected]>
Global Changes:
  * The plugin was using duplicate definitions of internal DAOS client
    structures (dfs_obj_t, dfs_t, dfs_entry_t), and would create malloc'd
    copies of those structs in order to be able to access their private
    fields.  Should DAOS modify those structures in future releases, the
    plugin would break for those releases.  The dependencies on internal
    fields have been removed, the DAOS client API is now strictly followed.
  * The path_map and size_map caches used DFS mount-point-relative
    pathnames as keys.  If more than one DFS filesystem is mounted during
    the same run, then the same relative pathname could be in use in
    both filesystems.  Callers that retrieved values from the caches
    could get results for the wrong filesytem.  Code was changed to create
    path_map and size_map caches per-filesystem.
  * A number of fields (connected, daos_fs, pool, container) were
    stored in the global DFS structure which meant that any time a
    path was presented to the plugin that was in a different DFS
    filesystem than the previous path, the current filesystem would
    have to be unmounted and then the new filesystem would be mounted.
    The application could have had files open at the time in the
    filesystem that was unmounted.  The code was changed to maintain
    filesystem state relative to each pool/container combo, and so
    any number of DFS filesystems can now be mounted simultaneously.
  * None of the code in the DFS Cleanup() function was ever being called.
    This is a known tensorflow issue, see
        tensorflow/tensorflow#27535
    The workaround is to call Cleanup() via the atexit() function.
  * The RenameFile function was enhanced to delete the cached size
    of the source file and store that cached size for the destination
    file.
  * The dfsLookUp() routine required the caller to indicate whether
    or not the object to be looked up was a directory.  This was
    necessary because dfs_open() was being used to open the object,
    and that call requires a different open_mode for directories and
    files.  However, the caller does not always know the type of the
    object being looked up, e.g PathExists() and IsDir().  If the
    caller guesses wrong, then the dfs_open() call fails, either with
    EINVAL or ENOTDIR.  The caller would map these errors to ENOENT,
    which is incorrect.  Code was changed to replace the dfs_open()
    call with dfs_lookup_rel(), which removes the requirement that
    the caller know the object's type a priori, the caller can check
    the type of the object after it has been opened.
  * The dfsLookUp() routine required all callers to implement three
    different behaviors depending upon the type of object being opened.
    1. If a DFS root directory, a null dfs_obj_t would be returned,
       this would have to be special-cased by the caller.
    2. If a non-root directory, a non-null dfs_obj_t would be returned
       which the caller must never release because the dfs_obj_t is
       also an entry in the path_map cache.  Releasing the entry would
       cause future requests that use that cache entry to fail.  There
       were a few cases in the code where this was occurring.
    3. If a non-directory, a non-null dfs_obj_t would be returned
       which the caller must always release when done with it.
    The code was changed so that a DFS root directory returns a
    non-null dfs_obj_t.  Also, whenever a directory that is in the
    path_map cache is referenced, dfs_dup() is used to make a (cheap)
    copy of the dfs_obj_t to return to the caller, so that the cached
    copy is never used outside of the cache.  As a result, dfsLookUp()
    now always returns a non-null dfs_obj_t which must be released when
    no longer in use.  Another advantage of using dfs_dup() is that it
    is then safe at any moment to clear a filesystem's path_map cache,
    there is no possibility that some caller is using a cached dfs_obj_t
    at that time.
  * All relative path references in the code have been replaced with
    references to a dfs_path_t class which encapsulates everything
    known about a particular DFS path, including the filesystem in
    which the path resides.  Member functions make it easy to update
    the correct caches for the correct filesystem for each path.
    Also, there were many places throughout the code where string
    manipulation was being done, e.g. to extract a parent pathname or
    a basename.  That code has been replaced with dfs_path_t member
    functions so that the actual string manipulation only occurs in
    a single place in the plugin.
  * Setup() now initializes a dfs_path_t instead of global pool, cont,
    and rel_path variables.  It also does some minor lexical
    normalization of the rel_path member, as opposed to doing so in
    multiple places in the code downstream.
  * Code was modified in various places so that 100 of the tests in
    the tensorflow modular_filesystem_test' test suite pass.  there are
    three remaining failing tests.  One is an incorrect test, one is
    checking a function not implemented in the plugin.  The third is
    reporting failures in TranslateName() which will be handled in a
    separate PR.
  * The plugin was coded to use 'dfs://' as the filesystem prefix, but
    the DAOS client API is coded internally to use 'daos://' as the
    prefix.  The plugin was changed to use 'daos://' so that pathnames
    used by one application would not have to be munged in order to
    also work with tensorflow.

Per file changes:

dfs_utils.h:
    * The per-container class cont_info_t was added that maintains all
      per-filesystem state.
    * Class dfs_path_t was added that maintains all per-file state.
      The class knows which filesystem the file is in, e.g. to update
      the correct cache maps.
    * The global fields connected, daos_fs, pool, container, path_map,
      and size_map are removed, replaced by the per-filesystem versions.
    * Mounts are now done at the same time as connection to the container,
      filesytems remain mounted until their containers are disconnected.

dfs_filesystem.cc:
  * Many of the functions were made static so that they don't show up
    in the library's symbol table, avoiding potential conflicts with
    other plugins.
  * Changed path references to dfs_path_t references throughout.
  DFSRandomAccessFile()
    * Replaced the dpath string with the dfs_path_t as a constructor
      parameter so that the per-filesystem size cache can be updated.
  DFSWritableFile()
    * Replaced the dpath string with the dfs_path_t as a constructor
      parameter so that the per-filesystem size cache can be updated
      whenever the file is appended to.
  NewWritableFile()
    * Changed file creation mode parameter to include S_IRUSR so that
      files can be read when the filesystem is mounted via fuse.
  NewAppendableFile()
    * Changed file creation mode parameter to include S_IRUSR so that
      files can be read when the filesystem is mounted via fuse.
  PathExists()
    * Reworked the code to work with the new dfsLookUp() behavior.
      dfsPathExists() call was removed as it no longer provided anything
      not already provided by dfsLookUp().  Also, many errors returned
      by dfsPathExists() were mapped to TF_NOT_FOUND, which was
      incorrect.  Also, PathExists() can be called for either files or
      directories, but dfsPathExists internally called dfsLookUp() with
      isDirectory = false, so callers that passed in a directory path
      would get failures.
  Stat()
    * Used to call dfs_get_size(), then called dfs_ostat(), but the file
      size is available  in stbuf, so the dfs_get_size() call was
      extra overhead and was removed.
  FlushCaches()
    * Used to call ClearConnections, which unmounted any filesystem and
      disconnected from its container and pool, when there could be
      files open for read or write.  The ClearConnections call was
      removed.  Code was added to clear the size caches as well as the
      directory caches.

dfs_utils.cc
  * New functions were added for clearing individual filesystem caches
    and all filesystem caches for all mounted filesystems.
  * There was code in many places for path string manipulation, checking
    if an object was a directory, etc.  dfs_path_t member functions were
    created to replace all these so that a particular operation was only
    implemented in one spot in the code.
  DFS::~DFS()
    * The code to clear the directory cache only released the first entry,
      there was no code to iterate through the container.  Replaced with
      function calls to clear all directory and size caches.
  Unmount()
    * Now done automatically as part of disconnecting a container, a
      separate function was no longer needed.
  ParseDFSPath()
    * The code assumed that any path it was given would have both pool
      and container components, it was unable to handle malformed paths.
      Code was changed to let duns_resolve_path() validate the path
      components.  There used to be two calls to duns_resolve_path()
      because DUNS_NO_CHECK_PATH was not set, and so the first path
      would fail if the pool and container components were labels,
      duns_resolv_path() only recognizes uuids if DUNS_NO_CHECK_PATH
      is not set.  When pool and/or container labels were used, the
      duns_resolve_path() code would check against local mounted
      filesystems, and would hopefully fail.  The code then prepended
      dfs:: and tried again, which would be recognized as a
      "direct path".  Paths which only contained uuids were
      successfully parsed with the first duns_resolve_path() call.
      By using the DUNS_NO_CHECK_PATH flag and always including the
      daos:// prefix, only a single system call is needed.
  Setup()
    * Reworked to populate a dfs_path_t instead of separate pool,
      cont, and relpath variables.  A filesystem is now automatically
      mounted as part of connecting to the container, so a separate
      function was no longer needed.
  ClearConnections()
    * The code for looping through pools and containers didn't work
      properly because the subroutines erase their map entries
      internally which invalidates the iterators being used in
      ClearConnections().  Code was changed so that the iterators
      are reinitialized each time through the loop.
    * Code to disconnect all the containers in a pool was moved to
      the DisconnectPool() function, so that it is not possible to
      disconnect a pool without first disconnecting all its containers.
  dfsDeleteObject()
    * Enhanced to only clear the directory cache for the filesystem in
      which the object existed.
    * If the object was a file, the size cache entry for that file is
      deleted.  If a directory was being recursively deleted, the
      filesystem's size cache is now also cleared.
  dfsLookUp() and dfsFindParent()
    * As mentioned at the top, code was rewritten so that cached
      directory entries are never returned to a caller, instead
      a dup reference is returned so that the caller is always
      given an object reference it must release.
   dfsCreateDir()
    * Error exit statuses were enhanced in order to pass the tensorflow
      'modular_filesystem_test' test suite.
  ConnectPool()
    * Simplified somewhat as the pool id_handle_t is no longer needed.
  ConnectContainer()
    * Simplified somewhat as the cont id_handle_t is no longer needed.
    * Added code to immediately mount any container that is connected.
    * Code added to initialize all the per-filesystem state variables
      was added.
  DisconnectPool()
    * Added code to disconnect any containers before disconnecting the
      pool.
  DisconnectContainer()
    * Added code to unmount any filesystem before disconnecting its
      container.
  * Added all the dfs_path_t member function implementations.
  * Included a few new dsym references for dfs function calls that have
    been added.

Signed-off-by: Kevan Rehm <[email protected]>
Signed-off-by: Yong Tang <[email protected]>
@review-notebook-app
Copy link

Check out this pull request on  ReviewNB

See visual diffs & provide feedback on Jupyter Notebooks.


Powered by ReviewNB

@google-cla
Copy link

google-cla bot commented Jul 25, 2022

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

Signed-off-by: Yong Tang <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants