diff --git a/gen/dart/lib/services/updates_svc/v1/updates_svc.pb.dart b/gen/dart/lib/services/updates_svc/v1/updates_svc.pb.dart index b23d1c0bc..b388355c4 100644 --- a/gen/dart/lib/services/updates_svc/v1/updates_svc.pb.dart +++ b/gen/dart/lib/services/updates_svc/v1/updates_svc.pb.dart @@ -11,6 +11,7 @@ import 'dart:core' as $core; +import 'package:fixnum/fixnum.dart' as $fixnum; import 'package:protobuf/protobuf.dart' as $pb; class EntityEvent extends $pb.GeneratedMessage { @@ -156,12 +157,21 @@ class DomainEvent extends $pb.GeneratedMessage { } class ReceiveUpdatesRequest extends $pb.GeneratedMessage { - factory ReceiveUpdatesRequest() => create(); + factory ReceiveUpdatesRequest({ + $fixnum.Int64? revision, + }) { + final $result = create(); + if (revision != null) { + $result.revision = revision; + } + return $result; + } ReceiveUpdatesRequest._() : super(); factory ReceiveUpdatesRequest.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); factory ReceiveUpdatesRequest.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'ReceiveUpdatesRequest', package: const $pb.PackageName(_omitMessageNames ? '' : 'services.updates_svc.v1'), createEmptyInstance: create) + ..a<$fixnum.Int64>(1, _omitFieldNames ? '' : 'revision', $pb.PbFieldType.OU6, defaultOrMaker: $fixnum.Int64.ZERO) ..hasRequiredFields = false ; @@ -185,6 +195,15 @@ class ReceiveUpdatesRequest extends $pb.GeneratedMessage { @$core.pragma('dart2js:noInline') static ReceiveUpdatesRequest getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); static ReceiveUpdatesRequest? _defaultInstance; + + @$pb.TagNumber(1) + $fixnum.Int64 get revision => $_getI64(0); + @$pb.TagNumber(1) + set revision($fixnum.Int64 v) { $_setInt64(0, v); } + @$pb.TagNumber(1) + $core.bool hasRevision() => $_has(0); + @$pb.TagNumber(1) + void clearRevision() => clearField(1); } enum ReceiveUpdatesResponse_Event { @@ -195,10 +214,14 @@ enum ReceiveUpdatesResponse_Event { class ReceiveUpdatesResponse extends $pb.GeneratedMessage { factory ReceiveUpdatesResponse({ + $fixnum.Int64? revision, EntityEvent? entityEvent, DomainEvent? domainEvent, }) { final $result = create(); + if (revision != null) { + $result.revision = revision; + } if (entityEvent != null) { $result.entityEvent = entityEvent; } @@ -212,14 +235,15 @@ class ReceiveUpdatesResponse extends $pb.GeneratedMessage { factory ReceiveUpdatesResponse.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); static const $core.Map<$core.int, ReceiveUpdatesResponse_Event> _ReceiveUpdatesResponse_EventByTag = { - 1 : ReceiveUpdatesResponse_Event.entityEvent, - 2 : ReceiveUpdatesResponse_Event.domainEvent, + 2 : ReceiveUpdatesResponse_Event.entityEvent, + 3 : ReceiveUpdatesResponse_Event.domainEvent, 0 : ReceiveUpdatesResponse_Event.notSet }; static final $pb.BuilderInfo _i = $pb.BuilderInfo(_omitMessageNames ? '' : 'ReceiveUpdatesResponse', package: const $pb.PackageName(_omitMessageNames ? '' : 'services.updates_svc.v1'), createEmptyInstance: create) - ..oo(0, [1, 2]) - ..aOM(1, _omitFieldNames ? '' : 'entityEvent', subBuilder: EntityEvent.create) - ..aOM(2, _omitFieldNames ? '' : 'domainEvent', subBuilder: DomainEvent.create) + ..oo(0, [2, 3]) + ..a<$fixnum.Int64>(1, _omitFieldNames ? '' : 'revision', $pb.PbFieldType.OU6, defaultOrMaker: $fixnum.Int64.ZERO) + ..aOM(2, _omitFieldNames ? '' : 'entityEvent', subBuilder: EntityEvent.create) + ..aOM(3, _omitFieldNames ? '' : 'domainEvent', subBuilder: DomainEvent.create) ..hasRequiredFields = false ; @@ -248,26 +272,35 @@ class ReceiveUpdatesResponse extends $pb.GeneratedMessage { void clearEvent() => clearField($_whichOneof(0)); @$pb.TagNumber(1) - EntityEvent get entityEvent => $_getN(0); - @$pb.TagNumber(1) - set entityEvent(EntityEvent v) { setField(1, v); } + $fixnum.Int64 get revision => $_getI64(0); @$pb.TagNumber(1) - $core.bool hasEntityEvent() => $_has(0); + set revision($fixnum.Int64 v) { $_setInt64(0, v); } @$pb.TagNumber(1) - void clearEntityEvent() => clearField(1); + $core.bool hasRevision() => $_has(0); @$pb.TagNumber(1) - EntityEvent ensureEntityEvent() => $_ensure(0); + void clearRevision() => clearField(1); @$pb.TagNumber(2) - DomainEvent get domainEvent => $_getN(1); + EntityEvent get entityEvent => $_getN(1); @$pb.TagNumber(2) - set domainEvent(DomainEvent v) { setField(2, v); } + set entityEvent(EntityEvent v) { setField(2, v); } @$pb.TagNumber(2) - $core.bool hasDomainEvent() => $_has(1); + $core.bool hasEntityEvent() => $_has(1); @$pb.TagNumber(2) - void clearDomainEvent() => clearField(2); + void clearEntityEvent() => clearField(2); @$pb.TagNumber(2) - DomainEvent ensureDomainEvent() => $_ensure(1); + EntityEvent ensureEntityEvent() => $_ensure(1); + + @$pb.TagNumber(3) + DomainEvent get domainEvent => $_getN(2); + @$pb.TagNumber(3) + set domainEvent(DomainEvent v) { setField(3, v); } + @$pb.TagNumber(3) + $core.bool hasDomainEvent() => $_has(2); + @$pb.TagNumber(3) + void clearDomainEvent() => clearField(3); + @$pb.TagNumber(3) + DomainEvent ensureDomainEvent() => $_ensure(2); } diff --git a/gen/dart/lib/services/updates_svc/v1/updates_svc.pbjson.dart b/gen/dart/lib/services/updates_svc/v1/updates_svc.pbjson.dart index 4abff0acd..f426cb6d5 100644 --- a/gen/dart/lib/services/updates_svc/v1/updates_svc.pbjson.dart +++ b/gen/dart/lib/services/updates_svc/v1/updates_svc.pbjson.dart @@ -45,18 +45,26 @@ final $typed_data.Uint8List domainEventDescriptor = $convert.base64Decode( @$core.Deprecated('Use receiveUpdatesRequestDescriptor instead') const ReceiveUpdatesRequest$json = { '1': 'ReceiveUpdatesRequest', + '2': [ + {'1': 'revision', '3': 1, '4': 1, '5': 4, '9': 0, '10': 'revision', '17': true}, + ], + '8': [ + {'1': '_revision'}, + ], }; /// Descriptor for `ReceiveUpdatesRequest`. Decode as a `google.protobuf.DescriptorProto`. final $typed_data.Uint8List receiveUpdatesRequestDescriptor = $convert.base64Decode( - 'ChVSZWNlaXZlVXBkYXRlc1JlcXVlc3Q='); + 'ChVSZWNlaXZlVXBkYXRlc1JlcXVlc3QSHwoIcmV2aXNpb24YASABKARIAFIIcmV2aXNpb26IAQ' + 'FCCwoJX3JldmlzaW9u'); @$core.Deprecated('Use receiveUpdatesResponseDescriptor instead') const ReceiveUpdatesResponse$json = { '1': 'ReceiveUpdatesResponse', '2': [ - {'1': 'entity_event', '3': 1, '4': 1, '5': 11, '6': '.services.updates_svc.v1.EntityEvent', '9': 0, '10': 'entityEvent'}, - {'1': 'domain_event', '3': 2, '4': 1, '5': 11, '6': '.services.updates_svc.v1.DomainEvent', '9': 0, '10': 'domainEvent'}, + {'1': 'revision', '3': 1, '4': 1, '5': 4, '10': 'revision'}, + {'1': 'entity_event', '3': 2, '4': 1, '5': 11, '6': '.services.updates_svc.v1.EntityEvent', '9': 0, '10': 'entityEvent'}, + {'1': 'domain_event', '3': 3, '4': 1, '5': 11, '6': '.services.updates_svc.v1.DomainEvent', '9': 0, '10': 'domainEvent'}, ], '8': [ {'1': 'event'}, @@ -65,8 +73,8 @@ const ReceiveUpdatesResponse$json = { /// Descriptor for `ReceiveUpdatesResponse`. Decode as a `google.protobuf.DescriptorProto`. final $typed_data.Uint8List receiveUpdatesResponseDescriptor = $convert.base64Decode( - 'ChZSZWNlaXZlVXBkYXRlc1Jlc3BvbnNlEkkKDGVudGl0eV9ldmVudBgBIAEoCzIkLnNlcnZpY2' - 'VzLnVwZGF0ZXNfc3ZjLnYxLkVudGl0eUV2ZW50SABSC2VudGl0eUV2ZW50EkkKDGRvbWFpbl9l' - 'dmVudBgCIAEoCzIkLnNlcnZpY2VzLnVwZGF0ZXNfc3ZjLnYxLkRvbWFpbkV2ZW50SABSC2RvbW' - 'FpbkV2ZW50QgcKBWV2ZW50'); + 'ChZSZWNlaXZlVXBkYXRlc1Jlc3BvbnNlEhoKCHJldmlzaW9uGAEgASgEUghyZXZpc2lvbhJJCg' + 'xlbnRpdHlfZXZlbnQYAiABKAsyJC5zZXJ2aWNlcy51cGRhdGVzX3N2Yy52MS5FbnRpdHlFdmVu' + 'dEgAUgtlbnRpdHlFdmVudBJJCgxkb21haW5fZXZlbnQYAyABKAsyJC5zZXJ2aWNlcy51cGRhdG' + 'VzX3N2Yy52MS5Eb21haW5FdmVudEgAUgtkb21haW5FdmVudEIHCgVldmVudA=='); diff --git a/gen/go/services/updates_svc/v1/updates_svc.pb.go b/gen/go/services/updates_svc/v1/updates_svc.pb.go index 5055eb2d3..0e228a194 100644 --- a/gen/go/services/updates_svc/v1/updates_svc.pb.go +++ b/gen/go/services/updates_svc/v1/updates_svc.pb.go @@ -142,6 +142,8 @@ type ReceiveUpdatesRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + + Revision *uint64 `protobuf:"varint,1,opt,name=revision,proto3,oneof" json:"revision,omitempty"` } func (x *ReceiveUpdatesRequest) Reset() { @@ -176,11 +178,19 @@ func (*ReceiveUpdatesRequest) Descriptor() ([]byte, []int) { return file_services_updates_svc_v1_updates_svc_proto_rawDescGZIP(), []int{2} } +func (x *ReceiveUpdatesRequest) GetRevision() uint64 { + if x != nil && x.Revision != nil { + return *x.Revision + } + return 0 +} + type ReceiveUpdatesResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + Revision uint64 `protobuf:"varint,1,opt,name=revision,proto3" json:"revision,omitempty"` // Types that are assignable to Event: // // *ReceiveUpdatesResponse_EntityEvent @@ -220,6 +230,13 @@ func (*ReceiveUpdatesResponse) Descriptor() ([]byte, []int) { return file_services_updates_svc_v1_updates_svc_proto_rawDescGZIP(), []int{3} } +func (x *ReceiveUpdatesResponse) GetRevision() uint64 { + if x != nil { + return x.Revision + } + return 0 +} + func (m *ReceiveUpdatesResponse) GetEvent() isReceiveUpdatesResponse_Event { if m != nil { return m.Event @@ -246,11 +263,11 @@ type isReceiveUpdatesResponse_Event interface { } type ReceiveUpdatesResponse_EntityEvent struct { - EntityEvent *EntityEvent `protobuf:"bytes,1,opt,name=entity_event,json=entityEvent,proto3,oneof"` + EntityEvent *EntityEvent `protobuf:"bytes,2,opt,name=entity_event,json=entityEvent,proto3,oneof"` } type ReceiveUpdatesResponse_DomainEvent struct { - DomainEvent *DomainEvent `protobuf:"bytes,2,opt,name=domain_event,json=domainEvent,proto3,oneof"` + DomainEvent *DomainEvent `protobuf:"bytes,3,opt,name=domain_event,json=domainEvent,proto3,oneof"` } func (*ReceiveUpdatesResponse_EntityEvent) isReceiveUpdatesResponse_Event() {} @@ -276,42 +293,46 @@ var file_services_updates_svc_v1_updates_svc_proto_rawDesc = []byte{ 0x52, 0x0b, 0x61, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x65, 0x49, 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x61, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x61, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x65, - 0x54, 0x79, 0x70, 0x65, 0x22, 0x17, 0x0a, 0x15, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x55, - 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xb7, 0x01, - 0x0a, 0x16, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x49, 0x0a, 0x0c, 0x65, 0x6e, 0x74, 0x69, - 0x74, 0x79, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, - 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, - 0x73, 0x5f, 0x73, 0x76, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x45, - 0x76, 0x65, 0x6e, 0x74, 0x48, 0x00, 0x52, 0x0b, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x45, 0x76, - 0x65, 0x6e, 0x74, 0x12, 0x49, 0x0a, 0x0c, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x5f, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x73, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x73, 0x2e, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x5f, 0x73, 0x76, 0x63, - 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, - 0x00, 0x52, 0x0b, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x42, 0x07, - 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x32, 0x87, 0x01, 0x0a, 0x0e, 0x55, 0x70, 0x64, 0x61, - 0x74, 0x65, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x75, 0x0a, 0x0e, 0x52, 0x65, - 0x63, 0x65, 0x69, 0x76, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x12, 0x2e, 0x2e, 0x73, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x5f, - 0x73, 0x76, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x55, 0x70, - 0x64, 0x61, 0x74, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2f, 0x2e, 0x73, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x5f, - 0x73, 0x76, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x55, 0x70, - 0x64, 0x61, 0x74, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, - 0x01, 0x42, 0xc5, 0x01, 0x0a, 0x1b, 0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x73, 0x2e, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x5f, 0x73, 0x76, 0x63, 0x2e, 0x76, - 0x31, 0x42, 0x0f, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x53, 0x76, 0x63, 0x50, 0x72, 0x6f, - 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x1b, 0x67, 0x65, 0x6e, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x73, 0x2f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x2d, 0x73, 0x76, 0x63, 0x2f, 0x76, - 0x31, 0xa2, 0x02, 0x03, 0x53, 0x55, 0x58, 0xaa, 0x02, 0x16, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x73, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x53, 0x76, 0x63, 0x2e, 0x56, 0x31, - 0xca, 0x02, 0x16, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x5c, 0x55, 0x70, 0x64, 0x61, - 0x74, 0x65, 0x73, 0x53, 0x76, 0x63, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x22, 0x53, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x73, 0x5c, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x53, 0x76, 0x63, 0x5c, - 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, - 0x18, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x3a, 0x3a, 0x55, 0x70, 0x64, 0x61, 0x74, - 0x65, 0x73, 0x53, 0x76, 0x63, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x54, 0x79, 0x70, 0x65, 0x22, 0x45, 0x0a, 0x15, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, + 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x48, + 0x00, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x88, 0x01, 0x01, 0x42, 0x0b, + 0x0a, 0x09, 0x5f, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0xd3, 0x01, 0x0a, 0x16, + 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, + 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, + 0x6f, 0x6e, 0x12, 0x49, 0x0a, 0x0c, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x5f, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x73, 0x2e, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x5f, 0x73, 0x76, 0x63, 0x2e, + 0x76, 0x31, 0x2e, 0x45, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x00, + 0x52, 0x0b, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x49, 0x0a, + 0x0c, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x75, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x5f, 0x73, 0x76, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x6f, + 0x6d, 0x61, 0x69, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x00, 0x52, 0x0b, 0x64, 0x6f, 0x6d, + 0x61, 0x69, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x42, 0x07, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x32, 0x87, 0x01, 0x0a, 0x0e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x53, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x12, 0x75, 0x0a, 0x0e, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x12, 0x2e, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x73, 0x2e, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x5f, 0x73, 0x76, 0x63, 0x2e, 0x76, 0x31, + 0x2e, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2f, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x73, 0x2e, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x5f, 0x73, 0x76, 0x63, 0x2e, 0x76, 0x31, + 0x2e, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0xc5, 0x01, 0x0a, 0x1b, + 0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x75, 0x70, 0x64, + 0x61, 0x74, 0x65, 0x73, 0x5f, 0x73, 0x76, 0x63, 0x2e, 0x76, 0x31, 0x42, 0x0f, 0x55, 0x70, 0x64, + 0x61, 0x74, 0x65, 0x73, 0x53, 0x76, 0x63, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x1b, + 0x67, 0x65, 0x6e, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x75, 0x70, 0x64, + 0x61, 0x74, 0x65, 0x73, 0x2d, 0x73, 0x76, 0x63, 0x2f, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x53, 0x55, + 0x58, 0xaa, 0x02, 0x16, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x55, 0x70, 0x64, + 0x61, 0x74, 0x65, 0x73, 0x53, 0x76, 0x63, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x16, 0x53, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x73, 0x5c, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x53, 0x76, 0x63, + 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x22, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x5c, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x53, 0x76, 0x63, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, + 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x18, 0x53, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x73, 0x3a, 0x3a, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x53, 0x76, 0x63, 0x3a, + 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -400,6 +421,7 @@ func file_services_updates_svc_v1_updates_svc_proto_init() { } } } + file_services_updates_svc_v1_updates_svc_proto_msgTypes[2].OneofWrappers = []interface{}{} file_services_updates_svc_v1_updates_svc_proto_msgTypes[3].OneofWrappers = []interface{}{ (*ReceiveUpdatesResponse_EntityEvent)(nil), (*ReceiveUpdatesResponse_DomainEvent)(nil), diff --git a/gen/ts/services/updates_svc/v1/updates_svc_pb.d.ts b/gen/ts/services/updates_svc/v1/updates_svc_pb.d.ts index fac0d407f..29e342ec5 100644 --- a/gen/ts/services/updates_svc/v1/updates_svc_pb.d.ts +++ b/gen/ts/services/updates_svc/v1/updates_svc_pb.d.ts @@ -51,6 +51,11 @@ export namespace DomainEvent { } export class ReceiveUpdatesRequest extends jspb.Message { + getRevision(): number; + setRevision(value: number): ReceiveUpdatesRequest; + hasRevision(): boolean; + clearRevision(): ReceiveUpdatesRequest; + serializeBinary(): Uint8Array; toObject(includeInstance?: boolean): ReceiveUpdatesRequest.AsObject; static toObject(includeInstance: boolean, msg: ReceiveUpdatesRequest): ReceiveUpdatesRequest.AsObject; @@ -61,10 +66,19 @@ export class ReceiveUpdatesRequest extends jspb.Message { export namespace ReceiveUpdatesRequest { export type AsObject = { + revision?: number, + } + + export enum RevisionCase { + _REVISION_NOT_SET = 0, + REVISION = 1, } } export class ReceiveUpdatesResponse extends jspb.Message { + getRevision(): number; + setRevision(value: number): ReceiveUpdatesResponse; + getEntityEvent(): EntityEvent | undefined; setEntityEvent(value?: EntityEvent): ReceiveUpdatesResponse; hasEntityEvent(): boolean; @@ -87,14 +101,15 @@ export class ReceiveUpdatesResponse extends jspb.Message { export namespace ReceiveUpdatesResponse { export type AsObject = { + revision: number, entityEvent?: EntityEvent.AsObject, domainEvent?: DomainEvent.AsObject, } export enum EventCase { EVENT_NOT_SET = 0, - ENTITY_EVENT = 1, - DOMAIN_EVENT = 2, + ENTITY_EVENT = 2, + DOMAIN_EVENT = 3, } } diff --git a/gen/ts/services/updates_svc/v1/updates_svc_pb.js b/gen/ts/services/updates_svc/v1/updates_svc_pb.js index efdf6dd06..1cde2db01 100644 --- a/gen/ts/services/updates_svc/v1/updates_svc_pb.js +++ b/gen/ts/services/updates_svc/v1/updates_svc_pb.js @@ -492,7 +492,7 @@ proto.services.updates_svc.v1.ReceiveUpdatesRequest.prototype.toObject = functio */ proto.services.updates_svc.v1.ReceiveUpdatesRequest.toObject = function(includeInstance, msg) { var f, obj = { - + revision: jspb.Message.getFieldWithDefault(msg, 1, 0) }; if (includeInstance) { @@ -529,6 +529,10 @@ proto.services.updates_svc.v1.ReceiveUpdatesRequest.deserializeBinaryFromReader } var field = reader.getFieldNumber(); switch (field) { + case 1: + var value = /** @type {number} */ (reader.readUint64()); + msg.setRevision(value); + break; default: reader.skipField(); break; @@ -558,6 +562,49 @@ proto.services.updates_svc.v1.ReceiveUpdatesRequest.prototype.serializeBinary = */ proto.services.updates_svc.v1.ReceiveUpdatesRequest.serializeBinaryToWriter = function(message, writer) { var f = undefined; + f = /** @type {number} */ (jspb.Message.getField(message, 1)); + if (f != null) { + writer.writeUint64( + 1, + f + ); + } +}; + + +/** + * optional uint64 revision = 1; + * @return {number} + */ +proto.services.updates_svc.v1.ReceiveUpdatesRequest.prototype.getRevision = function() { + return /** @type {number} */ (jspb.Message.getFieldWithDefault(this, 1, 0)); +}; + + +/** + * @param {number} value + * @return {!proto.services.updates_svc.v1.ReceiveUpdatesRequest} returns this + */ +proto.services.updates_svc.v1.ReceiveUpdatesRequest.prototype.setRevision = function(value) { + return jspb.Message.setField(this, 1, value); +}; + + +/** + * Clears the field making it undefined. + * @return {!proto.services.updates_svc.v1.ReceiveUpdatesRequest} returns this + */ +proto.services.updates_svc.v1.ReceiveUpdatesRequest.prototype.clearRevision = function() { + return jspb.Message.setField(this, 1, undefined); +}; + + +/** + * Returns whether this field is set. + * @return {boolean} + */ +proto.services.updates_svc.v1.ReceiveUpdatesRequest.prototype.hasRevision = function() { + return jspb.Message.getField(this, 1) != null; }; @@ -570,15 +617,15 @@ proto.services.updates_svc.v1.ReceiveUpdatesRequest.serializeBinaryToWriter = fu * @private {!Array>} * @const */ -proto.services.updates_svc.v1.ReceiveUpdatesResponse.oneofGroups_ = [[1,2]]; +proto.services.updates_svc.v1.ReceiveUpdatesResponse.oneofGroups_ = [[2,3]]; /** * @enum {number} */ proto.services.updates_svc.v1.ReceiveUpdatesResponse.EventCase = { EVENT_NOT_SET: 0, - ENTITY_EVENT: 1, - DOMAIN_EVENT: 2 + ENTITY_EVENT: 2, + DOMAIN_EVENT: 3 }; /** @@ -619,6 +666,7 @@ proto.services.updates_svc.v1.ReceiveUpdatesResponse.prototype.toObject = functi */ proto.services.updates_svc.v1.ReceiveUpdatesResponse.toObject = function(includeInstance, msg) { var f, obj = { + revision: jspb.Message.getFieldWithDefault(msg, 1, 0), entityEvent: (f = msg.getEntityEvent()) && proto.services.updates_svc.v1.EntityEvent.toObject(includeInstance, f), domainEvent: (f = msg.getDomainEvent()) && proto.services.updates_svc.v1.DomainEvent.toObject(includeInstance, f) }; @@ -658,11 +706,15 @@ proto.services.updates_svc.v1.ReceiveUpdatesResponse.deserializeBinaryFromReader var field = reader.getFieldNumber(); switch (field) { case 1: + var value = /** @type {number} */ (reader.readUint64()); + msg.setRevision(value); + break; + case 2: var value = new proto.services.updates_svc.v1.EntityEvent; reader.readMessage(value,proto.services.updates_svc.v1.EntityEvent.deserializeBinaryFromReader); msg.setEntityEvent(value); break; - case 2: + case 3: var value = new proto.services.updates_svc.v1.DomainEvent; reader.readMessage(value,proto.services.updates_svc.v1.DomainEvent.deserializeBinaryFromReader); msg.setDomainEvent(value); @@ -696,10 +748,17 @@ proto.services.updates_svc.v1.ReceiveUpdatesResponse.prototype.serializeBinary = */ proto.services.updates_svc.v1.ReceiveUpdatesResponse.serializeBinaryToWriter = function(message, writer) { var f = undefined; + f = message.getRevision(); + if (f !== 0) { + writer.writeUint64( + 1, + f + ); + } f = message.getEntityEvent(); if (f != null) { writer.writeMessage( - 1, + 2, f, proto.services.updates_svc.v1.EntityEvent.serializeBinaryToWriter ); @@ -707,7 +766,7 @@ proto.services.updates_svc.v1.ReceiveUpdatesResponse.serializeBinaryToWriter = f f = message.getDomainEvent(); if (f != null) { writer.writeMessage( - 2, + 3, f, proto.services.updates_svc.v1.DomainEvent.serializeBinaryToWriter ); @@ -716,12 +775,30 @@ proto.services.updates_svc.v1.ReceiveUpdatesResponse.serializeBinaryToWriter = f /** - * optional EntityEvent entity_event = 1; + * optional uint64 revision = 1; + * @return {number} + */ +proto.services.updates_svc.v1.ReceiveUpdatesResponse.prototype.getRevision = function() { + return /** @type {number} */ (jspb.Message.getFieldWithDefault(this, 1, 0)); +}; + + +/** + * @param {number} value + * @return {!proto.services.updates_svc.v1.ReceiveUpdatesResponse} returns this + */ +proto.services.updates_svc.v1.ReceiveUpdatesResponse.prototype.setRevision = function(value) { + return jspb.Message.setProto3IntField(this, 1, value); +}; + + +/** + * optional EntityEvent entity_event = 2; * @return {?proto.services.updates_svc.v1.EntityEvent} */ proto.services.updates_svc.v1.ReceiveUpdatesResponse.prototype.getEntityEvent = function() { return /** @type{?proto.services.updates_svc.v1.EntityEvent} */ ( - jspb.Message.getWrapperField(this, proto.services.updates_svc.v1.EntityEvent, 1)); + jspb.Message.getWrapperField(this, proto.services.updates_svc.v1.EntityEvent, 2)); }; @@ -730,7 +807,7 @@ proto.services.updates_svc.v1.ReceiveUpdatesResponse.prototype.getEntityEvent = * @return {!proto.services.updates_svc.v1.ReceiveUpdatesResponse} returns this */ proto.services.updates_svc.v1.ReceiveUpdatesResponse.prototype.setEntityEvent = function(value) { - return jspb.Message.setOneofWrapperField(this, 1, proto.services.updates_svc.v1.ReceiveUpdatesResponse.oneofGroups_[0], value); + return jspb.Message.setOneofWrapperField(this, 2, proto.services.updates_svc.v1.ReceiveUpdatesResponse.oneofGroups_[0], value); }; @@ -748,17 +825,17 @@ proto.services.updates_svc.v1.ReceiveUpdatesResponse.prototype.clearEntityEvent * @return {boolean} */ proto.services.updates_svc.v1.ReceiveUpdatesResponse.prototype.hasEntityEvent = function() { - return jspb.Message.getField(this, 1) != null; + return jspb.Message.getField(this, 2) != null; }; /** - * optional DomainEvent domain_event = 2; + * optional DomainEvent domain_event = 3; * @return {?proto.services.updates_svc.v1.DomainEvent} */ proto.services.updates_svc.v1.ReceiveUpdatesResponse.prototype.getDomainEvent = function() { return /** @type{?proto.services.updates_svc.v1.DomainEvent} */ ( - jspb.Message.getWrapperField(this, proto.services.updates_svc.v1.DomainEvent, 2)); + jspb.Message.getWrapperField(this, proto.services.updates_svc.v1.DomainEvent, 3)); }; @@ -767,7 +844,7 @@ proto.services.updates_svc.v1.ReceiveUpdatesResponse.prototype.getDomainEvent = * @return {!proto.services.updates_svc.v1.ReceiveUpdatesResponse} returns this */ proto.services.updates_svc.v1.ReceiveUpdatesResponse.prototype.setDomainEvent = function(value) { - return jspb.Message.setOneofWrapperField(this, 2, proto.services.updates_svc.v1.ReceiveUpdatesResponse.oneofGroups_[0], value); + return jspb.Message.setOneofWrapperField(this, 3, proto.services.updates_svc.v1.ReceiveUpdatesResponse.oneofGroups_[0], value); }; @@ -785,7 +862,7 @@ proto.services.updates_svc.v1.ReceiveUpdatesResponse.prototype.clearDomainEvent * @return {boolean} */ proto.services.updates_svc.v1.ReceiveUpdatesResponse.prototype.hasDomainEvent = function() { - return jspb.Message.getField(this, 2) != null; + return jspb.Message.getField(this, 3) != null; }; diff --git a/libs/common/auth/auth.go b/libs/common/auth/auth.go index cc691f807..a904796f1 100644 --- a/libs/common/auth/auth.go +++ b/libs/common/auth/auth.go @@ -13,6 +13,7 @@ import ( "google.golang.org/grpc/status" "hwutil" "telemetry" + "time" ) // to avoid ambiguity please read: https://wiki.helpwave.de/doc/keycloak-jedzCcERwF @@ -26,9 +27,11 @@ var ( oauthConfig *oauth2.Config Verifier *oidc.IDTokenVerifier authSetupDone bool + FakeTokenValidFor = time.Second * 30 ) type ClaimsKey struct{} +type TokenExpires struct{} type UserIDKey struct{} type OrganizationIDKey struct{} @@ -140,49 +143,50 @@ func (t IDTokenClaims) AsExpected() error { // VerifyIDToken verifies the correctness of the accessToken and returns its claim. // The claim is checked to be as expected. // Service must be set up with auth! -func VerifyIDToken(ctx context.Context, token string) (*IDTokenClaims, error) { +func VerifyIDToken(ctx context.Context, token string) (*IDTokenClaims, *time.Time, error) { // Verify() verifies formal validity, proper signing, usage of the correct keys, ... // and still exposes .Claims() for us to access non-standard ID token claims idToken, err := GetIDTokenVerifier(ctx).Verify(context.Background(), token) if err != nil { - return nil, fmt.Errorf("getIDTokenVerifier: verify failed: %w", err) + return nil, nil, fmt.Errorf("getIDTokenVerifier: verify failed: %w", err) } // now get the claims claims := IDTokenClaims{} if err = idToken.Claims(&claims); err != nil { - return nil, fmt.Errorf("getIDTokenVerifier: could not get claims: %w", err) + return nil, nil, fmt.Errorf("getIDTokenVerifier: could not get claims: %w", err) } // and check that they are in the expected format if err = claims.AsExpected(); err != nil { - return nil, fmt.Errorf("getIDTokenVerifier: claims are not as expected: %w", err) + return nil, nil, fmt.Errorf("getIDTokenVerifier: claims are not as expected: %w", err) } - return &claims, nil + return &claims, &idToken.Expiry, nil } // VerifyFakeToken accepts a Base64 encoded json structure with the schema of IDTokenClaims -func VerifyFakeToken(ctx context.Context, token string) (*IDTokenClaims, error) { +func VerifyFakeToken(ctx context.Context, token string) (*IDTokenClaims, *time.Time, error) { log := zlog.Ctx(ctx) plainToken, err := base64.StdEncoding.DecodeString(token) if err != nil { - return nil, fmt.Errorf("VerifyFakeToken: cant decode fake token: %w", err) + return nil, nil, fmt.Errorf("VerifyFakeToken: cant decode fake token: %w", err) } claims := IDTokenClaims{} if err := hwutil.ParseValidJson(plainToken, &claims); err != nil { - return nil, fmt.Errorf("VerifyFakeToken: cant parse json: %w", err) + return nil, nil, fmt.Errorf("VerifyFakeToken: cant parse json: %w", err) } if err = claims.AsExpected(); err != nil { - return nil, fmt.Errorf("VerifyFakeToken: claims not as expected: %w", err) + return nil, nil, fmt.Errorf("VerifyFakeToken: claims not as expected: %w", err) } log.Warn().Interface("claims", claims).Msg("fake token was verified") - return &claims, err + expiry := time.Now().Add(FakeTokenValidFor) + return &claims, &expiry, err } func ContextWithUserID(ctx context.Context, userID uuid.UUID) context.Context { @@ -206,6 +210,16 @@ func GetAuthClaims(ctx context.Context) (*IDTokenClaims, error) { } } +// SessionValidUntil returns time.Time when the session gets marked as expired +func SessionValidUntil(ctx context.Context) (time.Time, error) { + res, ok := ctx.Value(TokenExpires{}).(*time.Time) + if !ok { + return time.Now(), status.Error(codes.Internal, "tokenExpires not in context, set up auth") + } else { + return *res, nil + } +} + func GetUserID(ctx context.Context) (uuid.UUID, error) { res, ok := ctx.Value(UserIDKey{}).(uuid.UUID) if !ok { diff --git a/libs/common/hwgrpc/auth_interceptor.go b/libs/common/hwgrpc/auth_interceptor.go index de52ecd9b..29a0a48b2 100644 --- a/libs/common/hwgrpc/auth_interceptor.go +++ b/libs/common/hwgrpc/auth_interceptor.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "telemetry" + "time" ) func UnaryAuthInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, next grpc.UnaryHandler) (any, error) { @@ -51,15 +52,18 @@ func authInterceptor(ctx context.Context) (context.Context, error) { return nil, status.Errorf(codes.Unauthenticated, "no auth token: %v", err) } - var claims *auth.IDTokenClaims + var ( + claims *auth.IDTokenClaims + tokenExpires *time.Time + ) if auth.IsOnlyFakeAuthEnabled() { log.Warn(). Msg("only fake auth is enabled! no attempt verifying token. falling back to fake token instead") - claims, err = auth.VerifyFakeToken(ctx, token) + claims, tokenExpires, err = auth.VerifyFakeToken(ctx, token) } else { // verify token -> if fakeToken is used claims will be nil and we will get an error - claims, err = auth.VerifyIDToken(ctx, token) + claims, tokenExpires, err = auth.VerifyIDToken(ctx, token) } // If auth.IsInsecureFakeTokenEnabled() is true and Mode is development, @@ -68,16 +72,19 @@ func authInterceptor(ctx context.Context) (context.Context, error) { // ONLY FOR NON-PUBLIC DEVELOPMENT AND STAGING ENVIRONMENTS if claims == nil && err != nil && auth.IsInsecureFakeTokenEnabled() { log.Warn().Msg("could not verify token, falling back to fake token instead") - claims, err = auth.VerifyFakeToken(ctx, token) + claims, tokenExpires, err = auth.VerifyFakeToken(ctx, token) } - if err != nil || claims == nil { + if err != nil || claims == nil || tokenExpires == nil { return nil, status.Errorf(codes.Unauthenticated, "invalid auth token: %v", err) } // attach claims to the context, so we can get it in a handler using GetAuthClaims() ctx = context.WithValue(ctx, auth.ClaimsKey{}, claims) + // attach token expires time to the context + ctx = context.WithValue(ctx, auth.TokenExpires{}, tokenExpires) + // parse userID userID, err := uuid.Parse(claims.Sub) if err != nil { diff --git a/libs/hwtesting/containers.go b/libs/hwtesting/containers.go index 2c22627c1..44f995af5 100644 --- a/libs/hwtesting/containers.go +++ b/libs/hwtesting/containers.go @@ -4,7 +4,6 @@ import ( "context" _ "github.com/golang-migrate/migrate/v4/database/postgres" _ "github.com/golang-migrate/migrate/v4/source/file" - "hwutil" "sync" "time" ) diff --git a/proto/services/updates_svc/v1/updates_svc.proto b/proto/services/updates_svc/v1/updates_svc.proto index 127d60ec5..322858d40 100644 --- a/proto/services/updates_svc/v1/updates_svc.proto +++ b/proto/services/updates_svc/v1/updates_svc.proto @@ -20,7 +20,7 @@ message DomainEvent { // message ReceiveUpdatesRequest { - + optional uint64 revision = 1; } // @@ -28,9 +28,10 @@ message ReceiveUpdatesRequest { // message ReceiveUpdatesResponse { + uint64 revision = 1; oneof event { - EntityEvent entity_event = 1; - DomainEvent domain_event = 2; + EntityEvent entity_event = 2; + DomainEvent domain_event = 3; } } diff --git a/services/updates-svc/cmd/service/main.go b/services/updates-svc/cmd/service/main.go index d1c6122e6..a7d1c2d16 100644 --- a/services/updates-svc/cmd/service/main.go +++ b/services/updates-svc/cmd/service/main.go @@ -13,7 +13,6 @@ const ServiceName = "updates-svc" func Main(version string, ready func()) { ctx := common.Setup(ServiceName, version, common.WithAuth()) - // authz := hwspicedb.NewSpiceDBAuthZ() eventStore := eventstoredb.SetupEventStoreByEnv() common.StartNewGRPCServer(ctx, common.ResolveAddrFromEnv(), func(server *daprd.Server) { diff --git a/services/updates-svc/go.mod b/services/updates-svc/go.mod index 8633e9e8a..7f61ae37d 100644 --- a/services/updates-svc/go.mod +++ b/services/updates-svc/go.mod @@ -21,13 +21,11 @@ require ( github.com/EventStore/EventStore-Client-Go/v4 v4.1.0 github.com/dapr/go-sdk v1.11.0 github.com/golang-migrate/migrate/v4 v4.18.1 - github.com/google/uuid v1.6.0 github.com/rs/zerolog v1.33.0 github.com/stretchr/testify v1.9.0 - google.golang.org/protobuf v1.34.2 hwes v0.0.0 hwtesting v0.0.0-00010101000000-000000000000 - hwutil v0.0.0 + telemetry v0.0.0 ) require ( @@ -60,7 +58,7 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.22.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.4 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect @@ -123,8 +121,9 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/grpc v1.66.1 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect hwlocale v0.0.0 // indirect - telemetry v0.0.0 // indirect + hwutil v0.0.0 // indirect ) diff --git a/services/updates-svc/go.sum b/services/updates-svc/go.sum index 243c1e1f6..c8ad93653 100644 --- a/services/updates-svc/go.sum +++ b/services/updates-svc/go.sum @@ -89,8 +89,6 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-migrate/migrate/v4 v4.18.1 h1:JML/k+t4tpHCpQTCAD62Nu43NUFzHY4CV3uAuvHGC+Y= github.com/golang-migrate/migrate/v4 v4.18.1/go.mod h1:HAX6m3sQgcdO81tdjn5exv20+3Kb13cmGli1hrD6hks= -github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= diff --git a/services/updates-svc/internal/updates/grpc.go b/services/updates-svc/internal/updates/grpc.go index 4c100d63d..34070c571 100644 --- a/services/updates-svc/internal/updates/grpc.go +++ b/services/updates-svc/internal/updates/grpc.go @@ -1,9 +1,13 @@ package updates import ( + "common/auth" + "context" pb "gen/services/updates_svc/v1" "github.com/EventStore/EventStore-Client-Go/v4/esdb" "hwes" + "telemetry" + "time" ) type UpdatesGrpcService struct { @@ -18,13 +22,51 @@ func NewUpdatesGrpcService(es *esdb.Client) *UpdatesGrpcService { } func (s *UpdatesGrpcService) ReceiveUpdates(req *pb.ReceiveUpdatesRequest, stream pb.UpdatesService_ReceiveUpdatesServer) error { - ctx := stream.Context() + ctx, span, log := telemetry.StartSpan(stream.Context(), "updates.ReceiveUpdates") + defer span.End() + + tokenExpires, err := auth.SessionValidUntil(ctx) + if err != nil { + return err + } + + tokenExpiresIn := time.Until(tokenExpires) + + ctx, cancel := context.WithCancel(ctx) + // Close stream after session is invalid + time.AfterFunc(tokenExpiresIn, func() { + log.Debug().Msg("closing stream, token expired") + cancel() + }) + + log.Debug(). + // We are using .Str() instead of .Dur() to enhance human readability in the logs + // .Dur() -> 210000 + // .Str() -> 3m30s + Str("closeStreamIn", tokenExpiresIn.String()). + Msg("will close session when token expires") + + organizationID, err := auth.GetOrganizationID(ctx) + if err != nil { + return err + } esSubscribeToAllOptions := esdb.SubscribeToAllOptions{ From: esdb.End{}, Filter: esdb.ExcludeSystemEventsFilter(), } + if req.Revision != nil { + log.Debug(). + Uint64("revision", *req.Revision). + Msg("revision provided to begin stream") + + esSubscribeToAllOptions.From = esdb.Position{ + Commit: *req.Revision, + Prepare: *req.Revision, + } + } + esStream, err := s.es.SubscribeToAll(ctx, esSubscribeToAllOptions) if err != nil { return err @@ -42,22 +84,31 @@ func (s *UpdatesGrpcService) ReceiveUpdates(req *pb.ReceiveUpdatesRequest, strea if esdbEvent.SubscriptionDropped != nil { esStream.Close() + log.Debug().Msg("subscription dropped, close stream") break } if esdbEvent.CaughtUp != nil { + log.Debug().Msg("caught up, continue") continue } if esdbEvent.EventAppeared == nil || esdbEvent.EventAppeared.Event == nil { + log.Debug().Msg("no event in esdb event, continue") continue } event, err := hwes.NewEventFromRecordedEvent(esdbEvent.EventAppeared.Event) if err != nil { + log.Error().Err(err).Msg("cannot create new event from recorded event") return err } + // Only handle events for the users organization + if event.OrganizationID != nil && *event.OrganizationID != organizationID { + continue + } + domainEvent := &pb.DomainEvent{ EventId: event.EventID.String(), EventType: event.EventType, @@ -66,12 +117,14 @@ func (s *UpdatesGrpcService) ReceiveUpdates(req *pb.ReceiveUpdatesRequest, strea } res := &pb.ReceiveUpdatesResponse{ + Revision: *esdbEvent.EventAppeared.Commit, Event: &pb.ReceiveUpdatesResponse_DomainEvent{ DomainEvent: domainEvent, }, } if err := stream.Send(res); err != nil { + log.Error().Err(err).Msg("cannot send on stream") return err } } diff --git a/services/updates-svc/stories/setup_test.go b/services/updates-svc/stories/setup_test.go index f4ffe9d12..74529fa67 100644 --- a/services/updates-svc/stories/setup_test.go +++ b/services/updates-svc/stories/setup_test.go @@ -19,19 +19,16 @@ func TestMain(m *testing.M) { ctx, cancel := context.WithCancel(context.Background()) zlog.Info().Msg("starting containers") - endpoints, teardownContainers := hwtesting.StartContainers(ctx, hwtesting.Eventstore, hwtesting.Spice) + endpoints, teardownContainers := hwtesting.StartContainers(ctx, hwtesting.Eventstore) eventstoreEndpoint := endpoints.Get(hwtesting.Eventstore) - spiceEndpoint := endpoints.Get(hwtesting.Spice) zlog.Info(). Str("eventstoreEndpoint", eventstoreEndpoint). - Str("spiceEndpoint", spiceEndpoint). Msg("containers are up") // prepare env hwtesting.SetCommonEnv() hwtesting.SetEventstoreEnv(eventstoreEndpoint) - hwtesting.SetSpiceEnv(spiceEndpoint) // TODO: spice migrations (PR #812) diff --git a/services/updates-svc/stories/updates_test.go b/services/updates-svc/stories/updates_test.go index 1034e2383..b1a96de92 100644 --- a/services/updates-svc/stories/updates_test.go +++ b/services/updates-svc/stories/updates_test.go @@ -1,10 +1,13 @@ package stories import ( + "common/auth" "context" pb "gen/services/updates_svc/v1" "github.com/stretchr/testify/assert" + "io" "testing" + "time" ) func TestOpenAndClosingReceiveUpdatesStream(t *testing.T) { @@ -16,3 +19,34 @@ func TestOpenAndClosingReceiveUpdatesStream(t *testing.T) { assert.NoError(t, err) assert.NoError(t, stream.CloseSend()) } + +func TestAutoClosingWhenTokenExpiresReceiveUpdateStream(t *testing.T) { + ctx := context.Background() + updatesClient := updatesServiceClient() + + auth.FakeTokenValidFor = time.Second * 3 + upperBound := auth.FakeTokenValidFor + (time.Second * 1) + lowerBound := auth.FakeTokenValidFor - (time.Second * 1) + + req := &pb.ReceiveUpdatesRequest{} + + timeBeforeOpeningStream := time.Now() + stream, err := updatesClient.ReceiveUpdates(ctx, req) + assert.NoError(t, err) + + ctx, cancel := context.WithTimeout(ctx, auth.FakeTokenValidFor+(time.Second*3)) + defer cancel() + + for { + _, err := stream.Recv() + if err == io.EOF { + elapsedTime := time.Since(timeBeforeOpeningStream) + if elapsedTime > upperBound || elapsedTime < lowerBound { + t.Errorf("Connection timeout is out of bounds (upper bound %s, lower bound %s) with %s", upperBound, lowerBound, elapsedTime) + return + } + return + } + assert.NoError(t, err) + } +}