Merge pull request #198 from enj/enj/i/multi_api_service
Prevent multiple pinnipeds from thrashing on the API service
This commit is contained in:
commit
8859172025
@ -8,11 +8,11 @@ import (
|
|||||||
|
|
||||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
corev1informers "k8s.io/client-go/informers/core/v1"
|
corev1informers "k8s.io/client-go/informers/core/v1"
|
||||||
"k8s.io/klog/v2"
|
|
||||||
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
|
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
|
||||||
|
|
||||||
pinnipedcontroller "go.pinniped.dev/internal/controller"
|
pinnipedcontroller "go.pinniped.dev/internal/controller"
|
||||||
"go.pinniped.dev/internal/controllerlib"
|
"go.pinniped.dev/internal/controllerlib"
|
||||||
|
"go.pinniped.dev/internal/plog"
|
||||||
)
|
)
|
||||||
|
|
||||||
type apiServiceUpdaterController struct {
|
type apiServiceUpdaterController struct {
|
||||||
@ -59,15 +59,15 @@ func (c *apiServiceUpdaterController) Sync(ctx controllerlib.Context) error {
|
|||||||
}
|
}
|
||||||
if notFound {
|
if notFound {
|
||||||
// The secret does not exist yet, so nothing to do.
|
// The secret does not exist yet, so nothing to do.
|
||||||
klog.Info("apiServiceUpdaterController Sync found that the secret does not exist yet or was deleted")
|
plog.Info("apiServiceUpdaterController Sync found that the secret does not exist yet or was deleted")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the APIService to give it the new CA bundle.
|
// Update the APIService to give it the new CA bundle.
|
||||||
if err := UpdateAPIService(ctx.Context, c.aggregatorClient, c.apiServiceName, certSecret.Data[caCertificateSecretKey]); err != nil {
|
if err := UpdateAPIService(ctx.Context, c.aggregatorClient, c.apiServiceName, c.namespace, certSecret.Data[caCertificateSecretKey]); err != nil {
|
||||||
return fmt.Errorf("could not update the API service: %w", err)
|
return fmt.Errorf("could not update the API service: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.Info("apiServiceUpdaterController Sync successfully updated API service")
|
plog.Debug("apiServiceUpdaterController Sync complete")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -276,6 +276,36 @@ func TestAPIServiceUpdaterControllerSync(t *testing.T) {
|
|||||||
r.Regexp("could not get existing version of API service: .* not found", err.Error())
|
r.Regexp("could not get existing version of API service: .* not found", err.Error())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
when("the APIService exists for another pinniped instance", func() {
|
||||||
|
it.Before(func() {
|
||||||
|
apiService := &apiregistrationv1.APIService{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: loginv1alpha1.SchemeGroupVersion.Version + "." + loginv1alpha1.GroupName,
|
||||||
|
},
|
||||||
|
Spec: apiregistrationv1.APIServiceSpec{
|
||||||
|
CABundle: nil,
|
||||||
|
VersionPriority: 1234,
|
||||||
|
|
||||||
|
Service: &apiregistrationv1.ServiceReference{
|
||||||
|
Namespace: installedInNamespace + "-not",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := aggregatorAPIClient.Tracker().Add(apiService)
|
||||||
|
r.NoError(err)
|
||||||
|
})
|
||||||
|
|
||||||
|
it("does not update the APIService", func() {
|
||||||
|
startInformersAndController()
|
||||||
|
err := controllerlib.TestSync(t, subject, *syncContext)
|
||||||
|
r.NoError(err)
|
||||||
|
|
||||||
|
// make sure we get the API service and decide to leave it alone
|
||||||
|
r.Len(aggregatorAPIClient.Actions(), 1)
|
||||||
|
r.Equal("get", aggregatorAPIClient.Actions()[0].GetVerb())
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}, spec.Parallel(), spec.Report(report.Terminal{}))
|
}, spec.Parallel(), spec.Report(report.Terminal{}))
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// UpdateAPIService updates the APIService's CA bundle.
|
// UpdateAPIService updates the APIService's CA bundle.
|
||||||
func UpdateAPIService(ctx context.Context, aggregatorClient aggregatorclient.Interface, apiServiceName string, aggregatedAPIServerCA []byte) error {
|
func UpdateAPIService(ctx context.Context, aggregatorClient aggregatorclient.Interface, apiServiceName, serviceNamespace string, aggregatedAPIServerCA []byte) error {
|
||||||
apiServices := aggregatorClient.ApiregistrationV1().APIServices()
|
apiServices := aggregatorClient.ApiregistrationV1().APIServices()
|
||||||
|
|
||||||
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||||
@ -24,6 +24,13 @@ func UpdateAPIService(ctx context.Context, aggregatorClient aggregatorclient.Int
|
|||||||
return fmt.Errorf("could not get existing version of API service: %w", err)
|
return fmt.Errorf("could not get existing version of API service: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if serviceRef := fetchedAPIService.Spec.Service; serviceRef != nil {
|
||||||
|
if serviceRef.Namespace != serviceNamespace {
|
||||||
|
// we do not own this API service so do not attempt to mutate it
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if bytes.Equal(fetchedAPIService.Spec.CABundle, aggregatedAPIServerCA) {
|
if bytes.Equal(fetchedAPIService.Spec.CABundle, aggregatedAPIServerCA) {
|
||||||
// Already has the same value, perhaps because another process already updated the object, so no need to update.
|
// Already has the same value, perhaps because another process already updated the object, so no need to update.
|
||||||
return nil
|
return nil
|
||||||
|
@ -27,6 +27,7 @@ func TestUpdateAPIService(t *testing.T) {
|
|||||||
name string
|
name string
|
||||||
mocks func(*aggregatorv1fake.Clientset)
|
mocks func(*aggregatorv1fake.Clientset)
|
||||||
caInput []byte
|
caInput []byte
|
||||||
|
serviceNamespace string
|
||||||
wantObjects []apiregistrationv1.APIService
|
wantObjects []apiregistrationv1.APIService
|
||||||
wantErr string
|
wantErr string
|
||||||
}{
|
}{
|
||||||
@ -93,6 +94,36 @@ func TestUpdateAPIService(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}},
|
}},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "skip update when there is another pinniped instance",
|
||||||
|
mocks: func(c *aggregatorv1fake.Clientset) {
|
||||||
|
_ = c.Tracker().Add(&apiregistrationv1.APIService{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
|
||||||
|
Spec: apiregistrationv1.APIServiceSpec{
|
||||||
|
GroupPriorityMinimum: 999,
|
||||||
|
CABundle: []byte("some-other-different-ca-bundle"),
|
||||||
|
Service: &apiregistrationv1.ServiceReference{
|
||||||
|
Namespace: "namespace-2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
c.PrependReactor("update", "apiservices", func(_ kubetesting.Action) (bool, runtime.Object, error) {
|
||||||
|
return true, nil, fmt.Errorf("should not encounter this error because update should be skipped in this case")
|
||||||
|
})
|
||||||
|
},
|
||||||
|
caInput: []byte("some-ca-bundle"),
|
||||||
|
serviceNamespace: "namespace-1",
|
||||||
|
wantObjects: []apiregistrationv1.APIService{{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
|
||||||
|
Spec: apiregistrationv1.APIServiceSpec{
|
||||||
|
GroupPriorityMinimum: 999,
|
||||||
|
CABundle: []byte("some-other-different-ca-bundle"), // unchanged
|
||||||
|
Service: &apiregistrationv1.ServiceReference{
|
||||||
|
Namespace: "namespace-2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "error on update",
|
name: "error on update",
|
||||||
mocks: func(c *aggregatorv1fake.Clientset) {
|
mocks: func(c *aggregatorv1fake.Clientset) {
|
||||||
@ -181,7 +212,7 @@ func TestUpdateAPIService(t *testing.T) {
|
|||||||
tt.mocks(client)
|
tt.mocks(client)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := UpdateAPIService(ctx, client, loginv1alpha1.SchemeGroupVersion.Version+"."+loginv1alpha1.GroupName, tt.caInput)
|
err := UpdateAPIService(ctx, client, loginv1alpha1.SchemeGroupVersion.Version+"."+loginv1alpha1.GroupName, tt.serviceNamespace, tt.caInput)
|
||||||
if tt.wantErr != "" {
|
if tt.wantErr != "" {
|
||||||
require.EqualError(t, err, tt.wantErr)
|
require.EqualError(t, err, tt.wantErr)
|
||||||
return
|
return
|
||||||
|
Loading…
Reference in New Issue
Block a user