// Copyright 2021-2023 the Pinniped contributors. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package integration import ( "context" "encoding/json" "testing" "time" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" authenticationv1 "k8s.io/api/authentication/v1" coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/retry" "k8s.io/utils/ptr" "go.pinniped.dev/internal/downward" "go.pinniped.dev/internal/kubeclient" "go.pinniped.dev/internal/leaderelection" "go.pinniped.dev/test/testlib" ) // safe to run in parallel with serial tests since it only interacts with a test local lease, see main_test.go. func TestLeaderElection_Parallel(t *testing.T) { _ = testlib.IntegrationEnv(t) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute) t.Cleanup(cancel) leaseName := "leader-election-" + rand.String(5) namespace := testlib.CreateNamespace(ctx, t, leaseName) clients, cancels := leaderElectionClients(t, namespace, leaseName) // the tests below are order dependant to some degree and definitely cannot be run in parallel t.Run("sanity check write prevention", func(t *testing.T) { lease := checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients) logLease(t, lease) }) t.Run("clients handle leader election transition correctly", func(t *testing.T) { lease := forceTransition(ctx, t, namespace, leaseName, clients) logLease(t, lease) }) t.Run("sanity check write prevention after transition", func(t *testing.T) { lease := checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients) logLease(t, lease) }) t.Run("clients handle leader election restart correctly", func(t *testing.T) { lease := forceRestart(ctx, t, namespace, leaseName, clients) logLease(t, lease) }) t.Run("sanity check write prevention after restart", func(t *testing.T) { lease := checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients) logLease(t, lease) }) t.Run("stop current leader", func(t *testing.T) { startLease := waitForIdentity(ctx, t, namespace, leaseName, clients) startTransitions := *startLease.Spec.LeaseTransitions startTime := *startLease.Spec.AcquireTime startLeaderIdentity := *startLease.Spec.HolderIdentity leaderClient := clients[startLeaderIdentity] err := runWriteRequest(ctx, leaderClient) require.NoError(t, err) // emulate stopping the leader process cancels[startLeaderIdentity]() delete(clients, startLeaderIdentity) testlib.RequireEventually(t, func(requireEventually *require.Assertions) { err := runWriteRequest(ctx, leaderClient) requireEventually.ErrorIs(err, leaderelection.ErrNotLeader, "leader should no longer be able to write") }, time.Minute, time.Second) if len(clients) > 0 { finalLease := waitForIdentity(ctx, t, namespace, leaseName, clients) finalTransitions := *finalLease.Spec.LeaseTransitions finalTime := *finalLease.Spec.AcquireTime finalLeaderIdentity := *finalLease.Spec.HolderIdentity require.Greater(t, finalTransitions, startTransitions) require.Greater(t, finalTime.UnixNano(), startTime.UnixNano()) require.NotEqual(t, startLeaderIdentity, finalLeaderIdentity, "should have elected new leader") logLease(t, finalLease) } }) t.Run("sanity check write prevention after stopping leader", func(t *testing.T) { if len(clients) == 0 { t.Skip("no clients left to check") } lease := checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients) logLease(t, lease) }) } func leaderElectionClient(t *testing.T, namespace *corev1.Namespace, leaseName, identity string) (*kubeclient.Client, context.CancelFunc) { t.Helper() podInfo := &downward.PodInfo{ Namespace: namespace.Name, Name: identity, } deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: leaseName}} client, leaderElector, err := leaderelection.New(podInfo, deployment, testlib.NewKubeclientOptions(t, testlib.NewClientConfig(t))...) require.NoError(t, err) controllerCtx, controllerCancel := context.WithCancel(context.Background()) leaderCtx, leaderCancel := context.WithCancel(context.Background()) t.Cleanup(func() { controllerCancel() select { case <-leaderCtx.Done(): // leader election client stopped correctly case <-time.After(time.Minute): t.Errorf("leader election client in namespace %q with lease %q and identity %q failed to stop", namespace.Name, leaseName, identity) } }) go func() { time.Sleep(time.Duration(rand.Int63nRange(1, 10)) * time.Second) // randomize start of client and controllers // this blocks leaderElector(controllerCtx, func(ctx context.Context) { <-ctx.Done() time.Sleep(time.Duration(rand.Int63nRange(1, 10)) * time.Second) // randomize stop of controllers }) select { case <-controllerCtx.Done(): // leaderElector correctly stopped but only after controllers stopped default: t.Errorf("leader election client in namespace %q with lease %q and identity %q stopped early", namespace.Name, leaseName, identity) } leaderCancel() }() return client, controllerCancel } func leaderElectionClients(t *testing.T, namespace *corev1.Namespace, leaseName string) (map[string]*kubeclient.Client, map[string]context.CancelFunc) { t.Helper() count := rand.IntnRange(1, 6) clients := make(map[string]*kubeclient.Client, count) cancels := make(map[string]context.CancelFunc, count) for i := 0; i < count; i++ { identity := "leader-election-client-" + rand.String(5) clients[identity], cancels[identity] = leaderElectionClient(t, namespace, leaseName, identity) } t.Logf("running leader election client tests with %d clients: %v", len(clients), sets.StringKeySet(clients).List()) return clients, cancels } func pickRandomLeaderElectionClient(clients map[string]*kubeclient.Client) *kubeclient.Client { for _, client := range clients { client := client return client } panic("clients map was empty") } func waitForIdentity(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease { t.Helper() identities := sets.StringKeySet(clients) var out *coordinationv1.Lease testlib.RequireEventuallyWithoutError(t, func() (bool, error) { lease, err := pickRandomLeaderElectionClient(clients).Kubernetes.CoordinationV1().Leases(namespace.Name).Get(ctx, leaseName, metav1.GetOptions{}) if apierrors.IsNotFound(err) { t.Logf("lease %s/%s does not exist", namespace.Name, leaseName) return false, nil } if err != nil { return false, err } out = lease t.Logf("lease %s/%s - current leader identity: %s, valid leader identities: %s", namespace.Name, leaseName, ptr.Deref(lease.Spec.HolderIdentity, ""), identities.List()) return lease.Spec.HolderIdentity != nil && identities.Has(*lease.Spec.HolderIdentity), nil }, 10*time.Minute, 10*time.Second) return out } func runWriteRequest(ctx context.Context, client *kubeclient.Client) error { _, err := client.Kubernetes.AuthenticationV1().TokenReviews().Create(ctx, &authenticationv1.TokenReview{ Spec: authenticationv1.TokenReviewSpec{Token: "any-non-empty-value"}, }, metav1.CreateOptions{}) return err } func runWriteRequests(ctx context.Context, clients map[string]*kubeclient.Client) map[string]error { out := make(map[string]error, len(clients)) for identity, client := range clients { identity, client := identity, client out[identity] = runWriteRequest(ctx, client) } return out } func pickCurrentLeaderClient(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *kubeclient.Client { t.Helper() lease := waitForIdentity(ctx, t, namespace, leaseName, clients) return clients[*lease.Spec.HolderIdentity] } func checkOnlyLeaderCanWrite(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease { t.Helper() lease := waitForIdentity(ctx, t, namespace, leaseName, clients) testlib.RequireEventually(t, func(requireEventually *require.Assertions) { var leaders, nonLeaders int for identity, err := range runWriteRequests(ctx, clients) { identity, err := identity, err if identity == *lease.Spec.HolderIdentity { leaders++ requireEventually.NoError(err, "leader client %q should have no error", identity) } else { nonLeaders++ requireEventually.Error(err, "non leader client %q should have write error but it was nil", identity) requireEventually.ErrorIs(err, leaderelection.ErrNotLeader, "non leader client %q should have write error: %v", identity, err) } } requireEventually.Equal(1, leaders, "did not see leader") requireEventually.Equal(len(clients)-1, nonLeaders, "did not see non-leader") }, 3*time.Minute, 3*time.Second) return lease } func forceTransition(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease { t.Helper() var startTransitions int32 var startTime metav1.MicroTime errRetry := retry.RetryOnConflict(retry.DefaultBackoff, func() error { startLease := waitForIdentity(ctx, t, namespace, leaseName, clients) startTransitions = *startLease.Spec.LeaseTransitions startTime = *startLease.Spec.AcquireTime startLease = startLease.DeepCopy() startLease.Spec.HolderIdentity = ptr.To("some-other-client-" + rand.String(5)) _, err := pickCurrentLeaderClient(ctx, t, namespace, leaseName, clients). Kubernetes.CoordinationV1().Leases(namespace.Name).Update(ctx, startLease, metav1.UpdateOptions{}) return err }) require.NoError(t, errRetry) finalLease := waitForIdentity(ctx, t, namespace, leaseName, clients) finalTransitions := *finalLease.Spec.LeaseTransitions finalTime := *finalLease.Spec.AcquireTime require.Greater(t, finalTransitions, startTransitions) require.Greater(t, finalTime.UnixNano(), startTime.UnixNano()) return finalLease } func forceRestart(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease { t.Helper() startLease := waitForIdentity(ctx, t, namespace, leaseName, clients) err := pickCurrentLeaderClient(ctx, t, namespace, leaseName, clients). Kubernetes.CoordinationV1().Leases(namespace.Name).Delete(ctx, leaseName, metav1.DeleteOptions{}) require.NoError(t, err) newLease := waitForIdentity(ctx, t, namespace, leaseName, clients) require.Zero(t, *newLease.Spec.LeaseTransitions) require.Greater(t, newLease.Spec.AcquireTime.UnixNano(), startLease.Spec.AcquireTime.UnixNano()) return newLease } func logLease(t *testing.T, lease *coordinationv1.Lease) { t.Helper() bytes, err := json.MarshalIndent(lease, "", "\t") require.NoError(t, err) t.Logf("current lease:\n%s", string(bytes)) }