diff --git a/README.md b/README.md index 8a5058b..a8dddfe 100644 --- a/README.md +++ b/README.md @@ -65,10 +65,13 @@ Example: Simple example how to use downward API to get `POD_NAME` can be found [here](https://raw.githubusercontent.com/kubernetes/kubernetes.github.io/master/docs/user-guide/downward-api/dapi-pod.yaml). ### Job -Checks if a given job succeded at least once. -Example: +Checks if a given job or set of jobs with matching name and/or labels succeeded at least once. +In order to use labels DEPENDENCY_JOBS_JSON must be used, but DEPENDENCY_JOBS is supported +as well for backward compatibility. +Examples: -`DEPENDENCY_JOBS=nova-init,neutron-init` +`DEPENDENCY_JOBS_JSON='[{"namespace": "foo", "name": "nova-init"}, {"labels": {"initializes": "neutron"}}]'` +`DEPENDENCY_JOBS=nova-init,neutron-init'` ### Config This dependency performs a container level templating of configuration files. It can template an ip address `{{ .IP }}` and hostname `{{ .HOSTNAME }}`. @@ -91,13 +94,12 @@ Example: Checks if at least one pod matching the specified labels is already running, by default anywhere in the cluster, or use `"requireSameNode": true` to require a a pod on the same node. -In contrast to other dependencies, the syntax uses json in order to avoid inventing a new -format to specify labels and the parsing complexities that would come with that. +As seen below the syntax uses JSON to allow for label support. This dependency requires a `POD_NAME` env which can be easily passed through the [downward api](http://kubernetes.io/docs/user-guide/downward-api/). The `POD_NAME` variable is mandatory and is used to resolve dependencies. Example: -`DEPENDENCY_POD="[{\"namespace\": \"foo\", \"labels\": {\"k1\": \"v1\", \"k2\": \"v2\"}}, {\"labels\": {\"k1\": \"v1\", \"k2\": \"v2\"}, \"requireSameNode\": true}]"` +`DEPENDENCY_POD_JSON='[{"namespace": "foo", "labels": {"k1": "v1", "k2": "v2"}}, {"labels": {"k1": "v1", "k2": "v2"}, "requireSameNode": true}]'` ## Image diff --git a/dependencies/job/job.go b/dependencies/job/job.go index 006f41a..aaf20fb 100644 --- a/dependencies/job/job.go +++ b/dependencies/job/job.go @@ -4,7 +4,11 @@ import ( "fmt" entry "github.com/stackanetes/kubernetes-entrypoint/entrypoint" + "github.com/stackanetes/kubernetes-entrypoint/logger" "github.com/stackanetes/kubernetes-entrypoint/util/env" + api "k8s.io/client-go/1.5/pkg/api" + "k8s.io/client-go/1.5/pkg/apis/batch/v1" + "k8s.io/client-go/1.5/pkg/labels" ) const FailingStatusFormat = "Job %s is not completed yet" @@ -12,38 +16,75 @@ const FailingStatusFormat = "Job %s is not completed yet" type Job struct { name string namespace string + labels map[string]string } func init() { jobsEnv := fmt.Sprintf("%sJOBS", entry.DependencyPrefix) - if jobsDeps := env.SplitEnvToDeps(jobsEnv); jobsDeps != nil { + jobsJsonEnv := fmt.Sprintf("%s%s", jobsEnv, entry.JsonSuffix) + if jobsDeps := env.SplitJobEnvToDeps(jobsEnv, jobsJsonEnv); jobsDeps != nil { if len(jobsDeps) > 0 { for _, dep := range jobsDeps { - entry.Register(NewJob(dep.Name, dep.Namespace)) + job := NewJob(dep.Name, dep.Namespace, dep.Labels) + if job != nil { + entry.Register(*job) + } } } } } -func NewJob(name string, namespace string) Job { - return Job{ +func NewJob(name string, namespace string, labels map[string]string) *Job { + if name != "" && labels != nil { + logger.Warning.Printf("Cannot specify both name and labels for job depependency") + return nil + } + return &Job{ name: name, namespace: namespace, + labels: labels, } - } func (j Job) IsResolved(entrypoint entry.EntrypointInterface) (bool, error) { - job, err := entrypoint.Client().Jobs(j.namespace).Get(j.name) - if err != nil { - return false, err + iface := entrypoint.Client().Jobs(j.namespace) + var jobs []v1.Job + + if j.name != "" { + job, err := iface.Get(j.name) + if err != nil { + return false, err + } + jobs = []v1.Job{*job} + } else if j.labels != nil { + label := labels.SelectorFromSet(j.labels) + opts := api.ListOptions{LabelSelector: label} + jobList, err := iface.List(opts) + if err != nil { + return false, err + } + jobs = jobList.Items } - if job.Status.Succeeded == 0 { - return false, fmt.Errorf(FailingStatusFormat, j) + if len(jobs) == 0 { + return false, fmt.Errorf("No matching jobs found: %v", j) + } + + for _, job := range jobs { + if job.Status.Succeeded == 0 { + return false, fmt.Errorf(FailingStatusFormat, j) + } } return true, nil } func (j Job) String() string { - return fmt.Sprintf("Job %s in namespace %s", j.name, j.namespace) + var prefix string + if j.name != "" { + prefix = fmt.Sprintf("Job %s", j.name) + } else if j.labels != nil { + prefix = fmt.Sprintf("Jobs with labels %s", j.labels) + } else { + prefix = "Jobs" + } + return fmt.Sprintf("%s in namespace %s", prefix, j.namespace) } diff --git a/dependencies/job/job_test.go b/dependencies/job/job_test.go index 7efc2c7..787287d 100644 --- a/dependencies/job/job_test.go +++ b/dependencies/job/job_test.go @@ -13,6 +13,10 @@ import ( const testJobName = "TEST_JOB_NAME" const testJobNamespace = "TEST_JOB_NAMESPACE" +var testLabels = map[string]string{ + "k1": "v1", +} + var testEntrypoint entrypoint.EntrypointInterface var _ = Describe("Job", func() { @@ -21,15 +25,25 @@ var _ = Describe("Job", func() { testEntrypoint = mocks.NewEntrypoint() }) - It("checks the name of a newly created job", func() { - job := NewJob(testJobName, testJobNamespace) + It("constructor correctly assigns fields", func() { + nameJob := NewJob(testJobName, testJobNamespace, nil) - Expect(job.name).To(Equal(testJobName)) - Expect(job.namespace).To(Equal(testJobNamespace)) + Expect(nameJob.name).To(Equal(testJobName)) + Expect(nameJob.namespace).To(Equal(testJobNamespace)) + + labelsJob := NewJob("", testJobNamespace, testLabels) + + Expect(labelsJob.labels).To(Equal(testLabels)) }) - It("checks resolution of a succeeding job", func() { - job := NewJob(mocks.SucceedingJobName, mocks.SucceedingJobName) + It("constructor returns nil when both name and labels specified", func() { + job := NewJob(testJobName, testJobNamespace, testLabels) + + Expect(job).To(BeNil()) + }) + + It("checks resolution of a succeeding job by name", func() { + job := NewJob(mocks.SucceedingJobName, mocks.SucceedingJobName, nil) isResolved, err := job.IsResolved(testEntrypoint) @@ -37,13 +51,31 @@ var _ = Describe("Job", func() { Expect(err).NotTo(HaveOccurred()) }) - It("checks resolution failure of a failing job", func() { - job := NewJob(mocks.FailingJobName, mocks.FailingJobName) + It("checks resolution failure of a failing job by name", func() { + job := NewJob(mocks.FailingJobName, mocks.FailingJobName, nil) isResolved, err := job.IsResolved(testEntrypoint) Expect(isResolved).To(Equal(false)) Expect(err.Error()).To(Equal(fmt.Sprintf(FailingStatusFormat, job))) + }) + + It("checks resolution of a succeeding job by labels", func() { + job := NewJob("", mocks.SucceedingJobName, map[string]string{"name": mocks.SucceedingJobLabel}) + + isResolved, err := job.IsResolved(testEntrypoint) + + Expect(isResolved).To(Equal(true)) + Expect(err).NotTo(HaveOccurred()) + }) + + It("checks resolution failure of a failing job by labels", func() { + job := NewJob("", mocks.FailingJobName, map[string]string{"name": mocks.FailingJobLabel}) + + isResolved, err := job.IsResolved(testEntrypoint) + + Expect(isResolved).To(Equal(false)) Expect(err.Error()).To(Equal(fmt.Sprintf(FailingStatusFormat, job))) }) + }) diff --git a/dependencies/pod/pod.go b/dependencies/pod/pod.go index 2ec736b..828e937 100644 --- a/dependencies/pod/pod.go +++ b/dependencies/pod/pod.go @@ -25,7 +25,7 @@ type Pod struct { } func init() { - podEnv := fmt.Sprintf("%sPOD", entry.DependencyPrefix) + podEnv := fmt.Sprintf("%sPOD%s", entry.DependencyPrefix, entry.JsonSuffix) if podDeps := env.SplitPodEnvToDeps(podEnv); podDeps != nil { for _, dep := range podDeps { pod, err := NewPod(dep.Labels, dep.Namespace, dep.RequireSameNode) diff --git a/entrypoint/entrypoint.go b/entrypoint/entrypoint.go index 5ddf734..450ae71 100644 --- a/entrypoint/entrypoint.go +++ b/entrypoint/entrypoint.go @@ -13,6 +13,7 @@ var dependencies []Resolver // List containing all dependencies to be resolved const ( //DependencyPrefix is a prefix for env variables DependencyPrefix = "DEPENDENCY_" + JsonSuffix = "_JSON" resolverSleepInterval = 2 ) diff --git a/kubernetes-entrypoint b/kubernetes-entrypoint index ca23612..77e76ce 100755 Binary files a/kubernetes-entrypoint and b/kubernetes-entrypoint differ diff --git a/mocks/job.go b/mocks/job.go index bf40c0e..972f6fa 100644 --- a/mocks/job.go +++ b/mocks/job.go @@ -10,8 +10,10 @@ import ( ) const ( - SucceedingJobName = "succeed" - FailingJobName = "fail" + SucceedingJobName = "succeed" + FailingJobName = "fail" + SucceedingJobLabel = "succeed" + FailingJobLabel = "fail" ) type jClient struct { @@ -41,7 +43,17 @@ func (j jClient) DeleteCollection(options *api.DeleteOptions, listOptions api.Li return fmt.Errorf("Not implemented") } func (j jClient) List(options api.ListOptions) (*batch.JobList, error) { - return nil, fmt.Errorf("Not implemented") + var jobs []batch.Job + if options.LabelSelector.String() == fmt.Sprintf("name=%s", SucceedingJobLabel) { + jobs = []batch.Job{NewJob(1)} + } else if options.LabelSelector.String() == fmt.Sprintf("name=%s", FailingJobLabel) { + jobs = []batch.Job{NewJob(1), NewJob(0)} + } else { + return nil, fmt.Errorf("Mock job didnt work") + } + return &batch.JobList{ + Items: jobs, + }, nil } func (j jClient) Update(job *batch.Job) (*batch.Job, error) { @@ -62,3 +74,9 @@ func (j jClient) Patch(name string, pt api.PatchType, data []byte, subresources func NewJClient() v1batch.JobInterface { return jClient{} } + +func NewJob(succeeded int32) batch.Job { + return batch.Job{ + Status: batch.JobStatus{Succeeded: succeeded}, + } +} diff --git a/util/env/env.go b/util/env/env.go index 0b7e05e..60e20ec 100644 --- a/util/env/env.go +++ b/util/env/env.go @@ -23,6 +23,12 @@ type PodDependency struct { RequireSameNode bool } +type JobDependency struct { + Name string + Labels map[string]string + Namespace string +} + func SplitCommand() []string { command := os.Getenv("COMMAND") if command == "" { @@ -96,6 +102,50 @@ func SplitPodEnvToDeps(env string) []PodDependency { return deps } +//SplitJobEnvToDeps returns list of JobDependency +func SplitJobEnvToDeps(env string, jsonEnv string) []JobDependency { + deps := []JobDependency{} + + namespace := GetBaseNamespace() + + envVal := os.Getenv(env) + jsonEnvVal := os.Getenv(jsonEnv) + if jsonEnvVal != "" { + if envVal != "" { + logger.Warning.Printf("Ignoring %s since %s was specified", env, jsonEnv) + } + err := json.Unmarshal([]byte(jsonEnvVal), &deps) + if err != nil { + logger.Warning.Printf("Invalid format: ", jsonEnvVal) + return []JobDependency{} + } + + valid := []JobDependency{} + for _, dep := range deps { + if dep.Namespace == "" { + dep.Namespace = namespace + } + + valid = append(valid, dep) + } + + return valid + } + + if envVal != "" { + plainDeps := SplitEnvToDeps(env) + + deps = []JobDependency{} + for _, dep := range plainDeps { + deps = append(deps, JobDependency{Name: dep.Name, Namespace: dep.Namespace}) + } + + return deps + } + + return deps +} + //GetBaseNamespace returns default namespace when user set empty one func GetBaseNamespace() string { namespace := os.Getenv("NAMESPACE") diff --git a/util/env/env_test.go b/util/env/env_test.go index 470e87b..34e5d04 100644 --- a/util/env/env_test.go +++ b/util/env/env_test.go @@ -95,9 +95,9 @@ func TestSplitEmptyEnvWithColon(t *testing.T) { func TestSplitPodEnvToDepsSuccess(t *testing.T) { defer os.Unsetenv("NAMESPACE") os.Setenv("NAMESPACE", `TEST_NAMESPACE`) - defer os.Unsetenv("TEST_LIST") - os.Setenv("TEST_LIST", `[{"namespace": "foo", "labels": {"k1": "v1", "k2": "v2"}, "requireSameNode": true}, {"labels": {"k1": "v1", "k2": "v2"}}]`) - actual := SplitPodEnvToDeps("TEST_LIST") + defer os.Unsetenv("TEST_LIST_JSON") + os.Setenv("TEST_LIST_JSON", `[{"namespace": "foo", "labels": {"k1": "v1", "k2": "v2"}, "requireSameNode": true}, {"labels": {"k1": "v1", "k2": "v2"}}]`) + actual := SplitPodEnvToDeps("TEST_LIST_JSON") expected := []PodDependency{ PodDependency{Namespace: "foo", Labels: map[string]string{ "k1": "v1", @@ -132,6 +132,75 @@ func TestSplitPodEnvToDepsIgnoreInvalid(t *testing.T) { } } +func TestSplitJobEnvToDepsJsonSuccess(t *testing.T) { + defer os.Unsetenv("NAMESPACE") + os.Setenv("NAMESPACE", `TEST_NAMESPACE`) + defer os.Unsetenv("TEST_LIST_JSON") + os.Setenv("TEST_LIST_JSON", `[{"namespace": "foo", "labels": {"k1": "v1", "k2": "v2"}}, {"name": "bar"}]`) + actual := SplitJobEnvToDeps("TEST_LIST", "TEST_LIST_JSON") + expected := []JobDependency{ + JobDependency{ + Name: "", + Namespace: "foo", Labels: map[string]string{ + "k1": "v1", + "k2": "v2", + }}, + JobDependency{Name: "bar", Namespace: "TEST_NAMESPACE", Labels: nil}, + } + + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Expected: %v Got: %v", expected, actual) + } +} + +func TestSplitJobEnvToDepsPlainSuccess(t *testing.T) { + defer os.Unsetenv("NAMESPACE") + os.Setenv("NAMESPACE", `TEST_NAMESPACE`) + defer os.Unsetenv("TEST_LIST") + os.Setenv("TEST_LIST", `plain`) + actual := SplitJobEnvToDeps("TEST_LIST", "TEST_LIST_JSON") + expected := []JobDependency{ + JobDependency{Name: "plain", Namespace: "TEST_NAMESPACE", Labels: nil}, + } + + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Expected: %v Got: %v", expected, actual) + } +} + +func TestSplitJobEnvToDepsJsonPrecedence(t *testing.T) { + defer os.Unsetenv("NAMESPACE") + os.Setenv("NAMESPACE", `TEST_NAMESPACE`) + defer os.Unsetenv("TEST_LIST_JSON") + os.Setenv("TEST_LIST_JSON", `[{"name": "json"}]`) + defer os.Unsetenv("TEST_LIST") + os.Setenv("TEST_LIST", `plain`) + actual := SplitJobEnvToDeps("TEST_LIST", "TEST_LIST_JSON") + expected := []JobDependency{ + JobDependency{Name: "json", Namespace: "TEST_NAMESPACE", Labels: nil}, + } + + if !reflect.DeepEqual(expected, actual) { + t.Errorf("Expected: %v Got: %v", expected, actual) + } +} + +func TestSplitJobEnvToDepsUnset(t *testing.T) { + actual := SplitJobEnvToDeps("TEST_LIST", "TEST_LIST_JSON") + if len(actual) != 0 { + t.Errorf("Expected: no dependencies Got: %v", actual) + } +} + +func TestSplitJobEnvToDepsIgnoreInvalid(t *testing.T) { + defer os.Unsetenv("TEST_LIST_JSON") + os.Setenv("TEST_LIST_JSON", `[{"invalid": json}`) + actual := SplitJobEnvToDeps("TEST_LIST", "TEST_LIST_JSON") + if len(actual) != 0 { + t.Errorf("Expected: ignore invalid dependencies Got: %v", actual) + } +} + func TestSplitCommand(t *testing.T) { defer os.Unsetenv("COMMAND") list2 := SplitCommand()