Merge pull request #598 from enj/enj/i/gc_tz

supervisor gc: use singleton queue
This commit is contained in:
Mo Khan 2021-05-04 15:08:06 -04:00 committed by GitHub
commit 4bb0fdeddd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 75 additions and 13 deletions

View File

@ -59,7 +59,7 @@ func GarbageCollectorController(
return isSecretWithGCAnnotation(oldObj) || isSecretWithGCAnnotation(newObj)
},
DeleteFunc: func(obj metav1.Object) bool { return false }, // ignore all deletes
ParentFunc: nil,
ParentFunc: pinnipedcontroller.SingletonQueue(),
},
controllerlib.InformerOption{},
),
@ -67,16 +67,20 @@ func GarbageCollectorController(
}
func (c *garbageCollectorController) Sync(ctx controllerlib.Context) error {
// make sure we have a consistent, static meaning for the current time during the sync loop
frozenClock := clock.NewFakeClock(c.clock.Now())
// The Sync method is triggered upon any change to any Secret, which would make this
// controller too chatty, so it rate limits itself to a more reasonable interval.
// Note that even during a period when no secrets are changing, it will still run
// at the informer's full-resync interval (as long as there are some secrets).
if c.clock.Now().Sub(c.timeOfMostRecentSweep) < minimumRepeatInterval {
if since := frozenClock.Since(c.timeOfMostRecentSweep); since < minimumRepeatInterval {
ctx.Queue.AddAfter(ctx.Key, minimumRepeatInterval-since)
return nil
}
plog.Info("starting storage garbage collection sweep")
c.timeOfMostRecentSweep = c.clock.Now()
c.timeOfMostRecentSweep = frozenClock.Now()
listOfSecrets, err := c.secretInformer.Lister().List(labels.Everything())
if err != nil {
@ -97,7 +101,7 @@ func (c *garbageCollectorController) Sync(ctx controllerlib.Context) error {
continue
}
if garbageCollectAfterTime.Before(c.clock.Now()) {
if garbageCollectAfterTime.Before(frozenClock.Now()) {
err = c.kubeClient.CoreV1().Secrets(secret.Namespace).Delete(ctx.Context, secret.Name, metav1.DeleteOptions{})
if err != nil {
plog.WarningErr("failed to garbage collect resource", err, logKV(secret))

View File

@ -66,6 +66,10 @@ func TestGarbageCollectorControllerInformerFilters(t *testing.T) {
r.True(subject.Update(secretWithAnnotation, otherSecret))
r.True(subject.Update(otherSecret, secretWithAnnotation))
})
it("returns the same singleton key", func() {
r.Equal(controllerlib.Key{}, subject.Parent(secretWithAnnotation))
})
})
when("any Secret with the required annotation is deleted", func() {
@ -136,9 +140,10 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
Context: cancelContext,
Name: subject.Name(),
Key: controllerlib.Key{
Namespace: "",
Name: "",
Namespace: "foo",
Name: "bar",
},
Queue: &testQueue{t: t},
}
// Must start informers before calling TestRunSynchronously()
@ -262,16 +267,23 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
// Run sync once with the current time set to frozenTime.
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
require.Empty(t, kubeClient.Actions())
r.False(syncContext.Queue.(*testQueue).called)
// Run sync again when not enough time has passed since the most recent run, so no delete
// operations should happen even though there is a expired secret now.
fakeClock.Step(29 * time.Second)
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
require.Empty(t, kubeClient.Actions())
r.True(syncContext.Queue.(*testQueue).called)
r.Equal(controllerlib.Key{Namespace: "foo", Name: "bar"}, syncContext.Queue.(*testQueue).key) // assert key is passed through
r.Equal(time.Second, syncContext.Queue.(*testQueue).duration) // assert that we get the exact requeue time
syncContext.Queue = &testQueue{t: t} // reset the queue for the next sync
// Step to the exact threshold and run Sync again. Now we are past the rate limiting period.
fakeClock.Step(1*time.Second + 1*time.Millisecond)
fakeClock.Step(time.Second)
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
r.False(syncContext.Queue.(*testQueue).called)
// It should have deleted the expired secret.
r.ElementsMatch(
@ -381,3 +393,23 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
})
}, spec.Parallel(), spec.Report(report.Terminal{}))
}
type testQueue struct {
t *testing.T
called bool
key controllerlib.Key
duration time.Duration
controllerlib.Queue // panic if any other methods called
}
func (q *testQueue) AddAfter(key controllerlib.Key, duration time.Duration) {
q.t.Helper()
require.False(q.t, q.called, "AddAfter should only be called once")
q.called = true
q.key = key
q.duration = duration
}

View File

@ -48,10 +48,15 @@ func TestStorageGarbageCollection(t *testing.T) {
// in the same namespace just to get the controller to respond faster.
// This is just a performance optimization to make this test pass faster because otherwise
// this test has to wait ~3 minutes for the controller's next full-resync.
stopCh := make(chan bool, 1) // It is important that this channel be buffered.
go updateSecretEveryTwoSeconds(t, stopCh, secrets, secretNotYetExpired)
stopCh := make(chan struct{})
errCh := make(chan error)
go updateSecretEveryTwoSeconds(stopCh, errCh, secrets, secretNotYetExpired)
t.Cleanup(func() {
stopCh <- true
close(stopCh)
if updateErr := <-errCh; updateErr != nil {
panic(updateErr)
}
})
// Wait long enough for the next periodic sweep of the GC controller for the secrets to be deleted, which
@ -69,10 +74,15 @@ func TestStorageGarbageCollection(t *testing.T) {
require.NoError(t, err)
}
func updateSecretEveryTwoSeconds(t *testing.T, stopCh chan bool, secrets corev1client.SecretInterface, secret *v1.Secret) {
func updateSecretEveryTwoSeconds(stopCh chan struct{}, errCh chan error, secrets corev1client.SecretInterface, secret *v1.Secret) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
var updateErr error
defer func() {
errCh <- updateErr
}()
i := 0
for {
select {
@ -87,9 +97,25 @@ func updateSecretEveryTwoSeconds(t *testing.T, stopCh chan bool, secrets corev1c
i++
secret.Data["foo"] = []byte(fmt.Sprintf("bar-%d", i))
var updateErr error
secret, updateErr = secrets.Update(ctx, secret, metav1.UpdateOptions{})
require.NoError(t, updateErr)
switch {
case updateErr == nil:
// continue to next update
case k8serrors.IsConflict(updateErr), k8serrors.IsNotFound(updateErr):
select {
case _, ok := <-stopCh:
if !ok { // stopCh is closed meaning that test is already finished so these errors are expected
updateErr = nil
}
default:
}
return // even if the error is expected, we must stop
default:
return // unexpected error
}
}
}