leader election: in-memory leader status is stopped before release
This change fixes a small race condition that occurred when the current leader failed to renew its lease. Before this change, the leader would first release the lease via the Kube API and then would update its in-memory status to reflect that change. Now those events occur in the reverse (i.e. correct) order. Signed-off-by: Monis Khan <mok@vmware.com>
This commit is contained in:
parent
f7751d13fe
commit
c0617ceda4
@ -11,6 +11,7 @@ import (
|
||||
"go.uber.org/atomic"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/leaderelection"
|
||||
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||
|
||||
@ -43,47 +44,12 @@ func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubec
|
||||
return nil, nil, fmt.Errorf("could not create internal client for leader election: %w", err)
|
||||
}
|
||||
|
||||
isLeader := atomic.NewBool(false)
|
||||
isLeader := &isLeaderTracker{tracker: atomic.NewBool(false)}
|
||||
|
||||
identity := podInfo.Name
|
||||
leaseName := deployment.Name
|
||||
|
||||
leaderElectionConfig := leaderelection.LeaderElectionConfig{
|
||||
Lock: &resourcelock.LeaseLock{
|
||||
LeaseMeta: metav1.ObjectMeta{
|
||||
Namespace: podInfo.Namespace,
|
||||
Name: leaseName,
|
||||
},
|
||||
Client: internalClient.Kubernetes.CoordinationV1(),
|
||||
LockConfig: resourcelock.ResourceLockConfig{
|
||||
Identity: identity,
|
||||
},
|
||||
},
|
||||
ReleaseOnCancel: true, // semantics for correct release handled by controllersWithLeaderElector below
|
||||
LeaseDuration: 60 * time.Second,
|
||||
RenewDeadline: 15 * time.Second,
|
||||
RetryPeriod: 5 * time.Second,
|
||||
Callbacks: leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: func(_ context.Context) {
|
||||
plog.Debug("leader gained", "identity", identity)
|
||||
isLeader.Store(true)
|
||||
},
|
||||
OnStoppedLeading: func() {
|
||||
plog.Debug("leader lost", "identity", identity)
|
||||
isLeader.Store(false)
|
||||
},
|
||||
OnNewLeader: func(newLeader string) {
|
||||
if newLeader == identity {
|
||||
return
|
||||
}
|
||||
plog.Debug("new leader elected", "newLeader", newLeader)
|
||||
},
|
||||
},
|
||||
Name: leaseName,
|
||||
// this must be set to nil because we do not want to associate /healthz with a failed
|
||||
// leader election renewal as we do not want to exit the process if the leader changes.
|
||||
WatchDog: nil,
|
||||
}
|
||||
leaderElectionConfig := newLeaderElectionConfig(podInfo.Namespace, leaseName, identity, internalClient.Kubernetes, isLeader)
|
||||
|
||||
// validate our config here before we rely on it being functioning below
|
||||
if _, err := leaderelection.NewLeaderElector(leaderElectionConfig); err != nil {
|
||||
@ -103,7 +69,7 @@ func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubec
|
||||
return
|
||||
}
|
||||
|
||||
if isLeader.Load() { // only perform "expensive" test for writes
|
||||
if isLeader.canWrite() { // only perform "expensive" test for writes
|
||||
return // we are currently the leader, all actions are permitted
|
||||
}
|
||||
|
||||
@ -127,7 +93,11 @@ func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubec
|
||||
leaderElectorCtx, leaderElectorCancel := context.WithCancel(context.Background()) // purposefully detached context
|
||||
|
||||
go func() {
|
||||
controllers(ctx) // run the controllers with the global context, this blocks until the context is canceled
|
||||
controllers(ctx) // run the controllers with the global context, this blocks until the context is canceled
|
||||
|
||||
if isLeader.stop() { // remove our in-memory leader status before we release the lock
|
||||
plog.Debug("leader lost", "identity", identity, "reason", "controller stop")
|
||||
}
|
||||
leaderElectorCancel() // once the controllers have all stopped, tell the leader elector to release the lock
|
||||
}()
|
||||
|
||||
@ -148,3 +118,122 @@ func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubec
|
||||
|
||||
return client, controllersWithLeaderElector, nil
|
||||
}
|
||||
|
||||
func newLeaderElectionConfig(namespace, leaseName, identity string, internalClient kubernetes.Interface, isLeader *isLeaderTracker) leaderelection.LeaderElectionConfig {
|
||||
return leaderelection.LeaderElectionConfig{
|
||||
Lock: &releaseLock{
|
||||
delegate: &resourcelock.LeaseLock{
|
||||
LeaseMeta: metav1.ObjectMeta{
|
||||
Namespace: namespace,
|
||||
Name: leaseName,
|
||||
},
|
||||
Client: internalClient.CoordinationV1(),
|
||||
LockConfig: resourcelock.ResourceLockConfig{
|
||||
Identity: identity,
|
||||
},
|
||||
},
|
||||
isLeader: isLeader,
|
||||
identity: identity,
|
||||
},
|
||||
ReleaseOnCancel: true, // semantics for correct release handled by releaseLock.Update and controllersWithLeaderElector below
|
||||
LeaseDuration: 60 * time.Second,
|
||||
RenewDeadline: 15 * time.Second,
|
||||
RetryPeriod: 5 * time.Second,
|
||||
Callbacks: leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: func(_ context.Context) {
|
||||
plog.Debug("leader gained", "identity", identity)
|
||||
isLeader.start()
|
||||
},
|
||||
OnStoppedLeading: func() {
|
||||
if isLeader.stop() { // barring changes to client-go, this branch should only be taken on a panic
|
||||
plog.Debug("leader lost", "identity", identity, "reason", "on stop")
|
||||
}
|
||||
},
|
||||
OnNewLeader: func(newLeader string) {
|
||||
if newLeader == identity {
|
||||
return
|
||||
}
|
||||
plog.Debug("new leader elected", "newLeader", newLeader)
|
||||
},
|
||||
},
|
||||
Name: leaseName,
|
||||
// this must be set to nil because we do not want to associate /healthz with a failed
|
||||
// leader election renewal as we do not want to exit the process if the leader changes.
|
||||
WatchDog: nil,
|
||||
}
|
||||
}
|
||||
|
||||
type isLeaderTracker struct {
|
||||
tracker *atomic.Bool
|
||||
}
|
||||
|
||||
func (t *isLeaderTracker) canWrite() bool {
|
||||
return t.tracker.Load()
|
||||
}
|
||||
|
||||
func (t *isLeaderTracker) start() {
|
||||
t.tracker.Store(true)
|
||||
}
|
||||
|
||||
func (t *isLeaderTracker) stop() (didStop bool) {
|
||||
return t.tracker.CAS(true, false)
|
||||
}
|
||||
|
||||
// note that resourcelock.Interface is an internal, unstable interface.
|
||||
// so while it would be convenient to embed the implementation within
|
||||
// this struct, we need to make sure our Update override is used and
|
||||
// that no other methods are added that change the meaning of the
|
||||
// interface. thus we must have ~20 lines of boilerplate to have the
|
||||
// compiler ensure that we keep up with this interface over time.
|
||||
var _ resourcelock.Interface = &releaseLock{}
|
||||
|
||||
// releaseLock works around a limitation of the client-go leader election code:
|
||||
// there is no "BeforeRelease" callback. By the time the "OnStoppedLeading"
|
||||
// callback runs (this callback is meant to always run at the very end since it
|
||||
// normally terminates the process), we have already released the lock. This
|
||||
// creates a race condition in between the release call (the Update func) and the
|
||||
// stop callback where a different client could acquire the lease while we still
|
||||
// believe that we hold the lease in our in-memory leader status.
|
||||
type releaseLock struct {
|
||||
delegate resourcelock.Interface // do not embed this, see comment above
|
||||
isLeader *isLeaderTracker
|
||||
identity string
|
||||
}
|
||||
|
||||
func (r *releaseLock) Update(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
|
||||
// setting an empty HolderIdentity on update means that the client is releasing the lock.
|
||||
// thus we need to make sure to update our in-memory leader status before this occurs
|
||||
// since other clients could immediately acquire the lock. note that even if the Update
|
||||
// call below fails, this client has already chosen to release the lock and thus we must
|
||||
// update the in-memory status regardless of it we succeed in making the Kube API call.
|
||||
// note that while resourcelock.Interface is an unstable interface, the meaning of an
|
||||
// empty HolderIdentity is encoded into the Kube API and thus we can safely rely on that
|
||||
// not changing (since changing that would break older clients).
|
||||
if len(ler.HolderIdentity) == 0 && r.isLeader.stop() {
|
||||
plog.Debug("leader lost", "identity", r.identity, "reason", "release")
|
||||
}
|
||||
|
||||
return r.delegate.Update(ctx, ler)
|
||||
}
|
||||
|
||||
// boilerplate passthrough methods below
|
||||
|
||||
func (r *releaseLock) Get(ctx context.Context) (*resourcelock.LeaderElectionRecord, []byte, error) {
|
||||
return r.delegate.Get(ctx)
|
||||
}
|
||||
|
||||
func (r *releaseLock) Create(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
|
||||
return r.delegate.Create(ctx, ler)
|
||||
}
|
||||
|
||||
func (r *releaseLock) RecordEvent(s string) {
|
||||
r.delegate.RecordEvent(s)
|
||||
}
|
||||
|
||||
func (r *releaseLock) Identity() string {
|
||||
return r.delegate.Identity()
|
||||
}
|
||||
|
||||
func (r *releaseLock) Describe() string {
|
||||
return r.delegate.Describe()
|
||||
}
|
||||
|
83
internal/leaderelection/leaderelection_test.go
Normal file
83
internal/leaderelection/leaderelection_test.go
Normal file
@ -0,0 +1,83 @@
|
||||
// Copyright 2021 the Pinniped contributors. All Rights Reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package leaderelection
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/atomic"
|
||||
coordinationv1 "k8s.io/api/coordination/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
kubefake "k8s.io/client-go/kubernetes/fake"
|
||||
kubetesting "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/leaderelection"
|
||||
"k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
// see test/integration/leaderelection_test.go for the bulk of the testing related to this code
|
||||
|
||||
func Test_releaseLock_Update(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
f func(t *testing.T, internalClient *kubefake.Clientset, isLeader *isLeaderTracker, cancel context.CancelFunc)
|
||||
}{
|
||||
{
|
||||
name: "renewal fails on update",
|
||||
f: func(t *testing.T, internalClient *kubefake.Clientset, isLeader *isLeaderTracker, cancel context.CancelFunc) {
|
||||
internalClient.PrependReactor("update", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
|
||||
lease := action.(kubetesting.UpdateAction).GetObject().(*coordinationv1.Lease)
|
||||
if len(pointer.StringDeref(lease.Spec.HolderIdentity, "")) == 0 {
|
||||
require.False(t, isLeader.canWrite(), "client must release in-memory leader status before Kube API call")
|
||||
}
|
||||
return true, nil, errors.New("cannot renew")
|
||||
})
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "renewal fails due to context",
|
||||
f: func(t *testing.T, internalClient *kubefake.Clientset, isLeader *isLeaderTracker, cancel context.CancelFunc) {
|
||||
t.Cleanup(func() {
|
||||
require.False(t, isLeader.canWrite(), "client must release in-memory leader status when context is canceled")
|
||||
})
|
||||
start := time.Now()
|
||||
internalClient.PrependReactor("update", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
|
||||
// keep going for a bit
|
||||
if time.Since(start) < 5*time.Second {
|
||||
return false, nil, nil
|
||||
}
|
||||
|
||||
cancel()
|
||||
return false, nil, nil
|
||||
})
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
internalClient := kubefake.NewSimpleClientset()
|
||||
isLeader := &isLeaderTracker{tracker: atomic.NewBool(false)}
|
||||
|
||||
leaderElectorCtx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
tt.f(t, internalClient, isLeader, cancel)
|
||||
|
||||
leaderElectionConfig := newLeaderElectionConfig("ns-001", "lease-001", "foo-001", internalClient, isLeader)
|
||||
|
||||
// make the tests run quicker
|
||||
leaderElectionConfig.LeaseDuration = 2 * time.Second
|
||||
leaderElectionConfig.RenewDeadline = 1 * time.Second
|
||||
leaderElectionConfig.RetryPeriod = 250 * time.Millisecond
|
||||
|
||||
// note that this will block until it exits on its own or tt.f calls cancel()
|
||||
leaderelection.RunOrDie(leaderElectorCtx, leaderElectionConfig)
|
||||
})
|
||||
}
|
||||
}
|
@ -6,7 +6,6 @@ package integration
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -40,7 +39,7 @@ func TestLeaderElection(t *testing.T) {
|
||||
|
||||
namespace := testlib.CreateNamespace(ctx, t, leaseName)
|
||||
|
||||
clients := leaderElectionClients(t, namespace, leaseName)
|
||||
clients, cancels := leaderElectionClients(t, namespace, leaseName)
|
||||
|
||||
// the tests below are order dependant to some degree and definitely cannot be run in parallel
|
||||
|
||||
@ -68,9 +67,52 @@ func TestLeaderElection(t *testing.T) {
|
||||
lease := checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients)
|
||||
logLease(t, lease)
|
||||
})
|
||||
|
||||
t.Run("stop current leader", func(t *testing.T) {
|
||||
startLease := waitForIdentity(ctx, t, namespace, leaseName, clients)
|
||||
startTransitions := *startLease.Spec.LeaseTransitions
|
||||
startTime := *startLease.Spec.AcquireTime
|
||||
startLeaderIdentity := *startLease.Spec.HolderIdentity
|
||||
|
||||
leaderClient := clients[startLeaderIdentity]
|
||||
|
||||
err := runWriteRequest(ctx, leaderClient)
|
||||
require.NoError(t, err)
|
||||
|
||||
// emulate stopping the leader process
|
||||
cancels[startLeaderIdentity]()
|
||||
delete(clients, startLeaderIdentity)
|
||||
|
||||
testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
|
||||
err := runWriteRequest(ctx, leaderClient)
|
||||
requireEventually.ErrorIs(err, leaderelection.ErrNotLeader, "leader should no longer be able to write")
|
||||
}, time.Minute, time.Second)
|
||||
|
||||
if len(clients) > 0 {
|
||||
finalLease := waitForIdentity(ctx, t, namespace, leaseName, clients)
|
||||
finalTransitions := *finalLease.Spec.LeaseTransitions
|
||||
finalTime := *finalLease.Spec.AcquireTime
|
||||
finalLeaderIdentity := *finalLease.Spec.HolderIdentity
|
||||
|
||||
require.Greater(t, finalTransitions, startTransitions)
|
||||
require.Greater(t, finalTime.UnixNano(), startTime.UnixNano())
|
||||
require.NotEqual(t, startLeaderIdentity, finalLeaderIdentity, "should have elected new leader")
|
||||
|
||||
logLease(t, finalLease)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("sanity check write prevention after stopping leader", func(t *testing.T) {
|
||||
if len(clients) == 0 {
|
||||
t.Skip("no clients left to check")
|
||||
}
|
||||
|
||||
lease := checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients)
|
||||
logLease(t, lease)
|
||||
})
|
||||
}
|
||||
|
||||
func leaderElectionClient(t *testing.T, namespace *corev1.Namespace, leaseName, identity string) *kubeclient.Client {
|
||||
func leaderElectionClient(t *testing.T, namespace *corev1.Namespace, leaseName, identity string) (*kubeclient.Client, context.CancelFunc) {
|
||||
t.Helper()
|
||||
|
||||
podInfo := &downward.PodInfo{
|
||||
@ -119,23 +161,24 @@ func leaderElectionClient(t *testing.T, namespace *corev1.Namespace, leaseName,
|
||||
leaderCancel()
|
||||
}()
|
||||
|
||||
return client
|
||||
return client, controllerCancel
|
||||
}
|
||||
|
||||
func leaderElectionClients(t *testing.T, namespace *corev1.Namespace, leaseName string) map[string]*kubeclient.Client {
|
||||
func leaderElectionClients(t *testing.T, namespace *corev1.Namespace, leaseName string) (map[string]*kubeclient.Client, map[string]context.CancelFunc) {
|
||||
t.Helper()
|
||||
|
||||
count := rand.IntnRange(1, 6)
|
||||
out := make(map[string]*kubeclient.Client, count)
|
||||
clients := make(map[string]*kubeclient.Client, count)
|
||||
cancels := make(map[string]context.CancelFunc, count)
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
identity := "leader-election-client-" + rand.String(5)
|
||||
out[identity] = leaderElectionClient(t, namespace, leaseName, identity)
|
||||
clients[identity], cancels[identity] = leaderElectionClient(t, namespace, leaseName, identity)
|
||||
}
|
||||
|
||||
t.Logf("running leader election client tests with %d clients: %v", len(out), sets.StringKeySet(out).List())
|
||||
t.Logf("running leader election client tests with %d clients: %v", len(clients), sets.StringKeySet(clients).List())
|
||||
|
||||
return out
|
||||
return clients, cancels
|
||||
}
|
||||
|
||||
func pickRandomLeaderElectionClient(clients map[string]*kubeclient.Client) *kubeclient.Client {
|
||||
@ -209,7 +252,7 @@ func checkOnlyLeaderCanWrite(ctx context.Context, t *testing.T, namespace *corev
|
||||
} else {
|
||||
nonLeaders++
|
||||
requireEventually.Error(err, "non leader client %q should have write error but it was nil", identity)
|
||||
requireEventually.True(errors.Is(err, leaderelection.ErrNotLeader), "non leader client %q should have write error: %v", identity, err)
|
||||
requireEventually.ErrorIs(err, leaderelection.ErrNotLeader, "non leader client %q should have write error: %v", identity, err)
|
||||
}
|
||||
}
|
||||
requireEventually.Equal(1, leaders, "did not see leader")
|
||||
|
Loading…
Reference in New Issue
Block a user