Merge pull request #842 from enj/enj/t/kube_cert_flake
kubecertagent: fix flakey tests
commit 43ba6ba686
@@ -184,7 +184,6 @@ func newAgentController(
     clock clock.Clock,
     execCache *cache.Expiring,
     log logr.Logger,
-    options ...controllerlib.Option,
 ) controllerlib.Controller {
     return controllerlib.New(
         controllerlib.Config{
@@ -204,47 +203,45 @@ func newAgentController(
                 execCache: execCache,
             },
         },
-        append([]controllerlib.Option{
-            controllerlib.WithInformer(
-                kubeSystemPods,
-                pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
-                    return controllerManagerLabels.Matches(labels.Set(obj.GetLabels()))
-                }),
-                controllerlib.InformerOption{},
-            ),
-            controllerlib.WithInformer(
-                agentDeployments,
-                pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
-                    return obj.GetNamespace() == cfg.Namespace && obj.GetName() == cfg.deploymentName()
-                }),
-                controllerlib.InformerOption{},
-            ),
-            controllerlib.WithInformer(
-                agentPods,
-                pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
-                    return agentLabels.Matches(labels.Set(obj.GetLabels()))
-                }),
-                controllerlib.InformerOption{},
-            ),
-            controllerlib.WithInformer(
-                kubePublicConfigMaps,
-                pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
-                    return obj.GetNamespace() == ClusterInfoNamespace && obj.GetName() == clusterInfoName
-                }),
-                controllerlib.InformerOption{},
-            ),
-            controllerlib.WithInformer(
-                credentialIssuers,
-                pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
-                    return obj.GetName() == cfg.CredentialIssuerName
-                }),
-                controllerlib.InformerOption{},
-            ),
-            // Be sure to run once even to make sure the CredentialIssuer is updated if there are no controller manager
-            // pods. We should be able to pass an empty key since we don't use the key in the sync (we sync
-            // the world).
-            controllerlib.WithInitialEvent(controllerlib.Key{}),
-        }, options...)...,
+        controllerlib.WithInformer(
+            kubeSystemPods,
+            pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
+                return controllerManagerLabels.Matches(labels.Set(obj.GetLabels()))
+            }),
+            controllerlib.InformerOption{},
+        ),
+        controllerlib.WithInformer(
+            agentDeployments,
+            pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
+                return obj.GetNamespace() == cfg.Namespace && obj.GetName() == cfg.deploymentName()
+            }),
+            controllerlib.InformerOption{},
+        ),
+        controllerlib.WithInformer(
+            agentPods,
+            pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
+                return agentLabels.Matches(labels.Set(obj.GetLabels()))
+            }),
+            controllerlib.InformerOption{},
+        ),
+        controllerlib.WithInformer(
+            kubePublicConfigMaps,
+            pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
+                return obj.GetNamespace() == ClusterInfoNamespace && obj.GetName() == clusterInfoName
+            }),
+            controllerlib.InformerOption{},
+        ),
+        controllerlib.WithInformer(
+            credentialIssuers,
+            pinnipedcontroller.SimpleFilterWithSingletonQueue(func(obj metav1.Object) bool {
+                return obj.GetName() == cfg.CredentialIssuerName
+            }),
+            controllerlib.InformerOption{},
+        ),
+        // Be sure to run once even to make sure the CredentialIssuer is updated if there are no controller manager
+        // pods. We should be able to pass an empty key since we don't use the key in the sync (we sync
+        // the world).
+        controllerlib.WithInitialEvent(controllerlib.Key{}),
     )
 }

@@ -14,6 +14,7 @@ import (
     "github.com/stretchr/testify/require"
     appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
@@ -21,6 +22,7 @@ import (
     "k8s.io/apimachinery/pkg/util/cache"
     "k8s.io/apimachinery/pkg/util/clock"
     "k8s.io/client-go/informers"
+    "k8s.io/client-go/kubernetes"
     kubefake "k8s.io/client-go/kubernetes/fake"
     coretesting "k8s.io/client-go/testing"
     "k8s.io/utils/pointer"
@@ -29,11 +31,13 @@ import (
     conciergefake "go.pinniped.dev/generated/latest/client/concierge/clientset/versioned/fake"
     conciergeinformers "go.pinniped.dev/generated/latest/client/concierge/informers/externalversions"
     "go.pinniped.dev/internal/controller/kubecertagent/mocks"
+    "go.pinniped.dev/internal/controllerinit"
     "go.pinniped.dev/internal/controllerlib"
     "go.pinniped.dev/internal/here"
     "go.pinniped.dev/internal/kubeclient"
     "go.pinniped.dev/internal/testutil"
     "go.pinniped.dev/internal/testutil/testlogger"
+    "go.pinniped.dev/test/testlib"
 )

 func TestAgentController(t *testing.T) {

@@ -496,12 +500,10 @@ func TestAgentController(t *testing.T) {
            },
            wantAgentDeployment: healthyAgentDeploymentWithOldStyleSelector, // couldn't be deleted, so it didn't change
            // delete to try to recreate deployment when Selector field changes, but delete always fails, so keeps trying to delete
-           wantDeploymentActionVerbs: []string{"list", "watch", "delete", "delete", "delete", "delete"},
+           wantDeploymentActionVerbs: []string{"list", "watch", "delete", "delete"},
            wantDeploymentDeleteActionOpts: []metav1.DeleteOptions{
                testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
                testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
-               testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
-               testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
            },
            wantStrategy: &configv1alpha1.CredentialIssuerStrategy{
                Type: configv1alpha1.KubeClusterSigningCertificateStrategyType,
@@ -536,7 +538,7 @@ func TestAgentController(t *testing.T) {
            },
            wantAgentDeployment: nil, // was deleted, but couldn't be recreated
            // delete to try to recreate deployment when Selector field changes, but create always fails, so keeps trying to recreate
-           wantDeploymentActionVerbs: []string{"list", "watch", "delete", "create", "create", "create", "create"},
+           wantDeploymentActionVerbs: []string{"list", "watch", "delete", "create", "create"},
            wantDeploymentDeleteActionOpts: []metav1.DeleteOptions{
                testutil.NewPreconditions(healthyAgentDeploymentWithOldStyleSelector.UID, healthyAgentDeploymentWithOldStyleSelector.ResourceVersion),
            },

@@ -1029,53 +1031,54 @@ func TestAgentController(t *testing.T) {
                fakeClock,
                execCache,
                log,
-               controllerlib.WithMaxRetries(1),
            )

            ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
            defer cancel()

-           errorMessages := runControllerUntilQuiet(ctx, t, controller, kubeInformers, conciergeInformers)
+           errorMessages := runControllerUntilQuiet(ctx, t, controller, hasDeploymentSynced(trackDeleteKubeClient, kubeInformers), kubeInformers, conciergeInformers)

            actualErrors := deduplicate(errorMessages)
-           require.Subsetf(t, actualErrors, tt.wantDistinctErrors, "required error(s) were not found in the actual errors")
+           assert.Subsetf(t, actualErrors, tt.wantDistinctErrors, "required error(s) were not found in the actual errors")

            allAllowedErrors := append([]string{}, tt.wantDistinctErrors...)
            allAllowedErrors = append(allAllowedErrors, tt.alsoAllowUndesiredDistinctErrors...)
-           require.Subsetf(t, allAllowedErrors, actualErrors, "actual errors contained additional error(s) which is not expected by the test")
+           assert.Subsetf(t, allAllowedErrors, actualErrors, "actual errors contained additional error(s) which is not expected by the test")

            assert.Equal(t, tt.wantDistinctLogs, deduplicate(log.Lines()), "unexpected logs")

            // Assert on all actions that happened to deployments.
            var actualDeploymentActionVerbs []string
            for _, a := range kubeClientset.Actions() {
-               if a.GetResource().Resource == "deployments" {
+               if a.GetResource().Resource == "deployments" && a.GetVerb() != "get" { // ignore gets caused by hasDeploymentSynced
                    actualDeploymentActionVerbs = append(actualDeploymentActionVerbs, a.GetVerb())
                }
            }
            if tt.wantDeploymentActionVerbs != nil {
-               require.Equal(t, tt.wantDeploymentActionVerbs, actualDeploymentActionVerbs)
+               assert.Equal(t, tt.wantDeploymentActionVerbs, actualDeploymentActionVerbs)
            }
            if tt.wantDeploymentDeleteActionOpts != nil {
-               require.Equal(t, tt.wantDeploymentDeleteActionOpts, *actualDeleteActionOpts)
+               assert.Equal(t, tt.wantDeploymentDeleteActionOpts, *actualDeleteActionOpts)
            }

            // Assert that the agent deployment is in the expected final state.
            deployments, err := kubeClientset.AppsV1().Deployments("concierge").List(ctx, metav1.ListOptions{})
            require.NoError(t, err)
            if tt.wantAgentDeployment == nil {
-               require.Empty(t, deployments.Items, "did not expect an agent deployment")
-           } else {
-               require.Len(t, deployments.Items, 1, "expected a single agent deployment")
-               require.Equal(t, tt.wantAgentDeployment, &deployments.Items[0])
+               assert.Empty(t, deployments.Items, "did not expect an agent deployment")
+           } else { //nolint: gocritic
+               if assert.Len(t, deployments.Items, 1, "expected a single agent deployment") {
+                   assert.Equal(t, tt.wantAgentDeployment, &deployments.Items[0])
+               }
            }

            // Assert that the CredentialIssuer is in the expected final state
            if tt.wantStrategy != nil {
                credIssuer, err := conciergeClientset.ConfigV1alpha1().CredentialIssuers().Get(ctx, initialCredentialIssuer.Name, metav1.GetOptions{})
-               require.NoError(t, err)
-               require.Len(t, credIssuer.Status.Strategies, 1, "expected a single strategy in the CredentialIssuer")
-               require.Equal(t, tt.wantStrategy, &credIssuer.Status.Strategies[0])
+               ok := assert.NoError(t, err)
+               if ok && assert.Len(t, credIssuer.Status.Strategies, 1, "expected a single strategy in the CredentialIssuer") {
+                   assert.Equal(t, tt.wantStrategy, &credIssuer.Status.Strategies[0])
+               }
            }
        })
    }

@@ -1156,24 +1159,43 @@ func deduplicate(strings []string) []string {
    return result
 }

-func runControllerUntilQuiet(ctx context.Context, t *testing.T, controller controllerlib.Controller, informers ...interface{ Start(<-chan struct{}) }) []string {
+func runControllerUntilQuiet(ctx context.Context, t *testing.T, controller controllerlib.Controller, synced func(ctx context.Context, t *testing.T), informers ...controllerinit.Informer) []string {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

-   errorStream := make(chan error)
+   var syncErrs []error // track the errors we see during each iteration
+   errorStream := make(chan error, 100) // do not block the controller loop
    controllerlib.TestWrap(t, controller, func(syncer controllerlib.Syncer) controllerlib.Syncer {
        return controllerlib.SyncFunc(func(ctx controllerlib.Context) error {
+           synced(ctx.Context, t) // make sure that our informer has caught up with our client
+
+           // if we got the same error twice in a row, prevent the controller sync loop from running
+           if len(syncErrs) >= 2 {
+               lastErr := syncErrs[len(syncErrs)-1]
+               secondToLastErr := syncErrs[len(syncErrs)-2]
+               if lastErr != nil && secondToLastErr != nil && lastErr.Error() == secondToLastErr.Error() {
+                   cancel() // not explicitly required but matches our intent
+                   return nil
+               }
+           }
+
            err := syncer.Sync(ctx)
            errorStream <- err
+
+           syncErrs = append(syncErrs, err)
+
            return err
        })
    })

-   for _, informer := range informers {
-       informer.Start(ctx.Done())
-   }
-
-   go controller.Run(ctx, 1)
+   // start and sync the informers before running the controller
+   runController, err := controllerinit.Prepare(
+       func(ctx context.Context) { controller.Run(ctx, 1) },
+       func(ctx context.Context, runner controllerinit.Runner) { runner(ctx) },
+       informers...,
+   )(ctx)
+   require.NoError(t, err)
+   go runController(ctx)

    // Wait until the controller is quiet for two seconds.
    var errorMessages []string

@@ -1192,3 +1214,24 @@ done:
    }
    return errorMessages
 }
+
+func hasDeploymentSynced(client kubernetes.Interface, kubeInformers informers.SharedInformerFactory) func(ctx context.Context, t *testing.T) {
+   return func(ctx context.Context, t *testing.T) {
+       testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
+           realDep, realErr := client.AppsV1().Deployments("concierge").
+               Get(ctx, "pinniped-concierge-kube-cert-agent", metav1.GetOptions{})
+
+           cachedDep, cachedErr := kubeInformers.Apps().V1().Deployments().Lister().Deployments("concierge").
+               Get("pinniped-concierge-kube-cert-agent")
+
+           if errors.IsNotFound(realErr) && errors.IsNotFound(cachedErr) {
+               return
+           }
+
+           requireEventually.NoError(realErr)
+           requireEventually.NoError(cachedErr)
+
+           requireEventually.Equal(realDep, cachedDep)
+       }, 2*time.Second, 100*time.Millisecond)
+   }
+}

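Aside: the heart of this flake fix is the pattern hasDeploymentSynced implements above. A fake clientset and a shared informer are only eventually consistent, so a test must wait for the informer's cache to catch up with the client before asserting on it. Below is a minimal, self-contained sketch of that pattern; it is not code from this commit, it uses only client-go fakes, and the "demo" namespace and "my-deployment" name are placeholders.

```go
package main

import (
	"context"
	"fmt"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	kubefake "k8s.io/client-go/kubernetes/fake"
)

func main() {
	client := kubefake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)
	lister := factory.Apps().V1().Deployments().Lister() // registers the deployments informer

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Start the informer and wait for its initial list/watch to complete.
	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	// Create an object directly through the (fake) API server.
	dep := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Namespace: "demo", Name: "my-deployment"}}
	if _, err := client.AppsV1().Deployments("demo").Create(ctx, dep, metav1.CreateOptions{}); err != nil {
		panic(err)
	}

	// The informer cache lags behind the client, so poll until the lister
	// observes the new object instead of asserting on it immediately.
	err := wait.PollImmediateUntil(50*time.Millisecond, func() (bool, error) {
		_, getErr := lister.Deployments("demo").Get("my-deployment")
		return getErr == nil, nil
	}, ctx.Done())
	if err != nil {
		panic(err)
	}
	fmt.Println("informer cache has caught up with the fake client")
}
```

runControllerUntilQuiet applies the same idea on every iteration by invoking the synced callback before each Sync.
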
@@ -24,7 +24,6 @@ func NewLegacyPodCleanerController(
    client *kubeclient.Client,
    agentPods corev1informers.PodInformer,
    log logr.Logger,
-   options ...controllerlib.Option,
 ) controllerlib.Controller {
    // legacyAgentLabels are the Kubernetes labels we previously added to agent pods (the new value is "v2").
    // We also expect these pods to have the "extra" labels configured on the Concierge.
@@ -67,14 +66,12 @@ func NewLegacyPodCleanerController(
                return nil
            }),
        },
-       append([]controllerlib.Option{
-           controllerlib.WithInformer(
-               agentPods,
-               pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool {
-                   return obj.GetNamespace() == cfg.Namespace && legacyAgentSelector.Matches(labels.Set(obj.GetLabels()))
-               }, nil),
-               controllerlib.InformerOption{},
-           ),
-       }, options...)...,
+       controllerlib.WithInformer(
+           agentPods,
+           pinnipedcontroller.SimpleFilter(func(obj metav1.Object) bool {
+               return obj.GetNamespace() == cfg.Namespace && legacyAgentSelector.Matches(labels.Set(obj.GetLabels()))
+           }, nil),
+           controllerlib.InformerOption{},
+       ),
    )
 }

@@ -18,7 +18,6 @@ import (
    kubefake "k8s.io/client-go/kubernetes/fake"
    coretesting "k8s.io/client-go/testing"

-   "go.pinniped.dev/internal/controllerlib"
    "go.pinniped.dev/internal/kubeclient"
    "go.pinniped.dev/internal/testutil"
    "go.pinniped.dev/internal/testutil/testlogger"
@@ -175,13 +174,12 @@ func TestLegacyPodCleanerController(t *testing.T) {
                &kubeclient.Client{Kubernetes: trackDeleteClient},
                kubeInformers.Core().V1().Pods(),
                log,
-               controllerlib.WithMaxRetries(1),
            )

            ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
            defer cancel()

-           errorMessages := runControllerUntilQuiet(ctx, t, controller, kubeInformers)
+           errorMessages := runControllerUntilQuiet(ctx, t, controller, func(_ context.Context, _ *testing.T) {}, kubeInformers)
            assert.Equal(t, tt.wantDistinctErrors, deduplicate(errorMessages), "unexpected errors")
            assert.Equal(t, tt.wantDistinctLogs, deduplicate(log.Lines()), "unexpected logs")
            assert.Equal(t, tt.wantActions, kubeClientset.Actions()[2:], "unexpected actions")

@@ -22,8 +22,8 @@ type RunnerWrapper func(context.Context, Runner)
 // It is expected to be called in the main go routine since the construction can fail.
 type RunnerBuilder func(context.Context) (Runner, error)

-// informer is the subset of SharedInformerFactory needed for starting an informer cache and waiting for it to sync.
-type informer interface {
+// Informer is the subset of SharedInformerFactory needed for starting an informer cache and waiting for it to sync.
+type Informer interface {
    Start(stopCh <-chan struct{})
    WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
 }
@@ -31,7 +31,7 @@ type informer interface {
 // Prepare returns RunnerBuilder that, when called:
 // 1. Starts all provided informers and waits for them sync (and fails if they hang)
 // 2. Returns a Runner that combines the Runner and RunnerWrapper passed into Prepare
-func Prepare(controllers Runner, controllersWrapper RunnerWrapper, informers ...informer) RunnerBuilder {
+func Prepare(controllers Runner, controllersWrapper RunnerWrapper, informers ...Informer) RunnerBuilder {
    return func(ctx context.Context) (Runner, error) {
        for _, informer := range informers {
            informer := informer
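
For context, client-go's SharedInformerFactory (and the generated Concierge informer factory) already provides exactly the two methods in the newly exported Informer interface, which is why runControllerUntilQuiet can now accept the test's factories directly. A tiny compile-check sketch, assuming only the import paths shown in this diff (not code from this commit):

```go
package main

import (
	"k8s.io/client-go/informers"
	kubefake "k8s.io/client-go/kubernetes/fake"

	"go.pinniped.dev/internal/controllerinit"
)

// Compile-time assertion: a client-go SharedInformerFactory satisfies
// controllerinit.Informer (Start + WaitForCacheSync).
var _ controllerinit.Informer = informers.NewSharedInformerFactory(kubefake.NewSimpleClientset(), 0)

func main() {}
```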