Add tests for the new cert controllers and some other small refactorings
- Add a unit test for each cert controller - Make DynamicTLSServingCertProvider an interface and use a mutex internally - Create a shared ToPEM function instead of having two very similar functions - Move the ObservableWithInformerOption test helper to testutils - Rename some variables and imports
This commit is contained in:
parent
86c3f89b2e
commit
cc9ae23a0c
@ -17,7 +17,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
kubetesting "k8s.io/client-go/testing"
|
||||
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||
aggregationv1fake "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake"
|
||||
aggregatorv1fake "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake"
|
||||
)
|
||||
|
||||
func TestUpdateAPIService(t *testing.T) {
|
||||
@ -25,14 +25,14 @@ func TestUpdateAPIService(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
mocks func(*aggregationv1fake.Clientset)
|
||||
mocks func(*aggregatorv1fake.Clientset)
|
||||
caInput []byte
|
||||
wantObjects []apiregistrationv1.APIService
|
||||
wantErr string
|
||||
}{
|
||||
{
|
||||
name: "happy path update when the pre-existing APIService did not already have a CA bundle",
|
||||
mocks: func(c *aggregationv1fake.Clientset) {
|
||||
mocks: func(c *aggregatorv1fake.Clientset) {
|
||||
_ = c.Tracker().Add(&apiregistrationv1.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
|
||||
Spec: apiregistrationv1.APIServiceSpec{
|
||||
@ -52,7 +52,7 @@ func TestUpdateAPIService(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "happy path update when the pre-existing APIService already had a CA bundle",
|
||||
mocks: func(c *aggregationv1fake.Clientset) {
|
||||
mocks: func(c *aggregatorv1fake.Clientset) {
|
||||
_ = c.Tracker().Add(&apiregistrationv1.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
|
||||
Spec: apiregistrationv1.APIServiceSpec{
|
||||
@ -72,7 +72,7 @@ func TestUpdateAPIService(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "error on update",
|
||||
mocks: func(c *aggregationv1fake.Clientset) {
|
||||
mocks: func(c *aggregatorv1fake.Clientset) {
|
||||
_ = c.Tracker().Add(&apiregistrationv1.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
|
||||
Spec: apiregistrationv1.APIServiceSpec{},
|
||||
@ -85,7 +85,7 @@ func TestUpdateAPIService(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "error on get",
|
||||
mocks: func(c *aggregationv1fake.Clientset) {
|
||||
mocks: func(c *aggregatorv1fake.Clientset) {
|
||||
_ = c.Tracker().Add(&apiregistrationv1.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
|
||||
Spec: apiregistrationv1.APIServiceSpec{},
|
||||
@ -99,7 +99,7 @@ func TestUpdateAPIService(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "conflict error on update, followed by successful retry",
|
||||
mocks: func(c *aggregationv1fake.Clientset) {
|
||||
mocks: func(c *aggregatorv1fake.Clientset) {
|
||||
_ = c.Tracker().Add(&apiregistrationv1.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
|
||||
Spec: apiregistrationv1.APIServiceSpec{
|
||||
@ -148,7 +148,7 @@ func TestUpdateAPIService(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
client := aggregationv1fake.NewSimpleClientset()
|
||||
client := aggregatorv1fake.NewSimpleClientset()
|
||||
if tt.mocks != nil {
|
||||
tt.mocks(client)
|
||||
}
|
||||
|
@ -195,6 +195,16 @@ func toPEM(cert *tls.Certificate, err error) ([]byte, []byte, error) {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
certPEM, keyPEM, err := ToPEM(cert)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return certPEM, keyPEM, nil
|
||||
}
|
||||
|
||||
// Encode a tls.Certificate into a private key PEM and a cert chain PEM.
|
||||
func ToPEM(cert *tls.Certificate) ([]byte, []byte, error) {
|
||||
// Encode the certificate(s) to PEM.
|
||||
certPEMBlocks := make([][]byte, 0, len(cert.Certificate))
|
||||
for _, c := range cert.Certificate {
|
||||
|
@ -6,10 +6,7 @@ SPDX-License-Identifier: Apache-2.0
|
||||
package apicerts
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"crypto/x509/pkix"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@ -25,7 +22,6 @@ import (
|
||||
"github.com/suzerain-io/placeholder-name/internal/autoregistration"
|
||||
"github.com/suzerain-io/placeholder-name/internal/certauthority"
|
||||
placeholdernamecontroller "github.com/suzerain-io/placeholder-name/internal/controller"
|
||||
placeholderv1alpha1 "github.com/suzerain-io/placeholder-name/kubernetes/1.19/api/apis/placeholder/v1alpha1"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -38,28 +34,25 @@ const (
|
||||
|
||||
type certsManagerController struct {
|
||||
namespace string
|
||||
apiServiceName string
|
||||
k8sClient kubernetes.Interface
|
||||
aggregatorClient *aggregatorclient.Clientset
|
||||
aggregatorClient aggregatorclient.Interface
|
||||
secretInformer corev1informers.SecretInformer
|
||||
}
|
||||
|
||||
func NewCertsManagerController(
|
||||
namespace string,
|
||||
k8sClient kubernetes.Interface,
|
||||
aggregationClient *aggregatorclient.Clientset,
|
||||
aggregatorClient aggregatorclient.Interface,
|
||||
secretInformer corev1informers.SecretInformer,
|
||||
withInformer placeholdernamecontroller.WithInformerOptionFunc,
|
||||
) controller.Controller {
|
||||
apiServiceName := placeholderv1alpha1.SchemeGroupVersion.Version + "." + placeholderv1alpha1.GroupName
|
||||
return controller.New(
|
||||
controller.Config{
|
||||
Name: "certs-manager-controller",
|
||||
Syncer: &certsManagerController{
|
||||
apiServiceName: apiServiceName,
|
||||
namespace: namespace,
|
||||
k8sClient: k8sClient,
|
||||
aggregatorClient: aggregationClient,
|
||||
aggregatorClient: aggregatorClient,
|
||||
secretInformer: secretInformer,
|
||||
},
|
||||
},
|
||||
@ -108,7 +101,7 @@ func (c *certsManagerController) Sync(ctx controller.Context) error {
|
||||
}
|
||||
|
||||
// Write the CA's public key bundle and the serving certs to a secret.
|
||||
tlsPrivateKeyPEM, tlsCertChainPEM, err := pemEncode(aggregatedAPIServerTLSCert)
|
||||
tlsPrivateKeyPEM, tlsCertChainPEM, err := certauthority.ToPEM(aggregatedAPIServerTLSCert)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not PEM encode serving certificate: %w", err)
|
||||
}
|
||||
@ -137,28 +130,3 @@ func (c *certsManagerController) Sync(ctx controller.Context) error {
|
||||
klog.Info("certsManagerController Sync successfully created secret and updated API service")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Encode a tls.Certificate into a private key PEM and a cert chain PEM.
|
||||
func pemEncode(cert *tls.Certificate) ([]byte, []byte, error) {
|
||||
privateKeyDER, err := x509.MarshalPKCS8PrivateKey(cert.PrivateKey)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error marshalling private key: %w", err)
|
||||
}
|
||||
privateKeyPEM := pem.EncodeToMemory(&pem.Block{
|
||||
Type: "PRIVATE KEY",
|
||||
Headers: nil,
|
||||
Bytes: privateKeyDER,
|
||||
})
|
||||
|
||||
certChainPEM := make([]byte, 0)
|
||||
for _, certFromChain := range cert.Certificate {
|
||||
certPEMBytes := pem.EncodeToMemory(&pem.Block{
|
||||
Type: "CERTIFICATE",
|
||||
Headers: nil,
|
||||
Bytes: certFromChain,
|
||||
})
|
||||
certChainPEM = append(certChainPEM, certPEMBytes...)
|
||||
}
|
||||
|
||||
return privateKeyPEM, certChainPEM, nil
|
||||
}
|
||||
|
333
internal/controller/apicerts/certs_manager_test.go
Normal file
333
internal/controller/apicerts/certs_manager_test.go
Normal file
@ -0,0 +1,333 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package apicerts
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/x509"
|
||||
"encoding/pem"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/sclevine/spec"
|
||||
"github.com/sclevine/spec/report"
|
||||
"github.com/stretchr/testify/require"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
kubernetesfake "k8s.io/client-go/kubernetes/fake"
|
||||
coretesting "k8s.io/client-go/testing"
|
||||
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||
aggregatorfake "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake"
|
||||
|
||||
"github.com/suzerain-io/controller-go"
|
||||
"github.com/suzerain-io/placeholder-name/internal/testutil"
|
||||
placeholderv1alpha1 "github.com/suzerain-io/placeholder-name/kubernetes/1.19/api/apis/placeholder/v1alpha1"
|
||||
)
|
||||
|
||||
func TestManagerControllerInformerFilters(t *testing.T) {
|
||||
spec.Run(t, "informer filters", func(t *testing.T, when spec.G, it spec.S) {
|
||||
const installedInNamespace = "some-namespace"
|
||||
|
||||
var r *require.Assertions
|
||||
var observableWithInformerOption *testutil.ObservableWithInformerOption
|
||||
var secretsInformerFilter controller.Filter
|
||||
|
||||
it.Before(func() {
|
||||
r = require.New(t)
|
||||
observableWithInformerOption = testutil.NewObservableWithInformerOption()
|
||||
secretsInformer := kubeinformers.NewSharedInformerFactory(nil, 0).Core().V1().Secrets()
|
||||
_ = NewCertsManagerController(
|
||||
installedInNamespace,
|
||||
nil,
|
||||
nil,
|
||||
secretsInformer,
|
||||
observableWithInformerOption.WithInformer, // make it possible to observe the behavior of the Filters
|
||||
)
|
||||
secretsInformerFilter = observableWithInformerOption.GetFilterForInformer(secretsInformer)
|
||||
})
|
||||
|
||||
when("watching Secret objects", func() {
|
||||
var subject controller.Filter
|
||||
var target, wrongNamespace, wrongName, unrelated *corev1.Secret
|
||||
|
||||
it.Before(func() {
|
||||
subject = secretsInformerFilter
|
||||
target = &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "api-serving-cert", Namespace: installedInNamespace}}
|
||||
wrongNamespace = &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "api-serving-cert", Namespace: "wrong-namespace"}}
|
||||
wrongName = &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "wrong-name", Namespace: installedInNamespace}}
|
||||
unrelated = &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "wrong-name", Namespace: "wrong-namespace"}}
|
||||
})
|
||||
|
||||
when("the target Secret changes", func() {
|
||||
it("returns true to trigger the sync method", func() {
|
||||
r.True(subject.Add(target))
|
||||
r.True(subject.Update(target, unrelated))
|
||||
r.True(subject.Update(unrelated, target))
|
||||
r.True(subject.Delete(target))
|
||||
})
|
||||
})
|
||||
|
||||
when("a Secret from another namespace changes", func() {
|
||||
it("returns false to avoid triggering the sync method", func() {
|
||||
r.False(subject.Add(wrongNamespace))
|
||||
r.False(subject.Update(wrongNamespace, unrelated))
|
||||
r.False(subject.Update(unrelated, wrongNamespace))
|
||||
r.False(subject.Delete(wrongNamespace))
|
||||
})
|
||||
})
|
||||
|
||||
when("a Secret with a different name changes", func() {
|
||||
it("returns false to avoid triggering the sync method", func() {
|
||||
r.False(subject.Add(wrongName))
|
||||
r.False(subject.Update(wrongName, unrelated))
|
||||
r.False(subject.Update(unrelated, wrongName))
|
||||
r.False(subject.Delete(wrongName))
|
||||
})
|
||||
})
|
||||
|
||||
when("a Secret with a different name and a different namespace changes", func() {
|
||||
it("returns false to avoid triggering the sync method", func() {
|
||||
r.False(subject.Add(unrelated))
|
||||
r.False(subject.Update(unrelated, unrelated))
|
||||
r.False(subject.Delete(unrelated))
|
||||
})
|
||||
})
|
||||
})
|
||||
}, spec.Parallel(), spec.Report(report.Terminal{}))
|
||||
}
|
||||
|
||||
func TestManagerControllerSync(t *testing.T) {
|
||||
spec.Run(t, "Sync", func(t *testing.T, when spec.G, it spec.S) {
|
||||
const installedInNamespace = "some-namespace"
|
||||
|
||||
var r *require.Assertions
|
||||
|
||||
var subject controller.Controller
|
||||
var kubeAPIClient *kubernetesfake.Clientset
|
||||
var aggregatorAPIClient *aggregatorfake.Clientset
|
||||
var kubeInformerClient *kubernetesfake.Clientset
|
||||
var kubeInformers kubeinformers.SharedInformerFactory
|
||||
var timeoutContext context.Context
|
||||
var timeoutContextCancel context.CancelFunc
|
||||
var syncContext *controller.Context
|
||||
|
||||
// Defer starting the informers until the last possible moment so that the
|
||||
// nested Before's can keep adding things to the informer caches.
|
||||
var startInformersAndController = func() {
|
||||
// Set this at the last second to allow for injection of server override.
|
||||
subject = NewCertsManagerController(
|
||||
installedInNamespace,
|
||||
kubeAPIClient,
|
||||
aggregatorAPIClient,
|
||||
kubeInformers.Core().V1().Secrets(),
|
||||
controller.WithInformer,
|
||||
)
|
||||
|
||||
// Set this at the last second to support calling subject.Name().
|
||||
syncContext = &controller.Context{
|
||||
Context: timeoutContext,
|
||||
Name: subject.Name(),
|
||||
Key: controller.Key{
|
||||
Namespace: installedInNamespace,
|
||||
Name: "api-serving-cert",
|
||||
},
|
||||
}
|
||||
|
||||
// Must start informers before calling TestRunSynchronously()
|
||||
kubeInformers.Start(timeoutContext.Done())
|
||||
controller.TestRunSynchronously(t, subject)
|
||||
}
|
||||
|
||||
it.Before(func() {
|
||||
r = require.New(t)
|
||||
|
||||
timeoutContext, timeoutContextCancel = context.WithTimeout(context.Background(), time.Second*3)
|
||||
|
||||
kubeInformerClient = kubernetesfake.NewSimpleClientset()
|
||||
kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0)
|
||||
kubeAPIClient = kubernetesfake.NewSimpleClientset()
|
||||
aggregatorAPIClient = aggregatorfake.NewSimpleClientset()
|
||||
})
|
||||
|
||||
it.After(func() {
|
||||
timeoutContextCancel()
|
||||
})
|
||||
|
||||
when("there is not yet an api-serving-cert Secret in the installation namespace or it was deleted", func() {
|
||||
it.Before(func() {
|
||||
unrelatedSecret := &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "some other secret",
|
||||
Namespace: installedInNamespace,
|
||||
},
|
||||
}
|
||||
err := kubeInformerClient.Tracker().Add(unrelatedSecret)
|
||||
r.NoError(err)
|
||||
})
|
||||
|
||||
when("the APIService exists", func() {
|
||||
it.Before(func() {
|
||||
apiService := &apiregistrationv1.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: placeholderv1alpha1.SchemeGroupVersion.Version + "." + placeholderv1alpha1.GroupName,
|
||||
},
|
||||
Spec: apiregistrationv1.APIServiceSpec{
|
||||
CABundle: nil,
|
||||
VersionPriority: 1234,
|
||||
},
|
||||
}
|
||||
err := aggregatorAPIClient.Tracker().Add(apiService)
|
||||
r.NoError(err)
|
||||
})
|
||||
|
||||
it("creates the api-serving-cert Secret and updates the APIService's ca bundle", func() {
|
||||
startInformersAndController()
|
||||
err := controller.TestSync(t, subject, *syncContext)
|
||||
r.NoError(err)
|
||||
|
||||
// Check all the relevant fields from the create Secret action
|
||||
r.Len(kubeAPIClient.Actions(), 1)
|
||||
actualAction := kubeAPIClient.Actions()[0].(coretesting.CreateActionImpl)
|
||||
r.Equal(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}, actualAction.GetResource())
|
||||
r.Equal(installedInNamespace, actualAction.GetNamespace())
|
||||
actualSecret := actualAction.GetObject().(*corev1.Secret)
|
||||
r.Equal("api-serving-cert", actualSecret.Name)
|
||||
r.Equal(installedInNamespace, actualSecret.Namespace)
|
||||
actualCACert := actualSecret.StringData["caCertificate"]
|
||||
actualPrivateKey := actualSecret.StringData["tlsPrivateKey"]
|
||||
actualCertChain := actualSecret.StringData["tlsCertificateChain"]
|
||||
r.NotEmpty(actualCACert)
|
||||
r.NotEmpty(actualPrivateKey)
|
||||
r.NotEmpty(actualCertChain)
|
||||
|
||||
// Validate the created cert using the CA, and also validate the cert's hostname
|
||||
roots := x509.NewCertPool()
|
||||
ok := roots.AppendCertsFromPEM([]byte(actualCACert))
|
||||
r.True(ok)
|
||||
block, _ := pem.Decode([]byte(actualPrivateKey))
|
||||
r.NotNil(block)
|
||||
parsedCert, err := x509.ParseCertificate(block.Bytes)
|
||||
r.NoError(err)
|
||||
opts := x509.VerifyOptions{
|
||||
DNSName: "placeholder-name-api." + installedInNamespace + ".svc",
|
||||
Roots: roots,
|
||||
}
|
||||
_, err = parsedCert.Verify(opts)
|
||||
r.NoError(err)
|
||||
|
||||
// Check the created cert's validity bounds
|
||||
r.WithinDuration(time.Now(), parsedCert.NotBefore, time.Minute*2)
|
||||
r.WithinDuration(time.Now().Add(24*365*time.Hour), parsedCert.NotAfter, time.Minute*2)
|
||||
|
||||
// TODO How can we validate the tlsCertificateChain?
|
||||
|
||||
// Make sure we updated the APIService caBundle and left it otherwise unchanged
|
||||
r.Len(aggregatorAPIClient.Actions(), 2)
|
||||
r.Equal("get", aggregatorAPIClient.Actions()[0].GetVerb())
|
||||
expectedAPIServiceName := placeholderv1alpha1.SchemeGroupVersion.Version + "." + placeholderv1alpha1.GroupName
|
||||
expectedUpdateAction := coretesting.NewUpdateAction(
|
||||
schema.GroupVersionResource{
|
||||
Group: apiregistrationv1.GroupName,
|
||||
Version: "v1",
|
||||
Resource: "apiservices",
|
||||
},
|
||||
"",
|
||||
&apiregistrationv1.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: expectedAPIServiceName,
|
||||
Namespace: "",
|
||||
},
|
||||
Spec: apiregistrationv1.APIServiceSpec{
|
||||
VersionPriority: 1234, // only the CABundle is updated, this other field is left unchanged
|
||||
CABundle: []byte(actualCACert),
|
||||
},
|
||||
},
|
||||
)
|
||||
r.Equal(expectedUpdateAction, aggregatorAPIClient.Actions()[1])
|
||||
})
|
||||
|
||||
when("updating the APIService fails", func() {
|
||||
it.Before(func() {
|
||||
aggregatorAPIClient.PrependReactor(
|
||||
"update",
|
||||
"apiservices",
|
||||
func(_ coretesting.Action) (bool, runtime.Object, error) {
|
||||
return true, nil, errors.New("update failed")
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
it("returns the update error", func() {
|
||||
startInformersAndController()
|
||||
err := controller.TestSync(t, subject, *syncContext)
|
||||
r.EqualError(err, "could not update the API service: could not update API service: update failed")
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
when("the APIService does not exist", func() {
|
||||
it.Before(func() {
|
||||
unrelatedAPIService := &apiregistrationv1.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "some other api service"},
|
||||
Spec: apiregistrationv1.APIServiceSpec{},
|
||||
}
|
||||
err := aggregatorAPIClient.Tracker().Add(unrelatedAPIService)
|
||||
r.NoError(err)
|
||||
})
|
||||
|
||||
it("returns an error", func() {
|
||||
startInformersAndController()
|
||||
err := controller.TestSync(t, subject, *syncContext)
|
||||
r.Error(err)
|
||||
r.Regexp("could not get existing version of API service: .* not found", err.Error())
|
||||
})
|
||||
})
|
||||
|
||||
when("creating the Secret fails", func() {
|
||||
it.Before(func() {
|
||||
kubeAPIClient.PrependReactor(
|
||||
"create",
|
||||
"secrets",
|
||||
func(_ coretesting.Action) (bool, runtime.Object, error) {
|
||||
return true, nil, errors.New("create failed")
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
it("returns the create error", func() {
|
||||
startInformersAndController()
|
||||
err := controller.TestSync(t, subject, *syncContext)
|
||||
r.EqualError(err, "could not create secret: create failed")
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
when("there is an api-serving-cert Secret already in the installation namespace", func() {
|
||||
it.Before(func() {
|
||||
apiServingCertSecret := &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "api-serving-cert",
|
||||
Namespace: installedInNamespace,
|
||||
},
|
||||
}
|
||||
err := kubeInformerClient.Tracker().Add(apiServingCertSecret)
|
||||
r.NoError(err)
|
||||
})
|
||||
|
||||
it("does not need to make any API calls with its API clients", func() {
|
||||
startInformersAndController()
|
||||
err := controller.TestSync(t, subject, *syncContext)
|
||||
r.NoError(err)
|
||||
r.Empty(kubeAPIClient.Actions())
|
||||
r.Empty(aggregatorAPIClient.Actions())
|
||||
})
|
||||
})
|
||||
}, spec.Parallel(), spec.Report(report.Terminal{}))
|
||||
}
|
@ -19,13 +19,13 @@ import (
|
||||
|
||||
type certsObserverController struct {
|
||||
namespace string
|
||||
dynamicCertProvider *provider.DynamicTLSServingCertProvider
|
||||
dynamicCertProvider provider.DynamicTLSServingCertProvider
|
||||
secretInformer corev1informers.SecretInformer
|
||||
}
|
||||
|
||||
func NewCertsObserverController(
|
||||
namespace string,
|
||||
dynamicCertProvider *provider.DynamicTLSServingCertProvider,
|
||||
dynamicCertProvider provider.DynamicTLSServingCertProvider,
|
||||
secretInformer corev1informers.SecretInformer,
|
||||
withInformer placeholdernamecontroller.WithInformerOptionFunc,
|
||||
) controller.Controller {
|
||||
@ -56,14 +56,12 @@ func (c *certsObserverController) Sync(_ controller.Context) error {
|
||||
if notFound {
|
||||
klog.Info("certsObserverController Sync() found that the secret does not exist yet or was deleted")
|
||||
// The secret does not exist yet or was deleted.
|
||||
c.dynamicCertProvider.CertPEM = nil
|
||||
c.dynamicCertProvider.KeyPEM = nil
|
||||
c.dynamicCertProvider.Set(nil, nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Mutate the in-memory cert provider to update with the latest cert values.
|
||||
c.dynamicCertProvider.CertPEM = certSecret.Data[tlsCertificateChainSecretKey]
|
||||
c.dynamicCertProvider.KeyPEM = certSecret.Data[tlsPrivateKeySecretKey]
|
||||
c.dynamicCertProvider.Set(certSecret.Data[tlsCertificateChainSecretKey], certSecret.Data[tlsPrivateKeySecretKey])
|
||||
klog.Info("certsObserverController Sync updated certs in the dynamic cert provider")
|
||||
return nil
|
||||
}
|
||||
|
232
internal/controller/apicerts/certs_observer_test.go
Normal file
232
internal/controller/apicerts/certs_observer_test.go
Normal file
@ -0,0 +1,232 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package apicerts
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/sclevine/spec"
|
||||
"github.com/sclevine/spec/report"
|
||||
"github.com/stretchr/testify/require"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
kubernetesfake "k8s.io/client-go/kubernetes/fake"
|
||||
|
||||
"github.com/suzerain-io/controller-go"
|
||||
"github.com/suzerain-io/placeholder-name/internal/provider"
|
||||
"github.com/suzerain-io/placeholder-name/internal/testutil"
|
||||
)
|
||||
|
||||
func TestObserverControllerInformerFilters(t *testing.T) {
|
||||
spec.Run(t, "informer filters", func(t *testing.T, when spec.G, it spec.S) {
|
||||
const installedInNamespace = "some-namespace"
|
||||
|
||||
var r *require.Assertions
|
||||
var observableWithInformerOption *testutil.ObservableWithInformerOption
|
||||
var secretsInformerFilter controller.Filter
|
||||
|
||||
it.Before(func() {
|
||||
r = require.New(t)
|
||||
observableWithInformerOption = testutil.NewObservableWithInformerOption()
|
||||
secretsInformer := kubeinformers.NewSharedInformerFactory(nil, 0).Core().V1().Secrets()
|
||||
_ = NewCertsObserverController(
|
||||
installedInNamespace,
|
||||
nil,
|
||||
secretsInformer,
|
||||
observableWithInformerOption.WithInformer, // make it possible to observe the behavior of the Filters
|
||||
)
|
||||
secretsInformerFilter = observableWithInformerOption.GetFilterForInformer(secretsInformer)
|
||||
})
|
||||
|
||||
when("watching Secret objects", func() {
|
||||
var subject controller.Filter
|
||||
var target, wrongNamespace, wrongName, unrelated *corev1.Secret
|
||||
|
||||
it.Before(func() {
|
||||
subject = secretsInformerFilter
|
||||
target = &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "api-serving-cert", Namespace: installedInNamespace}}
|
||||
wrongNamespace = &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "api-serving-cert", Namespace: "wrong-namespace"}}
|
||||
wrongName = &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "wrong-name", Namespace: installedInNamespace}}
|
||||
unrelated = &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "wrong-name", Namespace: "wrong-namespace"}}
|
||||
})
|
||||
|
||||
when("the target Secret changes", func() {
|
||||
it("returns true to trigger the sync method", func() {
|
||||
r.True(subject.Add(target))
|
||||
r.True(subject.Update(target, unrelated))
|
||||
r.True(subject.Update(unrelated, target))
|
||||
r.True(subject.Delete(target))
|
||||
})
|
||||
})
|
||||
|
||||
when("a Secret from another namespace changes", func() {
|
||||
it("returns false to avoid triggering the sync method", func() {
|
||||
r.False(subject.Add(wrongNamespace))
|
||||
r.False(subject.Update(wrongNamespace, unrelated))
|
||||
r.False(subject.Update(unrelated, wrongNamespace))
|
||||
r.False(subject.Delete(wrongNamespace))
|
||||
})
|
||||
})
|
||||
|
||||
when("a Secret with a different name changes", func() {
|
||||
it("returns false to avoid triggering the sync method", func() {
|
||||
r.False(subject.Add(wrongName))
|
||||
r.False(subject.Update(wrongName, unrelated))
|
||||
r.False(subject.Update(unrelated, wrongName))
|
||||
r.False(subject.Delete(wrongName))
|
||||
})
|
||||
})
|
||||
|
||||
when("a Secret with a different name and a different namespace changes", func() {
|
||||
it("returns false to avoid triggering the sync method", func() {
|
||||
r.False(subject.Add(unrelated))
|
||||
r.False(subject.Update(unrelated, unrelated))
|
||||
r.False(subject.Delete(unrelated))
|
||||
})
|
||||
})
|
||||
})
|
||||
}, spec.Parallel(), spec.Report(report.Terminal{}))
|
||||
}
|
||||
|
||||
func TestObserverControllerSync(t *testing.T) {
|
||||
spec.Run(t, "Sync", func(t *testing.T, when spec.G, it spec.S) {
|
||||
const installedInNamespace = "some-namespace"
|
||||
|
||||
var r *require.Assertions
|
||||
|
||||
var subject controller.Controller
|
||||
var kubeInformerClient *kubernetesfake.Clientset
|
||||
var kubeInformers kubeinformers.SharedInformerFactory
|
||||
var timeoutContext context.Context
|
||||
var timeoutContextCancel context.CancelFunc
|
||||
var syncContext *controller.Context
|
||||
var dynamicCertProvider provider.DynamicTLSServingCertProvider
|
||||
|
||||
// Defer starting the informers until the last possible moment so that the
|
||||
// nested Before's can keep adding things to the informer caches.
|
||||
var startInformersAndController = func() {
|
||||
// Set this at the last second to allow for injection of server override.
|
||||
subject = NewCertsObserverController(
|
||||
installedInNamespace,
|
||||
dynamicCertProvider,
|
||||
kubeInformers.Core().V1().Secrets(),
|
||||
controller.WithInformer,
|
||||
)
|
||||
|
||||
// Set this at the last second to support calling subject.Name().
|
||||
syncContext = &controller.Context{
|
||||
Context: timeoutContext,
|
||||
Name: subject.Name(),
|
||||
Key: controller.Key{
|
||||
Namespace: installedInNamespace,
|
||||
Name: "api-serving-cert",
|
||||
},
|
||||
}
|
||||
|
||||
// Must start informers before calling TestRunSynchronously()
|
||||
kubeInformers.Start(timeoutContext.Done())
|
||||
controller.TestRunSynchronously(t, subject)
|
||||
}
|
||||
|
||||
it.Before(func() {
|
||||
r = require.New(t)
|
||||
|
||||
timeoutContext, timeoutContextCancel = context.WithTimeout(context.Background(), time.Second*3)
|
||||
|
||||
kubeInformerClient = kubernetesfake.NewSimpleClientset()
|
||||
kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0)
|
||||
dynamicCertProvider = provider.NewDynamicTLSServingCertProvider()
|
||||
})
|
||||
|
||||
it.After(func() {
|
||||
timeoutContextCancel()
|
||||
})
|
||||
|
||||
when("there is not yet an api-serving-cert Secret in the installation namespace or it was deleted", func() {
|
||||
it.Before(func() {
|
||||
unrelatedSecret := &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "some other secret",
|
||||
Namespace: installedInNamespace,
|
||||
},
|
||||
}
|
||||
err := kubeInformerClient.Tracker().Add(unrelatedSecret)
|
||||
r.NoError(err)
|
||||
|
||||
dynamicCertProvider.Set([]byte("some cert"), []byte("some private key"))
|
||||
})
|
||||
|
||||
it("sets the dynamicCertProvider's cert and key to nil", func() {
|
||||
startInformersAndController()
|
||||
err := controller.TestSync(t, subject, *syncContext)
|
||||
r.NoError(err)
|
||||
|
||||
actualCertChain, actualKey := dynamicCertProvider.CurrentCertKeyContent()
|
||||
r.Nil(actualCertChain)
|
||||
r.Nil(actualKey)
|
||||
})
|
||||
})
|
||||
|
||||
when("there is an api-serving-cert Secret with the expected keys already in the installation namespace", func() {
|
||||
it.Before(func() {
|
||||
apiServingCertSecret := &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "api-serving-cert",
|
||||
Namespace: installedInNamespace,
|
||||
},
|
||||
Data: map[string][]byte{
|
||||
"caCertificate": []byte("fake cert"),
|
||||
"tlsPrivateKey": []byte("fake private key"),
|
||||
"tlsCertificateChain": []byte("fake cert chain"),
|
||||
},
|
||||
}
|
||||
err := kubeInformerClient.Tracker().Add(apiServingCertSecret)
|
||||
r.NoError(err)
|
||||
|
||||
dynamicCertProvider.Set(nil, nil)
|
||||
})
|
||||
|
||||
it("updates the dynamicCertProvider's cert and key", func() {
|
||||
startInformersAndController()
|
||||
err := controller.TestSync(t, subject, *syncContext)
|
||||
r.NoError(err)
|
||||
|
||||
actualCertChain, actualKey := dynamicCertProvider.CurrentCertKeyContent()
|
||||
r.Equal("fake cert chain", string(actualCertChain))
|
||||
r.Equal("fake private key", string(actualKey))
|
||||
})
|
||||
})
|
||||
|
||||
when("the api-serving-cert Secret exists but is missing the expected keys", func() {
|
||||
it.Before(func() {
|
||||
apiServingCertSecret := &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "api-serving-cert",
|
||||
Namespace: installedInNamespace,
|
||||
},
|
||||
Data: map[string][]byte{},
|
||||
}
|
||||
err := kubeInformerClient.Tracker().Add(apiServingCertSecret)
|
||||
r.NoError(err)
|
||||
|
||||
dynamicCertProvider.Set(nil, nil)
|
||||
})
|
||||
|
||||
it("set the missing values in the dynamicCertProvider as nil", func() {
|
||||
startInformersAndController()
|
||||
err := controller.TestSync(t, subject, *syncContext)
|
||||
r.NoError(err)
|
||||
|
||||
actualCertChain, actualKey := dynamicCertProvider.CurrentCertKeyContent()
|
||||
r.Nil(actualCertChain)
|
||||
r.Nil(actualKey)
|
||||
})
|
||||
})
|
||||
}, spec.Parallel(), spec.Report(report.Terminal{}))
|
||||
}
|
@ -24,41 +24,24 @@ import (
|
||||
coretesting "k8s.io/client-go/testing"
|
||||
|
||||
"github.com/suzerain-io/controller-go"
|
||||
"github.com/suzerain-io/placeholder-name/internal/testutil"
|
||||
crdsplaceholderv1alpha1 "github.com/suzerain-io/placeholder-name/kubernetes/1.19/api/apis/crdsplaceholder/v1alpha1"
|
||||
placeholderfake "github.com/suzerain-io/placeholder-name/kubernetes/1.19/client-go/clientset/versioned/fake"
|
||||
placeholderinformers "github.com/suzerain-io/placeholder-name/kubernetes/1.19/client-go/informers/externalversions"
|
||||
)
|
||||
|
||||
type ObservableWithInformerOption struct {
|
||||
InformerToFilterMap map[controller.InformerGetter]controller.Filter
|
||||
}
|
||||
|
||||
func NewObservableWithInformerOption() *ObservableWithInformerOption {
|
||||
return &ObservableWithInformerOption{
|
||||
InformerToFilterMap: make(map[controller.InformerGetter]controller.Filter),
|
||||
}
|
||||
}
|
||||
|
||||
func (owi *ObservableWithInformerOption) WithInformer(
|
||||
getter controller.InformerGetter,
|
||||
filter controller.Filter,
|
||||
opt controller.InformerOption) controller.Option {
|
||||
owi.InformerToFilterMap[getter] = filter
|
||||
return controller.WithInformer(getter, filter, opt)
|
||||
}
|
||||
|
||||
func TestInformerFilters(t *testing.T) {
|
||||
spec.Run(t, "informer filters", func(t *testing.T, when spec.G, it spec.S) {
|
||||
const installedInNamespace = "some-namespace"
|
||||
|
||||
var r *require.Assertions
|
||||
var observableWithInformerOption *ObservableWithInformerOption
|
||||
var observableWithInformerOption *testutil.ObservableWithInformerOption
|
||||
var configMapInformerFilter controller.Filter
|
||||
var loginDiscoveryConfigInformerFilter controller.Filter
|
||||
|
||||
it.Before(func() {
|
||||
r = require.New(t)
|
||||
observableWithInformerOption = NewObservableWithInformerOption()
|
||||
observableWithInformerOption = testutil.NewObservableWithInformerOption()
|
||||
configMapInformer := kubeinformers.NewSharedInformerFactory(nil, 0).Core().V1().ConfigMaps()
|
||||
loginDiscoveryConfigInformer := placeholderinformers.NewSharedInformerFactory(nil, 0).Crds().V1alpha1().LoginDiscoveryConfigs()
|
||||
_ = NewPublisherController(
|
||||
@ -69,8 +52,8 @@ func TestInformerFilters(t *testing.T) {
|
||||
loginDiscoveryConfigInformer,
|
||||
observableWithInformerOption.WithInformer, // make it possible to observe the behavior of the Filters
|
||||
)
|
||||
configMapInformerFilter = observableWithInformerOption.InformerToFilterMap[configMapInformer]
|
||||
loginDiscoveryConfigInformerFilter = observableWithInformerOption.InformerToFilterMap[loginDiscoveryConfigInformer]
|
||||
configMapInformerFilter = observableWithInformerOption.GetFilterForInformer(configMapInformer)
|
||||
loginDiscoveryConfigInformerFilter = observableWithInformerOption.GetFilterForInformer(loginDiscoveryConfigInformer)
|
||||
})
|
||||
|
||||
when("watching ConfigMap objects", func() {
|
||||
|
@ -33,7 +33,7 @@ const (
|
||||
func PrepareControllers(
|
||||
serverInstallationNamespace string,
|
||||
discoveryURLOverride *string,
|
||||
dynamicCertProvider *provider.DynamicTLSServingCertProvider,
|
||||
dynamicCertProvider provider.DynamicTLSServingCertProvider,
|
||||
) (func(ctx context.Context), error) {
|
||||
// Create k8s clients.
|
||||
k8sClient, aggregatorClient, placeholderClient, err := createClients()
|
||||
|
@ -5,15 +5,40 @@ SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package provider
|
||||
|
||||
type DynamicTLSServingCertProvider struct {
|
||||
CertPEM []byte
|
||||
KeyPEM []byte
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
||||
)
|
||||
|
||||
type DynamicTLSServingCertProvider interface {
|
||||
dynamiccertificates.CertKeyContentProvider
|
||||
Set(certPEM, keyPEM []byte)
|
||||
}
|
||||
|
||||
func (*DynamicTLSServingCertProvider) Name() string {
|
||||
type dynamicTLSServingCertProvider struct {
|
||||
certPEM []byte
|
||||
keyPEM []byte
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
func NewDynamicTLSServingCertProvider() DynamicTLSServingCertProvider {
|
||||
return &dynamicTLSServingCertProvider{}
|
||||
}
|
||||
|
||||
func (p *dynamicTLSServingCertProvider) Set(certPEM, keyPEM []byte) {
|
||||
p.mutex.Lock() // acquire a write lock
|
||||
defer p.mutex.Unlock()
|
||||
p.certPEM = certPEM
|
||||
p.keyPEM = keyPEM
|
||||
}
|
||||
|
||||
func (p *dynamicTLSServingCertProvider) Name() string {
|
||||
return "DynamicTLSServingCertProvider"
|
||||
}
|
||||
|
||||
func (p *DynamicTLSServingCertProvider) CurrentCertKeyContent() (cert []byte, key []byte) {
|
||||
return p.CertPEM, p.KeyPEM
|
||||
func (p *dynamicTLSServingCertProvider) CurrentCertKeyContent() (cert []byte, key []byte) {
|
||||
p.mutex.RLock() // acquire a read lock
|
||||
defer p.mutex.RUnlock()
|
||||
return p.certPEM, p.keyPEM
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
|
||||
// App is an object that represents the placeholder-name-server application.
|
||||
type App struct {
|
||||
serverCommand *cobra.Command
|
||||
cmd *cobra.Command
|
||||
|
||||
// CLI flags
|
||||
configPath string
|
||||
@ -48,27 +48,27 @@ func New(ctx context.Context, args []string, stdout, stderr io.Writer) *App {
|
||||
}
|
||||
|
||||
// Run the server.
|
||||
func (app *App) Run() error {
|
||||
return app.serverCommand.Execute()
|
||||
func (a *App) Run() error {
|
||||
return a.cmd.Execute()
|
||||
}
|
||||
|
||||
// Create the server command and save it into the App.
|
||||
func (app *App) addServerCommand(ctx context.Context, args []string, stdout, stderr io.Writer) {
|
||||
func (a *App) addServerCommand(ctx context.Context, args []string, stdout, stderr io.Writer) {
|
||||
cmd := &cobra.Command{
|
||||
Use: `placeholder-name-server`,
|
||||
Long: "placeholder-name-server provides a generic API for mapping an external\n" +
|
||||
"credential from somewhere to an internal credential to be used for\n" +
|
||||
"authenticating to the Kubernetes API.",
|
||||
RunE: func(cmd *cobra.Command, args []string) error { return app.runServer(ctx) },
|
||||
RunE: func(cmd *cobra.Command, args []string) error { return a.runServer(ctx) },
|
||||
Args: cobra.NoArgs,
|
||||
}
|
||||
|
||||
cmd.SetArgs(args)
|
||||
cmd.SetOut(stdout)
|
||||
cmd.SetErr(stderr)
|
||||
addCommandlineFlagsToCommand(cmd, app)
|
||||
addCommandlineFlagsToCommand(cmd, a)
|
||||
|
||||
app.serverCommand = cmd
|
||||
a.cmd = cmd
|
||||
}
|
||||
|
||||
// Define the app's commandline flags.
|
||||
@ -104,15 +104,15 @@ func addCommandlineFlagsToCommand(cmd *cobra.Command, app *App) {
|
||||
}
|
||||
|
||||
// Boot the aggregated API server, which will in turn boot the controllers.
|
||||
func (app *App) runServer(ctx context.Context) error {
|
||||
func (a *App) runServer(ctx context.Context) error {
|
||||
// Read the server config file.
|
||||
cfg, err := config.FromPath(app.configPath)
|
||||
cfg, err := config.FromPath(a.configPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not load config: %w", err)
|
||||
}
|
||||
|
||||
// Load the Kubernetes cluster signing CA.
|
||||
k8sClusterCA, err := certauthority.Load(app.clusterSigningCertFilePath, app.clusterSigningKeyFilePath)
|
||||
k8sClusterCA, err := certauthority.Load(a.clusterSigningCertFilePath, a.clusterSigningKeyFilePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not load cluster signing CA: %w", err)
|
||||
}
|
||||
@ -124,7 +124,7 @@ func (app *App) runServer(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// Discover in which namespace we are installed.
|
||||
podInfo, err := downward.Load(app.downwardAPIPath)
|
||||
podInfo, err := downward.Load(a.downwardAPIPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not read pod metadata: %w", err)
|
||||
}
|
||||
@ -135,7 +135,7 @@ func (app *App) runServer(ctx context.Context) error {
|
||||
// is stored in a k8s Secret. Therefore it also effectively acting as
|
||||
// an in-memory cache of what is stored in the k8s Secret, helping to
|
||||
// keep incoming requests fast.
|
||||
dynamicCertProvider := &provider.DynamicTLSServingCertProvider{}
|
||||
dynamicCertProvider := provider.NewDynamicTLSServingCertProvider()
|
||||
|
||||
// Prepare to start the controllers, but defer actually starting them until the
|
||||
// post start hook of the aggregated API server.
|
||||
@ -171,7 +171,7 @@ func (app *App) runServer(ctx context.Context) error {
|
||||
|
||||
// Create a configuration for the aggregated API server.
|
||||
func getAggregatedAPIServerConfig(
|
||||
dynamicCertProvider *provider.DynamicTLSServingCertProvider,
|
||||
dynamicCertProvider provider.DynamicTLSServingCertProvider,
|
||||
webhookTokenAuthenticator *webhook.WebhookTokenAuthenticator,
|
||||
ca *certauthority.CA,
|
||||
startControllersPostStartHook func(context.Context),
|
||||
|
@ -78,7 +78,7 @@ func TestCommand(t *testing.T) {
|
||||
stderr := bytes.NewBuffer([]byte{})
|
||||
|
||||
a := New(context.Background(), test.args, stdout, stderr)
|
||||
a.serverCommand.RunE = func(cmd *cobra.Command, args []string) error {
|
||||
a.cmd.RunE = func(cmd *cobra.Command, args []string) error {
|
||||
return nil
|
||||
}
|
||||
err := a.Run()
|
||||
|
30
internal/testutil/observable_with_informer_option.go
Normal file
30
internal/testutil/observable_with_informer_option.go
Normal file
@ -0,0 +1,30 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package testutil
|
||||
|
||||
import "github.com/suzerain-io/controller-go"
|
||||
|
||||
type ObservableWithInformerOption struct {
|
||||
InformerToFilterMap map[controller.InformerGetter]controller.Filter
|
||||
}
|
||||
|
||||
func NewObservableWithInformerOption() *ObservableWithInformerOption {
|
||||
return &ObservableWithInformerOption{
|
||||
InformerToFilterMap: make(map[controller.InformerGetter]controller.Filter),
|
||||
}
|
||||
}
|
||||
|
||||
func (i *ObservableWithInformerOption) WithInformer(
|
||||
getter controller.InformerGetter,
|
||||
filter controller.Filter,
|
||||
opt controller.InformerOption) controller.Option {
|
||||
i.InformerToFilterMap[getter] = filter
|
||||
return controller.WithInformer(getter, filter, opt)
|
||||
}
|
||||
|
||||
func (i *ObservableWithInformerOption) GetFilterForInformer(getter controller.InformerGetter) controller.Filter {
|
||||
return i.InformerToFilterMap[getter]
|
||||
}
|
Loading…
Reference in New Issue
Block a user