kubecertagent: fix flaky tests

This commit makes the following changes to the kube cert agent tests:

1. Informers are synced on start using the controllerinit code
2. Deployment client and informer are synced per controller sync loop
3. Controller sync loop exits after two consecutive identical errors
4. Use assert instead of require to avoid ending the test early

Signed-off-by: Monis Khan <mok@vmware.com>
This commit is contained in:
Monis Khan 2021-09-16 10:03:43 -04:00
parent 95b9782549
commit 09467d3e24
No known key found for this signature in database
GPG Key ID: 52C90ADA01B269B8
5 changed files with 118 additions and 83 deletions

View File

@ -184,7 +184,6 @@ func newAgentController(
clock clock.Clock,
execCache *cache.Expiring,
log logr.Logger,
options ...controllerlib.Option,
) controllerlib.Controller {
return controllerlib.New(
controllerlib.Config{
@ -204,47 +203,45 @@ func newAgentController(
execCache: execCache,
},
},
append([]controllerlib.Option{
controllerlib.WithInformer(
kubeSystemPods,
pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
return controllerManagerLabels.Matches(labels.Set(obj.GetLabels()))
}),
controllerlib.InformerOption{},
),
controllerlib.WithInformer(
agentDeployments,
pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
return obj.GetNamespace() == cfg.Namespace && obj.GetName() == cfg.deploymentName()
}),
controllerlib.InformerOption{},
),
controllerlib.WithInformer(
agentPods,
pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
return agentLabels.Matches(labels.Set(obj.GetLabels()))
}),
controllerlib.InformerOption{},
),
controllerlib.WithInformer(
kubePublicConfigMaps,
pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
return obj.GetNamespace() == ClusterInfoNamespace && obj.GetName() == clusterInfoName
}),
controllerlib.InformerOption{},
),
controllerlib.WithInformer(
credentialIssuers,
pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
return obj.GetName() == cfg.CredentialIssuerName
}),
controllerlib.InformerOption{},
),
// Be sure to run once even to make sure the CredentialIssuer is updated if there are no controller manager
// pods. We should be able to pass an empty key since we don't use the key in the sync (we sync
// the world).
controllerlib.WithInitialEvent(controllerlib.Key{}),
}, options...)...,
controllerlib.WithInformer(
kubeSystemPods,
pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
return controllerManagerLabels.Matches(labels.Set(obj.GetLabels()))
}),
controllerlib.InformerOption{},
),
controllerlib.WithInformer(
agentDeployments,
pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
return obj.GetNamespace() == cfg.Namespace && obj.GetName() == cfg.deploymentName()
}),
controllerlib.InformerOption{},
),
controllerlib.WithInformer(
agentPods,
pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
return agentLabels.Matches(labels.Set(obj.GetLabels()))
}),
controllerlib.InformerOption{},
),
controllerlib.WithInformer(
kubePublicConfigMaps,
pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
return obj.GetNamespace() == ClusterInfoNamespace && obj.GetName() == clusterInfoName
}),
controllerlib.InformerOption{},
),
controllerlib.WithInformer(
credentialIssuers,
pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
return obj.GetName() == cfg.CredentialIssuerName
}),
controllerlib.InformerOption{},
),
// Be sure to run once even to make sure the CredentialIssuer is updated if there are no controller manager
// pods. We should be able to pass an empty key since we don't use the key in the sync (we sync
// the world).
controllerlib.WithInitialEvent(controllerlib.Key{}),
)
}

View File

@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -21,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/util/cache"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
kubefake "k8s.io/client-go/kubernetes/fake"
coretesting "k8s.io/client-go/testing"
"k8s.io/utils/pointer"
@ -29,11 +31,13 @@ import (
conciergefake "go.pinniped.dev/generated/latest/client/concierge/clientset/versioned/fake"
conciergeinformers "go.pinniped.dev/generated/latest/client/concierge/informers/externalversions"
"go.pinniped.dev/internal/controller/kubecertagent/mocks"
"go.pinniped.dev/internal/controllerinit"
"go.pinniped.dev/internal/controllerlib"
"go.pinniped.dev/internal/here"
"go.pinniped.dev/internal/kubeclient"
"go.pinniped.dev/internal/testutil"
"go.pinniped.dev/internal/testutil/testlogger"
"go.pinniped.dev/test/testlib"
)
func TestAgentController(t *testing.T) {
@ -496,12 +500,10 @@ func TestAgentController(t *testing.T) {
},
wantAgentDeployment: healthyAgentDeploymentWithOldStyleSelector, // couldn't be deleted, so it didn't change
// delete to try to recreate deployment when Selector field changes, but delete always fails, so keeps trying to delete
wantDeploymentActionVerbs: []string{"list", "watch", "delete", "delete", "delete", "delete"},
wantDeploymentActionVerbs: []string{"list", "watch", "delete", "delete"},
wantDeploymentDeleteActionOpts: []metav1.DeleteOptions{
testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
},
wantStrategy: &configv1alpha1.CredentialIssuerStrategy{
Type: configv1alpha1.KubeClusterSigningCertificateStrategyType,
@ -536,7 +538,7 @@ func TestAgentController(t *testing.T) {
},
wantAgentDeployment: nil, // was deleted, but couldn't be recreated
// delete to try to recreate deployment when Selector field changes, but create always fails, so keeps trying to recreate
wantDeploymentActionVerbs: []string{"list", "watch", "delete", "create", "create", "create", "create"},
wantDeploymentActionVerbs: []string{"list", "watch", "delete", "create", "create"},
wantDeploymentDeleteActionOpts: []metav1.DeleteOptions{
testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
},
@ -1029,53 +1031,54 @@ func TestAgentController(t *testing.T) {
fakeClock,
execCache,
log,
controllerlib.WithMaxRetries(1),
)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
errorMessages := runControllerUntilQuiet(ctx, t, controller, kubeInformers, conciergeInformers)
errorMessages := runControllerUntilQuiet(ctx, t, controller, hasDeploymentSynced(trackDeleteKubeClient, kubeInformers), kubeInformers, conciergeInformers)
actualErrors := deduplicate(errorMessages)
require.Subsetf(t, actualErrors, tt.wantDistinctErrors, "required error(s) were not found in the actual errors")
assert.Subsetf(t, actualErrors, tt.wantDistinctErrors, "required error(s) were not found in the actual errors")
allAllowedErrors := append([]string{}, tt.wantDistinctErrors...)
allAllowedErrors = append(allAllowedErrors, tt.alsoAllowUndesiredDistinctErrors...)
require.Subsetf(t, allAllowedErrors, actualErrors, "actual errors contained additional error(s) which is not expected by the test")
assert.Subsetf(t, allAllowedErrors, actualErrors, "actual errors contained additional error(s) which is not expected by the test")
assert.Equal(t, tt.wantDistinctLogs, deduplicate(log.Lines()), "unexpected logs")
// Assert on all actions that happened to deployments.
var actualDeploymentActionVerbs []string
for _, a := range kubeClientset.Actions() {
if a.GetResource().Resource == "deployments" {
if a.GetResource().Resource == "deployments" && a.GetVerb() != "get" { // ignore gets caused by hasDeploymentSynced
actualDeploymentActionVerbs = append(actualDeploymentActionVerbs, a.GetVerb())
}
}
if tt.wantDeploymentActionVerbs != nil {
require.Equal(t, tt.wantDeploymentActionVerbs, actualDeploymentActionVerbs)
assert.Equal(t, tt.wantDeploymentActionVerbs, actualDeploymentActionVerbs)
}
if tt.wantDeploymentDeleteActionOpts != nil {
require.Equal(t, tt.wantDeploymentDeleteActionOpts, *actualDeleteActionOpts)
assert.Equal(t, tt.wantDeploymentDeleteActionOpts, *actualDeleteActionOpts)
}
// Assert that the agent deployment is in the expected final state.
deployments, err := kubeClientset.AppsV1().Deployments("concierge").List(ctx, metav1.ListOptions{})
require.NoError(t, err)
if tt.wantAgentDeployment == nil {
require.Empty(t, deployments.Items, "did not expect an agent deployment")
} else {
require.Len(t, deployments.Items, 1, "expected a single agent deployment")
require.Equal(t, tt.wantAgentDeployment, &deployments.Items[0])
assert.Empty(t, deployments.Items, "did not expect an agent deployment")
} else { //nolint: gocritic
if assert.Len(t, deployments.Items, 1, "expected a single agent deployment") {
assert.Equal(t, tt.wantAgentDeployment, &deployments.Items[0])
}
}
// Assert that the CredentialIssuer is in the expected final state
if tt.wantStrategy != nil {
credIssuer, err := conciergeClientset.ConfigV1alpha1().CredentialIssuers().Get(ctx, initialCredentialIssuer.Name, metav1.GetOptions{})
require.NoError(t, err)
require.Len(t, credIssuer.Status.Strategies, 1, "expected a single strategy in the CredentialIssuer")
require.Equal(t, tt.wantStrategy, &credIssuer.Status.Strategies[0])
ok := assert.NoError(t, err)
if ok && assert.Len(t, credIssuer.Status.Strategies, 1, "expected a single strategy in the CredentialIssuer") {
assert.Equal(t, tt.wantStrategy, &credIssuer.Status.Strategies[0])
}
}
})
}
@ -1156,24 +1159,43 @@ func deduplicate(strings []string) []string {
return result
}
func runControllerUntilQuiet(ctx context.Context, t *testing.T, controller controllerlib.Controller, informers ...interface{ Start(<-chan struct{}) }) []string {
func runControllerUntilQuiet(ctx context.Context, t *testing.T, controller controllerlib.Controller, synced func(ctx context.Context, t *testing.T), informers ...controllerinit.Informer) []string {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
errorStream := make(chan error)
var syncErrs []error // track the errors we see during each iteration
errorStream := make(chan error, 100) // do not block the controller loop
controllerlib.TestWrap(t, controller, func(syncer controllerlib.Syncer) controllerlib.Syncer {
return controllerlib.SyncFunc(func(ctx controllerlib.Context) error {
synced(ctx.Context, t) // make sure that our informer has caught up with our client
// if we got the same error twice in a row, prevent the controller sync loop from running
if len(syncErrs) >= 2 {
lastErr := syncErrs[len(syncErrs)-1]
secondToLastErr := syncErrs[len(syncErrs)-2]
if lastErr != nil && secondToLastErr != nil && lastErr.Error() == secondToLastErr.Error() {
cancel() // not explicitly required but matches our intent
return nil
}
}
err := syncer.Sync(ctx)
errorStream <- err
syncErrs = append(syncErrs, err)
return err
})
})
for _, informer := range informers {
informer.Start(ctx.Done())
}
go controller.Run(ctx, 1)
// start and sync the informers before running the controller
runController, err := controllerinit.Prepare(
func(ctx context.Context) { controller.Run(ctx, 1) },
func(ctx context.Context, runner controllerinit.Runner) { runner(ctx) },
informers...,
)(ctx)
require.NoError(t, err)
go runController(ctx)
// Wait until the controller is quiet for two seconds.
var errorMessages []string
@ -1192,3 +1214,24 @@ done:
}
return errorMessages
}
// hasDeploymentSynced returns a callback that blocks until the kube cert agent
// Deployment looks the same through the (fake) API client and through the
// informer cache. Running it before each controller sync keeps the test from
// racing ahead of the informer and observing a stale world view.
func hasDeploymentSynced(client kubernetes.Interface, kubeInformers informers.SharedInformerFactory) func(ctx context.Context, t *testing.T) {
	return func(ctx context.Context, t *testing.T) {
		testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
			// Fetch the agent deployment directly from the client...
			liveDeployment, liveErr := client.AppsV1().Deployments("concierge").
				Get(ctx, "pinniped-concierge-kube-cert-agent", metav1.GetOptions{})

			// ...and from the informer's lister cache.
			cachedDeployment, cachedErr := kubeInformers.Apps().V1().Deployments().Lister().Deployments("concierge").
				Get("pinniped-concierge-kube-cert-agent")

			// Client and cache agreeing that the deployment is absent also counts as synced.
			if errors.IsNotFound(liveErr) && errors.IsNotFound(cachedErr) {
				return
			}

			requireEventually.NoError(liveErr)
			requireEventually.NoError(cachedErr)
			requireEventually.Equal(liveDeployment, cachedDeployment)
		}, 2*time.Second, 100*time.Millisecond)
	}
}

View File

@ -24,7 +24,6 @@ func NewLegacyPodCleanerController(
client *kubeclient.Client,
agentPods corev1informers.PodInformer,
log logr.Logger,
options ...controllerlib.Option,
) controllerlib.Controller {
// legacyAgentLabels are the Kubernetes labels we previously added to agent pods (the new value is "v2").
// We also expect these pods to have the "extra" labels configured on the Concierge.
@ -67,14 +66,12 @@ func NewLegacyPodCleanerController(
return nil
}),
},
append([]controllerlib.Option{
controllerlib.WithInformer(
agentPods,
pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool {
return obj.GetNamespace() == cfg.Namespace && legacyAgentSelector.Matches(labels.Set(obj.GetLabels()))
}, nil),
controllerlib.InformerOption{},
),
}, options...)...,
controllerlib.WithInformer(
agentPods,
pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool {
return obj.GetNamespace() == cfg.Namespace && legacyAgentSelector.Matches(labels.Set(obj.GetLabels()))
}, nil),
controllerlib.InformerOption{},
),
)
}

View File

@ -18,7 +18,6 @@ import (
kubefake "k8s.io/client-go/kubernetes/fake"
coretesting "k8s.io/client-go/testing"
"go.pinniped.dev/internal/controllerlib"
"go.pinniped.dev/internal/kubeclient"
"go.pinniped.dev/internal/testutil"
"go.pinniped.dev/internal/testutil/testlogger"
@ -175,13 +174,12 @@ func TestLegacyPodCleanerController(t *testing.T) {
&kubeclient.Client{Kubernetes: trackDeleteClient},
kubeInformers.Core().V1().Pods(),
log,
controllerlib.WithMaxRetries(1),
)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
errorMessages := runControllerUntilQuiet(ctx, t, controller, kubeInformers)
errorMessages := runControllerUntilQuiet(ctx, t, controller, func(_ context.Context, _ *testing.T) {}, kubeInformers)
assert.Equal(t, tt.wantDistinctErrors, deduplicate(errorMessages), "unexpected errors")
assert.Equal(t, tt.wantDistinctLogs, deduplicate(log.Lines()), "unexpected logs")
assert.Equal(t, tt.wantActions, kubeClientset.Actions()[2:], "unexpected actions")

View File

@ -22,8 +22,8 @@ type RunnerWrapper func(context.Context, Runner)
// It is expected to be called in the main go routine since the construction can fail.
type RunnerBuilder func(context.Context) (Runner, error)
// informer is the subset of SharedInformerFactory needed for starting an informer cache and waiting for it to sync.
type informer interface {
// Informer is the subset of SharedInformerFactory needed for starting an informer cache and waiting for it to sync.
type Informer interface {
Start(stopCh <-chan struct{})
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
}
@ -31,7 +31,7 @@ type informer interface {
// Prepare returns RunnerBuilder that, when called:
// 1. Starts all provided informers and waits for them sync (and fails if they hang)
// 2. Returns a Runner that combines the Runner and RunnerWrapper passed into Prepare
func Prepare(controllers Runner, controllersWrapper RunnerWrapper, informers ...informer) RunnerBuilder {
func Prepare(controllers Runner, controllersWrapper RunnerWrapper, informers ...Informer) RunnerBuilder {
return func(ctx context.Context) (Runner, error) {
for _, informer := range informers {
informer := informer