Support labels in job dependencies

It is a common pattern to depend on the same set of multiple jobs from
multiple places.  Additionally with orchestration tools this set of job
may be dynamic based on configuration.  In kubernetes we can encapsulate
this set of jobs via shared labels.  This patchset then adds support to
depend on such a set of jobs via labels.

This is exposed via a new DEPENDENCY_JOBS_JSON env var, but
DEPENDENCY_JOBS is retained for backward compatibility.  For consistency
DEPENDENCY_POD is renamed to DEPENDENCY_POD_JSON which is a breaking
change.
This commit is contained in:
Sean Eagan 2018-03-21 14:12:03 -05:00
parent e24ee794ed
commit 84de742d80
9 changed files with 245 additions and 32 deletions

View File

@ -65,10 +65,13 @@ Example:
Simple example how to use downward API to get `POD_NAME` can be found [here](https://raw.githubusercontent.com/kubernetes/kubernetes.github.io/master/docs/user-guide/downward-api/dapi-pod.yaml).
### Job
Checks if a given job succeded at least once.
Example:
Checks if a given job or set of jobs with matching name and/or labels succeeded at least once.
In order to use labels DEPENDENCY_JOBS_JSON must be used, but DEPENDENCY_JOBS is supported
as well for backward compatibility.
Examples:
`DEPENDENCY_JOBS=nova-init,neutron-init`
`DEPENDENCY_JOBS_JSON='[{"namespace": "foo", "name": "nova-init"}, {"labels": {"initializes": "neutron"}}]'`
`DEPENDENCY_JOBS=nova-init,neutron-init'`
### Config
This dependency performs a container level templating of configuration files. It can template an ip address `{{ .IP }}` and hostname `{{ .HOSTNAME }}`.
@ -91,13 +94,12 @@ Example:
Checks if at least one pod matching the specified labels is already running, by
default anywhere in the cluster, or use `"requireSameNode": true` to require a
a pod on the same node.
In contrast to other dependencies, the syntax uses json in order to avoid inventing a new
format to specify labels and the parsing complexities that would come with that.
As seen below the syntax uses JSON to allow for label support.
This dependency requires a `POD_NAME` env which can be easily passed through the
[downward api](http://kubernetes.io/docs/user-guide/downward-api/). The `POD_NAME` variable is mandatory and is used to resolve dependencies.
Example:
`DEPENDENCY_POD="[{\"namespace\": \"foo\", \"labels\": {\"k1\": \"v1\", \"k2\": \"v2\"}}, {\"labels\": {\"k1\": \"v1\", \"k2\": \"v2\"}, \"requireSameNode\": true}]"`
`DEPENDENCY_POD_JSON='[{"namespace": "foo", "labels": {"k1": "v1", "k2": "v2"}}, {"labels": {"k1": "v1", "k2": "v2"}, "requireSameNode": true}]'`
## Image

View File

@ -4,7 +4,11 @@ import (
"fmt"
entry "github.com/stackanetes/kubernetes-entrypoint/entrypoint"
"github.com/stackanetes/kubernetes-entrypoint/logger"
"github.com/stackanetes/kubernetes-entrypoint/util/env"
api "k8s.io/client-go/1.5/pkg/api"
"k8s.io/client-go/1.5/pkg/apis/batch/v1"
"k8s.io/client-go/1.5/pkg/labels"
)
const FailingStatusFormat = "Job %s is not completed yet"
@ -12,38 +16,75 @@ const FailingStatusFormat = "Job %s is not completed yet"
type Job struct {
name string
namespace string
labels map[string]string
}
func init() {
jobsEnv := fmt.Sprintf("%sJOBS", entry.DependencyPrefix)
if jobsDeps := env.SplitEnvToDeps(jobsEnv); jobsDeps != nil {
jobsJsonEnv := fmt.Sprintf("%s%s", jobsEnv, entry.JsonSuffix)
if jobsDeps := env.SplitJobEnvToDeps(jobsEnv, jobsJsonEnv); jobsDeps != nil {
if len(jobsDeps) > 0 {
for _, dep := range jobsDeps {
entry.Register(NewJob(dep.Name, dep.Namespace))
job := NewJob(dep.Name, dep.Namespace, dep.Labels)
if job != nil {
entry.Register(*job)
}
}
}
}
}
func NewJob(name string, namespace string) Job {
return Job{
func NewJob(name string, namespace string, labels map[string]string) *Job {
if name != "" && labels != nil {
logger.Warning.Printf("Cannot specify both name and labels for job depependency")
return nil
}
return &Job{
name: name,
namespace: namespace,
labels: labels,
}
}
func (j Job) IsResolved(entrypoint entry.EntrypointInterface) (bool, error) {
job, err := entrypoint.Client().Jobs(j.namespace).Get(j.name)
if err != nil {
return false, err
iface := entrypoint.Client().Jobs(j.namespace)
var jobs []v1.Job
if j.name != "" {
job, err := iface.Get(j.name)
if err != nil {
return false, err
}
jobs = []v1.Job{*job}
} else if j.labels != nil {
label := labels.SelectorFromSet(j.labels)
opts := api.ListOptions{LabelSelector: label}
jobList, err := iface.List(opts)
if err != nil {
return false, err
}
jobs = jobList.Items
}
if job.Status.Succeeded == 0 {
return false, fmt.Errorf(FailingStatusFormat, j)
if len(jobs) == 0 {
return false, fmt.Errorf("No matching jobs found: %v", j)
}
for _, job := range jobs {
if job.Status.Succeeded == 0 {
return false, fmt.Errorf(FailingStatusFormat, j)
}
}
return true, nil
}
func (j Job) String() string {
return fmt.Sprintf("Job %s in namespace %s", j.name, j.namespace)
var prefix string
if j.name != "" {
prefix = fmt.Sprintf("Job %s", j.name)
} else if j.labels != nil {
prefix = fmt.Sprintf("Jobs with labels %s", j.labels)
} else {
prefix = "Jobs"
}
return fmt.Sprintf("%s in namespace %s", prefix, j.namespace)
}

View File

@ -13,6 +13,10 @@ import (
const testJobName = "TEST_JOB_NAME"
const testJobNamespace = "TEST_JOB_NAMESPACE"
var testLabels = map[string]string{
"k1": "v1",
}
var testEntrypoint entrypoint.EntrypointInterface
var _ = Describe("Job", func() {
@ -21,15 +25,25 @@ var _ = Describe("Job", func() {
testEntrypoint = mocks.NewEntrypoint()
})
It("checks the name of a newly created job", func() {
job := NewJob(testJobName, testJobNamespace)
It("constructor correctly assigns fields", func() {
nameJob := NewJob(testJobName, testJobNamespace, nil)
Expect(job.name).To(Equal(testJobName))
Expect(job.namespace).To(Equal(testJobNamespace))
Expect(nameJob.name).To(Equal(testJobName))
Expect(nameJob.namespace).To(Equal(testJobNamespace))
labelsJob := NewJob("", testJobNamespace, testLabels)
Expect(labelsJob.labels).To(Equal(testLabels))
})
It("checks resolution of a succeeding job", func() {
job := NewJob(mocks.SucceedingJobName, mocks.SucceedingJobName)
It("constructor returns nil when both name and labels specified", func() {
job := NewJob(testJobName, testJobNamespace, testLabels)
Expect(job).To(BeNil())
})
It("checks resolution of a succeeding job by name", func() {
job := NewJob(mocks.SucceedingJobName, mocks.SucceedingJobName, nil)
isResolved, err := job.IsResolved(testEntrypoint)
@ -37,13 +51,31 @@ var _ = Describe("Job", func() {
Expect(err).NotTo(HaveOccurred())
})
It("checks resolution failure of a failing job", func() {
job := NewJob(mocks.FailingJobName, mocks.FailingJobName)
It("checks resolution failure of a failing job by name", func() {
job := NewJob(mocks.FailingJobName, mocks.FailingJobName, nil)
isResolved, err := job.IsResolved(testEntrypoint)
Expect(isResolved).To(Equal(false))
Expect(err.Error()).To(Equal(fmt.Sprintf(FailingStatusFormat, job)))
})
It("checks resolution of a succeeding job by labels", func() {
job := NewJob("", mocks.SucceedingJobName, map[string]string{"name": mocks.SucceedingJobLabel})
isResolved, err := job.IsResolved(testEntrypoint)
Expect(isResolved).To(Equal(true))
Expect(err).NotTo(HaveOccurred())
})
It("checks resolution failure of a failing job by labels", func() {
job := NewJob("", mocks.FailingJobName, map[string]string{"name": mocks.FailingJobLabel})
isResolved, err := job.IsResolved(testEntrypoint)
Expect(isResolved).To(Equal(false))
Expect(err.Error()).To(Equal(fmt.Sprintf(FailingStatusFormat, job)))
})
})

View File

@ -25,7 +25,7 @@ type Pod struct {
}
func init() {
podEnv := fmt.Sprintf("%sPOD", entry.DependencyPrefix)
podEnv := fmt.Sprintf("%sPOD%s", entry.DependencyPrefix, entry.JsonSuffix)
if podDeps := env.SplitPodEnvToDeps(podEnv); podDeps != nil {
for _, dep := range podDeps {
pod, err := NewPod(dep.Labels, dep.Namespace, dep.RequireSameNode)

View File

@ -13,6 +13,7 @@ var dependencies []Resolver // List containing all dependencies to be resolved
const (
//DependencyPrefix is a prefix for env variables
DependencyPrefix = "DEPENDENCY_"
JsonSuffix = "_JSON"
resolverSleepInterval = 2
)

Binary file not shown.

View File

@ -10,8 +10,10 @@ import (
)
const (
SucceedingJobName = "succeed"
FailingJobName = "fail"
SucceedingJobName = "succeed"
FailingJobName = "fail"
SucceedingJobLabel = "succeed"
FailingJobLabel = "fail"
)
type jClient struct {
@ -41,7 +43,17 @@ func (j jClient) DeleteCollection(options *api.DeleteOptions, listOptions api.Li
return fmt.Errorf("Not implemented")
}
func (j jClient) List(options api.ListOptions) (*batch.JobList, error) {
return nil, fmt.Errorf("Not implemented")
var jobs []batch.Job
if options.LabelSelector.String() == fmt.Sprintf("name=%s", SucceedingJobLabel) {
jobs = []batch.Job{NewJob(1)}
} else if options.LabelSelector.String() == fmt.Sprintf("name=%s", FailingJobLabel) {
jobs = []batch.Job{NewJob(1), NewJob(0)}
} else {
return nil, fmt.Errorf("Mock job didnt work")
}
return &batch.JobList{
Items: jobs,
}, nil
}
func (j jClient) Update(job *batch.Job) (*batch.Job, error) {
@ -62,3 +74,9 @@ func (j jClient) Patch(name string, pt api.PatchType, data []byte, subresources
func NewJClient() v1batch.JobInterface {
return jClient{}
}
func NewJob(succeeded int32) batch.Job {
return batch.Job{
Status: batch.JobStatus{Succeeded: succeeded},
}
}

50
util/env/env.go vendored
View File

@ -23,6 +23,12 @@ type PodDependency struct {
RequireSameNode bool
}
type JobDependency struct {
Name string
Labels map[string]string
Namespace string
}
func SplitCommand() []string {
command := os.Getenv("COMMAND")
if command == "" {
@ -96,6 +102,50 @@ func SplitPodEnvToDeps(env string) []PodDependency {
return deps
}
//SplitJobEnvToDeps returns list of JobDependency
func SplitJobEnvToDeps(env string, jsonEnv string) []JobDependency {
deps := []JobDependency{}
namespace := GetBaseNamespace()
envVal := os.Getenv(env)
jsonEnvVal := os.Getenv(jsonEnv)
if jsonEnvVal != "" {
if envVal != "" {
logger.Warning.Printf("Ignoring %s since %s was specified", env, jsonEnv)
}
err := json.Unmarshal([]byte(jsonEnvVal), &deps)
if err != nil {
logger.Warning.Printf("Invalid format: ", jsonEnvVal)
return []JobDependency{}
}
valid := []JobDependency{}
for _, dep := range deps {
if dep.Namespace == "" {
dep.Namespace = namespace
}
valid = append(valid, dep)
}
return valid
}
if envVal != "" {
plainDeps := SplitEnvToDeps(env)
deps = []JobDependency{}
for _, dep := range plainDeps {
deps = append(deps, JobDependency{Name: dep.Name, Namespace: dep.Namespace})
}
return deps
}
return deps
}
//GetBaseNamespace returns default namespace when user set empty one
func GetBaseNamespace() string {
namespace := os.Getenv("NAMESPACE")

75
util/env/env_test.go vendored
View File

@ -95,9 +95,9 @@ func TestSplitEmptyEnvWithColon(t *testing.T) {
func TestSplitPodEnvToDepsSuccess(t *testing.T) {
defer os.Unsetenv("NAMESPACE")
os.Setenv("NAMESPACE", `TEST_NAMESPACE`)
defer os.Unsetenv("TEST_LIST")
os.Setenv("TEST_LIST", `[{"namespace": "foo", "labels": {"k1": "v1", "k2": "v2"}, "requireSameNode": true}, {"labels": {"k1": "v1", "k2": "v2"}}]`)
actual := SplitPodEnvToDeps("TEST_LIST")
defer os.Unsetenv("TEST_LIST_JSON")
os.Setenv("TEST_LIST_JSON", `[{"namespace": "foo", "labels": {"k1": "v1", "k2": "v2"}, "requireSameNode": true}, {"labels": {"k1": "v1", "k2": "v2"}}]`)
actual := SplitPodEnvToDeps("TEST_LIST_JSON")
expected := []PodDependency{
PodDependency{Namespace: "foo", Labels: map[string]string{
"k1": "v1",
@ -132,6 +132,75 @@ func TestSplitPodEnvToDepsIgnoreInvalid(t *testing.T) {
}
}
func TestSplitJobEnvToDepsJsonSuccess(t *testing.T) {
defer os.Unsetenv("NAMESPACE")
os.Setenv("NAMESPACE", `TEST_NAMESPACE`)
defer os.Unsetenv("TEST_LIST_JSON")
os.Setenv("TEST_LIST_JSON", `[{"namespace": "foo", "labels": {"k1": "v1", "k2": "v2"}}, {"name": "bar"}]`)
actual := SplitJobEnvToDeps("TEST_LIST", "TEST_LIST_JSON")
expected := []JobDependency{
JobDependency{
Name: "",
Namespace: "foo", Labels: map[string]string{
"k1": "v1",
"k2": "v2",
}},
JobDependency{Name: "bar", Namespace: "TEST_NAMESPACE", Labels: nil},
}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("Expected: %v Got: %v", expected, actual)
}
}
func TestSplitJobEnvToDepsPlainSuccess(t *testing.T) {
defer os.Unsetenv("NAMESPACE")
os.Setenv("NAMESPACE", `TEST_NAMESPACE`)
defer os.Unsetenv("TEST_LIST")
os.Setenv("TEST_LIST", `plain`)
actual := SplitJobEnvToDeps("TEST_LIST", "TEST_LIST_JSON")
expected := []JobDependency{
JobDependency{Name: "plain", Namespace: "TEST_NAMESPACE", Labels: nil},
}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("Expected: %v Got: %v", expected, actual)
}
}
func TestSplitJobEnvToDepsJsonPrecedence(t *testing.T) {
defer os.Unsetenv("NAMESPACE")
os.Setenv("NAMESPACE", `TEST_NAMESPACE`)
defer os.Unsetenv("TEST_LIST_JSON")
os.Setenv("TEST_LIST_JSON", `[{"name": "json"}]`)
defer os.Unsetenv("TEST_LIST")
os.Setenv("TEST_LIST", `plain`)
actual := SplitJobEnvToDeps("TEST_LIST", "TEST_LIST_JSON")
expected := []JobDependency{
JobDependency{Name: "json", Namespace: "TEST_NAMESPACE", Labels: nil},
}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("Expected: %v Got: %v", expected, actual)
}
}
func TestSplitJobEnvToDepsUnset(t *testing.T) {
actual := SplitJobEnvToDeps("TEST_LIST", "TEST_LIST_JSON")
if len(actual) != 0 {
t.Errorf("Expected: no dependencies Got: %v", actual)
}
}
func TestSplitJobEnvToDepsIgnoreInvalid(t *testing.T) {
defer os.Unsetenv("TEST_LIST_JSON")
os.Setenv("TEST_LIST_JSON", `[{"invalid": json}`)
actual := SplitJobEnvToDeps("TEST_LIST", "TEST_LIST_JSON")
if len(actual) != 0 {
t.Errorf("Expected: ignore invalid dependencies Got: %v", actual)
}
}
func TestSplitCommand(t *testing.T) {
defer os.Unsetenv("COMMAND")
list2 := SplitCommand()