supervisor gc: use singleton queue

The supervisor treats all events the same hence it must use a
singleton queue.

Updated the integration test to remove the data race caused by
calling methods on testing.T outside of the main test go routine.

Signed-off-by: Monis Khan <mok@vmware.com>
This commit is contained in:
Monis Khan 2021-05-04 12:38:47 -04:00
parent 1586171876
commit 4ce77c4837
No known key found for this signature in database
GPG Key ID: 52C90ADA01B269B8
3 changed files with 75 additions and 13 deletions

View File

@ -59,7 +59,7 @@ func GarbageCollectorController(
return isSecretWithGCAnnotation(oldObj) || isSecretWithGCAnnotation(newObj) return isSecretWithGCAnnotation(oldObj) || isSecretWithGCAnnotation(newObj)
}, },
DeleteFunc: func(obj metav1.Object) bool { return false }, // ignore all deletes DeleteFunc: func(obj metav1.Object) bool { return false }, // ignore all deletes
ParentFunc: nil, ParentFunc: pinnipedcontroller.SingletonQueue(),
}, },
controllerlib.InformerOption{}, controllerlib.InformerOption{},
), ),
@ -67,16 +67,20 @@ func GarbageCollectorController(
} }
func (c *garbageCollectorController) Sync(ctx controllerlib.Context) error { func (c *garbageCollectorController) Sync(ctx controllerlib.Context) error {
// make sure we have a consistent, static meaning for the current time during the sync loop
frozenClock := clock.NewFakeClock(c.clock.Now())
// The Sync method is triggered upon any change to any Secret, which would make this // The Sync method is triggered upon any change to any Secret, which would make this
// controller too chatty, so it rate limits itself to a more reasonable interval. // controller too chatty, so it rate limits itself to a more reasonable interval.
// Note that even during a period when no secrets are changing, it will still run // Note that even during a period when no secrets are changing, it will still run
// at the informer's full-resync interval (as long as there are some secrets). // at the informer's full-resync interval (as long as there are some secrets).
if c.clock.Now().Sub(c.timeOfMostRecentSweep) < minimumRepeatInterval { if since := frozenClock.Since(c.timeOfMostRecentSweep); since < minimumRepeatInterval {
ctx.Queue.AddAfter(ctx.Key, minimumRepeatInterval-since)
return nil return nil
} }
plog.Info("starting storage garbage collection sweep") plog.Info("starting storage garbage collection sweep")
c.timeOfMostRecentSweep = c.clock.Now() c.timeOfMostRecentSweep = frozenClock.Now()
listOfSecrets, err := c.secretInformer.Lister().List(labels.Everything()) listOfSecrets, err := c.secretInformer.Lister().List(labels.Everything())
if err != nil { if err != nil {
@ -97,7 +101,7 @@ func (c *garbageCollectorController) Sync(ctx controllerlib.Context) error {
continue continue
} }
if garbageCollectAfterTime.Before(c.clock.Now()) { if garbageCollectAfterTime.Before(frozenClock.Now()) {
err = c.kubeClient.CoreV1().Secrets(secret.Namespace).Delete(ctx.Context, secret.Name, metav1.DeleteOptions{}) err = c.kubeClient.CoreV1().Secrets(secret.Namespace).Delete(ctx.Context, secret.Name, metav1.DeleteOptions{})
if err != nil { if err != nil {
plog.WarningErr("failed to garbage collect resource", err, logKV(secret)) plog.WarningErr("failed to garbage collect resource", err, logKV(secret))

View File

@ -66,6 +66,10 @@ func TestGarbageCollectorControllerInformerFilters(t *testing.T) {
r.True(subject.Update(secretWithAnnotation, otherSecret)) r.True(subject.Update(secretWithAnnotation, otherSecret))
r.True(subject.Update(otherSecret, secretWithAnnotation)) r.True(subject.Update(otherSecret, secretWithAnnotation))
}) })
it("returns the same singleton key", func() {
r.Equal(controllerlib.Key{}, subject.Parent(secretWithAnnotation))
})
}) })
when("any Secret with the required annotation is deleted", func() { when("any Secret with the required annotation is deleted", func() {
@ -136,9 +140,10 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
Context: cancelContext, Context: cancelContext,
Name: subject.Name(), Name: subject.Name(),
Key: controllerlib.Key{ Key: controllerlib.Key{
Namespace: "", Namespace: "foo",
Name: "", Name: "bar",
}, },
Queue: &testQueue{t: t},
} }
// Must start informers before calling TestRunSynchronously() // Must start informers before calling TestRunSynchronously()
@ -262,16 +267,23 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
// Run sync once with the current time set to frozenTime. // Run sync once with the current time set to frozenTime.
r.NoError(controllerlib.TestSync(t, subject, *syncContext)) r.NoError(controllerlib.TestSync(t, subject, *syncContext))
require.Empty(t, kubeClient.Actions()) require.Empty(t, kubeClient.Actions())
r.False(syncContext.Queue.(*testQueue).called)
// Run sync again when not enough time has passed since the most recent run, so no delete // Run sync again when not enough time has passed since the most recent run, so no delete
// operations should happen even though there is a expired secret now. // operations should happen even though there is a expired secret now.
fakeClock.Step(29 * time.Second) fakeClock.Step(29 * time.Second)
r.NoError(controllerlib.TestSync(t, subject, *syncContext)) r.NoError(controllerlib.TestSync(t, subject, *syncContext))
require.Empty(t, kubeClient.Actions()) require.Empty(t, kubeClient.Actions())
r.True(syncContext.Queue.(*testQueue).called)
r.Equal(controllerlib.Key{Namespace: "foo", Name: "bar"}, syncContext.Queue.(*testQueue).key) // assert key is passed through
r.Equal(time.Second, syncContext.Queue.(*testQueue).duration) // assert that we get the exact requeue time
syncContext.Queue = &testQueue{t: t} // reset the queue for the next sync
// Step to the exact threshold and run Sync again. Now we are past the rate limiting period. // Step to the exact threshold and run Sync again. Now we are past the rate limiting period.
fakeClock.Step(1*time.Second + 1*time.Millisecond) fakeClock.Step(time.Second)
r.NoError(controllerlib.TestSync(t, subject, *syncContext)) r.NoError(controllerlib.TestSync(t, subject, *syncContext))
r.False(syncContext.Queue.(*testQueue).called)
// It should have deleted the expired secret. // It should have deleted the expired secret.
r.ElementsMatch( r.ElementsMatch(
@ -381,3 +393,23 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
}) })
}, spec.Parallel(), spec.Report(report.Terminal{})) }, spec.Parallel(), spec.Report(report.Terminal{}))
} }
type testQueue struct {
t *testing.T
called bool
key controllerlib.Key
duration time.Duration
controllerlib.Queue // panic if any other methods called
}
func (q *testQueue) AddAfter(key controllerlib.Key, duration time.Duration) {
q.t.Helper()
require.False(q.t, q.called, "AddAfter should only be called once")
q.called = true
q.key = key
q.duration = duration
}

View File

@ -48,10 +48,15 @@ func TestStorageGarbageCollection(t *testing.T) {
// in the same namespace just to get the controller to respond faster. // in the same namespace just to get the controller to respond faster.
// This is just a performance optimization to make this test pass faster because otherwise // This is just a performance optimization to make this test pass faster because otherwise
// this test has to wait ~3 minutes for the controller's next full-resync. // this test has to wait ~3 minutes for the controller's next full-resync.
stopCh := make(chan bool, 1) // It is important that this channel be buffered. stopCh := make(chan struct{})
go updateSecretEveryTwoSeconds(t, stopCh, secrets, secretNotYetExpired) errCh := make(chan error)
go updateSecretEveryTwoSeconds(stopCh, errCh, secrets, secretNotYetExpired)
t.Cleanup(func() { t.Cleanup(func() {
stopCh <- true close(stopCh)
if updateErr := <-errCh; updateErr != nil {
panic(updateErr)
}
}) })
// Wait long enough for the next periodic sweep of the GC controller for the secrets to be deleted, which // Wait long enough for the next periodic sweep of the GC controller for the secrets to be deleted, which
@ -69,10 +74,15 @@ func TestStorageGarbageCollection(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
func updateSecretEveryTwoSeconds(t *testing.T, stopCh chan bool, secrets corev1client.SecretInterface, secret *v1.Secret) { func updateSecretEveryTwoSeconds(stopCh chan struct{}, errCh chan error, secrets corev1client.SecretInterface, secret *v1.Secret) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel() defer cancel()
var updateErr error
defer func() {
errCh <- updateErr
}()
i := 0 i := 0
for { for {
select { select {
@ -87,9 +97,25 @@ func updateSecretEveryTwoSeconds(t *testing.T, stopCh chan bool, secrets corev1c
i++ i++
secret.Data["foo"] = []byte(fmt.Sprintf("bar-%d", i)) secret.Data["foo"] = []byte(fmt.Sprintf("bar-%d", i))
var updateErr error
secret, updateErr = secrets.Update(ctx, secret, metav1.UpdateOptions{}) secret, updateErr = secrets.Update(ctx, secret, metav1.UpdateOptions{})
require.NoError(t, updateErr)
switch {
case updateErr == nil:
// continue to next update
case k8serrors.IsConflict(updateErr), k8serrors.IsNotFound(updateErr):
select {
case _, ok := <-stopCh:
if !ok { // stopCh is closed meaning that test is already finished so these errors are expected
updateErr = nil
}
default:
}
return // even if the error is expected, we must stop
default:
return // unexpected error
}
} }
} }