Merge pull request #788 from enj/enj/i/leader_election
Add Leader Election Middleware
This commit is contained in:
commit
ae505d8009
@ -153,6 +153,9 @@ rules:
|
|||||||
- apiGroups: [ "" ]
|
- apiGroups: [ "" ]
|
||||||
resources: [ configmaps ]
|
resources: [ configmaps ]
|
||||||
verbs: [ list, get, watch ]
|
verbs: [ list, get, watch ]
|
||||||
|
- apiGroups: [ coordination.k8s.io ]
|
||||||
|
resources: [ leases ]
|
||||||
|
verbs: [ create, get, update ]
|
||||||
---
|
---
|
||||||
kind: RoleBinding
|
kind: RoleBinding
|
||||||
apiVersion: rbac.authorization.k8s.io/v1
|
apiVersion: rbac.authorization.k8s.io/v1
|
||||||
|
@ -48,6 +48,9 @@ rules:
|
|||||||
- apiGroups: [apps]
|
- apiGroups: [apps]
|
||||||
resources: [replicasets,deployments]
|
resources: [replicasets,deployments]
|
||||||
verbs: [get]
|
verbs: [get]
|
||||||
|
- apiGroups: [ coordination.k8s.io ]
|
||||||
|
resources: [ leases ]
|
||||||
|
verbs: [ create, get, update ]
|
||||||
---
|
---
|
||||||
kind: RoleBinding
|
kind: RoleBinding
|
||||||
apiVersion: rbac.authorization.k8s.io/v1
|
apiVersion: rbac.authorization.k8s.io/v1
|
||||||
|
1
go.mod
1
go.mod
@ -25,6 +25,7 @@ require (
|
|||||||
github.com/spf13/pflag v1.0.5
|
github.com/spf13/pflag v1.0.5
|
||||||
github.com/stretchr/testify v1.7.0
|
github.com/stretchr/testify v1.7.0
|
||||||
github.com/tdewolff/minify/v2 v2.9.21
|
github.com/tdewolff/minify/v2 v2.9.21
|
||||||
|
go.uber.org/atomic v1.7.0
|
||||||
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a
|
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a
|
||||||
golang.org/x/net v0.0.0-20210520170846-37e1c6afe023
|
golang.org/x/net v0.0.0-20210520170846-37e1c6afe023
|
||||||
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
|
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
|
||||||
|
@ -102,7 +102,7 @@ func newInternal( //nolint:funlen // yeah, it's kind of long.
|
|||||||
|
|
||||||
// Wire up the impersonation proxy signer CA as another valid authenticator for client cert auth,
|
// Wire up the impersonation proxy signer CA as another valid authenticator for client cert auth,
|
||||||
// along with the Kube API server's CA.
|
// along with the Kube API server's CA.
|
||||||
// Note: any changes to the the Authentication stack need to be kept in sync with any assumptions made
|
// Note: any changes to the Authentication stack need to be kept in sync with any assumptions made
|
||||||
// by getTransportForUser, especially if we ever update the TCR API to start returning bearer tokens.
|
// by getTransportForUser, especially if we ever update the TCR API to start returning bearer tokens.
|
||||||
kubeClientUnsafeForProxying, err := kubeclient.New(clientOpts...)
|
kubeClientUnsafeForProxying, err := kubeclient.New(clientOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -297,12 +297,7 @@ func TestExpirerControllerSync(t *testing.T) {
|
|||||||
|
|
||||||
if test.wantDelete {
|
if test.wantDelete {
|
||||||
require.Len(t, *opts, 1)
|
require.Len(t, *opts, 1)
|
||||||
require.Equal(t, metav1.DeleteOptions{
|
require.Equal(t, testutil.NewPreconditions(testUID, testRV), (*opts)[0])
|
||||||
Preconditions: &metav1.Preconditions{
|
|
||||||
UID: &testUID,
|
|
||||||
ResourceVersion: &testRV,
|
|
||||||
},
|
|
||||||
}, (*opts)[0])
|
|
||||||
} else {
|
} else {
|
||||||
require.Len(t, *opts, 0)
|
require.Len(t, *opts, 0)
|
||||||
}
|
}
|
||||||
|
@ -29,14 +29,12 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
kubeinformers "k8s.io/client-go/informers"
|
kubeinformers "k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
kubernetesfake "k8s.io/client-go/kubernetes/fake"
|
kubernetesfake "k8s.io/client-go/kubernetes/fake"
|
||||||
coretesting "k8s.io/client-go/testing"
|
coretesting "k8s.io/client-go/testing"
|
||||||
"k8s.io/utils/pointer"
|
|
||||||
|
|
||||||
"go.pinniped.dev/generated/latest/apis/concierge/config/v1alpha1"
|
"go.pinniped.dev/generated/latest/apis/concierge/config/v1alpha1"
|
||||||
pinnipedfake "go.pinniped.dev/generated/latest/client/concierge/clientset/versioned/fake"
|
pinnipedfake "go.pinniped.dev/generated/latest/client/concierge/clientset/versioned/fake"
|
||||||
@ -1032,13 +1030,7 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
|
|||||||
// validate that we set delete preconditions correctly
|
// validate that we set delete preconditions correctly
|
||||||
r.NotEmpty(*deleteOptions)
|
r.NotEmpty(*deleteOptions)
|
||||||
for _, opt := range *deleteOptions {
|
for _, opt := range *deleteOptions {
|
||||||
uid := types.UID("uid-1234")
|
r.Equal(testutil.NewPreconditions("uid-1234", "rv-5678"), opt)
|
||||||
r.Equal(metav1.DeleteOptions{
|
|
||||||
Preconditions: &metav1.Preconditions{
|
|
||||||
UID: &uid,
|
|
||||||
ResourceVersion: pointer.String("rv-5678"),
|
|
||||||
},
|
|
||||||
}, opt)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,12 +40,29 @@ func NewLegacyPodCleanerController(
|
|||||||
controllerlib.Config{
|
controllerlib.Config{
|
||||||
Name: "legacy-pod-cleaner-controller",
|
Name: "legacy-pod-cleaner-controller",
|
||||||
Syncer: controllerlib.SyncFunc(func(ctx controllerlib.Context) error {
|
Syncer: controllerlib.SyncFunc(func(ctx controllerlib.Context) error {
|
||||||
if err := client.Kubernetes.CoreV1().Pods(ctx.Key.Namespace).Delete(ctx.Context, ctx.Key.Name, metav1.DeleteOptions{}); err != nil {
|
podClient := client.Kubernetes.CoreV1().Pods(ctx.Key.Namespace)
|
||||||
|
|
||||||
|
// avoid blind writes to the API
|
||||||
|
agentPod, err := podClient.Get(ctx.Context, ctx.Key.Name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
if k8serrors.IsNotFound(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("could not get legacy agent pod: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := podClient.Delete(ctx.Context, ctx.Key.Name, metav1.DeleteOptions{
|
||||||
|
Preconditions: &metav1.Preconditions{
|
||||||
|
UID: &agentPod.UID,
|
||||||
|
ResourceVersion: &agentPod.ResourceVersion,
|
||||||
|
},
|
||||||
|
}); err != nil {
|
||||||
if k8serrors.IsNotFound(err) {
|
if k8serrors.IsNotFound(err) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("could not delete legacy agent pod: %w", err)
|
return fmt.Errorf("could not delete legacy agent pod: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("deleted legacy kube-cert-agent pod", "pod", klog.KRef(ctx.Key.Namespace, ctx.Key.Name))
|
log.Info("deleted legacy kube-cert-agent pod", "pod", klog.KRef(ctx.Key.Namespace, ctx.Key.Name))
|
||||||
return nil
|
return nil
|
||||||
}),
|
}),
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
|
|
||||||
"go.pinniped.dev/internal/controllerlib"
|
"go.pinniped.dev/internal/controllerlib"
|
||||||
"go.pinniped.dev/internal/kubeclient"
|
"go.pinniped.dev/internal/kubeclient"
|
||||||
|
"go.pinniped.dev/internal/testutil"
|
||||||
"go.pinniped.dev/internal/testutil/testlogger"
|
"go.pinniped.dev/internal/testutil/testlogger"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -28,9 +29,11 @@ func TestLegacyPodCleanerController(t *testing.T) {
|
|||||||
|
|
||||||
legacyAgentPodWithoutExtraLabel := &corev1.Pod{
|
legacyAgentPodWithoutExtraLabel := &corev1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Namespace: "concierge",
|
Namespace: "concierge",
|
||||||
Name: "pinniped-concierge-kube-cert-agent-without-extra-label",
|
Name: "pinniped-concierge-kube-cert-agent-without-extra-label",
|
||||||
Labels: map[string]string{"kube-cert-agent.pinniped.dev": "true"},
|
Labels: map[string]string{"kube-cert-agent.pinniped.dev": "true"},
|
||||||
|
UID: "1",
|
||||||
|
ResourceVersion: "2",
|
||||||
},
|
},
|
||||||
Spec: corev1.PodSpec{},
|
Spec: corev1.PodSpec{},
|
||||||
Status: corev1.PodStatus{Phase: corev1.PodRunning},
|
Status: corev1.PodStatus{Phase: corev1.PodRunning},
|
||||||
@ -40,10 +43,14 @@ func TestLegacyPodCleanerController(t *testing.T) {
|
|||||||
legacyAgentPodWithExtraLabel.Name = "pinniped-concierge-kube-cert-agent-with-extra-label"
|
legacyAgentPodWithExtraLabel.Name = "pinniped-concierge-kube-cert-agent-with-extra-label"
|
||||||
legacyAgentPodWithExtraLabel.Labels["extralabel"] = "labelvalue"
|
legacyAgentPodWithExtraLabel.Labels["extralabel"] = "labelvalue"
|
||||||
legacyAgentPodWithExtraLabel.Labels["anotherextralabel"] = "labelvalue"
|
legacyAgentPodWithExtraLabel.Labels["anotherextralabel"] = "labelvalue"
|
||||||
|
legacyAgentPodWithExtraLabel.UID = "3"
|
||||||
|
legacyAgentPodWithExtraLabel.ResourceVersion = "4"
|
||||||
|
|
||||||
nonLegacyAgentPod := legacyAgentPodWithExtraLabel.DeepCopy()
|
nonLegacyAgentPod := legacyAgentPodWithExtraLabel.DeepCopy()
|
||||||
nonLegacyAgentPod.Name = "pinniped-concierge-kube-cert-agent-not-legacy"
|
nonLegacyAgentPod.Name = "pinniped-concierge-kube-cert-agent-not-legacy"
|
||||||
nonLegacyAgentPod.Labels["kube-cert-agent.pinniped.dev"] = "v2"
|
nonLegacyAgentPod.Labels["kube-cert-agent.pinniped.dev"] = "v2"
|
||||||
|
nonLegacyAgentPod.UID = "5"
|
||||||
|
nonLegacyAgentPod.ResourceVersion = "6"
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
@ -52,10 +59,12 @@ func TestLegacyPodCleanerController(t *testing.T) {
|
|||||||
wantDistinctErrors []string
|
wantDistinctErrors []string
|
||||||
wantDistinctLogs []string
|
wantDistinctLogs []string
|
||||||
wantActions []coretesting.Action
|
wantActions []coretesting.Action
|
||||||
|
wantDeleteOptions []metav1.DeleteOptions
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "no pods",
|
name: "no pods",
|
||||||
wantActions: []coretesting.Action{},
|
wantActions: []coretesting.Action{},
|
||||||
|
wantDeleteOptions: []metav1.DeleteOptions{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "mix of pods",
|
name: "mix of pods",
|
||||||
@ -69,8 +78,12 @@ func TestLegacyPodCleanerController(t *testing.T) {
|
|||||||
`legacy-pod-cleaner-controller "level"=0 "msg"="deleted legacy kube-cert-agent pod" "pod"={"name":"pinniped-concierge-kube-cert-agent-with-extra-label","namespace":"concierge"}`,
|
`legacy-pod-cleaner-controller "level"=0 "msg"="deleted legacy kube-cert-agent pod" "pod"={"name":"pinniped-concierge-kube-cert-agent-with-extra-label","namespace":"concierge"}`,
|
||||||
},
|
},
|
||||||
wantActions: []coretesting.Action{ // the first delete triggers the informer again, but the second invocation triggers a Not Found
|
wantActions: []coretesting.Action{ // the first delete triggers the informer again, but the second invocation triggers a Not Found
|
||||||
|
coretesting.NewGetAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
||||||
coretesting.NewDeleteAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
coretesting.NewDeleteAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
||||||
coretesting.NewDeleteAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
coretesting.NewGetAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
||||||
|
},
|
||||||
|
wantDeleteOptions: []metav1.DeleteOptions{
|
||||||
|
testutil.NewPreconditions("3", "4"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -89,9 +102,15 @@ func TestLegacyPodCleanerController(t *testing.T) {
|
|||||||
"could not delete legacy agent pod: some delete error",
|
"could not delete legacy agent pod: some delete error",
|
||||||
},
|
},
|
||||||
wantActions: []coretesting.Action{
|
wantActions: []coretesting.Action{
|
||||||
|
coretesting.NewGetAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
||||||
coretesting.NewDeleteAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
coretesting.NewDeleteAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
||||||
|
coretesting.NewGetAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
||||||
coretesting.NewDeleteAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
coretesting.NewDeleteAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
||||||
},
|
},
|
||||||
|
wantDeleteOptions: []metav1.DeleteOptions{
|
||||||
|
testutil.NewPreconditions("3", "4"),
|
||||||
|
testutil.NewPreconditions("3", "4"),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "fail to delete because of not found error",
|
name: "fail to delete because of not found error",
|
||||||
@ -107,8 +126,30 @@ func TestLegacyPodCleanerController(t *testing.T) {
|
|||||||
},
|
},
|
||||||
wantDistinctErrors: []string{""},
|
wantDistinctErrors: []string{""},
|
||||||
wantActions: []coretesting.Action{
|
wantActions: []coretesting.Action{
|
||||||
|
coretesting.NewGetAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
||||||
coretesting.NewDeleteAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
coretesting.NewDeleteAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
||||||
},
|
},
|
||||||
|
wantDeleteOptions: []metav1.DeleteOptions{
|
||||||
|
testutil.NewPreconditions("3", "4"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "fail to delete because of not found error on get",
|
||||||
|
kubeObjects: []runtime.Object{
|
||||||
|
legacyAgentPodWithoutExtraLabel, // should not be delete (missing extra label)
|
||||||
|
legacyAgentPodWithExtraLabel, // should be deleted
|
||||||
|
nonLegacyAgentPod, // should not be deleted (missing legacy agent label)
|
||||||
|
},
|
||||||
|
addKubeReactions: func(clientset *kubefake.Clientset) {
|
||||||
|
clientset.PrependReactor("get", "*", func(action coretesting.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
return true, nil, k8serrors.NewNotFound(action.GetResource().GroupResource(), "")
|
||||||
|
})
|
||||||
|
},
|
||||||
|
wantDistinctErrors: []string{""},
|
||||||
|
wantActions: []coretesting.Action{
|
||||||
|
coretesting.NewGetAction(corev1.Resource("pods").WithVersion("v1"), "concierge", legacyAgentPodWithExtraLabel.Name),
|
||||||
|
},
|
||||||
|
wantDeleteOptions: []metav1.DeleteOptions{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
@ -120,6 +161,10 @@ func TestLegacyPodCleanerController(t *testing.T) {
|
|||||||
if tt.addKubeReactions != nil {
|
if tt.addKubeReactions != nil {
|
||||||
tt.addKubeReactions(kubeClientset)
|
tt.addKubeReactions(kubeClientset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
opts := &[]metav1.DeleteOptions{}
|
||||||
|
trackDeleteClient := testutil.NewDeleteOptionsRecorder(kubeClientset, opts)
|
||||||
|
|
||||||
kubeInformers := informers.NewSharedInformerFactory(kubeClientset, 0)
|
kubeInformers := informers.NewSharedInformerFactory(kubeClientset, 0)
|
||||||
log := testlogger.New(t)
|
log := testlogger.New(t)
|
||||||
controller := NewLegacyPodCleanerController(
|
controller := NewLegacyPodCleanerController(
|
||||||
@ -127,7 +172,7 @@ func TestLegacyPodCleanerController(t *testing.T) {
|
|||||||
Namespace: "concierge",
|
Namespace: "concierge",
|
||||||
Labels: map[string]string{"extralabel": "labelvalue"},
|
Labels: map[string]string{"extralabel": "labelvalue"},
|
||||||
},
|
},
|
||||||
&kubeclient.Client{Kubernetes: kubeClientset},
|
&kubeclient.Client{Kubernetes: trackDeleteClient},
|
||||||
kubeInformers.Core().V1().Pods(),
|
kubeInformers.Core().V1().Pods(),
|
||||||
log,
|
log,
|
||||||
controllerlib.WithMaxRetries(1),
|
controllerlib.WithMaxRetries(1),
|
||||||
@ -140,6 +185,7 @@ func TestLegacyPodCleanerController(t *testing.T) {
|
|||||||
assert.Equal(t, tt.wantDistinctErrors, deduplicate(errorMessages), "unexpected errors")
|
assert.Equal(t, tt.wantDistinctErrors, deduplicate(errorMessages), "unexpected errors")
|
||||||
assert.Equal(t, tt.wantDistinctLogs, deduplicate(log.Lines()), "unexpected logs")
|
assert.Equal(t, tt.wantDistinctLogs, deduplicate(log.Lines()), "unexpected logs")
|
||||||
assert.Equal(t, tt.wantActions, kubeClientset.Actions()[2:], "unexpected actions")
|
assert.Equal(t, tt.wantActions, kubeClientset.Actions()[2:], "unexpected actions")
|
||||||
|
assert.Equal(t, tt.wantDeleteOptions, *opts, "unexpected delete options")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -102,7 +102,12 @@ func (c *garbageCollectorController) Sync(ctx controllerlib.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if garbageCollectAfterTime.Before(frozenClock.Now()) {
|
if garbageCollectAfterTime.Before(frozenClock.Now()) {
|
||||||
err = c.kubeClient.CoreV1().Secrets(secret.Namespace).Delete(ctx.Context, secret.Name, metav1.DeleteOptions{})
|
err = c.kubeClient.CoreV1().Secrets(secret.Namespace).Delete(ctx.Context, secret.Name, metav1.DeleteOptions{
|
||||||
|
Preconditions: &metav1.Preconditions{
|
||||||
|
UID: &secret.UID,
|
||||||
|
ResourceVersion: &secret.ResourceVersion,
|
||||||
|
},
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plog.WarningErr("failed to garbage collect resource", err, logKV(secret))
|
plog.WarningErr("failed to garbage collect resource", err, logKV(secret))
|
||||||
continue
|
continue
|
||||||
|
@ -18,6 +18,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
kubeinformers "k8s.io/client-go/informers"
|
kubeinformers "k8s.io/client-go/informers"
|
||||||
|
"k8s.io/client-go/kubernetes"
|
||||||
kubernetesfake "k8s.io/client-go/kubernetes/fake"
|
kubernetesfake "k8s.io/client-go/kubernetes/fake"
|
||||||
kubetesting "k8s.io/client-go/testing"
|
kubetesting "k8s.io/client-go/testing"
|
||||||
|
|
||||||
@ -116,6 +117,8 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
|
|||||||
subject controllerlib.Controller
|
subject controllerlib.Controller
|
||||||
kubeInformerClient *kubernetesfake.Clientset
|
kubeInformerClient *kubernetesfake.Clientset
|
||||||
kubeClient *kubernetesfake.Clientset
|
kubeClient *kubernetesfake.Clientset
|
||||||
|
deleteOptions *[]metav1.DeleteOptions
|
||||||
|
deleteOptionsRecorder kubernetes.Interface
|
||||||
kubeInformers kubeinformers.SharedInformerFactory
|
kubeInformers kubeinformers.SharedInformerFactory
|
||||||
cancelContext context.Context
|
cancelContext context.Context
|
||||||
cancelContextCancelFunc context.CancelFunc
|
cancelContextCancelFunc context.CancelFunc
|
||||||
@ -130,7 +133,7 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
|
|||||||
// Set this at the last second to allow for injection of server override.
|
// Set this at the last second to allow for injection of server override.
|
||||||
subject = GarbageCollectorController(
|
subject = GarbageCollectorController(
|
||||||
fakeClock,
|
fakeClock,
|
||||||
kubeClient,
|
deleteOptionsRecorder,
|
||||||
kubeInformers.Core().V1().Secrets(),
|
kubeInformers.Core().V1().Secrets(),
|
||||||
controllerlib.WithInformer,
|
controllerlib.WithInformer,
|
||||||
)
|
)
|
||||||
@ -158,6 +161,8 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
|
|||||||
|
|
||||||
kubeInformerClient = kubernetesfake.NewSimpleClientset()
|
kubeInformerClient = kubernetesfake.NewSimpleClientset()
|
||||||
kubeClient = kubernetesfake.NewSimpleClientset()
|
kubeClient = kubernetesfake.NewSimpleClientset()
|
||||||
|
deleteOptions = &[]metav1.DeleteOptions{}
|
||||||
|
deleteOptionsRecorder = testutil.NewDeleteOptionsRecorder(kubeClient, deleteOptions)
|
||||||
kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0)
|
kubeInformers = kubeinformers.NewSharedInformerFactory(kubeInformerClient, 0)
|
||||||
frozenNow = time.Now().UTC()
|
frozenNow = time.Now().UTC()
|
||||||
fakeClock = clock.NewFakeClock(frozenNow)
|
fakeClock = clock.NewFakeClock(frozenNow)
|
||||||
@ -193,8 +198,10 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
|
|||||||
it.Before(func() {
|
it.Before(func() {
|
||||||
firstExpiredSecret := &corev1.Secret{
|
firstExpiredSecret := &corev1.Secret{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: "first expired secret",
|
Name: "first expired secret",
|
||||||
Namespace: installedInNamespace,
|
Namespace: installedInNamespace,
|
||||||
|
UID: "uid-123",
|
||||||
|
ResourceVersion: "rv-456",
|
||||||
Annotations: map[string]string{
|
Annotations: map[string]string{
|
||||||
"storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-time.Second).Format(time.RFC3339),
|
"storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-time.Second).Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
@ -204,8 +211,10 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
|
|||||||
r.NoError(kubeClient.Tracker().Add(firstExpiredSecret))
|
r.NoError(kubeClient.Tracker().Add(firstExpiredSecret))
|
||||||
secondExpiredSecret := &corev1.Secret{
|
secondExpiredSecret := &corev1.Secret{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: "second expired secret",
|
Name: "second expired secret",
|
||||||
Namespace: installedInNamespace,
|
Namespace: installedInNamespace,
|
||||||
|
UID: "uid-789",
|
||||||
|
ResourceVersion: "rv-555",
|
||||||
Annotations: map[string]string{
|
Annotations: map[string]string{
|
||||||
"storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-2 * time.Second).Format(time.RFC3339),
|
"storage.pinniped.dev/garbage-collect-after": frozenNow.Add(-2 * time.Second).Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
@ -237,6 +246,13 @@ func TestGarbageCollectorControllerSync(t *testing.T) {
|
|||||||
},
|
},
|
||||||
kubeClient.Actions(),
|
kubeClient.Actions(),
|
||||||
)
|
)
|
||||||
|
r.ElementsMatch(
|
||||||
|
[]metav1.DeleteOptions{
|
||||||
|
testutil.NewPreconditions("uid-123", "rv-456"),
|
||||||
|
testutil.NewPreconditions("uid-789", "rv-555"),
|
||||||
|
},
|
||||||
|
*deleteOptions,
|
||||||
|
)
|
||||||
list, err := kubeClient.CoreV1().Secrets(installedInNamespace).List(context.Background(), metav1.ListOptions{})
|
list, err := kubeClient.CoreV1().Secrets(installedInNamespace).List(context.Background(), metav1.ListOptions{})
|
||||||
r.NoError(err)
|
r.NoError(err)
|
||||||
r.Len(list.Items, 2)
|
r.Len(list.Items, 2)
|
||||||
|
@ -33,6 +33,7 @@ import (
|
|||||||
"go.pinniped.dev/internal/dynamiccert"
|
"go.pinniped.dev/internal/dynamiccert"
|
||||||
"go.pinniped.dev/internal/groupsuffix"
|
"go.pinniped.dev/internal/groupsuffix"
|
||||||
"go.pinniped.dev/internal/kubeclient"
|
"go.pinniped.dev/internal/kubeclient"
|
||||||
|
"go.pinniped.dev/internal/leaderelection"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -97,7 +98,7 @@ type Config struct {
|
|||||||
func PrepareControllers(c *Config) (func(ctx context.Context), error) {
|
func PrepareControllers(c *Config) (func(ctx context.Context), error) {
|
||||||
loginConciergeGroupData, identityConciergeGroupData := groupsuffix.ConciergeAggregatedGroups(c.APIGroupSuffix)
|
loginConciergeGroupData, identityConciergeGroupData := groupsuffix.ConciergeAggregatedGroups(c.APIGroupSuffix)
|
||||||
|
|
||||||
dref, _, err := deploymentref.New(c.ServerInstallationInfo)
|
dref, deployment, err := deploymentref.New(c.ServerInstallationInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot create deployment ref: %w", err)
|
return nil, fmt.Errorf("cannot create deployment ref: %w", err)
|
||||||
}
|
}
|
||||||
@ -107,7 +108,9 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) {
|
|||||||
return nil, fmt.Errorf("cannot create API service ref: %w", err)
|
return nil, fmt.Errorf("cannot create API service ref: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
client, err := kubeclient.New(
|
client, leaderElector, err := leaderelection.New(
|
||||||
|
c.ServerInstallationInfo,
|
||||||
|
deployment,
|
||||||
dref, // first try to use the deployment as an owner ref (for namespace scoped resources)
|
dref, // first try to use the deployment as an owner ref (for namespace scoped resources)
|
||||||
apiServiceRef, // fallback to our API service (for everything else we create)
|
apiServiceRef, // fallback to our API service (for everything else we create)
|
||||||
kubeclient.WithMiddleware(groupsuffix.New(c.APIGroupSuffix)),
|
kubeclient.WithMiddleware(groupsuffix.New(c.APIGroupSuffix)),
|
||||||
@ -303,7 +306,7 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) {
|
|||||||
// Return a function which starts the informers and controllers.
|
// Return a function which starts the informers and controllers.
|
||||||
return func(ctx context.Context) {
|
return func(ctx context.Context) {
|
||||||
informers.startAndWaitForSync(ctx)
|
informers.startAndWaitForSync(ctx)
|
||||||
go controllerManager.Start(ctx)
|
go leaderElector(ctx, controllerManager.Start)
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,7 +146,7 @@ func handleOtherVerbs(
|
|||||||
|
|
||||||
result, err := middlewareReq.mutateRequest(obj)
|
result, err := middlewareReq.mutateRequest(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, nil, err
|
return true, nil, fmt.Errorf("middleware request for %#v failed to mutate: %w", middlewareReq, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !result.mutated {
|
if !result.mutated {
|
||||||
@ -231,7 +231,7 @@ func handleCreateOrUpdate(
|
|||||||
|
|
||||||
result, err := middlewareReq.mutateRequest(obj)
|
result, err := middlewareReq.mutateRequest(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, nil, err
|
return true, nil, fmt.Errorf("middleware request for %#v failed to mutate: %w", middlewareReq, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !result.mutated {
|
if !result.mutated {
|
||||||
|
150
internal/leaderelection/leaderelection.go
Normal file
150
internal/leaderelection/leaderelection.go
Normal file
@ -0,0 +1,150 @@
|
|||||||
|
// Copyright 2021 the Pinniped contributors. All Rights Reserved.
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
package leaderelection
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
appsv1 "k8s.io/api/apps/v1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/client-go/tools/leaderelection"
|
||||||
|
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||||
|
|
||||||
|
"go.pinniped.dev/internal/constable"
|
||||||
|
"go.pinniped.dev/internal/downward"
|
||||||
|
"go.pinniped.dev/internal/kubeclient"
|
||||||
|
"go.pinniped.dev/internal/plog"
|
||||||
|
)
|
||||||
|
|
||||||
|
const ErrNotLeader constable.Error = "write attempt rejected as client is not leader"
|
||||||
|
|
||||||
|
// New returns a client that has a leader election middleware injected into it.
|
||||||
|
// This middleware will prevent all non-read requests to the Kubernetes API when
|
||||||
|
// the current process does not hold the leader election lock. Unlike normal
|
||||||
|
// leader election where the process blocks until it acquires the lock, this
|
||||||
|
// middleware approach lets the process run as normal for all read requests.
|
||||||
|
// Another difference is that if the process acquires the lock and then loses it
|
||||||
|
// (i.e. a failed renewal), it will not exit (i.e. restart). Instead, it will
|
||||||
|
// simply attempt to acquire the lock again.
|
||||||
|
//
|
||||||
|
// The returned function is blocking and will run the leader election polling
|
||||||
|
// logic and will coordinate lease release with the input controller starter function.
|
||||||
|
func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubeclient.Option) (
|
||||||
|
*kubeclient.Client,
|
||||||
|
func(context.Context, func(context.Context)),
|
||||||
|
error,
|
||||||
|
) {
|
||||||
|
internalClient, err := kubeclient.New(opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("could not create internal client for leader election: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
isLeader := atomic.NewBool(false)
|
||||||
|
|
||||||
|
identity := podInfo.Name
|
||||||
|
leaseName := deployment.Name
|
||||||
|
|
||||||
|
leaderElectionConfig := leaderelection.LeaderElectionConfig{
|
||||||
|
Lock: &resourcelock.LeaseLock{
|
||||||
|
LeaseMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: podInfo.Namespace,
|
||||||
|
Name: leaseName,
|
||||||
|
},
|
||||||
|
Client: internalClient.Kubernetes.CoordinationV1(),
|
||||||
|
LockConfig: resourcelock.ResourceLockConfig{
|
||||||
|
Identity: identity,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ReleaseOnCancel: true, // semantics for correct release handled by controllersWithLeaderElector below
|
||||||
|
LeaseDuration: 60 * time.Second,
|
||||||
|
RenewDeadline: 15 * time.Second,
|
||||||
|
RetryPeriod: 5 * time.Second,
|
||||||
|
Callbacks: leaderelection.LeaderCallbacks{
|
||||||
|
OnStartedLeading: func(_ context.Context) {
|
||||||
|
plog.Debug("leader gained", "identity", identity)
|
||||||
|
isLeader.Store(true)
|
||||||
|
},
|
||||||
|
OnStoppedLeading: func() {
|
||||||
|
plog.Debug("leader lost", "identity", identity)
|
||||||
|
isLeader.Store(false)
|
||||||
|
},
|
||||||
|
OnNewLeader: func(newLeader string) {
|
||||||
|
if newLeader == identity {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
plog.Debug("new leader elected", "newLeader", newLeader)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Name: leaseName,
|
||||||
|
// this must be set to nil because we do not want to associate /healthz with a failed
|
||||||
|
// leader election renewal as we do not want to exit the process if the leader changes.
|
||||||
|
WatchDog: nil,
|
||||||
|
}
|
||||||
|
|
||||||
|
// validate our config here before we rely on it being functioning below
|
||||||
|
if _, err := leaderelection.NewLeaderElector(leaderElectionConfig); err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("invalid config - could not create leader elector: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
writeOnlyWhenLeader := kubeclient.MiddlewareFunc(func(_ context.Context, rt kubeclient.RoundTrip) {
|
||||||
|
switch rt.Verb() {
|
||||||
|
case kubeclient.VerbGet, kubeclient.VerbList, kubeclient.VerbWatch:
|
||||||
|
// reads are always allowed.
|
||||||
|
// note that while our pods/exec into the kube cert agent pod is a write request from the
|
||||||
|
// perspective of the Kube API, it is semantically a read request since no mutation occurs.
|
||||||
|
// we simply use it to fill a cache, and we need all pods to have a functioning cache.
|
||||||
|
// however, we do not need to handle it here because remotecommand.NewSPDYExecutor uses a
|
||||||
|
// kubeclient.Client.JSONConfig as input. since our middleware logic is only injected into
|
||||||
|
// the generated clientset code, this JSONConfig simply ignores this middleware all together.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if isLeader.Load() { // only perform "expensive" test for writes
|
||||||
|
return // we are currently the leader, all actions are permitted
|
||||||
|
}
|
||||||
|
|
||||||
|
rt.MutateRequest(func(_ kubeclient.Object) error {
|
||||||
|
return ErrNotLeader // we are not the leader, fail the write request
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
leaderElectionOpts := append(
|
||||||
|
// all middleware are always executed so this being the first middleware is not relevant
|
||||||
|
[]kubeclient.Option{kubeclient.WithMiddleware(writeOnlyWhenLeader)},
|
||||||
|
opts..., // do not mutate input slice
|
||||||
|
)
|
||||||
|
|
||||||
|
client, err := kubeclient.New(leaderElectionOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("could not create leader election client: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
controllersWithLeaderElector := func(ctx context.Context, controllers func(context.Context)) {
|
||||||
|
leaderElectorCtx, leaderElectorCancel := context.WithCancel(context.Background()) // purposefully detached context
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
controllers(ctx) // run the controllers with the global context, this blocks until the context is canceled
|
||||||
|
leaderElectorCancel() // once the controllers have all stopped, tell the leader elector to release the lock
|
||||||
|
}()
|
||||||
|
|
||||||
|
for { // run (and rerun on release) the leader elector with its own context (blocking)
|
||||||
|
select {
|
||||||
|
case <-leaderElectorCtx.Done():
|
||||||
|
return // keep trying to run until process exit
|
||||||
|
|
||||||
|
default:
|
||||||
|
// blocks while trying to acquire lease, unblocks on release.
|
||||||
|
// note that this creates a new leader elector on each loop to
|
||||||
|
// prevent any bugs from reusing that struct across elections.
|
||||||
|
// our config was validated above so this should never die.
|
||||||
|
leaderelection.RunOrDie(leaderElectorCtx, leaderElectionConfig)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return client, controllersWithLeaderElector, nil
|
||||||
|
}
|
@ -41,6 +41,7 @@ import (
|
|||||||
"go.pinniped.dev/internal/downward"
|
"go.pinniped.dev/internal/downward"
|
||||||
"go.pinniped.dev/internal/groupsuffix"
|
"go.pinniped.dev/internal/groupsuffix"
|
||||||
"go.pinniped.dev/internal/kubeclient"
|
"go.pinniped.dev/internal/kubeclient"
|
||||||
|
"go.pinniped.dev/internal/leaderelection"
|
||||||
"go.pinniped.dev/internal/oidc/jwks"
|
"go.pinniped.dev/internal/oidc/jwks"
|
||||||
"go.pinniped.dev/internal/oidc/provider"
|
"go.pinniped.dev/internal/oidc/provider"
|
||||||
"go.pinniped.dev/internal/oidc/provider/manager"
|
"go.pinniped.dev/internal/oidc/provider/manager"
|
||||||
@ -94,6 +95,7 @@ func startControllers(
|
|||||||
pinnipedClient pinnipedclientset.Interface,
|
pinnipedClient pinnipedclientset.Interface,
|
||||||
kubeInformers kubeinformers.SharedInformerFactory,
|
kubeInformers kubeinformers.SharedInformerFactory,
|
||||||
pinnipedInformers pinnipedinformers.SharedInformerFactory,
|
pinnipedInformers pinnipedinformers.SharedInformerFactory,
|
||||||
|
leaderElector func(context.Context, func(context.Context)),
|
||||||
) {
|
) {
|
||||||
federationDomainInformer := pinnipedInformers.Config().V1alpha1().FederationDomains()
|
federationDomainInformer := pinnipedInformers.Config().V1alpha1().FederationDomains()
|
||||||
secretInformer := kubeInformers.Core().V1().Secrets()
|
secretInformer := kubeInformers.Core().V1().Secrets()
|
||||||
@ -261,7 +263,7 @@ func startControllers(
|
|||||||
kubeInformers.WaitForCacheSync(ctx.Done())
|
kubeInformers.WaitForCacheSync(ctx.Done())
|
||||||
pinnipedInformers.WaitForCacheSync(ctx.Done())
|
pinnipedInformers.WaitForCacheSync(ctx.Done())
|
||||||
|
|
||||||
go controllerManager.Start(ctx)
|
go leaderElector(ctx, controllerManager.Start)
|
||||||
}
|
}
|
||||||
|
|
||||||
func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
|
func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
|
||||||
@ -275,14 +277,25 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
|
|||||||
return fmt.Errorf("cannot create deployment ref: %w", err)
|
return fmt.Errorf("cannot create deployment ref: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
client, err := kubeclient.New(
|
opts := []kubeclient.Option{
|
||||||
dref,
|
dref,
|
||||||
kubeclient.WithMiddleware(groupsuffix.New(*cfg.APIGroupSuffix)),
|
kubeclient.WithMiddleware(groupsuffix.New(*cfg.APIGroupSuffix)),
|
||||||
|
}
|
||||||
|
|
||||||
|
client, leaderElector, err := leaderelection.New(
|
||||||
|
podInfo,
|
||||||
|
supervisorDeployment,
|
||||||
|
opts...,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot create k8s client: %w", err)
|
return fmt.Errorf("cannot create k8s client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
clientWithoutLeaderElection, err := kubeclient.New(opts...)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot create k8s client without leader election: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
kubeInformers := kubeinformers.NewSharedInformerFactoryWithOptions(
|
kubeInformers := kubeinformers.NewSharedInformerFactoryWithOptions(
|
||||||
client.Kubernetes,
|
client.Kubernetes,
|
||||||
defaultResyncInterval,
|
defaultResyncInterval,
|
||||||
@ -312,7 +325,7 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
|
|||||||
dynamicJWKSProvider,
|
dynamicJWKSProvider,
|
||||||
dynamicUpstreamIDPProvider,
|
dynamicUpstreamIDPProvider,
|
||||||
&secretCache,
|
&secretCache,
|
||||||
client.Kubernetes.CoreV1().Secrets(serverInstallationNamespace),
|
clientWithoutLeaderElection.Kubernetes.CoreV1().Secrets(serverInstallationNamespace), // writes to kube storage are allowed for non-leaders
|
||||||
)
|
)
|
||||||
|
|
||||||
startControllers(
|
startControllers(
|
||||||
@ -328,6 +341,7 @@ func run(podInfo *downward.PodInfo, cfg *supervisor.Config) error {
|
|||||||
client.PinnipedSupervisor,
|
client.PinnipedSupervisor,
|
||||||
kubeInformers,
|
kubeInformers,
|
||||||
pinnipedInformers,
|
pinnipedInformers,
|
||||||
|
leaderElector,
|
||||||
)
|
)
|
||||||
|
|
||||||
//nolint: gosec // Intentionally binding to all network interfaces.
|
//nolint: gosec // Intentionally binding to all network interfaces.
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
)
|
)
|
||||||
@ -32,10 +33,24 @@ type coreWrapper struct {
|
|||||||
opts *[]metav1.DeleteOptions
|
opts *[]metav1.DeleteOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *coreWrapper) Pods(namespace string) corev1client.PodInterface {
|
||||||
|
return &podsWrapper{PodInterface: c.CoreV1Interface.Pods(namespace), opts: c.opts}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *coreWrapper) Secrets(namespace string) corev1client.SecretInterface {
|
func (c *coreWrapper) Secrets(namespace string) corev1client.SecretInterface {
|
||||||
return &secretsWrapper{SecretInterface: c.CoreV1Interface.Secrets(namespace), opts: c.opts}
|
return &secretsWrapper{SecretInterface: c.CoreV1Interface.Secrets(namespace), opts: c.opts}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type podsWrapper struct {
|
||||||
|
corev1client.PodInterface
|
||||||
|
opts *[]metav1.DeleteOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *podsWrapper) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
|
||||||
|
*s.opts = append(*s.opts, opts)
|
||||||
|
return s.PodInterface.Delete(ctx, name, opts)
|
||||||
|
}
|
||||||
|
|
||||||
type secretsWrapper struct {
|
type secretsWrapper struct {
|
||||||
corev1client.SecretInterface
|
corev1client.SecretInterface
|
||||||
opts *[]metav1.DeleteOptions
|
opts *[]metav1.DeleteOptions
|
||||||
@ -45,3 +60,12 @@ func (s *secretsWrapper) Delete(ctx context.Context, name string, opts metav1.De
|
|||||||
*s.opts = append(*s.opts, opts)
|
*s.opts = append(*s.opts, opts)
|
||||||
return s.SecretInterface.Delete(ctx, name, opts)
|
return s.SecretInterface.Delete(ctx, name, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewPreconditions(uid types.UID, rv string) metav1.DeleteOptions {
|
||||||
|
return metav1.DeleteOptions{
|
||||||
|
Preconditions: &metav1.Preconditions{
|
||||||
|
UID: &uid,
|
||||||
|
ResourceVersion: &rv,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -438,7 +438,7 @@ func TestImpersonationProxy(t *testing.T) { //nolint:gocyclo // yeah, it's compl
|
|||||||
t.Run("using and watching all the basic verbs", func(t *testing.T) {
|
t.Run("using and watching all the basic verbs", func(t *testing.T) {
|
||||||
parallelIfNotEKS(t)
|
parallelIfNotEKS(t)
|
||||||
// Create a namespace, because it will be easier to exercise "deletecollection" if we have a namespace.
|
// Create a namespace, because it will be easier to exercise "deletecollection" if we have a namespace.
|
||||||
namespaceName := createTestNamespace(t, adminClient)
|
namespaceName := testlib.CreateNamespace(ctx, t, "impersonation").Name
|
||||||
|
|
||||||
// Create and start informer to exercise the "watch" verb for us.
|
// Create and start informer to exercise the "watch" verb for us.
|
||||||
informerFactory := k8sinformers.NewSharedInformerFactoryWithOptions(
|
informerFactory := k8sinformers.NewSharedInformerFactoryWithOptions(
|
||||||
@ -827,7 +827,7 @@ func TestImpersonationProxy(t *testing.T) { //nolint:gocyclo // yeah, it's compl
|
|||||||
// this works because impersonation cannot set UID and thus the final user info the proxy sees has no UID
|
// this works because impersonation cannot set UID and thus the final user info the proxy sees has no UID
|
||||||
t.Run("nested impersonation as a service account is allowed if it has enough RBAC permissions", func(t *testing.T) {
|
t.Run("nested impersonation as a service account is allowed if it has enough RBAC permissions", func(t *testing.T) {
|
||||||
parallelIfNotEKS(t)
|
parallelIfNotEKS(t)
|
||||||
namespaceName := createTestNamespace(t, adminClient)
|
namespaceName := testlib.CreateNamespace(ctx, t, "impersonation").Name
|
||||||
saName, saToken, saUID := createServiceAccountToken(ctx, t, adminClient, namespaceName)
|
saName, saToken, saUID := createServiceAccountToken(ctx, t, adminClient, namespaceName)
|
||||||
nestedImpersonationClient := newImpersonationProxyClientWithCredentials(t,
|
nestedImpersonationClient := newImpersonationProxyClientWithCredentials(t,
|
||||||
&loginv1alpha1.ClusterCredential{Token: saToken}, impersonationProxyURL, impersonationProxyCACertPEM,
|
&loginv1alpha1.ClusterCredential{Token: saToken}, impersonationProxyURL, impersonationProxyCACertPEM,
|
||||||
@ -916,7 +916,7 @@ func TestImpersonationProxy(t *testing.T) { //nolint:gocyclo // yeah, it's compl
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Test using a service account token.
|
// Test using a service account token.
|
||||||
namespaceName := createTestNamespace(t, adminClient)
|
namespaceName := testlib.CreateNamespace(ctx, t, "impersonation").Name
|
||||||
saName, saToken, _ := createServiceAccountToken(ctx, t, adminClient, namespaceName)
|
saName, saToken, _ := createServiceAccountToken(ctx, t, adminClient, namespaceName)
|
||||||
impersonationProxyServiceAccountPinnipedConciergeClient := newImpersonationProxyClientWithCredentials(t,
|
impersonationProxyServiceAccountPinnipedConciergeClient := newImpersonationProxyClientWithCredentials(t,
|
||||||
&loginv1alpha1.ClusterCredential{Token: saToken},
|
&loginv1alpha1.ClusterCredential{Token: saToken},
|
||||||
@ -935,7 +935,7 @@ func TestImpersonationProxy(t *testing.T) { //nolint:gocyclo // yeah, it's compl
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("WhoAmIRequests and SA token request", func(t *testing.T) {
|
t.Run("WhoAmIRequests and SA token request", func(t *testing.T) {
|
||||||
namespaceName := createTestNamespace(t, adminClient)
|
namespaceName := testlib.CreateNamespace(ctx, t, "impersonation").Name
|
||||||
kubeClient := adminClient.CoreV1()
|
kubeClient := adminClient.CoreV1()
|
||||||
saName, _, saUID := createServiceAccountToken(ctx, t, adminClient, namespaceName)
|
saName, _, saUID := createServiceAccountToken(ctx, t, adminClient, namespaceName)
|
||||||
|
|
||||||
@ -1145,7 +1145,7 @@ func TestImpersonationProxy(t *testing.T) { //nolint:gocyclo // yeah, it's compl
|
|||||||
|
|
||||||
t.Run("websocket client", func(t *testing.T) {
|
t.Run("websocket client", func(t *testing.T) {
|
||||||
parallelIfNotEKS(t)
|
parallelIfNotEKS(t)
|
||||||
namespaceName := createTestNamespace(t, adminClient)
|
namespaceName := testlib.CreateNamespace(ctx, t, "impersonation").Name
|
||||||
|
|
||||||
impersonationRestConfig := impersonationProxyRestConfig(
|
impersonationRestConfig := impersonationProxyRestConfig(
|
||||||
refreshCredential(t, impersonationProxyURL, impersonationProxyCACertPEM),
|
refreshCredential(t, impersonationProxyURL, impersonationProxyCACertPEM),
|
||||||
@ -1224,7 +1224,7 @@ func TestImpersonationProxy(t *testing.T) { //nolint:gocyclo // yeah, it's compl
|
|||||||
|
|
||||||
t.Run("http2 client", func(t *testing.T) {
|
t.Run("http2 client", func(t *testing.T) {
|
||||||
parallelIfNotEKS(t)
|
parallelIfNotEKS(t)
|
||||||
namespaceName := createTestNamespace(t, adminClient)
|
namespaceName := testlib.CreateNamespace(ctx, t, "impersonation").Name
|
||||||
|
|
||||||
wantConfigMapLabelKey, wantConfigMapLabelValue := "some-label-key", "some-label-value"
|
wantConfigMapLabelKey, wantConfigMapLabelValue := "some-label-key", "some-label-value"
|
||||||
wantConfigMap := &corev1.ConfigMap{
|
wantConfigMap := &corev1.ConfigMap{
|
||||||
@ -1783,7 +1783,7 @@ func TestImpersonationProxy(t *testing.T) { //nolint:gocyclo // yeah, it's compl
|
|||||||
testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
|
testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
|
||||||
_, err := adminClient.CoreV1().Secrets(env.ConciergeNamespace).Get(ctx, impersonationProxyTLSSecretName(env), metav1.GetOptions{})
|
_, err := adminClient.CoreV1().Secrets(env.ConciergeNamespace).Get(ctx, impersonationProxyTLSSecretName(env), metav1.GetOptions{})
|
||||||
requireEventually.Truef(k8serrors.IsNotFound(err), "expected NotFound error, got %v", err)
|
requireEventually.Truef(k8serrors.IsNotFound(err), "expected NotFound error, got %v", err)
|
||||||
}, 10*time.Second, 250*time.Millisecond)
|
}, time.Minute, time.Second)
|
||||||
|
|
||||||
// Check that the generated CA cert Secret was not deleted by the controller because it's supposed to keep this
|
// Check that the generated CA cert Secret was not deleted by the controller because it's supposed to keep this
|
||||||
// around in case we decide to later re-enable the impersonator. We want to avoid generating new CA certs when
|
// around in case we decide to later re-enable the impersonator. We want to avoid generating new CA certs when
|
||||||
@ -1864,27 +1864,6 @@ func ensureDNSResolves(t *testing.T, urlString string) {
|
|||||||
}, 5*time.Minute, 1*time.Second)
|
}, 5*time.Minute, 1*time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
func createTestNamespace(t *testing.T, adminClient kubernetes.Interface) string {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
namespace, err := adminClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{GenerateName: "impersonation-integration-test-"},
|
|
||||||
}, metav1.CreateOptions{})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
t.Cleanup(func() {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
t.Logf("cleaning up test namespace %s", namespace.Name)
|
|
||||||
require.NoError(t, adminClient.CoreV1().Namespaces().Delete(ctx, namespace.Name, metav1.DeleteOptions{}))
|
|
||||||
})
|
|
||||||
return namespace.Name
|
|
||||||
}
|
|
||||||
|
|
||||||
func createServiceAccountToken(ctx context.Context, t *testing.T, adminClient kubernetes.Interface, namespaceName string) (name, token string, uid types.UID) {
|
func createServiceAccountToken(ctx context.Context, t *testing.T, adminClient kubernetes.Interface, namespaceName string) (name, token string, uid types.UID) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
|
278
test/integration/leaderelection_test.go
Normal file
278
test/integration/leaderelection_test.go
Normal file
@ -0,0 +1,278 @@
|
|||||||
|
// Copyright 2021 the Pinniped contributors. All Rights Reserved.
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
package integration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
appsv1 "k8s.io/api/apps/v1"
|
||||||
|
authenticationv1 "k8s.io/api/authentication/v1"
|
||||||
|
coordinationv1 "k8s.io/api/coordination/v1"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/rand"
|
||||||
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/client-go/util/retry"
|
||||||
|
"k8s.io/utils/pointer"
|
||||||
|
|
||||||
|
"go.pinniped.dev/internal/downward"
|
||||||
|
"go.pinniped.dev/internal/kubeclient"
|
||||||
|
"go.pinniped.dev/internal/leaderelection"
|
||||||
|
"go.pinniped.dev/test/testlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLeaderElection(t *testing.T) {
|
||||||
|
_ = testlib.IntegrationEnv(t)
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
leaseName := "leader-election-" + rand.String(5)
|
||||||
|
|
||||||
|
namespace := testlib.CreateNamespace(ctx, t, leaseName)
|
||||||
|
|
||||||
|
clients := leaderElectionClients(t, namespace, leaseName)
|
||||||
|
|
||||||
|
// the tests below are order dependant to some degree and definitely cannot be run in parallel
|
||||||
|
|
||||||
|
t.Run("sanity check write prevention", func(t *testing.T) {
|
||||||
|
lease := checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients)
|
||||||
|
logLease(t, lease)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("clients handle leader election transition correctly", func(t *testing.T) {
|
||||||
|
lease := forceTransition(ctx, t, namespace, leaseName, clients)
|
||||||
|
logLease(t, lease)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("sanity check write prevention after transition", func(t *testing.T) {
|
||||||
|
lease := checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients)
|
||||||
|
logLease(t, lease)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("clients handle leader election restart correctly", func(t *testing.T) {
|
||||||
|
lease := forceRestart(ctx, t, namespace, leaseName, clients)
|
||||||
|
logLease(t, lease)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("sanity check write prevention after restart", func(t *testing.T) {
|
||||||
|
lease := checkOnlyLeaderCanWrite(ctx, t, namespace, leaseName, clients)
|
||||||
|
logLease(t, lease)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func leaderElectionClient(t *testing.T, namespace *corev1.Namespace, leaseName, identity string) *kubeclient.Client {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
podInfo := &downward.PodInfo{
|
||||||
|
Namespace: namespace.Name,
|
||||||
|
Name: identity,
|
||||||
|
}
|
||||||
|
deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: leaseName}}
|
||||||
|
|
||||||
|
client, leaderElector, err := leaderelection.New(podInfo, deployment, testlib.NewKubeclientOptions(t, testlib.NewClientConfig(t))...)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
controllerCtx, controllerCancel := context.WithCancel(context.Background())
|
||||||
|
leaderCtx, leaderCancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
t.Cleanup(func() {
|
||||||
|
controllerCancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-leaderCtx.Done():
|
||||||
|
// leader election client stopped correctly
|
||||||
|
|
||||||
|
case <-time.After(time.Minute):
|
||||||
|
t.Errorf("leader election client in namespace %q with lease %q and identity %q failed to stop",
|
||||||
|
namespace.Name, leaseName, identity)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
time.Sleep(time.Duration(rand.Int63nRange(1, 10)) * time.Second) // randomize start of client and controllers
|
||||||
|
|
||||||
|
// this blocks
|
||||||
|
leaderElector(controllerCtx, func(ctx context.Context) {
|
||||||
|
<-ctx.Done()
|
||||||
|
time.Sleep(time.Duration(rand.Int63nRange(1, 10)) * time.Second) // randomize stop of controllers
|
||||||
|
})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-controllerCtx.Done():
|
||||||
|
// leaderElector correctly stopped but only after controllers stopped
|
||||||
|
|
||||||
|
default:
|
||||||
|
t.Errorf("leader election client in namespace %q with lease %q and identity %q stopped early",
|
||||||
|
namespace.Name, leaseName, identity)
|
||||||
|
}
|
||||||
|
|
||||||
|
leaderCancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return client
|
||||||
|
}
|
||||||
|
|
||||||
|
func leaderElectionClients(t *testing.T, namespace *corev1.Namespace, leaseName string) map[string]*kubeclient.Client {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
count := rand.IntnRange(1, 6)
|
||||||
|
out := make(map[string]*kubeclient.Client, count)
|
||||||
|
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
identity := "leader-election-client-" + rand.String(5)
|
||||||
|
out[identity] = leaderElectionClient(t, namespace, leaseName, identity)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("running leader election client tests with %d clients: %v", len(out), sets.StringKeySet(out).List())
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func pickRandomLeaderElectionClient(clients map[string]*kubeclient.Client) *kubeclient.Client {
|
||||||
|
for _, client := range clients {
|
||||||
|
client := client
|
||||||
|
return client
|
||||||
|
}
|
||||||
|
panic("clients map was empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForIdentity(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
identities := sets.StringKeySet(clients)
|
||||||
|
var out *coordinationv1.Lease
|
||||||
|
|
||||||
|
testlib.RequireEventuallyWithoutError(t, func() (bool, error) {
|
||||||
|
lease, err := pickRandomLeaderElectionClient(clients).Kubernetes.CoordinationV1().Leases(namespace.Name).Get(ctx, leaseName, metav1.GetOptions{})
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
out = lease
|
||||||
|
return lease.Spec.HolderIdentity != nil && identities.Has(*lease.Spec.HolderIdentity), nil
|
||||||
|
}, 3*time.Minute, time.Second)
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func runWriteRequest(ctx context.Context, client *kubeclient.Client) error {
|
||||||
|
_, err := client.Kubernetes.AuthenticationV1().TokenReviews().Create(ctx, &authenticationv1.TokenReview{
|
||||||
|
Spec: authenticationv1.TokenReviewSpec{Token: "any-non-empty-value"},
|
||||||
|
}, metav1.CreateOptions{})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func runWriteRequests(ctx context.Context, clients map[string]*kubeclient.Client) map[string]error {
|
||||||
|
out := make(map[string]error, len(clients))
|
||||||
|
|
||||||
|
for identity, client := range clients {
|
||||||
|
identity, client := identity, client
|
||||||
|
|
||||||
|
out[identity] = runWriteRequest(ctx, client)
|
||||||
|
}
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func pickCurrentLeaderClient(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *kubeclient.Client {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
lease := waitForIdentity(ctx, t, namespace, leaseName, clients)
|
||||||
|
return clients[*lease.Spec.HolderIdentity]
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkOnlyLeaderCanWrite(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
lease := waitForIdentity(ctx, t, namespace, leaseName, clients)
|
||||||
|
|
||||||
|
var leaders, nonLeaders int
|
||||||
|
for identity, err := range runWriteRequests(ctx, clients) {
|
||||||
|
identity, err := identity, err
|
||||||
|
|
||||||
|
if identity == *lease.Spec.HolderIdentity {
|
||||||
|
leaders++
|
||||||
|
assert.NoError(t, err, "leader client %q should have no error", identity)
|
||||||
|
} else {
|
||||||
|
nonLeaders++
|
||||||
|
assert.Error(t, err, "non leader client %q should have write error but it was nil", identity)
|
||||||
|
assert.True(t, errors.Is(err, leaderelection.ErrNotLeader), "non leader client %q should have write error: %v", identity, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert.Equal(t, 1, leaders, "did not see leader")
|
||||||
|
assert.Equal(t, len(clients)-1, nonLeaders, "did not see non-leader")
|
||||||
|
|
||||||
|
return lease
|
||||||
|
}
|
||||||
|
|
||||||
|
func forceTransition(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
var startTransitions int32
|
||||||
|
var startTime metav1.MicroTime
|
||||||
|
|
||||||
|
errRetry := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||||
|
leaderClient := pickCurrentLeaderClient(ctx, t, namespace, leaseName, clients)
|
||||||
|
startLease := waitForIdentity(ctx, t, namespace, leaseName, clients)
|
||||||
|
startTransitions = *startLease.Spec.LeaseTransitions
|
||||||
|
startTime = *startLease.Spec.AcquireTime
|
||||||
|
|
||||||
|
startLease = startLease.DeepCopy()
|
||||||
|
startLease.Spec.HolderIdentity = pointer.String("some-other-client" + rand.String(5))
|
||||||
|
|
||||||
|
_, err := leaderClient.Kubernetes.CoordinationV1().Leases(namespace.Name).Update(ctx, startLease, metav1.UpdateOptions{})
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
require.NoError(t, errRetry)
|
||||||
|
|
||||||
|
finalLease := waitForIdentity(ctx, t, namespace, leaseName, clients)
|
||||||
|
finalTransitions := *finalLease.Spec.LeaseTransitions
|
||||||
|
finalTime := *finalLease.Spec.AcquireTime
|
||||||
|
|
||||||
|
require.Greater(t, finalTransitions, startTransitions)
|
||||||
|
require.Greater(t, finalTime.UnixNano(), startTime.UnixNano())
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Minute) // need to give clients time to notice this change because leader election is polling based
|
||||||
|
|
||||||
|
return finalLease
|
||||||
|
}
|
||||||
|
|
||||||
|
func forceRestart(ctx context.Context, t *testing.T, namespace *corev1.Namespace, leaseName string, clients map[string]*kubeclient.Client) *coordinationv1.Lease {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
startLease := waitForIdentity(ctx, t, namespace, leaseName, clients)
|
||||||
|
|
||||||
|
err := pickCurrentLeaderClient(ctx, t, namespace, leaseName, clients).
|
||||||
|
Kubernetes.CoordinationV1().Leases(namespace.Name).Delete(ctx, leaseName, metav1.DeleteOptions{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
newLease := waitForIdentity(ctx, t, namespace, leaseName, clients)
|
||||||
|
require.Zero(t, *newLease.Spec.LeaseTransitions)
|
||||||
|
require.Greater(t, newLease.Spec.AcquireTime.UnixNano(), startLease.Spec.AcquireTime.UnixNano())
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Minute) // need to give clients time to notice this change because leader election is polling based
|
||||||
|
|
||||||
|
return newLease
|
||||||
|
}
|
||||||
|
|
||||||
|
func logLease(t *testing.T, lease *coordinationv1.Lease) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
bytes, err := json.MarshalIndent(lease, "", "\t")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
t.Logf("current lease:\n%s", string(bytes))
|
||||||
|
}
|
@ -137,13 +137,19 @@ func newAnonymousClientRestConfigWithCertAndKeyAdded(t *testing.T, clientCertifi
|
|||||||
return config
|
return config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewKubeclientOptions(t *testing.T, config *rest.Config) []kubeclient.Option {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
return []kubeclient.Option{
|
||||||
|
kubeclient.WithConfig(config),
|
||||||
|
kubeclient.WithMiddleware(groupsuffix.New(IntegrationEnv(t).APIGroupSuffix)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewKubeclient(t *testing.T, config *rest.Config) *kubeclient.Client {
|
func NewKubeclient(t *testing.T, config *rest.Config) *kubeclient.Client {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
env := IntegrationEnv(t)
|
|
||||||
client, err := kubeclient.New(
|
client, err := kubeclient.New(NewKubeclientOptions(t, config)...)
|
||||||
kubeclient.WithConfig(config),
|
|
||||||
kubeclient.WithMiddleware(groupsuffix.New(env.APIGroupSuffix)),
|
|
||||||
)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
@ -502,6 +508,30 @@ func CreatePod(ctx context.Context, t *testing.T, name, namespace string, spec c
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func CreateNamespace(ctx context.Context, t *testing.T, name string) *corev1.Namespace {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
adminClient := NewKubernetesClientset(t)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
namespace, err := adminClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{GenerateName: name + "-integration-test-"},
|
||||||
|
}, metav1.CreateOptions{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
t.Cleanup(func() {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
t.Logf("cleaning up test namespace %s", namespace.Name)
|
||||||
|
require.NoError(t, adminClient.CoreV1().Namespaces().Delete(ctx, namespace.Name, metav1.DeleteOptions{}))
|
||||||
|
})
|
||||||
|
|
||||||
|
return namespace
|
||||||
|
}
|
||||||
|
|
||||||
func WaitForUserToHaveAccess(t *testing.T, user string, groups []string, shouldHaveAccessTo *authorizationv1.ResourceAttributes) {
|
func WaitForUserToHaveAccess(t *testing.T, user string, groups []string, shouldHaveAccessTo *authorizationv1.ResourceAttributes) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
client := NewKubernetesClientset(t)
|
client := NewKubernetesClientset(t)
|
||||||
|
Loading…
Reference in New Issue
Block a user