Skip to content

Commit

Permalink
Merge pull request #115 from ava-labs/c-chain-integ-tests
Browse files Browse the repository at this point in the history
C-Chain compatibility
  • Loading branch information
cam-schultz authored Dec 21, 2023
2 parents 8cddd8c + d12a01d commit 76056e3
Show file tree
Hide file tree
Showing 10 changed files with 378 additions and 164 deletions.
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ module github.com/ava-labs/awm-relayer
go 1.20

require (
github.com/ava-labs/avalanchego v1.10.15
github.com/ava-labs/subnet-evm v0.5.9
github.com/ava-labs/teleporter v0.0.0-20231128155319-e21ccab967a4
github.com/ava-labs/avalanchego v1.10.17
github.com/ava-labs/subnet-evm v0.5.10
github.com/ava-labs/teleporter v0.0.0-20231221165433-826fa59bed3c
github.com/ethereum/go-ethereum v1.12.0
github.com/onsi/ginkgo/v2 v2.13.2
github.com/onsi/gomega v1.30.0
Expand All @@ -20,13 +20,13 @@ require (

require (
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/ava-labs/avalanche-network-runner v1.7.3-0.20231026153158-2931f8a448d9 // indirect
github.com/ava-labs/coreth v0.12.8-rc.1 // indirect
github.com/ava-labs/avalanche-network-runner v1.7.4-0.20231127162258-2f3ceed8ae4b // indirect
github.com/ava-labs/coreth v0.12.9-rc.9 // indirect
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/dlclark/regexp2 v1.7.0 // indirect
github.com/dop251/goja v0.0.0-20230605162241-28ee0ee714f3 // indirect
Expand Down Expand Up @@ -70,6 +70,7 @@ require (
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/tools v0.14.0 // indirect
google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down Expand Up @@ -139,7 +140,6 @@ require (
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 // indirect
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
Expand Down
26 changes: 14 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,18 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/avalanche-network-runner v1.7.3-0.20231026153158-2931f8a448d9 h1:mTxfmBxmV8GSW0PUt0Qj/Obetzsw0gP6dtoNHaKW2+o=
github.com/ava-labs/avalanche-network-runner v1.7.3-0.20231026153158-2931f8a448d9/go.mod h1:M9FC+xU4hQU3Botux8V8j574YETeX6tMvxiOmqLfl5c=
github.com/ava-labs/avalanchego v1.10.15 h1:GQ1bkwgKnv5D/yUwXUjWPFqx1cWrf35WOaHSykHUXyE=
github.com/ava-labs/avalanchego v1.10.15/go.mod h1:fHTzxKZOMdM0n4EEXDDR0V3Ieb/Jnz7PM8zAsJRsh2U=
github.com/ava-labs/coreth v0.12.8-rc.1 h1:tvJcxQTQzxIQqx8TnrxdyMhZYbdsMaiy6AEiOyjvaa4=
github.com/ava-labs/coreth v0.12.8-rc.1/go.mod h1:GBH5SxHZdScSp95IijDs9+Gxw/QDIWvfoLKiJMNYLsE=
github.com/ava-labs/subnet-evm v0.5.9 h1:IYWj5j83C2oterPOrxOix6/qYCua85HTQ/viMFHNPOM=
github.com/ava-labs/subnet-evm v0.5.9/go.mod h1:n4/abB4BYU1xlAqD15uKz6pRROOh/5u5Jxf4JPK6DbY=
github.com/ava-labs/teleporter v0.0.0-20231128155319-e21ccab967a4 h1:IR2aGMcH7BPwlLxze5USWVZCF5Jgc6smMlGtgqUfQT0=
github.com/ava-labs/teleporter v0.0.0-20231128155319-e21ccab967a4/go.mod h1:60cR99/bNua2CMt5Sq+YrYhJ3a8B/3K++v6H03sSkl0=
github.com/ava-labs/avalanche-network-runner v1.7.4-0.20231127162258-2f3ceed8ae4b h1:iH6q+S7dmBOYCXrZx+nNlS1HBp72L2msiVCLs39Ls5A=
github.com/ava-labs/avalanche-network-runner v1.7.4-0.20231127162258-2f3ceed8ae4b/go.mod h1:aeAm8dgJ1xucQKlYoRDMgYjA0UWGwmaICG9wL0WvseU=
github.com/ava-labs/avalanchego v1.10.17 h1:Ri01nU5ukKC38ZCkCh3namaMZtJkSuv1X/vC13uJguc=
github.com/ava-labs/avalanchego v1.10.17/go.mod h1:A6f3877qlq7bePjCU4T0D60bZGecRMCk15pMpJGOb4Q=
github.com/ava-labs/coreth v0.12.9-rc.9 h1:mvYxABdyPByXwwwIxnTBCiNO23dsE1Kfnd5H106lric=
github.com/ava-labs/coreth v0.12.9-rc.9/go.mod h1:yrf2vEah4Fgj6sJ4UpHewo4DLolwdpf2bJuLRT80PGw=
github.com/ava-labs/subnet-evm v0.5.10 h1:ed9BxoiuXRnB/qKakKzYKtZzV/gVjOB2LxuDegpLs9g=
github.com/ava-labs/subnet-evm v0.5.10/go.mod h1:wln8B4siQ1Osch+elW9vW1XJGjj5PYxQETkzFyDEMjk=
github.com/ava-labs/teleporter v0.0.0-20231216161900-52e5f0c35bb1 h1:90y8wOAZYjFXCQcWpNzbWAkX45l9yHJCGEn8pFMaGKk=
github.com/ava-labs/teleporter v0.0.0-20231216161900-52e5f0c35bb1/go.mod h1:xE+pCNxh0OR988CQysMyhi8jBqK/0oX7ctETHGFVduo=
github.com/ava-labs/teleporter v0.0.0-20231221165433-826fa59bed3c h1:vnMlfP4SHFoatRufgUma/eGwvVzWdwMo17ADdrh6YYQ=
github.com/ava-labs/teleporter v0.0.0-20231221165433-826fa59bed3c/go.mod h1:qeclhkPTO4R2McXNrXXca4JmiRSgQ0gJ0KtJWzQGGPE=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down Expand Up @@ -138,8 +140,8 @@ github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cpuguy83/go-md2man/v2 v2.0.3 h1:qMCsGGgs+MAzDFyp9LpAe1Lqy/fY/qCovCm0qnXZOBM=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
142 changes: 121 additions & 21 deletions relayer/message_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ import (
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/vms/platformvm/warp"
avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
"github.com/ava-labs/awm-relayer/messages"
"github.com/ava-labs/awm-relayer/peers"
"github.com/ava-labs/awm-relayer/utils"
"github.com/ava-labs/awm-relayer/vms/vmtypes"
"github.com/ava-labs/coreth/params"
coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message"
msg "github.com/ava-labs/subnet-evm/plugin/evm/message"
warpBackend "github.com/ava-labs/subnet-evm/warp"

"go.uber.org/zap"
)

Expand All @@ -37,9 +42,11 @@ const (
)

var (
codec = msg.Codec
codec = msg.Codec
coreEthCodec = coreEthMsg.Codec
// Errors
errNotEnoughSignatures = fmt.Errorf("failed to collect a threshold of signatures")
errFailedToGetAggSig = fmt.Errorf("failed to get aggregate signature from node endpoint")
)

// messageRelayers are created for each warp message to be relayed.
Expand Down Expand Up @@ -76,7 +83,7 @@ func newMessageRelayer(
}
}

func (r *messageRelayer) relayMessage(warpMessageInfo *vmtypes.WarpMessageInfo, requestID uint32, messageManager messages.MessageManager) error {
func (r *messageRelayer) relayMessage(warpMessageInfo *vmtypes.WarpMessageInfo, requestID uint32, messageManager messages.MessageManager, useAppRequestNetwork bool) error {
shouldSend, err := messageManager.ShouldSendMessage(warpMessageInfo, r.destinationBlockchainID)
if err != nil {
r.logger.Error(
Expand All @@ -95,14 +102,27 @@ func (r *messageRelayer) relayMessage(warpMessageInfo *vmtypes.WarpMessageInfo,

startCreateSignedMessageTime := time.Now()
// Query nodes on the origin chain for signatures, and construct the signed warp message.
signedMessage, err := r.createSignedMessage(requestID)
if err != nil {
r.logger.Error(
"Failed to create signed warp message",
zap.Error(err),
)
r.incFailedRelayMessageCount("failed to create signed warp message")
return err
var signedMessage *avalancheWarp.Message
if useAppRequestNetwork {
signedMessage, err = r.createSignedMessageAppRequest(requestID)
if err != nil {
r.logger.Error(
"Failed to create signed warp message via AppRequest network",
zap.Error(err),
)
r.incFailedRelayMessageCount("failed to create signed warp message via AppRequest network")
return err
}
} else {
signedMessage, err = r.createSignedMessage()
if err != nil {
r.logger.Error(
"Failed to create signed warp message via RPC",
zap.Error(err),
)
r.incFailedRelayMessageCount("failed to create signed warp message via RPC")
return err
}
}

// create signed message latency (ms)
Expand All @@ -125,12 +145,82 @@ func (r *messageRelayer) relayMessage(warpMessageInfo *vmtypes.WarpMessageInfo,
return nil
}

// Run collects signatures from nodes by directly querying them via AppRequest, then aggregates the signatures, and constructs the signed warp message.
func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, error) {
r.logger.Info(
"Starting relayer routine",
// createSignedMessage fetches the signed Warp message from the source chain via RPC.
// Each VM may implement their own RPC method to construct the aggregate signature, which
// will need to be accounted for here.
func (r *messageRelayer) createSignedMessage() (*warp.Message, error) {
r.logger.Info("Fetching aggregate signature from the source chain validators via API")
warpClient, err := warpBackend.NewClient(r.relayer.apiNodeURI, r.relayer.sourceBlockchainID.String())
if err != nil {
r.logger.Error(
"Failed to create Warp API client",
zap.Error(err),
)
return nil, err
}
signingSubnetID := r.relayer.sourceSubnetID
if r.relayer.sourceSubnetID == constants.PrimaryNetworkID {
signingSubnetID, err = r.relayer.pChainClient.ValidatedBy(context.Background(), r.destinationBlockchainID)
if err != nil {
r.logger.Error(
"failed to get validating subnet for destination chain",
zap.String("destinationBlockchainID", r.destinationBlockchainID.String()),
zap.Error(err),
)
return nil, err
}
}

var signedWarpMessageBytes []byte
for attempt := 1; attempt <= maxRelayerQueryAttempts; attempt++ {
r.logger.Debug(
"Relayer collecting signatures from peers.",
zap.Int("attempt", attempt),
zap.String("sourceBlockchainID", r.relayer.sourceBlockchainID.String()),
zap.String("destinationBlockchainID", r.destinationBlockchainID.String()),
zap.String("signingSubnetID", signingSubnetID.String()),
)
signedWarpMessageBytes, err = warpClient.GetMessageAggregateSignature(
context.Background(),
r.warpMessage.ID(),
params.WarpDefaultQuorumNumerator,
signingSubnetID.String(),
)
if err == nil {
warpMsg, err := avalancheWarp.ParseMessage(signedWarpMessageBytes)
if err != nil {
r.logger.Error(
"Failed to parse signed warp message",
zap.Error(err),
)
return nil, err
}
return warpMsg, err
}
r.logger.Info(
"Failed to get aggregate signature from node endpoint. Retrying.",
zap.Int("attempt", attempt),
zap.Error(err),
)
if attempt != maxRelayerQueryAttempts {
// Sleep such that all retries are uniformly spread across totalRelayerQueryPeriodMs
// TODO: We may want to consider an exponential back off rather than a uniform sleep period.
time.Sleep(time.Duration(signatureRequestRetryWaitPeriodMs/maxRelayerQueryAttempts) * time.Millisecond)
}
}
r.logger.Warn(
"Failed to get aggregate signature from node endpoint",
zap.Int("attempts", maxRelayerQueryAttempts),
zap.String("sourceBlockchainID", r.relayer.sourceBlockchainID.String()),
zap.String("destinationBlockchainID", r.destinationBlockchainID.String()),
zap.String("signingSubnetID", signingSubnetID.String()),
)
return nil, errFailedToGetAggSig
}

// createSignedMessageAppRequest collects signatures from nodes by directly querying them via AppRequest, then aggregates the signatures, and constructs the signed warp message.
func (r *messageRelayer) createSignedMessageAppRequest(requestID uint32) (*warp.Message, error) {
r.logger.Info("Fetching aggregate signature from the source chain validators via AppRequest")

// Get the current canonical validator set of the source subnet.
validatorSet, totalValidatorWeight, err := r.getCurrentCanonicalValidatorSet()
Expand Down Expand Up @@ -173,14 +263,24 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e
}

// Construct the request
req := msg.MessageSignatureRequest{
MessageID: r.warpMessage.ID(),

// Make sure to use the correct codec
var reqBytes []byte
if r.relayer.sourceSubnetID == constants.PrimaryNetworkID {
req := coreEthMsg.MessageSignatureRequest{
MessageID: r.warpMessage.ID(),
}
reqBytes, err = coreEthMsg.RequestToBytes(coreEthCodec, req)
} else {
req := msg.MessageSignatureRequest{
MessageID: r.warpMessage.ID(),
}
reqBytes, err = msg.RequestToBytes(codec, req)
}
reqBytes, err := msg.RequestToBytes(codec, req)
if err != nil {
r.logger.Error(
"Failed to marshal request bytes",
zap.String("destinationBlockchainID", r.destinationBlockchainID.String()),
zap.String("warpMessageID", r.warpMessage.ID().String()),
zap.Error(err),
)
return nil, err
Expand Down Expand Up @@ -240,7 +340,7 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e
sentTo := r.relayer.network.Network.Send(outMsg, vdrSet, r.relayer.sourceSubnetID, subnets.NoOpAllower)
r.logger.Debug(
"Sent signature request to network",
zap.String("messageID", req.MessageID.String()),
zap.String("messageID", r.warpMessage.ID().String()),
zap.Any("sentTo", sentTo),
)
for nodeID := range vdrSet {
Expand Down Expand Up @@ -311,7 +411,7 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e
}

// As soon as the signatures exceed the stake weight threshold we try to aggregate and send the transaction.
if utils.CheckStakeWeightExceedsThreshold(accumulatedSignatureWeight, totalValidatorWeight, utils.DefaultQuorumNumerator, utils.DefaultQuorumDenominator) {
if utils.CheckStakeWeightExceedsThreshold(accumulatedSignatureWeight, totalValidatorWeight, params.WarpDefaultQuorumNumerator, params.WarpQuorumDenominator) {
aggSig, vdrBitSet, err := r.aggregateSignatures(signatureMap)
if err != nil {
r.logger.Error(
Expand Down Expand Up @@ -380,7 +480,7 @@ func (r *messageRelayer) getCurrentCanonicalValidatorSet() ([]*warp.Validator, u
signingSubnet, err = r.relayer.pChainClient.ValidatedBy(context.Background(), r.destinationBlockchainID)
if err != nil {
r.logger.Error(
"failed to get validating subnet for destination chain",
"Failed to get validating subnet for destination chain",
zap.String("destinationBlockchainID", r.destinationBlockchainID.String()),
zap.Error(err),
)
Expand Down
8 changes: 7 additions & 1 deletion relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/ava-labs/awm-relayer/database"
"github.com/ava-labs/awm-relayer/messages"
"github.com/ava-labs/awm-relayer/peers"
"github.com/ava-labs/awm-relayer/utils"
vms "github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/awm-relayer/vms/vmtypes"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -39,6 +40,7 @@ type Relayer struct {
logger logging.Logger
db database.RelayerDatabase
supportedDestinations set.Set[ids.ID]
apiNodeURI string
}

func NewRelayer(
Expand Down Expand Up @@ -97,6 +99,8 @@ func NewRelayer(
messageManagers[addressHash] = messageManager
}

uri := utils.StripFromString(sourceSubnetInfo.GetNodeRPCEndpoint(), "/ext")

logger.Info(
"Creating relayer",
zap.String("subnetID", subnetID.String()),
Expand All @@ -117,6 +121,7 @@ func NewRelayer(
logger: logger,
db: db,
supportedDestinations: supportedDestinationsBlockchainIDs,
apiNodeURI: uri,
}

// Open the subscription. We must do this before processing any missed messages, otherwise we may miss an incoming message
Expand Down Expand Up @@ -277,7 +282,8 @@ func (r *Relayer) RelayMessage(warpLogInfo *vmtypes.WarpLogInfo, metrics *Messag

// Relay the message to the destination. Messages from a given source chain must be processed in serial in order to
// guarantee that the previous block (n-1) is fully processed by the relayer when processing a given log from block n.
err = messageRelayer.relayMessage(warpMessageInfo, r.currentRequestID, messageManager)
// TODO: Add a config option to use the Warp API, instead of hardcoding to the app request network here
err = messageRelayer.relayMessage(warpMessageInfo, r.currentRequestID, messageManager, true)
if err != nil {
r.logger.Error(
"Failed to run message relayer",
Expand Down
Loading

0 comments on commit 76056e3

Please sign in to comment.