Skip to content

Commit

Permalink
Fix race condition in the CPU numpy reader (#4848)
Browse files Browse the repository at this point in the history
- when the last sample in the numpy reader is duplicated
  due to padding, the reader tries to read the same
  sample to the same memory, but from different clones of it
  concurrently. What is more the memory that is read can be
  allocated and deallocated concurrently. This PR removed this
  redundant and dangerous work, making sure that each sample is read
  only once.

Signed-off-by: Janusz Lisiecki <[email protected]>
  • Loading branch information
JanuszL authored and stiepan committed May 12, 2023
1 parent 17609b8 commit cdc3970
Showing 1 changed file with 18 additions and 19 deletions.
37 changes: 18 additions & 19 deletions dali/operators/reader/numpy_reader_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,16 +252,22 @@ void NumpyReaderCPU::Prefetch() {

string previous_path;
for (unsigned idx = 0; idx < curr_batch.size(); ++idx) {
auto &target = curr_batch[idx];
if (target->data.shares_data()) {
target->data.Reset();
// in case of pad_last_batch the curr_batch elements are pointing to the same object
// including the data, so there it no need to read it again or it can even lead to a race
// with allocation/deallocation of memory and concurrent read
if (idx > 0 && curr_batch[idx - 1] == curr_batch[idx]) {
break;
}
auto &target = curr_batch[idx];

// if we pad last batch but we duplicate the samples from the previous one - a case
// with multiple unequal shards where we need to create a full duplicated batch
// so we need to reopen the file and seek
// so there is no need to read again this data
if (!target->current_file) {
target->current_file = FileStream::Open(target->filename, false, false, use_o_direct_);
target->current_file->SeekRead(target->data_offset);
break;
}
if (target->data.shares_data()) {
target->data.Reset();
}
if (use_o_direct_) {
/*
Expand Down Expand Up @@ -308,19 +314,12 @@ void NumpyReaderCPU::Prefetch() {
}
target->data.ShareData(tmp_mem, target->nbytes, false, target->shape, target->type, -1);
} else {
if (idx > 0 && curr_batch[idx - 1]->filename == target->filename && target->current_file) {
// in case of pad_last_batch we can/should copy previous sample as target->current_file
// has already been used to read the data and it moved the offset. If we read it will
// return 0 - no more data to read from this file.
target->data.Copy(curr_batch[idx - 1]->data);
} else {
target->data.Resize(target->shape, target->type);
auto data_ptr = static_cast<uint8_t*>(target->data.raw_mutable_data());
Index ret = target->current_file->Read(data_ptr, target->nbytes);
DALI_ENFORCE(ret == static_cast<Index>(target->nbytes),
make_string("Failed to read file: ", target->filename,
", read: ", ret, " while it should be ", target->nbytes));
}
target->data.Resize(target->shape, target->type);
auto data_ptr = static_cast<uint8_t*>(target->data.raw_mutable_data());
Index ret = target->current_file->Read(data_ptr, target->nbytes);
DALI_ENFORCE(ret == static_cast<Index>(target->nbytes),
make_string("Failed to read file: ", target->filename,
", read: ", ret, " while it should be ", target->nbytes));
}
}
thread_pool_.RunAll();
Expand Down

0 comments on commit cdc3970

Please sign in to comment.