Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: add future on asynchronous methods that trigger alerts (should I continue?) #7558

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions include/libtorrent/alert_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ POSSIBILITY OF SUCH DAMAGE.
#include <bitset>
#include <cstdarg> // for va_list

#include <boost/type_index.hpp>

#if TORRENT_ABI_VERSION == 1
#define PROGRESS_NOTIFICATION | alert::progress_notification
#else
Expand Down Expand Up @@ -3116,6 +3118,27 @@ TORRENT_VERSION_NAMESPACE_3_END
TORRENT_EXTRA_EXPORT char const* performance_warning_str(performance_alert::performance_warning_t i);


template<typename T>
class alert_exception : public std::exception {
public:
alert_exception(const T* a) : m_alert(a) {}

const char* what() const noexcept override {
return m_alert->message().c_str();
}

private:
const T* m_alert;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks a bit suspicious. alert allocated in the heterogeneous queue have their lifetimes tied to that queue. throwing exceptions with pointers into it seems like asking for accessing danging pointers

};

template<typename T>
class alert_dont_post_exception: public std::exception {
const char* what() const noexcept override {
return std::string(boost::typeindex::type_id<T>().pretty_name() + " ignored").c_str();
}
};


#undef TORRENT_DEFINE_ALERT_IMPL
#undef TORRENT_DEFINE_ALERT
#undef TORRENT_DEFINE_ALERT_PRIO
Expand Down
6 changes: 4 additions & 2 deletions include/libtorrent/aux_/alert_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
~alert_manager();

template <class T, typename... Args>
void emplace_alert(Args&&... args) try
const T* emplace_alert(Args&&... args) try

Check notice

Code scanning / CodeQL

Unused local variable Note

Variable args is not used.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like a parse failure in CodeQL..

{
std::unique_lock<std::recursive_mutex> lock(m_mutex);

Expand All @@ -82,13 +82,15 @@
{
// record that we dropped an alert of this type
m_dropped.set(T::alert_type);
return;
return nullptr;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need a return nullptr; on lines 100 as well

}

T& alert = queue.emplace_back<T>(
m_allocations[m_generation], std::forward<Args>(args)...);

maybe_notify(&alert);

return &alert;
}
catch (std::bad_alloc const&)
{
Expand Down
16 changes: 11 additions & 5 deletions include/libtorrent/torrent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <vector>
#include <set>
#include <list>
#include <future>
#include <deque>
#include <limits> // for numeric_limits
#include <memory> // for unique_ptr
Expand Down Expand Up @@ -485,8 +486,11 @@ namespace libtorrent {
int blocks_left;
bool fail;
error_code error;
std::shared_ptr<std::promise<const read_piece_alert*>> promise;
};
void read_piece(piece_index_t);

void read_piece(piece_index_t const piece, std::shared_ptr<std::promise<const read_piece_alert*>> promise = nullptr);

void on_disk_read_complete(disk_buffer_holder, storage_error const&
, peer_request const&, std::shared_ptr<read_piece_struct>);

Expand Down Expand Up @@ -563,7 +567,8 @@ namespace libtorrent {
bool has_error() const { return !!m_error; }
error_code error() const { return m_error; }

void flush_cache();

void flush_cache(std::shared_ptr<std::promise<const cache_flushed_alert*>> promise = nullptr);
void pause(pause_flags_t flags = {});
void resume();

Expand Down Expand Up @@ -1160,7 +1165,7 @@ namespace libtorrent {
// RESOURCE MANAGEMENT

// flags are defined in storage.hpp
void move_storage(std::string const& save_path, move_flags_t flags);
void move_storage(std::string const& save_path, move_flags_t flags, std::shared_ptr<std::promise<const storage_moved_alert*>> promise = nullptr);

// renames the file with the given index to the new name
// the name may include a directory path
Expand Down Expand Up @@ -1303,11 +1308,12 @@ namespace libtorrent {
void on_files_deleted(storage_error const& error);
void on_torrent_paused();
void on_storage_moved(status_t status, std::string const& path
, storage_error const& error);
, storage_error const& error, std::shared_ptr<std::promise<const storage_moved_alert*>> promise = nullptr);
void on_file_renamed(std::string const& filename
, file_index_t file_idx
, storage_error const& error);
void on_cache_flushed(bool manually_triggered);

void on_cache_flushed(bool manually_triggered, std::shared_ptr<std::promise<const cache_flushed_alert*>> promise = nullptr);

// this is used when a torrent is being removed.It synchronizes with the
// disk thread
Expand Down
10 changes: 10 additions & 0 deletions include/libtorrent/torrent_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <set>
#include <functional>
#include <memory>
#include <future>

#include "libtorrent/aux_/disable_warnings_push.hpp"
#if TORRENT_ABI_VERSION == 1
Expand Down Expand Up @@ -81,6 +82,10 @@ namespace aux {
struct torrent;
struct client_data_t;

struct cache_flushed_alert;
struct read_piece_alert;
struct storage_moved_alert;

#ifndef BOOST_NO_EXCEPTIONS
[[noreturn]] void throw_invalid_handle();
#endif
Expand Down Expand Up @@ -310,6 +315,7 @@ namespace aux {
// Note that if you read multiple pieces, the read operations are not
// guaranteed to finish in the same order as you initiated them.
void read_piece(piece_index_t piece) const;
std::future<const read_piece_alert*> async_read_piece(piece_index_t piece);

// Returns true if this piece has been completely downloaded and written
// to disk, and false otherwise.
Expand Down Expand Up @@ -659,6 +665,7 @@ namespace aux {
// data libtorrent had by the time you called
// ``torrent_handle::flush_cache()`` has been written to disk.
void flush_cache() const;
std::future<const cache_flushed_alert*> async_flush_cache();

// ``force_recheck`` puts the torrent back in a state where it assumes to
// have no resume data. All peers will be disconnected and the torrent
Expand Down Expand Up @@ -1329,6 +1336,9 @@ namespace aux {
, move_flags_t flags = move_flags_t::always_replace_files
) const;

std::future<const storage_moved_alert*> async_move_storage(std::string const& save_path
, move_flags_t flags = move_flags_t::always_replace_files);

#if TORRENT_ABI_VERSION == 1
// deprecated in 1.2
TORRENT_DEPRECATED
Expand Down
81 changes: 56 additions & 25 deletions src/torrent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ bool is_downloading_state(int const st)
m_ses.close_connection(p);
}

void torrent::read_piece(piece_index_t const piece)
void torrent::read_piece(piece_index_t const piece, std::shared_ptr<std::promise<const read_piece_alert*>> promise)
{
error_code ec;
if (m_abort || m_deleted)
Expand All @@ -761,7 +761,8 @@ bool is_downloading_state(int const st)

if (ec)
{
m_ses.alerts().emplace_alert<read_piece_alert>(get_handle(), piece, ec);
auto* a = m_ses.alerts().emplace_alert<read_piece_alert>(get_handle(), piece, ec);
if (promise) promise->set_value(a);
return;
}

Expand All @@ -775,21 +776,24 @@ bool is_downloading_state(int const st)
{
// this shouldn't actually happen
boost::shared_array<char> buf;
m_ses.alerts().emplace_alert<read_piece_alert>(
auto* a = m_ses.alerts().emplace_alert<read_piece_alert>(
get_handle(), piece, buf, 0);
if (promise) promise->set_value(a);
return;
}

std::shared_ptr<read_piece_struct> rp = std::make_shared<read_piece_struct>();
rp->piece_data.reset(new (std::nothrow) char[std::size_t(piece_size)]);
if (!rp->piece_data)
{
m_ses.alerts().emplace_alert<read_piece_alert>(
auto* a = m_ses.alerts().emplace_alert<read_piece_alert>(
get_handle(), piece, error_code(boost::system::errc::not_enough_memory, generic_category()));
if (promise) promise->set_value(a);
return;
}
rp->blocks_left = blocks_in_piece;
rp->fail = false;
rp->promise = promise;

disk_job_flags_t flags{};
auto const read_mode = settings().get_int(settings_pack::disk_io_read_mode);
Expand Down Expand Up @@ -1215,13 +1219,15 @@ bool is_downloading_state(int const st)
int size = m_torrent_file->piece_size(r.piece);
if (rp->fail)
{
m_ses.alerts().emplace_alert<read_piece_alert>(
const auto* a = m_ses.alerts().emplace_alert<read_piece_alert>(
get_handle(), r.piece, rp->error);
if (rp->promise) rp->promise->set_value(a);
}
else
{
m_ses.alerts().emplace_alert<read_piece_alert>(
const auto* a = m_ses.alerts().emplace_alert<read_piece_alert>(
get_handle(), r.piece, rp->piece_data, size);
if (rp->promise) rp->promise->set_value(a);
}
}
}
Expand Down Expand Up @@ -8397,7 +8403,7 @@ namespace {
{
// we need to keep the object alive during this operation
m_ses.disk_thread().async_release_files(m_storage
, std::bind(&torrent::on_cache_flushed, shared_from_this(), false));
, std::bind(&torrent::on_cache_flushed, shared_from_this(), false, nullptr));
m_ses.deferred_submit_jobs();
}

Expand Down Expand Up @@ -8684,26 +8690,35 @@ namespace {
m_ses.deferred_submit_jobs();
}

void torrent::move_storage(std::string const& save_path, move_flags_t const flags)
void torrent::move_storage(std::string const& save_path, move_flags_t const flags, std::shared_ptr<std::promise<const storage_moved_alert*>> promise)
{
TORRENT_ASSERT(is_single_thread());
INVARIANT_CHECK;

if (m_abort)
{
if (alerts().should_post<storage_moved_failed_alert>())
alerts().emplace_alert<storage_moved_failed_alert>(get_handle()
if (alerts().should_post<storage_moved_failed_alert>()) {
auto* a = alerts().emplace_alert<storage_moved_failed_alert>(get_handle()
, boost::asio::error::operation_aborted
, "", operation_t::unknown);

if (promise) promise->set_exception(std::make_exception_ptr(alert_exception(a)));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a is a non-owning pointer to the alert, and the life-time of the alert is tied to when the user calls pop_alerts(). This seems really risky as the future may be checked after the alert object has been deleted.

} else {
if (promise) promise->set_exception(std::make_exception_ptr(alert_dont_post_exception<storage_moved_failed_alert>()));
}
return;
}

// if we don't have metadata yet, we don't know anything about the file
// structure and we have to assume we don't have any file.
if (!valid_metadata())
{
if (alerts().should_post<storage_moved_alert>())
alerts().emplace_alert<storage_moved_alert>(get_handle(), save_path, m_save_path);
if (alerts().should_post<storage_moved_alert>()) {
auto* a = alerts().emplace_alert<storage_moved_alert>(get_handle(), save_path, m_save_path);
if (promise) promise->set_value(a);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

passing the pointer to the alert to the future is risky in the same way as the exception. Making sure that the client gets the future's value before calling pop_alerts() is not trivial and, I would expect, a source of errors.

} else {
if (promise) promise->set_exception(std::make_exception_ptr(alert_dont_post_exception<storage_moved_alert>()));
}
#if TORRENT_USE_UNC_PATHS
std::string path = canonicalize_path(save_path);
#else
Expand All @@ -8722,14 +8737,18 @@ namespace {
std::string path = save_path;
#endif
m_ses.disk_thread().async_move_storage(m_storage, std::move(path), flags
, std::bind(&torrent::on_storage_moved, shared_from_this(), _1, _2, _3));
, std::bind(&torrent::on_storage_moved, shared_from_this(), _1, _2, _3, promise));
m_moving_storage = true;
m_ses.deferred_submit_jobs();
}
else
{
if (alerts().should_post<storage_moved_alert>())
alerts().emplace_alert<storage_moved_alert>(get_handle(), save_path, m_save_path);
if (alerts().should_post<storage_moved_alert>()) {
auto* a = alerts().emplace_alert<storage_moved_alert>(get_handle(), save_path, m_save_path);
if (promise) promise->set_value(a);
} else {
if (promise) promise->set_exception(std::make_exception_ptr(alert_dont_post_exception<storage_moved_alert>()));
}

#if TORRENT_USE_UNC_PATHS
m_save_path = canonicalize_path(save_path);
Expand All @@ -8742,26 +8761,34 @@ namespace {
}

void torrent::on_storage_moved(status_t const status, std::string const& path
, storage_error const& error) try
, storage_error const& error, std::shared_ptr<std::promise<const storage_moved_alert*>> promise) try
{
TORRENT_ASSERT(is_single_thread());

m_moving_storage = false;
if (status == status_t::no_error
|| status == status_t::need_full_check)
{
if (alerts().should_post<storage_moved_alert>())
alerts().emplace_alert<storage_moved_alert>(get_handle(), path, m_save_path);
if (alerts().should_post<storage_moved_alert>()) {
auto* a = alerts().emplace_alert<storage_moved_alert>(get_handle(), path, m_save_path);
if (promise) promise->set_value(a);
} else {
if (promise) promise->set_exception(std::make_exception_ptr(alert_dont_post_exception<storage_moved_alert>()));
}
m_save_path = path;
set_need_save_resume(torrent_handle::if_config_changed);
if (status == status_t::need_full_check)
force_recheck();
}
else
{
if (alerts().should_post<storage_moved_failed_alert>())
alerts().emplace_alert<storage_moved_failed_alert>(get_handle(), error.ec
if (alerts().should_post<storage_moved_failed_alert>()) {
auto* a = alerts().emplace_alert<storage_moved_failed_alert>(get_handle(), error.ec
, resolve_filename(error.file()), error.operation);
if (promise) promise->set_exception(std::make_exception_ptr(alert_exception(a)));
} else {
if (promise) promise->set_exception(std::make_exception_ptr(alert_dont_post_exception<storage_moved_failed_alert>()));
}
}
}
catch (...) { handle_exception(); }
Expand Down Expand Up @@ -9462,7 +9489,7 @@ namespace {
&& !m_session_paused;
}

void torrent::flush_cache()
void torrent::flush_cache(std::shared_ptr<std::promise<const cache_flushed_alert*>> promise)
{
TORRENT_ASSERT(is_single_thread());

Expand All @@ -9473,18 +9500,22 @@ namespace {
return;
}
m_ses.disk_thread().async_release_files(m_storage
, std::bind(&torrent::on_cache_flushed, shared_from_this(), true));
, std::bind(&torrent::on_cache_flushed, shared_from_this(), true, promise));
m_ses.deferred_submit_jobs();
}

void torrent::on_cache_flushed(bool const manually_triggered) try
void torrent::on_cache_flushed(bool const manually_triggered, std::shared_ptr<std::promise<const cache_flushed_alert*>> promise) try
{
TORRENT_ASSERT(is_single_thread());

if (m_ses.is_aborted()) return;

if (manually_triggered || alerts().should_post<cache_flushed_alert>())
alerts().emplace_alert<cache_flushed_alert>(get_handle());
if (manually_triggered || alerts().should_post<cache_flushed_alert>()) {
auto* a = alerts().emplace_alert<cache_flushed_alert>(get_handle());
if (promise) promise->set_value(a);
} else {
if (promise) promise->set_exception(std::make_exception_ptr(alert_dont_post_exception<cache_flushed_alert>()));
}
}
catch (...) { handle_exception(); }

Expand Down
Loading
Loading