improve naming and split function into multiple methods
marcoandredinis committed Oct 17, 2024
1 parent 1af9e5c commit 143341c
Showing 2 changed files with 83 additions and 89 deletions.
38 changes: 3 additions & 35 deletions lib/srv/discovery/discovery.go
@@ -463,8 +463,8 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
         server.WithPollInterval(s.PollInterval),
         server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()),
         server.WithPreFetchHookFn(func() {
-            s.awsEC2ResourcesStatus.iterationStarted()
-            s.awsEC2Tasks.iterationStarted()
+            s.awsEC2ResourcesStatus.reset()
+            s.awsEC2Tasks.reset()
         }),
     )
     if err != nil {
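Note on the hunk above: the pre-fetch hook runs at the start of every discovery iteration, so the accumulators now expose a reset() method rather than the more obscure iterationStarted(). A minimal sketch of that pattern, assuming a mutex-guarded accumulator like the ones in this diff (taskAccumulator and its fields are illustrative, not from the commit):

    package main

    import (
        "fmt"
        "sync"
    )

    type taskKey struct{ integration, issueType, accountID, region string }

    // taskAccumulator collects per-instance failures between iterations.
    type taskAccumulator struct {
        mu     sync.Mutex
        issues map[taskKey]map[string]string // task key -> instance ID -> detail
    }

    // reset clears accumulated state; a pre-fetch hook would call this
    // before each watcher iteration, like reset() in this commit.
    func (a *taskAccumulator) reset() {
        a.mu.Lock()
        defer a.mu.Unlock()
        a.issues = make(map[taskKey]map[string]string)
    }

    func main() {
        acc := &taskAccumulator{}
        acc.reset()
        fmt.Println(len(acc.issues)) // 0
    }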
@@ -900,22 +900,6 @@ func (s *Server) heartbeatEICEInstance(instances *server.EC2Instances) {
                 discoveryConfig: instances.DiscoveryConfig,
                 integration:     instances.Integration,
             }, 1)
-
-            s.awsEC2Tasks.addFailedEnrollment(
-                awsEC2FailedEnrollmentGroup{
-                    accountID:   instances.AccountID,
-                    integration: instances.Integration,
-                    issueType:   usertasks.AutoDiscoverEC2IssueEICEFailedToCreateNode,
-                    region:      instances.Region,
-                },
-                &usertasksv1.DiscoverEC2Instance{
-                    // TODO(marco): add instance name
-                    DiscoveryConfig: instances.DiscoveryConfig,
-                    DiscoveryGroup:  s.DiscoveryGroup,
-                    InstanceId:      ec2Instance.InstanceID,
-                    SyncTime:        timestamppb.New(s.clock.Now()),
-                },
-            )
             continue
         }
 
@@ -958,22 +942,6 @@ func (s *Server) heartbeatEICEInstance(instances *server.EC2Instances) {
                 discoveryConfig: instances.DiscoveryConfig,
                 integration:     instances.Integration,
             }, 1)
-
-            s.awsEC2Tasks.addFailedEnrollment(
-                awsEC2FailedEnrollmentGroup{
-                    accountID:   instances.AccountID,
-                    integration: instances.Integration,
-                    issueType:   usertasks.AutoDiscoverEC2IssueEICEFailedToUpsertNode,
-                    region:      instances.Region,
-                },
-                &usertasksv1.DiscoverEC2Instance{
-                    // TODO(marco): add instance name
-                    DiscoveryConfig: instances.DiscoveryConfig,
-                    DiscoveryGroup:  s.DiscoveryGroup,
-                    InstanceId:      instanceID,
-                    SyncTime:        timestamppb.New(s.clock.Now()),
-                },
-            )
         }
     })
     if err != nil {
@@ -1012,7 +980,7 @@ func (s *Server) handleEC2RemoteInstallation(instances *server.EC2Instances) err
 
     for _, instance := range req.Instances {
         s.awsEC2Tasks.addFailedEnrollment(
-            awsEC2FailedEnrollmentGroup{
+            awsEC2FailedEnrollmentTaskKey{
                 accountID:   instances.AccountID,
                 integration: instances.Integration,
                 issueType:   usertasks.AutoDiscoverEC2IssueInvocationFailure,
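The rename from awsEC2FailedEnrollmentGroup to awsEC2FailedEnrollmentTaskKey reflects how the struct is used: as a comparable map key that groups failures by integration, issue type, account, and region. A standalone sketch of that Go idiom (all field values are made up):

    package main

    import "fmt"

    // enrollmentTaskKey mirrors the shape of the renamed key struct:
    // all fields are comparable strings, so the struct can be a map key.
    type enrollmentTaskKey struct {
        integration string
        issueType   string
        accountID   string
        region      string
    }

    func main() {
        failed := map[enrollmentTaskKey][]string{}
        k := enrollmentTaskKey{
            integration: "my-integration", // illustrative values
            issueType:   "ec2-ssm-invocation-failure",
            accountID:   "123456789012",
            region:      "us-east-1",
        }
        failed[k] = append(failed[k], "i-0abc123def456") // one failed instance
        fmt.Println(len(failed[k]))                      // 1
    }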
134 changes: 80 additions & 54 deletions lib/srv/discovery/status.go
@@ -25,6 +25,7 @@ import (
     "time"
 
     "github.com/gravitational/trace"
+    "github.com/sirupsen/logrus"
     "google.golang.org/protobuf/types/known/timestamppb"
 
     discoveryconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/discoveryconfig/v1"
@@ -210,7 +211,7 @@ type awsResourceGroupResult struct {
     failed int
 }
 
-func (d *awsResourcesStatus) iterationStarted() {
+func (d *awsResourcesStatus) reset() {
     d.mu.Lock()
     defer d.mu.Unlock()
 
@@ -300,7 +301,7 @@ func (s *Server) ReportEC2SSMInstallationResult(ctx context.Context, result *ser
     s.updateDiscoveryConfigStatus(result.DiscoveryConfig)
 
     s.awsEC2Tasks.addFailedEnrollment(
-        awsEC2FailedEnrollmentGroup{
+        awsEC2FailedEnrollmentTaskKey{
             integration: result.IntegrationName,
             issueType:   result.IssueType,
             accountID:   result.SSMRunEvent.AccountID,
@@ -323,15 +324,15 @@ type awsEC2Tasks struct {
 type awsEC2Tasks struct {
     mu sync.RWMutex
     // instancesIssues maps the Discover EC2 User Task grouping parts to a set of instances metadata.
-    instancesIssues map[awsEC2FailedEnrollmentGroup]map[string]*usertasksv1.DiscoverEC2Instance
-    // groupPending is used to register which groups were changed in memory but were not yet sent upstream.
-    // When upserting User Tasks, if the group is not present in groupPending,
+    instancesIssues map[awsEC2FailedEnrollmentTaskKey]map[string]*usertasksv1.DiscoverEC2Instance
+    // groupsToSync is used to register which groups were changed in memory but were not yet sent to the cluster.
+    // When upserting User Tasks, if the group is not in groupsToSync,
     // then the cluster already has the latest version of this particular group.
-    groupPending map[awsEC2FailedEnrollmentGroup]struct{}
+    taskKeysToSync map[awsEC2FailedEnrollmentTaskKey]struct{}
 }
 
-// awsEC2FailedEnrollmentGroup identifies a UserTask group.
-type awsEC2FailedEnrollmentGroup struct {
+// awsEC2FailedEnrollmentTaskKey identifies a UserTask group.
+type awsEC2FailedEnrollmentTaskKey struct {
     integration string
     issueType   string
     accountID   string
@@ -340,16 +341,16 @@ type awsEC2FailedEnrollmentGroup struct {
 
 // iterationStarted clears out any in memory issues that were recorded.
 // This is used when starting a new Auto Discover EC2 watcher iteration.
-func (d *awsEC2Tasks) iterationStarted() {
+func (d *awsEC2Tasks) reset() {
     d.mu.Lock()
     defer d.mu.Unlock()
 
-    d.instancesIssues = make(map[awsEC2FailedEnrollmentGroup]map[string]*usertasksv1.DiscoverEC2Instance)
-    d.groupPending = make(map[awsEC2FailedEnrollmentGroup]struct{})
+    d.instancesIssues = make(map[awsEC2FailedEnrollmentTaskKey]map[string]*usertasksv1.DiscoverEC2Instance)
+    d.taskKeysToSync = make(map[awsEC2FailedEnrollmentTaskKey]struct{})
 }
 
-// addFailedEnrollment adds a
-func (d *awsEC2Tasks) addFailedEnrollment(g awsEC2FailedEnrollmentGroup, instance *usertasksv1.DiscoverEC2Instance) {
+// addFailedEnrollment adds an enrollment failure of a given instance.
+func (d *awsEC2Tasks) addFailedEnrollment(g awsEC2FailedEnrollmentTaskKey, instance *usertasksv1.DiscoverEC2Instance) {
     // Only failures associated with an Integration are reported.
     // There's no major blocking for showing non-integration User Tasks, but this keeps scope smaller.
     if g.integration == "" {
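addFailedEnrollment (continued in the next hunk) lazily initializes a nested map (task key -> instance ID -> instance) and marks the key as needing a sync. A compact sketch of that accumulation idiom with simplified stand-in types:

    package main

    import "fmt"

    func addIssue(issues map[string]map[string]bool, pending map[string]struct{}, key, instanceID string) {
        // Initialize the inner map on first use of this key.
        if _, ok := issues[key]; !ok {
            issues[key] = make(map[string]bool)
        }
        issues[key][instanceID] = true
        // Remember that this key has unsynced changes.
        pending[key] = struct{}{}
    }

    func main() {
        issues := make(map[string]map[string]bool)
        pending := make(map[string]struct{})
        addIssue(issues, pending, "us-east-1/invocation-failure", "i-123")
        addIssue(issues, pending, "us-east-1/invocation-failure", "i-456")
        fmt.Println(len(issues["us-east-1/invocation-failure"]), len(pending)) // 2 1
    }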
@@ -359,31 +360,23 @@ func (d *awsEC2Tasks) addFailedEnrollment(g awsEC2FailedEnrollmentGroup, instanc
     d.mu.Lock()
     defer d.mu.Unlock()
     if d.instancesIssues == nil {
-        d.instancesIssues = make(map[awsEC2FailedEnrollmentGroup]map[string]*usertasksv1.DiscoverEC2Instance)
+        d.instancesIssues = make(map[awsEC2FailedEnrollmentTaskKey]map[string]*usertasksv1.DiscoverEC2Instance)
     }
     if _, ok := d.instancesIssues[g]; !ok {
         d.instancesIssues[g] = make(map[string]*usertasksv1.DiscoverEC2Instance)
     }
     d.instancesIssues[g][instance.InstanceId] = instance
 
-    if d.groupPending == nil {
-        d.groupPending = make(map[awsEC2FailedEnrollmentGroup]struct{})
+    if d.taskKeysToSync == nil {
+        d.taskKeysToSync = make(map[awsEC2FailedEnrollmentTaskKey]struct{})
     }
-    d.groupPending[g] = struct{}{}
+    d.taskKeysToSync[g] = struct{}{}
 }
 
-// mergeUpsertDiscoverEC2Task takes the current DiscoverEC2 User Task issues stored in memory and
-// merges them against the ones that exist in the cluster.
-//
-// All of this flow is protected by a lock to ensure there's no race between this and other DiscoveryServices.
-func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2FailedEnrollmentGroup, failedInstances map[string]*usertasksv1.DiscoverEC2Instance) error {
-    userTaskName := usertasks.TaskNameForDiscoverEC2(usertasks.TaskNameForDiscoverEC2Parts{
-        Integration: taskGroup.integration,
-        IssueType:   taskGroup.issueType,
-        AccountID:   taskGroup.accountID,
-        Region:      taskGroup.region,
-    })
-
+// acquireSemaphoreForUserTask tries to acquire a semaphore lock for this user task.
+// It returns a func which must be called to release the lock.
+// It also returns a context which is tied to the lease and will be canceled if the lease ends.
+func (s *Server) acquireSemaphoreForUserTask(userTaskName string) (func(), context.Context, error) {
     // Use the deterministic task name as semaphore name.
     semaphoreName := userTaskName
     semaphoreExpiration := 5 * time.Second
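The extracted acquireSemaphoreForUserTask returns both a release function and a context tied to the lease, so callers can simply defer the release. A generic sketch of that shape (acquire here is a hypothetical stand-in, not Teleport's semaphore API):

    package main

    import (
        "context"
        "fmt"
    )

    // acquire returns a release func and a context that is canceled when the
    // lock is released, mirroring the (func(), context.Context, error)
    // signature introduced by this commit.
    func acquire(parent context.Context) (func(), context.Context, error) {
        ctx, cancel := context.WithCancel(parent)
        release := func() {
            cancel() // real code would also stop the lease and wait for cleanup
        }
        return release, ctx, nil
    }

    func main() {
        release, ctx, err := acquire(context.Background())
        if err != nil {
            panic(err)
        }
        defer release()
        fmt.Println(ctx.Err()) // <nil> while the lock is held
    }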
@@ -415,19 +408,40 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2FailedEnrollmentGrou
         },
     )
     if err != nil {
-        return trace.Wrap(err)
+        return nil, nil, trace.Wrap(err)
     }
 
     // Once the lease parent context is canceled, the lease will be released.
     ctxWithLease, cancel := context.WithCancel(lease)
     defer cancel()
 
-    defer func() {
+    releaseFn := func() {
         lease.Stop()
         if err := lease.Wait(); err != nil {
             s.Log.WithError(err).WithField("semaphore", userTaskName).Warn("error cleaning up UserTask semaphore")
         }
-    }()
+    }
+
+    return releaseFn, ctxWithLease, nil
+}
+
+// mergeUpsertDiscoverEC2Task takes the current DiscoverEC2 User Task issues stored in memory and
+// merges them against the ones that exist in the cluster.
+//
+// All of this flow is protected by a lock to ensure there's no race between this and other DiscoveryServices.
+func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2FailedEnrollmentTaskKey, failedInstances map[string]*usertasksv1.DiscoverEC2Instance) error {
+    userTaskName := usertasks.TaskNameForDiscoverEC2(usertasks.TaskNameForDiscoverEC2Parts{
+        Integration: taskGroup.integration,
+        IssueType:   taskGroup.issueType,
+        AccountID:   taskGroup.accountID,
+        Region:      taskGroup.region,
+    })
+
+    releaseFn, ctxWithLease, err := s.acquireSemaphoreForUserTask(userTaskName)
+    if err != nil {
+        return trace.Wrap(err)
+    }
+    defer releaseFn()
 
     // Fetch the current task because it might have instances discovered by another group of DiscoveryServices.
     currentUserTask, err := s.AccessPoint.GetUserTask(ctxWithLease, userTaskName)
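mergeUpsertDiscoverEC2Task fetches the stored task and treats a not-found error as an empty starting point, merging otherwise. A stdlib-only sketch of that branch structure (the diff uses trace.IsNotFound from gravitational/trace rather than errors.Is; fetchTask and its data are invented for illustration):

    package main

    import (
        "errors"
        "fmt"
    )

    var errNotFound = errors.New("not found")

    // fetchTask pretends the cluster has no task stored yet.
    func fetchTask(name string) (map[string]string, error) {
        return nil, errNotFound
    }

    func main() {
        local := map[string]string{"i-123": "failed enrollment"}
        remote, err := fetchTask("task-name")
        switch {
        case errors.Is(err, errNotFound):
            // Nothing stored yet: keep only the in-memory state.
        case err != nil:
            panic(err)
        default:
            for id, detail := range remote { // merge cluster state in
                local[id] = detail
            }
        }
        fmt.Println(local)
    }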
@@ -436,25 +450,7 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2FailedEnrollmentGrou
     case err != nil:
         return trace.Wrap(err)
     default:
-        for existingIntanceID, existingInstance := range currentUserTask.Spec.DiscoverEc2.Instances {
-            // Each DiscoveryService works on all the DiscoveryConfigs assigned to a given DiscoveryGroup.
-            // So, it's safe to say that current DiscoveryService has the last state for a given DiscoveryGroup.
-            // If other instances exist for this DiscoveryGroup, they can be discarded because, as said before, the current DiscoveryService has the last state for a given DiscoveryGroup.
-            if existingInstance.DiscoveryGroup == s.DiscoveryGroup {
-                continue
-            }
-
-            // For existing instances whose sync time is too far in the past, just drop them.
-            // This ensures that if an instance is removed from AWS, it will eventually disappear from the User Tasks' instance list.
-            // It might also be the case that the DiscoveryConfig was changed and the instance is no longer matched (because of labels/regions or other matchers).
-            instanceIssueExpiration := s.clock.Now().Add(-2 * s.PollInterval)
-            if existingInstance.SyncTime.AsTime().Before(instanceIssueExpiration) {
-                continue
-            }
-
-            // Merge existing backend state into in-memory object.
-            failedInstances[existingIntanceID] = existingInstance
-        }
+        failedInstances = s.discoverEC2UserTaskAddExistingInstances(currentUserTask, failedInstances)
     }
 
     // If the DiscoveryService is stopped, or the issue does not happen again
@@ -486,20 +482,50 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2FailedEnrollmentGrou
     return nil
 }
 
+// discoverEC2UserTaskAddExistingInstances takes the UserTask stored in the cluster and merges it into the existing map of failed instances.
+func (s *Server) discoverEC2UserTaskAddExistingInstances(currentUserTask *usertasksv1.UserTask, failedInstances map[string]*usertasksv1.DiscoverEC2Instance) map[string]*usertasksv1.DiscoverEC2Instance {
+    for existingInstanceID, existingInstance := range currentUserTask.Spec.DiscoverEc2.Instances {
+        // Each DiscoveryService works on all the DiscoveryConfigs assigned to a given DiscoveryGroup.
+        // So, it's safe to say that current DiscoveryService has the last state for a given DiscoveryGroup.
+        // If other instances exist for this DiscoveryGroup, they can be discarded because, as said before, the current DiscoveryService has the last state for a given DiscoveryGroup.
+        if existingInstance.DiscoveryGroup == s.DiscoveryGroup {
+            continue
+        }
+
+        // For existing instances whose sync time is too far in the past, just drop them.
+        // This ensures that if an instance is removed from AWS, it will eventually disappear from the User Tasks' instance list.
+        // It might also be the case that the DiscoveryConfig was changed and the instance is no longer matched (because of labels/regions or other matchers).
+        instanceIssueExpiration := s.clock.Now().Add(-2 * s.PollInterval)
+        if existingInstance.SyncTime.AsTime().Before(instanceIssueExpiration) {
+            continue
+        }
+
+        // Merge existing cluster state into in-memory object.
+        failedInstances[existingInstanceID] = existingInstance
+    }
+    return failedInstances
+}
+
 func (s *Server) upsertTasksForAWSEC2FailedEnrollments() {
     s.awsEC2Tasks.mu.Lock()
     defer s.awsEC2Tasks.mu.Unlock()
-    for g := range s.awsEC2Tasks.groupPending {
+    for g := range s.awsEC2Tasks.taskKeysToSync {
         instancesIssueByID := s.awsEC2Tasks.instancesIssues[g]
         if len(instancesIssueByID) == 0 {
            continue
         }
 
         if err := s.mergeUpsertDiscoverEC2Task(g, instancesIssueByID); err != nil {
-            s.Log.WithError(err).Warning("failed to create discover ec2 user task")
+            s.Log.WithError(err).WithFields(logrus.Fields{
+                "integration":    g.integration,
+                "issue_type":     g.issueType,
+                "aws_account_id": g.accountID,
+                "aws_region":     g.region,
+            },
+            ).Warning("Failed to create discover ec2 user task.", g.integration, g.issueType, g.accountID, g.region)
             continue
         }
 
-        delete(s.awsEC2Tasks.groupPending, g)
+        delete(s.awsEC2Tasks.taskKeysToSync, g)
     }
 }
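The staleness check in discoverEC2UserTaskAddExistingInstances drops entries whose SyncTime is older than two poll intervals, so instances that vanish from AWS (or stop matching the DiscoveryConfig) eventually age out of the task. A small sketch of that cutoff (the poll interval value is assumed; the real one comes from server config):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        pollInterval := 5 * time.Minute // assumed value for illustration
        expiration := time.Now().Add(-2 * pollInterval)

        syncTimes := map[string]time.Time{
            "i-fresh": time.Now().Add(-1 * time.Minute),
            "i-stale": time.Now().Add(-1 * time.Hour),
        }

        kept := map[string]time.Time{}
        for id, syncedAt := range syncTimes {
            if syncedAt.Before(expiration) {
                continue // too old: likely removed from AWS or no longer matched
            }
            kept[id] = syncedAt
        }
        fmt.Println(len(kept)) // 1: only i-fresh survives
    }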

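The upsert loop's error log now carries the task key as structured fields. A self-contained logrus sketch of the WithFields pattern (field names mirror the diff; the logger setup and values are illustrative):

    package main

    import "github.com/sirupsen/logrus"

    func main() {
        log := logrus.New()
        // Structured fields make the failing task key filterable in log pipelines.
        log.WithFields(logrus.Fields{
            "integration":    "my-integration",
            "issue_type":     "ec2-ssm-invocation-failure",
            "aws_account_id": "123456789012",
            "aws_region":     "us-east-1",
        }).Warning("Failed to create discover ec2 user task.")
    }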