More error handling for cluster ip

This commit is contained in:
Margo Crawford 2021-05-20 16:21:10 -07:00
parent 599d70d6dc
commit 4606f1d8bd
2 changed files with 163 additions and 20 deletions

View File

@ -368,6 +368,18 @@ func (c *impersonatorConfigController) loadBalancerNeedsUpdate(config *v1alpha1.
return false, nil
}
func (c *impersonatorConfigController) ClusterIPNeedsUpdate(config *v1alpha1.ImpersonationProxySpec) (bool, error) {
clusterIP, err := c.servicesInformer.Lister().Services(c.namespace).Get(c.generatedClusterIPServiceName)
if err != nil {
return false, err
}
// TODO this will break if anything other than pinniped is adding annotations
if !reflect.DeepEqual(clusterIP.Annotations, config.Service.Annotations) {
return true, nil
}
return false, nil
}
func (c *impersonatorConfigController) tlsSecretExists() (bool, *v1.Secret, error) {
secret, err := c.secretsInformer.Lister().Secrets(c.namespace).Get(c.tlsSecretName)
notFound := k8serrors.IsNotFound(err)
@ -536,31 +548,28 @@ func (c *impersonatorConfigController) ensureClusterIPServiceIsStarted(ctx conte
Annotations: config.Service.Annotations,
},
}
running, _ := c.clusterIPExists()
running, err := c.clusterIPExists()
if err != nil {
return err
}
if running {
needsUpdate, err := c.ClusterIPNeedsUpdate(config)
if err != nil {
return err
}
if needsUpdate {
plog.Info("updating cluster ip for impersonation proxy",
"service", c.generatedLoadBalancerServiceName,
"namespace", c.namespace)
_, err = c.k8sClient.CoreV1().Services(c.namespace).Update(ctx, &clusterIP, metav1.UpdateOptions{})
return err
}
return nil
}
// if err != nil {
// return err // TODO test this error case
//}
// if running {
// needsUpdate, err := c.ClusterIPNeedsUpdate(config) // TODO test updating annotations on clusterip
// if err != nil {
// return err
// }
// if needsUpdate {
// plog.Info("updating load balancer for impersonation proxy",
// "service", c.generatedLoadBalancerServiceName,
// "namespace", c.namespace)
// _, err = c.k8sClient.CoreV1().Services(c.namespace).Update(ctx, &loadBalancer, metav1.UpdateOptions{})
// return err
// }
// return nil
// }
plog.Info("creating cluster ip for impersonation proxy",
"service", c.generatedClusterIPServiceName,
"namespace", c.namespace)
_, err := c.k8sClient.CoreV1().Services(c.namespace).Create(ctx, &clusterIP, metav1.CreateOptions{})
_, err = c.k8sClient.CoreV1().Services(c.namespace).Create(ctx, &clusterIP, metav1.CreateOptions{})
return err
}

View File

@ -908,7 +908,7 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
return updatedLoadBalancerService
}
var requireClusterIPWasCreated = func(action coretesting.Action) {
var requireClusterIPWasCreated = func(action coretesting.Action) *corev1.Service {
createAction, ok := action.(coretesting.CreateAction)
r.True(ok, "should have been able to cast this action to CreateAction: %v", action)
r.Equal("create", createAction.GetVerb())
@ -917,6 +917,7 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
r.Equal(corev1.ServiceTypeClusterIP, createdClusterIPService.Spec.Type)
r.Equal("app-name", createdClusterIPService.Spec.Selector["app"])
r.Equal(labels, createdClusterIPService.Labels)
return createdClusterIPService
}
var requireClusterIPWasDeleted = func(action coretesting.Action) {
@ -928,6 +929,19 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
r.Equal("services", deleteAction.GetResource().Resource)
}
var requireClusterIPWasUpdated = func(action coretesting.Action) *corev1.Service {
updateAction, ok := action.(coretesting.UpdateAction)
r.True(ok, "should have been able to cast this action to UpdateAction: %v", action)
r.Equal("update", updateAction.GetVerb())
updatedLoadBalancerService := updateAction.GetObject().(*corev1.Service)
r.Equal(clusterIPServiceName, updatedLoadBalancerService.Name)
r.Equal(installedInNamespace, updatedLoadBalancerService.Namespace)
r.Equal(corev1.ServiceTypeClusterIP, updatedLoadBalancerService.Spec.Type)
r.Equal("app-name", updatedLoadBalancerService.Spec.Selector["app"])
r.Equal(labels, updatedLoadBalancerService.Labels)
return updatedLoadBalancerService
}
var requireTLSSecretWasDeleted = func(action coretesting.Action) {
deleteAction, ok := action.(coretesting.DeleteAction)
r.True(ok, "should have been able to cast this action to DeleteAction: %v", action)
@ -2340,6 +2354,67 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
})
})
when("requesting a cluster ip via CredentialIssuer, then updating the annotations", func() {
it.Before(func() {
addSecretToTrackers(signingCASecret, kubeInformerClient)
addCredentialIssuerToTrackers(v1alpha1.CredentialIssuer{
ObjectMeta: metav1.ObjectMeta{Name: credentialIssuerResourceName},
Spec: v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeEnabled,
ExternalEndpoint: localhostIP,
Service: v1alpha1.ImpersonationProxyServiceSpec{
Type: v1alpha1.ImpersonationProxyServiceTypeClusterIP,
},
},
},
}, pinnipedInformerClient, pinnipedAPIClient)
addNodeWithRoleToTracker("worker", kubeAPIClient)
})
it("creates the cluster ip without annotations, then adds them", func() {
startInformersAndController()
// Should have started in "enabled" mode with service type load balancer, so one is created.
r.NoError(runControllerSync())
r.Len(kubeAPIClient.Actions(), 4)
requireNodesListed(kubeAPIClient.Actions()[0])
clusterIPService := requireClusterIPWasCreated(kubeAPIClient.Actions()[1])
require.Equal(t, map[string]string(nil), clusterIPService.Annotations) // there should be no annotations at first
ca := requireCASecretWasCreated(kubeAPIClient.Actions()[2])
requireTLSSecretWasCreated(kubeAPIClient.Actions()[3], ca)
requireTLSServerIsRunning(ca, testServerAddr(), nil)
requireCredentialIssuer(newSuccessStrategy(localhostIP, ca))
requireSigningCertProviderHasLoadedCerts(signingCACertPEM, signingCAKeyPEM)
// Simulate the informer cache's background update from its watch.
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[1], kubeInformers.Core().V1().Services())
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[2], kubeInformers.Core().V1().Secrets())
addObjectFromCreateActionToInformerAndWait(kubeAPIClient.Actions()[3], kubeInformers.Core().V1().Secrets())
// Add annotations to the spec.
annotations := map[string]string{"my-annotation-key": "my-annotation-val"}
updateCredentialIssuerInInformerAndWait(credentialIssuerResourceName, v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeEnabled,
ExternalEndpoint: localhostIP,
Service: v1alpha1.ImpersonationProxyServiceSpec{
Type: v1alpha1.ImpersonationProxyServiceTypeClusterIP,
Annotations: annotations,
},
},
}, pinnipedInformers.Config().V1alpha1().CredentialIssuers())
r.NoError(runControllerSync())
r.Len(kubeAPIClient.Actions(), 5) // one more item to update the loadbalancer
clusterIPService = requireClusterIPWasUpdated(kubeAPIClient.Actions()[4])
require.Equal(t, annotations, clusterIPService.Annotations) // now the annotations should exist on the load balancer
requireTLSServerIsRunning(ca, testServerAddr(), nil)
requireCredentialIssuer(newSuccessStrategy(localhostIP, ca))
requireSigningCertProviderHasLoadedCerts(signingCACertPEM, signingCAKeyPEM)
})
})
when("requesting a load balancer via CredentialIssuer, then adding a static loadBalancerIP to the spec", func() {
it.Before(func() {
addSecretToTrackers(signingCASecret, kubeInformerClient)
@ -2838,6 +2913,65 @@ func TestImpersonatorConfigControllerSync(t *testing.T) {
})
})
when("there is an error creating the cluster ip", func() {
it.Before(func() {
addNodeWithRoleToTracker("worker", kubeAPIClient)
kubeAPIClient.PrependReactor("create", "services", func(action coretesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, fmt.Errorf("error on create")
})
addCredentialIssuerToTrackers(v1alpha1.CredentialIssuer{
ObjectMeta: metav1.ObjectMeta{Name: credentialIssuerResourceName},
Spec: v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeAuto,
Service: v1alpha1.ImpersonationProxyServiceSpec{
Type: v1alpha1.ImpersonationProxyServiceTypeClusterIP,
},
},
},
}, pinnipedInformerClient, pinnipedAPIClient)
})
it("returns an error", func() {
startInformersAndController()
r.EqualError(runControllerSync(), "error on create")
requireCredentialIssuer(newErrorStrategy("error on create"))
requireSigningCertProviderIsEmpty()
requireTLSServerIsRunningWithoutCerts()
})
})
when("there is an error updating the cluster ip", func() {
it.Before(func() {
addNodeWithRoleToTracker("worker", kubeAPIClient)
kubeAPIClient.PrependReactor("update", "services", func(action coretesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, fmt.Errorf("error on update")
})
addCredentialIssuerToTrackers(v1alpha1.CredentialIssuer{
ObjectMeta: metav1.ObjectMeta{Name: credentialIssuerResourceName},
Spec: v1alpha1.CredentialIssuerSpec{
ImpersonationProxy: &v1alpha1.ImpersonationProxySpec{
Mode: v1alpha1.ImpersonationProxyModeAuto,
Service: v1alpha1.ImpersonationProxyServiceSpec{
Type: v1alpha1.ImpersonationProxyServiceTypeClusterIP,
Annotations: map[string]string{"key": "val"},
},
},
},
}, pinnipedInformerClient, pinnipedAPIClient)
addClusterIPServiceToTracker(clusterIPServiceName, localhostIP, kubeAPIClient)
addClusterIPServiceToTracker(clusterIPServiceName, localhostIP, kubeInformerClient)
})
it("returns an error", func() {
startInformersAndController()
r.EqualError(runControllerSync(), "error on update")
requireCredentialIssuer(newErrorStrategy("error on update"))
requireSigningCertProviderIsEmpty()
requireTLSServerIsRunningWithoutCerts()
})
})
when("there is an error creating the tls secret", func() {
it.Before(func() {
addCredentialIssuerToTrackers(v1alpha1.CredentialIssuer{