diff --git a/internal/controller/controller.go b/internal/controller/controller.go
new file mode 100644
index 00000000..b1bd9c6c
--- /dev/null
+++ b/internal/controller/controller.go
@@ -0,0 +1,216 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package controller
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/events"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+)
+
+// Controller interface represents a runnable Kubernetes controller.
+// Cancelling the context passed will cause the controller to shut down.
+// The number of workers determines how parallel the job processing is.
+type Controller interface {
+	// Run runs the controller and blocks until the controller is finished.
+	// The number of workers can be specified via the workers parameter.
+	// This function will return when all internal loops are finished.
+	// Note that having more than one worker usually means handling parallelization of Sync().
+	Run(ctx context.Context, workers int)
+
+	// Name returns the controller name string.
+	Name() string
+
+	// The methods below should only be called during tests via the Test* functions.
+
+	// sync contains the main controller logic.
+	// This can be used in unit tests to exercise the Syncer by directly calling it.
+	sync(ctx Context) error
+
+	// wrap wraps the main controller logic provided via the Syncer.
+	// This can be used in tests to synchronize asynchronous events as seen by a running controller.
+	// The wrapping must be done after New is called and before Run is called.
+	wrap(wrapper SyncWrapperFunc)
+}
+
+var _ Controller = &controller{}
+
+type Config struct {
+	Name   string
+	Syncer Syncer
+}
+
+func New(config Config, opts ...Option) Controller {
+	c := &controller{
+		config: config,
+	}
+
+	// set up defaults
+	WithRateLimiter(workqueue.DefaultControllerRateLimiter())(c)
+	WithRecorder(klogRecorder{})(c)
+
+	for _, opt := range opts {
+		opt(c)
+	}
+
+	return c
+}
+
+type controller struct {
+	config Config
+
+	queue        workqueue.RateLimitingInterface
+	queueWrapper Queue
+	maxRetries   int
+	recorder     events.EventRecorder
+
+	run     bool
+	runOpts []Option
+
+	cacheSyncs []cache.InformerSynced
+}
+
+func (c *controller) Run(ctx context.Context, workers int) {
+	defer utilruntime.HandleCrash(crash) // prevent panics from killing the process
+
+	klog.InfoS("starting controller", "controller", c.Name(), "workers", workers)
+
+	c.run = true
+	for _, opt := range c.runOpts {
+		opt(c)
+	}
+
+	if !c.waitForCacheSyncWithTimeout() {
+		panic(die(fmt.Sprintf("%s: timed out waiting for caches to sync", c.Name())))
+	}
+
+	var workerWg sync.WaitGroup
+
+	// workerContext is used to track and initiate worker shutdown
+	workerContext, workerContextCancel := context.WithCancel(context.Background())
+
+	defer func() {
+		c.queue.ShutDown()    // shut down the controller queue first
+		workerContextCancel() // cancel the worker context, which tells workers to initiate shutdown
+
+		// Wait for all workers to finish their job.
+		// At this point Run() can hang, and callers have to implement the logic that will kill
+		// this controller (SIGKILL).
+		workerWg.Wait()
+		klog.InfoS("all workers have been terminated, shutting down", "controller", c.Name(), "workers", workers)
+	}()
+
+	for i := 1; i <= workers; i++ {
+		idx := i
+		klog.InfoS("starting worker", "controller", c.Name(), "worker", idx)
+		workerWg.Add(1)
+		go func() {
+			defer utilruntime.HandleCrash(crash) // prevent panics from killing the process
+			defer func() {
+				klog.InfoS("shutting down worker", "controller", c.Name(), "worker", idx)
+				workerWg.Done()
+			}()
+			c.runWorker(workerContext)
+		}()
+	}
+
+	<-ctx.Done() // wait for controller context to be cancelled
+}
+
+func (c *controller) Name() string {
+	return c.config.Name
+}
+
+func (c *controller) sync(ctx Context) error {
+	return c.config.Syncer.Sync(ctx)
+}
+
+func (c *controller) wrap(wrapper SyncWrapperFunc) {
+	c.runOpts = append(c.runOpts, toRunOpt(func(c *controller) {
+		c.config.Syncer = wrapper(c.config.Syncer)
+	}))
+}
+
+func (c *controller) waitForCacheSyncWithTimeout() bool {
+	// prevent us from blocking forever due to a broken informer
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
+	defer cancel()
+
+	return cache.WaitForCacheSync(ctx.Done(), c.cacheSyncs...)
+}
+
+func (c *controller) add(filter Filter, object metav1.Object) {
+	key := filter.Parent(object)
+	c.queueWrapper.Add(key)
+}
+
+// runWorker runs a single worker.
+// The worker is asked to terminate when the passed context is cancelled.
+func (c *controller) runWorker(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			c.processNextWorkItem(ctx)
+		}
+	}
+}
+
+func (c *controller) processNextWorkItem(ctx context.Context) {
+	queueKey, quit := c.queue.Get()
+	if quit {
+		return
+	}
+
+	key := queueKey.(Key)
+	defer c.queue.Done(key)
+
+	syncCtx := Context{
+		Context:  ctx,
+		Name:     c.Name(),
+		Key:      key,
+		Queue:    c.queueWrapper,
+		Recorder: c.recorder,
+	}
+
+	err := c.sync(syncCtx)
+	c.handleKey(key, err)
+}
+
+func (c *controller) handleKey(key Key, err error) {
+	if err == nil {
+		c.queue.Forget(key)
+		return
+	}
+
+	retryForever := c.maxRetries <= 0
+	shouldRetry := retryForever || c.queue.NumRequeues(key) < c.maxRetries
+
+	if !shouldRetry {
+		utilruntime.HandleError(fmt.Errorf("%s: dropping key %v out of the queue: %w", c.Name(), key, err))
+		c.queue.Forget(key)
+		return
+	}
+
+	if errors.Is(err, ErrSyntheticRequeue) {
+		// logging this helps detect wedged controllers with missing prerequisites
+		klog.V(4).InfoS("requested synthetic requeue", "controller", c.Name(), "key", key)
+	} else {
+		utilruntime.HandleError(fmt.Errorf("%s: %v failed with: %w", c.Name(), key, err))
+	}
+
+	c.queue.AddRateLimited(key)
+}
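A minimal sketch of how the pieces above are intended to be wired together (not part of this diff; the controller name and Syncer body are hypothetical, and the import only works from within this module because the package is internal):

	package main

	import (
		"context"

		"github.com/suzerain-io/placeholder-name/internal/controller"
	)

	func main() {
		c := controller.New(
			controller.Config{
				Name: "example",
				Syncer: controller.SyncFunc(func(ctx controller.Context) error {
					// reconcile the object identified by ctx.Key.Namespace / ctx.Key.Name
					return nil
				}),
			},
			controller.WithMaxRetries(5), // drop a key after five failed syncs; <= 0 retries forever
		)

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		c.Run(ctx, 2) // blocks; two workers call Sync in parallel until ctx is cancelled
	}
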
diff --git a/internal/controller/die.go b/internal/controller/die.go
new file mode 100644
index 00000000..1fde150b
--- /dev/null
+++ b/internal/controller/die.go
@@ -0,0 +1,15 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package controller
+
+type die string
+
+func crash(i interface{}) {
+	mustDie, ok := i.(die)
+	if ok {
+		panic(string(mustDie))
+	}
+}
diff --git a/internal/controller/error.go b/internal/controller/error.go
new file mode 100644
index 00000000..f636ba5b
--- /dev/null
+++ b/internal/controller/error.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package controller
+
+// ErrSyntheticRequeue can be returned from a Syncer to force an artificial retry of the current key.
+// This can also be done by re-adding the key to the queue, but this is more convenient and has better logging.
+const ErrSyntheticRequeue = constErr("synthetic requeue request")
+
+var _ error = constErr("")
+
+type constErr string
+
+func (e constErr) Error() string {
+	return string(e)
+}
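A sketch of the intended use of ErrSyntheticRequeue (not part of this diff; it assumes a corev1 secret lister named secretLister is in scope, and the secret name is hypothetical). Returning it re-queues the key with rate limiting, and handleKey logs it at V(4) instead of reporting an error:

	// Poll for a missing prerequisite instead of treating it as a failure.
	syncer := controller.SyncFunc(func(ctx controller.Context) error {
		_, err := secretLister.Secrets(ctx.Key.Namespace).Get("prerequisite")
		if apierrors.IsNotFound(err) {
			return controller.ErrSyntheticRequeue // try again later; not a real error
		}
		if err != nil {
			return err
		}
		// ... main sync logic ...
		return nil
	})
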
diff --git a/internal/controller/filter.go b/internal/controller/filter.go
new file mode 100644
index 00000000..5d66752d
--- /dev/null
+++ b/internal/controller/filter.go
@@ -0,0 +1,76 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package controller
+
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
+)
+
+type Filter interface {
+	Add(obj metav1.Object) bool
+	Update(oldObj, newObj metav1.Object) bool
+	Delete(obj metav1.Object) bool
+
+	Parent(obj metav1.Object) Key
+}
+
+var _ Filter = FilterFuncs{}
+
+type ParentFunc func(obj metav1.Object) Key
+
+type FilterFuncs struct {
+	ParentFunc ParentFunc
+	AddFunc    func(obj metav1.Object) bool
+	UpdateFunc func(oldObj, newObj metav1.Object) bool
+	DeleteFunc func(obj metav1.Object) bool
+}
+
+func (f FilterFuncs) Parent(obj metav1.Object) Key {
+	if f.ParentFunc == nil {
+		return Key{
+			Namespace: obj.GetNamespace(),
+			Name:      obj.GetName(),
+		}
+	}
+	return f.ParentFunc(obj)
+}
+
+func (f FilterFuncs) Add(obj metav1.Object) bool {
+	if f.AddFunc == nil {
+		return false
+	}
+	return f.AddFunc(obj)
+}
+
+func (f FilterFuncs) Update(oldObj, newObj metav1.Object) bool {
+	if f.UpdateFunc == nil {
+		return false
+	}
+	return f.UpdateFunc(oldObj, newObj)
+}
+
+func (f FilterFuncs) Delete(obj metav1.Object) bool {
+	if f.DeleteFunc == nil {
+		return false
+	}
+	return f.DeleteFunc(obj)
+}
+
+func FilterByNames(parentFunc ParentFunc, names ...string) Filter {
+	set := sets.NewString(names...)
+	has := func(obj metav1.Object) bool {
+		return set.Has(obj.GetName())
+	}
+	return FilterFuncs{
+		ParentFunc: parentFunc,
+		AddFunc:    has,
+		UpdateFunc: func(oldObj, newObj metav1.Object) bool {
+			return has(newObj)
+		},
+		DeleteFunc: has,
+	}
+}
diff --git a/internal/controller/informer.go b/internal/controller/informer.go
new file mode 100644
index 00000000..29568215
--- /dev/null
+++ b/internal/controller/informer.go
@@ -0,0 +1,25 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package controller
+
+import "k8s.io/client-go/tools/cache"
+
+type InformerGetter interface {
+	Informer() cache.SharedIndexInformer
+}
+
+type InformerOption struct {
+	SkipSync   bool
+	SkipEvents bool
+
+	// TODO maybe add a field like:
+	//  ResyncPeriod time.Duration
+	// to support using AddEventHandlerWithResyncPeriod
+	// this field would be mutually exclusive with SkipEvents
+	// I suspect we do not need this level of flexibility and resyncs can mask bugs in controller logic
+	// A related change could be an Option such as WithResyncSchedule to allow for cron style control loops
+	// It is unclear to me if we would ever need that since we assume that all events come from a Kube watch
+}
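A sketch of the Filter/FilterFuncs contract (not part of this diff; the annotation key and names are hypothetical). The ParentFunc pattern below is the same one creating.go uses later in this diff with api.ServiceNameAnnotation:

	// Queue the parent Service whenever a child Secret changes.
	childToParent := controller.FilterFuncs{
		ParentFunc: func(obj metav1.Object) controller.Key {
			return controller.Key{
				Namespace: obj.GetNamespace(),
				Name:      obj.GetAnnotations()["example.io/parent-name"],
			}
		},
		AddFunc:    func(obj metav1.Object) bool { return true },
		DeleteFunc: func(obj metav1.Object) bool { return true },
	}

	// For controllers that only care about fixed, well-known object names
	// (nil ParentFunc falls back to the object's own namespace/name key):
	systemOnly := controller.FilterByNames(nil, "cluster-info")
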
diff --git a/internal/controller/manager.go b/internal/controller/manager.go
new file mode 100644
index 00000000..c2bda641
--- /dev/null
+++ b/internal/controller/manager.go
@@ -0,0 +1,59 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package controller
+
+import (
+	"context"
+	"sync"
+
+	"k8s.io/klog/v2"
+)
+
+type Manager interface {
+	Start(ctx context.Context)
+	WithController(controller Controller, workers int) Manager
+}
+
+func NewManager() Manager {
+	return &controllerManager{}
+}
+
+// runnableController represents a single controller's runnable configuration.
+type runnableController struct {
+	controller Controller
+	workers    int
+}
+
+type controllerManager struct {
+	controllers []runnableController
+}
+
+var _ Manager = &controllerManager{}
+
+func (c *controllerManager) WithController(controller Controller, workers int) Manager {
+	c.controllers = append(c.controllers, runnableController{
+		controller: controller,
+		workers:    workers,
+	})
+	return c
+}
+
+// Start will run all managed controllers and block until all controllers shut down.
+// When the context passed is cancelled, all controllers are signalled to shut down.
+func (c *controllerManager) Start(ctx context.Context) {
+	var wg sync.WaitGroup
+	wg.Add(len(c.controllers))
+	for i := range c.controllers {
+		idx := i
+		go func() {
+			r := c.controllers[idx]
+			defer klog.InfoS("controller terminated", "controller", r.controller.Name())
+			defer wg.Done()
+			r.controller.Run(ctx, r.workers)
+		}()
+	}
+	wg.Wait()
+}
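A sketch of running a Manager with a context wired to SIGTERM (not part of this diff; controllerA and controllerB stand in for real Controller values, and os/signal/syscall are stdlib):

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-sigCh
		cancel()
	}()

	controller.NewManager().
		WithController(controllerA, 1).
		WithController(controllerB, 5).
		Start(ctx) // blocks until ctx is cancelled and every controller's Run returns
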
+ "kind", fmt.Sprintf("%T", object), + ) + c.add(filter, object) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldObject := metaOrDie(oldObj) + newObject := metaOrDie(newObj) + if filter.Update(oldObject, newObject) { + klog.V(4).InfoS("handling update", + "controller", c.Name(), + "namespace", newObject.GetNamespace(), + "name", newObject.GetName(), + "selfLink", newObject.GetSelfLink(), // TODO: self link is deprecated so we need to extract the GVR in some other way (using a series of schemes?) + "kind", fmt.Sprintf("%T", newObject), + ) + c.add(filter, newObject) + } + }, + DeleteFunc: func(obj interface{}) { + accessor, err := meta.Accessor(obj) + if err != nil { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + //nolint: goerr113 + utilruntime.HandleError(fmt.Errorf("%s: could not get object from tombstone: %+v", c.Name(), obj)) + return + } + accessor, err = meta.Accessor(tombstone.Obj) + if err != nil { + //nolint: goerr113 + utilruntime.HandleError(fmt.Errorf("%s: tombstone contained object that is not an accessor: %+v", c.Name(), obj)) + return + } + } + if filter.Delete(accessor) { + klog.V(4).InfoS("handling delete", + "controller", c.Name(), + "namespace", accessor.GetNamespace(), + "name", accessor.GetName(), + "selfLink", accessor.GetSelfLink(), // TODO: self link is deprecated so we need to extract the GVR in some other way (using a series of schemes?) + "kind", fmt.Sprintf("%T", accessor), + ) + c.add(filter, accessor) + } + }, + }) + }) +} + +// toRunOpt guarantees that an Option only runs once on the first call to Run (and not New), even if a controller is stopped and restarted. +func toRunOpt(opt Option) Option { + return toOnceOpt(toNaiveRunOpt(opt)) +} + +// toNaiveRunOpt guarantees that an Option only runs on calls to Run (and not New), even if a controller is stopped and restarted. +func toNaiveRunOpt(opt Option) Option { + return func(c *controller) { + if c.run { + opt(c) + return + } + c.runOpts = append(c.runOpts, opt) + } +} + +// toOnceOpt guarantees that an Option only runs once. +func toOnceOpt(opt Option) Option { + var once sync.Once + return func(c *controller) { + once.Do(func() { + opt(c) + }) + } +} + +func metaOrDie(obj interface{}) metav1.Object { + accessor, err := meta.Accessor(obj) + if err != nil { + panic(err) // this should never happen + } + return accessor +} diff --git a/internal/controller/option_test.go b/internal/controller/option_test.go new file mode 100644 index 00000000..fa1d70a5 --- /dev/null +++ b/internal/controller/option_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2020 VMware, Inc. +SPDX-License-Identifier: Apache-2.0 +*/ + +package controller + +import ( + "testing" + + "k8s.io/client-go/tools/cache" +) + +type getter bool + +func (g *getter) Informer() cache.SharedIndexInformer { + *g = true + return nil +} + +func TestInformerCalled(t *testing.T) { + g := getter(false) + _ = New(Config{}, WithInformer(&g, FilterByNames(nil), InformerOption{})) + if !g { + t.Error("expected InformerGetter.Informer() to be called") + } +} diff --git a/internal/controller/queue.go b/internal/controller/queue.go new file mode 100644 index 00000000..5abb8f32 --- /dev/null +++ b/internal/controller/queue.go @@ -0,0 +1,41 @@ +/* +Copyright 2020 VMware, Inc. +SPDX-License-Identifier: Apache-2.0 +*/ + +package controller + +import ( + "time" + + "k8s.io/client-go/util/workqueue" +) + +type Queue interface { + // Add immediately adds a key to the queue and marks it as needing processing. 
diff --git a/internal/controller/queue.go b/internal/controller/queue.go
new file mode 100644
index 00000000..5abb8f32
--- /dev/null
+++ b/internal/controller/queue.go
@@ -0,0 +1,41 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package controller
+
+import (
+	"time"
+
+	"k8s.io/client-go/util/workqueue"
+)
+
+type Queue interface {
+	// Add immediately adds a key to the queue and marks it as needing processing.
+	Add(key Key)
+
+	// AddRateLimited adds a key to the queue after the rate limiter says it is ok.
+	AddRateLimited(key Key)
+
+	// AddAfter adds a key to the queue after the indicated duration has passed.
+	AddAfter(key Key, duration time.Duration)
+}
+
+var _ Queue = &queueWrapper{}
+
+type queueWrapper struct {
+	queue workqueue.RateLimitingInterface
+}
+
+func (q *queueWrapper) Add(key Key) {
+	q.queue.Add(key)
+}
+
+func (q *queueWrapper) AddRateLimited(key Key) {
+	q.queue.AddRateLimited(key)
+}
+
+func (q *queueWrapper) AddAfter(key Key, duration time.Duration) {
+	q.queue.AddAfter(key, duration)
+}
diff --git a/internal/controller/recorder.go b/internal/controller/recorder.go
new file mode 100644
index 00000000..cd31545e
--- /dev/null
+++ b/internal/controller/recorder.go
@@ -0,0 +1,29 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package controller
+
+import (
+	"fmt"
+
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/tools/events"
+	"k8s.io/klog/v2"
+)
+
+var _ events.EventRecorder = klogRecorder{}
+
+type klogRecorder struct{}
+
+func (n klogRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
+	klog.V(4).InfoS("recording event",
+		"regarding", regarding,
+		"related", related,
+		"eventtype", eventtype,
+		"reason", reason,
+		"action", action,
+		"message", fmt.Sprintf(note, args...),
+	)
+}
diff --git a/internal/controller/sync.go b/internal/controller/sync.go
new file mode 100644
index 00000000..6283daa2
--- /dev/null
+++ b/internal/controller/sync.go
@@ -0,0 +1,44 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package controller
+
+import (
+	"context"
+
+	"k8s.io/client-go/tools/events"
+)
+
+var _ Syncer = SyncFunc(nil)
+
+type Syncer interface {
+	Sync(ctx Context) error
+}
+
+type SyncFunc func(ctx Context) error
+
+func (s SyncFunc) Sync(ctx Context) error {
+	return s(ctx)
+}
+
+type Context struct {
+	Context  context.Context
+	Name     string
+	Key      Key
+	Queue    Queue
+	Recorder events.EventRecorder
+}
+
+type Key struct {
+	Namespace string
+	Name      string
+
+	// TODO determine if it makes sense to add a field like:
+	//  Extra interface{}
+	// This would allow a custom ParentFunc to pass extra data through to the Syncer
+	// The boxed type would have to be comparable (i.e. usable as a map key)
+}
+
+type SyncWrapperFunc func(syncer Syncer) Syncer
diff --git a/internal/controller/testing.go b/internal/controller/testing.go
new file mode 100644
index 00000000..714a70a5
--- /dev/null
+++ b/internal/controller/testing.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package controller
+
+import "testing"
+
+func TestSync(t *testing.T, controller Controller, ctx Context) error {
+	t.Helper() // force testing import to discourage external use
+	return controller.sync(ctx)
+}
+
+func TestWrap(t *testing.T, controller Controller, wrapper SyncWrapperFunc) {
+	t.Helper() // force testing import to discourage external use
+	controller.wrap(wrapper)
+}
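A sketch of a Syncer scheduling its own follow-up work through the Queue exposed on Context (not part of this diff; the five-minute interval is illustrative):

	syncer := controller.SyncFunc(func(ctx controller.Context) error {
		// ... reconcile ...

		// Re-check this key in five minutes even if no watch event arrives,
		// e.g. to act before some deadline expires.
		ctx.Queue.AddAfter(ctx.Key, 5*time.Minute)
		return nil
	})
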
diff --git a/test/integration/examplecontroller/api/api.go b/test/integration/examplecontroller/api/api.go
new file mode 100644
index 00000000..1f3626bd
--- /dev/null
+++ b/test/integration/examplecontroller/api/api.go
@@ -0,0 +1,20 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package api
+
+// Annotation on service.
+const SecretNameAnnotation = "service.placeholder.io/secret-name"
+
+// Annotations on secret.
+const (
+	// ServiceUIDAnnotation is an annotation on a secret that indicates which service created it, by UID.
+	ServiceUIDAnnotation = "service.placeholder.io/service-uid"
+	// ServiceNameAnnotation is an annotation on a secret that indicates which service created it, by name,
+	// to allow reverse lookups on services for comparison against UIDs.
+	ServiceNameAnnotation = "service.placeholder.io/service-name"
+)
+
+const SecretDataKey = "secret-data"
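A sketch of the user-facing contract these constants define (not part of this diff; the service and secret names are hypothetical). Annotating a Service is all a user does; the example controllers below materialize and maintain the named Secret, as the integration test at the end of this diff verifies:

	service := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "my-service",
			Namespace: "default",
			Annotations: map[string]string{
				api.SecretNameAnnotation: "my-service-secret", // the controller will create this secret
			},
		},
		Spec: corev1.ServiceSpec{
			Ports: []corev1.ServicePort{{Port: 443}},
		},
	}
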
diff --git a/test/integration/examplecontroller/controller/creating.go b/test/integration/examplecontroller/controller/creating.go
new file mode 100644
index 00000000..45a1f77c
--- /dev/null
+++ b/test/integration/examplecontroller/controller/creating.go
@@ -0,0 +1,182 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package controller
+
+import (
+	"context"
+	"fmt"
+
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	corev1informers "k8s.io/client-go/informers/core/v1"
+	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/events"
+	"k8s.io/klog/v2"
+
+	"github.com/suzerain-io/placeholder-name/internal/controller"
+	"github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/api"
+)
+
+//nolint:funlen
+func NewExampleCreatingController(
+	services corev1informers.ServiceInformer,
+	secrets corev1informers.SecretInformer,
+	secretClient corev1client.SecretsGetter,
+	recorder events.EventRecorder,
+	secretData string,
+) controller.Controller {
+	serviceLister := services.Lister()
+	secretLister := secrets.Lister()
+
+	// note that these functions do not need to be inlined
+	// this just demonstrates that for simple Syncer implementations, everything can be in one place
+
+	requiresSecretGeneration := func(service *corev1.Service) (bool, error) {
+		// check the secret since it may not have been created yet
+		secretName := service.Annotations[api.SecretNameAnnotation]
+		if len(secretName) == 0 {
+			return false, nil
+		}
+
+		secret, err := secretLister.Secrets(service.Namespace).Get(secretName)
+		if apierrors.IsNotFound(err) {
+			// we have not created the secret yet
+			return true, nil
+		}
+		if err != nil {
+			return false, fmt.Errorf("unable to get the secret %s/%s: %w", service.Namespace, secretName, err)
+		}
+
+		if string(secret.Data[api.SecretDataKey]) == secretData {
+			return false, nil
+		}
+
+		// the secret exists but the data does not match what we expect (i.e. we have new secretData now)
+		return true, nil
+	}
+
+	generateSecret := func(service *corev1.Service) error {
+		klog.V(4).InfoS("generating new secret for service", "namespace", service.Namespace, "name", service.Name)
+
+		secret := &corev1.Secret{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      service.Annotations[api.SecretNameAnnotation],
+				Namespace: service.Namespace,
+				Annotations: map[string]string{
+					api.ServiceUIDAnnotation:  string(service.UID),
+					api.ServiceNameAnnotation: service.Name,
+				},
+				OwnerReferences: []metav1.OwnerReference{
+					{
+						APIVersion: "v1",
+						Kind:       "Service",
+						Name:       service.Name,
+						UID:        service.UID,
+					},
+				},
+				Finalizers: nil, // TODO maybe add a finalizer to guarantee we never miss a delete event?
+			},
+			Type: corev1.SecretTypeOpaque,
+			Data: map[string][]byte{
+				api.SecretDataKey: []byte(secretData),
+			},
+		}
+
+		_, err := secretClient.Secrets(service.Namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
+		if apierrors.IsAlreadyExists(err) {
+			actualSecret, getErr := secretClient.Secrets(service.Namespace).Get(context.TODO(), secret.Name, metav1.GetOptions{})
+			if getErr != nil {
+				return getErr
+			}
+
+			if actualSecret.Annotations[api.ServiceUIDAnnotation] != string(service.UID) {
+				//nolint: goerr113
+				utilruntime.HandleError(fmt.Errorf("secret %s/%s does not have corresponding service UID %v", actualSecret.Namespace, actualSecret.Name, service.UID))
+				return nil // drop from queue because we cannot safely update this secret
+			}
+
+			klog.V(4).InfoS("updating data in existing secret", "namespace", secret.Namespace, "name", secret.Name)
+			// Actually update the secret in the regeneration case (the secret already exists but we want to update to new secretData).
+			_, updateErr := secretClient.Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{})
+			return updateErr
+		}
+		if err != nil {
+			return fmt.Errorf("unable to create secret %s/%s: %w", secret.Namespace, secret.Name, err)
+		}
+
+		return nil
+	}
+
+	syncer := controller.SyncFunc(func(ctx controller.Context) error {
+		service, err := serviceLister.Services(ctx.Key.Namespace).Get(ctx.Key.Name)
+		if apierrors.IsNotFound(err) {
+			return nil
+		}
+		if err != nil {
+			// use ctx.Key here; service is nil when the lister returns an error
+			return fmt.Errorf("unable to get the service %s/%s: %w", ctx.Key.Namespace, ctx.Key.Name, err)
+		}
+
+		ok, err := requiresSecretGeneration(service)
+		if err != nil || !ok {
+			return err
+		}
+
+		return generateSecret(service)
+	})
+
+	config := controller.Config{
+		Name:   "example-controller-creating",
+		Syncer: syncer,
+	}
+
+	toServiceName := func(secret *corev1.Secret) (string, bool) {
+		serviceName := secret.Annotations[api.ServiceNameAnnotation]
+		return serviceName, len(serviceName) != 0
+	}
+
+	hasSecretNameAnnotation := func(obj metav1.Object) bool {
+		return len(obj.GetAnnotations()[api.SecretNameAnnotation]) != 0
+	}
+	hasSecretNameAnnotationUpdate := func(oldObj, newObj metav1.Object) bool {
+		return hasSecretNameAnnotation(newObj) || hasSecretNameAnnotation(oldObj)
+	}
+
+	return controller.New(config,
+		controller.WithInformer(services, controller.FilterFuncs{
+			AddFunc:    hasSecretNameAnnotation,
+			UpdateFunc: hasSecretNameAnnotationUpdate,
+		}, controller.InformerOption{}),
+
+		controller.WithInformer(secrets, controller.FilterFuncs{
+			ParentFunc: func(obj metav1.Object) controller.Key {
+				secret := obj.(*corev1.Secret)
+				serviceName, _ := toServiceName(secret)
+				return controller.Key{Namespace: secret.Namespace, Name: serviceName}
+			},
+			DeleteFunc: func(obj metav1.Object) bool {
+				secret := obj.(*corev1.Secret)
+				serviceName, ok := toServiceName(secret)
+				if !ok {
+					return false
+				}
+				service, err := serviceLister.Services(secret.Namespace).Get(serviceName)
+				if apierrors.IsNotFound(err) {
+					return false
+				}
+				if err != nil {
+					utilruntime.HandleError(fmt.Errorf("unable to get service %s/%s: %w", secret.Namespace, serviceName, err))
+					return false
+				}
+				klog.V(4).InfoS("recreating secret", "namespace", service.Namespace, "name", service.Name)
+				return true
+			},
+		}, controller.InformerOption{}),
+
+		controller.WithRecorder(recorder), // TODO actually use the recorder
+	)
+}
diff --git a/test/integration/examplecontroller/controller/creating_test.go b/test/integration/examplecontroller/controller/creating_test.go
new file mode 100644
index 00000000..1bd37196
--- /dev/null
+++ b/test/integration/examplecontroller/controller/creating_test.go
@@ -0,0 +1,170 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package controller
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes/fake"
+	coretesting "k8s.io/client-go/testing"
+	"k8s.io/client-go/tools/events"
+
+	"github.com/suzerain-io/placeholder-name/internal/controller"
+	"github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/api"
+)
+
+func TestNewExampleCreatingController(t *testing.T) {
+	secretsGVR := schema.GroupVersionResource{Version: "v1", Resource: "secrets"}
+
+	type args struct {
+		services   []*corev1.Service
+		secrets    []*corev1.Secret
+		secretData string
+	}
+	type keyErr struct {
+		key controller.Key
+		err error
+	}
+	tests := []struct {
+		name        string
+		args        args
+		wantActions []coretesting.Action
+		wantKeyErrs []keyErr
+	}{
+		{
+			name: "service has annotation but secret does not exist",
+			args: args{
+				services: []*corev1.Service{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Namespace: "ns-1",
+							Name:      "service-1",
+							Annotations: map[string]string{
+								api.SecretNameAnnotation: "secret-1",
+							},
+							UID: "0001",
+						},
+					},
+				},
+				secretData: "foo-secret-1",
+			},
+			wantKeyErrs: []keyErr{
+				{
+					key: controller.Key{
+						Namespace: "ns-1",
+						Name:      "service-1",
+					},
+					err: nil, // we expect no error with this key
+				},
+			},
+			wantActions: []coretesting.Action{
+				coretesting.NewCreateAction(secretsGVR, "ns-1", &corev1.Secret{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "secret-1",
+						Namespace: "ns-1",
+						Annotations: map[string]string{
+							api.ServiceUIDAnnotation:  "0001",
+							api.ServiceNameAnnotation: "service-1",
+						},
+						OwnerReferences: []metav1.OwnerReference{
+							{
+								APIVersion: "v1",
+								Kind:       "Service",
+								Name:       "service-1",
+								UID:        "0001",
+							},
+						},
+					},
+					Type: corev1.SecretTypeOpaque,
+					Data: map[string][]byte{
+						api.SecretDataKey: []byte("foo-secret-1"),
+					},
+				}),
+			},
+		},
+	}
+	for _, tt := range tests {
+		tt := tt
+
+		t.Run(tt.name, func(t *testing.T) {
+			kubeClient := fake.NewSimpleClientset()
+			for i := range tt.args.services {
+				service := tt.args.services[i]
+				err := kubeClient.Tracker().Add(service)
+				require.NoError(t, err)
+			}
+			for i := range tt.args.secrets {
+				secret := tt.args.secrets[i]
+				err := kubeClient.Tracker().Add(secret)
+				require.NoError(t, err)
+			}
+
+			recorder := events.NewEventBroadcasterAdapter(kubeClient).NewRecorder("example-controller")
+			kubeInformers := informers.NewSharedInformerFactory(kubeClient, 0)
+
+			creatingController := NewExampleCreatingController(
+				kubeInformers.Core().V1().Services(),
+				kubeInformers.Core().V1().Secrets(),
+				kubeClient.CoreV1(),
+				recorder,
+				tt.args.secretData,
+			)
+
+			keyErrs := make(chan keyErr)
+			controller.TestWrap(t, creatingController, func(syncer controller.Syncer) controller.Syncer {
+				return controller.SyncFunc(func(ctx controller.Context) error {
+					err := syncer.Sync(ctx)
+
+					keyErrs <- keyErr{
+						key: ctx.Key,
+						err: err,
+					}
+
+					return err
+				})
+			})
+
+			// a different approach would be to use TestSync and run each iteration manually:
+			//
+			//  err := controller.TestSync(t, c, ...)
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			kubeInformers.Start(ctx.Done())
+			go creatingController.Run(ctx, 5) // TODO maybe only use one worker?
+
+			var actualKeyErrs []keyErr
+		done:
+			for {
+				select {
+				case key := <-keyErrs:
+					actualKeyErrs = append(actualKeyErrs, key)
+
+				case <-time.After(3 * time.Second):
+					// this assumes that calls to Sync are never more than three seconds apart
+					// we have five workers, so there is little chance they all hang around doing nothing for that long
+					break done
+				}
+			}
+
+			// TODO: Figure out how to capture actions from informers
+			// TODO: I think we need some more fancy order-independent equality comparison here
+
+			require.Equal(t, tt.wantKeyErrs, actualKeyErrs)
+
+			// ignore the discovery call from the event recorder and the list/watch from both informers (first five actions)
+			require.Equal(t, tt.wantActions, kubeClient.Actions()[5:])
+		})
+	}
+}
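The TestSync alternative mentioned in the comment inside the test above might look roughly like this (a sketch, not part of this diff). Queue and Recorder are left nil here, so the Syncer under test must not touch them:

	err := controller.TestSync(t, creatingController, controller.Context{
		Context: context.Background(),
		Name:    creatingController.Name(),
		Key:     controller.Key{Namespace: "ns-1", Name: "service-1"},
	})
	require.NoError(t, err)
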
diff --git a/test/integration/examplecontroller/controller/updating.go b/test/integration/examplecontroller/controller/updating.go
new file mode 100644
index 00000000..816477ba
--- /dev/null
+++ b/test/integration/examplecontroller/controller/updating.go
@@ -0,0 +1,149 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package controller
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	corev1informers "k8s.io/client-go/informers/core/v1"
+	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/events"
+
+	"github.com/suzerain-io/placeholder-name/internal/controller"
+	"github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/api"
+)
+
+func NewExampleUpdatingController(
+	services corev1informers.ServiceInformer,
+	secrets corev1informers.SecretInformer,
+	secretClient corev1client.SecretsGetter,
+	recorder events.EventRecorder,
+	secretData string,
+) controller.Controller {
+	serviceLister := services.Lister()
+	secretLister := secrets.Lister()
+
+	// note that these functions do not need to be inlined
+	// this just demonstrates that for simple Syncer implementations, everything can be in one place
+
+	toServiceName := func(secret *corev1.Secret) (string, bool) {
+		serviceName := secret.Annotations[api.ServiceNameAnnotation]
+		return serviceName, len(serviceName) != 0
+	}
+
+	ensureSecretData := func(service *corev1.Service, secretCopy *corev1.Secret) bool {
+		var needsUpdate bool
+
+		expectedData := map[string][]byte{
+			api.SecretDataKey: []byte(secretData),
+		}
+		if !reflect.DeepEqual(secretCopy.Data, expectedData) {
+			secretCopy.Data = expectedData
+			needsUpdate = true
+		}
+
+		expectedOwnerReferences := []metav1.OwnerReference{
+			{
+				APIVersion: "v1",
+				Kind:       "Service",
+				Name:       service.Name,
+				UID:        service.UID,
+			},
+		}
+		if !reflect.DeepEqual(secretCopy.OwnerReferences, expectedOwnerReferences) {
+			secretCopy.OwnerReferences = expectedOwnerReferences
+			needsUpdate = true
+		}
+
+		return needsUpdate
+	}
+
+	isSecretValidForService := func(service *corev1.Service, secret *corev1.Secret) bool {
+		if service.Annotations[api.SecretNameAnnotation] != secret.Name {
+			return false
+		}
+		if secret.Annotations[api.ServiceUIDAnnotation] != string(service.UID) {
+			return false
+		}
+		return true
+	}
+
+	getServiceForSecret := func(secret *corev1.Secret) (*corev1.Service, error) {
+		serviceName, ok := toServiceName(secret)
+		if !ok {
+			return nil, nil
+		}
+		service, err := serviceLister.Services(secret.Namespace).Get(serviceName)
+		if apierrors.IsNotFound(err) {
+			return nil, nil
+		}
+		if err != nil {
+			return nil, fmt.Errorf("unable to get service %s/%s: %w", secret.Namespace, serviceName, err)
+		}
+		return service, nil
+	}
+
+	syncer := controller.SyncFunc(func(ctx controller.Context) error {
+		secret, err := secretLister.Secrets(ctx.Key.Namespace).Get(ctx.Key.Name)
+		if apierrors.IsNotFound(err) {
+			return nil
+		}
+		if err != nil {
+			// use ctx.Key here; secret is nil when the lister returns an error
+			return fmt.Errorf("unable to get the secret %s/%s: %w", ctx.Key.Namespace, ctx.Key.Name, err)
+		}
+
+		service, err := getServiceForSecret(secret)
+		if err != nil || service == nil {
+			return err
+		}
+
+		if !isSecretValidForService(service, secret) {
+			//nolint: goerr113
+			utilruntime.HandleError(fmt.Errorf("secret %s/%s does not have corresponding service UID %v", secret.Namespace, secret.Name, service.UID))
+			return nil
+		}
+
+		// make a copy to avoid mutating cache state
+		secretCopy := secret.DeepCopy()
+
+		if needsUpdate := ensureSecretData(service, secretCopy); needsUpdate {
+			_, updateErr := secretClient.Secrets(secretCopy.Namespace).Update(context.TODO(), secretCopy, metav1.UpdateOptions{})
+			return updateErr
+		}
+
+		return nil
+	})
+
+	config := controller.Config{
+		Name:   "example-controller-updating",
+		Syncer: syncer,
+	}
+
+	addSecret := func(obj metav1.Object) bool {
+		secret := obj.(*corev1.Secret)
+		_, ok := toServiceName(secret)
+		return ok
+	}
+
+	return controller.New(config,
+		controller.WithInformer(services, controller.FilterFuncs{}, controller.InformerOption{SkipEvents: true}),
+
+		controller.WithInformer(secrets, controller.FilterFuncs{
+			AddFunc: addSecret,
+			UpdateFunc: func(oldObj, newObj metav1.Object) bool {
+				return addSecret(newObj) || addSecret(oldObj)
+			},
+		}, controller.InformerOption{}),
+
+		controller.WithRecorder(recorder), // TODO actually use the recorder
+	)
+}
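The services informer above is registered with SkipEvents, which is worth calling out: the controller blocks startup until the service cache syncs (so the lister is safe to read) but never enqueues service events. A condensed sketch of the two InformerOption modes side by side (someFilter is hypothetical; setting both flags panics in WithInformer):

	// Synced read cache only; no events from this informer.
	controller.WithInformer(services, controller.FilterFuncs{}, controller.InformerOption{SkipEvents: true})

	// Events only; do not block controller startup on this cache sync.
	controller.WithInformer(secrets, someFilter, controller.InformerOption{SkipSync: true})
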
diff --git a/test/integration/examplecontroller/starter/starter.go b/test/integration/examplecontroller/starter/starter.go
new file mode 100644
index 00000000..d100b9ed
--- /dev/null
+++ b/test/integration/examplecontroller/starter/starter.go
@@ -0,0 +1,56 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package starter
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/events"
+
+	"github.com/suzerain-io/placeholder-name/internal/controller"
+	examplecontroller "github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/controller"
+)
+
+func StartExampleController(ctx context.Context, config *rest.Config, secretData string) error {
+	kubeClient, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return fmt.Errorf("failed to build client: %w", err)
+	}
+
+	kubeInformers := informers.NewSharedInformerFactory(kubeClient, 20*time.Minute)
+
+	recorder := events.NewEventBroadcasterAdapter(kubeClient).NewRecorder("example-controller")
+
+	manager := controller.NewManager().
+		WithController(
+			examplecontroller.NewExampleCreatingController(
+				kubeInformers.Core().V1().Services(),
+				kubeInformers.Core().V1().Secrets(),
+				kubeClient.CoreV1(),
+				recorder,
+				secretData,
+			), 5,
+		).
+		WithController(
+			examplecontroller.NewExampleUpdatingController(
+				kubeInformers.Core().V1().Services(),
+				kubeInformers.Core().V1().Secrets(),
+				kubeClient.CoreV1(),
+				recorder,
+				secretData,
+			), 5,
+		)
+
+	kubeInformers.Start(ctx.Done())
+	go manager.Start(ctx)
+
+	return nil
+}
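A sketch of booting the example controllers outside the test harness (not part of this diff; kubeconfigPath is hypothetical, and clientcmd is k8s.io/client-go/tools/clientcmd):

	config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if err := starter.StartExampleController(ctx, config, "initial-secret-data"); err != nil {
		log.Fatal(err)
	}
	<-ctx.Done() // StartExampleController returns immediately; the manager runs in the background
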
diff --git a/test/integration/examplecontroller_test.go b/test/integration/examplecontroller_test.go
new file mode 100644
index 00000000..c994f9a5
--- /dev/null
+++ b/test/integration/examplecontroller_test.go
@@ -0,0 +1,161 @@
+/*
+Copyright 2020 VMware, Inc.
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package integration
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/watch"
+
+	"github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/api"
+	examplestart "github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/starter"
+	"github.com/suzerain-io/placeholder-name/test/library"
+)
+
+func TestExampleController(t *testing.T) {
+	config := library.NewClientConfig(t)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	secretData := "super-secret-data-1"
+
+	err := examplestart.StartExampleController(ctx, config, secretData)
+	require.NoError(t, err)
+
+	client := library.NewClientset(t)
+
+	namespaces := client.CoreV1().Namespaces()
+
+	namespace, err := namespaces.Create(ctx, &corev1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{
+			GenerateName: "example-controller-test-",
+		},
+	}, metav1.CreateOptions{})
+	require.NoError(t, err)
+
+	defer func() {
+		deleteErr := namespaces.Delete(context.Background(), namespace.Name, metav1.DeleteOptions{})
+		require.NoError(t, deleteErr)
+	}()
+
+	services := client.CoreV1().Services(namespace.Name)
+	secrets := client.CoreV1().Secrets(namespace.Name)
+
+	secretsWatch, err := secrets.Watch(context.Background(), metav1.ListOptions{})
+	require.NoError(t, err)
+	defer secretsWatch.Stop()
+
+	service := &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "example-service-test",
+			Annotations: map[string]string{
+				api.SecretNameAnnotation: "example-secret-name",
+			},
+		},
+		Spec: corev1.ServiceSpec{
+			Ports: []corev1.ServicePort{
+				{
+					Port: 443,
+				},
+			},
+		},
+	}
+	_, err = services.Create(ctx, service, metav1.CreateOptions{})
+	require.NoError(t, err)
+
+	timeout := time.After(10 * time.Second)
+done:
+	for {
+		select {
+		case event := <-secretsWatch.ResultChan():
+			if event.Type != watch.Added {
+				continue
+			}
+			secret, ok := event.Object.(*corev1.Secret)
+			if !ok {
+				continue
+			}
+			if secret.Name != service.Annotations[api.SecretNameAnnotation] {
+				continue
+			}
+
+			expectedData := map[string][]byte{
+				api.SecretDataKey: []byte(secretData),
+			}
+			require.Equal(t, expectedData, secret.Data, "expected to see new secret data: %s", library.Sdump(secret))
+			break done // immediately stop consuming events because we want to check for updated events below
+
+		case <-timeout:
+			t.Fatal("timed out waiting to see new secret")
+		}
+	}
+
+	// shut down the controllers so we can change the secret data
+	cancel()
+	time.Sleep(5 * time.Second) // wait a bit for the controllers to shut down
+
+	ctx, cancel = context.WithCancel(context.Background())
+	defer cancel()
+
+	secretData2 := "super-secret-data-2"
+
+	err = examplestart.StartExampleController(ctx, config, secretData2)
+	require.NoError(t, err)
+
+	timeout = time.After(10 * time.Second)
+done2:
+	for {
+		select {
+		case event := <-secretsWatch.ResultChan():
+			if event.Type != watch.Modified {
+				continue
+			}
+			secret, ok := event.Object.(*corev1.Secret)
+			if !ok {
+				continue
+			}
+			if secret.Name != service.Annotations[api.SecretNameAnnotation] {
+				continue
+			}
+
+			expectedData := map[string][]byte{
+				api.SecretDataKey: []byte(secretData2),
+			}
+			require.Equal(t, expectedData, secret.Data, "expected to see updated secret data: %s", library.Sdump(secret))
+			break done2 // immediately stop consuming events because we want to check for hot loops below
+
+		case <-timeout:
+			t.Fatal("timed out waiting to see updated secret")
+		}
+	}
+
+	timeout = time.After(5 * time.Second)
+done3:
+	for {
+		select {
+		case event := <-secretsWatch.ResultChan():
+			secret, ok := event.Object.(*corev1.Secret)
+			if !ok {
+				continue
+			}
+			if secret.Name != service.Annotations[api.SecretNameAnnotation] {
+				continue
+			}
+
+			// this assumes that no other actor in the system is trying to mutate this secret
+			t.Errorf("unexpected event seen for secret: %s", library.Sdump(event))
+
+		case <-timeout:
+			break done3 // we saw no events matching our secret, meaning that we are not hot looping
+		}
+	}
+}