Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement wallet.ApplyTx and wallet.RevertTx #1013

Merged
merged 4 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 121 additions & 2 deletions stores/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ import (

"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
"go.sia.tech/coreutils/wallet"
"go.uber.org/zap"
"gorm.io/gorm"
)

var _ chain.Subscriber = (*chainSubscriber)(nil)
var (
_ chain.Subscriber = (*chainSubscriber)(nil)
_ wallet.ApplyTx = (*chainSubscriber)(nil)
_ wallet.RevertTx = (*chainSubscriber)(nil)
)

type (
chainSubscriber struct {
Expand All @@ -34,10 +39,12 @@ type (
persistTimer *time.Timer

announcements []announcement
events []eventChange

contractState map[types.Hash256]contractState
hosts map[types.PublicKey]struct{}
mayCommit bool
outputs []outputChange
outputs map[types.Hash256]outputChange
proofs map[types.Hash256]uint64
revisions map[types.Hash256]revisionUpdate
transactions []txnChange
Expand All @@ -55,6 +62,7 @@ func NewChainSubscriber(db *gorm.DB, logger *zap.SugaredLogger, intvls []time.Du
persistInterval: persistInterval,

contractState: make(map[types.Hash256]contractState),
outputs: make(map[types.Hash256]outputChange),
hosts: make(map[types.PublicKey]struct{}),
proofs: make(map[types.Hash256]uint64),
revisions: make(map[types.Hash256]revisionUpdate),
Expand Down Expand Up @@ -462,3 +470,114 @@ func (cs *chainSubscriber) processChainRevertUpdateContracts(cru *chain.RevertUp
func (cs *chainSubscriber) retryTransaction(fc func(tx *gorm.DB) error, opts ...*sql.TxOptions) error {
return retryTransaction(cs.db, cs.logger, fc, cs.retryIntervals, opts...)
}

// AddEvents is called with all relevant events added in the update.
func (cs *chainSubscriber) AddEvents(events []wallet.Event) error {
for _, event := range events {
cs.events = append(cs.events, eventChange{
addition: true,
event: dbWalletEvent{
EventID: hash256(event.ID),
Inflow: currency(event.Inflow),
Outflow: currency(event.Outflow),
Transaction: event.Transaction,
MaturityHeight: event.MaturityHeight,
Source: string(event.Source),
Timestamp: event.Timestamp.Unix(),
Height: event.Index.Height,
BlockID: hash256(event.Index.ID),
},
})
}
return nil
}

// AddSiacoinElements is called with all new siacoin elements in the
// update. Ephemeral siacoin elements are not included.
func (cs *chainSubscriber) AddSiacoinElements(elements []wallet.SiacoinElement) error {
for _, el := range elements {
if _, ok := cs.outputs[el.ID]; ok {
return fmt.Errorf("output %q already exists", el.ID)
}
cs.outputs[el.ID] = outputChange{
addition: true,
se: dbWalletOutput{
OutputID: hash256(el.ID),
LeafIndex: el.StateElement.LeafIndex,
MerkleProof: merkleProof{proof: el.StateElement.MerkleProof},
Value: currency(el.SiacoinOutput.Value),
Address: hash256(el.SiacoinOutput.Address),
MaturityHeight: el.MaturityHeight,
Height: el.Index.Height,
BlockID: hash256(el.Index.ID),
},
}
}

return nil
}

// RemoveSiacoinElements is called with all siacoin elements that were
// spent in the update.
func (cs *chainSubscriber) RemoveSiacoinElements(ids []types.SiacoinOutputID) error {
for _, id := range ids {
if _, ok := cs.outputs[types.Hash256(id)]; ok {
return fmt.Errorf("output %q not found", id)
}

cs.outputs[types.Hash256(id)] = outputChange{
addition: false,
se: dbWalletOutput{
OutputID: hash256(id),
},
}
}
return nil
}

// WalletStateElements returns all state elements in the database. It is used
// to update the proofs of all state elements affected by the update.
func (cs *chainSubscriber) WalletStateElements() (elements []types.StateElement, _ error) {
for id, el := range cs.outputs {
elements = append(elements, types.StateElement{
ID: id,
LeafIndex: el.se.LeafIndex,
MerkleProof: el.se.MerkleProof.proof,
})
}
return
}

// UpdateStateElements updates the proofs of all state elements affected by the
// update.
func (cs *chainSubscriber) UpdateStateElements(elements []types.StateElement) error {
for _, se := range elements {
curr := cs.outputs[se.ID]
curr.se.MerkleProof = merkleProof{proof: se.MerkleProof}
curr.se.LeafIndex = se.LeafIndex
cs.outputs[se.ID] = curr
}
return nil
}

// RevertIndex is called with the chain index that is being reverted. Any events
// and siacoin elements that were created by the index should be removed.
func (cs *chainSubscriber) RevertIndex(index types.ChainIndex) error {
// remove any events that were added in the reverted block
filtered := cs.events[:0]
for i := range cs.events {
if cs.events[i].event.Index() != index {
filtered = append(filtered, cs.events[i])
}
}
cs.events = filtered

// remove any siacoin elements that were added in the reverted block
for id, el := range cs.outputs {
if el.se.Index() == index {
delete(cs.outputs, id)
}
}

return nil
}
65 changes: 62 additions & 3 deletions stores/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,55 @@ type (
Timestamp int64 `gorm:"index:idx_transactions_timestamp"`
}

dbWalletEvent struct {
Model

// event
EventID hash256 `gorm:"unique;index:idx_events_event_id;NOT NULL;size:32"`
Inflow currency
Outflow currency
Transaction types.Transaction `gorm:"serializer:json"`
MaturityHeight uint64 `gorm:"index:idx_wallet_events_maturity_height"`
Source string `gorm:"index:idx_wallet_events_source"`
Timestamp int64 `gorm:"index:idx_wallet_events_timestamp"`

// chain index
Height uint64 `gorm:"index:idx_wallet_events_height"`
BlockID hash256 `gorm:"size:32"`
}

txnChange struct {
addition bool
txnID hash256
txn dbTransaction
}

dbWalletOutput struct {
Model

// siacoin element
OutputID hash256 `gorm:"unique;index:idx_wallet_outputs_output_id;NOT NULL;size:32"`
LeafIndex uint64
MerkleProof merkleProof
Value currency
Address hash256 `gorm:"size:32"`
MaturityHeight uint64 `gorm:"index:idx_wallet_outputs_maturity_height"`

// chain index
Height uint64 `gorm:"index:idx_wallet_outputs_height"`
BlockID hash256 `gorm:"size:32"`
}

outputChange struct {
addition bool
oid hash256
sco dbSiacoinElement
se dbWalletOutput
}

txnChange struct {
eventChange struct {
addition bool
txnID hash256
txn dbTransaction
event dbWalletEvent
}
)

Expand All @@ -51,6 +90,26 @@ func (dbSiacoinElement) TableName() string { return "siacoin_elements" }
// TableName implements the gorm.Tabler interface.
func (dbTransaction) TableName() string { return "transactions" }

// TableName implements the gorm.Tabler interface.
func (dbWalletEvent) TableName() string { return "wallet_events" }

// TableName implements the gorm.Tabler interface.
func (dbWalletOutput) TableName() string { return "wallet_outputs" }

func (e dbWalletEvent) Index() types.ChainIndex {
return types.ChainIndex{
Height: e.Height,
ID: types.BlockID(e.BlockID),
}
}

func (se dbWalletOutput) Index() types.ChainIndex {
return types.ChainIndex{
Height: se.Height,
ID: types.BlockID(se.BlockID),
}
}

func (s *SQLStore) Height() uint64 {
s.persistMu.Lock()
height := s.chainIndex.Height
Expand Down
Loading