Skip to content

Commit

Permalink
Move sector cache to internal package (#1444)
Browse files Browse the repository at this point in the history
I went back and fourth on naming, both for the sector cache as well as
the contract locks. We can rename it later or we can do it now I don't
mind. Couldn't think of better alternatives that fit well on the `bus`
struct.
  • Loading branch information
ChrisSchinnerl authored Aug 16, 2024
2 parents 4ae7427 + c578d2a commit 26596b0
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 190 deletions.
49 changes: 30 additions & 19 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ type (
UnconfirmedParents(txn types.Transaction) ([]types.Transaction, error)
}

UploadingSectorsCache interface {
AddSector(uID api.UploadID, fcid types.FileContractID, root types.Hash256) error
FinishUpload(uID api.UploadID)
HandleRenewal(fcid, renewedFrom types.FileContractID)
Pending(fcid types.FileContractID) (size uint64)
Sectors(fcid types.FileContractID) (roots []types.Hash256)
StartUpload(uID api.UploadID) error
}

PinManager interface {
Shutdown(context.Context) error
TriggerUpdate()
Expand Down Expand Up @@ -273,11 +282,10 @@ type Bus struct {
alertMgr AlertManager
pinMgr PinManager
webhooksMgr WebhooksManager

cm ChainManager
cs ChainSubscriber
s Syncer
w Wallet
cm ChainManager
cs ChainSubscriber
s Syncer
w Wallet

as AutopilotStore
eas EphemeralAccountStore
Expand All @@ -286,28 +294,28 @@ type Bus struct {
mtrcs MetricsStore
ss SettingStore

accounts *accounts
contractLocks *contractLocks
uploadingSectors *uploadingSectorsCache
accounts *accounts
contractLocks *contractLocks
sectors UploadingSectorsCache

logger *zap.SugaredLogger
}

// New returns a new Bus
func New(ctx context.Context, am AlertManager, wm WebhooksManager, cm ChainManager, s Syncer, w Wallet, store Store, announcementMaxAge time.Duration, l *zap.Logger) (*Bus, error) {
l = l.Named("bus")

b := &Bus{
s: s,
cm: cm,
w: w,
hs: store,
as: store,
ms: store,
mtrcs: store,
ss: store,
eas: store,
contractLocks: newContractLocks(),
uploadingSectors: newUploadingSectorsCache(),
s: s,
cm: cm,
w: w,
hs: store,
as: store,
ms: store,
mtrcs: store,
ss: store,
eas: store,
contractLocks: newContractLocks(),

alerts: alerts.WithOrigin(am, "bus"),
alertMgr: am,
Expand All @@ -327,6 +335,9 @@ func New(ctx context.Context, am AlertManager, wm WebhooksManager, cm ChainManag
return nil, err
}

// create sectors cache
b.sectors = ibus.NewSectorsCache()

// create pin manager
b.pinMgr = ibus.NewPinManager(b.alerts, wm, store, defaultPinUpdateInterval, defaultPinRateWindow, l)

Expand Down
14 changes: 7 additions & 7 deletions bus/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ func (b *Bus) contractsPrunableDataHandlerGET(jc jape.Context) {
// adjust the amount of prunable data with the pending uploads, due to
// how we record contract spending a contract's size might already
// include pending sectors
pending := b.uploadingSectors.Pending(fcid)
pending := b.sectors.Pending(fcid)
if pending > size.Prunable {
size.Prunable = 0
} else {
Expand Down Expand Up @@ -907,7 +907,7 @@ func (b *Bus) contractSizeHandlerGET(jc jape.Context) {
// adjust the amount of prunable data with the pending uploads, due to how
// we record contract spending a contract's size might already include
// pending sectors
pending := b.uploadingSectors.Pending(id)
pending := b.sectors.Pending(id)
if pending > size.Prunable {
size.Prunable = 0
} else {
Expand Down Expand Up @@ -985,7 +985,7 @@ func (b *Bus) contractIDRenewedHandlerPOST(jc jape.Context) {
return
}

b.uploadingSectors.HandleRenewal(req.Contract.ID(), req.RenewedFrom)
b.sectors.HandleRenewal(req.Contract.ID(), req.RenewedFrom)
b.broadcastAction(webhooks.Event{
Module: api.ModuleContract,
Event: api.EventRenew,
Expand All @@ -1008,7 +1008,7 @@ func (b *Bus) contractIDRootsHandlerGET(jc jape.Context) {
if jc.Check("couldn't fetch contract sectors", err) == nil {
jc.Encode(api.ContractRootsResponse{
Roots: roots,
Uploading: b.uploadingSectors.Sectors(id),
Uploading: b.sectors.Sectors(id),
})
}
}
Expand Down Expand Up @@ -1955,7 +1955,7 @@ func (b *Bus) stateHandlerGET(jc jape.Context) {
func (b *Bus) uploadTrackHandlerPOST(jc jape.Context) {
var id api.UploadID
if jc.DecodeParam("id", &id) == nil {
jc.Check("failed to track upload", b.uploadingSectors.StartUpload(id))
jc.Check("failed to track upload", b.sectors.StartUpload(id))
}
}

Expand All @@ -1968,13 +1968,13 @@ func (b *Bus) uploadAddSectorHandlerPOST(jc jape.Context) {
if jc.Decode(&req) != nil {
return
}
jc.Check("failed to add sector", b.uploadingSectors.AddSector(id, req.ContractID, req.Root))
jc.Check("failed to add sector", b.sectors.AddSector(id, req.ContractID, req.Root))
}

func (b *Bus) uploadFinishedHandlerDELETE(jc jape.Context) {
var id api.UploadID
if jc.DecodeParam("id", &id) == nil {
b.uploadingSectors.FinishUpload(id)
b.sectors.FinishUpload(id)
}
}

Expand Down
120 changes: 0 additions & 120 deletions bus/uploadingsectors_test.go

This file was deleted.

Loading

0 comments on commit 26596b0

Please sign in to comment.