diff --git a/core.go b/core.go index c2ac93c..4e41d42 100644 --- a/core.go +++ b/core.go @@ -17,7 +17,7 @@ import ( "github.com/tetratelabs/wazero/experimental/sys" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" - "github.com/gaukas/wazerofs/memfs" + "github.com/refraction-networking/water/internal/wazerofs/memfs" expsysfs "github.com/tetratelabs/wazero/experimental/sysfs" ) diff --git a/go.mod b/go.mod index e1c9596..b01766f 100644 --- a/go.mod +++ b/go.mod @@ -10,9 +10,7 @@ retract ( replace github.com/tetratelabs/wazero => github.com/refraction-networking/wazero v1.7.3-w require ( - github.com/gaukas/wazerofs v0.1.0 + github.com/blang/vfs v1.0.0 github.com/tetratelabs/wazero v1.7.3 - google.golang.org/protobuf v1.33.0 + google.golang.org/protobuf v1.34.2 ) - -require github.com/blang/vfs v1.0.0 // indirect diff --git a/go.sum b/go.sum index f18256d..ef2c93b 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,10 @@ github.com/blang/vfs v1.0.0 h1:AUZUgulCDzbaNjTRWEP45X7m/J10brAptZpSRKRZBZc= github.com/blang/vfs v1.0.0/go.mod h1:jjuNUc/IKcRNNWC9NUCvz4fR9PZLPIKxEygtPs/4tSI= -github.com/gaukas/wazerofs v0.1.0 h1:wIkW1bAxSnpaaVkQ5LOb1tm1BXdVap3eKjJpVWIqt2E= -github.com/gaukas/wazerofs v0.1.0/go.mod h1:+JECB9Fwt0taPqSgHckG9lmT3tcoVK+9VJozTsq9UlI= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/refraction-networking/wazero v1.7.3-w h1:Br3UuVPrKAD3pUSIlpT1+iBIYMbs8h2wS4d0ziU9Yoc= github.com/refraction-networking/wazero v1.7.3-w/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= diff --git a/internal/io/LICENSE b/internal/io/LICENSE new file mode 100644 index 0000000..57d3a44 --- /dev/null +++ b/internal/io/LICENSE @@ -0,0 +1,29 @@ +BSD 3-Clause License + +Copyright (c) 2018, the respective contributors, as shown by the AUTHORS file. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/internal/io/README.md b/internal/io/README.md new file mode 100644 index 0000000..9913d91 --- /dev/null +++ b/internal/io/README.md @@ -0,0 +1,3 @@ +# io: Easy to use I/O object and function implementations + +Copied from `github.com/gaukas/io@v0.0.2`. diff --git a/internal/io/conn/README.md b/internal/io/conn/README.md new file mode 100644 index 0000000..1a29dba --- /dev/null +++ b/internal/io/conn/README.md @@ -0,0 +1,3 @@ +## package `io/conn` + +This package provides abstractions for connections build on top of other types. \ No newline at end of file diff --git a/internal/io/conn/channelconn.go b/internal/io/conn/channelconn.go new file mode 100644 index 0000000..e06503a --- /dev/null +++ b/internal/io/conn/channelconn.go @@ -0,0 +1,324 @@ +package conn + +import ( + "context" + "errors" + "io" + "net" + "os" + "runtime" + "sync" + "sync/atomic" + "syscall" + "time" +) + +type ChannelConn struct { + chanRX <-chan []byte // read from this channel, owned by the writing-side to this channel + chanTX chan<- []byte // write to this channel, owned by this struct + + chanClose chan struct{} // notify close event to unblock blocking read/write operations + closed atomic.Bool // true if the channel is closed + + readBuf []byte // protected by readBufMutex, accessed only from readLocked and readLockedFromBuffer + readBufMutex sync.Mutex + + pendingWrite atomic.Bool // indicates if there is an outstanding writer blocking, protecting TX channel + + nonblocking atomic.Bool +} + +func NewChannelConn(rx <-chan []byte, tx chan<- []byte) *ChannelConn { + return &ChannelConn{ + chanRX: rx, + chanTX: tx, + chanClose: make(chan struct{}), + } +} + +// ChannelConn implements [Conn]. +var _ Conn = (*ChannelConn)(nil) + +// Read reads data from the channel. Implements [net.Conn]. +func (c *ChannelConn) Read(b []byte) (n int, err error) { + if c.closed.Load() { + return 0, io.ErrClosedPipe + } + + c.readBufMutex.Lock() + defer c.readBufMutex.Unlock() + + return c.readLocked(b) +} + +// read blocks until some data is available, or the channel is closed. +func (c *ChannelConn) readLocked(b []byte) (int, error) { + if len(c.readBuf) != 0 { // need to resume reading from the buffer + return c.readLockedFromBuffer(b) + } + + if c.nonblocking.Load() { + for { + select { + case <-c.chanClose: + return 0, io.ErrClosedPipe + case c.readBuf = <-c.chanRX: + if len(c.readBuf) != 0 { // buffer is empty, read from the channel OK + return c.readLockedFromBuffer(b) + } else { // empty read from channel + if c.readBuf == nil { // closed channel + return 0, io.EOF + } else { // channel open, but empty read: other end testing if write will block + continue + } + } + default: + return 0, syscall.EAGAIN + } + } + } else { + for { + select { + case <-c.chanClose: + return 0, io.ErrClosedPipe + case c.readBuf = <-c.chanRX: + if len(c.readBuf) != 0 { // buffer is empty, read from the channel OK + return c.readLockedFromBuffer(b) + } else { // empty read from channel + if c.readBuf == nil { // closed channel + return 0, io.EOF + } else { // channel open, but empty read: other end testing if write will block + continue + } + } + } + } + } +} + +// readLockedFromBuffer reads from the buffer. Assumes the buffer is non-empty. +func (c *ChannelConn) readLockedFromBuffer(b []byte) (n int, err error) { + n = copy(b, c.readBuf) + c.readBuf = c.readBuf[n:] + return +} + +// Write writes data to the channel. Implements [net.Conn]. +func (c *ChannelConn) Write(b []byte) (n int, err error) { + if c.nonblocking.Load() { + if c.pendingWrite.CompareAndSwap(false, true) { + defer c.pendingWrite.Store(false) + n, err = c.writeFlagAcquired(b) + } else { + return 0, syscall.EAGAIN + } + } else { + // retry until acquired the pending write flag + for !c.pendingWrite.CompareAndSwap(false, true) { + runtime.Gosched() + } + defer c.pendingWrite.Store(false) + n, err = c.writeFlagAcquired(b) + } + + return +} + +// writeFlagAcquired writes data to the channel. Caller must +// acquire the pending write flag before calling this function. +func (c *ChannelConn) writeFlagAcquired(b []byte) (n int, err error) { + if c.closed.Load() { // check if the channel is closed only after acquiring the pending write flag to prevent racing condition + return 0, io.ErrClosedPipe + } + + expectedLen := len(b) + + bCopy := make([]byte, expectedLen) + if copy(bCopy, b) != expectedLen { + return 0, io.ErrUnexpectedEOF + } + + if c.nonblocking.Load() { + select { + case <-c.chanClose: + return 0, io.ErrClosedPipe + case c.chanTX <- bCopy: + return expectedLen, nil + default: + return 0, syscall.EAGAIN + } + } else { + select { + case <-c.chanClose: + return 0, io.ErrClosedPipe + case c.chanTX <- bCopy: + return expectedLen, nil + } + } +} + +func (c *ChannelConn) Close() error { + if c.closed.CompareAndSwap(false, true) { + close(c.chanClose) + + // acquire the pending write flag before closing the TX channel + for !c.pendingWrite.CompareAndSwap(false, true) { + runtime.Gosched() + } + close(c.chanTX) + c.pendingWrite.Store(false) + + return nil + } + + return io.ErrClosedPipe // double close +} + +type channelAddr struct{} + +func (channelAddr) Network() string { return "channel" } +func (channelAddr) String() string { return "channel" } + +// ChannelConn does not implement [NetworkConn]. +var _ NetworkConn = (*ChannelConn)(nil) + +// LocalAddr returns the local network address. Implements [net.Conn]. +func (*ChannelConn) LocalAddr() net.Addr { return channelAddr{} } + +// RemoteAddr returns the remote network address. Implements [net.Conn]. +func (*ChannelConn) RemoteAddr() net.Addr { return channelAddr{} } + +// ChannelConn does not implement [DeadlineConn]. However, fake implementation +// is provided such that it can be used as [net.Conn] in some cases when +// deadlines are not used. +// +// TODO: properly implement [DeadlineConn]. +var _ DeadlineConn = (*ChannelConn)(nil) + +// SetDeadline is not supported by ChannelConn. It will always return +// [os.ErrNoDeadline]. +// +// TODO: properly implement the support for deadlines. +func (*ChannelConn) SetDeadline(time.Time) error { + return os.ErrNoDeadline +} + +// SetReadDeadline is not supported by ChannelConn. It will always return +// [os.ErrNoDeadline]. +// +// TODO: properly implement the support for read deadline. +func (*ChannelConn) SetReadDeadline(time.Time) error { + return os.ErrNoDeadline +} + +// SetWriteDeadline is not supported by ChannelConn. It will always return +// [os.ErrNoDeadline]. +// +// TODO: properly implement the support for write deadline. +func (*ChannelConn) SetWriteDeadline(time.Time) error { + return os.ErrNoDeadline +} + +// ChannelConn implements [NonblockingConn]. +var _ NonblockingConn = (*ChannelConn)(nil) + +// IsNonblock returns true if the connection is in non-blocking mode. +func (c *ChannelConn) IsNonblock() bool { + return c.nonblocking.Load() +} + +// SetNonblock updates the non-blocking mode of the connection if applicable. +func (c *ChannelConn) SetNonblock(nonblocking bool) (ok bool) { + c.nonblocking.Store(nonblocking) + return true +} + +// ChannelConn implements [PollConn]. +var _ PollConn = (*ChannelConn)(nil) + +func (c *ChannelConn) PollR(ctx context.Context) (bool, error) { + if !c.nonblocking.Load() { + return false, errors.New("polling is not supported in blocking mode") + } + + if c.closed.Load() { + return false, io.EOF + } + + for !c.readBufMutex.TryLock() && ctx.Err() == nil { + runtime.Gosched() + } + + if ctx.Err() != nil { + return false, ctx.Err() + } + + defer c.readBufMutex.Unlock() + + if len(c.readBuf) != 0 { + return true, nil + } + + // We cannot check cap(c.chanRX) vs. len(c.chanRX) here because it is + // possible that messages in the buffer being empty probes sent by the + // other end to check if the write will block. Instead the universal + // reading strategy below is used. + + for { + select { + case <-c.chanClose: + return false, io.EOF + case c.readBuf = <-c.chanRX: + if len(c.readBuf) != 0 { + return true, nil + } else { + if c.readBuf == nil { + return false, io.EOF + } else { + continue + } + } + case <-ctx.Done(): + return false, ctx.Err() + } + } +} + +func (c *ChannelConn) PollW(ctx context.Context) (bool, error) { + if !c.nonblocking.Load() { + return false, errors.New("polling is not supported in blocking mode") + } + + // aquire the pending write flag before writing to the TX channel + for !c.pendingWrite.CompareAndSwap(false, true) && ctx.Err() == nil { + runtime.Gosched() + } + + if ctx.Err() != nil { + return false, ctx.Err() + } + + defer c.pendingWrite.Store(false) + + if c.closed.Load() { + return false, io.EOF + } + + // Buffered channel: + if cap(c.chanTX) > 0 { + for ctx.Err() == nil && len(c.chanTX) >= cap(c.chanTX) { + runtime.Gosched() + } + return len(c.chanTX) < cap(c.chanTX), ctx.Err() + } + + // Unbuffered channel: + select { + case <-c.chanClose: + return false, io.EOF + case c.chanTX <- []byte{}: + return true, nil + case <-ctx.Done(): + return false, ctx.Err() + } +} diff --git a/internal/io/conn/conn.go b/internal/io/conn/conn.go new file mode 100644 index 0000000..2d3c46b --- /dev/null +++ b/internal/io/conn/conn.go @@ -0,0 +1,72 @@ +package conn + +import ( + "context" + "io" + "net" + "time" +) + +// Conn is an interface that represents a connection. +type Conn interface { + io.ReadWriteCloser // Conn embeds io.ReadWriteCloser +} + +// NetworkConn is an interface that represents a network connection. +type NetworkConn interface { + Conn // NetworkConn embeds Conn + + // LocalAddr returns the local network address. + LocalAddr() net.Addr + + // RemoteAddr returns the remote network address. + RemoteAddr() net.Addr +} + +type DeadlineConn interface { + Conn // embeds Conn + + // SetDeadline sets the read and write deadlines associated with the + // connection. + SetDeadline(time.Time) error + + // SetReadDeadline sets the read deadline associated with the + // connection. + SetReadDeadline(time.Time) error + + // SetWriteDeadline sets the write deadline associated with the + // connection. + SetWriteDeadline(time.Time) error +} + +type NonblockingConn interface { + Conn // embeds Conn + + // IsNonblock returns true if the connection is in non-blocking mode. + IsNonblock() bool + + // SetNonblock updates the non-blocking mode of the connection if + // applicable. + // + // It should return true if the update was successful, and false + // otherwise. Caller to this function is not expected to retry + // even if the operation failed. + SetNonblock(bool) bool +} + +// PollConn is an interface that represents a connection that can be polled. +// +// The methods, such as PollR, PollW, and PollRW, returns true with nil error +// if the connection became readable, writable, or both before the timeout. +// If the method returns false, the error must not be nil. +// For example, [io.EOF] is expected if the connection is closed, and +// ctx.Err() is expected if the context is canceled. +type PollConn interface { + NonblockingConn // PollConn embeds NonblockingConn, given that polling on a blocking connection might not be able to respect the deadline. + + // PollR polls the connection for readability. + PollR(ctx context.Context) (bool, error) + + // PollW polls the connection for writability. + PollW(ctx context.Context) (bool, error) +} diff --git a/internal/io/pipe/README.md b/internal/io/pipe/README.md new file mode 100644 index 0000000..e6193d3 --- /dev/null +++ b/internal/io/pipe/README.md @@ -0,0 +1,3 @@ +## package `io/pipe` + +This package provides utilities for creating pipes made of interconnected pairs of connection. \ No newline at end of file diff --git a/internal/io/pipe/channel_pipe.go b/internal/io/pipe/channel_pipe.go new file mode 100644 index 0000000..4331478 --- /dev/null +++ b/internal/io/pipe/channel_pipe.go @@ -0,0 +1,44 @@ +package pipe + +import ( + "syscall" + + "github.com/refraction-networking/water/internal/io/conn" +) + +// ChannelPipe creates a pair of interconnected [conn.ChannelConn]. Data written +// to one connection will become readable from the other. +// +// Note: This function creates unbuffered channels which will block on write +// if the other side is not reading, assuming nonblocking mode is not set. +// Use [BufferedChannelPipe] instead if channel capacity > 0 is desired. +func ChannelPipe() (c1, c2 *conn.ChannelConn) { + // channels as pipes + chan1 := make(chan []byte) + chan2 := make(chan []byte) + + return conn.NewChannelConn(chan1, chan2), conn.NewChannelConn(chan2, chan1) +} + +// BufferedChannelPipe creates a pair of interconnected [conn.ChannelConn] +// with the specified buffer size. Data written to one connection will become +// readable from the other. +func BufferedChannelPipe(bufSize ...int) (c1, c2 *conn.ChannelConn, err error) { + var chan1Capacity, chan2Capaticy int + switch len(bufSize) { + case 0: // do nothing, capacity will be 0 for both channel + case 1: + chan1Capacity = bufSize[0] + case 2: + chan1Capacity = bufSize[0] + chan2Capaticy = bufSize[1] + default: + return nil, nil, syscall.EINVAL + } + + // channels as pipes + chan1 := make(chan []byte, chan1Capacity) + chan2 := make(chan []byte, chan2Capaticy) + + return conn.NewChannelConn(chan1, chan2), conn.NewChannelConn(chan2, chan1), nil +} diff --git a/internal/io/pipe/tcp_pipe.go b/internal/io/pipe/tcp_pipe.go new file mode 100644 index 0000000..daec9f9 --- /dev/null +++ b/internal/io/pipe/tcp_pipe.go @@ -0,0 +1,40 @@ +package pipe + +import ( + "fmt" + "net" +) + +// TCPPipe creates a pair of interconnected [net.TCPConn]. Data written +// to one connection will become readable from the other. +func TCPPipe(listenAddr *net.TCPAddr) (c1, c2 *net.TCPConn, err error) { + if listenAddr == nil { + listenAddr, err = net.ResolveTCPAddr("tcp", "localhost:0") + if err != nil { + return nil, nil, fmt.Errorf("net.ResolveTCPAddr returned error: %w", err) + } + } + + // Temporary TCPListener + l, err := net.ListenTCP("tcp", listenAddr) // skipcq: GSC-G102 + if err != nil { + return nil, nil, fmt.Errorf("net.Listen returned error: %w", err) + } + + if c1, err = net.DialTCP("tcp", nil, l.Addr().(*net.TCPAddr)); err != nil { + return nil, nil, fmt.Errorf("net.Dial returned error: %w", err) + } + + if c2, err = l.AcceptTCP(); err != nil { + return nil, nil, fmt.Errorf("(*net.TCPListener).Accept returned error: %w", err) + } + + if c1 == nil || c2 == nil { + return nil, nil, fmt.Errorf("unexpected nil connection without error") + } + + if err := l.Close(); err != nil { + return c1, c2, fmt.Errorf("l.Close() failed: %w", err) // this error is not fatal + } + return c1, c2, nil +} diff --git a/internal/io/pipe/unix_pipe.go b/internal/io/pipe/unix_pipe.go new file mode 100644 index 0000000..ae5b38b --- /dev/null +++ b/internal/io/pipe/unix_pipe.go @@ -0,0 +1,48 @@ +package pipe + +import ( + "crypto/rand" + "fmt" + "net" + "os" +) + +// UnixPipe creates a pair of interconnected [net.UnixConn]. Data written +// to one connection will become readable from the other. +func UnixPipe(listenAddr *net.UnixAddr) (c1, c2 *net.UnixConn, err error) { + if listenAddr == nil { + // randomize a socket name + randBytes := make([]byte, 16) + if _, err := rand.Read(randBytes); err != nil { + return nil, nil, fmt.Errorf("crypto/rand.Read returned error: %w", err) + } + + unixPath := os.TempDir() + string(os.PathSeparator) + fmt.Sprintf("%x", randBytes) + if listenAddr, err = net.ResolveUnixAddr("unix", unixPath); err != nil { + return nil, nil, fmt.Errorf("net.ResolveUnixAddr returned error: %w", err) + } + } + + // Temporary UnixListener + ul, err := net.ListenUnix("unix", listenAddr) + if err != nil { + return nil, nil, fmt.Errorf("net.Listen returned error: %w", err) + } + + if c1, err = net.DialUnix("unix", nil, listenAddr); err != nil { + return nil, nil, fmt.Errorf("net.Dial returned error: %w", err) + } + + if c2, err = ul.AcceptUnix(); err != nil { + return nil, nil, fmt.Errorf("(*net.UnixListener).Accept returned error: %w", err) + } + + if c1 == nil || c2 == nil { + return nil, nil, fmt.Errorf("unexpected nil connection without error") + } + + if err := ul.Close(); err != nil { + return c1, c2, fmt.Errorf("ul.Close() failed: %w", err) // this error is not fatal + } + return c1, c2, nil +} diff --git a/internal/socket/README.md b/internal/socket/README.md deleted file mode 100644 index eb910f2..0000000 --- a/internal/socket/README.md +++ /dev/null @@ -1,5 +0,0 @@ -# `socket` - -This package provides some helper function to abuse network sockets and do weird things, including but not limited to: -- Spawning connection pairs -- Wrap a readable/writable interface into a `net.Conn` \ No newline at end of file diff --git a/internal/socket/tcpconn.go b/internal/socket/tcpconn.go deleted file mode 100644 index 185a860..0000000 --- a/internal/socket/tcpconn.go +++ /dev/null @@ -1,179 +0,0 @@ -package socket - -import ( - "context" - "fmt" - "io" - "net" - "os" - "sync" - - "github.com/refraction-networking/water/internal/log" -) - -// TCPConnPair returns a pair of connected net.TCPConn. -func TCPConnPair(address ...string) (c1, c2 *net.TCPConn, err error) { - var addr string = "localhost:0" // use a localhost TCP connection by default - if len(address) > 0 && address[0] != "" { - addr = address[0] - } - - tcpAddr, err := net.ResolveTCPAddr("tcp", addr) - if err != nil { - return nil, nil, fmt.Errorf("net.ResolveTCPAddr returned error: %w", err) - } - - l, err := net.ListenTCP("tcp", tcpAddr) // skipcq: GSC-G102 - if err != nil { - return nil, nil, fmt.Errorf("net.Listen returned error: %w", err) - } - - var wg *sync.WaitGroup = new(sync.WaitGroup) - var goroutineErr error - wg.Add(1) - go func() { - defer wg.Done() - c2, goroutineErr = l.AcceptTCP() - }() - - c1, err = net.DialTCP("tcp", nil, l.Addr().(*net.TCPAddr)) - if err != nil { - return nil, nil, fmt.Errorf("net.Dial returned error: %w", err) - } - wg.Wait() - - if goroutineErr != nil { - return nil, nil, fmt.Errorf("l.Accept returned error: %w", goroutineErr) - } - - if c1 == nil || c2 == nil { - return nil, nil, fmt.Errorf("c1 or c2 is nil") - } - - return c1, c2, l.Close() -} - -// TCPConnWrap wraps an io.Reader/io.Writer/io.Closer -// interface into a TCPConn. -// -// This function spins up goroutine(s) to copy data between the -// ReadWrite(Close)r and the TCPConn. Anything written to the -// TCPConn by caller will be written to the wrapped object if -// the object implements io.Writer, and if the object implements -// io.Reader, anything read by goroutine from the wrapped object -// will be readable from the TCPConn by caller. -// -// Once this function is invoked, the caller should not perform I/O -// operations on the wrapped connection anymore. -// -// The returned context.Context can be used to check if the connection -// is still alive. If the connection is closed, the context will be -// canceled. -func TCPConnWrap(wrapped any) (wrapperConn *net.TCPConn, ctxCancel context.Context, err error) { - // get a pair of connected TCPConn - tcpConn, reverseTCPConn, err := TCPConnPair() - if err != nil && (tcpConn == nil || reverseTCPConn == nil) { // ignore error caused by closing TCP Listener - return nil, nil, err - } - - var cancel context.CancelFunc - ctxCancel, cancel = context.WithCancel(context.Background()) - - reader, readerOk := wrapped.(io.Reader) - writer, writerOk := wrapped.(io.Writer) - var wg *sync.WaitGroup = new(sync.WaitGroup) - if !readerOk && !writerOk { - cancel() - return nil, nil, fmt.Errorf("wrapped does not implement io.Reader nor io.Writer") - } else if readerOk && !writerOk { - // only reader is implemented - log.Debugf("wrapped does not implement io.Writer, skipping copy from wrapped to wrapper") - - wg.Add(1) - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(reverseTCPConn, reader) // unsafe: error is ignored - _ = reverseTCPConn.Close() // unsafe: error is ignored - _ = tcpConn.Close() // unsafe: error is ignored - }(wg) - } else if !readerOk && writerOk { - // only writer is implemented - log.Debugf("wrapped does not implement io.Reader, skipping copy from wrapper to wrapped") - - wg.Add(1) - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(writer, reverseTCPConn) // unsafe: error is ignored - // when the src is closed, we will close the dst (if implements io.Closer) - if closer, ok := wrapped.(io.Closer); ok { - _ = closer.Close() // unsafe: error is ignored - } - }(wg) - } else { - // both reader and writer are implemented - wg.Add(2) - - // copy from wrapped to wrapper - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(reverseTCPConn, reader) // unsafe: error is ignored - _ = reverseTCPConn.Close() // unsafe: error is ignored - _ = tcpConn.Close() // unsafe: error is ignored - }(wg) - - // copy from wrapper to wrapped - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(writer, reverseTCPConn) // unsafe: error is ignored - // when the src is closed, we will close the dst (if implements io.Closer) - if closer, ok := wrapped.(io.Closer); ok { - _ = closer.Close() // unsafe: error is ignored - } - }(wg) - } - - // spawn a goroutine to wait for all copying to finish - go func(wg *sync.WaitGroup) { - wg.Wait() - cancel() - - // close again to make sure we don't forget to close anything - // if io.Reader or io.Writer is not implemented. - - // close the reverseTCPConn - _ = reverseTCPConn.Close() // unsafe: error is ignored - - // close the tcpConn - _ = tcpConn.Close() // unsafe: error is ignored - - // close the wrapped - if closer, ok := wrapped.(io.Closer); ok { - _ = closer.Close() // unsafe: error is ignored - } - }(wg) - - return tcpConn, ctxCancel, nil -} - -// TCPConnFileWrap wraps an object into a *os.File from an -// underlying net.TCPConn. The object must implement io.Reader -// and/or io.Writer. -// -// If the object implements io.Reader, upon completing copying -// the object to the returned *os.File, the callback functions -// will be called. -// -// It is caller's responsibility to close the returned *os.File. -func TCPConnFileWrap(wrapped any) (wrapperFile *os.File, ctxCancel context.Context, err error) { - tcpWrapperConn, ctxCancel, err := TCPConnWrap(wrapped) - if err != nil { - return nil, nil, err - } - - tcpWrapperFile, err := tcpWrapperConn.File() - if err != nil { - return nil, nil, fmt.Errorf("(*net.TCPConn).File returned error: %w", err) - } - - return tcpWrapperFile, ctxCancel, nil -} diff --git a/internal/socket/tcpconn_test.go b/internal/socket/tcpconn_test.go deleted file mode 100644 index 8f6ef93..0000000 --- a/internal/socket/tcpconn_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package socket_test - -import ( - "runtime" - "testing" - "time" - - "github.com/refraction-networking/water/internal/socket" -) - -func TestTCPConnPair(t *testing.T) { - c1, c2, err := socket.TCPConnPair() - if err != nil { - if c1 == nil || c2 == nil { - t.Fatal(err) - } else { // likely due to Close() call errored - t.Logf("socket.TCPConnPair returned non-fatal error: %v", err) - } - } - - runtime.GC() - time.Sleep(100 * time.Microsecond) - - // test c1 -> c2 - err = testIO(c1, c2, 1000, 1024, 0) - if err != nil { - t.Fatal(err) - } - - runtime.GC() - time.Sleep(100 * time.Microsecond) - - // test c2 -> c1 - err = testIO(c2, c1, 1000, 1024, 0) - if err != nil { - t.Fatal(err) - } -} diff --git a/internal/socket/unixconn.go b/internal/socket/unixconn.go deleted file mode 100644 index 0c188c2..0000000 --- a/internal/socket/unixconn.go +++ /dev/null @@ -1,195 +0,0 @@ -package socket - -import ( - "context" - "crypto/rand" - "fmt" - "io" - "net" - "os" - "sync" - - "github.com/refraction-networking/water/internal/log" -) - -// UnixConnPair returns a pair of connected net.UnixConn. -func UnixConnPair(path ...string) (*net.UnixConn, *net.UnixConn, error) { - var c1, c2 net.Conn - - unixPath := "" - if len(path) == 0 || path[0] == "" { - // randomize a socket name - randBytes := make([]byte, 16) - if _, err := rand.Read(randBytes); err != nil { - return nil, nil, fmt.Errorf("crypto/rand.Read returned error: %w", err) - } - unixPath = os.TempDir() + string(os.PathSeparator) + fmt.Sprintf("%x", randBytes) - } else { - unixPath = path[0] - } - - // create a one-time use UnixListener - ul, err := net.Listen("unix", unixPath) - if err != nil { - return nil, nil, fmt.Errorf("net.Listen returned error: %w", err) - } - - var wg *sync.WaitGroup = new(sync.WaitGroup) - var goroutineErr error - wg.Add(1) - go func() { - defer wg.Done() - c2, goroutineErr = ul.Accept() - }() - - // dial the one-time use UnixListener - c1, err = net.Dial("unix", ul.Addr().String()) - if err != nil { - return nil, nil, fmt.Errorf("net.Dial returned error: %w", err) - } - wg.Wait() - - if goroutineErr != nil { - return nil, nil, fmt.Errorf("ul.Accept returned error: %w", goroutineErr) - } - - if c1 == nil || c2 == nil { - return nil, nil, fmt.Errorf("c1 or c2 is nil") - } - - // type assertion - if uc1, ok := c1.(*net.UnixConn); ok { - if uc2, ok := c2.(*net.UnixConn); ok { - return uc1, uc2, ul.Close() - } else { - return nil, nil, fmt.Errorf("c2 is not *net.UnixConn") - } - } else { - return nil, nil, fmt.Errorf("c1 is not *net.UnixConn") - } -} - -// UnixConnWrap wraps an io.Reader/io.Writer/io.Closer -// interface into a UnixConn. -// -// This function spins up goroutine(s) to copy data between the -// ReadWrite(Close)r and the UnixConn. Anything written to the -// UnixConn by caller will be written to the wrapped object if -// the object implements io.Writer, and if the object implements -// io.Reader, anything read by goroutine from the wrapped object -// will be readable from the UnixConn by caller. -// -// Once this function is invoked, the caller should not perform I/O -// operations on the wrapped connection anymore. -// -// The returned context.Context can be used to check if the connection -// is still alive. If the connection is closed, the context will be -// canceled. -func UnixConnWrap(wrapped any) (wrapperConn *net.UnixConn, ctxCancel context.Context, err error) { - // get a pair of connected UnixConn - unixConn, reverseUnixConn, err := UnixConnPair() - if err != nil && (unixConn == nil || reverseUnixConn == nil) { - return nil, nil, err - } - - var cancel context.CancelFunc - ctxCancel, cancel = context.WithCancel(context.Background()) - - reader, readerOk := wrapped.(io.Reader) - writer, writerOk := wrapped.(io.Writer) - var wg *sync.WaitGroup = new(sync.WaitGroup) - if !readerOk && !writerOk { - cancel() - return nil, nil, fmt.Errorf("wrapped does not implement io.Reader nor io.Writer") - } else if readerOk && !writerOk { - // only reader is implemented - log.Debugf("wrapped does not implement io.Writer, skipping copy from wrapped to wrapper") - - wg.Add(1) - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(reverseUnixConn, reader) // unsafe: error is ignored - _ = reverseUnixConn.Close() // unsafe: error is ignored - _ = unixConn.Close() // unsafe: error is ignored - }(wg) - } else if !readerOk && writerOk { - // only writer is implemented - log.Debugf("wrapped does not implement io.Reader, skipping copy from wrapper to wrapped") - - wg.Add(1) - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(writer, reverseUnixConn) // unsafe: error is ignored - // when the src is closed, we will close the dst (if implements io.Closer) - if closer, ok := wrapped.(io.Closer); ok { - _ = closer.Close() // unsafe: error is ignored - } - }(wg) - } else { - // both reader and writer are implemented - wg.Add(2) - - // copy from wrapped to wrapper - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(reverseUnixConn, reader) // unsafe: error is ignored - _ = reverseUnixConn.Close() // unsafe: error is ignored - _ = unixConn.Close() // unsafe: error is ignored - }(wg) - - // copy from wrapper to wrapped - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(writer, reverseUnixConn) // unsafe: error is ignored - // when the src is closed, we will close the dst (if implements io.Closer) - if closer, ok := wrapped.(io.Closer); ok { - _ = closer.Close() // unsafe: error is ignored - } - }(wg) - } - - // spawn a goroutine to wait for all copying to finish - go func(wg *sync.WaitGroup) { - wg.Wait() - cancel() - - // close again to make sure we don't forget to close anything - // if io.Reader or io.Writer is not implemented. - - // close the reverseTCPConn - _ = reverseUnixConn.Close() // unsafe: error is ignored - - // close the tcpConn - _ = unixConn.Close() // unsafe: error is ignored - - // close the wrapped - if closer, ok := wrapped.(io.Closer); ok { - _ = closer.Close() // unsafe: error is ignored - } - }(wg) - - return unixConn, ctxCancel, nil -} - -// UnixConnFileWrap wraps an object into a *os.File from an -// underlying net.UnixConn. The object must implement io.Reader -// and/or io.Writer. -// -// If the object implements io.Reader, upon completing copying -// the object to the returned *os.File, the callback functions -// will be called. -// -// It is caller's responsibility to close the returned *os.File. -func UnixConnFileWrap(wrapped any) (wrapperFile *os.File, ctxCancel context.Context, err error) { - unixWrapperConn, ctxCancel, err := UnixConnWrap(wrapped) - if err != nil { - return nil, nil, err - } - - unixWrapperFile, err := unixWrapperConn.File() - if err != nil { - return nil, nil, fmt.Errorf("(*net.UnixConn).File returned error: %w", err) - } - - return unixWrapperFile, ctxCancel, nil -} diff --git a/internal/socket/unixconn_test.go b/internal/socket/unixconn_test.go deleted file mode 100644 index f91570c..0000000 --- a/internal/socket/unixconn_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package socket_test - -import ( - "crypto/rand" - "fmt" - "net" - "runtime" - "testing" - "time" - - "github.com/refraction-networking/water/internal/socket" -) - -func TestUnixConnPair(t *testing.T) { - c1, c2, err := socket.UnixConnPair() - if err != nil { - if c1 == nil || c2 == nil { - t.Fatal(err) - } else { // likely due to Close() call errored - t.Logf("socket.UnixConnPair returned non-fatal error: %v", err) - } - } - - runtime.GC() - time.Sleep(100 * time.Microsecond) - - // test c1 -> c2 - err = testIO(c1, c2, 1000, 1024, 0) - if err != nil { - t.Fatal(err) - } - - runtime.GC() - time.Sleep(100 * time.Microsecond) - - // test c2 -> c1 - err = testIO(c2, c1, 1000, 1024, 0) - if err != nil { - t.Fatal(err) - } -} - -func testIO(wrConn, rdConn net.Conn, N int, sz int, sleep time.Duration) error { - var sendMsg []byte = make([]byte, sz) - _, err := rand.Read(sendMsg) - if err != nil { - return fmt.Errorf("rand.Read error: %w", err) - } - - for i := 0; i < N; i++ { - _, err = wrConn.Write(sendMsg) - if err != nil { - return fmt.Errorf("Write error: %w, cntr: %d, N: %d", err, i, N) - } - - // receive data - buf := make([]byte, 1024) - _, err = rdConn.Read(buf) - if err != nil { - return fmt.Errorf("Read error: %w, cntr: %d, N: %d", err, i, N) - } - - time.Sleep(sleep) - } - - return nil -} diff --git a/internal/wazerofs/LICENSE b/internal/wazerofs/LICENSE new file mode 100644 index 0000000..b79dc49 --- /dev/null +++ b/internal/wazerofs/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Karel Bilek + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/internal/wazerofs/NOTICE b/internal/wazerofs/NOTICE new file mode 100644 index 0000000..055b807 --- /dev/null +++ b/internal/wazerofs/NOTICE @@ -0,0 +1,3 @@ +(C) 2024 Karel BĂ­lek +(C) 2020-2023 wazero authors +(c) 2018 Max Chechel \ No newline at end of file diff --git a/internal/wazerofs/README.md b/internal/wazerofs/README.md new file mode 100644 index 0000000..1d75bd0 --- /dev/null +++ b/internal/wazerofs/README.md @@ -0,0 +1,5 @@ +# wazerofs + +Copied from `github.com/gaukas/wazerofs@v0.1.0`. + +Forked from karelbilek/wazero-fs-tools for experimental purposes. diff --git a/internal/wazerofs/memfs/dir.go b/internal/wazerofs/memfs/dir.go new file mode 100644 index 0000000..6b568ea --- /dev/null +++ b/internal/wazerofs/memfs/dir.go @@ -0,0 +1,24 @@ +package memfs + +import ( + wasys "github.com/tetratelabs/wazero/sys" + + "github.com/tetratelabs/wazero/experimental/sys" + + "github.com/blang/vfs/memfs" +) + +type memoryFSDir struct { + fs *memfs.MemFS + path string + + sys.UnimplementedFile +} + +func (f *memoryFSDir) IsDir() (bool, sys.Errno) { + return true, 0 +} + +func (f *memoryFSDir) Stat() (wasys.Stat_t, sys.Errno) { + return stat(f.fs, f.path) +} diff --git a/internal/wazerofs/memfs/file.go b/internal/wazerofs/memfs/file.go new file mode 100644 index 0000000..0e9ae79 --- /dev/null +++ b/internal/wazerofs/memfs/file.go @@ -0,0 +1,80 @@ +package memfs + +import ( + "errors" + "io" + "strings" + + wasys "github.com/tetratelabs/wazero/sys" + + "github.com/tetratelabs/wazero/experimental/sys" + + "github.com/blang/vfs" + "github.com/blang/vfs/memfs" +) + +type memoryFSFile struct { + fs *memfs.MemFS + fl vfs.File + path string + + sys.UnimplementedFile +} + +func (f *memoryFSFile) Stat() (wasys.Stat_t, sys.Errno) { + return stat(f.fs, f.path) +} + +func (f *memoryFSFile) Close() sys.Errno { + err := f.fl.Close() + if err != nil { + // this will never happen + return sys.EIO + } + return 0 +} + +func (f *memoryFSFile) IsDir() (bool, sys.Errno) { + return false, 0 +} + +func (f *memoryFSFile) Read(buf []byte) (n int, errno sys.Errno) { + n, err := f.fl.Read(buf) + if err != nil { + if errors.Is(err, io.EOF) { + return n, 0 + } + // this will never happen + return 0, sys.EBADF + } + return +} + +func (f *memoryFSFile) Seek(offset int64, whence int) (newOffset int64, errno sys.Errno) { + r, err := f.fl.Seek(offset, whence) + if err != nil { + + if strings.Contains(err.Error(), "invalid whence") { + return 0, sys.EINVAL + } + if strings.Contains(err.Error(), "negative position") { + return 0, sys.EINVAL + } + if strings.Contains(err.Error(), "too far") { + // it should be POSIX EFBIG but wazero maps that to EIO + return 0, sys.EIO + } + // can never happen + return 0, sys.EINVAL + } + return r, 0 +} + +func (f *memoryFSFile) Write(buf []byte) (n int, errno sys.Errno) { + n, err := f.fl.Write(buf) + if err != nil { + // it should be POSIX EFBIG but wazero maps that to EIO + return 0, sys.EIO + } + return +} diff --git a/internal/wazerofs/memfs/fs.go b/internal/wazerofs/memfs/fs.go new file mode 100644 index 0000000..9cf245a --- /dev/null +++ b/internal/wazerofs/memfs/fs.go @@ -0,0 +1,165 @@ +// memfs implements a simple fake memory FS for Wazero. +// +// The actual implementation of the FS is from github.com/blang/vfs/memfs, +// this package is just wrapping that for Wazero. +// +// It implements only very small number of functions, because only those were needed +// for my purposes (that is, running ghostscript with WASI), as ghostscript only calls these. +// +// Feel free to make a PR if you need to implement some other functions. +package memfs + +import ( + "errors" + "io/fs" + "strings" + + wasys "github.com/tetratelabs/wazero/sys" + + "os" + + "github.com/tetratelabs/wazero/experimental/sys" + + "github.com/blang/vfs/memfs" +) + +// New creates a new memory filesystem +func New() *MemFS { + mfs := memfs.Create() + mmfs := &MemFS{fs: mfs} + return mmfs +} + +// WriteFile is a helper function that writes a content to a file. +// Errors have the same semantics as wazero errors +func (m *MemFS) WriteFile(path string, content []byte) sys.Errno { + f, err := m.OpenFile(path, sys.O_WRONLY|sys.O_CREAT, 0) + if err != 0 { + return err + } + + _, err = f.Write(content) + return err +} + +// ReadFile is a helper function that returns a content of a file. +// Errors have the same semantics as wazero errors +func (m *MemFS) ReadFile(path string) ([]byte, sys.Errno) { + f, err := m.OpenFile(path, sys.O_RDONLY, 0) + if err != 0 { + return nil, err + } + + st, errno := f.Stat() + if errno != 0 { + return nil, errno + } + + buf := make([]byte, st.Size) + _, errno = f.Read(buf) + return buf, errno +} + +// MemFS is a memory-only wazero filesystem, implementing just some basic functions. +type MemFS struct { + fs *memfs.MemFS + + sys.UnimplementedFS +} + +// toOsOpenFlag is copied from wazero codebase +func toOsOpenFlag(oflag sys.Oflag) (flag int) { + // First flags are exclusive + switch oflag & (sys.O_RDONLY | sys.O_RDWR | sys.O_WRONLY) { + case sys.O_RDONLY: + flag |= os.O_RDONLY + case sys.O_RDWR: + flag |= os.O_RDWR + case sys.O_WRONLY: + flag |= os.O_WRONLY + } + + // Run down the flags defined in the os package + if oflag&sys.O_APPEND != 0 { + flag |= os.O_APPEND + } + if oflag&sys.O_CREAT != 0 { + flag |= os.O_CREATE + } + if oflag&sys.O_EXCL != 0 { + flag |= os.O_EXCL + } + if oflag&sys.O_SYNC != 0 { + flag |= os.O_SYNC + } + if oflag&sys.O_TRUNC != 0 { + flag |= os.O_TRUNC + } + return flag +} + +// OpenFile opens a file as defined in sys.File +func (m *MemFS) OpenFile(path string, flag sys.Oflag, perm fs.FileMode) (sys.File, sys.Errno) { + f, err := m.fs.OpenFile(path, toOsOpenFlag(flag), perm) + if err != nil { + if errors.Is(err, memfs.ErrIsDirectory) { + if flag&sys.O_WRONLY == 1 || flag&sys.O_RDWR == 1 { + return nil, sys.EISDIR + } + // return directory as a different type + dir := &memoryFSDir{fs: m.fs, path: path} + return dir, 0 + } + if errors.Is(err, os.ErrNotExist) { + return nil, sys.ENOENT + } + if errors.Is(err, os.ErrExist) { + return nil, sys.EEXIST + } + return nil, sys.EINVAL // just general IO error, not that important + } + fl := &memoryFSFile{fl: f, path: path, fs: m.fs} + return fl, 0 +} + +func (m *MemFS) Mkdir(path string, perm fs.FileMode) sys.Errno { + err := m.fs.Mkdir(path, perm) + // note - this is not 100% correct, but good enough + // note - we canno "just" call stat here, as mkdir should be atomic; the file maybe doesn't exist anymore + // just return EEXIST I guess... + if err != nil { + if strings.Contains(err.Error(), "already exists") { + return sys.EEXIST + } + return sys.EINVAL + } + return 0 +} + +func (m *MemFS) Unlink(path string) sys.Errno { + err := m.fs.Remove(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return sys.ENOENT + } + return sys.EINVAL + } + return 0 +} + +func stat(mfs *memfs.MemFS, path string) (wasys.Stat_t, sys.Errno) { + fst, err := mfs.Stat(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return wasys.Stat_t{}, sys.ENOENT + + } + return wasys.Stat_t{}, sys.EIO // this should "never happen" + } + return wasys.NewStat_t(fst), 0 +} + +// Stat returns file stat as defined in sys.File +func (m *MemFS) Stat(path string) (wasys.Stat_t, sys.Errno) { + return stat(m.fs, path) +} diff --git a/transport/v0/conn.go b/transport/v0/conn.go index 167d705..e4dd8c3 100644 --- a/transport/v0/conn.go +++ b/transport/v0/conn.go @@ -11,7 +11,8 @@ import ( "github.com/refraction-networking/water" "github.com/refraction-networking/water/internal/log" - "github.com/refraction-networking/water/internal/socket" + + "github.com/refraction-networking/water/internal/io/pipe" ) // Conn is the first experimental version of Conn implementation. @@ -55,13 +56,12 @@ func dial(core water.Core, network, address string) (c water.Conn, err error) { return nil, err } - reverseCallerConn, callerConn, err := socket.TCPConnPair() - // wasmCallerConn, conn.uoConn, err = socket.TCPConnPair() + reverseCallerConn, callerConn, err := pipe.TCPPipe(nil) if err != nil { if reverseCallerConn == nil || callerConn == nil { - return nil, fmt.Errorf("water: socket.TCPConnPair returned error: %w", err) + return nil, fmt.Errorf("water: pipe.TCPPipe returned error: %w", err) } else { // likely due to Close() call errored - log.LErrorf(core.Logger(), "water: socket.TCPConnPair returned error: %v", err) + log.LErrorf(core.Logger(), "water: pipe.TCPPipe returned error: %v", err) } } conn.callerConn = callerConn @@ -99,15 +99,15 @@ func accept(core water.Core) (c water.Conn, err error) { return nil, err } - reverseCallerConn, callerConn, err := socket.TCPConnPair() + reverseCallerConn, callerConn, err := pipe.TCPPipe(nil) if err != nil { if reverseCallerConn == nil || callerConn == nil { - return nil, fmt.Errorf("water: socket.TCPConnPair returned error: %w", err) + return nil, fmt.Errorf("water: pipe.TCPPipe returned error: %w", err) } else { // likely due to Close() call errored - log.LErrorf(core.Logger(), "water: socket.TCPConnPair returned error: %v", err) + log.LErrorf(core.Logger(), "water: pipe.TCPPipe returned error: %v", err) } } else if reverseCallerConn == nil || callerConn == nil { - return nil, errors.New("water: socket.TCPConnPair returned nil") + return nil, errors.New("water: pipe.TCPPipe returned nil") } conn.callerConn = callerConn diff --git a/transport/v0/transport_module.go b/transport/v0/transport_module.go index a9546a7..91d65bc 100644 --- a/transport/v0/transport_module.go +++ b/transport/v0/transport_module.go @@ -9,8 +9,8 @@ import ( "syscall" "github.com/refraction-networking/water" + "github.com/refraction-networking/water/internal/io/pipe" "github.com/refraction-networking/water/internal/log" - "github.com/refraction-networking/water/internal/socket" "github.com/refraction-networking/water/internal/wasip1" "github.com/tetratelabs/wazero/api" ) @@ -628,7 +628,7 @@ func (tm *TransportModule) Worker() error { } // create cancel pipe - cancelConnR, cancelConnW, err := socket.TCPConnPair() + cancelConnR, cancelConnW, err := pipe.TCPPipe(nil) if err != nil { return fmt.Errorf("water: creating cancel pipe failed: %w", err) } diff --git a/transport/v1/conn.go b/transport/v1/conn.go index 3cbe210..9189c45 100644 --- a/transport/v1/conn.go +++ b/transport/v1/conn.go @@ -10,8 +10,8 @@ import ( "time" "github.com/refraction-networking/water" + "github.com/refraction-networking/water/internal/io/pipe" "github.com/refraction-networking/water/internal/log" - "github.com/refraction-networking/water/internal/socket" ) // Conn is the first experimental version of Conn implementation. @@ -57,8 +57,7 @@ func dialFixed(core water.Core) (c water.Conn, err error) { return nil, err } - reverseCallerConn, callerConn, err := socket.TCPConnPair() - // wasmCallerConn, conn.uoConn, err = socket.TCPConnPair() + reverseCallerConn, callerConn, err := pipe.TCPPipe(nil) if err != nil { if reverseCallerConn == nil || callerConn == nil { return nil, fmt.Errorf("water: socket.TCPConnPair returned error: %w", err) @@ -111,13 +110,12 @@ func dial(core water.Core, network, address string) (c water.Conn, err error) { return nil, err } - reverseCallerConn, callerConn, err := socket.TCPConnPair() - // wasmCallerConn, conn.uoConn, err = socket.TCPConnPair() + reverseCallerConn, callerConn, err := pipe.TCPPipe(nil) if err != nil { if reverseCallerConn == nil || callerConn == nil { - return nil, fmt.Errorf("water: socket.TCPConnPair returned error: %w", err) + return nil, fmt.Errorf("water: pipe.TCPPipe returned error: %w", err) } else { // likely due to Close() call errored - log.LErrorf(core.Logger(), "water: socket.TCPConnPair returned error: %v", err) + log.LErrorf(core.Logger(), "water: pipe.TCPPipe returned error: %v", err) } } conn.callerConn = callerConn @@ -155,7 +153,7 @@ func accept(core water.Core) (c water.Conn, err error) { return nil, err } - reverseCallerConn, callerConn, err := socket.TCPConnPair() + reverseCallerConn, callerConn, err := pipe.TCPPipe(nil) if err != nil { if reverseCallerConn == nil || callerConn == nil { return nil, fmt.Errorf("water: socket.TCPConnPair returned error: %w", err) diff --git a/transport/v1/transport_module.go b/transport/v1/transport_module.go index 738390d..e8352cf 100644 --- a/transport/v1/transport_module.go +++ b/transport/v1/transport_module.go @@ -11,8 +11,8 @@ import ( "time" "github.com/refraction-networking/water" + "github.com/refraction-networking/water/internal/io/pipe" "github.com/refraction-networking/water/internal/log" - "github.com/refraction-networking/water/internal/socket" "github.com/refraction-networking/water/internal/wasip1" "github.com/tetratelabs/wazero/api" ) @@ -773,7 +773,7 @@ func (tm *TransportModule) StartWorker() error { } // create control pipe connection pair - ctrlConnR, ctrlConnW, err := socket.TCPConnPair() + ctrlConnR, ctrlConnW, err := pipe.TCPPipe(nil) if err != nil { return fmt.Errorf("water: creating cancel pipe failed: %w", err) }