Prevent multiple pinnipeds from thrashing on the API service

Signed-off-by: Monis Khan <mok@vmware.com>
This commit is contained in:
Monis Khan 2020-11-11 17:49:13 -05:00
parent 300d522eb0
commit 9c8b081906
No known key found for this signature in database
GPG Key ID: 52C90ADA01B269B8
4 changed files with 79 additions and 11 deletions

View File

@ -8,11 +8,11 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors"
corev1informers "k8s.io/client-go/informers/core/v1" corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/klog/v2"
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
pinnipedcontroller "go.pinniped.dev/internal/controller" pinnipedcontroller "go.pinniped.dev/internal/controller"
"go.pinniped.dev/internal/controllerlib" "go.pinniped.dev/internal/controllerlib"
"go.pinniped.dev/internal/plog"
) )
type apiServiceUpdaterController struct { type apiServiceUpdaterController struct {
@ -59,15 +59,15 @@ func (c *apiServiceUpdaterController) Sync(ctx controllerlib.Context) error {
} }
if notFound { if notFound {
// The secret does not exist yet, so nothing to do. // The secret does not exist yet, so nothing to do.
klog.Info("apiServiceUpdaterController Sync found that the secret does not exist yet or was deleted") plog.Info("apiServiceUpdaterController Sync found that the secret does not exist yet or was deleted")
return nil return nil
} }
// Update the APIService to give it the new CA bundle. // Update the APIService to give it the new CA bundle.
if err := UpdateAPIService(ctx.Context, c.aggregatorClient, c.apiServiceName, certSecret.Data[caCertificateSecretKey]); err != nil { if err := UpdateAPIService(ctx.Context, c.aggregatorClient, c.apiServiceName, c.namespace, certSecret.Data[caCertificateSecretKey]); err != nil {
return fmt.Errorf("could not update the API service: %w", err) return fmt.Errorf("could not update the API service: %w", err)
} }
klog.Info("apiServiceUpdaterController Sync successfully updated API service") plog.Debug("apiServiceUpdaterController Sync complete")
return nil return nil
} }

View File

@ -276,6 +276,36 @@ func TestAPIServiceUpdaterControllerSync(t *testing.T) {
r.Regexp("could not get existing version of API service: .* not found", err.Error()) r.Regexp("could not get existing version of API service: .* not found", err.Error())
}) })
}) })
when("the APIService exists for another pinniped instance", func() {
it.Before(func() {
apiService := &apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: loginv1alpha1.SchemeGroupVersion.Version + "." + loginv1alpha1.GroupName,
},
Spec: apiregistrationv1.APIServiceSpec{
CABundle: nil,
VersionPriority: 1234,
Service: &apiregistrationv1.ServiceReference{
Namespace: installedInNamespace + "-not",
},
},
}
err := aggregatorAPIClient.Tracker().Add(apiService)
r.NoError(err)
})
it("does not update the APIService", func() {
startInformersAndController()
err := controllerlib.TestSync(t, subject, *syncContext)
r.NoError(err)
// make sure we get the API service and decide to leave it alone
r.Len(aggregatorAPIClient.Actions(), 1)
r.Equal("get", aggregatorAPIClient.Actions()[0].GetVerb())
})
})
}) })
}, spec.Parallel(), spec.Report(report.Terminal{})) }, spec.Parallel(), spec.Report(report.Terminal{}))
} }

View File

@ -14,7 +14,7 @@ import (
) )
// UpdateAPIService updates the APIService's CA bundle. // UpdateAPIService updates the APIService's CA bundle.
func UpdateAPIService(ctx context.Context, aggregatorClient aggregatorclient.Interface, apiServiceName string, aggregatedAPIServerCA []byte) error { func UpdateAPIService(ctx context.Context, aggregatorClient aggregatorclient.Interface, apiServiceName, serviceNamespace string, aggregatedAPIServerCA []byte) error {
apiServices := aggregatorClient.ApiregistrationV1().APIServices() apiServices := aggregatorClient.ApiregistrationV1().APIServices()
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
@ -24,6 +24,13 @@ func UpdateAPIService(ctx context.Context, aggregatorClient aggregatorclient.Int
return fmt.Errorf("could not get existing version of API service: %w", err) return fmt.Errorf("could not get existing version of API service: %w", err)
} }
if serviceRef := fetchedAPIService.Spec.Service; serviceRef != nil {
if serviceRef.Namespace != serviceNamespace {
// we do not own this API service so do not attempt to mutate it
return nil
}
}
if bytes.Equal(fetchedAPIService.Spec.CABundle, aggregatedAPIServerCA) { if bytes.Equal(fetchedAPIService.Spec.CABundle, aggregatedAPIServerCA) {
// Already has the same value, perhaps because another process already updated the object, so no need to update. // Already has the same value, perhaps because another process already updated the object, so no need to update.
return nil return nil

View File

@ -24,11 +24,12 @@ func TestUpdateAPIService(t *testing.T) {
const apiServiceName = "v1alpha1.login.concierge.pinniped.dev" const apiServiceName = "v1alpha1.login.concierge.pinniped.dev"
tests := []struct { tests := []struct {
name string name string
mocks func(*aggregatorv1fake.Clientset) mocks func(*aggregatorv1fake.Clientset)
caInput []byte caInput []byte
wantObjects []apiregistrationv1.APIService serviceNamespace string
wantErr string wantObjects []apiregistrationv1.APIService
wantErr string
}{ }{
{ {
name: "happy path update when the pre-existing APIService did not already have a CA bundle", name: "happy path update when the pre-existing APIService did not already have a CA bundle",
@ -93,6 +94,36 @@ func TestUpdateAPIService(t *testing.T) {
}, },
}}, }},
}, },
{
name: "skip update when there is another pinniped instance",
mocks: func(c *aggregatorv1fake.Clientset) {
_ = c.Tracker().Add(&apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
Spec: apiregistrationv1.APIServiceSpec{
GroupPriorityMinimum: 999,
CABundle: []byte("some-other-different-ca-bundle"),
Service: &apiregistrationv1.ServiceReference{
Namespace: "namespace-2",
},
},
})
c.PrependReactor("update", "apiservices", func(_ kubetesting.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("should not encounter this error because update should be skipped in this case")
})
},
caInput: []byte("some-ca-bundle"),
serviceNamespace: "namespace-1",
wantObjects: []apiregistrationv1.APIService{{
ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
Spec: apiregistrationv1.APIServiceSpec{
GroupPriorityMinimum: 999,
CABundle: []byte("some-other-different-ca-bundle"), // unchanged
Service: &apiregistrationv1.ServiceReference{
Namespace: "namespace-2",
},
},
}},
},
{ {
name: "error on update", name: "error on update",
mocks: func(c *aggregatorv1fake.Clientset) { mocks: func(c *aggregatorv1fake.Clientset) {
@ -181,7 +212,7 @@ func TestUpdateAPIService(t *testing.T) {
tt.mocks(client) tt.mocks(client)
} }
err := UpdateAPIService(ctx, client, loginv1alpha1.SchemeGroupVersion.Version+"."+loginv1alpha1.GroupName, tt.caInput) err := UpdateAPIService(ctx, client, loginv1alpha1.SchemeGroupVersion.Version+"."+loginv1alpha1.GroupName, tt.serviceNamespace, tt.caInput)
if tt.wantErr != "" { if tt.wantErr != "" {
require.EqualError(t, err, tt.wantErr) require.EqualError(t, err, tt.wantErr)
return return