ContainerImage.Pinniped/test/integration/leaderelection_test.go

321 lines
11 KiB
Go
Raw Permalink Normal View History

2023-01-18 22:39:22 +00:00
// Copyright 2021-2023 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package integration
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
authenticationv1 "k8s.io/api/authentication/v1"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/retry"
"k8s.io/utils/ptr"
"go.pinniped.dev/internal/downward"
"go.pinniped.dev/internal/kubeclient"
"go.pinniped.dev/internal/leaderelection"
"go.pinniped.dev/test/testlib"
)
// TestLeaderElection_Parallel drives a randomly sized set of leader election
// clients through a forced transition, a forced restart, and the shutdown of
// the current leader, checking after every step that only the lease holder is
// able to perform writes.
//
// safe to run in parallel with serial tests since it only interacts with a test local lease, see main_test.go.
func TestLeaderElection_Parallel(t *testing.T) {
	_ = testlib.IntegrationEnv(t)

	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
	t.Cleanup(cancel)

	leaseName := "leader-election-" + rand.String(5)
	namespace := testlib.CreateNamespace(ctx, t, leaseName)

	clients, cancels := leaderElectionClients(t, namespace, leaseName)

	// Shared body for every "sanity check" subtest: assert the write rules and
	// dump the lease that was used for the check.
	verifyWrites := func(t *testing.T) {
		logLease(t, checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients))
	}

	// The subtests below are order dependent to some degree and definitely
	// cannot be run in parallel with each other.

	t.Run("sanity check write prevention", verifyWrites)

	t.Run("clients handle leader election transition correctly", func(t *testing.T) {
		logLease(t, forceTransition(ctx, t, namespace, leaseName, clients))
	})

	t.Run("sanity check write prevention after transition", verifyWrites)

	t.Run("clients handle leader election restart correctly", func(t *testing.T) {
		logLease(t, forceRestart(ctx, t, namespace, leaseName, clients))
	})

	t.Run("sanity check write prevention after restart", verifyWrites)

	t.Run("stop current leader", func(t *testing.T) {
		before := waitForIdentity(ctx, t, namespace, leaseName, clients)
		oldTransitions := *before.Spec.LeaseTransitions
		oldAcquireTime := *before.Spec.AcquireTime
		oldLeader := *before.Spec.HolderIdentity

		// The current leader must be able to write before it is stopped.
		stoppedClient := clients[oldLeader]
		require.NoError(t, runWriteRequest(ctx, stoppedClient))

		// Emulate stopping the leader process and stop treating it as a
		// valid candidate for the remaining checks.
		cancels[oldLeader]()
		delete(clients, oldLeader)

		testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
			writeErr := runWriteRequest(ctx, stoppedClient)
			requireEventually.ErrorIs(writeErr, leaderelection.ErrNotLeader, "leader should no longer be able to write")
		}, time.Minute, time.Second)

		// With a single client there is nobody left to take over the lease.
		if len(clients) == 0 {
			return
		}

		after := waitForIdentity(ctx, t, namespace, leaseName, clients)
		newTransitions := *after.Spec.LeaseTransitions
		newAcquireTime := *after.Spec.AcquireTime
		newLeader := *after.Spec.HolderIdentity

		require.Greater(t, newTransitions, oldTransitions)
		require.Greater(t, newAcquireTime.UnixNano(), oldAcquireTime.UnixNano())
		require.NotEqual(t, oldLeader, newLeader, "should have elected new leader")

		logLease(t, after)
	})

	t.Run("sanity check write prevention after stopping leader", func(t *testing.T) {
		if len(clients) == 0 {
			t.Skip("no clients left to check")
		}
		verifyWrites(t)
	})
}
// leaderElectionClient builds a client via leaderelection.New for the given
// identity and lease name, and runs the returned elector function in a
// background goroutine.
//
// It returns the client together with the cancel function of the context the
// elector runs under. A test cleanup cancels that same context and then waits
// up to a minute for the goroutine to finish, flagging an error otherwise.
func leaderElectionClient(t *testing.T, namespace *corev1.Namespace, leaseName, identity string) (*kubeclient.Client, context.CancelFunc) {
	t.Helper()

	client, runElector, err := leaderelection.New(
		&downward.PodInfo{
			Namespace: namespace.Name,
			Name:      identity,
		},
		&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: leaseName}},
		testlib.NewKubeclientOptions(t, testlib.NewClientConfig(t))...,
	)
	require.NoError(t, err)

	// stopCtx tells the elector to stop; exitedCtx is cancelled by the
	// goroutine below once the elector has actually returned.
	stopCtx, stop := context.WithCancel(context.Background())
	exitedCtx, markExited := context.WithCancel(context.Background())

	t.Cleanup(func() {
		stop()

		select {
		case <-exitedCtx.Done():
			// leader election client stopped correctly
		case <-time.After(time.Minute):
			t.Errorf("leader election client in namespace %q with lease %q and identity %q failed to stop",
				namespace.Name, leaseName, identity)
		}
	})

	// jitter yields a random whole number of seconds used to stagger both
	// the start and the stop of each client.
	jitter := func() time.Duration {
		return time.Duration(rand.Int63nRange(1, 10)) * time.Second
	}

	go func() {
		time.Sleep(jitter()) // randomize start of client and controllers

		// this blocks
		runElector(stopCtx, func(ctx context.Context) {
			<-ctx.Done()
			time.Sleep(jitter()) // randomize stop of controllers
		})

		// The elector must only return after it was asked to stop (and
		// therefore only after the controllers callback has stopped).
		if stopCtx.Err() == nil {
			t.Errorf("leader election client in namespace %q with lease %q and identity %q stopped early",
				namespace.Name, leaseName, identity)
		}

		markExited()
	}()

	return client, stop
}
// leaderElectionClients starts a random number of leader election clients
// that all compete for the same lease. It returns two maps keyed by each
// client's generated identity: the clients themselves and the cancel
// functions that stop them.
func leaderElectionClients(t *testing.T, namespace *corev1.Namespace, leaseName string) (map[string]*kubeclient.Client, map[string]context.CancelFunc) {
	t.Helper()

	total := rand.IntnRange(1, 6)

	clients := make(map[string]*kubeclient.Client, total)
	cancels := make(map[string]context.CancelFunc, total)

	for remaining := total; remaining > 0; remaining-- {
		identity := "leader-election-client-" + rand.String(5)

		client, cancel := leaderElectionClient(t, namespace, leaseName, identity)
		clients[identity] = client
		cancels[identity] = cancel
	}

	t.Logf("running leader election client tests with %d clients: %v", len(clients), sets.StringKeySet(clients).List())

	return clients, cancels
}
// pickRandomLeaderElectionClient returns whichever client map iteration
// yields first. It panics when the map is empty.
func pickRandomLeaderElectionClient(clients map[string]*kubeclient.Client) *kubeclient.Client {
	for identity := range clients {
		return clients[identity]
	}

	panic("clients map was empty")
}
// waitForIdentity polls the lease every 10 seconds, for up to 10 minutes,
// until its holder identity is one of the keys of clients. A missing lease is
// logged and retried; any other read error is returned to the polling helper.
//
// It returns the most recently fetched lease.
func waitForIdentity(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease {
	t.Helper()

	validIdentities := sets.StringKeySet(clients)

	var latest *coordinationv1.Lease

	poll := func() (bool, error) {
		leases := pickRandomLeaderElectionClient(clients).Kubernetes.CoordinationV1().Leases(namespace.Name)

		lease, err := leases.Get(ctx, leaseName, metav1.GetOptions{})
		switch {
		case apierrors.IsNotFound(err):
			t.Logf("lease %s/%s does not exist", namespace.Name, leaseName)
			return false, nil
		case err != nil:
			return false, err
		}

		latest = lease
		holder := lease.Spec.HolderIdentity

		t.Logf("lease %s/%s - current leader identity: %s, valid leader identities: %s",
			namespace.Name, leaseName, ptr.Deref(holder, "<nil>"), validIdentities.List())

		if holder == nil {
			return false, nil
		}
		return validIdentities.Has(*holder), nil
	}

	testlib.RequireEventuallyWithoutError(t, poll, 10*time.Minute, 10*time.Second)

	return latest
}
// runWriteRequest issues a TokenReview create call through the given client
// and returns only the resulting error. The tests in this file use it as
// their probe for whether a client is currently allowed to write.
func runWriteRequest(ctx context.Context, client *kubeclient.Client) error {
	review := &authenticationv1.TokenReview{
		Spec: authenticationv1.TokenReviewSpec{Token: "any-non-empty-value"},
	}

	_, err := client.Kubernetes.AuthenticationV1().TokenReviews().Create(ctx, review, metav1.CreateOptions{})

	return err
}
// runWriteRequests performs one write request per client, one after another,
// and returns each outcome (nil on success) keyed by client identity.
func runWriteRequests(ctx context.Context, clients map[string]*kubeclient.Client) map[string]error {
	results := make(map[string]error, len(clients))

	for identity := range clients {
		results[identity] = runWriteRequest(ctx, clients[identity])
	}

	return results
}
// pickCurrentLeaderClient waits until the lease is held by one of the given
// clients and returns the client registered under that holder identity.
func pickCurrentLeaderClient(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *kubeclient.Client {
	t.Helper()

	holder := waitForIdentity(ctx, t, namespace, leaseName, clients).Spec.HolderIdentity

	return clients[*holder]
}
// checkOnlyLeaderCanWrite waits for one of the clients to hold the lease and
// then repeatedly sends a write request through every client until exactly
// the holder succeeds and every other client fails with
// leaderelection.ErrNotLeader. It retries every 3 seconds for up to 3 minutes.
//
// The lease is fetched once, before the retry loop starts, and that same
// lease is returned.
func checkOnlyLeaderCanWrite(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease {
	t.Helper()

	lease := waitForIdentity(ctx, t, namespace, leaseName, clients)

	testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
		leaderCount, followerCount := 0, 0

		for identity, writeErr := range runWriteRequests(ctx, clients) {
			if identity != *lease.Spec.HolderIdentity {
				followerCount++
				requireEventually.Error(writeErr, "non leader client %q should have write error but it was nil", identity)
				requireEventually.ErrorIs(writeErr, leaderelection.ErrNotLeader, "non leader client %q should have write error: %v", identity, writeErr)
				continue
			}

			leaderCount++
			requireEventually.NoError(writeErr, "leader client %q should have no error", identity)
		}

		requireEventually.Equal(1, leaderCount, "did not see leader")
		requireEventually.Equal(len(clients)-1, followerCount, "did not see non-leader")
	}, 3*time.Minute, 3*time.Second)

	return lease
}
// forceTransition overwrites the lease's holder identity with a made-up
// identity that belongs to none of the clients, then waits for one of the
// real clients to hold the lease again. The update is retried on conflict.
//
// It asserts that the transition count and the acquire time both increased
// and returns the lease observed after the transition.
func forceTransition(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease {
	t.Helper()

	var (
		transitionsBefore int32
		acquiredBefore    metav1.MicroTime
	)

	// Each attempt re-reads the lease so that a retry after a conflict works
	// against fresh state and records fresh baseline values.
	stealLease := func() error {
		current := waitForIdentity(ctx, t, namespace, leaseName, clients)
		transitionsBefore = *current.Spec.LeaseTransitions
		acquiredBefore = *current.Spec.AcquireTime

		hijacked := current.DeepCopy()
		hijacked.Spec.HolderIdentity = ptr.To("some-other-client-" + rand.String(5))

		leader := pickCurrentLeaderClient(ctx, t, namespace, leaseName, clients)
		_, err := leader.Kubernetes.CoordinationV1().Leases(namespace.Name).Update(ctx, hijacked, metav1.UpdateOptions{})

		return err
	}

	require.NoError(t, retry.RetryOnConflict(retry.DefaultBackoff, stealLease))

	after := waitForIdentity(ctx, t, namespace, leaseName, clients)
	transitionsAfter := *after.Spec.LeaseTransitions
	acquiredAfter := *after.Spec.AcquireTime

	require.Greater(t, transitionsAfter, transitionsBefore)
	require.Greater(t, acquiredAfter.UnixNano(), acquiredBefore.UnixNano())

	return after
}
// forceRestart deletes the lease using the current leader's client and waits
// for one of the clients to hold a lease of that name again.
//
// It asserts that the new lease reports zero transitions and a later acquire
// time than the deleted one, and returns the new lease.
func forceRestart(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease {
	t.Helper()

	before := waitForIdentity(ctx, t, namespace, leaseName, clients)

	leader := pickCurrentLeaderClient(ctx, t, namespace, leaseName, clients)
	require.NoError(t, leader.Kubernetes.CoordinationV1().Leases(namespace.Name).Delete(ctx, leaseName, metav1.DeleteOptions{}))

	after := waitForIdentity(ctx, t, namespace, leaseName, clients)

	require.Zero(t, *after.Spec.LeaseTransitions)
	require.Greater(t, after.Spec.AcquireTime.UnixNano(), before.Spec.AcquireTime.UnixNano())

	return after
}
// logLease writes the lease to the test log as tab-indented JSON, failing the
// test if the lease cannot be marshalled.
func logLease(t *testing.T, lease *coordinationv1.Lease) {
	t.Helper()

	pretty, err := json.MarshalIndent(lease, "", "\t")
	require.NoError(t, err)

	t.Logf("current lease:\n%s", string(pretty))
}