/*
Copyright 2020 VMware, Inc.
SPDX-License-Identifier: Apache-2.0
*/
|
||
|
|
||
|
package controller
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||
|
"k8s.io/client-go/tools/cache"
|
||
|
"k8s.io/client-go/tools/events"
|
||
|
"k8s.io/client-go/util/workqueue"
|
||
|
"k8s.io/klog/v2"
|
||
|
)
|
||
|
|
||
|
// Controller interface represents a runnable Kubernetes controller.
|
||
|
// Cancelling the context passed will cause the controller to shutdown.
|
||
|
// Number of workers determine how much parallel the job processing should be.
|
||
|
type Controller interface {
|
||
|
// Run runs the controller and blocks until the controller is finished.
|
||
|
// Number of workers can be specified via workers parameter.
|
||
|
// This function will return when all internal loops are finished.
|
||
|
// Note that having more than one worker usually means handing parallelization of Sync().
|
||
|
Run(ctx context.Context, workers int)
|
||
|
|
||
|
// Name returns the controller name string.
|
||
|
Name() string
|
||
|
|
||
|
// The methods below should only be called during tests via the Test* functions.
|
||
|
|
||
|
// sync contains the main controller logic.
|
||
|
// This can be used in unit tests to exercise the Syncer by directly calling it.
|
||
|
sync(ctx Context) error
|
||
|
|
||
|
// wrap wraps the main controller logic provided via the Syncer.
|
||
|
// This can be used in tests to synchronize asynchronous events as seen by a running controller.
|
||
|
// The wrapping must be done after New is called and before Run is called.
|
||
|
wrap(wrapper SyncWrapperFunc)
|
||
|
}
|
||
|
|
||
|
// Compile-time check that *controller satisfies the Controller interface.
var _ Controller = (*controller)(nil)
|
||
|
|
||
|
type Config struct {
|
||
|
Name string
|
||
|
Syncer Syncer
|
||
|
}
|
||
|
|
||
|
func New(config Config, opts ...Option) Controller {
|
||
|
c := &controller{
|
||
|
config: config,
|
||
|
}
|
||
|
|
||
|
// set up defaults
|
||
|
WithRateLimiter(workqueue.DefaultControllerRateLimiter())(c)
|
||
|
WithRecorder(klogRecorder{})(c)
|
||
|
|
||
|
for _, opt := range opts {
|
||
|
opt(c)
|
||
|
}
|
||
|
|
||
|
return c
|
||
|
}
|
||
|
|
||
|
type controller struct {
|
||
|
config Config
|
||
|
|
||
|
queue workqueue.RateLimitingInterface
|
||
|
queueWrapper Queue
|
||
|
maxRetries int
|
||
|
recorder events.EventRecorder
|
||
|
|
||
|
run bool
|
||
|
runOpts []Option
|
||
|
|
||
|
cacheSyncs []cache.InformerSynced
|
||
|
}
|
||
|
|
||
|
func (c *controller) Run(ctx context.Context, workers int) {
|
||
|
defer utilruntime.HandleCrash(crash) // prevent panics from killing the process
|
||
|
|
||
|
klog.InfoS("starting controller", "controller", c.Name(), "workers", workers)
|
||
|
|
||
|
c.run = true
|
||
|
for _, opt := range c.runOpts {
|
||
|
opt(c)
|
||
|
}
|
||
|
|
||
|
if !c.waitForCacheSyncWithTimeout() {
|
||
|
panic(die(fmt.Sprintf("%s: timed out waiting for caches to sync", c.Name())))
|
||
|
}
|
||
|
|
||
|
var workerWg sync.WaitGroup
|
||
|
|
||
|
// workerContext is used to track and initiate worker shutdown
|
||
|
workerContext, workerContextCancel := context.WithCancel(context.Background())
|
||
|
|
||
|
defer func() {
|
||
|
c.queue.ShutDown() // shutdown the controller queue first
|
||
|
workerContextCancel() // cancel the worker context, which tell workers to initiate shutdown
|
||
|
|
||
|
// Wait for all workers to finish their job.
|
||
|
// at this point the Run() can hang and callers have to implement the logic that will kill
|
||
|
// this controller (SIGKILL).
|
||
|
workerWg.Wait()
|
||
|
klog.InfoS("all workers have been terminated, shutting down", "controller", c.Name(), "workers", workers)
|
||
|
}()
|
||
|
|
||
|
for i := 1; i <= workers; i++ {
|
||
|
idx := i
|
||
|
klog.InfoS("starting worker", "controller", c.Name(), "worker", idx)
|
||
|
workerWg.Add(1)
|
||
|
go func() {
|
||
|
defer utilruntime.HandleCrash(crash) // prevent panics from killing the process
|
||
|
defer func() {
|
||
|
klog.InfoS("shutting down worker", "controller", c.Name(), "worker", idx)
|
||
|
workerWg.Done()
|
||
|
}()
|
||
|
c.runWorker(workerContext)
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
<-ctx.Done() // wait for controller context to be cancelled
|
||
|
}
|
||
|
|
||
|
func (c *controller) Name() string {
|
||
|
return c.config.Name
|
||
|
}
|
||
|
|
||
|
func (c *controller) sync(ctx Context) error {
|
||
|
return c.config.Syncer.Sync(ctx)
|
||
|
}
|
||
|
|
||
|
func (c *controller) wrap(wrapper SyncWrapperFunc) {
|
||
|
c.runOpts = append(c.runOpts, toRunOpt(func(c *controller) {
|
||
|
c.config.Syncer = wrapper(c.config.Syncer)
|
||
|
}))
|
||
|
}
|
||
|
|
||
|
func (c *controller) waitForCacheSyncWithTimeout() bool {
|
||
|
// prevent us from blocking forever due to a broken informer
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||
|
defer cancel()
|
||
|
|
||
|
return cache.WaitForCacheSync(ctx.Done(), c.cacheSyncs...)
|
||
|
}
|
||
|
|
||
|
func (c *controller) add(filter Filter, object metav1.Object) {
|
||
|
key := filter.Parent(object)
|
||
|
c.queueWrapper.Add(key)
|
||
|
}
|
||
|
|
||
|
// runWorker runs a single worker
|
||
|
// The worker is asked to terminate when the passed context is cancelled.
|
||
|
func (c *controller) runWorker(ctx context.Context) {
|
||
|
for {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
default:
|
||
|
c.processNextWorkItem(ctx)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *controller) processNextWorkItem(ctx context.Context) {
|
||
|
queueKey, quit := c.queue.Get()
|
||
|
if quit {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
key := queueKey.(Key)
|
||
|
defer c.queue.Done(key)
|
||
|
|
||
|
syncCtx := Context{
|
||
|
Context: ctx,
|
||
|
Name: c.Name(),
|
||
|
Key: key,
|
||
|
Queue: c.queueWrapper,
|
||
|
Recorder: c.recorder,
|
||
|
}
|
||
|
|
||
|
err := c.sync(syncCtx)
|
||
|
c.handleKey(key, err)
|
||
|
}
|
||
|
|
||
|
func (c *controller) handleKey(key Key, err error) {
|
||
|
if err == nil {
|
||
|
c.queue.Forget(key)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
retryForever := c.maxRetries <= 0
|
||
|
shouldRetry := retryForever || c.queue.NumRequeues(key) < c.maxRetries
|
||
|
|
||
|
if !shouldRetry {
|
||
|
utilruntime.HandleError(fmt.Errorf("%s: dropping key %v out of the queue: %w", c.Name(), key, err))
|
||
|
c.queue.Forget(key)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if errors.Is(err, ErrSyntheticRequeue) {
|
||
|
// logging this helps detecting wedged controllers with missing pre-requirements
|
||
|
klog.V(4).InfoS("requested synthetic requeue", "controller", c.Name(), "key", key)
|
||
|
} else {
|
||
|
utilruntime.HandleError(fmt.Errorf("%s: %v failed with: %w", c.Name(), key, err))
|
||
|
}
|
||
|
|
||
|
c.queue.AddRateLimited(key)
|
||
|
}
|