From 09467d3e243f75b5e2707271f790302a2f6e9e8b Mon Sep 17 00:00:00 2001 From: Monis Khan Date: Thu, 16 Sep 2021 10:03:43 -0400 Subject: [PATCH] kubecertagent: fix flakey tests This commit makes the following changes to the kube cert agent tests: 1. Informers are synced on start using the controllerinit code 2. Deployment client and informer are synced per controller sync loop 3. Controller sync loop exits after two consistent errors 4. Use assert instead of require to avoid ending the test early Signed-off-by: Monis Khan --- .../controller/kubecertagent/kubecertagent.go | 81 ++++++++-------- .../kubecertagent/kubecertagent_test.go | 93 ++++++++++++++----- .../kubecertagent/legacypodcleaner.go | 17 ++-- .../kubecertagent/legacypodcleaner_test.go | 4 +- internal/controllerinit/controllerinit.go | 6 +- 5 files changed, 118 insertions(+), 83 deletions(-) diff --git a/internal/controller/kubecertagent/kubecertagent.go b/internal/controller/kubecertagent/kubecertagent.go index c97f7877..e5651f65 100644 --- a/internal/controller/kubecertagent/kubecertagent.go +++ b/internal/controller/kubecertagent/kubecertagent.go @@ -184,7 +184,6 @@ func newAgentController( clock clock.Clock, execCache *cache.Expiring, log logr.Logger, - options ...controllerlib.Option, ) controllerlib.Controller { return controllerlib.New( controllerlib.Config{ @@ -204,47 +203,45 @@ func newAgentController( execCache: execCache, }, }, - append([]controllerlib.Option{ - controllerlib.WithInformer( - kubeSystemPods, - pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool { - return controllerManagerLabels.Matches(labels.Set(obj.GetLabels())) - }), - controllerlib.InformerOption{}, - ), - controllerlib.WithInformer( - agentDeployments, - pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool { - return obj.GetNamespace() == cfg.Namespace && obj.GetName() == cfg.deploymentName() - }), - controllerlib.InformerOption{}, - ), - controllerlib.WithInformer( - agentPods, - pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool { - return agentLabels.Matches(labels.Set(obj.GetLabels())) - }), - controllerlib.InformerOption{}, - ), - controllerlib.WithInformer( - kubePublicConfigMaps, - pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool { - return obj.GetNamespace() == ClusterInfoNamespace && obj.GetName() == clusterInfoName - }), - controllerlib.InformerOption{}, - ), - controllerlib.WithInformer( - credentialIssuers, - pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool { - return obj.GetName() == cfg.CredentialIssuerName - }), - controllerlib.InformerOption{}, - ), - // Be sure to run once even to make sure the CredentialIssuer is updated if there are no controller manager - // pods. We should be able to pass an empty key since we don't use the key in the sync (we sync - // the world). - controllerlib.WithInitialEvent(controllerlib.Key{}), - }, options...)..., + controllerlib.WithInformer( + kubeSystemPods, + pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool { + return controllerManagerLabels.Matches(labels.Set(obj.GetLabels())) + }), + controllerlib.InformerOption{}, + ), + controllerlib.WithInformer( + agentDeployments, + pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool { + return obj.GetNamespace() == cfg.Namespace && obj.GetName() == cfg.deploymentName() + }), + controllerlib.InformerOption{}, + ), + controllerlib.WithInformer( + agentPods, + pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool { + return agentLabels.Matches(labels.Set(obj.GetLabels())) + }), + controllerlib.InformerOption{}, + ), + controllerlib.WithInformer( + kubePublicConfigMaps, + pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool { + return obj.GetNamespace() == ClusterInfoNamespace && obj.GetName() == clusterInfoName + }), + controllerlib.InformerOption{}, + ), + controllerlib.WithInformer( + credentialIssuers, + pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool { + return obj.GetName() == cfg.CredentialIssuerName + }), + controllerlib.InformerOption{}, + ), + // Be sure to run once even to make sure the CredentialIssuer is updated if there are no controller manager + // pods. We should be able to pass an empty key since we don't use the key in the sync (we sync + // the world). + controllerlib.WithInitialEvent(controllerlib.Key{}), ) } diff --git a/internal/controller/kubecertagent/kubecertagent_test.go b/internal/controller/kubecertagent/kubecertagent_test.go index cbcad11a..7c77d541 100644 --- a/internal/controller/kubecertagent/kubecertagent_test.go +++ b/internal/controller/kubecertagent/kubecertagent_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -21,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/util/cache" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" kubefake "k8s.io/client-go/kubernetes/fake" coretesting "k8s.io/client-go/testing" "k8s.io/utils/pointer" @@ -29,11 +31,13 @@ import ( conciergefake "go.pinniped.dev/generated/latest/client/concierge/clientset/versioned/fake" conciergeinformers "go.pinniped.dev/generated/latest/client/concierge/informers/externalversions" "go.pinniped.dev/internal/controller/kubecertagent/mocks" + "go.pinniped.dev/internal/controllerinit" "go.pinniped.dev/internal/controllerlib" "go.pinniped.dev/internal/here" "go.pinniped.dev/internal/kubeclient" "go.pinniped.dev/internal/testutil" "go.pinniped.dev/internal/testutil/testlogger" + "go.pinniped.dev/test/testlib" ) func TestAgentController(t *testing.T) { @@ -496,12 +500,10 @@ func TestAgentController(t *testing.T) { }, wantAgentDeployment: healthyAgentDeploymentWithOldStyleSelector, // couldn't be deleted, so it didn't change // delete to try to recreate deployment when Selector field changes, but delete always fails, so keeps trying to delete - wantDeploymentActionVerbs: []string{"list", "watch", "delete", "delete", "delete", "delete"}, + wantDeploymentActionVerbs: []string{"list", "watch", "delete", "delete"}, wantDeploymentDeleteActionOpts: []metav1.DeleteOptions{ testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion), testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion), - testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion), - testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion), }, wantStrategy: &configv1alpha1.CredentialIssuerStrategy{ Type: configv1alpha1.KubeClusterSigningCertificateStrategyType, @@ -536,7 +538,7 @@ func TestAgentController(t *testing.T) { }, wantAgentDeployment: nil, // was deleted, but couldn't be recreated // delete to try to recreate deployment when Selector field changes, but create always fails, so keeps trying to recreate - wantDeploymentActionVerbs: []string{"list", "watch", "delete", "create", "create", "create", "create"}, + wantDeploymentActionVerbs: []string{"list", "watch", "delete", "create", "create"}, wantDeploymentDeleteActionOpts: []metav1.DeleteOptions{ testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion), }, @@ -1029,53 +1031,54 @@ func TestAgentController(t *testing.T) { fakeClock, execCache, log, - controllerlib.WithMaxRetries(1), ) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - errorMessages := runControllerUntilQuiet(ctx, t, controller, kubeInformers, conciergeInformers) + errorMessages := runControllerUntilQuiet(ctx, t, controller, hasDeploymentSynced(trackDeleteKubeClient, kubeInformers), kubeInformers, conciergeInformers) actualErrors := deduplicate(errorMessages) - require.Subsetf(t, actualErrors, tt.wantDistinctErrors, "required error(s) were not found in the actual errors") + assert.Subsetf(t, actualErrors, tt.wantDistinctErrors, "required error(s) were not found in the actual errors") allAllowedErrors := append([]string{}, tt.wantDistinctErrors...) allAllowedErrors = append(allAllowedErrors, tt.alsoAllowUndesiredDistinctErrors...) - require.Subsetf(t, allAllowedErrors, actualErrors, "actual errors contained additional error(s) which is not expected by the test") + assert.Subsetf(t, allAllowedErrors, actualErrors, "actual errors contained additional error(s) which is not expected by the test") assert.Equal(t, tt.wantDistinctLogs, deduplicate(log.Lines()), "unexpected logs") // Assert on all actions that happened to deployments. var actualDeploymentActionVerbs []string for _, a := range kubeClientset.Actions() { - if a.GetResource().Resource == "deployments" { + if a.GetResource().Resource == "deployments" && a.GetVerb() != "get" { // ignore gets caused by hasDeploymentSynced actualDeploymentActionVerbs = append(actualDeploymentActionVerbs, a.GetVerb()) } } if tt.wantDeploymentActionVerbs != nil { - require.Equal(t, tt.wantDeploymentActionVerbs, actualDeploymentActionVerbs) + assert.Equal(t, tt.wantDeploymentActionVerbs, actualDeploymentActionVerbs) } if tt.wantDeploymentDeleteActionOpts != nil { - require.Equal(t, tt.wantDeploymentDeleteActionOpts, *actualDeleteActionOpts) + assert.Equal(t, tt.wantDeploymentDeleteActionOpts, *actualDeleteActionOpts) } // Assert that the agent deployment is in the expected final state. deployments, err := kubeClientset.AppsV1().Deployments("concierge").List(ctx, metav1.ListOptions{}) require.NoError(t, err) if tt.wantAgentDeployment == nil { - require.Empty(t, deployments.Items, "did not expect an agent deployment") - } else { - require.Len(t, deployments.Items, 1, "expected a single agent deployment") - require.Equal(t, tt.wantAgentDeployment, &deployments.Items[0]) + assert.Empty(t, deployments.Items, "did not expect an agent deployment") + } else { //nolint: gocritic + if assert.Len(t, deployments.Items, 1, "expected a single agent deployment") { + assert.Equal(t, tt.wantAgentDeployment, &deployments.Items[0]) + } } // Assert that the CredentialIssuer is in the expected final state if tt.wantStrategy != nil { credIssuer, err := conciergeClientset.ConfigV1alpha1().CredentialIssuers().Get(ctx, initialCredentialIssuer.Name, metav1.GetOptions{}) - require.NoError(t, err) - require.Len(t, credIssuer.Status.Strategies, 1, "expected a single strategy in the CredentialIssuer") - require.Equal(t, tt.wantStrategy, &credIssuer.Status.Strategies[0]) + ok := assert.NoError(t, err) + if ok && assert.Len(t, credIssuer.Status.Strategies, 1, "expected a single strategy in the CredentialIssuer") { + assert.Equal(t, tt.wantStrategy, &credIssuer.Status.Strategies[0]) + } } }) } @@ -1156,24 +1159,43 @@ func deduplicate(strings []string) []string { return result } -func runControllerUntilQuiet(ctx context.Context, t *testing.T, controller controllerlib.Controller, informers ...interface{ Start(<-chan struct{}) }) []string { +func runControllerUntilQuiet(ctx context.Context, t *testing.T, controller controllerlib.Controller, synced func(ctx context.Context, t *testing.T), informers ...controllerinit.Informer) []string { ctx, cancel := context.WithCancel(ctx) defer cancel() - errorStream := make(chan error) + var syncErrs []error // track the errors we see during each iteration + errorStream := make(chan error, 100) // do not block the controller loop controllerlib.TestWrap(t, controller, func(syncer controllerlib.Syncer) controllerlib.Syncer { return controllerlib.SyncFunc(func(ctx controllerlib.Context) error { + synced(ctx.Context, t) // make sure that our informer has caught up with our client + + // if we got the same error twice in a row, prevent the controller sync loop from running + if len(syncErrs) >= 2 { + lastErr := syncErrs[len(syncErrs)-1] + secondToLastErr := syncErrs[len(syncErrs)-2] + if lastErr != nil && secondToLastErr != nil && lastErr.Error() == secondToLastErr.Error() { + cancel() // not explicitly required but matches our intent + return nil + } + } + err := syncer.Sync(ctx) errorStream <- err + + syncErrs = append(syncErrs, err) + return err }) }) - for _, informer := range informers { - informer.Start(ctx.Done()) - } - - go controller.Run(ctx, 1) + // start and sync the informers before running the controller + runController, err := controllerinit.Prepare( + func(ctx context.Context) { controller.Run(ctx, 1) }, + func(ctx context.Context, runner controllerinit.Runner) { runner(ctx) }, + informers..., + )(ctx) + require.NoError(t, err) + go runController(ctx) // Wait until the controller is quiet for two seconds. var errorMessages []string @@ -1192,3 +1214,24 @@ done: } return errorMessages } + +func hasDeploymentSynced(client kubernetes.Interface, kubeInformers informers.SharedInformerFactory) func(ctx context.Context, t *testing.T) { + return func(ctx context.Context, t *testing.T) { + testlib.RequireEventually(t, func(requireEventually *require.Assertions) { + realDep, realErr := client.AppsV1().Deployments("concierge"). + Get(ctx, "pinniped-concierge-kube-cert-agent", metav1.GetOptions{}) + + cachedDep, cachedErr := kubeInformers.Apps().V1().Deployments().Lister().Deployments("concierge"). + Get("pinniped-concierge-kube-cert-agent") + + if errors.IsNotFound(realErr) && errors.IsNotFound(cachedErr) { + return + } + + requireEventually.NoError(realErr) + requireEventually.NoError(cachedErr) + + requireEventually.Equal(realDep, cachedDep) + }, 2*time.Second, 100*time.Millisecond) + } +} diff --git a/internal/controller/kubecertagent/legacypodcleaner.go b/internal/controller/kubecertagent/legacypodcleaner.go index 43e4a9f9..8c8a6cf9 100644 --- a/internal/controller/kubecertagent/legacypodcleaner.go +++ b/internal/controller/kubecertagent/legacypodcleaner.go @@ -24,7 +24,6 @@ func NewLegacyPodCleanerController( client *kubeclient.Client, agentPods corev1informers.PodInformer, log logr.Logger, - options ...controllerlib.Option, ) controllerlib.Controller { // legacyAgentLabels are the Kubernetes labels we previously added to agent pods (the new value is "v2"). // We also expect these pods to have the "extra" labels configured on the Concierge. @@ -67,14 +66,12 @@ func NewLegacyPodCleanerController( return nil }), }, - append([]controllerlib.Option{ - controllerlib.WithInformer( - agentPods, - pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool { - return obj.GetNamespace() == cfg.Namespace && legacyAgentSelector.Matches(labels.Set(obj.GetLabels())) - }, nil), - controllerlib.InformerOption{}, - ), - }, options...)..., + controllerlib.WithInformer( + agentPods, + pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool { + return obj.GetNamespace() == cfg.Namespace && legacyAgentSelector.Matches(labels.Set(obj.GetLabels())) + }, nil), + controllerlib.InformerOption{}, + ), ) } diff --git a/internal/controller/kubecertagent/legacypodcleaner_test.go b/internal/controller/kubecertagent/legacypodcleaner_test.go index 7cf89b0b..20cf46b3 100644 --- a/internal/controller/kubecertagent/legacypodcleaner_test.go +++ b/internal/controller/kubecertagent/legacypodcleaner_test.go @@ -18,7 +18,6 @@ import ( kubefake "k8s.io/client-go/kubernetes/fake" coretesting "k8s.io/client-go/testing" - "go.pinniped.dev/internal/controllerlib" "go.pinniped.dev/internal/kubeclient" "go.pinniped.dev/internal/testutil" "go.pinniped.dev/internal/testutil/testlogger" @@ -175,13 +174,12 @@ func TestLegacyPodCleanerController(t *testing.T) { &kubeclient.Client{Kubernetes: trackDeleteClient}, kubeInformers.Core().V1().Pods(), log, - controllerlib.WithMaxRetries(1), ) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - errorMessages := runControllerUntilQuiet(ctx, t, controller, kubeInformers) + errorMessages := runControllerUntilQuiet(ctx, t, controller, func(_ context.Context, _ *testing.T) {}, kubeInformers) assert.Equal(t, tt.wantDistinctErrors, deduplicate(errorMessages), "unexpected errors") assert.Equal(t, tt.wantDistinctLogs, deduplicate(log.Lines()), "unexpected logs") assert.Equal(t, tt.wantActions, kubeClientset.Actions()[2:], "unexpected actions") diff --git a/internal/controllerinit/controllerinit.go b/internal/controllerinit/controllerinit.go index 6663c1af..dbc5dc09 100644 --- a/internal/controllerinit/controllerinit.go +++ b/internal/controllerinit/controllerinit.go @@ -22,8 +22,8 @@ type RunnerWrapper func(context.Context, Runner) // It is expected to be called in the main go routine since the construction can fail. type RunnerBuilder func(context.Context) (Runner, error) -// informer is the subset of SharedInformerFactory needed for starting an informer cache and waiting for it to sync. -type informer interface { +// Informer is the subset of SharedInformerFactory needed for starting an informer cache and waiting for it to sync. +type Informer interface { Start(stopCh <-chan struct{}) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool } @@ -31,7 +31,7 @@ type informer interface { // Prepare returns RunnerBuilder that, when called: // 1. Starts all provided informers and waits for them sync (and fails if they hang) // 2. Returns a Runner that combines the Runner and RunnerWrapper passed into Prepare -func Prepare(controllers Runner, controllersWrapper RunnerWrapper, informers ...informer) RunnerBuilder { +func Prepare(controllers Runner, controllersWrapper RunnerWrapper, informers ...Informer) RunnerBuilder { return func(ctx context.Context) (Runner, error) { for _, informer := range informers { informer := informer