diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index fefc46e..e3a5f27 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -45,4 +45,4 @@ jobs: dart analyze test/ - name: Run tests - run: dart run test + run: dart run test --concurrency=1 diff --git a/lib/phoenix_socket.dart b/lib/phoenix_socket.dart index 74ca15c..92ad68e 100644 --- a/lib/phoenix_socket.dart +++ b/lib/phoenix_socket.dart @@ -23,6 +23,5 @@ export 'src/message.dart'; export 'src/message_serializer.dart'; export 'src/presence.dart'; export 'src/push.dart'; -export 'src/raw_socket.dart'; export 'src/socket.dart'; export 'src/socket_options.dart'; diff --git a/lib/src/channel.dart b/lib/src/channel.dart index f280d68..a7a88d5 100644 --- a/lib/src/channel.dart +++ b/lib/src/channel.dart @@ -95,8 +95,7 @@ class PhoenixChannel { PhoenixChannelState get state => _state; /// Whether the channel can send messages. - bool get canPush => - socket.isConnected && _state == PhoenixChannelState.joined; + bool get canPush => socket.isOpen && _state == PhoenixChannelState.joined; String? _loggerName; @@ -180,7 +179,7 @@ class PhoenixChannel { _joinPush.reset(); } - if (socket.isConnected) { + if (socket.isOpen) { _startRejoinTimer(); } } @@ -201,7 +200,7 @@ class PhoenixChannel { timeout: timeout ?? _timeout, ); - if (!socket.isConnected || prevState != PhoenixChannelState.joined) { + if (!socket.isOpen || prevState != PhoenixChannelState.joined) { currentLeavePush.trigger(PushResponse(status: 'ok')); } else { void onClose(PushResponse reply) { @@ -227,7 +226,7 @@ class PhoenixChannel { } _joinedOnce = true; - if (socket.isConnected) { + if (socket.isOpen) { _attemptJoin(); } else { _state = PhoenixChannelState.errored; @@ -333,7 +332,7 @@ class PhoenixChannel { ..onReply('error', (response) { _logger.warning('Join message got error response: $response'); _state = PhoenixChannelState.errored; - if (socket.isConnected) { + if (socket.isOpen) { _startRejoinTimer(); } }) @@ -349,7 +348,7 @@ class PhoenixChannel { _state = PhoenixChannelState.errored; _joinPush.reset(); - if (socket.isConnected) { + if (socket.isOpen) { _startRejoinTimer(); } }); @@ -358,7 +357,7 @@ class PhoenixChannel { void _startRejoinTimer() { _rejoinTimer?.cancel(); _rejoinTimer = Timer(_timeout, () { - if (socket.isConnected) _attemptJoin(); + if (socket.isOpen) _attemptJoin(); }); } @@ -390,7 +389,7 @@ class PhoenixChannel { _joinPush.reset(); } _state = PhoenixChannelState.errored; - if (socket.isConnected) { + if (socket.isOpen) { _rejoinTimer?.cancel(); _startRejoinTimer(); } diff --git a/lib/src/delayed_callback.dart b/lib/src/delayed_callback.dart new file mode 100644 index 0000000..fee88f8 --- /dev/null +++ b/lib/src/delayed_callback.dart @@ -0,0 +1,93 @@ +import 'dart:async'; +import 'dart:math'; + +final _random = Random(); + +/// Like [Future.delayed], but allows some control of the delay before the +/// callback execution. +final class DelayedCallback { + /// Executes the provided [callback] after [delay] elapses, unless aborted + /// during the delay. + DelayedCallback({ + required Duration delay, + required Future Function() callback, + }) : _callback = callback { + _delayCompleter.future + .then((_) => _runCallback()) + .catchError(_callbackCompleter.completeError); + + _delayTimer = Timer(delay, _delayCompleter.complete); + } + + // JS only supports values up to 1 << 31. + final int _id = _random.nextInt(1 << 31); + late final idAsString = _id.toRadixString(16).padLeft(8, '0'); + + late Timer _delayTimer; + final _delayCompleter = Completer(); + + final Future Function() _callback; + bool _callbackRan = false; + final _callbackCompleter = Completer(); + + /// Returns a future that is completed with result of callback execution. + /// + /// If the callback throws, then this future completes with the thrown error. + /// + /// If the callback gets aborted before execution, the future is completed + /// with a custom error. + late final Future callbackFuture = _callbackCompleter.future; + + void _runCallback() { + if (!_callbackRan) { + _callbackRan = true; + // This way the _callbackCompleter stays uncompleted until callback ends. + _callback() + .then(_callbackCompleter.complete) + .catchError(_callbackCompleter.completeError); + } + } + + /// Whether the delay has expired. Does not guarantee that the callback was + /// executed. + bool get delayDone => !_delayTimer.isActive; + + /// Immediately skips delay and executes callback. Has no effect if the delay + /// has expired already, or if the callback was aborted. + void skipDelay() { + if (_delayTimer.isActive) { + _delayTimer.cancel(); + } + if (!_delayCompleter.isCompleted) { + _delayCompleter.complete(); + } + } + + /// Aborts attempt at calling the callback. The [callbackFuture] is going to + /// be completed with an error. Has no effect if the delay has expired + /// already. + void abort() { + if (_delayTimer.isActive) { + _delayTimer.cancel(); + } + if (!_delayCompleter.isCompleted) { + _delayCompleter.completeError( + 'Connection attempt $idAsString aborted.', + StackTrace.current, + ); + } + } + + @override + bool operator ==(Object other) { + return other is DelayedCallback && _id == other._id; + } + + @override + int get hashCode => _id.hashCode; + + @override + String toString() { + return 'SocketConnectionAttempt(id: $idAsString)'; + } +} diff --git a/lib/src/exceptions.dart b/lib/src/exceptions.dart index 8b7c1bd..59c1d93 100644 --- a/lib/src/exceptions.dart +++ b/lib/src/exceptions.dart @@ -19,7 +19,7 @@ class PhoenixException implements Exception { final PhoenixSocketCloseEvent? socketClosed; /// The associated channel event. - final String? channelEvent; + final Object? channelEvent; /// The error message for this exception. Message? get message { @@ -36,7 +36,8 @@ class PhoenixException implements Exception { if (socketError != null) { return socketError!.error.toString(); } else { - return 'PhoenixException: socket closed'; + final suffix = channelEvent == null ? '' : ' with event $channelEvent'; + return 'PhoenixException: socket closed$suffix'; } } } diff --git a/lib/src/raw_socket.dart b/lib/src/raw_socket.dart deleted file mode 100644 index 163d17e..0000000 --- a/lib/src/raw_socket.dart +++ /dev/null @@ -1,27 +0,0 @@ -import 'dart:async'; - -import 'socket.dart'; - -/// Main class to use when wishing to establish a persistent connection -/// with a Phoenix backend using WebSockets. -class PhoenixRawSocket extends PhoenixSocket { - final StreamController> _receiveStreamController = - StreamController.broadcast(); - - /// Creates an instance of PhoenixRawSocket - /// - /// endpoint is the full url to which you wish to connect - /// e.g. `ws://localhost:4000/websocket/socket` - PhoenixRawSocket(super.endpoint, {super.socketOptions}); - - @override - void onSocketDataCallback(message) { - if (message is List) { - if (!_receiveStreamController.isClosed) { - _receiveStreamController.add(message); - } - } else { - throw ArgumentError('Received a non-list of integers'); - } - } -} diff --git a/lib/src/socket.dart b/lib/src/socket.dart index 0749261..9b6cef5 100644 --- a/lib/src/socket.dart +++ b/lib/src/socket.dart @@ -1,44 +1,18 @@ import 'dart:async'; import 'dart:core'; -import 'dart:math'; import 'package:logging/logging.dart'; +import 'package:phoenix_socket/phoenix_socket.dart'; +import 'package:phoenix_socket/src/socket_connection.dart'; import 'package:phoenix_socket/src/utils/iterable_extensions.dart'; import 'package:rxdart/rxdart.dart'; import 'package:web_socket_channel/status.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; -import 'channel.dart'; -import 'events.dart'; -import 'exceptions.dart'; -import 'message.dart'; -import 'push.dart'; -import 'socket_options.dart'; - part '_stream_router.dart'; -/// State of a [PhoenixSocket]. -enum SocketState { - /// The connection is closed - closed, - - /// The connection is closing - closing, - - /// The connection is opening - connecting, - - /// The connection is established - connected, - - /// Unknown state - unknown, -} - final Logger _logger = Logger('phoenix_socket.socket'); -final Random _random = Random(); - /// Main class to use when wishing to establish a persistent connection /// with a Phoenix backend using WebSockets. class PhoenixSocket { @@ -56,57 +30,56 @@ class PhoenixSocket { /// The factory to use to create the WebSocketChannel. WebSocketChannel Function(Uri uri)? webSocketChannelFactory, }) : _endpoint = endpoint, - _socketState = SocketState.unknown, - _webSocketChannelFactory = webSocketChannelFactory { - _options = socketOptions ?? PhoenixSocketOptions(); - - _reconnects = _options.reconnectDelays; - + _options = socketOptions ?? const PhoenixSocketOptions(), + _webSocketChannelFactory = + webSocketChannelFactory ?? WebSocketChannel.connect { _messageStream = _receiveStreamController.stream.map(_options.serializer.decode); - _openStream = - _stateStreamController.stream.whereType(); - + _stateEventStreamController.stream.whereType(); _closeStream = - _stateStreamController.stream.whereType(); - + _stateEventStreamController.stream.whereType(); _errorStream = - _stateStreamController.stream.whereType(); + _stateEventStreamController.stream.whereType(); _subscriptions = [ _messageStream.listen(_onMessage), - _openStream.listen((_) => _startHeartbeat()), - _closeStream.listen((_) => _cancelHeartbeat()) + _openStream.listen((_) => _isOpen = true), + _closeStream.listen(_onSocketClosed), + _errorStream.listen(_onSocketError), + _socketStateStream.distinct().listen(_onSocketStateChanged), ]; } + final String _endpoint; + final PhoenixSocketOptions _options; + final WebSocketChannel Function(Uri uri)? _webSocketChannelFactory; + + /// Default duration for a connection timeout. + Duration get defaultTimeout => _options.timeout; + final Map> _pendingMessages = {}; final Map> _topicStreams = {}; - final BehaviorSubject _stateStreamController = + final BehaviorSubject _stateEventStreamController = BehaviorSubject(); final StreamController _receiveStreamController = StreamController.broadcast(); - final String _endpoint; final StreamController _topicMessages = StreamController(); - final WebSocketChannel Function(Uri uri)? _webSocketChannelFactory; - - late Uri _mountPoint; + final BehaviorSubject _socketStateStream = + BehaviorSubject(); late Stream _openStream; late Stream _closeStream; late Stream _errorStream; late Stream _messageStream; - - SocketState _socketState; - - WebSocketChannel? _ws; + SocketConnectionManager? _connectionManager; _StreamRouter? _router; /// Stream of [PhoenixSocketOpenEvent] being produced whenever - /// the connection is open. + /// the connection is open, that is when the initial heartbeat + /// completes successfully. Stream get openStream => _openStream; /// Stream of [PhoenixSocketCloseEvent] being produced whenever @@ -120,34 +93,36 @@ class PhoenixSocket { /// Stream of all [Message] instances received. Stream get messageStream => _messageStream; - /// Reconnection durations, increasing in length. - late List _reconnects; - List _subscriptions = []; int _ref = 0; - String? _nextHeartbeatRef; - Timer? _heartbeatTimeout; /// A property yielding unique message reference ids, /// monotonically increasing. String get nextRef => '${_ref++}'; - int _reconnectAttempts = 0; + Timer? _heartbeatTimeout; - bool _shouldReconnect = true; - bool _reconnecting = false; + String? _latestHeartbeatRef; /// [Map] of topic names to [PhoenixChannel] instances being /// maintained and tracked by the socket. Map channels = {}; - late PhoenixSocketOptions _options; + bool _disposed = false; - /// Default duration for a connection timeout. - Duration get defaultTimeout => _options.timeout; + bool _isOpen = false; - bool _disposed = false; + /// Whether the phoenix socket is ready to join channels. Note that this is + /// not the same as the WebSocketConnected state, but rather is set to true + /// when both WebSocket is connected, and the first heartbeat reply has been + /// received. + bool get isOpen => _isOpen; + + bool get _isConnectingOrConnected => switch (_socketStateStream.valueOrNull) { + WebSocketConnecting() || WebSocketConnected() => true, + _ => false, + }; _StreamRouter get _streamRouter => _router ??= _StreamRouter(_topicMessages.stream); @@ -160,89 +135,75 @@ class PhoenixSocket { Stream streamForTopic(String topic) => _topicStreams.putIfAbsent( topic, () => _streamRouter.route((event) => event.topic == topic)); - /// The string URL of the remote Phoenix server. - String get endpoint => _endpoint; + // CONNECTION - /// The [Uri] containing all the parameters and options for the - /// remote connection to occur. - Uri get mountPoint => _mountPoint; - - /// Whether the underlying socket is connected of not. - bool get isConnected => _ws != null && _socketState == SocketState.connected; - - bool get _isConnectingOrConnected => - _socketState == SocketState.connecting || isConnected; + /// Connects to the underlying WebSocket and prepares this PhoenixSocket for + /// connecting to channels. + /// + /// The returned future will complete when a connection is established and the + /// first heartbeat round-trip completes. + /// + /// If [immediately] is set to `true` and if a connection is not established, + /// it will attempt to connect to a socket without delay. + /// + /// If a connection is already open, then this method returns immediately. If + /// you want to force a new connection, use [close] with reconnect parameter + /// set to true. + Future connect({bool immediately = false}) async { + if (_disposed) { + throw StateError('PhoenixSocket cannot connect after being disposed.'); + } - Future connect() async { if (_isConnectingOrConnected) { _logger.warning( 'Calling connect() on already connected or connecting socket.', ); - return; } - _shouldReconnect = true; - - if (_disposed) { - throw StateError('PhoenixSocket cannot connect after being disposed.'); - } - - _socketState = SocketState.connecting; - try { - _mountPoint = await _buildMountPoint(_endpoint, _options); - } catch (error, stacktrace) { - _stateStreamController.add( - PhoenixSocketErrorEvent( - error: error, - stacktrace: stacktrace, - ), + if (isOpen) { + return; + } else if (!_socketStateStream.hasValue || _isConnectingOrConnected) { + await (_connectionManager ??= _createConnectionManager()) + .start(immediately: immediately); + } else { + await _reconnect( + normalClosure, // Any code is a good code. + immediately: immediately, ); - _ws = null; - _socketState = SocketState.closed; - _reconnectAttempts++; - return _delayedReconnect(); } - _logger.finest(() => 'Attempting to connect to $_mountPoint'); - - try { - _ws = _webSocketChannelFactory != null - ? _webSocketChannelFactory!(_mountPoint) - : WebSocketChannel.connect(_mountPoint); - - // Wait for the WebSocket to be ready before continuing. In case of a - // failure to connect, the future will complete with an error and will be - // caught. - await _ws!.ready; - - _socketState = SocketState.connected; - - _ws!.stream - .where(_shouldPipeMessage) - .listen(_onSocketData, cancelOnError: true) - ..onError(_onSocketError) - ..onDone(_onSocketClosed); - } catch (error, stacktrace) { - _onSocketError(error, stacktrace); - } + assert( + _stateEventStreamController.valueOrNull is! PhoenixSocketOpenEvent, + 'Phoenix socket should not be open at this stage', + ); + await _openStream.first; + } - _reconnectAttempts++; + SocketConnectionManager _createConnectionManager() { + return SocketConnectionManager( + factory: () async { + final mountPoint = await _buildMountPoint(); + return (_webSocketChannelFactory ?? WebSocketChannel.connect) + .call(mountPoint); + }, + reconnectDelays: _options.reconnectDelays, + readyTimeout: _options.timeout, + onMessage: onSocketDataCallback, + onError: (error, [stackTrace]) => _stateEventStreamController.add( + PhoenixSocketErrorEvent(error: error, stacktrace: stackTrace), + ), + onStateChange: _socketStateStream.add, + ); + } - try { - _logger.finest('Waiting for initial heartbeat round trip'); - if (await _sendHeartbeat(ignorePreviousHeartbeat: true)) { - _stateStreamController.add(PhoenixSocketOpenEvent()); - _logger.info('Socket open'); - return; - } else { - throw PhoenixException(); - } - } catch (err, stackTrace) { - _logger.severe('Raised Exception', err, stackTrace); - _ws = null; - _socketState = SocketState.closed; - return _delayedReconnect(); - } + Future _buildMountPoint() async { + var decodedUri = Uri.parse(_endpoint); + final params = await _options.getParams(); + final queryParams = decodedUri.queryParameters.entries.toList() + ..addAll(params.entries.toList()); + return decodedUri.replace( + queryParameters: Map.fromEntries(queryParams), + ); } /// Close the underlying connection supporting the socket. @@ -251,13 +212,14 @@ class PhoenixSocket { String? reason, reconnect = false, ]) { - _shouldReconnect = reconnect; - if (isConnected) { - // _ws != null and state is connected - _socketState = SocketState.closing; - _closeSink(code, reason); - } else if (!_shouldReconnect) { - dispose(); + if (reconnect) { + _reconnect(code ?? normalClosure, reason: reason); + } else { + _closeConnection( + // Should be goingAway, but https://github.com/dart-lang/http/issues/1294 + code ?? normalClosure, + reason: reason, + ); } } @@ -266,11 +228,8 @@ class PhoenixSocket { /// Don't forget to call this at the end of the lifetime of /// a socket. void dispose() { - _shouldReconnect = false; if (_disposed) return; - _disposed = true; - _ws?.sink.close(); for (final sub in _subscriptions) { sub.cancel(); @@ -290,46 +249,75 @@ class PhoenixSocket { _topicMessages.close(); _topicStreams.clear(); - _stateStreamController.close(); + _connectionManager?.dispose(normalClosure); + _connectionManager = null; + + _socketStateStream.close(); + _stateEventStreamController.close(); _receiveStreamController.close(); + + _logger.info('Disposed of PhoenixSocket'); } - /// Wait for an expected message to arrive. - /// - /// Used internally when expecting a message like a heartbeat - /// reply, a join reply, etc. If you need to wait for the - /// reply of message you sent on a channel, you would usually - /// use wait the returned [Push.future]. - Future waitForMessage(Message message) { - if (message.ref == null) { - throw ArgumentError.value( - message, - 'message', - 'needs to contain a ref in order to be awaited for', - ); + Future _reconnect( + int code, { + String? reason, + bool immediately = false, + }) async { + if (_disposed) { + throw StateError('Cannot reconnect a disposed socket'); } - final msg = _pendingMessages[message.ref!]; - if (msg != null) { - return msg.future; + + if (_connectionManager == null) { + return connect(immediately: immediately); + } + + _connectionManager! + .reconnect(code, reason: reason, immediately: immediately); + } + + Future _closeConnection(int code, {String? reason}) async { + if (_disposed) { + _logger.warning('Cannot close a disposed socket'); + return; + } + if (_connectionManager != null) { + _connectionManager!.dispose(code, reason); + _connectionManager = null; + + if (_socketStateStream.valueOrNull is! WebSocketDisconnected) { + await _socketStateStream + .firstWhere((state) => state is WebSocketDisconnected); + } + } else { + _isOpen = false; + if (_stateEventStreamController.valueOrNull is! PhoenixSocketCloseEvent) { + _stateEventStreamController + .add(PhoenixSocketCloseEvent(code: code, reason: reason)); + } } - return Future.error( - ArgumentError( - "Message hasn't been sent using this socket.", - ), - ); } + /// MESSAGING + /// Send a channel on the socket. /// - /// Used internally to send prepared message. If you need to send - /// a message on a channel, you would usually use [PhoenixChannel.push] - /// instead. - Future sendMessage(Message message) { - if (_ws?.sink == null) { - return Future.error(PhoenixException( - socketClosed: PhoenixSocketCloseEvent(), - )); + /// Used internally to send prepared message. If you need to send a message on + /// a channel, you would usually use [PhoenixChannel.push] instead. + /// + /// Returns a future that completes when the reply for the sent message is + /// received. If your flow awaits for the result of this future, add a timeout + /// to it so that you are not stuck in case that the reply is never received. + Future sendMessage(Message message) async { + return (_pendingMessages[message.ref!] = await _sendMessage(message)) + .future; + } + + Future> _sendMessage(Message message) async { + if (_disposed) { + throw StateError('Cannot add messages to a disposed socket'); } + if (message.ref == null) { throw ArgumentError.value( message, @@ -337,24 +325,17 @@ class PhoenixSocket { 'does not contain a ref', ); } - _addToSink(_options.serializer.encode(message)); - return (_pendingMessages[message.ref!] = Completer()).future; - } - void _addToSink(String data) { - if (_disposed) { - return; + if (_connectionManager == null) { + throw StateError('Cannot add messages to a disconnected socket'); } - _ws!.sink.add(data); - } - void _closeSink([int? code, String? reason]) { - if (_disposed || _ws == null) { - return; - } - _ws!.sink.close(code, reason); + await _connectionManager!.addMessage(_options.serializer.encode(message)); + return _pendingMessages[message.ref!] = Completer(); } + /// CHANNELS + /// [topic] is the name of the channel you wish to join /// [parameters] are any options parameters you wish to send PhoenixChannel addChannel({ @@ -392,119 +373,161 @@ class PhoenixSocket { } } - /// Processing incoming data from the socket - /// - /// Used to define a custom message type for proper data decoding - onSocketDataCallback(message) { - if (message is String) { - if (!_receiveStreamController.isClosed) { - _receiveStreamController.add(message); - } - } else { - throw ArgumentError('Received a non-string'); - } - } - - bool _shouldPipeMessage(dynamic event) { - if (event is WebSocketChannelException) { - return true; - } else if (_socketState != SocketState.closed) { - return true; - } else { - _logger.warning( - 'Message from socket dropped because PhoenixSocket is closed', - ' $event', + void _triggerChannelExceptions(PhoenixException exception) { + _logger.fine( + () => 'Trigger channel exceptions on ${channels.length} channels', + ); + for (final channel in channels.values) { + _logger.finer( + () => 'Trigger channel exceptions on ${channel.topic}', ); - return false; + channel.triggerError(exception); } } - static Future _buildMountPoint( - String endpoint, PhoenixSocketOptions options) async { - var decodedUri = Uri.parse(endpoint); - final params = await options.getParams(); - final queryParams = decodedUri.queryParameters.entries.toList() - ..addAll(params.entries.toList()); - return decodedUri.replace( - queryParameters: Map.fromEntries(queryParams), - ); - } + /// HEARTBEAT - void _startHeartbeat() { - _reconnectAttempts = 0; - _heartbeatTimeout ??= Timer.periodic( - _options.heartbeat, - (_) => _sendHeartbeat(), - ); - } + Future _startHeartbeat() async { + if (_socketStateStream.valueOrNull is! WebSocketConnected) { + throw StateError('Cannot start heartbeat while disconnected'); + } + + _cancelHeartbeat(successfully: false); - void _cancelHeartbeat() { - _heartbeatTimeout?.cancel(); - _heartbeatTimeout = null; + _logger.info('Waiting for initial heartbeat round trip'); + final initialHeartbeatSucceeded = await _sendHeartbeat(force: true); + if (initialHeartbeatSucceeded) { + _logger.info('Socket open'); + _stateEventStreamController.add(PhoenixSocketOpenEvent()); + } + // The "else" case is handled - if needed - by the catch clauses in + // [_sendHeartbeat]. } - Future _sendHeartbeat({bool ignorePreviousHeartbeat = false}) async { - if (!isConnected) return false; + /// Returns a Future completing with true if the heartbeat message was sent, + /// and the reply to it was received. + Future _sendHeartbeat({bool force = false}) async { + if (!force && !isOpen) return false; - if (_nextHeartbeatRef != null && !ignorePreviousHeartbeat) { - _nextHeartbeatRef = null; - if (_ws != null) { - _closeSink(normalClosure, 'heartbeat timeout'); + if (_heartbeatTimeout != null) { + if (_heartbeatTimeout!.isActive) { + // Previous timeout is active, let it finish and schedule a heartbeat + // itself, if successful. + return false; } - return false; + + _cancelHeartbeat(); } try { - await sendMessage(_heartbeatMessage()); - _logger.fine('[phoenix_socket] Heartbeat completed'); + final heartbeatMessage = Message.heartbeat(nextRef); + // Using _sendMessages to wait for potential problems when sending message + // (eg. WebSocket failure), but not until the response is reported. + final heartbeatCompleter = await _sendMessage(heartbeatMessage); + _logger.fine('Heartbeat ${heartbeatMessage.ref} sent'); + final heartbeatRef = _latestHeartbeatRef = heartbeatMessage.ref!; + + _heartbeatTimeout = _scheduleHeartbeat(heartbeatRef); + + await heartbeatCompleter.future; + _logger.fine('Heartbeat $heartbeatRef completed'); return true; - } on WebSocketChannelException catch (err, stacktrace) { - _logger.severe( - '[phoenix_socket] Heartbeat message failed: WebSocketChannelException', - err, - stacktrace, + } on WebSocketChannelException catch (error, stackTrace) { + _logger.warning( + 'Heartbeat message failed: WebSocketChannelException', + error, + stackTrace, ); - _triggerChannelExceptions(PhoenixException( - socketError: PhoenixSocketErrorEvent( - error: err, - stacktrace: stacktrace, + _stateEventStreamController.add( + PhoenixSocketErrorEvent( + error: error, + stacktrace: stackTrace, ), - )); - - return false; - } catch (err, stacktrace) { - _logger.severe( - '[phoenix_socket] Heartbeat message failed', - err, - stacktrace, ); + return false; + } catch (error, stackTrace) { + _logger.warning('Heartbeat message failed', error, stackTrace); + if (!_disposed && _connectionManager != null) { + _reconnect(heartbeatTimedOut, reason: 'Heartbeat timed out'); + } return false; } } - void _triggerChannelExceptions(PhoenixException exception) { - _logger.fine( - () => 'Trigger channel exceptions on ${channels.length} channels', - ); - for (final channel in channels.values) { - _logger.finer( - () => 'Trigger channel exceptions on ${channel.topic}', + Timer _scheduleHeartbeat([String? previousHeartbeat]) { + return Timer(_options.heartbeat, () { + if (previousHeartbeat != null) { + final completer = _pendingMessages.remove(previousHeartbeat); + if (completer != null && !completer.isCompleted) { + completer.completeError( + TimeoutException( + 'Heartbeat $previousHeartbeat not completed before sending new one', + ), + StackTrace.current, + ); + return; + } + } + + _sendHeartbeat(); + }); + } + + void _cancelHeartbeat({bool successfully = true}) { + if (_heartbeatTimeout != null) { + _heartbeatTimeout?.cancel(); + _heartbeatTimeout = null; + } + if (_latestHeartbeatRef != null) { + final heartbeatCompleter = _pendingMessages.remove(_latestHeartbeatRef); + if (successfully) { + heartbeatCompleter?.complete( + Message.heartbeat(_latestHeartbeatRef!), // doesn't matter + ); + } else { + heartbeatCompleter?.completeError('Heartbeat cancelled'); + } + _latestHeartbeatRef = null; + } + } + + bool _shouldPipeMessage(String message) { + final currentSocketState = _socketStateStream.valueOrNull; + if (currentSocketState is WebSocketConnected) { + return true; + } else { + _logger.warning( + 'Message from socket dropped because PhoenixSocket is not ready (is $currentSocketState)' + '\n$message', ); - channel.triggerError(exception); + + return false; } } - Message _heartbeatMessage() => Message.heartbeat(_nextHeartbeatRef = nextRef); + /// EVENT HANDLERS + + // Exists for testing only + void onSocketDataCallback(String message) { + if (_shouldPipeMessage(message) && !_receiveStreamController.isClosed) { + _receiveStreamController.add(message); + } + } void _onMessage(Message message) { - _nextHeartbeatRef = null; if (message.ref != null) { - final completer = _pendingMessages[message.ref!]; + _logger.finer('Received message with ref ${message.ref}'); + final completer = _pendingMessages.remove(message.ref!); if (completer != null) { - _pendingMessages.remove(message.ref); + if (_logger.isLoggable(Level.FINEST)) { + _logger.finest('Completed ref ${message.ref} with $message'); + } completer.complete(message); } + + // The connection is alive, prevent heartbeat timeout from closing it. + _latestHeartbeatRef = null; } if (message.topic != null && message.topic!.isNotEmpty) { @@ -512,91 +535,63 @@ class PhoenixSocket { } } - void _onSocketData(message) => onSocketDataCallback(message); - - void _onSocketError(dynamic error, dynamic stacktrace) { - final socketError = PhoenixSocketErrorEvent( - error: error, - stacktrace: stacktrace, - ); - - if (!_stateStreamController.isClosed) { - _stateStreamController.add(socketError); + void _onSocketStateChanged(WebSocketConnectionState state) { + if (state is! WebSocketConnected) { + _isOpen = false; } - for (final completer in _pendingMessages.values) { - completer.completeError(error, stacktrace); - } + switch (state) { + case WebSocketConnected(): + _startHeartbeat(); + case WebSocketDisconnecting(): + _cancelHeartbeat(successfully: false); - _logger.severe('Error on socket', error, stacktrace); - _triggerChannelExceptions(PhoenixException(socketError: socketError)); - _pendingMessages.clear(); + case WebSocketDisconnected(:final code, :final reason): + // Just in case we skipped the disconnecting event. + _cancelHeartbeat(successfully: false); - _onSocketClosed(); - } - - void _onSocketClosed() { - if (_shouldReconnect) { - _delayedReconnect(); + _stateEventStreamController.add( + PhoenixSocketCloseEvent(code: code, reason: reason), + ); + case WebSocketConnecting(): } + } - if (_socketState == SocketState.closed) { - return; + void _onSocketError(PhoenixSocketErrorEvent errorEvent) { + for (final completer in _pendingMessages.values) { + completer.completeError( + errorEvent.error ?? 'Unknown error', + errorEvent.stacktrace, + ); } + _pendingMessages.clear(); - final ev = PhoenixSocketCloseEvent( - reason: _ws?.closeReason ?? 'WebSocket closed without providing a reason', - code: _ws?.closeCode, - ); - final exc = PhoenixException(socketClosed: ev); - _ws = null; + _logger.severe('Error on socket', errorEvent.error, errorEvent.stacktrace); - if (!_stateStreamController.isClosed) { - _stateStreamController.add(ev); + if (_isOpen) { + _triggerChannelExceptions(PhoenixException(socketError: errorEvent)); } + } - if (_socketState == SocketState.closing) { - if (!_shouldReconnect) { - dispose(); - } + void _onSocketClosed(PhoenixSocketCloseEvent closeEvent) { + if (_disposed) { return; - } else { - _logger.info( - () => 'Socket closed with reason ${ev.reason} and code ${ev.code}', - ); - _triggerChannelExceptions(exc); } - _socketState = SocketState.closed; + final exception = PhoenixException(socketClosed: closeEvent); for (final completer in _pendingMessages.values) { - completer.completeError(exc); + completer.completeError(exception); } _pendingMessages.clear(); - } - - Future _delayedReconnect() async { - if (_reconnecting) return; - - _reconnecting = true; - await Future.delayed(_reconnectDelay()); - if (!_disposed) { - _reconnecting = false; - return connect(); - } - } + _logger.fine( + () => + 'Socket closed with code ${closeEvent.code} and reason "${closeEvent.reason}"', + ); - Duration _reconnectDelay() { - final durationIdx = _reconnectAttempts; - Duration duration; - if (durationIdx >= _reconnects.length) { - duration = _reconnects.last; - } else { - duration = _reconnects[durationIdx]; + if (_connectionManager != null) { + // Otherwise we have closed the connections ourselves. + _triggerChannelExceptions(exception); } - - // Some random number to prevent many clients from retrying to - // connect at exactly the same time. - return duration + Duration(milliseconds: _random.nextInt(1000)); } } diff --git a/lib/src/socket_connection.dart b/lib/src/socket_connection.dart new file mode 100644 index 0000000..356cd9e --- /dev/null +++ b/lib/src/socket_connection.dart @@ -0,0 +1,450 @@ +import 'dart:async'; + +import 'package:logging/logging.dart'; +import 'package:phoenix_socket/phoenix_socket.dart'; +import 'package:phoenix_socket/src/delayed_callback.dart'; +import 'package:rxdart/rxdart.dart'; +import 'package:web_socket_channel/status.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; + +part 'socket_state.dart'; + +typedef WebSocketChannelFactory = Future Function(); + +final _logger = Logger('phoenix_socket.connection'); + +// Some custom close codes. +const heartbeatTimedOut = 4001; +const forcedReconnectionRequested = 4002; + +/// Maintains connection to the underlying websocket, reconnecting to it if +/// necessary. +class SocketConnectionManager { + SocketConnectionManager({ + required WebSocketChannelFactory factory, + required List reconnectDelays, + required Duration readyTimeout, + required void Function(String) onMessage, + required void Function(WebSocketConnectionState) onStateChange, + required void Function(Object, [StackTrace?]) onError, + }) : _factory = factory, + _reconnectDelays = reconnectDelays, + _readyTimeout = readyTimeout, + _onError = onError, + _onStateChange = onStateChange, + _onMessage = onMessage; + + final WebSocketChannelFactory _factory; + final List _reconnectDelays; + final Duration _readyTimeout; + final void Function(String message) _onMessage; + final void Function(WebSocketConnectionState state) _onStateChange; + final void Function(Object error, [StackTrace? stacktrace]) _onError; + + /// Currently attempted or live connection. Value is null only before + /// any connection was attempted. + final _connectionsStream = BehaviorSubject<_WebSocketConnection>(); + + /// Count of consecutive attempts at establishing a connection without + /// success. + int _connectionAttempts = 0; + + /// References to the last initialized connection attempt. + DelayedCallback<_WebSocketConnection>? _currentAttempt; + + bool _disposed = false; + + /// Requests to start connecting to the socket. Returns a future that + /// completes when a WebSocket connection has been established. + /// + /// If [immediately] is false, and a connection is already established, then + /// this method does not have any effect. + /// + /// If [immediately] is set to true, then an attempt to connect is made + /// immediately. This might result in dropping the current connection and + /// establishing a new one. + Future start({bool immediately = false}) async { + if (_disposed) { + throw StateError('Cannot start: WebSocket connection manager disposed'); + } + + if (immediately) { + final currentAttempt = _currentAttempt; + if (_connectionsStream.hasValue && + currentAttempt != null && + !currentAttempt.delayDone) { + currentAttempt.skipDelay(); + return; + } + + await reconnect( + forcedReconnectionRequested, + reason: 'Immediate connection requested', + immediately: true, + ); + return; + } + await _maybeConnect(); + } + + /// Sends a message to the socket. Will start connecting to the socket if + /// necessary. + /// + /// Returns a future that completes when the message is successfully passed to + /// an established WebSocket. + /// + /// The future will complete with error if the message couldn't be added to + /// a WebSocket, either because this connection manager was disposed, or the + /// WebSocket could not accept messages. + Future addMessage(String message) async { + final connection = await _maybeConnect(); + return connection.send(message); + } + + /// Forces reconnection to the WebSocket. Returns a future that completes when + /// a WebSocket connection has been established. + /// + /// If a connection was already established, it gets closed with the provided + /// code and optional reason. + /// + /// If [immediately] is true, then the new connection attempt is executed + /// immediately, irrespective of ordinary reconnection delays provided to the + /// constructor. + Future reconnect(int code, {String? reason, bool immediately = false}) { + final currentConnection = _connectionsStream.valueOrNull; + final connectFuture = _connect(); + if (immediately) { + _currentAttempt?.skipDelay(); + } + if (currentConnection?.connected == true) { + currentConnection?.close(code, reason); + } + return connectFuture; + } + + /// Disposes of the connection manager. The current connection (or attempt + /// at one) is cancelled, and attempt to establish a new one will fail. + /// + /// If this manager is already disposed, this is a no-op. + void dispose(int code, [String? reason]) { + if (_disposed) { + _logger.info('WebSocket connection manager already disposed'); + return; + } + _logger.fine('Disposing connection manager'); + _disposed = true; + _currentAttempt = null; + final currentConnection = _connectionsStream.valueOrNull; + _connectionsStream.close(); + if (currentConnection != null && currentConnection.connected) { + currentConnection.close(code, reason); + _onStateChange(WebSocketDisconnected._(code, reason)); + } + } + + /// Establishes a new connection unless one is already available/in progress. + Future<_WebSocketConnection> _maybeConnect() { + if (_disposed) { + throw StateError('Cannot connect: WebSocket connection manager disposed'); + } + + if (!_connectionsStream.hasValue && _currentAttempt == null) { + _connect(); + } + + return _connectionsStream.firstWhere((connection) => connection.connected); + } + + /// Starts attempt to connect, and returns when a WebSocket connection has + /// been successfully established. + /// + /// Upon completing, the [_pendingConnection] field will be set to the newly + /// established connection Future, and the same Future will be returned. + /// + /// Can throw/complete with an exception if during any asynchronous operation, + /// this [SocketConnectionManager] is disposed. + /// + /// The [_onError] callback can be invoked with an instance of + /// [ConnectionInitializationException] in case the initialization of + /// connection fails. However, the reconnection will be triggered until it is + /// established, or interrupted by call to [dispose]. + Future<_WebSocketConnection> _connect() async { + if (_disposed) { + throw StateError('Cannot connect: WebSocket connection manager disposed'); + } + + while (_connectionsStream.valueOrNull?.connected != true) { + final delay = _reconnectDelay(); + late final DelayedCallback attempt; + attempt = _currentAttempt = DelayedCallback<_WebSocketConnection>( + delay: delay, + callback: () => _runConnectionAttempt(attempt), + ); + _connectionAttempts++; + + if (_logger.isLoggable(Level.FINE)) { + _logger.fine(() { + final durationString = delay == Duration.zero + ? 'now' + : 'in ${delay.inMilliseconds} milliseconds'; + return 'Triggering attempt #$_connectionAttempts (id: ${attempt.idAsString}) to connect $durationString'; + }); + } + + _WebSocketConnection? connection; + try { + connection = await attempt.callbackFuture; + } catch (error, stackTrace) { + _logger.warning('Failed to initialize connection', error, stackTrace); + if (attempt == _currentAttempt) { + _onError(error, stackTrace); + } + } finally { + if (_disposed) { + // Manager was disposed while running connection attempt. + // Should be goingAway, but https://github.com/dart-lang/http/issues/1294 + connection?.close(normalClosure, 'Client disposed'); + throw StateError('Client disposed'); + } else if (attempt != _currentAttempt) { + // Some other attempt was started, close and await for other attempt to + // complete. + connection?.close(normalClosure, 'Closing obsolete connection'); + await _connectionsStream + .firstWhere((connection) => connection.connected); + } else if (connection != null) { + // Correctly established connection. + _logger.fine('Established WebSocket connection'); + _connectionAttempts = 0; + _connectionsStream.add(connection); + } + } + } + + return _connectionsStream.value; + } + + Future<_WebSocketConnection> _runConnectionAttempt( + DelayedCallback attempt, + ) async { + if (attempt != _currentAttempt) { + throw StateError('Current attempt obsoleted while delaying'); + } + + try { + return await _WebSocketConnection.connect( + _factory, + timeout: _readyTimeout, + callbacks: _ConnectionCallbacks(attempt: attempt, manager: this), + ); + } on ConnectionInitializationException { + rethrow; + } catch (error, stackTrace) { + throw ConnectionInitializationException(error, stackTrace); + } + } + + Duration _reconnectDelay() { + final delayIndex = + _connectionAttempts.clamp(0, _reconnectDelays.length - 1); + return _reconnectDelays[delayIndex]; + } +} + +/// Wraps upstream callbacks to filter out obsolete or invalid callbacks from +/// _WebSocketConnection. +final class _ConnectionCallbacks { + _ConnectionCallbacks({ + required this.attempt, + required this.manager, + }); + + final DelayedCallback attempt; + String get attemptIdString => attempt.idAsString; + final SocketConnectionManager manager; + + WebSocketConnectionState? lastState; + + void onMessage(String message) { + if (attempt != manager._currentAttempt) { + if (_logger.isLoggable(Level.FINER)) { + _logger.finer( + 'Preventing message reporting for old connection attempt $attemptIdString', + ); + } + return; + } + + manager._onMessage(message); + } + + void onError(Object error, [StackTrace? stackTrace]) { + if (attempt != manager._currentAttempt) { + if (_logger.isLoggable(Level.FINER)) { + _logger.finer( + 'Preventing error reporting for old connection attempt $attemptIdString', + ); + } + return; + } + + manager._onError(error, stackTrace); + } + + void onStateChange(WebSocketConnectionState newState) { + if (attempt != manager._currentAttempt) { + if (_logger.isLoggable(Level.FINER)) { + _logger.finer( + 'Preventing connection state update for old connection attempt $attemptIdString', + ); + } + return; + } + + if (!_isTransitionAllowed(lastState, newState)) { + if (_logger.isLoggable(Level.FINE)) { + _logger.fine( + 'Preventing connection state change for $attemptIdString from $lastState to $newState', + ); + } + return; + } + + if (_logger.isLoggable(Level.FINE)) { + _logger.fine( + 'Changing connection state for $attemptIdString from $lastState to $newState', + ); + } + lastState = newState; + + if (newState is WebSocketDisconnected) { + _logger.fine('Socket closed, attempting to reconnect'); + manager._connect(); + } + + manager._onStateChange(newState); + } + + bool _isTransitionAllowed( + WebSocketConnectionState? lastState, + WebSocketConnectionState newState, + ) { + switch ((lastState, newState)) { + case (null, _): + return true; + case (final a, final b) when a == b: + case (_, WebSocketConnecting()): + case (WebSocketDisconnected(), _): + case (WebSocketDisconnecting(), final b) when b is! WebSocketDisconnected: + return false; + case _: + return true; + } + } +} + +class _WebSocketConnection { + static Future<_WebSocketConnection> connect( + WebSocketChannelFactory factory, { + required _ConnectionCallbacks callbacks, + required Duration timeout, + }) async { + callbacks.onStateChange(const WebSocketConnecting._()); + final WebSocketChannel ws; + try { + ws = await factory(); + await ws.ready.timeout(timeout); + } catch (error, stackTrace) { + throw ConnectionInitializationException(error, stackTrace); + } + + callbacks.onStateChange(const WebSocketConnected._()); + + return _WebSocketConnection._( + ws, + onMessage: callbacks.onMessage, + onError: callbacks.onError, + onStateChange: callbacks.onStateChange, + ); + } + + _WebSocketConnection._( + this._ws, { + required void Function(String message) onMessage, + required void Function(Object error, [StackTrace? stackTrace]) onError, + required void Function(WebSocketConnectionState state) onStateChange, + }) { + late final StreamSubscription subscription; + subscription = _ws.stream.listen( + (event) => event is String + ? onMessage(event) + : onError(PhoenixException(channelEvent: event), StackTrace.current), + onError: onError, + onDone: () { + connected = false; + subscription.cancel(); + onStateChange( + WebSocketDisconnected._( + _ws.closeCode ?? noStatusReceived, + _ws.closeReason, + ), + ); + }, + ); + + _ws.sink.done.then( + (_) { + if (connected) { + connected = false; + onStateChange(const WebSocketDisconnecting._()); + } + }, + onError: onError, + ); + } + + final WebSocketChannel _ws; + bool connected = true; + + void send(dynamic message) { + if (!connected) { + throw WebSocketChannelException( + 'Trying to send a message after WebSocket sink closed.', + ); + } + + _ws.sink.add(message); + } + + sendError(Object messageError, StackTrace? messageTrace) { + if (!connected) { + throw WebSocketChannelException( + 'Trying to send a message after WebSocket sink closed.', + ); + } + _ws.sink.addError(messageError, messageTrace); + } + + /// Closes the underlying connection providing the code and reason to the + /// WebSocket's Close frame. Has no effect if already closed. + /// + /// Note that no [WebSocketDisconnecting] event is going to be emitted during + /// close initiated by the client. + void close(int status, [String? reason]) { + if (connected) { + connected = false; + _ws.sink.close(status, reason); + } + } +} + +final class ConnectionInitializationException { + ConnectionInitializationException(this.cause, this.stackTrace); + + final Object cause; + final StackTrace stackTrace; + + @override + String toString() { + return 'WebSocket connection failed to initialize: $cause\n$stackTrace'; + } +} diff --git a/lib/src/socket_options.dart b/lib/src/socket_options.dart index 778344e..c549d9a 100644 --- a/lib/src/socket_options.dart +++ b/lib/src/socket_options.dart @@ -13,6 +13,10 @@ class PhoenixSocketOptions { /// The interval between heartbeat roundtrips Duration? heartbeat, + /// The duration after which a heartbeat request + /// is considered timed out + Duration? heartbeatTimeout, + /// The list of delays between reconnection attempts. /// /// The last duration will be repeated until it works. @@ -40,6 +44,7 @@ class PhoenixSocketOptions { }) : _timeout = timeout ?? const Duration(seconds: 10), serializer = serializer ?? const MessageSerializer(), _heartbeat = heartbeat ?? const Duration(seconds: 30), + _heartbeatTimeout = heartbeatTimeout ?? const Duration(seconds: 10), assert(!(params != null && dynamicParams != null), "Can't set both params and dynamicParams"); @@ -49,6 +54,7 @@ class PhoenixSocketOptions { final Duration _timeout; final Duration _heartbeat; + final Duration _heartbeatTimeout; /// Duration after which a request is assumed to have timed out. Duration get timeout => _timeout; @@ -56,6 +62,11 @@ class PhoenixSocketOptions { /// Duration between heartbeats Duration get heartbeat => _heartbeat; + /// Duration after which a heartbeat request is considered timed out. + /// If the server does not respond to a heartbeat request within this + /// duration, the connection is considered lost. + Duration get heartbeatTimeout => _heartbeatTimeout; + /// Optional list of Duration between reconnect attempts final List reconnectDelays; diff --git a/lib/src/socket_state.dart b/lib/src/socket_state.dart new file mode 100644 index 0000000..fbb75ca --- /dev/null +++ b/lib/src/socket_state.dart @@ -0,0 +1,71 @@ +part of 'socket_connection.dart'; + +/// Represents current logical state of the underlying WebSocket connection. +/// +/// The flow of the states for a single connection attempt is: +/// Connecting → Connected → Disconnecting → Disconnected +/// +/// However, the state can change to Disconnected from any state. +/// +/// A single WebSocket connection does not get re-initialized. +sealed class WebSocketConnectionState { + const WebSocketConnectionState._(); + + @override + int get hashCode => runtimeType.hashCode; + + @override + bool operator ==(Object other) { + return runtimeType == other.runtimeType; + } +} + +/// A connection attempt has started. This encompasses both local preparation of +/// initial connection, and waiting for connection to become ready. +final class WebSocketConnecting extends WebSocketConnectionState { + const WebSocketConnecting._() : super._(); + + @override + String toString() => 'WebSocketConnecting'; +} + +/// WebSocket connection was established and accepts sending messages through it. +final class WebSocketConnected extends WebSocketConnectionState { + const WebSocketConnected._() : super._(); + + @override + String toString() => 'WebSocketConnected'; +} + +/// WebSocket connection has stopped accepting messages, and waits for final +/// server reply to the Close message. +final class WebSocketDisconnecting extends WebSocketConnectionState { + const WebSocketDisconnecting._() : super._(); + + @override + String toString() => 'WebSocketDisconnecting'; +} + +/// WebSocket connection does not accept nor provide messages, nor will in the +/// future. This also encompasses situations where WebSocket connection was not +/// established at all. +final class WebSocketDisconnected extends WebSocketConnectionState { + WebSocketDisconnected._(this.code, this.reason) : super._(); + + final int code; + final String? reason; + + @override + String toString() => 'WebSocketDisconnected($code, $reason)'; + + @override + int get hashCode => Object.hash(code, reason); + + @override + bool operator ==(Object other) { + return other is WebSocketDisconnected && + other.code == code && + other.reason == reason && + other.runtimeType == runtimeType; + } +} diff --git a/test/channel_integration_test.dart b/test/channel_integration_test.dart index 79b1501..cf93fff 100644 --- a/test/channel_integration_test.dart +++ b/test/channel_integration_test.dart @@ -29,6 +29,7 @@ void main() { final completer = Completer(); await socket.connect(); + addTearDown(socket.dispose); socket.addChannel(topic: 'channel1').join().onReply('ok', (reply) { expect(reply.status, equals('ok')); completer.complete(); @@ -45,6 +46,7 @@ void main() { final completer = Completer(); await socket.connect(); + addTearDown(socket.dispose); socket.addChannel(topic: 'channel1').join().onReply('ok', (reply) { expect(reply.status, equals('ok')); @@ -61,6 +63,7 @@ void main() { final completer = Completer(); await socket.connect(); + addTearDown(socket.dispose); await haltProxy(); final joinFuture = socket.addChannel(topic: 'channel1').join(); @@ -82,9 +85,7 @@ void main() { final completer = Completer(); await socket.connect(); - addTearDown(() { - socket.close(); - }); + addTearDown(socket.dispose); await resetPeer(); runZonedGuarded(() { @@ -180,6 +181,7 @@ void main() { final socket = PhoenixSocket(addr); await socket.connect(); + addTearDown(socket.dispose); final channel1 = socket.addChannel(topic: 'channel1'); await channel1.join().future; @@ -198,6 +200,7 @@ void main() { final socket = PhoenixSocket(addr); await socket.connect(); + addTearDown(socket.dispose); final channel1 = socket.addChannel(topic: 'channel1'); await channel1.join().future; @@ -215,17 +218,16 @@ void main() { 'throws when sending messages to channels that got "peer reset" ' 'and that have not recovered yet', () async { final socket = PhoenixSocket(addr); - - await socket.connect(); - - final channel1 = socket.addChannel(topic: 'channel1'); - await channel1.join().future; - - await resetPeer(); - + addTearDown(socket.dispose); final Completer errorCompleter = Completer(); runZonedGuarded(() async { + await socket.connect(); + final channel1 = socket.addChannel(topic: 'channel1'); + await channel1.join().future; + + await resetPeer(); + final push = channel1.push('hello!', {'foo': 'bar'}); try { await push.future; @@ -234,45 +236,19 @@ void main() { } }, (error, stack) {}); - final Object exception; - expect(exception = await errorCompleter.future, isA()); - expect((exception as PhoenixException).socketClosed, isNotNull); + final Object exception = await errorCompleter.future; + expect( + exception, + isA().having( + (exception) => exception.socketClosed, 'socketClosed', isNotNull), + ); }); - test( - 'throws when sending messages to channels that got disconnected ' - 'and that have not recovered yet', - () async { - final socket = PhoenixSocket(addr); - - await socket.connect(); - - final channel1 = socket.addChannel(topic: 'channel1'); - await channel1.join().future; - - await haltProxy(); - - final Completer errorCompleter = Completer(); - runZonedGuarded(() async { - try { - final push = channel1.push('hello!', {'foo': 'bar'}); - await push.future; - } catch (err) { - errorCompleter.complete(err); - } - }, (error, stack) {}); - - expect(await errorCompleter.future, isA()); - }, - timeout: Timeout( - Duration(seconds: 5), - ), - ); - test('only emits reply messages that are channel replies', () async { final socket = PhoenixSocket(addr); socket.connect(); + addTearDown(socket.dispose); final channel1 = socket.addChannel(topic: 'channel1'); final channelMessages = []; @@ -288,6 +264,7 @@ void main() { final socket = PhoenixSocket(addr); await socket.connect(); + addTearDown(socket.dispose); final channel2 = socket.addChannel(topic: 'channel2'); await channel2.join().future; @@ -312,8 +289,8 @@ void main() { await channel2.join().future; addTearDown(() { - socket1.close(); - socket2.close(); + socket1.dispose(); + socket2.dispose(); }); expect( @@ -371,8 +348,8 @@ void main() { await channel2.join().future; addTearDown(() { - socket1.close(); - socket2.close(); + socket1.dispose(); + socket2.dispose(); }); channel1.push('ping', {'from': 'socket1'}); @@ -405,8 +382,8 @@ void main() { await channel2.join().future; addTearDown(() { - socket1.close(); - socket2.close(); + socket1.dispose(); + socket2.dispose(); }); channel1.push('ping', {'from': 'socket1'}); @@ -442,23 +419,10 @@ void main() { ); }); - test('Pushing message on a closed channel throws exception', () async { - final socket = PhoenixSocket(addr); - await socket.connect(); - final channel = socket.addChannel(topic: 'channel3'); - - await channel.join().future; - await channel.leave().future; - - expect( - () => channel.push('EventName', {}), - throwsA(isA()), - ); - }); - test('timeout on send message will throw', () async { final socket = PhoenixSocket(addr); await socket.connect(); + addTearDown(socket.dispose); final channel = socket.addChannel(topic: 'channel1'); await channel.join().future; diff --git a/test/channel_test.dart b/test/channel_test.dart index e4b891f..8c90725 100644 --- a/test/channel_test.dart +++ b/test/channel_test.dart @@ -11,7 +11,7 @@ void main() { () { final mockSocket = MockPhoenixSocket(); when(mockSocket.defaultTimeout).thenReturn(Duration.zero); - when(mockSocket.isConnected).thenReturn(true); + when(mockSocket.isOpen).thenReturn(true); final channel = PhoenixChannel.fromSocket(mockSocket, topic: 'test'); channel.join(); @@ -26,7 +26,7 @@ void main() { () { final mockSocket = MockPhoenixSocket(); when(mockSocket.defaultTimeout).thenReturn(Duration.zero); - when(mockSocket.isConnected).thenReturn(false); + when(mockSocket.isOpen).thenReturn(false); final channel = PhoenixChannel.fromSocket(mockSocket, topic: 'test'); channel.join(); diff --git a/test/mocks.dart b/test/mocks.dart index a6d4aae..ff08e7c 100644 --- a/test/mocks.dart +++ b/test/mocks.dart @@ -1,6 +1,8 @@ +// ignore_for_file: camel_case_types + import 'package:mockito/annotations.dart'; -import 'package:mockito/mockito.dart'; import 'package:phoenix_socket/phoenix_socket.dart'; +import 'package:phoenix_socket/src/socket_connection.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; export 'mocks.mocks.dart'; @@ -12,8 +14,19 @@ export 'mocks.mocks.dart'; MockSpec(), MockSpec(), MockSpec(), + MockSpec(as: #MockOnMessage), + MockSpec(as: #MockOnError), + MockSpec(as: #MockOnStateChange), ], ) -class MockTest extends Mock { - void test(); +abstract class OnMessage_MockBase { + void call(String message); +} + +abstract class OnError_MockBase { + void call(Object error, [StackTrace stackTrace]); +} + +abstract class OnStateChange_MockBase { + void call(WebSocketConnectionState state); } diff --git a/test/mocks.mocks.dart b/test/mocks.mocks.dart index 010aa7c..bc51af1 100644 --- a/test/mocks.mocks.dart +++ b/test/mocks.mocks.dart @@ -3,22 +3,23 @@ // Do not manually edit this file. // ignore_for_file: no_leading_underscores_for_library_prefixes -import 'dart:async' as _i10; +import 'dart:async' as _i9; -import 'package:async/async.dart' as _i13; +import 'package:async/async.dart' as _i12; import 'package:mockito/mockito.dart' as _i1; -import 'package:mockito/src/dummies.dart' as _i9; -import 'package:phoenix_socket/src/channel.dart' as _i5; -import 'package:phoenix_socket/src/events.dart' as _i11; -import 'package:phoenix_socket/src/exceptions.dart' as _i12; +import 'package:mockito/src/dummies.dart' as _i8; +import 'package:phoenix_socket/phoenix_socket.dart' as _i5; +import 'package:phoenix_socket/src/events.dart' as _i10; +import 'package:phoenix_socket/src/exceptions.dart' as _i11; import 'package:phoenix_socket/src/message.dart' as _i3; -import 'package:phoenix_socket/src/message_serializer.dart' as _i8; import 'package:phoenix_socket/src/push.dart' as _i4; import 'package:phoenix_socket/src/socket.dart' as _i2; -import 'package:phoenix_socket/src/socket_options.dart' as _i14; +import 'package:phoenix_socket/src/socket_connection.dart' as _i14; import 'package:stream_channel/stream_channel.dart' as _i7; import 'package:web_socket_channel/web_socket_channel.dart' as _i6; +import 'mocks.dart' as _i13; + // ignore_for_file: type=lint // ignore_for_file: avoid_redundant_argument_values // ignore_for_file: avoid_setters_without_getters @@ -72,19 +73,9 @@ class _FakeDuration_3 extends _i1.SmartFake implements Duration { ); } -class _FakeUri_4 extends _i1.SmartFake implements Uri { - _FakeUri_4( - Object parent, - Invocation parentInvocation, - ) : super( - parent, - parentInvocation, - ); -} - -class _FakePhoenixChannel_5 extends _i1.SmartFake +class _FakePhoenixChannel_4 extends _i1.SmartFake implements _i5.PhoenixChannel { - _FakePhoenixChannel_5( + _FakePhoenixChannel_4( Object parent, Invocation parentInvocation, ) : super( @@ -93,8 +84,8 @@ class _FakePhoenixChannel_5 extends _i1.SmartFake ); } -class _FakeWebSocketSink_6 extends _i1.SmartFake implements _i6.WebSocketSink { - _FakeWebSocketSink_6( +class _FakeWebSocketSink_5 extends _i1.SmartFake implements _i6.WebSocketSink { + _FakeWebSocketSink_5( Object parent, Invocation parentInvocation, ) : super( @@ -103,9 +94,9 @@ class _FakeWebSocketSink_6 extends _i1.SmartFake implements _i6.WebSocketSink { ); } -class _FakeStreamChannel_7 extends _i1.SmartFake +class _FakeStreamChannel_6 extends _i1.SmartFake implements _i7.StreamChannel { - _FakeStreamChannel_7( + _FakeStreamChannel_6( Object parent, Invocation parentInvocation, ) : super( @@ -114,9 +105,9 @@ class _FakeStreamChannel_7 extends _i1.SmartFake ); } -class _FakeMessageSerializer_8 extends _i1.SmartFake - implements _i8.MessageSerializer { - _FakeMessageSerializer_8( +class _FakeMessageSerializer_7 extends _i1.SmartFake + implements _i5.MessageSerializer { + _FakeMessageSerializer_7( Object parent, Invocation parentInvocation, ) : super( @@ -152,11 +143,11 @@ class MockPhoenixChannel extends _i1.Mock implements _i5.PhoenixChannel { @override String get topic => (super.noSuchMethod( Invocation.getter(#topic), - returnValue: _i9.dummyValue( + returnValue: _i8.dummyValue( this, Invocation.getter(#topic), ), - returnValueForMissingStub: _i9.dummyValue( + returnValueForMissingStub: _i8.dummyValue( this, Invocation.getter(#topic), ), @@ -170,20 +161,20 @@ class MockPhoenixChannel extends _i1.Mock implements _i5.PhoenixChannel { ) as List<_i4.Push>); @override - _i10.Stream<_i3.Message> get messages => (super.noSuchMethod( + _i9.Stream<_i3.Message> get messages => (super.noSuchMethod( Invocation.getter(#messages), - returnValue: _i10.Stream<_i3.Message>.empty(), - returnValueForMissingStub: _i10.Stream<_i3.Message>.empty(), - ) as _i10.Stream<_i3.Message>); + returnValue: _i9.Stream<_i3.Message>.empty(), + returnValueForMissingStub: _i9.Stream<_i3.Message>.empty(), + ) as _i9.Stream<_i3.Message>); @override String get joinRef => (super.noSuchMethod( Invocation.getter(#joinRef), - returnValue: _i9.dummyValue( + returnValue: _i8.dummyValue( this, Invocation.getter(#joinRef), ), - returnValueForMissingStub: _i9.dummyValue( + returnValueForMissingStub: _i8.dummyValue( this, Invocation.getter(#joinRef), ), @@ -206,11 +197,11 @@ class MockPhoenixChannel extends _i1.Mock implements _i5.PhoenixChannel { @override String get loggerName => (super.noSuchMethod( Invocation.getter(#loggerName), - returnValue: _i9.dummyValue( + returnValue: _i8.dummyValue( this, Invocation.getter(#loggerName), ), - returnValueForMissingStub: _i9.dummyValue( + returnValueForMissingStub: _i8.dummyValue( this, Invocation.getter(#loggerName), ), @@ -219,39 +210,38 @@ class MockPhoenixChannel extends _i1.Mock implements _i5.PhoenixChannel { @override String get reference => (super.noSuchMethod( Invocation.getter(#reference), - returnValue: _i9.dummyValue( + returnValue: _i8.dummyValue( this, Invocation.getter(#reference), ), - returnValueForMissingStub: _i9.dummyValue( + returnValueForMissingStub: _i8.dummyValue( this, Invocation.getter(#reference), ), ) as String); @override - _i10.Future<_i3.Message> onPushReply(_i11.PhoenixChannelEvent? replyEvent) => + _i9.Future<_i3.Message> onPushReply(_i10.PhoenixChannelEvent? replyEvent) => (super.noSuchMethod( Invocation.method( #onPushReply, [replyEvent], ), - returnValue: _i10.Future<_i3.Message>.value(_FakeMessage_1( + returnValue: _i9.Future<_i3.Message>.value(_FakeMessage_1( this, Invocation.method( #onPushReply, [replyEvent], ), )), - returnValueForMissingStub: - _i10.Future<_i3.Message>.value(_FakeMessage_1( + returnValueForMissingStub: _i9.Future<_i3.Message>.value(_FakeMessage_1( this, Invocation.method( #onPushReply, [replyEvent], ), )), - ) as _i10.Future<_i3.Message>); + ) as _i9.Future<_i3.Message>); @override void close() => super.noSuchMethod( @@ -272,7 +262,7 @@ class MockPhoenixChannel extends _i1.Mock implements _i5.PhoenixChannel { ); @override - void triggerError(_i12.PhoenixException? error) => super.noSuchMethod( + void triggerError(_i11.PhoenixException? error) => super.noSuchMethod( Invocation.method( #triggerError, [error], @@ -368,7 +358,7 @@ class MockPhoenixChannel extends _i1.Mock implements _i5.PhoenixChannel { @override _i4.Push pushEvent( - _i11.PhoenixChannelEvent? event, + _i10.PhoenixChannelEvent? event, Map? payload, [ Duration? newTimeout, ]) => @@ -428,117 +418,91 @@ class MockPhoenixSocket extends _i1.Mock implements _i2.PhoenixSocket { ); @override - _i10.Stream<_i11.PhoenixSocketOpenEvent> get openStream => - (super.noSuchMethod( + Duration get defaultTimeout => (super.noSuchMethod( + Invocation.getter(#defaultTimeout), + returnValue: _FakeDuration_3( + this, + Invocation.getter(#defaultTimeout), + ), + returnValueForMissingStub: _FakeDuration_3( + this, + Invocation.getter(#defaultTimeout), + ), + ) as Duration); + + @override + _i9.Stream<_i10.PhoenixSocketOpenEvent> get openStream => (super.noSuchMethod( Invocation.getter(#openStream), - returnValue: _i10.Stream<_i11.PhoenixSocketOpenEvent>.empty(), + returnValue: _i9.Stream<_i10.PhoenixSocketOpenEvent>.empty(), returnValueForMissingStub: - _i10.Stream<_i11.PhoenixSocketOpenEvent>.empty(), - ) as _i10.Stream<_i11.PhoenixSocketOpenEvent>); + _i9.Stream<_i10.PhoenixSocketOpenEvent>.empty(), + ) as _i9.Stream<_i10.PhoenixSocketOpenEvent>); @override - _i10.Stream<_i11.PhoenixSocketCloseEvent> get closeStream => + _i9.Stream<_i10.PhoenixSocketCloseEvent> get closeStream => (super.noSuchMethod( Invocation.getter(#closeStream), - returnValue: _i10.Stream<_i11.PhoenixSocketCloseEvent>.empty(), + returnValue: _i9.Stream<_i10.PhoenixSocketCloseEvent>.empty(), returnValueForMissingStub: - _i10.Stream<_i11.PhoenixSocketCloseEvent>.empty(), - ) as _i10.Stream<_i11.PhoenixSocketCloseEvent>); + _i9.Stream<_i10.PhoenixSocketCloseEvent>.empty(), + ) as _i9.Stream<_i10.PhoenixSocketCloseEvent>); @override - _i10.Stream<_i11.PhoenixSocketErrorEvent> get errorStream => + _i9.Stream<_i10.PhoenixSocketErrorEvent> get errorStream => (super.noSuchMethod( Invocation.getter(#errorStream), - returnValue: _i10.Stream<_i11.PhoenixSocketErrorEvent>.empty(), + returnValue: _i9.Stream<_i10.PhoenixSocketErrorEvent>.empty(), returnValueForMissingStub: - _i10.Stream<_i11.PhoenixSocketErrorEvent>.empty(), - ) as _i10.Stream<_i11.PhoenixSocketErrorEvent>); + _i9.Stream<_i10.PhoenixSocketErrorEvent>.empty(), + ) as _i9.Stream<_i10.PhoenixSocketErrorEvent>); @override - _i10.Stream<_i3.Message> get messageStream => (super.noSuchMethod( + _i9.Stream<_i3.Message> get messageStream => (super.noSuchMethod( Invocation.getter(#messageStream), - returnValue: _i10.Stream<_i3.Message>.empty(), - returnValueForMissingStub: _i10.Stream<_i3.Message>.empty(), - ) as _i10.Stream<_i3.Message>); + returnValue: _i9.Stream<_i3.Message>.empty(), + returnValueForMissingStub: _i9.Stream<_i3.Message>.empty(), + ) as _i9.Stream<_i3.Message>); @override String get nextRef => (super.noSuchMethod( Invocation.getter(#nextRef), - returnValue: _i9.dummyValue( + returnValue: _i8.dummyValue( this, Invocation.getter(#nextRef), ), - returnValueForMissingStub: _i9.dummyValue( + returnValueForMissingStub: _i8.dummyValue( this, Invocation.getter(#nextRef), ), ) as String); @override - Duration get defaultTimeout => (super.noSuchMethod( - Invocation.getter(#defaultTimeout), - returnValue: _FakeDuration_3( - this, - Invocation.getter(#defaultTimeout), - ), - returnValueForMissingStub: _FakeDuration_3( - this, - Invocation.getter(#defaultTimeout), - ), - ) as Duration); - - @override - String get endpoint => (super.noSuchMethod( - Invocation.getter(#endpoint), - returnValue: _i9.dummyValue( - this, - Invocation.getter(#endpoint), - ), - returnValueForMissingStub: _i9.dummyValue( - this, - Invocation.getter(#endpoint), - ), - ) as String); - - @override - Uri get mountPoint => (super.noSuchMethod( - Invocation.getter(#mountPoint), - returnValue: _FakeUri_4( - this, - Invocation.getter(#mountPoint), - ), - returnValueForMissingStub: _FakeUri_4( - this, - Invocation.getter(#mountPoint), - ), - ) as Uri); - - @override - bool get isConnected => (super.noSuchMethod( - Invocation.getter(#isConnected), + bool get isOpen => (super.noSuchMethod( + Invocation.getter(#isOpen), returnValue: false, returnValueForMissingStub: false, ) as bool); @override - _i10.Stream<_i3.Message> streamForTopic(String? topic) => (super.noSuchMethod( + _i9.Stream<_i3.Message> streamForTopic(String? topic) => (super.noSuchMethod( Invocation.method( #streamForTopic, [topic], ), - returnValue: _i10.Stream<_i3.Message>.empty(), - returnValueForMissingStub: _i10.Stream<_i3.Message>.empty(), - ) as _i10.Stream<_i3.Message>); + returnValue: _i9.Stream<_i3.Message>.empty(), + returnValueForMissingStub: _i9.Stream<_i3.Message>.empty(), + ) as _i9.Stream<_i3.Message>); @override - _i10.Future<_i2.PhoenixSocket?> connect() => (super.noSuchMethod( + _i9.Future connect({bool? immediately = false}) => (super.noSuchMethod( Invocation.method( #connect, [], + {#immediately: immediately}, ), - returnValue: _i10.Future<_i2.PhoenixSocket?>.value(), - returnValueForMissingStub: _i10.Future<_i2.PhoenixSocket?>.value(), - ) as _i10.Future<_i2.PhoenixSocket?>); + returnValue: _i9.Future.value(), + returnValueForMissingStub: _i9.Future.value(), + ) as _i9.Future); @override void close([ @@ -568,52 +532,27 @@ class MockPhoenixSocket extends _i1.Mock implements _i2.PhoenixSocket { ); @override - _i10.Future<_i3.Message> waitForMessage(_i3.Message? message) => - (super.noSuchMethod( - Invocation.method( - #waitForMessage, - [message], - ), - returnValue: _i10.Future<_i3.Message>.value(_FakeMessage_1( - this, - Invocation.method( - #waitForMessage, - [message], - ), - )), - returnValueForMissingStub: - _i10.Future<_i3.Message>.value(_FakeMessage_1( - this, - Invocation.method( - #waitForMessage, - [message], - ), - )), - ) as _i10.Future<_i3.Message>); - - @override - _i10.Future<_i3.Message> sendMessage(_i3.Message? message) => + _i9.Future<_i3.Message> sendMessage(_i3.Message? message) => (super.noSuchMethod( Invocation.method( #sendMessage, [message], ), - returnValue: _i10.Future<_i3.Message>.value(_FakeMessage_1( + returnValue: _i9.Future<_i3.Message>.value(_FakeMessage_1( this, Invocation.method( #sendMessage, [message], ), )), - returnValueForMissingStub: - _i10.Future<_i3.Message>.value(_FakeMessage_1( + returnValueForMissingStub: _i9.Future<_i3.Message>.value(_FakeMessage_1( this, Invocation.method( #sendMessage, [message], ), )), - ) as _i10.Future<_i3.Message>); + ) as _i9.Future<_i3.Message>); @override _i5.PhoenixChannel addChannel({ @@ -631,7 +570,7 @@ class MockPhoenixSocket extends _i1.Mock implements _i2.PhoenixSocket { #timeout: timeout, }, ), - returnValue: _FakePhoenixChannel_5( + returnValue: _FakePhoenixChannel_4( this, Invocation.method( #addChannel, @@ -643,7 +582,7 @@ class MockPhoenixSocket extends _i1.Mock implements _i2.PhoenixSocket { }, ), ), - returnValueForMissingStub: _FakePhoenixChannel_5( + returnValueForMissingStub: _FakePhoenixChannel_4( this, Invocation.method( #addChannel, @@ -665,6 +604,15 @@ class MockPhoenixSocket extends _i1.Mock implements _i2.PhoenixSocket { ), returnValueForMissingStub: null, ); + + @override + void onSocketDataCallback(String? message) => super.noSuchMethod( + Invocation.method( + #onSocketDataCallback, + [message], + ), + returnValueForMissingStub: null, + ); } /// A class which mocks [WebSocketChannel]. @@ -672,27 +620,27 @@ class MockPhoenixSocket extends _i1.Mock implements _i2.PhoenixSocket { /// See the documentation for Mockito's code generation for more information. class MockWebSocketChannel extends _i1.Mock implements _i6.WebSocketChannel { @override - _i10.Future get ready => (super.noSuchMethod( + _i9.Future get ready => (super.noSuchMethod( Invocation.getter(#ready), - returnValue: _i10.Future.value(), - returnValueForMissingStub: _i10.Future.value(), - ) as _i10.Future); + returnValue: _i9.Future.value(), + returnValueForMissingStub: _i9.Future.value(), + ) as _i9.Future); @override - _i10.Stream get stream => (super.noSuchMethod( + _i9.Stream get stream => (super.noSuchMethod( Invocation.getter(#stream), - returnValue: _i10.Stream.empty(), - returnValueForMissingStub: _i10.Stream.empty(), - ) as _i10.Stream); + returnValue: _i9.Stream.empty(), + returnValueForMissingStub: _i9.Stream.empty(), + ) as _i9.Stream); @override _i6.WebSocketSink get sink => (super.noSuchMethod( Invocation.getter(#sink), - returnValue: _FakeWebSocketSink_6( + returnValue: _FakeWebSocketSink_5( this, Invocation.getter(#sink), ), - returnValueForMissingStub: _FakeWebSocketSink_6( + returnValueForMissingStub: _FakeWebSocketSink_5( this, Invocation.getter(#sink), ), @@ -715,14 +663,14 @@ class MockWebSocketChannel extends _i1.Mock implements _i6.WebSocketChannel { #transform, [transformer], ), - returnValue: _FakeStreamChannel_7( + returnValue: _FakeStreamChannel_6( this, Invocation.method( #transform, [transformer], ), ), - returnValueForMissingStub: _FakeStreamChannel_7( + returnValueForMissingStub: _FakeStreamChannel_6( this, Invocation.method( #transform, @@ -733,20 +681,20 @@ class MockWebSocketChannel extends _i1.Mock implements _i6.WebSocketChannel { @override _i7.StreamChannel transformStream( - _i10.StreamTransformer? transformer) => + _i9.StreamTransformer? transformer) => (super.noSuchMethod( Invocation.method( #transformStream, [transformer], ), - returnValue: _FakeStreamChannel_7( + returnValue: _FakeStreamChannel_6( this, Invocation.method( #transformStream, [transformer], ), ), - returnValueForMissingStub: _FakeStreamChannel_7( + returnValueForMissingStub: _FakeStreamChannel_6( this, Invocation.method( #transformStream, @@ -757,20 +705,20 @@ class MockWebSocketChannel extends _i1.Mock implements _i6.WebSocketChannel { @override _i7.StreamChannel transformSink( - _i13.StreamSinkTransformer? transformer) => + _i12.StreamSinkTransformer? transformer) => (super.noSuchMethod( Invocation.method( #transformSink, [transformer], ), - returnValue: _FakeStreamChannel_7( + returnValue: _FakeStreamChannel_6( this, Invocation.method( #transformSink, [transformer], ), ), - returnValueForMissingStub: _FakeStreamChannel_7( + returnValueForMissingStub: _FakeStreamChannel_6( this, Invocation.method( #transformSink, @@ -781,20 +729,20 @@ class MockWebSocketChannel extends _i1.Mock implements _i6.WebSocketChannel { @override _i7.StreamChannel changeStream( - _i10.Stream Function(_i10.Stream)? change) => + _i9.Stream Function(_i9.Stream)? change) => (super.noSuchMethod( Invocation.method( #changeStream, [change], ), - returnValue: _FakeStreamChannel_7( + returnValue: _FakeStreamChannel_6( this, Invocation.method( #changeStream, [change], ), ), - returnValueForMissingStub: _FakeStreamChannel_7( + returnValueForMissingStub: _FakeStreamChannel_6( this, Invocation.method( #changeStream, @@ -805,21 +753,20 @@ class MockWebSocketChannel extends _i1.Mock implements _i6.WebSocketChannel { @override _i7.StreamChannel changeSink( - _i10.StreamSink Function(_i10.StreamSink)? - change) => + _i9.StreamSink Function(_i9.StreamSink)? change) => (super.noSuchMethod( Invocation.method( #changeSink, [change], ), - returnValue: _FakeStreamChannel_7( + returnValue: _FakeStreamChannel_6( this, Invocation.method( #changeSink, [change], ), ), - returnValueForMissingStub: _FakeStreamChannel_7( + returnValueForMissingStub: _FakeStreamChannel_6( this, Invocation.method( #changeSink, @@ -834,14 +781,14 @@ class MockWebSocketChannel extends _i1.Mock implements _i6.WebSocketChannel { #cast, [], ), - returnValue: _FakeStreamChannel_7( + returnValue: _FakeStreamChannel_6( this, Invocation.method( #cast, [], ), ), - returnValueForMissingStub: _FakeStreamChannel_7( + returnValueForMissingStub: _FakeStreamChannel_6( this, Invocation.method( #cast, @@ -856,14 +803,14 @@ class MockWebSocketChannel extends _i1.Mock implements _i6.WebSocketChannel { /// See the documentation for Mockito's code generation for more information. class MockWebSocketSink extends _i1.Mock implements _i6.WebSocketSink { @override - _i10.Future get done => (super.noSuchMethod( + _i9.Future get done => (super.noSuchMethod( Invocation.getter(#done), - returnValue: _i10.Future.value(), - returnValueForMissingStub: _i10.Future.value(), - ) as _i10.Future); + returnValue: _i9.Future.value(), + returnValueForMissingStub: _i9.Future.value(), + ) as _i9.Future); @override - _i10.Future close([ + _i9.Future close([ int? closeCode, String? closeReason, ]) => @@ -875,9 +822,9 @@ class MockWebSocketSink extends _i1.Mock implements _i6.WebSocketSink { closeReason, ], ), - returnValue: _i10.Future.value(), - returnValueForMissingStub: _i10.Future.value(), - ) as _i10.Future); + returnValue: _i9.Future.value(), + returnValueForMissingStub: _i9.Future.value(), + ) as _i9.Future); @override void add(dynamic data) => super.noSuchMethod( @@ -905,34 +852,34 @@ class MockWebSocketSink extends _i1.Mock implements _i6.WebSocketSink { ); @override - _i10.Future addStream(_i10.Stream? stream) => + _i9.Future addStream(_i9.Stream? stream) => (super.noSuchMethod( Invocation.method( #addStream, [stream], ), - returnValue: _i10.Future.value(), - returnValueForMissingStub: _i10.Future.value(), - ) as _i10.Future); + returnValue: _i9.Future.value(), + returnValueForMissingStub: _i9.Future.value(), + ) as _i9.Future); } /// A class which mocks [PhoenixSocketOptions]. /// /// See the documentation for Mockito's code generation for more information. class MockPhoenixSocketOptions extends _i1.Mock - implements _i14.PhoenixSocketOptions { + implements _i5.PhoenixSocketOptions { @override - _i8.MessageSerializer get serializer => (super.noSuchMethod( + _i5.MessageSerializer get serializer => (super.noSuchMethod( Invocation.getter(#serializer), - returnValue: _FakeMessageSerializer_8( + returnValue: _FakeMessageSerializer_7( this, Invocation.getter(#serializer), ), - returnValueForMissingStub: _FakeMessageSerializer_8( + returnValueForMissingStub: _FakeMessageSerializer_7( this, Invocation.getter(#serializer), ), - ) as _i8.MessageSerializer); + ) as _i5.MessageSerializer); @override List get reconnectDelays => (super.noSuchMethod( @@ -968,13 +915,76 @@ class MockPhoenixSocketOptions extends _i1.Mock ) as Duration); @override - _i10.Future> getParams() => (super.noSuchMethod( + Duration get heartbeatTimeout => (super.noSuchMethod( + Invocation.getter(#heartbeatTimeout), + returnValue: _FakeDuration_3( + this, + Invocation.getter(#heartbeatTimeout), + ), + returnValueForMissingStub: _FakeDuration_3( + this, + Invocation.getter(#heartbeatTimeout), + ), + ) as Duration); + + @override + _i9.Future> getParams() => (super.noSuchMethod( Invocation.method( #getParams, [], ), - returnValue: _i10.Future>.value({}), + returnValue: _i9.Future>.value({}), returnValueForMissingStub: - _i10.Future>.value({}), - ) as _i10.Future>); + _i9.Future>.value({}), + ) as _i9.Future>); +} + +/// A class which mocks [OnMessage_MockBase]. +/// +/// See the documentation for Mockito's code generation for more information. +class MockOnMessage extends _i1.Mock implements _i13.OnMessage_MockBase { + @override + void call(String? message) => super.noSuchMethod( + Invocation.method( + #call, + [message], + ), + returnValueForMissingStub: null, + ); +} + +/// A class which mocks [OnError_MockBase]. +/// +/// See the documentation for Mockito's code generation for more information. +class MockOnError extends _i1.Mock implements _i13.OnError_MockBase { + @override + void call( + Object? error, [ + StackTrace? stackTrace, + ]) => + super.noSuchMethod( + Invocation.method( + #call, + [ + error, + stackTrace, + ], + ), + returnValueForMissingStub: null, + ); +} + +/// A class which mocks [OnStateChange_MockBase]. +/// +/// See the documentation for Mockito's code generation for more information. +class MockOnStateChange extends _i1.Mock + implements _i13.OnStateChange_MockBase { + @override + void call(_i14.WebSocketConnectionState? state) => super.noSuchMethod( + Invocation.method( + #call, + [state], + ), + returnValueForMissingStub: null, + ); } diff --git a/test/socket_integration_test.dart b/test/socket_integration_test.dart index 7bf3ff1..bcb127c 100644 --- a/test/socket_integration_test.dart +++ b/test/socket_integration_test.dart @@ -3,15 +3,25 @@ import 'dart:async'; import 'package:phoenix_socket/phoenix_socket.dart'; import 'package:test/test.dart'; +import 'helpers/proxy.dart'; + void main() { const addr = 'ws://localhost:4001/socket/websocket'; group('PhoenixSocket', () { + setUp(() async { + await prepareProxy(); + }); + + tearDown(() async { + await destroyProxy(); + }); + test('can connect to a running Phoenix server', () async { final socket = PhoenixSocket(addr); await socket.connect().then((_) { - expect(socket.isConnected, isTrue); + expect(socket.isOpen, isTrue); }); }); @@ -32,7 +42,7 @@ void main() { ); await socket.connect().then((_) { - expect(socket.isConnected, isTrue); + expect(socket.isOpen, isTrue); }); }); @@ -121,5 +131,26 @@ void main() { expect(errCount, 3); }); + + test('heartbeat failure does not reconnect after disposal', () async { + final socket = PhoenixSocket( + addr, + socketOptions: PhoenixSocketOptions( + heartbeat: Duration(milliseconds: 1), + ), + ); + + await socket.connect(); + + await socket.openStream.first; + + // Prevent next heartbeat from getting a reply. + await haltProxy(); + + socket.dispose(); + + await Future.delayed(Duration(milliseconds: 1)); + // The test will fail with unhandled exception if reconnection attempt is made + }); }); } diff --git a/test/socket_test.dart b/test/socket_test.dart index c1a0ab4..d4b0724 100644 --- a/test/socket_test.dart +++ b/test/socket_test.dart @@ -12,84 +12,107 @@ import 'mocks.dart'; void main() { test('socket connect retries on unexpected error', () async { - final sink = MockWebSocketSink(); - final websocket = MockWebSocketChannel(); - final phoenixSocket = PhoenixSocket( - 'endpoint', - socketOptions: PhoenixSocketOptions(params: {'token': 'token'}), - webSocketChannelFactory: (_) => websocket, - ); int invocations = 0; final exceptions = ['E', PhoenixException()]; + final sinkCompleters = [ + Completer(), + Completer(), + Completer(), + ]; + final streamControllers = [ + StreamController(), + StreamController(), + StreamController(), + ]; - when(websocket.sink).thenReturn(sink); - - when(websocket.stream).thenAnswer((_) { - if (invocations < 2) { - // Return a never stream to keep the socket open on the first two - // attempts. If it is an empty Stream the socket will close immediately. - return NeverStream(); - } else { - // Return a heartbeat on the third attempt which allows the socket - // to connect. - final controller = StreamController() - ..add(jsonEncode(Message.heartbeat('$invocations').encode())); - return controller.stream; - } - }); - - // Throw an error adding data to the sink on the first two attempts. - // On the third attempt, the sink add should work as expected. - when(sink.add(any)).thenAnswer((_) { - if (invocations < 2) { - throw exceptions[invocations++]; - } - }); - - // Connect to the socket - await phoenixSocket.connect(); - expect(phoenixSocket.isConnected, isTrue); - - // Expect the first two unexpected failures to be retried - verify(sink.add(any)).called(3); + // This code might throw asynchronous errors, prevent the test from failing + // if these are expected ones (as defined in `exceptions`). + await runZonedGuarded( + () async { + final websocket = MockWebSocketChannel(); + final phoenixSocket = PhoenixSocket( + 'endpoint', + socketOptions: PhoenixSocketOptions( + params: {'token': 'token'}, + reconnectDelays: [Duration.zero], + ), + webSocketChannelFactory: (_) => websocket, + ); + + when(websocket.sink).thenAnswer((_) { + final sink = MockWebSocketSink(); + final doneCompleter = sinkCompleters[invocations]; + when(sink.done).thenAnswer((_) => doneCompleter.future); + when(sink.add(any)).thenAnswer((_) { + // Throw an error adding data to the sink on the first two attempts. + // On the third attempt, the sink add should work as expected. + if (invocations < 2) { + doneCompleter.complete(); + streamControllers[invocations].close(); + throw exceptions[invocations++]; + } + streamControllers[invocations] + .add(jsonEncode(Message.heartbeat('$invocations').encode())); + }); + return sink; + }); + when(websocket.ready).thenAnswer((_) => Future.value()); + + when(websocket.stream) + .thenAnswer((_) => streamControllers[invocations].stream); + + // Connect to the socket + await phoenixSocket.connect(); + + expect(phoenixSocket.isOpen, isTrue); + expect(invocations, 2); + }, + (error, stackTrace) { + if (!exceptions.contains(error)) { + throw Exception('Unexepcted exception: $error'); + } + }, + ); }); test('socket connect does not create new socket if one is already connected', () async { final optionsCompleter = Completer>(); - final mockPhoenixSocketOptions = MockPhoenixSocketOptions(); - when(mockPhoenixSocketOptions.getParams()) - .thenAnswer((_) => optionsCompleter.future); - when(mockPhoenixSocketOptions.heartbeat).thenReturn(Duration(days: 1)); final sentRefs = []; - when(mockPhoenixSocketOptions.serializer).thenReturn(MessageSerializer( - encoder: (object) { - if (object is List) { - final message = Message.fromJson(object); - sentRefs.add(message.ref!); - return message.ref!; - } - return 'ignored'; - }, - decoder: (ref) => Message.heartbeat(ref).encode(), - )); + + PhoenixSocketOptions socketOptions = PhoenixSocketOptions( + dynamicParams: () => optionsCompleter.future, + heartbeat: Duration(days: 1), + reconnectDelays: [Duration.zero], + serializer: MessageSerializer( + encoder: (object) { + if (object is List) { + final message = Message.fromJson(object); + sentRefs.add(message.ref!); + return message.ref!; + } + return 'ignored'; + }, + decoder: (ref) => Message.heartbeat(ref).encode(), + )); int factoryCalls = 0; WebSocketChannel stubWebSocketChannelFactory(Uri uri) { ++factoryCalls; final mockWebSocketChannel = MockWebSocketChannel(); + final mockWebSocketSink = MockWebSocketSink(); when(mockWebSocketChannel.stream).thenAnswer((_) => NeverStream()); - when(mockWebSocketChannel.ready) - .thenAnswer((_) => Future.sync(() => null)); - when(mockWebSocketChannel.sink).thenReturn(MockWebSocketSink()); + when(mockWebSocketChannel.ready).thenAnswer((_) => Future.value()); + when(mockWebSocketChannel.sink).thenReturn(mockWebSocketSink); + when(mockWebSocketSink.done).thenAnswer((_) => Completer().future); return mockWebSocketChannel; } final phoenixSocket = PhoenixSocket( 'ws://endpoint', webSocketChannelFactory: stubWebSocketChannelFactory, - socketOptions: mockPhoenixSocketOptions, + socketOptions: socketOptions, ); // Connect to the socket @@ -103,6 +126,9 @@ void main() { optionsCompleter.complete({'token': 'fakeUserToken'}); + // First skip options retrieval + await Future.delayed(Duration.zero); + // Then skip initialization delay await Future.delayed(Duration.zero); for (final ref in sentRefs) { @@ -135,28 +161,30 @@ void main() { (async) { phoenixSocket.connect(); + async.elapse(Duration.zero); + verify(mockPhoenixSocketOptions.getParams()).called(1); - expect(phoenixSocket.isConnected, isFalse); + expect(phoenixSocket.isOpen, isFalse); // first retry after ~10 seconds async.elapse(const Duration(seconds: 11)); verify(mockPhoenixSocketOptions.getParams()).called(1); - expect(phoenixSocket.isConnected, isFalse); + expect(phoenixSocket.isOpen, isFalse); // second retry after ~20 seconds async.elapse(const Duration(seconds: 21)); verify(mockPhoenixSocketOptions.getParams()).called(1); - expect(phoenixSocket.isConnected, isFalse); + expect(phoenixSocket.isOpen, isFalse); // third retry after ~30 seconds async.elapse(const Duration(seconds: 31)); verify(mockPhoenixSocketOptions.getParams()).called(1); - expect(phoenixSocket.isConnected, isFalse); + expect(phoenixSocket.isOpen, isFalse); // fourth retry after ~30 seconds (the last reconnect delay is repeated from now on) async.elapse(const Duration(seconds: 31)); verify(mockPhoenixSocketOptions.getParams()).called(1); - expect(phoenixSocket.isConnected, isFalse); + expect(phoenixSocket.isOpen, isFalse); }, ); }, diff --git a/test/src/socket_connection_test.dart b/test/src/socket_connection_test.dart new file mode 100644 index 0000000..0ca65db --- /dev/null +++ b/test/src/socket_connection_test.dart @@ -0,0 +1,407 @@ +import 'dart:async'; + +import 'package:mockito/mockito.dart'; +import 'package:phoenix_socket/src/socket_connection.dart'; +import 'package:test/test.dart'; + +import '../mocks.mocks.dart'; + +typedef MockWebSocketChannelConfig = ({ + MockWebSocketChannel channel, + StreamController streamController, + Completer sinkDoneCompleter, + Completer readyCompleter, +}); + +void main() { + group('$SocketConnectionManager', () { + late MockOnMessage mockOnMessage; + late MockOnError mockOnError; + late MockOnStateChange mockOnStateChange; + + setUp(() { + mockOnMessage = MockOnMessage(); + mockOnError = MockOnError(); + mockOnStateChange = MockOnStateChange(); + }); + + MockWebSocketChannelConfig setUpChannelMock() { + final mockSink = MockWebSocketSink(); + final streamController = StreamController(); + final readyCompleter = Completer(); + final doneCompleter = Completer(); + final mockChannel = MockWebSocketChannel(); + + when(mockChannel.ready).thenAnswer((_) => readyCompleter.future); + when(mockChannel.sink).thenReturn(mockSink); + when(mockChannel.stream).thenAnswer((_) => streamController.stream); + when(mockSink.done).thenAnswer((_) => doneCompleter.future); + + return ( + channel: mockChannel, + streamController: streamController, + sinkDoneCompleter: doneCompleter, + readyCompleter: readyCompleter, + ); + } + + test('start() - happy path', () async { + MockWebSocketChannelConfig mockChannelConfig = setUpChannelMock(); + SocketConnectionManager connectionManager = SocketConnectionManager( + factory: () => Future.value(mockChannelConfig.channel), + reconnectDelays: [Duration.zero], + readyTimeout: Duration(minutes: 1), + onMessage: mockOnMessage.call, + onStateChange: mockOnStateChange.call, + onError: mockOnError.call, + ); + + // No interactions in constructor. + verifyZeroInteractions(mockChannelConfig.channel); + + connectionManager.start(); + await Future.delayed(Duration.zero); + + verify(mockChannelConfig.channel.ready).called(1); + verify( + mockOnStateChange.call(argThat(isA())), + ).called(1); + verifyNoMoreInteractions(mockOnStateChange); + + mockChannelConfig.readyCompleter.complete(); + await Future.delayed(Duration.zero); + + verify( + mockOnStateChange.call(argThat(isA())), + ).called(1); + verifyNoMoreInteractions(mockOnStateChange); + + // Clear expectations on these calls before proceeding. + verify(mockChannelConfig.channel.sink); + verify(mockChannelConfig.channel.stream); + + // Calling start again will not establish a new connection. + connectionManager.start(); + await Future.delayed(Duration.zero); + + verifyNoMoreInteractions(mockChannelConfig.channel); + verifyNoMoreInteractions(mockOnStateChange); + verifyZeroInteractions(mockOnError); + verifyZeroInteractions(mockOnMessage); + }); + + test('start() - reconnection after failure', () async { + int invocationCount = 0; + final channelMocks = [ + setUpChannelMock(), + setUpChannelMock(), + ]; + + SocketConnectionManager connectionManager = SocketConnectionManager( + factory: () => Future.value(channelMocks[invocationCount++].channel), + reconnectDelays: [Duration.zero], + readyTimeout: Duration(minutes: 1), + onMessage: mockOnMessage.call, + onStateChange: mockOnStateChange.call, + onError: mockOnError.call, + ); + + connectionManager.start(); + + expect(invocationCount, 0); + await Future.delayed(Duration.zero); + expect(invocationCount, 1); + + verify(channelMocks[0].channel.ready).called(1); + verify( + mockOnStateChange.call(argThat(isA())), + ).called(1); + verifyNoMoreInteractions(mockOnStateChange); + verifyZeroInteractions(mockOnError); + verifyZeroInteractions(mockOnMessage); + + final cause = Object(); + final stackTrace = StackTrace.current; + channelMocks[0].readyCompleter.completeError(cause, stackTrace); + await Future.delayed(Duration.zero); + + verify( + mockOnError( + argThat(isA() + .having((exception) => exception.cause, 'cause', cause) + .having( + (exception) => exception.stackTrace, + 'stackTrace', + stackTrace, + )), + any), + ).called(1); + verifyNever(channelMocks[0].channel.sink); + verifyNever(channelMocks[0].channel.stream); + verifyNoMoreInteractions(mockOnError); + verifyNoMoreInteractions(mockOnStateChange); + + await Future.delayed(Duration.zero); + expect(invocationCount, 2); + + verify( + mockOnStateChange.call(argThat(isA())), + ).called(1); + verifyNoMoreInteractions(mockOnStateChange); + + channelMocks[1].readyCompleter.complete(); + await Future.delayed(Duration.zero); + verify( + mockOnStateChange.call(argThat(isA())), + ).called(1); + verifyNoMoreInteractions(mockOnError); + verifyNoMoreInteractions(mockOnStateChange); + verifyZeroInteractions(mockOnMessage); + }); + + test('start() - multiple attempts - happy path', () async { + var invocationCount = 0; + final channelMocks = setUpChannelMock(); + SocketConnectionManager connectionManager = SocketConnectionManager( + factory: () { + invocationCount++; + return Future.value(channelMocks.channel); + }, + reconnectDelays: [Duration.zero], + readyTimeout: Duration(minutes: 1), + onMessage: mockOnMessage.call, + onStateChange: mockOnStateChange.call, + onError: mockOnError.call, + ); + + // .start() called twice synchronously. + connectionManager.start(); + connectionManager.start(); + expect(invocationCount, 0); + await Future.delayed(Duration.zero); + expect(invocationCount, 1); + + verify(channelMocks.channel.ready).called(1); + verify( + mockOnStateChange.call(argThat(isA())), + ).called(1); + verifyNoMoreInteractions(mockOnStateChange); + + // .start() called during initialization. + connectionManager.start(); + await Future.delayed(Duration.zero); + expect(invocationCount, 1); + + verifyNoMoreInteractions(mockOnStateChange); + verifyNever(channelMocks.channel.ready); + + channelMocks.readyCompleter.complete(); + await Future.delayed(Duration.zero); + verify( + mockOnStateChange.call(argThat(isA())), + ).called(1); + verifyNoMoreInteractions(mockOnStateChange); + + verifyZeroInteractions(mockOnError); + verifyZeroInteractions(mockOnMessage); + }); + + test('start() - multiple attempts - connection problems', () async { + var invocationCount = 0; + final channelMocks = [ + setUpChannelMock(), + setUpChannelMock(), + ]; + + SocketConnectionManager connectionManager = SocketConnectionManager( + factory: () => Future.value(channelMocks[invocationCount++].channel), + reconnectDelays: [Duration.zero], + readyTimeout: Duration(minutes: 1), + onMessage: mockOnMessage.call, + onStateChange: mockOnStateChange.call, + onError: mockOnError.call, + ); + + // .start() called twice synchronously. + connectionManager.start(); + connectionManager.start(); + expect(invocationCount, 0); + await Future.delayed(Duration.zero); + expect(invocationCount, 1); + + verify(channelMocks[0].channel.ready).called(1); + verify( + mockOnStateChange.call(argThat(isA())), + ).called(1); + verifyNoMoreInteractions(mockOnStateChange); + + // .start() called during initialization. + connectionManager.start(); + await Future.delayed(Duration.zero); + expect(invocationCount, 1); + verifyNoMoreInteractions(mockOnStateChange); + verifyNever(channelMocks[0].channel.ready); + + channelMocks[0].readyCompleter.completeError(Object()); + await Future.delayed(Duration.zero); + verifyNoMoreInteractions(mockOnStateChange); + verify(mockOnError.call(any, any)).called(1); + verifyNoMoreInteractions(mockOnError); + + expect(invocationCount, 1); + channelMocks[1].readyCompleter.complete(); + await Future.delayed(Duration.zero); + expect(invocationCount, 2); + verifyInOrder([ + mockOnStateChange.call(argThat(isA())), + mockOnStateChange.call(argThat(isA())), + ]); + verifyNoMoreInteractions(mockOnStateChange); + + verifyNoMoreInteractions(mockOnError); + verifyZeroInteractions(mockOnMessage); + }); + + test('emits WebSocketClosing state when WebSocketChannel\'s done completes', + () async { + final channelMocks = setUpChannelMock(); + + SocketConnectionManager connectionManager = SocketConnectionManager( + factory: () => Future.value(channelMocks.channel), + reconnectDelays: [Duration.zero], + readyTimeout: Duration(minutes: 1), + onMessage: mockOnMessage.call, + onStateChange: mockOnStateChange.call, + onError: mockOnError.call, + ); + + connectionManager.start(); + channelMocks.readyCompleter.complete(); + await Future.delayed(Duration.zero); + + verify(channelMocks.channel.ready).called(1); + verifyInOrder([ + mockOnStateChange.call(argThat(isA())), + mockOnStateChange.call(argThat(isA())), + ]); + verifyNoMoreInteractions(mockOnStateChange); + + channelMocks.sinkDoneCompleter.complete(); + await Future.delayed(Duration.zero); + + verify( + mockOnStateChange.call(argThat(isA())), + ).called(1); + + verifyNoMoreInteractions(mockOnStateChange); + + verifyZeroInteractions(mockOnError); + verifyZeroInteractions(mockOnMessage); + }); + + test('emits WebSocketClosed state when WebSocketChannel\'s stream closes', + () async { + final channelMock = setUpChannelMock(); + + SocketConnectionManager connectionManager = SocketConnectionManager( + factory: () => Future.value(channelMock.channel), + reconnectDelays: [Duration.zero], + readyTimeout: Duration(minutes: 1), + onMessage: mockOnMessage.call, + onStateChange: mockOnStateChange.call, + onError: mockOnError.call, + ); + + connectionManager.start(); + channelMock.readyCompleter.complete(); + await Future.delayed(Duration.zero); + + verify(channelMock.channel.ready).called(1); + verifyInOrder([ + mockOnStateChange.call(argThat(isA())), + mockOnStateChange.call(argThat(isA())), + ]); + verifyNoMoreInteractions(mockOnStateChange); + + channelMock.streamController.close(); + await Future.delayed(Duration.zero); + + verify( + mockOnStateChange.call(argThat(isA())), + ).called(1); + + verifyNoMoreInteractions(mockOnStateChange); + + verifyZeroInteractions(mockOnError); + verifyZeroInteractions(mockOnMessage); + }); + + test('calls onError callback when WebSocketChannel\'s stream emits error', + () async { + final channelMock = setUpChannelMock(); + + SocketConnectionManager connectionManager = SocketConnectionManager( + factory: () => Future.value(channelMock.channel), + reconnectDelays: [Duration.zero], + readyTimeout: Duration(minutes: 1), + onMessage: mockOnMessage.call, + onStateChange: mockOnStateChange.call, + onError: mockOnError.call, + ); + + connectionManager.start(); + channelMock.readyCompleter.complete(); + await Future.delayed(Duration.zero); + + verify(channelMock.channel.ready).called(1); + verifyInOrder([ + mockOnStateChange.call(argThat(isA())), + mockOnStateChange.call(argThat(isA())), + ]); + verifyNoMoreInteractions(mockOnStateChange); + verifyZeroInteractions(mockOnError); + + final error = Object(); + channelMock.streamController.addError(error); + await Future.delayed(Duration.zero); + verify(mockOnError.call(error, any)).called(1); + verifyNoMoreInteractions(mockOnError); + + verifyNoMoreInteractions(mockOnStateChange); + verifyZeroInteractions(mockOnMessage); + }); + + test('start(immediatelly: true) executes connection attempt without delay', + () async { + final channelMock = setUpChannelMock(); + SocketConnectionManager connectionManager = SocketConnectionManager( + factory: () => Future.value(channelMock.channel), + reconnectDelays: [const Duration(days: 1)], + readyTimeout: Duration(minutes: 1), + onMessage: mockOnMessage.call, + onStateChange: mockOnStateChange.call, + onError: mockOnError.call, + ); + + channelMock.readyCompleter.complete(); // don't delay. + connectionManager.start(immediately: false); + await Future.delayed(Duration.zero); + + verifyZeroInteractions(channelMock.channel); + verifyZeroInteractions(mockOnStateChange); + + connectionManager.start(immediately: true); + await Future.delayed(Duration.zero); + + verify(channelMock.channel.ready).called(1); + verifyInOrder([ + mockOnStateChange.call(argThat(isA())), + mockOnStateChange.call(argThat(isA())), + ]); + verifyNoMoreInteractions(mockOnStateChange); + + verifyZeroInteractions(mockOnError); + verifyZeroInteractions(mockOnMessage); + }); + }); +}