ca6c29e463
Before this fix, the deadlock prevented the leader pod from giving up its lease, so it took several minutes before new pods were allowed to elect a new leader. During that window, no Pinniped controllers could write to the Kube API, so important resources were not being updated. The deadlock also made pod shutdown take about 1 minute.

After this fix, the leader gives up its lease immediately, and pod shutdown takes about 1 second. This improves restart/upgrade time and also fixes the problem where there was no leader for several minutes after a restart/upgrade.

The deadlock was between the post-start hook and the pre-shutdown hook. The pre-shutdown hook blocked until a certain background goroutine in the post-start hook finished, but that goroutine could not finish until the pre-shutdown hook finished. Thus, both were blocked, waiting on each other forever, until the process was eventually killed externally. The deadlock was most likely introduced by a change in Kube's generic API server package related to how the many complex channels used during server shutdown interact with each other, and was not noticed when we upgraded to the version which introduced that change.
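The shape of that deadlock can be illustrated with a minimal sketch. This is hypothetical code for illustration only, not the actual hook wiring in Pinniped or in the Kube generic API server; the channel names are invented.

// Minimal sketch of the deadlock shape described above. The channel names are
// invented and do not correspond to real Pinniped or Kubernetes identifiers.
package main

func main() {
	postStartDone := make(chan struct{})   // closed when the post-start goroutine exits
	preShutdownDone := make(chan struct{}) // closed when the pre-shutdown hook returns

	// The post-start hook spawns a background goroutine that will not exit
	// until the pre-shutdown hook has finished.
	go func() {
		defer close(postStartDone)
		<-preShutdownDone
	}()

	// The pre-shutdown hook waits for that background goroutine to exit
	// before it returns, so neither side can ever make progress.
	<-postStartDone
	close(preShutdownDone)

	// Never reached. (In this tiny program the Go runtime detects that all
	// goroutines are blocked and panics; in a real server with many other
	// goroutines, the process simply hangs until it is killed externally.)
}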
227 lines
6.1 KiB
Go
// Copyright 2020-2023 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package controllerlib

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/events"
	"k8s.io/client-go/util/workqueue"

	"go.pinniped.dev/internal/plog"
)
// Controller interface represents a runnable Kubernetes controller.
// Cancelling the passed context will cause the controller to shut down.
// The number of workers determines how parallel the job processing should be.
type Controller interface {
	// Run runs the controller and blocks until the controller is finished.
	// The number of workers can be specified via the workers parameter.
	// This function will return when all internal loops are finished.
	// Note that having more than one worker usually means handling parallel invocations of Sync().
	Run(ctx context.Context, workers int)

	// Name returns the controller name string.
	Name() string

	// The methods below should only be called during tests via the Test* functions.

	// sync contains the main controller logic.
	// This can be used in unit tests to exercise the Syncer by directly calling it.
	sync(ctx Context) error

	// wrap wraps the main controller logic provided via the Syncer.
	// This can be used in tests to synchronize asynchronous events as seen by a running controller.
	// The wrapping must be done after New is called and before Run is called.
	wrap(wrapper SyncWrapperFunc)

	// These are called by the Run() method but also need to be called by Test* functions sometimes.
	waitForCacheSyncWithTimeout() bool
	invokeAllRunOpts()
}

var _ Controller = &controller{}
type Config struct {
	Name   string
	Syncer Syncer
}

func New(config Config, opts ...Option) Controller {
	c := &controller{
		config: config,
	}

	// set up defaults
	WithRateLimiter(workqueue.DefaultControllerRateLimiter())(c)
	WithRecorder(klogRecorder{})(c)

	for _, opt := range opts {
		opt(c)
	}

	return c
}

type controller struct {
	config Config

	queue        workqueue.RateLimitingInterface
	queueWrapper Queue
	maxRetries   int
	recorder     events.EventRecorder

	run     bool
	runOpts []Option

	cacheSyncs []cache.InformerSynced
}
func (c *controller) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash(crash) // prevent panics from killing the process

	plog.Debug("starting controller", "controller", c.Name(), "workers", workers)

	c.invokeAllRunOpts()

	if !c.waitForCacheSyncWithTimeout() {
		panic(die(fmt.Sprintf("%s: timed out waiting for caches to sync", c.Name())))
	}

	var workerWg sync.WaitGroup

	// workerContext is used to track and initiate worker shutdown
	workerContext, workerContextCancel := context.WithCancel(context.Background())

	defer func() {
		plog.Debug("starting to shut down controller workers", "controller", c.Name(), "workers", workers)
		c.queue.ShutDown()    // shut down the controller queue first
		workerContextCancel() // cancel the worker context, which tells workers to initiate shutdown

		// Wait for all workers to finish their job.
		// At this point Run() can hang, and callers have to implement the logic that will kill
		// this controller (SIGKILL).
		workerWg.Wait()
		plog.Debug("all workers have been terminated, shutting down", "controller", c.Name(), "workers", workers)
	}()

	for i := 1; i <= workers; i++ {
		idx := i
		plog.Debug("starting worker", "controller", c.Name(), "worker", idx)
		workerWg.Add(1)
		go func() {
			defer utilruntime.HandleCrash(crash) // prevent panics from killing the process
			defer func() {
				plog.Debug("shutting down worker", "controller", c.Name(), "worker", idx)
				workerWg.Done()
			}()
			c.runWorker(workerContext)
		}()
	}

	plog.Debug("controller started", "controller", c.Name(), "workers", workers)
	<-ctx.Done() // wait for controller context to be cancelled
	plog.Debug("controller context cancelled, next will terminate workers", "controller", c.Name(), "workers", workers)
}
func (c *controller) invokeAllRunOpts() {
	c.run = true
	for _, opt := range c.runOpts {
		opt(c)
	}
}

func (c *controller) Name() string {
	return c.config.Name
}

func (c *controller) sync(ctx Context) error {
	return c.config.Syncer.Sync(ctx)
}

func (c *controller) wrap(wrapper SyncWrapperFunc) {
	c.runOpts = append(c.runOpts, toRunOpt(func(c *controller) {
		c.config.Syncer = wrapper(c.config.Syncer)
	}))
}

func (c *controller) waitForCacheSyncWithTimeout() bool {
	// prevent us from blocking forever due to a broken informer
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	return cache.WaitForCacheSync(ctx.Done(), c.cacheSyncs...)
}

func (c *controller) add(filter Filter, object metav1.Object) {
	key := filter.Parent(object)
	c.queueWrapper.Add(key)
}
// runWorker runs a single worker
// The worker is asked to terminate when the passed context is cancelled.
func (c *controller) runWorker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			c.processNextWorkItem(ctx)
		}
	}
}

func (c *controller) processNextWorkItem(ctx context.Context) {
	queueKey, quit := c.queue.Get()
	if quit {
		return
	}

	key := queueKey.(Key)
	defer c.queue.Done(key)

	syncCtx := Context{
		Context:  ctx,
		Name:     c.Name(),
		Key:      key,
		Queue:    c.queueWrapper,
		Recorder: c.recorder,
	}

	err := c.sync(syncCtx)
	c.handleKey(key, err)
}
func (c *controller) handleKey(key Key, err error) {
	if err == nil {
		c.queue.Forget(key)
		return
	}

	retryForever := c.maxRetries <= 0
	shouldRetry := retryForever || c.queue.NumRequeues(key) < c.maxRetries

	if !shouldRetry {
		utilruntime.HandleError(fmt.Errorf("%s: dropping key %v out of the queue: %w", c.Name(), key, err))
		c.queue.Forget(key)
		return
	}

	if errors.Is(err, ErrSyntheticRequeue) {
		// logging this helps detect wedged controllers with missing prerequisites
		plog.Debug("requested synthetic requeue", "controller", c.Name(), "key", key)
	} else {
		utilruntime.HandleError(fmt.Errorf("%s: %v failed with: %w", c.Name(), key, err))
	}

	c.queue.AddRateLimited(key)
}
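For orientation, here is a hypothetical usage sketch showing how a Syncer might be wired into a controller built by New and driven by Run. The mySyncer type and the signal handling are invented for illustration; the sketch assumes Syncer's only required method is Sync(Context) error (the call made by the sync method above), and that this internal package is importable from the caller's location. In practice, keys are enqueued via informer-driven Options that are not defined in this file.

// Hypothetical usage sketch, not part of the file above. Assumes Syncer's only
// required method is Sync(Context) error and that go.pinniped.dev/internal/controllerlib
// is importable from here (it lives under internal/, so this only works inside that module).
package main

import (
	"context"
	"os/signal"
	"syscall"

	"go.pinniped.dev/internal/controllerlib"
)

type mySyncer struct{}

// Sync is called once per queued Key. Returning nil forgets the key; returning
// an error (including ErrSyntheticRequeue) causes a rate-limited requeue, as
// implemented by handleKey above.
func (s *mySyncer) Sync(ctx controllerlib.Context) error {
	// ... reconcile whatever object ctx.Key refers to ...
	return nil
}

func main() {
	c := controllerlib.New(controllerlib.Config{
		Name:   "my-controller",
		Syncer: &mySyncer{},
	})

	// Run blocks until ctx is cancelled, then shuts down its workers.
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	c.Run(ctx, 2) // two workers means Sync may be invoked in parallel
}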