diff --git a/cmd/pinniped-supervisor/main.go b/cmd/pinniped-supervisor/main.go index 31f5dff8..0c54964d 100644 --- a/cmd/pinniped-supervisor/main.go +++ b/cmd/pinniped-supervisor/main.go @@ -29,6 +29,7 @@ import ( "go.pinniped.dev/internal/config/supervisor" "go.pinniped.dev/internal/controller/supervisorconfig" "go.pinniped.dev/internal/controller/supervisorconfig/upstreamwatcher" + "go.pinniped.dev/internal/controller/supervisorstorage" "go.pinniped.dev/internal/controllerlib" "go.pinniped.dev/internal/downward" "go.pinniped.dev/internal/oidc/jwks" @@ -84,6 +85,15 @@ func startControllers( // Create controller manager. controllerManager := controllerlib. NewManager(). + WithController( + supervisorstorage.GarbageCollectorController( + clock.RealClock{}, + kubeClient, + kubeInformers.Core().V1().Secrets(), + controllerlib.WithInformer, + ), + singletonWorker, + ). WithController( supervisorconfig.NewOIDCProviderWatcherController( issuerManager, diff --git a/internal/controller/supervisorstorage/garbage_collector.go b/internal/controller/supervisorstorage/garbage_collector.go index 95b87639..d2f6aef3 100644 --- a/internal/controller/supervisorstorage/garbage_collector.go +++ b/internal/controller/supervisorstorage/garbage_collector.go @@ -6,8 +6,10 @@ package supervisorstorage import ( "time" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/clock" corev1informers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" @@ -17,12 +19,17 @@ import ( "go.pinniped.dev/internal/plog" ) +const minimumRepeatInterval = 30 * time.Second + type garbageCollectorController struct { - secretInformer corev1informers.SecretInformer - kubeClient kubernetes.Interface + secretInformer corev1informers.SecretInformer + kubeClient kubernetes.Interface + clock clock.Clock + timeOfMostRecentSweep time.Time } func GarbageCollectorController( + clock clock.Clock, kubeClient kubernetes.Interface, secretInformer corev1informers.SecretInformer, withInformer pinnipedcontroller.WithInformerOptionFunc, @@ -33,39 +40,66 @@ func GarbageCollectorController( Syncer: &garbageCollectorController{ secretInformer: secretInformer, kubeClient: kubeClient, + clock: clock, }, }, withInformer( secretInformer, - pinnipedcontroller.MatchNothingFilter(nil), + pinnipedcontroller.MatchAnythingFilter(nil), controllerlib.InformerOption{}, ), ) } func (c *garbageCollectorController) Sync(ctx controllerlib.Context) error { + // The Sync method is triggered upon any change to any Secret, which would make this + // controller too chatty, so it rate limits itself to a more reasonable interval. + // Note that even during a period when no secrets are changing, it will still run + // at the informer's full-resync interval (as long as there are some secrets). + if c.clock.Now().Sub(c.timeOfMostRecentSweep) < minimumRepeatInterval { + return nil + } + + plog.Info("starting storage garbage collection sweep") + c.timeOfMostRecentSweep = c.clock.Now() + listOfSecrets, err := c.secretInformer.Lister().List(labels.Everything()) if err != nil { return err } + for i := range listOfSecrets { secret := listOfSecrets[i] - s, ok := secret.Annotations[crud.SecretLifetimeAnnotationKey] + + timeString, ok := secret.Annotations[crud.SecretLifetimeAnnotationKey] if !ok { continue } - currentTime := time.Now() - garbageCollectAfterTime, err := time.Parse(time.RFC3339, s) + + garbageCollectAfterTime, err := time.Parse(crud.SecretLifetimeAnnotationDateFormat, timeString) if err != nil { - plog.WarningErr("could not parse for garbage collection", err, "secretName", secret.Name, "garbageCollectAfter", s) + plog.WarningErr("could not parse resource timestamp for garbage collection", err, logKV(secret)) continue } - if garbageCollectAfterTime.Before(currentTime) { + + if garbageCollectAfterTime.Before(c.clock.Now()) { err = c.kubeClient.CoreV1().Secrets(secret.Namespace).Delete(ctx.Context, secret.Name, metav1.DeleteOptions{}) if err != nil { - plog.WarningErr("failed to garbage collect value", err, "secretName", secret.Name, "garbageCollectAfter", s) + plog.WarningErr("failed to garbage collect resource", err, logKV(secret)) + continue } + plog.Info("storage garbage collector deleted resource", logKV(secret)) } } + return nil } + +func logKV(secret *v1.Secret) []interface{} { + return []interface{}{ + "secretName", secret.Name, + "secretNamespace", secret.Namespace, + "secretType", string(secret.Type), + "garbageCollectAfter", secret.Annotations[crud.SecretLifetimeAnnotationKey], + } +} diff --git a/internal/controller/supervisorstorage/garbage_collector_test.go b/internal/controller/supervisorstorage/garbage_collector_test.go index e2167b88..81f104df 100644 --- a/internal/controller/supervisorstorage/garbage_collector_test.go +++ b/internal/controller/supervisorstorage/garbage_collector_test.go @@ -16,6 +16,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/clock" kubeinformers "k8s.io/client-go/informers" kubernetesfake "k8s.io/client-go/kubernetes/fake" kubetesting "k8s.io/client-go/testing" @@ -37,6 +38,7 @@ func TestGarbageCollectorControllerInformerFilters(t *testing.T) { observableWithInformerOption = testutil.NewObservableWithInformerOption() secretsInformer := kubeinformers.NewSharedInformerFactory(nil, 0).Core().V1().Secrets() _ = GarbageCollectorController( + clock.RealClock{}, nil, secretsInformer, observableWithInformerOption.WithInformer, // make it possible to observe the behavior of the Filters @@ -57,11 +59,11 @@ func TestGarbageCollectorControllerInformerFilters(t *testing.T) { }) when("any Secret changes", func() { - it("returns false to avoid triggering the sync function", func() { - r.False(subject.Add(secret)) - r.False(subject.Update(secret, otherSecret)) - r.False(subject.Update(otherSecret, secret)) - r.False(subject.Delete(secret)) + it("returns true to trigger the sync function for all secrets", func() { + r.True(subject.Add(secret)) + r.True(subject.Update(secret, otherSecret)) + r.True(subject.Update(otherSecret, secret)) + r.True(subject.Delete(secret)) }) }) }) @@ -75,10 +77,6 @@ func TestGarbageCollectorControllerSync(t *testing.T) { Resource: "secrets", } - firstExpiredTime := time.Date(1900, time.January, 1, 1, 0, 0, 0, time.UTC).Format(time.RFC3339) - secondExpiredTime := time.Date(1901, time.January, 1, 1, 0, 0, 0, time.UTC).Format(time.RFC3339) - unexpiredTime := time.Now().Add(time.Hour * 24).UTC().Format(time.RFC3339) - spec.Run(t, "Sync", func(t *testing.T, when spec.G, it spec.S) { const ( installedInNamespace = "some-namespace" @@ -93,6 +91,8 @@ func TestGarbageCollectorControllerSync(t *testing.T) { timeoutContext context.Context timeoutContextCancel context.CancelFunc syncContext *controllerlib.Context + fakeClock *clock.FakeClock + frozenNow time.Time ) // Defer starting the informers until the last possible moment so that the @@ -100,6 +100,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) { var startInformersAndController = func() { // Set this at the last second to allow for injection of server override. subject = GarbageCollectorController( + fakeClock, kubeClient, kubeInformers.Core().V1().Secrets(), controllerlib.WithInformer, @@ -128,6 +129,8 @@ func TestGarbageCollectorControllerSync(t *testing.T) { kubeInformerClient = kubernetesfake.NewSimpleClientset() kubeClient = kubernetesfake.NewSimpleClientset() kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0) + frozenNow = time.Now().UTC() + fakeClock = clock.NewFakeClock(frozenNow) unrelatedSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -163,7 +166,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) { Name: "first expired secret", Namespace: installedInNamespace, Annotations: map[string]string{ - "storage.pinniped.dev/garbage-collect-after": firstExpiredTime, + "storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-time.Second).Format(time.RFC3339), }, }, } @@ -174,7 +177,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) { Name: "second expired secret", Namespace: installedInNamespace, Annotations: map[string]string{ - "storage.pinniped.dev/garbage-collect-after": secondExpiredTime, + "storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-2 * time.Second).Format(time.RFC3339), }, }, } @@ -185,7 +188,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) { Name: "unexpired secret", Namespace: installedInNamespace, Annotations: map[string]string{ - "storage.pinniped.dev/garbage-collect-after": unexpiredTime, + "storage.pinniped.dev/garbage-collect-after": frozenNow.Add(time.Second).Format(time.RFC3339), }, }, } @@ -211,6 +214,54 @@ func TestGarbageCollectorControllerSync(t *testing.T) { }) }) + when("very little time has passed since the previous sync call", func() { + it.Before(func() { + // Add a secret that will expire in 20 seconds. + expiredSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "expired secret", + Namespace: installedInNamespace, + Annotations: map[string]string{ + "storage.pinniped.dev/garbage-collect-after": frozenNow.Add(20 * time.Second).Format(time.RFC3339), + }, + }, + } + r.NoError(kubeInformerClient.Tracker().Add(expiredSecret)) + r.NoError(kubeClient.Tracker().Add(expiredSecret)) + }) + + it("should do nothing to avoid being super chatty since it is called for every change to any Secret, until more time has passed", func() { + startInformersAndController() + require.Empty(t, kubeClient.Actions()) + + // Run sync once with the current time set to frozenTime. + r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + require.Empty(t, kubeClient.Actions()) + + // Run sync again when not enough time has passed since the most recent run, so no delete + // operations should happen even though there is a expired secret now. + fakeClock.Step(29 * time.Second) + r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + require.Empty(t, kubeClient.Actions()) + + // Step to the exact threshold and run Sync again. Now we are past the rate limiting period. + fakeClock.Step(1*time.Second + 1*time.Millisecond) + r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + + // It should have deleted the expired secret. + r.ElementsMatch( + []kubetesting.Action{ + kubetesting.NewDeleteAction(secretsGVR, installedInNamespace, "expired secret"), + }, + kubeClient.Actions(), + ) + list, err := kubeClient.CoreV1().Secrets(installedInNamespace).List(context.Background(), metav1.ListOptions{}) + r.NoError(err) + r.Len(list.Items, 1) + r.Equal("some other unrelated secret", list.Items[0].Name) + }) + }) + when("there is a secret with a malformed garbage-collect-after date", func() { it.Before(func() { malformedSecret := &corev1.Secret{ @@ -229,7 +280,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) { Name: "expired secret", Namespace: installedInNamespace, Annotations: map[string]string{ - "storage.pinniped.dev/garbage-collect-after": firstExpiredTime, + "storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-time.Second).Format(time.RFC3339), }, }, } @@ -254,14 +305,14 @@ func TestGarbageCollectorControllerSync(t *testing.T) { }) }) - when("the delete call fails", func() { + when("the kube API delete call fails", func() { it.Before(func() { erroringSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "erroring secret", Namespace: installedInNamespace, Annotations: map[string]string{ - "storage.pinniped.dev/garbage-collect-after": firstExpiredTime, + "storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-time.Second).Format(time.RFC3339), }, }, } @@ -278,7 +329,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) { Name: "expired secret", Namespace: installedInNamespace, Annotations: map[string]string{ - "storage.pinniped.dev/garbage-collect-after": firstExpiredTime, + "storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-time.Second).Format(time.RFC3339), }, }, } @@ -286,7 +337,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) { r.NoError(kubeClient.Tracker().Add(expiredSecret)) }) - it("continues on to delete the next one", func() { + it("ignores the error and continues on to delete the next expired Secret", func() { startInformersAndController() r.NoError(controllerlib.TestSync(t, subject, *syncContext)) diff --git a/internal/controller/utils.go b/internal/controller/utils.go index 4d0080e1..354b6a3d 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -22,11 +22,6 @@ func MatchAnythingFilter(parentFunc controllerlib.ParentFunc) controllerlib.Filt return SimpleFilter(func(object metav1.Object) bool { return true }, parentFunc) } -// MatchNothingFilter returns a controllerlib.Filter that allows no objects. -func MatchNothingFilter(parentFunc controllerlib.ParentFunc) controllerlib.Filter { - return SimpleFilter(func(object metav1.Object) bool { return false }, parentFunc) -} - // SimpleFilter takes a single boolean match function on a metav1.Object and wraps it into a proper controllerlib.Filter. func SimpleFilter(match func(metav1.Object) bool, parentFunc controllerlib.ParentFunc) controllerlib.Filter { return controllerlib.FilterFuncs{ diff --git a/internal/crud/crud.go b/internal/crud/crud.go index d1edeca2..84abe142 100644 --- a/internal/crud/crud.go +++ b/internal/crud/crud.go @@ -23,8 +23,10 @@ import ( //nolint:gosec // ignore lint warnings that these are credentials const ( - SecretLabelKey = "storage.pinniped.dev/type" - SecretLifetimeAnnotationKey = "storage.pinniped.dev/garbage-collect-after" + SecretLabelKey = "storage.pinniped.dev/type" + + SecretLifetimeAnnotationKey = "storage.pinniped.dev/garbage-collect-after" + SecretLifetimeAnnotationDateFormat = time.RFC3339 secretNameFormat = "pinniped-storage-%s-%s" secretTypeFormat = "storage.pinniped.dev/%s" @@ -178,7 +180,7 @@ func (s *secretsStorage) toSecret(signature, resourceVersion string, data JSON, ResourceVersion: resourceVersion, Labels: labelsToAdd, Annotations: map[string]string{ - SecretLifetimeAnnotationKey: s.clock().Add(s.lifetime).UTC().Format(time.RFC3339), + SecretLifetimeAnnotationKey: s.clock().Add(s.lifetime).UTC().Format(SecretLifetimeAnnotationDateFormat), }, OwnerReferences: nil, }, diff --git a/test/integration/supervisor_storage_garbage_collection_test.go b/test/integration/supervisor_storage_garbage_collection_test.go new file mode 100644 index 00000000..a80a6a79 --- /dev/null +++ b/test/integration/supervisor_storage_garbage_collection_test.go @@ -0,0 +1,130 @@ +// Copyright 2020 the Pinniped contributors. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package integration + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + + "go.pinniped.dev/internal/crud" + "go.pinniped.dev/test/library" +) + +func TestStorageGarbageCollection(t *testing.T) { + // Run this test in parallel with the other integration tests because it does a lot of waiting + // and will not impact other tests, or be impacted by other tests, when run in parallel. + t.Parallel() + + env := library.IntegrationEnv(t) + client := library.NewClientset(t) + secrets := client.CoreV1().Secrets(env.SupervisorNamespace) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + secretAlreadyExpired := createSecret(ctx, t, secrets, "past", time.Now().Add(-time.Second)) + secretWhichWillExpireBeforeTheTestEnds := createSecret(ctx, t, secrets, "near-future", time.Now().Add(30*time.Second)) + secretNotYetExpired := createSecret(ctx, t, secrets, "far-future", time.Now().Add(10*time.Minute)) + + var err error + secretIsNotFound := func(secretName string) func() bool { + return func() bool { + _, err = secrets.Get(ctx, secretName, metav1.GetOptions{}) + return k8serrors.IsNotFound(err) + } + } + + // Start a background goroutine which will end as soon as the test ends. + // Keep updating a secret in the same namespace just to get the controller to respond faster. + // This is just a performance optimization because otherwise this test has to wait + // ~3 minutes for the controller's next full-resync. + stopCh := make(chan bool, 1) // It is important that this channel be buffered. + go createAndUpdateSecretEveryTwoSeconds(t, stopCh, secrets) + t.Cleanup(func() { + stopCh <- true + }) + + // Wait long enough for the next periodic sweep of the GC controller for the secrets to be deleted, which + // is the worst-case length of time that we should ever need to wait. Because of the goroutine above, + // in practice we should only need to wait about 30 seconds, which is the GC controller's self-imposed + // rate throttling time period. + slightlyLongerThanGCControllerFullResyncPeriod := 3*time.Minute + 30*time.Second + assert.Eventually(t, secretIsNotFound(secretAlreadyExpired.Name), slightlyLongerThanGCControllerFullResyncPeriod, 250*time.Millisecond) + require.Truef(t, k8serrors.IsNotFound(err), "wanted a NotFound error but got %v", err) // prints out the error and stops the test in case of failure + assert.Eventually(t, secretIsNotFound(secretWhichWillExpireBeforeTheTestEnds.Name), slightlyLongerThanGCControllerFullResyncPeriod, 250*time.Millisecond) + require.Truef(t, k8serrors.IsNotFound(err), "wanted a NotFound error but got %v", err) // prints out the error and stops the test in case of failure + + // The unexpired secret should not have been deleted within the timeframe of this test run. + _, err = secrets.Get(ctx, secretNotYetExpired.Name, metav1.GetOptions{}) + require.NoError(t, err) +} + +func createAndUpdateSecretEveryTwoSeconds(t *testing.T, stopCh chan bool, secrets corev1client.SecretInterface) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + unrelatedSecret := createSecret(ctx, t, secrets, "unrelated-to-gc", time.Time{}) + + i := 0 + for { + select { + case <-stopCh: + // Got a signal, so stop running. + return + default: + // Channel had no message, so keep running. + } + + time.Sleep(2 * time.Second) + + i++ + unrelatedSecret.Data["foo"] = []byte(fmt.Sprintf("bar-%d", i)) + var updateErr error + unrelatedSecret, updateErr = secrets.Update(ctx, unrelatedSecret, metav1.UpdateOptions{}) + require.NoError(t, updateErr) + } +} + +func createSecret(ctx context.Context, t *testing.T, secrets corev1client.SecretInterface, name string, expiresAt time.Time) *v1.Secret { + secret, err := secrets.Create(ctx, newSecret("pinniped-storage-gc-integration-test-"+name+"-", expiresAt), metav1.CreateOptions{}) + require.NoError(t, err) + + // Make sure the Secret is deleted when the test ends. + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + err := secrets.Delete(ctx, secret.Name, metav1.DeleteOptions{}) + notFound := k8serrors.IsNotFound(err) + if !notFound { + // it's okay if the Secret was already deleted, but other errors are cleanup failures + require.NoError(t, err) + } + }) + + return secret +} + +func newSecret(namePrefix string, expiresAt time.Time) *v1.Secret { + annotations := map[string]string{} + if !expiresAt.Equal(time.Time{}) { + // Mark the secret for garbage collection. + annotations[crud.SecretLifetimeAnnotationKey] = expiresAt.UTC().Format(time.RFC3339) + } + return &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: namePrefix, + Annotations: annotations, + }, + Data: map[string][]byte{"some-key": []byte("fake-data")}, + Type: "storage.pinniped.dev/gc-test-integration-test", + } +} diff --git a/test/integration/storage_test.go b/test/integration/supervisor_storage_test.go similarity index 100% rename from test/integration/storage_test.go rename to test/integration/supervisor_storage_test.go