ContainerImage.Pinniped/internal/controllerlib/controller.go
Ryan Richard ca6c29e463 Fix deadlock during shutdown which prevented leader election cleanup
Before this fix, the deadlock would prevent the leader pod from giving
up its lease, which would make it take several minutes for new pods to
be allowed to elect a new leader. During that window, no Pinniped
controllers could write to the Kube API, so important resources were
not being updated. It would also make pod shutdown take about
1 minute.

After this fix, the leader gives up its lease immediately, and pod
shutdown takes about 1 second. This improves restart/upgrade time and
also fixes the problem where there was no leader for several minutes
after a restart/upgrade.

The deadlock was between the post-start hook and the pre-shutdown hook.
The pre-shutdown hook blocked until a certain background goroutine in
the post-start hook finished, but that goroutine could not finish until
the pre-shutdown hook finished. Thus, they were both blocked, waiting
for each other indefinitely. Eventually the process would be externally
killed.

This deadlock was most likely introduced by a change in Kube's generic
API server package related to how the many channels used during server
shutdown interact with each other, and it was not noticed when we
upgraded to the version that introduced the change.
2023-09-20 16:54:24 -07:00
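
To make the circular wait concrete, here is a minimal, hypothetical sketch of the pattern described in the commit message. It is not the actual hook code from Kube's generic API server or from Pinniped; the channel names and timing are invented, and the two goroutines stand in for the pre-shutdown hook and the background goroutine started by the post-start hook.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical stand-ins for the real shutdown signals inside Kube's
	// generic API server; the names are invented for this sketch.
	preShutdownHookDone := make(chan struct{}) // closed when the pre-shutdown hook returns
	backgroundWorkDone := make(chan struct{})  // closed when the post-start hook's goroutine returns

	// Background goroutine started by a post-start hook: it cannot finish
	// until the pre-shutdown hook has finished.
	go func() {
		<-preShutdownHookDone
		close(backgroundWorkDone)
	}()

	// Pre-shutdown hook: it blocks until the background goroutine finishes,
	// but that goroutine is waiting on this hook, so neither can proceed.
	go func() {
		<-backgroundWorkDone
		close(preShutdownHookDone)
	}()

	// Neither channel will ever be closed: each goroutine is waiting on the
	// other. In the real server this meant the process eventually had to be
	// killed externally; here we just wait briefly and then give up.
	select {
	case <-backgroundWorkDone:
		fmt.Println("shutdown completed cleanly (never happens)")
	case <-time.After(2 * time.Second):
		fmt.Println("still deadlocked after 2 seconds")
	}
}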

227 lines
6.1 KiB
Go

// Copyright 2020-2023 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package controllerlib

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/events"
	"k8s.io/client-go/util/workqueue"

	"go.pinniped.dev/internal/plog"
)

// Controller interface represents a runnable Kubernetes controller.
// Cancelling the passed context will cause the controller to shut down.
// The number of workers determines how parallel the job processing should be.
type Controller interface {
	// Run runs the controller and blocks until the controller is finished.
	// The number of workers can be specified via the workers parameter.
	// This function will return when all internal loops are finished.
	// Note that having more than one worker usually means handling parallelization of Sync().
	Run(ctx context.Context, workers int)

	// Name returns the controller name string.
	Name() string

	// The methods below should only be called during tests via the Test* functions.

	// sync contains the main controller logic.
	// This can be used in unit tests to exercise the Syncer by directly calling it.
	sync(ctx Context) error

	// wrap wraps the main controller logic provided via the Syncer.
	// This can be used in tests to synchronize asynchronous events as seen by a running controller.
	// The wrapping must be done after New is called and before Run is called.
	wrap(wrapper SyncWrapperFunc)

	// These are called by the Run() method but sometimes also need to be called by Test* functions.
	waitForCacheSyncWithTimeout() bool
	invokeAllRunOpts()
}

var _ Controller = &controller{}

type Config struct {
	Name   string
	Syncer Syncer
}

func New(config Config, opts ...Option) Controller {
	c := &controller{
		config: config,
	}

	// set up defaults
	WithRateLimiter(workqueue.DefaultControllerRateLimiter())(c)
	WithRecorder(klogRecorder{})(c)

	for _, opt := range opts {
		opt(c)
	}

	return c
}
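
// Example (illustrative sketch, not part of this file's original API docs):
// a caller might construct and run a controller roughly like this, assuming
// some Syncer implementation named mySyncer and whatever Options this package
// provides for wiring informers:
//
//	c := controllerlib.New(
//		controllerlib.Config{Name: "my-controller", Syncer: mySyncer},
//	)
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	c.Run(ctx, 2) // blocks until ctx is cancelled and all workers have drained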

type controller struct {
	config       Config
	queue        workqueue.RateLimitingInterface
	queueWrapper Queue
	maxRetries   int
	recorder     events.EventRecorder
	run          bool
	runOpts      []Option
	cacheSyncs   []cache.InformerSynced
}

func (c *controller) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash(crash) // prevent panics from killing the process

	plog.Debug("starting controller", "controller", c.Name(), "workers", workers)

	c.invokeAllRunOpts()

	if !c.waitForCacheSyncWithTimeout() {
		panic(die(fmt.Sprintf("%s: timed out waiting for caches to sync", c.Name())))
	}

	var workerWg sync.WaitGroup

	// workerContext is used to track and initiate worker shutdown
	workerContext, workerContextCancel := context.WithCancel(context.Background())

	defer func() {
		plog.Debug("starting to shut down controller workers", "controller", c.Name(), "workers", workers)
		c.queue.ShutDown()    // shut down the controller queue first
		workerContextCancel() // cancel the worker context, which tells workers to initiate shutdown

		// Wait for all workers to finish their jobs.
		// At this point Run() can hang, and callers have to implement the logic that will kill
		// this controller (SIGKILL).
		workerWg.Wait()
		plog.Debug("all workers have been terminated, shutting down", "controller", c.Name(), "workers", workers)
	}()

	for i := 1; i <= workers; i++ {
		idx := i
		plog.Debug("starting worker", "controller", c.Name(), "worker", idx)
		workerWg.Add(1)
		go func() {
			defer utilruntime.HandleCrash(crash) // prevent panics from killing the process
			defer func() {
				plog.Debug("shutting down worker", "controller", c.Name(), "worker", idx)
				workerWg.Done()
			}()
			c.runWorker(workerContext)
		}()
	}

	plog.Debug("controller started", "controller", c.Name(), "workers", workers)

	<-ctx.Done() // wait for controller context to be cancelled

	plog.Debug("controller context cancelled, next will terminate workers", "controller", c.Name(), "workers", workers)
}

func (c *controller) invokeAllRunOpts() {
	c.run = true
	for _, opt := range c.runOpts {
		opt(c)
	}
}

func (c *controller) Name() string {
	return c.config.Name
}

func (c *controller) sync(ctx Context) error {
	return c.config.Syncer.Sync(ctx)
}

func (c *controller) wrap(wrapper SyncWrapperFunc) {
	c.runOpts = append(c.runOpts, toRunOpt(func(c *controller) {
		c.config.Syncer = wrapper(c.config.Syncer)
	}))
}

func (c *controller) waitForCacheSyncWithTimeout() bool {
	// prevent us from blocking forever due to a broken informer
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	return cache.WaitForCacheSync(ctx.Done(), c.cacheSyncs...)
}

func (c *controller) add(filter Filter, object metav1.Object) {
	key := filter.Parent(object)
	c.queueWrapper.Add(key)
}

// runWorker runs a single worker.
// The worker is asked to terminate when the passed context is cancelled.
func (c *controller) runWorker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			c.processNextWorkItem(ctx)
		}
	}
}

func (c *controller) processNextWorkItem(ctx context.Context) {
	queueKey, quit := c.queue.Get()
	if quit {
		return
	}

	key := queueKey.(Key)
	defer c.queue.Done(key)

	syncCtx := Context{
		Context:  ctx,
		Name:     c.Name(),
		Key:      key,
		Queue:    c.queueWrapper,
		Recorder: c.recorder,
	}

	err := c.sync(syncCtx)
	c.handleKey(key, err)
}

func (c *controller) handleKey(key Key, err error) {
	if err == nil {
		c.queue.Forget(key)
		return
	}

	retryForever := c.maxRetries <= 0
	shouldRetry := retryForever || c.queue.NumRequeues(key) < c.maxRetries

	if !shouldRetry {
		utilruntime.HandleError(fmt.Errorf("%s: dropping key %v out of the queue: %w", c.Name(), key, err))
		c.queue.Forget(key)
		return
	}

	if errors.Is(err, ErrSyntheticRequeue) {
		// logging this helps detect wedged controllers with missing prerequisites
		plog.Debug("requested synthetic requeue", "controller", c.Name(), "key", key)
	} else {
		utilruntime.HandleError(fmt.Errorf("%s: %v failed with: %w", c.Name(), key, err))
	}

	c.queue.AddRateLimited(key)
}
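
// Illustrative sketch (an assumption about typical usage, not code from this
// package): a Syncer can return ErrSyntheticRequeue from Sync to ask for the
// key to be re-queued with rate limiting, without the requeue being reported
// as an error by handleKey above. The names mySyncer and checkPrerequisites
// are hypothetical. For example:
//
//	func (s *mySyncer) Sync(ctx controllerlib.Context) error {
//		ready, err := s.checkPrerequisites(ctx)
//		if err != nil {
//			return err // handleKey logs this and re-queues (or drops after maxRetries)
//		}
//		if !ready {
//			return controllerlib.ErrSyntheticRequeue // quiet, rate-limited requeue
//		}
//		return nil // success: the key is forgotten by the rate limiter
//	}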