diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index 7367003e0..c53e4ec4c 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -18,6 +18,7 @@ import ( "go.sia.tech/renterd/api" "go.sia.tech/renterd/build" "go.sia.tech/renterd/hostdb" + "go.sia.tech/renterd/internal/utils" "go.sia.tech/renterd/object" "go.sia.tech/renterd/wallet" "go.sia.tech/renterd/webhooks" @@ -299,7 +300,7 @@ func (ap *Autopilot) Run() error { // perform maintenance setChanged, err := ap.c.performContractMaintenance(ap.shutdownCtx, w) - if err != nil && isErr(err, context.Canceled) { + if err != nil && utils.IsErr(err, context.Canceled) { return } else if err != nil { ap.logger.Errorf("contract maintenance failed, err: %v", err) @@ -405,9 +406,9 @@ func (ap *Autopilot) blockUntilConfigured(interrupt <-chan time.Time) (configure cancel() // if the config was not found, or we were unable to fetch it, keep blocking - if isErr(err, context.Canceled) { + if utils.IsErr(err, context.Canceled) { return - } else if isErr(err, api.ErrAutopilotNotFound) { + } else if utils.IsErr(err, api.ErrAutopilotNotFound) { once.Do(func() { ap.logger.Info("autopilot is waiting to be configured...") }) } else if err != nil { ap.logger.Errorf("autopilot is unable to fetch its configuration from the bus, err: %v", err) @@ -438,7 +439,7 @@ func (ap *Autopilot) blockUntilOnline() (online bool) { online = len(peers) > 0 cancel() - if isErr(err, context.Canceled) { + if utils.IsErr(err, context.Canceled) { return } else if err != nil { ap.logger.Errorf("failed to get peers, err: %v", err) @@ -472,7 +473,7 @@ func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) (synced, block cancel() // if an error occurred, or if we're not synced, we continue - if isErr(err, context.Canceled) { + if utils.IsErr(err, context.Canceled) { return } else if err != nil { ap.logger.Errorf("failed to get consensus state, err: %v", err) diff --git a/autopilot/contract_pruning.go b/autopilot/contract_pruning.go index e32cd3fa0..aa0eb505f 100644 --- a/autopilot/contract_pruning.go +++ b/autopilot/contract_pruning.go @@ -9,6 +9,7 @@ import ( "go.sia.tech/core/types" "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/internal/utils" "go.sia.tech/siad/build" ) @@ -65,14 +66,14 @@ func (pm pruneMetrics) String() string { func (pr pruneResult) toAlert() (id types.Hash256, alert *alerts.Alert) { id = alertIDForContract(alertPruningID, pr.fcid) - if shouldTrigger := pr.err != nil && !((isErr(pr.err, errInvalidMerkleProof) && build.VersionCmp(pr.version, "1.6.0") < 0) || - isErr(pr.err, api.ErrContractNotFound) || // contract got archived - isErr(pr.err, errConnectionRefused) || - isErr(pr.err, errConnectionTimedOut) || - isErr(pr.err, errConnectionResetByPeer) || - isErr(pr.err, errInvalidHandshakeSignature) || - isErr(pr.err, errNoRouteToHost) || - isErr(pr.err, errNoSuchHost)); shouldTrigger { + if shouldTrigger := pr.err != nil && !((utils.IsErr(pr.err, errInvalidMerkleProof) && build.VersionCmp(pr.version, "1.6.0") < 0) || + utils.IsErr(pr.err, api.ErrContractNotFound) || // contract got archived + utils.IsErr(pr.err, errConnectionRefused) || + utils.IsErr(pr.err, errConnectionTimedOut) || + utils.IsErr(pr.err, errConnectionResetByPeer) || + utils.IsErr(pr.err, errInvalidHandshakeSignature) || + utils.IsErr(pr.err, errNoRouteToHost) || + utils.IsErr(pr.err, errNoSuchHost)); shouldTrigger { alert = newContractPruningFailedAlert(pr.hk, pr.version, pr.fcid, pr.err) } return @@ -196,7 +197,7 @@ func (c *contractor) pruneContract(w Worker, fcid types.FileContractID) pruneRes pruned, remaining, err := w.RHPPruneContract(ctx, fcid, timeoutPruneContract) if err != nil && pruned == 0 { return pruneResult{fcid: fcid, hk: host.PublicKey, version: host.Settings.Version, err: err} - } else if err != nil && isErr(err, context.DeadlineExceeded) { + } else if err != nil && utils.IsErr(err, context.DeadlineExceeded) { err = nil } diff --git a/autopilot/contractor.go b/autopilot/contractor.go index 4e5e8c842..d69f2a354 100644 --- a/autopilot/contractor.go +++ b/autopilot/contractor.go @@ -16,6 +16,7 @@ import ( "go.sia.tech/core/types" "go.sia.tech/renterd/api" "go.sia.tech/renterd/hostdb" + "go.sia.tech/renterd/internal/utils" "go.sia.tech/renterd/wallet" "go.sia.tech/renterd/worker" "go.uber.org/zap" @@ -1425,7 +1426,7 @@ func (c *contractor) renewContract(ctx context.Context, w Worker, ci contractInf "renterFunds", renterFunds, "expectedNewStorage", expectedNewStorage, ) - if strings.Contains(err.Error(), wallet.ErrInsufficientBalance.Error()) { + if utils.IsErr(err, wallet.ErrInsufficientBalance) && !worker.IsErrHost(err) { return api.ContractMetadata{}, false, err } return api.ContractMetadata{}, true, err @@ -1508,7 +1509,7 @@ func (c *contractor) refreshContract(ctx context.Context, w Worker, ci contractI return api.ContractMetadata{}, true, err } c.logger.Errorw("refresh failed", zap.Error(err), "hk", hk, "fcid", fcid) - if strings.Contains(err.Error(), wallet.ErrInsufficientBalance.Error()) { + if utils.IsErr(err, wallet.ErrInsufficientBalance) && !worker.IsErrHost(err) { return api.ContractMetadata{}, false, err } return api.ContractMetadata{}, true, err diff --git a/autopilot/ipfilter.go b/autopilot/ipfilter.go index 6aa244047..0932d7676 100644 --- a/autopilot/ipfilter.go +++ b/autopilot/ipfilter.go @@ -9,6 +9,7 @@ import ( "time" "go.sia.tech/core/types" + "go.sia.tech/renterd/internal/utils" "go.uber.org/zap" ) @@ -137,7 +138,7 @@ func (r *ipResolver) lookup(hostIP string) ([]string, error) { addrs, err := r.resolver.LookupIPAddr(ctx, host) if err != nil { // check the cache if it's an i/o timeout or server misbehaving error - if isErr(err, errIOTimeout) || isErr(err, errServerMisbehaving) { + if utils.IsErr(err, errIOTimeout) || utils.IsErr(err, errServerMisbehaving) { if entry, found := r.cache[hostIP]; found && time.Since(entry.created) < ipCacheEntryValidity { r.logger.Debugf("using cached IP addresses for %v, err: %v", hostIP, err) return entry.subnets, nil @@ -188,10 +189,3 @@ func parseSubnets(addresses []net.IPAddr) []string { return subnets } - -func isErr(err error, target error) bool { - if errors.Is(err, target) { - return true - } - return err != nil && target != nil && strings.Contains(err.Error(), target.Error()) -} diff --git a/autopilot/ipfilter_test.go b/autopilot/ipfilter_test.go index 74b2e56c5..29fc3c8cf 100644 --- a/autopilot/ipfilter_test.go +++ b/autopilot/ipfilter_test.go @@ -8,6 +8,7 @@ import ( "time" "go.sia.tech/core/types" + "go.sia.tech/renterd/internal/utils" "go.uber.org/zap" ) @@ -61,20 +62,20 @@ func TestIPResolver(t *testing.T) { // test lookup error r.setNextErr(errors.New("unknown error")) - if _, err := ipr.lookup("example.com:1234"); !isErr(err, errors.New("unknown error")) { + if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errors.New("unknown error")) { t.Fatal("unexpected error", err) } // test IO timeout - no cache entry r.setNextErr(errIOTimeout) - if _, err := ipr.lookup("example.com:1234"); !isErr(err, errIOTimeout) { + if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errIOTimeout) { t.Fatal("unexpected error", err) } // test IO timeout - expired cache entry ipr.cache["example.com:1234"] = ipCacheEntry{subnets: []string{"a"}} r.setNextErr(errIOTimeout) - if _, err := ipr.lookup("example.com:1234"); !isErr(err, errIOTimeout) { + if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errIOTimeout) { t.Fatal("unexpected error", err) } @@ -89,19 +90,19 @@ func TestIPResolver(t *testing.T) { // test too many addresses - more than two r.setAddr("example.com", []net.IPAddr{{}, {}, {}}) - if _, err := ipr.lookup("example.com:1234"); !isErr(err, errTooManyAddresses) { + if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errTooManyAddresses) { t.Fatal("unexpected error", err) } // test too many addresses - two of the same type r.setAddr("example.com", []net.IPAddr{{IP: net.IPv4(1, 2, 3, 4)}, {IP: net.IPv4(1, 2, 3, 4)}}) - if _, err := ipr.lookup("example.com:1234"); !isErr(err, errTooManyAddresses) { + if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errTooManyAddresses) { t.Fatal("unexpected error", err) } // test invalid addresses r.setAddr("example.com", []net.IPAddr{{IP: ipv4Localhost}, {IP: net.IP{127, 0, 0, 2}}}) - if _, err := ipr.lookup("example.com:1234"); !isErr(err, errTooManyAddresses) { + if _, err := ipr.lookup("example.com:1234"); !utils.IsErr(err, errTooManyAddresses) { t.Fatal("unexpected error", err) } diff --git a/autopilot/migrator.go b/autopilot/migrator.go index 4a4e31de6..c55b9c734 100644 --- a/autopilot/migrator.go +++ b/autopilot/migrator.go @@ -10,6 +10,7 @@ import ( "time" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/internal/utils" "go.sia.tech/renterd/object" "go.sia.tech/renterd/stats" "go.uber.org/zap" @@ -156,7 +157,7 @@ func (m *migrator) performMigrations(p *workerPool) { if err != nil { m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.Key, j.Health, res.SurchargeApplied, err) - skipAlert := isErr(err, api.ErrSlabNotFound) + skipAlert := utils.IsErr(err, api.ErrSlabNotFound) if !skipAlert { if res.SurchargeApplied { m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.Key, j.Health, err)) diff --git a/autopilot/scanner.go b/autopilot/scanner.go index e512d1f87..230400619 100644 --- a/autopilot/scanner.go +++ b/autopilot/scanner.go @@ -12,6 +12,7 @@ import ( "go.sia.tech/core/types" "go.sia.tech/renterd/api" "go.sia.tech/renterd/hostdb" + "go.sia.tech/renterd/internal/utils" "go.uber.org/zap" ) @@ -314,7 +315,7 @@ func (s *scanner) launchScanWorkers(ctx context.Context, w scanWorker, reqs chan scan, err := w.RHPScan(ctx, req.hostKey, req.hostIP, s.currentTimeout()) if err != nil { break // abort - } else if !isErr(errors.New(scan.ScanError), errIOTimeout) && scan.Ping > 0 { + } else if !utils.IsErr(errors.New(scan.ScanError), errIOTimeout) && scan.Ping > 0 { s.tracker.addDataPoint(time.Duration(scan.Ping)) } diff --git a/internal/utils/errors.go b/internal/utils/errors.go new file mode 100644 index 000000000..b884cde70 --- /dev/null +++ b/internal/utils/errors.go @@ -0,0 +1,20 @@ +package utils + +import ( + "errors" + "strings" +) + +// IsErr can be used to compare an error to a target and also works when used on +// errors that haven't been wrapped since it will fall back to a string +// comparison. Useful to check errors returned over the network. +func IsErr(err error, target error) bool { + if (err == nil) != (target == nil) { + return false + } else if errors.Is(err, target) { + return true + } + // TODO: we can get rid of the lower casing once siad is gone and + // renterd/hostd use the same error messages + return strings.Contains(strings.ToLower(err.Error()), strings.ToLower(target.Error())) +} diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 9c280f2bd..203d2c3da 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -10,7 +10,6 @@ import ( "math" "math/big" "net" - "strings" "sync" "time" @@ -20,6 +19,7 @@ import ( "go.sia.tech/mux/v1" "go.sia.tech/renterd/api" "go.sia.tech/renterd/hostdb" + "go.sia.tech/renterd/internal/utils" "go.sia.tech/siad/crypto" "go.uber.org/zap" ) @@ -47,6 +47,12 @@ const ( ) var ( + // errHost is used to wrap rpc errors returned by the host. + errHost = errors.New("host responded with error") + + // errTransport is used to wrap rpc errors caused by the transport. + errTransport = errors.New("transport error") + // errBalanceInsufficient occurs when a withdrawal failed because the // account balance was insufficient. errBalanceInsufficient = errors.New("ephemeral account balance was insufficient") @@ -83,31 +89,42 @@ var ( errWithdrawalExpired = errors.New("withdrawal request expired") ) -func isBalanceInsufficient(err error) bool { return isError(err, errBalanceInsufficient) } -func isBalanceMaxExceeded(err error) bool { return isError(err, errBalanceMaxExceeded) } +// IsErrHost indicates whether an error was returned by a host as part of an RPC. +func IsErrHost(err error) bool { + return utils.IsErr(err, errHost) +} + +func isBalanceInsufficient(err error) bool { return utils.IsErr(err, errBalanceInsufficient) } +func isBalanceMaxExceeded(err error) bool { return utils.IsErr(err, errBalanceMaxExceeded) } func isClosedStream(err error) bool { - return isError(err, mux.ErrClosedStream) || isError(err, net.ErrClosed) + return utils.IsErr(err, mux.ErrClosedStream) || utils.IsErr(err, net.ErrClosed) } -func isInsufficientFunds(err error) bool { return isError(err, ErrInsufficientFunds) } -func isPriceTableExpired(err error) bool { return isError(err, errPriceTableExpired) } -func isPriceTableGouging(err error) bool { return isError(err, errPriceTableGouging) } -func isPriceTableNotFound(err error) bool { return isError(err, errPriceTableNotFound) } +func isInsufficientFunds(err error) bool { return utils.IsErr(err, ErrInsufficientFunds) } +func isPriceTableExpired(err error) bool { return utils.IsErr(err, errPriceTableExpired) } +func isPriceTableGouging(err error) bool { return utils.IsErr(err, errPriceTableGouging) } +func isPriceTableNotFound(err error) bool { return utils.IsErr(err, errPriceTableNotFound) } func isSectorNotFound(err error) bool { - return isError(err, errSectorNotFound) || isError(err, errSectorNotFoundOld) + return utils.IsErr(err, errSectorNotFound) || utils.IsErr(err, errSectorNotFoundOld) } -func isWithdrawalsInactive(err error) bool { return isError(err, errWithdrawalsInactive) } -func isWithdrawalExpired(err error) bool { return isError(err, errWithdrawalExpired) } +func isWithdrawalsInactive(err error) bool { return utils.IsErr(err, errWithdrawalsInactive) } +func isWithdrawalExpired(err error) bool { return utils.IsErr(err, errWithdrawalExpired) } -func isError(err error, target error) bool { - if err == nil { - return err == target +// wrapRPCErr extracts the innermost error, wraps it in either a errHost or +// errTransport and finally wraps it using the provided fnName. +func wrapRPCErr(err *error, fnName string) { + if *err == nil { + return + } + innerErr := *err + for errors.Unwrap(innerErr) != nil { + innerErr = errors.Unwrap(innerErr) } - // compare error first - if errors.Is(err, target) { - return true + if errors.As(*err, new(*rhpv3.RPCError)) { + *err = fmt.Errorf("%w: '%w'", errHost, innerErr) + } else { + *err = fmt.Errorf("%w: '%w'", errTransport, innerErr) } - // then compare the string in case the error was returned by a host - return strings.Contains(strings.ToLower(err.Error()), strings.ToLower(target.Error())) + *err = fmt.Errorf("%s: %w", fnName, *err) } // transportV3 is a reference-counted wrapper for rhpv3.Transport. @@ -125,6 +142,26 @@ type streamV3 struct { *rhpv3.Stream } +func (s *streamV3) ReadResponse(resp rhpv3.ProtocolObject, maxLen uint64) (err error) { + defer wrapRPCErr(&err, "ReadResponse") + return s.Stream.ReadResponse(resp, maxLen) +} + +func (s *streamV3) WriteResponse(resp rhpv3.ProtocolObject) (err error) { + defer wrapRPCErr(&err, "WriteResponse") + return s.Stream.WriteResponse(resp) +} + +func (s *streamV3) ReadRequest(req rhpv3.ProtocolObject, maxLen uint64) (err error) { + defer wrapRPCErr(&err, "ReadRequest") + return s.Stream.ReadRequest(req, maxLen) +} + +func (s *streamV3) WriteRequest(rpcID types.Specifier, req rhpv3.ProtocolObject) (err error) { + defer wrapRPCErr(&err, "WriteRequest") + return s.Stream.WriteRequest(rpcID, req) +} + // Close closes the stream and cancels the goroutine launched by DialStream. func (s *streamV3) Close() error { s.cancel() diff --git a/worker/rhpv3_test.go b/worker/rhpv3_test.go new file mode 100644 index 000000000..83f605807 --- /dev/null +++ b/worker/rhpv3_test.go @@ -0,0 +1,34 @@ +package worker + +import ( + "errors" + "fmt" + "testing" + + rhpv3 "go.sia.tech/core/rhp/v3" +) + +func TestWrapRPCErr(t *testing.T) { + // host error + err := fmt.Errorf("ReadResponse: %w", &rhpv3.RPCError{ + Description: "some host error", + }) + if err.Error() != "ReadResponse: some host error" { + t.Fatal("unexpected error:", err) + } + wrapRPCErr(&err, "ReadResponse") + if err.Error() != "ReadResponse: host responded with error: 'some host error'" { + t.Fatal("unexpected error:", err) + } else if !errors.Is(err, errHost) { + t.Fatalf("expected error to be wrapped with %v, got %v", errHost, err) + } + + // transport error + err = fmt.Errorf("ReadResponse: %w", errors.New("some transport error")) + wrapRPCErr(&err, "ReadResponse") + if err.Error() != "ReadResponse: transport error: 'some transport error'" { + t.Fatal("unexpected error:", err) + } else if !errors.Is(err, errTransport) { + t.Fatalf("expected error to be wrapped with %v, got %v", errHost, err) + } +} diff --git a/worker/uploader.go b/worker/uploader.go index 28b04033d..403accbc8 100644 --- a/worker/uploader.go +++ b/worker/uploader.go @@ -11,6 +11,7 @@ import ( rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/internal/utils" "go.sia.tech/renterd/stats" "go.uber.org/zap" ) @@ -298,7 +299,7 @@ func (u *uploader) tryRecomputeStats() { func (u *uploader) tryRefresh(ctx context.Context) bool { // fetch the renewed contract renewed, err := u.cs.RenewedContract(ctx, u.ContractID()) - if isError(err, api.ErrContractNotFound) || isError(err, context.Canceled) { + if utils.IsErr(err, api.ErrContractNotFound) || utils.IsErr(err, context.Canceled) { return false } else if err != nil { u.logger.Errorf("failed to fetch renewed contract %v, err: %v", u.ContractID(), err) diff --git a/worker/worker.go b/worker/worker.go index 9e4dacdd2..fb645840d 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -25,6 +25,7 @@ import ( "go.sia.tech/renterd/api" "go.sia.tech/renterd/build" "go.sia.tech/renterd/hostdb" + "go.sia.tech/renterd/internal/utils" "go.sia.tech/renterd/object" "go.sia.tech/renterd/webhooks" "go.sia.tech/renterd/worker/client" @@ -1197,7 +1198,7 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) { // fetch upload from bus upload, err := w.bus.MultipartUpload(ctx, uploadID) - if isError(err, api.ErrMultipartUploadNotFound) { + if utils.IsErr(err, api.ErrMultipartUploadNotFound) { jc.Error(err, http.StatusNotFound) return } else if jc.Check("failed to fetch multipart upload", err) != nil { @@ -1546,14 +1547,14 @@ func discardTxnOnErr(ctx context.Context, bus Bus, l *zap.SugaredLogger, txn typ } func isErrHostUnreachable(err error) bool { - return isError(err, os.ErrDeadlineExceeded) || - isError(err, context.DeadlineExceeded) || - isError(err, api.ErrHostOnPrivateNetwork) || - isError(err, errors.New("no route to host")) || - isError(err, errors.New("no such host")) || - isError(err, errors.New("connection refused")) || - isError(err, errors.New("unknown port")) || - isError(err, errors.New("cannot assign requested address")) + return utils.IsErr(err, os.ErrDeadlineExceeded) || + utils.IsErr(err, context.DeadlineExceeded) || + utils.IsErr(err, api.ErrHostOnPrivateNetwork) || + utils.IsErr(err, errors.New("no route to host")) || + utils.IsErr(err, errors.New("no such host")) || + utils.IsErr(err, errors.New("connection refused")) || + utils.IsErr(err, errors.New("unknown port")) || + utils.IsErr(err, errors.New("cannot assign requested address")) } func isErrDuplicateTransactionSet(err error) bool {