Skip to content

Commit

Permalink
improve syncing up member indices and caching partial signature
Browse files Browse the repository at this point in the history
  • Loading branch information
kafeikui committed Jul 8, 2024
1 parent 75da9ae commit 300e781
Show file tree
Hide file tree
Showing 24 changed files with 202 additions and 205 deletions.
14 changes: 0 additions & 14 deletions crates/arpa-node/proto/management.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ service ManagementService {
returns (VerifyPartialSigsReply);

rpc SendPartialSig(SendPartialSigRequest) returns (SendPartialSigReply);

rpc FulfillRandomness(FulfillRandomnessRequest)
returns (FulfillRandomnessReply);
}

enum ListenerType {
Expand Down Expand Up @@ -198,14 +195,3 @@ message SendPartialSigReply {
bool res = 1;
}

message FulfillRandomnessRequest {
uint32 chain_id = 1;
uint32 group_index = 2;
bytes request_id = 3;
bytes sig = 4;
map<string, bytes> partial_sigs = 5;
}

message FulfillRandomnessReply {
bool res = 1;
}
3 changes: 3 additions & 0 deletions crates/arpa-node/src/committer/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ where
if let Ok(member) = self.group_cache.read().await.get_member(req_id_address) {
let partial_public_key = member.partial_public_key.clone().unwrap();

let member_index = member.index;

SimpleBLSCore::<PC, S>::partial_verify(
&partial_public_key,
&req.message,
Expand Down Expand Up @@ -167,6 +169,7 @@ where
.add_partial_signature(
req.request_id,
req_id_address,
member_index,
req.partial_signature,
)
.await
Expand Down
102 changes: 11 additions & 91 deletions crates/arpa-node/src/management/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use std::collections::HashMap;

use super::{
algorithm::bls::{BLSCore, SimpleBLSCore},
committer::{client::GeneralCommitterClient, CommitterClient, CommitterService},
context::{chain::Chain, types::GeneralContext, Context, ContextFetcher},
error::{NodeError, NodeResult},
error::NodeResult,
scheduler::FixedTaskScheduler,
};
use anyhow::Result;
use arpa_contract_client::{adapter::AdapterTransactions, controller::ControllerTransactions};
use arpa_contract_client::controller::ControllerTransactions;
use arpa_core::{
BLSTaskType, ComponentTaskType, DKGStatus, ExponentialBackoffRetryDescriptor, Group,
ListenerDescriptor, ListenerType, PartialSignature, SchedulerError, SchedulerResult,
ListenerDescriptor, ListenerType, SchedulerError, SchedulerResult,
DEFAULT_COMMIT_PARTIAL_SIGNATURE_RETRY_BASE, DEFAULT_COMMIT_PARTIAL_SIGNATURE_RETRY_FACTOR,
DEFAULT_COMMIT_PARTIAL_SIGNATURE_RETRY_MAX_ATTEMPTS,
DEFAULT_COMMIT_PARTIAL_SIGNATURE_RETRY_USE_JITTER,
Expand All @@ -20,7 +18,6 @@ use arpa_dal::error::DataAccessResult;
use ethers::types::Address;
use threshold_bls::{
group::Curve,
poly::Eval,
sig::{Share, SignatureScheme, ThresholdScheme},
};

Expand Down Expand Up @@ -107,15 +104,6 @@ pub trait BLSRandomnessService<PC: Curve> {
randomness_task_request_id: Vec<u8>,
partial: Vec<u8>,
) -> Result<()>;

async fn fulfill_randomness(
&self,
chain_id: usize,
group_index: usize,
randomness_task_request_id: Vec<u8>,
sig: Vec<u8>,
partial_sigs: HashMap<Address, Vec<u8>>,
) -> Result<()>;
}

impl<
Expand Down Expand Up @@ -452,13 +440,21 @@ where
.await?;
}

let self_index = self
.get_main_chain()
.get_group_cache()
.read()
.await
.get_self_index()?;

self.get_main_chain()
.get_randomness_result_cache()
.write()
.await
.add_partial_signature(
randomness_task_request_id.clone(),
id_address,
self_index,
partial_signature.clone(),
)
.await?;
Expand Down Expand Up @@ -548,80 +544,4 @@ where

Ok(())
}

async fn fulfill_randomness(
&self,
chain_id: usize,
group_index: usize,
randomness_task_request_id: Vec<u8>,
sig: Vec<u8>,
partial_sigs: HashMap<Address, Vec<u8>>,
) -> Result<()> {
let id_address = self
.get_main_chain()
.get_node_cache()
.read()
.await
.get_id_address()?;

let main_chain_id = self
.get_main_chain()
.get_chain_identity()
.read()
.await
.get_chain_id();

let partial_signatures = partial_sigs
.iter()
.map(|(addr, partial)| {
let eval: Eval<Vec<u8>> = bincode::deserialize(partial)?;
let partial = PartialSignature {
index: eval.index as usize,
signature: eval.value,
};
Ok((*addr, partial))
})
.collect::<Result<_, NodeError>>()?;

let (client, randomness_task) = if chain_id == main_chain_id {
(
self.get_main_chain()
.get_chain_identity()
.read()
.await
.build_adapter_client(id_address),
self.get_main_chain()
.get_randomness_tasks_cache()
.read()
.await
.get(&randomness_task_request_id)
.await?,
)
} else {
if !self.contains_relayed_chain(chain_id) {
return Err(SchedulerError::InvalidChainId(chain_id).into());
}
(
self.get_relayed_chain(chain_id)
.unwrap()
.get_chain_identity()
.read()
.await
.build_adapter_client(id_address),
self.get_relayed_chain(chain_id)
.unwrap()
.get_randomness_tasks_cache()
.read()
.await
.get(&randomness_task_request_id)
.await?,
)
};

client
.fulfill_randomness(group_index, randomness_task, sig, partial_signatures)
.await?;

Ok(())
}
}
45 changes: 8 additions & 37 deletions crates/arpa-node/src/management/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ use crate::rpc_stub::management::management_service_server::{
ManagementService, ManagementServiceServer,
};
use crate::rpc_stub::management::{
AggregatePartialSigsReply, AggregatePartialSigsRequest, FulfillRandomnessReply,
FulfillRandomnessRequest, GetGroupInfoReply, GetGroupInfoRequest, GetNodeInfoReply,
GetNodeInfoRequest, Group, ListFixedTasksReply, ListFixedTasksRequest, Member,
NodeActivateReply, NodeActivateRequest, NodeQuitReply, NodeQuitRequest, NodeRegisterReply,
NodeRegisterRequest, PartialSignReply, PartialSignRequest, PostProcessDkgReply,
PostProcessDkgRequest, SendPartialSigReply, SendPartialSigRequest, ShutdownListenerReply,
ShutdownListenerRequest, ShutdownNodeReply, ShutdownNodeRequest, StartListenerReply,
StartListenerRequest, VerifyPartialSigsReply, VerifyPartialSigsRequest, VerifySigReply,
VerifySigRequest,
AggregatePartialSigsReply, AggregatePartialSigsRequest, GetGroupInfoReply, GetGroupInfoRequest,
GetNodeInfoReply, GetNodeInfoRequest, Group, ListFixedTasksReply, ListFixedTasksRequest,
Member, NodeActivateReply, NodeActivateRequest, NodeQuitReply, NodeQuitRequest,
NodeRegisterReply, NodeRegisterRequest, PartialSignReply, PartialSignRequest,
PostProcessDkgReply, PostProcessDkgRequest, SendPartialSigReply, SendPartialSigRequest,
ShutdownListenerReply, ShutdownListenerRequest, ShutdownNodeReply, ShutdownNodeRequest,
StartListenerReply, StartListenerRequest, VerifyPartialSigsReply, VerifyPartialSigsRequest,
VerifySigReply, VerifySigRequest,
};
use arpa_core::{
address_to_string, Group as ModelGroup, ListenerType, Member as ModelMember, SchedulerError,
Expand Down Expand Up @@ -321,34 +320,6 @@ where
.map_err(|e: anyhow::Error| Status::unavailable(e.to_string()))?;
return Ok(Response::new(SendPartialSigReply { res: true }));
}

async fn fulfill_randomness(
&self,
request: Request<FulfillRandomnessRequest>,
) -> Result<tonic::Response<FulfillRandomnessReply>, tonic::Status> {
let req = request.into_inner();
let group_index = req.group_index as usize;
let request_id = req.request_id;
let sig = req.sig;
let partial_sigs = req
.partial_sigs
.into_iter()
.map(|(k, v)| (k.parse().unwrap(), v))
.collect();
self.context
.write()
.await
.fulfill_randomness(
req.chain_id as usize,
group_index,
request_id,
sig,
partial_sigs,
)
.await
.map_err(|e: anyhow::Error| Status::failed_precondition(e.to_string()))?;
return Ok(Response::new(FulfillRandomnessReply { res: true }));
}
}

impl<PC: Curve> From<NodeInfo<PC>> for GetNodeInfoReply {
Expand Down
2 changes: 1 addition & 1 deletion crates/arpa-node/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ async fn start(

let mut node_cache = db.get_node_info_client();

let mut group_cache = db.get_group_info_client();
let mut group_cache = db.get_group_info_client(id_address);

let mut dkg_public_key_to_register: Option<Vec<u8>> = None;

Expand Down
3 changes: 2 additions & 1 deletion crates/arpa-node/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ mod tests {
};
use arpa_core::{
ComponentTaskType, Config, GeneralMainChainIdentity, ListenerType, RandomnessTask,
PLACEHOLDER_ADDRESS,
};
use arpa_dal::{
cache::{
Expand Down Expand Up @@ -116,7 +117,7 @@ mod tests {
));

let group_cache: Arc<RwLock<Box<dyn GroupInfoHandler<G2Curve>>>> = Arc::new(RwLock::new(
Box::new(InMemoryGroupInfoCache::<G2Curve>::default()),
Box::new(InMemoryGroupInfoCache::<G2Curve>::new(PLACEHOLDER_ADDRESS)),
));

let randomness_tasks_cache: Arc<RwLock<Box<dyn BLSTasksHandler<RandomnessTask>>>> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{DebuggableEvent, DebuggableSubscriber, Subscriber};
use crate::{
algorithm::bls::{BLSCore, SimpleBLSCore},
context::ChainIdentityHandlerType,
error::{NodeError, NodeResult},
error::NodeResult,
event::{ready_to_fulfill_randomness_task::ReadyToFulfillRandomnessTask, types::Topic},
queue::{event_queue::EventQueue, EventSubscriber},
scheduler::{dynamic::SimpleDynamicTaskScheduler, TaskScheduler},
Expand All @@ -22,10 +22,9 @@ use async_trait::async_trait;
use ethers::types::{Address, U256};
use log::{debug, error, info};
use serde_json::json;
use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use std::{collections::BTreeMap, marker::PhantomData, sync::Arc};
use threshold_bls::{
group::Curve,
poly::Eval,
sig::{SignatureScheme, ThresholdScheme},
};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -82,7 +81,7 @@ pub trait FulfillRandomnessHandler {
group_index: usize,
randomness_task: RandomnessTask,
signature: Vec<u8>,
partial_signatures: HashMap<Address, PartialSignature>,
partial_signatures: BTreeMap<Address, PartialSignature>,
) -> NodeResult<()>;
}

Expand All @@ -102,7 +101,7 @@ impl<PC: Curve> FulfillRandomnessHandler for GeneralFulfillRandomnessHandler<PC>
group_index: usize,
randomness_task: RandomnessTask,
signature: Vec<u8>,
partial_signatures: HashMap<Address, PartialSignature>,
partial_signatures: BTreeMap<Address, PartialSignature>,
) -> NodeResult<()> {
let client = self
.chain_identity
Expand Down Expand Up @@ -312,7 +311,7 @@ where

let partials = partial_signatures
.values()
.cloned()
.map(|partial| partial.signed_partial_signature.clone())
.collect::<Vec<Vec<u8>>>();

match SimpleBLSCore::<PC, S>::aggregate(threshold, &partials) {
Expand All @@ -330,18 +329,6 @@ where
)
);

let partial_signatures = partial_signatures
.iter()
.map(|(addr, partial)| {
let eval: Eval<Vec<u8>> = bincode::deserialize(partial)?;
let partial = PartialSignature {
index: eval.index as usize,
signature: eval.value,
};
Ok((*addr, partial))
})
.collect::<Result<_, NodeError>>()?;

let id_address = self.id_address;

let block_cache = self.block_cache.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ where
self.group_cache.read().await.get_secret_share()?,
&actual_seed,
) {
Ok(partial_signature) => {
Ok(signed_partial_signature) => {
info!(
"{}",
build_task_related_payload(
Expand All @@ -164,7 +164,7 @@ where
)
);

self.send_partial_signature(task, actual_seed, partial_signature)
self.send_partial_signature(task, actual_seed, signed_partial_signature)
.await?;
}
Err(e) => {
Expand Down Expand Up @@ -197,6 +197,8 @@ where

let current_group_index = self.group_cache.read().await.get_index()?;

let current_member_index = self.group_cache.read().await.get_self_index()?;

if self
.group_cache
.read()
Expand Down Expand Up @@ -230,6 +232,7 @@ where
.add_partial_signature(
task.request_id.clone(),
self.id_address,
current_member_index,
partial_signature.clone(),
)
.await?;
Expand Down
Loading

0 comments on commit 300e781

Please sign in to comment.