Remove the controller pkg because it was moved to another repo
Signed-off-by: Ryan Richard <richardry@vmware.com>
This commit is contained in:
parent
43c3f1ab2e
commit
409462e989
@ -1,216 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
||||||
"k8s.io/client-go/tools/cache"
|
|
||||||
"k8s.io/client-go/tools/events"
|
|
||||||
"k8s.io/client-go/util/workqueue"
|
|
||||||
"k8s.io/klog/v2"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Controller interface represents a runnable Kubernetes controller.
|
|
||||||
// Cancelling the context passed will cause the controller to shutdown.
|
|
||||||
// Number of workers determine how much parallel the job processing should be.
|
|
||||||
type Controller interface {
|
|
||||||
// Run runs the controller and blocks until the controller is finished.
|
|
||||||
// Number of workers can be specified via workers parameter.
|
|
||||||
// This function will return when all internal loops are finished.
|
|
||||||
// Note that having more than one worker usually means handing parallelization of Sync().
|
|
||||||
Run(ctx context.Context, workers int)
|
|
||||||
|
|
||||||
// Name returns the controller name string.
|
|
||||||
Name() string
|
|
||||||
|
|
||||||
// The methods below should only be called during tests via the Test* functions.
|
|
||||||
|
|
||||||
// sync contains the main controller logic.
|
|
||||||
// This can be used in unit tests to exercise the Syncer by directly calling it.
|
|
||||||
sync(ctx Context) error
|
|
||||||
|
|
||||||
// wrap wraps the main controller logic provided via the Syncer.
|
|
||||||
// This can be used in tests to synchronize asynchronous events as seen by a running controller.
|
|
||||||
// The wrapping must be done after New is called and before Run is called.
|
|
||||||
wrap(wrapper SyncWrapperFunc)
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ Controller = &controller{}
|
|
||||||
|
|
||||||
type Config struct {
|
|
||||||
Name string
|
|
||||||
Syncer Syncer
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(config Config, opts ...Option) Controller {
|
|
||||||
c := &controller{
|
|
||||||
config: config,
|
|
||||||
}
|
|
||||||
|
|
||||||
// set up defaults
|
|
||||||
WithRateLimiter(workqueue.DefaultControllerRateLimiter())(c)
|
|
||||||
WithRecorder(klogRecorder{})(c)
|
|
||||||
|
|
||||||
for _, opt := range opts {
|
|
||||||
opt(c)
|
|
||||||
}
|
|
||||||
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
type controller struct {
|
|
||||||
config Config
|
|
||||||
|
|
||||||
queue workqueue.RateLimitingInterface
|
|
||||||
queueWrapper Queue
|
|
||||||
maxRetries int
|
|
||||||
recorder events.EventRecorder
|
|
||||||
|
|
||||||
run bool
|
|
||||||
runOpts []Option
|
|
||||||
|
|
||||||
cacheSyncs []cache.InformerSynced
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) Run(ctx context.Context, workers int) {
|
|
||||||
defer utilruntime.HandleCrash(crash) // prevent panics from killing the process
|
|
||||||
|
|
||||||
klog.InfoS("starting controller", "controller", c.Name(), "workers", workers)
|
|
||||||
|
|
||||||
c.run = true
|
|
||||||
for _, opt := range c.runOpts {
|
|
||||||
opt(c)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !c.waitForCacheSyncWithTimeout() {
|
|
||||||
panic(die(fmt.Sprintf("%s: timed out waiting for caches to sync", c.Name())))
|
|
||||||
}
|
|
||||||
|
|
||||||
var workerWg sync.WaitGroup
|
|
||||||
|
|
||||||
// workerContext is used to track and initiate worker shutdown
|
|
||||||
workerContext, workerContextCancel := context.WithCancel(context.Background())
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
c.queue.ShutDown() // shutdown the controller queue first
|
|
||||||
workerContextCancel() // cancel the worker context, which tell workers to initiate shutdown
|
|
||||||
|
|
||||||
// Wait for all workers to finish their job.
|
|
||||||
// at this point the Run() can hang and callers have to implement the logic that will kill
|
|
||||||
// this controller (SIGKILL).
|
|
||||||
workerWg.Wait()
|
|
||||||
klog.InfoS("all workers have been terminated, shutting down", "controller", c.Name(), "workers", workers)
|
|
||||||
}()
|
|
||||||
|
|
||||||
for i := 1; i <= workers; i++ {
|
|
||||||
idx := i
|
|
||||||
klog.InfoS("starting worker", "controller", c.Name(), "worker", idx)
|
|
||||||
workerWg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer utilruntime.HandleCrash(crash) // prevent panics from killing the process
|
|
||||||
defer func() {
|
|
||||||
klog.InfoS("shutting down worker", "controller", c.Name(), "worker", idx)
|
|
||||||
workerWg.Done()
|
|
||||||
}()
|
|
||||||
c.runWorker(workerContext)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
<-ctx.Done() // wait for controller context to be cancelled
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) Name() string {
|
|
||||||
return c.config.Name
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) sync(ctx Context) error {
|
|
||||||
return c.config.Syncer.Sync(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) wrap(wrapper SyncWrapperFunc) {
|
|
||||||
c.runOpts = append(c.runOpts, toRunOpt(func(c *controller) {
|
|
||||||
c.config.Syncer = wrapper(c.config.Syncer)
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) waitForCacheSyncWithTimeout() bool {
|
|
||||||
// prevent us from blocking forever due to a broken informer
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
return cache.WaitForCacheSync(ctx.Done(), c.cacheSyncs...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) add(filter Filter, object metav1.Object) {
|
|
||||||
key := filter.Parent(object)
|
|
||||||
c.queueWrapper.Add(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// runWorker runs a single worker
|
|
||||||
// The worker is asked to terminate when the passed context is cancelled.
|
|
||||||
func (c *controller) runWorker(ctx context.Context) {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
c.processNextWorkItem(ctx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) processNextWorkItem(ctx context.Context) {
|
|
||||||
queueKey, quit := c.queue.Get()
|
|
||||||
if quit {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
key := queueKey.(Key)
|
|
||||||
defer c.queue.Done(key)
|
|
||||||
|
|
||||||
syncCtx := Context{
|
|
||||||
Context: ctx,
|
|
||||||
Name: c.Name(),
|
|
||||||
Key: key,
|
|
||||||
Queue: c.queueWrapper,
|
|
||||||
Recorder: c.recorder,
|
|
||||||
}
|
|
||||||
|
|
||||||
err := c.sync(syncCtx)
|
|
||||||
c.handleKey(key, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) handleKey(key Key, err error) {
|
|
||||||
if err == nil {
|
|
||||||
c.queue.Forget(key)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
retryForever := c.maxRetries <= 0
|
|
||||||
shouldRetry := retryForever || c.queue.NumRequeues(key) < c.maxRetries
|
|
||||||
|
|
||||||
if !shouldRetry {
|
|
||||||
utilruntime.HandleError(fmt.Errorf("%s: dropping key %v out of the queue: %w", c.Name(), key, err))
|
|
||||||
c.queue.Forget(key)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if errors.Is(err, ErrSyntheticRequeue) {
|
|
||||||
// logging this helps detecting wedged controllers with missing pre-requirements
|
|
||||||
klog.V(4).InfoS("requested synthetic requeue", "controller", c.Name(), "key", key)
|
|
||||||
} else {
|
|
||||||
utilruntime.HandleError(fmt.Errorf("%s: %v failed with: %w", c.Name(), key, err))
|
|
||||||
}
|
|
||||||
|
|
||||||
c.queue.AddRateLimited(key)
|
|
||||||
}
|
|
@ -1,15 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
type die string
|
|
||||||
|
|
||||||
func crash(i interface{}) {
|
|
||||||
mustDie, ok := i.(die)
|
|
||||||
if ok {
|
|
||||||
panic(string(mustDie))
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,18 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
// ErrSyntheticRequeue can be returned from a Syncer to force a retry artificially for the current key.
|
|
||||||
// This can also be done by re-adding the key to queue, but this is more convenient and has better logging.
|
|
||||||
const ErrSyntheticRequeue = constErr("synthetic requeue request")
|
|
||||||
|
|
||||||
var _ error = constErr("")
|
|
||||||
|
|
||||||
type constErr string
|
|
||||||
|
|
||||||
func (e constErr) Error() string {
|
|
||||||
return string(e)
|
|
||||||
}
|
|
@ -1,76 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Filter interface {
|
|
||||||
Add(obj metav1.Object) bool
|
|
||||||
Update(oldObj, newObj metav1.Object) bool
|
|
||||||
Delete(obj metav1.Object) bool
|
|
||||||
|
|
||||||
Parent(obj metav1.Object) Key
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ Filter = FilterFuncs{}
|
|
||||||
|
|
||||||
type ParentFunc func(obj metav1.Object) Key
|
|
||||||
|
|
||||||
type FilterFuncs struct {
|
|
||||||
ParentFunc ParentFunc
|
|
||||||
AddFunc func(obj metav1.Object) bool
|
|
||||||
UpdateFunc func(oldObj, newObj metav1.Object) bool
|
|
||||||
DeleteFunc func(obj metav1.Object) bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f FilterFuncs) Parent(obj metav1.Object) Key {
|
|
||||||
if f.ParentFunc == nil {
|
|
||||||
return Key{
|
|
||||||
Namespace: obj.GetNamespace(),
|
|
||||||
Name: obj.GetName(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return f.ParentFunc(obj)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f FilterFuncs) Add(obj metav1.Object) bool {
|
|
||||||
if f.AddFunc == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return f.AddFunc(obj)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f FilterFuncs) Update(oldObj, newObj metav1.Object) bool {
|
|
||||||
if f.UpdateFunc == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return f.UpdateFunc(oldObj, newObj)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f FilterFuncs) Delete(obj metav1.Object) bool {
|
|
||||||
if f.DeleteFunc == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return f.DeleteFunc(obj)
|
|
||||||
}
|
|
||||||
|
|
||||||
func FilterByNames(parentFunc ParentFunc, names ...string) Filter {
|
|
||||||
set := sets.NewString(names...)
|
|
||||||
has := func(obj metav1.Object) bool {
|
|
||||||
return set.Has(obj.GetName())
|
|
||||||
}
|
|
||||||
return FilterFuncs{
|
|
||||||
ParentFunc: parentFunc,
|
|
||||||
AddFunc: has,
|
|
||||||
UpdateFunc: func(oldObj, newObj metav1.Object) bool {
|
|
||||||
return has(newObj)
|
|
||||||
},
|
|
||||||
DeleteFunc: has,
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,25 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
import "k8s.io/client-go/tools/cache"
|
|
||||||
|
|
||||||
type InformerGetter interface {
|
|
||||||
Informer() cache.SharedIndexInformer
|
|
||||||
}
|
|
||||||
|
|
||||||
type InformerOption struct {
|
|
||||||
SkipSync bool
|
|
||||||
SkipEvents bool
|
|
||||||
|
|
||||||
// TODO maybe add a field like:
|
|
||||||
// ResyncPeriod time.Duration
|
|
||||||
// to support using AddEventHandlerWithResyncPeriod
|
|
||||||
// this field would be mutually exclusive with SkipEvents
|
|
||||||
// I suspect we do not need this level of flexibility and resyncs can mask bugs in controller logic
|
|
||||||
// A related change could be an Option such as WithResyncSchedule to allow for cron style control loops
|
|
||||||
// It is unclear to me if we would ever need that since we assume that all events come from a Kube watch
|
|
||||||
}
|
|
@ -1,59 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"k8s.io/klog/v2"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Manager interface {
|
|
||||||
Start(ctx context.Context)
|
|
||||||
WithController(controller Controller, workers int) Manager
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewManager() Manager {
|
|
||||||
return &controllerManager{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// runnableController represents single controller runnable configuration.
|
|
||||||
type runnableController struct {
|
|
||||||
controller Controller
|
|
||||||
workers int
|
|
||||||
}
|
|
||||||
|
|
||||||
type controllerManager struct {
|
|
||||||
controllers []runnableController
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ Manager = &controllerManager{}
|
|
||||||
|
|
||||||
func (c *controllerManager) WithController(controller Controller, workers int) Manager {
|
|
||||||
c.controllers = append(c.controllers, runnableController{
|
|
||||||
controller: controller,
|
|
||||||
workers: workers,
|
|
||||||
})
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start will run all managed controllers and block until all controllers shutdown.
|
|
||||||
// When the context passed is cancelled, all controllers are signalled to shutdown.
|
|
||||||
func (c *controllerManager) Start(ctx context.Context) {
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(len(c.controllers))
|
|
||||||
for i := range c.controllers {
|
|
||||||
idx := i
|
|
||||||
go func() {
|
|
||||||
r := c.controllers[idx]
|
|
||||||
defer klog.InfoS("controller terminated", "controller", r.controller.Name())
|
|
||||||
defer wg.Done()
|
|
||||||
r.controller.Run(ctx, r.workers)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
@ -1,154 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
||||||
"k8s.io/client-go/tools/cache"
|
|
||||||
"k8s.io/client-go/tools/events"
|
|
||||||
"k8s.io/client-go/util/workqueue"
|
|
||||||
"k8s.io/klog/v2"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Option func(*controller)
|
|
||||||
|
|
||||||
func WithMaxRetries(maxRetries int) Option {
|
|
||||||
return func(c *controller) {
|
|
||||||
c.maxRetries = maxRetries
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithInitialEvent(key Key) Option {
|
|
||||||
return toNaiveRunOpt(func(c *controller) {
|
|
||||||
c.queueWrapper.Add(key)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithRateLimiter(limiter workqueue.RateLimiter) Option {
|
|
||||||
return func(c *controller) {
|
|
||||||
c.queue = workqueue.NewNamedRateLimitingQueue(limiter, c.Name())
|
|
||||||
c.queueWrapper = &queueWrapper{queue: c.queue}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithRecorder(recorder events.EventRecorder) Option {
|
|
||||||
return func(c *controller) {
|
|
||||||
c.recorder = recorder
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithInformer(getter InformerGetter, filter Filter, opt InformerOption) Option {
|
|
||||||
informer := getter.Informer() // immediately signal that we intend to use this informer in case it is lazily initialized
|
|
||||||
return toRunOpt(func(c *controller) {
|
|
||||||
if opt.SkipSync && opt.SkipEvents {
|
|
||||||
panic(die("cannot skip syncing and event handlers at the same time"))
|
|
||||||
}
|
|
||||||
|
|
||||||
if !opt.SkipSync {
|
|
||||||
c.cacheSyncs = append(c.cacheSyncs, informer.HasSynced)
|
|
||||||
}
|
|
||||||
|
|
||||||
if opt.SkipEvents {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
||||||
AddFunc: func(obj interface{}) {
|
|
||||||
object := metaOrDie(obj)
|
|
||||||
if filter.Add(object) {
|
|
||||||
klog.V(4).InfoS("handling add",
|
|
||||||
"controller", c.Name(),
|
|
||||||
"namespace", object.GetNamespace(),
|
|
||||||
"name", object.GetName(),
|
|
||||||
"selfLink", object.GetSelfLink(), // TODO: self link is deprecated so we need to extract the GVR in some other way (using a series of schemes?)
|
|
||||||
"kind", fmt.Sprintf("%T", object),
|
|
||||||
)
|
|
||||||
c.add(filter, object)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
|
||||||
oldObject := metaOrDie(oldObj)
|
|
||||||
newObject := metaOrDie(newObj)
|
|
||||||
if filter.Update(oldObject, newObject) {
|
|
||||||
klog.V(4).InfoS("handling update",
|
|
||||||
"controller", c.Name(),
|
|
||||||
"namespace", newObject.GetNamespace(),
|
|
||||||
"name", newObject.GetName(),
|
|
||||||
"selfLink", newObject.GetSelfLink(), // TODO: self link is deprecated so we need to extract the GVR in some other way (using a series of schemes?)
|
|
||||||
"kind", fmt.Sprintf("%T", newObject),
|
|
||||||
)
|
|
||||||
c.add(filter, newObject)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
DeleteFunc: func(obj interface{}) {
|
|
||||||
accessor, err := meta.Accessor(obj)
|
|
||||||
if err != nil {
|
|
||||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
|
||||||
if !ok {
|
|
||||||
//nolint: goerr113
|
|
||||||
utilruntime.HandleError(fmt.Errorf("%s: could not get object from tombstone: %+v", c.Name(), obj))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
accessor, err = meta.Accessor(tombstone.Obj)
|
|
||||||
if err != nil {
|
|
||||||
//nolint: goerr113
|
|
||||||
utilruntime.HandleError(fmt.Errorf("%s: tombstone contained object that is not an accessor: %+v", c.Name(), obj))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if filter.Delete(accessor) {
|
|
||||||
klog.V(4).InfoS("handling delete",
|
|
||||||
"controller", c.Name(),
|
|
||||||
"namespace", accessor.GetNamespace(),
|
|
||||||
"name", accessor.GetName(),
|
|
||||||
"selfLink", accessor.GetSelfLink(), // TODO: self link is deprecated so we need to extract the GVR in some other way (using a series of schemes?)
|
|
||||||
"kind", fmt.Sprintf("%T", accessor),
|
|
||||||
)
|
|
||||||
c.add(filter, accessor)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// toRunOpt guarantees that an Option only runs once on the first call to Run (and not New), even if a controller is stopped and restarted.
|
|
||||||
func toRunOpt(opt Option) Option {
|
|
||||||
return toOnceOpt(toNaiveRunOpt(opt))
|
|
||||||
}
|
|
||||||
|
|
||||||
// toNaiveRunOpt guarantees that an Option only runs on calls to Run (and not New), even if a controller is stopped and restarted.
|
|
||||||
func toNaiveRunOpt(opt Option) Option {
|
|
||||||
return func(c *controller) {
|
|
||||||
if c.run {
|
|
||||||
opt(c)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.runOpts = append(c.runOpts, opt)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// toOnceOpt guarantees that an Option only runs once.
|
|
||||||
func toOnceOpt(opt Option) Option {
|
|
||||||
var once sync.Once
|
|
||||||
return func(c *controller) {
|
|
||||||
once.Do(func() {
|
|
||||||
opt(c)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func metaOrDie(obj interface{}) metav1.Object {
|
|
||||||
accessor, err := meta.Accessor(obj)
|
|
||||||
if err != nil {
|
|
||||||
panic(err) // this should never happen
|
|
||||||
}
|
|
||||||
return accessor
|
|
||||||
}
|
|
@ -1,27 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"k8s.io/client-go/tools/cache"
|
|
||||||
)
|
|
||||||
|
|
||||||
type getter bool
|
|
||||||
|
|
||||||
func (g *getter) Informer() cache.SharedIndexInformer {
|
|
||||||
*g = true
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestInformerCalled(t *testing.T) {
|
|
||||||
g := getter(false)
|
|
||||||
_ = New(Config{}, WithInformer(&g, FilterByNames(nil), InformerOption{}))
|
|
||||||
if !g {
|
|
||||||
t.Error("expected InformerGetter.Informer() to be called")
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,41 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/client-go/util/workqueue"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Queue interface {
|
|
||||||
// Add immediately adds a key to the queue and marks it as needing processing.
|
|
||||||
Add(key Key)
|
|
||||||
|
|
||||||
// AddRateLimited adds a key to the queue after the rate limiter says it is ok.
|
|
||||||
AddRateLimited(key Key)
|
|
||||||
|
|
||||||
// AddAfter adds a key to the queue after the indicated duration has passed.
|
|
||||||
AddAfter(key Key, duration time.Duration)
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ Queue = &queueWrapper{}
|
|
||||||
|
|
||||||
type queueWrapper struct {
|
|
||||||
queue workqueue.RateLimitingInterface
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *queueWrapper) Add(key Key) {
|
|
||||||
q.queue.Add(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *queueWrapper) AddRateLimited(key Key) {
|
|
||||||
q.queue.AddRateLimited(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *queueWrapper) AddAfter(key Key, duration time.Duration) {
|
|
||||||
q.queue.AddAfter(key, duration)
|
|
||||||
}
|
|
@ -1,29 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
|
||||||
"k8s.io/client-go/tools/events"
|
|
||||||
"k8s.io/klog/v2"
|
|
||||||
)
|
|
||||||
|
|
||||||
var _ events.EventRecorder = klogRecorder{}
|
|
||||||
|
|
||||||
type klogRecorder struct{}
|
|
||||||
|
|
||||||
func (n klogRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
|
|
||||||
klog.V(4).InfoS("recording event",
|
|
||||||
"regarding", regarding,
|
|
||||||
"related", related,
|
|
||||||
"eventtype", eventtype,
|
|
||||||
"reason", reason,
|
|
||||||
"action", action,
|
|
||||||
"message", fmt.Sprintf(note, args...),
|
|
||||||
)
|
|
||||||
}
|
|
@ -1,44 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"k8s.io/client-go/tools/events"
|
|
||||||
)
|
|
||||||
|
|
||||||
var _ Syncer = SyncFunc(nil)
|
|
||||||
|
|
||||||
type Syncer interface {
|
|
||||||
Sync(ctx Context) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type SyncFunc func(ctx Context) error
|
|
||||||
|
|
||||||
func (s SyncFunc) Sync(ctx Context) error {
|
|
||||||
return s(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Context struct {
|
|
||||||
Context context.Context
|
|
||||||
Name string
|
|
||||||
Key Key
|
|
||||||
Queue Queue
|
|
||||||
Recorder events.EventRecorder
|
|
||||||
}
|
|
||||||
|
|
||||||
type Key struct {
|
|
||||||
Namespace string
|
|
||||||
Name string
|
|
||||||
|
|
||||||
// TODO determine if it makes sense to add a field like:
|
|
||||||
// Extra interface{}
|
|
||||||
// This would allow a custom ParentFunc to pass extra data through to the Syncer
|
|
||||||
// The boxed type would have to be comparable (i.e. usable as a map key)
|
|
||||||
}
|
|
||||||
|
|
||||||
type SyncWrapperFunc func(syncer Syncer) Syncer
|
|
@ -1,18 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
import "testing"
|
|
||||||
|
|
||||||
func TestSync(t *testing.T, controller Controller, ctx Context) error {
|
|
||||||
t.Helper() // force testing import to discourage external use
|
|
||||||
return controller.sync(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWrap(t *testing.T, controller Controller, wrapper SyncWrapperFunc) {
|
|
||||||
t.Helper() // force testing import to discourage external use
|
|
||||||
controller.wrap(wrapper)
|
|
||||||
}
|
|
@ -1,20 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package api
|
|
||||||
|
|
||||||
// Annotation on service.
|
|
||||||
const SecretNameAnnotation = "service.placeholder.io/secret-name"
|
|
||||||
|
|
||||||
// Annotations on secret.
|
|
||||||
const (
|
|
||||||
// ServiceUIDAnnotation is an annotation on a secret that indicates which service created it, by UID
|
|
||||||
ServiceUIDAnnotation = "service.placeholder.io/service-uid"
|
|
||||||
// ServiceNameAnnotation is an annotation on a secret that indicates which service created it, by Name
|
|
||||||
// to allow reverse lookups on services for comparison against UIDs
|
|
||||||
ServiceNameAnnotation = "service.placeholder.io/service-name"
|
|
||||||
)
|
|
||||||
|
|
||||||
const SecretDataKey = "secret-data"
|
|
@ -1,182 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
||||||
corev1informers "k8s.io/client-go/informers/core/v1"
|
|
||||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
||||||
"k8s.io/client-go/tools/events"
|
|
||||||
"k8s.io/klog/v2"
|
|
||||||
|
|
||||||
"github.com/suzerain-io/placeholder-name/internal/controller"
|
|
||||||
"github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/api"
|
|
||||||
)
|
|
||||||
|
|
||||||
//nolint:funlen
|
|
||||||
func NewExampleCreatingController(
|
|
||||||
services corev1informers.ServiceInformer,
|
|
||||||
secrets corev1informers.SecretInformer,
|
|
||||||
secretClient corev1client.SecretsGetter,
|
|
||||||
recorder events.EventRecorder,
|
|
||||||
secretData string,
|
|
||||||
) controller.Controller {
|
|
||||||
serviceLister := services.Lister()
|
|
||||||
secretLister := secrets.Lister()
|
|
||||||
|
|
||||||
// note that these functions do not need to be inlined
|
|
||||||
// this just demonstrates that for simple Syncer implementations, everything can be in one place
|
|
||||||
|
|
||||||
requiresSecretGeneration := func(service *corev1.Service) (bool, error) {
|
|
||||||
// check the secret since it could not have been created yet
|
|
||||||
secretName := service.Annotations[api.SecretNameAnnotation]
|
|
||||||
if len(secretName) == 0 {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
secret, err := secretLister.Secrets(service.Namespace).Get(secretName)
|
|
||||||
if apierrors.IsNotFound(err) {
|
|
||||||
// we have not created the secret yet
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return false, fmt.Errorf("unable to get the secret %s/%s: %w", service.Namespace, secretName, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if string(secret.Data[api.SecretDataKey]) == secretData {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// the secret exists but the data does not match what we expect (i.e. we have new secretData now)
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
generateSecret := func(service *corev1.Service) error {
|
|
||||||
klog.V(4).InfoS("generating new secret for service", "namespace", service.Namespace, "name", service.Name)
|
|
||||||
|
|
||||||
secret := &corev1.Secret{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: service.Annotations[api.SecretNameAnnotation],
|
|
||||||
Namespace: service.Namespace,
|
|
||||||
Annotations: map[string]string{
|
|
||||||
api.ServiceUIDAnnotation: string(service.UID),
|
|
||||||
api.ServiceNameAnnotation: service.Name,
|
|
||||||
},
|
|
||||||
OwnerReferences: []metav1.OwnerReference{
|
|
||||||
{
|
|
||||||
APIVersion: "v1",
|
|
||||||
Kind: "Service",
|
|
||||||
Name: service.Name,
|
|
||||||
UID: service.UID,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Finalizers: nil, // TODO maybe add finalizer to guarantee we never miss a delete event?
|
|
||||||
},
|
|
||||||
Type: corev1.SecretTypeOpaque,
|
|
||||||
Data: map[string][]byte{
|
|
||||||
api.SecretDataKey: []byte(secretData),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := secretClient.Secrets(service.Namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
|
|
||||||
if apierrors.IsAlreadyExists(err) {
|
|
||||||
actualSecret, getErr := secretClient.Secrets(service.Namespace).Get(context.TODO(), secret.Name, metav1.GetOptions{})
|
|
||||||
if getErr != nil {
|
|
||||||
return getErr
|
|
||||||
}
|
|
||||||
|
|
||||||
if actualSecret.Annotations[api.ServiceUIDAnnotation] != string(service.UID) {
|
|
||||||
//nolint: goerr113
|
|
||||||
utilruntime.HandleError(fmt.Errorf("secret %s/%s does not have corresponding service UID %v", actualSecret.Namespace, actualSecret.Name, service.UID))
|
|
||||||
return nil // drop from queue because we cannot safely update this secret
|
|
||||||
}
|
|
||||||
|
|
||||||
klog.V(4).InfoS("updating data in existing secret", "namespace", secret.Namespace, "name", secret.Name)
|
|
||||||
// Actually update the secret in the regeneration case (the secret already exists but we want to update to new secretData).
|
|
||||||
_, updateErr := secretClient.Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{})
|
|
||||||
return updateErr
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("unable to create secret %s/%s: %w", secret.Namespace, secret.Name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
syncer := controller.SyncFunc(func(ctx controller.Context) error {
|
|
||||||
service, err := serviceLister.Services(ctx.Key.Namespace).Get(ctx.Key.Name)
|
|
||||||
if apierrors.IsNotFound(err) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("unable to get the service %s/%s: %w", service.Namespace, service.Name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ok, err := requiresSecretGeneration(service)
|
|
||||||
if err != nil || !ok {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return generateSecret(service)
|
|
||||||
})
|
|
||||||
|
|
||||||
config := controller.Config{
|
|
||||||
Name: "example-controller-creating",
|
|
||||||
Syncer: syncer,
|
|
||||||
}
|
|
||||||
|
|
||||||
toServiceName := func(secret *corev1.Secret) (string, bool) {
|
|
||||||
serviceName := secret.Annotations[api.ServiceNameAnnotation]
|
|
||||||
return serviceName, len(serviceName) != 0
|
|
||||||
}
|
|
||||||
|
|
||||||
hasSecretNameAnnotation := func(obj metav1.Object) bool {
|
|
||||||
return len(obj.GetAnnotations()[api.SecretNameAnnotation]) != 0
|
|
||||||
}
|
|
||||||
hasSecretNameAnnotationUpdate := func(oldObj, newObj metav1.Object) bool {
|
|
||||||
return hasSecretNameAnnotation(newObj) || hasSecretNameAnnotation(oldObj)
|
|
||||||
}
|
|
||||||
|
|
||||||
return controller.New(config,
|
|
||||||
controller.WithInformer(services, controller.FilterFuncs{
|
|
||||||
AddFunc: hasSecretNameAnnotation,
|
|
||||||
UpdateFunc: hasSecretNameAnnotationUpdate,
|
|
||||||
}, controller.InformerOption{}),
|
|
||||||
|
|
||||||
controller.WithInformer(secrets, controller.FilterFuncs{
|
|
||||||
ParentFunc: func(obj metav1.Object) controller.Key {
|
|
||||||
secret := obj.(*corev1.Secret)
|
|
||||||
serviceName, _ := toServiceName(secret)
|
|
||||||
return controller.Key{Namespace: secret.Namespace, Name: serviceName}
|
|
||||||
},
|
|
||||||
DeleteFunc: func(obj metav1.Object) bool {
|
|
||||||
secret := obj.(*corev1.Secret)
|
|
||||||
serviceName, ok := toServiceName(secret)
|
|
||||||
if !ok {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
service, err := serviceLister.Services(secret.Namespace).Get(serviceName)
|
|
||||||
if apierrors.IsNotFound(err) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
utilruntime.HandleError(fmt.Errorf("unable to get service %s/%s: %w", secret.Namespace, serviceName, err))
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
klog.V(4).InfoS("recreating secret", "namespace", service.Namespace, "name", service.Name)
|
|
||||||
return true
|
|
||||||
},
|
|
||||||
}, controller.InformerOption{}),
|
|
||||||
|
|
||||||
controller.WithRecorder(recorder), // TODO actually use the recorder
|
|
||||||
)
|
|
||||||
}
|
|
@ -1,170 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
||||||
"k8s.io/client-go/informers"
|
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
|
||||||
coretesting "k8s.io/client-go/testing"
|
|
||||||
"k8s.io/client-go/tools/events"
|
|
||||||
|
|
||||||
"github.com/suzerain-io/placeholder-name/internal/controller"
|
|
||||||
"github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/api"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestNewExampleCreatingController(t *testing.T) {
|
|
||||||
secretsGVR := schema.GroupVersionResource{Version: "v1", Resource: "secrets"}
|
|
||||||
|
|
||||||
type args struct {
|
|
||||||
services []*corev1.Service
|
|
||||||
secrets []*corev1.Secret
|
|
||||||
secretData string
|
|
||||||
}
|
|
||||||
type keyErr struct {
|
|
||||||
key controller.Key
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
args args
|
|
||||||
wantActions []coretesting.Action
|
|
||||||
wantKeyErrs []keyErr
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "service has annotation but secret does not exist",
|
|
||||||
args: args{
|
|
||||||
services: []*corev1.Service{
|
|
||||||
{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Namespace: "ns-1",
|
|
||||||
Name: "service-1",
|
|
||||||
Annotations: map[string]string{
|
|
||||||
api.SecretNameAnnotation: "secret-1",
|
|
||||||
},
|
|
||||||
UID: "0001",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
secretData: "foo-secret-1",
|
|
||||||
},
|
|
||||||
wantKeyErrs: []keyErr{
|
|
||||||
{
|
|
||||||
key: controller.Key{
|
|
||||||
Namespace: "ns-1",
|
|
||||||
Name: "service-1",
|
|
||||||
},
|
|
||||||
err: nil, // we expect no error with this key
|
|
||||||
},
|
|
||||||
},
|
|
||||||
wantActions: []coretesting.Action{
|
|
||||||
coretesting.NewCreateAction(secretsGVR, "ns-1", &corev1.Secret{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: "secret-1",
|
|
||||||
Namespace: "ns-1",
|
|
||||||
Annotations: map[string]string{
|
|
||||||
api.ServiceUIDAnnotation: "0001",
|
|
||||||
api.ServiceNameAnnotation: "service-1",
|
|
||||||
},
|
|
||||||
OwnerReferences: []metav1.OwnerReference{
|
|
||||||
{
|
|
||||||
APIVersion: "v1",
|
|
||||||
Kind: "Service",
|
|
||||||
Name: "service-1",
|
|
||||||
UID: "0001",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Type: corev1.SecretTypeOpaque,
|
|
||||||
Data: map[string][]byte{
|
|
||||||
api.SecretDataKey: []byte("foo-secret-1"),
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
tt := tt
|
|
||||||
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
kubeClient := fake.NewSimpleClientset()
|
|
||||||
for i := range tt.args.services {
|
|
||||||
service := tt.args.services[i]
|
|
||||||
err := kubeClient.Tracker().Add(service)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
for i := range tt.args.secrets {
|
|
||||||
secret := tt.args.secrets[i]
|
|
||||||
err := kubeClient.Tracker().Add(secret)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
recorder := events.NewEventBroadcasterAdapter(kubeClient).NewRecorder("example-controller")
|
|
||||||
kubeInformers := informers.NewSharedInformerFactory(kubeClient, 0)
|
|
||||||
|
|
||||||
creatingController := NewExampleCreatingController(
|
|
||||||
kubeInformers.Core().V1().Services(),
|
|
||||||
kubeInformers.Core().V1().Secrets(),
|
|
||||||
kubeClient.CoreV1(),
|
|
||||||
recorder,
|
|
||||||
tt.args.secretData,
|
|
||||||
)
|
|
||||||
|
|
||||||
keyErrs := make(chan keyErr)
|
|
||||||
controller.TestWrap(t, creatingController, func(syncer controller.Syncer) controller.Syncer {
|
|
||||||
return controller.SyncFunc(func(ctx controller.Context) error {
|
|
||||||
err := syncer.Sync(ctx)
|
|
||||||
|
|
||||||
keyErrs <- keyErr{
|
|
||||||
key: ctx.Key,
|
|
||||||
err: err,
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
// a different approach would be to use TestSync and run each iteration manually:
|
|
||||||
//
|
|
||||||
// err := controller.TestSync(t, c, ...)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
kubeInformers.Start(ctx.Done())
|
|
||||||
go creatingController.Run(ctx, 5) // TODO maybe only use one worker?
|
|
||||||
|
|
||||||
var actualKeyErrs []keyErr
|
|
||||||
done:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case key := <-keyErrs:
|
|
||||||
actualKeyErrs = append(actualKeyErrs, key)
|
|
||||||
|
|
||||||
case <-time.After(3 * time.Second):
|
|
||||||
// this assumes that calls to Sync are never more than three seconds apart
|
|
||||||
// we have five workers so there is little chance they all hang around doing nothing for that long
|
|
||||||
break done
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Figure out how to capture actions from informers
|
|
||||||
// TODO: I think we need some more fancy order independent equal comparison here
|
|
||||||
|
|
||||||
require.Equal(t, tt.wantKeyErrs, actualKeyErrs)
|
|
||||||
|
|
||||||
// ignore the discovery call from the event recorder and the list/watch from both informers (first five events)
|
|
||||||
require.Equal(t, tt.wantActions, kubeClient.Actions()[5:])
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,149 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"reflect"
|
|
||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
||||||
corev1informers "k8s.io/client-go/informers/core/v1"
|
|
||||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
||||||
"k8s.io/client-go/tools/events"
|
|
||||||
|
|
||||||
"github.com/suzerain-io/placeholder-name/internal/controller"
|
|
||||||
"github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/api"
|
|
||||||
)
|
|
||||||
|
|
||||||
func NewExampleUpdatingController(
|
|
||||||
services corev1informers.ServiceInformer,
|
|
||||||
secrets corev1informers.SecretInformer,
|
|
||||||
secretClient corev1client.SecretsGetter,
|
|
||||||
recorder events.EventRecorder,
|
|
||||||
secretData string,
|
|
||||||
) controller.Controller {
|
|
||||||
serviceLister := services.Lister()
|
|
||||||
secretLister := secrets.Lister()
|
|
||||||
|
|
||||||
// note that these functions do not need to be inlined
|
|
||||||
// this just demonstrates that for simple Syncer implementations, everything can be in one place
|
|
||||||
|
|
||||||
toServiceName := func(secret *corev1.Secret) (string, bool) {
|
|
||||||
serviceName := secret.Annotations[api.ServiceNameAnnotation]
|
|
||||||
return serviceName, len(serviceName) != 0
|
|
||||||
}
|
|
||||||
|
|
||||||
ensureSecretData := func(service *corev1.Service, secretCopy *corev1.Secret) bool {
|
|
||||||
var needsUpdate bool
|
|
||||||
|
|
||||||
expectedData := map[string][]byte{
|
|
||||||
api.SecretDataKey: []byte(secretData),
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(secretCopy.Data, expectedData) {
|
|
||||||
secretCopy.Data = expectedData
|
|
||||||
needsUpdate = true
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedOwnerReferences := []metav1.OwnerReference{
|
|
||||||
{
|
|
||||||
APIVersion: "v1",
|
|
||||||
Kind: "Service",
|
|
||||||
Name: service.Name,
|
|
||||||
UID: service.UID,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(secretCopy.OwnerReferences, expectedOwnerReferences) {
|
|
||||||
secretCopy.OwnerReferences = expectedOwnerReferences
|
|
||||||
needsUpdate = true
|
|
||||||
}
|
|
||||||
|
|
||||||
return needsUpdate
|
|
||||||
}
|
|
||||||
|
|
||||||
isSecretValidForService := func(service *corev1.Service, secret *corev1.Secret) bool {
|
|
||||||
if service.Annotations[api.SecretNameAnnotation] != secret.Name {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if secret.Annotations[api.ServiceUIDAnnotation] != string(service.UID) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
getServiceForSecret := func(secret *corev1.Secret) (*corev1.Service, error) {
|
|
||||||
serviceName, ok := toServiceName(secret)
|
|
||||||
if !ok {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
service, err := serviceLister.Services(secret.Namespace).Get(serviceName)
|
|
||||||
if apierrors.IsNotFound(err) {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("unable to get service %s/%s: %w", secret.Namespace, serviceName, err)
|
|
||||||
}
|
|
||||||
return service, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
syncer := controller.SyncFunc(func(ctx controller.Context) error {
|
|
||||||
secret, err := secretLister.Secrets(ctx.Key.Namespace).Get(ctx.Key.Name)
|
|
||||||
if apierrors.IsNotFound(err) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("unable to get the secret %s/%s: %w", secret.Namespace, secret.Name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
service, err := getServiceForSecret(secret)
|
|
||||||
if err != nil || service == nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !isSecretValidForService(service, secret) {
|
|
||||||
//nolint: goerr113
|
|
||||||
utilruntime.HandleError(fmt.Errorf("secret %s/%s does not have corresponding service UID %v", secret.Namespace, secret.Name, service.UID))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// make a copy to avoid mutating cache state
|
|
||||||
secretCopy := secret.DeepCopy()
|
|
||||||
|
|
||||||
if needsUpdate := ensureSecretData(service, secretCopy); needsUpdate {
|
|
||||||
_, updateErr := secretClient.Secrets(secretCopy.Namespace).Update(context.TODO(), secretCopy, metav1.UpdateOptions{})
|
|
||||||
return updateErr
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
config := controller.Config{
|
|
||||||
Name: "example-controller-updating",
|
|
||||||
Syncer: syncer,
|
|
||||||
}
|
|
||||||
|
|
||||||
addSecret := func(obj metav1.Object) bool {
|
|
||||||
secret := obj.(*corev1.Secret)
|
|
||||||
_, ok := toServiceName(secret)
|
|
||||||
return ok
|
|
||||||
}
|
|
||||||
|
|
||||||
return controller.New(config,
|
|
||||||
controller.WithInformer(services, controller.FilterFuncs{}, controller.InformerOption{SkipEvents: true}),
|
|
||||||
|
|
||||||
controller.WithInformer(secrets, controller.FilterFuncs{
|
|
||||||
AddFunc: addSecret,
|
|
||||||
UpdateFunc: func(oldObj, newObj metav1.Object) bool {
|
|
||||||
return addSecret(newObj) || addSecret(oldObj)
|
|
||||||
},
|
|
||||||
}, controller.InformerOption{}),
|
|
||||||
|
|
||||||
controller.WithRecorder(recorder), // TODO actually use the recorder
|
|
||||||
)
|
|
||||||
}
|
|
@ -1,56 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package starter
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/client-go/informers"
|
|
||||||
"k8s.io/client-go/kubernetes"
|
|
||||||
"k8s.io/client-go/rest"
|
|
||||||
"k8s.io/client-go/tools/events"
|
|
||||||
|
|
||||||
"github.com/suzerain-io/placeholder-name/internal/controller"
|
|
||||||
examplecontroller "github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/controller"
|
|
||||||
)
|
|
||||||
|
|
||||||
func StartExampleController(ctx context.Context, config *rest.Config, secretData string) error {
|
|
||||||
kubeClient, err := kubernetes.NewForConfig(config)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to build client: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
kubeInformers := informers.NewSharedInformerFactory(kubeClient, 20*time.Minute)
|
|
||||||
|
|
||||||
recorder := events.NewEventBroadcasterAdapter(kubeClient).NewRecorder("example-controller")
|
|
||||||
|
|
||||||
manager := controller.NewManager().
|
|
||||||
WithController(
|
|
||||||
examplecontroller.NewExampleCreatingController(
|
|
||||||
kubeInformers.Core().V1().Services(),
|
|
||||||
kubeInformers.Core().V1().Secrets(),
|
|
||||||
kubeClient.CoreV1(),
|
|
||||||
recorder,
|
|
||||||
secretData,
|
|
||||||
), 5,
|
|
||||||
).
|
|
||||||
WithController(
|
|
||||||
examplecontroller.NewExampleUpdatingController(
|
|
||||||
kubeInformers.Core().V1().Services(),
|
|
||||||
kubeInformers.Core().V1().Secrets(),
|
|
||||||
kubeClient.CoreV1(),
|
|
||||||
recorder,
|
|
||||||
secretData,
|
|
||||||
), 5,
|
|
||||||
)
|
|
||||||
|
|
||||||
kubeInformers.Start(ctx.Done())
|
|
||||||
go manager.Start(ctx)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -1,161 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2020 VMware, Inc.
|
|
||||||
SPDX-License-Identifier: Apache-2.0
|
|
||||||
*/
|
|
||||||
|
|
||||||
package integration
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
|
||||||
|
|
||||||
"github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/api"
|
|
||||||
examplestart "github.com/suzerain-io/placeholder-name/test/integration/examplecontroller/starter"
|
|
||||||
"github.com/suzerain-io/placeholder-name/test/library"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestExampleController(t *testing.T) {
|
|
||||||
config := library.NewClientConfig(t)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
secretData := "super-secret-data-1"
|
|
||||||
|
|
||||||
err := examplestart.StartExampleController(ctx, config, secretData)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
client := library.NewClientset(t)
|
|
||||||
|
|
||||||
namespaces := client.CoreV1().Namespaces()
|
|
||||||
|
|
||||||
namespace, err := namespaces.Create(ctx, &corev1.Namespace{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
GenerateName: "example-controller-test-",
|
|
||||||
},
|
|
||||||
}, metav1.CreateOptions{})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
deleteErr := namespaces.Delete(context.Background(), namespace.Name, metav1.DeleteOptions{})
|
|
||||||
require.NoError(t, deleteErr)
|
|
||||||
}()
|
|
||||||
|
|
||||||
services := client.CoreV1().Services(namespace.Name)
|
|
||||||
secrets := client.CoreV1().Secrets(namespace.Name)
|
|
||||||
|
|
||||||
secretsWatch, err := secrets.Watch(context.Background(), metav1.ListOptions{})
|
|
||||||
require.NoError(t, err)
|
|
||||||
defer secretsWatch.Stop()
|
|
||||||
|
|
||||||
service := &corev1.Service{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: "example-service-test",
|
|
||||||
Annotations: map[string]string{
|
|
||||||
api.SecretNameAnnotation: "example-secret-name",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Spec: corev1.ServiceSpec{
|
|
||||||
Ports: []corev1.ServicePort{
|
|
||||||
{
|
|
||||||
Port: 443,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
_, err = services.Create(ctx, service, metav1.CreateOptions{})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
timeout := time.After(10 * time.Second)
|
|
||||||
done:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event := <-secretsWatch.ResultChan():
|
|
||||||
if event.Type != watch.Added {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
secret, ok := event.Object.(*corev1.Secret)
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if secret.Name != service.Annotations[api.SecretNameAnnotation] {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedData := map[string][]byte{
|
|
||||||
api.SecretDataKey: []byte(secretData),
|
|
||||||
}
|
|
||||||
require.Equal(t, expectedData, secret.Data, "expected to see new secret data: %s", library.Sdump(secret))
|
|
||||||
break done // immediately stop consuming events because we want to check for updated events below
|
|
||||||
|
|
||||||
case <-timeout:
|
|
||||||
t.Fatal("timed out waiting to see new secret")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// shutdown the controllers so we can change the secret data
|
|
||||||
cancel()
|
|
||||||
time.Sleep(5 * time.Second) // wait a bit for the controllers to shut down
|
|
||||||
|
|
||||||
ctx, cancel = context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
secretData2 := "super-secret-data-2"
|
|
||||||
|
|
||||||
err = examplestart.StartExampleController(ctx, config, secretData2)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
timeout = time.After(10 * time.Second)
|
|
||||||
done2:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event := <-secretsWatch.ResultChan():
|
|
||||||
if event.Type != watch.Modified {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
secret, ok := event.Object.(*corev1.Secret)
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if secret.Name != service.Annotations[api.SecretNameAnnotation] {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedData := map[string][]byte{
|
|
||||||
api.SecretDataKey: []byte(secretData2),
|
|
||||||
}
|
|
||||||
require.Equal(t, expectedData, secret.Data, "expected to see updated secret data: %s", library.Sdump(secret))
|
|
||||||
break done2 // immediately stop consuming events because we want to check for hot loops below
|
|
||||||
|
|
||||||
case <-timeout:
|
|
||||||
t.Fatal("timed out waiting to see updated secret")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
timeout = time.After(5 * time.Second)
|
|
||||||
done3:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event := <-secretsWatch.ResultChan():
|
|
||||||
secret, ok := event.Object.(*corev1.Secret)
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if secret.Name != service.Annotations[api.SecretNameAnnotation] {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// this assumes that no other actor in the system is trying to mutate this secret
|
|
||||||
t.Errorf("unexpected event seen for secret: %s", library.Sdump(event))
|
|
||||||
|
|
||||||
case <-timeout:
|
|
||||||
break done3 // we saw no events matching our secret meaning that we are not hot looping
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user