diff --git a/internal/controller/supervisorstorage/garbage_collector.go b/internal/controller/supervisorstorage/garbage_collector.go index e2d35d30..6da85d3f 100644 --- a/internal/controller/supervisorstorage/garbage_collector.go +++ b/internal/controller/supervisorstorage/garbage_collector.go @@ -59,7 +59,7 @@ func GarbageCollectorController( return isSecretWithGCAnnotation(oldObj) || isSecretWithGCAnnotation(newObj) }, DeleteFunc: func(obj metav1.Object) bool { return false }, // ignore all deletes - ParentFunc: nil, + ParentFunc: pinnipedcontroller.SingletonQueue(), }, controllerlib.InformerOption{}, ), @@ -67,16 +67,20 @@ func GarbageCollectorController( } func (c *garbageCollectorController) Sync(ctx controllerlib.Context) error { + // make sure we have a consistent, static meaning for the current time during the sync loop + frozenClock := clock.NewFakeClock(c.clock.Now()) + // The Sync method is triggered upon any change to any Secret, which would make this // controller too chatty, so it rate limits itself to a more reasonable interval. // Note that even during a period when no secrets are changing, it will still run // at the informer's full-resync interval (as long as there are some secrets). - if c.clock.Now().Sub(c.timeOfMostRecentSweep) < minimumRepeatInterval { + if since := frozenClock.Since(c.timeOfMostRecentSweep); since < minimumRepeatInterval { + ctx.Queue.AddAfter(ctx.Key, minimumRepeatInterval-since) return nil } plog.Info("starting storage garbage collection sweep") - c.timeOfMostRecentSweep = c.clock.Now() + c.timeOfMostRecentSweep = frozenClock.Now() listOfSecrets, err := c.secretInformer.Lister().List(labels.Everything()) if err != nil { @@ -97,7 +101,7 @@ func (c *garbageCollectorController) Sync(ctx controllerlib.Context) error { continue } - if garbageCollectAfterTime.Before(c.clock.Now()) { + if garbageCollectAfterTime.Before(frozenClock.Now()) { err = c.kubeClient.CoreV1().Secrets(secret.Namespace).Delete(ctx.Context, secret.Name, metav1.DeleteOptions{}) if err != nil { plog.WarningErr("failed to garbage collect resource", err, logKV(secret)) diff --git a/internal/controller/supervisorstorage/garbage_collector_test.go b/internal/controller/supervisorstorage/garbage_collector_test.go index a5606cf8..ac64acf3 100644 --- a/internal/controller/supervisorstorage/garbage_collector_test.go +++ b/internal/controller/supervisorstorage/garbage_collector_test.go @@ -66,6 +66,10 @@ func TestGarbageCollectorControllerInformerFilters(t *testing.T) { r.True(subject.Update(secretWithAnnotation, otherSecret)) r.True(subject.Update(otherSecret, secretWithAnnotation)) }) + + it("returns the same singleton key", func() { + r.Equal(controllerlib.Key{}, subject.Parent(secretWithAnnotation)) + }) }) when("any Secret with the required annotation is deleted", func() { @@ -136,9 +140,10 @@ func TestGarbageCollectorControllerSync(t *testing.T) { Context: cancelContext, Name: subject.Name(), Key: controllerlib.Key{ - Namespace: "", - Name: "", + Namespace: "foo", + Name: "bar", }, + Queue: &testQueue{t: t}, } // Must start informers before calling TestRunSynchronously() @@ -262,16 +267,23 @@ func TestGarbageCollectorControllerSync(t *testing.T) { // Run sync once with the current time set to frozenTime. r.NoError(controllerlib.TestSync(t, subject, *syncContext)) require.Empty(t, kubeClient.Actions()) + r.False(syncContext.Queue.(*testQueue).called) // Run sync again when not enough time has passed since the most recent run, so no delete // operations should happen even though there is a expired secret now. fakeClock.Step(29 * time.Second) r.NoError(controllerlib.TestSync(t, subject, *syncContext)) require.Empty(t, kubeClient.Actions()) + r.True(syncContext.Queue.(*testQueue).called) + r.Equal(controllerlib.Key{Namespace: "foo", Name: "bar"}, syncContext.Queue.(*testQueue).key) // assert key is passed through + r.Equal(time.Second, syncContext.Queue.(*testQueue).duration) // assert that we get the exact requeue time + + syncContext.Queue = &testQueue{t: t} // reset the queue for the next sync // Step to the exact threshold and run Sync again. Now we are past the rate limiting period. - fakeClock.Step(1*time.Second + 1*time.Millisecond) + fakeClock.Step(time.Second) r.NoError(controllerlib.TestSync(t, subject, *syncContext)) + r.False(syncContext.Queue.(*testQueue).called) // It should have deleted the expired secret. r.ElementsMatch( @@ -381,3 +393,23 @@ func TestGarbageCollectorControllerSync(t *testing.T) { }) }, spec.Parallel(), spec.Report(report.Terminal{})) } + +type testQueue struct { + t *testing.T + + called bool + key controllerlib.Key + duration time.Duration + + controllerlib.Queue // panic if any other methods called +} + +func (q *testQueue) AddAfter(key controllerlib.Key, duration time.Duration) { + q.t.Helper() + + require.False(q.t, q.called, "AddAfter should only be called once") + + q.called = true + q.key = key + q.duration = duration +} diff --git a/test/integration/supervisor_storage_garbage_collection_test.go b/test/integration/supervisor_storage_garbage_collection_test.go index 84b82196..1394b6f4 100644 --- a/test/integration/supervisor_storage_garbage_collection_test.go +++ b/test/integration/supervisor_storage_garbage_collection_test.go @@ -48,10 +48,15 @@ func TestStorageGarbageCollection(t *testing.T) { // in the same namespace just to get the controller to respond faster. // This is just a performance optimization to make this test pass faster because otherwise // this test has to wait ~3 minutes for the controller's next full-resync. - stopCh := make(chan bool, 1) // It is important that this channel be buffered. - go updateSecretEveryTwoSeconds(t, stopCh, secrets, secretNotYetExpired) + stopCh := make(chan struct{}) + errCh := make(chan error) + go updateSecretEveryTwoSeconds(stopCh, errCh, secrets, secretNotYetExpired) t.Cleanup(func() { - stopCh <- true + close(stopCh) + + if updateErr := <-errCh; updateErr != nil { + panic(updateErr) + } }) // Wait long enough for the next periodic sweep of the GC controller for the secrets to be deleted, which @@ -69,10 +74,15 @@ func TestStorageGarbageCollection(t *testing.T) { require.NoError(t, err) } -func updateSecretEveryTwoSeconds(t *testing.T, stopCh chan bool, secrets corev1client.SecretInterface, secret *v1.Secret) { +func updateSecretEveryTwoSeconds(stopCh chan struct{}, errCh chan error, secrets corev1client.SecretInterface, secret *v1.Secret) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() + var updateErr error + defer func() { + errCh <- updateErr + }() + i := 0 for { select { @@ -87,9 +97,25 @@ func updateSecretEveryTwoSeconds(t *testing.T, stopCh chan bool, secrets corev1c i++ secret.Data["foo"] = []byte(fmt.Sprintf("bar-%d", i)) - var updateErr error secret, updateErr = secrets.Update(ctx, secret, metav1.UpdateOptions{}) - require.NoError(t, updateErr) + + switch { + case updateErr == nil: + // continue to next update + + case k8serrors.IsConflict(updateErr), k8serrors.IsNotFound(updateErr): + select { + case _, ok := <-stopCh: + if !ok { // stopCh is closed meaning that test is already finished so these errors are expected + updateErr = nil + } + default: + } + + return // even if the error is expected, we must stop + default: + return // unexpected error + } } }