From 5a608cc84c1e9753f1404bed7e6def04ff97583a Mon Sep 17 00:00:00 2001 From: Andrew Keesler Date: Mon, 21 Sep 2020 14:16:14 -0400 Subject: [PATCH] Add kube-cert-agent controller for getting kube API keypair --- .../kubecertauthority/kubecertauthority.go | 89 ++-- .../kubecertauthority_test.go | 89 ++-- .../controller/kubecertagent/annotater.go | 178 +++++++ .../kubecertagent/annotater_test.go | 490 +++++++++++++++++ internal/controller/kubecertagent/creater.go | 95 ++++ .../controller/kubecertagent/creater_test.go | 286 ++++++++++ internal/controller/kubecertagent/deleter.go | 84 +++ .../controller/kubecertagent/deleter_test.go | 491 ++++++++++++++++++ .../controller/kubecertagent/kubecertagent.go | 214 ++++++++ .../kubecertagent/kubecertagent_test.go | 138 +++++ .../controllermanager/prepare_controllers.go | 216 +++++--- internal/server/server.go | 89 +++- pkg/config/api/types.go | 19 +- pkg/config/config.go | 15 + pkg/config/config_test.go | 16 +- test/integration/kubecertagent_test.go | 113 ++++ 16 files changed, 2456 insertions(+), 166 deletions(-) create mode 100644 internal/controller/kubecertagent/annotater.go create mode 100644 internal/controller/kubecertagent/annotater_test.go create mode 100644 internal/controller/kubecertagent/creater.go create mode 100644 internal/controller/kubecertagent/creater_test.go create mode 100644 internal/controller/kubecertagent/deleter.go create mode 100644 internal/controller/kubecertagent/deleter_test.go create mode 100644 internal/controller/kubecertagent/kubecertagent.go create mode 100644 internal/controller/kubecertagent/kubecertagent_test.go create mode 100644 test/integration/kubecertagent_test.go diff --git a/internal/certauthority/kubecertauthority/kubecertauthority.go b/internal/certauthority/kubecertauthority/kubecertauthority.go index da30b506..a966c2e4 100644 --- a/internal/certauthority/kubecertauthority/kubecertauthority.go +++ b/internal/certauthority/kubecertauthority/kubecertauthority.go @@ -13,7 +13,6 @@ import ( "sync" "time" - "github.com/spf13/pflag" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/deprecated/scheme" @@ -26,8 +25,8 @@ import ( "go.pinniped.dev/internal/constable" ) -// ErrNoKubeControllerManagerPod is returned when no kube-controller-manager pod is found on the cluster. -const ErrNoKubeControllerManagerPod = constable.Error("did not find kube-controller-manager pod") +// ErrNoKubeCertAgentPod is returned when no kube-cert-agent pod is found on the cluster. +const ErrNoKubeCertAgentPod = constable.Error("did not find kube-cert-agent pod") const ErrIncapableOfIssuingCertificates = constable.Error("this cluster is not currently capable of issuing certificates") const k8sAPIServerCACertPEMDefaultPath = "/etc/kubernetes/ca/ca.pem" @@ -79,7 +78,25 @@ func (s *kubeClientPodCommandExecutor) Exec(podNamespace string, podName string, return stdoutBuf.String(), nil } +// AgentInfo is a data object that holds the fields necessary for a CA to communicate with an agent +// pod. +type AgentInfo struct { + // Namespace is the namespace in which the agent pod is running. + Namespace string + // LabelSelector is a label selector (e.g., "label-key=label=value") that can be used to filter + // the agent pods. + LabelSelector string + // CertPathAnnotation is the annotation used by the agent pod to indicate the path to the CA cert + // inside the pod. + CertPathAnnotation string + // KeyPathAnnotation is the annotation used by the agent pod to indicate the path to the CA key + // inside the pod. + KeyPathAnnotation string +} + type CA struct { + agentInfo *AgentInfo + kubeClient kubernetes.Interface podCommandExecutor PodCommandExecutor @@ -103,14 +120,18 @@ type FailureCallback func(error) // API server's private key in case it failed previously or case the key has changed. It returns // a function that can be used to shut down that goroutine. Future attempts made by that goroutine // to get the key will also result in success or failure callbacks. +// +// The CA will try to read (via cat(1)) the kube API server's private key from an agent pod located +// via the provided agentInfo. func New( + agentInfo *AgentInfo, kubeClient kubernetes.Interface, podCommandExecutor PodCommandExecutor, tick <-chan time.Time, onSuccessfulRefresh SuccessCallback, onFailedRefresh FailureCallback, ) (*CA, ShutdownFunc) { - signer, err := createSignerWithAPIServerSecret(kubeClient, podCommandExecutor) + signer, err := createSignerWithAPIServerSecret(agentInfo, kubeClient, podCommandExecutor) if err != nil { klog.Errorf("could not initially fetch the API server's signing key: %s", err) signer = nil @@ -119,6 +140,7 @@ func New( onSuccessfulRefresh() } result := &CA{ + agentInfo: agentInfo, kubeClient: kubeClient, podCommandExecutor: podCommandExecutor, shutdown: make(chan struct{}), @@ -131,15 +153,19 @@ func New( return result, result.shutdownRefresh } -func createSignerWithAPIServerSecret(kubeClient kubernetes.Interface, podCommandExecutor PodCommandExecutor) (signer, error) { +func createSignerWithAPIServerSecret( + agentInfo *AgentInfo, + kubeClient kubernetes.Interface, + podCommandExecutor PodCommandExecutor, +) (signer, error) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - pod, err := findControllerManagerPod(ctx, kubeClient) + pod, err := findCertAgentPod(ctx, kubeClient, agentInfo.Namespace, agentInfo.LabelSelector) if err != nil { return nil, err } - certPath, keyPath := getKeypairFilePaths(pod) + certPath, keyPath := getKeypairFilePaths(pod, agentInfo) certPEM, err := podCommandExecutor.Exec(pod.Namespace, pod.Name, "cat", certPath) if err != nil { @@ -167,7 +193,11 @@ func (c *CA) refreshLoop(tick <-chan time.Time) { } func (c *CA) updateSigner() { - newSigner, err := createSignerWithAPIServerSecret(c.kubeClient, c.podCommandExecutor) + newSigner, err := createSignerWithAPIServerSecret( + c.agentInfo, + c.kubeClient, + c.podCommandExecutor, + ) if err != nil { klog.Errorf("could not create signer with API server secret: %s", err) c.onFailedRefresh(err) @@ -198,36 +228,35 @@ func (c *CA) IssuePEM(subject pkix.Name, dnsNames []string, ttl time.Duration) ( return signer.IssuePEM(subject, dnsNames, ttl) } -func findControllerManagerPod(ctx context.Context, kubeClient kubernetes.Interface) (*v1.Pod, error) { - pods, err := kubeClient.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{ - LabelSelector: "component=kube-controller-manager", +func findCertAgentPod(ctx context.Context, kubeClient kubernetes.Interface, namespace, labelSelector string) (*v1.Pod, error) { + pods, err := kubeClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, FieldSelector: "status.phase=Running", }) if err != nil { - return nil, fmt.Errorf("could not check for kube-controller-manager pod: %w", err) + return nil, fmt.Errorf("could not check for kube-cert-agent pod: %w", err) } for _, pod := range pods.Items { return &pod, nil } - return nil, ErrNoKubeControllerManagerPod + return nil, ErrNoKubeCertAgentPod } -func getKeypairFilePaths(pod *v1.Pod) (string, string) { - certPath := getContainerArgByName(pod, "cluster-signing-cert-file", k8sAPIServerCACertPEMDefaultPath) - keyPath := getContainerArgByName(pod, "cluster-signing-key-file", k8sAPIServerCAKeyPEMDefaultPath) +func getKeypairFilePaths(pod *v1.Pod, agentInfo *AgentInfo) (string, string) { + annotations := pod.Annotations + if annotations == nil { + annotations = make(map[string]string) + } + + certPath, ok := annotations[agentInfo.CertPathAnnotation] + if !ok { + certPath = k8sAPIServerCACertPEMDefaultPath + } + + keyPath, ok := annotations[agentInfo.KeyPathAnnotation] + if !ok { + keyPath = k8sAPIServerCAKeyPEMDefaultPath + } + return certPath, keyPath } - -func getContainerArgByName(pod *v1.Pod, name string, defaultValue string) string { - for _, container := range pod.Spec.Containers { - flagset := pflag.NewFlagSet("", pflag.ContinueOnError) - flagset.ParseErrorsWhitelist = pflag.ParseErrorsWhitelist{UnknownFlags: true} - var val string - flagset.StringVar(&val, name, "", "") - _ = flagset.Parse(append(container.Command, container.Args...)) - if val != "" { - return val - } - } - return defaultValue -} diff --git a/internal/certauthority/kubecertauthority/kubecertauthority_test.go b/internal/certauthority/kubecertauthority/kubecertauthority_test.go index 0c2dc0c7..3807778d 100644 --- a/internal/certauthority/kubecertauthority/kubecertauthority_test.go +++ b/internal/certauthority/kubecertauthority/kubecertauthority_test.go @@ -102,6 +102,7 @@ func TestCA(t *testing.T) { var neverTicker <-chan time.Time var callbacks *callbackRecorder var logger *testutil.TranscriptLogger + var agentInfo AgentInfo var requireInitialFailureLogMessage = func(specificErrorMessage string) { r.Len(logger.Transcript(), 1) @@ -136,18 +137,26 @@ func TestCA(t *testing.T) { fakeCert2PEM = loadFile("./testdata/test2.crt") fakeKey2PEM = loadFile("./testdata/test2.key") + agentInfo = AgentInfo{ + Namespace: "some-agent-namespace", + LabelSelector: "some-label-key=some-label-value", + CertPathAnnotation: "some-cert-path-annotation", + KeyPathAnnotation: "some-key-path-annotation", + } fakePod = &corev1.Pod{ TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{ Name: "fake-pod", - Namespace: "kube-system", - Labels: map[string]string{"component": "kube-controller-manager"}, + Namespace: agentInfo.Namespace, + Labels: map[string]string{ + "some-label-key": "some-label-value", + }, }, Spec: corev1.PodSpec{ - Containers: []corev1.Container{{Name: "kube-controller-manager"}}, + Containers: []corev1.Container{{Name: "some-agent-container-name"}}, }, Status: corev1.PodStatus{ - Phase: "Running", + Phase: corev1.PodRunning, }, } @@ -172,7 +181,7 @@ func TestCA(t *testing.T) { klog.SetLogger(nil) }) - when("the kube-controller-manager pod is found with default CLI flag values", func() { + when("the agent pod is found with default CLI flag values", func() { it.Before(func() { err := kubeAPIClient.Tracker().Add(fakePod) r.NoError(err) @@ -182,16 +191,16 @@ func TestCA(t *testing.T) { it("finds the API server's signing key and uses it to issue certificates", func() { fakeTicker := make(chan time.Time) - subject, shutdownFunc := New(kubeAPIClient, fakeExecutor, fakeTicker, callbacks.OnSuccess, callbacks.OnFailure) + subject, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, fakeTicker, callbacks.OnSuccess, callbacks.OnFailure) defer shutdownFunc() r.Equal(2, fakeExecutor.callCount) - r.Equal("kube-system", fakeExecutor.calledWithPodNamespace[0]) + r.Equal(agentInfo.Namespace, fakeExecutor.calledWithPodNamespace[0]) r.Equal("fake-pod", fakeExecutor.calledWithPodName[0]) r.Equal([]string{"cat", "/etc/kubernetes/ca/ca.pem"}, fakeExecutor.calledWithCommandAndArgs[0]) - r.Equal("kube-system", fakeExecutor.calledWithPodNamespace[1]) + r.Equal(agentInfo.Namespace, fakeExecutor.calledWithPodNamespace[1]) r.Equal("fake-pod", fakeExecutor.calledWithPodName[1]) r.Equal([]string{"cat", "/etc/kubernetes/ca/ca.key"}, fakeExecutor.calledWithCommandAndArgs[1]) @@ -256,7 +265,7 @@ func TestCA(t *testing.T) { it("logs an error message", func() { fakeTicker := make(chan time.Time) - subject, shutdownFunc := New(kubeAPIClient, fakeExecutor, fakeTicker, callbacks.OnSuccess, callbacks.OnFailure) + subject, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, fakeTicker, callbacks.OnSuccess, callbacks.OnFailure) defer shutdownFunc() r.Equal(2, fakeExecutor.callCount) r.Equal(1, callbacks.NumberOfTimesSuccessCalled()) @@ -294,7 +303,7 @@ func TestCA(t *testing.T) { it("logs an error message and fails to issue certs until it can get the API server's keypair", func() { fakeTicker := make(chan time.Time) - subject, shutdownFunc := New(kubeAPIClient, fakeExecutor, fakeTicker, callbacks.OnSuccess, callbacks.OnFailure) + subject, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, fakeTicker, callbacks.OnSuccess, callbacks.OnFailure) defer shutdownFunc() r.Equal(1, fakeExecutor.callCount) r.Equal(0, callbacks.NumberOfTimesSuccessCalled()) @@ -334,7 +343,7 @@ func TestCA(t *testing.T) { }) it("returns a CA who cannot issue certs", func() { - subject, shutdownFunc := New(kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) + subject, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) defer shutdownFunc() requireInitialFailureLogMessage("could not load CA: tls: failed to find any PEM data in certificate input") requireNotCapableOfIssuingCerts(subject) @@ -350,7 +359,7 @@ func TestCA(t *testing.T) { }) it("returns a CA who cannot issue certs", func() { - subject, shutdownFunc := New(kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) + subject, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) defer shutdownFunc() requireInitialFailureLogMessage("some error") requireNotCapableOfIssuingCerts(subject) @@ -366,7 +375,7 @@ func TestCA(t *testing.T) { }) it("returns a CA who cannot issue certs", func() { - subject, shutdownFunc := New(kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) + subject, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) defer shutdownFunc() requireInitialFailureLogMessage("some error") requireNotCapableOfIssuingCerts(subject) @@ -377,72 +386,40 @@ func TestCA(t *testing.T) { }) }) - when("the kube-controller-manager pod is found with non-default CLI flag values", func() { + when("the agent pod is found with non-default CLI flag values", func() { it.Before(func() { - fakePod.Spec.Containers[0].Command = []string{ - "kube-controller-manager", - "--cluster-signing-cert-file=/etc/kubernetes/ca/non-default.pem", - } - fakePod.Spec.Containers[0].Args = []string{ - "--cluster-signing-key-file=/etc/kubernetes/ca/non-default.key", - } + fakePod.Annotations = make(map[string]string) + fakePod.Annotations[agentInfo.CertPathAnnotation] = "/etc/kubernetes/ca/non-default.pem" + fakePod.Annotations[agentInfo.KeyPathAnnotation] = "/etc/kubernetes/ca/non-default.key" err := kubeAPIClient.Tracker().Add(fakePod) r.NoError(err) }) it("finds the API server's signing key and uses it to issue certificates", func() { - _, shutdownFunc := New(kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) + _, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) defer shutdownFunc() r.Equal(2, fakeExecutor.callCount) - r.Equal("kube-system", fakeExecutor.calledWithPodNamespace[0]) + r.Equal(agentInfo.Namespace, fakeExecutor.calledWithPodNamespace[0]) r.Equal("fake-pod", fakeExecutor.calledWithPodName[0]) r.Equal([]string{"cat", "/etc/kubernetes/ca/non-default.pem"}, fakeExecutor.calledWithCommandAndArgs[0]) - r.Equal("kube-system", fakeExecutor.calledWithPodNamespace[1]) + r.Equal(agentInfo.Namespace, fakeExecutor.calledWithPodNamespace[1]) r.Equal("fake-pod", fakeExecutor.calledWithPodName[1]) r.Equal([]string{"cat", "/etc/kubernetes/ca/non-default.key"}, fakeExecutor.calledWithCommandAndArgs[1]) }) }) - when("the kube-controller-manager pod is found with non-default CLI flag values separated by spaces", func() { - it.Before(func() { - fakePod.Spec.Containers[0].Command = []string{ - "kube-controller-manager", - "--cluster-signing-cert-file", "/etc/kubernetes/ca/non-default.pem", - "--cluster-signing-key-file", "/etc/kubernetes/ca/non-default.key", - "--foo=bar", - } - err := kubeAPIClient.Tracker().Add(fakePod) - r.NoError(err) - }) - - it("finds the API server's signing key and uses it to issue certificates", func() { - _, shutdownFunc := New(kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) - defer shutdownFunc() - - r.Equal(2, fakeExecutor.callCount) - - r.Equal("kube-system", fakeExecutor.calledWithPodNamespace[0]) - r.Equal("fake-pod", fakeExecutor.calledWithPodName[0]) - r.Equal([]string{"cat", "/etc/kubernetes/ca/non-default.pem"}, fakeExecutor.calledWithCommandAndArgs[0]) - - r.Equal("kube-system", fakeExecutor.calledWithPodNamespace[1]) - r.Equal("fake-pod", fakeExecutor.calledWithPodName[1]) - r.Equal([]string{"cat", "/etc/kubernetes/ca/non-default.key"}, fakeExecutor.calledWithCommandAndArgs[1]) - }) - }) - - when("the kube-controller-manager pod is not found", func() { + when("the agent pod is not found", func() { it("returns an error", func() { - subject, shutdownFunc := New(kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) + subject, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) defer shutdownFunc() - requireInitialFailureLogMessage("did not find kube-controller-manager pod") + requireInitialFailureLogMessage("did not find kube-cert-agent pod") requireNotCapableOfIssuingCerts(subject) r.Equal(0, callbacks.NumberOfTimesSuccessCalled()) r.Equal(1, callbacks.NumberOfTimesFailureCalled()) - r.EqualError(callbacks.FailureErrors()[0], "did not find kube-controller-manager pod") + r.EqualError(callbacks.FailureErrors()[0], "did not find kube-cert-agent pod") }) }) }, spec.Sequential(), spec.Report(report.Terminal{})) diff --git a/internal/controller/kubecertagent/annotater.go b/internal/controller/kubecertagent/annotater.go new file mode 100644 index 00000000..1c4bdcc3 --- /dev/null +++ b/internal/controller/kubecertagent/annotater.go @@ -0,0 +1,178 @@ +// Copyright 2020 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package kubecertagent + +import ( + "context" + "fmt" + + "github.com/spf13/pflag" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + corev1informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" + + pinnipedcontroller "go.pinniped.dev/internal/controller" + "go.pinniped.dev/internal/controllerlib" +) + +type annotaterController struct { + agentInfo *Info + k8sClient kubernetes.Interface + podInformer corev1informers.PodInformer +} + +// NewAnnotaterController returns a controller that updates agent pods with the path to the kube +// API's certificate and key. +// +// This controller will add annotations to agent pods, using the provided +// agentInfo.CertPathAnnotation and agentInfo.KeyPathAnnotation annotation keys, with the best-guess +// paths to the kube API's certificate and key. +func NewAnnotaterController( + agentInfo *Info, + k8sClient kubernetes.Interface, + podInformer corev1informers.PodInformer, + withInformer pinnipedcontroller.WithInformerOptionFunc, +) controllerlib.Controller { + return controllerlib.New( + controllerlib.Config{ + Name: "kube-cert-agent-annotater-controller", + Syncer: &annotaterController{ + agentInfo: agentInfo, + k8sClient: k8sClient, + podInformer: podInformer, + }, + }, + withInformer( + podInformer, + pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool { + return isControllerManagerPod(obj) || isAgentPod(obj, agentInfo.Template.Labels) + }), + controllerlib.InformerOption{}, + ), + ) +} + +// Sync implements controllerlib.Syncer. +func (c *annotaterController) Sync(ctx controllerlib.Context) error { + agentSelector := labels.SelectorFromSet(c.agentInfo.Template.Labels) + agentPods, err := c.podInformer. + Lister(). + Pods(ControllerManagerNamespace). + List(agentSelector) + if err != nil { + return fmt.Errorf("informer cannot list agent pods: %w", err) + } + + for _, agentPod := range agentPods { + controllerManagerPod, err := findControllerManagerPod(agentPod, c.podInformer) + if err != nil { + return err + } + if controllerManagerPod == nil { + // The deleter will clean this orphaned agent. + continue + } + + certPath, certPathOK := getContainerArgByName(controllerManagerPod, "cluster-signing-cert-file") + keyPath, keyPathOK := getContainerArgByName(controllerManagerPod, "cluster-signing-key-file") + if err := c.maybeUpdateAgentPod( + ctx.Context, + agentPod.Name, + certPath, + certPathOK, + keyPath, + keyPathOK, + ); err != nil { + return fmt.Errorf("cannot update agent pod: %w", err) + } + } + + return nil +} + +func (c *annotaterController) maybeUpdateAgentPod( + ctx context.Context, + name string, + certPath string, + certPathOK bool, + keyPath string, + keyPathOK bool, +) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + agentPod, err := c.podInformer.Lister().Pods(ControllerManagerNamespace).Get(name) + if err != nil { + return err + } + + if (certPathOK && agentPod.Annotations[c.agentInfo.CertPathAnnotation] != certPath) || + (keyPathOK && agentPod.Annotations[c.agentInfo.KeyPathAnnotation] != keyPath) { + if err := c.reallyUpdateAgentPod( + ctx, + agentPod, + certPath, + certPathOK, + keyPath, + keyPathOK, + ); err != nil { + return err + } + } + + return nil + }) +} + +func (c *annotaterController) reallyUpdateAgentPod( + ctx context.Context, + agentPod *corev1.Pod, + certPath string, + certPathOK bool, + keyPath string, + keyPathOK bool, +) error { + // Create a deep copy of the agent pod since it is coming straight from the cache. + updatedAgentPod := agentPod.DeepCopy() + if updatedAgentPod.Annotations == nil { + updatedAgentPod.Annotations = make(map[string]string) + } + if certPathOK { + updatedAgentPod.Annotations[c.agentInfo.CertPathAnnotation] = certPath + } + if keyPathOK { + updatedAgentPod.Annotations[c.agentInfo.KeyPathAnnotation] = keyPath + } + + klog.InfoS( + "updating agent pod annotations", + "pod", + klog.KObj(updatedAgentPod), + "certPath", + certPath, + "keyPath", + keyPath, + ) + _, err := c.k8sClient. + CoreV1(). + Pods(ControllerManagerNamespace). + Update(ctx, updatedAgentPod, metav1.UpdateOptions{}) + return err +} + +func getContainerArgByName(pod *corev1.Pod, name string) (string, bool) { + for _, container := range pod.Spec.Containers { + flagset := pflag.NewFlagSet("", pflag.ContinueOnError) + flagset.ParseErrorsWhitelist = pflag.ParseErrorsWhitelist{UnknownFlags: true} + var val string + flagset.StringVar(&val, name, "", "") + _ = flagset.Parse(append(container.Command, container.Args...)) + if val != "" { + return val, true + } + } + return "", false +} diff --git a/internal/controller/kubecertagent/annotater_test.go b/internal/controller/kubecertagent/annotater_test.go new file mode 100644 index 00000000..21d7c3ec --- /dev/null +++ b/internal/controller/kubecertagent/annotater_test.go @@ -0,0 +1,490 @@ +// Copyright 2020 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package kubecertagent + +import ( + "context" + "testing" + "time" + + "github.com/sclevine/spec" + "github.com/sclevine/spec/report" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + kubeinformers "k8s.io/client-go/informers" + corev1informers "k8s.io/client-go/informers/core/v1" + kubernetesfake "k8s.io/client-go/kubernetes/fake" + coretesting "k8s.io/client-go/testing" + + "go.pinniped.dev/internal/controllerlib" + "go.pinniped.dev/internal/testutil" +) + +func TestAnnotaterControllerFilter(t *testing.T) { + runFilterTest( + t, + "AnnotaterControllerFilter", + func( + agentPodTemplate *corev1.Pod, + podsInformer corev1informers.PodInformer, + observableWithInformerOption *testutil.ObservableWithInformerOption, + ) { + _ = NewAnnotaterController( + &Info{ + Template: agentPodTemplate, + }, + nil, // k8sClient, shouldn't matter + podsInformer, + observableWithInformerOption.WithInformer, + ) + }, + ) +} + +func TestAnnotaterControllerSync(t *testing.T) { + spec.Run(t, "AnnotaterControllerSync", func(t *testing.T, when spec.G, it spec.S) { + const kubeSystemNamespace = "kube-system" + + const ( + certPath = "some-cert-path" + certPathAnnotation = "some-cert-path-annotation" + + keyPath = "some-key-path" + keyPathAnnotation = "some-key-path-annotation" + ) + + var r *require.Assertions + + var subject controllerlib.Controller + var kubeAPIClient *kubernetesfake.Clientset + var kubeInformerClient *kubernetesfake.Clientset + var kubeInformers kubeinformers.SharedInformerFactory + var timeoutContext context.Context + var timeoutContextCancel context.CancelFunc + var syncContext *controllerlib.Context + + agentPodTemplate := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "some-agent-name-", + Labels: map[string]string{ + "some-label-key": "some-label-value", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "some-agent-image", + }, + }, + }, + } + + controllerManagerPod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: kubeSystemNamespace, + Name: "some-controller-manager-name", + Labels: map[string]string{ + "component": "kube-controller-manager", + }, + UID: types.UID("some-controller-manager-uid"), + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "some-controller-manager-image", + Command: []string{ + "kube-controller-manager", + "--cluster-signing-cert-file=" + certPath, + "--cluster-signing-key-file=" + keyPath, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "some-volume-mount-name", + }, + }, + }, + }, + NodeName: "some-node-name", + NodeSelector: map[string]string{ + "some-node-selector-key": "some-node-selector-value", + }, + Tolerations: []corev1.Toleration{ + { + Key: "some-toleration", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + + // fnv 32a hash of controller-manager uid + controllerManagerPodHash := "fbb0addd" + agentPod := agentPodTemplate.DeepCopy() + agentPod.Namespace = kubeSystemNamespace + agentPod.Name += controllerManagerPodHash + agentPod.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: controllerManagerPod.Name, + UID: controllerManagerPod.UID, + Controller: boolPtr(true), + BlockOwnerDeletion: boolPtr(true), + }, + } + agentPod.Spec.Containers[0].VolumeMounts = controllerManagerPod.Spec.Containers[0].VolumeMounts + agentPod.Spec.RestartPolicy = corev1.RestartPolicyNever + agentPod.Spec.AutomountServiceAccountToken = boolPtr(false) + agentPod.Spec.NodeName = controllerManagerPod.Spec.NodeName + agentPod.Spec.NodeSelector = controllerManagerPod.Spec.NodeSelector + agentPod.Spec.Tolerations = controllerManagerPod.Spec.Tolerations + + podsGVR := schema.GroupVersionResource{ + Group: corev1.SchemeGroupVersion.Group, + Version: corev1.SchemeGroupVersion.Version, + Resource: "pods", + } + + // Defer starting the informers until the last possible moment so that the + // nested Before's can keep adding things to the informer caches. + var startInformersAndController = func() { + // Set this at the last second to allow for injection of server override. + subject = NewAnnotaterController( + &Info{ + Template: agentPodTemplate, + CertPathAnnotation: certPathAnnotation, + KeyPathAnnotation: keyPathAnnotation, + }, + kubeAPIClient, + kubeInformers.Core().V1().Pods(), + controllerlib.WithInformer, + ) + + // Set this at the last second to support calling subject.Name(). + syncContext = &controllerlib.Context{ + Context: timeoutContext, + Name: subject.Name(), + Key: controllerlib.Key{ + Namespace: kubeSystemNamespace, + Name: "should-not-matter", + }, + } + + // Must start informers before calling TestRunSynchronously() + kubeInformers.Start(timeoutContext.Done()) + controllerlib.TestRunSynchronously(t, subject) + } + + it.Before(func() { + r = require.New(t) + + timeoutContext, timeoutContextCancel = context.WithTimeout(context.Background(), time.Second*3) + + kubeInformerClient = kubernetesfake.NewSimpleClientset() + kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0) + kubeAPIClient = kubernetesfake.NewSimpleClientset() + + // Add a pod into the test that doesn't matter to make sure we don't accidentally trigger any + // logic on this thing. + ignorablePod := corev1.Pod{} + ignorablePod.Name = "some-ignorable-pod" + r.NoError(kubeInformerClient.Tracker().Add(&ignorablePod)) + r.NoError(kubeAPIClient.Tracker().Add(&ignorablePod)) + }) + + it.After(func() { + timeoutContextCancel() + }) + + when("there is an agent pod without annotations set", func() { + it.Before(func() { + r.NoError(kubeInformerClient.Tracker().Add(agentPod)) + r.NoError(kubeAPIClient.Tracker().Add(agentPod)) + }) + + when("there is a matching controller manager pod", func() { + it.Before(func() { + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("updates the annotations according to the controller manager pod", func() { + startInformersAndController() + r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + + updatedAgentPod := agentPod.DeepCopy() + updatedAgentPod.Annotations = make(map[string]string) + updatedAgentPod.Annotations[certPathAnnotation] = certPath + updatedAgentPod.Annotations[keyPathAnnotation] = keyPath + + r.Equal( + []coretesting.Action{ + coretesting.NewUpdateAction( + podsGVR, + kubeSystemNamespace, + updatedAgentPod, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + + when("there is a controller manager pod with CLI flag values separated by spaces", func() { + it.Before(func() { + controllerManagerPod.Spec.Containers[0].Command = []string{ + "kube-controller-manager", + "--cluster-signing-cert-file", certPath, + "--cluster-signing-key-file", keyPath, + } + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("updates the annotations according to the controller manager pod", func() { + startInformersAndController() + r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + + updatedAgentPod := agentPod.DeepCopy() + updatedAgentPod.Annotations = make(map[string]string) + updatedAgentPod.Annotations[certPathAnnotation] = certPath + updatedAgentPod.Annotations[keyPathAnnotation] = keyPath + + r.Equal( + []coretesting.Action{ + coretesting.NewUpdateAction( + podsGVR, + kubeSystemNamespace, + updatedAgentPod, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + + when("there is a controller manager pod with unparsable CLI flags", func() { + it.Before(func() { + controllerManagerPod.Spec.Containers[0].Command = []string{ + "kube-controller-manager", + "--cluster-signing-cert-file-blah", certPath, + "--cluster-signing-key-file-blah", keyPath, + } + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("does not update any annotations", func() { + startInformersAndController() + r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + r.Equal( + []coretesting.Action{}, + kubeAPIClient.Actions(), + ) + }) + }) + + when("there is a controller manager pod with unparsable cert CLI flag", func() { + it.Before(func() { + controllerManagerPod.Spec.Containers[0].Command = []string{ + "kube-controller-manager", + "--cluster-signing-cert-file-blah", certPath, + "--cluster-signing-key-file", keyPath, + } + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("updates the key annotation", func() { + startInformersAndController() + r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + + updatedAgentPod := agentPod.DeepCopy() + updatedAgentPod.Annotations = make(map[string]string) + updatedAgentPod.Annotations[keyPathAnnotation] = keyPath + r.Equal( + []coretesting.Action{ + coretesting.NewUpdateAction( + podsGVR, + kubeSystemNamespace, + updatedAgentPod, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + + when("there is a controller manager pod with unparsable keey CLI flag", func() { + it.Before(func() { + controllerManagerPod.Spec.Containers[0].Command = []string{ + "kube-controller-manager", + "--cluster-signing-cert-file", certPath, + "--cluster-signing-key-file-blah", keyPath, + } + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("updates the cert annotation", func() { + startInformersAndController() + r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + + updatedAgentPod := agentPod.DeepCopy() + updatedAgentPod.Annotations = make(map[string]string) + updatedAgentPod.Annotations[certPathAnnotation] = certPath + r.Equal( + []coretesting.Action{ + coretesting.NewUpdateAction( + podsGVR, + kubeSystemNamespace, + updatedAgentPod, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + + when("there is a non-matching controller manager pod via uid", func() { + it.Before(func() { + controllerManagerPod.UID = "some-other-controller-manager-uid" + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("does nothing; the deleter will delete this pod to trigger resync", func() { + startInformersAndController() + r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + r.Equal( + []coretesting.Action{}, + kubeAPIClient.Actions(), + ) + }) + }) + + when("there is a non-matching controller manager pod via name", func() { + it.Before(func() { + controllerManagerPod.Name = "some-other-controller-manager-name" + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("does nothing; the deleter will delete this pod to trigger resync", func() { + startInformersAndController() + r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + r.Equal( + []coretesting.Action{}, + kubeAPIClient.Actions(), + ) + }) + }) + }) + + when("there is an agent pod with correct annotations set", func() { + it.Before(func() { + agentPod.Annotations = make(map[string]string) + agentPod.Annotations[certPathAnnotation] = certPath + agentPod.Annotations[keyPathAnnotation] = keyPath + r.NoError(kubeInformerClient.Tracker().Add(agentPod)) + r.NoError(kubeAPIClient.Tracker().Add(agentPod)) + }) + + when("there is a matching controller manager pod", func() { + it.Before(func() { + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("does nothing since the pod is up to date", func() { + startInformersAndController() + r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + r.Equal( + []coretesting.Action{}, + kubeAPIClient.Actions(), + ) + }) + }) + }) + + when("there is an agent pod with the wrong cert annotation", func() { + it.Before(func() { + agentPod.Annotations = make(map[string]string) + agentPod.Annotations[certPathAnnotation] = "wrong" + agentPod.Annotations[keyPathAnnotation] = keyPath + r.NoError(kubeInformerClient.Tracker().Add(agentPod)) + r.NoError(kubeAPIClient.Tracker().Add(agentPod)) + }) + + when("there is a matching controller manager pod", func() { + it.Before(func() { + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("updates the agent with the correct cert annotation", func() { + startInformersAndController() + r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + + updatedAgentPod := agentPod.DeepCopy() + updatedAgentPod.Annotations[certPathAnnotation] = certPath + r.Equal( + []coretesting.Action{ + coretesting.NewUpdateAction( + podsGVR, + kubeSystemNamespace, + updatedAgentPod, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + }) + + when("there is an agent pod with the wrong key annotation", func() { + it.Before(func() { + agentPod.Annotations = make(map[string]string) + agentPod.Annotations[certPathAnnotation] = certPath + agentPod.Annotations[keyPathAnnotation] = "key" + r.NoError(kubeInformerClient.Tracker().Add(agentPod)) + r.NoError(kubeAPIClient.Tracker().Add(agentPod)) + }) + + when("there is a matching controller manager pod", func() { + it.Before(func() { + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("updates the agent with the correct key annotation", func() { + startInformersAndController() + r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + + updatedAgentPod := agentPod.DeepCopy() + updatedAgentPod.Annotations[keyPathAnnotation] = keyPath + r.Equal( + []coretesting.Action{ + coretesting.NewUpdateAction( + podsGVR, + kubeSystemNamespace, + updatedAgentPod, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + }) + }, spec.Parallel(), spec.Report(report.Terminal{})) +} diff --git a/internal/controller/kubecertagent/creater.go b/internal/controller/kubecertagent/creater.go new file mode 100644 index 00000000..52d21e3c --- /dev/null +++ b/internal/controller/kubecertagent/creater.go @@ -0,0 +1,95 @@ +// Copyright 2020 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package kubecertagent + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + corev1informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + pinnipedcontroller "go.pinniped.dev/internal/controller" + "go.pinniped.dev/internal/controllerlib" +) + +type createrController struct { + agentInfo *Info + k8sClient kubernetes.Interface + podInformer corev1informers.PodInformer +} + +// NewCreaterController returns a controller that creates new kube-cert-agent pods for every known +// kube-controller-manager pod. +// +// This controller only uses the Template field of the provided agentInfo. +func NewCreaterController( + agentInfo *Info, + k8sClient kubernetes.Interface, + podInformer corev1informers.PodInformer, + withInformer pinnipedcontroller.WithInformerOptionFunc, +) controllerlib.Controller { + return controllerlib.New( + controllerlib.Config{ + //nolint: misspell + Name: "kube-cert-agent-creater-controller", + Syncer: &createrController{ + agentInfo: agentInfo, + k8sClient: k8sClient, + podInformer: podInformer, + }, + }, + withInformer( + podInformer, + pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool { + return isControllerManagerPod(obj) || isAgentPod(obj, agentInfo.Template.Labels) + }), + controllerlib.InformerOption{}, + ), + ) +} + +// Sync implements controllerlib.Syncer. +func (c *createrController) Sync(ctx controllerlib.Context) error { + controllerManagerSelector, err := labels.Parse("component=kube-controller-manager") + if err != nil { + return fmt.Errorf("cannot create controller manager selector: %w", err) + } + + controllerManagerPods, err := c.podInformer.Lister().List(controllerManagerSelector) + if err != nil { + return fmt.Errorf("informer cannot list controller manager pods: %w", err) + } + + for _, controllerManagerPod := range controllerManagerPods { + agentPod, err := findAgentPod(controllerManagerPod, c.podInformer, c.agentInfo.Template.Labels) + if err != nil { + return err + } + if agentPod == nil { + agentPod = newAgentPod(controllerManagerPod, c.agentInfo.Template) + + klog.InfoS( + "creating agent pod", + "pod", + klog.KObj(agentPod), + "controller", + klog.KObj(controllerManagerPod), + ) + _, err := c.k8sClient.CoreV1(). + Pods(ControllerManagerNamespace). + Create(ctx.Context, agentPod, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("cannot create agent pod: %w", err) + } + } + + // The deleter controller handles the case where the expected fields do not match in the agent + // pod. + } + + return nil +} diff --git a/internal/controller/kubecertagent/creater_test.go b/internal/controller/kubecertagent/creater_test.go new file mode 100644 index 00000000..2eec27bf --- /dev/null +++ b/internal/controller/kubecertagent/creater_test.go @@ -0,0 +1,286 @@ +// Copyright 2020 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package kubecertagent + +import ( + "context" + "testing" + "time" + + "github.com/sclevine/spec" + "github.com/sclevine/spec/report" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + kubeinformers "k8s.io/client-go/informers" + corev1informers "k8s.io/client-go/informers/core/v1" + kubernetesfake "k8s.io/client-go/kubernetes/fake" + coretesting "k8s.io/client-go/testing" + + "go.pinniped.dev/internal/controllerlib" + "go.pinniped.dev/internal/testutil" +) + +func TestCreaterControllerFilter(t *testing.T) { + runFilterTest( + t, + "CreaterControllerFilter", + func( + agentPodTemplate *corev1.Pod, + podsInformer corev1informers.PodInformer, + observableWithInformerOption *testutil.ObservableWithInformerOption, + ) { + _ = NewCreaterController( + &Info{ + Template: agentPodTemplate, + }, + nil, // k8sClient, shouldn't matter + podsInformer, + observableWithInformerOption.WithInformer, + ) + }, + ) +} + +func TestCreaterControllerSync(t *testing.T) { + spec.Run(t, "CreaterControllerSync", func(t *testing.T, when spec.G, it spec.S) { + const kubeSystemNamespace = "kube-system" + + var r *require.Assertions + + var subject controllerlib.Controller + var kubeAPIClient *kubernetesfake.Clientset + var kubeInformerClient *kubernetesfake.Clientset + var kubeInformers kubeinformers.SharedInformerFactory + var timeoutContext context.Context + var timeoutContextCancel context.CancelFunc + var syncContext *controllerlib.Context + + agentPodTemplate := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "some-agent-name-", + Labels: map[string]string{ + "some-label-key": "some-label-value", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "some-agent-image", + }, + }, + }, + } + + controllerManagerPod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: kubeSystemNamespace, + Name: "some-controller-manager-name", + Labels: map[string]string{ + "component": "kube-controller-manager", + }, + UID: types.UID("some-controller-manager-uid"), + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "some-controller-manager-image", + VolumeMounts: []corev1.VolumeMount{ + { + Name: "some-volume-mount-name", + }, + }, + }, + }, + NodeName: "some-node-name", + NodeSelector: map[string]string{ + "some-node-selector-key": "some-node-selector-value", + }, + Tolerations: []corev1.Toleration{ + { + Key: "some-toleration", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + + // fnv 32a hash of controller-manager uid + controllerManagerPodHash := "fbb0addd" + agentPod := agentPodTemplate.DeepCopy() + agentPod.Namespace = kubeSystemNamespace + agentPod.Name += controllerManagerPodHash + agentPod.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: controllerManagerPod.Name, + UID: controllerManagerPod.UID, + Controller: boolPtr(true), + BlockOwnerDeletion: boolPtr(true), + }, + } + agentPod.Spec.Containers[0].VolumeMounts = controllerManagerPod.Spec.Containers[0].VolumeMounts + agentPod.Spec.RestartPolicy = corev1.RestartPolicyNever + agentPod.Spec.AutomountServiceAccountToken = boolPtr(false) + agentPod.Spec.NodeName = controllerManagerPod.Spec.NodeName + agentPod.Spec.NodeSelector = controllerManagerPod.Spec.NodeSelector + agentPod.Spec.Tolerations = controllerManagerPod.Spec.Tolerations + + podsGVR := schema.GroupVersionResource{ + Group: corev1.SchemeGroupVersion.Group, + Version: corev1.SchemeGroupVersion.Version, + Resource: "pods", + } + + // Defer starting the informers until the last possible moment so that the + // nested Before's can keep adding things to the informer caches. + var startInformersAndController = func() { + // Set this at the last second to allow for injection of server override. + subject = NewCreaterController( + &Info{ + Template: agentPodTemplate, + }, + kubeAPIClient, + kubeInformers.Core().V1().Pods(), + controllerlib.WithInformer, + ) + + // Set this at the last second to support calling subject.Name(). + syncContext = &controllerlib.Context{ + Context: timeoutContext, + Name: subject.Name(), + Key: controllerlib.Key{ + Namespace: kubeSystemNamespace, + Name: "should-not-matter", + }, + } + + // Must start informers before calling TestRunSynchronously() + kubeInformers.Start(timeoutContext.Done()) + controllerlib.TestRunSynchronously(t, subject) + } + + it.Before(func() { + r = require.New(t) + + timeoutContext, timeoutContextCancel = context.WithTimeout(context.Background(), time.Second*3) + + kubeInformerClient = kubernetesfake.NewSimpleClientset() + kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0) + kubeAPIClient = kubernetesfake.NewSimpleClientset() + + // Add a pod into the test that doesn't matter to make sure we don't accidentally trigger any + // logic on this thing. + ignorablePod := corev1.Pod{} + ignorablePod.Name = "some-ignorable-pod" + r.NoError(kubeInformerClient.Tracker().Add(&ignorablePod)) + r.NoError(kubeAPIClient.Tracker().Add(&ignorablePod)) + + // Add another valid agent pod to make sure our logic works for just the pod we care about. + otherAgentPod := agentPod.DeepCopy() + otherAgentPod.Name = "some-other-agent" + otherAgentPod.OwnerReferences[0].Name = "some-other-controller-manager-name" + otherAgentPod.OwnerReferences[0].UID = "some-other-controller-manager-uid" + r.NoError(kubeInformerClient.Tracker().Add(otherAgentPod)) + r.NoError(kubeAPIClient.Tracker().Add(otherAgentPod)) + }) + + it.After(func() { + timeoutContextCancel() + }) + + when("there is a controller manager pod", func() { + it.Before(func() { + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + when("there is a matching agent pod", func() { + it.Before(func() { + r.NoError(kubeInformerClient.Tracker().Add(agentPod)) + r.NoError(kubeAPIClient.Tracker().Add(agentPod)) + }) + + it("does nothing", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{}, + kubeAPIClient.Actions(), + ) + }) + }) + + when("there is a non-matching agent pod", func() { + it.Before(func() { + nonMatchingAgentPod := agentPod.DeepCopy() + nonMatchingAgentPod.Name = "some-agent-name-85da432e" + nonMatchingAgentPod.OwnerReferences[0].UID = "some-non-matching-uid" + r.NoError(kubeInformerClient.Tracker().Add(nonMatchingAgentPod)) + r.NoError(kubeAPIClient.Tracker().Add(nonMatchingAgentPod)) + }) + + it("creates a matching agent pod", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{ + coretesting.NewCreateAction( + podsGVR, + kubeSystemNamespace, + agentPod, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + + when("there is no matching agent pod", func() { + it("creates a matching agent pod", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{ + coretesting.NewCreateAction( + podsGVR, + kubeSystemNamespace, + agentPod, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + }) + + when("there is no controller manager pod", func() { + it("does nothing; the deleter controller will cleanup any leftover agent pods", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{}, + kubeAPIClient.Actions(), + ) + }) + }) + }, spec.Parallel(), spec.Report(report.Terminal{})) +} diff --git a/internal/controller/kubecertagent/deleter.go b/internal/controller/kubecertagent/deleter.go new file mode 100644 index 00000000..240cb534 --- /dev/null +++ b/internal/controller/kubecertagent/deleter.go @@ -0,0 +1,84 @@ +// Copyright 2020 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package kubecertagent + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + corev1informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + pinnipedcontroller "go.pinniped.dev/internal/controller" + "go.pinniped.dev/internal/controllerlib" +) + +type deleterController struct { + agentInfo *Info + k8sClient kubernetes.Interface + podInformer corev1informers.PodInformer +} + +// NewDeleterController returns a controller that deletes any kube-cert-agent pods that are out of +// sync with the known kube-controller-manager pods. +// +// This controller only uses the Template field of the provided agentInfo. +func NewDeleterController( + agentInfo *Info, + k8sClient kubernetes.Interface, + podInformer corev1informers.PodInformer, + withInformer pinnipedcontroller.WithInformerOptionFunc, +) controllerlib.Controller { + return controllerlib.New( + controllerlib.Config{ + Name: "kube-cert-agent-deleter-controller", + Syncer: &deleterController{ + agentInfo: agentInfo, + k8sClient: k8sClient, + podInformer: podInformer, + }, + }, + withInformer( + podInformer, + pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool { + return isControllerManagerPod(obj) || isAgentPod(obj, agentInfo.Template.Labels) + }), + controllerlib.InformerOption{}, + ), + ) +} + +// Sync implements controllerlib.Syncer. +func (c *deleterController) Sync(ctx controllerlib.Context) error { + agentSelector := labels.SelectorFromSet(c.agentInfo.Template.Labels) + agentPods, err := c.podInformer. + Lister(). + Pods(ControllerManagerNamespace). + List(agentSelector) + if err != nil { + return fmt.Errorf("informer cannot list agent pods: %w", err) + } + + for _, agentPod := range agentPods { + controllerManagerPod, err := findControllerManagerPod(agentPod, c.podInformer) + if err != nil { + return err + } + if controllerManagerPod == nil || + !isAgentPodUpToDate(agentPod, newAgentPod(controllerManagerPod, c.agentInfo.Template)) { + klog.InfoS("deleting agent pod", "pod", klog.KObj(agentPod)) + err := c.k8sClient. + CoreV1(). + Pods(ControllerManagerNamespace). + Delete(ctx.Context, agentPod.Name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("cannot delete agent pod: %w", err) + } + } + } + + return nil +} diff --git a/internal/controller/kubecertagent/deleter_test.go b/internal/controller/kubecertagent/deleter_test.go new file mode 100644 index 00000000..3e1d2c89 --- /dev/null +++ b/internal/controller/kubecertagent/deleter_test.go @@ -0,0 +1,491 @@ +// Copyright 2020 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package kubecertagent + +import ( + "context" + "testing" + "time" + + "github.com/sclevine/spec" + "github.com/sclevine/spec/report" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + kubeinformers "k8s.io/client-go/informers" + corev1informers "k8s.io/client-go/informers/core/v1" + kubernetesfake "k8s.io/client-go/kubernetes/fake" + coretesting "k8s.io/client-go/testing" + + "go.pinniped.dev/internal/controllerlib" + "go.pinniped.dev/internal/testutil" +) + +func TestDeleterControllerFilter(t *testing.T) { + runFilterTest( + t, + "DeleterControllerFilter", + func( + agentPodTemplate *corev1.Pod, + podsInformer corev1informers.PodInformer, + observableWithInformerOption *testutil.ObservableWithInformerOption, + ) { + _ = NewDeleterController( + &Info{ + Template: agentPodTemplate, + }, + nil, // k8sClient, shouldn't matter + podsInformer, + observableWithInformerOption.WithInformer, + ) + }, + ) +} + +func TestDeleterControllerSync(t *testing.T) { + spec.Run(t, "DeleterControllerSync", func(t *testing.T, when spec.G, it spec.S) { + const kubeSystemNamespace = "kube-system" + + var r *require.Assertions + + var subject controllerlib.Controller + var kubeAPIClient *kubernetesfake.Clientset + var kubeInformerClient *kubernetesfake.Clientset + var kubeInformers kubeinformers.SharedInformerFactory + var timeoutContext context.Context + var timeoutContextCancel context.CancelFunc + var syncContext *controllerlib.Context + + agentPodTemplate := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "some-agent-name-", + Labels: map[string]string{ + "some-label-key": "some-label-value", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "some-agent-image", + }, + }, + }, + } + + controllerManagerPod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: kubeSystemNamespace, + Name: "some-controller-manager-name", + Labels: map[string]string{ + "component": "kube-controller-manager", + }, + UID: types.UID("some-controller-manager-uid"), + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "some-controller-manager-image", + VolumeMounts: []corev1.VolumeMount{ + { + Name: "some-volume-mount-name", + }, + }, + }, + }, + NodeName: "some-node-name", + NodeSelector: map[string]string{ + "some-node-selector-key": "some-node-selector-value", + }, + Tolerations: []corev1.Toleration{ + { + Key: "some-toleration", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + + podsGVR := schema.GroupVersionResource{ + Group: corev1.SchemeGroupVersion.Group, + Version: corev1.SchemeGroupVersion.Version, + Resource: "pods", + } + + // fnv 32a hash of controller-manager uid + controllerManagerPodHash := "fbb0addd" + agentPod := agentPodTemplate.DeepCopy() + agentPod.Namespace = kubeSystemNamespace + agentPod.Name += controllerManagerPodHash + agentPod.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: controllerManagerPod.Name, + UID: controllerManagerPod.UID, + Controller: boolPtr(true), + BlockOwnerDeletion: boolPtr(true), + }, + } + agentPod.Spec.Containers[0].VolumeMounts = controllerManagerPod.Spec.Containers[0].VolumeMounts + agentPod.Spec.RestartPolicy = corev1.RestartPolicyNever + agentPod.Spec.AutomountServiceAccountToken = boolPtr(false) + agentPod.Spec.NodeName = controllerManagerPod.Spec.NodeName + agentPod.Spec.NodeSelector = controllerManagerPod.Spec.NodeSelector + agentPod.Spec.Tolerations = controllerManagerPod.Spec.Tolerations + + // Defer starting the informers until the last possible moment so that the + // nested Before's can keep adding things to the informer caches. + var startInformersAndController = func() { + // Set this at the last second to allow for injection of server override. + subject = NewDeleterController( + &Info{ + Template: agentPodTemplate, + }, + kubeAPIClient, + kubeInformers.Core().V1().Pods(), + controllerlib.WithInformer, + ) + + // Set this at the last second to support calling subject.Name(). + syncContext = &controllerlib.Context{ + Context: timeoutContext, + Name: subject.Name(), + Key: controllerlib.Key{ + Namespace: kubeSystemNamespace, + Name: "should-not-matter", + }, + } + + // Must start informers before calling TestRunSynchronously() + kubeInformers.Start(timeoutContext.Done()) + controllerlib.TestRunSynchronously(t, subject) + } + + it.Before(func() { + r = require.New(t) + + timeoutContext, timeoutContextCancel = context.WithTimeout(context.Background(), time.Second*3) + + kubeInformerClient = kubernetesfake.NewSimpleClientset() + kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0) + kubeAPIClient = kubernetesfake.NewSimpleClientset() + + // Add an pod into the test that doesn't matter to make sure we don't accidentally + // trigger any logic on this thing. + ignorablePod := corev1.Pod{} + ignorablePod.Name = "some-ignorable-pod" + r.NoError(kubeInformerClient.Tracker().Add(&ignorablePod)) + r.NoError(kubeAPIClient.Tracker().Add(&ignorablePod)) + }) + + it.After(func() { + timeoutContextCancel() + }) + + when("there is an agent pod", func() { + it.Before(func() { + r.NoError(kubeInformerClient.Tracker().Add(agentPod)) + r.NoError(kubeAPIClient.Tracker().Add(agentPod)) + }) + + when("there is a matching controller manager pod", func() { + it.Before(func() { + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("does nothing", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{}, + kubeAPIClient.Actions(), + ) + }) + }) + + when("there is a non-matching controller manager pod via uid", func() { + it.Before(func() { + controllerManagerPod.UID = "some-other-controller-manager-uid" + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("deletes the agent pod", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{ + coretesting.NewDeleteAction( + podsGVR, + kubeSystemNamespace, + agentPod.Name, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + + when("there is a non-matching controller manager pod via name", func() { + it.Before(func() { + controllerManagerPod.Name = "some-other-controller-manager-name" + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("deletes the agent pod", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{ + coretesting.NewDeleteAction( + podsGVR, + kubeSystemNamespace, + agentPod.Name, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + + when("the agent pod is out of sync with the controller manager via volume mounts", func() { + it.Before(func() { + controllerManagerPod.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{ + { + Name: "some-other-volume-mount", + }, + } + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("deletes the agent pod", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{ + coretesting.NewDeleteAction( + podsGVR, + kubeSystemNamespace, + agentPod.Name, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + + when("the agent pod is out of sync with the controller manager via volumes", func() { + it.Before(func() { + controllerManagerPod.Spec.Volumes = []corev1.Volume{ + { + Name: "some-other-volume", + }, + } + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("deletes the agent pod", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{ + coretesting.NewDeleteAction( + podsGVR, + kubeSystemNamespace, + agentPod.Name, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + + when("the agent pod is out of sync with the controller manager via node selector", func() { + it.Before(func() { + controllerManagerPod.Spec.NodeSelector = map[string]string{ + "some-other-node-selector-key": "some-other-node-selector-value", + } + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("deletes the agent pod", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{ + coretesting.NewDeleteAction( + podsGVR, + kubeSystemNamespace, + agentPod.Name, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + + when("the agent pod is out of sync with the controller manager via node name", func() { + it.Before(func() { + controllerManagerPod.Spec.NodeName = "some-other-node-name" + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("deletes the agent pod", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{ + coretesting.NewDeleteAction( + podsGVR, + kubeSystemNamespace, + agentPod.Name, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + + when("the agent pod is out of sync with the controller manager via tolerations", func() { + it.Before(func() { + controllerManagerPod.Spec.Tolerations = []corev1.Toleration{ + { + Key: "some-other-toleration-key", + }, + } + r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("deletes the agent pod", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{ + coretesting.NewDeleteAction( + podsGVR, + kubeSystemNamespace, + agentPod.Name, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + + when("the agent pod is out of sync via restart policy", func() { + it.Before(func() { + updatedAgentPod := agentPod.DeepCopy() + updatedAgentPod.Spec.RestartPolicy = corev1.RestartPolicyAlways + r.NoError(kubeInformerClient.Tracker().Update(podsGVR, updatedAgentPod, updatedAgentPod.Namespace)) + r.NoError(kubeAPIClient.Tracker().Update(podsGVR, updatedAgentPod, updatedAgentPod.Namespace)) + }) + + it.Pend("deletes the agent pod", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{ + coretesting.NewDeleteAction( + podsGVR, + kubeSystemNamespace, + agentPod.Name, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + + when("the agent pod is out of sync via automount service account tokem", func() { + it.Before(func() { + agentPod.Spec.AutomountServiceAccountToken = boolPtr(true) + r.NoError(kubeInformerClient.Tracker().Update(podsGVR, agentPod, agentPod.Namespace)) + r.NoError(kubeAPIClient.Tracker().Update(podsGVR, agentPod, agentPod.Namespace)) + }) + + it.Pend("deletes the agent pod", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{ + coretesting.NewDeleteAction( + podsGVR, + kubeSystemNamespace, + agentPod.Name, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + + when("there is no matching controller manager pod", func() { + it("deletes the agent pod", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{ + coretesting.NewDeleteAction( + podsGVR, + kubeSystemNamespace, + agentPod.Name, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + }) + + when("there is no agent pod", func() { + it("does nothing", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{}, + kubeAPIClient.Actions(), + ) + }) + }) + }, spec.Parallel(), spec.Report(report.Terminal{})) +} diff --git a/internal/controller/kubecertagent/kubecertagent.go b/internal/controller/kubecertagent/kubecertagent.go new file mode 100644 index 00000000..82eb3e8f --- /dev/null +++ b/internal/controller/kubecertagent/kubecertagent.go @@ -0,0 +1,214 @@ +// Copyright 2020 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +// Package kubecertagent provides controllers that ensure a set of pods (the kube-cert-agent), is +// colocated with the Kubernetes controller manager so that Pinniped can access its signing keys. +// +// Note: the controllers use a filter that accepts all pods that look like the controller manager or +// an agent pod, across any add/update/delete event. Each of the controllers only care about a +// subset of these events in reality, but the liberal filter implementation serves as an MVP. +package kubecertagent + +import ( + "encoding/hex" + "fmt" + "hash/fnv" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + corev1informers "k8s.io/client-go/informers/core/v1" + "k8s.io/klog/v2" +) + +const ( + // ControllerManagerNamespace is the assumed namespace of the kube-controller-manager pod(s). + ControllerManagerNamespace = "kube-system" +) + +// Info holds necessary information about the agent pod. It was pulled out into a struct to have a +// common parameter for each controller. +type Info struct { + // Template is an injection point for pod fields. The required pod fields are as follows. + // .Name: serves as the name prefix for each of the agent pods + // .Labels: serves as a way to filter for agent pods + // .Spec.Containers[0].Image: serves as the container image for the agent pods + // .Spec.Containers[0].Command: serves as the container command for the agent pods + Template *corev1.Pod + + // CertPathAnnotation is the name of the annotation key that will be used when setting the + // best-guess path to the kube API's certificate. + CertPathAnnotation string + + // KeyPathAnnotation is the name of the annotation key that will be used when setting the + // best-guess path to the kube API's private key. + KeyPathAnnotation string +} + +func isControllerManagerPod(obj metav1.Object) bool { + pod, ok := obj.(*corev1.Pod) + if !ok { + return false + } + + if pod.Labels == nil { + return false + } + + component, ok := pod.Labels["component"] + if !ok || component != "kube-controller-manager" { + return false + } + + if pod.Status.Phase != corev1.PodRunning { + return false + } + + return true +} + +func isAgentPod(obj metav1.Object, agentLabels map[string]string) bool { + for agentLabelKey, agentLabelValue := range agentLabels { + v, ok := obj.GetLabels()[agentLabelKey] + if !ok { + return false + } + if v != agentLabelValue { + return false + } + } + return true +} + +func newAgentPod( + controllerManagerPod *corev1.Pod, + template *corev1.Pod, +) *corev1.Pod { + agentPod := template.DeepCopy() + + agentPod.Name = fmt.Sprintf("%s%s", agentPod.Name, hash(controllerManagerPod)) + + // controller manager namespace because it is our owner + agentPod.Namespace = ControllerManagerNamespace + agentPod.OwnerReferences = []metav1.OwnerReference{ + *metav1.NewControllerRef( + controllerManagerPod, + schema.GroupVersionKind{ + Group: corev1.SchemeGroupVersion.Group, + Version: corev1.SchemeGroupVersion.Version, + Kind: "Pod", + }, + ), + } + + agentPod.Spec.Containers[0].VolumeMounts = controllerManagerPod.Spec.Containers[0].VolumeMounts + agentPod.Spec.Volumes = controllerManagerPod.Spec.Volumes + agentPod.Spec.RestartPolicy = corev1.RestartPolicyNever + agentPod.Spec.NodeSelector = controllerManagerPod.Spec.NodeSelector + agentPod.Spec.AutomountServiceAccountToken = boolPtr(false) + agentPod.Spec.NodeName = controllerManagerPod.Spec.NodeName + agentPod.Spec.Tolerations = controllerManagerPod.Spec.Tolerations + + return agentPod +} + +func isAgentPodUpToDate(actualAgentPod, expectedAgentPod *corev1.Pod) bool { + return equality.Semantic.DeepEqual( + actualAgentPod.Spec.Containers[0].VolumeMounts, + expectedAgentPod.Spec.Containers[0].VolumeMounts, + ) && + equality.Semantic.DeepEqual( + actualAgentPod.Spec.Volumes, + expectedAgentPod.Spec.Volumes, + ) && + equality.Semantic.DeepEqual( + actualAgentPod.Spec.RestartPolicy, + expectedAgentPod.Spec.RestartPolicy, + ) && + equality.Semantic.DeepEqual( + actualAgentPod.Spec.NodeSelector, + expectedAgentPod.Spec.NodeSelector, + ) && + equality.Semantic.DeepEqual( + actualAgentPod.Spec.AutomountServiceAccountToken, + expectedAgentPod.Spec.AutomountServiceAccountToken, + ) && + equality.Semantic.DeepEqual( + actualAgentPod.Spec.NodeName, + expectedAgentPod.Spec.NodeName, + ) && + equality.Semantic.DeepEqual( + actualAgentPod.Spec.Tolerations, + expectedAgentPod.Spec.Tolerations, + ) +} + +func findAgentPod( + controllerManagerPod *corev1.Pod, + informer corev1informers.PodInformer, + agentLabels map[string]string, +) (*corev1.Pod, error) { + agentSelector := labels.SelectorFromSet(agentLabels) + agentPods, err := informer. + Lister(). + Pods(ControllerManagerNamespace). + List(agentSelector) + if err != nil { + return nil, fmt.Errorf("informer cannot list agent pods: %w", err) + } + + for _, maybeAgentPod := range agentPods { + maybeControllerManagerPod, err := findControllerManagerPod( + maybeAgentPod, + informer, + ) + if err != nil { + return nil, err + } + if maybeControllerManagerPod != nil && + maybeControllerManagerPod.UID == controllerManagerPod.UID { + return maybeAgentPod, nil + } + } + + return nil, nil +} + +func findControllerManagerPod( + agentPod *corev1.Pod, + informer corev1informers.PodInformer, +) (*corev1.Pod, error) { + controller := metav1.GetControllerOf(agentPod) + if controller == nil { + klog.InfoS("found orphan agent pod", "pod", klog.KObj(agentPod)) + return nil, nil + } + + maybeControllerManagerPod, err := informer. + Lister(). + Pods(ControllerManagerNamespace). + Get(controller.Name) + notFound := k8serrors.IsNotFound(err) + if err != nil && !notFound { + return nil, fmt.Errorf("cannot get controller pod: %w", err) + } else if notFound || + maybeControllerManagerPod == nil || + maybeControllerManagerPod.UID != controller.UID { + return nil, nil + } + + return maybeControllerManagerPod, nil +} + +func boolPtr(b bool) *bool { return &b } + +func hash(controllerManagerPod *corev1.Pod) string { + // FNV should be faster than SHA, and we don't care about hash-reversibility here, and Kubernetes + // uses FNV for their pod templates, so should be good enough for us? + h := fnv.New32a() + _, _ = h.Write([]byte(controllerManagerPod.UID)) // Never returns an error, per godoc. + return hex.EncodeToString(h.Sum([]byte{})) +} diff --git a/internal/controller/kubecertagent/kubecertagent_test.go b/internal/controller/kubecertagent/kubecertagent_test.go new file mode 100644 index 00000000..15f6d08b --- /dev/null +++ b/internal/controller/kubecertagent/kubecertagent_test.go @@ -0,0 +1,138 @@ +// Copyright 2020 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package kubecertagent + +import ( + "testing" + + "github.com/sclevine/spec" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeinformers "k8s.io/client-go/informers" + corev1informers "k8s.io/client-go/informers/core/v1" + + "go.pinniped.dev/internal/controllerlib" + "go.pinniped.dev/internal/testutil" +) + +func runFilterTest( + t *testing.T, + name string, + newFunc func( + agentPodTemplate *corev1.Pod, + podsInformer corev1informers.PodInformer, + observableWithInformerOption *testutil.ObservableWithInformerOption, + ), +) { + spec.Run(t, name, func(t *testing.T, when spec.G, it spec.S) { + var r *require.Assertions + var subject controllerlib.Filter + + whateverPod := &corev1.Pod{} + + it.Before(func() { + r = require.New(t) + + agentPodTemplate := &corev1.Pod{} + agentPodTemplate.Labels = map[string]string{ + "some-label-key": "some-label-value", + "some-other-label-key": "some-other-label-value", + } + podsInformer := kubeinformers.NewSharedInformerFactory(nil, 0).Core().V1().Pods() + observableWithInformerOption := testutil.NewObservableWithInformerOption() + newFunc(agentPodTemplate, podsInformer, observableWithInformerOption) + + subject = observableWithInformerOption.GetFilterForInformer(podsInformer) + }) + + when("a pod with the proper controller manager labels and phase is added/updated/deleted", func() { + it("returns true", func() { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "component": "kube-controller-manager", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + + r.True(subject.Add(pod)) + r.True(subject.Update(whateverPod, pod)) + r.True(subject.Update(pod, whateverPod)) + r.True(subject.Delete(pod)) + }) + }) + + when("a pod without the proper controller manager label is added/updated/deleted", func() { + it("returns false", func() { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{}, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + + r.False(subject.Add(pod)) + r.False(subject.Update(whateverPod, pod)) + r.False(subject.Update(pod, whateverPod)) + r.False(subject.Delete(pod)) + }) + }) + + when("a pod without the proper controller manager phase is added/updated/deleted", func() { + it("returns false", func() { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "component": "kube-controller-manager", + }, + }, + } + + r.False(subject.Add(pod)) + r.False(subject.Update(whateverPod, pod)) + r.False(subject.Update(pod, whateverPod)) + r.False(subject.Delete(pod)) + }) + }) + + when("a pod with all the agent labels is added/updated/deleted", func() { + it("returns true", func() { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "some-label-key": "some-label-value", + "some-other-label-key": "some-other-label-value", + }, + }, + } + + r.True(subject.Add(pod)) + r.True(subject.Update(whateverPod, pod)) + r.True(subject.Update(pod, whateverPod)) + r.True(subject.Delete(pod)) + }) + }) + + when("a pod missing one of the agent labels is added/updated/deleted", func() { + it("returns false", func() { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "some-other-label-key": "some-other-label-value", + }, + }, + } + + r.False(subject.Add(pod)) + r.False(subject.Update(whateverPod, pod)) + r.False(subject.Update(pod, whateverPod)) + r.False(subject.Delete(pod)) + }) + }) + }) +} diff --git a/internal/controllermanager/prepare_controllers.go b/internal/controllermanager/prepare_controllers.go index 4a1bc0df..c1162058 100644 --- a/internal/controllermanager/prepare_controllers.go +++ b/internal/controllermanager/prepare_controllers.go @@ -1,6 +1,8 @@ // Copyright 2020 the Pinniped contributors. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +// Package controllermanager provides an entrypoint into running all of the controllers that run as +// a part of Pinniped. package controllermanager import ( @@ -8,6 +10,7 @@ import ( "fmt" "time" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" k8sinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -23,6 +26,7 @@ import ( "go.pinniped.dev/internal/controller/identityprovider/webhookcachecleaner" "go.pinniped.dev/internal/controller/identityprovider/webhookcachefiller" "go.pinniped.dev/internal/controller/issuerconfig" + "go.pinniped.dev/internal/controller/kubecertagent" "go.pinniped.dev/internal/controllerlib" "go.pinniped.dev/internal/provider" "go.pinniped.dev/pkg/config/api" @@ -33,16 +37,49 @@ const ( defaultResyncInterval = 3 * time.Minute ) +// Config holds all the input parameters to the set of controllers run as a part of Pinniped. +// +// It is used to inject parameters into PrepareControllers. +type Config struct { + // ServerInstallationNamespace provides the namespace in which Pinniped is deployed. + ServerInstallationNamespace string + + // NamesConfig comes from the Pinniped config API (see api.Config). It specifies how Kubernetes + // objects should be named. + NamesConfig *api.NamesConfigSpec + + // DiscoveryURLOverride allows a caller to inject a hardcoded discovery URL into Pinniped + // discovery document. + DiscoveryURLOverride *string + + // DynamicCertProvider provides a setter and a getter to the Pinniped API's serving cert. + DynamicCertProvider provider.DynamicTLSServingCertProvider + + // ServingCertDuration is the validity period, in seconds, of the API serving certificate. + ServingCertDuration time.Duration + // ServingCertRenewBefore is the period of time, in seconds, that pinniped will wait before + // rotating the serving certificate. This period of time starts upon issuance of the serving + // certificate. + ServingCertRenewBefore time.Duration + + // IDPCache is a cache of authenticators shared amongst various IDP-related controllers. + IDPCache *idpcache.Cache + + // KubeCertAgentTemplate is the template from which the kube-cert-agent controllers will create a + // kube-cert-agent pod. See kubecertagent.Info for more details. + KubeCertAgentTemplate *corev1.Pod + // KubeCertAgentCertPathAnnotation is the name of the annotation key that will be used when + // setting the best-guess path to the kube API's certificate. See kubecertagent.Info for more + // details. + KubeCertAgentCertPathAnnotation string + // KubeCertAgentKeyPathAnnotation is the name of the annotation key that will be used when setting + // the best-guess path to the kube API's key. See kubecertagent.Info for more details. + KubeCertAgentKeyPathAnnotation string +} + // Prepare the controllers and their informers and return a function that will start them when called. -func PrepareControllers( - serverInstallationNamespace string, - namesConfig api.NamesConfigSpec, - discoveryURLOverride *string, - dynamicCertProvider provider.DynamicTLSServingCertProvider, - servingCertDuration time.Duration, - servingCertRenewBefore time.Duration, - idpCache *idpcache.Cache, -) (func(ctx context.Context), error) { +//nolint:funlen // Eh, fair, it is a really long function...but it is wiring the world...so... +func PrepareControllers(c *Config) (func(ctx context.Context), error) { // Create k8s clients. k8sClient, aggregatorClient, pinnipedClient, err := createClients() if err != nil { @@ -50,96 +87,124 @@ func PrepareControllers( } // Create informers. Don't forget to make sure they get started in the function returned below. - kubePublicNamespaceK8sInformers, installationNamespaceK8sInformers, installationNamespacePinnipedInformers := - createInformers(serverInstallationNamespace, k8sClient, pinnipedClient) + informers := createInformers(c.ServerInstallationNamespace, k8sClient, pinnipedClient) // Create controller manager. controllerManager := controllerlib. NewManager(). WithController( - issuerconfig.NewPublisherController(serverInstallationNamespace, - namesConfig.CredentialIssuerConfig, - discoveryURLOverride, + issuerconfig.NewPublisherController( + c.ServerInstallationNamespace, + c.NamesConfig.CredentialIssuerConfig, + c.DiscoveryURLOverride, pinnipedClient, - kubePublicNamespaceK8sInformers.Core().V1().ConfigMaps(), - installationNamespacePinnipedInformers.Config().V1alpha1().CredentialIssuerConfigs(), + informers.kubePublicNamespaceK8s.Core().V1().ConfigMaps(), + informers.installationNamespacePinniped.Config().V1alpha1().CredentialIssuerConfigs(), controllerlib.WithInformer, ), singletonWorker, ). WithController( apicerts.NewCertsManagerController( - serverInstallationNamespace, - namesConfig.ServingCertificateSecret, + c.ServerInstallationNamespace, + c.NamesConfig.ServingCertificateSecret, k8sClient, - installationNamespaceK8sInformers.Core().V1().Secrets(), + informers.installationNamespaceK8s.Core().V1().Secrets(), controllerlib.WithInformer, controllerlib.WithInitialEvent, - servingCertDuration, + c.ServingCertDuration, "Pinniped CA", - namesConfig.APIService, + c.NamesConfig.APIService, ), singletonWorker, ). WithController( apicerts.NewAPIServiceUpdaterController( - serverInstallationNamespace, - namesConfig.ServingCertificateSecret, + c.ServerInstallationNamespace, + c.NamesConfig.ServingCertificateSecret, loginv1alpha1.SchemeGroupVersion.Version+"."+loginv1alpha1.GroupName, aggregatorClient, - installationNamespaceK8sInformers.Core().V1().Secrets(), + informers.installationNamespaceK8s.Core().V1().Secrets(), controllerlib.WithInformer, ), singletonWorker, ). WithController( apicerts.NewCertsObserverController( - serverInstallationNamespace, - namesConfig.ServingCertificateSecret, - dynamicCertProvider, - installationNamespaceK8sInformers.Core().V1().Secrets(), + c.ServerInstallationNamespace, + c.NamesConfig.ServingCertificateSecret, + c.DynamicCertProvider, + informers.installationNamespaceK8s.Core().V1().Secrets(), controllerlib.WithInformer, ), singletonWorker, ). WithController( apicerts.NewCertsExpirerController( - serverInstallationNamespace, - namesConfig.ServingCertificateSecret, + c.ServerInstallationNamespace, + c.NamesConfig.ServingCertificateSecret, k8sClient, - installationNamespaceK8sInformers.Core().V1().Secrets(), + informers.installationNamespaceK8s.Core().V1().Secrets(), controllerlib.WithInformer, - servingCertRenewBefore, + c.ServingCertRenewBefore, ), singletonWorker, ). WithController( webhookcachefiller.New( - idpCache, - installationNamespacePinnipedInformers.IDP().V1alpha1().WebhookIdentityProviders(), + c.IDPCache, + informers.installationNamespacePinniped.IDP().V1alpha1().WebhookIdentityProviders(), klogr.New(), ), singletonWorker, ). WithController( webhookcachecleaner.New( - idpCache, - installationNamespacePinnipedInformers.IDP().V1alpha1().WebhookIdentityProviders(), + c.IDPCache, + informers.installationNamespacePinniped.IDP().V1alpha1().WebhookIdentityProviders(), klogr.New(), ), singletonWorker, + ). + WithController( + kubecertagent.NewCreaterController( + &kubecertagent.Info{ + Template: c.KubeCertAgentTemplate, + }, + k8sClient, + informers.kubeSystemNamespaceK8s.Core().V1().Pods(), + controllerlib.WithInformer, + ), + singletonWorker, + ). + WithController( + kubecertagent.NewDeleterController( + &kubecertagent.Info{ + Template: c.KubeCertAgentTemplate, + }, + k8sClient, + informers.kubeSystemNamespaceK8s.Core().V1().Pods(), + controllerlib.WithInformer, + ), + singletonWorker, + ). + WithController( + kubecertagent.NewAnnotaterController( + &kubecertagent.Info{ + Template: c.KubeCertAgentTemplate, + CertPathAnnotation: c.KubeCertAgentCertPathAnnotation, + KeyPathAnnotation: c.KubeCertAgentKeyPathAnnotation, + }, + k8sClient, + informers.kubeSystemNamespaceK8s.Core().V1().Pods(), + controllerlib.WithInformer, + ), + singletonWorker, ) // Return a function which starts the informers and controllers. return func(ctx context.Context) { - kubePublicNamespaceK8sInformers.Start(ctx.Done()) - installationNamespaceK8sInformers.Start(ctx.Done()) - installationNamespacePinnipedInformers.Start(ctx.Done()) - - kubePublicNamespaceK8sInformers.WaitForCacheSync(ctx.Done()) - installationNamespaceK8sInformers.WaitForCacheSync(ctx.Done()) - installationNamespacePinnipedInformers.WaitForCacheSync(ctx.Done()) - + informers.startAndWaitForSync(ctx) go controllerManager.Start(ctx) }, nil } @@ -184,32 +249,53 @@ func createClients() ( return } +type informers struct { + kubePublicNamespaceK8s k8sinformers.SharedInformerFactory + kubeSystemNamespaceK8s k8sinformers.SharedInformerFactory + installationNamespaceK8s k8sinformers.SharedInformerFactory + installationNamespacePinniped pinnipedinformers.SharedInformerFactory +} + // Create the informers that will be used by the controllers. func createInformers( serverInstallationNamespace string, k8sClient *kubernetes.Clientset, pinnipedClient *pinnipedclientset.Clientset, -) ( - kubePublicNamespaceK8sInformers k8sinformers.SharedInformerFactory, - installationNamespaceK8sInformers k8sinformers.SharedInformerFactory, - installationNamespacePinnipedInformers pinnipedinformers.SharedInformerFactory, -) { - kubePublicNamespaceK8sInformers = k8sinformers.NewSharedInformerFactoryWithOptions( - k8sClient, - defaultResyncInterval, - k8sinformers.WithNamespace(issuerconfig.ClusterInfoNamespace), - ) - installationNamespaceK8sInformers = k8sinformers.NewSharedInformerFactoryWithOptions( - k8sClient, - defaultResyncInterval, - k8sinformers.WithNamespace(serverInstallationNamespace), - ) - installationNamespacePinnipedInformers = pinnipedinformers.NewSharedInformerFactoryWithOptions( - pinnipedClient, - defaultResyncInterval, - pinnipedinformers.WithNamespace(serverInstallationNamespace), - ) - return +) *informers { + return &informers{ + kubePublicNamespaceK8s: k8sinformers.NewSharedInformerFactoryWithOptions( + k8sClient, + defaultResyncInterval, + k8sinformers.WithNamespace(issuerconfig.ClusterInfoNamespace), + ), + kubeSystemNamespaceK8s: k8sinformers.NewSharedInformerFactoryWithOptions( + k8sClient, + defaultResyncInterval, + k8sinformers.WithNamespace(kubecertagent.ControllerManagerNamespace), + ), + installationNamespaceK8s: k8sinformers.NewSharedInformerFactoryWithOptions( + k8sClient, + defaultResyncInterval, + k8sinformers.WithNamespace(serverInstallationNamespace), + ), + installationNamespacePinniped: pinnipedinformers.NewSharedInformerFactoryWithOptions( + pinnipedClient, + defaultResyncInterval, + pinnipedinformers.WithNamespace(serverInstallationNamespace), + ), + } +} + +func (i *informers) startAndWaitForSync(ctx context.Context) { + i.kubePublicNamespaceK8s.Start(ctx.Done()) + i.kubeSystemNamespaceK8s.Start(ctx.Done()) + i.installationNamespaceK8s.Start(ctx.Done()) + i.installationNamespacePinniped.Start(ctx.Done()) + + i.kubePublicNamespaceK8s.WaitForCacheSync(ctx.Done()) + i.kubeSystemNamespaceK8s.WaitForCacheSync(ctx.Done()) + i.installationNamespaceK8s.WaitForCacheSync(ctx.Done()) + i.installationNamespacePinniped.WaitForCacheSync(ctx.Done()) } // Returns a copy of the input config with the ContentConfig set to use protobuf. diff --git a/internal/server/server.go b/internal/server/server.go index 1420182d..aab7b084 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -11,6 +11,8 @@ import ( "time" "github.com/spf13/cobra" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/authentication/authenticator" genericapiserver "k8s.io/apiserver/pkg/server" @@ -32,6 +34,22 @@ import ( "go.pinniped.dev/internal/provider" "go.pinniped.dev/internal/registry/credentialrequest" "go.pinniped.dev/pkg/config" + configapi "go.pinniped.dev/pkg/config/api" +) + +// These constants are various label/annotation keys used in Pinniped. They are namespaced by +// a "pinniped.dev" child domain so they don't collide with other keys. +const ( + // kubeCertAgentLabelKey is used to identify which pods are created by the kube-cert-agent + // controllers. + kubeCertAgentLabelKey = "kube-cert-agent.pinniped.dev" + + // kubeCertAgentCertPathAnnotationKey is the annotation that the kube-cert-agent pod will use + // to communicate the in-pod path to the kube API's certificate. + kubeCertAgentCertPathAnnotationKey = "kube-cert-agent.pinniped.dev/cert-path" + // kubeCertAgentKeyPathAnnotationKey is the annotation that the kube-cert-agent pod will use + // to communicate the in-pod path to the kube API's key. + kubeCertAgentKeyPathAnnotationKey = "kube-cert-agent.pinniped.dev/key-path" ) // App is an object that represents the pinniped-server application. @@ -113,7 +131,15 @@ func (a *App) runServer(ctx context.Context) error { serverInstallationNamespace := podInfo.Namespace // Load the Kubernetes cluster signing CA. - k8sClusterCA, shutdownCA, err := getClusterCASigner(ctx, serverInstallationNamespace, cfg.NamesConfig.CredentialIssuerConfig) + kubeCertAgentTemplate, kubeCertAgentLabelSelector := createKubeCertAgentTemplate( + &cfg.KubeCertAgentConfig, + ) + k8sClusterCA, shutdownCA, err := getClusterCASigner( + ctx, + serverInstallationNamespace, + cfg.NamesConfig.CredentialIssuerConfig, + kubeCertAgentLabelSelector, + ) if err != nil { return err } @@ -132,13 +158,18 @@ func (a *App) runServer(ctx context.Context) error { // Prepare to start the controllers, but defer actually starting them until the // post start hook of the aggregated API server. startControllersFunc, err := controllermanager.PrepareControllers( - serverInstallationNamespace, - cfg.NamesConfig, - cfg.DiscoveryInfo.URL, - dynamicCertProvider, - time.Duration(*cfg.APIConfig.ServingCertificateConfig.DurationSeconds)*time.Second, - time.Duration(*cfg.APIConfig.ServingCertificateConfig.RenewBeforeSeconds)*time.Second, - idpCache, + &controllermanager.Config{ + ServerInstallationNamespace: serverInstallationNamespace, + NamesConfig: &cfg.NamesConfig, + DiscoveryURLOverride: cfg.DiscoveryInfo.URL, + DynamicCertProvider: dynamicCertProvider, + ServingCertDuration: time.Duration(*cfg.APIConfig.ServingCertificateConfig.DurationSeconds) * time.Second, + ServingCertRenewBefore: time.Duration(*cfg.APIConfig.ServingCertificateConfig.RenewBeforeSeconds) * time.Second, + IDPCache: idpCache, + KubeCertAgentTemplate: kubeCertAgentTemplate, + KubeCertAgentCertPathAnnotation: kubeCertAgentCertPathAnnotationKey, + KubeCertAgentKeyPathAnnotation: kubeCertAgentKeyPathAnnotationKey, + }, ) if err != nil { return fmt.Errorf("could not prepare controllers: %w", err) @@ -168,6 +199,7 @@ func (a *App) runServer(ctx context.Context) error { func getClusterCASigner( ctx context.Context, serverInstallationNamespace string, credentialIssuerConfigResourceName string, + kubeCertAgentLabelSelector string, ) (credentialrequest.CertIssuer, kubecertauthority.ShutdownFunc, error) { // Load the Kubernetes client configuration. kubeConfig, err := restclient.InClusterConfig() @@ -191,7 +223,14 @@ func getClusterCASigner( ticker := time.NewTicker(5 * time.Minute) // Make a CA which uses the Kubernetes cluster API server's signing certs. + kubeCertAgentInfo := kubecertauthority.AgentInfo{ + Namespace: "kube-system", + LabelSelector: kubeCertAgentLabelSelector, + CertPathAnnotation: kubeCertAgentCertPathAnnotationKey, + KeyPathAnnotation: kubeCertAgentKeyPathAnnotationKey, + } k8sClusterCA, shutdownCA := kubecertauthority.New( + &kubeCertAgentInfo, kubeClient, kubecertauthority.NewPodCommandExecutor(kubeConfig, kubeClient), ticker.C, @@ -282,3 +321,37 @@ func getAggregatedAPIServerConfig( } return apiServerConfig, nil } + +func createKubeCertAgentTemplate(cfg *configapi.KubeCertAgentSpec) (*corev1.Pod, string) { + terminateImmediately := int64(0) + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: *cfg.NamePrefix, + Labels: map[string]string{ + kubeCertAgentLabelKey: "", + }, + }, + Spec: corev1.PodSpec{ + TerminationGracePeriodSeconds: &terminateImmediately, + Containers: []corev1.Container{ + { + Name: "sleeper", + Image: *cfg.Image, + Command: []string{"/bin/sleep", "infinity"}, + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("16Mi"), + corev1.ResourceCPU: resource.MustParse("10m"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("16Mi"), + corev1.ResourceCPU: resource.MustParse("10m"), + }, + }, + }, + }, + }, + } + labelSelector := kubeCertAgentLabelKey + "=" + return pod, labelSelector +} diff --git a/pkg/config/api/types.go b/pkg/config/api/types.go index 05723842..9e8857aa 100644 --- a/pkg/config/api/types.go +++ b/pkg/config/api/types.go @@ -5,9 +5,10 @@ package api // Config contains knobs to setup an instance of Pinniped. type Config struct { - DiscoveryInfo DiscoveryInfoSpec `json:"discovery"` - APIConfig APIConfigSpec `json:"api"` - NamesConfig NamesConfigSpec `json:"names"` + DiscoveryInfo DiscoveryInfoSpec `json:"discovery"` + APIConfig APIConfigSpec `json:"api"` + NamesConfig NamesConfigSpec `json:"names"` + KubeCertAgentConfig KubeCertAgentSpec `json:"kubeCertAgent"` } // DiscoveryInfoSpec contains configuration knobs specific to @@ -49,3 +50,15 @@ type ServingCertificateConfigSpec struct { // seconds (about 9 months). RenewBeforeSeconds *int64 `json:"renewBeforeSeconds,omitempty"` } + +type KubeCertAgentSpec struct { + // NamePrefix is the prefix of the name of the kube-cert-agent pods. For example, if this field is + // set to "some-prefix-", then the name of the pods will look like "some-prefix-blah". The default + // for this value is "pinniped-kube-cert-agent-". + NamePrefix *string `json:"namePrefix,omitempty"` + + // Image is the container image that will be used by the kube-cert-agent pod. The container image + // should contain at least 2 binaries: /bin/sleep and cat (somewhere on the $PATH). The default + // for this value is "debian:latest". + Image *string `json:"image"` +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 9cc6fdee..a7f4430c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -40,6 +40,7 @@ func FromPath(path string) (*api.Config, error) { } maybeSetAPIDefaults(&config.APIConfig) + maybeSetKubeCertAgentDefaults(&config.KubeCertAgentConfig) if err := validateAPI(&config.APIConfig); err != nil { return nil, fmt.Errorf("validate api: %w", err) @@ -62,6 +63,16 @@ func maybeSetAPIDefaults(apiConfig *api.APIConfigSpec) { } } +func maybeSetKubeCertAgentDefaults(cfg *api.KubeCertAgentSpec) { + if cfg.NamePrefix == nil { + cfg.NamePrefix = stringPtr("pinniped-kube-cert-agent-") + } + + if cfg.Image == nil { + cfg.Image = stringPtr("debian:latest") + } +} + func validateNames(names *api.NamesConfigSpec) error { missingNames := []string{} if names == nil { @@ -98,3 +109,7 @@ func validateAPI(apiConfig *api.APIConfigSpec) error { func int64Ptr(i int64) *int64 { return &i } + +func stringPtr(s string) *string { + return &s +} diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index fed1df9a..aec478b2 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -35,6 +35,10 @@ func TestFromPath(t *testing.T) { servingCertificateSecret: pinniped-api-tls-serving-certificate credentialIssuerConfig: pinniped-config apiService: pinniped-api + kubeCertAgentPrefix: kube-cert-agent-prefix + KubeCertAgent: + namePrefix: kube-cert-agent-name-prefix- + image: kube-cert-agent-image `), wantConfig: &api.Config{ DiscoveryInfo: api.DiscoveryInfoSpec{ @@ -51,6 +55,10 @@ func TestFromPath(t *testing.T) { CredentialIssuerConfig: "pinniped-config", APIService: "pinniped-api", }, + KubeCertAgentConfig: api.KubeCertAgentSpec{ + NamePrefix: stringPtr("kube-cert-agent-name-prefix-"), + Image: stringPtr("kube-cert-agent-image"), + }, }, }, { @@ -77,6 +85,10 @@ func TestFromPath(t *testing.T) { CredentialIssuerConfig: "pinniped-config", APIService: "pinniped-api", }, + KubeCertAgentConfig: api.KubeCertAgentSpec{ + NamePrefix: stringPtr("pinniped-kube-cert-agent-"), + Image: stringPtr("debian:latest"), + }, }, }, { @@ -187,7 +199,3 @@ func TestFromPath(t *testing.T) { }) } } - -func stringPtr(s string) *string { - return &s -} diff --git a/test/integration/kubecertagent_test.go b/test/integration/kubecertagent_test.go new file mode 100644 index 00000000..220eb9aa --- /dev/null +++ b/test/integration/kubecertagent_test.go @@ -0,0 +1,113 @@ +// Copyright 2020 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package integration + +import ( + "context" + "fmt" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/diff" + + "go.pinniped.dev/test/library" +) + +const ( + kubeCertAgentNamespace = "kube-system" + kubeCertAgentLabelSelector = "kube-cert-agent.pinniped.dev=" +) + +func TestKubeCertAgent(t *testing.T) { + library.SkipUnlessIntegration(t) + library.SkipUnlessClusterHasCapability(t, library.ClusterSigningKeyIsAvailable) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + + kubeClient := library.NewClientset(t) + + // Get the current number of kube-cert-agent pods. + // + // We can pretty safely assert there should be more than 1, since there should be a + // kube-cert-agent pod per kube-controller-manager pod, and there should probably be at least + // 1 kube-controller-manager for this to be a working kube API. + originalAgentPods, err := kubeClient.CoreV1().Pods(kubeCertAgentNamespace).List(ctx, metav1.ListOptions{ + LabelSelector: kubeCertAgentLabelSelector, + }) + require.NoError(t, err) + require.NotEmpty(t, originalAgentPods.Items) + sortPods(originalAgentPods) + + agentPodsReconciled := func() bool { + var currentAgentPods *corev1.PodList + currentAgentPods, err = kubeClient.CoreV1().Pods(kubeCertAgentNamespace).List(ctx, metav1.ListOptions{ + LabelSelector: kubeCertAgentLabelSelector, + }) + + if err != nil { + return false + } + + sortPods(currentAgentPods) + for i := range originalAgentPods.Items { + if !equality.Semantic.DeepEqual( + originalAgentPods.Items[i].Spec, + currentAgentPods.Items[i].Spec, + ) { + err = fmt.Errorf( + "original agent pod != current agent pod: %s", + diff.ObjectDiff(originalAgentPods.Items[i].Spec, currentAgentPods.Items[i].Spec), + ) + return false + } + } + + return true + } + + t.Run("reconcile on update", func(t *testing.T) { + // Update the image of the first pod. The controller should see it, and flip it back. + // + // Note that we update the toleration field here because it is the only field, currently, that + // 1) we are allowed to update on a running pod AND 2) the kube-cert-agent controllers care + // about. + updatedAgentPod := originalAgentPods.Items[0].DeepCopy() + updatedAgentPod.Spec.Tolerations = append( + updatedAgentPod.Spec.Tolerations, + corev1.Toleration{Key: "fake-toleration"}, + ) + _, err = kubeClient.CoreV1().Pods(kubeCertAgentNamespace).Update(ctx, updatedAgentPod, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Make sure the original pods come back. + assert.Eventually(t, agentPodsReconciled, 10*time.Second, 250*time.Millisecond) + require.NoError(t, err) + }) + + t.Run("reconcile on delete", func(t *testing.T) { + // Delete the first pod. The controller should see it, and flip it back. + err = kubeClient. + CoreV1(). + Pods(kubeCertAgentNamespace). + Delete(ctx, originalAgentPods.Items[0].Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + // Make sure the original pods come back. + assert.Eventually(t, agentPodsReconciled, 10*time.Second, 250*time.Millisecond) + require.NoError(t, err) + }) +} + +func sortPods(pods *corev1.PodList) { + sort.Slice(pods.Items, func(i, j int) bool { + return pods.Items[i].Name < pods.Items[j].Name + }) +}