From c356710f1fae6c708995d72f91e70290443f3425 Mon Sep 17 00:00:00 2001 From: Monis Khan Date: Wed, 18 Aug 2021 00:14:38 -0400 Subject: [PATCH] Add leader election middleware Signed-off-by: Monis Khan --- deploy/concierge/rbac.yaml | 3 + deploy/supervisor/rbac.yaml | 3 + go.mod | 1 + .../concierge/impersonator/impersonator.go | 2 +- .../controller/apicerts/certs_expirer_test.go | 7 +- .../impersonator_config_test.go | 10 +- .../kubecertagent/legacypodcleaner.go | 19 +- .../kubecertagent/legacypodcleaner_test.go | 60 +++- .../supervisorstorage/garbage_collector.go | 7 +- .../garbage_collector_test.go | 26 +- .../controllermanager/prepare_controllers.go | 9 +- internal/kubeclient/roundtrip.go | 4 +- internal/leaderelection/leaderelection.go | 150 ++++++++++ internal/supervisor/server/server.go | 20 +- internal/testutil/delete.go | 24 ++ .../concierge_impersonation_proxy_test.go | 35 +-- test/integration/leaderelection_test.go | 278 ++++++++++++++++++ test/testlib/client.go | 40 ++- 18 files changed, 627 insertions(+), 71 deletions(-) create mode 100644 internal/leaderelection/leaderelection.go create mode 100644 test/integration/leaderelection_test.go diff --git a/deploy/concierge/rbac.yaml b/deploy/concierge/rbac.yaml index 82b1c3ed..f6b14dda 100644 --- a/deploy/concierge/rbac.yaml +++ b/deploy/concierge/rbac.yaml @@ -153,6 +153,9 @@ rules: - apiGroups: [ "" ] resources: [ configmaps ] verbs: [ list, get, watch ] + - apiGroups: [ coordination.k8s.io ] + resources: [ leases ] + verbs: [ create, get, update ] --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/deploy/supervisor/rbac.yaml b/deploy/supervisor/rbac.yaml index 60447f7c..14a8499a 100644 --- a/deploy/supervisor/rbac.yaml +++ b/deploy/supervisor/rbac.yaml @@ -48,6 +48,9 @@ rules: - apiGroups: [apps] resources: [replicasets,deployments] verbs: [get] + - apiGroups: [ coordination.k8s.io ] + resources: [ leases ] + verbs: [ create, get, update ] --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/go.mod b/go.mod index 1c248a77..7f0baec9 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.0 github.com/tdewolff/minify/v2 v2.9.21 + go.uber.org/atomic v1.7.0 golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a golang.org/x/net v0.0.0-20210520170846-37e1c6afe023 golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 diff --git a/internal/concierge/impersonator/impersonator.go b/internal/concierge/impersonator/impersonator.go index 067fcfac..c482511e 100644 --- a/internal/concierge/impersonator/impersonator.go +++ b/internal/concierge/impersonator/impersonator.go @@ -102,7 +102,7 @@ func newInternal( //nolint:funlen // yeah, it's kind of long. // Wire up the impersonation proxy signer CA as another valid authenticator for client cert auth, // along with the Kube API server's CA. - // Note: any changes to the the Authentication stack need to be kept in sync with any assumptions made + // Note: any changes to the Authentication stack need to be kept in sync with any assumptions made // by getTransportForUser, especially if we ever update the TCR API to start returning bearer tokens. kubeClientUnsafeForProxying, err := kubeclient.New(clientOpts...) if err != nil { diff --git a/internal/controller/apicerts/certs_expirer_test.go b/internal/controller/apicerts/certs_expirer_test.go index 01040423..4819e59d 100644 --- a/internal/controller/apicerts/certs_expirer_test.go +++ b/internal/controller/apicerts/certs_expirer_test.go @@ -297,12 +297,7 @@ func TestExpirerControllerSync(t *testing.T) { if test.wantDelete { require.Len(t, *opts, 1) - require.Equal(t, metav1.DeleteOptions{ - Preconditions: &metav1.Preconditions{ - UID: &testUID, - ResourceVersion: &testRV, - }, - }, (*opts)[0]) + require.Equal(t, testutil.NewPreconditions(testUID, testRV), (*opts)[0]) } else { require.Len(t, *opts, 0) } diff --git a/internal/controller/impersonatorconfig/impersonator_config_test.go b/internal/controller/impersonatorconfig/impersonator_config_test.go index af29711b..00aeadd1 100644 --- a/internal/controller/impersonatorconfig/impersonator_config_test.go +++ b/internal/controller/impersonatorconfig/impersonator_config_test.go @@ -29,14 +29,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/intstr" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" kubernetesfake "k8s.io/client-go/kubernetes/fake" coretesting "k8s.io/client-go/testing" - "k8s.io/utils/pointer" "go.pinniped.dev/generated/latest/apis/concierge/config/v1alpha1" pinnipedfake "go.pinniped.dev/generated/latest/client/concierge/clientset/versioned/fake" @@ -1032,13 +1030,7 @@ func TestImpersonatorConfigControllerSync(t *testing.T) { // validate that we set delete preconditions correctly r.NotEmpty(*deleteOptions) for _, opt := range *deleteOptions { - uid := types.UID("uid-1234") - r.Equal(metav1.DeleteOptions{ - Preconditions: &metav1.Preconditions{ - UID: &uid, - ResourceVersion: pointer.String("rv-5678"), - }, - }, opt) + r.Equal(testutil.NewPreconditions("uid-1234", "rv-5678"), opt) } } diff --git a/internal/controller/kubecertagent/legacypodcleaner.go b/internal/controller/kubecertagent/legacypodcleaner.go index 1a44477e..43e4a9f9 100644 --- a/internal/controller/kubecertagent/legacypodcleaner.go +++ b/internal/controller/kubecertagent/legacypodcleaner.go @@ -40,12 +40,29 @@ func NewLegacyPodCleanerController( controllerlib.Config{ Name: "legacy-pod-cleaner-controller", Syncer: controllerlib.SyncFunc(func(ctx controllerlib.Context) error { - if err := client.Kubernetes.CoreV1().Pods(ctx.Key.Namespace).Delete(ctx.Context, ctx.Key.Name, metav1.DeleteOptions{}); err != nil { + podClient := client.Kubernetes.CoreV1().Pods(ctx.Key.Namespace) + + // avoid blind writes to the API + agentPod, err := podClient.Get(ctx.Context, ctx.Key.Name, metav1.GetOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("could not get legacy agent pod: %w", err) + } + + if err := podClient.Delete(ctx.Context, ctx.Key.Name, metav1.DeleteOptions{ + Preconditions: &metav1.Preconditions{ + UID: &agentPod.UID, + ResourceVersion: &agentPod.ResourceVersion, + }, + }); err != nil { if k8serrors.IsNotFound(err) { return nil } return fmt.Errorf("could not delete legacy agent pod: %w", err) } + log.Info("deleted legacy kube-cert-agent pod", "pod", klog.KRef(ctx.Key.Namespace, ctx.Key.Name)) return nil }), diff --git a/internal/controller/kubecertagent/legacypodcleaner_test.go b/internal/controller/kubecertagent/legacypodcleaner_test.go index b2b1a5e7..7cf89b0b 100644 --- a/internal/controller/kubecertagent/legacypodcleaner_test.go +++ b/internal/controller/kubecertagent/legacypodcleaner_test.go @@ -20,6 +20,7 @@ import ( "go.pinniped.dev/internal/controllerlib" "go.pinniped.dev/internal/kubeclient" + "go.pinniped.dev/internal/testutil" "go.pinniped.dev/internal/testutil/testlogger" ) @@ -28,9 +29,11 @@ func TestLegacyPodCleanerController(t *testing.T) { legacyAgentPodWithoutExtraLabel := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "concierge", - Name: "pinniped-concierge-kube-cert-agent-without-extra-label", - Labels: map[string]string{"kube-cert-agent.pinniped.dev": "true"}, + Namespace: "concierge", + Name: "pinniped-concierge-kube-cert-agent-without-extra-label", + Labels: map[string]string{"kube-cert-agent.pinniped.dev": "true"}, + UID: "1", + ResourceVersion: "2", }, Spec: corev1.PodSpec{}, Status: corev1.PodStatus{Phase: corev1.PodRunning}, @@ -40,10 +43,14 @@ func TestLegacyPodCleanerController(t *testing.T) { legacyAgentPodWithExtraLabel.Name = "pinniped-concierge-kube-cert-agent-with-extra-label" legacyAgentPodWithExtraLabel.Labels["extralabel"] = "labelvalue" legacyAgentPodWithExtraLabel.Labels["anotherextralabel"] = "labelvalue" + legacyAgentPodWithExtraLabel.UID = "3" + legacyAgentPodWithExtraLabel.ResourceVersion = "4" nonLegacyAgentPod := legacyAgentPodWithExtraLabel.DeepCopy() nonLegacyAgentPod.Name = "pinniped-concierge-kube-cert-agent-not-legacy" nonLegacyAgentPod.Labels["kube-cert-agent.pinniped.dev"] = "v2" + nonLegacyAgentPod.UID = "5" + nonLegacyAgentPod.ResourceVersion = "6" tests := []struct { name string @@ -52,10 +59,12 @@ func TestLegacyPodCleanerController(t *testing.T) { wantDistinctErrors []string wantDistinctLogs []string wantActions []coretesting.Action + wantDeleteOptions []metav1.DeleteOptions }{ { - name: "no pods", - wantActions: []coretesting.Action{}, + name: "no pods", + wantActions: []coretesting.Action{}, + wantDeleteOptions: []metav1.DeleteOptions{}, }, { name: "mix of pods", @@ -69,8 +78,12 @@ func TestLegacyPodCleanerController(t *testing.T) { `legacy-pod-cleaner-controller "level"=0 "msg"="deleted legacy kube-cert-agent pod" "pod"={"name":"pinniped-concierge-kube-cert-agent-with-extra-label","namespace":"concierge"}`, }, wantActions: []coretesting.Action{ // the first delete triggers the informer again, but the second invocation triggers a Not Found + coretesting.NewGetAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name), coretesting.NewDeleteAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name), - coretesting.NewDeleteAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name), + coretesting.NewGetAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name), + }, + wantDeleteOptions: []metav1.DeleteOptions{ + testutil.NewPreconditions("3", "4"), }, }, { @@ -89,9 +102,15 @@ func TestLegacyPodCleanerController(t *testing.T) { "could not delete legacy agent pod: some delete error", }, wantActions: []coretesting.Action{ + coretesting.NewGetAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name), coretesting.NewDeleteAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name), + coretesting.NewGetAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name), coretesting.NewDeleteAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name), }, + wantDeleteOptions: []metav1.DeleteOptions{ + testutil.NewPreconditions("3", "4"), + testutil.NewPreconditions("3", "4"), + }, }, { name: "fail to delete because of not found error", @@ -107,8 +126,30 @@ func TestLegacyPodCleanerController(t *testing.T) { }, wantDistinctErrors: []string{""}, wantActions: []coretesting.Action{ + coretesting.NewGetAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name), coretesting.NewDeleteAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name), }, + wantDeleteOptions: []metav1.DeleteOptions{ + testutil.NewPreconditions("3", "4"), + }, + }, + { + name: "fail to delete because of not found error on get", + kubeObjects: []runtime.Object{ + legacyAgentPodWithoutExtraLabel, // should not be delete (missing extra label) + legacyAgentPodWithExtraLabel, // should be deleted + nonLegacyAgentPod, // should not be deleted (missing legacy agent label) + }, + addKubeReactions: func(clientset *kubefake.Clientset) { + clientset.PrependReactor("get", "*", func(action coretesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, k8serrors.NewNotFound(action.GetResource().GroupResource(), "") + }) + }, + wantDistinctErrors: []string{""}, + wantActions: []coretesting.Action{ + coretesting.NewGetAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name), + }, + wantDeleteOptions: []metav1.DeleteOptions{}, }, } for _, tt := range tests { @@ -120,6 +161,10 @@ func TestLegacyPodCleanerController(t *testing.T) { if tt.addKubeReactions != nil { tt.addKubeReactions(kubeClientset) } + + opts := &[]metav1.DeleteOptions{} + trackDeleteClient := testutil.NewDeleteOptionsRecorder(kubeClientset, opts) + kubeInformers := informers.NewSharedInformerFactory(kubeClientset, 0) log := testlogger.New(t) controller := NewLegacyPodCleanerController( @@ -127,7 +172,7 @@ func TestLegacyPodCleanerController(t *testing.T) { Namespace: "concierge", Labels: map[string]string{"extralabel": "labelvalue"}, }, - &kubeclient.Client{Kubernetes: kubeClientset}, + &kubeclient.Client{Kubernetes: trackDeleteClient}, kubeInformers.Core().V1().Pods(), log, controllerlib.WithMaxRetries(1), @@ -140,6 +185,7 @@ func TestLegacyPodCleanerController(t *testing.T) { assert.Equal(t, tt.wantDistinctErrors, deduplicate(errorMessages), "unexpected errors") assert.Equal(t, tt.wantDistinctLogs, deduplicate(log.Lines()), "unexpected logs") assert.Equal(t, tt.wantActions, kubeClientset.Actions()[2:], "unexpected actions") + assert.Equal(t, tt.wantDeleteOptions, *opts, "unexpected delete options") }) } } diff --git a/internal/controller/supervisorstorage/garbage_collector.go b/internal/controller/supervisorstorage/garbage_collector.go index a6d7ebce..beb5317f 100644 --- a/internal/controller/supervisorstorage/garbage_collector.go +++ b/internal/controller/supervisorstorage/garbage_collector.go @@ -102,7 +102,12 @@ func (c *garbageCollectorController) Sync(ctx controllerlib.Context) error { } if garbageCollectAfterTime.Before(frozenClock.Now()) { - err = c.kubeClient.CoreV1().Secrets(secret.Namespace).Delete(ctx.Context, secret.Name, metav1.DeleteOptions{}) + err = c.kubeClient.CoreV1().Secrets(secret.Namespace).Delete(ctx.Context, secret.Name, metav1.DeleteOptions{ + Preconditions: &metav1.Preconditions{ + UID: &secret.UID, + ResourceVersion: &secret.ResourceVersion, + }, + }) if err != nil { plog.WarningErr("failed to garbage collect resource", err, logKV(secret)) continue diff --git a/internal/controller/supervisorstorage/garbage_collector_test.go b/internal/controller/supervisorstorage/garbage_collector_test.go index ac64acf3..889de157 100644 --- a/internal/controller/supervisorstorage/garbage_collector_test.go +++ b/internal/controller/supervisorstorage/garbage_collector_test.go @@ -18,6 +18,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/clock" kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" kubernetesfake "k8s.io/client-go/kubernetes/fake" kubetesting "k8s.io/client-go/testing" @@ -116,6 +117,8 @@ func TestGarbageCollectorControllerSync(t *testing.T) { subject controllerlib.Controller kubeInformerClient *kubernetesfake.Clientset kubeClient *kubernetesfake.Clientset + deleteOptions *[]metav1.DeleteOptions + deleteOptionsRecorder kubernetes.Interface kubeInformers kubeinformers.SharedInformerFactory cancelContext context.Context cancelContextCancelFunc context.CancelFunc @@ -130,7 +133,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) { // Set this at the last second to allow for injection of server override. subject = GarbageCollectorController( fakeClock, - kubeClient, + deleteOptionsRecorder, kubeInformers.Core().V1().Secrets(), controllerlib.WithInformer, ) @@ -158,6 +161,8 @@ func TestGarbageCollectorControllerSync(t *testing.T) { kubeInformerClient = kubernetesfake.NewSimpleClientset() kubeClient = kubernetesfake.NewSimpleClientset() + deleteOptions = &[]metav1.DeleteOptions{} + deleteOptionsRecorder = testutil.NewDeleteOptionsRecorder(kubeClient, deleteOptions) kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0) frozenNow = time.Now().UTC() fakeClock = clock.NewFakeClock(frozenNow) @@ -193,8 +198,10 @@ func TestGarbageCollectorControllerSync(t *testing.T) { it.Before(func() { firstExpiredSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ - Name: "first expired secret", - Namespace: installedInNamespace, + Name: "first expired secret", + Namespace: installedInNamespace, + UID: "uid-123", + ResourceVersion: "rv-456", Annotations: map[string]string{ "storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-time.Second).Format(time.RFC3339), }, @@ -204,8 +211,10 @@ func TestGarbageCollectorControllerSync(t *testing.T) { r.NoError(kubeClient.Tracker().Add(firstExpiredSecret)) secondExpiredSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ - Name: "second expired secret", - Namespace: installedInNamespace, + Name: "second expired secret", + Namespace: installedInNamespace, + UID: "uid-789", + ResourceVersion: "rv-555", Annotations: map[string]string{ "storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-2 * time.Second).Format(time.RFC3339), }, @@ -237,6 +246,13 @@ func TestGarbageCollectorControllerSync(t *testing.T) { }, kubeClient.Actions(), ) + r.ElementsMatch( + []metav1.DeleteOptions{ + testutil.NewPreconditions("uid-123", "rv-456"), + testutil.NewPreconditions("uid-789", "rv-555"), + }, + *deleteOptions, + ) list, err := kubeClient.CoreV1().Secrets(installedInNamespace).List(context.Background(), metav1.ListOptions{}) r.NoError(err) r.Len(list.Items, 2) diff --git a/internal/controllermanager/prepare_controllers.go b/internal/controllermanager/prepare_controllers.go index e6b69ec1..8c9e9808 100644 --- a/internal/controllermanager/prepare_controllers.go +++ b/internal/controllermanager/prepare_controllers.go @@ -33,6 +33,7 @@ import ( "go.pinniped.dev/internal/dynamiccert" "go.pinniped.dev/internal/groupsuffix" "go.pinniped.dev/internal/kubeclient" + "go.pinniped.dev/internal/leaderelection" ) const ( @@ -97,7 +98,7 @@ type Config struct { func PrepareControllers(c *Config) (func(ctx context.Context), error) { loginConciergeGroupData, identityConciergeGroupData := groupsuffix.ConciergeAggregatedGroups(c.APIGroupSuffix) - dref, _, err := deploymentref.New(c.ServerInstallationInfo) + dref, deployment, err := deploymentref.New(c.ServerInstallationInfo) if err != nil { return nil, fmt.Errorf("cannot create deployment ref: %w", err) } @@ -107,7 +108,9 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) { return nil, fmt.Errorf("cannot create API service ref: %w", err) } - client, err := kubeclient.New( + client, leaderElector, err := leaderelection.New( + c.ServerInstallationInfo, + deployment, dref, // first try to use the deployment as an owner ref (for namespace scoped resources) apiServiceRef, // fallback to our API service (for everything else we create) kubeclient.WithMiddleware(groupsuffix.New(c.APIGroupSuffix)), @@ -303,7 +306,7 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) { // Return a function which starts the informers and controllers. return func(ctx context.Context) { informers.startAndWaitForSync(ctx) - go controllerManager.Start(ctx) + go leaderElector(ctx, controllerManager.Start) }, nil } diff --git a/internal/kubeclient/roundtrip.go b/internal/kubeclient/roundtrip.go index 3d6029dc..21b63eaa 100644 --- a/internal/kubeclient/roundtrip.go +++ b/internal/kubeclient/roundtrip.go @@ -146,7 +146,7 @@ func handleOtherVerbs( result, err := middlewareReq.mutateRequest(obj) if err != nil { - return true, nil, err + return true, nil, fmt.Errorf("middleware request for %#v failed to mutate: %w", middlewareReq, err) } if !result.mutated { @@ -231,7 +231,7 @@ func handleCreateOrUpdate( result, err := middlewareReq.mutateRequest(obj) if err != nil { - return true, nil, err + return true, nil, fmt.Errorf("middleware request for %#v failed to mutate: %w", middlewareReq, err) } if !result.mutated { diff --git a/internal/leaderelection/leaderelection.go b/internal/leaderelection/leaderelection.go new file mode 100644 index 00000000..bb17c3de --- /dev/null +++ b/internal/leaderelection/leaderelection.go @@ -0,0 +1,150 @@ +// Copyright 2021 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package leaderelection + +import ( + "context" + "fmt" + "time" + + "go.uber.org/atomic" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + + "go.pinniped.dev/internal/constable" + "go.pinniped.dev/internal/downward" + "go.pinniped.dev/internal/kubeclient" + "go.pinniped.dev/internal/plog" +) + +const ErrNotLeader constable.Error = "write attempt rejected as client is not leader" + +// New returns a client that has a leader election middleware injected into it. +// This middleware will prevent all non-read requests to the Kubernetes API when +// the current process does not hold the leader election lock. Unlike normal +// leader election where the process blocks until it acquires the lock, this +// middleware approach lets the process run as normal for all read requests. +// Another difference is that if the process acquires the lock and then loses it +// (i.e. a failed renewal), it will not exit (i.e. restart). Instead, it will +// simply attempt to acquire the lock again. +// +// The returned function is blocking and will run the leader election polling +// logic and will coordinate lease release with the input controller starter function. +func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubeclient.Option) ( + *kubeclient.Client, + func(context.Context, func(context.Context)), + error, +) { + internalClient, err := kubeclient.New(opts...) + if err != nil { + return nil, nil, fmt.Errorf("could not create internal client for leader election: %w", err) + } + + isLeader := atomic.NewBool(false) + + identity := podInfo.Name + leaseName := deployment.Name + + leaderElectionConfig := leaderelection.LeaderElectionConfig{ + Lock: &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Namespace: podInfo.Namespace, + Name: leaseName, + }, + Client: internalClient.Kubernetes.CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: identity, + }, + }, + ReleaseOnCancel: true, // semantics for correct release handled by controllersWithLeaderElector below + LeaseDuration: 60 * time.Second, + RenewDeadline: 15 * time.Second, + RetryPeriod: 5 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(_ context.Context) { + plog.Debug("leader gained", "identity", identity) + isLeader.Store(true) + }, + OnStoppedLeading: func() { + plog.Debug("leader lost", "identity", identity) + isLeader.Store(false) + }, + OnNewLeader: func(newLeader string) { + if newLeader == identity { + return + } + plog.Debug("new leader elected", "newLeader", newLeader) + }, + }, + Name: leaseName, + // this must be set to nil because we do not want to associate /healthz with a failed + // leader election renewal as we do not want to exit the process if the leader changes. + WatchDog: nil, + } + + // validate our config here before we rely on it being functioning below + if _, err := leaderelection.NewLeaderElector(leaderElectionConfig); err != nil { + return nil, nil, fmt.Errorf("invalid config - could not create leader elector: %w", err) + } + + writeOnlyWhenLeader := kubeclient.MiddlewareFunc(func(_ context.Context, rt kubeclient.RoundTrip) { + switch rt.Verb() { + case kubeclient.VerbGet, kubeclient.VerbList, kubeclient.VerbWatch: + // reads are always allowed. + // note that while our pods/exec into the kube cert agent pod is a write request from the + // perspective of the Kube API, it is semantically a read request since no mutation occurs. + // we simply use it to fill a cache, and we need all pods to have a functioning cache. + // however, we do not need to handle it here because remotecommand.NewSPDYExecutor uses a + // kubeclient.Client.JSONConfig as input. since our middleware logic is only injected into + // the generated clientset code, this JSONConfig simply ignores this middleware all together. + return + } + + if isLeader.Load() { // only perform "expensive" test for writes + return // we are currently the leader, all actions are permitted + } + + rt.MutateRequest(func(_ kubeclient.Object) error { + return ErrNotLeader // we are not the leader, fail the write request + }) + }) + + leaderElectionOpts := append( + // all middleware are always executed so this being the first middleware is not relevant + []kubeclient.Option{kubeclient.WithMiddleware(writeOnlyWhenLeader)}, + opts..., // do not mutate input slice + ) + + client, err := kubeclient.New(leaderElectionOpts...) + if err != nil { + return nil, nil, fmt.Errorf("could not create leader election client: %w", err) + } + + controllersWithLeaderElector := func(ctx context.Context, controllers func(context.Context)) { + leaderElectorCtx, leaderElectorCancel := context.WithCancel(context.Background()) // purposefully detached context + + go func() { + controllers(ctx) // run the controllers with the global context, this blocks until the context is canceled + leaderElectorCancel() // once the controllers have all stopped, tell the leader elector to release the lock + }() + + for { // run (and rerun on release) the leader elector with its own context (blocking) + select { + case <-leaderElectorCtx.Done(): + return // keep trying to run until process exit + + default: + // blocks while trying to acquire lease, unblocks on release. + // note that this creates a new leader elector on each loop to + // prevent any bugs from reusing that struct across elections. + // our config was validated above so this should never die. + leaderelection.RunOrDie(leaderElectorCtx, leaderElectionConfig) + } + } + } + + return client, controllersWithLeaderElector, nil +} diff --git a/internal/supervisor/server/server.go b/internal/supervisor/server/server.go index b4ce7549..a163573e 100644 --- a/internal/supervisor/server/server.go +++ b/internal/supervisor/server/server.go @@ -41,6 +41,7 @@ import ( "go.pinniped.dev/internal/downward" "go.pinniped.dev/internal/groupsuffix" "go.pinniped.dev/internal/kubeclient" + "go.pinniped.dev/internal/leaderelection" "go.pinniped.dev/internal/oidc/jwks" "go.pinniped.dev/internal/oidc/provider" "go.pinniped.dev/internal/oidc/provider/manager" @@ -94,6 +95,7 @@ func startControllers( pinnipedClient pinnipedclientset.Interface, kubeInformers kubeinformers.SharedInformerFactory, pinnipedInformers pinnipedinformers.SharedInformerFactory, + leaderElector func(context.Context, func(context.Context)), ) { federationDomainInformer := pinnipedInformers.Config().V1alpha1().FederationDomains() secretInformer := kubeInformers.Core().V1().Secrets() @@ -261,7 +263,7 @@ func startControllers( kubeInformers.WaitForCacheSync(ctx.Done()) pinnipedInformers.WaitForCacheSync(ctx.Done()) - go controllerManager.Start(ctx) + go leaderElector(ctx, controllerManager.Start) } func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error { @@ -275,14 +277,25 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error { return fmt.Errorf("cannot create deployment ref: %w", err) } - client, err := kubeclient.New( + opts := []kubeclient.Option{ dref, kubeclient.WithMiddleware(groupsuffix.New(*cfg.APIGroupSuffix)), + } + + client, leaderElector, err := leaderelection.New( + podInfo, + supervisorDeployment, + opts..., ) if err != nil { return fmt.Errorf("cannot create k8s client: %w", err) } + clientWithoutLeaderElection, err := kubeclient.New(opts...) + if err != nil { + return fmt.Errorf("cannot create k8s client without leader election: %w", err) + } + kubeInformers := kubeinformers.NewSharedInformerFactoryWithOptions( client.Kubernetes, defaultResyncInterval, @@ -312,7 +325,7 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error { dynamicJWKSProvider, dynamicUpstreamIDPProvider, &secretCache, - client.Kubernetes.CoreV1().Secrets(serverInstallationNamespace), + clientWithoutLeaderElection.Kubernetes.CoreV1().Secrets(serverInstallationNamespace), // writes to kube storage are allowed for non-leaders ) startControllers( @@ -328,6 +341,7 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error { client.PinnipedSupervisor, kubeInformers, pinnipedInformers, + leaderElector, ) //nolint: gosec // Intentionally binding to all network interfaces. diff --git a/internal/testutil/delete.go b/internal/testutil/delete.go index 7a6a9fe5..424ae5a3 100644 --- a/internal/testutil/delete.go +++ b/internal/testutil/delete.go @@ -7,6 +7,7 @@ import ( "context" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" ) @@ -32,10 +33,24 @@ type coreWrapper struct { opts *[]metav1.DeleteOptions } +func (c *coreWrapper) Pods(namespace string) corev1client.PodInterface { + return &podsWrapper{PodInterface: c.CoreV1Interface.Pods(namespace), opts: c.opts} +} + func (c *coreWrapper) Secrets(namespace string) corev1client.SecretInterface { return &secretsWrapper{SecretInterface: c.CoreV1Interface.Secrets(namespace), opts: c.opts} } +type podsWrapper struct { + corev1client.PodInterface + opts *[]metav1.DeleteOptions +} + +func (s *podsWrapper) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { + *s.opts = append(*s.opts, opts) + return s.PodInterface.Delete(ctx, name, opts) +} + type secretsWrapper struct { corev1client.SecretInterface opts *[]metav1.DeleteOptions @@ -45,3 +60,12 @@ func (s *secretsWrapper) Delete(ctx context.Context, name string, opts metav1.De *s.opts = append(*s.opts, opts) return s.SecretInterface.Delete(ctx, name, opts) } + +func NewPreconditions(uid types.UID, rv string) metav1.DeleteOptions { + return metav1.DeleteOptions{ + Preconditions: &metav1.Preconditions{ + UID: &uid, + ResourceVersion: &rv, + }, + } +} diff --git a/test/integration/concierge_impersonation_proxy_test.go b/test/integration/concierge_impersonation_proxy_test.go index 7397f4d0..35c09234 100644 --- a/test/integration/concierge_impersonation_proxy_test.go +++ b/test/integration/concierge_impersonation_proxy_test.go @@ -438,7 +438,7 @@ func TestImpersonationProxy(t *testing.T) { //nolint:gocyclo // yeah, it's compl t.Run("using and watching all the basic verbs", func(t *testing.T) { parallelIfNotEKS(t) // Create a namespace, because it will be easier to exercise "deletecollection" if we have a namespace. - namespaceName := createTestNamespace(t, adminClient) + namespaceName := testlib.CreateNamespace(ctx, t, "impersonation").Name // Create and start informer to exercise the "watch" verb for us. informerFactory := k8sinformers.NewSharedInformerFactoryWithOptions( @@ -827,7 +827,7 @@ func TestImpersonationProxy(t *testing.T) { //nolint:gocyclo // yeah, it's compl // this works because impersonation cannot set UID and thus the final user info the proxy sees has no UID t.Run("nested impersonation as a service account is allowed if it has enough RBAC permissions", func(t *testing.T) { parallelIfNotEKS(t) - namespaceName := createTestNamespace(t, adminClient) + namespaceName := testlib.CreateNamespace(ctx, t, "impersonation").Name saName, saToken, saUID := createServiceAccountToken(ctx, t, adminClient, namespaceName) nestedImpersonationClient := newImpersonationProxyClientWithCredentials(t, &loginv1alpha1.ClusterCredential{Token: saToken}, impersonationProxyURL, impersonationProxyCACertPEM, @@ -916,7 +916,7 @@ func TestImpersonationProxy(t *testing.T) { //nolint:gocyclo // yeah, it's compl } // Test using a service account token. - namespaceName := createTestNamespace(t, adminClient) + namespaceName := testlib.CreateNamespace(ctx, t, "impersonation").Name saName, saToken, _ := createServiceAccountToken(ctx, t, adminClient, namespaceName) impersonationProxyServiceAccountPinnipedConciergeClient := newImpersonationProxyClientWithCredentials(t, &loginv1alpha1.ClusterCredential{Token: saToken}, @@ -935,7 +935,7 @@ func TestImpersonationProxy(t *testing.T) { //nolint:gocyclo // yeah, it's compl }) t.Run("WhoAmIRequests and SA token request", func(t *testing.T) { - namespaceName := createTestNamespace(t, adminClient) + namespaceName := testlib.CreateNamespace(ctx, t, "impersonation").Name kubeClient := adminClient.CoreV1() saName, _, saUID := createServiceAccountToken(ctx, t, adminClient, namespaceName) @@ -1145,7 +1145,7 @@ func TestImpersonationProxy(t *testing.T) { //nolint:gocyclo // yeah, it's compl t.Run("websocket client", func(t *testing.T) { parallelIfNotEKS(t) - namespaceName := createTestNamespace(t, adminClient) + namespaceName := testlib.CreateNamespace(ctx, t, "impersonation").Name impersonationRestConfig := impersonationProxyRestConfig( refreshCredential(t, impersonationProxyURL, impersonationProxyCACertPEM), @@ -1224,7 +1224,7 @@ func TestImpersonationProxy(t *testing.T) { //nolint:gocyclo // yeah, it's compl t.Run("http2 client", func(t *testing.T) { parallelIfNotEKS(t) - namespaceName := createTestNamespace(t, adminClient) + namespaceName := testlib.CreateNamespace(ctx, t, "impersonation").Name wantConfigMapLabelKey, wantConfigMapLabelValue := "some-label-key", "some-label-value" wantConfigMap := &corev1.ConfigMap{ @@ -1783,7 +1783,7 @@ func TestImpersonationProxy(t *testing.T) { //nolint:gocyclo // yeah, it's compl testlib.RequireEventually(t, func(requireEventually *require.Assertions) { _, err := adminClient.CoreV1().Secrets(env.ConciergeNamespace).Get(ctx, impersonationProxyTLSSecretName(env), metav1.GetOptions{}) requireEventually.Truef(k8serrors.IsNotFound(err), "expected NotFound error, got %v", err) - }, 10*time.Second, 250*time.Millisecond) + }, time.Minute, time.Second) // Check that the generated CA cert Secret was not deleted by the controller because it's supposed to keep this // around in case we decide to later re-enable the impersonator. We want to avoid generating new CA certs when @@ -1864,27 +1864,6 @@ func ensureDNSResolves(t *testing.T, urlString string) { }, 5*time.Minute, 1*time.Second) } -func createTestNamespace(t *testing.T, adminClient kubernetes.Interface) string { - t.Helper() - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - namespace, err := adminClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{GenerateName: "impersonation-integration-test-"}, - }, metav1.CreateOptions{}) - require.NoError(t, err) - - t.Cleanup(func() { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) - defer cancel() - - t.Logf("cleaning up test namespace %s", namespace.Name) - require.NoError(t, adminClient.CoreV1().Namespaces().Delete(ctx, namespace.Name, metav1.DeleteOptions{})) - }) - return namespace.Name -} - func createServiceAccountToken(ctx context.Context, t *testing.T, adminClient kubernetes.Interface, namespaceName string) (name, token string, uid types.UID) { t.Helper() diff --git a/test/integration/leaderelection_test.go b/test/integration/leaderelection_test.go new file mode 100644 index 00000000..7fe80cf7 --- /dev/null +++ b/test/integration/leaderelection_test.go @@ -0,0 +1,278 @@ +// Copyright 2021 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package integration + +import ( + "context" + "encoding/json" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + authenticationv1 "k8s.io/api/authentication/v1" + coordinationv1 "k8s.io/api/coordination/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/retry" + "k8s.io/utils/pointer" + + "go.pinniped.dev/internal/downward" + "go.pinniped.dev/internal/kubeclient" + "go.pinniped.dev/internal/leaderelection" + "go.pinniped.dev/test/testlib" +) + +func TestLeaderElection(t *testing.T) { + _ = testlib.IntegrationEnv(t) + + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + t.Cleanup(cancel) + + leaseName := "leader-election-" + rand.String(5) + + namespace := testlib.CreateNamespace(ctx, t, leaseName) + + clients := leaderElectionClients(t, namespace, leaseName) + + // the tests below are order dependant to some degree and definitely cannot be run in parallel + + t.Run("sanity check write prevention", func(t *testing.T) { + lease := checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients) + logLease(t, lease) + }) + + t.Run("clients handle leader election transition correctly", func(t *testing.T) { + lease := forceTransition(ctx, t, namespace, leaseName, clients) + logLease(t, lease) + }) + + t.Run("sanity check write prevention after transition", func(t *testing.T) { + lease := checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients) + logLease(t, lease) + }) + + t.Run("clients handle leader election restart correctly", func(t *testing.T) { + lease := forceRestart(ctx, t, namespace, leaseName, clients) + logLease(t, lease) + }) + + t.Run("sanity check write prevention after restart", func(t *testing.T) { + lease := checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients) + logLease(t, lease) + }) +} + +func leaderElectionClient(t *testing.T, namespace *corev1.Namespace, leaseName, identity string) *kubeclient.Client { + t.Helper() + + podInfo := &downward.PodInfo{ + Namespace: namespace.Name, + Name: identity, + } + deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: leaseName}} + + client, leaderElector, err := leaderelection.New(podInfo, deployment, testlib.NewKubeclientOptions(t, testlib.NewClientConfig(t))...) + require.NoError(t, err) + + controllerCtx, controllerCancel := context.WithCancel(context.Background()) + leaderCtx, leaderCancel := context.WithCancel(context.Background()) + + t.Cleanup(func() { + controllerCancel() + + select { + case <-leaderCtx.Done(): + // leader election client stopped correctly + + case <-time.After(time.Minute): + t.Errorf("leader election client in namespace %q with lease %q and identity %q failed to stop", + namespace.Name, leaseName, identity) + } + }) + + go func() { + time.Sleep(time.Duration(rand.Int63nRange(1, 10)) * time.Second) // randomize start of client and controllers + + // this blocks + leaderElector(controllerCtx, func(ctx context.Context) { + <-ctx.Done() + time.Sleep(time.Duration(rand.Int63nRange(1, 10)) * time.Second) // randomize stop of controllers + }) + + select { + case <-controllerCtx.Done(): + // leaderElector correctly stopped but only after controllers stopped + + default: + t.Errorf("leader election client in namespace %q with lease %q and identity %q stopped early", + namespace.Name, leaseName, identity) + } + + leaderCancel() + }() + + return client +} + +func leaderElectionClients(t *testing.T, namespace *corev1.Namespace, leaseName string) map[string]*kubeclient.Client { + t.Helper() + + count := rand.IntnRange(1, 6) + out := make(map[string]*kubeclient.Client, count) + + for i := 0; i < count; i++ { + identity := "leader-election-client-" + rand.String(5) + out[identity] = leaderElectionClient(t, namespace, leaseName, identity) + } + + t.Logf("running leader election client tests with %d clients: %v", len(out), sets.StringKeySet(out).List()) + + return out +} + +func pickRandomLeaderElectionClient(clients map[string]*kubeclient.Client) *kubeclient.Client { + for _, client := range clients { + client := client + return client + } + panic("clients map was empty") +} + +func waitForIdentity(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease { + t.Helper() + + identities := sets.StringKeySet(clients) + var out *coordinationv1.Lease + + testlib.RequireEventuallyWithoutError(t, func() (bool, error) { + lease, err := pickRandomLeaderElectionClient(clients).Kubernetes.CoordinationV1().Leases(namespace.Name).Get(ctx, leaseName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return false, nil + } + if err != nil { + return false, err + } + out = lease + return lease.Spec.HolderIdentity != nil && identities.Has(*lease.Spec.HolderIdentity), nil + }, 3*time.Minute, time.Second) + + return out +} + +func runWriteRequest(ctx context.Context, client *kubeclient.Client) error { + _, err := client.Kubernetes.AuthenticationV1().TokenReviews().Create(ctx, &authenticationv1.TokenReview{ + Spec: authenticationv1.TokenReviewSpec{Token: "any-non-empty-value"}, + }, metav1.CreateOptions{}) + return err +} + +func runWriteRequests(ctx context.Context, clients map[string]*kubeclient.Client) map[string]error { + out := make(map[string]error, len(clients)) + + for identity, client := range clients { + identity, client := identity, client + + out[identity] = runWriteRequest(ctx, client) + } + + return out +} + +func pickCurrentLeaderClient(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *kubeclient.Client { + t.Helper() + + lease := waitForIdentity(ctx, t, namespace, leaseName, clients) + return clients[*lease.Spec.HolderIdentity] +} + +func checkOnlyLeaderCanWrite(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease { + t.Helper() + + lease := waitForIdentity(ctx, t, namespace, leaseName, clients) + + var leaders, nonLeaders int + for identity, err := range runWriteRequests(ctx, clients) { + identity, err := identity, err + + if identity == *lease.Spec.HolderIdentity { + leaders++ + assert.NoError(t, err, "leader client %q should have no error", identity) + } else { + nonLeaders++ + assert.Error(t, err, "non leader client %q should have write error but it was nil", identity) + assert.True(t, errors.Is(err, leaderelection.ErrNotLeader), "non leader client %q should have write error: %v", identity, err) + } + } + assert.Equal(t, 1, leaders, "did not see leader") + assert.Equal(t, len(clients)-1, nonLeaders, "did not see non-leader") + + return lease +} + +func forceTransition(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease { + t.Helper() + + var startTransitions int32 + var startTime metav1.MicroTime + + errRetry := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + leaderClient := pickCurrentLeaderClient(ctx, t, namespace, leaseName, clients) + startLease := waitForIdentity(ctx, t, namespace, leaseName, clients) + startTransitions = *startLease.Spec.LeaseTransitions + startTime = *startLease.Spec.AcquireTime + + startLease = startLease.DeepCopy() + startLease.Spec.HolderIdentity = pointer.String("some-other-client" + rand.String(5)) + + _, err := leaderClient.Kubernetes.CoordinationV1().Leases(namespace.Name).Update(ctx, startLease, metav1.UpdateOptions{}) + return err + }) + require.NoError(t, errRetry) + + finalLease := waitForIdentity(ctx, t, namespace, leaseName, clients) + finalTransitions := *finalLease.Spec.LeaseTransitions + finalTime := *finalLease.Spec.AcquireTime + + require.Greater(t, finalTransitions, startTransitions) + require.Greater(t, finalTime.UnixNano(), startTime.UnixNano()) + + time.Sleep(2 * time.Minute) // need to give clients time to notice this change because leader election is polling based + + return finalLease +} + +func forceRestart(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease { + t.Helper() + + startLease := waitForIdentity(ctx, t, namespace, leaseName, clients) + + err := pickCurrentLeaderClient(ctx, t, namespace, leaseName, clients). + Kubernetes.CoordinationV1().Leases(namespace.Name).Delete(ctx, leaseName, metav1.DeleteOptions{}) + require.NoError(t, err) + + newLease := waitForIdentity(ctx, t, namespace, leaseName, clients) + require.Zero(t, *newLease.Spec.LeaseTransitions) + require.Greater(t, newLease.Spec.AcquireTime.UnixNano(), startLease.Spec.AcquireTime.UnixNano()) + + time.Sleep(2 * time.Minute) // need to give clients time to notice this change because leader election is polling based + + return newLease +} + +func logLease(t *testing.T, lease *coordinationv1.Lease) { + t.Helper() + + bytes, err := json.MarshalIndent(lease, "", "\t") + require.NoError(t, err) + + t.Logf("current lease:\n%s", string(bytes)) +} diff --git a/test/testlib/client.go b/test/testlib/client.go index 71396858..46684ff2 100644 --- a/test/testlib/client.go +++ b/test/testlib/client.go @@ -137,13 +137,19 @@ func newAnonymousClientRestConfigWithCertAndKeyAdded(t *testing.T, clientCertifi return config } +func NewKubeclientOptions(t *testing.T, config *rest.Config) []kubeclient.Option { + t.Helper() + + return []kubeclient.Option{ + kubeclient.WithConfig(config), + kubeclient.WithMiddleware(groupsuffix.New(IntegrationEnv(t).APIGroupSuffix)), + } +} + func NewKubeclient(t *testing.T, config *rest.Config) *kubeclient.Client { t.Helper() - env := IntegrationEnv(t) - client, err := kubeclient.New( - kubeclient.WithConfig(config), - kubeclient.WithMiddleware(groupsuffix.New(env.APIGroupSuffix)), - ) + + client, err := kubeclient.New(NewKubeclientOptions(t, config)...) require.NoError(t, err) return client } @@ -502,6 +508,30 @@ func CreatePod(ctx context.Context, t *testing.T, name, namespace string, spec c return result } +func CreateNamespace(ctx context.Context, t *testing.T, name string) *corev1.Namespace { + t.Helper() + + adminClient := NewKubernetesClientset(t) + + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + + namespace, err := adminClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{GenerateName: name + "-integration-test-"}, + }, metav1.CreateOptions{}) + require.NoError(t, err) + + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + t.Logf("cleaning up test namespace %s", namespace.Name) + require.NoError(t, adminClient.CoreV1().Namespaces().Delete(ctx, namespace.Name, metav1.DeleteOptions{})) + }) + + return namespace +} + func WaitForUserToHaveAccess(t *testing.T, user string, groups []string, shouldHaveAccessTo *authorizationv1.ResourceAttributes) { t.Helper() client := NewKubernetesClientset(t)