Skip to content

Commit

Permalink
Merge pull request #174 from vshn/event-forward
Browse files Browse the repository at this point in the history
Event forward in comp function services
  • Loading branch information
zugao authored Jun 5, 2024
2 parents 57bb247 + e7ad89d commit 7be007a
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 15 deletions.
4 changes: 2 additions & 2 deletions docs/modules/ROOT/pages/explanations/eventforwarding.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ The annotation contains the kind, namespace and name of a claim that should rece

== Annotation

The annotation is called `appcat.vshn.io/forward-events-to` and has the following format: `<claim_kind>/<claim_namespace>/<claim_name>`.
The annotation is called `appcat.vshn.io/forward-events-to` and has the following format: `<api_version>/<claim_kind>/<claim_namespace>/<claim_name>`.

== Example

```yaml
annotations:
appcat.vshn.io/forward-events-to: VSHNRedis/my-project/my-redis
appcat.vshn.io/forward-events-to: vshn.appcat.vshn.io/v1/VSHNRedis/my-project/my-redis
```

will forward all events that are owned by that object to the `VSHNRedis` claim `my-redis` in the namespace `my-project`.
69 changes: 56 additions & 13 deletions pkg/comp-functions/runtime/function_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
OwnerGroupAnnotation = "appcat.vshn.io/ownergroup"
ProtectedByAnnotation = "appcat.vshn.io/protectedby"
ProtectsAnnotation = "appcat.vshn.io/protects"
EventForwardAnnotation = "appcat.vshn.io/forward-events-to"
)

// Step describes a single change within a service.
Expand All @@ -67,7 +68,7 @@ type ServiceRuntime struct {
Config corev1.ConfigMap
// Copy of the desired resources from the request. Will be added to the resp
// once all steps are finished.
desirdResources map[resource.Name]*resource.DesiredComposed
desiredResources map[resource.Name]*resource.DesiredComposed
// connectionDetails contains all connection details that should get added
// to the desired composite.
connectionDetails resource.ConnectionDetails
Expand Down Expand Up @@ -169,6 +170,10 @@ func (m Manager) RunFunction(ctx context.Context, req *fnv1beta1.RunFunctionRequ
if err != nil {
return errResp, err
}
err = sr.ForwardEvents()
if err != nil {
return errResp, err
}

return sr.GetResponse()
}
Expand Down Expand Up @@ -268,7 +273,7 @@ func NewServiceRuntime(l logr.Logger, config corev1.ConfigMap, req *fnv1beta1.Ru
Log: l,
Config: config,
req: req,
desirdResources: desiredResources,
desiredResources: desiredResources,
connectionDetails: comp.ConnectionDetails,
results: []*xfnproto.Result{},
desiredComposite: desiredComposite.Resource,
Expand All @@ -293,7 +298,7 @@ func (s *ServiceRuntime) GetResponse() (*fnv1beta1.RunFunctionResponse, error) {
return nil, err
}

err = response.SetDesiredComposedResources(resp, s.desirdResources)
err = response.SetDesiredComposedResources(resp, s.desiredResources)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -338,7 +343,7 @@ func (s *ServiceRuntime) SetDesiredComposedResourceWithName(obj xpresource.Manag
return err
}

s.desirdResources[resource.Name(name)] = &resource.DesiredComposed{Resource: unstructuredObj}
s.desiredResources[resource.Name(name)] = &resource.DesiredComposed{Resource: unstructuredObj}
return nil
}

Expand Down Expand Up @@ -641,7 +646,7 @@ func (s *ServiceRuntime) GetObservedComposedResource(obj xpresource.Managed, nam
// GetDesiredComposedResourceByName will return a desired composed resource from the request.
// Use this, if you want anything from a previous function in the pipeline.
func (s *ServiceRuntime) GetDesiredComposedResourceByName(obj xpresource.Managed, name string) error {
if res, ok := s.desirdResources[resource.Name(name)]; ok {
if res, ok := s.desiredResources[resource.Name(name)]; ok {
jsonString, err := res.Resource.Unstructured.MarshalJSON()
if err != nil {
return err
Expand Down Expand Up @@ -734,7 +739,7 @@ func (s *ServiceRuntime) GetObservedKubeObject(obj client.Object, name string) e

// GetDesiredKubeObject returns the object as is on the cluster.
func (s *ServiceRuntime) GetDesiredKubeObject(obj client.Object, name string) error {
res, ok := s.desirdResources[resource.Name(name)]
res, ok := s.desiredResources[resource.Name(name)]
if !ok {
return ErrNotFound
}
Expand Down Expand Up @@ -787,7 +792,7 @@ func (s *ServiceRuntime) checkReadiness() error {
return fmt.Errorf("cannot get observed composed resources from %w", err)
}

desired := s.desirdResources
desired := s.desiredResources

s.Log.V(1).Info("Running readiness check for objects", "count", len(desired))

Expand Down Expand Up @@ -829,7 +834,7 @@ func (s *ServiceRuntime) checkReadiness() error {
}
}

s.desirdResources = desired
s.desiredResources = desired

return nil

Expand All @@ -844,7 +849,7 @@ func (s *ServiceRuntime) GetAllObserved() (map[resource.Name]resource.ObservedCo
// GetAllDesired returns a map of all observed resources.
// This is useful when a function needs to have overview about all objects belonging to a service.
func (s *ServiceRuntime) GetAllDesired() map[resource.Name]*resource.DesiredComposed {
return s.desirdResources
return s.desiredResources
}

// GetDesiredComposite will return the currently desired composite.
Expand All @@ -863,7 +868,7 @@ func (s *ServiceRuntime) GetDesiredComposite(obj client.Object) error {
// DeleteDesiredCompososedResource removes a composite resource from the desired objects.
// If the object is existing on the cluster, it will be deleted!
func (s *ServiceRuntime) DeleteDesiredCompososedResource(name string) {
delete(s.desirdResources, resource.Name(name))
delete(s.desiredResources, resource.Name(name))
}

// isResourceSyncedAndReady checks if the given resource is synced and ready.
Expand Down Expand Up @@ -989,12 +994,12 @@ func (s *ServiceRuntime) addOwnerReferenceAnnotation(obj client.Object, composed
// By is the name of then managed resource which should block the deletion, as set in the desired map. As long as it exists
// the deletion of "Of" will be denied.
func (s *ServiceRuntime) UsageOfBy(of, by string) error {
ofUnstructuredRaw := s.desirdResources[resource.Name(of)]
ofUnstructuredRaw := s.desiredResources[resource.Name(of)]
if ofUnstructuredRaw == nil {
return ErrNotFound
}
ofUnstructured := ofUnstructuredRaw.Resource
byUnstructuredRaw := s.desirdResources[resource.Name(by)]
byUnstructuredRaw := s.desiredResources[resource.Name(by)]
if byUnstructuredRaw == nil {
return ErrNotFound
}
Expand Down Expand Up @@ -1031,7 +1036,7 @@ func (s *ServiceRuntime) UsageOfBy(of, by string) error {
}

func (s *ServiceRuntime) addUsages() error {
for resName, resource := range s.desirdResources {
for resName, resource := range s.desiredResources {
byName, protect := resource.Resource.Unstructured.GetAnnotations()[ProtectedByAnnotation]
if protect {
resources := strings.Split(byName, ",")
Expand Down Expand Up @@ -1065,3 +1070,41 @@ func (s *ServiceRuntime) addUsages() error {
}
return nil
}

func (s *ServiceRuntime) ForwardEvents() error {
claimRef := s.observedComposite.GetClaimReference()
// Claim is not yet populated, retry next time
if claimRef == nil {
return nil
}
eventForwardValue := fmt.Sprintf("%s/%s/%s/%s", claimRef.APIVersion, claimRef.Kind, claimRef.Namespace, claimRef.Name)
for _, res := range s.desiredResources {
r := res.Resource

// For kube objects set the annotation 'EventForwardAnnotation' for themselves and for managed resource
if isKubeObject(r) {
p := "spec.forProvider.manifest.metadata.annotations"
v, _ := r.GetValue(p)
mrAnnotations := make(map[string]any)
if v != nil {
mrAnnotations = v.(map[string]any)
}
mrAnnotations[EventForwardAnnotation] = eventForwardValue
err := r.SetValue(p, mrAnnotations)
if err != nil {
return fmt.Errorf("cannot set event forward annotations for managed object %s: %w", r.GetName(), err)
}
}
annotations := r.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations[EventForwardAnnotation] = eventForwardValue
r.SetAnnotations(annotations)
}
return nil
}

func isKubeObject(r *composed.Unstructured) bool {
return r.GetKind() == "Object" && strings.HasPrefix(r.GetAPIVersion(), "kubernetes.crossplane.io")
}

0 comments on commit 7be007a

Please sign in to comment.