Skip to content

Commit

Permalink
JobSink: Inject execution mode as env variable
Browse files Browse the repository at this point in the history
To support long running functions in Knative Functions, we will
inject a `KNATIVE_EXECUTION_MODE` environment variable with value
`batch` so that function can change it's runtime behavior to
read the event file rather than starting a long-running server.

Ref: knative/func#2586

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi committed Nov 25, 2024
1 parent 366ff26 commit 04dd761
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 2 deletions.
3 changes: 3 additions & 0 deletions cmd/jobsink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

js = js.DeepCopy() // Do not modify informer copy.
js.SetDefaults(ctx)

job := js.Spec.Job.DeepCopy()
job.Name = jobName
if job.Labels == nil {
Expand Down
41 changes: 41 additions & 0 deletions pkg/apis/sinks/v1alpha1/job_sink_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,48 @@ package v1alpha1

import (
"context"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
)

func (sink *JobSink) SetDefaults(ctx context.Context) {
if sink.Spec.Job != nil {
setBatchJobDefaults(sink.Spec.Job)
}
}

func setBatchJobDefaults(job *batchv1.Job) {
for i := range job.Spec.Template.Spec.Containers {
executionModeFound := false
for j := range job.Spec.Template.Spec.Containers[i].Env {
if job.Spec.Template.Spec.Containers[i].Env[j].Name == ExecutionModeEnvVar {
executionModeFound = true
break
}
}
if executionModeFound {
continue
}
job.Spec.Template.Spec.Containers[i].Env = append(job.Spec.Template.Spec.Containers[i].Env, corev1.EnvVar{
Name: ExecutionModeEnvVar,
Value: string(ExecutionModeBatch),
})
}
for i := range job.Spec.Template.Spec.InitContainers {
executionModeFound := false
for j := range job.Spec.Template.Spec.InitContainers[i].Env {
if job.Spec.Template.Spec.InitContainers[i].Env[j].Name == ExecutionModeEnvVar {
executionModeFound = true
break
}
}
if executionModeFound {
continue
}
job.Spec.Template.Spec.InitContainers[i].Env = append(job.Spec.Template.Spec.InitContainers[i].Env, corev1.EnvVar{
Name: ExecutionModeEnvVar,
Value: string(ExecutionModeBatch),
})
}
}
100 changes: 99 additions & 1 deletion pkg/apis/sinks/v1alpha1/job_sink_defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,111 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
)

func TestSetDefaults(t *testing.T) {
testCases := map[string]struct {
initial JobSink
expected JobSink
}{}
}{
"execution mode": {
initial: JobSink{
Spec: JobSinkSpec{
Job: &batchv1.Job{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "cnt",
Image: "img",
},
{
Name: "cnt2",
Image: "img2",
},
{
Name: "cnt3",
Image: "img3",
Env: []corev1.EnvVar{
{Name: "KNATIVE_EXECUTION_MODE", Value: "something"},
},
},
},
InitContainers: []corev1.Container{
{
Name: "cnt",
Image: "img",
},
{
Name: "cnt-ini2",
Image: "img-ini2",
Env: []corev1.EnvVar{
{Name: "KNATIVE_EXECUTION_MODE", Value: "something"},
},
},
},
},
},
},
},
},
},
expected: JobSink{
Spec: JobSinkSpec{
Job: &batchv1.Job{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
InitContainers: []corev1.Container{
{
Name: "cnt",
Image: "img",
Env: []corev1.EnvVar{
{Name: "KNATIVE_EXECUTION_MODE", Value: "batch"},
},
},
{
Name: "cnt-ini2",
Image: "img-ini2",
Env: []corev1.EnvVar{
{Name: "KNATIVE_EXECUTION_MODE", Value: "something"},
},
},
},
Containers: []corev1.Container{
{
Name: "cnt",
Image: "img",
Env: []corev1.EnvVar{
{Name: "KNATIVE_EXECUTION_MODE", Value: "batch"},
},
},
{
Name: "cnt2",
Image: "img2",
Env: []corev1.EnvVar{
{Name: "KNATIVE_EXECUTION_MODE", Value: "batch"},
},
},
{
Name: "cnt3",
Image: "img3",
Env: []corev1.EnvVar{
{Name: "KNATIVE_EXECUTION_MODE", Value: "something"},
},
},
},
},
},
},
},
},
},
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
tc.initial.SetDefaults(context.TODO())
Expand Down
13 changes: 12 additions & 1 deletion pkg/apis/sinks/v1alpha1/job_sink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,20 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kmeta"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
)

const (
ExecutionModeEnvVar = "KNATIVE_EXECUTION_MODE"
)

type ExecutionMode string

const (
ExecutionModeBatch ExecutionMode = "batch"
)

// +genclient
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/jobsink/jobsink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ func testJob(name string) *batchv1.Job {
Containers: []corev1.Container{
{
Name: "test-container",
Env: []corev1.EnvVar{
{Name: "KNATIVE_EXECUTION_MODE", Value: "batch"},
},
},
},
},
Expand Down

0 comments on commit 04dd761

Please sign in to comment.