-
Notifications
You must be signed in to change notification settings - Fork 5
/
conn.go
193 lines (163 loc) · 4.09 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package marionette
import (
"io"
"net"
"strings"
"sync"
)
// BufferedConn wraps a net.Conn with an in-memory read buffer that is filled
// by a background goroutine (monitor). Callers inspect buffered bytes with
// Peek and discard consumed bytes with Seek; Read is unavailable and panics.
type BufferedConn struct {
// Embedded connection; methods not overridden here are promoted from it.
net.Conn
// mu is held when accessing buf; Peek also reads err under it.
mu sync.RWMutex
// buf holds bytes read from the connection that have not been seeked past.
buf []byte
// err is the non-temporary read error saved by monitor before it exits.
err error
// closing is closed by Close to tell monitor to stop.
closing chan struct{}
// once ensures closing is closed only one time.
once sync.Once
seekNotify chan struct{} // signalled by Seek after it removes bytes from the buffer
writeNotify chan struct{} // signalled by monitor after it appends data or saves an error
}
// NewBufferedConn wraps conn in a BufferedConn whose internal buffer has a
// capacity of twice bufferSize, and starts the background goroutine that reads
// from conn into that buffer.
func NewBufferedConn(conn net.Conn, bufferSize int) *BufferedConn {
	bc := new(BufferedConn)
	bc.Conn = conn
	bc.buf = make([]byte, 0, 2*bufferSize)

	// closing is unbuffered and only ever closed; the notify channels hold at
	// most one pending signal each.
	bc.closing = make(chan struct{})
	bc.seekNotify = make(chan struct{}, 1)
	bc.writeNotify = make(chan struct{}, 1)

	go bc.monitor()
	return bc
}
// Close signals the monitor goroutine to stop and then closes the underlying
// connection, returning that connection's Close error. The stop signal is
// issued at most once, so repeated calls do not close the channel twice; the
// underlying Close is still invoked on every call.
func (conn *BufferedConn) Close() error {
	signalClosing := func() { close(conn.closing) }
	conn.once.Do(signalClosing)

	return conn.Conn.Close()
}
// Append copies b onto the end of the buffer while holding the write lock.
// The buffer is never grown: b must fit within the remaining capacity,
// otherwise the reslice below panics.
func (conn *BufferedConn) Append(b []byte) {
	conn.mu.Lock()
	defer conn.mu.Unlock()

	start := len(conn.buf)
	// Extend the length in place (within capacity), then fill the new tail.
	conn.buf = conn.buf[:start+len(b)]
	copy(conn.buf[start:], b)
}
// Read always panics. It shadows the embedded net.Conn's Read so that callers
// cannot bypass the buffer; use Peek and Seek instead.
func (conn *BufferedConn) Read(p []byte) (int, error) {
	const msg = "BufferedConn.Read(): unavailable, use Peek/Seek"
	panic(msg)
}
// Peek returns buffered data without consuming it.
//
// When n is -1, any buffered data is returned as soon as there is some; if the
// buffer is empty and the monitor has saved an error, that error is returned
// with a nil slice. For any other n, the first n bytes are returned once at
// least n bytes are buffered; if the monitor saved an error first, whatever is
// buffered is returned together with that error (reported as io.EOF when
// isEOFError matches it).
//
// If the request cannot be satisfied yet and blocking is false, the current
// buffer contents are returned immediately. If blocking is true, Peek waits
// for the monitor to signal new data or an error and then re-evaluates.
//
// The returned slice aliases the connection's internal buffer; it is not a
// copy.
func (conn *BufferedConn) Peek(n int, blocking bool) ([]byte, error) {
	for {
		// Snapshot the buffer and the monitor's saved error under the read lock.
		conn.mu.RLock()
		data, readErr := conn.buf, conn.err
		conn.mu.RUnlock()

		if n == -1 {
			// "Whatever is available" mode.
			if len(data) > 0 {
				return data, nil
			}
			if readErr != nil {
				return nil, readErr
			}
		} else {
			// Exact-length mode.
			switch {
			case n <= len(data):
				return data[:n], nil
			case isEOFError(readErr):
				return data, io.EOF
			case readErr != nil:
				return data, readErr
			}
		}

		// Not enough data yet and no error: non-blocking callers get what
		// exists right now.
		if !blocking {
			return data, readErr
		}

		// Block until the monitor reports a write or an error, then retry.
		<-conn.writeNotify
	}
}
// Seek discards the first offset bytes of the buffer, shifting the remaining
// bytes to the front, and then signals the monitor goroutine that space has
// been freed.
//
// Only io.SeekCurrent is supported, and offset must lie between 0 and the
// number of buffered bytes; both conditions are checked with assert. The
// returned position is always 0 and the error is always nil.
func (conn *BufferedConn) Seek(offset int64, whence int) (int64, error) {
	assert(whence == io.SeekCurrent)

	conn.mu.Lock()
	defer conn.mu.Unlock()

	// Validate the offset while holding the lock: the monitor goroutine
	// changes len(conn.buf) concurrently via Append, so an unlocked read here
	// would be a data race and could check against a stale length.
	assert(offset >= 0 && offset <= int64(len(conn.buf)))

	// Slide the unread tail down to the start of the backing array so that the
	// freed capacity is available at the end for subsequent appends.
	remaining := conn.buf[offset:]
	conn.buf = conn.buf[:len(remaining)]
	copy(conn.buf, remaining)

	conn.notifySeek()
	return 0, nil
}
// monitor runs in its own goroutine (started by NewBufferedConn) and
// continually reads from the underlying connection into the buffer.
//
// It exits when the connection is closed via Close or when a read returns a
// non-temporary error; in the latter case the error is saved on the connection
// for Peek to report. Temporary errors are ignored and the read is retried.
func (conn *BufferedConn) monitor() {
	// Scratch space for reads, sized to the full capacity of the buffer.
	conn.mu.RLock()
	buf := make([]byte, cap(conn.buf))
	conn.mu.RUnlock()

	for {
		// Ensure connection is not closed.
		select {
		case <-conn.closing:
			return
		default:
		}

		// Determine remaining space on buffer.
		// If no capacity remains then wait for seek or connection close.
		conn.mu.RLock()
		capacity := cap(conn.buf) - len(conn.buf)
		conn.mu.RUnlock()
		if capacity == 0 {
			select {
			case <-conn.closing:
				return
			case <-conn.seekNotify:
				continue
			}
		}

		// Attempt to read next bytes from connection. The read is limited to
		// the free space so the subsequent Append cannot overflow the buffer.
		n, err := conn.Conn.Read(buf[:capacity])

		// Append bytes to connection buffer and wake any waiting Peek.
		if n > 0 {
			conn.Append(buf[:n])
			conn.notifyWrite()
		}

		// If a non-temporary error occurred then save it on the connection and
		// exit. The store is done under the write lock because Peek reads
		// conn.err under the read lock from other goroutines.
		if err != nil && !isTemporaryError(err) {
			conn.mu.Lock()
			conn.err = err
			conn.mu.Unlock()

			conn.notifyWrite()
			return
		}
	}
}
// notifySeek signals the monitor goroutine, via seekNotify, that bytes have
// been removed from the buffer. The send never blocks: if it cannot proceed
// immediately the signal is dropped.
func (conn *BufferedConn) notifySeek() {
	var signal struct{}
	select {
	default:
		// Send would block; skip it.
	case conn.seekNotify <- signal:
	}
}
// notifyWrite signals, via writeNotify, that the monitor has appended data or
// saved an error, waking a Peek call that is waiting on that channel. The send
// never blocks: if it cannot proceed immediately the signal is dropped.
func (conn *BufferedConn) notifyWrite() {
	var signal struct{}
	select {
	default:
		// Send would block; skip it.
	case conn.writeNotify <- signal:
	}
}
// isTimeoutError reports whether err has a Timeout() bool method that returns
// true. A nil error, or one without such a method, is not a timeout. Only err
// itself is inspected; wrapped errors are not unwrapped.
func isTimeoutError(err error) bool {
	type timeouter interface{ Timeout() bool }

	// The assertion fails for a nil error, which yields false.
	te, ok := err.(timeouter)
	return ok && te.Timeout()
}
// isTemporaryError reports whether err has a Temporary() bool method that
// returns true. A nil error, or one without such a method, is not temporary.
// Only err itself is inspected; wrapped errors are not unwrapped.
func isTemporaryError(err error) bool {
	type temporary interface{ Temporary() bool }

	// The assertion fails for a nil error, which yields false.
	te, ok := err.(temporary)
	if !ok {
		return false
	}
	return te.Temporary()
}
// isEOFError reports whether err's message contains "connection reset by
// peer". Despite the name it does not compare against io.EOF; Peek uses it to
// report such an error to its caller as io.EOF.
func isEOFError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	return strings.Contains(msg, "connection reset by peer")
}