Continue the WIP from the previous commit: finish adding second informer

- All of the `kubecertagent` controllers now take two informers
- This is moving in the direction of creating the agent pods in the
  Pinniped installation namespace, but that will come in a future
  commit
This commit is contained in:
Ryan Richard 2020-09-21 16:37:22 -07:00
parent 50258fc569
commit 820f1e977e
9 changed files with 207 additions and 190 deletions

View File

@ -21,9 +21,10 @@ import (
) )
type annotaterController struct { type annotaterController struct {
agentInfo *Info agentInfo *Info
k8sClient kubernetes.Interface k8sClient kubernetes.Interface
podInformer corev1informers.PodInformer kubeSystemPodInformer corev1informers.PodInformer
agentPodInformer corev1informers.PodInformer
} }
// NewAnnotaterController returns a controller that updates agent pods with the path to the kube // NewAnnotaterController returns a controller that updates agent pods with the path to the kube
@ -35,22 +36,29 @@ type annotaterController struct {
func NewAnnotaterController( func NewAnnotaterController(
agentInfo *Info, agentInfo *Info,
k8sClient kubernetes.Interface, k8sClient kubernetes.Interface,
podInformer corev1informers.PodInformer, kubeSystemPodInformer corev1informers.PodInformer,
agentPodInformer corev1informers.PodInformer,
withInformer pinnipedcontroller.WithInformerOptionFunc, withInformer pinnipedcontroller.WithInformerOptionFunc,
) controllerlib.Controller { ) controllerlib.Controller {
return controllerlib.New( return controllerlib.New(
controllerlib.Config{ controllerlib.Config{
Name: "kube-cert-agent-annotater-controller", Name: "kube-cert-agent-annotater-controller",
Syncer: &annotaterController{ Syncer: &annotaterController{
agentInfo: agentInfo, agentInfo: agentInfo,
k8sClient: k8sClient, k8sClient: k8sClient,
podInformer: podInformer, kubeSystemPodInformer: kubeSystemPodInformer,
agentPodInformer: agentPodInformer,
}, },
}, },
withInformer( withInformer(
podInformer, kubeSystemPodInformer,
pinnipedcontroller.SimpleFilter(isControllerManagerPod),
controllerlib.InformerOption{},
),
withInformer(
agentPodInformer,
pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool { pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool {
return isControllerManagerPod(obj) || isAgentPod(obj, agentInfo.Template.Labels) return isAgentPod(obj, agentInfo.Template.Labels)
}), }),
controllerlib.InformerOption{}, controllerlib.InformerOption{},
), ),
@ -60,7 +68,7 @@ func NewAnnotaterController(
// Sync implements controllerlib.Syncer. // Sync implements controllerlib.Syncer.
func (c *annotaterController) Sync(ctx controllerlib.Context) error { func (c *annotaterController) Sync(ctx controllerlib.Context) error {
agentSelector := labels.SelectorFromSet(c.agentInfo.Template.Labels) agentSelector := labels.SelectorFromSet(c.agentInfo.Template.Labels)
agentPods, err := c.podInformer. agentPods, err := c.agentPodInformer.
Lister(). Lister().
Pods(ControllerManagerNamespace). Pods(ControllerManagerNamespace).
List(agentSelector) List(agentSelector)
@ -69,7 +77,7 @@ func (c *annotaterController) Sync(ctx controllerlib.Context) error {
} }
for _, agentPod := range agentPods { for _, agentPod := range agentPods {
controllerManagerPod, err := findControllerManagerPod(agentPod, c.podInformer) controllerManagerPod, err := findControllerManagerPodForSpecificAgentPod(agentPod, c.kubeSystemPodInformer)
if err != nil { if err != nil {
return err return err
} }
@ -104,7 +112,7 @@ func (c *annotaterController) maybeUpdateAgentPod(
keyPathOK bool, keyPathOK bool,
) error { ) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error { return retry.RetryOnConflict(retry.DefaultRetry, func() error {
agentPod, err := c.podInformer.Lister().Pods(ControllerManagerNamespace).Get(name) agentPod, err := c.agentPodInformer.Lister().Pods(ControllerManagerNamespace).Get(name)
if err != nil { if err != nil {
return err return err
} }

View File

@ -25,12 +25,13 @@ import (
) )
func TestAnnotaterControllerFilter(t *testing.T) { func TestAnnotaterControllerFilter(t *testing.T) {
runFilterTest( defineSharedKubecertagentFilterSpecs(
t, t,
"AnnotaterControllerFilter", "AnnotaterControllerFilter",
func( func(
agentPodTemplate *corev1.Pod, agentPodTemplate *corev1.Pod,
podsInformer corev1informers.PodInformer, kubeSystemPodInformer corev1informers.PodInformer,
agentPodInformer corev1informers.PodInformer,
observableWithInformerOption *testutil.ObservableWithInformerOption, observableWithInformerOption *testutil.ObservableWithInformerOption,
) { ) {
_ = NewAnnotaterController( _ = NewAnnotaterController(
@ -38,7 +39,8 @@ func TestAnnotaterControllerFilter(t *testing.T) {
Template: agentPodTemplate, Template: agentPodTemplate,
}, },
nil, // k8sClient, shouldn't matter nil, // k8sClient, shouldn't matter
podsInformer, kubeSystemPodInformer,
agentPodInformer,
observableWithInformerOption.WithInformer, observableWithInformerOption.WithInformer,
) )
}, },
@ -61,8 +63,10 @@ func TestAnnotaterControllerSync(t *testing.T) {
var subject controllerlib.Controller var subject controllerlib.Controller
var kubeAPIClient *kubernetesfake.Clientset var kubeAPIClient *kubernetesfake.Clientset
var kubeInformerClient *kubernetesfake.Clientset var kubeSystemInformerClient *kubernetesfake.Clientset
var kubeInformers kubeinformers.SharedInformerFactory var kubeSystemInformers kubeinformers.SharedInformerFactory
var agentInformerClient *kubernetesfake.Clientset
var agentInformers kubeinformers.SharedInformerFactory
var timeoutContext context.Context var timeoutContext context.Context
var timeoutContextCancel context.CancelFunc var timeoutContextCancel context.CancelFunc
var syncContext *controllerlib.Context var syncContext *controllerlib.Context
@ -132,15 +136,9 @@ func TestAnnotaterControllerSync(t *testing.T) {
agentPod := agentPodTemplate.DeepCopy() agentPod := agentPodTemplate.DeepCopy()
agentPod.Namespace = kubeSystemNamespace agentPod.Namespace = kubeSystemNamespace
agentPod.Name += controllerManagerPodHash agentPod.Name += controllerManagerPodHash
agentPod.OwnerReferences = []metav1.OwnerReference{ agentPod.Annotations = map[string]string{
{ "kube-cert-agent.pinniped.dev/controller-manager-name": controllerManagerPod.Name,
APIVersion: "v1", "kube-cert-agent.pinniped.dev/controller-manager-uid": string(controllerManagerPod.UID),
Kind: "Pod",
Name: controllerManagerPod.Name,
UID: controllerManagerPod.UID,
Controller: boolPtr(true),
BlockOwnerDeletion: boolPtr(true),
},
} }
agentPod.Spec.Containers[0].VolumeMounts = controllerManagerPod.Spec.Containers[0].VolumeMounts agentPod.Spec.Containers[0].VolumeMounts = controllerManagerPod.Spec.Containers[0].VolumeMounts
agentPod.Spec.RestartPolicy = corev1.RestartPolicyNever agentPod.Spec.RestartPolicy = corev1.RestartPolicyNever
@ -166,7 +164,8 @@ func TestAnnotaterControllerSync(t *testing.T) {
KeyPathAnnotation: keyPathAnnotation, KeyPathAnnotation: keyPathAnnotation,
}, },
kubeAPIClient, kubeAPIClient,
kubeInformers.Core().V1().Pods(), kubeSystemInformers.Core().V1().Pods(),
agentInformers.Core().V1().Pods(),
controllerlib.WithInformer, controllerlib.WithInformer,
) )
@ -181,24 +180,30 @@ func TestAnnotaterControllerSync(t *testing.T) {
} }
// Must start informers before calling TestRunSynchronously() // Must start informers before calling TestRunSynchronously()
kubeInformers.Start(timeoutContext.Done()) kubeSystemInformers.Start(timeoutContext.Done())
agentInformers.Start(timeoutContext.Done())
controllerlib.TestRunSynchronously(t, subject) controllerlib.TestRunSynchronously(t, subject)
} }
it.Before(func() { it.Before(func() {
r = require.New(t) r = require.New(t)
timeoutContext, timeoutContextCancel = context.WithTimeout(context.Background(), time.Second*3)
kubeInformerClient = kubernetesfake.NewSimpleClientset()
kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0)
kubeAPIClient = kubernetesfake.NewSimpleClientset() kubeAPIClient = kubernetesfake.NewSimpleClientset()
kubeSystemInformerClient = kubernetesfake.NewSimpleClientset()
kubeSystemInformers = kubeinformers.NewSharedInformerFactory(kubeSystemInformerClient, 0)
agentInformerClient = kubernetesfake.NewSimpleClientset()
agentInformers = kubeinformers.NewSharedInformerFactory(agentInformerClient, 0)
timeoutContext, timeoutContextCancel = context.WithTimeout(context.Background(), time.Second*3)
// Add a pod into the test that doesn't matter to make sure we don't accidentally trigger any // Add a pod into the test that doesn't matter to make sure we don't accidentally trigger any
// logic on this thing. // logic on this thing.
ignorablePod := corev1.Pod{} ignorablePod := corev1.Pod{}
ignorablePod.Name = "some-ignorable-pod" ignorablePod.Name = "some-ignorable-pod"
r.NoError(kubeInformerClient.Tracker().Add(&ignorablePod)) r.NoError(kubeSystemInformerClient.Tracker().Add(&ignorablePod))
r.NoError(agentInformerClient.Tracker().Add(&ignorablePod))
r.NoError(kubeAPIClient.Tracker().Add(&ignorablePod)) r.NoError(kubeAPIClient.Tracker().Add(&ignorablePod))
}) })
@ -208,13 +213,13 @@ func TestAnnotaterControllerSync(t *testing.T) {
when("there is an agent pod without annotations set", func() { when("there is an agent pod without annotations set", func() {
it.Before(func() { it.Before(func() {
r.NoError(kubeInformerClient.Tracker().Add(agentPod)) r.NoError(agentInformerClient.Tracker().Add(agentPod))
r.NoError(kubeAPIClient.Tracker().Add(agentPod)) r.NoError(kubeAPIClient.Tracker().Add(agentPod))
}) })
when("there is a matching controller manager pod", func() { when("there is a matching controller manager pod", func() {
it.Before(func() { it.Before(func() {
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeSystemInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
}) })
@ -223,7 +228,6 @@ func TestAnnotaterControllerSync(t *testing.T) {
r.NoError(controllerlib.TestSync(t, subject, *syncContext)) r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy() updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations = make(map[string]string)
updatedAgentPod.Annotations[certPathAnnotation] = certPath updatedAgentPod.Annotations[certPathAnnotation] = certPath
updatedAgentPod.Annotations[keyPathAnnotation] = keyPath updatedAgentPod.Annotations[keyPathAnnotation] = keyPath
@ -247,7 +251,7 @@ func TestAnnotaterControllerSync(t *testing.T) {
"--cluster-signing-cert-file", certPath, "--cluster-signing-cert-file", certPath,
"--cluster-signing-key-file", keyPath, "--cluster-signing-key-file", keyPath,
} }
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeSystemInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
}) })
@ -256,7 +260,6 @@ func TestAnnotaterControllerSync(t *testing.T) {
r.NoError(controllerlib.TestSync(t, subject, *syncContext)) r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy() updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations = make(map[string]string)
updatedAgentPod.Annotations[certPathAnnotation] = certPath updatedAgentPod.Annotations[certPathAnnotation] = certPath
updatedAgentPod.Annotations[keyPathAnnotation] = keyPath updatedAgentPod.Annotations[keyPathAnnotation] = keyPath
@ -280,7 +283,7 @@ func TestAnnotaterControllerSync(t *testing.T) {
"--cluster-signing-cert-file-blah", certPath, "--cluster-signing-cert-file-blah", certPath,
"--cluster-signing-key-file-blah", keyPath, "--cluster-signing-key-file-blah", keyPath,
} }
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeSystemInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
}) })
@ -301,7 +304,7 @@ func TestAnnotaterControllerSync(t *testing.T) {
"--cluster-signing-cert-file-blah", certPath, "--cluster-signing-cert-file-blah", certPath,
"--cluster-signing-key-file", keyPath, "--cluster-signing-key-file", keyPath,
} }
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeSystemInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
}) })
@ -310,7 +313,6 @@ func TestAnnotaterControllerSync(t *testing.T) {
r.NoError(controllerlib.TestSync(t, subject, *syncContext)) r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy() updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations = make(map[string]string)
updatedAgentPod.Annotations[keyPathAnnotation] = keyPath updatedAgentPod.Annotations[keyPathAnnotation] = keyPath
r.Equal( r.Equal(
[]coretesting.Action{ []coretesting.Action{
@ -332,7 +334,7 @@ func TestAnnotaterControllerSync(t *testing.T) {
"--cluster-signing-cert-file", certPath, "--cluster-signing-cert-file", certPath,
"--cluster-signing-key-file-blah", keyPath, "--cluster-signing-key-file-blah", keyPath,
} }
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeSystemInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
}) })
@ -341,7 +343,6 @@ func TestAnnotaterControllerSync(t *testing.T) {
r.NoError(controllerlib.TestSync(t, subject, *syncContext)) r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy() updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations = make(map[string]string)
updatedAgentPod.Annotations[certPathAnnotation] = certPath updatedAgentPod.Annotations[certPathAnnotation] = certPath
r.Equal( r.Equal(
[]coretesting.Action{ []coretesting.Action{
@ -359,7 +360,7 @@ func TestAnnotaterControllerSync(t *testing.T) {
when("there is a non-matching controller manager pod via uid", func() { when("there is a non-matching controller manager pod via uid", func() {
it.Before(func() { it.Before(func() {
controllerManagerPod.UID = "some-other-controller-manager-uid" controllerManagerPod.UID = "some-other-controller-manager-uid"
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeSystemInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
}) })
@ -376,7 +377,7 @@ func TestAnnotaterControllerSync(t *testing.T) {
when("there is a non-matching controller manager pod via name", func() { when("there is a non-matching controller manager pod via name", func() {
it.Before(func() { it.Before(func() {
controllerManagerPod.Name = "some-other-controller-manager-name" controllerManagerPod.Name = "some-other-controller-manager-name"
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeSystemInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
}) })
@ -396,13 +397,13 @@ func TestAnnotaterControllerSync(t *testing.T) {
agentPod.Annotations = make(map[string]string) agentPod.Annotations = make(map[string]string)
agentPod.Annotations[certPathAnnotation] = certPath agentPod.Annotations[certPathAnnotation] = certPath
agentPod.Annotations[keyPathAnnotation] = keyPath agentPod.Annotations[keyPathAnnotation] = keyPath
r.NoError(kubeInformerClient.Tracker().Add(agentPod)) r.NoError(agentInformerClient.Tracker().Add(agentPod))
r.NoError(kubeAPIClient.Tracker().Add(agentPod)) r.NoError(kubeAPIClient.Tracker().Add(agentPod))
}) })
when("there is a matching controller manager pod", func() { when("there is a matching controller manager pod", func() {
it.Before(func() { it.Before(func() {
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeSystemInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
}) })
@ -419,16 +420,15 @@ func TestAnnotaterControllerSync(t *testing.T) {
when("there is an agent pod with the wrong cert annotation", func() { when("there is an agent pod with the wrong cert annotation", func() {
it.Before(func() { it.Before(func() {
agentPod.Annotations = make(map[string]string)
agentPod.Annotations[certPathAnnotation] = "wrong" agentPod.Annotations[certPathAnnotation] = "wrong"
agentPod.Annotations[keyPathAnnotation] = keyPath agentPod.Annotations[keyPathAnnotation] = keyPath
r.NoError(kubeInformerClient.Tracker().Add(agentPod)) r.NoError(agentInformerClient.Tracker().Add(agentPod))
r.NoError(kubeAPIClient.Tracker().Add(agentPod)) r.NoError(kubeAPIClient.Tracker().Add(agentPod))
}) })
when("there is a matching controller manager pod", func() { when("there is a matching controller manager pod", func() {
it.Before(func() { it.Before(func() {
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeSystemInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
}) })
@ -454,16 +454,15 @@ func TestAnnotaterControllerSync(t *testing.T) {
when("there is an agent pod with the wrong key annotation", func() { when("there is an agent pod with the wrong key annotation", func() {
it.Before(func() { it.Before(func() {
agentPod.Annotations = make(map[string]string)
agentPod.Annotations[certPathAnnotation] = certPath agentPod.Annotations[certPathAnnotation] = certPath
agentPod.Annotations[keyPathAnnotation] = "key" agentPod.Annotations[keyPathAnnotation] = "key"
r.NoError(kubeInformerClient.Tracker().Add(agentPod)) r.NoError(agentInformerClient.Tracker().Add(agentPod))
r.NoError(kubeAPIClient.Tracker().Add(agentPod)) r.NoError(kubeAPIClient.Tracker().Add(agentPod))
}) })
when("there is a matching controller manager pod", func() { when("there is a matching controller manager pod", func() {
it.Before(func() { it.Before(func() {
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeSystemInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
}) })

View File

@ -6,6 +6,7 @@ package kubecertagent
import ( import (
"fmt" "fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
corev1informers "k8s.io/client-go/informers/core/v1" corev1informers "k8s.io/client-go/informers/core/v1"
@ -47,9 +48,7 @@ func NewCreaterController(
}, },
withInformer( withInformer(
kubeSystemPodInformer, kubeSystemPodInformer,
pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool { pinnipedcontroller.SimpleFilter(isControllerManagerPod),
return isControllerManagerPod(obj)
}),
controllerlib.InformerOption{}, controllerlib.InformerOption{},
), ),
withInformer( withInformer(
@ -75,7 +74,7 @@ func (c *createrController) Sync(ctx controllerlib.Context) error {
} }
for _, controllerManagerPod := range controllerManagerPods { for _, controllerManagerPod := range controllerManagerPods {
agentPod, err := findAgentPod( agentPod, err := findAgentPodForSpecificControllerManagerPod(
controllerManagerPod, controllerManagerPod,
c.kubeSystemPodInformer, c.kubeSystemPodInformer,
c.agentPodInformer, c.agentPodInformer,
@ -102,9 +101,39 @@ func (c *createrController) Sync(ctx controllerlib.Context) error {
} }
} }
// The deleter controller handles the case where the expected fields do not match in the agent // The deleter controller handles the case where the expected fields do not match in the agent pod.
// pod.
} }
return nil return nil
} }
func findAgentPodForSpecificControllerManagerPod(
controllerManagerPod *corev1.Pod,
kubeSystemPodInformer corev1informers.PodInformer,
agentPodInformer corev1informers.PodInformer,
agentLabels map[string]string,
) (*corev1.Pod, error) {
agentSelector := labels.SelectorFromSet(agentLabels)
agentPods, err := agentPodInformer.
Lister().
List(agentSelector)
if err != nil {
return nil, fmt.Errorf("informer cannot list agent pods: %w", err)
}
for _, maybeAgentPod := range agentPods {
maybeControllerManagerPod, err := findControllerManagerPodForSpecificAgentPod(
maybeAgentPod,
kubeSystemPodInformer,
)
if err != nil {
return nil, err
}
if maybeControllerManagerPod != nil &&
maybeControllerManagerPod.UID == controllerManagerPod.UID {
return maybeAgentPod, nil
}
}
return nil, nil
}

View File

@ -25,13 +25,13 @@ import (
) )
func TestCreaterControllerFilter(t *testing.T) { func TestCreaterControllerFilter(t *testing.T) {
runFilterTest( defineSharedKubecertagentFilterSpecs(
t, t,
"CreaterControllerFilter", "CreaterControllerFilter",
func( func(
agentPodTemplate *corev1.Pod, agentPodTemplate *corev1.Pod,
kubeSystemPodInformer corev1informers.PodInformer, kubeSystemPodInformer corev1informers.PodInformer,
//agentPodInformer corev1informers.PodInformer, agentPodInformer corev1informers.PodInformer,
observableWithInformerOption *testutil.ObservableWithInformerOption, observableWithInformerOption *testutil.ObservableWithInformerOption,
) { ) {
_ = NewCreaterController( _ = NewCreaterController(
@ -40,7 +40,7 @@ func TestCreaterControllerFilter(t *testing.T) {
}, },
nil, // k8sClient, shouldn't matter nil, // k8sClient, shouldn't matter
kubeSystemPodInformer, kubeSystemPodInformer,
nil, //agentPodInformer, agentPodInformer,
observableWithInformerOption.WithInformer, observableWithInformerOption.WithInformer,
) )
}, },

View File

@ -44,10 +44,15 @@ func NewDeleterController(
agentPodInformer: agentPodInformer, agentPodInformer: agentPodInformer,
}, },
}, },
withInformer(
kubeSystemPodInformer,
pinnipedcontroller.SimpleFilter(isControllerManagerPod),
controllerlib.InformerOption{},
),
withInformer( withInformer(
agentPodInformer, agentPodInformer,
pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool { pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool {
return isControllerManagerPod(obj) || isAgentPod(obj, agentInfo.Template.Labels) return isAgentPod(obj, agentInfo.Template.Labels)
}), }),
controllerlib.InformerOption{}, controllerlib.InformerOption{},
), ),
@ -66,7 +71,7 @@ func (c *deleterController) Sync(ctx controllerlib.Context) error {
} }
for _, agentPod := range agentPods { for _, agentPod := range agentPods {
controllerManagerPod, err := findControllerManagerPod(agentPod, c.kubeSystemPodInformer) controllerManagerPod, err := findControllerManagerPodForSpecificAgentPod(agentPod, c.kubeSystemPodInformer)
if err != nil { if err != nil {
return err return err
} }

View File

@ -25,12 +25,13 @@ import (
) )
func TestDeleterControllerFilter(t *testing.T) { func TestDeleterControllerFilter(t *testing.T) {
runFilterTest( defineSharedKubecertagentFilterSpecs(
t, t,
"DeleterControllerFilter", "DeleterControllerFilter",
func( func(
agentPodTemplate *corev1.Pod, agentPodTemplate *corev1.Pod,
podsInformer corev1informers.PodInformer, kubeSystemPodInformer corev1informers.PodInformer,
agentPodInformer corev1informers.PodInformer,
observableWithInformerOption *testutil.ObservableWithInformerOption, observableWithInformerOption *testutil.ObservableWithInformerOption,
) { ) {
_ = NewDeleterController( _ = NewDeleterController(
@ -38,7 +39,8 @@ func TestDeleterControllerFilter(t *testing.T) {
Template: agentPodTemplate, Template: agentPodTemplate,
}, },
nil, // k8sClient, shouldn't matter nil, // k8sClient, shouldn't matter
podsInformer, kubeSystemPodInformer,
agentPodInformer,
observableWithInformerOption.WithInformer, observableWithInformerOption.WithInformer,
) )
}, },
@ -185,6 +187,7 @@ func TestDeleterControllerSync(t *testing.T) {
// trigger any logic on this thing. // trigger any logic on this thing.
ignorablePod := corev1.Pod{} ignorablePod := corev1.Pod{}
ignorablePod.Name = "some-ignorable-pod" ignorablePod.Name = "some-ignorable-pod"
r.NoError(kubeSystemInformerClient.Tracker().Add(&ignorablePod))
r.NoError(agentInformerClient.Tracker().Add(&ignorablePod)) r.NoError(agentInformerClient.Tracker().Add(&ignorablePod))
r.NoError(kubeAPIClient.Tracker().Add(&ignorablePod)) r.NoError(kubeAPIClient.Tracker().Add(&ignorablePod))
}) })
@ -410,11 +413,11 @@ func TestDeleterControllerSync(t *testing.T) {
it.Before(func() { it.Before(func() {
updatedAgentPod := agentPod.DeepCopy() updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Spec.RestartPolicy = corev1.RestartPolicyAlways updatedAgentPod.Spec.RestartPolicy = corev1.RestartPolicyAlways
r.NoError(kubeSystemInformerClient.Tracker().Update(podsGVR, updatedAgentPod, updatedAgentPod.Namespace)) r.NoError(agentInformerClient.Tracker().Update(podsGVR, updatedAgentPod, updatedAgentPod.Namespace))
r.NoError(kubeAPIClient.Tracker().Update(podsGVR, updatedAgentPod, updatedAgentPod.Namespace)) r.NoError(kubeAPIClient.Tracker().Update(podsGVR, updatedAgentPod, updatedAgentPod.Namespace))
}) })
it.Pend("deletes the agent pod", func() { it("deletes the agent pod", func() {
startInformersAndController() startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext) err := controllerlib.TestSync(t, subject, *syncContext)
@ -435,11 +438,11 @@ func TestDeleterControllerSync(t *testing.T) {
when("the agent pod is out of sync via automount service account tokem", func() { when("the agent pod is out of sync via automount service account tokem", func() {
it.Before(func() { it.Before(func() {
agentPod.Spec.AutomountServiceAccountToken = boolPtr(true) agentPod.Spec.AutomountServiceAccountToken = boolPtr(true)
r.NoError(kubeSystemInformerClient.Tracker().Update(podsGVR, agentPod, agentPod.Namespace)) r.NoError(agentInformerClient.Tracker().Update(podsGVR, agentPod, agentPod.Namespace))
r.NoError(kubeAPIClient.Tracker().Update(podsGVR, agentPod, agentPod.Namespace)) r.NoError(kubeAPIClient.Tracker().Update(podsGVR, agentPod, agentPod.Namespace))
}) })
it.Pend("deletes the agent pod", func() { it("deletes the agent pod", func() {
startInformersAndController() startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext) err := controllerlib.TestSync(t, subject, *syncContext)

View File

@ -18,7 +18,6 @@ import (
"k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/equality"
k8serrors "k8s.io/apimachinery/pkg/api/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corev1informers "k8s.io/client-go/informers/core/v1" corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@ -145,50 +144,19 @@ func isAgentPodUpToDate(actualAgentPod, expectedAgentPod *corev1.Pod) bool {
) )
} }
func findAgentPod( func findControllerManagerPodForSpecificAgentPod(
controllerManagerPod *corev1.Pod,
kubeSystemPodInformer corev1informers.PodInformer,
agentPodInformer corev1informers.PodInformer,
agentLabels map[string]string,
) (*corev1.Pod, error) {
agentSelector := labels.SelectorFromSet(agentLabels)
agentPods, err := agentPodInformer.
Lister().
List(agentSelector)
if err != nil {
return nil, fmt.Errorf("informer cannot list agent pods: %w", err)
}
for _, maybeAgentPod := range agentPods {
maybeControllerManagerPod, err := findControllerManagerPod(
maybeAgentPod,
kubeSystemPodInformer,
)
if err != nil {
return nil, err
}
if maybeControllerManagerPod != nil &&
maybeControllerManagerPod.UID == controllerManagerPod.UID {
return maybeAgentPod, nil
}
}
return nil, nil
}
func findControllerManagerPod(
agentPod *corev1.Pod, agentPod *corev1.Pod,
informer corev1informers.PodInformer, informer corev1informers.PodInformer,
) (*corev1.Pod, error) { ) (*corev1.Pod, error) {
name, ok := agentPod.Annotations[controllerManagerNameAnnotationKey] name, ok := agentPod.Annotations[controllerManagerNameAnnotationKey]
if !ok { if !ok {
klog.InfoS("agent pod missing parent name annotation", "pod", agentPod) klog.InfoS("agent pod missing parent name annotation", "pod", agentPod.Name)
return nil, nil return nil, nil
} }
uid, ok := agentPod.Annotations[controllerManagerUIDAnnotationKey] uid, ok := agentPod.Annotations[controllerManagerUIDAnnotationKey]
if !ok { if !ok {
klog.InfoS("agent pod missing parent uid annotation", "pod", agentPod) klog.InfoS("agent pod missing parent uid annotation", "pod", agentPod.Name)
return nil, nil return nil, nil
} }

View File

@ -17,18 +17,14 @@ import (
"go.pinniped.dev/internal/testutil" "go.pinniped.dev/internal/testutil"
) )
func runFilterTest( func defineSharedKubecertagentFilterSpecs(
t *testing.T, t *testing.T,
name string, name string,
newFunc func( newFunc func(agentPodTemplate *corev1.Pod, kubeSystemPodInformer corev1informers.PodInformer, agentPodInformer corev1informers.PodInformer, observableWithInformerOption *testutil.ObservableWithInformerOption),
agentPodTemplate *corev1.Pod,
podsInformer corev1informers.PodInformer,
observableWithInformerOption *testutil.ObservableWithInformerOption,
),
) { ) {
spec.Run(t, name, func(t *testing.T, when spec.G, it spec.S) { spec.Run(t, name, func(t *testing.T, when spec.G, it spec.S) {
var r *require.Assertions var r *require.Assertions
var subject controllerlib.Filter var kubeSystemPodInformerFilter, agentPodInformerFilter controllerlib.Filter
whateverPod := &corev1.Pod{} whateverPod := &corev1.Pod{}
@ -40,98 +36,104 @@ func runFilterTest(
"some-label-key": "some-label-value", "some-label-key": "some-label-value",
"some-other-label-key": "some-other-label-value", "some-other-label-key": "some-other-label-value",
} }
podsInformer := kubeinformers.NewSharedInformerFactory(nil, 0).Core().V1().Pods() kubeSystemPodInformer := kubeinformers.NewSharedInformerFactory(nil, 0).Core().V1().Pods()
agentPodInformer := kubeinformers.NewSharedInformerFactory(nil, 0).Core().V1().Pods()
observableWithInformerOption := testutil.NewObservableWithInformerOption() observableWithInformerOption := testutil.NewObservableWithInformerOption()
newFunc(agentPodTemplate, podsInformer, observableWithInformerOption) newFunc(agentPodTemplate, kubeSystemPodInformer, agentPodInformer, observableWithInformerOption)
subject = observableWithInformerOption.GetFilterForInformer(podsInformer) kubeSystemPodInformerFilter = observableWithInformerOption.GetFilterForInformer(kubeSystemPodInformer)
agentPodInformerFilter = observableWithInformerOption.GetFilterForInformer(agentPodInformer)
}) })
when("a pod with the proper controller manager labels and phase is added/updated/deleted", func() { when("the event is happening in the kube system namespace", func() {
it("returns true", func() { when("a pod with the proper controller manager labels and phase is added/updated/deleted", func() {
pod := &corev1.Pod{ it("returns true", func() {
ObjectMeta: metav1.ObjectMeta{ pod := &corev1.Pod{
Labels: map[string]string{ ObjectMeta: metav1.ObjectMeta{
"component": "kube-controller-manager", Labels: map[string]string{
"component": "kube-controller-manager",
},
}, },
}, Status: corev1.PodStatus{
Status: corev1.PodStatus{ Phase: corev1.PodRunning,
Phase: corev1.PodRunning, },
}, }
}
r.True(subject.Add(pod)) r.True(kubeSystemPodInformerFilter.Add(pod))
r.True(subject.Update(whateverPod, pod)) r.True(kubeSystemPodInformerFilter.Update(whateverPod, pod))
r.True(subject.Update(pod, whateverPod)) r.True(kubeSystemPodInformerFilter.Update(pod, whateverPod))
r.True(subject.Delete(pod)) r.True(kubeSystemPodInformerFilter.Delete(pod))
})
})
when("a pod without the proper controller manager label is added/updated/deleted", func() {
it("returns false", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
}
r.False(kubeSystemPodInformerFilter.Add(pod))
r.False(kubeSystemPodInformerFilter.Update(whateverPod, pod))
r.False(kubeSystemPodInformerFilter.Update(pod, whateverPod))
r.False(kubeSystemPodInformerFilter.Delete(pod))
})
})
when("a pod without the proper controller manager phase is added/updated/deleted", func() {
it("returns false", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"component": "kube-controller-manager",
},
},
}
r.False(kubeSystemPodInformerFilter.Add(pod))
r.False(kubeSystemPodInformerFilter.Update(whateverPod, pod))
r.False(kubeSystemPodInformerFilter.Update(pod, whateverPod))
r.False(kubeSystemPodInformerFilter.Delete(pod))
})
}) })
}) })
when("a pod without the proper controller manager label is added/updated/deleted", func() { when("the change is happening in the agent's namespace", func() {
it("returns false", func() { when("a pod with all the agent labels is added/updated/deleted", func() {
pod := &corev1.Pod{ it("returns true", func() {
ObjectMeta: metav1.ObjectMeta{}, pod := &corev1.Pod{
Status: corev1.PodStatus{ ObjectMeta: metav1.ObjectMeta{
Phase: corev1.PodRunning, Labels: map[string]string{
}, "some-label-key": "some-label-value",
} "some-other-label-key": "some-other-label-value",
},
r.False(subject.Add(pod))
r.False(subject.Update(whateverPod, pod))
r.False(subject.Update(pod, whateverPod))
r.False(subject.Delete(pod))
})
})
when("a pod without the proper controller manager phase is added/updated/deleted", func() {
it("returns false", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"component": "kube-controller-manager",
}, },
}, }
}
r.False(subject.Add(pod)) r.True(agentPodInformerFilter.Add(pod))
r.False(subject.Update(whateverPod, pod)) r.True(agentPodInformerFilter.Update(whateverPod, pod))
r.False(subject.Update(pod, whateverPod)) r.True(agentPodInformerFilter.Update(pod, whateverPod))
r.False(subject.Delete(pod)) r.True(agentPodInformerFilter.Delete(pod))
})
}) })
})
when("a pod with all the agent labels is added/updated/deleted", func() { when("a pod missing any of the agent labels is added/updated/deleted", func() {
it("returns true", func() { it("returns false", func() {
pod := &corev1.Pod{ pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{ Labels: map[string]string{
"some-label-key": "some-label-value", "some-other-label-key": "some-other-label-value",
"some-other-label-key": "some-other-label-value", },
}, },
}, }
}
r.True(subject.Add(pod)) r.False(agentPodInformerFilter.Add(pod))
r.True(subject.Update(whateverPod, pod)) r.False(agentPodInformerFilter.Update(whateverPod, pod))
r.True(subject.Update(pod, whateverPod)) r.False(agentPodInformerFilter.Update(pod, whateverPod))
r.True(subject.Delete(pod)) r.False(agentPodInformerFilter.Delete(pod))
}) })
})
when("a pod missing one of the agent labels is added/updated/deleted", func() {
it("returns false", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"some-other-label-key": "some-other-label-value",
},
},
}
r.False(subject.Add(pod))
r.False(subject.Update(whateverPod, pod))
r.False(subject.Update(pod, whateverPod))
r.False(subject.Delete(pod))
}) })
}) })
}) })

View File

@ -173,6 +173,7 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) {
}, },
k8sClient, k8sClient,
informers.kubeSystemNamespaceK8s.Core().V1().Pods(), informers.kubeSystemNamespaceK8s.Core().V1().Pods(),
informers.installationNamespaceK8s.Core().V1().Pods(),
controllerlib.WithInformer, controllerlib.WithInformer,
), ),
singletonWorker, singletonWorker,
@ -184,6 +185,7 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) {
}, },
k8sClient, k8sClient,
informers.kubeSystemNamespaceK8s.Core().V1().Pods(), informers.kubeSystemNamespaceK8s.Core().V1().Pods(),
informers.installationNamespaceK8s.Core().V1().Pods(),
controllerlib.WithInformer, controllerlib.WithInformer,
), ),
singletonWorker, singletonWorker,
@ -197,6 +199,7 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) {
}, },
k8sClient, k8sClient,
informers.kubeSystemNamespaceK8s.Core().V1().Pods(), informers.kubeSystemNamespaceK8s.Core().V1().Pods(),
informers.installationNamespaceK8s.Core().V1().Pods(),
controllerlib.WithInformer, controllerlib.WithInformer,
), ),
singletonWorker, singletonWorker,