Wire in kubecertagent.NewExecerController() to server

Also fill in a couple of low-hanging unit tests.

Signed-off-by: Andrew Keesler <akeesler@vmware.com>
This commit is contained in:
Andrew Keesler 2020-09-23 11:01:41 -04:00
parent 4948e1702f
commit 9735122db9
No known key found for this signature in database
GPG Key ID: 27CE0444346F9413
8 changed files with 239 additions and 54 deletions

View File

@ -20,6 +20,13 @@ import (
"go.pinniped.dev/internal/controllerlib"
)
// These constants are the default values for the kube-controller-manager flags. If the flags are
// not properly set on the kube-controller-manager process, then we will fallback to using these.
const (
k8sAPIServerCACertPEMDefaultPath = "/etc/kubernetes/ca/ca.pem"
k8sAPIServerCAKeyPEMDefaultPath = "/etc/kubernetes/ca/ca.key"
)
type annotaterController struct {
agentInfo *Info
k8sClient kubernetes.Interface
@ -86,17 +93,22 @@ func (c *annotaterController) Sync(ctx controllerlib.Context) error {
continue
}
// TODO if the paths cannot be found, then still add the annotations anyway using the defaults k8sAPIServerCAKeyPEMDefaultPath and k8sAPIServerCACertPEMDefaultPath
certPath, certPathOK := getContainerArgByName(controllerManagerPod, "cluster-signing-cert-file")
keyPath, keyPathOK := getContainerArgByName(controllerManagerPod, "cluster-signing-key-file")
certPath := getContainerArgByName(
controllerManagerPod,
"cluster-signing-cert-file",
k8sAPIServerCACertPEMDefaultPath,
)
keyPath := getContainerArgByName(
controllerManagerPod,
"cluster-signing-key-file",
k8sAPIServerCAKeyPEMDefaultPath,
)
if err := c.maybeUpdateAgentPod(
ctx.Context,
agentPod.Name,
agentPod.Namespace,
certPath,
certPathOK,
keyPath,
keyPathOK,
); err != nil {
// TODO Failed, so update the CIC status?
return fmt.Errorf("cannot update agent pod: %w", err)
@ -111,9 +123,7 @@ func (c *annotaterController) maybeUpdateAgentPod(
name string,
namespace string,
certPath string,
certPathOK bool,
keyPath string,
keyPathOK bool,
) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
agentPod, err := c.agentPodInformer.Lister().Pods(namespace).Get(name)
@ -121,15 +131,13 @@ func (c *annotaterController) maybeUpdateAgentPod(
return err
}
if (certPathOK && agentPod.Annotations[c.agentInfo.CertPathAnnotation] != certPath) ||
(keyPathOK && agentPod.Annotations[c.agentInfo.KeyPathAnnotation] != keyPath) {
if agentPod.Annotations[c.agentInfo.CertPathAnnotation] != certPath ||
agentPod.Annotations[c.agentInfo.KeyPathAnnotation] != keyPath {
if err := c.reallyUpdateAgentPod(
ctx,
agentPod,
certPath,
certPathOK,
keyPath,
keyPathOK,
); err != nil {
return err
}
@ -143,21 +151,15 @@ func (c *annotaterController) reallyUpdateAgentPod(
ctx context.Context,
agentPod *corev1.Pod,
certPath string,
certPathOK bool,
keyPath string,
keyPathOK bool,
) error {
// Create a deep copy of the agent pod since it is coming straight from the cache.
updatedAgentPod := agentPod.DeepCopy()
if updatedAgentPod.Annotations == nil {
updatedAgentPod.Annotations = make(map[string]string)
}
if certPathOK {
updatedAgentPod.Annotations[c.agentInfo.CertPathAnnotation] = certPath
}
if keyPathOK {
updatedAgentPod.Annotations[c.agentInfo.KeyPathAnnotation] = keyPath
}
updatedAgentPod.Annotations[c.agentInfo.CertPathAnnotation] = certPath
updatedAgentPod.Annotations[c.agentInfo.KeyPathAnnotation] = keyPath
klog.InfoS(
"updating agent pod annotations",
@ -175,7 +177,7 @@ func (c *annotaterController) reallyUpdateAgentPod(
return err
}
func getContainerArgByName(pod *corev1.Pod, name string) (string, bool) {
func getContainerArgByName(pod *corev1.Pod, name, fallbackValue string) string {
for _, container := range pod.Spec.Containers {
flagset := pflag.NewFlagSet("", pflag.ContinueOnError)
flagset.ParseErrorsWhitelist = pflag.ParseErrorsWhitelist{UnknownFlags: true}
@ -183,8 +185,8 @@ func getContainerArgByName(pod *corev1.Pod, name string) (string, bool) {
flagset.StringVar(&val, name, "", "")
_ = flagset.Parse(append(container.Command, container.Args...))
if val != "" {
return val, true
return val
}
}
return "", false
return fallbackValue
}

View File

@ -51,6 +51,8 @@ func TestAnnotaterControllerSync(t *testing.T) {
spec.Run(t, "AnnotaterControllerSync", func(t *testing.T, when spec.G, it spec.S) {
const kubeSystemNamespace = "kube-system"
const agentPodNamespace = "agent-pod-namespace"
const defaultKubeControllerManagerClusterSigningCertFileFlagValue = "/etc/kubernetes/ca/ca.pem"
const defaultKubeControllerManagerClusterSigningKeyFileFlagValue = "/etc/kubernetes/ca/ca.key"
const (
certPath = "some-cert-path"
@ -278,6 +280,36 @@ func TestAnnotaterControllerSync(t *testing.T) {
})
})
when("there is a controller manager pod with no CLI flags", func() {
it.Before(func() {
controllerManagerPod.Spec.Containers[0].Command = []string{
"kube-controller-manager",
}
r.NoError(kubeSystemInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("updates the annotations with the default values", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations[certPathAnnotation] = defaultKubeControllerManagerClusterSigningCertFileFlagValue
updatedAgentPod.Annotations[keyPathAnnotation] = defaultKubeControllerManagerClusterSigningKeyFileFlagValue
r.Equal(
[]coretesting.Action{
coretesting.NewUpdateAction(
podsGVR,
agentPodNamespace,
updatedAgentPod,
),
},
kubeAPIClient.Actions(),
)
})
})
when("there is a controller manager pod with unparsable CLI flags", func() {
it.Before(func() {
controllerManagerPod.Spec.Containers[0].Command = []string{
@ -289,11 +321,22 @@ func TestAnnotaterControllerSync(t *testing.T) {
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("does not update any annotations", func() {
it("updates the annotations with the default values", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations[certPathAnnotation] = defaultKubeControllerManagerClusterSigningCertFileFlagValue
updatedAgentPod.Annotations[keyPathAnnotation] = defaultKubeControllerManagerClusterSigningKeyFileFlagValue
r.Equal(
[]coretesting.Action{},
[]coretesting.Action{
coretesting.NewUpdateAction(
podsGVR,
agentPodNamespace,
updatedAgentPod,
),
},
kubeAPIClient.Actions(),
)
})
@ -310,12 +353,14 @@ func TestAnnotaterControllerSync(t *testing.T) {
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("updates the key annotation", func() {
it("updates the key annotation with the default cert flag value", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations[certPathAnnotation] = defaultKubeControllerManagerClusterSigningCertFileFlagValue
updatedAgentPod.Annotations[keyPathAnnotation] = keyPath
r.Equal(
[]coretesting.Action{
coretesting.NewUpdateAction(
@ -329,7 +374,7 @@ func TestAnnotaterControllerSync(t *testing.T) {
})
})
when("there is a controller manager pod with unparsable keey CLI flag", func() {
when("there is a controller manager pod with unparsable key CLI flag", func() {
it.Before(func() {
controllerManagerPod.Spec.Containers[0].Command = []string{
"kube-controller-manager",
@ -340,12 +385,14 @@ func TestAnnotaterControllerSync(t *testing.T) {
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("updates the cert annotation", func() {
it("updates the cert annotation with the default key flag value", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations[certPathAnnotation] = certPath
updatedAgentPod.Annotations[keyPathAnnotation] = defaultKubeControllerManagerClusterSigningKeyFileFlagValue
r.Equal(
[]coretesting.Action{
coretesting.NewUpdateAction(

View File

@ -462,6 +462,31 @@ func TestDeleterControllerSync(t *testing.T) {
})
})
when("the agent pod is out of sync with the template via image", func() {
it.Before(func() {
agentPod.Spec.Containers[0].Image = "new-image"
r.NoError(agentInformerClient.Tracker().Update(podsGVR, agentPod, agentPod.Namespace))
r.NoError(kubeAPIClient.Tracker().Update(podsGVR, agentPod, agentPod.Namespace))
})
it("deletes the agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
agentPodNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
})
})
when("there is no matching controller manager pod", func() {
it("deletes the agent pod", func() {
startInformersAndController()

View File

@ -15,7 +15,6 @@ import (
configv1alpha1 "go.pinniped.dev/generated/1.19/apis/config/v1alpha1"
pinnipedclientset "go.pinniped.dev/generated/1.19/client/clientset/versioned"
"go.pinniped.dev/internal/certauthority/kubecertauthority"
pinnipedcontroller "go.pinniped.dev/internal/controller"
"go.pinniped.dev/internal/controller/issuerconfig"
"go.pinniped.dev/internal/controllerlib"
@ -27,18 +26,21 @@ type execerController struct {
credentialIssuerConfigNamespaceName string
credentialIssuerConfigResourceName string
dynamicCertProvider dynamiccert.Provider
podCommandExecutor kubecertauthority.PodCommandExecutor
podCommandExecutor PodCommandExecutor
clock clock.Clock
pinnipedAPIClient pinnipedclientset.Interface
agentPodInformer corev1informers.PodInformer
}
// NewExecerController returns a controllerlib.Controller that listens for agent pods with proper
// cert/key path annotations and execs into them to get the cert/key material. It sets the retrieved
// key material in a provided dynamicCertProvider.
func NewExecerController(
agentInfo *Info,
credentialIssuerConfigNamespaceName string,
credentialIssuerConfigResourceName string,
dynamicCertProvider dynamiccert.Provider,
podCommandExecutor kubecertauthority.PodCommandExecutor,
podCommandExecutor PodCommandExecutor,
pinnipedAPIClient pinnipedclientset.Interface,
clock clock.Clock,
agentPodInformer corev1informers.PodInformer,
@ -108,7 +110,9 @@ func (c *execerController) Sync(ctx controllerlib.Context) error {
c.dynamicCertProvider.Set([]byte(certPEM), []byte(keyPEM))
err = c.createOrUpdateCredentialIssuerConfig(ctx, c.strategySuccess())
_ = err // TODO return this error? (needs test)
if err != nil {
return err
}
return nil
}

View File

@ -5,6 +5,7 @@ package kubecertagent
import (
"context"
"errors"
"fmt"
"io/ioutil"
"testing"
@ -15,6 +16,7 @@ import (
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/clock"
kubeinformers "k8s.io/client-go/informers"
@ -397,6 +399,24 @@ func TestManagerControllerSync(t *testing.T) {
expectedCreateAction := coretesting.NewUpdateAction(credentialIssuerConfigGVR, credentialIssuerConfigNamespaceName, expectedCredentialIssuerConfig)
r.Equal([]coretesting.Action{expectedGetAction, expectedCreateAction}, pinnipedAPIClient.Actions())
})
when("updating the CredentialIssuerConfig fails", func() {
it.Before(func() {
pinnipedAPIClient.PrependReactor(
"update",
"credentialissuerconfigs",
func(_ coretesting.Action) (bool, runtime.Object, error) {
return true, nil, errors.New("some update error")
},
)
})
it("returns an error", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.EqualError(err, "could not create or update credentialissuerconfig: some update error")
})
})
})
when("there is not already a CredentialIssuerConfig", func() {

View File

@ -0,0 +1,59 @@
// Copyright 2020 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package kubecertagent
import (
"bytes"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/deprecated/scheme"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)
// PodCommandExecutor can exec a command in a pod located via namespace and name.
type PodCommandExecutor interface {
Exec(podNamespace string, podName string, commandAndArgs ...string) (stdoutResult string, err error)
}
type kubeClientPodCommandExecutor struct {
kubeConfig *restclient.Config
kubeClient kubernetes.Interface
}
// NewPodCommandExecutor returns a PodCommandExecutor that will interact with a pod via the provided
// kubeConfig and corresponding kubeClient.
func NewPodCommandExecutor(kubeConfig *restclient.Config, kubeClient kubernetes.Interface) PodCommandExecutor {
return &kubeClientPodCommandExecutor{kubeConfig: kubeConfig, kubeClient: kubeClient}
}
func (s *kubeClientPodCommandExecutor) Exec(podNamespace string, podName string, commandAndArgs ...string) (string, error) {
request := s.kubeClient.
CoreV1().
RESTClient().
Post().
Namespace(podNamespace).
Resource("pods").
Name(podName).
SubResource("exec").
VersionedParams(&v1.PodExecOptions{
Stdin: false,
Stdout: true,
Stderr: false,
TTY: false,
Command: commandAndArgs,
}, scheme.ParameterCodec)
executor, err := remotecommand.NewSPDYExecutor(s.kubeConfig, "POST", request.URL())
if err != nil {
return "", err
}
var stdoutBuf bytes.Buffer
if err := executor.Stream(remotecommand.StreamOptions{Stdout: &stdoutBuf}); err != nil {
return "", err
}
return stdoutBuf.String(), nil
}

View File

@ -12,8 +12,10 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
k8sinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2/klogr"
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
@ -54,6 +56,9 @@ type Config struct {
// DynamicServingCertProvider provides a setter and a getter to the Pinniped API's serving cert.
DynamicServingCertProvider dynamiccert.Provider
// DynamicSigningCertProvider provides a setter and a getter to the Pinniped API's
// signing cert, i.e., the cert that it uses to sign certs for Pinniped clients wishing to login.
DynamicSigningCertProvider dynamiccert.Provider
// ServingCertDuration is the validity period, in seconds, of the API serving certificate.
ServingCertDuration time.Duration
@ -75,17 +80,17 @@ type Config struct {
// KubeCertAgentKeyPathAnnotation is the name of the annotation key that will be used when setting
// the best-guess path to the kube API's key. See kubecertagent.Info for more details.
KubeCertAgentKeyPathAnnotation string
// KubeCertAgentDynamicSigningCertProvider provides a setter and a getter to the Pinniped API's
// signing cert, i.e., the cert that it uses to sign certs for Pinniped clients wishing to login.
KubeCertAgentDynamicSigningCertProvider dynamiccert.Provider
}
// Prepare the controllers and their informers and return a function that will start them when called.
//nolint:funlen // Eh, fair, it is a really long function...but it is wiring the world...so...
func PrepareControllers(c *Config) (func(ctx context.Context), error) {
// Create k8s clients.
k8sClient, aggregatorClient, pinnipedClient, err := createClients()
kubeConfig, err := createConfig()
if err != nil {
return nil, fmt.Errorf("could not create config for the controllers: %w", err)
}
k8sClient, aggregatorClient, pinnipedClient, err := createClients(kubeConfig)
if err != nil {
return nil, fmt.Errorf("could not create clients for the controllers: %w", err)
}
@ -207,6 +212,24 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) {
controllerlib.WithInformer,
),
singletonWorker,
).
WithController(
kubecertagent.NewExecerController(
&kubecertagent.Info{
Template: c.KubeCertAgentTemplate,
CertPathAnnotation: c.KubeCertAgentCertPathAnnotation,
KeyPathAnnotation: c.KubeCertAgentKeyPathAnnotation,
},
c.ServerInstallationNamespace,
c.NamesConfig.CredentialIssuerConfig,
c.DynamicSigningCertProvider,
kubecertagent.NewPodCommandExecutor(kubeConfig, k8sClient),
pinnipedClient,
clock.RealClock{},
informers.kubeSystemNamespaceK8s.Core().V1().Pods(),
controllerlib.WithInformer,
),
singletonWorker,
)
// Return a function which starts the informers and controllers.
@ -216,19 +239,24 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) {
}, nil
}
// Create the rest config that will be used by the clients for the controllers.
func createConfig() (*rest.Config, error) {
// Load the Kubernetes client configuration.
kubeConfig, err := restclient.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("could not load in-cluster configuration: %w", err)
}
return kubeConfig, nil
}
// Create the k8s clients that will be used by the controllers.
func createClients() (
func createClients(kubeConfig *rest.Config) (
k8sClient *kubernetes.Clientset,
aggregatorClient *aggregatorclient.Clientset,
pinnipedClient *pinnipedclientset.Clientset,
err error,
) {
// Load the Kubernetes client configuration.
kubeConfig, err := restclient.InClusterConfig()
if err != nil {
return nil, nil, nil, fmt.Errorf("could not load in-cluster configuration: %w", err)
}
// explicitly use protobuf when talking to built-in kube APIs
protoKubeConfig := createProtoKubeConfig(kubeConfig)

View File

@ -147,17 +147,17 @@ func (a *App) runServer(ctx context.Context) error {
// post start hook of the aggregated API server.
startControllersFunc, err := controllermanager.PrepareControllers(
&controllermanager.Config{
ServerInstallationNamespace: serverInstallationNamespace,
NamesConfig: &cfg.NamesConfig,
DiscoveryURLOverride: cfg.DiscoveryInfo.URL,
DynamicServingCertProvider: dynamicServingCertProvider,
ServingCertDuration: time.Duration(*cfg.APIConfig.ServingCertificateConfig.DurationSeconds) * time.Second,
ServingCertRenewBefore: time.Duration(*cfg.APIConfig.ServingCertificateConfig.RenewBeforeSeconds) * time.Second,
IDPCache: idpCache,
KubeCertAgentTemplate: kubeCertAgentTemplate,
KubeCertAgentCertPathAnnotation: kubeCertAgentCertPathAnnotationKey,
KubeCertAgentKeyPathAnnotation: kubeCertAgentKeyPathAnnotationKey,
KubeCertAgentDynamicSigningCertProvider: dynamicSigningCertProvider,
ServerInstallationNamespace: serverInstallationNamespace,
NamesConfig: &cfg.NamesConfig,
DiscoveryURLOverride: cfg.DiscoveryInfo.URL,
DynamicServingCertProvider: dynamicServingCertProvider,
DynamicSigningCertProvider: dynamicSigningCertProvider,
ServingCertDuration: time.Duration(*cfg.APIConfig.ServingCertificateConfig.DurationSeconds) * time.Second,
ServingCertRenewBefore: time.Duration(*cfg.APIConfig.ServingCertificateConfig.RenewBeforeSeconds) * time.Second,
IDPCache: idpCache,
KubeCertAgentTemplate: kubeCertAgentTemplate,
KubeCertAgentCertPathAnnotation: kubeCertAgentCertPathAnnotationKey,
KubeCertAgentKeyPathAnnotation: kubeCertAgentKeyPathAnnotationKey,
},
)
if err != nil {