Skip to content

Commit

Permalink
[HOTFIX] Divide frontier update and disable expensive metrics (#3634)
Browse files Browse the repository at this point in the history
Includes #3639
  • Loading branch information
nholland94 authored Oct 15, 2019
1 parent 77280b7 commit 3bac778
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 78 deletions.
45 changes: 24 additions & 21 deletions src/lib/kademlia/membership.ml
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,24 @@ module Haskell_process = struct
type t =
{ failure_response: [`Die | `Ignore] ref
; process: Process.t
; mutable already_waited: bool
; terminated_ivar: unit Ivar.t
(** Filled once the process is terminated. *)
; lock_path: string }

let kill {failure_response; process; lock_path; already_waited} =
let kill {failure_response; process; terminated_ivar; _} =
failure_response := `Ignore ;
if not already_waited then
let%bind _ =
Process.run_exn ~prog:"kill"
~args:[Pid.to_string (Process.pid process)]
()
in
let%bind _ = Process.wait process in
Sys.remove lock_path
else Deferred.unit
match Ivar.peek terminated_ivar with
| None ->
let%bind _ =
Process.run_exn ~prog:"kill"
~args:[Pid.to_string (Process.pid process)]
()
in
(* Wait for the process to terminate before returning. *)
Ivar.read terminated_ivar
| Some () ->
(* Process has already terminated so killing it is a no-op. *)
Deferred.unit

let cli_format : Unix.Inet_addr.t -> int -> string =
fun host discovery_port ->
Expand Down Expand Up @@ -234,13 +238,14 @@ module Haskell_process = struct
{ failure_response= ref `Die
; process
; lock_path
; already_waited= false }
; terminated_ivar= Ivar.create () }
with
| Ok p ->
(* If the Kademlia process dies, kill the parent daemon process. Fix
* for #550 *)
Deferred.bind (Process.wait p.process) ~f:(fun code ->
p.already_waited <- true ;
let%bind () = Sys.remove lock_path in
Ivar.fill p.terminated_ivar () ;
match (!(p.failure_response), code) with
| `Ignore, _ | _, Ok () ->
return ()
Expand All @@ -250,7 +255,6 @@ module Haskell_process = struct
~metadata:
[ ( "exit_or_signal"
, `String (Unix.Exit_or_signal.to_string_hum e) ) ] ;
let%map () = Sys.remove lock_path in
raise Child_died )
|> don't_wait_for ;
Ok p
Expand All @@ -263,20 +267,19 @@ module Haskell_process = struct
in
let kill_locked_process ~logger =
match%bind Sys.file_exists lock_path with
| `Yes -> (
| `Yes ->
let%bind p = Reader.file_contents lock_path in
match%bind Process.run ~prog:"kill" ~args:[p] () with
let%bind kill_res = Process.run ~prog:"kill" ~args:[p] () in
( match kill_res with
| Ok _ ->
Logger.debug logger ~module_:__MODULE__ ~location:__LOC__
"Killing Kademlia process: $process"
~metadata:[("process", `String p)] ;
let%map () = Sys.remove lock_path in
Ok ()
~metadata:[("process", `String p)]
| Error _ ->
Logger.debug logger ~module_:__MODULE__ ~location:__LOC__
"Process $process does not exist, won't kill"
~metadata:[("process", `String p)] ;
return @@ Ok () )
~metadata:[("process", `String p)] ) ;
Deferred.map ~f:Or_error.return @@ Sys.remove lock_path
| _ ->
return @@ Ok ()
in
Expand Down
2 changes: 1 addition & 1 deletion src/lib/non_empty_list/dune
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(library
(name non_empty_list)
(public_name non_empty_list)
(libraries core_kernel)
(libraries core_kernel async_kernel)
(preprocess (pps ppx_coda ppx_jane ppx_deriving.eq)))
5 changes: 5 additions & 0 deletions src/lib/non_empty_list/non_empty_list.ml
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,8 @@ let min_elt ~compare (x, xs) =
let max_elt ~compare (x, xs) =
Option.value_map ~default:x (List.max_elt ~compare xs) ~f:(fun maximum ->
if compare x maximum > 0 then x else maximum )

let rec iter_deferred (x, xs) ~f =
let open Async_kernel in
let%bind () = f x in
match xs with [] -> return () | h :: t -> iter_deferred (h, t) ~f
5 changes: 5 additions & 0 deletions src/lib/non_empty_list/non_empty_list.mli
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,8 @@ val take : 'a t -> int -> 'a t option
val min_elt : compare:('a -> 'a -> int) -> 'a t -> 'a

val max_elt : compare:('a -> 'a -> int) -> 'a t -> 'a

val iter_deferred :
'a t
-> f:('a -> unit Async_kernel.Deferred.t)
-> unit Async_kernel.Deferred.t
131 changes: 75 additions & 56 deletions src/lib/transition_frontier/transition_frontier.ml
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ struct
* modifies the heir's staged-ledger and sets the heir as the new root.
* Modifications are in-place
*)
let move_root t (soon_to_be_root_node : Node.t) : Node.t =
let move_root t (soon_to_be_root_node : Node.t) : Node.t Deferred.t =
let root_node = Hashtbl.find_exn t.table t.root in
let root_breadcrumb = root_node.breadcrumb in
let root = root_breadcrumb |> Breadcrumb.staged_ledger in
Expand All @@ -554,7 +554,10 @@ struct
let soon_to_be_root_merkle_root =
Ledger.merkle_root soon_to_be_root_ledger
in
Ledger.commit soon_to_be_root_ledger ;
let%bind () = Async.Scheduler.yield () in
O1trace.measure "committing new root mask" (fun () ->
Ledger.commit soon_to_be_root_ledger ) ;
let%map () = Async.Scheduler.yield () in
let root_ledger_merkle_root_after_commit =
Ledger.merkle_root root_ledger
in
Expand All @@ -576,19 +579,11 @@ struct
Hashtbl.remove t.table t.root ;
Hashtbl.set t.table ~key:new_root_hash ~data:new_root_node ;
t.root <- new_root_hash ;
let num_finalized_staged_txns =
Breadcrumb.user_commands new_root |> List.length |> Float.of_int
in
(* TODO: these metrics are too expensive to compute in this way, but it should be ok for beta *)
(*
let root_snarked_ledger_accounts =
Ledger.Db.to_list t.root_snarked_ledger
in
Coda_metrics.(
Gauge.set Transition_frontier.recently_finalized_staged_txns
num_finalized_staged_txns) ;
Coda_metrics.(
Counter.inc Transition_frontier.finalized_staged_txns
num_finalized_staged_txns) ;
Coda_metrics.(
Gauge.set Transition_frontier.root_snarked_ledger_accounts
(Float.of_int @@ List.length root_snarked_ledger_accounts)) ;
Expand All @@ -598,6 +593,16 @@ struct
@@ List.fold_left root_snarked_ledger_accounts ~init:0
~f:(fun sum account ->
sum + Currency.Balance.to_int account.balance ) )) ;
*)
let num_finalized_staged_txns =
Breadcrumb.user_commands new_root |> List.length |> Float.of_int
in
Coda_metrics.(
Gauge.set Transition_frontier.recently_finalized_staged_txns
num_finalized_staged_txns) ;
Coda_metrics.(
Counter.inc Transition_frontier.finalized_staged_txns
num_finalized_staged_txns) ;
Coda_metrics.(Counter.inc_one Transition_frontier.root_transitions) ;
let consensus_state = Breadcrumb.consensus_state new_root in
let blockchain_length =
Expand Down Expand Up @@ -745,7 +750,7 @@ struct
) ] ;
(* 4 *)
(* note: new_root_node is the same as root_node if the root didn't change *)
let garbage_breadcrumbs, new_root_node =
let%bind garbage_breadcrumbs, new_root_node =
if distance_to_root > max_length then (
Logger.debug t.logger ~module_:__MODULE__ ~location:__LOC__
"Moving the root of the transition frontier. The new node is \
Expand Down Expand Up @@ -809,20 +814,22 @@ struct
Gauge.dec Transition_frontier.active_breadcrumbs
(Float.of_int @@ (1 + List.length garbage_hashes))) ;
(* 4.IV *)
let new_root_node = move_root t heir_node in
let%bind new_root_node = move_root t heir_node in
(* 4.V *)
let new_root_staged_ledger =
Breadcrumb.staged_ledger new_root_node.breadcrumb
in
(* 4.VI *)
Consensus.Hooks.frontier_root_transition
(Breadcrumb.consensus_state root_node.breadcrumb)
(Breadcrumb.consensus_state new_root_node.breadcrumb)
~local_state:t.consensus_local_state
~snarked_ledger:
(Coda_base.Ledger.Any_ledger.cast
(module Coda_base.Ledger.Db)
t.root_snarked_ledger) ;
O1trace.measure "calling consensus hook frontier_root_transition"
(fun () ->
Consensus.Hooks.frontier_root_transition
(Breadcrumb.consensus_state root_node.breadcrumb)
(Breadcrumb.consensus_state new_root_node.breadcrumb)
~local_state:t.consensus_local_state
~snarked_ledger:
(Coda_base.Ledger.Any_ledger.cast
(module Coda_base.Ledger.Db)
t.root_snarked_ledger) ) ;
Debug_assert.debug_assert (fun () ->
(* After the lock transition, if the local_state was previously synced, it should continue to be synced *)
match
Expand Down Expand Up @@ -857,43 +864,55 @@ struct
| None ->
() ) ;
(* 4.VII *)
( match
let%map () =
match
( Staged_ledger.proof_txns new_root_staged_ledger
, heir_node.breadcrumb.just_emitted_a_proof )
with
| Some txns, true ->
let proof_data =
Staged_ledger.current_ledger_proof new_root_staged_ledger
|> Option.value_exn
in
[%test_result: Frozen_ledger_hash.t]
~message:
"Root snarked ledger hash should be the same as the \
source hash in the proof that was just emitted"
~expect:(Ledger_proof.statement proof_data).source
( Ledger.Db.merkle_root t.root_snarked_ledger
|> Frozen_ledger_hash.of_ledger_hash ) ;
(* Apply all the transactions associated with the new ledger
proof to the database-backed SNARKed ledger. We create a
mask and apply them to that, then commit it to the DB. This
saves a lot of IO since committing is batched. Would be even
faster if we implemented #2760. *)
let db_casted =
Ledger.Any_ledger.cast
(module Ledger.Db)
t.root_snarked_ledger
in
let db_mask =
Ledger.Maskable.register_mask db_casted
(Ledger.Mask.create ())
in
Non_empty_list.iter txns ~f:(fun txn ->
Ledger.apply_transaction db_mask txn
|> Or_error.ok_exn |> ignore ) ;
Ledger.commit db_mask ;
ignore @@ Ledger.Maskable.unregister_mask_exn db_casted db_mask
| _, false | None, _ ->
() ) ;
| Some txns, true ->
let proof_data =
Staged_ledger.current_ledger_proof new_root_staged_ledger
|> Option.value_exn
in
[%test_result: Frozen_ledger_hash.t]
~message:
"Root snarked ledger hash should be the same as the \
source hash in the proof that was just emitted"
~expect:(Ledger_proof.statement proof_data).source
( Ledger.Db.merkle_root t.root_snarked_ledger
|> Frozen_ledger_hash.of_ledger_hash ) ;
(* Apply all the transactions associated with the new ledger
proof to the database-backed SNARKed ledger. We create a
mask and apply them to that, then commit it to the DB. This
saves a lot of IO since committing is batched. Would be even
faster if we implemented #2760. *)
let db_casted =
Ledger.Any_ledger.cast
(module Ledger.Db)
t.root_snarked_ledger
in
let db_mask =
Ledger.Maskable.register_mask db_casted
(Ledger.Mask.create ())
in
let%bind () = Async.Scheduler.yield () in
let%bind () =
Non_empty_list.iter_deferred txns ~f:(fun txn ->
O1trace.measure "apply transaction to db mask"
(fun () ->
ignore
@@ Or_error.ok_exn
(Ledger.apply_transaction db_mask txn) ) ;
Deferred.unit )
in
O1trace.measure "committing db mask" (fun () ->
Ledger.commit db_mask ) ;
let%map () = Async.Scheduler.yield () in
ignore
@@ Ledger.Maskable.unregister_mask_exn db_casted db_mask
| _, false | None, _ ->
return ()
in
[%test_result: Frozen_ledger_hash.t]
~message:
"Root snarked ledger hash diverged from blockchain state \
Expand All @@ -909,7 +928,7 @@ struct
Extensions.Root_history.enqueue t.extensions.root_history
root_state_hash root_breadcrumb ;
(garbage_breadcrumbs, new_root_node) )
else ([], root_node)
else return ([], root_node)
in
(* 5 *)
Extensions.handle_diff t.extensions t.extension_writers
Expand Down

0 comments on commit 3bac778

Please sign in to comment.