Skip to content

Commit

Permalink
Merge pull request #1948 from dedis/work-be2-daniel-clean-logs
Browse files Browse the repository at this point in the history
[BE2] Clean Log
  • Loading branch information
DanielTavaresA authored Jun 27, 2024
2 parents 4ed41a1 + fa1e905 commit d3c68fe
Show file tree
Hide file tree
Showing 15 changed files with 124 additions and 97 deletions.
2 changes: 1 addition & 1 deletion be2-scala/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<configuration debug="false" scan="true" scanPeriod="15 seconds">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
<pattern>%d{HH:mm:ss.SSS} - %highlight(%-5level) : %logger{0} > %msg%n</pattern>
</encoder>
</appender>

Expand Down
10 changes: 5 additions & 5 deletions be2-scala/src/main/scala/ch/epfl/pop/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ object Server {

bindingFuture.onComplete {
case Success(_) =>
println(f"[Client] ch.epfl.pop.Server online at $ownClientAddress")
println(f"[Server] ch.epfl.pop.Server online at $ownServerAddress")
println(f"[Server] ch.epfl.pop.Server auth server online at $ownAuthAddress")
println(f"[Server] ch.epfl.pop.Server auth ws server online at $ownResponseAddress")
println(f"[Server] ch.epfl.pop.Server public key available at $ownPublicKeyAddress")
logger.info(f"[Client] online at $ownClientAddress")
logger.info(f"[Server] online at $ownServerAddress")
logger.info(f"[Server] auth server online at $ownAuthAddress")
logger.info(f"[Server] auth ws server online at $ownResponseAddress")
logger.info(f"[Server] public key available at $ownPublicKeyAddress")

case Failure(_) =>
logger.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ final case class ConnectionMediator(
)

case ConnectionMediator.ServerLeft(serverRef) =>
log.info("Server left")
log.info(s"Server ${serverMap.get(serverRef) match
case Some(greet) => greet.serverAddress
case None => serverRef.path.name
} left")
serverMap -= serverRef
// Tell monitor to stop scheduling heartbeats since there is no one to receive them
if (serverMap.isEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
Await.result(readPk, duration) match
case DbActor.DbActorReadServerPublicKeyAck(pk) => Some(pk)
case _ =>
log.info(s"Actor (gossip) $self will not be able to create rumors because it has no publicKey")
log.error(s"Will not be able to create rumors because it has no publicKey")
None
}

Expand All @@ -68,22 +68,23 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
// checks the peers to which we already forwarded the message
val activeGossip: Set[ActorRef] = peersAlreadyReceived(rumorRpc)
// selects a random peer from remaining peers
val rumor = rumorRpc.getParams.asInstanceOf[Rumor]
val randomPeer = connectionMediatorRef ? ConnectionMediator.GetRandomPeer(activeGossip)
Await.result(randomPeer, duration) match {
// updates the list based on response
// if some peers are available we send
case ConnectionMediator.GetRandomPeerAck(serverRef, greetServer) =>
val alreadySent: Set[ActorRef] = activeGossip + serverRef
activeGossipProtocol += (rumorRpc -> alreadySent)
log.info(s"rumorSent > dest : ${greetServer.clientAddress}, rumor : $rumorRpc")
log.info(s"Sent rumor {${rumor.senderPk}:${rumor.rumorId}} to ${greetServer.clientAddress}")
serverRef ! ClientAnswer(
Right(rumorRpc)
)
// else remove entry
case ConnectionMediator.NoPeer() =>
activeGossipProtocol = activeGossipProtocol.removed(rumorRpc)
case _ =>
log.info(s"Actor $self received an unexpected message waiting for a random peer")
log.warning(s"Received an unexpected message waiting for a random peer. Gossip step has been ignored for rumor {${rumor.senderPk}:${rumor.rumorId}}")
}
}

Expand Down Expand Up @@ -116,7 +117,7 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
}
}
} else {
log.info(s"Unexpected match for active gossip. Response with id ${response.id} matched with ${activeGossipPeers.size} entries")
log.warning(s"Unexpected match for active gossip. Response with id ${response.id} matched with ${activeGossipPeers.size} entries. Duplicates will be removed.")
// removes duplicate entries to come back to a stable state
activeGossipPeers.foreach { (rumorRpc, _) =>
activeGossipProtocol -= rumorRpc
Expand All @@ -136,16 +137,16 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
case DbActorGetRumorStateAck(rumorState) =>
state = rumorState
case _ =>
log.info(s"Actor (gossip) $self was not able to get its rumor state. Gossip has not started")
log.warning(s"Was not able to get its rumor state. Gossip has not started")
return
val rumor: Rumor = Rumor(publicKey.get, getRumorId(publicKey.get) + 1, messages, state)
val jsonRpcRequest = prepareRumor(rumor)
val writeRumor = dbActorRef ? DbActor.WriteRumor(rumor)
Await.result(writeRumor, duration) match
case DbActorAck() => updateGossip(jsonRpcRequest)
case _ => log.info(s"Actor (gossip) $self was not able to write rumor in memory. Gossip has not started.")
case _ => log.warning(s"Was not able to write rumor in memory. Gossip has not started.")
else
log.info(s"Actor (gossip) $self will not be able to start rumors because it has no publicKey")
log.error(s"Will not be able to start rumors because it has no publicKey")
}

private def getRumorId(publicKey: PublicKey): Int = {
Expand All @@ -164,6 +165,7 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
val rumorStateGet = dbActorRef ? GetRumorState()
Await.result(rumorStateGet, duration) match
case DbActorGetRumorStateAck(rumorState) =>
log.info(s"Sending rumor_state ${rumorState.state} to ${greetServer.serverAddress}")
serverRef ! ClientAnswer(
Right(JsonRpcRequest(
RpcValidator.JSON_RPC_VERSION,
Expand All @@ -173,9 +175,9 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
))
)
jsonId += 1
case _ => log.info(s"Actor $self failed on creating rumor state")
case _ =>
log.info(s"Actor $self received an unexpected message waiting for a random peer")
case _ => log.error(s"Actor $self failed on creating rumor state. State wasn't gossiped.")
case m @ _ =>
log.warning(s"Received an unexpected message $m waiting for a random peer")
}
}

Expand Down Expand Up @@ -208,7 +210,7 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
startGossip(messages)

case ConnectionMediator.Ping() =>
log.info(s"Actor $self received a ping from Connection Mediator")
log.info(s"Received a ping from Connection Mediator")
connectionMediatorRef = sender()

case Monitor.AtLeastOneServerConnected =>
Expand All @@ -220,8 +222,8 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
case TriggerPullState() =>
sendRumorState()

case _ =>
log.info(s"Actor $self received an unexpected message")
case m @ _ =>
log.info(s"Received an unexpected message $m.")
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ final case class Monitor(
timers.cancelAll()

case Monitor.TriggerHeartbeat =>
log.info("triggering a heartbeat")
log.info("Triggering a heartbeat")
timers.cancel(singleHbKey)

val askForHeartbeat = dbActorRef ? DbActor.GenerateHeartbeat()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,13 @@ class ResultObject(val result: Option[ResultType]) {
case _ => false
case _ => false
}

override def toString: String = {
result.get match
case ResultInt(result) => result.toString
case ResultMessage(result) => result.toString()
case ResultMap(result) => result.toString()
case ResultRumor(result) => result.toString()
case ResultEmptyList() => List.empty.toString()
}
}
22 changes: 12 additions & 10 deletions be2-scala/src/main/scala/ch/epfl/pop/pubsub/ClientActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import ch.epfl.pop.decentralized.ConnectionMediator
import ch.epfl.pop.model.network.{JsonRpcRequest, MethodType}
import ch.epfl.pop.model.network.method.GreetServer
import ch.epfl.pop.model.objects.Channel
import ch.epfl.pop.pubsub.ClientActor._
import ch.epfl.pop.pubsub.PubSubMediator._
import ch.epfl.pop.pubsub.graph.GraphMessage
import ch.epfl.pop.pubsub.ClientActor.*
import ch.epfl.pop.pubsub.PubSubMediator.*
import ch.epfl.pop.pubsub.graph.{GraphMessage, compactPrinter, prettyPrinter}
import ch.epfl.pop.pubsub.graph.validators.RpcValidator
import ch.epfl.pop.storage.DbActor

Expand Down Expand Up @@ -65,15 +65,15 @@ final case class ClientActor(mediator: ActorRef, connectionMediatorRef: ActorRef
}
case message: PubSubMediatorMessage => message match {
case SubscribeToAck(channel) =>
log.info(s"Actor $self received ACK mediator $mediator for the subscribe to channel '$channel' request")
log.info(s"Received ACK from Mediator to subscribe request on channel '$channel'")
subscribedChannels += channel
case UnsubscribeFromAck(channel) =>
log.info(s"Actor $self received ACK mediator $mediator for the unsubscribe from channel '$channel' request")
log.info(s"Received ACK from Mediator to unsubscribe request on channel '$channel' request")
subscribedChannels -= channel
case SubscribeToNAck(channel, reason) =>
log.info(s"Actor $self received NACK mediator $mediator for the subscribe to channel '$channel' request for reason: $reason")
log.info(s"Received NACK from Mediator to subscribe request on channel '$channel' for reason: $reason")
case UnsubscribeFromNAck(channel, reason) =>
log.info(s"Actor $self received NACK mediator $mediator for the unsubscribe from channel '$channel' request for reason: $reason")
log.info(s"Received NACK from Mediator for unsubscribe request on channel '$channel' for reason: $reason")
case PropagateAck() => // Nothing to do.
}
case greetServer: GreetServer =>
Expand All @@ -83,7 +83,7 @@ final case class ClientActor(mediator: ActorRef, connectionMediatorRef: ActorRef
connectionMediatorRef ! ConnectionMediator.NewServerConnected(self, greetServer)

case clientAnswer @ ClientAnswer(_) =>
log.info(s"Sending an answer back to client $wsHandle: $clientAnswer")
log.info(s"Sending an answer back to ${if isServer then "server" else "client"} ${wsHandle.get.path.name}: $clientAnswer")
messageWsHandle(clientAnswer)

case m @ _ => m match {
Expand Down Expand Up @@ -114,7 +114,7 @@ final case class ClientActor(mediator: ActorRef, connectionMediatorRef: ActorRef
}

if (publicKey.isDefined) {
log.info("Sending greet")
log.info("Sending greetServer")
val greetServer = GreetServer(publicKey.get, clientAddress, serverAddress)
messageWsHandle(ClientAnswer(Right(JsonRpcRequest(
RpcValidator.JSON_RPC_VERSION,
Expand All @@ -135,7 +135,9 @@ object ClientActor {
sealed trait ClientActorMessage

// answer to be sent to the client represented by the client actor
final case class ClientAnswer(graphMessage: GraphMessage) extends ClientActorMessage
final case class ClientAnswer(graphMessage: GraphMessage) extends ClientActorMessage {
override def toString: String = compactPrinter(graphMessage)
}

sealed trait Event

Expand Down
24 changes: 12 additions & 12 deletions be2-scala/src/main/scala/ch/epfl/pop/pubsub/PubSubMediator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,41 +31,41 @@ class PubSubMediator extends Actor with ActorLogging with AskPatternConstants {
// if we have people already subscribed to said channel
case Some(set) =>
if (set.contains(clientActorRef)) {
log.info(s"$clientActorRef already subscribed to '$channel'")
log.warning(s"client $clientActorRef already subscribed to '$channel'")
Future(SubscribeToAck(channel))
} else {
log.info(s"Subscribing $clientActorRef to channel '$channel'")
log.info(s"Subscribing client $clientActorRef to channel '$channel'")
set += clientActorRef
Future(SubscribeToAck(channel))
}

// if we have no one subscribed to said channel
case _ =>
log.info(s"Subscribing $clientActorRef to channel '$channel'")
log.info(s"Subscribing client $clientActorRef to channel '$channel'")
channelMap = channelMap ++ List(channel -> mutable.Set(clientActorRef))
Future(SubscribeToAck(channel))
}

// db doesn't recognize the channel, thus mediator cannot subscribe anyone to a non existing channel
case _ =>
val reason: String = s"Channel '$channel' doesn't exist in db"
log.info(reason)
val reason: String = s"Channel '$channel' doesn't exist in db."
log.warning(s"$reason. Mediator cannot subscribe to non-existing channel")
Future(SubscribeToNAck(channel, reason))
}
}

private def unsubscribeFrom(channel: Channel, clientActorRef: ActorRef): PubSubMediatorMessage = channelMap.get(channel) match {
case Some(set) if set.contains(clientActorRef) =>
log.info(s"Unsubscribing $clientActorRef from channel '$channel'")
log.info(s"Unsubscribing client $clientActorRef from channel '$channel'")
set -= clientActorRef
UnsubscribeFromAck(channel)
case Some(_) =>
val reason: String = s"Actor $clientActorRef is not subscribed to channel '$channel'"
log.info(reason)
val reason: String = s"Client $clientActorRef is not subscribed to channel '$channel'"
log.warning(reason)
UnsubscribeFromNAck(channel, reason)
case _ =>
val reason: String = s"Channel '$channel' does not exist in the system"
log.info(reason)
log.warning(reason)
UnsubscribeFromNAck(channel, reason)
}

Expand All @@ -79,11 +79,11 @@ class PubSubMediator extends Actor with ActorLogging with AskPatternConstants {

channelMap.get(channel) match {
case Some(clientRefs: mutable.Set[ActorRef]) =>
log.info(s"Actor $self (PubSubMediator) is propagating a message to ${clientRefs.size} clients")
log.info(s"Propagating a message to ${clientRefs.size} clients")
clientRefs.foreach(clientRef => clientRef ! ClientActor.ClientAnswer(generateAnswer()))

case _ =>
log.info(s"Actor $self (PubSubMediator) did not propagate any message since no client is subscribed to channel $channel")
log.warning(s"Did not propagate any message since no client is subscribed to channel $channel")
}
}

Expand All @@ -102,7 +102,7 @@ class PubSubMediator extends Actor with ActorLogging with AskPatternConstants {
sender() ! PropagateAck()

case m @ _ =>
log.error(s"PubSubMediator received an unknown message : $m")
log.warning(s"Received an unknown message : $m. Message is ignored.")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ object PublishSubscribe {

/* building blocks */
// input message from the client
val input = builder.add(Flow[Message].collect { case TextMessage.Strict(s) => println(s">>> Incoming message : $s"); s })
val input = builder.add(Flow[Message].collect { case TextMessage.Strict(s) => system.log.info(s"Incoming message : $s"); s })
val schemaVerifier = builder.add(SchemaVerifier.rpcSchemaVerifier)
val jsonRpcDecoder = builder.add(MessageDecoder.jsonRpcParser)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package ch.epfl.pop.pubsub.graph

import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.event.Logging
import akka.http.scaladsl.model.ws.TextMessage
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{CompletionStrategy, OverflowStrategy}
import ch.epfl.pop.json.HighLevelProtocol._
import ch.epfl.pop.json.HighLevelProtocol.*
import ch.epfl.pop.model.network.{ErrorObject, JsonRpcRequest, JsonRpcResponse}
import ch.epfl.pop.pubsub.ClientActor.{ClientAnswer, ConnectWsHandle, DisconnectWsHandle}
import ch.epfl.pop.pubsub.graph.validators.RpcValidator
import spray.json._
import spray.json.*

object Answerer {
private val CLIENT_BUFFER_SIZE: Int = 256
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ object ParamsHandler extends AskPatternConstants {
system.log.info(s"All messages from rumor ${rumor.rumorId} were processed correctly")
return Right(jsonRpcMessage)
}
system.log.info(s"Some messages from rumor ${rumor.rumorId} were not processed")
system.log.warning(s"Some messages from rumor ${rumor.rumorId} were not processed. Unprocessed rumor not written in memory")
Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"Some messages from Rumor ${rumor.rumorId} with jsonRpcId : ${jsonRpcMessage.id} couldn't be processed", jsonRpcMessage.id))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ object ProcessMessagesHandler extends AskPatternConstants {
processedRumors = processedRumors.prepended(rumor)
}
if !successful then
system.log.info(s"Failed to process all rumors from rumorStateAnswer $jsonId. Processed rumors where ${processedRumors.map(rumor => (rumor.senderPk, rumor.rumorId)).tail}")
system.log.info(s"Failed to process all rumors from rumorStateAnswer $jsonId. Processed rumors where ${processedRumors.map(rumor => (rumor.senderPk, rumor.rumorId)).tail}. Unprocessed rumors not written in memory.")
Left(PipelineError(
ErrorCodes.SERVER_ERROR.id,
s"Rumor state handler was not able to process all rumors from $msg",
Expand Down
11 changes: 11 additions & 0 deletions be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,15 @@ package object graph {
case Left(pipelineError) => pipelineError.toString
}
}

def compactPrinter(graphMessage: GraphMessage): String = {
graphMessage match {
case Right(jsonRpcMessage: JsonRpcMessage) =>
jsonRpcMessage match {
case jsonRpcRequest: JsonRpcRequest => HighLevelProtocol.jsonRpcRequestFormat.write(jsonRpcRequest).compactPrint
case jsonRpcResponse: JsonRpcResponse => HighLevelProtocol.jsonRpcResponseFormat.write(jsonRpcResponse).compactPrint
}
case Left(pipelineError) => pipelineError.toString
}
}
}
Loading

0 comments on commit d3c68fe

Please sign in to comment.