Skip to content

Commit

Permalink
sweep: Remove publishing last-tx logic.
Browse files Browse the repository at this point in the history
We remove the publishing of the last published sweep tx during the
startup of the sweeper. This republishing can lead to situations
where funds of the default wallet might be locked for neutrino
backend clients.
Moreover all related tests are removed as well.
  • Loading branch information
ziggie1984 authored and JssDWt committed Aug 17, 2023
1 parent 39aeafc commit b9b7d37
Show file tree
Hide file tree
Showing 5 changed files with 0 additions and 169 deletions.
67 changes: 0 additions & 67 deletions sweep/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,13 @@ import (
"bytes"
"encoding/binary"
"errors"
"fmt"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/kvdb"
)

var (
// lastTxBucketKey is the key that points to a bucket containing a
// single item storing the last published tx.
//
// maps: lastTxKey -> serialized_tx
lastTxBucketKey = []byte("sweeper-last-tx")

// lastTxKey is the fixed key under which the serialized tx is stored.
lastTxKey = []byte("last-tx")

// txHashesBucketKey is the key that points to a bucket containing the
// hashes of all sweep txes that were published successfully.
//
Expand Down Expand Up @@ -52,10 +42,6 @@ type SweeperStore interface {
// NotifyPublishTx signals that we are about to publish a tx.
NotifyPublishTx(*wire.MsgTx) error

// GetLastPublishedTx returns the last tx that we called NotifyPublishTx
// for.
GetLastPublishedTx() (*wire.MsgTx, error)

// ListSweeps lists all the sweeps we have successfully published.
ListSweeps() ([]chainhash.Hash, error)
}
Expand All @@ -69,13 +55,6 @@ func NewSweeperStore(db kvdb.Backend, chainHash *chainhash.Hash) (
SweeperStore, error) {

err := kvdb.Update(db, func(tx kvdb.RwTx) error {
_, err := tx.CreateTopLevelBucket(
lastTxBucketKey,
)
if err != nil {
return err
}

if tx.ReadWriteBucket(txHashesBucketKey) != nil {
return nil
}
Expand Down Expand Up @@ -171,64 +150,18 @@ func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket,
// NotifyPublishTx signals that we are about to publish a tx.
func (s *sweeperStore) NotifyPublishTx(sweepTx *wire.MsgTx) error {
return kvdb.Update(s.db, func(tx kvdb.RwTx) error {
lastTxBucket := tx.ReadWriteBucket(lastTxBucketKey)
if lastTxBucket == nil {
return errors.New("last tx bucket does not exist")
}

txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey)
if txHashesBucket == nil {
return errNoTxHashesBucket
}

var b bytes.Buffer
if err := sweepTx.Serialize(&b); err != nil {
return err
}

if err := lastTxBucket.Put(lastTxKey, b.Bytes()); err != nil {
return err
}

hash := sweepTx.TxHash()

return txHashesBucket.Put(hash[:], []byte{})
}, func() {})
}

// GetLastPublishedTx returns the last tx that we called NotifyPublishTx
// for.
func (s *sweeperStore) GetLastPublishedTx() (*wire.MsgTx, error) {
var sweepTx *wire.MsgTx

err := kvdb.View(s.db, func(tx kvdb.RTx) error {
lastTxBucket := tx.ReadBucket(lastTxBucketKey)
if lastTxBucket == nil {
return errors.New("last tx bucket does not exist")
}

sweepTxRaw := lastTxBucket.Get(lastTxKey)
if sweepTxRaw == nil {
return nil
}

sweepTx = &wire.MsgTx{}
txReader := bytes.NewReader(sweepTxRaw)
if err := sweepTx.Deserialize(txReader); err != nil {
return fmt.Errorf("tx deserialize: %v", err)
}

return nil
}, func() {
sweepTx = nil
})
if err != nil {
return nil, err
}

return sweepTx, nil
}

// IsOurTx determines whether a tx is published by us, based on its
// hash.
func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
Expand Down
8 changes: 0 additions & 8 deletions sweep/store_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
// MockSweeperStore is a mock implementation of sweeper store. This type is
// exported, because it is currently used in nursery tests too.
type MockSweeperStore struct {
lastTx *wire.MsgTx
ourTxes map[chainhash.Hash]struct{}
}

Expand All @@ -30,17 +29,10 @@ func (s *MockSweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
func (s *MockSweeperStore) NotifyPublishTx(tx *wire.MsgTx) error {
txHash := tx.TxHash()
s.ourTxes[txHash] = struct{}{}
s.lastTx = tx

return nil
}

// GetLastPublishedTx returns the last tx that we called NotifyPublishTx
// for.
func (s *MockSweeperStore) GetLastPublishedTx() (*wire.MsgTx, error) {
return s.lastTx, nil
}

// ListSweeps lists all the sweeps we have successfully published.
func (s *MockSweeperStore) ListSweeps() ([]chainhash.Hash, error) {
var txns []chainhash.Hash
Expand Down
19 changes: 0 additions & 19 deletions sweep/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,6 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) {
t.Fatal(err)
}

// Initially we expect the store not to have a last published tx.
retrievedTx, err := store.GetLastPublishedTx()
if err != nil {
t.Fatal(err)
}
if retrievedTx != nil {
t.Fatal("expected no last published tx")
}

// Notify publication of tx1
tx1 := wire.MsgTx{}
tx1.AddTxIn(&wire.TxIn{
Expand Down Expand Up @@ -83,16 +74,6 @@ func testStore(t *testing.T, createStore func() (SweeperStore, error)) {
t.Fatal(err)
}

// Assert that last published tx2 is present.
retrievedTx, err = store.GetLastPublishedTx()
if err != nil {
t.Fatal(err)
}

if tx2.TxHash() != retrievedTx.TxHash() {
t.Fatal("txes do not match")
}

// Assert that both txes are recognized as our own.
ours, err := store.IsOurTx(tx1.TxHash())
if err != nil {
Expand Down
24 changes: 0 additions & 24 deletions sweep/sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,30 +340,6 @@ func (s *UtxoSweeper) Start() error {

log.Info("Sweeper starting")

// Retrieve last published tx from database.
lastTx, err := s.cfg.Store.GetLastPublishedTx()
if err != nil {
return fmt.Errorf("get last published tx: %v", err)
}

// Republish in case the previous call crashed lnd. We don't care about
// the return value, because inputs will be re-offered and retried
// anyway. The only reason we republish here is to prevent the corner
// case where lnd goes into a restart loop because of a crashing publish
// tx where we keep deriving new output script. By publishing and
// possibly crashing already now, we haven't derived a new output script
// yet.
if lastTx != nil {
log.Debugf("Publishing last tx %v", lastTx.TxHash())

// Error can be ignored. Because we are starting up, there are
// no pending inputs to update based on the publish result.
err := s.cfg.Wallet.PublishTransaction(lastTx, "")
if err != nil && err != lnwallet.ErrDoubleSpend {
log.Errorf("last tx publish: %v", err)
}
}

// Retrieve relay fee for dust limit calculation. Assume that this will
// not change from here on.
s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
Expand Down
51 changes: 0 additions & 51 deletions sweep/sweeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,16 +404,6 @@ func TestSuccess(t *testing.T) {
}

ctx.finish(1)

// Assert that last tx is stored in the database so we can republish
// on restart.
lastTx, err := ctx.store.GetLastPublishedTx()
if err != nil {
t.Fatal(err)
}
if lastTx == nil || sweepTx.TxHash() != lastTx.TxHash() {
t.Fatalf("last tx not stored")
}
}

// TestDust asserts that inputs that are not big enough to raise above the dust
Expand Down Expand Up @@ -780,9 +770,6 @@ func TestRestart(t *testing.T) {
// Restart sweeper.
ctx.restartSweeper()

// Expect last tx to be republished.
ctx.receiveTx()

// Simulate other subsystem (e.g. contract resolver) re-offering inputs.
spendChan1, err := ctx.sweeper.SweepInput(input1, defaultFeePref)
if err != nil {
Expand Down Expand Up @@ -830,9 +817,6 @@ func TestRestart(t *testing.T) {
// Restart sweeper again. No action is expected.
ctx.restartSweeper()

// Expect last tx to be republished.
ctx.receiveTx()

ctx.finish(1)
}

Expand Down Expand Up @@ -861,9 +845,6 @@ func TestRestartRemoteSpend(t *testing.T) {
// Restart sweeper.
ctx.restartSweeper()

// Expect last tx to be republished.
ctx.receiveTx()

// Replace the sweep tx with a remote tx spending input 1.
ctx.backend.deleteUnconfirmed(sweepTx.TxHash())

Expand Down Expand Up @@ -918,9 +899,6 @@ func TestRestartConfirmed(t *testing.T) {
// Restart sweeper.
ctx.restartSweeper()

// Expect last tx to be republished.
ctx.receiveTx()

// Mine the sweep tx.
ctx.backend.mine()

Expand All @@ -939,35 +917,6 @@ func TestRestartConfirmed(t *testing.T) {
ctx.finish(1)
}

// TestRestartRepublish asserts that sweeper republishes the last published
// tx on restart.
func TestRestartRepublish(t *testing.T) {
ctx := createSweeperTestContext(t)

_, err := ctx.sweeper.SweepInput(spendableInputs[0], defaultFeePref)
if err != nil {
t.Fatal(err)
}

ctx.tick()

sweepTx := ctx.receiveTx()

// Restart sweeper again. No action is expected.
ctx.restartSweeper()

republishedTx := ctx.receiveTx()

if sweepTx.TxHash() != republishedTx.TxHash() {
t.Fatalf("last tx not republished")
}

// Mine the tx to conclude the test properly.
ctx.backend.mine()

ctx.finish(1)
}

// TestRetry tests the sweeper retry flow.
func TestRetry(t *testing.T) {
ctx := createSweeperTestContext(t)
Expand Down

0 comments on commit b9b7d37

Please sign in to comment.