From 58a5fecbff710bc9d7da32afc3a6533dbd2ee80f Mon Sep 17 00:00:00 2001 From: Snowiiii Date: Sun, 18 Aug 2024 16:51:31 +0200 Subject: [PATCH] Port to tokio --- Cargo.lock | 7 +- Cargo.toml | 2 +- pumpkin-protocol/Cargo.toml | 1 + pumpkin-protocol/src/lib.rs | 5 +- pumpkin-protocol/src/packet_decoder.rs | 1 - pumpkin-world/src/world.rs | 23 ++- pumpkin/Cargo.toml | 1 - pumpkin/src/client/client_packet.rs | 82 ++++++--- pumpkin/src/client/mod.rs | 215 ++++++++++++++-------- pumpkin/src/client/player_packet.rs | 166 ++++++++++------- pumpkin/src/commands/gamemode.rs | 29 +-- pumpkin/src/commands/mod.rs | 16 +- pumpkin/src/commands/pumpkin.rs | 4 +- pumpkin/src/commands/stop.rs | 2 +- pumpkin/src/main.rs | 244 +++++++++---------------- pumpkin/src/proxy/velocity.rs | 28 +-- pumpkin/src/rcon/mod.rs | 6 - pumpkin/src/server.rs | 183 ++++++++++--------- 18 files changed, 536 insertions(+), 479 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ce1e60d1..c0b15335 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -879,7 +879,6 @@ checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4" dependencies = [ "hermit-abi", "libc", - "log", "wasi", "windows-sys 0.52.0", ] @@ -1199,7 +1198,6 @@ dependencies = [ "hmac", "image", "log", - "mio", "num-bigint", "num-derive", "num-traits", @@ -1256,6 +1254,7 @@ name = "pumpkin-protocol" version = "0.1.0" dependencies = [ "aes", + "byteorder", "bytes", "cfb8", "fastnbt", @@ -1881,9 +1880,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.2" +version = "1.39.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" +checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" dependencies = [ "backtrace", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 112838e1..318f4e59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,5 +12,5 @@ lto = true codegen-units = 1 [workspace.dependencies] -tokio = { version = "1.39.2", features = ["net", "macros", "rt-multi-thread", "fs", "io-util"] } +tokio = { version = "1.39.3", features = ["net", "macros", "rt-multi-thread", "fs", "io-util"] } rayon = "1.10.0" diff --git a/pumpkin-protocol/Cargo.toml b/pumpkin-protocol/Cargo.toml index 27c0c656..852228e9 100644 --- a/pumpkin-protocol/Cargo.toml +++ b/pumpkin-protocol/Cargo.toml @@ -9,6 +9,7 @@ pumpkin-world = { path = "../pumpkin-world" } pumpkin-text = { path = "../pumpkin-text" } bytes = "1.7" +byteorder = "1.5.0" uuid = "1.10" diff --git a/pumpkin-protocol/src/lib.rs b/pumpkin-protocol/src/lib.rs index 380a487e..3633fe32 100644 --- a/pumpkin-protocol/src/lib.rs +++ b/pumpkin-protocol/src/lib.rs @@ -1,4 +1,5 @@ use bytebuf::{packet_id::Packet, ByteBuffer, DeserializerError}; +use byteorder::ReadBytesExt; use bytes::Buf; use serde::{Deserialize, Serialize, Serializer}; use std::io::{self, Write}; @@ -10,8 +11,8 @@ pub mod packet_decoder; pub mod packet_encoder; pub mod position; pub mod server; -pub mod uuid; pub mod slot; +pub mod uuid; pub const CURRENT_MC_PROTOCOL: u32 = 767; @@ -44,7 +45,7 @@ impl VarInt { pub fn decode_partial(r: &mut &[u8]) -> Result { let mut val = 0; for i in 0..Self::MAX_SIZE { - let byte = r.get_u8(); + let byte = r.read_u8().map_err(|_| VarIntDecodeError::Incomplete)?; val |= (i32::from(byte) & 0b01111111) << (i * 7); if byte & 0b10000000 == 0 { return Ok(val); diff --git a/pumpkin-protocol/src/packet_decoder.rs b/pumpkin-protocol/src/packet_decoder.rs index 3d3e3449..3a297e1f 100644 --- a/pumpkin-protocol/src/packet_decoder.rs +++ b/pumpkin-protocol/src/packet_decoder.rs @@ -24,7 +24,6 @@ pub struct PacketDecoder { impl PacketDecoder { pub fn decode(&mut self) -> Result, PacketError> { let mut r = &self.buf[..]; - let packet_len = match VarInt::decode_partial(&mut r) { Ok(len) => len, Err(VarIntDecodeError::Incomplete) => return Ok(None), diff --git a/pumpkin-world/src/world.rs b/pumpkin-world/src/world.rs index 464ae366..27d71fab 100644 --- a/pumpkin-world/src/world.rs +++ b/pumpkin-world/src/world.rs @@ -153,15 +153,13 @@ impl Level { let modulus = |a: i32, b: i32| ((a % b) + b) % b; let chunk_x = modulus(chunk.0, 32) as u32; let chunk_z = modulus(chunk.1, 32) as u32; - let channel = channel.clone(); let table_entry = (chunk_x + chunk_z * 32) * 4; + let table_entry = table_entry as usize; let mut offset = vec![0u8]; - offset.extend_from_slice( - &location_table[table_entry as usize..table_entry as usize + 3], - ); + offset.extend_from_slice(&location_table[table_entry..table_entry + 3]); let offset = u32::from_be_bytes(offset.try_into().unwrap()) as u64 * 4096; - let size = location_table[table_entry as usize + 3] as usize * 4096; + let size = location_table[table_entry + 3] as usize * 4096; if offset == 0 && size == 0 { let _ = @@ -169,21 +167,22 @@ impl Level { return; } // Read the file using the offset and size - let mut file_buf = { - let seek_result = region_file.seek(std::io::SeekFrom::Start(offset)); - if seek_result.is_err() { + let mut file_buf = match region_file.seek(std::io::SeekFrom::Start(offset)) { + Ok(_) => vec![0; size], + Err(_) => { let _ = channel .blocking_send(((chunk.0, chunk.1), Err(WorldError::RegionIsInvalid))); return; } - let mut out = vec![0; size]; - let read_result = region_file.read_exact(&mut out); - if read_result.is_err() { + }; + + match region_file.read_exact(&mut file_buf) { + Ok(_) => (), + Err(_) => { let _ = channel .blocking_send(((chunk.0, chunk.1), Err(WorldError::RegionIsInvalid))); return; } - out }; // TODO: check checksum to make sure chunk is not corrupted diff --git a/pumpkin/Cargo.toml b/pumpkin/Cargo.toml index 3085385e..e6af3e74 100644 --- a/pumpkin/Cargo.toml +++ b/pumpkin/Cargo.toml @@ -58,7 +58,6 @@ simple_logger = "5.0.0" log = "0.4" # networking -mio = { version = "1.0.1", features = ["os-poll", "net"]} crossbeam-channel = "0.5.13" uuid = { version = "1.10", features = ["serde", "v3"]} diff --git a/pumpkin/src/client/client_packet.rs b/pumpkin/src/client/client_packet.rs index ded91bdd..db5f2a6d 100644 --- a/pumpkin/src/client/client_packet.rs +++ b/pumpkin/src/client/client_packet.rs @@ -34,27 +34,33 @@ use super::{ /// Processes incoming Packets from the Client to the Server /// Implements the `Client` Packets impl Client { - pub fn handle_handshake(&mut self, _server: &mut Server, handshake: SHandShake) { + pub async fn handle_handshake(&mut self, _server: &mut Server, handshake: SHandShake) { dbg!("handshake"); self.protocol_version = handshake.protocol_version.0; self.connection_state = handshake.next_state; if self.connection_state == ConnectionState::Login { if self.protocol_version < CURRENT_MC_PROTOCOL as i32 { let protocol = self.protocol_version; - self.kick(&format!("Client outdated ({protocol}), Server uses Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL}")); + self.kick(&format!("Client outdated ({protocol}), Server uses Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL}")).await; } else if self.protocol_version > CURRENT_MC_PROTOCOL as i32 { - self.kick(&format!("Server outdated, Server uses Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL}")); + self.kick(&format!("Server outdated, Server uses Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL}")).await; } } } - pub fn handle_status_request(&mut self, server: &mut Server, _status_request: SStatusRequest) { - self.send_packet(&CStatusResponse::new(&server.status_response_json)); + pub async fn handle_status_request( + &mut self, + server: &mut Server, + _status_request: SStatusRequest, + ) { + self.send_packet(&CStatusResponse::new(&server.status_response_json)) + .await; } - pub fn handle_ping_request(&mut self, _server: &mut Server, ping_request: SPingRequest) { + pub async fn handle_ping_request(&mut self, _server: &mut Server, ping_request: SPingRequest) { dbg!("ping"); - self.send_packet(&CPingResponse::new(ping_request.payload)); + self.send_packet(&CPingResponse::new(ping_request.payload)) + .await; self.close(); } @@ -62,11 +68,11 @@ impl Client { name.len() <= 16 && name.chars().all(|c| c > 32 as char && c < 127 as char) } - pub fn handle_login_start(&mut self, server: &mut Server, login_start: SLoginStart) { + pub async fn handle_login_start(&mut self, server: &mut Server, login_start: SLoginStart) { dbg!("login start"); if !Self::is_valid_player_name(&login_start.name) { - self.kick("Invalid characters in username"); + self.kick("Invalid characters in username").await; return; } // default game profile, when no online mode @@ -80,7 +86,7 @@ impl Client { let proxy = &server.advanced_config.proxy; if proxy.enabled { if proxy.velocity.enabled { - velocity_login(self) + velocity_login(self).await } return; } @@ -94,7 +100,7 @@ impl Client { &verify_token, server.base_config.online_mode, // TODO ); - self.send_packet(&packet); + self.send_packet(&packet).await; } pub async fn handle_encryption_response( @@ -107,8 +113,12 @@ impl Client { .decrypt(Pkcs1v15Encrypt, &encryption_response.shared_secret) .map_err(|_| EncryptionError::FailedDecrypt) .unwrap(); - self.enable_encryption(&shared_secret) - .unwrap_or_else(|e| self.kick(&e.to_string())); + match self.enable_encryption(&shared_secret) { + Ok(d) => d, + Err(e) => { + self.kick(&e.to_string()).await; + } + } if server.base_config.online_mode { let hash = Sha1::new() @@ -135,7 +145,7 @@ impl Client { .allow_banned_players { if !p.is_empty() { - self.kick("Your account can't join"); + self.kick("Your account can't join").await; } } else { for allowed in server @@ -146,14 +156,14 @@ impl Client { .clone() { if !p.contains(&allowed) { - self.kick("Your account can't join"); + self.kick("Your account can't join").await; } } } } self.gameprofile = Some(p); } - Err(e) => self.kick(&e.to_string()), + Err(e) => self.kick(&e.to_string()).await, } } for ele in self.gameprofile.as_ref().unwrap().properties.clone() { @@ -168,32 +178,33 @@ impl Client { .packet_compression .compression_threshold; let level = server.advanced_config.packet_compression.compression_level; - self.send_packet(&CSetCompression::new(threshold.into())); + self.send_packet(&CSetCompression::new(threshold.into())) + .await; self.set_compression(Some((threshold, level))); } if let Some(profile) = self.gameprofile.as_ref().cloned() { let packet = CLoginSuccess::new(profile.id, &profile.name, &profile.properties, false); - self.send_packet(&packet); + self.send_packet(&packet).await; } else { - self.kick("game profile is none"); + self.kick("game profile is none").await; } } - pub fn handle_plugin_response( + pub async fn handle_plugin_response( &mut self, _server: &mut Server, _plugin_response: SLoginPluginResponse, ) { } - pub fn handle_login_acknowledged( + pub async fn handle_login_acknowledged( &mut self, server: &mut Server, _login_acknowledged: SLoginAcknowledged, ) { self.connection_state = ConnectionState::Config; - server.send_brand(self); + server.send_brand(self).await; let resource_config = &server.advanced_config.resource_pack; if resource_config.enabled { @@ -208,7 +219,8 @@ impl Client { resource_config.resource_pack_sha1.clone(), resource_config.force, prompt_message, - )); + )) + .await; } // known data packs @@ -216,10 +228,11 @@ impl Client { namespace: "minecraft", id: "core", version: "1.21", - }])); + }])) + .await; dbg!("login achnowlaged"); } - pub fn handle_client_information_config( + pub async fn handle_client_information_config( &mut self, _server: &mut Server, client_information: SClientInformationConfig, @@ -237,29 +250,38 @@ impl Client { }); } - pub fn handle_plugin_message(&mut self, _server: &mut Server, plugin_message: SPluginMessage) { + pub async fn handle_plugin_message( + &mut self, + _server: &mut Server, + plugin_message: SPluginMessage, + ) { if plugin_message.channel.starts_with("minecraft:brand") || plugin_message.channel.starts_with("MC|Brand") { dbg!("got a client brand"); match String::from_utf8(plugin_message.data) { Ok(brand) => self.brand = Some(brand), - Err(e) => self.kick(&e.to_string()), + Err(e) => self.kick(&e.to_string()).await, } } } - pub fn handle_known_packs(&mut self, server: &mut Server, _config_acknowledged: SKnownPacks) { + pub async fn handle_known_packs( + &mut self, + server: &mut Server, + _config_acknowledged: SKnownPacks, + ) { for registry in &server.cached_registry { self.send_packet(&CRegistryData::new( ®istry.registry_id, ®istry.registry_entries, - )); + )) + .await; } // We are done with configuring dbg!("finish config"); - self.send_packet(&CFinishConfig::new()); + self.send_packet(&CFinishConfig::new()).await; } pub async fn handle_config_acknowledged( diff --git a/pumpkin/src/client/mod.rs b/pumpkin/src/client/mod.rs index a118bea1..01314a9c 100644 --- a/pumpkin/src/client/mod.rs +++ b/pumpkin/src/client/mod.rs @@ -2,7 +2,8 @@ use std::{ collections::VecDeque, io::{self, Write}, net::SocketAddr, - rc::Rc, + sync::Arc, + time::Duration, }; use crate::{ @@ -11,7 +12,7 @@ use crate::{ }; use authentication::GameProfile; -use mio::{event::Event, net::TcpStream, Token}; +use bytes::BytesMut; use num_traits::ToPrimitive; use pumpkin_protocol::{ bytebuf::packet_id::Packet, @@ -36,6 +37,12 @@ use pumpkin_protocol::{ ClientPacket, ConnectionState, PacketError, RawPacket, ServerPacket, }; use pumpkin_text::TextComponent; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpStream, + select, + sync::RwLock, +}; use std::io::Read; use thiserror::Error; @@ -67,7 +74,7 @@ pub struct Client { pub connection_state: ConnectionState, pub encrytion: bool, pub closed: bool, - pub token: Rc, + pub token: u32, pub connection: TcpStream, pub address: SocketAddr, enc: PacketEncoder, @@ -76,7 +83,7 @@ pub struct Client { } impl Client { - pub fn new(token: Rc, connection: TcpStream, address: SocketAddr) -> Self { + pub fn new(token: u32, connection: TcpStream, address: SocketAddr) -> Self { Self { protocol_version: 0, gameprofile: None, @@ -125,25 +132,28 @@ impl Client { } /// Send a Clientbound Packet to the Client - pub fn send_packet(&mut self, packet: &P) { - self.enc - .append_packet(packet) - .unwrap_or_else(|e| self.kick(&e.to_string())); - self.connection - .write_all(&self.enc.take()) - .map_err(|_| PacketError::ConnectionWrite) - .unwrap_or_else(|e| self.kick(&e.to_string())); + pub async fn send_packet(&mut self, packet: &P) { + match self.try_send_packet(packet).await { + Ok(_) => {} + Err(e) => { + self.kick(&e.to_string()).await; + } + }; } - pub fn try_send_packet(&mut self, packet: &P) -> Result<(), PacketError> { + pub async fn try_send_packet( + &mut self, + packet: &P, + ) -> Result<(), PacketError> { self.enc.append_packet(packet)?; self.connection .write_all(&self.enc.take()) + .await .map_err(|_| PacketError::ConnectionWrite)?; Ok(()) } - pub fn teleport(&mut self, x: f64, y: f64, z: f64, yaw: f32, pitch: f32) { + pub async fn teleport(&mut self, x: f64, y: f64, z: f64, yaw: f32, pitch: f32) { assert!(self.is_player()); // TODO let id = 0; @@ -158,7 +168,8 @@ impl Client { entity.yaw = yaw; entity.pitch = pitch; player.awaiting_teleport = Some(id.into()); - self.send_packet(&CSyncPlayerPostion::new(x, y, z, yaw, pitch, 0, id.into())); + self.send_packet(&CSyncPlayerPostion::new(x, y, z, yaw, pitch, 0, id.into())) + .await; } pub fn update_health(&mut self, health: f32, food: i32, food_saturation: f32) { @@ -191,6 +202,7 @@ impl Client { pumpkin_protocol::ConnectionState::HandShake => match packet.id.0 { SHandShake::PACKET_ID => { self.handle_handshake(server, SHandShake::read(bytebuf).unwrap()) + .await } _ => log::error!( "Failed to handle packet id {} while in Handshake state", @@ -200,9 +212,11 @@ impl Client { pumpkin_protocol::ConnectionState::Status => match packet.id.0 { SStatusRequest::PACKET_ID => { self.handle_status_request(server, SStatusRequest::read(bytebuf).unwrap()) + .await } SPingRequest::PACKET_ID => { self.handle_ping_request(server, SPingRequest::read(bytebuf).unwrap()) + .await } _ => log::error!( "Failed to handle packet id {} while in Status state", @@ -212,6 +226,7 @@ impl Client { pumpkin_protocol::ConnectionState::Login => match packet.id.0 { SLoginStart::PACKET_ID => { self.handle_login_start(server, SLoginStart::read(bytebuf).unwrap()) + .await } SEncryptionResponse::PACKET_ID => { self.handle_encryption_response( @@ -220,22 +235,36 @@ impl Client { ) .await } - SLoginPluginResponse::PACKET_ID => self - .handle_plugin_response(server, SLoginPluginResponse::read(bytebuf).unwrap()), - SLoginAcknowledged::PACKET_ID => self - .handle_login_acknowledged(server, SLoginAcknowledged::read(bytebuf).unwrap()), + SLoginPluginResponse::PACKET_ID => { + self.handle_plugin_response( + server, + SLoginPluginResponse::read(bytebuf).unwrap(), + ) + .await + } + SLoginAcknowledged::PACKET_ID => { + self.handle_login_acknowledged( + server, + SLoginAcknowledged::read(bytebuf).unwrap(), + ) + .await + } _ => log::error!( "Failed to handle packet id {} while in Login state", packet.id.0 ), }, pumpkin_protocol::ConnectionState::Config => match packet.id.0 { - SClientInformationConfig::PACKET_ID => self.handle_client_information_config( - server, - SClientInformationConfig::read(bytebuf).unwrap(), - ), + SClientInformationConfig::PACKET_ID => { + self.handle_client_information_config( + server, + SClientInformationConfig::read(bytebuf).unwrap(), + ) + .await + } SPluginMessage::PACKET_ID => { self.handle_plugin_message(server, SPluginMessage::read(bytebuf).unwrap()) + .await } SAcknowledgeFinishConfig::PACKET_ID => { self.handle_config_acknowledged( @@ -246,6 +275,7 @@ impl Client { } SKnownPacks::PACKET_ID => { self.handle_known_packs(server, SKnownPacks::read(bytebuf).unwrap()) + .await } _ => log::error!( "Failed to handle packet id {} while in Config state", @@ -254,58 +284,80 @@ impl Client { }, pumpkin_protocol::ConnectionState::Play => { if self.player.is_some() { - self.handle_play_packet(server, packet); + self.handle_play_packet(server, packet).await; } else { // should be impossible - self.kick("no player in play state?") + self.kick("no player in play state?").await } } _ => log::error!("Invalid Connection state {:?}", self.connection_state), } } - pub fn handle_play_packet(&mut self, server: &mut Server, packet: &mut RawPacket) { + pub async fn handle_play_packet(&mut self, server: &mut Server, packet: &mut RawPacket) { let bytebuf = &mut packet.bytebuf; match packet.id.0 { SConfirmTeleport::PACKET_ID => { self.handle_confirm_teleport(server, SConfirmTeleport::read(bytebuf).unwrap()) + .await } SChatCommand::PACKET_ID => { self.handle_chat_command(server, SChatCommand::read(bytebuf).unwrap()) + .await } SPlayerPosition::PACKET_ID => { self.handle_position(server, SPlayerPosition::read(bytebuf).unwrap()) + .await + } + SPlayerPositionRotation::PACKET_ID => { + self.handle_position_rotation( + server, + SPlayerPositionRotation::read(bytebuf).unwrap(), + ) + .await } - SPlayerPositionRotation::PACKET_ID => self - .handle_position_rotation(server, SPlayerPositionRotation::read(bytebuf).unwrap()), SPlayerRotation::PACKET_ID => { self.handle_rotation(server, SPlayerRotation::read(bytebuf).unwrap()) + .await } SPlayerCommand::PACKET_ID => { self.handle_player_command(server, SPlayerCommand::read(bytebuf).unwrap()) + .await } SSwingArm::PACKET_ID => { self.handle_swing_arm(server, SSwingArm::read(bytebuf).unwrap()) + .await } SChatMessage::PACKET_ID => { self.handle_chat_message(server, SChatMessage::read(bytebuf).unwrap()) + .await + } + SClientInformationPlay::PACKET_ID => { + self.handle_client_information_play( + server, + SClientInformationPlay::read(bytebuf).unwrap(), + ) + .await + } + SInteract::PACKET_ID => { + self.handle_interact(server, SInteract::read(bytebuf).unwrap()) + .await } - SClientInformationPlay::PACKET_ID => self.handle_client_information_play( - server, - SClientInformationPlay::read(bytebuf).unwrap(), - ), - SInteract::PACKET_ID => self.handle_interact(server, SInteract::read(bytebuf).unwrap()), SPlayerAction::PACKET_ID => { self.handle_player_action(server, SPlayerAction::read(bytebuf).unwrap()) + .await } SUseItemOn::PACKET_ID => { self.handle_use_item_on(server, SUseItemOn::read(bytebuf).unwrap()) + .await } SSetHeldItem::PACKET_ID => { self.handle_set_held_item(server, SSetHeldItem::read(bytebuf).unwrap()) + .await } SSetCreativeSlot::PACKET_ID => { self.handle_set_creative_slot(server, SSetCreativeSlot::read(bytebuf).unwrap()) + .await } _ => log::error!("Failed to handle player packet id {}", packet.id.0), } @@ -313,70 +365,79 @@ impl Client { // Reads the connection until our buffer of len 4096 is full, then decode /// Close connection when an error occurs - pub async fn poll(&mut self, server: &mut Server, event: &Event) { - if event.is_readable() { - let mut received_data = vec![0; 4096]; - let mut bytes_read = 0; - // We can (maybe) read from the connection. - loop { - match self.connection.read(&mut received_data[bytes_read..]) { - Ok(0) => { - // Reading 0 bytes means the other side has closed the - // connection or is done writing, then so are we. - self.close(); - break; - } - Ok(n) => { - bytes_read += n; - received_data.extend(&vec![0; n]); + pub async fn poll(&mut self, server: Arc>) { + dbg!("b"); + + let mut buf = BytesMut::new(); + loop { + select! { + result = self.connection.read_buf(&mut buf) => { + match result { + Ok(0) => { + self.close(); + break; + } + Ok(_) => { + self.dec.queue_bytes(buf.split()); + } + Err(e) => { + log::error!("{}", e); + self.kick(&e.to_string()).await; + break; + } } - // Would block "errors" are the OS's way of saying that the - // connection is not actually ready to perform this I/O operation. - Err(ref err) if would_block(err) => break, - Err(ref err) if interrupted(err) => continue, - // Other errors we'll consider fatal. - Err(_) => self.close(), + }, + _ = tokio::time::sleep(Duration::from_millis(100)) => { + // Handle timeout (optional) } } - if bytes_read != 0 { - self.dec.reserve(4096); - self.dec.queue_slice(&received_data[..bytes_read]); - match self.dec.decode() { - Ok(packet) => { - if let Some(packet) = packet { - self.add_packet(packet); - self.process_packets(server).await; - } + match self.dec.decode() { + Ok(packet) => { + if let Some(packet) = packet { + self.add_packet(packet); + let mut server = server.write().await; + self.process_packets(&mut server).await; } - Err(err) => self.kick(&err.to_string()), } - self.dec.clear(); + Err(err) => self.kick(&err.to_string()).await, } } } - pub fn send_system_message(&mut self, text: TextComponent) { - self.send_packet(&CSystemChatMessge::new(text, false)); + pub async fn send_system_message(&mut self, text: TextComponent) { + self.send_packet(&CSystemChatMessge::new(text, false)).await; } /// Kicks the Client with a reason depending on the connection state - pub fn kick(&mut self, reason: &str) { + pub async fn kick(&mut self, reason: &str) { dbg!(reason); match self.connection_state { ConnectionState::Login => { - self.try_send_packet(&CLoginDisconnect::new( - &serde_json::to_string_pretty(&reason).unwrap(), - )) - .unwrap_or_else(|_| self.close()); + match self + .try_send_packet(&CLoginDisconnect::new( + &serde_json::to_string_pretty(&reason).unwrap(), + )) + .await + { + Ok(_) => {} + Err(_) => self.close(), + } } ConnectionState::Config => { - self.try_send_packet(&CConfigDisconnect::new(reason)) - .unwrap_or_else(|_| self.close()); + match self.try_send_packet(&CConfigDisconnect::new(reason)).await { + Ok(_) => {} + Err(_) => self.close(), + } } ConnectionState::Play => { - self.try_send_packet(&CPlayDisconnect::new(TextComponent::from(reason))) - .unwrap_or_else(|_| self.close()); + match self + .try_send_packet(&CPlayDisconnect::new(TextComponent::from(reason))) + .await + { + Ok(_) => {} + Err(_) => self.close(), + } } _ => { log::warn!("Cant't kick in {:?} State", self.connection_state) diff --git a/pumpkin/src/client/player_packet.rs b/pumpkin/src/client/player_packet.rs index f9786951..f7d3a978 100644 --- a/pumpkin/src/client/player_packet.rs +++ b/pumpkin/src/client/player_packet.rs @@ -30,7 +30,7 @@ fn modulus(a: f32, b: f32) -> f32 { /// Handles all Play Packets send by a real Player impl Client { - pub fn handle_confirm_teleport( + pub async fn handle_confirm_teleport( &mut self, _server: &mut Server, confirm_teleport: SConfirmTeleport, @@ -44,6 +44,7 @@ impl Client { player.awaiting_teleport = None; } else { self.kick("Send Teleport confirm, but we did not teleport") + .await; } } @@ -55,9 +56,9 @@ impl Client { pos.clamp(-2.0E7, 2.0E7) } - pub fn handle_position(&mut self, server: &mut Server, position: SPlayerPosition) { + pub async fn handle_position(&mut self, server: &mut Server, position: SPlayerPosition) { if position.x.is_nan() || position.feet_y.is_nan() || position.z.is_nan() { - self.kick("Invalid movement"); + self.kick("Invalid movement").await; return; } let player = self.player.as_mut().unwrap(); @@ -77,19 +78,21 @@ impl Client { let (y, lasty) = (entity.y, entity.lasty); let (z, lastz) = (entity.z, entity.lastz); - server.broadcast_packet( - self, - &CUpdateEntityPos::new( - entity_id.into(), - (x * 4096.0 - lastx * 4096.0) as i16, - (y * 4096.0 - lasty * 4096.0) as i16, - (z * 4096.0 - lastz * 4096.0) as i16, - on_ground, - ), - ); + server + .broadcast_packet( + self, + &CUpdateEntityPos::new( + entity_id.into(), + (x * 4096.0 - lastx * 4096.0) as i16, + (y * 4096.0 - lasty * 4096.0) as i16, + (z * 4096.0 - lastz * 4096.0) as i16, + on_ground, + ), + ) + .await; } - pub fn handle_position_rotation( + pub async fn handle_position_rotation( &mut self, server: &mut Server, position_rotation: SPlayerPositionRotation, @@ -98,11 +101,11 @@ impl Client { || position_rotation.feet_y.is_nan() || position_rotation.z.is_nan() { - self.kick("Invalid movement"); + self.kick("Invalid movement").await; return; } if !position_rotation.yaw.is_finite() || !position_rotation.pitch.is_finite() { - self.kick("Invalid rotation"); + self.kick("Invalid rotation").await; return; } let player = self.player.as_mut().unwrap(); @@ -127,24 +130,28 @@ impl Client { let pitch = modulus(entity.pitch * 256.0 / 360.0, 256.0); // let head_yaw = (entity.head_yaw * 256.0 / 360.0).floor(); - server.broadcast_packet( - self, - &CUpdateEntityPosRot::new( - entity_id.into(), - (x * 4096.0 - lastx * 4096.0) as i16, - (y * 4096.0 - lasty * 4096.0) as i16, - (z * 4096.0 - lastz * 4096.0) as i16, - yaw as u8, - pitch as u8, - on_ground, - ), - ); - server.broadcast_packet(self, &CHeadRot::new(entity_id.into(), yaw as u8)); + server + .broadcast_packet( + self, + &CUpdateEntityPosRot::new( + entity_id.into(), + (x * 4096.0 - lastx * 4096.0) as i16, + (y * 4096.0 - lasty * 4096.0) as i16, + (z * 4096.0 - lastz * 4096.0) as i16, + yaw as u8, + pitch as u8, + on_ground, + ), + ) + .await; + server + .broadcast_packet(self, &CHeadRot::new(entity_id.into(), yaw as u8)) + .await; } - pub fn handle_rotation(&mut self, server: &mut Server, rotation: SPlayerRotation) { + pub async fn handle_rotation(&mut self, server: &mut Server, rotation: SPlayerRotation) { if !rotation.yaw.is_finite() || !rotation.pitch.is_finite() { - self.kick("Invalid rotation"); + self.kick("Invalid rotation").await; return; } let player = self.player.as_mut().unwrap(); @@ -158,18 +165,22 @@ impl Client { let pitch = modulus(entity.pitch * 256.0 / 360.0, 256.0); // let head_yaw = modulus(entity.head_yaw * 256.0 / 360.0, 256.0); - server.broadcast_packet( - self, - &CUpdateEntityRot::new(entity_id.into(), yaw as u8, pitch as u8, on_ground), - ); - server.broadcast_packet(self, &CHeadRot::new(entity_id.into(), yaw as u8)); + server + .broadcast_packet( + self, + &CUpdateEntityRot::new(entity_id.into(), yaw as u8, pitch as u8, on_ground), + ) + .await; + server + .broadcast_packet(self, &CHeadRot::new(entity_id.into(), yaw as u8)) + .await; } - pub fn handle_chat_command(&mut self, _server: &mut Server, command: SChatCommand) { + pub async fn handle_chat_command(&mut self, _server: &mut Server, command: SChatCommand) { handle_command(&mut CommandSender::Player(self), &command.command); } - pub fn handle_player_command(&mut self, _server: &mut Server, command: SPlayerCommand) { + pub async fn handle_player_command(&mut self, _server: &mut Server, command: SPlayerCommand) { let player = self.player.as_mut().unwrap(); if command.entitiy_id != player.entity.entity_id.into() { @@ -189,36 +200,40 @@ impl Client { pumpkin_protocol::server::play::Action::StartFlyingElytra => {} // TODO } } else { - self.kick("Invalid player command") + self.kick("Invalid player command").await; } } - pub fn handle_swing_arm(&mut self, server: &mut Server, swing_arm: SSwingArm) { + pub async fn handle_swing_arm(&mut self, server: &mut Server, swing_arm: SSwingArm) { let animation = match Hand::from_i32(swing_arm.hand.0).unwrap() { Hand::Main => Animation::SwingMainArm, Hand::Off => Animation::SwingOffhand, }; let player = self.player.as_mut().unwrap(); let id = player.entity_id(); - server.broadcast_packet_expect( - &[&self.token], - &CEntityAnimation::new(id.into(), animation as u8), - ) + server + .broadcast_packet_expect( + &[self.token], + &CEntityAnimation::new(id.into(), animation as u8), + ) + .await } - pub fn handle_chat_message(&mut self, server: &mut Server, chat_message: SChatMessage) { + pub async fn handle_chat_message(&mut self, server: &mut Server, chat_message: SChatMessage) { let message = chat_message.message; // TODO: filter message & validation let gameprofile = self.gameprofile.as_ref().unwrap(); dbg!("got message"); // yeah a "raw system message", the ugly way to do that, but it works - server.broadcast_packet( - self, - &CSystemChatMessge::new( - TextComponent::from(format!("{}: {}", gameprofile.name, message)), - false, - ), - ); + server + .broadcast_packet( + self, + &CSystemChatMessge::new( + TextComponent::from(format!("{}: {}", gameprofile.name, message)), + false, + ), + ) + .await; /* server.broadcast_packet( self, @@ -250,7 +265,7 @@ impl Client { ) */ } - pub fn handle_client_information_play( + pub async fn handle_client_information_play( &mut self, _server: &mut Server, client_information: SClientInformationPlay, @@ -267,7 +282,7 @@ impl Client { }); } - pub fn handle_interact(&mut self, server: &mut Server, interact: SInteract) { + pub async fn handle_interact(&mut self, server: &mut Server, interact: SInteract) { let attacker_player = self.player.as_ref().unwrap(); let entity_id = interact.entity_id; // TODO: do validation and stuff @@ -275,7 +290,7 @@ impl Client { if config.enabled { let attacked_client = server.get_by_entityid(self, entity_id.0 as EntityId); if let Some(mut client) = attacked_client { - let token = client.token.clone(); + let token = client.token; let player = client.player.as_mut().unwrap(); let velo = player.velocity; if config.protect_creative && player.gamemode == GameMode::Creative { @@ -296,44 +311,57 @@ impl Client { player.velocity.z as f32, ); player.velocity = velo; - client.send_packet(packet); + client.send_packet(packet).await; // attacker_player.velocity = attacker_player.velocity.multiply(0.6, 1.0, 0.6); } if config.hurt_animation { // TODO // thats how we prevent borrow errors :c let packet = &CHurtAnimation::new(&entity_id, 10.0); - self.send_packet(packet); - client.send_packet(packet); - server.broadcast_packet_expect( - &[self.token.as_ref(), token.as_ref()], - &CHurtAnimation::new(&entity_id, 10.0), - ) + self.send_packet(packet).await; + client.send_packet(packet).await; + server + .broadcast_packet_expect( + &[self.token, token], + &CHurtAnimation::new(&entity_id, 10.0), + ) + .await } } else { - self.kick("Interacted with invalid entitiy id") + self.kick("Interacted with invalid entitiy id").await; } } } - pub fn handle_player_action(&mut self, _server: &mut Server, player_action: SPlayerAction) {} + pub async fn handle_player_action( + &mut self, + _server: &mut Server, + player_action: SPlayerAction, + ) { + } - pub fn handle_use_item_on(&mut self, server: &mut Server, use_item_on: SUseItemOn) { + pub async fn handle_use_item_on(&mut self, server: &mut Server, use_item_on: SUseItemOn) { let location = use_item_on.location; let face = BlockFace::from_i32(use_item_on.face.0).unwrap(); let location = WorldPosition(location.0 + face.to_offset()); - server.broadcast_packet(self, &CBlockUpdate::new(location, 11.into())); + server + .broadcast_packet(self, &CBlockUpdate::new(location, 11.into())) + .await; } - pub fn handle_set_held_item(&mut self, _server: &mut Server, held: SSetHeldItem) { + pub async fn handle_set_held_item(&mut self, _server: &mut Server, held: SSetHeldItem) { let slot = held.slot; if !(0..=8).contains(&slot) { - self.kick("Invalid held slot") + self.kick("Invalid held slot").await; } let player = self.player.as_mut().unwrap(); player.inventory.set_selected(slot); } - pub fn handle_set_creative_slot(&mut self, _server: &mut Server, packet: SSetCreativeSlot) { + pub async fn handle_set_creative_slot( + &mut self, + _server: &mut Server, + packet: SSetCreativeSlot, + ) { // TODO: handle this dbg!(&packet); } diff --git a/pumpkin/src/commands/gamemode.rs b/pumpkin/src/commands/gamemode.rs index fa00cbeb..a4202a66 100644 --- a/pumpkin/src/commands/gamemode.rs +++ b/pumpkin/src/commands/gamemode.rs @@ -12,15 +12,17 @@ impl<'a> Command<'a> for GamemodeCommand { const DESCRIPTION: &'a str = "Changes the gamemode for a Player"; - fn on_execute(sender: &mut super::CommandSender<'a>, command: String) { + async fn on_execute(sender: &mut super::CommandSender<'a>, command: String) { let player = sender.as_mut_player().unwrap(); let args: Vec<&str> = command.split_whitespace().collect(); if args.len() != 2 { - player.send_system_message( - TextComponent::from("Usage: /gamemode ") - .color_named(pumpkin_text::color::NamedColor::Red), - ); + player + .send_system_message( + TextComponent::from("Usage: /gamemode ") + .color_named(pumpkin_text::color::NamedColor::Red), + ) + .await; return; } @@ -28,7 +30,9 @@ impl<'a> Command<'a> for GamemodeCommand { match mode_str.parse() { Ok(mode) => { player.set_gamemode(mode); - player.send_system_message(format!("Set own game mode to {:?}", mode).into()); + player + .send_system_message(format!("Set own game mode to {:?}", mode).into()) + .await; } Err(_) => { // try to parse from number @@ -36,15 +40,18 @@ impl<'a> Command<'a> for GamemodeCommand { if let Some(mode) = GameMode::from_u8(i) { player.set_gamemode(mode); player - .send_system_message(format!("Set own game mode to {:?}", mode).into()); + .send_system_message(format!("Set own game mode to {:?}", mode).into()) + .await; return; } } - player.send_system_message( - TextComponent::from("Invalid gamemode") - .color_named(pumpkin_text::color::NamedColor::Red), - ); + player + .send_system_message( + TextComponent::from("Invalid gamemode") + .color_named(pumpkin_text::color::NamedColor::Red), + ) + .await; } } } diff --git a/pumpkin/src/commands/mod.rs b/pumpkin/src/commands/mod.rs index dc687430..132f7cc6 100644 --- a/pumpkin/src/commands/mod.rs +++ b/pumpkin/src/commands/mod.rs @@ -15,7 +15,7 @@ pub trait Command<'a> { const NAME: &'a str; const DESCRIPTION: &'a str; - fn on_execute(sender: &mut CommandSender<'a>, command: String); + async fn on_execute(sender: &mut CommandSender<'a>, command: String); /// Specifies wether the Command Sender has to be a Player /// TODO: implement @@ -31,11 +31,11 @@ pub enum CommandSender<'a> { } impl<'a> CommandSender<'a> { - pub fn send_message(&mut self, text: TextComponent) { + pub async fn send_message(&mut self, text: TextComponent) { match self { // TODO: add color and stuff to console CommandSender::Console => log::info!("{:?}", text.content), - CommandSender::Player(c) => c.send_system_message(text), + CommandSender::Player(c) => c.send_system_message(text).await, CommandSender::Rcon(s) => s.push(format!("{:?}", text.content)), } } @@ -63,21 +63,21 @@ impl<'a> CommandSender<'a> { } } } -pub fn handle_command(sender: &mut CommandSender, command: &str) { +pub async fn handle_command<'a>(sender: &mut CommandSender<'a>, command: &str) { let command = command.to_lowercase(); // an ugly mess i know if command.starts_with(PumpkinCommand::NAME) { - PumpkinCommand::on_execute(sender, command); + PumpkinCommand::on_execute(sender, command).await; return; } if command.starts_with(GamemodeCommand::NAME) { - GamemodeCommand::on_execute(sender, command); + GamemodeCommand::on_execute(sender, command).await; return; } if command.starts_with(StopCommand::NAME) { - StopCommand::on_execute(sender, command); + StopCommand::on_execute(sender, command).await; return; } // TODO: red color - sender.send_message("Command not Found".into()); + sender.send_message("Command not Found".into()).await; } diff --git a/pumpkin/src/commands/pumpkin.rs b/pumpkin/src/commands/pumpkin.rs index 8a648dd5..d9d18dc0 100644 --- a/pumpkin/src/commands/pumpkin.rs +++ b/pumpkin/src/commands/pumpkin.rs @@ -12,9 +12,9 @@ impl<'a> Command<'a> for PumpkinCommand { const DESCRIPTION: &'a str = "Displays information about Pumpkin"; - fn on_execute(sender: &mut super::CommandSender<'a>, _command: String) { + async fn on_execute(sender: &mut super::CommandSender<'a>, _command: String) { let version = env!("CARGO_PKG_VERSION"); let description = env!("CARGO_PKG_DESCRIPTION"); - sender.send_message(TextComponent::from(format!("Pumpkin {version}, {description} (Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL})")).color_named(NamedColor::Green)) + sender.send_message(TextComponent::from(format!("Pumpkin {version}, {description} (Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL})")).color_named(NamedColor::Green)).await } } diff --git a/pumpkin/src/commands/stop.rs b/pumpkin/src/commands/stop.rs index 8a2fcfc4..cefb5aef 100644 --- a/pumpkin/src/commands/stop.rs +++ b/pumpkin/src/commands/stop.rs @@ -6,7 +6,7 @@ impl<'a> Command<'a> for StopCommand { const NAME: &'static str = "stop"; const DESCRIPTION: &'static str = "Stops the server"; - fn on_execute(sender: &mut super::CommandSender<'a>, command: String) { + async fn on_execute(sender: &mut super::CommandSender<'a>, command: String) { std::process::exit(0); } fn player_required() -> bool { diff --git a/pumpkin/src/main.rs b/pumpkin/src/main.rs index 308140a0..7a86f4b4 100644 --- a/pumpkin/src/main.rs +++ b/pumpkin/src/main.rs @@ -1,14 +1,18 @@ -use mio::net::TcpListener; -use mio::{Events, Interest, Poll, Token}; -use std::io::{self}; +use std::{ + io::{self}, + net::SocketAddr, + sync::Arc, +}; use client::Client; use commands::handle_command; use config::AdvancedConfiguration; +use tokio::{ + net::TcpListener, + sync::{Mutex, RwLock}, +}; -use std::{collections::HashMap, rc::Rc, thread}; -use client::interrupted; use config::BasicConfiguration; use server::Server; @@ -19,7 +23,7 @@ pub mod commands; pub mod config; pub mod entity; pub mod proxy; -pub mod rcon; +// pub mod rcon; pub mod server; pub mod util; @@ -27,161 +31,91 @@ pub mod util; #[global_allocator] static ALLOC: dhat::Alloc = dhat::Alloc; -#[cfg(not(target_os = "wasi"))] -fn main() -> io::Result<()> { +#[tokio::main] +async fn main() -> io::Result<()> { #[cfg(feature = "dhat-heap")] let _profiler = dhat::Profiler::new_heap(); #[cfg(feature = "dhat-heap")] println!("Using a memory profiler"); - let rt = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); // ensure rayon is built outside of tokio scope rayon::ThreadPoolBuilder::new().build_global().unwrap(); - rt.block_on(async { - const SERVER: Token = Token(0); - use std::{cell::RefCell, time::Instant}; - - use rcon::RCONServer; - - let time = Instant::now(); - let basic_config = BasicConfiguration::load("configuration.toml"); - - let advanced_configuration = AdvancedConfiguration::load("features.toml"); - - simple_logger::SimpleLogger::new().init().unwrap(); - - // Create a poll instance. - let mut poll = Poll::new()?; - // Create storage for events. - let mut events = Events::with_capacity(128); - - // Setup the TCP server socket. - - let addr = format!( - "{}:{}", - basic_config.server_address, basic_config.server_port - ) - .parse() - .unwrap(); - - let mut listener = TcpListener::bind(addr)?; - - // Register the server with poll we can receive events for it. - poll.registry() - .register(&mut listener, SERVER, Interest::READABLE)?; - - // Unique token for each incoming connection. - let mut unique_token = Token(SERVER.0 + 1); - - let use_console = advanced_configuration.commands.use_console; - let rcon = advanced_configuration.rcon.clone(); - - let mut connections: HashMap>> = HashMap::new(); - - let mut server = Server::new((basic_config, advanced_configuration)); - log::info!("Started Server took {}ms", time.elapsed().as_millis()); - log::info!("You now can connect to the server, Listening on {}", addr); - - if use_console { - thread::spawn(move || { - let stdin = std::io::stdin(); - loop { - let mut out = String::new(); - stdin - .read_line(&mut out) - .expect("Failed to read console line"); - handle_command(&mut commands::CommandSender::Console, &out); - } - }); - } - if rcon.enabled { - tokio::spawn(async move { - RCONServer::new(&rcon).await.unwrap(); - }); - } - loop { - if let Err(err) = poll.poll(&mut events, None) { - if interrupted(&err) { - continue; - } - return Err(err); - } - - for event in events.iter() { - match event.token() { - SERVER => loop { - // Received an event for the TCP server socket, which - // indicates we can accept an connection. - let (mut connection, address) = match listener.accept() { - Ok((connection, address)) => (connection, address), - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - // If we get a `WouldBlock` error we know our - // listener has no more incoming connections queued, - // so we can return to polling and wait for some - // more. - break; - } - Err(e) => { - // If it was any other kind of error, something went - // wrong and we terminate with an error. - return Err(e); - } - }; - if let Err(e) = connection.set_nodelay(true) { - log::warn!("failed to set TCP_NODELAY {e}"); - } - - log::info!("Accepted connection from: {}", address); - - let token = next(&mut unique_token); - poll.registry().register( - &mut connection, - token, - Interest::READABLE.add(Interest::WRITABLE), - )?; - let rc_token = Rc::new(token); - let client = Rc::new(RefCell::new(Client::new( - Rc::clone(&rc_token), - connection, - addr, - ))); - server.add_client(rc_token, Rc::clone(&client)); - connections.insert(token, client); - }, - - token => { - // Maybe received an event for a TCP connection. - let done = if let Some(client) = connections.get_mut(&token) { - let mut client = client.borrow_mut(); - client.poll(&mut server, event).await; - client.closed - } else { - // Sporadic events happen, we can safely ignore them. - false - }; - if done { - if let Some(client) = connections.remove(&token) { - server.remove_client(&token); - let mut client = client.borrow_mut(); - poll.registry().deregister(&mut client.connection)?; - } - } - } - } + use std::time::Instant; + + // use rcon::RCONServer; + + let time = Instant::now(); + let basic_config = BasicConfiguration::load("configuration.toml"); + + let advanced_configuration = AdvancedConfiguration::load("features.toml"); + + simple_logger::SimpleLogger::new().init().unwrap(); + + let addr: SocketAddr = format!( + "{}:{}", + basic_config.server_address, basic_config.server_port + ) + .parse() + .unwrap(); + + let listener = TcpListener::bind(addr) + .await + .expect("Failed to start TCP Listener"); + + let use_console = advanced_configuration.commands.use_console; + let rcon = advanced_configuration.rcon.clone(); + + let server = Arc::new(RwLock::new(Server::new(( + basic_config, + advanced_configuration, + )))); + log::info!("Started Server took {}ms", time.elapsed().as_millis()); + log::info!("You now can connect to the server, Listening on {}", addr); + + if use_console { + tokio::spawn(async move { + let stdin = std::io::stdin(); + loop { + let mut out = String::new(); + stdin + .read_line(&mut out) + .expect("Failed to read console line"); + handle_command(&mut commands::CommandSender::Console, &out).await; } + }); + } + // if rcon.enabled { + // tokio::spawn(async move { + // RCONServer::new(&rcon).await.unwrap(); + // }); + // } + let mut current_clients: u32 = 0; + loop { + let (socket, addr) = listener.accept().await?; + log::info!("Accepted connection from: {}", addr); + + if let Err(e) = socket.set_nodelay(true) { + log::error!("failed to set TCP_NODELAY: {e}"); } - }) -} - -fn next(current: &mut Token) -> Token { - let next = current.0; - current.0 += 1; - Token(next) -} - -#[cfg(target_os = "wasi")] -fn main() { - panic!("can't bind to an address with wasi") + let server = server.clone(); + + current_clients += 1; + let token = current_clients; // Replace with your token generation logic + let client = Arc::new(Mutex::new(Client::new(token, socket, addr))); + dbg!("a"); + let mut server_guard = server.write().await; + server_guard.add_client(token, Arc::clone(&client)); + drop(server_guard); + + tokio::spawn(async move { + let mut client = client.lock().await; + + //client.connection.readable().await.expect(":c"); + client.poll(server.clone()).await; + if client.closed { + let mut server_guard = server.write().await; + server_guard.remove_client(&token).await; + current_clients -= 1; + } + }); + } } diff --git a/pumpkin/src/proxy/velocity.rs b/pumpkin/src/proxy/velocity.rs index 3b06ff97..b0dff538 100644 --- a/pumpkin/src/proxy/velocity.rs +++ b/pumpkin/src/proxy/velocity.rs @@ -14,16 +14,18 @@ type HmacSha256 = Hmac; const MAX_SUPPORTED_FORWARDING_VERSION: i32 = 4; const PLAYER_INFO_CHANNEL: &str = "velocity:player_info"; -pub fn velocity_login(client: &mut Client) { +pub async fn velocity_login(client: &mut Client) { let velocity_message_id: i32 = 0; let mut buf = BytesMut::new(); buf.put_u8(MAX_SUPPORTED_FORWARDING_VERSION as u8); - client.send_packet(&CLoginPluginRequest::new( - velocity_message_id.into(), - PLAYER_INFO_CHANNEL, - &buf, - )); + client + .send_packet(&CLoginPluginRequest::new( + velocity_message_id.into(), + PLAYER_INFO_CHANNEL, + &buf, + )) + .await; } pub fn check_integrity(data: (&[u8], &[u8]), secret: String) -> bool { @@ -34,7 +36,7 @@ pub fn check_integrity(data: (&[u8], &[u8]), secret: String) -> bool { mac.verify_slice(signature).is_ok() } -pub fn receive_plugin_response( +pub async fn receive_plugin_response( client: &mut Client, config: VelocityConfig, response: SLoginPluginResponse, @@ -44,7 +46,7 @@ pub fn receive_plugin_response( let (signature, data_without_signature) = data.split_at(32); if !check_integrity((signature, data_without_signature), config.secret) { - client.kick("Unable to verify player details"); + client.kick("Unable to verify player details").await; return; } let mut buf = ByteBuffer::new(BytesMut::new()); @@ -54,9 +56,11 @@ pub fn receive_plugin_response( let version = buf.get_var_int(); let version = version.0; if version > MAX_SUPPORTED_FORWARDING_VERSION { - client.kick(&format!( + client + .kick(&format!( "Unsupported forwarding version {version}, Max: {MAX_SUPPORTED_FORWARDING_VERSION}" - )); + )) + .await; return; } // TODO: no unwrap @@ -64,6 +68,8 @@ pub fn receive_plugin_response( client.address = addr; todo!() } else { - client.kick("This server requires you to connect with Velocity.") + client + .kick("This server requires you to connect with Velocity.") + .await; } } diff --git a/pumpkin/src/rcon/mod.rs b/pumpkin/src/rcon/mod.rs index c3606ca2..580fb69d 100644 --- a/pumpkin/src/rcon/mod.rs +++ b/pumpkin/src/rcon/mod.rs @@ -3,10 +3,6 @@ use std::{ io::{self, Read}, }; -use mio::{ - net::{TcpListener, TcpStream}, - Events, Interest, Poll, Token, -}; use packet::{Packet, PacketError, PacketType}; use thiserror::Error; @@ -24,8 +20,6 @@ pub enum RCONError { Io(io::Error), } -const SERVER: Token = Token(0); - pub struct RCONServer {} impl RCONServer { diff --git a/pumpkin/src/server.rs b/pumpkin/src/server.rs index 30624900..87bd85c2 100644 --- a/pumpkin/src/server.rs +++ b/pumpkin/src/server.rs @@ -1,15 +1,15 @@ use std::{ - cell::{RefCell, RefMut}, collections::HashMap, io::Cursor, - rc::Rc, - sync::atomic::{AtomicI32, Ordering}, + sync::{ + atomic::{AtomicI32, Ordering}, + Arc, + }, time::Duration, }; use base64::{engine::general_purpose, Engine}; use image::GenericImageView; -use mio::{event::Event, Poll, Token}; use num_traits::ToPrimitive; use pumpkin_entity::{entity_type::EntityType, EntityId}; use pumpkin_protocol::{ @@ -30,7 +30,7 @@ use pumpkin_world::{dimension::Dimension, radial_chunk_iterator::RadialIterator} use pumpkin_registry::Registry; use rsa::{traits::PublicKeyParts, RsaPrivateKey, RsaPublicKey}; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Mutex, MutexGuard}; use crate::{ client::Client, @@ -59,7 +59,7 @@ pub struct Server { /// Cache the registry so we don't have to parse it every time a player joins pub cached_registry: Vec, - pub current_clients: HashMap, Rc>>, + pub current_clients: HashMap>>, // TODO: replace with HashMap entity_id: AtomicI32, // TODO: place this into every world @@ -116,29 +116,25 @@ impl Server { } } - // Returns Tokens to remove - pub async fn poll(&mut self, client: &mut Client, _poll: &Poll, event: &Event) { - // TODO: Poll players in every world - client.poll(self, event).await - } - - pub fn add_client(&mut self, token: Rc, client: Rc>) { + pub fn add_client(&mut self, token: u32, client: Arc>) { self.current_clients.insert(token, client); } - pub fn remove_client(&mut self, token: &Token) { + pub async fn remove_client(&mut self, token: &u32) { let client = self.current_clients.remove(token).unwrap(); - let client = client.borrow(); + let client = client.lock().await; // despawn the player // todo: put this into the entitiy struct if client.is_player() { let id = client.player.as_ref().unwrap().entity_id(); let uuid = client.gameprofile.as_ref().unwrap().id; self.broadcast_packet_expect( - &[&client.token], + &[client.token], &CRemovePlayerInfo::new(1.into(), &[UUID(uuid)]), - ); - self.broadcast_packet_expect(&[&client.token], &CRemoveEntities::new(&[id.into()])) + ) + .await; + self.broadcast_packet_expect(&[client.token], &CRemoveEntities::new(&[id.into()])) + .await } } @@ -156,30 +152,34 @@ impl Server { client.player = Some(player); // login packet for our new player - client.send_packet(&CLogin::new( - entity_id, - self.base_config.hardcore, - &["minecraft:overworld"], - self.base_config.max_players.into(), - self.base_config.view_distance.into(), // TODO: view distance - self.base_config.simulation_distance.into(), // TODO: sim view dinstance - false, - false, - false, - 0.into(), - "minecraft:overworld", - 0, // seed - gamemode.to_u8().unwrap(), - self.base_config.default_gamemode.to_i8().unwrap(), - false, - false, - None, - 0.into(), - false, - )); + client + .send_packet(&CLogin::new( + entity_id, + self.base_config.hardcore, + &["minecraft:overworld"], + self.base_config.max_players.into(), + self.base_config.view_distance.into(), // TODO: view distance + self.base_config.simulation_distance.into(), // TODO: sim view dinstance + false, + false, + false, + 0.into(), + "minecraft:overworld", + 0, // seed + gamemode.to_u8().unwrap(), + self.base_config.default_gamemode.to_i8().unwrap(), + false, + false, + None, + 0.into(), + false, + )) + .await; dbg!("sending abilities"); // player abilities - client.send_packet(&CPlayerAbilities::new(0x02, 0.1, 0.1)); + client + .send_packet(&CPlayerAbilities::new(0x02, 0.1, 0.1)) + .await; // teleport let x = 10.0; @@ -187,7 +187,7 @@ impl Server { let z = 10.0; let yaw = 10.0; let pitch = 10.0; - client.teleport(x, y, z, 10.0, 10.0); + client.teleport(x, y, z, 10.0, 10.0).await; let gameprofile = client.gameprofile.as_ref().unwrap(); // first send info update to our new player, So he can see his Skin // also send his info to everyone else @@ -206,12 +206,13 @@ impl Server { ], }], ), - ); + ) + .await; // here we send all the infos of already joined players let mut entries = Vec::new(); for (_, client) in self.current_clients.iter().filter(|c| c.0 != &client.token) { - let client = client.borrow(); + let client = client.blocking_lock(); if client.is_player() { let gameprofile = client.gameprofile.as_ref().unwrap(); entries.push(pumpkin_protocol::client::play::Player { @@ -226,16 +227,18 @@ impl Server { }) } } - client.send_packet(&CPlayerInfoUpdate::new(0x01 | 0x08, &entries)); + client + .send_packet(&CPlayerInfoUpdate::new(0x01 | 0x08, &entries)) + .await; // Start waiting for level chunks - client.send_packet(&CGameEvent::new(13, 0.0)); + client.send_packet(&CGameEvent::new(13, 0.0)).await; let gameprofile = client.gameprofile.as_ref().unwrap(); // spawn player for every client self.broadcast_packet_expect( - &[&client.token], + &[client.token], // TODO: add velo &CSpawnEntity::new( entity_id.into(), @@ -252,29 +255,32 @@ impl Server { 0.0, 0.0, ), - ); + ) + .await; // spawn players for our client - let token = client.token.clone(); + let token = client.token; for (_, existing_client) in self.current_clients.iter().filter(|c| c.0 != &token) { - let existing_client = existing_client.borrow(); + let existing_client = existing_client.blocking_lock(); if let Some(player) = &existing_client.player { let entity = &player.entity; let gameprofile = existing_client.gameprofile.as_ref().unwrap(); - client.send_packet(&CSpawnEntity::new( - player.entity_id().into(), - UUID(gameprofile.id), - EntityType::Player.to_i32().unwrap().into(), - entity.x, - entity.y, - entity.z, - entity.yaw, - entity.pitch, - entity.pitch, - 0.into(), - 0.0, - 0.0, - 0.0, - )) + client + .send_packet(&CSpawnEntity::new( + player.entity_id().into(), + UUID(gameprofile.id), + EntityType::Player.to_i32().unwrap().into(), + entity.x, + entity.y, + entity.z, + entity.yaw, + entity.pitch, + entity.pitch, + 0.into(), + 0.0, + 0.0, + 0.0, + )) + .await; } } // entity meta data @@ -286,16 +292,17 @@ impl Server { Metadata::new(17, VarInt(0), config.skin_parts), ), ) + .await } Server::spawn_test_chunk(client, self.base_config.view_distance as u32).await; } /// TODO: This definitly should be in world - pub fn get_by_entityid(&self, from: &Client, id: EntityId) -> Option> { + pub fn get_by_entityid(&self, from: &Client, id: EntityId) -> Option> { for (_, client) in self.current_clients.iter().filter(|c| c.0 != &from.token) { // Check if client is a player - let client = client.borrow_mut(); + let client = client.blocking_lock(); if client.is_player() && client.player.as_ref().unwrap().entity_id() == id { return Some(client); } @@ -304,34 +311,30 @@ impl Server { } /// Sends a Packet to all Players - pub fn broadcast_packet

(&self, from: &mut Client, packet: &P) + pub async fn broadcast_packet

(&self, from: &mut Client, packet: &P) where P: ClientPacket, { // we can't borrow twice at same time - from.send_packet(packet); + from.send_packet(packet).await; for (_, client) in self.current_clients.iter().filter(|c| c.0 != &from.token) { // Check if client is a player - let mut client = client.borrow_mut(); + let mut client = client.blocking_lock(); if client.is_player() { - client.send_packet(packet); + client.send_packet(packet).await; } } } - pub fn broadcast_packet_expect

(&self, from: &[&Token], packet: &P) + pub async fn broadcast_packet_expect

(&self, from: &[u32], packet: &P) where P: ClientPacket, { - for (_, client) in self - .current_clients - .iter() - .filter(|c| !from.contains(&c.0.as_ref())) - { + for (_, client) in self.current_clients.iter().filter(|c| !from.contains(c.0)) { // Check if client is a player - let mut client = client.borrow_mut(); + let mut client = client.blocking_lock(); if client.is_player() { - client.send_packet(packet); + client.send_packet(packet).await; } } } @@ -350,10 +353,12 @@ impl Server { .await; }); - client.send_packet(&CCenterChunk { - chunk_x: 0.into(), - chunk_z: 0.into(), - }); + client + .send_packet(&CCenterChunk { + chunk_x: 0.into(), + chunk_z: 0.into(), + }) + .await; while let Some((chunk_pos, chunk_data)) = chunk_receiver.recv().await { // dbg!(chunk_pos); @@ -373,7 +378,7 @@ impl Server { len / (1024 * 1024) ); } - client.send_packet(&CChunkData(&chunk_data)); + client.send_packet(&CChunkData(&chunk_data)).await; } let t = inst.elapsed(); dbg!("DONE", t); @@ -392,12 +397,14 @@ impl Server { buf } - pub fn send_brand(&self, client: &mut Client) { + pub async fn send_brand(&self, client: &mut Client) { // send server brand - client.send_packet(&CPluginMessage::new( - "minecraft:brand", - &self.cached_server_brand, - )); + client + .send_packet(&CPluginMessage::new( + "minecraft:brand", + &self.cached_server_brand, + )) + .await; } pub fn build_response(config: &BasicConfiguration) -> StatusResponse {