-
Notifications
You must be signed in to change notification settings - Fork 48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use Psiphon clientlib #251
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,27 +16,29 @@ package psiphon | |
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"net" | ||
"runtime" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
"unicode" | ||
|
||
"github.com/Jigsaw-Code/outline-sdk/transport" | ||
"github.com/Psiphon-Labs/psiphon-tunnel-core/ClientLibrary/clientlib" | ||
psi "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon" | ||
psi "github.com/Psiphon-Labs/psiphon-tunnel-core/ClientLibrary/clientlib" | ||
) | ||
|
||
// The single [Dialer] we can have. | ||
var singletonDialer = Dialer{ | ||
setNoticeWriter: psi.SetNoticeWriter, | ||
startTunnel: startTunnel, | ||
tunnelDial: tunnelDial, | ||
} | ||
|
||
var ( | ||
errNotStartedDial = errors.New("dialer has not been started yet") | ||
errNotStartedStop = errors.New("tried to stop dialer that is not running") | ||
errTunnelTimeout = errors.New("tunnel establishment timed out") | ||
errNotStartedDial = errors.New("dialer has not been started yet") | ||
errNotStartedStop = errors.New("tried to stop dialer that is not running") | ||
errTunnelTimeout = errors.New("tunnel establishment timed out") | ||
errTunnelAlreadyStarted = errors.New("tunnel already started") | ||
) | ||
|
||
// DialerConfig specifies the parameters for [Dialer]. | ||
|
@@ -48,7 +50,7 @@ type DialerConfig struct { | |
DataRootDirectory string | ||
|
||
// Raw JSON config provided by Psiphon. | ||
ProviderConfig json.RawMessage | ||
ProviderConfig []byte | ||
} | ||
|
||
// Dialer is a [transport.StreamDialer] that uses Psiphon to connect to a destination. | ||
|
@@ -59,184 +61,155 @@ type DialerConfig struct { | |
// called before you can start it again with a new configuration. Dialer.Stop should be called | ||
// when you no longer need the Dialer in order to release resources. | ||
type Dialer struct { | ||
// The Psiphon tunnel. It is nil until the tunnel is started. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There's a lot of state tracking here, and I had a hard time verifying the correctness of this code. The mix of Mutex and atomics also complicates the analysis. I believe I have a simpler solution, with the entire tunnel lifecycle in one goroutine. Also, for tests, we can replace a global startTunnel; there is no need to make it part of the Dialer. |
||
tunnel atomic.Pointer[psi.PsiphonTunnel] | ||
// It is (and must be) okay for this function to be called multiple times concurrently | ||
// or in series. | ||
stop atomic.Pointer[func() error] | ||
|
||
// Controls the Dialer state and Psiphon's global state. | ||
mu sync.Mutex | ||
// Used by DialStream. | ||
controller *psi.Controller | ||
// Used by Stop. | ||
stop func() | ||
// Allows for overriding the global notice writer for testing. | ||
setNoticeWriter func(io.Writer) | ||
// Allows tests to override the tunnel creation. | ||
startTunnel func(ctx context.Context, config *DialerConfig) (*psi.PsiphonTunnel, error) | ||
// Allows tests to override the tunnel dialing. | ||
tunnelDial func(tunnel *psi.PsiphonTunnel, addr string) (net.Conn, error) | ||
} | ||
|
||
// Ensure that [Dialer] implements [transport.StreamDialer]. | ||
var _ transport.StreamDialer = (*Dialer)(nil) | ||
|
||
func tunnelDial(tunnel *psi.PsiphonTunnel, addr string) (net.Conn, error) { | ||
return tunnel.Dial(addr) | ||
} | ||
|
||
// DialStream implements [transport.StreamDialer]. | ||
// The context is not used because Psiphon's implementation doesn't support it. If you need cancellation, | ||
// you will need to add it independently. | ||
func (d *Dialer) DialStream(unusedContext context.Context, addr string) (transport.StreamConn, error) { | ||
d.mu.Lock() | ||
controller := d.controller | ||
d.mu.Unlock() | ||
if controller == nil { | ||
tunnel := d.tunnel.Load() | ||
if tunnel == nil { | ||
return nil, errNotStartedDial | ||
} | ||
netConn, err := controller.Dial(addr, nil) | ||
netConn, err := d.tunnelDial(tunnel, addr) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return streamConn{netConn}, nil | ||
} | ||
|
||
func newPsiphonConfig(config *DialerConfig) (*psi.Config, error) { | ||
if config == nil { | ||
return nil, errors.New("config must not be nil") | ||
} | ||
// Validate keys. We parse as a map first because we need to check for the existence | ||
// of certain keys. | ||
var configMap map[string]interface{} | ||
if err := json.Unmarshal(config.ProviderConfig, &configMap); err != nil { | ||
return nil, fmt.Errorf("failed to parse config: %w", err) | ||
} | ||
for key, value := range configMap { | ||
switch key { | ||
case "DisableLocalHTTPProxy", "DisableLocalSocksProxy": | ||
b, ok := value.(bool) | ||
if !ok { | ||
return nil, fmt.Errorf("field %v must be a boolean", key) | ||
} | ||
if b != true { | ||
return nil, fmt.Errorf("field %v must be true if set", key) | ||
} | ||
case "DataRootDirectory": | ||
return nil, errors.New("field DataRootDirectory must not be set in the provider config. Specify it in the DialerConfig instead.") | ||
} | ||
func startTunnel(ctx context.Context, config *DialerConfig) (*psi.PsiphonTunnel, error) { | ||
// Note that these parameters override anything in the provider config. | ||
clientPlatformAllowChars := func(r rune) bool { | ||
return !unicode.IsSpace(r) && r != '_' | ||
} | ||
|
||
// Parse provider config. | ||
pConfig, err := psi.LoadConfig(config.ProviderConfig) | ||
if err != nil { | ||
return nil, fmt.Errorf("config load failed: %w", err) | ||
goos := strings.Join(strings.FieldsFunc(runtime.GOOS, clientPlatformAllowChars), "-") | ||
goarch := strings.Join(strings.FieldsFunc(runtime.GOARCH, clientPlatformAllowChars), "-") | ||
clientPlatform := "outline-sdk_" + goos + "_" + goarch | ||
trueValue := true | ||
|
||
params := psi.Parameters{ | ||
DataRootDirectory: &config.DataRootDirectory, | ||
ClientPlatform: &clientPlatform, | ||
// Disable Psiphon's local proxy servers, which we don't use. | ||
DisableLocalSocksProxy: &trueValue, | ||
DisableLocalHTTPProxy: &trueValue, | ||
} | ||
|
||
// Force some Psiphon config defaults for the Outline SDK case. | ||
pConfig.DisableLocalHTTPProxy = true | ||
pConfig.DisableLocalSocksProxy = true | ||
pConfig.DataRootDirectory = config.DataRootDirectory | ||
|
||
return pConfig, nil | ||
return psi.StartTunnel(ctx, config.ProviderConfig, "", params, nil, nil) | ||
} | ||
|
||
// Start configures and runs the Dialer. It must be called before you can use the Dialer. It returns when the tunnel is ready. | ||
// Start configures and runs the Dialer. It must be called before you can use the Dialer. It returns when the tunnel is ready for use. It is safe to call concurrently with itself and other methods. | ||
func (d *Dialer) Start(ctx context.Context, config *DialerConfig) error { | ||
pConfig, err := newPsiphonConfig(config) | ||
if err != nil { | ||
return err | ||
if config == nil { | ||
return errors.New("config must not be nil") | ||
} | ||
|
||
// Will receive a value if an error occurs during the connection sequence. | ||
// It will be closed on successful connection. | ||
errCh := make(chan error) | ||
|
||
// Start returns either when a tunnel is ready, or an error happens, whichever comes first. | ||
// When emitting the errors, we use a select statement to ensure the channel is being listened | ||
// on, to avoid a deadlock after the initial error. | ||
go func() { | ||
onTunnel := func() { | ||
select { | ||
case errCh <- nil: | ||
default: | ||
} | ||
} | ||
err := d.runController(ctx, pConfig, onTunnel) | ||
select { | ||
case errCh <- err: | ||
default: | ||
} | ||
}() | ||
|
||
// Wait for an active tunnel or error | ||
return <-errCh | ||
} | ||
|
||
func (d *Dialer) runController(ctx context.Context, pConfig *psi.Config, onTunnel func()) error { | ||
// The mutex is locked only in this method, and is used to prevent concurrent calls to | ||
// Start from returning before the tunnel is started (or failed). | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
if d.stop != nil { | ||
return errors.New("tried to start dialer that is alread running") | ||
|
||
if d.tunnel.Load() != nil { | ||
return errTunnelAlreadyStarted | ||
} | ||
ctx, cancel := context.WithCancelCause(ctx) | ||
defer cancel(context.Canceled) | ||
controllerDone := make(chan struct{}) | ||
defer close(controllerDone) | ||
d.stop = func() { | ||
// Tell controller to stop. | ||
cancel(context.Canceled) | ||
// Wait for controller to return. | ||
<-controllerDone | ||
// This function is the only place where d.tunnel gets set to a non-nil value, and we're inside a | ||
// locked mutex, so we can be sure that it will remain nil until we set it below. | ||
|
||
startDoneSignal := make(chan struct{}) | ||
defer close(startDoneSignal) | ||
cancelCtx, cancel := context.WithCancel(ctx) | ||
|
||
// The stop function is not called from within a mutex. | ||
// It must be safe to call concurrently and multiple times. | ||
stop := func() error { | ||
// The fact that this stop function exists means that we are either in the process of | ||
// starting the tunnel or have already started it. | ||
|
||
// Cancelling the context will cause the tunnel to stop or stop connecting. | ||
cancel() | ||
|
||
// Wait for Start to return (and note that it may return success or error at this point). | ||
<-startDoneSignal | ||
|
||
// Swap the tunnel pointer to nil to indicate that the tunnel is stopped or certainly will be. | ||
tunnel := d.tunnel.Swap(nil) | ||
if tunnel == nil { | ||
// We were connecting, but not yet connected; we interrupted the connection | ||
// sequence by canceling the context. There is no further cleanup to do. | ||
return nil | ||
} | ||
|
||
// This will block until the tunnel is stopped. | ||
tunnel.Stop() | ||
return nil | ||
} | ||
|
||
// Set up NoticeWriter to receive events. | ||
d.setNoticeWriter(psi.NewNoticeReceiver( | ||
func(notice []byte) { | ||
var event clientlib.NoticeEvent | ||
err := json.Unmarshal(notice, &event) | ||
if err != nil { | ||
// This is unexpected and probably indicates something fatal has occurred. | ||
// We'll interpret it as a connection error and abort. | ||
cancel(fmt.Errorf("failed to unmarshal notice JSON: %w", err)) | ||
return | ||
} | ||
switch event.Type { | ||
case "EstablishTunnelTimeout": | ||
cancel(errTunnelTimeout) | ||
case "Tunnels": | ||
count := event.Data["count"].(float64) | ||
if count > 0 { | ||
onTunnel() | ||
} | ||
} | ||
})) | ||
defer psi.SetNoticeWriter(io.Discard) | ||
// Note that if Stop is called between the beginning of Start and this line, it won't actually stop the tunnel. | ||
// This is an acceptable limitation and is very unlikely to occur, as there is no significant processing | ||
// before this point. (And if you're calling Stop that quickly and concurrently, then you're rolling the dice with whether Start has even begun.) | ||
d.stop.Store(&stop) | ||
|
||
err := pConfig.Commit(true) | ||
if err != nil { | ||
return fmt.Errorf("failed to commit config: %w", err) | ||
} | ||
// StartTunnel returns when a tunnel is established or an error occurs. | ||
tunnel, err := d.startTunnel(cancelCtx, config) | ||
|
||
err = psi.OpenDataStore(&psi.Config{DataRootDirectory: pConfig.DataRootDirectory}) | ||
if err != nil { | ||
return fmt.Errorf("failed to open data store: %w", err) | ||
} | ||
defer psi.CloseDataStore() | ||
// There are some specific error values that we want to return. | ||
if err == psi.ErrTimeout { | ||
// This can occur either because there was a timeout set in the tunnel config | ||
// or because the context deadline was exceeded. | ||
err = errTunnelTimeout | ||
if ctx.Err() == context.DeadlineExceeded { | ||
err = context.DeadlineExceeded | ||
} | ||
} else if ctx.Err() == context.Canceled { | ||
err = context.Canceled | ||
} | ||
|
||
controller, err := psi.NewController(pConfig) | ||
if err != nil { | ||
return fmt.Errorf("failed to create Controller: %w", err) | ||
// Ensure this is below the ctx.Err() checks above. | ||
cancel() | ||
// Canceling the context is the only cleanup to be done on error (which implies no tunnel), so clear the stop function. | ||
d.stop.Store(nil) | ||
|
||
return err | ||
} | ||
d.controller = controller | ||
d.mu.Unlock() | ||
|
||
controller.Run(ctx) | ||
// We have a good tunnel and it's time to make it available. | ||
d.tunnel.Store(tunnel) | ||
|
||
d.mu.Lock() | ||
d.controller = nil | ||
d.stop = nil | ||
return context.Cause(ctx) | ||
return nil | ||
} | ||
|
||
// Stop stops the Dialer background processes, releasing resources and allowing it to be reconfigured. | ||
// It returns when the Dialer is completely stopped. | ||
func (d *Dialer) Stop() error { | ||
d.mu.Lock() | ||
stop := d.stop | ||
d.stop = nil | ||
d.mu.Unlock() | ||
// The stop function should only be called once, so swap it to nil as we get it. | ||
stop := d.stop.Swap(nil) | ||
if stop == nil { | ||
return errNotStartedStop | ||
} | ||
stop() | ||
return nil | ||
// Note that stop is not being called within a mutex. It must not be, so that it can execute | ||
// during the Start method, which is entirely mutexed. | ||
return (*stop)() | ||
} | ||
|
||
// GetSingletonDialer returns the single Psiphon dialer instance. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't be here. I think you need to run `go mod tidy`.