Make the PublisherController use informers

Signed-off-by: Andrew Keesler <akeesler@vmware.com>
This commit is contained in:
Ryan Richard 2020-07-30 14:34:13 -07:00 committed by Andrew Keesler
parent ee865fe97f
commit 5aebb76146
4 changed files with 119 additions and 104 deletions

2
go.mod
View File

@ -10,7 +10,7 @@ require (
github.com/sclevine/spec v1.4.0
github.com/spf13/cobra v1.0.0
github.com/stretchr/testify v1.6.1
github.com/suzerain-io/controller-go v0.0.0-20200728175738-b49edda60499
github.com/suzerain-io/controller-go v0.0.0-20200730212956-7f99b569ca9f
github.com/suzerain-io/placeholder-name-api v0.0.0-20200730131400-4a1da8d7e70b
github.com/suzerain-io/placeholder-name-client-go v0.0.0-20200729202601-9b4b6d38494c
k8s.io/api v0.19.0-rc.0

4
go.sum
View File

@ -530,8 +530,8 @@ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/suzerain-io/controller-go v0.0.0-20200728175738-b49edda60499 h1:2NQMYv6IIjcYGkNdezwcY7/MS/3goEZThlz7u6ml9j4=
github.com/suzerain-io/controller-go v0.0.0-20200728175738-b49edda60499/go.mod h1:+v9upryFWBJac6KXKlheGHr7e3kqpk1ldH1iIMFopMs=
github.com/suzerain-io/controller-go v0.0.0-20200730212956-7f99b569ca9f h1:gZ6rAdl+VE9DT0yE52xY/kJZ/hOJYxwtsgGoPr5vItI=
github.com/suzerain-io/controller-go v0.0.0-20200730212956-7f99b569ca9f/go.mod h1:+v9upryFWBJac6KXKlheGHr7e3kqpk1ldH1iIMFopMs=
github.com/suzerain-io/placeholder-name-api v0.0.0-20200729202220-f1696913d7c9 h1:xnco3XJMrvlwyQJfKoyVPciATvCJ3Y6SY2D8gI2DT2E=
github.com/suzerain-io/placeholder-name-api v0.0.0-20200729202220-f1696913d7c9/go.mod h1:OuYBJDpMMnvMUoBn+XeMWtHghuYk0cq9bNkNa3T8j/g=
github.com/suzerain-io/placeholder-name-api v0.0.0-20200730131400-4a1da8d7e70b h1:7Fuizf0c3ffqyHj7X4AvXnYNFJHbSgHKjuDxDsxeQ8A=

View File

@ -12,13 +12,14 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"github.com/suzerain-io/controller-go"
placeholderv1alpha1 "github.com/suzerain-io/placeholder-name-api/pkg/apis/placeholder/v1alpha1"
placeholderclientset "github.com/suzerain-io/placeholder-name-client-go/pkg/generated/clientset/versioned"
placeholderv1alpha1informers "github.com/suzerain-io/placeholder-name-client-go/pkg/generated/informers/externalversions/placeholder/v1alpha1"
)
const (
@ -30,29 +31,46 @@ const (
)
type publisherController struct {
namespace string
kubeClient kubernetes.Interface
placeholderClient placeholderclientset.Interface
namespace string
placeholderClient placeholderclientset.Interface
configMapInformer corev1informers.ConfigMapInformer
loginDiscoveryConfigInformer placeholderv1alpha1informers.LoginDiscoveryConfigInformer
}
func NewPublisherController(namespace string, kubeClient kubernetes.Interface, placeholderClient placeholderclientset.Interface) controller.Controller {
func NewPublisherController(
namespace string,
placeholderClient placeholderclientset.Interface,
configMapInformer corev1informers.ConfigMapInformer,
loginDiscoveryConfigInformer placeholderv1alpha1informers.LoginDiscoveryConfigInformer,
) controller.Controller {
return controller.New(
controller.Config{
Name: "publisher-controller",
Syncer: &publisherController{
namespace: namespace,
kubeClient: kubeClient,
placeholderClient: placeholderClient,
namespace: namespace,
placeholderClient: placeholderClient,
configMapInformer: configMapInformer,
loginDiscoveryConfigInformer: loginDiscoveryConfigInformer,
},
},
controller.WithInformer(
configMapInformer,
controller.FilterFuncs{}, // TODO fix this and write tests
controller.InformerOption{},
),
controller.WithInformer(
loginDiscoveryConfigInformer,
controller.FilterFuncs{}, // TODO fix this and write tests
controller.InformerOption{},
),
)
}
func (c *publisherController) Sync(ctx controller.Context) error {
configMap, err := c.kubeClient.
CoreV1().
configMap, err := c.configMapInformer.
Lister().
ConfigMaps(clusterInfoNamespace).
Get(ctx.Context, clusterInfoName, metav1.GetOptions{})
Get(clusterInfoName)
notFound := k8serrors.IsNotFound(err)
if err != nil && !notFound {
return fmt.Errorf("failed to get %s configmap: %w", clusterInfoName, err)
@ -103,20 +121,18 @@ func (c *publisherController) createOrUpdateLoginDiscoveryConfig(
ctx context.Context,
discoveryConfig *placeholderv1alpha1.LoginDiscoveryConfig,
) error {
loginDiscoveryConfigs := c.placeholderClient.
PlaceholderV1alpha1().
LoginDiscoveryConfigs(c.namespace)
existingDiscoveryConfig, err := loginDiscoveryConfigs.Get(
ctx,
discoveryConfig.Name,
metav1.GetOptions{},
)
existingDiscoveryConfig, err := c.loginDiscoveryConfigInformer.
Lister().
LoginDiscoveryConfigs(c.namespace).
Get(discoveryConfig.Name)
notFound := k8serrors.IsNotFound(err)
if err != nil && !notFound {
return fmt.Errorf("could not get logindiscoveryconfig: %w", err)
}
loginDiscoveryConfigs := c.placeholderClient.
PlaceholderV1alpha1().
LoginDiscoveryConfigs(c.namespace)
if notFound {
if _, err := loginDiscoveryConfigs.Create(
ctx,

View File

@ -19,12 +19,14 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
kubeinformers "k8s.io/client-go/informers"
kubernetesfake "k8s.io/client-go/kubernetes/fake"
coretesting "k8s.io/client-go/testing"
"github.com/suzerain-io/controller-go"
placeholderv1alpha1 "github.com/suzerain-io/placeholder-name-api/pkg/apis/placeholder/v1alpha1"
placeholderfake "github.com/suzerain-io/placeholder-name-client-go/pkg/generated/clientset/versioned/fake"
placeholderinformers "github.com/suzerain-io/placeholder-name-client-go/pkg/generated/informers/externalversions"
)
func TestRun(t *testing.T) {
@ -34,11 +36,14 @@ func TestRun(t *testing.T) {
var r *require.Assertions
var subject controller.Controller
var kubeClient *kubernetesfake.Clientset
var placeholderClient *placeholderfake.Clientset
var kubeInformerClient *kubernetesfake.Clientset
var placeholderInformerClient *placeholderfake.Clientset
var kubeInformers kubeinformers.SharedInformerFactory
var placeholderInformers placeholderinformers.SharedInformerFactory
var placeholderAPIClient *placeholderfake.Clientset
var timeoutContext context.Context
var timeoutContextCancel context.CancelFunc
var controllerContext *controller.Context
var syncContext *controller.Context
var expectedLoginDiscoveryConfig = func(expectedNamespace, expectedServerURL, expectedCAData string) (schema.GroupVersionResource, *placeholderv1alpha1.LoginDiscoveryConfig) {
expectedLoginDiscoveryConfigGVR := schema.GroupVersionResource{
@ -59,13 +64,34 @@ func TestRun(t *testing.T) {
return expectedLoginDiscoveryConfigGVR, expectedLoginDiscoveryConfig
}
// Defer starting the informers until the last possible moment so that the
// nested Before's can keep adding things to the informer caches.
var startInformersAndController = func() {
// Must start informers before calling TestRunSynchronously()
kubeInformers.Start(timeoutContext.Done())
placeholderInformers.Start(timeoutContext.Done())
controller.TestRunSynchronously(t, subject)
}
it.Before(func() {
r = require.New(t)
kubeClient = kubernetesfake.NewSimpleClientset()
placeholderClient = placeholderfake.NewSimpleClientset()
timeoutContext, timeoutContextCancel = context.WithTimeout(context.Background(), time.Second*3)
subject = NewPublisherController(installedInNamespace, kubeClient, placeholderClient)
controllerContext = &controller.Context{
kubeInformerClient = kubernetesfake.NewSimpleClientset()
kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0)
placeholderAPIClient = placeholderfake.NewSimpleClientset()
placeholderInformerClient = placeholderfake.NewSimpleClientset()
placeholderInformers = placeholderinformers.NewSharedInformerFactory(placeholderInformerClient, 0)
subject = NewPublisherController(
installedInNamespace,
placeholderAPIClient,
kubeInformers.Core().V1().ConfigMaps(),
placeholderInformers.Placeholder().V1alpha1().LoginDiscoveryConfigs(),
)
syncContext = &controller.Context{
Context: timeoutContext,
Name: subject.Name(),
Key: controller.Key{
@ -75,14 +101,17 @@ func TestRun(t *testing.T) {
}
})
it.After(func() {
timeoutContextCancel()
})
when("there is a cluster-info ConfigMap in the kube-public namespace", func() {
const caData = "c29tZS1jZXJ0aWZpY2F0ZS1hdXRob3JpdHktZGF0YQo=" // "some-certificate-authority-data" base64 encoded
const kubeServerURL = "https://some-server"
var clusterInfoConfigMap *corev1.ConfigMap
when("the ConfigMap has the expected `kubeconfig` top-level data key", func() {
it.Before(func() {
clusterInfoConfigMap = &corev1.ConfigMap{
clusterInfoConfigMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: "cluster-info", Namespace: "kube-public"},
// Note that go fmt puts tabs in our file, which we must remove from our configmap yaml below.
Data: map[string]string{
@ -97,15 +126,14 @@ func TestRun(t *testing.T) {
"uninteresting-key": "uninteresting-value",
},
}
err := kubeClient.Tracker().Add(clusterInfoConfigMap)
err := kubeInformerClient.Tracker().Add(clusterInfoConfigMap)
r.NoError(err)
})
when("the LoginDiscoveryConfig does not already exist", func() {
it("creates a LoginDiscoveryConfig", func() {
defer timeoutContextCancel()
err := controller.TestSync(t, subject, *controllerContext)
it.Focus("creates a LoginDiscoveryConfig", func() {
startInformersAndController()
err := controller.TestSync(t, subject, *syncContext)
r.NoError(err)
expectedLoginDiscoveryConfigGVR, expectedLoginDiscoveryConfig := expectedLoginDiscoveryConfig(
@ -113,24 +141,22 @@ func TestRun(t *testing.T) {
kubeServerURL,
caData,
)
expectedActions := []coretesting.Action{
coretesting.NewGetAction(
expectedLoginDiscoveryConfigGVR,
installedInNamespace,
expectedLoginDiscoveryConfig.Name,
),
coretesting.NewCreateAction(
expectedLoginDiscoveryConfigGVR,
installedInNamespace,
expectedLoginDiscoveryConfig,
),
}
r.Equal(expectedActions, placeholderClient.Actions())
r.Equal(
[]coretesting.Action{
coretesting.NewCreateAction(
expectedLoginDiscoveryConfigGVR,
installedInNamespace,
expectedLoginDiscoveryConfig,
),
},
placeholderAPIClient.Actions(),
)
})
when("creating the LoginDiscoveryConfig fails", func() {
it.Before(func() {
placeholderClient.PrependReactor(
placeholderAPIClient.PrependReactor(
"create",
"logindiscoveryconfigs",
func(_ coretesting.Action) (bool, runtime.Object, error) {
@ -140,7 +166,8 @@ func TestRun(t *testing.T) {
})
it("returns the create error", func() {
err := controller.TestSync(t, subject, *controllerContext)
startInformersAndController()
err := controller.TestSync(t, subject, *syncContext)
r.EqualError(err, "could not create logindiscoveryconfig: create failed")
})
})
@ -154,44 +181,16 @@ func TestRun(t *testing.T) {
kubeServerURL,
caData,
)
err := placeholderClient.Tracker().Add(expectedLoginDiscoveryConfig)
err := placeholderInformerClient.Tracker().Add(expectedLoginDiscoveryConfig)
r.NoError(err)
})
it("does not update the LoginDiscoveryConfig to avoid unnecessary etcd writes/api calls", func() {
err := controller.TestSync(t, subject, *controllerContext)
startInformersAndController()
err := controller.TestSync(t, subject, *syncContext)
r.NoError(err)
expectedLoginDiscoveryConfigGVR, expectedLoginDiscoveryConfig := expectedLoginDiscoveryConfig(
installedInNamespace,
kubeServerURL,
caData,
)
expectedActions := []coretesting.Action{
coretesting.NewGetAction(
expectedLoginDiscoveryConfigGVR,
installedInNamespace,
expectedLoginDiscoveryConfig.Name,
),
}
r.Equal(expectedActions, placeholderClient.Actions())
})
when("getting the LoginDiscoveryConfig fails", func() {
it.Before(func() {
placeholderClient.PrependReactor(
"get",
"logindiscoveryconfigs",
func(_ coretesting.Action) (bool, runtime.Object, error) {
return true, nil, errors.New("get failed")
},
)
})
it("returns the get error", func() {
err := controller.TestSync(t, subject, *controllerContext)
r.EqualError(err, "could not get logindiscoveryconfig: get failed")
})
r.Empty(placeholderAPIClient.Actions())
})
})
@ -203,12 +202,13 @@ func TestRun(t *testing.T) {
caData,
)
expectedLoginDiscoveryConfig.Spec.Server = "https://some-other-server"
err := placeholderClient.Tracker().Add(expectedLoginDiscoveryConfig)
r.NoError(err)
r.NoError(placeholderInformerClient.Tracker().Add(expectedLoginDiscoveryConfig))
r.NoError(placeholderAPIClient.Tracker().Add(expectedLoginDiscoveryConfig))
})
it("updates the existing LoginDiscoveryConfig", func() {
err := controller.TestSync(t, subject, *controllerContext)
startInformersAndController()
err := controller.TestSync(t, subject, *syncContext)
r.NoError(err)
expectedLoginDiscoveryConfigGVR, expectedLoginDiscoveryConfig := expectedLoginDiscoveryConfig(
@ -217,23 +217,18 @@ func TestRun(t *testing.T) {
caData,
)
expectedActions := []coretesting.Action{
coretesting.NewGetAction(
expectedLoginDiscoveryConfigGVR,
installedInNamespace,
expectedLoginDiscoveryConfig.Name,
),
coretesting.NewUpdateAction(
expectedLoginDiscoveryConfigGVR,
installedInNamespace,
expectedLoginDiscoveryConfig,
),
}
r.Equal(expectedActions, placeholderClient.Actions())
r.Equal(expectedActions, placeholderAPIClient.Actions())
})
when("updating the LoginDiscoveryConfig fails", func() {
it.Before(func() {
placeholderClient.PrependReactor(
placeholderAPIClient.PrependReactor(
"update",
"logindiscoveryconfigs",
func(_ coretesting.Action) (bool, runtime.Object, error) {
@ -243,7 +238,8 @@ func TestRun(t *testing.T) {
})
it("returns the update error", func() {
err := controller.TestSync(t, subject, *controllerContext)
startInformersAndController()
err := controller.TestSync(t, subject, *syncContext)
r.EqualError(err, "could not update logindiscoveryconfig: update failed")
})
})
@ -253,20 +249,21 @@ func TestRun(t *testing.T) {
when("the ConfigMap is missing the expected `kubeconfig` top-level data key", func() {
it.Before(func() {
clusterInfoConfigMap = &corev1.ConfigMap{
clusterInfoConfigMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: "cluster-info", Namespace: "kube-public"},
Data: map[string]string{
"these are not the droids you're looking for": "uninteresting-value",
},
}
err := kubeClient.Tracker().Add(clusterInfoConfigMap)
err := kubeInformerClient.Tracker().Add(clusterInfoConfigMap)
r.NoError(err)
})
it("keeps waiting for it to exist", func() {
err := controller.TestSync(t, subject, *controllerContext)
startInformersAndController()
err := controller.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Empty(placeholderClient.Actions())
r.Empty(placeholderAPIClient.Actions())
})
})
})
@ -279,20 +276,21 @@ func TestRun(t *testing.T) {
Namespace: "kube-public",
},
}
err := kubeClient.Tracker().Add(unrelatedConfigMap)
err := kubeInformerClient.Tracker().Add(unrelatedConfigMap)
r.NoError(err)
})
it("keeps waiting for one", func() {
err := controller.TestSync(t, subject, *controllerContext)
startInformersAndController()
err := controller.TestSync(t, subject, *syncContext)
r.NoError(err)
r.Empty(placeholderClient.Actions())
r.Empty(placeholderAPIClient.Actions())
})
})
when("getting the cluster-info ConfigMap in the kube-public namespace fails", func() {
it.Before(func() {
kubeClient.PrependReactor(
kubeInformerClient.PrependReactor(
"get",
"configmaps",
func(_ coretesting.Action) (bool, runtime.Object, error) {
@ -302,7 +300,8 @@ func TestRun(t *testing.T) {
})
it("returns an error", func() {
err := controller.TestSync(t, subject, *controllerContext)
startInformersAndController()
err := controller.TestSync(t, subject, *syncContext)
r.EqualError(err, "failed to get cluster-info configmap: get failed")
})
})