Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new usable hosts endpoint #1647

Merged
merged 17 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,11 @@ func (cm ContractMetadata) InSet(set string) bool {
}
return false
}

func (cm ContractMetadata) HostInfo() HostInfo {
return HostInfo{
PublicKey: cm.HostKey,
ContractID: cm.ID,
SiamuxAddr: cm.SiamuxAddr,
}
}
5 changes: 3 additions & 2 deletions api/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ type (
}

HostInfo struct {
PublicKey types.PublicKey `json:"publicKey"`
SiamuxAddr string `json:"siamuxAddr"`
ContractID types.FileContractID `json:"contractID"`
PublicKey types.PublicKey `json:"publicKey"`
SiamuxAddr string `json:"siamuxAddr"`

Prices HostPriceTable `json:"prices"`
Settings rhpv2.HostSettings `json:"settings"`
Expand Down
18 changes: 10 additions & 8 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ type (
UpdateHostAllowlistEntries(ctx context.Context, add, remove []types.PublicKey, clear bool) error
UpdateHostBlocklistEntries(ctx context.Context, add, remove []string, clear bool) error
UpdateHostCheck(ctx context.Context, autopilotID string, hk types.PublicKey, check api.HostCheck) error
UsableHosts(ctx context.Context, offset, limit int) ([]api.HostInfo, error)
UsableHosts(ctx context.Context, minWindowStart uint64, offset, limit int) ([]api.HostInfo, error)
}

// A MetadataStore stores information about contracts and objects.
Expand Down Expand Up @@ -316,9 +316,10 @@ type (
)

type Bus struct {
allowPrivateIPs bool
startTime time.Time
masterKey utils.MasterKey
allowPrivateIPs bool
startTime time.Time
masterKey utils.MasterKey
revisionSubmissionBuffer uint64

alerts alerts.Alerter
alertMgr AlertManager
Expand All @@ -343,14 +344,15 @@ type Bus struct {
}

// New returns a new Bus
func New(ctx context.Context, cfg config.Bus, masterKey [32]byte, am AlertManager, wm WebhooksManager, cm ChainManager, s Syncer, w Wallet, store Store, explorerURL string, l *zap.Logger) (_ *Bus, err error) {
func New(ctx context.Context, cfg config.Bus, masterKey [32]byte, am AlertManager, wm WebhooksManager, cm ChainManager, s Syncer, w Wallet, store Store, explorerURL string, revisionSubmissionBuffer uint64, l *zap.Logger) (_ *Bus, err error) {
l = l.Named("bus")
dialer := rhp.NewFallbackDialer(store, net.Dialer{}, l)

b := &Bus{
allowPrivateIPs: cfg.AllowPrivateIPs,
startTime: time.Now(),
masterKey: masterKey,
allowPrivateIPs: cfg.AllowPrivateIPs,
revisionSubmissionBuffer: revisionSubmissionBuffer,
startTime: time.Now(),
masterKey: masterKey,

s: s,
cm: cm,
Expand Down
8 changes: 7 additions & 1 deletion bus/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,13 @@ func (b *Bus) hostsHandlerGET(jc jape.Context) {
return
}

hosts, err := b.store.UsableHosts(jc.Request.Context(), offset, limit)
cs, err := b.consensusState(jc.Request.Context())
if jc.Check("couldn't fetch consensus state", err) != nil {
return
}
minWindowStart := cs.BlockHeight + b.revisionSubmissionBuffer

hosts, err := b.store.UsableHosts(jc.Request.Context(), minWindowStart, offset, limit)
if jc.Check("couldn't fetch hosts", err) != nil {
return
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/renterd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ func defaultConfig() config.Config {
AnnouncementMaxAgeHours: 24 * 7 * 52, // 1 year
Bootstrap: true,
GatewayAddr: ":9981",
UsedUTXOExpiry: 24 * time.Hour,
RevisionSubmissionBuffer: 150, // 144 + 6 blocks leeway
SlabBufferCompletionThreshold: 1 << 12,
UsedUTXOExpiry: 24 * time.Hour,
},
Worker: config.Worker{
Enabled: true,
Expand Down
2 changes: 1 addition & 1 deletion cmd/renterd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func newBus(ctx context.Context, cfg config.Config, pk types.PrivateKey, network
}

// create bus
b, err := bus.New(ctx, cfg.Bus, masterKey, alertsMgr, wh, cm, s, w, sqlStore, explorerURL, logger)
b, err := bus.New(ctx, cfg.Bus, masterKey, alertsMgr, wh, cm, s, w, sqlStore, explorerURL, cfg.Bus.RevisionSubmissionBuffer, logger)
if err != nil {
return nil, nil, fmt.Errorf("failed to create bus: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ type (
GatewayAddr string `yaml:"gatewayAddr,omitempty"`
RemoteAddr string `yaml:"remoteAddr,omitempty"`
RemotePassword string `yaml:"remotePassword,omitempty"`
UsedUTXOExpiry time.Duration `yaml:"usedUtxoExpiry,omitempty"`
RevisionSubmissionBuffer uint64 `yaml:"revisionSubmissionBuffer,omitempty"`
SlabBufferCompletionThreshold int64 `yaml:"slabBufferCompleionThreshold,omitempty"`
UsedUTXOExpiry time.Duration `yaml:"usedUtxoExpiry,omitempty"`
}

// LogFile configures the file output of the logger.
Expand Down
5 changes: 4 additions & 1 deletion internal/test/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ func newTestBus(ctx context.Context, cm *chain.Manager, genesisBlock types.Block
masterKey := blake2b.Sum256(append([]byte("worker"), pk...))

// create bus
b, err := bus.New(ctx, cfg, masterKey, alertsMgr, wh, cm, s, w, sqlStore, "", logger)
b, err := bus.New(ctx, cfg, masterKey, alertsMgr, wh, cm, s, w, sqlStore, "", cfg.RevisionSubmissionBuffer, logger)
if err != nil {
return nil, nil, nil, nil, err
}
Expand Down Expand Up @@ -716,6 +716,9 @@ func (c *TestCluster) sync() {
func (c *TestCluster) WaitForAccounts() []api.Account {
c.tt.Helper()

// mine a block (ensures worker cache gets invalidated)
c.MineBlocks(1)

// build hosts map
hostsMap := make(map[types.PublicKey]struct{})
for _, host := range c.hosts {
Expand Down
101 changes: 42 additions & 59 deletions internal/worker/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ var (

type (
AccountFunder interface {
FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, desired types.Currency) error
FundAccount(ctx context.Context, hi api.HostInfo, desired types.Currency) error
}

AccountSyncer interface {
SyncAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string) error
SyncAccount(ctx context.Context, hi api.HostInfo) error
}

AccountStore interface {
Expand All @@ -49,27 +49,26 @@ type (
ConsensusState(ctx context.Context) (api.ConsensusState, error)
}

DownloadContracts interface {
DownloadContracts(ctx context.Context) ([]api.ContractMetadata, error)
HostStore interface {
UsableHosts(ctx context.Context) ([]api.HostInfo, error)
}
)

type (
AccountMgr struct {
alerts alerts.Alerter
funder AccountFunder
syncer AccountSyncer
dc DownloadContracts
cs ConsensusState
s AccountStore
key utils.AccountsKey
logger *zap.SugaredLogger
owner string
refillInterval time.Duration
revisionSubmissionBuffer uint64
shutdownCtx context.Context
shutdownCancel context.CancelFunc
wg sync.WaitGroup
alerts alerts.Alerter
funder AccountFunder
syncer AccountSyncer
cs ConsensusState
s AccountStore
hs HostStore
key utils.AccountsKey
logger *zap.SugaredLogger
owner string
refillInterval time.Duration
shutdownCtx context.Context
shutdownCancel context.CancelFunc
wg sync.WaitGroup

mu sync.Mutex
byID map[rhpv3.Account]*Account
Expand All @@ -92,7 +91,7 @@ type (
// NewAccountManager creates a new account manager. It will load all accounts
// from the given store and mark the shutdown as unclean. When Shutdown is
// called it will save all accounts.
func NewAccountManager(key utils.AccountsKey, owner string, alerter alerts.Alerter, funder AccountFunder, syncer AccountSyncer, cs ConsensusState, dc DownloadContracts, s AccountStore, refillInterval time.Duration, l *zap.Logger) (*AccountMgr, error) {
func NewAccountManager(key utils.AccountsKey, owner string, alerter alerts.Alerter, funder AccountFunder, syncer AccountSyncer, cs ConsensusState, hs HostStore, s AccountStore, refillInterval time.Duration, l *zap.Logger) (*AccountMgr, error) {
logger := l.Named("accounts").Sugar()

shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
Expand All @@ -101,7 +100,7 @@ func NewAccountManager(key utils.AccountsKey, owner string, alerter alerts.Alert
funder: funder,
syncer: syncer,
cs: cs,
dc: dc,
hs: hs,
s: s,
key: key,
logger: logger,
Expand Down Expand Up @@ -309,75 +308,59 @@ func (a *AccountMgr) markRefillDone(hk types.PublicKey) {
// goroutine from a previous call, refillWorkerAccounts will skip that account
// until the previously launched goroutine returns.
func (a *AccountMgr) refillAccounts() {
// fetch config
cs, err := a.cs.ConsensusState(a.shutdownCtx)
// fetch all usable hosts
hosts, err := a.hs.UsableHosts(a.shutdownCtx)
if err != nil {
a.logger.Errorw(fmt.Sprintf("failed to fetch consensus state for refill: %v", err))
return
}

// fetch all contracts
contracts, err := a.dc.DownloadContracts(a.shutdownCtx)
if err != nil {
a.logger.Errorw(fmt.Sprintf("failed to fetch contracts for refill: %v", err))
return
} else if len(contracts) == 0 {
a.logger.Errorw(fmt.Sprintf("failed to fetch usable hosts: %v", err))
return
}

// refill accounts in separate goroutines
for _, c := range contracts {
for _, hi := range hosts {
// launch refill if not already in progress
if a.markRefillInProgress(c.HostKey) {
go func(contract api.ContractMetadata) {
defer a.markRefillDone(contract.HostKey)
if a.markRefillInProgress(hi.PublicKey) {
go func() {
defer a.markRefillDone(hi.PublicKey)

rCtx, cancel := context.WithTimeout(a.shutdownCtx, 5*time.Minute)
defer cancel()

// refill
refilled, err := a.refillAccount(rCtx, c, cs.BlockHeight, a.revisionSubmissionBuffer)
refilled, err := a.refillAccount(rCtx, hi)

// determine whether to log something
shouldLog := true
a.mu.Lock()
if t, exists := a.lastLoggedRefillErr[contract.HostKey]; !exists || err == nil {
a.lastLoggedRefillErr[contract.HostKey] = time.Now()
if t, exists := a.lastLoggedRefillErr[hi.PublicKey]; !exists || err == nil {
a.lastLoggedRefillErr[hi.PublicKey] = time.Now()
} else if time.Since(t) < time.Hour {
// only log error once per hour per account
shouldLog = false
}
a.mu.Unlock()

if err != nil && shouldLog {
a.logger.Error("failed to refill account for host", zap.Stringer("hostKey", contract.HostKey), zap.Error(err))
a.logger.Error("failed to refill account for host", zap.Stringer("hostKey", hi.PublicKey), zap.Error(err))
} else if refilled {
a.logger.Infow("successfully refilled account for host", zap.Stringer("hostKey", contract.HostKey), zap.Error(err))
a.logger.Infow("successfully refilled account for host", zap.Stringer("hostKey", hi.PublicKey), zap.Error(err))
}
}(c)
}()
}
}
}

func (a *AccountMgr) refillAccount(ctx context.Context, contract api.ContractMetadata, bh, revisionSubmissionBuffer uint64) (bool, error) {
func (a *AccountMgr) refillAccount(ctx context.Context, hi api.HostInfo) (bool, error) {
// fetch the account
account := a.Account(contract.HostKey)

// check if the contract is too close to the proof window to be revised,
// trying to refill the account would result in the host not returning the
// revision and returning an obfuscated error
if (bh + revisionSubmissionBuffer) > contract.WindowStart {
return false, fmt.Errorf("contract %v is too close to the proof window to be revised", contract.ID)
}
account := a.Account(hi.PublicKey)

// check if a host is potentially cheating before refilling.
// We only check against the max drift if the account's drift is
// negative because we don't care if we have more money than
// expected.
if account.Drift.Cmp(maxNegDrift) < 0 {
alert := newAccountRefillAlert(account.ID, contract, errMaxDriftExceeded,
alert := newAccountRefillAlert(account.ID, hi.PublicKey, hi.ContractID, errMaxDriftExceeded,
"accountID", account.ID.String(),
"hostKey", contract.HostKey.String(),
"hostKey", hi.PublicKey.String(),
"balance", account.Balance.String(),
"drift", account.Drift.String(),
)
Expand All @@ -390,13 +373,13 @@ func (a *AccountMgr) refillAccount(ctx context.Context, contract api.ContractMet
// check if a resync is needed
if account.RequiresSync {
// sync the account
err := a.syncer.SyncAccount(ctx, contract.ID, contract.HostKey, contract.SiamuxAddr)
err := a.syncer.SyncAccount(ctx, hi)
if err != nil {
return false, fmt.Errorf("failed to sync account's balance: %w", err)
}

// refetch the account after syncing
account = a.Account(contract.HostKey)
account = a.Account(hi.PublicKey)
}

// check if refill is needed
Expand All @@ -405,7 +388,7 @@ func (a *AccountMgr) refillAccount(ctx context.Context, contract api.ContractMet
}

// fund the account
err := a.funder.FundAccount(ctx, contract.ID, contract.HostKey, maxBalance)
err := a.funder.FundAccount(ctx, hi, maxBalance)
if err != nil {
return false, fmt.Errorf("failed to fund account: %w", err)
}
Expand Down Expand Up @@ -593,12 +576,12 @@ func (a *Account) setBalance(balance *big.Int) {
zap.Stringer("drift", drift))
}

func newAccountRefillAlert(id rhpv3.Account, contract api.ContractMetadata, err error, keysAndValues ...string) alerts.Alert {
func newAccountRefillAlert(id rhpv3.Account, hk types.PublicKey, fcid types.FileContractID, err error, keysAndValues ...string) alerts.Alert {
data := map[string]interface{}{
"error": err.Error(),
"accountID": id.String(),
"contractID": contract.ID.String(),
"hostKey": contract.HostKey.String(),
"contractID": fcid.String(),
"hostKey": hk.String(),
}
for i := 0; i < len(keysAndValues); i += 2 {
data[keysAndValues[i]] = keysAndValues[i+1]
Expand Down
7 changes: 5 additions & 2 deletions internal/worker/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func (b *mockAccountMgrBackend) RegisterAlert(context.Context, alerts.Alert) err
return nil
}

func (b *mockAccountMgrBackend) FundAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, balance types.Currency) error {
func (b *mockAccountMgrBackend) FundAccount(ctx context.Context, hi api.HostInfo, balance types.Currency) error {
return nil
}
func (b *mockAccountMgrBackend) SyncAccount(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, siamuxAddr string) error {
func (b *mockAccountMgrBackend) SyncAccount(ctx context.Context, hi api.HostInfo) error {
return nil
}
func (b *mockAccountMgrBackend) Accounts(context.Context, string) ([]api.Account, error) {
Expand All @@ -48,6 +48,9 @@ func (b *mockAccountMgrBackend) ConsensusState(ctx context.Context) (api.Consens
func (b *mockAccountMgrBackend) DownloadContracts(ctx context.Context) ([]api.ContractMetadata, error) {
return nil, nil
}
func (b *mockAccountMgrBackend) UsableHosts(ctx context.Context) ([]api.HostInfo, error) {
return nil, nil
}

func TestAccounts(t *testing.T) {
// create a manager with an account for a single host
Expand Down
Loading
Loading