Carefully merge desired annotations into impersonation proxy Service

Don't overwrite annotations that might have come from a human user or
from some other non-Pinniped controller.
This commit is contained in:
Ryan Richard 2021-07-22 17:09:50 -07:00
parent ee30b78117
commit 708164b878
2 changed files with 303 additions and 26 deletions

View File

@ -8,9 +8,11 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"encoding/json"
"encoding/pem"
"fmt"
"net"
"sort"
"strings"
"time"
@ -53,6 +55,7 @@ const (
caCrtKey = "ca.crt"
caKeyKey = "ca.key"
appLabelKey = "app"
annotationKeysKey = "credentialissuer.pinniped.dev/annotation-keys"
)
type impersonatorConfigController struct {
@ -521,37 +524,109 @@ func (c *impersonatorConfigController) ensureClusterIPServiceIsStopped(ctx conte
return utilerrors.FilterOut(err, k8serrors.IsNotFound)
}
func (c *impersonatorConfigController) createOrUpdateService(ctx context.Context, service *v1.Service) error {
log := c.infoLog.WithValues("serviceType", service.Spec.Type, "service", klog.KObj(service))
existing, err := c.servicesInformer.Lister().Services(c.namespace).Get(service.Name)
func (c *impersonatorConfigController) createOrUpdateService(ctx context.Context, desiredService *v1.Service) error {
log := c.infoLog.WithValues("serviceType", desiredService.Spec.Type, "service", klog.KObj(desiredService))
// Prepare to remember which annotation keys were added from the CredentialIssuer spec, both for
// creates and for updates, in case someone removes a key from the spec in the future. We would like
// to be able to detect that the missing key means that we should remove the key. This is needed to
// differentiate it from a key that was added by another actor, which we should not remove.
// But don't bother recording the requested annotations if there were no annotations requested.
desiredAnnotationKeys := []string{}
for k := range desiredService.Annotations {
desiredAnnotationKeys = append(desiredAnnotationKeys, k)
}
if len(desiredAnnotationKeys) > 0 {
// Sort them since they come out of the map in no particular order.
sort.Strings(desiredAnnotationKeys)
keysJSONArray, err := json.Marshal(desiredAnnotationKeys)
if err != nil {
return err // This shouldn't really happen. We should always be able to marshal an array of strings.
}
if desiredService.Annotations == nil {
desiredService.Annotations = map[string]string{}
}
// Save the desired annotations to a bookkeeping annotation.
desiredService.Annotations[annotationKeysKey] = string(keysJSONArray)
}
// Get the Service from the informer, and create it if it does not already exist.
existingService, err := c.servicesInformer.Lister().Services(c.namespace).Get(desiredService.Name)
if k8serrors.IsNotFound(err) {
log.Info("creating service for impersonation proxy")
_, err := c.k8sClient.CoreV1().Services(c.namespace).Create(ctx, service, metav1.CreateOptions{})
_, err := c.k8sClient.CoreV1().Services(c.namespace).Create(ctx, desiredService, metav1.CreateOptions{})
return err
}
if err != nil {
return err
}
// Update only the specific fields that are meaningfully part of our desired state.
updated := existing.DeepCopy()
updated.ObjectMeta.Labels = service.ObjectMeta.Labels
updated.ObjectMeta.Annotations = service.ObjectMeta.Annotations
updated.Spec.LoadBalancerIP = service.Spec.LoadBalancerIP
updated.Spec.Type = service.Spec.Type
updated.Spec.Selector = service.Spec.Selector
// The Service already exists, so update only the specific fields that are meaningfully part of our desired state.
updatedService := existingService.DeepCopy()
updatedService.ObjectMeta.Labels = desiredService.ObjectMeta.Labels
updatedService.Spec.LoadBalancerIP = desiredService.Spec.LoadBalancerIP
updatedService.Spec.Type = desiredService.Spec.Type
updatedService.Spec.Selector = desiredService.Spec.Selector
// Do not simply overwrite the existing annotations with the desired annotations. Instead, merge-overwrite.
// Another actor in the system, like a human user or a non-Pinniped controller, might have updated the
// existing Service's annotations. If they did, then we do not want to overwrite those keys expect for
// the specific keys that are from the CredentialIssuer's spec, because if we overwrite keys belonging
// to another controller then we could end up infinitely flapping back and forth with the other controller,
// both updating that annotation on the Service.
if updatedService.Annotations == nil {
updatedService.Annotations = map[string]string{}
}
for k, v := range desiredService.Annotations {
updatedService.Annotations[k] = v
}
// Check if the the existing Service contains a record of previous annotations that were added by this controller.
// Note that in an upgrade, older versions of Pinniped might have created the Service without this bookkeeping annotation.
oldDesiredAnnotationKeysJSON, foundOldDesiredAnnotationKeysJSON := existingService.Annotations[annotationKeysKey]
oldDesiredAnnotationKeys := []string{}
if foundOldDesiredAnnotationKeysJSON {
_ = json.Unmarshal([]byte(oldDesiredAnnotationKeysJSON), &oldDesiredAnnotationKeys)
// In the unlikely event that we cannot parse the value of our bookkeeping annotation, just act like it
// wasn't present and update it to the new value that it should have based on the current desired state.
}
// Check if any annotations which were previously in the CredentialIssuer spec are now gone from the spec,
// which means that those now-missing annotations should get deleted.
// Nested loops are not efficient here, but these lists of annotations should be small.
for _, oldKey := range oldDesiredAnnotationKeys {
if !stringSliceContains(desiredAnnotationKeys, oldKey) {
delete(updatedService.Annotations, oldKey)
}
}
// If no annotations were requested, then remove the special bookkeeping annotation which might be
// leftover from a previous update. During the next update, non-existence will be taken to mean
// that no annotations were previously requested by the CredentialIssuer spec.
if len(desiredAnnotationKeys) == 0 {
delete(updatedService.Annotations, annotationKeysKey)
}
// If our updates didn't change anything, we're done.
if equality.Semantic.DeepEqual(existing, updated) {
if equality.Semantic.DeepEqual(existingService, updatedService) {
return nil
}
// Otherwise apply the updates.
c.infoLog.Info("updating service for impersonation proxy")
_, err = c.k8sClient.CoreV1().Services(c.namespace).Update(ctx, updated, metav1.UpdateOptions{})
_, err = c.k8sClient.CoreV1().Services(c.namespace).Update(ctx, updatedService, metav1.UpdateOptions{})
return err
}
func stringSliceContains(haystack []string, needle string) bool {
for _, s := range haystack {
if s == needle {
return true
}
}
return false
}
func (c *impersonatorConfigController) ensureTLSSecret(ctx context.Context, nameInfo *certNameInfo, ca *certauthority.CA) error {
secretFromInformer, err := c.secretsInformer.Lister().Secrets(c.namespace).Get(c.tlsSecretName)
notFound := k8serrors.IsNotFound(err)

View File

@ -785,6 +785,13 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
}
}
var addServiceToTrackers = func(service *corev1.Service, clients ...*kubernetesfake.Clientset) {
for _, client := range clients {
serviceCopy := service.DeepCopy()
r.NoError(client.Tracker().Add(serviceCopy))
}
}
var deleteServiceFromTracker = func(resourceName string, client *kubernetesfake.Clientset) {
r.NoError(client.Tracker().Delete(
schema.GroupVersionResource{Version: "v1", Resource: "services"},
@ -1644,7 +1651,6 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
})
when("credentialissuer has service type loadbalancer and custom annotations", func() {
annotations := map[string]string{"some-annotation-key": "some-annotation-value"}
it.Before(func() {
addCredentialIssuerToTrackers(v1alpha1.CredentialIssuer{
ObjectMeta: metav1.ObjectMeta{Name: credentialIssuerResourceName},
@ -1653,7 +1659,7 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
Mode: v1alpha1.ImpersonationProxyModeEnabled,
Service: v1alpha1.ImpersonationProxyServiceSpec{
Type: v1alpha1.ImpersonationProxyServiceTypeLoadBalancer,
Annotations: annotations,
Annotations: map[string]string{"some-annotation-key": "some-annotation-value"},
},
},
},
@ -1667,7 +1673,10 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
r.Len(kubeAPIClient.Actions(), 3)
requireNodesListed(kubeAPIClient.Actions()[0])
lbService := requireLoadBalancerWasCreated(kubeAPIClient.Actions()[1])
require.Equal(t, lbService.Annotations, annotations)
require.Equal(t, lbService.Annotations, map[string]string{
"some-annotation-key": "some-annotation-value",
"credentialissuer.pinniped.dev/annotation-keys": `["some-annotation-key"]`,
})
requireCASecretWasCreated(kubeAPIClient.Actions()[2])
requireTLSServerIsRunningWithoutCerts()
requireCredentialIssuer(newPendingStrategyWaitingForLB())
@ -2386,20 +2395,30 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
requireCredentialIssuer(newSuccessStrategy(localhostIP, ca))
requireSigningCertProviderHasLoadedCerts(signingCACertPEM, signingCAKeyPEM)
// Simulate another actor in the system, like a human user or a non-Pinniped controller,
// updating the new Service's annotations. The map was nil, so we can overwrite the whole thing,
lbService.Annotations = map[string]string{
"annotation-from-unrelated-controller-key": "annotation-from-unrelated-controller-val",
"my-annotation-key": "my-annotation-from-unrelated-controller-val",
}
// Simulate the informer cache's background update from its watch.
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[1], kubeInformers.Core().V1().Services())
addObjectToKubeInformerAndWait(lbService, kubeInformers.Core().V1().Services())
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[2], kubeInformers.Core().V1().Secrets())
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[3], kubeInformers.Core().V1().Secrets())
// Add annotations to the spec.
annotations := map[string]string{"my-annotation-key": "my-annotation-val"}
r.NoError(runControllerSync())
r.Len(kubeAPIClient.Actions(), 4) // no new actions because the controller decides there is nothing to update on the Service
// Add annotations to the CredentialIssuer spec.
credentialIssuerAnnotations := map[string]string{"my-annotation-key": "my-annotation-val"}
updateCredentialIssuerInInformerAndWait(credentialIssuerResourceName, v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeEnabled,
ExternalEndpoint: localhostIP,
Service: v1alpha1.ImpersonationProxyServiceSpec{
Type: v1alpha1.ImpersonationProxyServiceTypeLoadBalancer,
Annotations: annotations,
Annotations: credentialIssuerAnnotations,
},
},
}, pinnipedInformers.Config().V1alpha1().CredentialIssuers())
@ -2407,7 +2426,14 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
r.NoError(runControllerSync())
r.Len(kubeAPIClient.Actions(), 5) // one more item to update the loadbalancer
lbService = requireLoadBalancerWasUpdated(kubeAPIClient.Actions()[4])
require.Equal(t, annotations, lbService.Annotations) // now the annotations should exist on the load balancer
require.Equal(t, map[string]string{
// Now the CredentialIssuer annotations should be merged on the load balancer.
// In the unlikely case where keys conflict, the CredentialIssuer value overwrites the other value.
// Otherwise the annotations from the other actor should not be modified.
"annotation-from-unrelated-controller-key": "annotation-from-unrelated-controller-val",
"my-annotation-key": "my-annotation-val",
"credentialissuer.pinniped.dev/annotation-keys": `["my-annotation-key"]`,
}, lbService.Annotations)
requireTLSServerIsRunning(ca, testServerAddr(), nil)
requireCredentialIssuer(newSuccessStrategy(localhostIP, ca))
requireSigningCertProviderHasLoadedCerts(signingCACertPEM, signingCAKeyPEM)
@ -2447,20 +2473,30 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
requireCredentialIssuer(newSuccessStrategy(localhostIP, ca))
requireSigningCertProviderHasLoadedCerts(signingCACertPEM, signingCAKeyPEM)
// Simulate another actor in the system, like a human user or a non-Pinniped controller,
// updating the new Service's annotations.
clusterIPService.Annotations = map[string]string{
"annotation-from-unrelated-controller-key": "annotation-from-unrelated-controller-val",
"my-annotation-key": "my-annotation-from-unrelated-controller-val",
}
// Simulate the informer cache's background update from its watch.
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[1], kubeInformers.Core().V1().Services())
addObjectToKubeInformerAndWait(clusterIPService, kubeInformers.Core().V1().Services())
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[2], kubeInformers.Core().V1().Secrets())
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[3], kubeInformers.Core().V1().Secrets())
// Add annotations to the spec.
annotations := map[string]string{"my-annotation-key": "my-annotation-val"}
r.NoError(runControllerSync())
r.Len(kubeAPIClient.Actions(), 4) // no new actions because the controller decides there is nothing to update on the Service
// Add annotations to the CredentialIssuer spec.
credentialIssuerAnnotations := map[string]string{"my-annotation-key": "my-annotation-val"}
updateCredentialIssuerInInformerAndWait(credentialIssuerResourceName, v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeEnabled,
ExternalEndpoint: localhostIP,
Service: v1alpha1.ImpersonationProxyServiceSpec{
Type: v1alpha1.ImpersonationProxyServiceTypeClusterIP,
Annotations: annotations,
Annotations: credentialIssuerAnnotations,
},
},
}, pinnipedInformers.Config().V1alpha1().CredentialIssuers())
@ -2468,7 +2504,173 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
r.NoError(runControllerSync())
r.Len(kubeAPIClient.Actions(), 5) // one more item to update the loadbalancer
clusterIPService = requireClusterIPWasUpdated(kubeAPIClient.Actions()[4])
require.Equal(t, annotations, clusterIPService.Annotations) // now the annotations should exist on the load balancer
require.Equal(t, map[string]string{
// Now the CredentialIssuer annotations should be merged on the load balancer.
// In the unlikely case where keys conflict, the CredentialIssuer value overwrites the other value.
// Otherwise the annotations from the other actor should not be modified.
"annotation-from-unrelated-controller-key": "annotation-from-unrelated-controller-val",
"my-annotation-key": "my-annotation-val",
"credentialissuer.pinniped.dev/annotation-keys": `["my-annotation-key"]`,
}, clusterIPService.Annotations)
requireTLSServerIsRunning(ca, testServerAddr(), nil)
requireCredentialIssuer(newSuccessStrategy(localhostIP, ca))
requireSigningCertProviderHasLoadedCerts(signingCACertPEM, signingCAKeyPEM)
})
})
when("requesting a load balancer via CredentialIssuer with annotations, then updating the CredentialIssuer annotations to remove one", func() {
it.Before(func() {
addSecretToTrackers(signingCASecret, kubeInformerClient)
addCredentialIssuerToTrackers(v1alpha1.CredentialIssuer{
ObjectMeta: metav1.ObjectMeta{Name: credentialIssuerResourceName},
Spec: v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeEnabled,
ExternalEndpoint: localhostIP,
Service: v1alpha1.ImpersonationProxyServiceSpec{
Type: v1alpha1.ImpersonationProxyServiceTypeLoadBalancer,
Annotations: map[string]string{
"my-initial-annotation1-key": "my-initial-annotation1-val",
"my-initial-annotation2-key": "my-initial-annotation2-val",
"my-initial-annotation3-key": "my-initial-annotation3-val",
},
},
},
},
}, pinnipedInformerClient, pinnipedAPIClient)
addNodeWithRoleToTracker("worker", kubeAPIClient)
})
it("creates the load balancer with annotations, then removes the removed annotation", func() {
startInformersAndController()
// Should have started in "enabled" mode with service type load balancer, so one is created.
r.NoError(runControllerSync())
r.Len(kubeAPIClient.Actions(), 4)
requireNodesListed(kubeAPIClient.Actions()[0])
lbService := requireLoadBalancerWasCreated(kubeAPIClient.Actions()[1])
require.Equal(t, map[string]string{
"my-initial-annotation1-key": "my-initial-annotation1-val",
"my-initial-annotation2-key": "my-initial-annotation2-val",
"my-initial-annotation3-key": "my-initial-annotation3-val",
"credentialissuer.pinniped.dev/annotation-keys": `["my-initial-annotation1-key","my-initial-annotation2-key","my-initial-annotation3-key"]`,
}, lbService.Annotations) // there should be some annotations at first
ca := requireCASecretWasCreated(kubeAPIClient.Actions()[2])
requireTLSSecretWasCreated(kubeAPIClient.Actions()[3], ca)
requireTLSServerIsRunning(ca, testServerAddr(), nil)
requireCredentialIssuer(newSuccessStrategy(localhostIP, ca))
requireSigningCertProviderHasLoadedCerts(signingCACertPEM, signingCAKeyPEM)
// Simulate another actor in the system, like a human user or a non-Pinniped controller,
// updating the new Service to add another annotation.
lbService.Annotations["annotation-from-unrelated-controller-key"] = "annotation-from-unrelated-controller-val"
// Simulate the informer cache's background update from its watch.
addObjectToKubeInformerAndWait(lbService, kubeInformers.Core().V1().Services())
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[2], kubeInformers.Core().V1().Secrets())
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[3], kubeInformers.Core().V1().Secrets())
r.NoError(runControllerSync())
r.Len(kubeAPIClient.Actions(), 4) // no new actions because the controller decides there is nothing to update on the Service
// Remove one of the annotations from the CredentialIssuer spec.
updateCredentialIssuerInInformerAndWait(credentialIssuerResourceName, v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeEnabled,
ExternalEndpoint: localhostIP,
Service: v1alpha1.ImpersonationProxyServiceSpec{
Type: v1alpha1.ImpersonationProxyServiceTypeLoadBalancer,
Annotations: map[string]string{
"my-initial-annotation1-key": "my-initial-annotation1-val",
"my-initial-annotation3-key": "my-initial-annotation3-val",
},
},
},
}, pinnipedInformers.Config().V1alpha1().CredentialIssuers())
r.NoError(runControllerSync())
r.Len(kubeAPIClient.Actions(), 5) // one more item to update the loadbalancer
lbService = requireLoadBalancerWasUpdated(kubeAPIClient.Actions()[4])
require.Equal(t, map[string]string{
// Now the CredentialIssuer annotations should be merged on the load balancer.
// Since the user removed the "my-initial-annotation2-key" key from the CredentialIssuer spec,
// it should be removed from the Service.
// The annotations from the other actor should not be modified.
"annotation-from-unrelated-controller-key": "annotation-from-unrelated-controller-val",
"my-initial-annotation1-key": "my-initial-annotation1-val",
"my-initial-annotation3-key": "my-initial-annotation3-val",
"credentialissuer.pinniped.dev/annotation-keys": `["my-initial-annotation1-key","my-initial-annotation3-key"]`,
}, lbService.Annotations)
requireTLSServerIsRunning(ca, testServerAddr(), nil)
requireCredentialIssuer(newSuccessStrategy(localhostIP, ca))
requireSigningCertProviderHasLoadedCerts(signingCACertPEM, signingCAKeyPEM)
// Remove all the rest of the annotations from the CredentialIssuer spec so there are none remaining.
updateCredentialIssuerInInformerAndWait(credentialIssuerResourceName, v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeEnabled,
ExternalEndpoint: localhostIP,
Service: v1alpha1.ImpersonationProxyServiceSpec{
Type: v1alpha1.ImpersonationProxyServiceTypeLoadBalancer,
Annotations: map[string]string{},
},
},
}, pinnipedInformers.Config().V1alpha1().CredentialIssuers())
r.NoError(runControllerSync())
r.Len(kubeAPIClient.Actions(), 6) // one more item to update the loadbalancer
lbService = requireLoadBalancerWasUpdated(kubeAPIClient.Actions()[5])
require.Equal(t, map[string]string{
// Since the user removed all annotations from the CredentialIssuer spec,
// they should all be removed from the Service, along with the special bookkeeping annotation too.
// The annotations from the other actor should not be modified.
"annotation-from-unrelated-controller-key": "annotation-from-unrelated-controller-val",
}, lbService.Annotations)
requireTLSServerIsRunning(ca, testServerAddr(), nil)
requireCredentialIssuer(newSuccessStrategy(localhostIP, ca))
requireSigningCertProviderHasLoadedCerts(signingCACertPEM, signingCAKeyPEM)
})
})
when("requesting a load balancer via CredentialIssuer, but there is already a load balancer with an invalid bookkeeping annotation value", func() {
it.Before(func() {
addSecretToTrackers(signingCASecret, kubeInformerClient)
addCredentialIssuerToTrackers(v1alpha1.CredentialIssuer{
ObjectMeta: metav1.ObjectMeta{Name: credentialIssuerResourceName},
Spec: v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeEnabled,
ExternalEndpoint: localhostIP,
Service: v1alpha1.ImpersonationProxyServiceSpec{
Type: v1alpha1.ImpersonationProxyServiceTypeLoadBalancer,
Annotations: map[string]string{"some-annotation": "annotation-value"},
},
},
},
}, pinnipedInformerClient, pinnipedAPIClient)
addNodeWithRoleToTracker("worker", kubeAPIClient)
// Add a Service with a messed up bookkeeping annotation.
loadBalancerService := newLoadBalancerService(loadBalancerServiceName, corev1.ServiceStatus{})
loadBalancerService.Annotations = map[string]string{
annotationKeysKey: `["this is not valid json`,
}
addServiceToTrackers(loadBalancerService, kubeInformerClient, kubeAPIClient)
})
it("just acts like the annotation wasn't present since that is better than becoming inoperable", func() {
startInformersAndController()
// Should have started in "enabled" mode with service type load balancer, so one is created.
r.NoError(runControllerSync())
r.Len(kubeAPIClient.Actions(), 4)
requireNodesListed(kubeAPIClient.Actions()[0])
lbService := requireLoadBalancerWasUpdated(kubeAPIClient.Actions()[1])
require.Equal(t, map[string]string{
"some-annotation": "annotation-value",
"credentialissuer.pinniped.dev/annotation-keys": `["some-annotation"]`,
}, lbService.Annotations)
ca := requireCASecretWasCreated(kubeAPIClient.Actions()[2])
requireTLSSecretWasCreated(kubeAPIClient.Actions()[3], ca)
requireTLSServerIsRunning(ca, testServerAddr(), nil)
requireCredentialIssuer(newSuccessStrategy(localhostIP, ca))
requireSigningCertProviderHasLoadedCerts(signingCACertPEM, signingCAKeyPEM)