Skip to content

Commit

Permalink
Peer read refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanjermakov committed Nov 9, 2023
1 parent 352cf1e commit 000cf68
Showing 1 changed file with 169 additions and 144 deletions.
313 changes: 169 additions & 144 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use crate::{
metainfo::Metainfo,
peer_metainfo::{PeerMetainfoMessage, METAINFO_PIECE_SIZE},
sha1,
state::{init_pieces, Peer, PeerInfo, PeerStatus, Piece, State, TorrentStatus, BLOCK_SIZE},
state::{
init_pieces, Block, Peer, PeerInfo, PeerStatus, Piece, State, TorrentStatus, BLOCK_SIZE,
},
torrent::write_piece,
types::ByteString,
};
Expand Down Expand Up @@ -354,84 +356,8 @@ async fn read_loop(
begin,
block,
}) => {
let status = state.lock().await.status.clone();
if status != TorrentStatus::Downloading {
debug!("not accepting pieces with status {:?}", status);
continue;
}
if begin % BLOCK_SIZE != 0 {
warn!("block begin is not a multiple of block size");
continue;
}
let block_index = begin / BLOCK_SIZE;

{
let mut state = state.lock().await;
let piece = match state.pieces.as_mut().unwrap().get_mut(&piece_index) {
Some(p) => p,
_ => {
debug!("no piece with index {:?}", piece_index);
continue;
}
};
if piece.status != TorrentStatus::Downloading {
debug!("downloaded block of already completed piece, loss");
continue;
}
let total_blocks = piece.total_blocks();
if block_index != total_blocks - 1 && block.0.len() != BLOCK_SIZE as usize {
debug!("block of unexpected size: {}", block.0.len());
continue;
}
if piece.blocks.insert(block_index, block).is_some() {
debug!("repeaded block download, loss");
};
trace!("got block {}/{}", piece.blocks.len(), total_blocks);
if piece.blocks.len() as u32 == total_blocks {
let piece_data: Vec<u8> = piece
.blocks
.values()
.flat_map(|b| b.0.as_slice())
.copied()
.collect();
let piece_hash = sha1::encode(piece_data);
if piece_hash != piece.hash.0 {
warn!("piece hash does not match: {:?}", piece);
trace!("{}", hex(&piece_hash));
trace!("{}", hex(&piece.hash.0));
continue;
}
piece.status = TorrentStatus::Downloaded;
info!(
"piece {}/{}",
state
.pieces
.as_ref()
.unwrap()
.values()
.filter(|p| p.status > TorrentStatus::Downloading)
.count(),
state.pieces.as_ref().unwrap().len(),
);
}
}

if state
.lock()
.await
.pieces
.as_ref()
.unwrap()
.get(&piece_index)
.unwrap()
.status
== TorrentStatus::Downloaded
{
// TODO: async
match spawn(write_piece(piece_index, state.clone())).await {
Ok(_) => debug!("piece saved"),
Err(e) => error!("error writing piece: {:#}", e),
};
if let Err(e) = read_piece(state.clone(), piece_index, begin, block).await {
debug!("{e:#}");
}
}
Ok(Message::Port { port }) => match state.lock().await.peers.get_mut(&peer) {
Expand All @@ -445,71 +371,8 @@ async fn read_loop(
ext_id,
payload: Some(payload),
}) => {
trace!("got extended message: #{}", ext_id,);
match ext_id {
0 => {
debug!("got extended handshake");
if let Some(BencodeValue::Dict(dict)) = parse_bencoded(payload).0 {
match dict.get("m") {
Some(BencodeValue::Dict(m_d)) => {
let ext_map = m_d
.iter()
.filter_map(|(k, v)| {
let ext = Extension::try_from(k.as_str()).ok()?;
let num = match v {
BencodeValue::Int(i) => *i as u8,
_ => {
return Err(Error::msg("ext id is not an int"))
.ok()
}
};
Some((ext, num))
})
.collect();
state
.lock()
.await
.peers
.get_mut(&peer)
.context("no peer")?
.extension_map = ext_map;
}
_ => debug!("no `m` key"),
}
};
}
_ => match Extension::try_from(ext_id as usize) {
Ok(Extension::Metadata) => match PeerMetainfoMessage::try_from(payload) {
Ok(msg) => {
debug!("got metadata message {:?}", msg);
match msg {
PeerMetainfoMessage::Data {
piece,
total_size,
data,
} => {
let mut state = state.lock().await;
if let Err(m_state) = state.metainfo.as_mut() {
m_state.pieces.insert(piece, data);
m_state.total_size = Some(total_size);
debug!(
"new metainfo piece {}/{}",
m_state.pieces.len(),
total_size.div_ceil(METAINFO_PIECE_SIZE)
);
} else {
debug!("metainfo already set");
}
}
_ => {
debug!("unhandled metadata message {:?}", msg);
}
}
}
Err(e) => debug!("{e:#}"),
},
Err(..) => debug!("unsupported extension id: #{}", ext_id),
},
if let Err(e) = read_extended(state.clone(), &peer, ext_id, payload).await {
debug!("{e:#}");
}
}
Ok(msg) => {
Expand All @@ -522,3 +385,165 @@ async fn read_loop(
};
}
}

async fn read_piece(
state: Arc<Mutex<State>>,
piece_index: u32,
begin: u32,
block: Block,
) -> Result<()> {
let status = state.lock().await.status.clone();
if status != TorrentStatus::Downloading {
debug!("not accepting pieces with status {:?}", status);
return Ok(());
}
if begin % BLOCK_SIZE != 0 {
return Err(Error::msg("block begin is not a multiple of block size"));
}
let block_index = begin / BLOCK_SIZE;

{
let mut state = state.lock().await;
let piece = match state.pieces.as_mut().unwrap().get_mut(&piece_index) {
Some(p) => p,
_ => {
debug!("no piece with index {:?}", piece_index);
return Ok(());
}
};
if piece.status != TorrentStatus::Downloading {
debug!("downloaded block of already completed piece, loss");
return Ok(());
}
let total_blocks = piece.total_blocks();
if block_index != total_blocks - 1 && block.0.len() != BLOCK_SIZE as usize {
debug!("block of unexpected size: {}", block.0.len());
return Ok(());
}
if piece.blocks.insert(block_index, block).is_some() {
debug!("repeaded block download, loss");
};
trace!("got block {}/{}", piece.blocks.len(), total_blocks);
if piece.blocks.len() as u32 == total_blocks {
let piece_data: Vec<u8> = piece
.blocks
.values()
.flat_map(|b| b.0.as_slice())
.copied()
.collect();
let piece_hash = sha1::encode(piece_data);
if piece_hash != piece.hash.0 {
warn!("piece hash does not match: {:?}", piece);
trace!("{}", hex(&piece_hash));
trace!("{}", hex(&piece.hash.0));
return Ok(());
}
piece.status = TorrentStatus::Downloaded;
info!(
"piece {}/{}",
state
.pieces
.as_ref()
.unwrap()
.values()
.filter(|p| p.status > TorrentStatus::Downloading)
.count(),
state.pieces.as_ref().unwrap().len(),
);
}
}

let status = state
.lock()
.await
.pieces
.as_ref()
.unwrap()
.get(&piece_index)
.context("no piece")?
.status
.clone();
if status == TorrentStatus::Downloaded {
// TODO: async
spawn(write_piece(piece_index, state.clone()))
.await?
.context("error writing piece")?;
debug!("piece saved");
}
Ok(())
}

async fn read_extended(
state: Arc<Mutex<State>>,
peer: &PeerInfo,
ext_id: u8,
payload: Vec<u8>,
) -> Result<()> {
trace!("got extended message: #{}", ext_id);
match ext_id {
0 => {
debug!("got extended handshake");
if let Some(BencodeValue::Dict(dict)) = parse_bencoded(payload).0 {
match dict.get("m") {
Some(BencodeValue::Dict(m_d)) => {
let ext_map = m_d
.iter()
.filter_map(|(k, v)| {
let ext = Extension::try_from(k.as_str()).ok()?;
let num = match v {
BencodeValue::Int(i) => *i as u8,
_ => return Err(Error::msg("ext id is not an int")).ok(),
};
Some((ext, num))
})
.collect();
state
.lock()
.await
.peers
.get_mut(peer)
.context("no peer")?
.extension_map = ext_map;
}
_ => return Err(Error::msg("no `m` key")),
}
};
}
_ => match Extension::try_from(ext_id as usize) {
Ok(Extension::Metadata) => match PeerMetainfoMessage::try_from(payload) {
Ok(msg) => {
debug!("got metadata message {:?}", msg);
match msg {
PeerMetainfoMessage::Data {
piece,
total_size,
data,
} => {
let mut state = state.lock().await;
if let Err(m_state) = state.metainfo.as_mut() {
m_state.pieces.insert(piece, data);
m_state.total_size = Some(total_size);
debug!(
"new metainfo piece {}/{}",
m_state.pieces.len(),
total_size.div_ceil(METAINFO_PIECE_SIZE)
);
} else {
return Err(Error::msg("metainfo already set"));
}
}
_ => {
return Err(Error::msg(format!(
"unhandled metadata message {:?}",
msg
)));
}
}
}
Err(e) => return Err(Error::msg(format!("{e:#}"))),
},
Err(..) => return Err(Error::msg(format!("unsupported extension id: #{}", ext_id))),
},
};
Ok(())
}

0 comments on commit 000cf68

Please sign in to comment.