Skip to content

Commit

Permalink
Merge #1035: Allow UDP clients to limit peers in response
Browse files Browse the repository at this point in the history
084879e feat: [#569] numwant HTTP tracker announce param (Jose Celano)
481d413 feat: [#569] allow UDP clients to limit peers in response (Jose Celano)

Pull request description:

  The UDP tracker announce response always include all peers available up to a maxium of 74 peers, ignoring the `num_want` param in the request described in:

  https://www.bittorrent.org/beps/bep_0015.html

  This change applies that limit only when is lower than then TORRENT_PEERS_LIMIT (74).

  It also adds the `numwant` GET param to the announce request for the HTTP tracker for the same purpose.

  - [x] UDP tracker (`num_want`, positional param in the UDP packet).
  - [x] HTTP tracker (`numwant` GET param).

ACKs for top commit:
  josecelano:
    ACK 084879e

Tree-SHA512: 6e3a7a672d393852d8655c4e2732910dbf2a5beecb3f92a39b251e127e05d3a5ae068962ec1577189ed397b2529201558c971e8c050c2763d8781efd8441f540
  • Loading branch information
josecelano committed Sep 11, 2024
2 parents 1e437f7 + 084879e commit c71d88c
Show file tree
Hide file tree
Showing 10 changed files with 276 additions and 41 deletions.
188 changes: 167 additions & 21 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ pub mod torrent;

pub mod peer_tests;

use std::cmp::max;
use std::collections::HashMap;
use std::net::IpAddr;
use std::panic::Location;
Expand Down Expand Up @@ -520,6 +521,48 @@ pub struct AnnounceData {
pub policy: AnnouncePolicy,
}

/// How many peers the peer announcing wants in the announce response.
#[derive(Clone, Debug, PartialEq, Default)]
pub enum PeersWanted {
/// The peer wants as many peers as possible in the announce response.
#[default]
All,
/// The peer only wants a certain amount of peers in the announce response.
Only { amount: usize },
}

impl PeersWanted {
#[must_use]
pub fn only(limit: u32) -> Self {
let amount: usize = match limit.try_into() {
Ok(amount) => amount,
Err(_) => TORRENT_PEERS_LIMIT,
};

Self::Only { amount }
}

fn limit(&self) -> usize {
match self {
PeersWanted::All => TORRENT_PEERS_LIMIT,
PeersWanted::Only { amount } => *amount,
}
}
}

impl From<i32> for PeersWanted {
fn from(value: i32) -> Self {
if value > 0 {
match value.try_into() {
Ok(peers_wanted) => Self::Only { amount: peers_wanted },
Err(_) => Self::All,
}
} else {
Self::All
}
}
}

/// Structure that holds the data returned by the `scrape` request.
#[derive(Debug, PartialEq, Default)]
pub struct ScrapeData {
Expand Down Expand Up @@ -639,7 +682,13 @@ impl Tracker {
/// # Context: Tracker
///
/// BEP 03: [The `BitTorrent` Protocol Specification](https://www.bittorrent.org/beps/bep_0003.html).
pub fn announce(&self, info_hash: &InfoHash, peer: &mut peer::Peer, remote_client_ip: &IpAddr) -> AnnounceData {
pub fn announce(
&self,
info_hash: &InfoHash,
peer: &mut peer::Peer,
remote_client_ip: &IpAddr,
peers_wanted: &PeersWanted,
) -> AnnounceData {
// code-review: maybe instead of mutating the peer we could just return
// a tuple with the new peer and the announce data: (Peer, AnnounceData).
// It could even be a different struct: `StoredPeer` or `PublicPeer`.
Expand All @@ -661,7 +710,7 @@ impl Tracker {

let stats = self.upsert_peer_and_get_stats(info_hash, peer);

let peers = self.get_peers_for(info_hash, peer);
let peers = self.get_peers_for(info_hash, peer, peers_wanted.limit());

AnnounceData {
peers,
Expand Down Expand Up @@ -713,16 +762,21 @@ impl Tracker {
Ok(())
}

fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer) -> Vec<Arc<peer::Peer>> {
/// # Context: Tracker
///
/// Get torrent peers for a given torrent and client.
///
/// It filters out the client making the request.
fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer, limit: usize) -> Vec<Arc<peer::Peer>> {
match self.torrents.get(info_hash) {
None => vec![],
Some(entry) => entry.get_peers_for_client(&peer.peer_addr, Some(TORRENT_PEERS_LIMIT)),
Some(entry) => entry.get_peers_for_client(&peer.peer_addr, Some(max(limit, TORRENT_PEERS_LIMIT))),
}
}

/// # Context: Tracker
///
/// Get all torrent peers for a given torrent
/// Get torrent peers for a given torrent.
pub fn get_torrent_peers(&self, info_hash: &InfoHash) -> Vec<Arc<peer::Peer>> {
match self.torrents.get(info_hash) {
None => vec![],
Expand Down Expand Up @@ -1199,6 +1253,7 @@ mod tests {
use std::sync::Arc;

use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes, PeerId};
use torrust_tracker_configuration::TORRENT_PEERS_LIMIT;
use torrust_tracker_primitives::info_hash::InfoHash;
use torrust_tracker_primitives::DurationSinceUnixEpoch;
use torrust_tracker_test_helpers::configuration;
Expand Down Expand Up @@ -1328,7 +1383,7 @@ mod tests {
}

#[tokio::test]
async fn it_should_return_all_the_peers_for_a_given_torrent() {
async fn it_should_return_the_peers_for_a_given_torrent() {
let tracker = public_tracker();

let info_hash = sample_info_hash();
Expand All @@ -1341,20 +1396,93 @@ mod tests {
assert_eq!(peers, vec![Arc::new(peer)]);
}

/// It generates a peer id from a number where the number is the last
/// part of the peer ID. For example, for `12` it returns
/// `-qB00000000000000012`.
fn numeric_peer_id(two_digits_value: i32) -> PeerId {
// Format idx as a string with leading zeros, ensuring it has exactly 2 digits
let idx_str = format!("{two_digits_value:02}");

// Create the base part of the peer ID.
let base = b"-qB00000000000000000";

// Concatenate the base with idx bytes, ensuring the total length is 20 bytes.
let mut peer_id_bytes = [0u8; 20];
peer_id_bytes[..base.len()].copy_from_slice(base);
peer_id_bytes[base.len() - idx_str.len()..].copy_from_slice(idx_str.as_bytes());

PeerId(peer_id_bytes)
}

#[tokio::test]
async fn it_should_return_74_peers_at_the_most_for_a_given_torrent() {
let tracker = public_tracker();

let info_hash = sample_info_hash();

for idx in 1..=75 {
let peer = Peer {
peer_id: numeric_peer_id(idx),
peer_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, idx.try_into().unwrap())), 8080),
updated: DurationSinceUnixEpoch::new(1_669_397_478_934, 0),
uploaded: NumberOfBytes::new(0),
downloaded: NumberOfBytes::new(0),
left: NumberOfBytes::new(0), // No bytes left to download
event: AnnounceEvent::Completed,
};

tracker.upsert_peer_and_get_stats(&info_hash, &peer);
}

let peers = tracker.get_torrent_peers(&info_hash);

assert_eq!(peers.len(), 74);
}

#[tokio::test]
async fn it_should_return_all_the_peers_for_a_given_torrent_excluding_a_given_peer() {
async fn it_should_return_the_peers_for_a_given_torrent_excluding_a_given_peer() {
let tracker = public_tracker();

let info_hash = sample_info_hash();
let peer = sample_peer();

tracker.upsert_peer_and_get_stats(&info_hash, &peer);

let peers = tracker.get_peers_for(&info_hash, &peer);
let peers = tracker.get_peers_for(&info_hash, &peer, TORRENT_PEERS_LIMIT);

assert_eq!(peers, vec![]);
}

#[tokio::test]
async fn it_should_return_74_peers_at_the_most_for_a_given_torrent_when_it_filters_out_a_given_peer() {
let tracker = public_tracker();

let info_hash = sample_info_hash();

let excluded_peer = sample_peer();

tracker.upsert_peer_and_get_stats(&info_hash, &excluded_peer);

// Add 74 peers
for idx in 2..=75 {
let peer = Peer {
peer_id: numeric_peer_id(idx),
peer_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, idx.try_into().unwrap())), 8080),
updated: DurationSinceUnixEpoch::new(1_669_397_478_934, 0),
uploaded: NumberOfBytes::new(0),
downloaded: NumberOfBytes::new(0),
left: NumberOfBytes::new(0), // No bytes left to download
event: AnnounceEvent::Completed,
};

tracker.upsert_peer_and_get_stats(&info_hash, &peer);
}

let peers = tracker.get_peers_for(&info_hash, &excluded_peer, TORRENT_PEERS_LIMIT);

assert_eq!(peers.len(), 74);
}

#[tokio::test]
async fn it_should_return_the_torrent_metrics() {
let tracker = public_tracker();
Expand Down Expand Up @@ -1409,6 +1537,7 @@ mod tests {
use crate::core::tests::the_tracker::{
peer_ip, public_tracker, sample_info_hash, sample_peer, sample_peer_1, sample_peer_2,
};
use crate::core::PeersWanted;

mod should_assign_the_ip_to_the_peer {

Expand Down Expand Up @@ -1514,7 +1643,7 @@ mod tests {

let mut peer = sample_peer();

let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip());
let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::All);

assert_eq!(announce_data.peers, vec![]);
}
Expand All @@ -1524,10 +1653,15 @@ mod tests {
let tracker = public_tracker();

let mut previously_announced_peer = sample_peer_1();
tracker.announce(&sample_info_hash(), &mut previously_announced_peer, &peer_ip());
tracker.announce(
&sample_info_hash(),
&mut previously_announced_peer,
&peer_ip(),
&PeersWanted::All,
);

let mut peer = sample_peer_2();
let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip());
let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::All);

assert_eq!(announce_data.peers, vec![Arc::new(previously_announced_peer)]);
}
Expand All @@ -1537,14 +1671,15 @@ mod tests {
use crate::core::tests::the_tracker::{
completed_peer, leecher, peer_ip, public_tracker, sample_info_hash, seeder, started_peer,
};
use crate::core::PeersWanted;

#[tokio::test]
async fn when_the_peer_is_a_seeder() {
let tracker = public_tracker();

let mut peer = seeder();

let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip());
let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::All);

assert_eq!(announce_data.stats.complete, 1);
}
Expand All @@ -1555,7 +1690,7 @@ mod tests {

let mut peer = leecher();

let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip());
let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::All);

assert_eq!(announce_data.stats.incomplete, 1);
}
Expand All @@ -1566,10 +1701,11 @@ mod tests {

// We have to announce with "started" event because peer does not count if peer was not previously known
let mut started_peer = started_peer();
tracker.announce(&sample_info_hash(), &mut started_peer, &peer_ip());
tracker.announce(&sample_info_hash(), &mut started_peer, &peer_ip(), &PeersWanted::All);

let mut completed_peer = completed_peer();
let announce_data = tracker.announce(&sample_info_hash(), &mut completed_peer, &peer_ip());
let announce_data =
tracker.announce(&sample_info_hash(), &mut completed_peer, &peer_ip(), &PeersWanted::All);

assert_eq!(announce_data.stats.downloaded, 1);
}
Expand All @@ -1583,7 +1719,7 @@ mod tests {
use torrust_tracker_primitives::info_hash::InfoHash;

use crate::core::tests::the_tracker::{complete_peer, incomplete_peer, public_tracker};
use crate::core::{ScrapeData, SwarmMetadata};
use crate::core::{PeersWanted, ScrapeData, SwarmMetadata};

#[tokio::test]
async fn it_should_return_a_zeroed_swarm_metadata_for_the_requested_file_if_the_tracker_does_not_have_that_torrent(
Expand All @@ -1609,11 +1745,21 @@ mod tests {

// Announce a "complete" peer for the torrent
let mut complete_peer = complete_peer();
tracker.announce(&info_hash, &mut complete_peer, &IpAddr::V4(Ipv4Addr::new(126, 0, 0, 10)));
tracker.announce(
&info_hash,
&mut complete_peer,
&IpAddr::V4(Ipv4Addr::new(126, 0, 0, 10)),
&PeersWanted::All,
);

// Announce an "incomplete" peer for the torrent
let mut incomplete_peer = incomplete_peer();
tracker.announce(&info_hash, &mut incomplete_peer, &IpAddr::V4(Ipv4Addr::new(126, 0, 0, 11)));
tracker.announce(
&info_hash,
&mut incomplete_peer,
&IpAddr::V4(Ipv4Addr::new(126, 0, 0, 11)),
&PeersWanted::All,
);

// Scrape
let scrape_data = tracker.scrape(&vec![info_hash]).await;
Expand Down Expand Up @@ -1740,7 +1886,7 @@ mod tests {
use crate::core::tests::the_tracker::{
complete_peer, incomplete_peer, peer_ip, sample_info_hash, whitelisted_tracker,
};
use crate::core::ScrapeData;
use crate::core::{PeersWanted, ScrapeData};

#[test]
fn it_should_be_able_to_build_a_zeroed_scrape_data_for_a_list_of_info_hashes() {
Expand All @@ -1761,11 +1907,11 @@ mod tests {
let info_hash = "3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0".parse::<InfoHash>().unwrap();

let mut peer = incomplete_peer();
tracker.announce(&info_hash, &mut peer, &peer_ip());
tracker.announce(&info_hash, &mut peer, &peer_ip(), &PeersWanted::All);

// Announce twice to force non zeroed swarm metadata
let mut peer = complete_peer();
tracker.announce(&info_hash, &mut peer, &peer_ip());
tracker.announce(&info_hash, &mut peer, &peer_ip(), &PeersWanted::All);

let scrape_data = tracker.scrape(&vec![info_hash]).await;

Expand Down
3 changes: 2 additions & 1 deletion src/servers/http/v1/extractors/announce_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ mod tests {

#[test]
fn it_should_extract_the_announce_request_from_the_url_query_params() {
let raw_query = "info_hash=%3B%24U%04%CF%5F%11%BB%DB%E1%20%1C%EAjk%F4Z%EE%1B%C0&peer_addr=2.137.87.41&downloaded=0&uploaded=0&peer_id=-qB00000000000000001&port=17548&left=0&event=completed&compact=0";
let raw_query = "info_hash=%3B%24U%04%CF%5F%11%BB%DB%E1%20%1C%EAjk%F4Z%EE%1B%C0&peer_addr=2.137.87.41&downloaded=0&uploaded=0&peer_id=-qB00000000000000001&port=17548&left=0&event=completed&compact=0&numwant=50";

let announce = extract_announce_from(Some(raw_query)).unwrap();

Expand All @@ -126,6 +126,7 @@ mod tests {
left: Some(NumberOfBytes::new(0)),
event: Some(Event::Completed),
compact: Some(Compact::NotAccepted),
numwant: Some(50),
}
);
}
Expand Down
9 changes: 7 additions & 2 deletions src/servers/http/v1/handlers/announce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use torrust_tracker_clock::clock::Time;
use torrust_tracker_primitives::peer;

use crate::core::auth::Key;
use crate::core::{AnnounceData, Tracker};
use crate::core::{AnnounceData, PeersWanted, Tracker};
use crate::servers::http::v1::extractors::announce_request::ExtractRequest;
use crate::servers::http::v1::extractors::authentication_key::Extract as ExtractKey;
use crate::servers::http::v1::extractors::client_ip_sources::Extract as ExtractClientIpSources;
Expand Down Expand Up @@ -110,8 +110,12 @@ async fn handle_announce(
};

let mut peer = peer_from_request(announce_request, &peer_ip);
let peers_wanted = match announce_request.numwant {
Some(numwant) => PeersWanted::only(numwant),
None => PeersWanted::All,
};

let announce_data = services::announce::invoke(tracker.clone(), announce_request.info_hash, &mut peer).await;
let announce_data = services::announce::invoke(tracker.clone(), announce_request.info_hash, &mut peer, &peers_wanted).await;

Ok(announce_data)
}
Expand Down Expand Up @@ -205,6 +209,7 @@ mod tests {
left: None,
event: None,
compact: None,
numwant: None,
}
}

Expand Down
Loading

0 comments on commit c71d88c

Please sign in to comment.