diff --git a/internal/controller/controller.go b/internal/controller/controller.go deleted file mode 100644 index b1bd9c6c..00000000 --- a/internal/controller/controller.go +++ /dev/null @@ -1,216 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -import ( - "context" - "errors" - "fmt" - "sync" - "time" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/events" - "k8s.io/client-go/util/workqueue" - "k8s.io/klog/v2" -) - -// Controller interface represents a runnable Kubernetes controller. -// Cancelling the context passed will cause the controller to shutdown. -// Number of workers determine how much parallel the job processing should be. -type Controller interface { - // Run runs the controller and blocks until the controller is finished. - // Number of workers can be specified via workers parameter. - // This function will return when all internal loops are finished. - // Note that having more than one worker usually means handing parallelization of Sync(). - Run(ctx context.Context, workers int) - - // Name returns the controller name string. - Name() string - - // The methods below should only be called during tests via the Test* functions. - - // sync contains the main controller logic. - // This can be used in unit tests to exercise the Syncer by directly calling it. - sync(ctx Context) error - - // wrap wraps the main controller logic provided via the Syncer. - // This can be used in tests to synchronize asynchronous events as seen by a running controller. - // The wrapping must be done after New is called and before Run is called. - wrap(wrapper SyncWrapperFunc) -} - -var _ Controller = &controller{} - -type Config struct { - Name string - Syncer Syncer -} - -func New(config Config, opts ...Option) Controller { - c := &controller{ - config: config, - } - - // set up defaults - WithRateLimiter(workqueue.DefaultControllerRateLimiter())(c) - WithRecorder(klogRecorder{})(c) - - for _, opt := range opts { - opt(c) - } - - return c -} - -type controller struct { - config Config - - queue workqueue.RateLimitingInterface - queueWrapper Queue - maxRetries int - recorder events.EventRecorder - - run bool - runOpts []Option - - cacheSyncs []cache.InformerSynced -} - -func (c *controller) Run(ctx context.Context, workers int) { - defer utilruntime.HandleCrash(crash) // prevent panics from killing the process - - klog.InfoS("starting controller", "controller", c.Name(), "workers", workers) - - c.run = true - for _, opt := range c.runOpts { - opt(c) - } - - if !c.waitForCacheSyncWithTimeout() { - panic(die(fmt.Sprintf("%s: timed out waiting for caches to sync", c.Name()))) - } - - var workerWg sync.WaitGroup - - // workerContext is used to track and initiate worker shutdown - workerContext, workerContextCancel := context.WithCancel(context.Background()) - - defer func() { - c.queue.ShutDown() // shutdown the controller queue first - workerContextCancel() // cancel the worker context, which tell workers to initiate shutdown - - // Wait for all workers to finish their job. - // at this point the Run() can hang and callers have to implement the logic that will kill - // this controller (SIGKILL). - workerWg.Wait() - klog.InfoS("all workers have been terminated, shutting down", "controller", c.Name(), "workers", workers) - }() - - for i := 1; i <= workers; i++ { - idx := i - klog.InfoS("starting worker", "controller", c.Name(), "worker", idx) - workerWg.Add(1) - go func() { - defer utilruntime.HandleCrash(crash) // prevent panics from killing the process - defer func() { - klog.InfoS("shutting down worker", "controller", c.Name(), "worker", idx) - workerWg.Done() - }() - c.runWorker(workerContext) - }() - } - - <-ctx.Done() // wait for controller context to be cancelled -} - -func (c *controller) Name() string { - return c.config.Name -} - -func (c *controller) sync(ctx Context) error { - return c.config.Syncer.Sync(ctx) -} - -func (c *controller) wrap(wrapper SyncWrapperFunc) { - c.runOpts = append(c.runOpts, toRunOpt(func(c *controller) { - c.config.Syncer = wrapper(c.config.Syncer) - })) -} - -func (c *controller) waitForCacheSyncWithTimeout() bool { - // prevent us from blocking forever due to a broken informer - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) - defer cancel() - - return cache.WaitForCacheSync(ctx.Done(), c.cacheSyncs...) -} - -func (c *controller) add(filter Filter, object metav1.Object) { - key := filter.Parent(object) - c.queueWrapper.Add(key) -} - -// runWorker runs a single worker -// The worker is asked to terminate when the passed context is cancelled. -func (c *controller) runWorker(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - default: - c.processNextWorkItem(ctx) - } - } -} - -func (c *controller) processNextWorkItem(ctx context.Context) { - queueKey, quit := c.queue.Get() - if quit { - return - } - - key := queueKey.(Key) - defer c.queue.Done(key) - - syncCtx := Context{ - Context: ctx, - Name: c.Name(), - Key: key, - Queue: c.queueWrapper, - Recorder: c.recorder, - } - - err := c.sync(syncCtx) - c.handleKey(key, err) -} - -func (c *controller) handleKey(key Key, err error) { - if err == nil { - c.queue.Forget(key) - return - } - - retryForever := c.maxRetries <= 0 - shouldRetry := retryForever || c.queue.NumRequeues(key) < c.maxRetries - - if !shouldRetry { - utilruntime.HandleError(fmt.Errorf("%s: dropping key %v out of the queue: %w", c.Name(), key, err)) - c.queue.Forget(key) - return - } - - if errors.Is(err, ErrSyntheticRequeue) { - // logging this helps detecting wedged controllers with missing pre-requirements - klog.V(4).InfoS("requested synthetic requeue", "controller", c.Name(), "key", key) - } else { - utilruntime.HandleError(fmt.Errorf("%s: %v failed with: %w", c.Name(), key, err)) - } - - c.queue.AddRateLimited(key) -} diff --git a/internal/controller/die.go b/internal/controller/die.go deleted file mode 100644 index 1fde150b..00000000 --- a/internal/controller/die.go +++ /dev/null @@ -1,15 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -type die string - -func crash(i interface{}) { - mustDie, ok := i.(die) - if ok { - panic(string(mustDie)) - } -} diff --git a/internal/controller/error.go b/internal/controller/error.go deleted file mode 100644 index f636ba5b..00000000 --- a/internal/controller/error.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -// ErrSyntheticRequeue can be returned from a Syncer to force a retry artificially for the current key. -// This can also be done by re-adding the key to queue, but this is more convenient and has better logging. -const ErrSyntheticRequeue = constErr("synthetic requeue request") - -var _ error = constErr("") - -type constErr string - -func (e constErr) Error() string { - return string(e) -} diff --git a/internal/controller/filter.go b/internal/controller/filter.go deleted file mode 100644 index 5d66752d..00000000 --- a/internal/controller/filter.go +++ /dev/null @@ -1,76 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -import ( - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/sets" -) - -type Filter interface { - Add(obj metav1.Object) bool - Update(oldObj, newObj metav1.Object) bool - Delete(obj metav1.Object) bool - - Parent(obj metav1.Object) Key -} - -var _ Filter = FilterFuncs{} - -type ParentFunc func(obj metav1.Object) Key - -type FilterFuncs struct { - ParentFunc ParentFunc - AddFunc func(obj metav1.Object) bool - UpdateFunc func(oldObj, newObj metav1.Object) bool - DeleteFunc func(obj metav1.Object) bool -} - -func (f FilterFuncs) Parent(obj metav1.Object) Key { - if f.ParentFunc == nil { - return Key{ - Namespace: obj.GetNamespace(), - Name: obj.GetName(), - } - } - return f.ParentFunc(obj) -} - -func (f FilterFuncs) Add(obj metav1.Object) bool { - if f.AddFunc == nil { - return false - } - return f.AddFunc(obj) -} - -func (f FilterFuncs) Update(oldObj, newObj metav1.Object) bool { - if f.UpdateFunc == nil { - return false - } - return f.UpdateFunc(oldObj, newObj) -} - -func (f FilterFuncs) Delete(obj metav1.Object) bool { - if f.DeleteFunc == nil { - return false - } - return f.DeleteFunc(obj) -} - -func FilterByNames(parentFunc ParentFunc, names ...string) Filter { - set := sets.NewString(names...) - has := func(obj metav1.Object) bool { - return set.Has(obj.GetName()) - } - return FilterFuncs{ - ParentFunc: parentFunc, - AddFunc: has, - UpdateFunc: func(oldObj, newObj metav1.Object) bool { - return has(newObj) - }, - DeleteFunc: has, - } -} diff --git a/internal/controller/informer.go b/internal/controller/informer.go deleted file mode 100644 index 29568215..00000000 --- a/internal/controller/informer.go +++ /dev/null @@ -1,25 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -import "k8s.io/client-go/tools/cache" - -type InformerGetter interface { - Informer() cache.SharedIndexInformer -} - -type InformerOption struct { - SkipSync bool - SkipEvents bool - - // TODO maybe add a field like: - // ResyncPeriod time.Duration - // to support using AddEventHandlerWithResyncPeriod - // this field would be mutually exclusive with SkipEvents - // I suspect we do not need this level of flexibility and resyncs can mask bugs in controller logic - // A related change could be an Option such as WithResyncSchedule to allow for cron style control loops - // It is unclear to me if we would ever need that since we assume that all events come from a Kube watch -} diff --git a/internal/controller/manager.go b/internal/controller/manager.go deleted file mode 100644 index c2bda641..00000000 --- a/internal/controller/manager.go +++ /dev/null @@ -1,59 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -import ( - "context" - "sync" - - "k8s.io/klog/v2" -) - -type Manager interface { - Start(ctx context.Context) - WithController(controller Controller, workers int) Manager -} - -func NewManager() Manager { - return &controllerManager{} -} - -// runnableController represents single controller runnable configuration. -type runnableController struct { - controller Controller - workers int -} - -type controllerManager struct { - controllers []runnableController -} - -var _ Manager = &controllerManager{} - -func (c *controllerManager) WithController(controller Controller, workers int) Manager { - c.controllers = append(c.controllers, runnableController{ - controller: controller, - workers: workers, - }) - return c -} - -// Start will run all managed controllers and block until all controllers shutdown. -// When the context passed is cancelled, all controllers are signalled to shutdown. -func (c *controllerManager) Start(ctx context.Context) { - var wg sync.WaitGroup - wg.Add(len(c.controllers)) - for i := range c.controllers { - idx := i - go func() { - r := c.controllers[idx] - defer klog.InfoS("controller terminated", "controller", r.controller.Name()) - defer wg.Done() - r.controller.Run(ctx, r.workers) - }() - } - wg.Wait() -} diff --git a/internal/controller/option.go b/internal/controller/option.go deleted file mode 100644 index 36aa95ac..00000000 --- a/internal/controller/option.go +++ /dev/null @@ -1,154 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -import ( - "fmt" - "sync" - - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/events" - "k8s.io/client-go/util/workqueue" - "k8s.io/klog/v2" -) - -type Option func(*controller) - -func WithMaxRetries(maxRetries int) Option { - return func(c *controller) { - c.maxRetries = maxRetries - } -} - -func WithInitialEvent(key Key) Option { - return toNaiveRunOpt(func(c *controller) { - c.queueWrapper.Add(key) - }) -} - -func WithRateLimiter(limiter workqueue.RateLimiter) Option { - return func(c *controller) { - c.queue = workqueue.NewNamedRateLimitingQueue(limiter, c.Name()) - c.queueWrapper = &queueWrapper{queue: c.queue} - } -} - -func WithRecorder(recorder events.EventRecorder) Option { - return func(c *controller) { - c.recorder = recorder - } -} - -func WithInformer(getter InformerGetter, filter Filter, opt InformerOption) Option { - informer := getter.Informer() // immediately signal that we intend to use this informer in case it is lazily initialized - return toRunOpt(func(c *controller) { - if opt.SkipSync && opt.SkipEvents { - panic(die("cannot skip syncing and event handlers at the same time")) - } - - if !opt.SkipSync { - c.cacheSyncs = append(c.cacheSyncs, informer.HasSynced) - } - - if opt.SkipEvents { - return - } - - informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - object := metaOrDie(obj) - if filter.Add(object) { - klog.V(4).InfoS("handling add", - "controller", c.Name(), - "namespace", object.GetNamespace(), - "name", object.GetName(), - "selfLink", object.GetSelfLink(), // TODO: self link is deprecated so we need to extract the GVR in some other way (using a series of schemes?) - "kind", fmt.Sprintf("%T", object), - ) - c.add(filter, object) - } - }, - UpdateFunc: func(oldObj, newObj interface{}) { - oldObject := metaOrDie(oldObj) - newObject := metaOrDie(newObj) - if filter.Update(oldObject, newObject) { - klog.V(4).InfoS("handling update", - "controller", c.Name(), - "namespace", newObject.GetNamespace(), - "name", newObject.GetName(), - "selfLink", newObject.GetSelfLink(), // TODO: self link is deprecated so we need to extract the GVR in some other way (using a series of schemes?) - "kind", fmt.Sprintf("%T", newObject), - ) - c.add(filter, newObject) - } - }, - DeleteFunc: func(obj interface{}) { - accessor, err := meta.Accessor(obj) - if err != nil { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - //nolint: goerr113 - utilruntime.HandleError(fmt.Errorf("%s: could not get object from tombstone: %+v", c.Name(), obj)) - return - } - accessor, err = meta.Accessor(tombstone.Obj) - if err != nil { - //nolint: goerr113 - utilruntime.HandleError(fmt.Errorf("%s: tombstone contained object that is not an accessor: %+v", c.Name(), obj)) - return - } - } - if filter.Delete(accessor) { - klog.V(4).InfoS("handling delete", - "controller", c.Name(), - "namespace", accessor.GetNamespace(), - "name", accessor.GetName(), - "selfLink", accessor.GetSelfLink(), // TODO: self link is deprecated so we need to extract the GVR in some other way (using a series of schemes?) - "kind", fmt.Sprintf("%T", accessor), - ) - c.add(filter, accessor) - } - }, - }) - }) -} - -// toRunOpt guarantees that an Option only runs once on the first call to Run (and not New), even if a controller is stopped and restarted. -func toRunOpt(opt Option) Option { - return toOnceOpt(toNaiveRunOpt(opt)) -} - -// toNaiveRunOpt guarantees that an Option only runs on calls to Run (and not New), even if a controller is stopped and restarted. -func toNaiveRunOpt(opt Option) Option { - return func(c *controller) { - if c.run { - opt(c) - return - } - c.runOpts = append(c.runOpts, opt) - } -} - -// toOnceOpt guarantees that an Option only runs once. -func toOnceOpt(opt Option) Option { - var once sync.Once - return func(c *controller) { - once.Do(func() { - opt(c) - }) - } -} - -func metaOrDie(obj interface{}) metav1.Object { - accessor, err := meta.Accessor(obj) - if err != nil { - panic(err) // this should never happen - } - return accessor -} diff --git a/internal/controller/option_test.go b/internal/controller/option_test.go deleted file mode 100644 index fa1d70a5..00000000 --- a/internal/controller/option_test.go +++ /dev/null @@ -1,27 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -import ( - "testing" - - "k8s.io/client-go/tools/cache" -) - -type getter bool - -func (g *getter) Informer() cache.SharedIndexInformer { - *g = true - return nil -} - -func TestInformerCalled(t *testing.T) { - g := getter(false) - _ = New(Config{}, WithInformer(&g, FilterByNames(nil), InformerOption{})) - if !g { - t.Error("expected InformerGetter.Informer() to be called") - } -} diff --git a/internal/controller/queue.go b/internal/controller/queue.go deleted file mode 100644 index 5abb8f32..00000000 --- a/internal/controller/queue.go +++ /dev/null @@ -1,41 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -import ( - "time" - - "k8s.io/client-go/util/workqueue" -) - -type Queue interface { - // Add immediately adds a key to the queue and marks it as needing processing. - Add(key Key) - - // AddRateLimited adds a key to the queue after the rate limiter says it is ok. - AddRateLimited(key Key) - - // AddAfter adds a key to the queue after the indicated duration has passed. - AddAfter(key Key, duration time.Duration) -} - -var _ Queue = &queueWrapper{} - -type queueWrapper struct { - queue workqueue.RateLimitingInterface -} - -func (q *queueWrapper) Add(key Key) { - q.queue.Add(key) -} - -func (q *queueWrapper) AddRateLimited(key Key) { - q.queue.AddRateLimited(key) -} - -func (q *queueWrapper) AddAfter(key Key, duration time.Duration) { - q.queue.AddAfter(key, duration) -} diff --git a/internal/controller/recorder.go b/internal/controller/recorder.go deleted file mode 100644 index cd31545e..00000000 --- a/internal/controller/recorder.go +++ /dev/null @@ -1,29 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -import ( - "fmt" - - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/events" - "k8s.io/klog/v2" -) - -var _ events.EventRecorder = klogRecorder{} - -type klogRecorder struct{} - -func (n klogRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) { - klog.V(4).InfoS("recording event", - "regarding", regarding, - "related", related, - "eventtype", eventtype, - "reason", reason, - "action", action, - "message", fmt.Sprintf(note, args...), - ) -} diff --git a/internal/controller/sync.go b/internal/controller/sync.go deleted file mode 100644 index 6283daa2..00000000 --- a/internal/controller/sync.go +++ /dev/null @@ -1,44 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -import ( - "context" - - "k8s.io/client-go/tools/events" -) - -var _ Syncer = SyncFunc(nil) - -type Syncer interface { - Sync(ctx Context) error -} - -type SyncFunc func(ctx Context) error - -func (s SyncFunc) Sync(ctx Context) error { - return s(ctx) -} - -type Context struct { - Context context.Context - Name string - Key Key - Queue Queue - Recorder events.EventRecorder -} - -type Key struct { - Namespace string - Name string - - // TODO determine if it makes sense to add a field like: - // Extra interface{} - // This would allow a custom ParentFunc to pass extra data through to the Syncer - // The boxed type would have to be comparable (i.e. usable as a map key) -} - -type SyncWrapperFunc func(syncer Syncer) Syncer diff --git a/internal/controller/testing.go b/internal/controller/testing.go deleted file mode 100644 index 714a70a5..00000000 --- a/internal/controller/testing.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -import "testing" - -func TestSync(t *testing.T, controller Controller, ctx Context) error { - t.Helper() // force testing import to discourage external use - return controller.sync(ctx) -} - -func TestWrap(t *testing.T, controller Controller, wrapper SyncWrapperFunc) { - t.Helper() // force testing import to discourage external use - controller.wrap(wrapper) -} diff --git a/test/integration/examplecontroller/api/api.go b/test/integration/examplecontroller/api/api.go deleted file mode 100644 index 1f3626bd..00000000 --- a/test/integration/examplecontroller/api/api.go +++ /dev/null @@ -1,20 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package api - -// Annotation on service. -const SecretNameAnnotation = "service.placeholder.io/secret-name" - -// Annotations on secret. -const ( - // ServiceUIDAnnotation is an annotation on a secret that indicates which service created it, by UID - ServiceUIDAnnotation = "service.placeholder.io/service-uid" - // ServiceNameAnnotation is an annotation on a secret that indicates which service created it, by Name - // to allow reverse lookups on services for comparison against UIDs - ServiceNameAnnotation = "service.placeholder.io/service-name" -) - -const SecretDataKey = "secret-data" diff --git a/test/integration/examplecontroller/controller/creating.go b/test/integration/examplecontroller/controller/creating.go deleted file mode 100644 index 45a1f77c..00000000 --- a/test/integration/examplecontroller/controller/creating.go +++ /dev/null @@ -1,182 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -import ( - "context" - "fmt" - - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - corev1informers "k8s.io/client-go/informers/core/v1" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/events" - "k8s.io/klog/v2" - - "github.com/suzerain-io/placeholder-name/internal/controller" - "github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/api" -) - -//nolint:funlen -func NewExampleCreatingController( - services corev1informers.ServiceInformer, - secrets corev1informers.SecretInformer, - secretClient corev1client.SecretsGetter, - recorder events.EventRecorder, - secretData string, -) controller.Controller { - serviceLister := services.Lister() - secretLister := secrets.Lister() - - // note that these functions do not need to be inlined - // this just demonstrates that for simple Syncer implementations, everything can be in one place - - requiresSecretGeneration := func(service *corev1.Service) (bool, error) { - // check the secret since it could not have been created yet - secretName := service.Annotations[api.SecretNameAnnotation] - if len(secretName) == 0 { - return false, nil - } - - secret, err := secretLister.Secrets(service.Namespace).Get(secretName) - if apierrors.IsNotFound(err) { - // we have not created the secret yet - return true, nil - } - if err != nil { - return false, fmt.Errorf("unable to get the secret %s/%s: %w", service.Namespace, secretName, err) - } - - if string(secret.Data[api.SecretDataKey]) == secretData { - return false, nil - } - - // the secret exists but the data does not match what we expect (i.e. we have new secretData now) - return true, nil - } - - generateSecret := func(service *corev1.Service) error { - klog.V(4).InfoS("generating new secret for service", "namespace", service.Namespace, "name", service.Name) - - secret := &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: service.Annotations[api.SecretNameAnnotation], - Namespace: service.Namespace, - Annotations: map[string]string{ - api.ServiceUIDAnnotation: string(service.UID), - api.ServiceNameAnnotation: service.Name, - }, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "v1", - Kind: "Service", - Name: service.Name, - UID: service.UID, - }, - }, - Finalizers: nil, // TODO maybe add finalizer to guarantee we never miss a delete event? - }, - Type: corev1.SecretTypeOpaque, - Data: map[string][]byte{ - api.SecretDataKey: []byte(secretData), - }, - } - - _, err := secretClient.Secrets(service.Namespace).Create(context.TODO(), secret, metav1.CreateOptions{}) - if apierrors.IsAlreadyExists(err) { - actualSecret, getErr := secretClient.Secrets(service.Namespace).Get(context.TODO(), secret.Name, metav1.GetOptions{}) - if getErr != nil { - return getErr - } - - if actualSecret.Annotations[api.ServiceUIDAnnotation] != string(service.UID) { - //nolint: goerr113 - utilruntime.HandleError(fmt.Errorf("secret %s/%s does not have corresponding service UID %v", actualSecret.Namespace, actualSecret.Name, service.UID)) - return nil // drop from queue because we cannot safely update this secret - } - - klog.V(4).InfoS("updating data in existing secret", "namespace", secret.Namespace, "name", secret.Name) - // Actually update the secret in the regeneration case (the secret already exists but we want to update to new secretData). - _, updateErr := secretClient.Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{}) - return updateErr - } - if err != nil { - return fmt.Errorf("unable to create secret %s/%s: %w", secret.Namespace, secret.Name, err) - } - - return nil - } - - syncer := controller.SyncFunc(func(ctx controller.Context) error { - service, err := serviceLister.Services(ctx.Key.Namespace).Get(ctx.Key.Name) - if apierrors.IsNotFound(err) { - return nil - } - if err != nil { - return fmt.Errorf("unable to get the service %s/%s: %w", service.Namespace, service.Name, err) - } - - ok, err := requiresSecretGeneration(service) - if err != nil || !ok { - return err - } - - return generateSecret(service) - }) - - config := controller.Config{ - Name: "example-controller-creating", - Syncer: syncer, - } - - toServiceName := func(secret *corev1.Secret) (string, bool) { - serviceName := secret.Annotations[api.ServiceNameAnnotation] - return serviceName, len(serviceName) != 0 - } - - hasSecretNameAnnotation := func(obj metav1.Object) bool { - return len(obj.GetAnnotations()[api.SecretNameAnnotation]) != 0 - } - hasSecretNameAnnotationUpdate := func(oldObj, newObj metav1.Object) bool { - return hasSecretNameAnnotation(newObj) || hasSecretNameAnnotation(oldObj) - } - - return controller.New(config, - controller.WithInformer(services, controller.FilterFuncs{ - AddFunc: hasSecretNameAnnotation, - UpdateFunc: hasSecretNameAnnotationUpdate, - }, controller.InformerOption{}), - - controller.WithInformer(secrets, controller.FilterFuncs{ - ParentFunc: func(obj metav1.Object) controller.Key { - secret := obj.(*corev1.Secret) - serviceName, _ := toServiceName(secret) - return controller.Key{Namespace: secret.Namespace, Name: serviceName} - }, - DeleteFunc: func(obj metav1.Object) bool { - secret := obj.(*corev1.Secret) - serviceName, ok := toServiceName(secret) - if !ok { - return false - } - service, err := serviceLister.Services(secret.Namespace).Get(serviceName) - if apierrors.IsNotFound(err) { - return false - } - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to get service %s/%s: %w", secret.Namespace, serviceName, err)) - return false - } - klog.V(4).InfoS("recreating secret", "namespace", service.Namespace, "name", service.Name) - return true - }, - }, controller.InformerOption{}), - - controller.WithRecorder(recorder), // TODO actually use the recorder - ) -} diff --git a/test/integration/examplecontroller/controller/creating_test.go b/test/integration/examplecontroller/controller/creating_test.go deleted file mode 100644 index 1bd37196..00000000 --- a/test/integration/examplecontroller/controller/creating_test.go +++ /dev/null @@ -1,170 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes/fake" - coretesting "k8s.io/client-go/testing" - "k8s.io/client-go/tools/events" - - "github.com/suzerain-io/placeholder-name/internal/controller" - "github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/api" -) - -func TestNewExampleCreatingController(t *testing.T) { - secretsGVR := schema.GroupVersionResource{Version: "v1", Resource: "secrets"} - - type args struct { - services []*corev1.Service - secrets []*corev1.Secret - secretData string - } - type keyErr struct { - key controller.Key - err error - } - tests := []struct { - name string - args args - wantActions []coretesting.Action - wantKeyErrs []keyErr - }{ - { - name: "service has annotation but secret does not exist", - args: args{ - services: []*corev1.Service{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns-1", - Name: "service-1", - Annotations: map[string]string{ - api.SecretNameAnnotation: "secret-1", - }, - UID: "0001", - }, - }, - }, - secretData: "foo-secret-1", - }, - wantKeyErrs: []keyErr{ - { - key: controller.Key{ - Namespace: "ns-1", - Name: "service-1", - }, - err: nil, // we expect no error with this key - }, - }, - wantActions: []coretesting.Action{ - coretesting.NewCreateAction(secretsGVR, "ns-1", &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "secret-1", - Namespace: "ns-1", - Annotations: map[string]string{ - api.ServiceUIDAnnotation: "0001", - api.ServiceNameAnnotation: "service-1", - }, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "v1", - Kind: "Service", - Name: "service-1", - UID: "0001", - }, - }, - }, - Type: corev1.SecretTypeOpaque, - Data: map[string][]byte{ - api.SecretDataKey: []byte("foo-secret-1"), - }, - }), - }, - }, - } - for _, tt := range tests { - tt := tt - - t.Run(tt.name, func(t *testing.T) { - kubeClient := fake.NewSimpleClientset() - for i := range tt.args.services { - service := tt.args.services[i] - err := kubeClient.Tracker().Add(service) - require.NoError(t, err) - } - for i := range tt.args.secrets { - secret := tt.args.secrets[i] - err := kubeClient.Tracker().Add(secret) - require.NoError(t, err) - } - - recorder := events.NewEventBroadcasterAdapter(kubeClient).NewRecorder("example-controller") - kubeInformers := informers.NewSharedInformerFactory(kubeClient, 0) - - creatingController := NewExampleCreatingController( - kubeInformers.Core().V1().Services(), - kubeInformers.Core().V1().Secrets(), - kubeClient.CoreV1(), - recorder, - tt.args.secretData, - ) - - keyErrs := make(chan keyErr) - controller.TestWrap(t, creatingController, func(syncer controller.Syncer) controller.Syncer { - return controller.SyncFunc(func(ctx controller.Context) error { - err := syncer.Sync(ctx) - - keyErrs <- keyErr{ - key: ctx.Key, - err: err, - } - - return err - }) - }) - - // a different approach would be to use TestSync and run each iteration manually: - // - // err := controller.TestSync(t, c, ...) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - kubeInformers.Start(ctx.Done()) - go creatingController.Run(ctx, 5) // TODO maybe only use one worker? - - var actualKeyErrs []keyErr - done: - for { - select { - case key := <-keyErrs: - actualKeyErrs = append(actualKeyErrs, key) - - case <-time.After(3 * time.Second): - // this assumes that calls to Sync are never more than three seconds apart - // we have five workers so there is little chance they all hang around doing nothing for that long - break done - } - } - - // TODO: Figure out how to capture actions from informers - // TODO: I think we need some more fancy order independent equal comparison here - - require.Equal(t, tt.wantKeyErrs, actualKeyErrs) - - // ignore the discovery call from the event recorder and the list/watch from both informers (first five events) - require.Equal(t, tt.wantActions, kubeClient.Actions()[5:]) - }) - } -} diff --git a/test/integration/examplecontroller/controller/updating.go b/test/integration/examplecontroller/controller/updating.go deleted file mode 100644 index 816477ba..00000000 --- a/test/integration/examplecontroller/controller/updating.go +++ /dev/null @@ -1,149 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package controller - -import ( - "context" - "fmt" - "reflect" - - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - corev1informers "k8s.io/client-go/informers/core/v1" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/events" - - "github.com/suzerain-io/placeholder-name/internal/controller" - "github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/api" -) - -func NewExampleUpdatingController( - services corev1informers.ServiceInformer, - secrets corev1informers.SecretInformer, - secretClient corev1client.SecretsGetter, - recorder events.EventRecorder, - secretData string, -) controller.Controller { - serviceLister := services.Lister() - secretLister := secrets.Lister() - - // note that these functions do not need to be inlined - // this just demonstrates that for simple Syncer implementations, everything can be in one place - - toServiceName := func(secret *corev1.Secret) (string, bool) { - serviceName := secret.Annotations[api.ServiceNameAnnotation] - return serviceName, len(serviceName) != 0 - } - - ensureSecretData := func(service *corev1.Service, secretCopy *corev1.Secret) bool { - var needsUpdate bool - - expectedData := map[string][]byte{ - api.SecretDataKey: []byte(secretData), - } - if !reflect.DeepEqual(secretCopy.Data, expectedData) { - secretCopy.Data = expectedData - needsUpdate = true - } - - expectedOwnerReferences := []metav1.OwnerReference{ - { - APIVersion: "v1", - Kind: "Service", - Name: service.Name, - UID: service.UID, - }, - } - if !reflect.DeepEqual(secretCopy.OwnerReferences, expectedOwnerReferences) { - secretCopy.OwnerReferences = expectedOwnerReferences - needsUpdate = true - } - - return needsUpdate - } - - isSecretValidForService := func(service *corev1.Service, secret *corev1.Secret) bool { - if service.Annotations[api.SecretNameAnnotation] != secret.Name { - return false - } - if secret.Annotations[api.ServiceUIDAnnotation] != string(service.UID) { - return false - } - return true - } - - getServiceForSecret := func(secret *corev1.Secret) (*corev1.Service, error) { - serviceName, ok := toServiceName(secret) - if !ok { - return nil, nil - } - service, err := serviceLister.Services(secret.Namespace).Get(serviceName) - if apierrors.IsNotFound(err) { - return nil, nil - } - if err != nil { - return nil, fmt.Errorf("unable to get service %s/%s: %w", secret.Namespace, serviceName, err) - } - return service, nil - } - - syncer := controller.SyncFunc(func(ctx controller.Context) error { - secret, err := secretLister.Secrets(ctx.Key.Namespace).Get(ctx.Key.Name) - if apierrors.IsNotFound(err) { - return nil - } - if err != nil { - return fmt.Errorf("unable to get the secret %s/%s: %w", secret.Namespace, secret.Name, err) - } - - service, err := getServiceForSecret(secret) - if err != nil || service == nil { - return err - } - - if !isSecretValidForService(service, secret) { - //nolint: goerr113 - utilruntime.HandleError(fmt.Errorf("secret %s/%s does not have corresponding service UID %v", secret.Namespace, secret.Name, service.UID)) - return nil - } - - // make a copy to avoid mutating cache state - secretCopy := secret.DeepCopy() - - if needsUpdate := ensureSecretData(service, secretCopy); needsUpdate { - _, updateErr := secretClient.Secrets(secretCopy.Namespace).Update(context.TODO(), secretCopy, metav1.UpdateOptions{}) - return updateErr - } - - return nil - }) - - config := controller.Config{ - Name: "example-controller-updating", - Syncer: syncer, - } - - addSecret := func(obj metav1.Object) bool { - secret := obj.(*corev1.Secret) - _, ok := toServiceName(secret) - return ok - } - - return controller.New(config, - controller.WithInformer(services, controller.FilterFuncs{}, controller.InformerOption{SkipEvents: true}), - - controller.WithInformer(secrets, controller.FilterFuncs{ - AddFunc: addSecret, - UpdateFunc: func(oldObj, newObj metav1.Object) bool { - return addSecret(newObj) || addSecret(oldObj) - }, - }, controller.InformerOption{}), - - controller.WithRecorder(recorder), // TODO actually use the recorder - ) -} diff --git a/test/integration/examplecontroller/starter/starter.go b/test/integration/examplecontroller/starter/starter.go deleted file mode 100644 index d100b9ed..00000000 --- a/test/integration/examplecontroller/starter/starter.go +++ /dev/null @@ -1,56 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package starter - -import ( - "context" - "fmt" - "time" - - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/events" - - "github.com/suzerain-io/placeholder-name/internal/controller" - examplecontroller "github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/controller" -) - -func StartExampleController(ctx context.Context, config *rest.Config, secretData string) error { - kubeClient, err := kubernetes.NewForConfig(config) - if err != nil { - return fmt.Errorf("failed to build client: %w", err) - } - - kubeInformers := informers.NewSharedInformerFactory(kubeClient, 20*time.Minute) - - recorder := events.NewEventBroadcasterAdapter(kubeClient).NewRecorder("example-controller") - - manager := controller.NewManager(). - WithController( - examplecontroller.NewExampleCreatingController( - kubeInformers.Core().V1().Services(), - kubeInformers.Core().V1().Secrets(), - kubeClient.CoreV1(), - recorder, - secretData, - ), 5, - ). - WithController( - examplecontroller.NewExampleUpdatingController( - kubeInformers.Core().V1().Services(), - kubeInformers.Core().V1().Secrets(), - kubeClient.CoreV1(), - recorder, - secretData, - ), 5, - ) - - kubeInformers.Start(ctx.Done()) - go manager.Start(ctx) - - return nil -} diff --git a/test/integration/examplecontroller_test.go b/test/integration/examplecontroller_test.go deleted file mode 100644 index c994f9a5..00000000 --- a/test/integration/examplecontroller_test.go +++ /dev/null @@ -1,161 +0,0 @@ -/* -Copyright 2020 VMware, Inc. -SPDX-License-Identifier: Apache-2.0 -*/ - -package integration - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - - "github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/api" - examplestart "github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/starter" - "github.com/suzerain-io/placeholder-name/test/library" -) - -func TestExampleController(t *testing.T) { - config := library.NewClientConfig(t) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - secretData := "super-secret-data-1" - - err := examplestart.StartExampleController(ctx, config, secretData) - require.NoError(t, err) - - client := library.NewClientset(t) - - namespaces := client.CoreV1().Namespaces() - - namespace, err := namespaces.Create(ctx, &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "example-controller-test-", - }, - }, metav1.CreateOptions{}) - require.NoError(t, err) - - defer func() { - deleteErr := namespaces.Delete(context.Background(), namespace.Name, metav1.DeleteOptions{}) - require.NoError(t, deleteErr) - }() - - services := client.CoreV1().Services(namespace.Name) - secrets := client.CoreV1().Secrets(namespace.Name) - - secretsWatch, err := secrets.Watch(context.Background(), metav1.ListOptions{}) - require.NoError(t, err) - defer secretsWatch.Stop() - - service := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "example-service-test", - Annotations: map[string]string{ - api.SecretNameAnnotation: "example-secret-name", - }, - }, - Spec: corev1.ServiceSpec{ - Ports: []corev1.ServicePort{ - { - Port: 443, - }, - }, - }, - } - _, err = services.Create(ctx, service, metav1.CreateOptions{}) - require.NoError(t, err) - - timeout := time.After(10 * time.Second) -done: - for { - select { - case event := <-secretsWatch.ResultChan(): - if event.Type != watch.Added { - continue - } - secret, ok := event.Object.(*corev1.Secret) - if !ok { - continue - } - if secret.Name != service.Annotations[api.SecretNameAnnotation] { - continue - } - - expectedData := map[string][]byte{ - api.SecretDataKey: []byte(secretData), - } - require.Equal(t, expectedData, secret.Data, "expected to see new secret data: %s", library.Sdump(secret)) - break done // immediately stop consuming events because we want to check for updated events below - - case <-timeout: - t.Fatal("timed out waiting to see new secret") - } - } - - // shutdown the controllers so we can change the secret data - cancel() - time.Sleep(5 * time.Second) // wait a bit for the controllers to shut down - - ctx, cancel = context.WithCancel(context.Background()) - defer cancel() - - secretData2 := "super-secret-data-2" - - err = examplestart.StartExampleController(ctx, config, secretData2) - require.NoError(t, err) - - timeout = time.After(10 * time.Second) -done2: - for { - select { - case event := <-secretsWatch.ResultChan(): - if event.Type != watch.Modified { - continue - } - secret, ok := event.Object.(*corev1.Secret) - if !ok { - continue - } - if secret.Name != service.Annotations[api.SecretNameAnnotation] { - continue - } - - expectedData := map[string][]byte{ - api.SecretDataKey: []byte(secretData2), - } - require.Equal(t, expectedData, secret.Data, "expected to see updated secret data: %s", library.Sdump(secret)) - break done2 // immediately stop consuming events because we want to check for hot loops below - - case <-timeout: - t.Fatal("timed out waiting to see updated secret") - } - } - - timeout = time.After(5 * time.Second) -done3: - for { - select { - case event := <-secretsWatch.ResultChan(): - secret, ok := event.Object.(*corev1.Secret) - if !ok { - continue - } - if secret.Name != service.Annotations[api.SecretNameAnnotation] { - continue - } - - // this assumes that no other actor in the system is trying to mutate this secret - t.Errorf("unexpected event seen for secret: %s", library.Sdump(event)) - - case <-timeout: - break done3 // we saw no events matching our secret meaning that we are not hot looping - } - } -}