Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
arvidn committed Feb 17, 2024
1 parent 47aaee8 commit 8652833
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 25 deletions.
101 changes: 101 additions & 0 deletions include/libtorrent/aux_/disk_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,19 @@ struct cached_piece_entry
pread_disk_job* clear_piece = nullptr;
};

struct compare_storage
{
bool operator()(piece_location const& lhs, storage_index_t const rhs) const
{
return lhs.torrent < rhs;
}

bool operator()(storage_index_t const lhs, piece_location const& rhs) const
{
return lhs < rhs.torrent;
}
};

struct disk_cache
{
using piece_container = mi::multi_index_container<
Expand Down Expand Up @@ -462,6 +475,8 @@ struct disk_cache
if (!piece_iter->ready_to_flush)
break;

// TODO: we need to avoid flushing storages that have a fence raised.
// Perhaps the cache should take over some of the responsibilities of
// disk_job_fence

view.modify(piece_iter, [](cached_piece_entry& e) { e.flushing = true; });
int const num_blocks = piece_iter->blocks_in_piece;
m_flushing_blocks += num_blocks;
Expand Down Expand Up @@ -506,6 +521,7 @@ struct disk_cache
if (count < piece_iter->blocks_in_piece)
return;

// TODO: this check is not thread safe; piece_hash_returned may be
// updated concurrently by another thread
if (piece_iter->piece_hash_returned)
piece_iter = view.erase(piece_iter);
else
Expand Down Expand Up @@ -658,6 +674,91 @@ struct disk_cache
}
}

// Flushes all dirty blocks belonging to one torrent ("storage"). For each
// piece owned by the storage: collect its pending write jobs, release the
// mutex while the caller-supplied function "f" performs the actual writes,
// then clear out the job pointers for the blocks that were flushed.
// Typically used when a torrent's files are being released.
//
// f               - performs the writes; returns the number of blocks
//                   successfully flushed (a prefix of the span passed in)
// storage         - the torrent whose pieces should be flushed
// clear_piece_fun - invoked if a piece has a pending clear-piece job,
//                   receiving the aborted jobs and the clear-piece job
void flush_storage(std::function<int(span<cached_block_entry>)> f
	, storage_index_t const storage
	, std::function<void(jobqueue_t, pread_disk_job*)> clear_piece_fun)
{
	std::unique_lock<std::mutex> l(m_mutex);

	INVARIANT_CHECK;

	auto& view = m_pieces.template get<0>();
	auto const [begin, end] = view.equal_range(storage, compare_storage());

	// note: piece_iter is advanced inside the loop body (either by
	// erase() or by explicit increment), so the for-statement itself
	// must not increment it. Incrementing in both places would skip
	// every other piece and could step past "end".
	for (auto piece_iter = begin; piece_iter != end;)
	{
#ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
		INVARIANT_CHECK;
#endif

		// some other thread is already flushing this piece. It is pinned
		// to the cache while its flushing flag is set, so just skip it
		if (piece_iter->flushing)
		{
			++piece_iter;
			continue;
		}

		TORRENT_ALLOCA(blocks, cached_block_entry, piece_iter->blocks_in_piece);
		view.modify(piece_iter, [](cached_piece_entry& e) { e.flushing = true; });
		// gather the dirty blocks (those with a pending write job). Note
		// that these may be a sparse subset of the piece's blocks
		int num_blocks = 0;
		for (int blk = 0; blk < piece_iter->blocks_in_piece; ++blk)
		{
			auto const& cbe = piece_iter->blocks[blk];
			if (cbe.write_job == nullptr) continue;
			blocks[num_blocks].write_job = cbe.write_job;
			++num_blocks;
		}
		m_flushing_blocks += num_blocks;
		blocks = blocks.first(num_blocks);
		// we have to release the lock while flushing, but since we set the
		// "flushing" member to true, this piece is pinned to the cache
		l.unlock();

		int count = 0;
		{
			// re-acquire the lock and un-pin the piece no matter how the
			// flush function exits
			auto se = scope_end([&] {
				l.lock();
				view.modify(piece_iter, [&](cached_piece_entry& e) {
					e.flushing = false;
					e.flushed_cursor += count;
				});
				TORRENT_ASSERT(m_flushing_blocks >= num_blocks);
				m_flushing_blocks -= num_blocks;
			});
			if (!blocks.empty())
				count = f(blocks);
		}
		TORRENT_ASSERT(m_blocks >= count);
		m_blocks -= count;

		// make sure to only clear the job pointers for the blocks that were
		// actually flushed, indicated by "count". The dirty blocks may be
		// interleaved with clean ones, so we have to scan the whole piece
		// (blocks_in_piece entries), not just the first num_blocks
		int clear_count = count;
		for (auto& be : span<cached_block_entry>(piece_iter->blocks.get(), piece_iter->blocks_in_piece))
		{
			if (clear_count == 0)
				break;
			if (!be.write_job) continue;
			// hold on to the buffer until the block is evicted; the job
			// no longer owns it
			be.buf_holder = std::move(std::get<job::write>(be.write_job->action).buf);
			be.write_job = nullptr;
			--clear_count;
		}
		if (piece_iter->clear_piece)
		{
			jobqueue_t aborted;
			auto& cpe = const_cast<cached_piece_entry&>(*piece_iter);
			clear_piece_impl(cpe, aborted);
			clear_piece_fun(std::move(aborted), std::exchange(cpe.clear_piece, nullptr));
			return;
		}

		// TODO: this check is not thread safe; piece_hash_returned may be
		// updated concurrently by another thread
		if (piece_iter->piece_hash_returned)
			piece_iter = view.erase(piece_iter);
		else
			++piece_iter;
	}
}

std::size_t size() const
{
std::unique_lock<std::mutex> l(m_mutex);
Expand Down
61 changes: 36 additions & 25 deletions src/pread_disk_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,6 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> pread_disk_io_constructor(
void pread_disk_io::perform_job(aux::pread_disk_job* j, jobqueue_t& completed_jobs)
{
TORRENT_ASSERT(j->next == nullptr);
TORRENT_ASSERT((j->flags & aux::disk_job::in_progress) || !j->storage);

#if DEBUG_DISK_THREAD
{
Expand Down Expand Up @@ -388,7 +387,6 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> pread_disk_io_constructor(

j->ret = ret;

TORRENT_ASSERT((j->flags & aux::disk_job::in_progress) || !j->storage);
completed_jobs.push_back(j);
}

Expand Down Expand Up @@ -487,7 +485,6 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> pread_disk_io_constructor(
m_need_tick.push_back({aux::time_now() + minutes(2), j->storage});
}

TORRENT_ASSERT((j->flags & aux::disk_job::in_progress) || !j->storage);
return ret != a.buffer_size
? disk_status::fatal_disk_error : status_t{};
}
Expand Down Expand Up @@ -650,16 +647,6 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> pread_disk_io_constructor(
std::uint16_t(r.length)
);

if (j->storage->is_blocked(j))
{
m_stats_counters.inc_stats_counter(counters::blocked_disk_jobs);
DLOG("blocked job: %s (torrent: %d total: %d)\n"
, print_job(*j).c_str(), j->storage ? j->storage->num_blocked() : 0
, int(m_stats_counters[counters::blocked_disk_jobs]));
return exceeded;
}

j->flags |= aux::disk_job::in_progress;
m_cache.insert({j->storage->storage_index(), r.piece}, r.start / default_block_size, j);

if (!m_flush_target)
Expand Down Expand Up @@ -694,7 +681,6 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> pread_disk_io_constructor(
// immediately
if (ret == aux::disk_cache::job_completed)
{
j->flags |= aux::disk_job::in_progress;
jobqueue_t jobs;
jobs.push_back(j);
add_completed_jobs(std::move(jobs));
Expand All @@ -704,10 +690,7 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> pread_disk_io_constructor(
// In this case the job has been queued on the piece, and will be posted
// once the hashing completes
if (ret == aux::disk_cache::job_queued)
{
j->flags |= aux::disk_job::in_progress;
return;
}

add_job(j);
}
Expand Down Expand Up @@ -876,7 +859,6 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> pread_disk_io_constructor(
{
DLOG("immediate clear\n");
jobqueue_t jobs;
j->flags |= aux::disk_job::in_progress;
jobs.push_back(j);
add_completed_jobs(std::move(jobs));
}
Expand Down Expand Up @@ -991,8 +973,8 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> pread_disk_io_constructor(
{
// fall back to reading everything from disk

TORRENT_ALLOCA(blocks, char const*, blocks_in_piece);
TORRENT_ALLOCA(v2_hashes, sha256_hash, blocks_in_piece);
TORRENT_ALLOCA(blocks, char const*, blocks_to_read);
TORRENT_ALLOCA(v2_hashes, sha256_hash, blocks_in_piece2);
for (char const*& b : blocks) b = nullptr;
hasher ph;
hash_partial_piece(ph, 0, blocks, v2_hashes);
Expand Down Expand Up @@ -1227,6 +1209,34 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> pread_disk_io_constructor(
{
// if this assert fails, something's wrong with the fence logic
TORRENT_ASSERT(j->storage->num_outstanding_jobs() == 1);
storage_index_t const torrent = j->storage->storage_index();
jobqueue_t completed_jobs;
m_cache.flush_storage(
[&](span<aux::cached_block_entry> blocks) {
TORRENT_ASSERT(blocks.size() > 0);

int count = 0;
for (auto& be : blocks)
{
TORRENT_ASSERT(be.write_job);
++count;
perform_job(be.write_job, completed_jobs);
if (be.write_job->error)
break;
}

return count;
}
, torrent
, [&](jobqueue_t aborted_jobs, aux::pread_disk_job* clear_piece) {
m_completed_jobs.abort_jobs(m_ios, std::move(aborted_jobs));
jobqueue_t jobs;
jobs.push_back(clear_piece);
add_completed_jobs(std::move(jobs));
});
if (!completed_jobs.empty())
add_completed_jobs(std::move(completed_jobs));

j->storage->release_files(j->error);
return j->error ? disk_status::fatal_disk_error : status_t{};
}
Expand Down Expand Up @@ -1394,7 +1404,6 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> pread_disk_io_constructor(
{
TORRENT_ASSERT(be.write_job);
++count;
be.write_job->flags |= aux::disk_job::in_progress;
perform_job(be.write_job, completed_jobs);
if (be.write_job->error)
break;
Expand Down Expand Up @@ -1609,17 +1618,19 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> pread_disk_io_constructor(
for (auto i = jobs.iterate(); i.get(); i.next())
{
auto* j = static_cast<aux::pread_disk_job*>(i.get());
TORRENT_ASSERT((j->flags & aux::disk_job::in_progress) || !j->storage);

if (j->flags & aux::disk_job::fence)
{
m_stats_counters.inc_stats_counter(
counters::num_fenced_read + static_cast<int>(j->get_type()), -1);
}

TORRENT_ASSERT(j->storage);
if (j->storage)
ret += j->storage->job_complete(j, new_jobs);
if (j->flags & aux::disk_job::in_progress)
{
TORRENT_ASSERT(j->storage);
if (j->storage)
ret += j->storage->job_complete(j, new_jobs);
}

TORRENT_ASSERT(ret == new_jobs.size());
TORRENT_ASSERT(!(j->flags & aux::disk_job::in_progress));
Expand Down

0 comments on commit 8652833

Please sign in to comment.