From 5e06c6d5ad5730bd2845507a67b33e307b057ff7 Mon Sep 17 00:00:00 2001
From: Ryan Richard
Date: Mon, 25 Sep 2023 09:51:17 -0700
Subject: [PATCH] add integration test for graceful shutdowns which release leader leases

---
 test/integration/main_test.go         |  12 +-
 test/integration/pod_shutdown_test.go | 232 ++++++++++++++++++++++++++
 test/testlib/env.go                   |  45 +++--
 test/testlib/skip.go                  |   6 +-
 4 files changed, 275 insertions(+), 20 deletions(-)
 create mode 100644 test/integration/pod_shutdown_test.go

diff --git a/test/integration/main_test.go b/test/integration/main_test.go
index 6efc897b..5995ef0b 100644
--- a/test/integration/main_test.go
+++ b/test/integration/main_test.go
@@ -1,4 +1,4 @@
-// Copyright 2020-2021 the Pinniped contributors. All Rights Reserved.
+// Copyright 2020-2023 the Pinniped contributors. All Rights Reserved.
 // SPDX-License-Identifier: Apache-2.0
 
 package integration
@@ -65,8 +65,8 @@ func splitIntegrationTestsIntoBuckets(m *testing.M) {
 	serialTest := testing.InternalTest{
 		Name: "TestIntegrationSerial",
 		F: func(t *testing.T) {
-			_ = testlib.IntegrationEnv(t) // make sure these tests do not run during unit tests
-			t.Parallel()                  // outer test always runs in parallel for this bucket
+			testlib.SkipUnlessIntegration(t) // make sure these tests do not run during unit tests
+			t.Parallel()                     // outer test always runs in parallel for this bucket
 
 			for _, test := range serialTests {
 				test := test
@@ -80,8 +80,8 @@
 	parallelTest := testing.InternalTest{
 		Name: "TestIntegrationParallel",
 		F: func(t *testing.T) {
-			_ = testlib.IntegrationEnv(t) // make sure these tests do not run during unit tests
-			t.Parallel()                  // outer test always runs in parallel for this bucket
+			testlib.SkipUnlessIntegration(t) // make sure these tests do not run during unit tests
+			t.Parallel()                     // outer test always runs in parallel for this bucket
 
 			for _, test := range parallelTests {
 				test := test
@@ -97,7 +97,7 @@
 	disruptiveTest := testing.InternalTest{
 		Name: "TestIntegrationDisruptive",
 		F: func(t *testing.T) {
-			_ = testlib.IntegrationEnv(t) // make sure these tests do not run during unit tests
+			testlib.SkipUnlessIntegration(t) // make sure these tests do not run during unit tests
 			// outer test never runs in parallel for this bucket
 
 			for _, test := range disruptiveTests {
diff --git a/test/integration/pod_shutdown_test.go b/test/integration/pod_shutdown_test.go
new file mode 100644
index 00000000..77f08371
--- /dev/null
+++ b/test/integration/pod_shutdown_test.go
@@ -0,0 +1,232 @@
+// Copyright 2023 the Pinniped contributors. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package integration
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/utils/strings/slices"
+
+	"go.pinniped.dev/test/testlib"
+)
+
+// TestPodShutdown_Disruptive is intended to test that the Supervisor and Concierge pods can
+// perform a graceful shutdown. Most importantly, the leader pods should give up their leases
+// before they die.
+// Never run this test in parallel since deleting the pods is disruptive, see main_test.go.
+func TestPodShutdown_Disruptive(t *testing.T) {
+	// Only run this test in CI on Kind clusters, because something about restarting the pods
+	// in this test breaks the "kubectl port-forward" commands that we are using in CI for
+	// AKS, EKS, and GKE clusters. The Go code that we wrote for graceful pod shutdown should
+	// not be sensitive to which distribution it runs on, so running this test only on Kind
+	// should give us sufficient coverage for what we are trying to test here.
+	env := testlib.IntegrationEnv(t, testlib.SkipPodRestartAssertions()).
+		WithKubeDistribution(testlib.KindDistro)
+
+	testShutdownAllPodsOfApp(t, env, env.ConciergeNamespace, env.ConciergeAppName, "-kube-cert-agent-")
+	testShutdownAllPodsOfApp(t, env, env.SupervisorNamespace, env.SupervisorAppName, "")
+}
+
+func testShutdownAllPodsOfApp(t *testing.T, env *testlib.TestEnv, namespace string, appName string, ignorePodsWithNameSubstring string) {
+	// Precondition: the app should have some pods running initially.
+	initialPods := getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
+	require.Greater(t, len(initialPods), 0)
+
+	// Precondition: the leader election lease should contain the name of one of the initial pods as the lease's holder.
+	waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
+		func(holder string) bool { return holder != "" && slices.Contains(namesOfPods(initialPods), holder) }, 2*time.Minute)
+
+	// Start tailing the logs of all the pods in background goroutines. This struct will keep track
+	// of each background log tail.
+	type podLog struct {
+		pod        corev1.Pod    // which pod's logs are being tailed
+		tailDoneCh chan struct{} // this channel will be closed when it is safe to read from logsBuf
+		logsBuf    *bytes.Buffer // the text of the logs will be put in this buffer
+	}
+	podLogs := make([]*podLog, 0)
+	// Skip tailing pod logs for test runs that are using alternate group suffixes. There seems to be a bug in our
+	// kubeclient package which causes an "unable to find resp serialier" (sic) error for pod log API responses when
+	// the middleware is active. Since we do not tail pod logs in production code (or anywhere else at this time),
+	// we don't need to fix that bug right now just for this test.
+	if env.APIGroupSuffix == "pinniped.dev" {
+		// For each pod, start tailing its logs.
+		for _, pod := range initialPods {
+			tailDoneCh, logTailBuf := tailFollowPodLogs(t, pod)
+			podLogs = append(podLogs, &podLog{
+				pod:        pod,
+				tailDoneCh: tailDoneCh,
+				logsBuf:    logTailBuf,
+			})
+		}
+	}
+
+	// Scale down the deployment's number of replicas to 0, which will shut down all the pods.
+	originalScale := updateDeploymentScale(t, namespace, appName, 0)
+
+	// When the test is over, restore the deployment to the original scale.
+	t.Cleanup(func() {
+		updateDeploymentScale(t, namespace, appName, originalScale)
+
+		// Wait for all the new pods to be running.
+		var newPods []corev1.Pod
+		testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
+			newPods = getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
+			requireEventually.Len(newPods, originalScale, "wanted pods to return to original scale")
+		}, 2*time.Minute, 200*time.Millisecond)
+
+		// After a short time, leader election should have finished and the lease should contain the name of
+		// one of the new pods as the lease's holder.
+		waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
+			func(holder string) bool { return holder != "" && slices.Contains(namesOfPods(newPods), holder) }, 1*time.Minute)
+
+		t.Logf("new pod of Deployment %s/%s has acquired the leader election lease", namespace, appName)
+	})
+
+	// Double check: the deployment's previous scale should have equaled the actual number of running pods from
+	// the start of the test (before we scaled down).
+	require.Equal(t, len(initialPods), originalScale)
+
+	// Now that we have adjusted the scale to 0, the pods should go away.
+	// Our pods are intended to gracefully shut down within a few seconds, so fail unless it happens fairly quickly.
+	testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
+		pods := getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
+		requireEventually.Len(pods, 0, "wanted no pods but found some")
+	}, 20*time.Second, 200*time.Millisecond)
+
+	// Look for some interesting log messages in each of the now-dead pod's logs, if we started tailing them above.
+	for _, pl := range podLogs {
+		// Wait for the logs of the now-dead pod to be finished collecting.
+		t.Logf("waiting for tail of pod logs for pod %q", pl.pod.Name)
+		<-pl.tailDoneCh
+		// Assert that the Kubernetes generic apiserver library has started and finished a graceful
+		// shutdown according to its log messages. This is to make sure that the whole graceful shutdown
+		// process was performed successfully and without being blocked.
+		require.Containsf(t, pl.logsBuf.String(), `"[graceful-termination] shutdown event","name":"ShutdownInitiated"`,
+			"did not find expected message in pod log for pod %q", pl.pod.Name)
+		require.Containsf(t, pl.logsBuf.String(), `"[graceful-termination] apiserver is exiting\n"`,
+			"did not find expected message in pod log for pod %q", pl.pod.Name)
+		t.Logf("found expected graceful-termination messages in the logs of pod %q", pl.pod.Name)
+	}
+
+	// The leader election lease should already contain the empty string as the holder, because the old leader
+	// pod should have given up the lease during its graceful shutdown.
+	waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
+		func(holder string) bool { return holder == "" }, 200*time.Millisecond)
+}
+
+// Given a list of pods, return a list of their names.
+func namesOfPods(pods []corev1.Pod) []string {
+	names := make([]string, len(pods))
+	for i, pod := range pods {
+		names[i] = pod.Name
+	}
+	return names
+}
+
+func getRunningPodsByNamePrefix(
+	t *testing.T,
+	namespace string,
+	podNamePrefix string,
+	podNameExcludeSubstring string,
+) (foundPods []corev1.Pod) {
+	t.Helper()
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
+	defer cancel()
+	client := testlib.NewKubernetesClientset(t)
+
+	pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+	require.NoError(t, err)
+
+	for _, pod := range pods.Items {
+		if !strings.HasPrefix(pod.Name, podNamePrefix) {
+			continue
+		}
+		if podNameExcludeSubstring != "" && strings.Contains(pod.Name, podNameExcludeSubstring) {
+			continue
+		}
+		if pod.Status.Phase != corev1.PodRunning {
+			continue
+		}
+		foundPods = append(foundPods, pod)
+	}
+
+	return foundPods
+}
+
+func updateDeploymentScale(t *testing.T, namespace string, deploymentName string, newScale int) int {
+	t.Helper()
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
+	defer cancel()
+	client := testlib.NewKubernetesClientset(t)
+
+	initialScale, err := client.AppsV1().Deployments(namespace).GetScale(ctx, deploymentName, metav1.GetOptions{})
+	require.NoError(t, err)
+
+	desiredScale := initialScale.DeepCopy()
+	desiredScale.Spec.Replicas = int32(newScale)
+	updatedScale, err := client.AppsV1().Deployments(namespace).UpdateScale(ctx, deploymentName, desiredScale, metav1.UpdateOptions{})
+	require.NoError(t, err)
+	t.Logf("updated scale of Deployment %s/%s from %d to %d",
+		namespace, deploymentName, initialScale.Spec.Replicas, updatedScale.Spec.Replicas)
+
+	return int(initialScale.Spec.Replicas)
+}
+
+func tailFollowPodLogs(t *testing.T, pod corev1.Pod) (chan struct{}, *bytes.Buffer) {
+	t.Helper()
+	done := make(chan struct{})
+	var buf bytes.Buffer
+	client := testlib.NewKubernetesClientset(t)
+
+	go func() {
+		// At the end of this block, signal that we are done writing to the returned buf,
+		// so it is now safe to read the logs from the returned buf.
+		defer close(done)
+
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+		defer cancel()
+
+		req := client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{
+			Follow: true, // keep streaming until completion
+		})
+
+		// This line should block until the pod dies or the context expires.
+		body, err := req.Stream(ctx)
+		require.NoError(t, err)
+
+		_, err = io.Copy(&buf, body)
+		require.NoError(t, err)
+
+		require.NoError(t, body.Close())
+	}()
+
+	return done, &buf
+}
+
+func waitForLeaderElectionLeaseToHaveHolderIdentity(
+	t *testing.T,
+	namespace string,
+	leaseName string,
+	holderIdentityPredicate func(string) bool,
+	waitDuration time.Duration,
+) {
+	t.Helper()
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	defer cancel()
+	client := testlib.NewKubernetesClientset(t)
+
+	testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
+		lease, err := client.CoordinationV1().Leases(namespace).Get(ctx, leaseName, metav1.GetOptions{})
+		requireEventually.NoError(err)
+		requireEventually.True(holderIdentityPredicate(*lease.Spec.HolderIdentity))
+	}, waitDuration, 200*time.Millisecond)
+}
diff --git a/test/testlib/env.go b/test/testlib/env.go
index 387d8a08..ecbb432e 100644
--- a/test/testlib/env.go
+++ b/test/testlib/env.go
@@ -37,6 +37,8 @@ const (
 type TestEnv struct {
 	t *testing.T
 
+	skipPodRestartAssertions bool
+
 	ToolsNamespace            string `json:"toolsNamespace"`
 	ConciergeNamespace        string `json:"conciergeNamespace"`
 	SupervisorNamespace       string `json:"supervisorNamespace"`
@@ -120,15 +122,28 @@ func (e *TestEnv) ProxyEnv() []string {
 // environment parsing N times per test and so that any implicit assertions happen only once.
 var memoizedTestEnvsByTest sync.Map //nolint:gochecknoglobals
 
+type TestEnvOption func(env *TestEnv)
+
+// SkipPodRestartAssertions is a functional option that can be passed to IntegrationEnv()
+// to skip using the implicit assertions which check that no pods get restarted during tests.
+// Please use this sparingly, since most pod restarts are caused by unintentional crashes
+// and should therefore cause tests to fail.
+func SkipPodRestartAssertions() TestEnvOption {
+	return func(t *TestEnv) {
+		t.skipPodRestartAssertions = true
+		t.t.Log("skipping pod restart assertions for test", t.t.Name())
+	}
+}
+
 // IntegrationEnv gets the integration test environment from OS environment variables. This
 // method also implies SkipUnlessIntegration().
-func IntegrationEnv(t *testing.T) *TestEnv {
+func IntegrationEnv(t *testing.T, opts ...TestEnvOption) *TestEnv {
 	if existing, exists := memoizedTestEnvsByTest.Load(t); exists {
 		return existing.(*TestEnv)
 	}
 
 	t.Helper()
-	skipUnlessIntegration(t)
+	SkipUnlessIntegration(t)
 
 	capabilitiesDescriptionYAML := os.Getenv("PINNIPED_TEST_CLUSTER_CAPABILITY_YAML")
 	capabilitiesDescriptionFile := os.Getenv("PINNIPED_TEST_CLUSTER_CAPABILITY_FILE")
@@ -142,18 +157,26 @@ func IntegrationEnv(t *testing.T) *TestEnv {
 		require.NoError(t, err)
 	}
 
-	var result TestEnv
-	err := yaml.Unmarshal([]byte(capabilitiesDescriptionYAML), &result)
+	var testEnv TestEnv
+	err := yaml.Unmarshal([]byte(capabilitiesDescriptionYAML), &testEnv)
 	require.NoErrorf(t, err, "capabilities specification was invalid YAML")
 
-	loadEnvVars(t, &result)
-	result.t = t
-	memoizedTestEnvsByTest.Store(t, &result)
+	loadEnvVars(t, &testEnv)
+	testEnv.t = t
 
-	// In every integration test, assert that no pods in our namespaces restart during the test.
-	assertNoRestartsDuringTest(t, result.ConciergeNamespace, "!pinniped.dev/test")
-	assertNoRestartsDuringTest(t, result.SupervisorNamespace, "!pinniped.dev/test")
-	return &result
+	for _, opt := range opts {
+		opt(&testEnv)
+	}
+
+	memoizedTestEnvsByTest.Store(t, &testEnv)
+
+	// By default, in every integration test, assert that no pods in our namespaces restart during the test.
+	if !testEnv.skipPodRestartAssertions {
+		assertNoRestartsDuringTest(t, testEnv.ConciergeNamespace, "!pinniped.dev/test")
+		assertNoRestartsDuringTest(t, testEnv.SupervisorNamespace, "!pinniped.dev/test")
+	}
+
+	return &testEnv
 }
 
 func needEnv(t *testing.T, key string) string {
diff --git a/test/testlib/skip.go b/test/testlib/skip.go
index 8a382b18..6fbc621f 100644
--- a/test/testlib/skip.go
+++ b/test/testlib/skip.go
@@ -1,12 +1,12 @@
-// Copyright 2020-2022 the Pinniped contributors. All Rights Reserved.
+// Copyright 2020-2023 the Pinniped contributors. All Rights Reserved.
 // SPDX-License-Identifier: Apache-2.0
 
 package testlib
 
 import "testing"
 
-// skipUnlessIntegration skips the current test if `-short` has been passed to `go test`.
-func skipUnlessIntegration(t *testing.T) {
+// SkipUnlessIntegration skips the current test if `-short` has been passed to `go test`.
+func SkipUnlessIntegration(t *testing.T) {
 	t.Helper()
 	if testing.Short() {