Don't use custom labels when selecting an agent pod

And delete the agent pod when it needs its custom labels to be
updated, so that the creator controller will notice that it is missing
and immediately create it with the new custom labels.
This commit is contained in:
Ryan Richard 2020-10-30 17:41:17 -07:00
parent 01f4fdb5c3
commit f76b9857da
7 changed files with 183 additions and 173 deletions

View File

@ -10,7 +10,6 @@ import (
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/clock"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
@ -85,11 +84,10 @@ func NewAnnotaterController(
// Sync implements controllerlib.Syncer.
func (c *annotaterController) Sync(ctx controllerlib.Context) error {
agentSelector := labels.SelectorFromSet(c.agentPodConfig.Labels())
agentPods, err := c.agentPodInformer.
Lister().
Pods(c.agentPodConfig.Namespace).
List(agentSelector)
List(c.agentPodConfig.AgentSelector())
if err != nil {
return fmt.Errorf("informer cannot list agent pods: %w", err)
}

View File

@ -96,6 +96,10 @@ func TestAnnotaterControllerSync(t *testing.T) {
Namespace: agentPodNamespace,
ContainerImage: "some-agent-image",
PodNamePrefix: "some-agent-name-",
AdditionalLabels: map[string]string{
"myLabelKey1": "myLabelValue1",
"myLabelKey2": "myLabelValue2",
},
},
&CredentialIssuerConfigLocationConfig{
Namespace: credentialIssuerConfigNamespaceName,
@ -565,6 +569,46 @@ func TestAnnotaterControllerSync(t *testing.T) {
})
})
when("there is an agent pod without annotations set which does not have the configured additional labels", func() {
it.Before(func() {
delete(agentPod.ObjectMeta.Labels, "myLabelKey1")
r.NoError(agentInformerClient.Tracker().Add(agentPod))
r.NoError(kubeAPIClient.Tracker().Add(agentPod))
})
when("there is a matching controller manager pod", func() {
it.Before(func() {
r.NoError(kubeSystemInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("updates the annotations according to the controller manager pod", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations[certPathAnnotation] = certPath
updatedAgentPod.Annotations[keyPathAnnotation] = keyPath
r.Equal(
[]coretesting.Action{
coretesting.NewGetAction(
podsGVR,
agentPodNamespace,
updatedAgentPod.Name,
),
coretesting.NewUpdateAction(
podsGVR,
agentPodNamespace,
updatedAgentPod,
),
},
kubeAPIClient.Actions(),
)
})
})
})
when("there is an agent pod with correct annotations set", func() {
it.Before(func() {
agentPod.Annotations = make(map[string]string)

View File

@ -110,7 +110,7 @@ func (c *createrController) Sync(ctx controllerlib.Context) error {
controllerManagerPod,
c.kubeSystemPodInformer,
c.agentPodInformer,
c.agentPodConfig.Labels(),
c.agentPodConfig.AgentSelector(),
)
if err != nil {
return err
@ -158,9 +158,8 @@ func findAgentPodForSpecificControllerManagerPod(
controllerManagerPod *corev1.Pod,
kubeSystemPodInformer corev1informers.PodInformer,
agentPodInformer corev1informers.PodInformer,
agentLabels map[string]string,
agentSelector labels.Selector,
) (*corev1.Pod, error) {
agentSelector := labels.SelectorFromSet(agentLabels)
agentPods, err := agentPodInformer.
Lister().
List(agentSelector)

View File

@ -223,10 +223,24 @@ func TestCreaterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{},
kubeAPIClient.Actions(),
)
r.Empty(kubeAPIClient.Actions())
})
})
when("there is a matching agent pod that is missing some of the configured additional labels", func() {
it.Before(func() {
nonMatchingAgentPod := agentPod.DeepCopy()
delete(nonMatchingAgentPod.ObjectMeta.Labels, "myLabelKey1")
r.NoError(agentInformerClient.Tracker().Add(nonMatchingAgentPod))
r.NoError(kubeAPIClient.Tracker().Add(nonMatchingAgentPod))
})
it("does nothing because the deleter controller is responsible for deleting it", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Empty(kubeAPIClient.Actions())
})
})

View File

@ -7,7 +7,6 @@ import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
@ -57,11 +56,10 @@ func NewDeleterController(
// Sync implements controllerlib.Syncer.
func (c *deleterController) Sync(ctx controllerlib.Context) error {
agentSelector := labels.SelectorFromSet(c.agentPodConfig.Labels())
agentPods, err := c.agentPodInformer.
Lister().
Pods(c.agentPodConfig.Namespace).
List(agentSelector)
List(c.agentPodConfig.AgentSelector())
if err != nil {
return fmt.Errorf("informer cannot list agent pods: %w", err)
}

View File

@ -72,6 +72,10 @@ func TestDeleterControllerSync(t *testing.T) {
Namespace: agentPodNamespace,
ContainerImage: "some-agent-image",
PodNamePrefix: "some-agent-name-",
AdditionalLabels: map[string]string{
"myLabelKey1": "myLabelValue1",
"myLabelKey2": "myLabelValue2",
},
},
kubeAPIClient,
kubeSystemInformers.Core().V1().Pods(),
@ -95,6 +99,13 @@ func TestDeleterControllerSync(t *testing.T) {
controllerlib.TestRunSynchronously(t, subject)
}
var requireAgentPodWasDeleted = func() {
r.Equal(
[]coretesting.Action{coretesting.NewDeleteAction(podsGVR, agentPodNamespace, agentPod.Name)},
kubeAPIClient.Actions(),
)
}
it.Before(func() {
r = require.New(t)
@ -148,19 +159,12 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{},
kubeAPIClient.Actions(),
)
r.Empty(kubeAPIClient.Actions())
})
when("the agent pod is out of sync with the controller manager via volume mounts", func() {
it.Before(func() {
controllerManagerPod.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{
{
Name: "some-other-volume-mount",
},
}
controllerManagerPod.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{{Name: "some-other-volume-mount"}}
r.NoError(kubeSystemInformerClient.Tracker().Update(podsGVR, controllerManagerPod, controllerManagerPod.Namespace))
r.NoError(kubeAPIClient.Tracker().Update(podsGVR, controllerManagerPod, controllerManagerPod.Namespace))
})
@ -170,26 +174,13 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
agentPodNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
requireAgentPodWasDeleted()
})
})
when("the agent pod is out of sync with the controller manager via volumes", func() {
it.Before(func() {
controllerManagerPod.Spec.Volumes = []corev1.Volume{
{
Name: "some-other-volume",
},
}
controllerManagerPod.Spec.Volumes = []corev1.Volume{{Name: "some-other-volume"}}
r.NoError(kubeSystemInformerClient.Tracker().Update(podsGVR, controllerManagerPod, controllerManagerPod.Namespace))
r.NoError(kubeAPIClient.Tracker().Update(podsGVR, controllerManagerPod, controllerManagerPod.Namespace))
})
@ -199,16 +190,7 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
agentPodNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
requireAgentPodWasDeleted()
})
})
@ -226,16 +208,7 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
agentPodNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
requireAgentPodWasDeleted()
})
})
@ -251,26 +224,13 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
agentPodNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
requireAgentPodWasDeleted()
})
})
when("the agent pod is out of sync with the controller manager via tolerations", func() {
it.Before(func() {
controllerManagerPod.Spec.Tolerations = []corev1.Toleration{
{
Key: "some-other-toleration-key",
},
}
controllerManagerPod.Spec.Tolerations = []corev1.Toleration{{Key: "some-other-toleration-key"}}
r.NoError(kubeSystemInformerClient.Tracker().Update(podsGVR, controllerManagerPod, controllerManagerPod.Namespace))
r.NoError(kubeAPIClient.Tracker().Update(podsGVR, controllerManagerPod, controllerManagerPod.Namespace))
})
@ -280,16 +240,7 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
agentPodNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
requireAgentPodWasDeleted()
})
})
@ -306,16 +257,7 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
agentPodNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
requireAgentPodWasDeleted()
})
})
@ -332,16 +274,7 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
agentPodNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
requireAgentPodWasDeleted()
})
})
@ -358,16 +291,7 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
agentPodNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
requireAgentPodWasDeleted()
})
})
@ -384,16 +308,74 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
agentPodNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
requireAgentPodWasDeleted()
})
})
when("the agent pod is out of sync with the template via labels", func() {
when("an additional label's value was changed", func() {
it.Before(func() {
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.ObjectMeta.Labels = map[string]string{
agentPodLabelKey: agentPodLabelValue,
// the value of a label is wrong so the pod should be deleted so it can get recreated with the new labels
"myLabelKey1": "myLabelValue1-outdated-value",
"myLabelKey2": "myLabelValue2-outdated-value",
}
r.NoError(agentInformerClient.Tracker().Update(podsGVR, updatedAgentPod, updatedAgentPod.Namespace))
r.NoError(kubeAPIClient.Tracker().Update(podsGVR, updatedAgentPod, updatedAgentPod.Namespace))
})
it("deletes the agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
requireAgentPodWasDeleted()
})
})
when("an additional label was added", func() {
it.Before(func() {
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.ObjectMeta.Labels = map[string]string{
agentPodLabelKey: agentPodLabelValue,
"myLabelKey1": "myLabelValue1",
// "myLabelKey2" is missing so the pod should be deleted so it can get recreated with the new labels
}
r.NoError(agentInformerClient.Tracker().Update(podsGVR, updatedAgentPod, updatedAgentPod.Namespace))
r.NoError(kubeAPIClient.Tracker().Update(podsGVR, updatedAgentPod, updatedAgentPod.Namespace))
})
it("deletes the agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
requireAgentPodWasDeleted()
})
})
when("the agent pod has extra labels that seem unrelated to the additional labels", func() {
it.Before(func() {
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.ObjectMeta.Labels = map[string]string{
agentPodLabelKey: agentPodLabelValue,
"myLabelKey1": "myLabelValue1",
"myLabelKey2": "myLabelValue2",
"extra-label": "not-related-to-the-sepcified-additional-labels",
}
r.NoError(agentInformerClient.Tracker().Update(podsGVR, updatedAgentPod, updatedAgentPod.Namespace))
r.NoError(kubeAPIClient.Tracker().Update(podsGVR, updatedAgentPod, updatedAgentPod.Namespace))
})
it("does not delete the agent pod because someone else might have put those labels on it", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Empty(kubeAPIClient.Actions())
})
})
})
@ -410,16 +392,7 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
agentPodNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
requireAgentPodWasDeleted()
})
})
})
@ -436,16 +409,7 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
agentPodNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
requireAgentPodWasDeleted()
})
})
@ -461,16 +425,7 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
agentPodNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
requireAgentPodWasDeleted()
})
})
@ -480,16 +435,7 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
agentPodNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
requireAgentPodWasDeleted()
})
})
})
@ -500,10 +446,7 @@ func TestDeleterControllerSync(t *testing.T) {
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{},
kubeAPIClient.Actions(),
)
r.Empty(kubeAPIClient.Actions())
})
})
}, spec.Parallel(), spec.Report(report.Terminal{}))

View File

@ -20,6 +20,7 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/clock"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/klog/v2"
@ -81,13 +82,17 @@ type CredentialIssuerConfigLocationConfig struct {
}
func (c *AgentPodConfig) Labels() map[string]string {
labels := map[string]string{
allLabels := map[string]string{
agentPodLabelKey: agentPodLabelValue,
}
for k, v := range c.AdditionalLabels {
labels[k] = v
allLabels[k] = v
}
return labels
return allLabels
}
func (c *AgentPodConfig) AgentSelector() labels.Selector {
return labels.SelectorFromSet(map[string]string{agentPodLabelKey: agentPodLabelValue})
}
func (c *AgentPodConfig) PodTemplate() *corev1.Pod {
@ -164,10 +169,19 @@ func newAgentPod(
}
func isAgentPodUpToDate(actualAgentPod, expectedAgentPod *corev1.Pod) bool {
return equality.Semantic.DeepEqual(
actualAgentPod.Spec.Containers[0].VolumeMounts,
expectedAgentPod.Spec.Containers[0].VolumeMounts,
) &&
requiredLabelsAllPresentWithCorrectValues := true
actualLabels := actualAgentPod.ObjectMeta.Labels
for expectedLabelKey, expectedLabelValue := range expectedAgentPod.ObjectMeta.Labels {
if actualLabels[expectedLabelKey] != expectedLabelValue {
requiredLabelsAllPresentWithCorrectValues = false
break
}
}
return requiredLabelsAllPresentWithCorrectValues &&
equality.Semantic.DeepEqual(
actualAgentPod.Spec.Containers[0].VolumeMounts,
expectedAgentPod.Spec.Containers[0].VolumeMounts,
) &&
equality.Semantic.DeepEqual(
actualAgentPod.Spec.Containers[0].Name,
expectedAgentPod.Spec.Containers[0].Name,