// Copyright 2021-2023 the Pinniped contributors. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package leaderelection import ( "context" "fmt" "sync/atomic" "time" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "go.pinniped.dev/internal/constable" "go.pinniped.dev/internal/controllerinit" "go.pinniped.dev/internal/downward" "go.pinniped.dev/internal/kubeclient" "go.pinniped.dev/internal/plog" ) const ErrNotLeader constable.Error = "write attempt rejected as client is not leader" // New returns a client that has a leader election middleware injected into it. // This middleware will prevent all non-read requests to the Kubernetes API when // the current process does not hold the leader election lock. Unlike normal // leader election where the process blocks until it acquires the lock, this // middleware approach lets the process run as normal for all read requests. // Another difference is that if the process acquires the lock and then loses it // (i.e. a failed renewal), it will not exit (i.e. restart). Instead, it will // simply attempt to acquire the lock again. // // The returned function is blocking and will run the leader election polling // logic and will coordinate lease release with the input controller starter function. func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubeclient.Option) ( *kubeclient.Client, controllerinit.RunnerWrapper, error, ) { internalClient, err := kubeclient.New(opts...) if err != nil { return nil, nil, fmt.Errorf("could not create internal client for leader election: %w", err) } isLeader := &isLeaderTracker{tracker: &atomic.Bool{}} identity := podInfo.Name leaseName := deployment.Name leaderElectionConfig := newLeaderElectionConfig(podInfo.Namespace, leaseName, identity, internalClient.Kubernetes, isLeader) // validate our config here before we rely on it being functioning below if _, err := leaderelection.NewLeaderElector(leaderElectionConfig); err != nil { return nil, nil, fmt.Errorf("invalid config - could not create leader elector: %w", err) } writeOnlyWhenLeader := kubeclient.MiddlewareFunc(func(_ context.Context, rt kubeclient.RoundTrip) { switch rt.Verb() { case kubeclient.VerbGet, kubeclient.VerbList, kubeclient.VerbWatch: // reads are always allowed. // note that while our pods/exec into the kube cert agent pod is a write request from the // perspective of the Kube API, it is semantically a read request since no mutation occurs. // we simply use it to fill a cache, and we need all pods to have a functioning cache. // however, we do not need to handle it here because remotecommand.NewSPDYExecutor uses a // kubeclient.Client.JSONConfig as input. since our middleware logic is only injected into // the generated clientset code, this JSONConfig simply ignores this middleware all together. return } if isLeader.canWrite() { // only perform "expensive" test for writes return // we are currently the leader, all actions are permitted } rt.MutateRequest(func(_ kubeclient.Object) error { return ErrNotLeader // we are not the leader, fail the write request }) }) leaderElectionOpts := append( // all middleware are always executed so this being the first middleware is not relevant []kubeclient.Option{kubeclient.WithMiddleware(writeOnlyWhenLeader)}, opts..., // do not mutate input slice ) client, err := kubeclient.New(leaderElectionOpts...) if err != nil { return nil, nil, fmt.Errorf("could not create leader election client: %w", err) } controllersWithLeaderElector := func(ctx context.Context, controllers controllerinit.Runner) { plog.Debug("leader election loop start", "identity", identity) defer plog.Debug("leader election loop shutdown", "identity", identity) leaderElectorCtx, leaderElectorCancel := context.WithCancel(context.Background()) // purposefully detached context go func() { controllers(ctx) // run the controllers with the global context, this blocks until the context is canceled plog.Debug("leader election saw controllers have stopped") if isLeader.stop() { // remove our in-memory leader status before we release the lock plog.Debug("leader lost", "identity", identity, "reason", "controller stop") } leaderElectorCancel() // once the controllers have all stopped, tell the leader elector to release the lock }() for { // run (and rerun on release) the leader elector with its own context (blocking) select { case <-leaderElectorCtx.Done(): return // keep trying to run until process exit default: // blocks while trying to acquire lease, unblocks on release. // note that this creates a new leader elector on each loop to // prevent any bugs from reusing that struct across elections. // our config was validated above so this should never die. leaderelection.RunOrDie(leaderElectorCtx, leaderElectionConfig) } } } return client, controllersWithLeaderElector, nil } func newLeaderElectionConfig(namespace, leaseName, identity string, internalClient kubernetes.Interface, isLeader *isLeaderTracker) leaderelection.LeaderElectionConfig { return leaderelection.LeaderElectionConfig{ Lock: &releaseLock{ delegate: &resourcelock.LeaseLock{ LeaseMeta: metav1.ObjectMeta{ Namespace: namespace, Name: leaseName, }, Client: internalClient.CoordinationV1(), LockConfig: resourcelock.ResourceLockConfig{ Identity: identity, }, }, isLeader: isLeader, identity: identity, }, ReleaseOnCancel: true, // semantics for correct release handled by releaseLock.Update and controllersWithLeaderElector below // Copied from defaults used in OpenShift since we want the same semantics: // https://github.com/openshift/library-go/blob/e14e06ba8d476429b10cc6f6c0fcfe6ea4f2c591/pkg/config/leaderelection/leaderelection.go#L87-L109 LeaseDuration: 137 * time.Second, RenewDeadline: 107 * time.Second, RetryPeriod: 26 * time.Second, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(_ context.Context) { plog.Debug("leader gained", "identity", identity) isLeader.start() }, OnStoppedLeading: func() { if isLeader.stop() { // barring changes to client-go, this branch should only be taken on a panic plog.Debug("leader lost", "identity", identity, "reason", "on stop") } }, OnNewLeader: func(newLeader string) { if newLeader == identity { return } plog.Debug("new leader elected", "newLeader", newLeader) }, }, Name: leaseName, // this must be set to nil because we do not want to associate /healthz with a failed // leader election renewal as we do not want to exit the process if the leader changes. WatchDog: nil, } } type isLeaderTracker struct { tracker *atomic.Bool } func (t *isLeaderTracker) canWrite() bool { return t.tracker.Load() } func (t *isLeaderTracker) start() { t.tracker.Store(true) } func (t *isLeaderTracker) stop() (didStop bool) { return t.tracker.CompareAndSwap(true, false) } // note that resourcelock.Interface is an internal, unstable interface. // so while it would be convenient to embed the implementation within // this struct, we need to make sure our Update override is used and // that no other methods are added that change the meaning of the // interface. thus we must have ~20 lines of boilerplate to have the // compiler ensure that we keep up with this interface over time. var _ resourcelock.Interface = &releaseLock{} // releaseLock works around a limitation of the client-go leader election code: // there is no "BeforeRelease" callback. By the time the "OnStoppedLeading" // callback runs (this callback is meant to always run at the very end since it // normally terminates the process), we have already released the lock. This // creates a race condition in between the release call (the Update func) and the // stop callback where a different client could acquire the lease while we still // believe that we hold the lease in our in-memory leader status. type releaseLock struct { delegate resourcelock.Interface // do not embed this, see comment above isLeader *isLeaderTracker identity string } func (r *releaseLock) Update(ctx context.Context, ler resourcelock.LeaderElectionRecord) error { // setting an empty HolderIdentity on update means that the client is releasing the lock. // thus we need to make sure to update our in-memory leader status before this occurs // since other clients could immediately acquire the lock. note that even if the Update // call below fails, this client has already chosen to release the lock and thus we must // update the in-memory status regardless of it we succeed in making the Kube API call. // note that while resourcelock.Interface is an unstable interface, the meaning of an // empty HolderIdentity is encoded into the Kube API and thus we can safely rely on that // not changing (since changing that would break older clients). if len(ler.HolderIdentity) == 0 && r.isLeader.stop() { plog.Debug("leader lost", "identity", r.identity, "reason", "release") } return r.delegate.Update(ctx, ler) } // boilerplate passthrough methods below func (r *releaseLock) Get(ctx context.Context) (*resourcelock.LeaderElectionRecord, []byte, error) { return r.delegate.Get(ctx) } func (r *releaseLock) Create(ctx context.Context, ler resourcelock.LeaderElectionRecord) error { return r.delegate.Create(ctx, ler) } func (r *releaseLock) RecordEvent(s string) { r.delegate.RecordEvent(s) } func (r *releaseLock) Identity() string { return r.delegate.Identity() } func (r *releaseLock) Describe() string { return r.delegate.Describe() }