diff --git a/das_client/integration_test/test/fahrbild_test.dart b/das_client/integration_test/test/fahrbild_test.dart index 0985c76b..899a27b9 100644 --- a/das_client/integration_test/test/fahrbild_test.dart +++ b/das_client/integration_test/test/fahrbild_test.dart @@ -28,7 +28,7 @@ void main() { await tester.pumpAndSettle(const Duration(seconds: 1)); // check if station is present - expect(find.text('unkown'), findsExactly(6)); + expect(find.text('SO_W'), findsOneWidget); }); }); } diff --git a/das_client/lib/di.dart b/das_client/lib/di.dart index 79a0821a..4b318b8d 100644 --- a/das_client/lib/di.dart +++ b/das_client/lib/di.dart @@ -90,7 +90,10 @@ extension GetItX on GetIt { void registerMqttService(bool useTms) { final flavor = get(); registerSingletonWithDependencies( - () => MqttService(mqttUrl: useTms ? flavor.tmsMqttUrl! : flavor.mqttUrl, mqttClientConnector: get()), + () => MqttService( + mqttUrl: useTms ? flavor.tmsMqttUrl! : flavor.mqttUrl, + mqttClientConnector: get(), + prefix: flavor.mqttTopicPrefix), dependsOn: [MqttClientConnector]); } diff --git a/das_client/lib/flavor.dart b/das_client/lib/flavor.dart index 0109082d..a7b73232 100644 --- a/das_client/lib/flavor.dart +++ b/das_client/lib/flavor.dart @@ -11,6 +11,7 @@ enum Flavor { tmsMqttUrl: 'wss://tms-vad-imtrackside-dev-mobile.messaging.solace.cloud', authenticatorConfig: _authenticatorConfigMockDev, tmsAuthenticatorConfig: _authenticatorConfigTmsDev, + mqttTopicPrefix: "thomas/", ), inte( displayName: 'Inte', @@ -33,6 +34,7 @@ enum Flavor { this.tmsMqttUrl, required this.authenticatorConfig, this.tmsAuthenticatorConfig, + this.mqttTopicPrefix = '' }); final String displayName; @@ -42,6 +44,7 @@ enum Flavor { final String? tmsMqttUrl; final AuthenticatorConfig authenticatorConfig; final AuthenticatorConfig? tmsAuthenticatorConfig; + final String mqttTopicPrefix; } diff --git a/das_client/lib/model/sfera/b2g_request.dart b/das_client/lib/model/sfera/b2g_request.dart index 95427913..df14980c 100644 --- a/das_client/lib/model/sfera/b2g_request.dart +++ b/das_client/lib/model/sfera/b2g_request.dart @@ -1,15 +1,22 @@ import 'package:das_client/model/sfera/jp_request.dart'; import 'package:das_client/model/sfera/sfera_xml_element.dart'; +import 'package:das_client/model/sfera/sp_request.dart'; class B2gRequest extends SferaXmlElement { static const String elementType = "B2G_Request"; B2gRequest({super.type = elementType, super.attributes, super.children, super.value}); - factory B2gRequest.create({JpRequest? jpRequest}) { + factory B2gRequest.createJPRequest(JpRequest jpRequest) { final request = B2gRequest(); - if (jpRequest != null) { - request.children.add(jpRequest); + request.children.add(jpRequest); + return request; + } + + factory B2gRequest.createSPRequest(List spRequests) { + final request = B2gRequest(); + for (final spRequest in spRequests) { + request.children.add(spRequest); } return request; } diff --git a/das_client/lib/model/sfera/sp_request.dart b/das_client/lib/model/sfera/sp_request.dart new file mode 100644 index 00000000..07ed4214 --- /dev/null +++ b/das_client/lib/model/sfera/sp_request.dart @@ -0,0 +1,17 @@ +import 'package:das_client/model/sfera/sfera_xml_element.dart'; +import 'package:das_client/model/sfera/sp_zone.dart'; + +class SpRequest extends SferaXmlElement { + static const String elementType = "SP_Request"; + + SpRequest({super.type = elementType, super.attributes, super.children, super.value}); + + factory SpRequest.create({required String id, required String versionMajor, required String versionMinor, required SpZone spZone}) { + final request = SpRequest(); + request.attributes["SP_ID"] = id; + request.attributes["SP_VersionMajor"] = versionMajor; + request.attributes["SP_VersionMinor"] = versionMinor; + request.children.add(spZone); + return request; + } +} diff --git a/das_client/lib/service/mqtt/mqtt_client_tms_oauth_connector.dart b/das_client/lib/service/mqtt/mqtt_client_tms_oauth_connector.dart index 4a44dae0..934265e9 100644 --- a/das_client/lib/service/mqtt/mqtt_client_tms_oauth_connector.dart +++ b/das_client/lib/service/mqtt/mqtt_client_tms_oauth_connector.dart @@ -10,10 +10,10 @@ class MqttClientTMSOauthConnector implements MqttClientConnector { @override Future connect(MqttClient client, String company, String train) async { - Fimber.i("Connecting to mqtt using oauth token"); + Fimber.i("Connecting to TMS mqtt using oauth token"); var sferaAuthToken = await _sferaAuthService.retrieveSferaAuthToken(company, train, "active"); - Fimber.i("Received sfera token=${sferaAuthToken?.substring(0, 20)}"); + Fimber.i("Received TMS sfera token=${sferaAuthToken?.substring(0, 20)}"); if (sferaAuthToken != null) { try { diff --git a/das_client/lib/service/mqtt/mqtt_service.dart b/das_client/lib/service/mqtt/mqtt_service.dart index 596519e4..26ce4bda 100644 --- a/das_client/lib/service/mqtt/mqtt_service.dart +++ b/das_client/lib/service/mqtt/mqtt_service.dart @@ -12,6 +12,7 @@ import 'package:rxdart/rxdart.dart'; class MqttService { final String _mqttUrl; final MqttClientConnector _mqttClientConnector; + final String prefix; late MqttServerClient _client; late String _deviceId; @@ -22,7 +23,7 @@ class MqttService { Stream get messageStream => _messageSubject.stream; - MqttService({required String mqttUrl, required MqttClientConnector mqttClientConnector}) + MqttService({required String mqttUrl, required MqttClientConnector mqttClientConnector, required this.prefix}) : _mqttUrl = mqttUrl, _mqttClientConnector = mqttClientConnector { _init(); @@ -44,9 +45,9 @@ class MqttService { _client.disconnect(); } if (await _mqttClientConnector.connect(_client, company, train)) { - _client.subscribe("90940/2/G2B/$company/$train", MqttQos.exactlyOnce); - _client.subscribe("90940/2/G2B/$company/$train/$_deviceId", MqttQos.exactlyOnce); - Fimber.i("Subscribed to topic..."); + _client.subscribe("${prefix}90940/2/G2B/$company/$train", MqttQos.exactlyOnce); + _client.subscribe("${prefix}90940/2/G2B/$company/$train/$_deviceId", MqttQos.exactlyOnce); + Fimber.i("Subscribed to topic with prefix='$prefix'..."); _startUpdateListener(); return true; } @@ -56,7 +57,7 @@ class MqttService { bool publishMessage(String company, String train, String message) { if (_client.connectionStatus?.state == MqttConnectionState.connected) { - final topic = "90940/2/B2G/$company/$train/$_deviceId"; + final topic = "${prefix}90940/2/B2G/$company/$train/$_deviceId"; final builder = MqttClientPayloadBuilder(); builder.addString(message); diff --git a/das_client/lib/service/sfera/sfera_service.dart b/das_client/lib/service/sfera/sfera_service.dart index 0849e764..a906b099 100644 --- a/das_client/lib/service/sfera/sfera_service.dart +++ b/das_client/lib/service/sfera/sfera_service.dart @@ -16,6 +16,7 @@ import 'package:das_client/service/sfera/handler/sfera_message_handler.dart'; import 'package:das_client/service/sfera/sfera_service_state.dart'; import 'package:das_client/service/sfera/task/handshake_task.dart'; import 'package:das_client/service/sfera/task/request_journey_profile_task.dart'; +import 'package:das_client/service/sfera/task/request_segment_profiles_task.dart'; import 'package:das_client/service/sfera/task/sfera_task.dart'; import 'package:das_client/util/device_id_info.dart'; import 'package:das_client/util/error_code.dart'; @@ -125,6 +126,12 @@ class SferaService { _messageHandlers.add(requestJourneyTask); requestJourneyTask.execute(onTaskCompleted, onTaskFailed); } else if (task is RequestJourneyProfileTask) { + _stateSubject.add(SferaServiceState.loadingSegments); + var requestSegmentProfilesTask = RequestSegmentProfilesTask( + mqttService: _mqttService, sferaRepository: _sferaRepository, otnId: otnId!, journeyProfile: data); + _messageHandlers.add(requestSegmentProfilesTask); + requestSegmentProfilesTask.execute(onTaskCompleted, onTaskFailed); + } else if (task is RequestSegmentProfilesTask) { _addMessageHandlers(); _stateSubject.add(SferaServiceState.connected); } @@ -149,15 +156,9 @@ class SferaService { _stateSubject.add(SferaServiceState.disconnected); } - static Future messageHeader( - {TrainIdentification? trainIdentification}) async { - return MessageHeader.create( - const Uuid().v4(), - Format.sferaTimestamp(DateTime.now()), - await DeviceIdInfo.getDeviceId(), - "TMS", - "1085", - "0085", + static Future messageHeader({TrainIdentification? trainIdentification}) async { + return MessageHeader.create(const Uuid().v4(), Format.sferaTimestamp(DateTime.now()), + await DeviceIdInfo.getDeviceId(), "TMS", "1085", "0085", trainIdentification: trainIdentification); } diff --git a/das_client/lib/service/sfera/task/request_journey_profile_task.dart b/das_client/lib/service/sfera/task/request_journey_profile_task.dart index eb1f66cf..64a9fefc 100644 --- a/das_client/lib/service/sfera/task/request_journey_profile_task.dart +++ b/das_client/lib/service/sfera/task/request_journey_profile_task.dart @@ -42,7 +42,7 @@ class RequestJourneyProfileTask extends SferaTask { var sferaB2gRequestMessage = SferaB2gRequestMessage.create( await SferaService.messageHeader(trainIdentification: trainIdentification), - b2gRequest: B2gRequest.create(jpRequest: jpRequest)); + b2gRequest: B2gRequest.createJPRequest(jpRequest)); Fimber.i("Sending journey profile request..."); _mqttService.publishMessage(otnId.company, SferaService.sferaTrain(otnId.operationalTrainNumber, otnId.startDate), sferaB2gRequestMessage.buildDocument().toString()); diff --git a/das_client/lib/service/sfera/task/request_segment_profiles_task.dart b/das_client/lib/service/sfera/task/request_segment_profiles_task.dart new file mode 100644 index 00000000..96404319 --- /dev/null +++ b/das_client/lib/service/sfera/task/request_segment_profiles_task.dart @@ -0,0 +1,98 @@ +import 'package:das_client/model/sfera/b2g_request.dart'; +import 'package:das_client/model/sfera/journey_profile.dart'; +import 'package:das_client/model/sfera/jp_request.dart'; +import 'package:das_client/model/sfera/otn_id.dart'; +import 'package:das_client/model/sfera/segment_profile.dart'; +import 'package:das_client/model/sfera/segment_profile_list.dart'; +import 'package:das_client/model/sfera/sfera_b2g_request_message.dart'; +import 'package:das_client/model/sfera/sfera_g2b_reply_message.dart'; +import 'package:das_client/model/sfera/sp_request.dart'; +import 'package:das_client/model/sfera/train_identification.dart'; +import 'package:das_client/repo/sfera_repository.dart'; +import 'package:das_client/service/mqtt/mqtt_service.dart'; +import 'package:das_client/service/sfera/sfera_service.dart'; +import 'package:das_client/service/sfera/task/sfera_task.dart'; +import 'package:fimber/fimber.dart'; + +class RequestSegmentProfilesTask extends SferaTask> { + RequestSegmentProfilesTask({required MqttService mqttService, + required SferaRepository sferaRepository, + required this.otnId, + required this.journeyProfile, + super.timeout}) + : _mqttService = mqttService, + _sferaRepository = sferaRepository; + + final MqttService _mqttService; + final OtnId otnId; + final SferaRepository _sferaRepository; + final JourneyProfile journeyProfile; + + late TaskCompleted> _taskCompletedCallback; + late TaskFailed _taskFailedCallback; + + @override + Future execute(TaskCompleted> onCompleted, TaskFailed onFailed) async { + _taskCompletedCallback = onCompleted; + _taskFailedCallback = onFailed; + + await _requestSegmentProfiles(); + startTimeout(_taskFailedCallback); + } + + Future _requestSegmentProfiles() async { + + var missingSp = await findMissingSegmentProfiles(); + if (missingSp.isEmpty) { + Fimber.i("No missing SegmentProfiles found..."); + _taskCompletedCallback(this, []); + return; + } + + List spRequests = []; + for (var sp in missingSp) { + spRequests.add(SpRequest.create(id: sp.spId, versionMajor: sp.versionMajor, versionMinor: sp.versionMinor, spZone: sp.spZone)); + } + + var trainIdentification = TrainIdentification.create(otnId: otnId); + var sferaB2gRequestMessage = SferaB2gRequestMessage.create( + await SferaService.messageHeader(trainIdentification: trainIdentification), + b2gRequest: B2gRequest.createSPRequest(spRequests)); + Fimber.i("Sending segment profiles request..."); + + _mqttService.publishMessage(otnId.company, SferaService.sferaTrain(otnId.operationalTrainNumber, otnId.startDate), + sferaB2gRequestMessage.buildDocument().toString()); + } + + Future> findMissingSegmentProfiles() async { + var missingSps = []; + + for (var segment in journeyProfile.segmentProfilesLists) { + var existingProfile = await _sferaRepository.findSegmentProfile( + segment.spId, segment.versionMajor, segment.versionMinor); + if (existingProfile == null) { + missingSps.add(segment); + } + } + + return missingSps; + } + + @override + Future handleMessage(SferaG2bReplyMessage replyMessage) async { + if (replyMessage.payload != null && replyMessage.payload!.segmentProfiles.isNotEmpty) { + stopTimeout(); + Fimber.i( + "Received G2bReplyPayload response with ${replyMessage.payload!.segmentProfiles.length} SegmentProfiles...", + ); + + for (var element in replyMessage.payload!.segmentProfiles) { + await _sferaRepository.saveSegmentProfile(element); + } + + _taskCompletedCallback(this, replyMessage.payload!.segmentProfiles.toList()); + return true; + } + return false; + } +} diff --git a/sfera-mock/src/main/resources/sfera_example_messages/SFERA_JP_7839.xml b/sfera-mock/src/main/resources/sfera_example_messages/SFERA_JP_7839.xml index 00aae2f1..e7300aab 100644 --- a/sfera-mock/src/main/resources/sfera_example_messages/SFERA_JP_7839.xml +++ b/sfera-mock/src/main/resources/sfera_example_messages/SFERA_JP_7839.xml @@ -9,7 +9,7 @@ 2022-01-04 - + 0085 diff --git a/sfera-mock/src/main/resources/sfera_example_messages/SFERA_SP_1.xml b/sfera-mock/src/main/resources/sfera_example_messages/SFERA_SP_1.xml index fa6841ba..3bf2d7a3 100644 --- a/sfera-mock/src/main/resources/sfera_example_messages/SFERA_SP_1.xml +++ b/sfera-mock/src/main/resources/sfera_example_messages/SFERA_SP_1.xml @@ -1,7 +1,7 @@ + SP_ID="1" SP_VersionMajor="1" SP_VersionMinor="1" SP_Length="41500" SP_Status="Valid"> 0085 diff --git a/sfera-mock/src/main/resources/sfera_example_messages/SFERA_SP_2.xml b/sfera-mock/src/main/resources/sfera_example_messages/SFERA_SP_2.xml index 4e00003e..ec912f3a 100644 --- a/sfera-mock/src/main/resources/sfera_example_messages/SFERA_SP_2.xml +++ b/sfera-mock/src/main/resources/sfera_example_messages/SFERA_SP_2.xml @@ -1,7 +1,7 @@ + SP_ID="2" SP_VersionMajor="1" SP_VersionMinor="1" SP_Length="41500" SP_Status="Valid"> 0085 diff --git a/sfera-mock/src/main/resources/sfera_example_messages/SFERA_SP_3.xml b/sfera-mock/src/main/resources/sfera_example_messages/SFERA_SP_3.xml index d0cd5ec4..d0e2b6c4 100644 --- a/sfera-mock/src/main/resources/sfera_example_messages/SFERA_SP_3.xml +++ b/sfera-mock/src/main/resources/sfera_example_messages/SFERA_SP_3.xml @@ -1,7 +1,7 @@ + SP_ID="3" SP_VersionMajor="1" SP_VersionMinor="1" SP_Length="41500" SP_Status="Valid"> 0085