diff --git a/lib/core/rsocket_requester.dart b/lib/core/rsocket_requester.dart index 02d65b8..28f25a7 100644 --- a/lib/core/rsocket_requester.dart +++ b/lib/core/rsocket_requester.dart @@ -153,7 +153,7 @@ class RSocketRequester extends RSocket { connection.write(setupPayloadFrame()); if (mode == 'requester') { keepAliveTimer = Timer.periodic( - Duration(seconds: connectionSetupPayload!.keepAliveInterval), + Duration(milliseconds: connectionSetupPayload!.keepAliveIntervalMs), (Timer t) { if (!closed) { connection.write(FrameCodec.encodeKeepAlive(false, 0)); @@ -261,8 +261,7 @@ class RSocketRequester extends RSocket { case frame_types.REQUEST_RESPONSE: var requestResponseFrame = frame as RequestResponseFrame; if (responder != null && requestResponseFrame.payload != null) { - responder!.subscribe!(requestResponseFrame.payload) - .then((payload) { + responder!.subscribe!(requestResponseFrame.payload).then((payload) { connection.write( FrameCodec.encodePayloadFrame(header.streamId, false, payload)); }).catchError((error) { @@ -315,8 +314,8 @@ class RSocketRequester extends RSocket { Uint8List setupPayloadFrame() { return FrameCodec.encodeSetupFrame( - connectionSetupPayload!.keepAliveInterval, - connectionSetupPayload!.keepAliveMaxLifetime, + connectionSetupPayload!.keepAliveIntervalMs, + connectionSetupPayload!.keepAliveMaxLifetimeMs, connectionSetupPayload!.metadataMimeType, connectionSetupPayload!.dataMimeType, connectionSetupPayload); diff --git a/lib/core/rsocket_responder.dart b/lib/core/rsocket_responder.dart index 1bed9f8..09ad82e 100644 --- a/lib/core/rsocket_responder.dart +++ b/lib/core/rsocket_responder.dart @@ -1,6 +1,5 @@ - - import 'package:universal_io/io.dart'; +import 'package:web_socket_channel/src/channel.dart'; import '../core/rsocket_requester.dart'; import '../duplex_connection.dart'; @@ -21,8 +20,8 @@ class BaseResponder { if (header.type == frame_types.SETUP) { var setupFrame = frame as SetupFrame; var connectionSetupPayload = ConnectionSetupPayload() - ..keepAliveInterval = setupFrame.keepAliveInterval - ..keepAliveMaxLifetime = setupFrame.keepAliveMaxLifetime + ..keepAliveIntervalMs = setupFrame.keepAliveIntervalMs + ..keepAliveMaxLifetimeMs = setupFrame.keepAliveMaxLifetimeMs ..metadataMimeType = setupFrame.metadataMimeType ..dataMimeType = setupFrame.dataMimeType ..data = setupFrame.payload?.data @@ -82,8 +81,8 @@ class WebSocketRSocketResponder extends BaseResponder implements Closeable { httpServer.listen((HttpRequest req) { if (req.uri.path == uri.path) { WebSocketTransformer.upgrade(req) - .then((webSocket) => - receiveConnection(WebSocketDuplexConnection(webSocket))) + .then((webSocket) => receiveConnection( + WebSocketDuplexConnection(webSocket as WebSocketChannel))) .then((value) => {}); } }); diff --git a/lib/frame/frame.dart b/lib/frame/frame.dart index 7080970..6a9e0e0 100644 --- a/lib/frame/frame.dart +++ b/lib/frame/frame.dart @@ -118,8 +118,8 @@ class SetupFrame extends RSocketFrame { Payload? payload; String metadataMimeType = 'message/x.rsocket.composite-metadata.v0'; String dataMimeType = 'application/json'; - int keepAliveInterval = 20; - int keepAliveMaxLifetime = 90; + int keepAliveIntervalMs = 20 * 1000; // 20 seconds + int keepAliveMaxLifetimeMs = 90 * 1000; // 90 seconds String? resumeToken; bool leaseEnable = false; @@ -135,11 +135,11 @@ class SetupFrame extends RSocketFrame { var minorVersion = buffer.readI16(); var keepAliveInterval = buffer.readI32(); if (keepAliveInterval != null) { - this.keepAliveInterval = keepAliveInterval; + this.keepAliveIntervalMs = keepAliveInterval; } var keepAliveMaxLifetime = buffer.readI32(); if (keepAliveMaxLifetime != null) { - this.keepAliveMaxLifetime = keepAliveMaxLifetime; + this.keepAliveMaxLifetimeMs = keepAliveMaxLifetime; } //resume token extraction if (resumeEnable) { @@ -337,8 +337,8 @@ class PayloadFrame extends RSocketFrame { class FrameCodec { static Uint8List encodeSetupFrame( - int keepAliveInterval, - int keepAliveMaxLifetime, + int keepAliveIntervalMs, + int keepAliveMaxLifetimeMs, String metadataMimeType, String dataMimeType, Payload? setupPayload) { @@ -350,8 +350,8 @@ class FrameCodec { frameBuffer, frame_types.SETUP, setupPayload?.metadata, 0); frameBuffer.writeI16(MAJOR_VERSION); frameBuffer.writeI16(MINOR_VERSION); - frameBuffer.writeI32(keepAliveInterval); - frameBuffer.writeI32(keepAliveMaxLifetime); + frameBuffer.writeI32(keepAliveIntervalMs); + frameBuffer.writeI32(keepAliveMaxLifetimeMs); //Metadata Encoding MIME Type frameBuffer.writeI8(metadataMimeType.length); frameBuffer.writeBytes(utf8.encode(metadataMimeType)); diff --git a/lib/payload.dart b/lib/payload.dart index b237919..d608ed8 100644 --- a/lib/payload.dart +++ b/lib/payload.dart @@ -44,16 +44,16 @@ class Payload { class ConnectionSetupPayload extends Payload { String metadataMimeType = 'message/x.rsocket.composite-metadata.v0'; String dataMimeType = 'application/json'; - int keepAliveInterval = 20; - int keepAliveMaxLifetime = 90; + int keepAliveIntervalMs = 20 * 1000; // 20 seconds + int keepAliveMaxLifetimeMs = 90 * 1000; // 90 seconds int flags = 0; @override Map toJson() => { 'metadataMimeType': metadataMimeType, 'dataMimeType': dataMimeType, - 'keepAliveInterval': keepAliveInterval, - 'keepAliveMaxLifetime': keepAliveMaxLifetime, + 'keepAliveInterval': keepAliveIntervalMs, + 'keepAliveMaxLifetime': keepAliveMaxLifetimeMs, 'flags': flags, 'metadata': metadata, 'data': data diff --git a/lib/rsocket_connector.dart b/lib/rsocket_connector.dart index 8a70952..41697f1 100644 --- a/lib/rsocket_connector.dart +++ b/lib/rsocket_connector.dart @@ -8,8 +8,8 @@ import 'rsocket.dart'; class RSocketConnector { Payload? payload; - int keepAliveInterval = 20; - int keepAliveMaxLifeTime = 90; + int keepAliveIntervalMs = 20 * 1000; // 20 seconds + int keepAliveMaxLifeTimeMs = 90 * 1000; // 90 seconds String _dataMimeType = 'application/json'; String _metadataMimeType = 'message/x.rsocket.composite-metadata.v0'; ErrorConsumer? _errorConsumer; @@ -39,16 +39,16 @@ class RSocketConnector { // set the keep alive, and unit is second RSocketConnector keepAlive(int interval, int maxLifeTime) { - this.keepAliveInterval = interval; - this.keepAliveMaxLifeTime = maxLifeTime; + this.keepAliveIntervalMs = interval * 1000; + this.keepAliveMaxLifeTimeMs = maxLifeTime * 1000; return this; } Future connect(String url) async { TcpChunkHandler handler = (Uint8List chunk) {}; var connectionSetupPayload = ConnectionSetupPayload() - ..keepAliveInterval = keepAliveInterval - ..keepAliveMaxLifetime = keepAliveMaxLifeTime + ..keepAliveIntervalMs = keepAliveIntervalMs + ..keepAliveMaxLifetimeMs = keepAliveMaxLifeTimeMs ..metadataMimeType = _metadataMimeType ..dataMimeType = _dataMimeType ..data = payload?.data