Supervisor storage garbage collection controller enabled in production

- Also add more log statements to the controller
- Also have the controller apply a rate limit to itself, to avoid
  having a very chatty controller that runs way more often than is
  needed.
- Also add an integration test for the controller's behavior.

Signed-off-by: Margo Crawford <margaretc@vmware.com>
This commit is contained in:
Ryan Richard 2020-12-11 15:21:34 -08:00 committed by Margo Crawford
parent ed9b3ffce5
commit baa1a4a2fc
7 changed files with 256 additions and 34 deletions

View File

@ -29,6 +29,7 @@ import (
"go.pinniped.dev/internal/config/supervisor"
"go.pinniped.dev/internal/controller/supervisorconfig"
"go.pinniped.dev/internal/controller/supervisorconfig/upstreamwatcher"
"go.pinniped.dev/internal/controller/supervisorstorage"
"go.pinniped.dev/internal/controllerlib"
"go.pinniped.dev/internal/downward"
"go.pinniped.dev/internal/oidc/jwks"
@ -84,6 +85,15 @@ func startControllers(
// Create controller manager.
controllerManager := controllerlib.
NewManager().
WithController(
supervisorstorage.GarbageCollectorController(
clock.RealClock{},
kubeClient,
kubeInformers.Core().V1().Secrets(),
controllerlib.WithInformer,
),
singletonWorker,
).
WithController(
supervisorconfig.NewOIDCProviderWatcherController(
issuerManager,

View File

@ -6,8 +6,10 @@ package supervisorstorage
import (
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/clock"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
@ -17,12 +19,17 @@ import (
"go.pinniped.dev/internal/plog"
)
const minimumRepeatInterval = 30 * time.Second
type garbageCollectorController struct {
secretInformer corev1informers.SecretInformer
kubeClient kubernetes.Interface
secretInformer corev1informers.SecretInformer
kubeClient kubernetes.Interface
clock clock.Clock
timeOfMostRecentSweep time.Time
}
func GarbageCollectorController(
clock clock.Clock,
kubeClient kubernetes.Interface,
secretInformer corev1informers.SecretInformer,
withInformer pinnipedcontroller.WithInformerOptionFunc,
@ -33,39 +40,66 @@ func GarbageCollectorController(
Syncer: &garbageCollectorController{
secretInformer: secretInformer,
kubeClient: kubeClient,
clock: clock,
},
},
withInformer(
secretInformer,
pinnipedcontroller.MatchNothingFilter(nil),
pinnipedcontroller.MatchAnythingFilter(nil),
controllerlib.InformerOption{},
),
)
}
func (c *garbageCollectorController) Sync(ctx controllerlib.Context) error {
// The Sync method is triggered upon any change to any Secret, which would make this
// controller too chatty, so it rate limits itself to a more reasonable interval.
// Note that even during a period when no secrets are changing, it will still run
// at the informer's full-resync interval (as long as there are some secrets).
if c.clock.Now().Sub(c.timeOfMostRecentSweep) < minimumRepeatInterval {
return nil
}
plog.Info("starting storage garbage collection sweep")
c.timeOfMostRecentSweep = c.clock.Now()
listOfSecrets, err := c.secretInformer.Lister().List(labels.Everything())
if err != nil {
return err
}
for i := range listOfSecrets {
secret := listOfSecrets[i]
s, ok := secret.Annotations[crud.SecretLifetimeAnnotationKey]
timeString, ok := secret.Annotations[crud.SecretLifetimeAnnotationKey]
if !ok {
continue
}
currentTime := time.Now()
garbageCollectAfterTime, err := time.Parse(time.RFC3339, s)
garbageCollectAfterTime, err := time.Parse(crud.SecretLifetimeAnnotationDateFormat, timeString)
if err != nil {
plog.WarningErr("could not parse for garbage collection", err, "secretName", secret.Name, "garbageCollectAfter", s)
plog.WarningErr("could not parse resource timestamp for garbage collection", err, logKV(secret))
continue
}
if garbageCollectAfterTime.Before(currentTime) {
if garbageCollectAfterTime.Before(c.clock.Now()) {
err = c.kubeClient.CoreV1().Secrets(secret.Namespace).Delete(ctx.Context, secret.Name, metav1.DeleteOptions{})
if err != nil {
plog.WarningErr("failed to garbage collect value", err, "secretName", secret.Name, "garbageCollectAfter", s)
plog.WarningErr("failed to garbage collect resource", err, logKV(secret))
continue
}
plog.Info("storage garbage collector deleted resource", logKV(secret))
}
}
return nil
}
func logKV(secret *v1.Secret) []interface{} {
return []interface{}{
"secretName", secret.Name,
"secretNamespace", secret.Namespace,
"secretType", string(secret.Type),
"garbageCollectAfter", secret.Annotations[crud.SecretLifetimeAnnotationKey],
}
}

View File

@ -16,6 +16,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/clock"
kubeinformers "k8s.io/client-go/informers"
kubernetesfake "k8s.io/client-go/kubernetes/fake"
kubetesting "k8s.io/client-go/testing"
@ -37,6 +38,7 @@ func TestGarbageCollectorControllerInformerFilters(t *testing.T) {
observableWithInformerOption = testutil.NewObservableWithInformerOption()
secretsInformer := kubeinformers.NewSharedInformerFactory(nil, 0).Core().V1().Secrets()
_ = GarbageCollectorController(
clock.RealClock{},
nil,
secretsInformer,
observableWithInformerOption.WithInformer, // make it possible to observe the behavior of the Filters
@ -57,11 +59,11 @@ func TestGarbageCollectorControllerInformerFilters(t *testing.T) {
})
when("any Secret changes", func() {
it("returns false to avoid triggering the sync function", func() {
r.False(subject.Add(secret))
r.False(subject.Update(secret, otherSecret))
r.False(subject.Update(otherSecret, secret))
r.False(subject.Delete(secret))
it("returns true to trigger the sync function for all secrets", func() {
r.True(subject.Add(secret))
r.True(subject.Update(secret, otherSecret))
r.True(subject.Update(otherSecret, secret))
r.True(subject.Delete(secret))
})
})
})
@ -75,10 +77,6 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
Resource: "secrets",
}
firstExpiredTime := time.Date(1900, time.January, 1, 1, 0, 0, 0, time.UTC).Format(time.RFC3339)
secondExpiredTime := time.Date(1901, time.January, 1, 1, 0, 0, 0, time.UTC).Format(time.RFC3339)
unexpiredTime := time.Now().Add(time.Hour * 24).UTC().Format(time.RFC3339)
spec.Run(t, "Sync", func(t *testing.T, when spec.G, it spec.S) {
const (
installedInNamespace = "some-namespace"
@ -93,6 +91,8 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
timeoutContext context.Context
timeoutContextCancel context.CancelFunc
syncContext *controllerlib.Context
fakeClock *clock.FakeClock
frozenNow time.Time
)
// Defer starting the informers until the last possible moment so that the
@ -100,6 +100,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
var startInformersAndController = func() {
// Set this at the last second to allow for injection of server override.
subject = GarbageCollectorController(
fakeClock,
kubeClient,
kubeInformers.Core().V1().Secrets(),
controllerlib.WithInformer,
@ -128,6 +129,8 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
kubeInformerClient = kubernetesfake.NewSimpleClientset()
kubeClient = kubernetesfake.NewSimpleClientset()
kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0)
frozenNow = time.Now().UTC()
fakeClock = clock.NewFakeClock(frozenNow)
unrelatedSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
@ -163,7 +166,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
Name: "first expired secret",
Namespace: installedInNamespace,
Annotations: map[string]string{
"storage.pinniped.dev/garbage-collect-after": firstExpiredTime,
"storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-time.Second).Format(time.RFC3339),
},
},
}
@ -174,7 +177,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
Name: "second expired secret",
Namespace: installedInNamespace,
Annotations: map[string]string{
"storage.pinniped.dev/garbage-collect-after": secondExpiredTime,
"storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-2 * time.Second).Format(time.RFC3339),
},
},
}
@ -185,7 +188,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
Name: "unexpired secret",
Namespace: installedInNamespace,
Annotations: map[string]string{
"storage.pinniped.dev/garbage-collect-after": unexpiredTime,
"storage.pinniped.dev/garbage-collect-after": frozenNow.Add(time.Second).Format(time.RFC3339),
},
},
}
@ -211,6 +214,54 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
})
})
when("very little time has passed since the previous sync call", func() {
it.Before(func() {
// Add a secret that will expire in 20 seconds.
expiredSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "expired secret",
Namespace: installedInNamespace,
Annotations: map[string]string{
"storage.pinniped.dev/garbage-collect-after": frozenNow.Add(20 * time.Second).Format(time.RFC3339),
},
},
}
r.NoError(kubeInformerClient.Tracker().Add(expiredSecret))
r.NoError(kubeClient.Tracker().Add(expiredSecret))
})
it("should do nothing to avoid being super chatty since it is called for every change to any Secret, until more time has passed", func() {
startInformersAndController()
require.Empty(t, kubeClient.Actions())
// Run sync once with the current time set to frozenTime.
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
require.Empty(t, kubeClient.Actions())
// Run sync again when not enough time has passed since the most recent run, so no delete
// operations should happen even though there is a expired secret now.
fakeClock.Step(29 * time.Second)
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
require.Empty(t, kubeClient.Actions())
// Step to the exact threshold and run Sync again. Now we are past the rate limiting period.
fakeClock.Step(1*time.Second + 1*time.Millisecond)
r.NoError(controllerlib.TestSync(t, subject, *syncContext))
// It should have deleted the expired secret.
r.ElementsMatch(
[]kubetesting.Action{
kubetesting.NewDeleteAction(secretsGVR, installedInNamespace, "expired secret"),
},
kubeClient.Actions(),
)
list, err := kubeClient.CoreV1().Secrets(installedInNamespace).List(context.Background(), metav1.ListOptions{})
r.NoError(err)
r.Len(list.Items, 1)
r.Equal("some other unrelated secret", list.Items[0].Name)
})
})
when("there is a secret with a malformed garbage-collect-after date", func() {
it.Before(func() {
malformedSecret := &corev1.Secret{
@ -229,7 +280,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
Name: "expired secret",
Namespace: installedInNamespace,
Annotations: map[string]string{
"storage.pinniped.dev/garbage-collect-after": firstExpiredTime,
"storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-time.Second).Format(time.RFC3339),
},
},
}
@ -254,14 +305,14 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
})
})
when("the delete call fails", func() {
when("the kube API delete call fails", func() {
it.Before(func() {
erroringSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "erroring secret",
Namespace: installedInNamespace,
Annotations: map[string]string{
"storage.pinniped.dev/garbage-collect-after": firstExpiredTime,
"storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-time.Second).Format(time.RFC3339),
},
},
}
@ -278,7 +329,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
Name: "expired secret",
Namespace: installedInNamespace,
Annotations: map[string]string{
"storage.pinniped.dev/garbage-collect-after": firstExpiredTime,
"storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-time.Second).Format(time.RFC3339),
},
},
}
@ -286,7 +337,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
r.NoError(kubeClient.Tracker().Add(expiredSecret))
})
it("continues on to delete the next one", func() {
it("ignores the error and continues on to delete the next expired Secret", func() {
startInformersAndController()
r.NoError(controllerlib.TestSync(t, subject, *syncContext))

View File

@ -22,11 +22,6 @@ func MatchAnythingFilter(parentFunc controllerlib.ParentFunc) controllerlib.Filt
return SimpleFilter(func(object metav1.Object) bool { return true }, parentFunc)
}
// MatchNothingFilter returns a controllerlib.Filter that allows no objects.
func MatchNothingFilter(parentFunc controllerlib.ParentFunc) controllerlib.Filter {
return SimpleFilter(func(object metav1.Object) bool { return false }, parentFunc)
}
// SimpleFilter takes a single boolean match function on a metav1.Object and wraps it into a proper controllerlib.Filter.
func SimpleFilter(match func(metav1.Object) bool, parentFunc controllerlib.ParentFunc) controllerlib.Filter {
return controllerlib.FilterFuncs{

View File

@ -23,8 +23,10 @@ import (
//nolint:gosec // ignore lint warnings that these are credentials
const (
SecretLabelKey = "storage.pinniped.dev/type"
SecretLifetimeAnnotationKey = "storage.pinniped.dev/garbage-collect-after"
SecretLabelKey = "storage.pinniped.dev/type"
SecretLifetimeAnnotationKey = "storage.pinniped.dev/garbage-collect-after"
SecretLifetimeAnnotationDateFormat = time.RFC3339
secretNameFormat = "pinniped-storage-%s-%s"
secretTypeFormat = "storage.pinniped.dev/%s"
@ -178,7 +180,7 @@ func (s *secretsStorage) toSecret(signature, resourceVersion string, data JSON,
ResourceVersion: resourceVersion,
Labels: labelsToAdd,
Annotations: map[string]string{
SecretLifetimeAnnotationKey: s.clock().Add(s.lifetime).UTC().Format(time.RFC3339),
SecretLifetimeAnnotationKey: s.clock().Add(s.lifetime).UTC().Format(SecretLifetimeAnnotationDateFormat),
},
OwnerReferences: nil,
},

View File

@ -0,0 +1,130 @@
// Copyright 2020 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package integration
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"go.pinniped.dev/internal/crud"
"go.pinniped.dev/test/library"
)
func TestStorageGarbageCollection(t *testing.T) {
// Run this test in parallel with the other integration tests because it does a lot of waiting
// and will not impact other tests, or be impacted by other tests, when run in parallel.
t.Parallel()
env := library.IntegrationEnv(t)
client := library.NewClientset(t)
secrets := client.CoreV1().Secrets(env.SupervisorNamespace)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
secretAlreadyExpired := createSecret(ctx, t, secrets, "past", time.Now().Add(-time.Second))
secretWhichWillExpireBeforeTheTestEnds := createSecret(ctx, t, secrets, "near-future", time.Now().Add(30*time.Second))
secretNotYetExpired := createSecret(ctx, t, secrets, "far-future", time.Now().Add(10*time.Minute))
var err error
secretIsNotFound := func(secretName string) func() bool {
return func() bool {
_, err = secrets.Get(ctx, secretName, metav1.GetOptions{})
return k8serrors.IsNotFound(err)
}
}
// Start a background goroutine which will end as soon as the test ends.
// Keep updating a secret in the same namespace just to get the controller to respond faster.
// This is just a performance optimization because otherwise this test has to wait
// ~3 minutes for the controller's next full-resync.
stopCh := make(chan bool, 1) // It is important that this channel be buffered.
go createAndUpdateSecretEveryTwoSeconds(t, stopCh, secrets)
t.Cleanup(func() {
stopCh <- true
})
// Wait long enough for the next periodic sweep of the GC controller for the secrets to be deleted, which
// is the worst-case length of time that we should ever need to wait. Because of the goroutine above,
// in practice we should only need to wait about 30 seconds, which is the GC controller's self-imposed
// rate throttling time period.
slightlyLongerThanGCControllerFullResyncPeriod := 3*time.Minute + 30*time.Second
assert.Eventually(t, secretIsNotFound(secretAlreadyExpired.Name), slightlyLongerThanGCControllerFullResyncPeriod, 250*time.Millisecond)
require.Truef(t, k8serrors.IsNotFound(err), "wanted a NotFound error but got %v", err) // prints out the error and stops the test in case of failure
assert.Eventually(t, secretIsNotFound(secretWhichWillExpireBeforeTheTestEnds.Name), slightlyLongerThanGCControllerFullResyncPeriod, 250*time.Millisecond)
require.Truef(t, k8serrors.IsNotFound(err), "wanted a NotFound error but got %v", err) // prints out the error and stops the test in case of failure
// The unexpired secret should not have been deleted within the timeframe of this test run.
_, err = secrets.Get(ctx, secretNotYetExpired.Name, metav1.GetOptions{})
require.NoError(t, err)
}
func createAndUpdateSecretEveryTwoSeconds(t *testing.T, stopCh chan bool, secrets corev1client.SecretInterface) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
unrelatedSecret := createSecret(ctx, t, secrets, "unrelated-to-gc", time.Time{})
i := 0
for {
select {
case <-stopCh:
// Got a signal, so stop running.
return
default:
// Channel had no message, so keep running.
}
time.Sleep(2 * time.Second)
i++
unrelatedSecret.Data["foo"] = []byte(fmt.Sprintf("bar-%d", i))
var updateErr error
unrelatedSecret, updateErr = secrets.Update(ctx, unrelatedSecret, metav1.UpdateOptions{})
require.NoError(t, updateErr)
}
}
func createSecret(ctx context.Context, t *testing.T, secrets corev1client.SecretInterface, name string, expiresAt time.Time) *v1.Secret {
secret, err := secrets.Create(ctx, newSecret("pinniped-storage-gc-integration-test-"+name+"-", expiresAt), metav1.CreateOptions{})
require.NoError(t, err)
// Make sure the Secret is deleted when the test ends.
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := secrets.Delete(ctx, secret.Name, metav1.DeleteOptions{})
notFound := k8serrors.IsNotFound(err)
if !notFound {
// it's okay if the Secret was already deleted, but other errors are cleanup failures
require.NoError(t, err)
}
})
return secret
}
func newSecret(namePrefix string, expiresAt time.Time) *v1.Secret {
annotations := map[string]string{}
if !expiresAt.Equal(time.Time{}) {
// Mark the secret for garbage collection.
annotations[crud.SecretLifetimeAnnotationKey] = expiresAt.UTC().Format(time.RFC3339)
}
return &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
GenerateName: namePrefix,
Annotations: annotations,
},
Data: map[string][]byte{"some-key": []byte("fake-data")},
Type: "storage.pinniped.dev/gc-test-integration-test",
}
}