Add kube-cert-agent controller for getting kube API keypair

This commit is contained in:
Andrew Keesler 2020-09-21 14:16:14 -04:00
parent 49145791cc
commit 5a608cc84c
No known key found for this signature in database
GPG Key ID: 27CE0444346F9413
16 changed files with 2456 additions and 166 deletions

View File

@ -13,7 +13,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/deprecated/scheme" "k8s.io/client-go/deprecated/scheme"
@ -26,8 +25,8 @@ import (
"go.pinniped.dev/internal/constable" "go.pinniped.dev/internal/constable"
) )
// ErrNoKubeControllerManagerPod is returned when no kube-controller-manager pod is found on the cluster. // ErrNoKubeCertAgentPod is returned when no kube-cert-agent pod is found on the cluster.
const ErrNoKubeControllerManagerPod = constable.Error("did not find kube-controller-manager pod") const ErrNoKubeCertAgentPod = constable.Error("did not find kube-cert-agent pod")
const ErrIncapableOfIssuingCertificates = constable.Error("this cluster is not currently capable of issuing certificates") const ErrIncapableOfIssuingCertificates = constable.Error("this cluster is not currently capable of issuing certificates")
const k8sAPIServerCACertPEMDefaultPath = "/etc/kubernetes/ca/ca.pem" const k8sAPIServerCACertPEMDefaultPath = "/etc/kubernetes/ca/ca.pem"
@ -79,7 +78,25 @@ func (s *kubeClientPodCommandExecutor) Exec(podNamespace string, podName string,
return stdoutBuf.String(), nil return stdoutBuf.String(), nil
} }
// AgentInfo is a data object that holds the fields necessary for a CA to communicate with an agent
// pod.
type AgentInfo struct {
// Namespace is the namespace in which the agent pod is running.
Namespace string
// LabelSelector is a label selector (e.g., "label-key=label=value") that can be used to filter
// the agent pods.
LabelSelector string
// CertPathAnnotation is the annotation used by the agent pod to indicate the path to the CA cert
// inside the pod.
CertPathAnnotation string
// KeyPathAnnotation is the annotation used by the agent pod to indicate the path to the CA key
// inside the pod.
KeyPathAnnotation string
}
type CA struct { type CA struct {
agentInfo *AgentInfo
kubeClient kubernetes.Interface kubeClient kubernetes.Interface
podCommandExecutor PodCommandExecutor podCommandExecutor PodCommandExecutor
@ -103,14 +120,18 @@ type FailureCallback func(error)
// API server's private key in case it failed previously or case the key has changed. It returns // API server's private key in case it failed previously or case the key has changed. It returns
// a function that can be used to shut down that goroutine. Future attempts made by that goroutine // a function that can be used to shut down that goroutine. Future attempts made by that goroutine
// to get the key will also result in success or failure callbacks. // to get the key will also result in success or failure callbacks.
//
// The CA will try to read (via cat(1)) the kube API server's private key from an agent pod located
// via the provided agentInfo.
func New( func New(
agentInfo *AgentInfo,
kubeClient kubernetes.Interface, kubeClient kubernetes.Interface,
podCommandExecutor PodCommandExecutor, podCommandExecutor PodCommandExecutor,
tick <-chan time.Time, tick <-chan time.Time,
onSuccessfulRefresh SuccessCallback, onSuccessfulRefresh SuccessCallback,
onFailedRefresh FailureCallback, onFailedRefresh FailureCallback,
) (*CA, ShutdownFunc) { ) (*CA, ShutdownFunc) {
signer, err := createSignerWithAPIServerSecret(kubeClient, podCommandExecutor) signer, err := createSignerWithAPIServerSecret(agentInfo, kubeClient, podCommandExecutor)
if err != nil { if err != nil {
klog.Errorf("could not initially fetch the API server's signing key: %s", err) klog.Errorf("could not initially fetch the API server's signing key: %s", err)
signer = nil signer = nil
@ -119,6 +140,7 @@ func New(
onSuccessfulRefresh() onSuccessfulRefresh()
} }
result := &CA{ result := &CA{
agentInfo: agentInfo,
kubeClient: kubeClient, kubeClient: kubeClient,
podCommandExecutor: podCommandExecutor, podCommandExecutor: podCommandExecutor,
shutdown: make(chan struct{}), shutdown: make(chan struct{}),
@ -131,15 +153,19 @@ func New(
return result, result.shutdownRefresh return result, result.shutdownRefresh
} }
func createSignerWithAPIServerSecret(kubeClient kubernetes.Interface, podCommandExecutor PodCommandExecutor) (signer, error) { func createSignerWithAPIServerSecret(
agentInfo *AgentInfo,
kubeClient kubernetes.Interface,
podCommandExecutor PodCommandExecutor,
) (signer, error) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel() defer cancel()
pod, err := findControllerManagerPod(ctx, kubeClient) pod, err := findCertAgentPod(ctx, kubeClient, agentInfo.Namespace, agentInfo.LabelSelector)
if err != nil { if err != nil {
return nil, err return nil, err
} }
certPath, keyPath := getKeypairFilePaths(pod) certPath, keyPath := getKeypairFilePaths(pod, agentInfo)
certPEM, err := podCommandExecutor.Exec(pod.Namespace, pod.Name, "cat", certPath) certPEM, err := podCommandExecutor.Exec(pod.Namespace, pod.Name, "cat", certPath)
if err != nil { if err != nil {
@ -167,7 +193,11 @@ func (c *CA) refreshLoop(tick <-chan time.Time) {
} }
func (c *CA) updateSigner() { func (c *CA) updateSigner() {
newSigner, err := createSignerWithAPIServerSecret(c.kubeClient, c.podCommandExecutor) newSigner, err := createSignerWithAPIServerSecret(
c.agentInfo,
c.kubeClient,
c.podCommandExecutor,
)
if err != nil { if err != nil {
klog.Errorf("could not create signer with API server secret: %s", err) klog.Errorf("could not create signer with API server secret: %s", err)
c.onFailedRefresh(err) c.onFailedRefresh(err)
@ -198,36 +228,35 @@ func (c *CA) IssuePEM(subject pkix.Name, dnsNames []string, ttl time.Duration) (
return signer.IssuePEM(subject, dnsNames, ttl) return signer.IssuePEM(subject, dnsNames, ttl)
} }
func findControllerManagerPod(ctx context.Context, kubeClient kubernetes.Interface) (*v1.Pod, error) { func findCertAgentPod(ctx context.Context, kubeClient kubernetes.Interface, namespace, labelSelector string) (*v1.Pod, error) {
pods, err := kubeClient.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{ pods, err := kubeClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: "component=kube-controller-manager", LabelSelector: labelSelector,
FieldSelector: "status.phase=Running", FieldSelector: "status.phase=Running",
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("could not check for kube-controller-manager pod: %w", err) return nil, fmt.Errorf("could not check for kube-cert-agent pod: %w", err)
} }
for _, pod := range pods.Items { for _, pod := range pods.Items {
return &pod, nil return &pod, nil
} }
return nil, ErrNoKubeControllerManagerPod return nil, ErrNoKubeCertAgentPod
}
func getKeypairFilePaths(pod *v1.Pod, agentInfo *AgentInfo) (string, string) {
annotations := pod.Annotations
if annotations == nil {
annotations = make(map[string]string)
}
certPath, ok := annotations[agentInfo.CertPathAnnotation]
if !ok {
certPath = k8sAPIServerCACertPEMDefaultPath
}
keyPath, ok := annotations[agentInfo.KeyPathAnnotation]
if !ok {
keyPath = k8sAPIServerCAKeyPEMDefaultPath
} }
func getKeypairFilePaths(pod *v1.Pod) (string, string) {
certPath := getContainerArgByName(pod, "cluster-signing-cert-file", k8sAPIServerCACertPEMDefaultPath)
keyPath := getContainerArgByName(pod, "cluster-signing-key-file", k8sAPIServerCAKeyPEMDefaultPath)
return certPath, keyPath return certPath, keyPath
} }
func getContainerArgByName(pod *v1.Pod, name string, defaultValue string) string {
for _, container := range pod.Spec.Containers {
flagset := pflag.NewFlagSet("", pflag.ContinueOnError)
flagset.ParseErrorsWhitelist = pflag.ParseErrorsWhitelist{UnknownFlags: true}
var val string
flagset.StringVar(&val, name, "", "")
_ = flagset.Parse(append(container.Command, container.Args...))
if val != "" {
return val
}
}
return defaultValue
}

View File

@ -102,6 +102,7 @@ func TestCA(t *testing.T) {
var neverTicker <-chan time.Time var neverTicker <-chan time.Time
var callbacks *callbackRecorder var callbacks *callbackRecorder
var logger *testutil.TranscriptLogger var logger *testutil.TranscriptLogger
var agentInfo AgentInfo
var requireInitialFailureLogMessage = func(specificErrorMessage string) { var requireInitialFailureLogMessage = func(specificErrorMessage string) {
r.Len(logger.Transcript(), 1) r.Len(logger.Transcript(), 1)
@ -136,18 +137,26 @@ func TestCA(t *testing.T) {
fakeCert2PEM = loadFile("./testdata/test2.crt") fakeCert2PEM = loadFile("./testdata/test2.crt")
fakeKey2PEM = loadFile("./testdata/test2.key") fakeKey2PEM = loadFile("./testdata/test2.key")
agentInfo = AgentInfo{
Namespace: "some-agent-namespace",
LabelSelector: "some-label-key=some-label-value",
CertPathAnnotation: "some-cert-path-annotation",
KeyPathAnnotation: "some-key-path-annotation",
}
fakePod = &corev1.Pod{ fakePod = &corev1.Pod{
TypeMeta: metav1.TypeMeta{}, TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "fake-pod", Name: "fake-pod",
Namespace: "kube-system", Namespace: agentInfo.Namespace,
Labels: map[string]string{"component": "kube-controller-manager"}, Labels: map[string]string{
"some-label-key": "some-label-value",
},
}, },
Spec: corev1.PodSpec{ Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: "kube-controller-manager"}}, Containers: []corev1.Container{{Name: "some-agent-container-name"}},
}, },
Status: corev1.PodStatus{ Status: corev1.PodStatus{
Phase: "Running", Phase: corev1.PodRunning,
}, },
} }
@ -172,7 +181,7 @@ func TestCA(t *testing.T) {
klog.SetLogger(nil) klog.SetLogger(nil)
}) })
when("the kube-controller-manager pod is found with default CLI flag values", func() { when("the agent pod is found with default CLI flag values", func() {
it.Before(func() { it.Before(func() {
err := kubeAPIClient.Tracker().Add(fakePod) err := kubeAPIClient.Tracker().Add(fakePod)
r.NoError(err) r.NoError(err)
@ -182,16 +191,16 @@ func TestCA(t *testing.T) {
it("finds the API server's signing key and uses it to issue certificates", func() { it("finds the API server's signing key and uses it to issue certificates", func() {
fakeTicker := make(chan time.Time) fakeTicker := make(chan time.Time)
subject, shutdownFunc := New(kubeAPIClient, fakeExecutor, fakeTicker, callbacks.OnSuccess, callbacks.OnFailure) subject, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, fakeTicker, callbacks.OnSuccess, callbacks.OnFailure)
defer shutdownFunc() defer shutdownFunc()
r.Equal(2, fakeExecutor.callCount) r.Equal(2, fakeExecutor.callCount)
r.Equal("kube-system", fakeExecutor.calledWithPodNamespace[0]) r.Equal(agentInfo.Namespace, fakeExecutor.calledWithPodNamespace[0])
r.Equal("fake-pod", fakeExecutor.calledWithPodName[0]) r.Equal("fake-pod", fakeExecutor.calledWithPodName[0])
r.Equal([]string{"cat", "/etc/kubernetes/ca/ca.pem"}, fakeExecutor.calledWithCommandAndArgs[0]) r.Equal([]string{"cat", "/etc/kubernetes/ca/ca.pem"}, fakeExecutor.calledWithCommandAndArgs[0])
r.Equal("kube-system", fakeExecutor.calledWithPodNamespace[1]) r.Equal(agentInfo.Namespace, fakeExecutor.calledWithPodNamespace[1])
r.Equal("fake-pod", fakeExecutor.calledWithPodName[1]) r.Equal("fake-pod", fakeExecutor.calledWithPodName[1])
r.Equal([]string{"cat", "/etc/kubernetes/ca/ca.key"}, fakeExecutor.calledWithCommandAndArgs[1]) r.Equal([]string{"cat", "/etc/kubernetes/ca/ca.key"}, fakeExecutor.calledWithCommandAndArgs[1])
@ -256,7 +265,7 @@ func TestCA(t *testing.T) {
it("logs an error message", func() { it("logs an error message", func() {
fakeTicker := make(chan time.Time) fakeTicker := make(chan time.Time)
subject, shutdownFunc := New(kubeAPIClient, fakeExecutor, fakeTicker, callbacks.OnSuccess, callbacks.OnFailure) subject, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, fakeTicker, callbacks.OnSuccess, callbacks.OnFailure)
defer shutdownFunc() defer shutdownFunc()
r.Equal(2, fakeExecutor.callCount) r.Equal(2, fakeExecutor.callCount)
r.Equal(1, callbacks.NumberOfTimesSuccessCalled()) r.Equal(1, callbacks.NumberOfTimesSuccessCalled())
@ -294,7 +303,7 @@ func TestCA(t *testing.T) {
it("logs an error message and fails to issue certs until it can get the API server's keypair", func() { it("logs an error message and fails to issue certs until it can get the API server's keypair", func() {
fakeTicker := make(chan time.Time) fakeTicker := make(chan time.Time)
subject, shutdownFunc := New(kubeAPIClient, fakeExecutor, fakeTicker, callbacks.OnSuccess, callbacks.OnFailure) subject, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, fakeTicker, callbacks.OnSuccess, callbacks.OnFailure)
defer shutdownFunc() defer shutdownFunc()
r.Equal(1, fakeExecutor.callCount) r.Equal(1, fakeExecutor.callCount)
r.Equal(0, callbacks.NumberOfTimesSuccessCalled()) r.Equal(0, callbacks.NumberOfTimesSuccessCalled())
@ -334,7 +343,7 @@ func TestCA(t *testing.T) {
}) })
it("returns a CA who cannot issue certs", func() { it("returns a CA who cannot issue certs", func() {
subject, shutdownFunc := New(kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) subject, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure)
defer shutdownFunc() defer shutdownFunc()
requireInitialFailureLogMessage("could not load CA: tls: failed to find any PEM data in certificate input") requireInitialFailureLogMessage("could not load CA: tls: failed to find any PEM data in certificate input")
requireNotCapableOfIssuingCerts(subject) requireNotCapableOfIssuingCerts(subject)
@ -350,7 +359,7 @@ func TestCA(t *testing.T) {
}) })
it("returns a CA who cannot issue certs", func() { it("returns a CA who cannot issue certs", func() {
subject, shutdownFunc := New(kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) subject, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure)
defer shutdownFunc() defer shutdownFunc()
requireInitialFailureLogMessage("some error") requireInitialFailureLogMessage("some error")
requireNotCapableOfIssuingCerts(subject) requireNotCapableOfIssuingCerts(subject)
@ -366,7 +375,7 @@ func TestCA(t *testing.T) {
}) })
it("returns a CA who cannot issue certs", func() { it("returns a CA who cannot issue certs", func() {
subject, shutdownFunc := New(kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) subject, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure)
defer shutdownFunc() defer shutdownFunc()
requireInitialFailureLogMessage("some error") requireInitialFailureLogMessage("some error")
requireNotCapableOfIssuingCerts(subject) requireNotCapableOfIssuingCerts(subject)
@ -377,72 +386,40 @@ func TestCA(t *testing.T) {
}) })
}) })
when("the kube-controller-manager pod is found with non-default CLI flag values", func() { when("the agent pod is found with non-default CLI flag values", func() {
it.Before(func() { it.Before(func() {
fakePod.Spec.Containers[0].Command = []string{ fakePod.Annotations = make(map[string]string)
"kube-controller-manager", fakePod.Annotations[agentInfo.CertPathAnnotation] = "/etc/kubernetes/ca/non-default.pem"
"--cluster-signing-cert-file=/etc/kubernetes/ca/non-default.pem", fakePod.Annotations[agentInfo.KeyPathAnnotation] = "/etc/kubernetes/ca/non-default.key"
}
fakePod.Spec.Containers[0].Args = []string{
"--cluster-signing-key-file=/etc/kubernetes/ca/non-default.key",
}
err := kubeAPIClient.Tracker().Add(fakePod) err := kubeAPIClient.Tracker().Add(fakePod)
r.NoError(err) r.NoError(err)
}) })
it("finds the API server's signing key and uses it to issue certificates", func() { it("finds the API server's signing key and uses it to issue certificates", func() {
_, shutdownFunc := New(kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) _, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure)
defer shutdownFunc() defer shutdownFunc()
r.Equal(2, fakeExecutor.callCount) r.Equal(2, fakeExecutor.callCount)
r.Equal("kube-system", fakeExecutor.calledWithPodNamespace[0]) r.Equal(agentInfo.Namespace, fakeExecutor.calledWithPodNamespace[0])
r.Equal("fake-pod", fakeExecutor.calledWithPodName[0]) r.Equal("fake-pod", fakeExecutor.calledWithPodName[0])
r.Equal([]string{"cat", "/etc/kubernetes/ca/non-default.pem"}, fakeExecutor.calledWithCommandAndArgs[0]) r.Equal([]string{"cat", "/etc/kubernetes/ca/non-default.pem"}, fakeExecutor.calledWithCommandAndArgs[0])
r.Equal("kube-system", fakeExecutor.calledWithPodNamespace[1]) r.Equal(agentInfo.Namespace, fakeExecutor.calledWithPodNamespace[1])
r.Equal("fake-pod", fakeExecutor.calledWithPodName[1]) r.Equal("fake-pod", fakeExecutor.calledWithPodName[1])
r.Equal([]string{"cat", "/etc/kubernetes/ca/non-default.key"}, fakeExecutor.calledWithCommandAndArgs[1]) r.Equal([]string{"cat", "/etc/kubernetes/ca/non-default.key"}, fakeExecutor.calledWithCommandAndArgs[1])
}) })
}) })
when("the kube-controller-manager pod is found with non-default CLI flag values separated by spaces", func() { when("the agent pod is not found", func() {
it.Before(func() {
fakePod.Spec.Containers[0].Command = []string{
"kube-controller-manager",
"--cluster-signing-cert-file", "/etc/kubernetes/ca/non-default.pem",
"--cluster-signing-key-file", "/etc/kubernetes/ca/non-default.key",
"--foo=bar",
}
err := kubeAPIClient.Tracker().Add(fakePod)
r.NoError(err)
})
it("finds the API server's signing key and uses it to issue certificates", func() {
_, shutdownFunc := New(kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure)
defer shutdownFunc()
r.Equal(2, fakeExecutor.callCount)
r.Equal("kube-system", fakeExecutor.calledWithPodNamespace[0])
r.Equal("fake-pod", fakeExecutor.calledWithPodName[0])
r.Equal([]string{"cat", "/etc/kubernetes/ca/non-default.pem"}, fakeExecutor.calledWithCommandAndArgs[0])
r.Equal("kube-system", fakeExecutor.calledWithPodNamespace[1])
r.Equal("fake-pod", fakeExecutor.calledWithPodName[1])
r.Equal([]string{"cat", "/etc/kubernetes/ca/non-default.key"}, fakeExecutor.calledWithCommandAndArgs[1])
})
})
when("the kube-controller-manager pod is not found", func() {
it("returns an error", func() { it("returns an error", func() {
subject, shutdownFunc := New(kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure) subject, shutdownFunc := New(&agentInfo, kubeAPIClient, fakeExecutor, neverTicker, callbacks.OnSuccess, callbacks.OnFailure)
defer shutdownFunc() defer shutdownFunc()
requireInitialFailureLogMessage("did not find kube-controller-manager pod") requireInitialFailureLogMessage("did not find kube-cert-agent pod")
requireNotCapableOfIssuingCerts(subject) requireNotCapableOfIssuingCerts(subject)
r.Equal(0, callbacks.NumberOfTimesSuccessCalled()) r.Equal(0, callbacks.NumberOfTimesSuccessCalled())
r.Equal(1, callbacks.NumberOfTimesFailureCalled()) r.Equal(1, callbacks.NumberOfTimesFailureCalled())
r.EqualError(callbacks.FailureErrors()[0], "did not find kube-controller-manager pod") r.EqualError(callbacks.FailureErrors()[0], "did not find kube-cert-agent pod")
}) })
}) })
}, spec.Sequential(), spec.Report(report.Terminal{})) }, spec.Sequential(), spec.Report(report.Terminal{}))

View File

@ -0,0 +1,178 @@
// Copyright 2020 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package kubecertagent
import (
"context"
"fmt"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
pinnipedcontroller "go.pinniped.dev/internal/controller"
"go.pinniped.dev/internal/controllerlib"
)
type annotaterController struct {
agentInfo *Info
k8sClient kubernetes.Interface
podInformer corev1informers.PodInformer
}
// NewAnnotaterController returns a controller that updates agent pods with the path to the kube
// API's certificate and key.
//
// This controller will add annotations to agent pods, using the provided
// agentInfo.CertPathAnnotation and agentInfo.KeyPathAnnotation annotation keys, with the best-guess
// paths to the kube API's certificate and key.
func NewAnnotaterController(
agentInfo *Info,
k8sClient kubernetes.Interface,
podInformer corev1informers.PodInformer,
withInformer pinnipedcontroller.WithInformerOptionFunc,
) controllerlib.Controller {
return controllerlib.New(
controllerlib.Config{
Name: "kube-cert-agent-annotater-controller",
Syncer: &annotaterController{
agentInfo: agentInfo,
k8sClient: k8sClient,
podInformer: podInformer,
},
},
withInformer(
podInformer,
pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool {
return isControllerManagerPod(obj) || isAgentPod(obj, agentInfo.Template.Labels)
}),
controllerlib.InformerOption{},
),
)
}
// Sync implements controllerlib.Syncer.
func (c *annotaterController) Sync(ctx controllerlib.Context) error {
agentSelector := labels.SelectorFromSet(c.agentInfo.Template.Labels)
agentPods, err := c.podInformer.
Lister().
Pods(ControllerManagerNamespace).
List(agentSelector)
if err != nil {
return fmt.Errorf("informer cannot list agent pods: %w", err)
}
for _, agentPod := range agentPods {
controllerManagerPod, err := findControllerManagerPod(agentPod, c.podInformer)
if err != nil {
return err
}
if controllerManagerPod == nil {
// The deleter will clean this orphaned agent.
continue
}
certPath, certPathOK := getContainerArgByName(controllerManagerPod, "cluster-signing-cert-file")
keyPath, keyPathOK := getContainerArgByName(controllerManagerPod, "cluster-signing-key-file")
if err := c.maybeUpdateAgentPod(
ctx.Context,
agentPod.Name,
certPath,
certPathOK,
keyPath,
keyPathOK,
); err != nil {
return fmt.Errorf("cannot update agent pod: %w", err)
}
}
return nil
}
func (c *annotaterController) maybeUpdateAgentPod(
ctx context.Context,
name string,
certPath string,
certPathOK bool,
keyPath string,
keyPathOK bool,
) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
agentPod, err := c.podInformer.Lister().Pods(ControllerManagerNamespace).Get(name)
if err != nil {
return err
}
if (certPathOK && agentPod.Annotations[c.agentInfo.CertPathAnnotation] != certPath) ||
(keyPathOK && agentPod.Annotations[c.agentInfo.KeyPathAnnotation] != keyPath) {
if err := c.reallyUpdateAgentPod(
ctx,
agentPod,
certPath,
certPathOK,
keyPath,
keyPathOK,
); err != nil {
return err
}
}
return nil
})
}
func (c *annotaterController) reallyUpdateAgentPod(
ctx context.Context,
agentPod *corev1.Pod,
certPath string,
certPathOK bool,
keyPath string,
keyPathOK bool,
) error {
// Create a deep copy of the agent pod since it is coming straight from the cache.
updatedAgentPod := agentPod.DeepCopy()
if updatedAgentPod.Annotations == nil {
updatedAgentPod.Annotations = make(map[string]string)
}
if certPathOK {
updatedAgentPod.Annotations[c.agentInfo.CertPathAnnotation] = certPath
}
if keyPathOK {
updatedAgentPod.Annotations[c.agentInfo.KeyPathAnnotation] = keyPath
}
klog.InfoS(
"updating agent pod annotations",
"pod",
klog.KObj(updatedAgentPod),
"certPath",
certPath,
"keyPath",
keyPath,
)
_, err := c.k8sClient.
CoreV1().
Pods(ControllerManagerNamespace).
Update(ctx, updatedAgentPod, metav1.UpdateOptions{})
return err
}
func getContainerArgByName(pod *corev1.Pod, name string) (string, bool) {
for _, container := range pod.Spec.Containers {
flagset := pflag.NewFlagSet("", pflag.ContinueOnError)
flagset.ParseErrorsWhitelist = pflag.ParseErrorsWhitelist{UnknownFlags: true}
var val string
flagset.StringVar(&val, name, "", "")
_ = flagset.Parse(append(container.Command, container.Args...))
if val != "" {
return val, true
}
}
return "", false
}

View File

@ -0,0 +1,490 @@
// Copyright 2020 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package kubecertagent
import (
"context"
"testing"
"time"
"github.com/sclevine/spec"
"github.com/sclevine/spec/report"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kubeinformers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
kubernetesfake "k8s.io/client-go/kubernetes/fake"
coretesting "k8s.io/client-go/testing"
"go.pinniped.dev/internal/controllerlib"
"go.pinniped.dev/internal/testutil"
)
func TestAnnotaterControllerFilter(t *testing.T) {
runFilterTest(
t,
"AnnotaterControllerFilter",
func(
agentPodTemplate *corev1.Pod,
podsInformer corev1informers.PodInformer,
observableWithInformerOption *testutil.ObservableWithInformerOption,
) {
_ = NewAnnotaterController(
&Info{
Template: agentPodTemplate,
},
nil, // k8sClient, shouldn't matter
podsInformer,
observableWithInformerOption.WithInformer,
)
},
)
}
func TestAnnotaterControllerSync(t *testing.T) {
spec.Run(t, "AnnotaterControllerSync", func(t *testing.T, when spec.G, it spec.S) {
const kubeSystemNamespace = "kube-system"
const (
certPath = "some-cert-path"
certPathAnnotation = "some-cert-path-annotation"
keyPath = "some-key-path"
keyPathAnnotation = "some-key-path-annotation"
)
var r *require.Assertions
var subject controllerlib.Controller
var kubeAPIClient *kubernetesfake.Clientset
var kubeInformerClient *kubernetesfake.Clientset
var kubeInformers kubeinformers.SharedInformerFactory
var timeoutContext context.Context
var timeoutContextCancel context.CancelFunc
var syncContext *controllerlib.Context
agentPodTemplate := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "some-agent-name-",
Labels: map[string]string{
"some-label-key": "some-label-value",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Image: "some-agent-image",
},
},
},
}
controllerManagerPod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: kubeSystemNamespace,
Name: "some-controller-manager-name",
Labels: map[string]string{
"component": "kube-controller-manager",
},
UID: types.UID("some-controller-manager-uid"),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Image: "some-controller-manager-image",
Command: []string{
"kube-controller-manager",
"--cluster-signing-cert-file=" + certPath,
"--cluster-signing-key-file=" + keyPath,
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "some-volume-mount-name",
},
},
},
},
NodeName: "some-node-name",
NodeSelector: map[string]string{
"some-node-selector-key": "some-node-selector-value",
},
Tolerations: []corev1.Toleration{
{
Key: "some-toleration",
},
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
}
// fnv 32a hash of controller-manager uid
controllerManagerPodHash := "fbb0addd"
agentPod := agentPodTemplate.DeepCopy()
agentPod.Namespace = kubeSystemNamespace
agentPod.Name += controllerManagerPodHash
agentPod.OwnerReferences = []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: controllerManagerPod.Name,
UID: controllerManagerPod.UID,
Controller: boolPtr(true),
BlockOwnerDeletion: boolPtr(true),
},
}
agentPod.Spec.Containers[0].VolumeMounts = controllerManagerPod.Spec.Containers[0].VolumeMounts
agentPod.Spec.RestartPolicy = corev1.RestartPolicyNever
agentPod.Spec.AutomountServiceAccountToken = boolPtr(false)
agentPod.Spec.NodeName = controllerManagerPod.Spec.NodeName
agentPod.Spec.NodeSelector = controllerManagerPod.Spec.NodeSelector
agentPod.Spec.Tolerations = controllerManagerPod.Spec.Tolerations
podsGVR := schema.GroupVersionResource{
Group: corev1.SchemeGroupVersion.Group,
Version: corev1.SchemeGroupVersion.Version,
Resource: "pods",
}
// Defer starting the informers until the last possible moment so that the
// nested Before's can keep adding things to the informer caches.
var startInformersAndController = func() {
// Set this at the last second to allow for injection of server override.
subject = NewAnnotaterController(
&Info{
Template: agentPodTemplate,
CertPathAnnotation: certPathAnnotation,
KeyPathAnnotation: keyPathAnnotation,
},
kubeAPIClient,
kubeInformers.Core().V1().Pods(),
controllerlib.WithInformer,
)
// Set this at the last second to support calling subject.Name().
syncContext = &controllerlib.Context{
Context: timeoutContext,
Name: subject.Name(),
Key: controllerlib.Key{
Namespace: kubeSystemNamespace,
Name: "should-not-matter",
},
}
// Must start informers before calling TestRunSynchronously()
kubeInformers.Start(timeoutContext.Done())
controllerlib.TestRunSynchronously(t, subject)
}
it.Before(func() {
r = require.New(t)
timeoutContext, timeoutContextCancel = context.WithTimeout(context.Background(), time.Second*3)
kubeInformerClient = kubernetesfake.NewSimpleClientset()
kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0)
kubeAPIClient = kubernetesfake.NewSimpleClientset()
// Add a pod into the test that doesn't matter to make sure we don't accidentally trigger any
// logic on this thing.
ignorablePod := corev1.Pod{}
ignorablePod.Name = "some-ignorable-pod"
r.NoError(kubeInformerClient.Tracker().Add(&ignorablePod))
r.NoError(kubeAPIClient.Tracker().Add(&ignorablePod))
})
it.After(func() {
timeoutContextCancel()
})
when("there is an agent pod without annotations set", func() {
it.Before(func() {
r.NoError(kubeInformerClient.Tracker().Add(agentPod))
r.NoError(kubeAPIClient.Tracker().Add(agentPod))
})
when("there is a matching controller manager pod", func() {
it.Before(func() {
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("updates the annotations according to the controller manager pod", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations = make(map[string]string)
updatedAgentPod.Annotations[certPathAnnotation] = certPath
updatedAgentPod.Annotations[keyPathAnnotation] = keyPath
r.Equal(
[]coretesting.Action{
coretesting.NewUpdateAction(
podsGVR,
kubeSystemNamespace,
updatedAgentPod,
),
},
kubeAPIClient.Actions(),
)
})
})
when("there is a controller manager pod with CLI flag values separated by spaces", func() {
it.Before(func() {
controllerManagerPod.Spec.Containers[0].Command = []string{
"kube-controller-manager",
"--cluster-signing-cert-file", certPath,
"--cluster-signing-key-file", keyPath,
}
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("updates the annotations according to the controller manager pod", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations = make(map[string]string)
updatedAgentPod.Annotations[certPathAnnotation] = certPath
updatedAgentPod.Annotations[keyPathAnnotation] = keyPath
r.Equal(
[]coretesting.Action{
coretesting.NewUpdateAction(
podsGVR,
kubeSystemNamespace,
updatedAgentPod,
),
},
kubeAPIClient.Actions(),
)
})
})
when("there is a controller manager pod with unparsable CLI flags", func() {
it.Before(func() {
controllerManagerPod.Spec.Containers[0].Command = []string{
"kube-controller-manager",
"--cluster-signing-cert-file-blah", certPath,
"--cluster-signing-key-file-blah", keyPath,
}
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("does not update any annotations", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
r.Equal(
[]coretesting.Action{},
kubeAPIClient.Actions(),
)
})
})
when("there is a controller manager pod with unparsable cert CLI flag", func() {
it.Before(func() {
controllerManagerPod.Spec.Containers[0].Command = []string{
"kube-controller-manager",
"--cluster-signing-cert-file-blah", certPath,
"--cluster-signing-key-file", keyPath,
}
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("updates the key annotation", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations = make(map[string]string)
updatedAgentPod.Annotations[keyPathAnnotation] = keyPath
r.Equal(
[]coretesting.Action{
coretesting.NewUpdateAction(
podsGVR,
kubeSystemNamespace,
updatedAgentPod,
),
},
kubeAPIClient.Actions(),
)
})
})
when("there is a controller manager pod with unparsable keey CLI flag", func() {
it.Before(func() {
controllerManagerPod.Spec.Containers[0].Command = []string{
"kube-controller-manager",
"--cluster-signing-cert-file", certPath,
"--cluster-signing-key-file-blah", keyPath,
}
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("updates the cert annotation", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations = make(map[string]string)
updatedAgentPod.Annotations[certPathAnnotation] = certPath
r.Equal(
[]coretesting.Action{
coretesting.NewUpdateAction(
podsGVR,
kubeSystemNamespace,
updatedAgentPod,
),
},
kubeAPIClient.Actions(),
)
})
})
when("there is a non-matching controller manager pod via uid", func() {
it.Before(func() {
controllerManagerPod.UID = "some-other-controller-manager-uid"
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("does nothing; the deleter will delete this pod to trigger resync", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
r.Equal(
[]coretesting.Action{},
kubeAPIClient.Actions(),
)
})
})
when("there is a non-matching controller manager pod via name", func() {
it.Before(func() {
controllerManagerPod.Name = "some-other-controller-manager-name"
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("does nothing; the deleter will delete this pod to trigger resync", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
r.Equal(
[]coretesting.Action{},
kubeAPIClient.Actions(),
)
})
})
})
when("there is an agent pod with correct annotations set", func() {
it.Before(func() {
agentPod.Annotations = make(map[string]string)
agentPod.Annotations[certPathAnnotation] = certPath
agentPod.Annotations[keyPathAnnotation] = keyPath
r.NoError(kubeInformerClient.Tracker().Add(agentPod))
r.NoError(kubeAPIClient.Tracker().Add(agentPod))
})
when("there is a matching controller manager pod", func() {
it.Before(func() {
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("does nothing since the pod is up to date", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
r.Equal(
[]coretesting.Action{},
kubeAPIClient.Actions(),
)
})
})
})
when("there is an agent pod with the wrong cert annotation", func() {
it.Before(func() {
agentPod.Annotations = make(map[string]string)
agentPod.Annotations[certPathAnnotation] = "wrong"
agentPod.Annotations[keyPathAnnotation] = keyPath
r.NoError(kubeInformerClient.Tracker().Add(agentPod))
r.NoError(kubeAPIClient.Tracker().Add(agentPod))
})
when("there is a matching controller manager pod", func() {
it.Before(func() {
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("updates the agent with the correct cert annotation", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations[certPathAnnotation] = certPath
r.Equal(
[]coretesting.Action{
coretesting.NewUpdateAction(
podsGVR,
kubeSystemNamespace,
updatedAgentPod,
),
},
kubeAPIClient.Actions(),
)
})
})
})
when("there is an agent pod with the wrong key annotation", func() {
it.Before(func() {
agentPod.Annotations = make(map[string]string)
agentPod.Annotations[certPathAnnotation] = certPath
agentPod.Annotations[keyPathAnnotation] = "key"
r.NoError(kubeInformerClient.Tracker().Add(agentPod))
r.NoError(kubeAPIClient.Tracker().Add(agentPod))
})
when("there is a matching controller manager pod", func() {
it.Before(func() {
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("updates the agent with the correct key annotation", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Annotations[keyPathAnnotation] = keyPath
r.Equal(
[]coretesting.Action{
coretesting.NewUpdateAction(
podsGVR,
kubeSystemNamespace,
updatedAgentPod,
),
},
kubeAPIClient.Actions(),
)
})
})
})
}, spec.Parallel(), spec.Report(report.Terminal{}))
}

View File

@ -0,0 +1,95 @@
// Copyright 2020 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package kubecertagent
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
pinnipedcontroller "go.pinniped.dev/internal/controller"
"go.pinniped.dev/internal/controllerlib"
)
type createrController struct {
agentInfo *Info
k8sClient kubernetes.Interface
podInformer corev1informers.PodInformer
}
// NewCreaterController returns a controller that creates new kube-cert-agent pods for every known
// kube-controller-manager pod.
//
// This controller only uses the Template field of the provided agentInfo.
func NewCreaterController(
agentInfo *Info,
k8sClient kubernetes.Interface,
podInformer corev1informers.PodInformer,
withInformer pinnipedcontroller.WithInformerOptionFunc,
) controllerlib.Controller {
return controllerlib.New(
controllerlib.Config{
//nolint: misspell
Name: "kube-cert-agent-creater-controller",
Syncer: &createrController{
agentInfo: agentInfo,
k8sClient: k8sClient,
podInformer: podInformer,
},
},
withInformer(
podInformer,
pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool {
return isControllerManagerPod(obj) || isAgentPod(obj, agentInfo.Template.Labels)
}),
controllerlib.InformerOption{},
),
)
}
// Sync implements controllerlib.Syncer.
func (c *createrController) Sync(ctx controllerlib.Context) error {
controllerManagerSelector, err := labels.Parse("component=kube-controller-manager")
if err != nil {
return fmt.Errorf("cannot create controller manager selector: %w", err)
}
controllerManagerPods, err := c.podInformer.Lister().List(controllerManagerSelector)
if err != nil {
return fmt.Errorf("informer cannot list controller manager pods: %w", err)
}
for _, controllerManagerPod := range controllerManagerPods {
agentPod, err := findAgentPod(controllerManagerPod, c.podInformer, c.agentInfo.Template.Labels)
if err != nil {
return err
}
if agentPod == nil {
agentPod = newAgentPod(controllerManagerPod, c.agentInfo.Template)
klog.InfoS(
"creating agent pod",
"pod",
klog.KObj(agentPod),
"controller",
klog.KObj(controllerManagerPod),
)
_, err := c.k8sClient.CoreV1().
Pods(ControllerManagerNamespace).
Create(ctx.Context, agentPod, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("cannot create agent pod: %w", err)
}
}
// The deleter controller handles the case where the expected fields do not match in the agent
// pod.
}
return nil
}

View File

@ -0,0 +1,286 @@
// Copyright 2020 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package kubecertagent
import (
"context"
"testing"
"time"
"github.com/sclevine/spec"
"github.com/sclevine/spec/report"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kubeinformers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
kubernetesfake "k8s.io/client-go/kubernetes/fake"
coretesting "k8s.io/client-go/testing"
"go.pinniped.dev/internal/controllerlib"
"go.pinniped.dev/internal/testutil"
)
func TestCreaterControllerFilter(t *testing.T) {
runFilterTest(
t,
"CreaterControllerFilter",
func(
agentPodTemplate *corev1.Pod,
podsInformer corev1informers.PodInformer,
observableWithInformerOption *testutil.ObservableWithInformerOption,
) {
_ = NewCreaterController(
&Info{
Template: agentPodTemplate,
},
nil, // k8sClient, shouldn't matter
podsInformer,
observableWithInformerOption.WithInformer,
)
},
)
}
func TestCreaterControllerSync(t *testing.T) {
spec.Run(t, "CreaterControllerSync", func(t *testing.T, when spec.G, it spec.S) {
const kubeSystemNamespace = "kube-system"
var r *require.Assertions
var subject controllerlib.Controller
var kubeAPIClient *kubernetesfake.Clientset
var kubeInformerClient *kubernetesfake.Clientset
var kubeInformers kubeinformers.SharedInformerFactory
var timeoutContext context.Context
var timeoutContextCancel context.CancelFunc
var syncContext *controllerlib.Context
agentPodTemplate := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "some-agent-name-",
Labels: map[string]string{
"some-label-key": "some-label-value",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Image: "some-agent-image",
},
},
},
}
controllerManagerPod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: kubeSystemNamespace,
Name: "some-controller-manager-name",
Labels: map[string]string{
"component": "kube-controller-manager",
},
UID: types.UID("some-controller-manager-uid"),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Image: "some-controller-manager-image",
VolumeMounts: []corev1.VolumeMount{
{
Name: "some-volume-mount-name",
},
},
},
},
NodeName: "some-node-name",
NodeSelector: map[string]string{
"some-node-selector-key": "some-node-selector-value",
},
Tolerations: []corev1.Toleration{
{
Key: "some-toleration",
},
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
}
// fnv 32a hash of controller-manager uid
controllerManagerPodHash := "fbb0addd"
agentPod := agentPodTemplate.DeepCopy()
agentPod.Namespace = kubeSystemNamespace
agentPod.Name += controllerManagerPodHash
agentPod.OwnerReferences = []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: controllerManagerPod.Name,
UID: controllerManagerPod.UID,
Controller: boolPtr(true),
BlockOwnerDeletion: boolPtr(true),
},
}
agentPod.Spec.Containers[0].VolumeMounts = controllerManagerPod.Spec.Containers[0].VolumeMounts
agentPod.Spec.RestartPolicy = corev1.RestartPolicyNever
agentPod.Spec.AutomountServiceAccountToken = boolPtr(false)
agentPod.Spec.NodeName = controllerManagerPod.Spec.NodeName
agentPod.Spec.NodeSelector = controllerManagerPod.Spec.NodeSelector
agentPod.Spec.Tolerations = controllerManagerPod.Spec.Tolerations
podsGVR := schema.GroupVersionResource{
Group: corev1.SchemeGroupVersion.Group,
Version: corev1.SchemeGroupVersion.Version,
Resource: "pods",
}
// Defer starting the informers until the last possible moment so that the
// nested Before's can keep adding things to the informer caches.
var startInformersAndController = func() {
// Set this at the last second to allow for injection of server override.
subject = NewCreaterController(
&Info{
Template: agentPodTemplate,
},
kubeAPIClient,
kubeInformers.Core().V1().Pods(),
controllerlib.WithInformer,
)
// Set this at the last second to support calling subject.Name().
syncContext = &controllerlib.Context{
Context: timeoutContext,
Name: subject.Name(),
Key: controllerlib.Key{
Namespace: kubeSystemNamespace,
Name: "should-not-matter",
},
}
// Must start informers before calling TestRunSynchronously()
kubeInformers.Start(timeoutContext.Done())
controllerlib.TestRunSynchronously(t, subject)
}
it.Before(func() {
r = require.New(t)
timeoutContext, timeoutContextCancel = context.WithTimeout(context.Background(), time.Second*3)
kubeInformerClient = kubernetesfake.NewSimpleClientset()
kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0)
kubeAPIClient = kubernetesfake.NewSimpleClientset()
// Add a pod into the test that doesn't matter to make sure we don't accidentally trigger any
// logic on this thing.
ignorablePod := corev1.Pod{}
ignorablePod.Name = "some-ignorable-pod"
r.NoError(kubeInformerClient.Tracker().Add(&ignorablePod))
r.NoError(kubeAPIClient.Tracker().Add(&ignorablePod))
// Add another valid agent pod to make sure our logic works for just the pod we care about.
otherAgentPod := agentPod.DeepCopy()
otherAgentPod.Name = "some-other-agent"
otherAgentPod.OwnerReferences[0].Name = "some-other-controller-manager-name"
otherAgentPod.OwnerReferences[0].UID = "some-other-controller-manager-uid"
r.NoError(kubeInformerClient.Tracker().Add(otherAgentPod))
r.NoError(kubeAPIClient.Tracker().Add(otherAgentPod))
})
it.After(func() {
timeoutContextCancel()
})
when("there is a controller manager pod", func() {
it.Before(func() {
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
when("there is a matching agent pod", func() {
it.Before(func() {
r.NoError(kubeInformerClient.Tracker().Add(agentPod))
r.NoError(kubeAPIClient.Tracker().Add(agentPod))
})
it("does nothing", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{},
kubeAPIClient.Actions(),
)
})
})
when("there is a non-matching agent pod", func() {
it.Before(func() {
nonMatchingAgentPod := agentPod.DeepCopy()
nonMatchingAgentPod.Name = "some-agent-name-85da432e"
nonMatchingAgentPod.OwnerReferences[0].UID = "some-non-matching-uid"
r.NoError(kubeInformerClient.Tracker().Add(nonMatchingAgentPod))
r.NoError(kubeAPIClient.Tracker().Add(nonMatchingAgentPod))
})
it("creates a matching agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewCreateAction(
podsGVR,
kubeSystemNamespace,
agentPod,
),
},
kubeAPIClient.Actions(),
)
})
})
when("there is no matching agent pod", func() {
it("creates a matching agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewCreateAction(
podsGVR,
kubeSystemNamespace,
agentPod,
),
},
kubeAPIClient.Actions(),
)
})
})
})
when("there is no controller manager pod", func() {
it("does nothing; the deleter controller will cleanup any leftover agent pods", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{},
kubeAPIClient.Actions(),
)
})
})
}, spec.Parallel(), spec.Report(report.Terminal{}))
}

View File

@ -0,0 +1,84 @@
// Copyright 2020 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package kubecertagent
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
pinnipedcontroller "go.pinniped.dev/internal/controller"
"go.pinniped.dev/internal/controllerlib"
)
type deleterController struct {
agentInfo *Info
k8sClient kubernetes.Interface
podInformer corev1informers.PodInformer
}
// NewDeleterController returns a controller that deletes any kube-cert-agent pods that are out of
// sync with the known kube-controller-manager pods.
//
// This controller only uses the Template field of the provided agentInfo.
func NewDeleterController(
agentInfo *Info,
k8sClient kubernetes.Interface,
podInformer corev1informers.PodInformer,
withInformer pinnipedcontroller.WithInformerOptionFunc,
) controllerlib.Controller {
return controllerlib.New(
controllerlib.Config{
Name: "kube-cert-agent-deleter-controller",
Syncer: &deleterController{
agentInfo: agentInfo,
k8sClient: k8sClient,
podInformer: podInformer,
},
},
withInformer(
podInformer,
pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool {
return isControllerManagerPod(obj) || isAgentPod(obj, agentInfo.Template.Labels)
}),
controllerlib.InformerOption{},
),
)
}
// Sync implements controllerlib.Syncer.
func (c *deleterController) Sync(ctx controllerlib.Context) error {
agentSelector := labels.SelectorFromSet(c.agentInfo.Template.Labels)
agentPods, err := c.podInformer.
Lister().
Pods(ControllerManagerNamespace).
List(agentSelector)
if err != nil {
return fmt.Errorf("informer cannot list agent pods: %w", err)
}
for _, agentPod := range agentPods {
controllerManagerPod, err := findControllerManagerPod(agentPod, c.podInformer)
if err != nil {
return err
}
if controllerManagerPod == nil ||
!isAgentPodUpToDate(agentPod, newAgentPod(controllerManagerPod, c.agentInfo.Template)) {
klog.InfoS("deleting agent pod", "pod", klog.KObj(agentPod))
err := c.k8sClient.
CoreV1().
Pods(ControllerManagerNamespace).
Delete(ctx.Context, agentPod.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("cannot delete agent pod: %w", err)
}
}
}
return nil
}

View File

@ -0,0 +1,491 @@
// Copyright 2020 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package kubecertagent
import (
"context"
"testing"
"time"
"github.com/sclevine/spec"
"github.com/sclevine/spec/report"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kubeinformers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
kubernetesfake "k8s.io/client-go/kubernetes/fake"
coretesting "k8s.io/client-go/testing"
"go.pinniped.dev/internal/controllerlib"
"go.pinniped.dev/internal/testutil"
)
func TestDeleterControllerFilter(t *testing.T) {
runFilterTest(
t,
"DeleterControllerFilter",
func(
agentPodTemplate *corev1.Pod,
podsInformer corev1informers.PodInformer,
observableWithInformerOption *testutil.ObservableWithInformerOption,
) {
_ = NewDeleterController(
&Info{
Template: agentPodTemplate,
},
nil, // k8sClient, shouldn't matter
podsInformer,
observableWithInformerOption.WithInformer,
)
},
)
}
func TestDeleterControllerSync(t *testing.T) {
spec.Run(t, "DeleterControllerSync", func(t *testing.T, when spec.G, it spec.S) {
const kubeSystemNamespace = "kube-system"
var r *require.Assertions
var subject controllerlib.Controller
var kubeAPIClient *kubernetesfake.Clientset
var kubeInformerClient *kubernetesfake.Clientset
var kubeInformers kubeinformers.SharedInformerFactory
var timeoutContext context.Context
var timeoutContextCancel context.CancelFunc
var syncContext *controllerlib.Context
agentPodTemplate := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "some-agent-name-",
Labels: map[string]string{
"some-label-key": "some-label-value",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Image: "some-agent-image",
},
},
},
}
controllerManagerPod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: kubeSystemNamespace,
Name: "some-controller-manager-name",
Labels: map[string]string{
"component": "kube-controller-manager",
},
UID: types.UID("some-controller-manager-uid"),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Image: "some-controller-manager-image",
VolumeMounts: []corev1.VolumeMount{
{
Name: "some-volume-mount-name",
},
},
},
},
NodeName: "some-node-name",
NodeSelector: map[string]string{
"some-node-selector-key": "some-node-selector-value",
},
Tolerations: []corev1.Toleration{
{
Key: "some-toleration",
},
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
}
podsGVR := schema.GroupVersionResource{
Group: corev1.SchemeGroupVersion.Group,
Version: corev1.SchemeGroupVersion.Version,
Resource: "pods",
}
// fnv 32a hash of controller-manager uid
controllerManagerPodHash := "fbb0addd"
agentPod := agentPodTemplate.DeepCopy()
agentPod.Namespace = kubeSystemNamespace
agentPod.Name += controllerManagerPodHash
agentPod.OwnerReferences = []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: controllerManagerPod.Name,
UID: controllerManagerPod.UID,
Controller: boolPtr(true),
BlockOwnerDeletion: boolPtr(true),
},
}
agentPod.Spec.Containers[0].VolumeMounts = controllerManagerPod.Spec.Containers[0].VolumeMounts
agentPod.Spec.RestartPolicy = corev1.RestartPolicyNever
agentPod.Spec.AutomountServiceAccountToken = boolPtr(false)
agentPod.Spec.NodeName = controllerManagerPod.Spec.NodeName
agentPod.Spec.NodeSelector = controllerManagerPod.Spec.NodeSelector
agentPod.Spec.Tolerations = controllerManagerPod.Spec.Tolerations
// Defer starting the informers until the last possible moment so that the
// nested Before's can keep adding things to the informer caches.
var startInformersAndController = func() {
// Set this at the last second to allow for injection of server override.
subject = NewDeleterController(
&Info{
Template: agentPodTemplate,
},
kubeAPIClient,
kubeInformers.Core().V1().Pods(),
controllerlib.WithInformer,
)
// Set this at the last second to support calling subject.Name().
syncContext = &controllerlib.Context{
Context: timeoutContext,
Name: subject.Name(),
Key: controllerlib.Key{
Namespace: kubeSystemNamespace,
Name: "should-not-matter",
},
}
// Must start informers before calling TestRunSynchronously()
kubeInformers.Start(timeoutContext.Done())
controllerlib.TestRunSynchronously(t, subject)
}
it.Before(func() {
r = require.New(t)
timeoutContext, timeoutContextCancel = context.WithTimeout(context.Background(), time.Second*3)
kubeInformerClient = kubernetesfake.NewSimpleClientset()
kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0)
kubeAPIClient = kubernetesfake.NewSimpleClientset()
// Add an pod into the test that doesn't matter to make sure we don't accidentally
// trigger any logic on this thing.
ignorablePod := corev1.Pod{}
ignorablePod.Name = "some-ignorable-pod"
r.NoError(kubeInformerClient.Tracker().Add(&ignorablePod))
r.NoError(kubeAPIClient.Tracker().Add(&ignorablePod))
})
it.After(func() {
timeoutContextCancel()
})
when("there is an agent pod", func() {
it.Before(func() {
r.NoError(kubeInformerClient.Tracker().Add(agentPod))
r.NoError(kubeAPIClient.Tracker().Add(agentPod))
})
when("there is a matching controller manager pod", func() {
it.Before(func() {
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("does nothing", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{},
kubeAPIClient.Actions(),
)
})
})
when("there is a non-matching controller manager pod via uid", func() {
it.Before(func() {
controllerManagerPod.UID = "some-other-controller-manager-uid"
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("deletes the agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
kubeSystemNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
})
})
when("there is a non-matching controller manager pod via name", func() {
it.Before(func() {
controllerManagerPod.Name = "some-other-controller-manager-name"
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("deletes the agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
kubeSystemNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
})
})
when("the agent pod is out of sync with the controller manager via volume mounts", func() {
it.Before(func() {
controllerManagerPod.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{
{
Name: "some-other-volume-mount",
},
}
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("deletes the agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
kubeSystemNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
})
})
when("the agent pod is out of sync with the controller manager via volumes", func() {
it.Before(func() {
controllerManagerPod.Spec.Volumes = []corev1.Volume{
{
Name: "some-other-volume",
},
}
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("deletes the agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
kubeSystemNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
})
})
when("the agent pod is out of sync with the controller manager via node selector", func() {
it.Before(func() {
controllerManagerPod.Spec.NodeSelector = map[string]string{
"some-other-node-selector-key": "some-other-node-selector-value",
}
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("deletes the agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
kubeSystemNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
})
})
when("the agent pod is out of sync with the controller manager via node name", func() {
it.Before(func() {
controllerManagerPod.Spec.NodeName = "some-other-node-name"
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("deletes the agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
kubeSystemNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
})
})
when("the agent pod is out of sync with the controller manager via tolerations", func() {
it.Before(func() {
controllerManagerPod.Spec.Tolerations = []corev1.Toleration{
{
Key: "some-other-toleration-key",
},
}
r.NoError(kubeInformerClient.Tracker().Add(controllerManagerPod))
r.NoError(kubeAPIClient.Tracker().Add(controllerManagerPod))
})
it("deletes the agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
kubeSystemNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
})
})
when("the agent pod is out of sync via restart policy", func() {
it.Before(func() {
updatedAgentPod := agentPod.DeepCopy()
updatedAgentPod.Spec.RestartPolicy = corev1.RestartPolicyAlways
r.NoError(kubeInformerClient.Tracker().Update(podsGVR, updatedAgentPod, updatedAgentPod.Namespace))
r.NoError(kubeAPIClient.Tracker().Update(podsGVR, updatedAgentPod, updatedAgentPod.Namespace))
})
it.Pend("deletes the agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
kubeSystemNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
})
})
when("the agent pod is out of sync via automount service account tokem", func() {
it.Before(func() {
agentPod.Spec.AutomountServiceAccountToken = boolPtr(true)
r.NoError(kubeInformerClient.Tracker().Update(podsGVR, agentPod, agentPod.Namespace))
r.NoError(kubeAPIClient.Tracker().Update(podsGVR, agentPod, agentPod.Namespace))
})
it.Pend("deletes the agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
kubeSystemNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
})
})
when("there is no matching controller manager pod", func() {
it("deletes the agent pod", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{
coretesting.NewDeleteAction(
podsGVR,
kubeSystemNamespace,
agentPod.Name,
),
},
kubeAPIClient.Actions(),
)
})
})
})
when("there is no agent pod", func() {
it("does nothing", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Equal(
[]coretesting.Action{},
kubeAPIClient.Actions(),
)
})
})
}, spec.Parallel(), spec.Report(report.Terminal{}))
}

View File

@ -0,0 +1,214 @@
// Copyright 2020 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
// Package kubecertagent provides controllers that ensure a set of pods (the kube-cert-agent), is
// colocated with the Kubernetes controller manager so that Pinniped can access its signing keys.
//
// Note: the controllers use a filter that accepts all pods that look like the controller manager or
// an agent pod, across any add/update/delete event. Each of the controllers only care about a
// subset of these events in reality, but the liberal filter implementation serves as an MVP.
package kubecertagent
import (
"encoding/hex"
"fmt"
"hash/fnv"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/klog/v2"
)
const (
// ControllerManagerNamespace is the assumed namespace of the kube-controller-manager pod(s).
ControllerManagerNamespace = "kube-system"
)
// Info holds necessary information about the agent pod. It was pulled out into a struct to have a
// common parameter for each controller.
type Info struct {
// Template is an injection point for pod fields. The required pod fields are as follows.
// .Name: serves as the name prefix for each of the agent pods
// .Labels: serves as a way to filter for agent pods
// .Spec.Containers[0].Image: serves as the container image for the agent pods
// .Spec.Containers[0].Command: serves as the container command for the agent pods
Template *corev1.Pod
// CertPathAnnotation is the name of the annotation key that will be used when setting the
// best-guess path to the kube API's certificate.
CertPathAnnotation string
// KeyPathAnnotation is the name of the annotation key that will be used when setting the
// best-guess path to the kube API's private key.
KeyPathAnnotation string
}
func isControllerManagerPod(obj metav1.Object) bool {
pod, ok := obj.(*corev1.Pod)
if !ok {
return false
}
if pod.Labels == nil {
return false
}
component, ok := pod.Labels["component"]
if !ok || component != "kube-controller-manager" {
return false
}
if pod.Status.Phase != corev1.PodRunning {
return false
}
return true
}
func isAgentPod(obj metav1.Object, agentLabels map[string]string) bool {
for agentLabelKey, agentLabelValue := range agentLabels {
v, ok := obj.GetLabels()[agentLabelKey]
if !ok {
return false
}
if v != agentLabelValue {
return false
}
}
return true
}
func newAgentPod(
controllerManagerPod *corev1.Pod,
template *corev1.Pod,
) *corev1.Pod {
agentPod := template.DeepCopy()
agentPod.Name = fmt.Sprintf("%s%s", agentPod.Name, hash(controllerManagerPod))
// controller manager namespace because it is our owner
agentPod.Namespace = ControllerManagerNamespace
agentPod.OwnerReferences = []metav1.OwnerReference{
*metav1.NewControllerRef(
controllerManagerPod,
schema.GroupVersionKind{
Group: corev1.SchemeGroupVersion.Group,
Version: corev1.SchemeGroupVersion.Version,
Kind: "Pod",
},
),
}
agentPod.Spec.Containers[0].VolumeMounts = controllerManagerPod.Spec.Containers[0].VolumeMounts
agentPod.Spec.Volumes = controllerManagerPod.Spec.Volumes
agentPod.Spec.RestartPolicy = corev1.RestartPolicyNever
agentPod.Spec.NodeSelector = controllerManagerPod.Spec.NodeSelector
agentPod.Spec.AutomountServiceAccountToken = boolPtr(false)
agentPod.Spec.NodeName = controllerManagerPod.Spec.NodeName
agentPod.Spec.Tolerations = controllerManagerPod.Spec.Tolerations
return agentPod
}
func isAgentPodUpToDate(actualAgentPod, expectedAgentPod *corev1.Pod) bool {
return equality.Semantic.DeepEqual(
actualAgentPod.Spec.Containers[0].VolumeMounts,
expectedAgentPod.Spec.Containers[0].VolumeMounts,
) &&
equality.Semantic.DeepEqual(
actualAgentPod.Spec.Volumes,
expectedAgentPod.Spec.Volumes,
) &&
equality.Semantic.DeepEqual(
actualAgentPod.Spec.RestartPolicy,
expectedAgentPod.Spec.RestartPolicy,
) &&
equality.Semantic.DeepEqual(
actualAgentPod.Spec.NodeSelector,
expectedAgentPod.Spec.NodeSelector,
) &&
equality.Semantic.DeepEqual(
actualAgentPod.Spec.AutomountServiceAccountToken,
expectedAgentPod.Spec.AutomountServiceAccountToken,
) &&
equality.Semantic.DeepEqual(
actualAgentPod.Spec.NodeName,
expectedAgentPod.Spec.NodeName,
) &&
equality.Semantic.DeepEqual(
actualAgentPod.Spec.Tolerations,
expectedAgentPod.Spec.Tolerations,
)
}
func findAgentPod(
controllerManagerPod *corev1.Pod,
informer corev1informers.PodInformer,
agentLabels map[string]string,
) (*corev1.Pod, error) {
agentSelector := labels.SelectorFromSet(agentLabels)
agentPods, err := informer.
Lister().
Pods(ControllerManagerNamespace).
List(agentSelector)
if err != nil {
return nil, fmt.Errorf("informer cannot list agent pods: %w", err)
}
for _, maybeAgentPod := range agentPods {
maybeControllerManagerPod, err := findControllerManagerPod(
maybeAgentPod,
informer,
)
if err != nil {
return nil, err
}
if maybeControllerManagerPod != nil &&
maybeControllerManagerPod.UID == controllerManagerPod.UID {
return maybeAgentPod, nil
}
}
return nil, nil
}
func findControllerManagerPod(
agentPod *corev1.Pod,
informer corev1informers.PodInformer,
) (*corev1.Pod, error) {
controller := metav1.GetControllerOf(agentPod)
if controller == nil {
klog.InfoS("found orphan agent pod", "pod", klog.KObj(agentPod))
return nil, nil
}
maybeControllerManagerPod, err := informer.
Lister().
Pods(ControllerManagerNamespace).
Get(controller.Name)
notFound := k8serrors.IsNotFound(err)
if err != nil && !notFound {
return nil, fmt.Errorf("cannot get controller pod: %w", err)
} else if notFound ||
maybeControllerManagerPod == nil ||
maybeControllerManagerPod.UID != controller.UID {
return nil, nil
}
return maybeControllerManagerPod, nil
}
func boolPtr(b bool) *bool { return &b }
func hash(controllerManagerPod *corev1.Pod) string {
// FNV should be faster than SHA, and we don't care about hash-reversibility here, and Kubernetes
// uses FNV for their pod templates, so should be good enough for us?
h := fnv.New32a()
_, _ = h.Write([]byte(controllerManagerPod.UID)) // Never returns an error, per godoc.
return hex.EncodeToString(h.Sum([]byte{}))
}

View File

@ -0,0 +1,138 @@
// Copyright 2020 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package kubecertagent
import (
"testing"
"github.com/sclevine/spec"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"go.pinniped.dev/internal/controllerlib"
"go.pinniped.dev/internal/testutil"
)
func runFilterTest(
t *testing.T,
name string,
newFunc func(
agentPodTemplate *corev1.Pod,
podsInformer corev1informers.PodInformer,
observableWithInformerOption *testutil.ObservableWithInformerOption,
),
) {
spec.Run(t, name, func(t *testing.T, when spec.G, it spec.S) {
var r *require.Assertions
var subject controllerlib.Filter
whateverPod := &corev1.Pod{}
it.Before(func() {
r = require.New(t)
agentPodTemplate := &corev1.Pod{}
agentPodTemplate.Labels = map[string]string{
"some-label-key": "some-label-value",
"some-other-label-key": "some-other-label-value",
}
podsInformer := kubeinformers.NewSharedInformerFactory(nil, 0).Core().V1().Pods()
observableWithInformerOption := testutil.NewObservableWithInformerOption()
newFunc(agentPodTemplate, podsInformer, observableWithInformerOption)
subject = observableWithInformerOption.GetFilterForInformer(podsInformer)
})
when("a pod with the proper controller manager labels and phase is added/updated/deleted", func() {
it("returns true", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"component": "kube-controller-manager",
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
}
r.True(subject.Add(pod))
r.True(subject.Update(whateverPod, pod))
r.True(subject.Update(pod, whateverPod))
r.True(subject.Delete(pod))
})
})
when("a pod without the proper controller manager label is added/updated/deleted", func() {
it("returns false", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
}
r.False(subject.Add(pod))
r.False(subject.Update(whateverPod, pod))
r.False(subject.Update(pod, whateverPod))
r.False(subject.Delete(pod))
})
})
when("a pod without the proper controller manager phase is added/updated/deleted", func() {
it("returns false", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"component": "kube-controller-manager",
},
},
}
r.False(subject.Add(pod))
r.False(subject.Update(whateverPod, pod))
r.False(subject.Update(pod, whateverPod))
r.False(subject.Delete(pod))
})
})
when("a pod with all the agent labels is added/updated/deleted", func() {
it("returns true", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"some-label-key": "some-label-value",
"some-other-label-key": "some-other-label-value",
},
},
}
r.True(subject.Add(pod))
r.True(subject.Update(whateverPod, pod))
r.True(subject.Update(pod, whateverPod))
r.True(subject.Delete(pod))
})
})
when("a pod missing one of the agent labels is added/updated/deleted", func() {
it("returns false", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"some-other-label-key": "some-other-label-value",
},
},
}
r.False(subject.Add(pod))
r.False(subject.Update(whateverPod, pod))
r.False(subject.Update(pod, whateverPod))
r.False(subject.Delete(pod))
})
})
})
}

View File

@ -1,6 +1,8 @@
// Copyright 2020 the Pinniped contributors. All Rights Reserved. // Copyright 2020 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
// Package controllermanager provides an entrypoint into running all of the controllers that run as
// a part of Pinniped.
package controllermanager package controllermanager
import ( import (
@ -8,6 +10,7 @@ import (
"fmt" "fmt"
"time" "time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
k8sinformers "k8s.io/client-go/informers" k8sinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
@ -23,6 +26,7 @@ import (
"go.pinniped.dev/internal/controller/identityprovider/webhookcachecleaner" "go.pinniped.dev/internal/controller/identityprovider/webhookcachecleaner"
"go.pinniped.dev/internal/controller/identityprovider/webhookcachefiller" "go.pinniped.dev/internal/controller/identityprovider/webhookcachefiller"
"go.pinniped.dev/internal/controller/issuerconfig" "go.pinniped.dev/internal/controller/issuerconfig"
"go.pinniped.dev/internal/controller/kubecertagent"
"go.pinniped.dev/internal/controllerlib" "go.pinniped.dev/internal/controllerlib"
"go.pinniped.dev/internal/provider" "go.pinniped.dev/internal/provider"
"go.pinniped.dev/pkg/config/api" "go.pinniped.dev/pkg/config/api"
@ -33,16 +37,49 @@ const (
defaultResyncInterval = 3 * time.Minute defaultResyncInterval = 3 * time.Minute
) )
// Config holds all the input parameters to the set of controllers run as a part of Pinniped.
//
// It is used to inject parameters into PrepareControllers.
type Config struct {
// ServerInstallationNamespace provides the namespace in which Pinniped is deployed.
ServerInstallationNamespace string
// NamesConfig comes from the Pinniped config API (see api.Config). It specifies how Kubernetes
// objects should be named.
NamesConfig *api.NamesConfigSpec
// DiscoveryURLOverride allows a caller to inject a hardcoded discovery URL into Pinniped
// discovery document.
DiscoveryURLOverride *string
// DynamicCertProvider provides a setter and a getter to the Pinniped API's serving cert.
DynamicCertProvider provider.DynamicTLSServingCertProvider
// ServingCertDuration is the validity period, in seconds, of the API serving certificate.
ServingCertDuration time.Duration
// ServingCertRenewBefore is the period of time, in seconds, that pinniped will wait before
// rotating the serving certificate. This period of time starts upon issuance of the serving
// certificate.
ServingCertRenewBefore time.Duration
// IDPCache is a cache of authenticators shared amongst various IDP-related controllers.
IDPCache *idpcache.Cache
// KubeCertAgentTemplate is the template from which the kube-cert-agent controllers will create a
// kube-cert-agent pod. See kubecertagent.Info for more details.
KubeCertAgentTemplate *corev1.Pod
// KubeCertAgentCertPathAnnotation is the name of the annotation key that will be used when
// setting the best-guess path to the kube API's certificate. See kubecertagent.Info for more
// details.
KubeCertAgentCertPathAnnotation string
// KubeCertAgentKeyPathAnnotation is the name of the annotation key that will be used when setting
// the best-guess path to the kube API's key. See kubecertagent.Info for more details.
KubeCertAgentKeyPathAnnotation string
}
// Prepare the controllers and their informers and return a function that will start them when called. // Prepare the controllers and their informers and return a function that will start them when called.
func PrepareControllers( //nolint:funlen // Eh, fair, it is a really long function...but it is wiring the world...so...
serverInstallationNamespace string, func PrepareControllers(c *Config) (func(ctx context.Context), error) {
namesConfig api.NamesConfigSpec,
discoveryURLOverride *string,
dynamicCertProvider provider.DynamicTLSServingCertProvider,
servingCertDuration time.Duration,
servingCertRenewBefore time.Duration,
idpCache *idpcache.Cache,
) (func(ctx context.Context), error) {
// Create k8s clients. // Create k8s clients.
k8sClient, aggregatorClient, pinnipedClient, err := createClients() k8sClient, aggregatorClient, pinnipedClient, err := createClients()
if err != nil { if err != nil {
@ -50,96 +87,124 @@ func PrepareControllers(
} }
// Create informers. Don't forget to make sure they get started in the function returned below. // Create informers. Don't forget to make sure they get started in the function returned below.
kubePublicNamespaceK8sInformers, installationNamespaceK8sInformers, installationNamespacePinnipedInformers := informers := createInformers(c.ServerInstallationNamespace, k8sClient, pinnipedClient)
createInformers(serverInstallationNamespace, k8sClient, pinnipedClient)
// Create controller manager. // Create controller manager.
controllerManager := controllerlib. controllerManager := controllerlib.
NewManager(). NewManager().
WithController( WithController(
issuerconfig.NewPublisherController(serverInstallationNamespace, issuerconfig.NewPublisherController(
namesConfig.CredentialIssuerConfig, c.ServerInstallationNamespace,
discoveryURLOverride, c.NamesConfig.CredentialIssuerConfig,
c.DiscoveryURLOverride,
pinnipedClient, pinnipedClient,
kubePublicNamespaceK8sInformers.Core().V1().ConfigMaps(), informers.kubePublicNamespaceK8s.Core().V1().ConfigMaps(),
installationNamespacePinnipedInformers.Config().V1alpha1().CredentialIssuerConfigs(), informers.installationNamespacePinniped.Config().V1alpha1().CredentialIssuerConfigs(),
controllerlib.WithInformer, controllerlib.WithInformer,
), ),
singletonWorker, singletonWorker,
). ).
WithController( WithController(
apicerts.NewCertsManagerController( apicerts.NewCertsManagerController(
serverInstallationNamespace, c.ServerInstallationNamespace,
namesConfig.ServingCertificateSecret, c.NamesConfig.ServingCertificateSecret,
k8sClient, k8sClient,
installationNamespaceK8sInformers.Core().V1().Secrets(), informers.installationNamespaceK8s.Core().V1().Secrets(),
controllerlib.WithInformer, controllerlib.WithInformer,
controllerlib.WithInitialEvent, controllerlib.WithInitialEvent,
servingCertDuration, c.ServingCertDuration,
"Pinniped CA", "Pinniped CA",
namesConfig.APIService, c.NamesConfig.APIService,
), ),
singletonWorker, singletonWorker,
). ).
WithController( WithController(
apicerts.NewAPIServiceUpdaterController( apicerts.NewAPIServiceUpdaterController(
serverInstallationNamespace, c.ServerInstallationNamespace,
namesConfig.ServingCertificateSecret, c.NamesConfig.ServingCertificateSecret,
loginv1alpha1.SchemeGroupVersion.Version+"."+loginv1alpha1.GroupName, loginv1alpha1.SchemeGroupVersion.Version+"."+loginv1alpha1.GroupName,
aggregatorClient, aggregatorClient,
installationNamespaceK8sInformers.Core().V1().Secrets(), informers.installationNamespaceK8s.Core().V1().Secrets(),
controllerlib.WithInformer, controllerlib.WithInformer,
), ),
singletonWorker, singletonWorker,
). ).
WithController( WithController(
apicerts.NewCertsObserverController( apicerts.NewCertsObserverController(
serverInstallationNamespace, c.ServerInstallationNamespace,
namesConfig.ServingCertificateSecret, c.NamesConfig.ServingCertificateSecret,
dynamicCertProvider, c.DynamicCertProvider,
installationNamespaceK8sInformers.Core().V1().Secrets(), informers.installationNamespaceK8s.Core().V1().Secrets(),
controllerlib.WithInformer, controllerlib.WithInformer,
), ),
singletonWorker, singletonWorker,
). ).
WithController( WithController(
apicerts.NewCertsExpirerController( apicerts.NewCertsExpirerController(
serverInstallationNamespace, c.ServerInstallationNamespace,
namesConfig.ServingCertificateSecret, c.NamesConfig.ServingCertificateSecret,
k8sClient, k8sClient,
installationNamespaceK8sInformers.Core().V1().Secrets(), informers.installationNamespaceK8s.Core().V1().Secrets(),
controllerlib.WithInformer, controllerlib.WithInformer,
servingCertRenewBefore, c.ServingCertRenewBefore,
), ),
singletonWorker, singletonWorker,
). ).
WithController( WithController(
webhookcachefiller.New( webhookcachefiller.New(
idpCache, c.IDPCache,
installationNamespacePinnipedInformers.IDP().V1alpha1().WebhookIdentityProviders(), informers.installationNamespacePinniped.IDP().V1alpha1().WebhookIdentityProviders(),
klogr.New(), klogr.New(),
), ),
singletonWorker, singletonWorker,
). ).
WithController( WithController(
webhookcachecleaner.New( webhookcachecleaner.New(
idpCache, c.IDPCache,
installationNamespacePinnipedInformers.IDP().V1alpha1().WebhookIdentityProviders(), informers.installationNamespacePinniped.IDP().V1alpha1().WebhookIdentityProviders(),
klogr.New(), klogr.New(),
), ),
singletonWorker, singletonWorker,
).
WithController(
kubecertagent.NewCreaterController(
&kubecertagent.Info{
Template: c.KubeCertAgentTemplate,
},
k8sClient,
informers.kubeSystemNamespaceK8s.Core().V1().Pods(),
controllerlib.WithInformer,
),
singletonWorker,
).
WithController(
kubecertagent.NewDeleterController(
&kubecertagent.Info{
Template: c.KubeCertAgentTemplate,
},
k8sClient,
informers.kubeSystemNamespaceK8s.Core().V1().Pods(),
controllerlib.WithInformer,
),
singletonWorker,
).
WithController(
kubecertagent.NewAnnotaterController(
&kubecertagent.Info{
Template: c.KubeCertAgentTemplate,
CertPathAnnotation: c.KubeCertAgentCertPathAnnotation,
KeyPathAnnotation: c.KubeCertAgentKeyPathAnnotation,
},
k8sClient,
informers.kubeSystemNamespaceK8s.Core().V1().Pods(),
controllerlib.WithInformer,
),
singletonWorker,
) )
// Return a function which starts the informers and controllers. // Return a function which starts the informers and controllers.
return func(ctx context.Context) { return func(ctx context.Context) {
kubePublicNamespaceK8sInformers.Start(ctx.Done()) informers.startAndWaitForSync(ctx)
installationNamespaceK8sInformers.Start(ctx.Done())
installationNamespacePinnipedInformers.Start(ctx.Done())
kubePublicNamespaceK8sInformers.WaitForCacheSync(ctx.Done())
installationNamespaceK8sInformers.WaitForCacheSync(ctx.Done())
installationNamespacePinnipedInformers.WaitForCacheSync(ctx.Done())
go controllerManager.Start(ctx) go controllerManager.Start(ctx)
}, nil }, nil
} }
@ -184,32 +249,53 @@ func createClients() (
return return
} }
type informers struct {
kubePublicNamespaceK8s k8sinformers.SharedInformerFactory
kubeSystemNamespaceK8s k8sinformers.SharedInformerFactory
installationNamespaceK8s k8sinformers.SharedInformerFactory
installationNamespacePinniped pinnipedinformers.SharedInformerFactory
}
// Create the informers that will be used by the controllers. // Create the informers that will be used by the controllers.
func createInformers( func createInformers(
serverInstallationNamespace string, serverInstallationNamespace string,
k8sClient *kubernetes.Clientset, k8sClient *kubernetes.Clientset,
pinnipedClient *pinnipedclientset.Clientset, pinnipedClient *pinnipedclientset.Clientset,
) ( ) *informers {
kubePublicNamespaceK8sInformers k8sinformers.SharedInformerFactory, return &informers{
installationNamespaceK8sInformers k8sinformers.SharedInformerFactory, kubePublicNamespaceK8s: k8sinformers.NewSharedInformerFactoryWithOptions(
installationNamespacePinnipedInformers pinnipedinformers.SharedInformerFactory,
) {
kubePublicNamespaceK8sInformers = k8sinformers.NewSharedInformerFactoryWithOptions(
k8sClient, k8sClient,
defaultResyncInterval, defaultResyncInterval,
k8sinformers.WithNamespace(issuerconfig.ClusterInfoNamespace), k8sinformers.WithNamespace(issuerconfig.ClusterInfoNamespace),
) ),
installationNamespaceK8sInformers = k8sinformers.NewSharedInformerFactoryWithOptions( kubeSystemNamespaceK8s: k8sinformers.NewSharedInformerFactoryWithOptions(
k8sClient,
defaultResyncInterval,
k8sinformers.WithNamespace(kubecertagent.ControllerManagerNamespace),
),
installationNamespaceK8s: k8sinformers.NewSharedInformerFactoryWithOptions(
k8sClient, k8sClient,
defaultResyncInterval, defaultResyncInterval,
k8sinformers.WithNamespace(serverInstallationNamespace), k8sinformers.WithNamespace(serverInstallationNamespace),
) ),
installationNamespacePinnipedInformers = pinnipedinformers.NewSharedInformerFactoryWithOptions( installationNamespacePinniped: pinnipedinformers.NewSharedInformerFactoryWithOptions(
pinnipedClient, pinnipedClient,
defaultResyncInterval, defaultResyncInterval,
pinnipedinformers.WithNamespace(serverInstallationNamespace), pinnipedinformers.WithNamespace(serverInstallationNamespace),
) ),
return }
}
func (i *informers) startAndWaitForSync(ctx context.Context) {
i.kubePublicNamespaceK8s.Start(ctx.Done())
i.kubeSystemNamespaceK8s.Start(ctx.Done())
i.installationNamespaceK8s.Start(ctx.Done())
i.installationNamespacePinniped.Start(ctx.Done())
i.kubePublicNamespaceK8s.WaitForCacheSync(ctx.Done())
i.kubeSystemNamespaceK8s.WaitForCacheSync(ctx.Done())
i.installationNamespaceK8s.WaitForCacheSync(ctx.Done())
i.installationNamespacePinniped.WaitForCacheSync(ctx.Done())
} }
// Returns a copy of the input config with the ContentConfig set to use protobuf. // Returns a copy of the input config with the ContentConfig set to use protobuf.

View File

@ -11,6 +11,8 @@ import (
"time" "time"
"github.com/spf13/cobra" "github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/authenticator"
genericapiserver "k8s.io/apiserver/pkg/server" genericapiserver "k8s.io/apiserver/pkg/server"
@ -32,6 +34,22 @@ import (
"go.pinniped.dev/internal/provider" "go.pinniped.dev/internal/provider"
"go.pinniped.dev/internal/registry/credentialrequest" "go.pinniped.dev/internal/registry/credentialrequest"
"go.pinniped.dev/pkg/config" "go.pinniped.dev/pkg/config"
configapi "go.pinniped.dev/pkg/config/api"
)
// These constants are various label/annotation keys used in Pinniped. They are namespaced by
// a "pinniped.dev" child domain so they don't collide with other keys.
const (
// kubeCertAgentLabelKey is used to identify which pods are created by the kube-cert-agent
// controllers.
kubeCertAgentLabelKey = "kube-cert-agent.pinniped.dev"
// kubeCertAgentCertPathAnnotationKey is the annotation that the kube-cert-agent pod will use
// to communicate the in-pod path to the kube API's certificate.
kubeCertAgentCertPathAnnotationKey = "kube-cert-agent.pinniped.dev/cert-path"
// kubeCertAgentKeyPathAnnotationKey is the annotation that the kube-cert-agent pod will use
// to communicate the in-pod path to the kube API's key.
kubeCertAgentKeyPathAnnotationKey = "kube-cert-agent.pinniped.dev/key-path"
) )
// App is an object that represents the pinniped-server application. // App is an object that represents the pinniped-server application.
@ -113,7 +131,15 @@ func (a *App) runServer(ctx context.Context) error {
serverInstallationNamespace := podInfo.Namespace serverInstallationNamespace := podInfo.Namespace
// Load the Kubernetes cluster signing CA. // Load the Kubernetes cluster signing CA.
k8sClusterCA, shutdownCA, err := getClusterCASigner(ctx, serverInstallationNamespace, cfg.NamesConfig.CredentialIssuerConfig) kubeCertAgentTemplate, kubeCertAgentLabelSelector := createKubeCertAgentTemplate(
&cfg.KubeCertAgentConfig,
)
k8sClusterCA, shutdownCA, err := getClusterCASigner(
ctx,
serverInstallationNamespace,
cfg.NamesConfig.CredentialIssuerConfig,
kubeCertAgentLabelSelector,
)
if err != nil { if err != nil {
return err return err
} }
@ -132,13 +158,18 @@ func (a *App) runServer(ctx context.Context) error {
// Prepare to start the controllers, but defer actually starting them until the // Prepare to start the controllers, but defer actually starting them until the
// post start hook of the aggregated API server. // post start hook of the aggregated API server.
startControllersFunc, err := controllermanager.PrepareControllers( startControllersFunc, err := controllermanager.PrepareControllers(
serverInstallationNamespace, &controllermanager.Config{
cfg.NamesConfig, ServerInstallationNamespace: serverInstallationNamespace,
cfg.DiscoveryInfo.URL, NamesConfig: &cfg.NamesConfig,
dynamicCertProvider, DiscoveryURLOverride: cfg.DiscoveryInfo.URL,
time.Duration(*cfg.APIConfig.ServingCertificateConfig.DurationSeconds)*time.Second, DynamicCertProvider: dynamicCertProvider,
time.Duration(*cfg.APIConfig.ServingCertificateConfig.RenewBeforeSeconds)*time.Second, ServingCertDuration: time.Duration(*cfg.APIConfig.ServingCertificateConfig.DurationSeconds) * time.Second,
idpCache, ServingCertRenewBefore: time.Duration(*cfg.APIConfig.ServingCertificateConfig.RenewBeforeSeconds) * time.Second,
IDPCache: idpCache,
KubeCertAgentTemplate: kubeCertAgentTemplate,
KubeCertAgentCertPathAnnotation: kubeCertAgentCertPathAnnotationKey,
KubeCertAgentKeyPathAnnotation: kubeCertAgentKeyPathAnnotationKey,
},
) )
if err != nil { if err != nil {
return fmt.Errorf("could not prepare controllers: %w", err) return fmt.Errorf("could not prepare controllers: %w", err)
@ -168,6 +199,7 @@ func (a *App) runServer(ctx context.Context) error {
func getClusterCASigner( func getClusterCASigner(
ctx context.Context, serverInstallationNamespace string, ctx context.Context, serverInstallationNamespace string,
credentialIssuerConfigResourceName string, credentialIssuerConfigResourceName string,
kubeCertAgentLabelSelector string,
) (credentialrequest.CertIssuer, kubecertauthority.ShutdownFunc, error) { ) (credentialrequest.CertIssuer, kubecertauthority.ShutdownFunc, error) {
// Load the Kubernetes client configuration. // Load the Kubernetes client configuration.
kubeConfig, err := restclient.InClusterConfig() kubeConfig, err := restclient.InClusterConfig()
@ -191,7 +223,14 @@ func getClusterCASigner(
ticker := time.NewTicker(5 * time.Minute) ticker := time.NewTicker(5 * time.Minute)
// Make a CA which uses the Kubernetes cluster API server's signing certs. // Make a CA which uses the Kubernetes cluster API server's signing certs.
kubeCertAgentInfo := kubecertauthority.AgentInfo{
Namespace: "kube-system",
LabelSelector: kubeCertAgentLabelSelector,
CertPathAnnotation: kubeCertAgentCertPathAnnotationKey,
KeyPathAnnotation: kubeCertAgentKeyPathAnnotationKey,
}
k8sClusterCA, shutdownCA := kubecertauthority.New( k8sClusterCA, shutdownCA := kubecertauthority.New(
&kubeCertAgentInfo,
kubeClient, kubeClient,
kubecertauthority.NewPodCommandExecutor(kubeConfig, kubeClient), kubecertauthority.NewPodCommandExecutor(kubeConfig, kubeClient),
ticker.C, ticker.C,
@ -282,3 +321,37 @@ func getAggregatedAPIServerConfig(
} }
return apiServerConfig, nil return apiServerConfig, nil
} }
func createKubeCertAgentTemplate(cfg *configapi.KubeCertAgentSpec) (*corev1.Pod, string) {
terminateImmediately := int64(0)
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: *cfg.NamePrefix,
Labels: map[string]string{
kubeCertAgentLabelKey: "",
},
},
Spec: corev1.PodSpec{
TerminationGracePeriodSeconds: &terminateImmediately,
Containers: []corev1.Container{
{
Name: "sleeper",
Image: *cfg.Image,
Command: []string{"/bin/sleep", "infinity"},
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceMemory: resource.MustParse("16Mi"),
corev1.ResourceCPU: resource.MustParse("10m"),
},
Requests: corev1.ResourceList{
corev1.ResourceMemory: resource.MustParse("16Mi"),
corev1.ResourceCPU: resource.MustParse("10m"),
},
},
},
},
},
}
labelSelector := kubeCertAgentLabelKey + "="
return pod, labelSelector
}

View File

@ -8,6 +8,7 @@ type Config struct {
DiscoveryInfo DiscoveryInfoSpec `json:"discovery"` DiscoveryInfo DiscoveryInfoSpec `json:"discovery"`
APIConfig APIConfigSpec `json:"api"` APIConfig APIConfigSpec `json:"api"`
NamesConfig NamesConfigSpec `json:"names"` NamesConfig NamesConfigSpec `json:"names"`
KubeCertAgentConfig KubeCertAgentSpec `json:"kubeCertAgent"`
} }
// DiscoveryInfoSpec contains configuration knobs specific to // DiscoveryInfoSpec contains configuration knobs specific to
@ -49,3 +50,15 @@ type ServingCertificateConfigSpec struct {
// seconds (about 9 months). // seconds (about 9 months).
RenewBeforeSeconds *int64 `json:"renewBeforeSeconds,omitempty"` RenewBeforeSeconds *int64 `json:"renewBeforeSeconds,omitempty"`
} }
type KubeCertAgentSpec struct {
// NamePrefix is the prefix of the name of the kube-cert-agent pods. For example, if this field is
// set to "some-prefix-", then the name of the pods will look like "some-prefix-blah". The default
// for this value is "pinniped-kube-cert-agent-".
NamePrefix *string `json:"namePrefix,omitempty"`
// Image is the container image that will be used by the kube-cert-agent pod. The container image
// should contain at least 2 binaries: /bin/sleep and cat (somewhere on the $PATH). The default
// for this value is "debian:latest".
Image *string `json:"image"`
}

View File

@ -40,6 +40,7 @@ func FromPath(path string) (*api.Config, error) {
} }
maybeSetAPIDefaults(&config.APIConfig) maybeSetAPIDefaults(&config.APIConfig)
maybeSetKubeCertAgentDefaults(&config.KubeCertAgentConfig)
if err := validateAPI(&config.APIConfig); err != nil { if err := validateAPI(&config.APIConfig); err != nil {
return nil, fmt.Errorf("validate api: %w", err) return nil, fmt.Errorf("validate api: %w", err)
@ -62,6 +63,16 @@ func maybeSetAPIDefaults(apiConfig *api.APIConfigSpec) {
} }
} }
func maybeSetKubeCertAgentDefaults(cfg *api.KubeCertAgentSpec) {
if cfg.NamePrefix == nil {
cfg.NamePrefix = stringPtr("pinniped-kube-cert-agent-")
}
if cfg.Image == nil {
cfg.Image = stringPtr("debian:latest")
}
}
func validateNames(names *api.NamesConfigSpec) error { func validateNames(names *api.NamesConfigSpec) error {
missingNames := []string{} missingNames := []string{}
if names == nil { if names == nil {
@ -98,3 +109,7 @@ func validateAPI(apiConfig *api.APIConfigSpec) error {
func int64Ptr(i int64) *int64 { func int64Ptr(i int64) *int64 {
return &i return &i
} }
func stringPtr(s string) *string {
return &s
}

View File

@ -35,6 +35,10 @@ func TestFromPath(t *testing.T) {
servingCertificateSecret: pinniped-api-tls-serving-certificate servingCertificateSecret: pinniped-api-tls-serving-certificate
credentialIssuerConfig: pinniped-config credentialIssuerConfig: pinniped-config
apiService: pinniped-api apiService: pinniped-api
kubeCertAgentPrefix: kube-cert-agent-prefix
KubeCertAgent:
namePrefix: kube-cert-agent-name-prefix-
image: kube-cert-agent-image
`), `),
wantConfig: &api.Config{ wantConfig: &api.Config{
DiscoveryInfo: api.DiscoveryInfoSpec{ DiscoveryInfo: api.DiscoveryInfoSpec{
@ -51,6 +55,10 @@ func TestFromPath(t *testing.T) {
CredentialIssuerConfig: "pinniped-config", CredentialIssuerConfig: "pinniped-config",
APIService: "pinniped-api", APIService: "pinniped-api",
}, },
KubeCertAgentConfig: api.KubeCertAgentSpec{
NamePrefix: stringPtr("kube-cert-agent-name-prefix-"),
Image: stringPtr("kube-cert-agent-image"),
},
}, },
}, },
{ {
@ -77,6 +85,10 @@ func TestFromPath(t *testing.T) {
CredentialIssuerConfig: "pinniped-config", CredentialIssuerConfig: "pinniped-config",
APIService: "pinniped-api", APIService: "pinniped-api",
}, },
KubeCertAgentConfig: api.KubeCertAgentSpec{
NamePrefix: stringPtr("pinniped-kube-cert-agent-"),
Image: stringPtr("debian:latest"),
},
}, },
}, },
{ {
@ -187,7 +199,3 @@ func TestFromPath(t *testing.T) {
}) })
} }
} }
func stringPtr(s string) *string {
return &s
}

View File

@ -0,0 +1,113 @@
// Copyright 2020 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package integration
import (
"context"
"fmt"
"sort"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/diff"
"go.pinniped.dev/test/library"
)
const (
kubeCertAgentNamespace = "kube-system"
kubeCertAgentLabelSelector = "kube-cert-agent.pinniped.dev="
)
func TestKubeCertAgent(t *testing.T) {
library.SkipUnlessIntegration(t)
library.SkipUnlessClusterHasCapability(t, library.ClusterSigningKeyIsAvailable)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
kubeClient := library.NewClientset(t)
// Get the current number of kube-cert-agent pods.
//
// We can pretty safely assert there should be more than 1, since there should be a
// kube-cert-agent pod per kube-controller-manager pod, and there should probably be at least
// 1 kube-controller-manager for this to be a working kube API.
originalAgentPods, err := kubeClient.CoreV1().Pods(kubeCertAgentNamespace).List(ctx, metav1.ListOptions{
LabelSelector: kubeCertAgentLabelSelector,
})
require.NoError(t, err)
require.NotEmpty(t, originalAgentPods.Items)
sortPods(originalAgentPods)
agentPodsReconciled := func() bool {
var currentAgentPods *corev1.PodList
currentAgentPods, err = kubeClient.CoreV1().Pods(kubeCertAgentNamespace).List(ctx, metav1.ListOptions{
LabelSelector: kubeCertAgentLabelSelector,
})
if err != nil {
return false
}
sortPods(currentAgentPods)
for i := range originalAgentPods.Items {
if !equality.Semantic.DeepEqual(
originalAgentPods.Items[i].Spec,
currentAgentPods.Items[i].Spec,
) {
err = fmt.Errorf(
"original agent pod != current agent pod: %s",
diff.ObjectDiff(originalAgentPods.Items[i].Spec, currentAgentPods.Items[i].Spec),
)
return false
}
}
return true
}
t.Run("reconcile on update", func(t *testing.T) {
// Update the image of the first pod. The controller should see it, and flip it back.
//
// Note that we update the toleration field here because it is the only field, currently, that
// 1) we are allowed to update on a running pod AND 2) the kube-cert-agent controllers care
// about.
updatedAgentPod := originalAgentPods.Items[0].DeepCopy()
updatedAgentPod.Spec.Tolerations = append(
updatedAgentPod.Spec.Tolerations,
corev1.Toleration{Key: "fake-toleration"},
)
_, err = kubeClient.CoreV1().Pods(kubeCertAgentNamespace).Update(ctx, updatedAgentPod, metav1.UpdateOptions{})
require.NoError(t, err)
// Make sure the original pods come back.
assert.Eventually(t, agentPodsReconciled, 10*time.Second, 250*time.Millisecond)
require.NoError(t, err)
})
t.Run("reconcile on delete", func(t *testing.T) {
// Delete the first pod. The controller should see it, and flip it back.
err = kubeClient.
CoreV1().
Pods(kubeCertAgentNamespace).
Delete(ctx, originalAgentPods.Items[0].Name, metav1.DeleteOptions{})
require.NoError(t, err)
// Make sure the original pods come back.
assert.Eventually(t, agentPodsReconciled, 10*time.Second, 250*time.Millisecond)
require.NoError(t, err)
})
}
func sortPods(pods *corev1.PodList) {
sort.Slice(pods.Items, func(i, j int) bool {
return pods.Items[i].Name < pods.Items[j].Name
})
}