diff --git a/src/lib/kademlia/membership.ml b/src/lib/kademlia/membership.ml index 505a4596e8d..1ad54a49ad0 100644 --- a/src/lib/kademlia/membership.ml +++ b/src/lib/kademlia/membership.ml @@ -127,20 +127,24 @@ module Haskell_process = struct type t = { failure_response: [`Die | `Ignore] ref ; process: Process.t - ; mutable already_waited: bool + ; terminated_ivar: unit Ivar.t + (** Filled once the process is terminated. *) ; lock_path: string } - let kill {failure_response; process; lock_path; already_waited} = + let kill {failure_response; process; terminated_ivar; _} = failure_response := `Ignore ; - if not already_waited then - let%bind _ = - Process.run_exn ~prog:"kill" - ~args:[Pid.to_string (Process.pid process)] - () - in - let%bind _ = Process.wait process in - Sys.remove lock_path - else Deferred.unit + match Ivar.peek terminated_ivar with + | None -> + let%bind _ = + Process.run_exn ~prog:"kill" + ~args:[Pid.to_string (Process.pid process)] + () + in + (* Wait for the process to terminate before returning. *) + Ivar.read terminated_ivar + | Some () -> + (* Process has already terminated so killing it is a no-op. *) + Deferred.unit let cli_format : Unix.Inet_addr.t -> int -> string = fun host discovery_port -> @@ -234,13 +238,14 @@ module Haskell_process = struct { failure_response= ref `Die ; process ; lock_path - ; already_waited= false } + ; terminated_ivar= Ivar.create () } with | Ok p -> (* If the Kademlia process dies, kill the parent daemon process. Fix * for #550 *) Deferred.bind (Process.wait p.process) ~f:(fun code -> - p.already_waited <- true ; + let%bind () = Sys.remove lock_path in + Ivar.fill p.terminated_ivar () ; match (!(p.failure_response), code) with | `Ignore, _ | _, Ok () -> return () @@ -250,7 +255,6 @@ module Haskell_process = struct ~metadata: [ ( "exit_or_signal" , `String (Unix.Exit_or_signal.to_string_hum e) ) ] ; - let%map () = Sys.remove lock_path in raise Child_died ) |> don't_wait_for ; Ok p @@ -263,20 +267,19 @@ module Haskell_process = struct in let kill_locked_process ~logger = match%bind Sys.file_exists lock_path with - | `Yes -> ( + | `Yes -> let%bind p = Reader.file_contents lock_path in - match%bind Process.run ~prog:"kill" ~args:[p] () with + let%bind kill_res = Process.run ~prog:"kill" ~args:[p] () in + ( match kill_res with | Ok _ -> Logger.debug logger ~module_:__MODULE__ ~location:__LOC__ "Killing Kademlia process: $process" - ~metadata:[("process", `String p)] ; - let%map () = Sys.remove lock_path in - Ok () + ~metadata:[("process", `String p)] | Error _ -> Logger.debug logger ~module_:__MODULE__ ~location:__LOC__ "Process $process does not exist, won't kill" - ~metadata:[("process", `String p)] ; - return @@ Ok () ) + ~metadata:[("process", `String p)] ) ; + Deferred.map ~f:Or_error.return @@ Sys.remove lock_path | _ -> return @@ Ok () in diff --git a/src/lib/non_empty_list/dune b/src/lib/non_empty_list/dune index 32e20e82a3a..a552536061b 100644 --- a/src/lib/non_empty_list/dune +++ b/src/lib/non_empty_list/dune @@ -1,5 +1,5 @@ (library (name non_empty_list) (public_name non_empty_list) - (libraries core_kernel) + (libraries core_kernel async_kernel) (preprocess (pps ppx_coda ppx_jane ppx_deriving.eq))) diff --git a/src/lib/non_empty_list/non_empty_list.ml b/src/lib/non_empty_list/non_empty_list.ml index 7f1aef740d2..f433f8f52f8 100644 --- a/src/lib/non_empty_list/non_empty_list.ml +++ b/src/lib/non_empty_list/non_empty_list.ml @@ -81,3 +81,8 @@ let min_elt ~compare (x, xs) = let max_elt ~compare (x, xs) = Option.value_map ~default:x (List.max_elt ~compare xs) ~f:(fun maximum -> if compare x maximum > 0 then x else maximum ) + +let rec iter_deferred (x, xs) ~f = + let open Async_kernel in + let%bind () = f x in + match xs with [] -> return () | h :: t -> iter_deferred (h, t) ~f diff --git a/src/lib/non_empty_list/non_empty_list.mli b/src/lib/non_empty_list/non_empty_list.mli index 310b26baec4..b6065a488cc 100644 --- a/src/lib/non_empty_list/non_empty_list.mli +++ b/src/lib/non_empty_list/non_empty_list.mli @@ -67,3 +67,8 @@ val take : 'a t -> int -> 'a t option val min_elt : compare:('a -> 'a -> int) -> 'a t -> 'a val max_elt : compare:('a -> 'a -> int) -> 'a t -> 'a + +val iter_deferred : + 'a t + -> f:('a -> unit Async_kernel.Deferred.t) + -> unit Async_kernel.Deferred.t diff --git a/src/lib/transition_frontier/transition_frontier.ml b/src/lib/transition_frontier/transition_frontier.ml index 09153707ab5..be360c17e07 100644 --- a/src/lib/transition_frontier/transition_frontier.ml +++ b/src/lib/transition_frontier/transition_frontier.ml @@ -542,7 +542,7 @@ struct * modifies the heir's staged-ledger and sets the heir as the new root. * Modifications are in-place *) - let move_root t (soon_to_be_root_node : Node.t) : Node.t = + let move_root t (soon_to_be_root_node : Node.t) : Node.t Deferred.t = let root_node = Hashtbl.find_exn t.table t.root in let root_breadcrumb = root_node.breadcrumb in let root = root_breadcrumb |> Breadcrumb.staged_ledger in @@ -554,7 +554,10 @@ struct let soon_to_be_root_merkle_root = Ledger.merkle_root soon_to_be_root_ledger in - Ledger.commit soon_to_be_root_ledger ; + let%bind () = Async.Scheduler.yield () in + O1trace.measure "committing new root mask" (fun () -> + Ledger.commit soon_to_be_root_ledger ) ; + let%map () = Async.Scheduler.yield () in let root_ledger_merkle_root_after_commit = Ledger.merkle_root root_ledger in @@ -576,19 +579,11 @@ struct Hashtbl.remove t.table t.root ; Hashtbl.set t.table ~key:new_root_hash ~data:new_root_node ; t.root <- new_root_hash ; - let num_finalized_staged_txns = - Breadcrumb.user_commands new_root |> List.length |> Float.of_int - in (* TODO: these metrics are too expensive to compute in this way, but it should be ok for beta *) + (* let root_snarked_ledger_accounts = Ledger.Db.to_list t.root_snarked_ledger in - Coda_metrics.( - Gauge.set Transition_frontier.recently_finalized_staged_txns - num_finalized_staged_txns) ; - Coda_metrics.( - Counter.inc Transition_frontier.finalized_staged_txns - num_finalized_staged_txns) ; Coda_metrics.( Gauge.set Transition_frontier.root_snarked_ledger_accounts (Float.of_int @@ List.length root_snarked_ledger_accounts)) ; @@ -598,6 +593,16 @@ struct @@ List.fold_left root_snarked_ledger_accounts ~init:0 ~f:(fun sum account -> sum + Currency.Balance.to_int account.balance ) )) ; + *) + let num_finalized_staged_txns = + Breadcrumb.user_commands new_root |> List.length |> Float.of_int + in + Coda_metrics.( + Gauge.set Transition_frontier.recently_finalized_staged_txns + num_finalized_staged_txns) ; + Coda_metrics.( + Counter.inc Transition_frontier.finalized_staged_txns + num_finalized_staged_txns) ; Coda_metrics.(Counter.inc_one Transition_frontier.root_transitions) ; let consensus_state = Breadcrumb.consensus_state new_root in let blockchain_length = @@ -745,7 +750,7 @@ struct ) ] ; (* 4 *) (* note: new_root_node is the same as root_node if the root didn't change *) - let garbage_breadcrumbs, new_root_node = + let%bind garbage_breadcrumbs, new_root_node = if distance_to_root > max_length then ( Logger.debug t.logger ~module_:__MODULE__ ~location:__LOC__ "Moving the root of the transition frontier. The new node is \ @@ -809,20 +814,22 @@ struct Gauge.dec Transition_frontier.active_breadcrumbs (Float.of_int @@ (1 + List.length garbage_hashes))) ; (* 4.IV *) - let new_root_node = move_root t heir_node in + let%bind new_root_node = move_root t heir_node in (* 4.V *) let new_root_staged_ledger = Breadcrumb.staged_ledger new_root_node.breadcrumb in (* 4.VI *) - Consensus.Hooks.frontier_root_transition - (Breadcrumb.consensus_state root_node.breadcrumb) - (Breadcrumb.consensus_state new_root_node.breadcrumb) - ~local_state:t.consensus_local_state - ~snarked_ledger: - (Coda_base.Ledger.Any_ledger.cast - (module Coda_base.Ledger.Db) - t.root_snarked_ledger) ; + O1trace.measure "calling consensus hook frontier_root_transition" + (fun () -> + Consensus.Hooks.frontier_root_transition + (Breadcrumb.consensus_state root_node.breadcrumb) + (Breadcrumb.consensus_state new_root_node.breadcrumb) + ~local_state:t.consensus_local_state + ~snarked_ledger: + (Coda_base.Ledger.Any_ledger.cast + (module Coda_base.Ledger.Db) + t.root_snarked_ledger) ) ; Debug_assert.debug_assert (fun () -> (* After the lock transition, if the local_state was previously synced, it should continue to be synced *) match @@ -857,43 +864,55 @@ struct | None -> () ) ; (* 4.VII *) - ( match + let%map () = + match ( Staged_ledger.proof_txns new_root_staged_ledger , heir_node.breadcrumb.just_emitted_a_proof ) with - | Some txns, true -> - let proof_data = - Staged_ledger.current_ledger_proof new_root_staged_ledger - |> Option.value_exn - in - [%test_result: Frozen_ledger_hash.t] - ~message: - "Root snarked ledger hash should be the same as the \ - source hash in the proof that was just emitted" - ~expect:(Ledger_proof.statement proof_data).source - ( Ledger.Db.merkle_root t.root_snarked_ledger - |> Frozen_ledger_hash.of_ledger_hash ) ; - (* Apply all the transactions associated with the new ledger - proof to the database-backed SNARKed ledger. We create a - mask and apply them to that, then commit it to the DB. This - saves a lot of IO since committing is batched. Would be even - faster if we implemented #2760. *) - let db_casted = - Ledger.Any_ledger.cast - (module Ledger.Db) - t.root_snarked_ledger - in - let db_mask = - Ledger.Maskable.register_mask db_casted - (Ledger.Mask.create ()) - in - Non_empty_list.iter txns ~f:(fun txn -> - Ledger.apply_transaction db_mask txn - |> Or_error.ok_exn |> ignore ) ; - Ledger.commit db_mask ; - ignore @@ Ledger.Maskable.unregister_mask_exn db_casted db_mask - | _, false | None, _ -> - () ) ; + | Some txns, true -> + let proof_data = + Staged_ledger.current_ledger_proof new_root_staged_ledger + |> Option.value_exn + in + [%test_result: Frozen_ledger_hash.t] + ~message: + "Root snarked ledger hash should be the same as the \ + source hash in the proof that was just emitted" + ~expect:(Ledger_proof.statement proof_data).source + ( Ledger.Db.merkle_root t.root_snarked_ledger + |> Frozen_ledger_hash.of_ledger_hash ) ; + (* Apply all the transactions associated with the new ledger + proof to the database-backed SNARKed ledger. We create a + mask and apply them to that, then commit it to the DB. This + saves a lot of IO since committing is batched. Would be even + faster if we implemented #2760. *) + let db_casted = + Ledger.Any_ledger.cast + (module Ledger.Db) + t.root_snarked_ledger + in + let db_mask = + Ledger.Maskable.register_mask db_casted + (Ledger.Mask.create ()) + in + let%bind () = Async.Scheduler.yield () in + let%bind () = + Non_empty_list.iter_deferred txns ~f:(fun txn -> + O1trace.measure "apply transaction to db mask" + (fun () -> + ignore + @@ Or_error.ok_exn + (Ledger.apply_transaction db_mask txn) ) ; + Deferred.unit ) + in + O1trace.measure "committing db mask" (fun () -> + Ledger.commit db_mask ) ; + let%map () = Async.Scheduler.yield () in + ignore + @@ Ledger.Maskable.unregister_mask_exn db_casted db_mask + | _, false | None, _ -> + return () + in [%test_result: Frozen_ledger_hash.t] ~message: "Root snarked ledger hash diverged from blockchain state \ @@ -909,7 +928,7 @@ struct Extensions.Root_history.enqueue t.extensions.root_history root_state_hash root_breadcrumb ; (garbage_breadcrumbs, new_root_node) ) - else ([], root_node) + else return ([], root_node) in (* 5 *) Extensions.handle_diff t.extensions t.extension_writers