Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: upgrade to final WATM v1 API #76

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/tetratelabs/wazero/experimental/sys"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"

"github.com/gaukas/wazerofs/memfs"
"github.com/refraction-networking/water/internal/wazerofs/memfs"
expsysfs "github.com/tetratelabs/wazero/experimental/sysfs"
)

Expand Down
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ retract (
replace github.com/tetratelabs/wazero => github.com/refraction-networking/wazero v1.7.3-w

require (
github.com/gaukas/wazerofs v0.1.0
github.com/blang/vfs v1.0.0
github.com/tetratelabs/wazero v1.7.3
google.golang.org/protobuf v1.33.0
google.golang.org/protobuf v1.34.2
)

require github.com/blang/vfs v1.0.0 // indirect
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
github.com/blang/vfs v1.0.0 h1:AUZUgulCDzbaNjTRWEP45X7m/J10brAptZpSRKRZBZc=
github.com/blang/vfs v1.0.0/go.mod h1:jjuNUc/IKcRNNWC9NUCvz4fR9PZLPIKxEygtPs/4tSI=
github.com/gaukas/wazerofs v0.1.0 h1:wIkW1bAxSnpaaVkQ5LOb1tm1BXdVap3eKjJpVWIqt2E=
github.com/gaukas/wazerofs v0.1.0/go.mod h1:+JECB9Fwt0taPqSgHckG9lmT3tcoVK+9VJozTsq9UlI=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/refraction-networking/wazero v1.7.3-w h1:Br3UuVPrKAD3pUSIlpT1+iBIYMbs8h2wS4d0ziU9Yoc=
github.com/refraction-networking/wazero v1.7.3-w/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
29 changes: 29 additions & 0 deletions internal/io/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
BSD 3-Clause License

Copyright (c) 2018, the respective contributors, as shown by the AUTHORS file.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
3 changes: 3 additions & 0 deletions internal/io/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# io: Easy to use I/O object and function implementations

Copied from `github.com/gaukas/[email protected]`.
3 changes: 3 additions & 0 deletions internal/io/conn/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## package `io/conn`

This package provides abstractions for connections build on top of other types.
324 changes: 324 additions & 0 deletions internal/io/conn/channelconn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,324 @@
package conn

import (
"context"
"errors"
"io"
"net"
"os"
"runtime"
"sync"
"sync/atomic"
"syscall"
"time"
)

type ChannelConn struct {
chanRX <-chan []byte // read from this channel, owned by the writing-side to this channel
chanTX chan<- []byte // write to this channel, owned by this struct

chanClose chan struct{} // notify close event to unblock blocking read/write operations
closed atomic.Bool // true if the channel is closed

readBuf []byte // protected by readBufMutex, accessed only from readLocked and readLockedFromBuffer
readBufMutex sync.Mutex

pendingWrite atomic.Bool // indicates if there is an outstanding writer blocking, protecting TX channel

nonblocking atomic.Bool
}

func NewChannelConn(rx <-chan []byte, tx chan<- []byte) *ChannelConn {
return &ChannelConn{
chanRX: rx,
chanTX: tx,
chanClose: make(chan struct{}),
}
}

// ChannelConn implements [Conn].
var _ Conn = (*ChannelConn)(nil)

// Read reads data from the channel. Implements [net.Conn].
func (c *ChannelConn) Read(b []byte) (n int, err error) {
if c.closed.Load() {
return 0, io.ErrClosedPipe
}

c.readBufMutex.Lock()
defer c.readBufMutex.Unlock()

return c.readLocked(b)
}

// read blocks until some data is available, or the channel is closed.
func (c *ChannelConn) readLocked(b []byte) (int, error) {
if len(c.readBuf) != 0 { // need to resume reading from the buffer
return c.readLockedFromBuffer(b)
}

if c.nonblocking.Load() {
for {
select {
case <-c.chanClose:
return 0, io.ErrClosedPipe
case c.readBuf = <-c.chanRX:
if len(c.readBuf) != 0 { // buffer is empty, read from the channel OK
return c.readLockedFromBuffer(b)
} else { // empty read from channel
if c.readBuf == nil { // closed channel
return 0, io.EOF
} else { // channel open, but empty read: other end testing if write will block
continue
}
}
default:
return 0, syscall.EAGAIN
}
}
} else {
for {
select {
case <-c.chanClose:
return 0, io.ErrClosedPipe
case c.readBuf = <-c.chanRX:
if len(c.readBuf) != 0 { // buffer is empty, read from the channel OK
return c.readLockedFromBuffer(b)
} else { // empty read from channel
if c.readBuf == nil { // closed channel
return 0, io.EOF
} else { // channel open, but empty read: other end testing if write will block
continue
}
}
}
}
}
}

// readLockedFromBuffer reads from the buffer. Assumes the buffer is non-empty.
func (c *ChannelConn) readLockedFromBuffer(b []byte) (n int, err error) {
n = copy(b, c.readBuf)
c.readBuf = c.readBuf[n:]
return
}

// Write writes data to the channel. Implements [net.Conn].
func (c *ChannelConn) Write(b []byte) (n int, err error) {
if c.nonblocking.Load() {
if c.pendingWrite.CompareAndSwap(false, true) {
defer c.pendingWrite.Store(false)
n, err = c.writeFlagAcquired(b)
} else {
return 0, syscall.EAGAIN
}
} else {
// retry until acquired the pending write flag
for !c.pendingWrite.CompareAndSwap(false, true) {
runtime.Gosched()
}
defer c.pendingWrite.Store(false)
n, err = c.writeFlagAcquired(b)
}

return
}

// writeFlagAcquired writes data to the channel. Caller must
// acquire the pending write flag before calling this function.
func (c *ChannelConn) writeFlagAcquired(b []byte) (n int, err error) {
if c.closed.Load() { // check if the channel is closed only after acquiring the pending write flag to prevent racing condition
return 0, io.ErrClosedPipe
}

expectedLen := len(b)

bCopy := make([]byte, expectedLen)
if copy(bCopy, b) != expectedLen {
return 0, io.ErrUnexpectedEOF
}

if c.nonblocking.Load() {
select {
case <-c.chanClose:
return 0, io.ErrClosedPipe
case c.chanTX <- bCopy:
return expectedLen, nil
default:
return 0, syscall.EAGAIN
}
} else {
select {
case <-c.chanClose:
return 0, io.ErrClosedPipe
case c.chanTX <- bCopy:
return expectedLen, nil
}
}
}

func (c *ChannelConn) Close() error {
if c.closed.CompareAndSwap(false, true) {
close(c.chanClose)

// acquire the pending write flag before closing the TX channel
for !c.pendingWrite.CompareAndSwap(false, true) {
runtime.Gosched()
}
close(c.chanTX)
c.pendingWrite.Store(false)

return nil
}

return io.ErrClosedPipe // double close
}

type channelAddr struct{}

func (channelAddr) Network() string { return "channel" }
func (channelAddr) String() string { return "channel" }

// ChannelConn does not implement [NetworkConn].
var _ NetworkConn = (*ChannelConn)(nil)

// LocalAddr returns the local network address. Implements [net.Conn].
func (*ChannelConn) LocalAddr() net.Addr { return channelAddr{} }

// RemoteAddr returns the remote network address. Implements [net.Conn].
func (*ChannelConn) RemoteAddr() net.Addr { return channelAddr{} }

// ChannelConn does not implement [DeadlineConn]. However, fake implementation
// is provided such that it can be used as [net.Conn] in some cases when
// deadlines are not used.
//
// TODO: properly implement [DeadlineConn].
var _ DeadlineConn = (*ChannelConn)(nil)

// SetDeadline is not supported by ChannelConn. It will always return
// [os.ErrNoDeadline].
//
// TODO: properly implement the support for deadlines.
func (*ChannelConn) SetDeadline(time.Time) error {
return os.ErrNoDeadline
}

// SetReadDeadline is not supported by ChannelConn. It will always return
// [os.ErrNoDeadline].
//
// TODO: properly implement the support for read deadline.
func (*ChannelConn) SetReadDeadline(time.Time) error {
return os.ErrNoDeadline
}

// SetWriteDeadline is not supported by ChannelConn. It will always return
// [os.ErrNoDeadline].
//
// TODO: properly implement the support for write deadline.
func (*ChannelConn) SetWriteDeadline(time.Time) error {
return os.ErrNoDeadline
}

// ChannelConn implements [NonblockingConn].
var _ NonblockingConn = (*ChannelConn)(nil)

// IsNonblock returns true if the connection is in non-blocking mode.
func (c *ChannelConn) IsNonblock() bool {
return c.nonblocking.Load()
}

// SetNonblock updates the non-blocking mode of the connection if applicable.
func (c *ChannelConn) SetNonblock(nonblocking bool) (ok bool) {
c.nonblocking.Store(nonblocking)
return true
}

// ChannelConn implements [PollConn].
var _ PollConn = (*ChannelConn)(nil)

func (c *ChannelConn) PollR(ctx context.Context) (bool, error) {
if !c.nonblocking.Load() {
return false, errors.New("polling is not supported in blocking mode")
}

if c.closed.Load() {
return false, io.EOF
}

for !c.readBufMutex.TryLock() && ctx.Err() == nil {
runtime.Gosched()
}

if ctx.Err() != nil {
return false, ctx.Err()
}

defer c.readBufMutex.Unlock()

if len(c.readBuf) != 0 {
return true, nil
}

// We cannot check cap(c.chanRX) vs. len(c.chanRX) here because it is
// possible that messages in the buffer being empty probes sent by the
// other end to check if the write will block. Instead the universal
// reading strategy below is used.

for {
select {
case <-c.chanClose:
return false, io.EOF
case c.readBuf = <-c.chanRX:
if len(c.readBuf) != 0 {
return true, nil
} else {
if c.readBuf == nil {
return false, io.EOF
} else {
continue
}
}
case <-ctx.Done():
return false, ctx.Err()
}
}
}

func (c *ChannelConn) PollW(ctx context.Context) (bool, error) {
if !c.nonblocking.Load() {
return false, errors.New("polling is not supported in blocking mode")
}

// aquire the pending write flag before writing to the TX channel
for !c.pendingWrite.CompareAndSwap(false, true) && ctx.Err() == nil {
runtime.Gosched()
}

if ctx.Err() != nil {
return false, ctx.Err()
}

defer c.pendingWrite.Store(false)

if c.closed.Load() {
return false, io.EOF
}

// Buffered channel:
if cap(c.chanTX) > 0 {
for ctx.Err() == nil && len(c.chanTX) >= cap(c.chanTX) {
runtime.Gosched()
}
return len(c.chanTX) < cap(c.chanTX), ctx.Err()
}

// Unbuffered channel:
select {
case <-c.chanClose:
return false, io.EOF
case c.chanTX <- []byte{}:
return true, nil
case <-ctx.Done():
return false, ctx.Err()
}
}
Loading