-
Notifications
You must be signed in to change notification settings - Fork 4
/
balancer.go
160 lines (144 loc) · 4.19 KB
/
balancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package gorb
import (
"database/sql"
"errors"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/go-gorp/gorp"
)
// Balancer embeds multiple connections to physical db and automatically distributes
// queries with a round-robin scheduling around a master/replica replication.
// Write queries are executed by the Master.
// Read queries(SELECTs) are executed by the replicas.
type Balancer struct {
*gorp.DbMap // master
replicas []*gorp.DbMap
count uint64
mu sync.RWMutex
masterCanRead bool
}
// NewBalancer opens a connection to each physical db.
// dataSourceNames must be a semi-comma separated list of DSNs with the first
// one being used as the master and the rest as replicas.
func NewBalancer(driverName string, dialect gorp.Dialect, sources string) (*Balancer, error) {
conns := strings.Split(sources, ";")
if len(conns) == 0 {
return nil, errors.New("empty servers list")
}
b := &Balancer{}
for i, c := range conns {
if len(c) == 0 { // trailing ;
continue
}
s, err := sql.Open(driverName, c)
if err != nil {
return nil, err
}
mapper := &gorp.DbMap{Db: s, Dialect: dialect}
if i == 0 { // first is the master
b.DbMap = mapper
} else {
b.replicas = append(b.replicas, mapper)
}
}
if len(b.replicas) == 0 {
b.replicas = append(b.replicas, b.DbMap)
b.masterCanRead = true
}
return b, nil
}
// MasterCanRead adds the master physical database to the replicas list if read==true
// so that the master can perform WRITE queries AND READ queries .
func (b *Balancer) MasterCanRead(read bool) {
b.mu.Lock()
defer b.mu.Unlock()
if read == true && b.masterCanRead == false {
b.replicas = append(b.replicas, b.DbMap)
b.masterCanRead = read
}
if read == false && b.masterCanRead == true && len(b.replicas) > 1 {
replicas := []*gorp.DbMap{}
for _, db := range b.replicas {
if db != b.DbMap {
replicas = append(replicas, db)
}
}
b.replicas = replicas
b.masterCanRead = read
}
}
// Ping verifies if a connection to each physical database is still alive, establishing a connection if necessary.
func (b *Balancer) Ping() error {
var err, innerErr error
for _, db := range b.GetAllDbs() {
innerErr = db.Db.Ping()
if innerErr != nil {
err = innerErr
}
}
return err
}
// SetMaxIdleConns sets the maximum number of connections
// If MaxOpenConns is greater than 0 but less than the new MaxIdleConns then the
// new MaxIdleConns will be reduced to match the MaxOpenConns limit
// If n <= 0, no idle connections are retained.
func (b *Balancer) SetMaxIdleConns(n int) {
for _, db := range b.GetAllDbs() {
db.Db.SetMaxIdleConns(n)
}
}
// SetMaxOpenConns sets the maximum number of open connections
// If MaxIdleConns is greater than 0 and the new MaxOpenConns
// is less than MaxIdleConns, then MaxIdleConns will be reduced to match
// the new MaxOpenConns limit. If n <= 0, then there is no limit on the number
// of open connections. The default is 0 (unlimited).
func (b *Balancer) SetMaxOpenConns(n int) {
for _, db := range b.GetAllDbs() {
db.Db.SetMaxOpenConns(n)
}
}
// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
// Expired connections may be closed lazily before reuse.
// If d <= 0, connections are reused forever.
func (b *Balancer) SetConnMaxLifetime(d time.Duration) {
for _, db := range b.GetAllDbs() {
db.Db.SetConnMaxLifetime(d)
}
}
// Master returns the master database
func (b *Balancer) Master() *gorp.DbMap {
return b.DbMap
}
// Replica returns one of the replicas databases
func (b *Balancer) Replica() *gorp.DbMap {
b.mu.RLock()
defer b.mu.RUnlock()
return b.replicas[b.replica()]
}
// GetAllDbs returns each underlying physical database,
// the first one is the master
func (b *Balancer) GetAllDbs() []*gorp.DbMap {
dbs := []*gorp.DbMap{}
dbs = append(dbs, b.DbMap)
dbs = append(dbs, b.replicas...)
return dbs
}
// Close closes all physical databases
func (b *Balancer) Close() error {
var err, innerErr error
for _, db := range b.GetAllDbs() {
innerErr = db.Db.Close()
if innerErr != nil {
err = innerErr
}
}
return err
}
func (b *Balancer) replica() int {
if len(b.replicas) == 1 {
return 0
}
return int((atomic.AddUint64(&b.count, 1) % uint64(len(b.replicas))))
}