From ce2b30a274100553b15576b6b0a80e50651cdd84 Mon Sep 17 00:00:00 2001 From: luanshaotong Date: Fri, 18 Aug 2023 09:56:13 +0000 Subject: [PATCH] add logging timestamp; server reconnect to manager --- intercept/src/client.rs | 1 - intercept/src/lib.rs | 3 ++- src/bin/manager.rs | 11 ++++++---- src/bin/server.rs | 21 +++++++++--------- src/client/mod.rs | 3 ++- src/common/info_syncer.rs | 1 + src/common/serialization.rs | 6 +++++ src/rpc/client.rs | 9 +++++--- src/rpc/connection.rs | 7 ++++++ src/rpc/server.rs | 10 ++++++--- src/server/distributed_engine.rs | 8 ++++++- src/server/mod.rs | 28 +++++++++++++++++++----- src/server/storage_engine/file_engine.rs | 11 ++++++---- 13 files changed, 85 insertions(+), 34 deletions(-) diff --git a/intercept/src/client.rs b/intercept/src/client.rs index 3e7edec..08f2fc5 100644 --- a/intercept/src/client.rs +++ b/intercept/src/client.rs @@ -27,7 +27,6 @@ use sealfs::common::serialization::{ use sealfs::rpc::client::TcpStreamCreator; use sealfs::{offset_of, rpc}; pub struct Client { - // TODO replace with a thread safe data structure pub client: Arc< rpc::client::RpcClient< tokio::net::tcp::OwnedReadHalf, diff --git a/intercept/src/lib.rs b/intercept/src/lib.rs index c304c9d..a1c145d 100644 --- a/intercept/src/lib.rs +++ b/intercept/src/lib.rs @@ -5,6 +5,7 @@ pub mod syscall_intercept; pub mod test_log; use client::CLIENT; +use env_logger::fmt; use file_desc::{FdAttr, FdType}; use lazy_static::lazy_static; use libc::{ @@ -69,7 +70,7 @@ extern "C" fn initialize() { let log_level = std::env::var("SEALFS_LOG_LEVEL").unwrap_or("warn".to_string()); let mut builder = env_logger::Builder::from_default_env(); builder - .format_timestamp(None) + .format_timestamp(Some(fmt::TimestampPrecision::Millis)) .filter(None, log::LevelFilter::from_str(&log_level).unwrap()); builder.init(); diff --git a/src/bin/manager.rs b/src/bin/manager.rs index a1a5519..06af20a 100644 --- a/src/bin/manager.rs +++ b/src/bin/manager.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use clap::Parser; +use env_logger::fmt; use log::{error, info, warn}; use sealfs::manager::manager_service::update_server_status; use sealfs::{manager::manager_service::ManagerService, rpc::server::RpcServer}; @@ -92,10 +93,12 @@ async fn main() -> anyhow::Result<()> { }, }; - builder.format_timestamp(None).filter( - None, - log::LevelFilter::from_str(&properties.log_level).unwrap(), - ); + builder + .format_timestamp(Some(fmt::TimestampPrecision::Millis)) + .filter( + None, + log::LevelFilter::from_str(&properties.log_level).unwrap(), + ); builder.init(); info!("Starting manager with log level: {}", properties.log_level); diff --git a/src/bin/server.rs b/src/bin/server.rs index cff079c..20a2581 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use clap::Parser; +use env_logger::fmt; use log::info; use sealfs::server; use serde::{Deserialize, Serialize}; @@ -27,8 +28,6 @@ struct Args { #[arg(required = true, long)] storage_path: Option, #[arg(long)] - heartbeat: Option, - #[arg(long)] log_level: Option, } @@ -40,7 +39,6 @@ struct Properties { cache_capacity: usize, write_buffer_size: usize, storage_path: String, - heartbeat: bool, log_level: String, } @@ -56,18 +54,19 @@ async fn main() -> anyhow::Result<(), Box> { cache_capacity: args.cache_capacity.unwrap_or(13421772), write_buffer_size: args.write_buffer_size.unwrap_or(0x4000000), storage_path: args.storage_path.unwrap(), - heartbeat: args.heartbeat.unwrap_or(false), log_level: args.log_level.unwrap_or("warn".to_owned()), }; let mut builder = env_logger::Builder::from_default_env(); - builder.format_timestamp(None).filter( - None, - match log::LevelFilter::from_str(&properties.log_level) { - Ok(level) => level, - Err(_) => log::LevelFilter::Warn, - }, - ); + builder + .format_timestamp(Some(fmt::TimestampPrecision::Millis)) + .filter( + None, + match log::LevelFilter::from_str(&properties.log_level) { + Ok(level) => level, + Err(_) => log::LevelFilter::Warn, + }, + ); builder.init(); info!("start server with properties: {:?}", properties); diff --git a/src/client/mod.rs b/src/client/mod.rs index 5516651..eae515d 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -5,6 +5,7 @@ pub mod daemon; pub mod fuse_client; use clap::{Parser, Subcommand}; +use env_logger::fmt; use fuser::{ Filesystem, ReplyAttr, ReplyCreate, ReplyData, ReplyDirectory, ReplyEntry, ReplyOpen, ReplyWrite, Request, @@ -369,7 +370,7 @@ pub async fn run_command() -> Result<(), Box> { }; let mut builder = env_logger::Builder::from_default_env(); builder - .format_timestamp(None) + .format_timestamp(Some(fmt::TimestampPrecision::Millis)) .filter(None, log::LevelFilter::from_str(&log_level).unwrap()); builder.init(); diff --git a/src/common/info_syncer.rs b/src/common/info_syncer.rs index d207dd2..257a697 100644 --- a/src/common/info_syncer.rs +++ b/src/common/info_syncer.rs @@ -96,6 +96,7 @@ pub trait ClientStatusMonitor: InfoSyncer { ClusterStatus::PreFinish => self.get_new_address(path), ClusterStatus::Finishing => self.get_address(path), ClusterStatus::StatusError => todo!(), + ClusterStatus::Unkown => todo!(), } } diff --git a/src/common/serialization.rs b/src/common/serialization.rs index bdc42e4..de34791 100644 --- a/src/common/serialization.rs +++ b/src/common/serialization.rs @@ -276,6 +276,7 @@ pub enum ClusterStatus { PreFinish = 306, Finishing = 307, StatusError = 308, + Unkown = 309, } impl TryFrom for ClusterStatus { @@ -292,6 +293,7 @@ impl TryFrom for ClusterStatus { 306 => Ok(ClusterStatus::PreFinish), 307 => Ok(ClusterStatus::Finishing), 308 => Ok(ClusterStatus::StatusError), + 309 => Ok(ClusterStatus::Unkown), _ => Err(format!("Unkown value: {}", value)), } } @@ -309,6 +311,7 @@ impl From for u32 { ClusterStatus::PreFinish => 306, ClusterStatus::Finishing => 307, ClusterStatus::StatusError => 308, + ClusterStatus::Unkown => 309, } } } @@ -327,6 +330,7 @@ impl TryFrom for ClusterStatus { 306 => Ok(ClusterStatus::PreFinish), 307 => Ok(ClusterStatus::Finishing), 308 => Ok(ClusterStatus::StatusError), + 309 => Ok(ClusterStatus::Unkown), _ => Err(format!("Unkown value: {}", value)), } } @@ -344,6 +348,7 @@ impl From for i32 { ClusterStatus::PreFinish => 306, ClusterStatus::Finishing => 307, ClusterStatus::StatusError => 308, + ClusterStatus::Unkown => 309, } } } @@ -360,6 +365,7 @@ impl Display for ClusterStatus { Self::PreFinish => write!(f, "PreFinish"), Self::Finishing => write!(f, "DeleteNodes"), Self::StatusError => write!(f, "StatusError"), + Self::Unkown => write!(f, "Unkown"), } } } diff --git a/src/rpc/client.rs b/src/rpc/client.rs index b209640..aecf18c 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -250,7 +250,7 @@ impl< return Ok(()); } Err(e) => { - error!("wait for callback failed: {}", e); + error!("wait for callback failed: {}, batch: {}, id {}, operation type: {}, path: {}", e, batch, id, operation_type, path); continue; } } @@ -293,13 +293,16 @@ pub async fn parse_response( let result = { match pool.lock_if_not_timeout(batch, id) { Ok(_) => Ok(()), - Err(_) => Err("lock timeout"), + Err(e) => Err(e), } }; match result { Ok(_) => {} Err(e) => { - error!("parse_response lock timeout: {}", e); + error!( + "parse_response lock timeout: {}, batch: {}, id: {}", + e, batch, id + ); let result = connection .clean_response(&mut read_stream, total_length) .await; diff --git a/src/rpc/connection.rs b/src/rpc/connection.rs index 1faf9d0..145d042 100644 --- a/src/rpc/connection.rs +++ b/src/rpc/connection.rs @@ -225,6 +225,13 @@ impl ServerConnection { self.name_id.clone() } + pub async fn close(&self) -> Result<(), String> { + let mut stream = self.write_stream.lock().await; + stream.shutdown().await.map_err(|e| e.to_string())?; + info!("close connection {}", self.name_id); + Ok(()) + } + // response // | batch | id | status | flags | total_length | meta_data_lenght | data_length | meta_data | data | // | 4Byte | 4Byte | 4Byte | 4Byte | 4Byte | 4Byte | 4Byte | 0~ | 0~ | diff --git a/src/rpc/server.rs b/src/rpc/server.rs index a18487c..a18ac0f 100644 --- a/src/rpc/server.rs +++ b/src/rpc/server.rs @@ -43,7 +43,7 @@ pub async fn handle< connection.id, header.r#type, header.flags, - path, + path.clone(), data, metadata, ) @@ -61,11 +61,15 @@ pub async fn handle< ) .await { - error!("handle, send response error: {}", e); + error!("handle connection: {} , send response error: {}, batch: {}, id: {}, operation_type: {}, flags: {}, path: {:?}", connection.id, e, header.batch, header.id, header.r#type, header.flags, std::str::from_utf8(&path)); + let _ = connection.close().await; } } Err(e) => { - error!("handle, dispatch error: {}", e); + error!( + "handle connection: {} , dispatch error: {}", + connection.id, e + ); } } } diff --git a/src/server/distributed_engine.rs b/src/server/distributed_engine.rs index 6f7afd1..82c1a8a 100644 --- a/src/server/distributed_engine.rs +++ b/src/server/distributed_engine.rs @@ -72,7 +72,7 @@ where meta_engine, client: client.clone(), sender: Sender::new(client), - cluster_status: AtomicI32::new(ClusterStatus::Initializing.into()), + cluster_status: AtomicI32::new(ClusterStatus::Unkown.into()), hash_ring: Arc::new(RwLock::new(None)), new_hash_ring: Arc::new(RwLock::new(None)), manager_address: Arc::new(Mutex::new("".to_string())), @@ -471,6 +471,7 @@ where } ClusterStatus::Finishing => (self.get_address(path), false), ClusterStatus::StatusError => todo!(), + ClusterStatus::Unkown => todo!(), //s => panic!("get forward address failed, invalid cluster status: {}", s), } } @@ -893,6 +894,11 @@ where ) -> Result, i32> { let path = get_full_path(parent, name); + debug!( + "create file, parent_dir: {}, file_name: {}, oflag: {}, umask: {}, mode: {}", + parent, name, oflag, umask, mode + ); + if self.lock_file(parent)?.insert(name.to_owned(), 0).is_some() { if (oflag & O_EXCL) != 0 { debug!("create file failed, file exists, path: {}", path); diff --git a/src/server/mod.rs b/src/server/mod.rs index 2eca8c2..f315a20 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -284,6 +284,13 @@ pub async fn run( tokio::spawn(sync_cluster_status(Arc::clone(&engine))); + while >::try_into(engine.cluster_status.load(Ordering::Relaxed)) + .unwrap() + == ClusterStatus::Unkown + { + sleep(Duration::from_secs(1)).await; + } + let handler = Arc::new(FileRequestHandler::new(engine.clone())); let server = RpcServer::new(handler, &server_address); @@ -319,12 +326,23 @@ pub async fn run( .write() .replace(HashRing::new(all_servers_address)); info!("Init: Update Hash Ring Success."); - match engine.update_server_status(ServerStatus::Finished).await { - Ok(_) => { - info!("Update Server Status to Finish Success."); + + match >::try_into(engine.cluster_status.load(Ordering::Relaxed)) + .unwrap() + { + ClusterStatus::Initializing => { + match engine.update_server_status(ServerStatus::Finished).await { + Ok(_) => { + info!("Update Server Status to Finish Success."); + } + Err(e) => { + panic!("Update Server Status to Finish Failed. Error = {}", e); + } + } } - Err(e) => { - panic!("Update Server Status to Finish Failed. Error = {}", e); + ClusterStatus::Idle => {} + e => { + panic!("Cluster Status Unexpected. Status = {:?}", e as u32); } } info!("Init: Start Transferring Data."); diff --git a/src/server/storage_engine/file_engine.rs b/src/server/storage_engine/file_engine.rs index 2fc41a9..2d45fb6 100644 --- a/src/server/storage_engine/file_engine.rs +++ b/src/server/storage_engine/file_engine.rs @@ -117,8 +117,11 @@ impl StorageEngine for FileEngine { return Err(f_errno); }; debug!( - "read_file path: {}, size: {}, offset: {}, data: {:?}", - path, real_size, offset, data + "read_file path: {}, size: {}, offset: {}, data_length: {:?}", + path, + real_size, + offset, + data.len() ); // this is a temporary solution, which results in an extra memory copy. @@ -154,7 +157,7 @@ impl StorageEngine for FileEngine { }; if fd < 0 { let f_errno = errno(); - error!("read file error: {:?}", status_to_string(f_errno)); + error!("write file error: {:?}", status_to_string(f_errno)); return Err(f_errno); } self.cache @@ -201,7 +204,7 @@ impl StorageEngine for FileEngine { }; if fd < 0 { let f_errno = errno(); - error!("read file error: {:?}", status_to_string(f_errno)); + error!("create_file error: {:?}", status_to_string(f_errno)); return Err(f_errno); } self.cache