Skip to content

Commit

Permalink
routing: always update payment in the same goroutine
Browse files Browse the repository at this point in the history
This commit refactors `collectResultAsync` such that this method is now
only responsible for collecting results from the switch. A new method
`processSwitchResults` is added to process these results in the same
goroutine where we fetch the payment from db, to make sure the lifecycle
loop always has a consistent view of a given payment.
  • Loading branch information
yyforyongyu committed Oct 14, 2024
1 parent 6c77f75 commit abd6e94
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 101 deletions.
217 changes: 140 additions & 77 deletions routing/payment_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnutils"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/routing/shards"
Expand All @@ -40,16 +41,19 @@ type paymentLifecycle struct {
quit chan struct{}

// resultCollected is used to signal that the result of an attempt has
// been collected. A nil error means the attempt is either successful
// or failed with temporary error. Otherwise, we should exit the
// lifecycle loop as a terminal error has occurred.
resultCollected chan error
// been collected.
resultCollected chan struct{}

// resultCollector is a function that is used to collect the result of
// an HTLC attempt, which is always mounted to `p.collectResultAsync`
// except in unit test, where we use a much simpler resultCollector to
// decouple the test flow for the payment lifecycle.
resultCollector func(attempt *channeldb.HTLCAttempt)

// switchResults is a map that holds the results for HTLC attempts
// returned from the htlcswitch.
switchResults lnutils.SyncMap[*channeldb.HTLCAttempt,
*htlcswitch.PaymentResult]
}

// newPaymentLifecycle initiates a new payment lifecycle and returns it.
Expand All @@ -66,8 +70,10 @@ func newPaymentLifecycle(r *ChannelRouter, feeLimit lnwire.MilliSatoshi,
shardTracker: shardTracker,
currentHeight: currentHeight,
quit: make(chan struct{}),
resultCollected: make(chan error, 1),
resultCollected: make(chan struct{}, 1),
firstHopCustomRecords: firstHopCustomRecords,
switchResults: lnutils.SyncMap[*channeldb.HTLCAttempt,
*htlcswitch.PaymentResult]{},
}

// Mount the result collector.
Expand Down Expand Up @@ -143,12 +149,7 @@ func (p *paymentLifecycle) decideNextStep(
// NOTE: we don't check `p.quit` since `decideNextStep` is
// running in the same goroutine as `resumePayment`.
select {
case err := <-p.resultCollected:
// If an error is returned, exit with it.
if err != nil {
return stepExit, err
}

case <-p.resultCollected:
log.Tracef("Received attempt result for payment %v",
p.identifier)

Expand Down Expand Up @@ -414,50 +415,51 @@ type attemptResult struct {
}

// collectResultAsync launches a goroutine that will wait for the result of the
// given HTLC attempt to be available then handle its result. Once received, it
// will send a nil error to channel `resultCollected` to indicate there's a
// result.
// given HTLC attempt to be available then save its result in a map. Once
// received, it will send a signal to channel `resultCollected` to indicate
// there's a result.
func (p *paymentLifecycle) collectResultAsync(attempt *channeldb.HTLCAttempt) {
log.Debugf("Collecting result for attempt %v in payment %v",
attempt.AttemptID, p.identifier)

go func() {
// Block until the result is available.
_, err := p.collectResult(attempt)
result, err := p.collectResult(attempt)
if err != nil {
log.Errorf("Error collecting result for attempt %v "+
"in payment %v: %v", attempt.AttemptID,
log.Errorf("Error collecting result for attempt %v in "+
"payment %v: %v", attempt.AttemptID,
p.identifier, err)
return

Check failure on line 431 in routing/payment_lifecycle.go

View workflow job for this annotation

GitHub Actions / lint code

return with no blank line before (nlreturn)
}

log.Debugf("Result collected for attempt %v in payment %v",
attempt.AttemptID, p.identifier)

// Once the result is collected, we signal it by writing the
// error to `resultCollected`.
// Save the result and process it in the next main loop.
p.switchResults.Store(attempt, result)

// Signal that a result has been collected.
select {
// Send the signal or quit.
case p.resultCollected <- err:
case p.resultCollected <- struct{}{}:

case <-p.quit:
log.Debugf("Lifecycle exiting while collecting "+
"result for payment %v", p.identifier)

case <-p.router.quit:
return
}
}()
}

// collectResult waits for the result for the given attempt to be available
// from the Switch, then records the attempt outcome with the control tower.
// An attemptResult is returned, indicating the final outcome of this HTLC
// attempt.
func (p *paymentLifecycle) collectResult(attempt *channeldb.HTLCAttempt) (
*attemptResult, error) {
// collectResult waits for the result of the given HTLC attempt to be sent by
// the switch and returns it.
func (p *paymentLifecycle) collectResult(
attempt *channeldb.HTLCAttempt) (*htlcswitch.PaymentResult, error) {

log.Tracef("Collecting result for attempt %v", spew.Sdump(attempt))

result := &htlcswitch.PaymentResult{}

// Regenerate the circuit for this attempt.
circuit, err := attempt.Circuit()

Expand All @@ -473,8 +475,7 @@ func (p *paymentLifecycle) collectResult(attempt *channeldb.HTLCAttempt) (
if err != nil {
log.Debugf("Unable to generate circuit for attempt %v: %v",
attempt.AttemptID, err)

return p.failAttempt(attempt.AttemptID, err)
return nil, err
}

// Using the created circuit, initialize the error decrypter, so we can
Expand All @@ -500,69 +501,29 @@ func (p *paymentLifecycle) collectResult(attempt *channeldb.HTLCAttempt) (
log.Errorf("Failed getting result for attemptID %d "+
"from switch: %v", attempt.AttemptID, err)

return p.handleSwitchErr(attempt, err)
result.Error = err

return result, nil
}

// The switch knows about this payment, we'll wait for a result to be
// available.
var (
result *htlcswitch.PaymentResult
ok bool
)

select {
case result, ok = <-resultChan:
case r, ok := <-resultChan:
if !ok {
return nil, htlcswitch.ErrSwitchExiting
}

result = r

case <-p.quit:
return nil, ErrPaymentLifecycleExiting

case <-p.router.quit:
return nil, ErrRouterShuttingDown
}

// In case of a payment failure, fail the attempt with the control
// tower and return.
if result.Error != nil {
return p.handleSwitchErr(attempt, result.Error)
}

// We successfully got a payment result back from the switch.
log.Debugf("Payment %v succeeded with pid=%v",
p.identifier, attempt.AttemptID)

// Report success to mission control.
err = p.router.cfg.MissionControl.ReportPaymentSuccess(
attempt.AttemptID, &attempt.Route,
)
if err != nil {
log.Errorf("Error reporting payment success to mc: %v", err)
}

// In case of success we atomically store settle result to the DB move
// the shard to the settled state.
htlcAttempt, err := p.router.cfg.Control.SettleAttempt(
p.identifier, attempt.AttemptID,
&channeldb.HTLCSettleInfo{
Preimage: result.Preimage,
SettleTime: p.router.cfg.Clock.Now(),
},
)
if err != nil {
log.Errorf("Error settling attempt %v for payment %v with "+
"preimage %v: %v", attempt.AttemptID, p.identifier,
result.Preimage, err)

// We won't mark the attempt as failed since we already have
// the preimage.
return nil, err
}

return &attemptResult{
attempt: htlcAttempt,
}, nil
return result, nil
}

// registerAttempt is responsible for creating and saving an HTLC attempt in db
Expand Down Expand Up @@ -1079,6 +1040,11 @@ func (p *paymentLifecycle) reloadInflightAttempts() (DBMPPayment, error) {
// all its attempt results are processed.
func (p *paymentLifecycle) refreshPayment() (DBMPPayment,
*channeldb.MPPaymentState, error) {
// Process the stored results first as they will affect the state of
// the payment.
if err := p.processSwitchResults(); err != nil {
return nil, nil, err
}

// Read the db to get the latest state of the payment.
payment, err := p.router.cfg.Control.FetchPayment(p.identifier)
Expand All @@ -1095,3 +1061,100 @@ func (p *paymentLifecycle) refreshPayment() (DBMPPayment,

return payment, ps, nil
}

// processSwitchResults walks the `p.switchResults` map, hands every stored
// htlcswitch result to `handleAttemptResult`, and afterwards removes the
// visited entries from the map.
//
// Every visited result is handled even when an earlier one failed. If more
// than one result produces an error, only the error seen last is returned.
func (p *paymentLifecycle) processSwitchResults() error {
	// Remember the keys we visit so they can be removed from the map once
	// the iteration is over.
	processed := make(
		[]*channeldb.HTLCAttempt, 0, p.switchResults.Len(),
	)

	var lastErr error

	// handleResult is invoked for each entry found in the map.
	handleResult := func(attempt *channeldb.HTLCAttempt,
		res *htlcswitch.PaymentResult) bool {

		processed = append(processed, attempt)

		// A non-nil error here means the payment lifecycle needs to
		// be terminated. We record it and carry on with the rest.
		if _, err := p.handleAttemptResult(attempt, res); err != nil {
			lastErr = err
		}

		// Returning true keeps the iteration going so that all
		// results get processed.
		return true
	}
	p.switchResults.Range(handleResult)

	// Drop the entries that were handled above.
	for i := range processed {
		p.switchResults.Delete(processed[i])
	}

	return lastErr
}

// handleAttemptResult acts on a result the htlcswitch returned for the given
// HTLC attempt. A result carrying an error is delegated to `handleSwitchErr`.
// Otherwise the success is reported to mission control and the attempt is
// settled via the control tower using the preimage found in the result.
func (p *paymentLifecycle) handleAttemptResult(attempt *channeldb.HTLCAttempt,
	result *htlcswitch.PaymentResult) (*attemptResult, error) {

	// The result carries an error, so let the switch error handler decide
	// the outcome of this attempt.
	if switchErr := result.Error; switchErr != nil {
		return p.handleSwitchErr(attempt, switchErr)
	}

	attemptID := attempt.AttemptID

	// From here on we know the switch returned a successful result.
	log.Debugf("Payment %v succeeded with pid=%v", p.identifier,
		attemptID)

	// Let mission control know about the success. A failure to do so is
	// only logged and does not stop us from settling the attempt.
	mcErr := p.router.cfg.MissionControl.ReportPaymentSuccess(
		attemptID, &attempt.Route,
	)
	if mcErr != nil {
		log.Errorf("Error reporting payment success to mc: %v", mcErr)
	}

	// Store the settle result through the control tower, which moves the
	// shard to the settled state.
	settleInfo := &channeldb.HTLCSettleInfo{
		Preimage:   result.Preimage,
		SettleTime: p.router.cfg.Clock.Now(),
	}
	settled, err := p.router.cfg.Control.SettleAttempt(
		p.identifier, attemptID, settleInfo,
	)
	if err != nil {
		log.Errorf("Error settling attempt %v for payment %v with "+
			"preimage %v: %v", attemptID, p.identifier,
			result.Preimage, err)

		// The attempt is not marked as failed because we already
		// hold the preimage.
		return nil, err
	}

	return &attemptResult{attempt: settled}, nil
}

// collectAndHandleResult obtains the switch result for the given attempt via
// `collectResult` and then passes it to `handleAttemptResult`, whose return
// values are handed back to the caller. If collecting the result fails, that
// error is returned together with a nil attemptResult.
func (p *paymentLifecycle) collectAndHandleResult(
	attempt *channeldb.HTLCAttempt) (*attemptResult, error) {

	switchResult, collectErr := p.collectResult(attempt)
	if collectErr != nil {
		return nil, collectErr
	}

	return p.handleAttemptResult(attempt, switchResult)
}
Loading

0 comments on commit abd6e94

Please sign in to comment.