Merge pull request #1688 from vmware-tanzu/fix_shutdown_deadlock
Fix deadlock during shutdown which prevented leader election cleanup
Commit 58c5146592
@@ -107,39 +107,45 @@ func (c completedConfig) New() (*PinnipedServer, error) {
 		return nil, fmt.Errorf("could not install API groups: %w", err)
 	}
 
-	shutdown := &sync.WaitGroup{}
+	controllersShutdownWaitGroup := &sync.WaitGroup{}
+	controllersCtx, cancelControllerCtx := context.WithCancel(context.Background())
 
 	s.GenericAPIServer.AddPostStartHookOrDie("start-controllers",
 		func(postStartContext genericapiserver.PostStartHookContext) error {
 			plog.Debug("start-controllers post start hook starting")
+			defer plog.Debug("start-controllers post start hook completed")
 
-			ctx, cancel := context.WithCancel(context.Background())
-			go func() {
-				defer cancel()
-
-				<-postStartContext.StopCh
-			}()
-
-			runControllers, err := c.ExtraConfig.BuildControllersPostStartHook(ctx)
+			runControllers, err := c.ExtraConfig.BuildControllersPostStartHook(controllersCtx)
 			if err != nil {
 				return fmt.Errorf("cannot create run controller func: %w", err)
 			}
 
-			shutdown.Add(1)
+			controllersShutdownWaitGroup.Add(1)
 			go func() {
-				defer shutdown.Done()
+				// When this goroutine ends, then also end the WaitGroup, allowing anyone who called Wait() to proceed.
+				defer controllersShutdownWaitGroup.Done()
 
-				runControllers(ctx)
+				// Start the controllers and block until their context is cancelled and they have shut down.
+				runControllers(controllersCtx)
+				plog.Debug("start-controllers post start hook's background goroutine saw runControllers() finish")
 			}()
 
 			return nil
 		},
 	)
 
 	s.GenericAPIServer.AddPreShutdownHookOrDie("stop-controllers",
 		func() error {
 			plog.Debug("stop-controllers pre shutdown hook starting")
 			defer plog.Debug("stop-controllers pre shutdown hook completed")
 
-			shutdown.Wait()
+			// The generic api server is telling us that it wants to shut down, so tell our controllers that we
+			// want them to shut down by cancelling their context.
+			cancelControllerCtx()
+
+			// Now wait for the controllers to finish shutting down. By blocking here, we prevent the generic api server's
+			// graceful shutdown process from continuing until we are finished shutting down our own controllers.
+			controllersShutdownWaitGroup.Wait()
 
 			return nil
 		},
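The hunk above is the core of the fix: the controllers now run on a context created alongside the server, and the pre-shutdown hook cancels that context first and only then waits on the WaitGroup, so the wait can finish and the server's graceful shutdown (including giving up the leader election lease) can proceed. The standalone sketch below models only that cancel-then-wait ordering with plain goroutines; it is not Pinniped's actual server wiring, and the genericapiserver hook registration shown in the diff is the real mechanism.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// runControllers stands in for the controller manager: it blocks until its context
// is cancelled, then returns once its own cleanup (e.g. releasing a leader election
// lease) has finished.
func runControllers(ctx context.Context) {
    <-ctx.Done()
    time.Sleep(100 * time.Millisecond) // pretend to do some shutdown work
    fmt.Println("controllers finished shutting down")
}

func main() {
    controllersShutdownWaitGroup := &sync.WaitGroup{}
    controllersCtx, cancelControllersCtx := context.WithCancel(context.Background())

    // Post-start hook equivalent: run the controllers in the background and track
    // them with the WaitGroup.
    controllersShutdownWaitGroup.Add(1)
    go func() {
        defer controllersShutdownWaitGroup.Done()
        runControllers(controllersCtx)
    }()

    // Pre-shutdown hook equivalent: first tell the controllers to stop, then wait
    // for them. Waiting here without anything ever cancelling controllersCtx is the
    // shape of the deadlock that this pull request removes.
    cancelControllersCtx()
    controllersShutdownWaitGroup.Wait()
    fmt.Println("controllers are done; the API server may continue its graceful shutdown")
}

In the old code the pre-shutdown hook only waited, while the controllers' context was cancelled by a later step of the generic API server's shutdown (the closing of postStartContext.StopCh), which is the circular wait described in the commit message.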
@@ -100,6 +100,7 @@ func addCommandlineFlagsToCommand(cmd *cobra.Command, app *App) {
 }
 
 // Boot the aggregated API server, which will in turn boot the controllers.
+// In practice, the ctx passed in should be one which will be cancelled when the process receives SIGTERM or SIGINT.
 func (a *App) runServer(ctx context.Context) error {
 	// Read the server config file.
 	cfg, err := concierge.FromPath(ctx, a.configPath)
@@ -186,7 +187,9 @@ func (a *App) runServer(ctx context.Context) error {
 		return fmt.Errorf("could not create aggregated API server: %w", err)
 	}
 
-	// Run the server. Its post-start hook will start the controllers.
+	// Run the server. Its post-start hook will start the controllers. Its pre shutdown hook will be called when ctx is
+	// cancelled, and that hook should graceful stop the controllers and give up the leader election lease. See the
+	// code for these hooks in internal/concierge/apiserver.go.
 	return server.GenericAPIServer.PrepareRun().Run(ctx.Done())
 }
 
@@ -276,8 +279,16 @@ func main() error {
 		"time-since-build", timeSinceCompile,
 	)
 
+	// This context will be cancelled upon the first SIGTERM or SIGINT, and will os.Exit() to kill the process
+	// upon the second SIGTERM or SIGINT.
 	ctx := genericapiserver.SetupSignalContext()
 
+	// Just for debugging purposes, log when the first signal is received.
+	go func() {
+		<-ctx.Done() // wait for the Done channel to be closed, indicating that ctx was cancelled by the signal handler
+		plog.Debug("concierge shutdown initiated due to process receiving SIGTERM or SIGINT")
+	}()
+
 	return New(ctx, os.Args[1:], os.Stdout, os.Stderr).Run()
 }
 
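For orientation, genericapiserver.SetupSignalContext used above returns a context that is cancelled on the first SIGTERM or SIGINT and kills the process on the second. A rough standard-library analogue is sketched below using signal.NotifyContext; it is an approximation only (it does not hard-exit on a second signal), and the printed message simply mirrors the debug log added in the hunk.

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    // Cancel ctx on the first SIGTERM or SIGINT. Unlike genericapiserver.SetupSignalContext,
    // this standard-library version does not force the process to exit on a second signal.
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()

    // Purely for debugging visibility, log when the first signal arrives.
    go func() {
        <-ctx.Done()
        fmt.Println("shutdown initiated by SIGTERM or SIGINT")
    }()

    // ... run the server here, passing ctx down so everything unwinds when it is cancelled ...
    <-ctx.Done()
}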
@@ -1,4 +1,4 @@
-// Copyright 2020 the Pinniped contributors. All Rights Reserved.
+// Copyright 2020-2023 the Pinniped contributors. All Rights Reserved.
 // SPDX-License-Identifier: Apache-2.0
 
 package controllerlib
@@ -102,6 +102,7 @@ func (c *controller) Run(ctx context.Context, workers int) {
 	workerContext, workerContextCancel := context.WithCancel(context.Background())
 
 	defer func() {
+		plog.Debug("starting to shut down controller workers", "controller", c.Name(), "workers", workers)
 		c.queue.ShutDown()    // shutdown the controller queue first
 		workerContextCancel() // cancel the worker context, which tell workers to initiate shutdown
 
@@ -126,7 +127,9 @@ func (c *controller) Run(ctx context.Context, workers int) {
 		}()
 	}
 
+	plog.Debug("controller started", "controller", c.Name(), "workers", workers)
 	<-ctx.Done() // wait for controller context to be cancelled
+	plog.Debug("controller context cancelled, next will terminate workers", "controller", c.Name(), "workers", workers)
 }
 
 func (c *controller) invokeAllRunOpts() {
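The controllerlib hunks above only add debug log lines, but they mark out the shutdown ordering that matters here: once the controller's context is cancelled, shut down the work queue, cancel the workers' context, and wait for every worker to return before Run itself returns. The sketch below is a simplified model of that ordering using a plain channel in place of the controller's queue; it is illustrative only and not the controllerlib implementation.

package main

import (
    "context"
    "fmt"
    "sync"
)

// run models the shutdown sequence: stop feeding work, tell workers to stop,
// then wait until they have all drained and returned.
func run(ctx context.Context, workers int) {
    queue := make(chan int) // stands in for the controller's work queue
    workerCtx, cancelWorkers := context.WithCancel(context.Background())
    var wg sync.WaitGroup

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for {
                select {
                case <-workerCtx.Done():
                    return
                case item, ok := <-queue:
                    if !ok {
                        return
                    }
                    fmt.Printf("worker %d processed item %d\n", id, item)
                }
            }
        }(i)
    }

    <-ctx.Done() // wait for the controller's context to be cancelled

    fmt.Println("starting to shut down controller workers")
    close(queue)    // shut down the queue first
    cancelWorkers() // then tell workers to initiate shutdown
    wg.Wait()       // and block until every worker has returned
    fmt.Println("all workers finished")
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel() // cancel immediately so the example terminates right away
    run(ctx, 2)
}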
@ -98,6 +98,7 @@ func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubec
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
controllers(ctx) // run the controllers with the global context, this blocks until the context is canceled
|
controllers(ctx) // run the controllers with the global context, this blocks until the context is canceled
|
||||||
|
plog.Debug("leader election saw controllers have stopped")
|
||||||
|
|
||||||
if isLeader.stop() { // remove our in-memory leader status before we release the lock
|
if isLeader.stop() { // remove our in-memory leader status before we release the lock
|
||||||
plog.Debug("leader lost", "identity", identity, "reason", "controller stop")
|
plog.Debug("leader lost", "identity", identity, "reason", "controller stop")
|
||||||
|
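The leaderelection hunk above logs the moment when the controllers have stopped, just before the in-memory leader status is dropped and the lock is released. Pinniped wraps client-go's leader election in its own internal/leaderelection package; purely for orientation, a bare client-go configuration that gives up its Lease when the context is cancelled looks roughly like the sketch below. The durations and names are placeholders, and this sketch does not reproduce the extra ordering guarantees shown in the hunk.

package main

import (
    "context"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runWithLeaderElection blocks until ctx is cancelled. With ReleaseOnCancel set,
// client-go clears the Lease's holderIdentity on the way out, which is the
// behavior the new integration test later in this diff asserts on.
func runWithLeaderElection(ctx context.Context, client kubernetes.Interface, namespace, leaseName, identity string, runControllers func(context.Context)) {
    lock := &resourcelock.LeaseLock{
        LeaseMeta:  metav1.ObjectMeta{Namespace: namespace, Name: leaseName},
        Client:     client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{Identity: identity},
    }

    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock:            lock,
        LeaseDuration:   15 * time.Second, // placeholder durations, not Pinniped's settings
        RenewDeadline:   10 * time.Second,
        RetryPeriod:     2 * time.Second,
        ReleaseOnCancel: true, // give up the lease when ctx is cancelled instead of letting it expire
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: runControllers, // invoked in a goroutine after the lease is acquired
            OnStoppedLeading: func() {},      // invoked when leadership is lost or released
        },
    })
}

func main() {
    _ = runWithLeaderElection // sketch only; client construction and wiring are omitted
}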
@@ -113,39 +113,45 @@ func (c completedConfig) New() (*PinnipedServer, error) {
 		return nil, fmt.Errorf("could not install API groups: %w", err)
 	}
 
-	shutdown := &sync.WaitGroup{}
+	controllersShutdownWaitGroup := &sync.WaitGroup{}
+	controllersCtx, cancelControllerCtx := context.WithCancel(context.Background())
 
 	s.GenericAPIServer.AddPostStartHookOrDie("start-controllers",
 		func(postStartContext genericapiserver.PostStartHookContext) error {
 			plog.Debug("start-controllers post start hook starting")
+			defer plog.Debug("start-controllers post start hook completed")
 
-			ctx, cancel := context.WithCancel(context.Background())
-			go func() {
-				defer cancel()
-
-				<-postStartContext.StopCh
-			}()
-
-			runControllers, err := c.ExtraConfig.BuildControllersPostStartHook(ctx)
+			runControllers, err := c.ExtraConfig.BuildControllersPostStartHook(controllersCtx)
 			if err != nil {
 				return fmt.Errorf("cannot create run controller func: %w", err)
 			}
 
-			shutdown.Add(1)
+			controllersShutdownWaitGroup.Add(1)
 			go func() {
-				defer shutdown.Done()
+				// When this goroutine ends, then also end the WaitGroup, allowing anyone who called Wait() to proceed.
+				defer controllersShutdownWaitGroup.Done()
 
-				runControllers(ctx)
+				// Start the controllers and block until their context is cancelled and they have shut down.
+				runControllers(controllersCtx)
+				plog.Debug("start-controllers post start hook's background goroutine saw runControllers() finish")
 			}()
 
 			return nil
 		},
 	)
 
 	s.GenericAPIServer.AddPreShutdownHookOrDie("stop-controllers",
 		func() error {
 			plog.Debug("stop-controllers pre shutdown hook starting")
 			defer plog.Debug("stop-controllers pre shutdown hook completed")
 
-			shutdown.Wait()
+			// The generic api server is telling us that it wants to shut down, so tell our controllers that we
+			// want them to shut down by cancelling their context.
+			cancelControllerCtx()
+
+			// Now wait for the controllers to finish shutting down. By blocking here, we prevent the generic api server's
+			// graceful shutdown process from continuing until we are finished shutting down our own controllers.
+			controllersShutdownWaitGroup.Wait()
 
 			return nil
 		},
@@ -382,8 +382,10 @@ func prepareControllers(
 	return controllerinit.Prepare(controllerManager.Start, leaderElector, kubeInformers, pinnipedInformers)
 }
 
-//nolint:funlen
-func runSupervisor(ctx context.Context, podInfo *downward.PodInfo, cfg *supervisor.Config) error {
+// Boot the aggregated API server, which will in turn boot the controllers. Also open the appropriate network ports
+// and start serving the health endpoint and the endpoints of the configured FederationDomains.
+// In practice, the ctx passed in should be one which will be cancelled when the process receives SIGTERM or SIGINT.
+func runSupervisor(ctx context.Context, podInfo *downward.PodInfo, cfg *supervisor.Config) error { //nolint:funlen
 	serverInstallationNamespace := podInfo.Namespace
 	clientSecretSupervisorGroupData := groupsuffix.SupervisorAggregatedGroups(*cfg.APIGroupSuffix)
 
@@ -575,7 +577,9 @@ func runSupervisor(ctx context.Context, podInfo *downward.PodInfo, cfg *supervis
 	plog.Debug("supervisor started")
 	defer plog.Debug("supervisor exiting")
 
-	// Run the server. Its post-start hook will start the controllers.
+	// Run the server. Its post-start hook will start the controllers. Its pre shutdown hook will be called when ctx is
+	// cancelled, and that hook should graceful stop the controllers and give up the leader election lease. See the
+	// code for these hooks in internal/supervisor/apiserver.go.
 	err = server.GenericAPIServer.PrepareRun().Run(ctx.Done())
 	if err != nil {
 		return err
@@ -1,4 +1,4 @@
-// Copyright 2020-2021 the Pinniped contributors. All Rights Reserved.
+// Copyright 2020-2023 the Pinniped contributors. All Rights Reserved.
 // SPDX-License-Identifier: Apache-2.0
 
 package integration
@@ -65,8 +65,8 @@ func splitIntegrationTestsIntoBuckets(m *testing.M) {
 	serialTest := testing.InternalTest{
 		Name: "TestIntegrationSerial",
 		F: func(t *testing.T) {
-			_ = testlib.IntegrationEnv(t) // make sure these tests do not run during unit tests
+			testlib.SkipUnlessIntegration(t) // make sure these tests do not run during unit tests
 			t.Parallel() // outer test always runs in parallel for this bucket
 
 			for _, test := range serialTests {
 				test := test
@@ -80,8 +80,8 @@ func splitIntegrationTestsIntoBuckets(m *testing.M) {
 	parallelTest := testing.InternalTest{
 		Name: "TestIntegrationParallel",
 		F: func(t *testing.T) {
-			_ = testlib.IntegrationEnv(t) // make sure these tests do not run during unit tests
+			testlib.SkipUnlessIntegration(t) // make sure these tests do not run during unit tests
 			t.Parallel() // outer test always runs in parallel for this bucket
 
 			for _, test := range parallelTests {
 				test := test
@@ -97,7 +97,7 @@ func splitIntegrationTestsIntoBuckets(m *testing.M) {
 	disruptiveTest := testing.InternalTest{
 		Name: "TestIntegrationDisruptive",
 		F: func(t *testing.T) {
-			_ = testlib.IntegrationEnv(t) // make sure these tests do not run during unit tests
+			testlib.SkipUnlessIntegration(t) // make sure these tests do not run during unit tests
 			// outer test never runs in parallel for this bucket
 
 			for _, test := range disruptiveTests {
test/integration/pod_shutdown_test.go (new file, 232 lines)
@@ -0,0 +1,232 @@
+// Copyright 2023 the Pinniped contributors. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package integration
+
+import (
+    "bytes"
+    "context"
+    "io"
+    "strings"
+    "testing"
+    "time"
+
+    "github.com/stretchr/testify/require"
+    corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/utils/strings/slices"
+
+    "go.pinniped.dev/test/testlib"
+)
+
+// TestPodShutdown_Disruptive is intended to test that the Supervisor and Concierge pods can
+// perform a graceful shutdown. Most importantly, the leader pods should give up their leases
+// before they die.
+// Never run this test in parallel since deleting the pods is disruptive, see main_test.go.
+func TestPodShutdown_Disruptive(t *testing.T) {
+    // Only run this test in CI on Kind clusters, because something about restarting the pods
+    // in this test breaks the "kubectl port-forward" commands that we are using in CI for
+    // AKS, EKS, and GKE clusters. The Go code that we wrote for graceful pod shutdown should
+    // not be sensitive to which distribution it runs on, so running this test only on Kind
+    // should give us sufficient coverage for what we are trying to test here.
+    env := testlib.IntegrationEnv(t, testlib.SkipPodRestartAssertions()).
+        WithKubeDistribution(testlib.KindDistro)
+
+    testShutdownAllPodsOfApp(t, env, env.ConciergeNamespace, env.ConciergeAppName, "-kube-cert-agent-")
+    testShutdownAllPodsOfApp(t, env, env.SupervisorNamespace, env.SupervisorAppName, "")
+}
+
+func testShutdownAllPodsOfApp(t *testing.T, env *testlib.TestEnv, namespace string, appName string, ignorePodsWithNameSubstring string) {
+    // Precondition: the app should have some pods running initially.
+    initialPods := getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
+    require.Greater(t, len(initialPods), 0)
+
+    // Precondition: the leader election lease should contain the name of one of the initial pods as the lease's holder.
+    waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
+        func(holder string) bool { return holder != "" && slices.Contains(namesOfPods(initialPods), holder) }, 2*time.Minute)
+
+    // Start tailing the logs of all the pods in background goroutines. This struct will keep track
+    // of each background log tail.
+    type podLog struct {
+        pod        corev1.Pod    // which pod's logs are being tailed
+        tailDoneCh chan struct{} // this channel will be closed when it is safe to read from logsBuf
+        logsBuf    *bytes.Buffer // the text of the logs will be put in this buffer
+    }
+    podLogs := make([]*podLog, 0)
+    // Skip tailing pod logs for test runs that are using alternate group suffixes. There seems to be a bug in our
+    // kubeclient package which causes an "unable to find resp serialier" (sic) error for pod log API responses when
+    // the middleware is active. Since we do not tail pod logs in production code (or anywhere else at this time),
+    // we don't need to fix that bug right now just for this test.
+    if env.APIGroupSuffix == "pinniped.dev" {
+        // For each pod, start tailing its logs.
+        for _, pod := range initialPods {
+            tailDoneCh, logTailBuf := tailFollowPodLogs(t, pod)
+            podLogs = append(podLogs, &podLog{
+                pod:        pod,
+                tailDoneCh: tailDoneCh,
+                logsBuf:    logTailBuf,
+            })
+        }
+    }
+
+    // Scale down the deployment's number of replicas to 0, which will shut down all the pods.
+    originalScale := updateDeploymentScale(t, namespace, appName, 0)
+
+    // When the test is over, restore the deployment to the original scale.
+    t.Cleanup(func() {
+        updateDeploymentScale(t, namespace, appName, originalScale)
+
+        // Wait for all the new pods to be running.
+        var newPods []corev1.Pod
+        testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
+            newPods = getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
+            requireEventually.Len(newPods, originalScale, "wanted pods to return to original scale")
+        }, 2*time.Minute, 200*time.Millisecond)
+
+        // After a short time, leader election should have finished and the lease should contain the name of
+        // one of the new pods as the lease's holder.
+        waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
+            func(holder string) bool { return holder != "" && slices.Contains(namesOfPods(newPods), holder) }, 1*time.Minute)
+
+        t.Logf("new pod of Deployment %s/%s has acquired the leader election lease", namespace, appName)
+    })
+
+    // Double check: the deployment's previous scale should have equaled the actual number of running pods from
+    // the start of the test (before we scaled down).
+    require.Equal(t, len(initialPods), originalScale)
+
+    // Now that we have adjusted the scale to 0, the pods should go away.
+    // Our pods are intended to gracefully shut down within a few seconds, so fail unless it happens fairly quickly.
+    testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
+        pods := getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
+        requireEventually.Len(pods, 0, "wanted no pods but found some")
+    }, 20*time.Second, 200*time.Millisecond)
+
+    // Look for some interesting log messages in each of the now-dead pod's logs, if we started tailing them above.
+    for _, pl := range podLogs {
+        // Wait for the logs of the now-dead pod to be finished collecting.
+        t.Logf("waiting for tail of pod logs for pod %q", pl.pod.Name)
+        <-pl.tailDoneCh
+        // Assert that the Kubernetes generic apiserver library has started and finished a graceful
+        // shutdown according to its log messages. This is to make sure that the whole graceful shutdown
+        // process was performed successfully and without being blocked.
+        require.Containsf(t, pl.logsBuf.String(), `"[graceful-termination] shutdown event","name":"ShutdownInitiated"`,
+            "did not find expected message in pod log for pod %q", pl.pod.Name)
+        require.Containsf(t, pl.logsBuf.String(), `"[graceful-termination] apiserver is exiting\n"`,
+            "did not find expected message in pod log for pod %q", pl.pod.Name)
+        t.Logf("found expected graceful-termination messages in the logs of pod %q", pl.pod.Name)
+    }
+
+    // The leader election lease should already contain the empty string as the holder, because the old leader
+    // pod should have given up the lease during its graceful shutdown.
+    waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
+        func(holder string) bool { return holder == "" }, 200*time.Millisecond)
+}
+
+// Given a list of pods, return a list of their names.
+func namesOfPods(pods []corev1.Pod) []string {
+    names := make([]string, len(pods))
+    for i, pod := range pods {
+        names[i] = pod.Name
+    }
+    return names
+}
+
+func getRunningPodsByNamePrefix(
+    t *testing.T,
+    namespace string,
+    podNamePrefix string,
+    podNameExcludeSubstring string,
+) (foundPods []corev1.Pod) {
+    t.Helper()
+    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
+    defer cancel()
+    client := testlib.NewKubernetesClientset(t)
+
+    pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+    require.NoError(t, err)
+
+    for _, pod := range pods.Items {
+        if !strings.HasPrefix(pod.Name, podNamePrefix) {
+            continue
+        }
+        if podNameExcludeSubstring != "" && strings.Contains(pod.Name, podNameExcludeSubstring) {
+            continue
+        }
+        if pod.Status.Phase != corev1.PodRunning {
+            continue
+        }
+        foundPods = append(foundPods, pod)
+    }
+
+    return foundPods
+}
+
+func updateDeploymentScale(t *testing.T, namespace string, deploymentName string, newScale int) int {
+    t.Helper()
+    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
+    defer cancel()
+    client := testlib.NewKubernetesClientset(t)
+
+    initialScale, err := client.AppsV1().Deployments(namespace).GetScale(ctx, deploymentName, metav1.GetOptions{})
+    require.NoError(t, err)
+
+    desiredScale := initialScale.DeepCopy()
+    desiredScale.Spec.Replicas = int32(newScale)
+    updatedScale, err := client.AppsV1().Deployments(namespace).UpdateScale(ctx, deploymentName, desiredScale, metav1.UpdateOptions{})
+    require.NoError(t, err)
+    t.Logf("updated scale of Deployment %s/%s from %d to %d",
+        namespace, deploymentName, initialScale.Spec.Replicas, updatedScale.Spec.Replicas)
+
+    return int(initialScale.Spec.Replicas)
+}
+
+func tailFollowPodLogs(t *testing.T, pod corev1.Pod) (chan struct{}, *bytes.Buffer) {
+    t.Helper()
+    done := make(chan struct{})
+    var buf bytes.Buffer
+    client := testlib.NewKubernetesClientset(t)
+
+    go func() {
+        // At the end of this block, signal that we are done writing to the returned buf,
+        // so it is now safe to read the logs from the returned buf.
+        defer close(done)
+
+        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+        defer cancel()
+
+        req := client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{
+            Follow: true, // keep streaming until completion
+        })
+
+        // This line should block until the pod dies or the context expires.
+        body, err := req.Stream(ctx)
+        require.NoError(t, err)
+
+        _, err = io.Copy(&buf, body)
+        require.NoError(t, err)
+
+        require.NoError(t, body.Close())
+    }()
+
+    return done, &buf
+}
+
+func waitForLeaderElectionLeaseToHaveHolderIdentity(
+    t *testing.T,
+    namespace string,
+    leaseName string,
+    holderIdentityPredicate func(string) bool,
+    waitDuration time.Duration,
+) {
+    t.Helper()
+    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+    defer cancel()
+    client := testlib.NewKubernetesClientset(t)
+
+    testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
+        lease, err := client.CoordinationV1().Leases(namespace).Get(ctx, leaseName, metav1.GetOptions{})
+        requireEventually.NoError(err)
+        requireEventually.True(holderIdentityPredicate(*lease.Spec.HolderIdentity))
+    }, waitDuration, 200*time.Millisecond)
+}
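As a manual spot-check of what the test above automates, the same Lease can be read directly from the cluster. Assuming the lease is named after the app, as the helper above expects, a command along the lines of kubectl get lease <app-name> -n <namespace> -o jsonpath='{.spec.holderIdentity}' should print the leader pod's name while the app is running and an empty string shortly after the pods have shut down gracefully.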
@@ -37,6 +37,8 @@ const (
 type TestEnv struct {
 	t *testing.T
 
+	skipPodRestartAssertions bool
+
 	ToolsNamespace      string `json:"toolsNamespace"`
 	ConciergeNamespace  string `json:"conciergeNamespace"`
 	SupervisorNamespace string `json:"supervisorNamespace"`
@@ -120,15 +122,28 @@ func (e *TestEnv) ProxyEnv() []string {
 // environment parsing N times per test and so that any implicit assertions happen only once.
 var memoizedTestEnvsByTest sync.Map //nolint:gochecknoglobals
 
+type TestEnvOption func(env *TestEnv)
+
+// SkipPodRestartAssertions is a functional option that can be passed to IntegrationEnv()
+// to skip using the implicit assertions which check that no pods get restarted during tests.
+// Please using this sparingly, since most pod restarts are caused by unintentional crashes
+// and should therefore cause tests to fail.
+func SkipPodRestartAssertions() TestEnvOption {
+	return func(t *TestEnv) {
+		t.skipPodRestartAssertions = true
+		t.t.Log("skipping pod restart assertions for test", t.t.Name())
+	}
+}
+
 // IntegrationEnv gets the integration test environment from OS environment variables. This
 // method also implies SkipUnlessIntegration().
-func IntegrationEnv(t *testing.T) *TestEnv {
+func IntegrationEnv(t *testing.T, opts ...TestEnvOption) *TestEnv {
 	if existing, exists := memoizedTestEnvsByTest.Load(t); exists {
 		return existing.(*TestEnv)
 	}
 
 	t.Helper()
-	skipUnlessIntegration(t)
+	SkipUnlessIntegration(t)
 
 	capabilitiesDescriptionYAML := os.Getenv("PINNIPED_TEST_CLUSTER_CAPABILITY_YAML")
 	capabilitiesDescriptionFile := os.Getenv("PINNIPED_TEST_CLUSTER_CAPABILITY_FILE")
@@ -142,18 +157,26 @@ func IntegrationEnv(t *testing.T) *TestEnv {
 		require.NoError(t, err)
 	}
 
-	var result TestEnv
-	err := yaml.Unmarshal([]byte(capabilitiesDescriptionYAML), &result)
+	var testEnv TestEnv
+	err := yaml.Unmarshal([]byte(capabilitiesDescriptionYAML), &testEnv)
 	require.NoErrorf(t, err, "capabilities specification was invalid YAML")
 
-	loadEnvVars(t, &result)
-	result.t = t
-	memoizedTestEnvsByTest.Store(t, &result)
+	loadEnvVars(t, &testEnv)
+	testEnv.t = t
 
-	// In every integration test, assert that no pods in our namespaces restart during the test.
-	assertNoRestartsDuringTest(t, result.ConciergeNamespace, "!pinniped.dev/test")
-	assertNoRestartsDuringTest(t, result.SupervisorNamespace, "!pinniped.dev/test")
-	return &result
+	for _, opt := range opts {
+		opt(&testEnv)
+	}
+
+	memoizedTestEnvsByTest.Store(t, &testEnv)
+
+	// By default, in every integration test, assert that no pods in our namespaces restart during the test.
+	if !testEnv.skipPodRestartAssertions {
+		assertNoRestartsDuringTest(t, testEnv.ConciergeNamespace, "!pinniped.dev/test")
+		assertNoRestartsDuringTest(t, testEnv.SupervisorNamespace, "!pinniped.dev/test")
+	}
+
+	return &testEnv
 }
 
 func needEnv(t *testing.T, key string) string {
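The env.go changes above add a small functional-options hook to IntegrationEnv so that individual tests can opt out of the default pod-restart assertions while every other caller stays unchanged. The sketch below shows the intended call pattern; the test name is a hypothetical placeholder, and the real caller added by this commit is TestPodShutdown_Disruptive in pod_shutdown_test.go.

package integration

import (
    "testing"

    "go.pinniped.dev/test/testlib"
)

// Hypothetical example caller: a test that intentionally restarts pods opts out of
// the default "no pod restarts during the test" assertions by passing the option.
func TestSomethingDisruptive(t *testing.T) {
    env := testlib.IntegrationEnv(t, testlib.SkipPodRestartAssertions())

    // ... exercise behavior that restarts pods, using env as usual ...
    _ = env
}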
@@ -1,12 +1,12 @@
-// Copyright 2020-2022 the Pinniped contributors. All Rights Reserved.
+// Copyright 2020-2023 the Pinniped contributors. All Rights Reserved.
 // SPDX-License-Identifier: Apache-2.0
 
 package testlib
 
 import "testing"
 
-// skipUnlessIntegration skips the current test if `-short` has been passed to `go test`.
-func skipUnlessIntegration(t *testing.T) {
+// SkipUnlessIntegration skips the current test if `-short` has been passed to `go test`.
+func SkipUnlessIntegration(t *testing.T) {
 	t.Helper()
 
 	if testing.Short() {