Skip to content

Commit

Permalink
don't wait for quit
Browse files Browse the repository at this point in the history
  • Loading branch information
yihuang committed Nov 1, 2024
1 parent 93f30bb commit afea0b6
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 33 deletions.
10 changes: 1 addition & 9 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,6 @@ type State struct {

// offline state sync height indicating to which height the node synced offline
offlineStateSyncHeight int64

// run tasks asynchronously to avoid blocking block executor, currently mainly for firing tx/block events.
taskRunner *TaskRunner
}

// StateOption sets an optional parameter on the State.
Expand All @@ -166,8 +163,7 @@ func NewState(
evpool evidencePool,
options ...StateOption,
) *State {
taskRunner := StartTaskRunner(taskQueueSize)
blockExec.SetTaskRunner(taskRunner.RunTask)
blockExec.SetTaskRunner(StartTaskRunner(taskQueueSize))
cs := &State{
config: config,
blockExec: blockExec,
Expand All @@ -183,7 +179,6 @@ func NewState(
evpool: evpool,
evsw: cmtevents.NewEventSwitch(),
metrics: NopMetrics(),
taskRunner: taskRunner,
}
for _, option := range options {
option(cs)
Expand Down Expand Up @@ -447,9 +442,6 @@ func (cs *State) OnStop() {
cs.Logger.Error("failed trying to stop timeoutTicket", "error", err)
}
// WAL is stopped in receiveRoutine.

// Stop the task runner
cs.taskRunner.StopAndWait()
}

// Wait waits for the the main routine to return.
Expand Down
31 changes: 7 additions & 24 deletions consensus/task.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,14 @@
package consensus

import "sync"

type TaskRunner struct {
wg sync.WaitGroup
taskCh chan func()
}

func StartTaskRunner(buf int) *TaskRunner {
tr := &TaskRunner{
taskCh: make(chan func(), buf),
}
tr.wg.Add(1)
// StartTaskRunner spawn a single goroutine to run tasks in FIFO order.
func StartTaskRunner(buf int) func(func()) {
taskCh := make(chan func(), buf)
go func() {
defer tr.wg.Done()
for f := range tr.taskCh {
for f := range taskCh {
f()
}
}()
return tr
}

func (tr *TaskRunner) StopAndWait() {
close(tr.taskCh)
tr.wg.Wait()
}

func (tr *TaskRunner) RunTask(f func()) {
tr.taskCh <- f
return func(f func()) {
taskCh <- f
}
}

0 comments on commit afea0b6

Please sign in to comment.