Merge pull request #800 from enj/enj/i/leader_election_release
leader election: fix small race during lease release
This commit is contained in:
commit
c17e7bec49
@ -11,6 +11,7 @@ import (
|
||||
"go.uber.org/atomic"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/leaderelection"
|
||||
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||
|
||||
@ -43,47 +44,12 @@ func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubec
|
||||
return nil, nil, fmt.Errorf("could not create internal client for leader election: %w", err)
|
||||
}
|
||||
|
||||
isLeader := atomic.NewBool(false)
|
||||
isLeader := &isLeaderTracker{tracker: atomic.NewBool(false)}
|
||||
|
||||
identity := podInfo.Name
|
||||
leaseName := deployment.Name
|
||||
|
||||
leaderElectionConfig := leaderelection.LeaderElectionConfig{
|
||||
Lock: &resourcelock.LeaseLock{
|
||||
LeaseMeta: metav1.ObjectMeta{
|
||||
Namespace: podInfo.Namespace,
|
||||
Name: leaseName,
|
||||
},
|
||||
Client: internalClient.Kubernetes.CoordinationV1(),
|
||||
LockConfig: resourcelock.ResourceLockConfig{
|
||||
Identity: identity,
|
||||
},
|
||||
},
|
||||
ReleaseOnCancel: true, // semantics for correct release handled by controllersWithLeaderElector below
|
||||
LeaseDuration: 60 * time.Second,
|
||||
RenewDeadline: 15 * time.Second,
|
||||
RetryPeriod: 5 * time.Second,
|
||||
Callbacks: leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: func(_ context.Context) {
|
||||
plog.Debug("leader gained", "identity", identity)
|
||||
isLeader.Store(true)
|
||||
},
|
||||
OnStoppedLeading: func() {
|
||||
plog.Debug("leader lost", "identity", identity)
|
||||
isLeader.Store(false)
|
||||
},
|
||||
OnNewLeader: func(newLeader string) {
|
||||
if newLeader == identity {
|
||||
return
|
||||
}
|
||||
plog.Debug("new leader elected", "newLeader", newLeader)
|
||||
},
|
||||
},
|
||||
Name: leaseName,
|
||||
// this must be set to nil because we do not want to associate /healthz with a failed
|
||||
// leader election renewal as we do not want to exit the process if the leader changes.
|
||||
WatchDog: nil,
|
||||
}
|
||||
leaderElectionConfig := newLeaderElectionConfig(podInfo.Namespace, leaseName, identity, internalClient.Kubernetes, isLeader)
|
||||
|
||||
// validate our config here before we rely on it being functioning below
|
||||
if _, err := leaderelection.NewLeaderElector(leaderElectionConfig); err != nil {
|
||||
@ -103,7 +69,7 @@ func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubec
|
||||
return
|
||||
}
|
||||
|
||||
if isLeader.Load() { // only perform "expensive" test for writes
|
||||
if isLeader.canWrite() { // only perform "expensive" test for writes
|
||||
return // we are currently the leader, all actions are permitted
|
||||
}
|
||||
|
||||
@ -127,7 +93,11 @@ func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubec
|
||||
leaderElectorCtx, leaderElectorCancel := context.WithCancel(context.Background()) // purposefully detached context
|
||||
|
||||
go func() {
|
||||
controllers(ctx) // run the controllers with the global context, this blocks until the context is canceled
|
||||
controllers(ctx) // run the controllers with the global context, this blocks until the context is canceled
|
||||
|
||||
if isLeader.stop() { // remove our in-memory leader status before we release the lock
|
||||
plog.Debug("leader lost", "identity", identity, "reason", "controller stop")
|
||||
}
|
||||
leaderElectorCancel() // once the controllers have all stopped, tell the leader elector to release the lock
|
||||
}()
|
||||
|
||||
@ -148,3 +118,126 @@ func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubec
|
||||
|
||||
return client, controllersWithLeaderElector, nil
|
||||
}
|
||||
|
||||
// newLeaderElectionConfig builds the client-go leader election config for the
// lease named leaseName in namespace, held under the given identity. The lease
// lock is wrapped in a releaseLock, and the callbacks keep isLeader in sync
// with the elector: OnStartedLeading calls isLeader.start() and
// OnStoppedLeading calls isLeader.stop().
func newLeaderElectionConfig(namespace, leaseName, identity string, internalClient kubernetes.Interface, isLeader *isLeaderTracker) leaderelection.LeaderElectionConfig {
|
||||
return leaderelection.LeaderElectionConfig{
|
||||
Lock: &releaseLock{
|
||||
delegate: &resourcelock.LeaseLock{
|
||||
LeaseMeta: metav1.ObjectMeta{
|
||||
Namespace: namespace,
|
||||
Name: leaseName,
|
||||
},
|
||||
Client: internalClient.CoordinationV1(),
|
||||
LockConfig: resourcelock.ResourceLockConfig{
|
||||
Identity: identity,
|
||||
},
|
||||
},
|
||||
isLeader: isLeader,
|
||||
identity: identity,
|
||||
},
|
||||
ReleaseOnCancel: true, // semantics for correct release handled by releaseLock.Update (below) and controllersWithLeaderElector (in New)
|
||||
|
||||
// Copied from defaults used in OpenShift since we want the same semantics:
|
||||
// https://github.com/openshift/library-go/blob/e14e06ba8d476429b10cc6f6c0fcfe6ea4f2c591/pkg/config/leaderelection/leaderelection.go#L87-L109
|
||||
LeaseDuration: 137 * time.Second,
|
||||
RenewDeadline: 107 * time.Second,
|
||||
RetryPeriod: 26 * time.Second,
|
||||
|
||||
Callbacks: leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: func(_ context.Context) {
|
||||
plog.Debug("leader gained", "identity", identity)
|
||||
isLeader.start()
|
||||
},
|
||||
OnStoppedLeading: func() {
|
||||
if isLeader.stop() { // barring changes to client-go, this branch should only be taken on a panic
|
||||
plog.Debug("leader lost", "identity", identity, "reason", "on stop")
|
||||
}
|
||||
},
|
||||
OnNewLeader: func(newLeader string) {
|
||||
// our own election is already logged by OnStartedLeading, so only log other identities
|
||||
if newLeader == identity {
|
||||
return
|
||||
}
|
||||
plog.Debug("new leader elected", "newLeader", newLeader)
|
||||
},
|
||||
},
|
||||
Name: leaseName,
|
||||
// this must be set to nil because we do not want to associate /healthz with a failed
|
||||
// leader election renewal as we do not want to exit the process if the leader changes.
|
||||
WatchDog: nil,
|
||||
}
|
||||
}
|
||||
|
||||
// isLeaderTracker holds this client's in-memory leader status in an atomic
// bool and exposes it only through canWrite, start and stop.
type isLeaderTracker struct {
|
||||
tracker *atomic.Bool
|
||||
}
|
||||
|
||||
// canWrite reports the current in-memory leader status.
func (t *isLeaderTracker) canWrite() bool {
|
||||
return t.tracker.Load()
|
||||
}
|
||||
|
||||
// start sets the in-memory leader status to true.
func (t *isLeaderTracker) start() {
|
||||
t.tracker.Store(true)
|
||||
}
|
||||
|
||||
// stop flips the in-memory leader status from true to false with a single
// compare-and-swap. didStop is true only when this call performed the flip,
// which lets each caller log the loss of leadership at most once.
func (t *isLeaderTracker) stop() (didStop bool) {
|
||||
return t.tracker.CAS(true, false)
|
||||
}
|
||||
|
||||
// note that resourcelock.Interface is an internal, unstable interface.
|
||||
// so while it would be convenient to embed the implementation within
|
||||
// this struct, we need to make sure our Update override is used and
|
||||
// that no other methods are added that change the meaning of the
|
||||
// interface. thus we must have ~20 lines of boilerplate to have the
|
||||
// compiler ensure that we keep up with this interface over time.
|
||||
var _ resourcelock.Interface = &releaseLock{}
|
||||
|
||||
// releaseLock works around a limitation of the client-go leader election code:
|
||||
// there is no "BeforeRelease" callback. By the time the "OnStoppedLeading"
|
||||
// callback runs (this callback is meant to always run at the very end since it
|
||||
// normally terminates the process), we have already released the lock. This
|
||||
// creates a race condition in between the release call (the Update func) and the
|
||||
// stop callback where a different client could acquire the lease while we still
|
||||
// believe that we hold the lease in our in-memory leader status.
|
||||
type releaseLock struct {
|
||||
delegate resourcelock.Interface // do not embed this, see comment above
|
||||
isLeader *isLeaderTracker // in-memory leader status; Update clears it before a release is sent
|
||||
identity string // only used in the debug log emitted by Update
|
||||
}
|
||||
|
||||
// Update clears the in-memory leader status when ler is a release (empty
// HolderIdentity) and then forwards the record to the delegate lock,
// returning the delegate's error unchanged.
func (r *releaseLock) Update(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
|
||||
// setting an empty HolderIdentity on update means that the client is releasing the lock.
|
||||
// thus we need to make sure to update our in-memory leader status before this occurs
|
||||
// since other clients could immediately acquire the lock. note that even if the Update
|
||||
// call below fails, this client has already chosen to release the lock and thus we must
|
||||
// update the in-memory status regardless of whether we succeed in making the Kube API call.
|
||||
// note that while resourcelock.Interface is an unstable interface, the meaning of an
|
||||
// empty HolderIdentity is encoded into the Kube API and thus we can safely rely on that
|
||||
// not changing (since changing that would break older clients).
|
||||
if len(ler.HolderIdentity) == 0 && r.isLeader.stop() {
|
||||
plog.Debug("leader lost", "identity", r.identity, "reason", "release")
|
||||
}
|
||||
|
||||
return r.delegate.Update(ctx, ler)
|
||||
}
|
||||
|
||||
// boilerplate passthrough methods below
|
||||
|
||||
// Get passes through to the delegate lock unchanged.
func (r *releaseLock) Get(ctx context.Context) (*resourcelock.LeaderElectionRecord, []byte, error) {
|
||||
return r.delegate.Get(ctx)
|
||||
}
|
||||
|
||||
// Create passes through to the delegate lock unchanged.
func (r *releaseLock) Create(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
|
||||
return r.delegate.Create(ctx, ler)
|
||||
}
|
||||
|
||||
// RecordEvent passes through to the delegate lock unchanged.
func (r *releaseLock) RecordEvent(s string) {
|
||||
r.delegate.RecordEvent(s)
|
||||
}
|
||||
|
||||
// Identity returns the delegate lock's identity (not the identity field of
// this wrapper, which is used only for logging).
func (r *releaseLock) Identity() string {
|
||||
return r.delegate.Identity()
|
||||
}
|
||||
|
||||
// Describe passes through to the delegate lock unchanged.
func (r *releaseLock) Describe() string {
|
||||
return r.delegate.Describe()
|
||||
}
|
||||
|
83
internal/leaderelection/leaderelection_test.go
Normal file
83
internal/leaderelection/leaderelection_test.go
Normal file
@ -0,0 +1,83 @@
|
||||
// Copyright 2021 the Pinniped contributors. All Rights Reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package leaderelection
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/atomic"
|
||||
coordinationv1 "k8s.io/api/coordination/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
kubefake "k8s.io/client-go/kubernetes/fake"
|
||||
kubetesting "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/leaderelection"
|
||||
"k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
// see test/integration/leaderelection_test.go for the bulk of the testing related to this code
|
||||
|
||||
// Test_releaseLock_Update runs a real leader elector (with shortened timings)
// against a fake clientset and checks that the in-memory leader status is
// cleared when the lease is released: once when lease updates fail, and once
// when the elector's context is canceled. Each case's f installs the fake
// client reactors and assertions before the elector starts.
func Test_releaseLock_Update(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
f func(t *testing.T, internalClient *kubefake.Clientset, isLeader *isLeaderTracker, cancel context.CancelFunc)
|
||||
}{
|
||||
{
|
||||
name: "renewal fails on update",
|
||||
f: func(t *testing.T, internalClient *kubefake.Clientset, isLeader *isLeaderTracker, cancel context.CancelFunc) {
|
||||
internalClient.PrependReactor("update", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
|
||||
lease := action.(kubetesting.UpdateAction).GetObject().(*coordinationv1.Lease)
|
||||
if len(pointer.StringDeref(lease.Spec.HolderIdentity, "")) == 0 {
|
||||
require.False(t, isLeader.canWrite(), "client must release in-memory leader status before Kube API call")
|
||||
}
|
||||
return true, nil, errors.New("cannot renew")
|
||||
})
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "renewal fails due to context",
|
||||
f: func(t *testing.T, internalClient *kubefake.Clientset, isLeader *isLeaderTracker, cancel context.CancelFunc) {
|
||||
t.Cleanup(func() {
|
||||
require.False(t, isLeader.canWrite(), "client must release in-memory leader status when context is canceled")
|
||||
})
|
||||
start := time.Now()
|
||||
internalClient.PrependReactor("update", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
|
||||
// let updates fall through untouched for the first 5 seconds, then cancel the elector's context
|
||||
if time.Since(start) < 5*time.Second {
|
||||
return false, nil, nil
|
||||
}
|
||||
|
||||
cancel()
|
||||
return false, nil, nil
|
||||
})
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
tt := tt // capture the range variable for the parallel subtest
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
internalClient := kubefake.NewSimpleClientset()
|
||||
isLeader := &isLeaderTracker{tracker: atomic.NewBool(false)}
|
||||
|
||||
leaderElectorCtx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
tt.f(t, internalClient, isLeader, cancel)
|
||||
|
||||
leaderElectionConfig := newLeaderElectionConfig("ns-001", "lease-001", "foo-001", internalClient, isLeader)
|
||||
|
||||
// make the tests run quicker
|
||||
leaderElectionConfig.LeaseDuration = 2 * time.Second
|
||||
leaderElectionConfig.RenewDeadline = 1 * time.Second
|
||||
leaderElectionConfig.RetryPeriod = 250 * time.Millisecond
|
||||
|
||||
// note that this will block until it exits on its own or the reactor installed by tt.f calls cancel()
|
||||
leaderelection.RunOrDie(leaderElectorCtx, leaderElectionConfig)
|
||||
})
|
||||
}
|
||||
}
|
@ -6,7 +6,6 @@ package integration
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -40,7 +39,7 @@ func TestLeaderElection(t *testing.T) {
|
||||
|
||||
namespace := testlib.CreateNamespace(ctx, t, leaseName)
|
||||
|
||||
clients := leaderElectionClients(t, namespace, leaseName)
|
||||
clients, cancels := leaderElectionClients(t, namespace, leaseName)
|
||||
|
||||
// the tests below are order-dependent to some degree and definitely cannot be run in parallel
|
||||
|
||||
@ -68,9 +67,52 @@ func TestLeaderElection(t *testing.T) {
|
||||
lease := checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients)
|
||||
logLease(t, lease)
|
||||
})
|
||||
|
||||
t.Run("stop current leader", func(t *testing.T) {
|
||||
startLease := waitForIdentity(ctx, t, namespace, leaseName, clients)
|
||||
startTransitions := *startLease.Spec.LeaseTransitions
|
||||
startTime := *startLease.Spec.AcquireTime
|
||||
startLeaderIdentity := *startLease.Spec.HolderIdentity
|
||||
|
||||
leaderClient := clients[startLeaderIdentity]
|
||||
|
||||
err := runWriteRequest(ctx, leaderClient)
|
||||
require.NoError(t, err)
|
||||
|
||||
// emulate stopping the leader process
|
||||
cancels[startLeaderIdentity]()
|
||||
delete(clients, startLeaderIdentity)
|
||||
|
||||
testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
|
||||
err := runWriteRequest(ctx, leaderClient)
|
||||
requireEventually.ErrorIs(err, leaderelection.ErrNotLeader, "leader should no longer be able to write")
|
||||
}, time.Minute, time.Second)
|
||||
|
||||
if len(clients) > 0 {
|
||||
finalLease := waitForIdentity(ctx, t, namespace, leaseName, clients)
|
||||
finalTransitions := *finalLease.Spec.LeaseTransitions
|
||||
finalTime := *finalLease.Spec.AcquireTime
|
||||
finalLeaderIdentity := *finalLease.Spec.HolderIdentity
|
||||
|
||||
require.Greater(t, finalTransitions, startTransitions)
|
||||
require.Greater(t, finalTime.UnixNano(), startTime.UnixNano())
|
||||
require.NotEqual(t, startLeaderIdentity, finalLeaderIdentity, "should have elected new leader")
|
||||
|
||||
logLease(t, finalLease)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("sanity check write prevention after stopping leader", func(t *testing.T) {
|
||||
if len(clients) == 0 {
|
||||
t.Skip("no clients left to check")
|
||||
}
|
||||
|
||||
lease := checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients)
|
||||
logLease(t, lease)
|
||||
})
|
||||
}
|
||||
|
||||
func leaderElectionClient(t *testing.T, namespace *corev1.Namespace, leaseName, identity string) *kubeclient.Client {
|
||||
func leaderElectionClient(t *testing.T, namespace *corev1.Namespace, leaseName, identity string) (*kubeclient.Client, context.CancelFunc) {
|
||||
t.Helper()
|
||||
|
||||
podInfo := &downward.PodInfo{
|
||||
@ -119,23 +161,24 @@ func leaderElectionClient(t *testing.T, namespace *corev1.Namespace, leaseName,
|
||||
leaderCancel()
|
||||
}()
|
||||
|
||||
return client
|
||||
return client, controllerCancel
|
||||
}
|
||||
|
||||
func leaderElectionClients(t *testing.T, namespace *corev1.Namespace, leaseName string) map[string]*kubeclient.Client {
|
||||
func leaderElectionClients(t *testing.T, namespace *corev1.Namespace, leaseName string) (map[string]*kubeclient.Client, map[string]context.CancelFunc) {
|
||||
t.Helper()
|
||||
|
||||
count := rand.IntnRange(1, 6)
|
||||
out := make(map[string]*kubeclient.Client, count)
|
||||
clients := make(map[string]*kubeclient.Client, count)
|
||||
cancels := make(map[string]context.CancelFunc, count)
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
identity := "leader-election-client-" + rand.String(5)
|
||||
out[identity] = leaderElectionClient(t, namespace, leaseName, identity)
|
||||
clients[identity], cancels[identity] = leaderElectionClient(t, namespace, leaseName, identity)
|
||||
}
|
||||
|
||||
t.Logf("running leader election client tests with %d clients: %v", len(out), sets.StringKeySet(out).List())
|
||||
t.Logf("running leader election client tests with %d clients: %v", len(clients), sets.StringKeySet(clients).List())
|
||||
|
||||
return out
|
||||
return clients, cancels
|
||||
}
|
||||
|
||||
func pickRandomLeaderElectionClient(clients map[string]*kubeclient.Client) *kubeclient.Client {
|
||||
@ -162,7 +205,7 @@ func waitForIdentity(ctx context.Context, t *testing.T, namespace *corev1.Namesp
|
||||
}
|
||||
out = lease
|
||||
return lease.Spec.HolderIdentity != nil && identities.Has(*lease.Spec.HolderIdentity), nil
|
||||
}, 3*time.Minute, time.Second)
|
||||
}, 5*time.Minute, time.Second)
|
||||
|
||||
return out
|
||||
}
|
||||
@ -209,7 +252,7 @@ func checkOnlyLeaderCanWrite(ctx context.Context, t *testing.T, namespace *corev
|
||||
} else {
|
||||
nonLeaders++
|
||||
requireEventually.Error(err, "non leader client %q should have write error but it was nil", identity)
|
||||
requireEventually.True(errors.Is(err, leaderelection.ErrNotLeader), "non leader client %q should have write error: %v", identity, err)
|
||||
requireEventually.ErrorIs(err, leaderelection.ErrNotLeader, "non leader client %q should have write error: %v", identity, err)
|
||||
}
|
||||
}
|
||||
requireEventually.Equal(1, leaders, "did not see leader")
|
||||
|
Loading…
Reference in New Issue
Block a user