diff --git a/api/client/events.go b/api/client/events.go index e856a77d26d2..0cce9664d248 100644 --- a/api/client/events.go +++ b/api/client/events.go @@ -254,6 +254,10 @@ func EventToGRPC(in types.Event) (*proto.Event, error) { out.Resource = &proto.Event_WindowsDesktop{ WindowsDesktop: r, } + case *types.DynamicWindowsDesktopV1: + out.Resource = &proto.Event_DynamicWindowsDesktop{ + DynamicWindowsDesktop: r, + } case *types.InstallerV1: out.Resource = &proto.Event_Installer{ Installer: r, @@ -444,6 +448,9 @@ func EventFromGRPC(in *proto.Event) (*types.Event, error) { } else if r := in.GetWindowsDesktop(); r != nil { out.Resource = r return &out, nil + } else if r := in.GetDynamicWindowsDesktop(); r != nil { + out.Resource = r + return &out, nil } else if r := in.GetKubernetesServer(); r != nil { out.Resource = r return &out, nil diff --git a/lib/cache/cache.go b/lib/cache/cache.go index 47678d0b89a4..312705c25819 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -161,6 +161,7 @@ func ForAuth(cfg Config) Config { {Kind: types.KindLock}, {Kind: types.KindWindowsDesktopService}, {Kind: types.KindWindowsDesktop}, + {Kind: types.KindDynamicWindowsDesktop}, {Kind: types.KindKubeServer}, {Kind: types.KindInstaller}, {Kind: types.KindKubernetesCluster}, @@ -233,6 +234,7 @@ func ForProxy(cfg Config) Config { {Kind: types.KindDatabase}, {Kind: types.KindWindowsDesktopService}, {Kind: types.KindWindowsDesktop}, + {Kind: types.KindDynamicWindowsDesktop}, {Kind: types.KindKubeServer}, {Kind: types.KindInstaller}, {Kind: types.KindKubernetesCluster}, @@ -392,6 +394,7 @@ func ForWindowsDesktop(cfg Config) Config { {Kind: types.KindNamespace, Name: apidefaults.Namespace}, {Kind: types.KindWindowsDesktopService}, {Kind: types.KindWindowsDesktop}, + {Kind: types.KindDynamicWindowsDesktop}, } cfg.QueueSize = defaults.WindowsDesktopQueueSize return cfg @@ -520,6 +523,7 @@ type Cache struct { webSessionCache types.WebSessionInterface webTokenCache types.WebTokenInterface windowsDesktopsCache services.WindowsDesktops + dynamicWindowsDesktopsCache services.DynamicWindowsDesktops samlIdPServiceProvidersCache services.SAMLIdPServiceProviders //nolint:revive // Because we want this to be IdP. userGroupsCache services.UserGroups oktaCache services.Okta @@ -690,6 +694,8 @@ type Config struct { WebToken types.WebTokenInterface // WindowsDesktops is a windows desktop service. WindowsDesktops services.WindowsDesktops + // DynamicWindowsDesktops is a dynamic Windows desktop service. + DynamicWindowsDesktops services.DynamicWindowsDesktops // SAMLIdPServiceProviders is a SAML IdP service providers service. SAMLIdPServiceProviders services.SAMLIdPServiceProviders // UserGroups is a user groups service. @@ -993,6 +999,12 @@ func New(config Config) (*Cache, error) { return nil, trace.Wrap(err) } + dynamicDesktopsService, err := local.NewDynamicWindowsDesktopService(config.Backend) + if err != nil { + cancel() + return nil, trace.Wrap(err) + } + cs := &Cache{ ctx: ctx, cancel: cancel, @@ -1019,6 +1031,7 @@ func New(config Config) (*Cache, error) { webSessionCache: identityService.WebSessions(), webTokenCache: identityService.WebTokens(), windowsDesktopsCache: local.NewWindowsDesktopService(config.Backend), + dynamicWindowsDesktopsCache: dynamicDesktopsService, accessMontoringRuleCache: accessMonitoringRuleCache, samlIdPServiceProvidersCache: samlIdPServiceProvidersCache, userGroupsCache: userGroupsCache, @@ -2822,6 +2835,32 @@ func (c *Cache) ListWindowsDesktopServices(ctx context.Context, req types.ListWi return rg.reader.ListWindowsDesktopServices(ctx, req) } +// GetDynamicWindowsDesktop returns registered dynamic Windows desktop by name. +func (c *Cache) GetDynamicWindowsDesktop(ctx context.Context, name string) (types.DynamicWindowsDesktop, error) { + ctx, span := c.Tracer.Start(ctx, "cache/GetDynamicWindowsDesktop") + defer span.End() + + rg, err := readCollectionCache(c, c.collections.dynamicWindowsDesktops) + if err != nil { + return nil, trace.Wrap(err) + } + defer rg.Release() + return rg.reader.GetDynamicWindowsDesktop(ctx, name) +} + +// ListDynamicWindowsDesktops returns all registered dynamic Windows desktop. +func (c *Cache) ListDynamicWindowsDesktops(ctx context.Context, pageSize int, nextPage string) ([]types.DynamicWindowsDesktop, string, error) { + ctx, span := c.Tracer.Start(ctx, "cache/ListDynamicWindowsDesktops") + defer span.End() + + rg, err := readCollectionCache(c, c.collections.dynamicWindowsDesktops) + if err != nil { + return nil, "", trace.Wrap(err) + } + defer rg.Release() + return rg.reader.ListDynamicWindowsDesktops(ctx, pageSize, nextPage) +} + // ListSAMLIdPServiceProviders returns a paginated list of SAML IdP service provider resources. func (c *Cache) ListSAMLIdPServiceProviders(ctx context.Context, pageSize int, nextKey string) ([]types.SAMLIdPServiceProvider, string, error) { ctx, span := c.Tracer.Start(ctx, "cache/ListSAMLIdPServiceProviders") diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index 7a9546bc1762..6bbae8148f23 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -3424,6 +3424,7 @@ func TestCacheWatchKindExistsInEvents(t *testing.T) { types.KindLock: &types.LockV2{}, types.KindWindowsDesktopService: &types.WindowsDesktopServiceV3{}, types.KindWindowsDesktop: &types.WindowsDesktopV3{}, + types.KindDynamicWindowsDesktop: &types.DynamicWindowsDesktopV1{}, types.KindInstaller: &types.InstallerV1{}, types.KindKubernetesCluster: &types.KubernetesClusterV3{}, types.KindSAMLIdPServiceProvider: &types.SAMLIdPServiceProviderV1{}, diff --git a/lib/cache/collections.go b/lib/cache/collections.go index 093d52b4c374..fb7b04850c4c 100644 --- a/lib/cache/collections.go +++ b/lib/cache/collections.go @@ -22,6 +22,7 @@ package cache import ( "context" "fmt" + "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/trace" @@ -258,6 +259,7 @@ type cacheCollections struct { webSessions collectionReader[webSessionGetter] webTokens collectionReader[webTokenGetter] windowsDesktops collectionReader[windowsDesktopsGetter] + dynamicWindowsDesktops collectionReader[dynamicWindowsDesktopsGetter] windowsDesktopServices collectionReader[windowsDesktopServiceGetter] userNotifications collectionReader[notificationGetter] accessGraphSettings collectionReader[accessGraphSettingsGetter] @@ -621,6 +623,15 @@ func setupCollections(c *Cache, watches []types.WatchKind) (*cacheCollections, e watch: watch, } collections.byKind[resourceKind] = collections.windowsDesktops + case types.KindDynamicWindowsDesktop: + if c.WindowsDesktops == nil { + return nil, trace.BadParameter("missing parameter DynamicWindowsDesktops") + } + collections.dynamicWindowsDesktops = &genericCollection[types.DynamicWindowsDesktop, dynamicWindowsDesktopsGetter, dynamicWindowsDesktopsExecutor]{ + cache: c, + watch: watch, + } + collections.byKind[resourceKind] = collections.dynamicWindowsDesktops case types.KindSAMLIdPServiceProvider: if c.SAMLIdPServiceProviders == nil { return nil, trace.BadParameter("missing parameter SAMLIdPServiceProviders") @@ -2318,6 +2329,54 @@ type windowsDesktopsGetter interface { var _ executor[types.WindowsDesktop, windowsDesktopsGetter] = windowsDesktopsExecutor{} +type dynamicWindowsDesktopsExecutor struct{} + +func (dynamicWindowsDesktopsExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]types.DynamicWindowsDesktop, error) { + var desktops []types.DynamicWindowsDesktop + next := "" + for { + d, token, err := cache.dynamicWindowsDesktopsCache.ListDynamicWindowsDesktops(ctx, defaults.MaxIterationLimit, next) + if err != nil { + return nil, err + } + desktops = append(desktops, d...) + if token == "" { + break + } + next = token + } + return desktops, nil +} + +func (dynamicWindowsDesktopsExecutor) upsert(ctx context.Context, cache *Cache, resource types.DynamicWindowsDesktop) error { + _, err := cache.dynamicWindowsDesktopsCache.UpsertDynamicWindowsDesktop(ctx, resource) + return err +} + +func (dynamicWindowsDesktopsExecutor) deleteAll(ctx context.Context, cache *Cache) error { + return cache.dynamicWindowsDesktopsCache.DeleteAllDynamicWindowsDesktops(ctx) +} + +func (dynamicWindowsDesktopsExecutor) delete(ctx context.Context, cache *Cache, resource types.Resource) error { + return cache.dynamicWindowsDesktopsCache.DeleteDynamicWindowsDesktop(ctx, resource.GetName()) +} + +func (dynamicWindowsDesktopsExecutor) isSingleton() bool { return false } + +func (dynamicWindowsDesktopsExecutor) getReader(cache *Cache, cacheOK bool) dynamicWindowsDesktopsGetter { + if cacheOK { + return cache.dynamicWindowsDesktopsCache + } + return cache.Config.DynamicWindowsDesktops +} + +type dynamicWindowsDesktopsGetter interface { + GetDynamicWindowsDesktop(ctx context.Context, name string) (types.DynamicWindowsDesktop, error) + ListDynamicWindowsDesktops(ctx context.Context, pageSize int, nextPage string) ([]types.DynamicWindowsDesktop, string, error) +} + +var _ executor[types.DynamicWindowsDesktop, dynamicWindowsDesktopsGetter] = dynamicWindowsDesktopsExecutor{} + type kubeClusterExecutor struct{} func (kubeClusterExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]types.KubeCluster, error) { diff --git a/lib/services/dynamic_desktop.go b/lib/services/dynamic_desktop.go index 334747b82226..806361e2fbba 100644 --- a/lib/services/dynamic_desktop.go +++ b/lib/services/dynamic_desktop.go @@ -30,16 +30,17 @@ import ( // DynamicWindowsDesktops defines an interface for managing dynamic Windows desktops. type DynamicWindowsDesktops interface { DynamicWindowsDesktopGetter + GetDynamicWindowsDesktop(ctx context.Context, name string) (types.DynamicWindowsDesktop, error) CreateDynamicWindowsDesktop(context.Context, types.DynamicWindowsDesktop) (types.DynamicWindowsDesktop, error) UpdateDynamicWindowsDesktop(context.Context, types.DynamicWindowsDesktop) (types.DynamicWindowsDesktop, error) UpsertDynamicWindowsDesktop(context.Context, types.DynamicWindowsDesktop) (types.DynamicWindowsDesktop, error) DeleteDynamicWindowsDesktop(ctx context.Context, name string) error - ListDynamicWindowsDesktops(ctx context.Context, pageSize int, pageToken string) ([]types.DynamicWindowsDesktop, string, error) + DeleteAllDynamicWindowsDesktops(ctx context.Context) error } // DynamicWindowsDesktopGetter is an interface for fetching DynamicWindowsDesktop resources. type DynamicWindowsDesktopGetter interface { - GetDynamicWindowsDesktop(ctx context.Context, name string) (types.DynamicWindowsDesktop, error) + ListDynamicWindowsDesktops(ctx context.Context, pageSize int, pageToken string) ([]types.DynamicWindowsDesktop, string, error) } // MarshalDynamicWindowsDesktop marshals the DynamicWindowsDesktop resource to JSON. diff --git a/lib/services/local/events.go b/lib/services/local/events.go index 5123421e9f8d..f0ef42d33648 100644 --- a/lib/services/local/events.go +++ b/lib/services/local/events.go @@ -167,6 +167,8 @@ func (e *EventsService) NewWatcher(ctx context.Context, watch types.Watch) (type parser = newWindowsDesktopServicesParser() case types.KindWindowsDesktop: parser = newWindowsDesktopsParser() + case types.KindDynamicWindowsDesktop: + parser = newDynamicWindowsDesktopsParser() case types.KindInstaller: parser = newInstallerParser() case types.KindKubernetesCluster: @@ -1851,6 +1853,43 @@ func (p *windowsDesktopServicesParser) parse(event backend.Event) (types.Resourc } } +func newDynamicWindowsDesktopsParser() *dynamicWindowsDesktopsParser { + return &dynamicWindowsDesktopsParser{ + baseParser: newBaseParser(backend.NewKey(dynamicWindowsDesktopsPrefix, "")), + } +} + +type dynamicWindowsDesktopsParser struct { + baseParser +} + +func (p *dynamicWindowsDesktopsParser) parse(event backend.Event) (types.Resource, error) { + switch event.Type { + case types.OpDelete: + name := event.Item.Key.TrimPrefix(backend.NewKey(dynamicWindowsDesktopsPrefix, "")).String() + if name == "" { + return nil, trace.NotFound("failed parsing %v", event.Item.Key.String()) + } + + return &types.ResourceHeader{ + Kind: types.KindDynamicWindowsDesktop, + Version: types.V1, + Metadata: types.Metadata{ + Name: strings.TrimPrefix(name, backend.SeparatorString), + Namespace: apidefaults.Namespace, + }, + }, nil + case types.OpPut: + return services.UnmarshalDynamicWindowsDesktop( + event.Item.Value, + services.WithExpires(event.Item.Expires), + services.WithRevision(event.Item.Revision), + ) + default: + return nil, trace.BadParameter("event %v is not supported", event.Type) + } +} + func newWindowsDesktopsParser() *windowsDesktopsParser { return &windowsDesktopsParser{ baseParser: newBaseParser(backend.NewKey(windowsDesktopsPrefix, "")), diff --git a/lib/services/watcher.go b/lib/services/watcher.go index 63a4af59ae73..f896500b1021 100644 --- a/lib/services/watcher.go +++ b/lib/services/watcher.go @@ -963,6 +963,158 @@ func (p *databaseCollector) processEventsAndUpdateCurrent(ctx context.Context, e func (*databaseCollector) notifyStale() {} +// DynamicWindowsDesktopWatcherConfig is a DynamicWindowsDesktopWatcher configuration. +type DynamicWindowsDesktopWatcherConfig struct { + // ResourceWatcherConfig is the resource watcher configuration. + ResourceWatcherConfig + // DynamicWindowsDesktopGetter is responsible for fetching DynamicWindowsDesktop resources. + DynamicWindowsDesktopGetter + // DynamicWindowsDesktopsC receives up-to-date list of all DynamicWindowsDesktop resources. + DynamicWindowsDesktopsC chan types.DynamicWindowsDesktops +} + +// CheckAndSetDefaults checks parameters and sets default values. +func (cfg *DynamicWindowsDesktopWatcherConfig) CheckAndSetDefaults() error { + if err := cfg.ResourceWatcherConfig.CheckAndSetDefaults(); err != nil { + return trace.Wrap(err) + } + if cfg.DynamicWindowsDesktopGetter == nil { + getter, ok := cfg.Client.(DynamicWindowsDesktopGetter) + if !ok { + return trace.BadParameter("missing parameter DynamicWindowsDesktopGetter and Client %T not usable as DynamicWindowsDesktopGetter", cfg.Client) + } + cfg.DynamicWindowsDesktopGetter = getter + } + if cfg.DynamicWindowsDesktopsC == nil { + cfg.DynamicWindowsDesktopsC = make(chan types.DynamicWindowsDesktops) + } + return nil +} + +// NewDynamicWindowsDesktopWatcher returns a new instance of DynamicWindowsDesktopWatcher. +func NewDynamicWindowsDesktopWatcher(ctx context.Context, cfg DynamicWindowsDesktopWatcherConfig) (*DynamicWindowsDesktopWatcher, error) { + if err := cfg.CheckAndSetDefaults(); err != nil { + return nil, trace.Wrap(err) + } + collector := &dynamicWindowsDesktopCollector{ + DynamicWindowsDesktopWatcherConfig: cfg, + initializationC: make(chan struct{}), + } + watcher, err := newResourceWatcher(ctx, collector, cfg.ResourceWatcherConfig) + if err != nil { + return nil, trace.Wrap(err) + } + return &DynamicWindowsDesktopWatcher{watcher, collector}, nil +} + +// DynamicWindowsDesktopWatcher is built on top of resourceWatcher to monitor DynamicWindowsDesktop resources. +type DynamicWindowsDesktopWatcher struct { + *resourceWatcher + *dynamicWindowsDesktopCollector +} + +// dynamicWindowsDesktopCollector accompanies resourceWatcher when monitoring DynamicWindowsDesktop resources. +type dynamicWindowsDesktopCollector struct { + // DynamicWindowsDesktopWatcherConfig is the watcher configuration. + DynamicWindowsDesktopWatcherConfig + // current holds a map of the currently known DynamicWindowsDesktop resources. + current map[string]types.DynamicWindowsDesktop + // lock protects the "current" map. + lock sync.RWMutex + // initializationC is used to check that the + initializationC chan struct{} + once sync.Once +} + +// resourceKinds specifies the resource kind to watch. +func (p *dynamicWindowsDesktopCollector) resourceKinds() []types.WatchKind { + return []types.WatchKind{{Kind: types.KindDynamicWindowsDesktop}} +} + +// isInitialized is used to check that the cache has done its initial +// sync +func (p *dynamicWindowsDesktopCollector) initializationChan() <-chan struct{} { + return p.initializationC +} + +// getResourcesAndUpdateCurrent refreshes the list of current resources. +func (p *dynamicWindowsDesktopCollector) getResourcesAndUpdateCurrent(ctx context.Context) error { + var dynamicWindowsDesktops []types.DynamicWindowsDesktop + next := "" + for { + desktops, token, err := p.DynamicWindowsDesktopGetter.ListDynamicWindowsDesktops(ctx, defaults.MaxIterationLimit, next) + if err != nil { + return trace.Wrap(err) + } + dynamicWindowsDesktops = append(dynamicWindowsDesktops, desktops...) + if token == "" { + break + } + next = token + } + newCurrent := make(map[string]types.DynamicWindowsDesktop, len(dynamicWindowsDesktops)) + for _, dynamicWindowsDesktop := range dynamicWindowsDesktops { + newCurrent[dynamicWindowsDesktop.GetName()] = dynamicWindowsDesktop + } + p.lock.Lock() + defer p.lock.Unlock() + p.current = newCurrent + p.defineCollectorAsInitialized() + + select { + case <-ctx.Done(): + return trace.Wrap(ctx.Err()) + case p.DynamicWindowsDesktopsC <- dynamicWindowsDesktops: + } + + return nil +} + +func (p *dynamicWindowsDesktopCollector) defineCollectorAsInitialized() { + p.once.Do(func() { + // mark watcher as initialized. + close(p.initializationC) + }) +} + +// processEventsAndUpdateCurrent is called when a watcher event is received. +func (p *dynamicWindowsDesktopCollector) processEventsAndUpdateCurrent(ctx context.Context, events []types.Event) { + p.lock.Lock() + defer p.lock.Unlock() + + var updated bool + for _, event := range events { + if event.Resource == nil || event.Resource.GetKind() != types.KindDynamicWindowsDesktop { + p.Log.Warnf("Unexpected event: %v.", event) + continue + } + switch event.Type { + case types.OpDelete: + delete(p.current, event.Resource.GetName()) + updated = true + case types.OpPut: + dynamicWindowsDesktop, ok := event.Resource.(types.DynamicWindowsDesktop) + if !ok { + p.Log.Warnf("Unexpected resource type %T.", event.Resource) + continue + } + p.current[dynamicWindowsDesktop.GetName()] = dynamicWindowsDesktop + updated = true + default: + p.Log.Warnf("Unsupported event type %s.", event.Type) + } + } + + if updated { + select { + case <-ctx.Done(): + case p.DynamicWindowsDesktopsC <- resourcesToSlice(p.current): + } + } +} + +func (*dynamicWindowsDesktopCollector) notifyStale() {} + // AppWatcherConfig is an AppWatcher configuration. type AppWatcherConfig struct { // ResourceWatcherConfig is the resource watcher configuration.