Skip to content

Commit

Permalink
add prefix and SP request
Browse files Browse the repository at this point in the history
  • Loading branch information
Grodien committed Oct 28, 2024
1 parent 63e2697 commit fbf42be
Show file tree
Hide file tree
Showing 14 changed files with 156 additions and 26 deletions.
2 changes: 1 addition & 1 deletion das_client/integration_test/test/fahrbild_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void main() {
await tester.pumpAndSettle(const Duration(seconds: 1));

// check if station is present
expect(find.text('unkown'), findsExactly(6));
expect(find.text('SO_W'), findsOneWidget);
});
});
}
5 changes: 4 additions & 1 deletion das_client/lib/di.dart
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ extension GetItX on GetIt {
void registerMqttService(bool useTms) {
final flavor = get<Flavor>();
registerSingletonWithDependencies<MqttService>(
() => MqttService(mqttUrl: useTms ? flavor.tmsMqttUrl! : flavor.mqttUrl, mqttClientConnector: get()),
() => MqttService(
mqttUrl: useTms ? flavor.tmsMqttUrl! : flavor.mqttUrl,
mqttClientConnector: get(),
prefix: flavor.mqttTopicPrefix),
dependsOn: [MqttClientConnector]);
}

Expand Down
3 changes: 3 additions & 0 deletions das_client/lib/flavor.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ enum Flavor {
tmsMqttUrl: 'wss://tms-vad-imtrackside-dev-mobile.messaging.solace.cloud',
authenticatorConfig: _authenticatorConfigMockDev,
tmsAuthenticatorConfig: _authenticatorConfigTmsDev,
mqttTopicPrefix: "thomas/",
),
inte(
displayName: 'Inte',
Expand All @@ -33,6 +34,7 @@ enum Flavor {
this.tmsMqttUrl,
required this.authenticatorConfig,
this.tmsAuthenticatorConfig,
this.mqttTopicPrefix = ''
});

final String displayName;
Expand All @@ -42,6 +44,7 @@ enum Flavor {
final String? tmsMqttUrl;
final AuthenticatorConfig authenticatorConfig;
final AuthenticatorConfig? tmsAuthenticatorConfig;
final String mqttTopicPrefix;
}


Expand Down
13 changes: 10 additions & 3 deletions das_client/lib/model/sfera/b2g_request.dart
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
import 'package:das_client/model/sfera/jp_request.dart';
import 'package:das_client/model/sfera/sfera_xml_element.dart';
import 'package:das_client/model/sfera/sp_request.dart';

class B2gRequest extends SferaXmlElement {
static const String elementType = "B2G_Request";

B2gRequest({super.type = elementType, super.attributes, super.children, super.value});

factory B2gRequest.create({JpRequest? jpRequest}) {
factory B2gRequest.createJPRequest(JpRequest jpRequest) {
final request = B2gRequest();
if (jpRequest != null) {
request.children.add(jpRequest);
request.children.add(jpRequest);
return request;
}

factory B2gRequest.createSPRequest(List<SpRequest> spRequests) {
final request = B2gRequest();
for (final spRequest in spRequests) {
request.children.add(spRequest);
}
return request;
}
Expand Down
17 changes: 17 additions & 0 deletions das_client/lib/model/sfera/sp_request.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import 'package:das_client/model/sfera/sfera_xml_element.dart';
import 'package:das_client/model/sfera/sp_zone.dart';

class SpRequest extends SferaXmlElement {
static const String elementType = "SP_Request";

SpRequest({super.type = elementType, super.attributes, super.children, super.value});

factory SpRequest.create({required String id, required String versionMajor, required String versionMinor, required SpZone spZone}) {
final request = SpRequest();
request.attributes["SP_ID"] = id;
request.attributes["SP_VersionMajor"] = versionMajor;
request.attributes["SP_VersionMinor"] = versionMinor;
request.children.add(spZone);
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ class MqttClientTMSOauthConnector implements MqttClientConnector {

@override
Future<bool> connect(MqttClient client, String company, String train) async {
Fimber.i("Connecting to mqtt using oauth token");
Fimber.i("Connecting to TMS mqtt using oauth token");

var sferaAuthToken = await _sferaAuthService.retrieveSferaAuthToken(company, train, "active");
Fimber.i("Received sfera token=${sferaAuthToken?.substring(0, 20)}");
Fimber.i("Received TMS sfera token=${sferaAuthToken?.substring(0, 20)}");

if (sferaAuthToken != null) {
try {
Expand Down
11 changes: 6 additions & 5 deletions das_client/lib/service/mqtt/mqtt_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import 'package:rxdart/rxdart.dart';
class MqttService {
final String _mqttUrl;
final MqttClientConnector _mqttClientConnector;
final String prefix;

late MqttServerClient _client;
late String _deviceId;
Expand All @@ -22,7 +23,7 @@ class MqttService {

Stream<String> get messageStream => _messageSubject.stream;

MqttService({required String mqttUrl, required MqttClientConnector mqttClientConnector})
MqttService({required String mqttUrl, required MqttClientConnector mqttClientConnector, required this.prefix})
: _mqttUrl = mqttUrl,
_mqttClientConnector = mqttClientConnector {
_init();
Expand All @@ -44,9 +45,9 @@ class MqttService {
_client.disconnect();
}
if (await _mqttClientConnector.connect(_client, company, train)) {
_client.subscribe("90940/2/G2B/$company/$train", MqttQos.exactlyOnce);
_client.subscribe("90940/2/G2B/$company/$train/$_deviceId", MqttQos.exactlyOnce);
Fimber.i("Subscribed to topic...");
_client.subscribe("${prefix}90940/2/G2B/$company/$train", MqttQos.exactlyOnce);
_client.subscribe("${prefix}90940/2/G2B/$company/$train/$_deviceId", MqttQos.exactlyOnce);
Fimber.i("Subscribed to topic with prefix='$prefix'...");
_startUpdateListener();
return true;
}
Expand All @@ -56,7 +57,7 @@ class MqttService {

bool publishMessage(String company, String train, String message) {
if (_client.connectionStatus?.state == MqttConnectionState.connected) {
final topic = "90940/2/B2G/$company/$train/$_deviceId";
final topic = "${prefix}90940/2/B2G/$company/$train/$_deviceId";

final builder = MqttClientPayloadBuilder();
builder.addString(message);
Expand Down
19 changes: 10 additions & 9 deletions das_client/lib/service/sfera/sfera_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import 'package:das_client/service/sfera/handler/sfera_message_handler.dart';
import 'package:das_client/service/sfera/sfera_service_state.dart';
import 'package:das_client/service/sfera/task/handshake_task.dart';
import 'package:das_client/service/sfera/task/request_journey_profile_task.dart';
import 'package:das_client/service/sfera/task/request_segment_profiles_task.dart';
import 'package:das_client/service/sfera/task/sfera_task.dart';
import 'package:das_client/util/device_id_info.dart';
import 'package:das_client/util/error_code.dart';
Expand Down Expand Up @@ -125,6 +126,12 @@ class SferaService {
_messageHandlers.add(requestJourneyTask);
requestJourneyTask.execute(onTaskCompleted, onTaskFailed);
} else if (task is RequestJourneyProfileTask) {
_stateSubject.add(SferaServiceState.loadingSegments);
var requestSegmentProfilesTask = RequestSegmentProfilesTask(
mqttService: _mqttService, sferaRepository: _sferaRepository, otnId: otnId!, journeyProfile: data);
_messageHandlers.add(requestSegmentProfilesTask);
requestSegmentProfilesTask.execute(onTaskCompleted, onTaskFailed);
} else if (task is RequestSegmentProfilesTask) {
_addMessageHandlers();
_stateSubject.add(SferaServiceState.connected);
}
Expand All @@ -149,15 +156,9 @@ class SferaService {
_stateSubject.add(SferaServiceState.disconnected);
}

static Future<MessageHeader> messageHeader(
{TrainIdentification? trainIdentification}) async {
return MessageHeader.create(
const Uuid().v4(),
Format.sferaTimestamp(DateTime.now()),
await DeviceIdInfo.getDeviceId(),
"TMS",
"1085",
"0085",
static Future<MessageHeader> messageHeader({TrainIdentification? trainIdentification}) async {
return MessageHeader.create(const Uuid().v4(), Format.sferaTimestamp(DateTime.now()),
await DeviceIdInfo.getDeviceId(), "TMS", "1085", "0085",
trainIdentification: trainIdentification);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class RequestJourneyProfileTask extends SferaTask<JourneyProfile> {

var sferaB2gRequestMessage = SferaB2gRequestMessage.create(
await SferaService.messageHeader(trainIdentification: trainIdentification),
b2gRequest: B2gRequest.create(jpRequest: jpRequest));
b2gRequest: B2gRequest.createJPRequest(jpRequest));
Fimber.i("Sending journey profile request...");
_mqttService.publishMessage(otnId.company, SferaService.sferaTrain(otnId.operationalTrainNumber, otnId.startDate),
sferaB2gRequestMessage.buildDocument().toString());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import 'package:das_client/model/sfera/b2g_request.dart';
import 'package:das_client/model/sfera/journey_profile.dart';
import 'package:das_client/model/sfera/jp_request.dart';
import 'package:das_client/model/sfera/otn_id.dart';
import 'package:das_client/model/sfera/segment_profile.dart';
import 'package:das_client/model/sfera/segment_profile_list.dart';
import 'package:das_client/model/sfera/sfera_b2g_request_message.dart';
import 'package:das_client/model/sfera/sfera_g2b_reply_message.dart';
import 'package:das_client/model/sfera/sp_request.dart';
import 'package:das_client/model/sfera/train_identification.dart';
import 'package:das_client/repo/sfera_repository.dart';
import 'package:das_client/service/mqtt/mqtt_service.dart';
import 'package:das_client/service/sfera/sfera_service.dart';
import 'package:das_client/service/sfera/task/sfera_task.dart';
import 'package:fimber/fimber.dart';

class RequestSegmentProfilesTask extends SferaTask<List<SegmentProfile>> {
RequestSegmentProfilesTask({required MqttService mqttService,
required SferaRepository sferaRepository,
required this.otnId,
required this.journeyProfile,
super.timeout})
: _mqttService = mqttService,
_sferaRepository = sferaRepository;

final MqttService _mqttService;
final OtnId otnId;
final SferaRepository _sferaRepository;
final JourneyProfile journeyProfile;

late TaskCompleted<List<SegmentProfile>> _taskCompletedCallback;
late TaskFailed _taskFailedCallback;

@override
Future<void> execute(TaskCompleted<List<SegmentProfile>> onCompleted, TaskFailed onFailed) async {
_taskCompletedCallback = onCompleted;
_taskFailedCallback = onFailed;

await _requestSegmentProfiles();
startTimeout(_taskFailedCallback);
}

Future<void> _requestSegmentProfiles() async {

var missingSp = await findMissingSegmentProfiles();
if (missingSp.isEmpty) {
Fimber.i("No missing SegmentProfiles found...");
_taskCompletedCallback(this, []);
return;
}

List<SpRequest> spRequests = [];
for (var sp in missingSp) {
spRequests.add(SpRequest.create(id: sp.spId, versionMajor: sp.versionMajor, versionMinor: sp.versionMinor, spZone: sp.spZone));
}

var trainIdentification = TrainIdentification.create(otnId: otnId);
var sferaB2gRequestMessage = SferaB2gRequestMessage.create(
await SferaService.messageHeader(trainIdentification: trainIdentification),
b2gRequest: B2gRequest.createSPRequest(spRequests));
Fimber.i("Sending segment profiles request...");

_mqttService.publishMessage(otnId.company, SferaService.sferaTrain(otnId.operationalTrainNumber, otnId.startDate),
sferaB2gRequestMessage.buildDocument().toString());
}

Future<List<SegmentProfileList>> findMissingSegmentProfiles() async {
var missingSps = <SegmentProfileList>[];

for (var segment in journeyProfile.segmentProfilesLists) {
var existingProfile = await _sferaRepository.findSegmentProfile(
segment.spId, segment.versionMajor, segment.versionMinor);
if (existingProfile == null) {
missingSps.add(segment);
}
}

return missingSps;
}

@override
Future<bool> handleMessage(SferaG2bReplyMessage replyMessage) async {
if (replyMessage.payload != null && replyMessage.payload!.segmentProfiles.isNotEmpty) {
stopTimeout();
Fimber.i(
"Received G2bReplyPayload response with ${replyMessage.payload!.segmentProfiles.length} SegmentProfiles...",
);

for (var element in replyMessage.payload!.segmentProfiles) {
await _sferaRepository.saveSegmentProfile(element);
}

_taskCompletedCallback(this, replyMessage.payload!.segmentProfiles.toList());
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<StartDate>2022-01-04</StartDate>
</OTN_ID>
</TrainIdentification>
<SegmentProfileList SP_ID="2" SP_VersionMajor="1" SP_VersionMinor="1" SP_Direction="Nominal">
<SegmentProfileList SP_ID="3" SP_VersionMajor="1" SP_VersionMinor="1" SP_Direction="Nominal">
<SP_Zone>
<IM_ID>0085</IM_ID>
</SP_Zone>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<SegmentProfile xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="../SFERA_3.0_custom.xsd"
SP_ID="1" SP_VersionMajor="1" SP_VersionMinor="0" SP_Length="41500" SP_Status="Valid">
SP_ID="1" SP_VersionMajor="1" SP_VersionMinor="1" SP_Length="41500" SP_Status="Valid">
<SP_Zone>
<IM_ID>0085</IM_ID>
</SP_Zone>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<SegmentProfile xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="../SFERA_3.0_custom.xsd"
SP_ID="2" SP_VersionMajor="1" SP_VersionMinor="0" SP_Length="41500" SP_Status="Valid">
SP_ID="2" SP_VersionMajor="1" SP_VersionMinor="1" SP_Length="41500" SP_Status="Valid">
<SP_Zone>
<IM_ID>0085</IM_ID>
</SP_Zone>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<SegmentProfile xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="../SFERA_3.0_custom.xsd"
SP_ID="3" SP_VersionMajor="1" SP_VersionMinor="0" SP_Length="41500" SP_Status="Valid">
SP_ID="3" SP_VersionMajor="1" SP_VersionMinor="1" SP_Length="41500" SP_Status="Valid">
<SP_Zone>
<IM_ID>0085</IM_ID>
</SP_Zone>
Expand Down

0 comments on commit fbf42be

Please sign in to comment.