c6c2c525a6
Also fix some tests that were broken by bumping golang and dependencies in the previous commits. Note that in addition to changes made to satisfy the linter which do not impact the behavior of the code, this commit also adds ReadHeaderTimeout to all usages of http.Server to satisfy the linter (and because it seemed like a good suggestion).
248 lines
9.6 KiB
Go
248 lines
9.6 KiB
Go
// Copyright 2021-2022 the Pinniped contributors. All Rights Reserved.
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package leaderelection
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"go.uber.org/atomic"
|
|
appsv1 "k8s.io/api/apps/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/tools/leaderelection"
|
|
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
|
|
|
"go.pinniped.dev/internal/constable"
|
|
"go.pinniped.dev/internal/controllerinit"
|
|
"go.pinniped.dev/internal/downward"
|
|
"go.pinniped.dev/internal/kubeclient"
|
|
"go.pinniped.dev/internal/plog"
|
|
)
|
|
|
|
const ErrNotLeader constable.Error = "write attempt rejected as client is not leader"
|
|
|
|
// New returns a client that has a leader election middleware injected into it.
|
|
// This middleware will prevent all non-read requests to the Kubernetes API when
|
|
// the current process does not hold the leader election lock. Unlike normal
|
|
// leader election where the process blocks until it acquires the lock, this
|
|
// middleware approach lets the process run as normal for all read requests.
|
|
// Another difference is that if the process acquires the lock and then loses it
|
|
// (i.e. a failed renewal), it will not exit (i.e. restart). Instead, it will
|
|
// simply attempt to acquire the lock again.
|
|
//
|
|
// The returned function is blocking and will run the leader election polling
|
|
// logic and will coordinate lease release with the input controller starter function.
|
|
func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubeclient.Option) (
|
|
*kubeclient.Client,
|
|
controllerinit.RunnerWrapper,
|
|
error,
|
|
) {
|
|
internalClient, err := kubeclient.New(opts...)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("could not create internal client for leader election: %w", err)
|
|
}
|
|
|
|
isLeader := &isLeaderTracker{tracker: atomic.NewBool(false)}
|
|
|
|
identity := podInfo.Name
|
|
leaseName := deployment.Name
|
|
|
|
leaderElectionConfig := newLeaderElectionConfig(podInfo.Namespace, leaseName, identity, internalClient.Kubernetes, isLeader)
|
|
|
|
// validate our config here before we rely on it being functioning below
|
|
if _, err := leaderelection.NewLeaderElector(leaderElectionConfig); err != nil {
|
|
return nil, nil, fmt.Errorf("invalid config - could not create leader elector: %w", err)
|
|
}
|
|
|
|
writeOnlyWhenLeader := kubeclient.MiddlewareFunc(func(_ context.Context, rt kubeclient.RoundTrip) {
|
|
switch rt.Verb() {
|
|
case kubeclient.VerbGet, kubeclient.VerbList, kubeclient.VerbWatch:
|
|
// reads are always allowed.
|
|
// note that while our pods/exec into the kube cert agent pod is a write request from the
|
|
// perspective of the Kube API, it is semantically a read request since no mutation occurs.
|
|
// we simply use it to fill a cache, and we need all pods to have a functioning cache.
|
|
// however, we do not need to handle it here because remotecommand.NewSPDYExecutor uses a
|
|
// kubeclient.Client.JSONConfig as input. since our middleware logic is only injected into
|
|
// the generated clientset code, this JSONConfig simply ignores this middleware all together.
|
|
return
|
|
}
|
|
|
|
if isLeader.canWrite() { // only perform "expensive" test for writes
|
|
return // we are currently the leader, all actions are permitted
|
|
}
|
|
|
|
rt.MutateRequest(func(_ kubeclient.Object) error {
|
|
return ErrNotLeader // we are not the leader, fail the write request
|
|
})
|
|
})
|
|
|
|
leaderElectionOpts := append(
|
|
// all middleware are always executed so this being the first middleware is not relevant
|
|
[]kubeclient.Option{kubeclient.WithMiddleware(writeOnlyWhenLeader)},
|
|
opts..., // do not mutate input slice
|
|
)
|
|
|
|
client, err := kubeclient.New(leaderElectionOpts...)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("could not create leader election client: %w", err)
|
|
}
|
|
|
|
controllersWithLeaderElector := func(ctx context.Context, controllers controllerinit.Runner) {
|
|
plog.Debug("leader election loop start", "identity", identity)
|
|
defer plog.Debug("leader election loop shutdown", "identity", identity)
|
|
|
|
leaderElectorCtx, leaderElectorCancel := context.WithCancel(context.Background()) // purposefully detached context
|
|
|
|
go func() {
|
|
controllers(ctx) // run the controllers with the global context, this blocks until the context is canceled
|
|
|
|
if isLeader.stop() { // remove our in-memory leader status before we release the lock
|
|
plog.Debug("leader lost", "identity", identity, "reason", "controller stop")
|
|
}
|
|
leaderElectorCancel() // once the controllers have all stopped, tell the leader elector to release the lock
|
|
}()
|
|
|
|
for { // run (and rerun on release) the leader elector with its own context (blocking)
|
|
select {
|
|
case <-leaderElectorCtx.Done():
|
|
return // keep trying to run until process exit
|
|
|
|
default:
|
|
// blocks while trying to acquire lease, unblocks on release.
|
|
// note that this creates a new leader elector on each loop to
|
|
// prevent any bugs from reusing that struct across elections.
|
|
// our config was validated above so this should never die.
|
|
leaderelection.RunOrDie(leaderElectorCtx, leaderElectionConfig)
|
|
}
|
|
}
|
|
}
|
|
|
|
return client, controllersWithLeaderElector, nil
|
|
}
|
|
|
|
func newLeaderElectionConfig(namespace, leaseName, identity string, internalClient kubernetes.Interface, isLeader *isLeaderTracker) leaderelection.LeaderElectionConfig {
|
|
return leaderelection.LeaderElectionConfig{
|
|
Lock: &releaseLock{
|
|
delegate: &resourcelock.LeaseLock{
|
|
LeaseMeta: metav1.ObjectMeta{
|
|
Namespace: namespace,
|
|
Name: leaseName,
|
|
},
|
|
Client: internalClient.CoordinationV1(),
|
|
LockConfig: resourcelock.ResourceLockConfig{
|
|
Identity: identity,
|
|
},
|
|
},
|
|
isLeader: isLeader,
|
|
identity: identity,
|
|
},
|
|
ReleaseOnCancel: true, // semantics for correct release handled by releaseLock.Update and controllersWithLeaderElector below
|
|
|
|
// Copied from defaults used in OpenShift since we want the same semantics:
|
|
// https://github.com/openshift/library-go/blob/e14e06ba8d476429b10cc6f6c0fcfe6ea4f2c591/pkg/config/leaderelection/leaderelection.go#L87-L109
|
|
LeaseDuration: 137 * time.Second,
|
|
RenewDeadline: 107 * time.Second,
|
|
RetryPeriod: 26 * time.Second,
|
|
|
|
Callbacks: leaderelection.LeaderCallbacks{
|
|
OnStartedLeading: func(_ context.Context) {
|
|
plog.Debug("leader gained", "identity", identity)
|
|
isLeader.start()
|
|
},
|
|
OnStoppedLeading: func() {
|
|
if isLeader.stop() { // barring changes to client-go, this branch should only be taken on a panic
|
|
plog.Debug("leader lost", "identity", identity, "reason", "on stop")
|
|
}
|
|
},
|
|
OnNewLeader: func(newLeader string) {
|
|
if newLeader == identity {
|
|
return
|
|
}
|
|
plog.Debug("new leader elected", "newLeader", newLeader)
|
|
},
|
|
},
|
|
Name: leaseName,
|
|
// this must be set to nil because we do not want to associate /healthz with a failed
|
|
// leader election renewal as we do not want to exit the process if the leader changes.
|
|
WatchDog: nil,
|
|
}
|
|
}
|
|
|
|
type isLeaderTracker struct {
|
|
tracker *atomic.Bool
|
|
}
|
|
|
|
func (t *isLeaderTracker) canWrite() bool {
|
|
return t.tracker.Load()
|
|
}
|
|
|
|
func (t *isLeaderTracker) start() {
|
|
t.tracker.Store(true)
|
|
}
|
|
|
|
func (t *isLeaderTracker) stop() (didStop bool) {
|
|
return t.tracker.CompareAndSwap(true, false)
|
|
}
|
|
|
|
// note that resourcelock.Interface is an internal, unstable interface.
|
|
// so while it would be convenient to embed the implementation within
|
|
// this struct, we need to make sure our Update override is used and
|
|
// that no other methods are added that change the meaning of the
|
|
// interface. thus we must have ~20 lines of boilerplate to have the
|
|
// compiler ensure that we keep up with this interface over time.
|
|
var _ resourcelock.Interface = &releaseLock{}
|
|
|
|
// releaseLock works around a limitation of the client-go leader election code:
|
|
// there is no "BeforeRelease" callback. By the time the "OnStoppedLeading"
|
|
// callback runs (this callback is meant to always run at the very end since it
|
|
// normally terminates the process), we have already released the lock. This
|
|
// creates a race condition in between the release call (the Update func) and the
|
|
// stop callback where a different client could acquire the lease while we still
|
|
// believe that we hold the lease in our in-memory leader status.
|
|
type releaseLock struct {
|
|
delegate resourcelock.Interface // do not embed this, see comment above
|
|
isLeader *isLeaderTracker
|
|
identity string
|
|
}
|
|
|
|
func (r *releaseLock) Update(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
|
|
// setting an empty HolderIdentity on update means that the client is releasing the lock.
|
|
// thus we need to make sure to update our in-memory leader status before this occurs
|
|
// since other clients could immediately acquire the lock. note that even if the Update
|
|
// call below fails, this client has already chosen to release the lock and thus we must
|
|
// update the in-memory status regardless of it we succeed in making the Kube API call.
|
|
// note that while resourcelock.Interface is an unstable interface, the meaning of an
|
|
// empty HolderIdentity is encoded into the Kube API and thus we can safely rely on that
|
|
// not changing (since changing that would break older clients).
|
|
if len(ler.HolderIdentity) == 0 && r.isLeader.stop() {
|
|
plog.Debug("leader lost", "identity", r.identity, "reason", "release")
|
|
}
|
|
|
|
return r.delegate.Update(ctx, ler)
|
|
}
|
|
|
|
// boilerplate passthrough methods below
|
|
|
|
func (r *releaseLock) Get(ctx context.Context) (*resourcelock.LeaderElectionRecord, []byte, error) {
|
|
return r.delegate.Get(ctx)
|
|
}
|
|
|
|
func (r *releaseLock) Create(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
|
|
return r.delegate.Create(ctx, ler)
|
|
}
|
|
|
|
func (r *releaseLock) RecordEvent(s string) {
|
|
r.delegate.RecordEvent(s)
|
|
}
|
|
|
|
func (r *releaseLock) Identity() string {
|
|
return r.delegate.Identity()
|
|
}
|
|
|
|
func (r *releaseLock) Describe() string {
|
|
return r.delegate.Describe()
|
|
}
|