diff --git a/internal/controller/kubecertagent/kubecertagent.go b/internal/controller/kubecertagent/kubecertagent.go
index c97f7877..e5651f65 100644
--- a/internal/controller/kubecertagent/kubecertagent.go
+++ b/internal/controller/kubecertagent/kubecertagent.go
@@ -184,7 +184,6 @@ func newAgentController(
 	clock clock.Clock,
 	execCache *cache.Expiring,
 	log logr.Logger,
-	options ...controllerlib.Option,
 ) controllerlib.Controller {
 	return controllerlib.New(
 		controllerlib.Config{
@@ -204,47 +203,45 @@ func newAgentController(
 				execCache: execCache,
 			},
 		},
-		append([]controllerlib.Option{
-			controllerlib.WithInformer(
-				kubeSystemPods,
-				pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
-					return controllerManagerLabels.Matches(labels.Set(obj.GetLabels()))
-				}),
-				controllerlib.InformerOption{},
-			),
-			controllerlib.WithInformer(
-				agentDeployments,
-				pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
-					return obj.GetNamespace() == cfg.Namespace && obj.GetName() == cfg.deploymentName()
-				}),
-				controllerlib.InformerOption{},
-			),
-			controllerlib.WithInformer(
-				agentPods,
-				pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
-					return agentLabels.Matches(labels.Set(obj.GetLabels()))
-				}),
-				controllerlib.InformerOption{},
-			),
-			controllerlib.WithInformer(
-				kubePublicConfigMaps,
-				pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
-					return obj.GetNamespace() == ClusterInfoNamespace && obj.GetName() == clusterInfoName
-				}),
-				controllerlib.InformerOption{},
-			),
-			controllerlib.WithInformer(
-				credentialIssuers,
-				pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
-					return obj.GetName() == cfg.CredentialIssuerName
-				}),
-				controllerlib.InformerOption{},
-			),
-			// Be sure to run once even to make sure the CredentialIssuer is updated if there are no controller manager
-			// pods. We should be able to pass an empty key since we don't use the key in the sync (we sync
-			// the world).
-			controllerlib.WithInitialEvent(controllerlib.Key{}),
-		}, options...)...,
+		controllerlib.WithInformer(
+			kubeSystemPods,
+			pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
+				return controllerManagerLabels.Matches(labels.Set(obj.GetLabels()))
+			}),
+			controllerlib.InformerOption{},
+		),
+		controllerlib.WithInformer(
+			agentDeployments,
+			pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
+				return obj.GetNamespace() == cfg.Namespace && obj.GetName() == cfg.deploymentName()
+			}),
+			controllerlib.InformerOption{},
+		),
+		controllerlib.WithInformer(
+			agentPods,
+			pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
+				return agentLabels.Matches(labels.Set(obj.GetLabels()))
+			}),
+			controllerlib.InformerOption{},
+		),
+		controllerlib.WithInformer(
+			kubePublicConfigMaps,
+			pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
+				return obj.GetNamespace() == ClusterInfoNamespace && obj.GetName() == clusterInfoName
+			}),
+			controllerlib.InformerOption{},
+		),
+		controllerlib.WithInformer(
+			credentialIssuers,
+			pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
+				return obj.GetName() == cfg.CredentialIssuerName
+			}),
+			controllerlib.InformerOption{},
+		),
+		// Be sure to run once even to make sure the CredentialIssuer is updated if there are no controller manager
+		// pods. We should be able to pass an empty key since we don't use the key in the sync (we sync
+		// the world).
+		controllerlib.WithInitialEvent(controllerlib.Key{}),
 	)
 }
 
diff --git a/internal/controller/kubecertagent/kubecertagent_test.go b/internal/controller/kubecertagent/kubecertagent_test.go
index cbcad11a..7c77d541 100644
--- a/internal/controller/kubecertagent/kubecertagent_test.go
+++ b/internal/controller/kubecertagent/kubecertagent_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/stretchr/testify/require"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -21,6 +22,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/cache"
 	"k8s.io/apimachinery/pkg/util/clock"
 	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
 	kubefake "k8s.io/client-go/kubernetes/fake"
 	coretesting "k8s.io/client-go/testing"
 	"k8s.io/utils/pointer"
@@ -29,11 +31,13 @@ import (
 	conciergefake "go.pinniped.dev/generated/latest/client/concierge/clientset/versioned/fake"
 	conciergeinformers "go.pinniped.dev/generated/latest/client/concierge/informers/externalversions"
 	"go.pinniped.dev/internal/controller/kubecertagent/mocks"
+	"go.pinniped.dev/internal/controllerinit"
 	"go.pinniped.dev/internal/controllerlib"
 	"go.pinniped.dev/internal/here"
 	"go.pinniped.dev/internal/kubeclient"
 	"go.pinniped.dev/internal/testutil"
 	"go.pinniped.dev/internal/testutil/testlogger"
+	"go.pinniped.dev/test/testlib"
 )
 
 func TestAgentController(t *testing.T) {
@@ -496,12 +500,10 @@ func TestAgentController(t *testing.T) {
 			},
 			wantAgentDeployment: healthyAgentDeploymentWithOldStyleSelector, // couldn't be deleted, so it didn't change
 			// delete to try to recreate deployment when Selector field changes, but delete always fails, so keeps trying to delete
-			wantDeploymentActionVerbs: []string{"list", "watch", "delete", "delete", "delete", "delete"},
+			wantDeploymentActionVerbs: []string{"list", "watch", "delete", "delete"},
 			wantDeploymentDeleteActionOpts: []metav1.DeleteOptions{
 				testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
 				testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
-				testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
-				testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
 			},
 			wantStrategy: &configv1alpha1.CredentialIssuerStrategy{
 				Type: configv1alpha1.KubeClusterSigningCertificateStrategyType,
@@ -536,7 +538,7 @@ func TestAgentController(t *testing.T) {
 			},
 			wantAgentDeployment: nil, // was deleted, but couldn't be recreated
 			// delete to try to recreate deployment when Selector field changes, but create always fails, so keeps trying to recreate
-			wantDeploymentActionVerbs: []string{"list", "watch", "delete", "create", "create", "create", "create"},
+			wantDeploymentActionVerbs: []string{"list", "watch", "delete", "create", "create"},
 			wantDeploymentDeleteActionOpts: []metav1.DeleteOptions{
 				testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
 			},
@@ -1029,53 +1031,54 @@ func TestAgentController(t *testing.T) {
 				fakeClock,
 				execCache,
 				log,
-				controllerlib.WithMaxRetries(1),
 			)
 
 			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 			defer cancel()
-			errorMessages := runControllerUntilQuiet(ctx, t, controller, kubeInformers, conciergeInformers)
+			errorMessages := runControllerUntilQuiet(ctx, t, controller, hasDeploymentSynced(trackDeleteKubeClient, kubeInformers), kubeInformers, conciergeInformers)
 
 			actualErrors := deduplicate(errorMessages)
-			require.Subsetf(t, actualErrors, tt.wantDistinctErrors, "required error(s) were not found in the actual errors")
+			assert.Subsetf(t, actualErrors, tt.wantDistinctErrors, "required error(s) were not found in the actual errors")
 
 			allAllowedErrors := append([]string{}, tt.wantDistinctErrors...)
 			allAllowedErrors = append(allAllowedErrors, tt.alsoAllowUndesiredDistinctErrors...)
-			require.Subsetf(t, allAllowedErrors, actualErrors, "actual errors contained additional error(s) which is not expected by the test")
+			assert.Subsetf(t, allAllowedErrors, actualErrors, "actual errors contained additional error(s) which is not expected by the test")
 
 			assert.Equal(t, tt.wantDistinctLogs, deduplicate(log.Lines()), "unexpected logs")
 
 			// Assert on all actions that happened to deployments.
 			var actualDeploymentActionVerbs []string
 			for _, a := range kubeClientset.Actions() {
-				if a.GetResource().Resource == "deployments" {
+				if a.GetResource().Resource == "deployments" && a.GetVerb() != "get" { // ignore gets caused by hasDeploymentSynced
 					actualDeploymentActionVerbs = append(actualDeploymentActionVerbs, a.GetVerb())
 				}
 			}
 
 			if tt.wantDeploymentActionVerbs != nil {
-				require.Equal(t, tt.wantDeploymentActionVerbs, actualDeploymentActionVerbs)
+				assert.Equal(t, tt.wantDeploymentActionVerbs, actualDeploymentActionVerbs)
 			}
 			if tt.wantDeploymentDeleteActionOpts != nil {
-				require.Equal(t, tt.wantDeploymentDeleteActionOpts, *actualDeleteActionOpts)
+				assert.Equal(t, tt.wantDeploymentDeleteActionOpts, *actualDeleteActionOpts)
 			}
 
 			// Assert that the agent deployment is in the expected final state.
 			deployments, err := kubeClientset.AppsV1().Deployments("concierge").List(ctx, metav1.ListOptions{})
 			require.NoError(t, err)
 			if tt.wantAgentDeployment == nil {
-				require.Empty(t, deployments.Items, "did not expect an agent deployment")
-			} else {
-				require.Len(t, deployments.Items, 1, "expected a single agent deployment")
-				require.Equal(t, tt.wantAgentDeployment, &deployments.Items[0])
+				assert.Empty(t, deployments.Items, "did not expect an agent deployment")
+			} else { //nolint: gocritic
+				if assert.Len(t, deployments.Items, 1, "expected a single agent deployment") {
+					assert.Equal(t, tt.wantAgentDeployment, &deployments.Items[0])
+				}
 			}
 
 			// Assert that the CredentialIssuer is in the expected final state
 			if tt.wantStrategy != nil {
 				credIssuer, err := conciergeClientset.ConfigV1alpha1().CredentialIssuers().Get(ctx, initialCredentialIssuer.Name, metav1.GetOptions{})
-				require.NoError(t, err)
-				require.Len(t, credIssuer.Status.Strategies, 1, "expected a single strategy in the CredentialIssuer")
-				require.Equal(t, tt.wantStrategy, &credIssuer.Status.Strategies[0])
+				ok := assert.NoError(t, err)
+				if ok && assert.Len(t, credIssuer.Status.Strategies, 1, "expected a single strategy in the CredentialIssuer") {
+					assert.Equal(t, tt.wantStrategy, &credIssuer.Status.Strategies[0])
+				}
 			}
 		})
 	}
@@ -1156,24 +1159,43 @@ func deduplicate(strings []string) []string {
 	return result
 }
 
-func runControllerUntilQuiet(ctx context.Context, t *testing.T, controller controllerlib.Controller, informers ...interface{ Start(<-chan struct{}) }) []string {
+func runControllerUntilQuiet(ctx context.Context, t *testing.T, controller controllerlib.Controller, synced func(ctx context.Context, t *testing.T), informers ...controllerinit.Informer) []string {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	errorStream := make(chan error)
+	var syncErrs []error // track the errors we see during each iteration
+	errorStream := make(chan error, 100) // do not block the controller loop
 	controllerlib.TestWrap(t, controller, func(syncer controllerlib.Syncer) controllerlib.Syncer {
 		return controllerlib.SyncFunc(func(ctx controllerlib.Context) error {
+			synced(ctx.Context, t) // make sure that our informer has caught up with our client
+
+			// if we got the same error twice in a row, prevent the controller sync loop from running
+			if len(syncErrs) >= 2 {
+				lastErr := syncErrs[len(syncErrs)-1]
+				secondToLastErr := syncErrs[len(syncErrs)-2]
+				if lastErr != nil && secondToLastErr != nil && lastErr.Error() == secondToLastErr.Error() {
+					cancel() // not explicitly required but matches our intent
+					return nil
+				}
+			}
+
 			err := syncer.Sync(ctx)
 			errorStream <- err
+
+			syncErrs = append(syncErrs, err)
+
 			return err
 		})
 	})
 
-	for _, informer := range informers {
-		informer.Start(ctx.Done())
-	}
-
-	go controller.Run(ctx, 1)
+	// start and sync the informers before running the controller
+	runController, err := controllerinit.Prepare(
+		func(ctx context.Context) { controller.Run(ctx, 1) },
+		func(ctx context.Context, runner controllerinit.Runner) { runner(ctx) },
+		informers...,
+	)(ctx)
+	require.NoError(t, err)
+	go runController(ctx)
 
 	// Wait until the controller is quiet for two seconds.
 	var errorMessages []string
@@ -1192,3 +1214,24 @@ done:
 	}
 	return errorMessages
 }
+
+func hasDeploymentSynced(client kubernetes.Interface, kubeInformers informers.SharedInformerFactory) func(ctx context.Context, t *testing.T) {
+	return func(ctx context.Context, t *testing.T) {
+		testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
+			realDep, realErr := client.AppsV1().Deployments("concierge").
+				Get(ctx, "pinniped-concierge-kube-cert-agent", metav1.GetOptions{})
+
+			cachedDep, cachedErr := kubeInformers.Apps().V1().Deployments().Lister().Deployments("concierge").
+				Get("pinniped-concierge-kube-cert-agent")
+
+			if errors.IsNotFound(realErr) && errors.IsNotFound(cachedErr) {
+				return
+			}
+
+			requireEventually.NoError(realErr)
+			requireEventually.NoError(cachedErr)
+
+			requireEventually.Equal(realDep, cachedDep)
+		}, 2*time.Second, 100*time.Millisecond)
+	}
+}
diff --git a/internal/controller/kubecertagent/legacypodcleaner.go b/internal/controller/kubecertagent/legacypodcleaner.go
index 43e4a9f9..8c8a6cf9 100644
--- a/internal/controller/kubecertagent/legacypodcleaner.go
+++ b/internal/controller/kubecertagent/legacypodcleaner.go
@@ -24,7 +24,6 @@ func NewLegacyPodCleanerController(
 	client *kubeclient.Client,
 	agentPods corev1informers.PodInformer,
 	log logr.Logger,
-	options ...controllerlib.Option,
 ) controllerlib.Controller {
 	// legacyAgentLabels are the Kubernetes labels we previously added to agent pods (the new value is "v2").
 	// We also expect these pods to have the "extra" labels configured on the Concierge.
@@ -67,14 +66,12 @@ func NewLegacyPodCleanerController(
 				return nil
 			}),
 		},
-		append([]controllerlib.Option{
-			controllerlib.WithInformer(
-				agentPods,
-				pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool {
-					return obj.GetNamespace() == cfg.Namespace && legacyAgentSelector.Matches(labels.Set(obj.GetLabels()))
-				}, nil),
-				controllerlib.InformerOption{},
-			),
-		}, options...)...,
+		controllerlib.WithInformer(
+			agentPods,
+			pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool {
+				return obj.GetNamespace() == cfg.Namespace && legacyAgentSelector.Matches(labels.Set(obj.GetLabels()))
+			}, nil),
+			controllerlib.InformerOption{},
+		),
 	)
 }
diff --git a/internal/controller/kubecertagent/legacypodcleaner_test.go b/internal/controller/kubecertagent/legacypodcleaner_test.go
index 7cf89b0b..20cf46b3 100644
--- a/internal/controller/kubecertagent/legacypodcleaner_test.go
+++ b/internal/controller/kubecertagent/legacypodcleaner_test.go
@@ -18,7 +18,6 @@ import (
 	kubefake "k8s.io/client-go/kubernetes/fake"
 	coretesting "k8s.io/client-go/testing"
 
-	"go.pinniped.dev/internal/controllerlib"
 	"go.pinniped.dev/internal/kubeclient"
 	"go.pinniped.dev/internal/testutil"
 	"go.pinniped.dev/internal/testutil/testlogger"
@@ -175,13 +174,12 @@ func TestLegacyPodCleanerController(t *testing.T) {
 				&kubeclient.Client{Kubernetes: trackDeleteClient},
 				kubeInformers.Core().V1().Pods(),
 				log,
-				controllerlib.WithMaxRetries(1),
 			)
 
 			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 			defer cancel()
-			errorMessages := runControllerUntilQuiet(ctx, t, controller, kubeInformers)
+			errorMessages := runControllerUntilQuiet(ctx, t, controller, func(_ context.Context, _ *testing.T) {}, kubeInformers)
 			assert.Equal(t, tt.wantDistinctErrors, deduplicate(errorMessages), "unexpected errors")
 			assert.Equal(t, tt.wantDistinctLogs, deduplicate(log.Lines()), "unexpected logs")
 			assert.Equal(t, tt.wantActions, kubeClientset.Actions()[2:], "unexpected actions")
diff --git a/internal/controllerinit/controllerinit.go b/internal/controllerinit/controllerinit.go
index 6663c1af..dbc5dc09 100644
--- a/internal/controllerinit/controllerinit.go
+++ b/internal/controllerinit/controllerinit.go
@@ -22,8 +22,8 @@ type RunnerWrapper func(context.Context, Runner)
 // It is expected to be called in the main go routine since the construction can fail.
 type RunnerBuilder func(context.Context) (Runner, error)
 
-// informer is the subset of SharedInformerFactory needed for starting an informer cache and waiting for it to sync.
-type informer interface {
+// Informer is the subset of SharedInformerFactory needed for starting an informer cache and waiting for it to sync.
+type Informer interface {
 	Start(stopCh <-chan struct{})
 	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
 }
@@ -31,7 +31,7 @@ type informer interface {
 // Prepare returns RunnerBuilder that, when called:
 // 1. Starts all provided informers and waits for them sync (and fails if they hang)
 // 2. Returns a Runner that combines the Runner and RunnerWrapper passed into Prepare
-func Prepare(controllers Runner, controllersWrapper RunnerWrapper, informers ...informer) RunnerBuilder {
+func Prepare(controllers Runner, controllersWrapper RunnerWrapper, informers ...Informer) RunnerBuilder {
 	return func(ctx context.Context) (Runner, error) {
 		for _, informer := range informers {
 			informer := informer