-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool_async.go
128 lines (116 loc) · 2.74 KB
/
pool_async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Package poolasync provides PoolAsync, which runs functions in goroutines
// while limiting how many of them run concurrently.
package poolasync
import (
"fmt"
"runtime/debug"
"sync"
"sync/atomic"
)
// DefaultLimit is the pool size used by NewDefaultPoolAsync, and by
// NewPoolAsync when it is given a count of zero or less.
const DefaultLimit = 10
// PoolAsync runs submitted functions in goroutines while capping how many of
// them may run at the same time.
type PoolAsync struct {
	// Concurrency limit: poolChan holds the tickets a job must take before it
	// starts and hands back when it finishes; poolSize is the ticket count.
	poolSize int
	poolChan chan int

	// Job tracking: jobSize counts submitted jobs whose completion signal has
	// not been consumed yet; each finished job sends its ticket on jobChan.
	jobSize int32
	jobChan chan int

	// locker guards errList.
	locker sync.Mutex
	// errList holds one entry per error-returning job, indexed by job.
	// An entry is nil when that job reported no error.
	errList []error
}
// NewPoolAsync builds a PoolAsync that lets at most count jobs run at once.
// A count of zero or less is replaced by DefaultLimit.
func NewPoolAsync(count int) *PoolAsync {
	size := count
	if size <= 0 {
		size = DefaultLimit
	}

	// Pre-fill the ticket channel: one ticket per concurrent slot.
	tickets := make(chan int, size)
	for id := 0; id < size; id++ {
		tickets <- id
	}

	return &PoolAsync{
		poolChan: tickets,
		poolSize: size,
		jobChan:  make(chan int),
		errList:  make([]error, 0),
	}
}
// NewDefaultPoolAsync builds a PoolAsync sized to DefaultLimit.
func NewDefaultPoolAsync() *PoolAsync {
	pool := NewPoolAsync(DefaultLimit)
	return pool
}
// Do takes a ticket from the pool, blocking until one is free, and then runs
// f in a new goroutine. When f returns, the goroutine hands the ticket back,
// sends a completion signal on jobChan and decrements the job counter.
// A panic raised by f is not recovered here.
//
// It returns the receiver so calls can be chained.
//
// Deprecated: flagged as deprecated by the original author; no replacement is
// named (DoWitError looks like the intended successor — confirm).
func (a *PoolAsync) Do(f func()) *PoolAsync {
	slot := <-a.poolChan
	atomic.AddInt32(&a.jobSize, 1)

	finish := func() {
		// Release the pool ticket first so another job may start.
		a.poolChan <- slot
		// Report completion.
		a.jobChan <- slot
		// The job no longer counts as outstanding.
		atomic.AddInt32(&a.jobSize, -1)
	}

	go func() {
		defer finish()
		f()
	}()
	return a
}
// DoWitError takes a ticket from the pool, blocking until one is free, and
// then runs f in a new goroutine, recording its outcome in the error list.
//
// Each call reserves one entry in the error list. The entry is set to the
// value f returns (nil on success). If f panics, the panic is recovered, the
// stack is printed, and the entry is set to an error describing the panic.
//
// After the job finishes, its goroutine sends a completion signal on jobChan,
// which NewPoolAsync creates unbuffered, so the goroutine stays blocked until
// Wait receives that signal.
//
// It returns the receiver so calls can be chained.
func (a *PoolAsync) DoWitError(f func() error) *PoolAsync {
	ticket := <-a.poolChan
	atomic.AddInt32(&a.jobSize, 1)

	// Reserve this job's entry while holding the lock. The index is taken
	// from the list length rather than from jobSize, so it stays in range
	// when Do calls are interleaved, when the pool is reused after Wait, and
	// when DoWitError is called from several goroutines at once.
	a.locker.Lock()
	idx := len(a.errList)
	a.errList = append(a.errList, nil)
	a.locker.Unlock()

	go func() {
		defer func() {
			// Record a panic BEFORE signalling completion; otherwise Wait
			// could scan the error list before the panic has been stored.
			if r := recover(); r != nil {
				debug.PrintStack()
				a.locker.Lock()
				a.errList[idx] = fmt.Errorf("PoolAsync.DoWitError %v panic: %v", idx, r)
				a.locker.Unlock()
			}

			// Release the pool ticket so another job may start.
			a.poolChan <- ticket
			// Report completion to Wait.
			a.jobChan <- ticket
			// The job no longer counts as outstanding.
			atomic.AddInt32(&a.jobSize, -1)
		}()

		err := f()
		a.locker.Lock()
		a.errList[idx] = err
		a.locker.Unlock()
	}()
	return a
}
// Wait blocks until every outstanding job has signalled completion, then
// returns the first non-nil entry of the error list, or nil if there is none.
//
// The number of completion signals to wait for is read once, on entry, so
// jobs submitted afterwards are not waited for.
//
// IMPORTANT: Wait must be called after ALL Do/DoWitError calls.
func (a *PoolAsync) Wait() error {
	// Snapshot the count before looping: finished jobs decrement it.
	pending := a.GetCurrentJobSize()
	for i := int32(0); i < pending; i++ {
		<-a.jobChan
	}

	// Job goroutines write to errList under locker, so read it the same way.
	a.locker.Lock()
	defer a.locker.Unlock()
	for _, err := range a.errList {
		if err != nil {
			return err
		}
	}
	return nil
}
// GetCurrentJobSize atomically reads the job counter: the number of jobs that
// have been submitted and have not yet decremented the counter on completion.
func (a *PoolAsync) GetCurrentJobSize() int32 {
	outstanding := atomic.LoadInt32(&a.jobSize)
	return outstanding
}
// GetErrors returns a copy of the error list. The copy has one entry per
// recorded job, in list order; an entry is nil when that job reported no
// error. The result is never nil, and modifying it does not affect the pool.
func (a *PoolAsync) GetErrors() []error {
	// Job goroutines write to errList under locker, so read it the same way.
	a.locker.Lock()
	defer a.locker.Unlock()

	result := make([]error, len(a.errList))
	copy(result, a.errList)
	return result
}