diff --git a/docs/modules/ROOT/pages/explanations/eventforwarding.adoc b/docs/modules/ROOT/pages/explanations/eventforwarding.adoc index 217da734c..11378f2ca 100644 --- a/docs/modules/ROOT/pages/explanations/eventforwarding.adoc +++ b/docs/modules/ROOT/pages/explanations/eventforwarding.adoc @@ -10,13 +10,13 @@ The annotation contains the kind, namespace and name of a claim that should rece == Annotation -The annotation is called `appcat.vshn.io/forward-events-to` and has the following format: `//`. +The annotation is called `appcat.vshn.io/forward-events-to` and has the following format: `///`. == Example ```yaml annotations: - appcat.vshn.io/forward-events-to: VSHNRedis/my-project/my-redis + appcat.vshn.io/forward-events-to: vshn.appcat.vshn.io/v1/VSHNRedis/my-project/my-redis ``` will forward all events that are owned by that object to the `VSHNRedis` claim `my-redis` in the namespace `my-project`. diff --git a/pkg/comp-functions/runtime/function_mgr.go b/pkg/comp-functions/runtime/function_mgr.go index d2149b094..16c12bf4c 100644 --- a/pkg/comp-functions/runtime/function_mgr.go +++ b/pkg/comp-functions/runtime/function_mgr.go @@ -48,6 +48,7 @@ const ( OwnerGroupAnnotation = "appcat.vshn.io/ownergroup" ProtectedByAnnotation = "appcat.vshn.io/protectedby" ProtectsAnnotation = "appcat.vshn.io/protects" + EventForwardAnnotation = "appcat.vshn.io/forward-events-to" ) // Step describes a single change within a service. @@ -67,7 +68,7 @@ type ServiceRuntime struct { Config corev1.ConfigMap // Copy of the desired resources from the request. Will be added to the resp // once all steps are finished. - desirdResources map[resource.Name]*resource.DesiredComposed + desiredResources map[resource.Name]*resource.DesiredComposed // connectionDetails contains all connection details that should get added // to the desired composite. connectionDetails resource.ConnectionDetails @@ -169,6 +170,10 @@ func (m Manager) RunFunction(ctx context.Context, req *fnv1beta1.RunFunctionRequ if err != nil { return errResp, err } + err = sr.ForwardEvents() + if err != nil { + return errResp, err + } return sr.GetResponse() } @@ -268,7 +273,7 @@ func NewServiceRuntime(l logr.Logger, config corev1.ConfigMap, req *fnv1beta1.Ru Log: l, Config: config, req: req, - desirdResources: desiredResources, + desiredResources: desiredResources, connectionDetails: comp.ConnectionDetails, results: []*xfnproto.Result{}, desiredComposite: desiredComposite.Resource, @@ -293,7 +298,7 @@ func (s *ServiceRuntime) GetResponse() (*fnv1beta1.RunFunctionResponse, error) { return nil, err } - err = response.SetDesiredComposedResources(resp, s.desirdResources) + err = response.SetDesiredComposedResources(resp, s.desiredResources) if err != nil { return nil, err } @@ -338,7 +343,7 @@ func (s *ServiceRuntime) SetDesiredComposedResourceWithName(obj xpresource.Manag return err } - s.desirdResources[resource.Name(name)] = &resource.DesiredComposed{Resource: unstructuredObj} + s.desiredResources[resource.Name(name)] = &resource.DesiredComposed{Resource: unstructuredObj} return nil } @@ -641,7 +646,7 @@ func (s *ServiceRuntime) GetObservedComposedResource(obj xpresource.Managed, nam // GetDesiredComposedResourceByName will return a desired composed resource from the request. // Use this, if you want anything from a previous function in the pipeline. func (s *ServiceRuntime) GetDesiredComposedResourceByName(obj xpresource.Managed, name string) error { - if res, ok := s.desirdResources[resource.Name(name)]; ok { + if res, ok := s.desiredResources[resource.Name(name)]; ok { jsonString, err := res.Resource.Unstructured.MarshalJSON() if err != nil { return err @@ -734,7 +739,7 @@ func (s *ServiceRuntime) GetObservedKubeObject(obj client.Object, name string) e // GetDesiredKubeObject returns the object as is on the cluster. func (s *ServiceRuntime) GetDesiredKubeObject(obj client.Object, name string) error { - res, ok := s.desirdResources[resource.Name(name)] + res, ok := s.desiredResources[resource.Name(name)] if !ok { return ErrNotFound } @@ -787,7 +792,7 @@ func (s *ServiceRuntime) checkReadiness() error { return fmt.Errorf("cannot get observed composed resources from %w", err) } - desired := s.desirdResources + desired := s.desiredResources s.Log.V(1).Info("Running readiness check for objects", "count", len(desired)) @@ -829,7 +834,7 @@ func (s *ServiceRuntime) checkReadiness() error { } } - s.desirdResources = desired + s.desiredResources = desired return nil @@ -844,7 +849,7 @@ func (s *ServiceRuntime) GetAllObserved() (map[resource.Name]resource.ObservedCo // GetAllDesired returns a map of all observed resources. // This is useful when a function needs to have overview about all objects belonging to a service. func (s *ServiceRuntime) GetAllDesired() map[resource.Name]*resource.DesiredComposed { - return s.desirdResources + return s.desiredResources } // GetDesiredComposite will return the currently desired composite. @@ -863,7 +868,7 @@ func (s *ServiceRuntime) GetDesiredComposite(obj client.Object) error { // DeleteDesiredCompososedResource removes a composite resource from the desired objects. // If the object is existing on the cluster, it will be deleted! func (s *ServiceRuntime) DeleteDesiredCompososedResource(name string) { - delete(s.desirdResources, resource.Name(name)) + delete(s.desiredResources, resource.Name(name)) } // isResourceSyncedAndReady checks if the given resource is synced and ready. @@ -989,12 +994,12 @@ func (s *ServiceRuntime) addOwnerReferenceAnnotation(obj client.Object, composed // By is the name of then managed resource which should block the deletion, as set in the desired map. As long as it exists // the deletion of "Of" will be denied. func (s *ServiceRuntime) UsageOfBy(of, by string) error { - ofUnstructuredRaw := s.desirdResources[resource.Name(of)] + ofUnstructuredRaw := s.desiredResources[resource.Name(of)] if ofUnstructuredRaw == nil { return ErrNotFound } ofUnstructured := ofUnstructuredRaw.Resource - byUnstructuredRaw := s.desirdResources[resource.Name(by)] + byUnstructuredRaw := s.desiredResources[resource.Name(by)] if byUnstructuredRaw == nil { return ErrNotFound } @@ -1031,7 +1036,7 @@ func (s *ServiceRuntime) UsageOfBy(of, by string) error { } func (s *ServiceRuntime) addUsages() error { - for resName, resource := range s.desirdResources { + for resName, resource := range s.desiredResources { byName, protect := resource.Resource.Unstructured.GetAnnotations()[ProtectedByAnnotation] if protect { resources := strings.Split(byName, ",") @@ -1065,3 +1070,41 @@ func (s *ServiceRuntime) addUsages() error { } return nil } + +func (s *ServiceRuntime) ForwardEvents() error { + claimRef := s.observedComposite.GetClaimReference() + // Claim is not yet populated, retry next time + if claimRef == nil { + return nil + } + eventForwardValue := fmt.Sprintf("%s/%s/%s/%s", claimRef.APIVersion, claimRef.Kind, claimRef.Namespace, claimRef.Name) + for _, res := range s.desiredResources { + r := res.Resource + + // For kube objects set the annotation 'EventForwardAnnotation' for themselves and for managed resource + if isKubeObject(r) { + p := "spec.forProvider.manifest.metadata.annotations" + v, _ := r.GetValue(p) + mrAnnotations := make(map[string]any) + if v != nil { + mrAnnotations = v.(map[string]any) + } + mrAnnotations[EventForwardAnnotation] = eventForwardValue + err := r.SetValue(p, mrAnnotations) + if err != nil { + return fmt.Errorf("cannot set event forward annotations for managed object %s: %w", r.GetName(), err) + } + } + annotations := r.GetAnnotations() + if annotations == nil { + annotations = map[string]string{} + } + annotations[EventForwardAnnotation] = eventForwardValue + r.SetAnnotations(annotations) + } + return nil +} + +func isKubeObject(r *composed.Unstructured) bool { + return r.GetKind() == "Object" && strings.HasPrefix(r.GetAPIVersion(), "kubernetes.crossplane.io") +}