Skip to content

Commit

Permalink
Add contract pruning to the bus (#1472)
Browse files Browse the repository at this point in the history
This PR moves contract pruning over to the bus. I kept the
`/rhp/contract/:id/roots` endpoint on the worker. This PR also uses a
temporary table to store the roots we fetch from the host, which allows
us to calculate which indices are prunable without fetching the contract
roots from the database. This saves on network traffic and also turned
out to make contract pruning about 2-3 times as fast. I am not sure
whether this performance boost is linear, though, because the slow part
might be the deletions. Pruning ~200MiB from 1.5 TiB contracts takes ~11s
on average, as opposed to ~25s when we fetch roots and do the diff manually.
  • Loading branch information
peterjan authored Sep 4, 2024
1 parent 4fa4e10 commit 53d9f46
Show file tree
Hide file tree
Showing 23 changed files with 826 additions and 328 deletions.
20 changes: 20 additions & 0 deletions api/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,21 @@ type (
LockID uint64 `json:"lockID"`
}

// ContractPruneRequest is the request type for the /contract/:id/prune
// endpoint.
ContractPruneRequest struct {
// Timeout is encoded under the "timeout" JSON key as a DurationMS.
// NOTE(review): presumably bounds how long the prune operation may
// run; confirm against the bus handler.
Timeout DurationMS `json:"timeout"`
}

// ContractPruneResponse is the response type for the /contract/:id/prune
// endpoint.
ContractPruneResponse struct {
// ContractSize is encoded under the "size" JSON key.
// NOTE(review): callers format it as a human-readable size, so it is
// presumably a byte count; confirm against the bus handler.
ContractSize uint64 `json:"size"`
// Pruned is encoded under the "pruned" JSON key.
// NOTE(review): presumably the number of bytes pruned; confirm.
Pruned uint64 `json:"pruned"`
// Remaining is encoded under the "remaining" JSON key.
// NOTE(review): presumably the number of prunable bytes left; confirm.
Remaining uint64 `json:"remaining"`
// Error carries an error message as a string and is omitted from the
// JSON when empty. A non-empty value can accompany a non-zero Pruned;
// the autopilot treats that as pruning that was interrupted.
Error string `json:"error,omitempty"`
}

// ContractReleaseRequest is the request type for the /contract/:id/release
// endpoint.
ContractReleaseRequest struct {
Expand Down Expand Up @@ -211,6 +226,11 @@ type (
}
)

// Total sums the Uploads, Downloads, FundAccount, Deletions and SectorRoots
// fields of the contract spending and returns the result.
func (x ContractSpending) Total() types.Currency {
	total := x.Uploads.Add(x.Downloads)
	total = total.Add(x.FundAccount)
	total = total.Add(x.Deletions)
	total = total.Add(x.SectorRoots)
	return total
}

// Add returns the sum of the current and given contract spending.
func (x ContractSpending) Add(y ContractSpending) (z ContractSpending) {
z.Uploads = x.Uploads.Add(y.Uploads)
Expand Down
18 changes: 6 additions & 12 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,12 @@ type (
TransactionSet []types.Transaction `json:"transactionSet"`
}

// RHPPruneContractRequest is the request type for the /rhp/contract/:id/prune
// endpoint.
RHPPruneContractRequest struct {
Timeout DurationMS `json:"timeout"`
}

// RHPPruneContractResponse is the response type for the /rhp/contract/:id/prune
// endpoint.
RHPPruneContractResponse struct {
Pruned uint64 `json:"pruned"`
Remaining uint64 `json:"remaining"`
Error string `json:"error,omitempty"`
// RHPFundRequest is the request type for the /rhp/fund endpoint.
RHPFundRequest struct {
ContractID types.FileContractID `json:"contractID"`
HostKey types.PublicKey `json:"hostKey"`
SiamuxAddr string `json:"siamuxAddr"`
Balance types.Currency `json:"balance"`
}

// RHPPriceTableRequest is the request type for the /rhp/pricetable endpoint.
Expand Down
6 changes: 5 additions & 1 deletion autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Bus interface {
RenewContract(ctx context.Context, fcid types.FileContractID, endHeight uint64, renterFunds, minNewCollateral, maxFundAmount types.Currency, expectedNewStorage uint64) (api.ContractMetadata, error)
SetContractSet(ctx context.Context, set string, contracts []types.FileContractID) error
PrunableData(ctx context.Context) (prunableData api.ContractsPrunableDataResponse, err error)
PruneContract(ctx context.Context, id types.FileContractID, timeout time.Duration) (api.ContractPruneResponse, error)

// hostdb
Host(ctx context.Context, hostKey types.PublicKey) (api.Host, error)
Expand Down Expand Up @@ -322,7 +323,7 @@ func (ap *Autopilot) Run() {

// pruning
if autopilot.Config.Contracts.Prune {
ap.tryPerformPruning(ap.workers)
ap.tryPerformPruning()
} else {
ap.logger.Info("pruning disabled")
}
Expand Down Expand Up @@ -670,6 +671,9 @@ func (ap *Autopilot) configHandlerPUT(jc jape.Context) {
autopilot, err := ap.bus.Autopilot(jc.Request.Context(), ap.id)
if utils.IsErr(err, api.ErrAutopilotNotFound) {
autopilot = api.Autopilot{ID: ap.id, Config: cfg}
} else if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
} else {
if autopilot.Config.Contracts.Set != cfg.Contracts.Set {
contractSetChanged = true
Expand Down
100 changes: 59 additions & 41 deletions autopilot/contract_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,17 @@ func (ap *Autopilot) fetchHostContract(fcid types.FileContractID) (host api.Host
return
}

func (ap *Autopilot) performContractPruning(wp *workerPool) {
ap.logger.Info("performing contract pruning")
func (ap *Autopilot) performContractPruning() {
log := ap.logger.Named("performContractPruning")
log.Info("performing contract pruning")

// fetch prunable contracts
prunable, err := ap.fetchPrunableContracts()
if err != nil {
ap.logger.Error(err)
log.Error(err)
return
}
log.Debugf("found %d prunable contracts", len(prunable))

// dismiss alerts for contracts that are no longer prunable
ap.dismissPruneAlerts(prunable)
Expand All @@ -129,80 +131,96 @@ func (ap *Autopilot) performContractPruning(wp *workerPool) {
// fetch host
h, _, err := ap.fetchHostContract(contract.ID)
if utils.IsErr(err, api.ErrContractNotFound) {
log.Debugw("contract got archived", "contract", contract.ID)
continue // contract got archived
} else if err != nil {
ap.logger.Errorf("failed to fetch host for contract '%v', err %v", contract.ID, err)
log.Errorw("failed to fetch host", zap.Error(err), "contract", contract.ID)
continue
}

// prune contract using a random worker
wp.withWorker(func(w Worker) {
total += ap.pruneContract(w, contract.ID, h.PublicKey, h.Settings.Version, h.Settings.Release)
})
// prune contract
n, err := ap.pruneContract(ap.shutdownCtx, contract.ID, h.PublicKey, h.Settings.Version, h.Settings.Release, log)
if err != nil {
log.Errorw("failed to prune contract", zap.Error(err), "contract", contract.ID)
continue
}

// handle alerts
ap.mu.Lock()
alertID := alerts.IDForContract(alertPruningID, contract.ID)
if shouldSendPruneAlert(err, h.Settings.Version, h.Settings.Release) {
ap.RegisterAlert(ap.shutdownCtx, newContractPruningFailedAlert(h.PublicKey, h.Settings.Version, h.Settings.Release, contract.ID, err))
ap.pruningAlertIDs[contract.ID] = alertID // store id to dismiss stale alerts
} else {
ap.DismissAlert(ap.shutdownCtx, alertID)
delete(ap.pruningAlertIDs, contract.ID)
}
ap.mu.Unlock()

// adjust total
total += n
}

// log total pruned
ap.logger.Info(fmt.Sprintf("pruned %d (%s) from %v contracts", total, humanReadableSize(int(total)), len(prunable)))
log.Info(fmt.Sprintf("pruned %d (%s) from %v contracts", total, humanReadableSize(int(total)), len(prunable)))
}

func (ap *Autopilot) pruneContract(w Worker, fcid types.FileContractID, hk types.PublicKey, hostVersion, hostRelease string) uint64 {
// use a sane timeout
ctx, cancel := context.WithTimeout(ap.shutdownCtx, timeoutPruneContract+5*time.Minute)
defer cancel()
func (ap *Autopilot) pruneContract(ctx context.Context, fcid types.FileContractID, hk types.PublicKey, hostVersion, hostRelease string, logger *zap.SugaredLogger) (uint64, error) {
// define logger
log := logger.With(
zap.Stringer("contract", fcid),
zap.Stringer("host", hk),
zap.String("version", hostVersion),
zap.String("release", hostRelease))

// prune the contract
start := time.Now()
pruned, remaining, err := w.RHPPruneContract(ctx, fcid, timeoutPruneContract)
duration := time.Since(start)
res, err := ap.bus.PruneContract(ctx, fcid, timeoutPruneContract)
if err != nil {
return 0, err
}

// decorate logger
log = log.With(
zap.String("pruned", utils.HumanReadableSize(int(res.Pruned))),
zap.String("remaining", utils.HumanReadableSize(int(res.Remaining))),
zap.String("size", utils.HumanReadableSize(int(res.ContractSize))),
zap.Duration("elapsed", time.Since(start)),
)

// ignore slow pruning until host network is 1.6.0+
if utils.IsErr(err, context.DeadlineExceeded) && pruned > 0 {
err = nil
if res.Error != "" && utils.IsErr(errors.New(res.Error), context.DeadlineExceeded) && res.Pruned > 0 {
res.Error = ""
}

// handle metrics
if err == nil || pruned > 0 {
if res.Pruned > 0 {
if err := ap.bus.RecordContractPruneMetric(ctx, api.ContractPruneMetric{
Timestamp: api.TimeRFC3339(start),

ContractID: fcid,
HostKey: hk,
HostVersion: hostVersion,

Pruned: pruned,
Remaining: remaining,
Duration: duration,
Pruned: res.Pruned,
Remaining: res.Remaining,
Duration: time.Since(start),
}); err != nil {
ap.logger.Error(err)
}
}

// handle logs
log := ap.logger.With("contract", fcid, "host", hk, "version", hostVersion, "release", hostRelease, "pruned", pruned, "remaining", remaining, "elapsed", duration)
if err != nil && pruned > 0 {
log.With(zap.Error(err)).Error("unexpected error interrupted pruning")
} else if err != nil {
log.With(zap.Error(err)).Error("failed to prune contract")
if res.Error != "" {
log.Errorw("unexpected error interrupted pruning", zap.Error(errors.New(res.Error)))
} else {
log.Info("successfully pruned contract")
}

// handle alerts
ap.mu.Lock()
defer ap.mu.Unlock()
alertID := alerts.IDForContract(alertPruningID, fcid)
if shouldSendPruneAlert(err, hostVersion, hostRelease) {
ap.RegisterAlert(ctx, newContractPruningFailedAlert(hk, hostVersion, hostRelease, fcid, err))
ap.pruningAlertIDs[fcid] = alertID // store id to dismiss stale alerts
} else {
ap.DismissAlert(ctx, alertID)
delete(ap.pruningAlertIDs, fcid)
}

return pruned
return res.Pruned, nil
}

func (ap *Autopilot) tryPerformPruning(wp *workerPool) {
func (ap *Autopilot) tryPerformPruning() {
ap.mu.Lock()
if ap.pruning || ap.isStopped() {
ap.mu.Unlock()
Expand All @@ -215,7 +233,7 @@ func (ap *Autopilot) tryPerformPruning(wp *workerPool) {
ap.wg.Add(1)
go func() {
defer ap.wg.Done()
ap.performContractPruning(wp)
ap.performContractPruning()
ap.mu.Lock()
ap.pruning = false
ap.mu.Unlock()
Expand Down
1 change: 0 additions & 1 deletion autopilot/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type Worker interface {

RHPBroadcast(ctx context.Context, fcid types.FileContractID) (err error)
RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (api.HostPriceTable, error)
RHPPruneContract(ctx context.Context, fcid types.FileContractID, timeout time.Duration) (pruned, remaining uint64, err error)
RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error)
}

Expand Down
11 changes: 8 additions & 3 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ const (
defaultWalletRecordMetricInterval = 5 * time.Minute
defaultPinUpdateInterval = 5 * time.Minute
defaultPinRateWindow = 6 * time.Hour
lockingPriorityFunding = 40
lockingPriorityRenew = 80
stdTxnSize = 1200 // bytes

lockingPriorityPruning = 20
lockingPriorityFunding = 40
lockingPriorityRenew = 80

stdTxnSize = 1200 // bytes
)

// Client re-exports the client from the client package.
Expand Down Expand Up @@ -223,6 +226,7 @@ type (
ContractRoots(ctx context.Context, id types.FileContractID) ([]types.Hash256, error)
ContractSizes(ctx context.Context) (map[types.FileContractID]api.ContractSize, error)
ContractSize(ctx context.Context, id types.FileContractID) (api.ContractSize, error)
PrunableContractRoots(ctx context.Context, id types.FileContractID, roots []types.Hash256) ([]uint64, error)

DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error)

Expand Down Expand Up @@ -421,6 +425,7 @@ func (b *Bus) Handler() http.Handler {
"POST /contract/:id/acquire": b.contractAcquireHandlerPOST,
"GET /contract/:id/ancestors": b.contractIDAncestorsHandler,
"POST /contract/:id/keepalive": b.contractKeepaliveHandlerPOST,
"POST /contract/:id/prune": b.contractPruneHandlerPOST,
"POST /contract/:id/renew": b.contractIDRenewHandlerPOST,
"POST /contract/:id/renewed": b.contractIDRenewedHandlerPOST,
"POST /contract/:id/release": b.contractReleaseHandlerPOST,
Expand Down
6 changes: 6 additions & 0 deletions bus/client/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ func (c *Client) PrunableData(ctx context.Context) (prunableData api.ContractsPr
return
}

// PruneContract prunes the contract with the given ID by issuing a POST to the
// /contract/:id/prune endpoint, passing the timeout along in the request body.
// It returns the decoded response together with any error from the request.
func (c *Client) PruneContract(ctx context.Context, contractID types.FileContractID, timeout time.Duration) (res api.ContractPruneResponse, err error) {
	path := fmt.Sprintf("/contract/%s/prune", contractID)
	req := api.ContractPruneRequest{Timeout: api.DurationMS(timeout)}
	err = c.c.WithContext(ctx).POST(path, req, &res)
	return res, err
}

// RenewContract renews an existing contract with a host and adds it to the bus.
func (c *Client) RenewContract(ctx context.Context, contractID types.FileContractID, endHeight uint64, renterFunds, minNewCollateral, maxFundAmount types.Currency, expectedStorage uint64) (renewal api.ContractMetadata, err error) {
req := api.ContractRenewRequest{
Expand Down
Loading

0 comments on commit 53d9f46

Please sign in to comment.