From 9735122db9be6b442154076bf0d339a2cdc482ed Mon Sep 17 00:00:00 2001 From: Andrew Keesler Date: Wed, 23 Sep 2020 11:01:41 -0400 Subject: [PATCH] Wire in kubecertagent.NewExecerController() to server Also fill in a couple of low-hanging unit tests. Signed-off-by: Andrew Keesler --- .../controller/kubecertagent/annotater.go | 46 ++++++++------- .../kubecertagent/annotater_test.go | 57 ++++++++++++++++-- .../controller/kubecertagent/deleter_test.go | 25 ++++++++ internal/controller/kubecertagent/execer.go | 12 ++-- .../controller/kubecertagent/execer_test.go | 20 +++++++ .../kubecertagent/pod_command_executor.go | 59 +++++++++++++++++++ .../controllermanager/prepare_controllers.go | 52 ++++++++++++---- internal/server/server.go | 22 +++---- 8 files changed, 239 insertions(+), 54 deletions(-) create mode 100644 internal/controller/kubecertagent/pod_command_executor.go diff --git a/internal/controller/kubecertagent/annotater.go b/internal/controller/kubecertagent/annotater.go index a87137f4..42713f00 100644 --- a/internal/controller/kubecertagent/annotater.go +++ b/internal/controller/kubecertagent/annotater.go @@ -20,6 +20,13 @@ import ( "go.pinniped.dev/internal/controllerlib" ) +// These constants are the default values for the kube-controller-manager flags. If the flags are +// not properly set on the kube-controller-manager process, then we will fallback to using these. +const ( + k8sAPIServerCACertPEMDefaultPath = "/etc/kubernetes/ca/ca.pem" + k8sAPIServerCAKeyPEMDefaultPath = "/etc/kubernetes/ca/ca.key" +) + type annotaterController struct { agentInfo *Info k8sClient kubernetes.Interface @@ -86,17 +93,22 @@ func (c *annotaterController) Sync(ctx controllerlib.Context) error { continue } - // TODO if the paths cannot be found, then still add the annotations anyway using the defaults k8sAPIServerCAKeyPEMDefaultPath and k8sAPIServerCACertPEMDefaultPath - certPath, certPathOK := getContainerArgByName(controllerManagerPod, "cluster-signing-cert-file") - keyPath, keyPathOK := getContainerArgByName(controllerManagerPod, "cluster-signing-key-file") + certPath := getContainerArgByName( + controllerManagerPod, + "cluster-signing-cert-file", + k8sAPIServerCACertPEMDefaultPath, + ) + keyPath := getContainerArgByName( + controllerManagerPod, + "cluster-signing-key-file", + k8sAPIServerCAKeyPEMDefaultPath, + ) if err := c.maybeUpdateAgentPod( ctx.Context, agentPod.Name, agentPod.Namespace, certPath, - certPathOK, keyPath, - keyPathOK, ); err != nil { // TODO Failed, so update the CIC status? return fmt.Errorf("cannot update agent pod: %w", err) @@ -111,9 +123,7 @@ func (c *annotaterController) maybeUpdateAgentPod( name string, namespace string, certPath string, - certPathOK bool, keyPath string, - keyPathOK bool, ) error { return retry.RetryOnConflict(retry.DefaultRetry, func() error { agentPod, err := c.agentPodInformer.Lister().Pods(namespace).Get(name) @@ -121,15 +131,13 @@ func (c *annotaterController) maybeUpdateAgentPod( return err } - if (certPathOK && agentPod.Annotations[c.agentInfo.CertPathAnnotation] != certPath) || - (keyPathOK && agentPod.Annotations[c.agentInfo.KeyPathAnnotation] != keyPath) { + if agentPod.Annotations[c.agentInfo.CertPathAnnotation] != certPath || + agentPod.Annotations[c.agentInfo.KeyPathAnnotation] != keyPath { if err := c.reallyUpdateAgentPod( ctx, agentPod, certPath, - certPathOK, keyPath, - keyPathOK, ); err != nil { return err } @@ -143,21 +151,15 @@ func (c *annotaterController) reallyUpdateAgentPod( ctx context.Context, agentPod *corev1.Pod, certPath string, - certPathOK bool, keyPath string, - keyPathOK bool, ) error { // Create a deep copy of the agent pod since it is coming straight from the cache. updatedAgentPod := agentPod.DeepCopy() if updatedAgentPod.Annotations == nil { updatedAgentPod.Annotations = make(map[string]string) } - if certPathOK { - updatedAgentPod.Annotations[c.agentInfo.CertPathAnnotation] = certPath - } - if keyPathOK { - updatedAgentPod.Annotations[c.agentInfo.KeyPathAnnotation] = keyPath - } + updatedAgentPod.Annotations[c.agentInfo.CertPathAnnotation] = certPath + updatedAgentPod.Annotations[c.agentInfo.KeyPathAnnotation] = keyPath klog.InfoS( "updating agent pod annotations", @@ -175,7 +177,7 @@ func (c *annotaterController) reallyUpdateAgentPod( return err } -func getContainerArgByName(pod *corev1.Pod, name string) (string, bool) { +func getContainerArgByName(pod *corev1.Pod, name, fallbackValue string) string { for _, container := range pod.Spec.Containers { flagset := pflag.NewFlagSet("", pflag.ContinueOnError) flagset.ParseErrorsWhitelist = pflag.ParseErrorsWhitelist{UnknownFlags: true} @@ -183,8 +185,8 @@ func getContainerArgByName(pod *corev1.Pod, name string) (string, bool) { flagset.StringVar(&val, name, "", "") _ = flagset.Parse(append(container.Command, container.Args...)) if val != "" { - return val, true + return val } } - return "", false + return fallbackValue } diff --git a/internal/controller/kubecertagent/annotater_test.go b/internal/controller/kubecertagent/annotater_test.go index 8b40c27e..8b1365ab 100644 --- a/internal/controller/kubecertagent/annotater_test.go +++ b/internal/controller/kubecertagent/annotater_test.go @@ -51,6 +51,8 @@ func TestAnnotaterControllerSync(t *testing.T) { spec.Run(t, "AnnotaterControllerSync", func(t *testing.T, when spec.G, it spec.S) { const kubeSystemNamespace = "kube-system" const agentPodNamespace = "agent-pod-namespace" + const defaultKubeControllerManagerClusterSigningCertFileFlagValue = "/etc/kubernetes/ca/ca.pem" + const defaultKubeControllerManagerClusterSigningKeyFileFlagValue = "/etc/kubernetes/ca/ca.key" const ( certPath = "some-cert-path" @@ -278,6 +280,36 @@ func TestAnnotaterControllerSync(t *testing.T) { }) }) + when("there is a controller manager pod with no CLI flags", func() { + it.Before(func() { + controllerManagerPod.Spec.Containers[0].Command = []string{ + "kube-controller-manager", + } + r.NoError(kubeSystemInformerClient.Tracker().Add(controllerManagerPod)) + r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) + }) + + it("updates the annotations with the default values", func() { + startInformersAndController() + r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + + updatedAgentPod := agentPod.DeepCopy() + updatedAgentPod.Annotations[certPathAnnotation] = defaultKubeControllerManagerClusterSigningCertFileFlagValue + updatedAgentPod.Annotations[keyPathAnnotation] = defaultKubeControllerManagerClusterSigningKeyFileFlagValue + + r.Equal( + []coretesting.Action{ + coretesting.NewUpdateAction( + podsGVR, + agentPodNamespace, + updatedAgentPod, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + when("there is a controller manager pod with unparsable CLI flags", func() { it.Before(func() { controllerManagerPod.Spec.Containers[0].Command = []string{ @@ -289,11 +321,22 @@ func TestAnnotaterControllerSync(t *testing.T) { r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) }) - it("does not update any annotations", func() { + it("updates the annotations with the default values", func() { startInformersAndController() r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + + updatedAgentPod := agentPod.DeepCopy() + updatedAgentPod.Annotations[certPathAnnotation] = defaultKubeControllerManagerClusterSigningCertFileFlagValue + updatedAgentPod.Annotations[keyPathAnnotation] = defaultKubeControllerManagerClusterSigningKeyFileFlagValue + r.Equal( - []coretesting.Action{}, + []coretesting.Action{ + coretesting.NewUpdateAction( + podsGVR, + agentPodNamespace, + updatedAgentPod, + ), + }, kubeAPIClient.Actions(), ) }) @@ -310,12 +353,14 @@ func TestAnnotaterControllerSync(t *testing.T) { r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) }) - it("updates the key annotation", func() { + it("updates the key annotation with the default cert flag value", func() { startInformersAndController() r.NoError(controllerlib.TestSync(t, subject, *syncContext)) updatedAgentPod := agentPod.DeepCopy() + updatedAgentPod.Annotations[certPathAnnotation] = defaultKubeControllerManagerClusterSigningCertFileFlagValue updatedAgentPod.Annotations[keyPathAnnotation] = keyPath + r.Equal( []coretesting.Action{ coretesting.NewUpdateAction( @@ -329,7 +374,7 @@ func TestAnnotaterControllerSync(t *testing.T) { }) }) - when("there is a controller manager pod with unparsable keey CLI flag", func() { + when("there is a controller manager pod with unparsable key CLI flag", func() { it.Before(func() { controllerManagerPod.Spec.Containers[0].Command = []string{ "kube-controller-manager", @@ -340,12 +385,14 @@ func TestAnnotaterControllerSync(t *testing.T) { r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod)) }) - it("updates the cert annotation", func() { + it("updates the cert annotation with the default key flag value", func() { startInformersAndController() r.NoError(controllerlib.TestSync(t, subject, *syncContext)) updatedAgentPod := agentPod.DeepCopy() updatedAgentPod.Annotations[certPathAnnotation] = certPath + updatedAgentPod.Annotations[keyPathAnnotation] = defaultKubeControllerManagerClusterSigningKeyFileFlagValue + r.Equal( []coretesting.Action{ coretesting.NewUpdateAction( diff --git a/internal/controller/kubecertagent/deleter_test.go b/internal/controller/kubecertagent/deleter_test.go index 650d031e..cd076f8a 100644 --- a/internal/controller/kubecertagent/deleter_test.go +++ b/internal/controller/kubecertagent/deleter_test.go @@ -462,6 +462,31 @@ func TestDeleterControllerSync(t *testing.T) { }) }) + when("the agent pod is out of sync with the template via image", func() { + it.Before(func() { + agentPod.Spec.Containers[0].Image = "new-image" + r.NoError(agentInformerClient.Tracker().Update(podsGVR, agentPod, agentPod.Namespace)) + r.NoError(kubeAPIClient.Tracker().Update(podsGVR, agentPod, agentPod.Namespace)) + }) + + it("deletes the agent pod", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + + r.NoError(err) + r.Equal( + []coretesting.Action{ + coretesting.NewDeleteAction( + podsGVR, + agentPodNamespace, + agentPod.Name, + ), + }, + kubeAPIClient.Actions(), + ) + }) + }) + when("there is no matching controller manager pod", func() { it("deletes the agent pod", func() { startInformersAndController() diff --git a/internal/controller/kubecertagent/execer.go b/internal/controller/kubecertagent/execer.go index ccb2783a..fc1d0f30 100644 --- a/internal/controller/kubecertagent/execer.go +++ b/internal/controller/kubecertagent/execer.go @@ -15,7 +15,6 @@ import ( configv1alpha1 "go.pinniped.dev/generated/1.19/apis/config/v1alpha1" pinnipedclientset "go.pinniped.dev/generated/1.19/client/clientset/versioned" - "go.pinniped.dev/internal/certauthority/kubecertauthority" pinnipedcontroller "go.pinniped.dev/internal/controller" "go.pinniped.dev/internal/controller/issuerconfig" "go.pinniped.dev/internal/controllerlib" @@ -27,18 +26,21 @@ type execerController struct { credentialIssuerConfigNamespaceName string credentialIssuerConfigResourceName string dynamicCertProvider dynamiccert.Provider - podCommandExecutor kubecertauthority.PodCommandExecutor + podCommandExecutor PodCommandExecutor clock clock.Clock pinnipedAPIClient pinnipedclientset.Interface agentPodInformer corev1informers.PodInformer } +// NewExecerController returns a controllerlib.Controller that listens for agent pods with proper +// cert/key path annotations and execs into them to get the cert/key material. It sets the retrieved +// key material in a provided dynamicCertProvider. func NewExecerController( agentInfo *Info, credentialIssuerConfigNamespaceName string, credentialIssuerConfigResourceName string, dynamicCertProvider dynamiccert.Provider, - podCommandExecutor kubecertauthority.PodCommandExecutor, + podCommandExecutor PodCommandExecutor, pinnipedAPIClient pinnipedclientset.Interface, clock clock.Clock, agentPodInformer corev1informers.PodInformer, @@ -108,7 +110,9 @@ func (c *execerController) Sync(ctx controllerlib.Context) error { c.dynamicCertProvider.Set([]byte(certPEM), []byte(keyPEM)) err = c.createOrUpdateCredentialIssuerConfig(ctx, c.strategySuccess()) - _ = err // TODO return this error? (needs test) + if err != nil { + return err + } return nil } diff --git a/internal/controller/kubecertagent/execer_test.go b/internal/controller/kubecertagent/execer_test.go index 1c15359a..ef3633ff 100644 --- a/internal/controller/kubecertagent/execer_test.go +++ b/internal/controller/kubecertagent/execer_test.go @@ -5,6 +5,7 @@ package kubecertagent import ( "context" + "errors" "fmt" "io/ioutil" "testing" @@ -15,6 +16,7 @@ import ( "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/clock" kubeinformers "k8s.io/client-go/informers" @@ -397,6 +399,24 @@ func TestManagerControllerSync(t *testing.T) { expectedCreateAction := coretesting.NewUpdateAction(credentialIssuerConfigGVR, credentialIssuerConfigNamespaceName, expectedCredentialIssuerConfig) r.Equal([]coretesting.Action{expectedGetAction, expectedCreateAction}, pinnipedAPIClient.Actions()) }) + + when("updating the CredentialIssuerConfig fails", func() { + it.Before(func() { + pinnipedAPIClient.PrependReactor( + "update", + "credentialissuerconfigs", + func(_ coretesting.Action) (bool, runtime.Object, error) { + return true, nil, errors.New("some update error") + }, + ) + }) + + it("returns an error", func() { + startInformersAndController() + err := controllerlib.TestSync(t, subject, *syncContext) + r.EqualError(err, "could not create or update credentialissuerconfig: some update error") + }) + }) }) when("there is not already a CredentialIssuerConfig", func() { diff --git a/internal/controller/kubecertagent/pod_command_executor.go b/internal/controller/kubecertagent/pod_command_executor.go new file mode 100644 index 00000000..86f94c3f --- /dev/null +++ b/internal/controller/kubecertagent/pod_command_executor.go @@ -0,0 +1,59 @@ +// Copyright 2020 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package kubecertagent + +import ( + "bytes" + + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/deprecated/scheme" + "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" +) + +// PodCommandExecutor can exec a command in a pod located via namespace and name. +type PodCommandExecutor interface { + Exec(podNamespace string, podName string, commandAndArgs ...string) (stdoutResult string, err error) +} + +type kubeClientPodCommandExecutor struct { + kubeConfig *restclient.Config + kubeClient kubernetes.Interface +} + +// NewPodCommandExecutor returns a PodCommandExecutor that will interact with a pod via the provided +// kubeConfig and corresponding kubeClient. +func NewPodCommandExecutor(kubeConfig *restclient.Config, kubeClient kubernetes.Interface) PodCommandExecutor { + return &kubeClientPodCommandExecutor{kubeConfig: kubeConfig, kubeClient: kubeClient} +} + +func (s *kubeClientPodCommandExecutor) Exec(podNamespace string, podName string, commandAndArgs ...string) (string, error) { + request := s.kubeClient. + CoreV1(). + RESTClient(). + Post(). + Namespace(podNamespace). + Resource("pods"). + Name(podName). + SubResource("exec"). + VersionedParams(&v1.PodExecOptions{ + Stdin: false, + Stdout: true, + Stderr: false, + TTY: false, + Command: commandAndArgs, + }, scheme.ParameterCodec) + + executor, err := remotecommand.NewSPDYExecutor(s.kubeConfig, "POST", request.URL()) + if err != nil { + return "", err + } + + var stdoutBuf bytes.Buffer + if err := executor.Stream(remotecommand.StreamOptions{Stdout: &stdoutBuf}); err != nil { + return "", err + } + return stdoutBuf.String(), nil +} diff --git a/internal/controllermanager/prepare_controllers.go b/internal/controllermanager/prepare_controllers.go index a089377f..5f5fdcf7 100644 --- a/internal/controllermanager/prepare_controllers.go +++ b/internal/controllermanager/prepare_controllers.go @@ -12,8 +12,10 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" k8sinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest" "k8s.io/klog/v2/klogr" aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" @@ -54,6 +56,9 @@ type Config struct { // DynamicServingCertProvider provides a setter and a getter to the Pinniped API's serving cert. DynamicServingCertProvider dynamiccert.Provider + // DynamicSigningCertProvider provides a setter and a getter to the Pinniped API's + // signing cert, i.e., the cert that it uses to sign certs for Pinniped clients wishing to login. + DynamicSigningCertProvider dynamiccert.Provider // ServingCertDuration is the validity period, in seconds, of the API serving certificate. ServingCertDuration time.Duration @@ -75,17 +80,17 @@ type Config struct { // KubeCertAgentKeyPathAnnotation is the name of the annotation key that will be used when setting // the best-guess path to the kube API's key. See kubecertagent.Info for more details. KubeCertAgentKeyPathAnnotation string - - // KubeCertAgentDynamicSigningCertProvider provides a setter and a getter to the Pinniped API's - // signing cert, i.e., the cert that it uses to sign certs for Pinniped clients wishing to login. - KubeCertAgentDynamicSigningCertProvider dynamiccert.Provider } // Prepare the controllers and their informers and return a function that will start them when called. //nolint:funlen // Eh, fair, it is a really long function...but it is wiring the world...so... func PrepareControllers(c *Config) (func(ctx context.Context), error) { // Create k8s clients. - k8sClient, aggregatorClient, pinnipedClient, err := createClients() + kubeConfig, err := createConfig() + if err != nil { + return nil, fmt.Errorf("could not create config for the controllers: %w", err) + } + k8sClient, aggregatorClient, pinnipedClient, err := createClients(kubeConfig) if err != nil { return nil, fmt.Errorf("could not create clients for the controllers: %w", err) } @@ -207,6 +212,24 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) { controllerlib.WithInformer, ), singletonWorker, + ). + WithController( + kubecertagent.NewExecerController( + &kubecertagent.Info{ + Template: c.KubeCertAgentTemplate, + CertPathAnnotation: c.KubeCertAgentCertPathAnnotation, + KeyPathAnnotation: c.KubeCertAgentKeyPathAnnotation, + }, + c.ServerInstallationNamespace, + c.NamesConfig.CredentialIssuerConfig, + c.DynamicSigningCertProvider, + kubecertagent.NewPodCommandExecutor(kubeConfig, k8sClient), + pinnipedClient, + clock.RealClock{}, + informers.kubeSystemNamespaceK8s.Core().V1().Pods(), + controllerlib.WithInformer, + ), + singletonWorker, ) // Return a function which starts the informers and controllers. @@ -216,19 +239,24 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) { }, nil } +// Create the rest config that will be used by the clients for the controllers. +func createConfig() (*rest.Config, error) { + // Load the Kubernetes client configuration. + kubeConfig, err := restclient.InClusterConfig() + if err != nil { + return nil, fmt.Errorf("could not load in-cluster configuration: %w", err) + } + + return kubeConfig, nil +} + // Create the k8s clients that will be used by the controllers. -func createClients() ( +func createClients(kubeConfig *rest.Config) ( k8sClient *kubernetes.Clientset, aggregatorClient *aggregatorclient.Clientset, pinnipedClient *pinnipedclientset.Clientset, err error, ) { - // Load the Kubernetes client configuration. - kubeConfig, err := restclient.InClusterConfig() - if err != nil { - return nil, nil, nil, fmt.Errorf("could not load in-cluster configuration: %w", err) - } - // explicitly use protobuf when talking to built-in kube APIs protoKubeConfig := createProtoKubeConfig(kubeConfig) diff --git a/internal/server/server.go b/internal/server/server.go index 4e714753..3e537ecb 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -147,17 +147,17 @@ func (a *App) runServer(ctx context.Context) error { // post start hook of the aggregated API server. startControllersFunc, err := controllermanager.PrepareControllers( &controllermanager.Config{ - ServerInstallationNamespace: serverInstallationNamespace, - NamesConfig: &cfg.NamesConfig, - DiscoveryURLOverride: cfg.DiscoveryInfo.URL, - DynamicServingCertProvider: dynamicServingCertProvider, - ServingCertDuration: time.Duration(*cfg.APIConfig.ServingCertificateConfig.DurationSeconds) * time.Second, - ServingCertRenewBefore: time.Duration(*cfg.APIConfig.ServingCertificateConfig.RenewBeforeSeconds) * time.Second, - IDPCache: idpCache, - KubeCertAgentTemplate: kubeCertAgentTemplate, - KubeCertAgentCertPathAnnotation: kubeCertAgentCertPathAnnotationKey, - KubeCertAgentKeyPathAnnotation: kubeCertAgentKeyPathAnnotationKey, - KubeCertAgentDynamicSigningCertProvider: dynamicSigningCertProvider, + ServerInstallationNamespace: serverInstallationNamespace, + NamesConfig: &cfg.NamesConfig, + DiscoveryURLOverride: cfg.DiscoveryInfo.URL, + DynamicServingCertProvider: dynamicServingCertProvider, + DynamicSigningCertProvider: dynamicSigningCertProvider, + ServingCertDuration: time.Duration(*cfg.APIConfig.ServingCertificateConfig.DurationSeconds) * time.Second, + ServingCertRenewBefore: time.Duration(*cfg.APIConfig.ServingCertificateConfig.RenewBeforeSeconds) * time.Second, + IDPCache: idpCache, + KubeCertAgentTemplate: kubeCertAgentTemplate, + KubeCertAgentCertPathAnnotation: kubeCertAgentCertPathAnnotationKey, + KubeCertAgentKeyPathAnnotation: kubeCertAgentKeyPathAnnotationKey, }, ) if err != nil {